From 6f2e7875aaa3765ca42a7f242588ad5963c2a8f3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 16 May 2016 14:40:52 -0700 Subject: [PATCH] etcdctl: add migrate command Migrate command accepts a datadir and an optional user-provided transformer function that transform v2 keys to v2 keys. Migrate command then builds a v3 backend state based on the existing v2 keys and the output of the transformer function. --- etcdctl/ctlv3/command/migrate_command.go | 350 +++++++++++++++++++++++ etcdctl/ctlv3/ctl.go | 1 + etcdserver/apply_v2.go | 41 ++- etcdserver/server.go | 4 +- etcdserver/server_test.go | 22 +- mvcc/util.go | 57 ++++ 6 files changed, 447 insertions(+), 28 deletions(-) create mode 100644 etcdctl/ctlv3/command/migrate_command.go create mode 100644 mvcc/util.go diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go new file mode 100644 index 000000000..0fd7940e8 --- /dev/null +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -0,0 +1,350 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "path" + "time" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/store" + "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" + "github.com/gogo/protobuf/proto" + "github.com/spf13/cobra" +) + +var ( + migrateDatadir string + migrateWALdir string + migrateTransformer string +) + +// NewMigrateCommand returns the cobra command for "migrate". +func NewMigrateCommand() *cobra.Command { + mc := &cobra.Command{ + Use: "migrate", + Short: "migrate", + Run: migrateCommandFunc, + } + + mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory.") + mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory.") + mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program.") + return mc +} + +func migrateCommandFunc(cmd *cobra.Command, args []string) { + var ( + writer io.WriteCloser + reader io.ReadCloser + errc chan error + ) + if migrateTransformer != "" { + writer, reader, errc = startTransformer() + } else { + fmt.Println("using default transformer") + writer, reader, errc = defaultTransformer() + } + + st := rebuildStoreV2() + be := prepareBackend() + defer be.Close() + + maxIndexc := make(chan uint64, 1) + go func() { + maxIndexc <- writeStore(writer, st) + writer.Close() + }() + + readKeys(reader, be) + mvcc.UpdateConsistentIndex(be, <-maxIndexc) + err := <-errc + if err != nil { + fmt.Println("failed to transform keys") + ExitWithError(ExitError, err) + } + + fmt.Println("finished transforming keys") +} + +func prepareBackend() backend.Backend { + dbpath := path.Join(migrateDatadir, "member", "snap", "db") + be := backend.New(dbpath, time.Second, 10000) + tx := be.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + tx.UnsafeCreateBucket([]byte("meta")) + tx.Unlock() + return be +} + +func rebuildStoreV2() store.Store { + waldir := migrateWALdir + if len(waldir) == 0 { + waldir = path.Join(migrateDatadir, "member", "wal") + } + snapdir := path.Join(migrateDatadir, "member", "snap") + + ss := snap.New(snapdir) + snapshot, err := ss.Load() + if err != nil && err != snap.ErrNoSnapshot { + ExitWithError(ExitError, err) + } + + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + + w, err := wal.OpenForRead(waldir, walsnap) + if err != nil { + ExitWithError(ExitError, err) + } + defer w.Close() + + _, _, ents, err := w.ReadAll() + if err != nil { + ExitWithError(ExitError, err) + } + + st := store.New() + if snapshot != nil { + err := st.Recovery(snapshot.Data) + if err != nil { + ExitWithError(ExitError, err) + } + } + + applier := etcdserver.NewApplierV2(st, nil) + for _, ent := range ents { + if ent.Type != raftpb.EntryNormal { + continue + } + + var raftReq pb.InternalRaftRequest + if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible + var r pb.Request + pbutil.MustUnmarshal(&r, ent.Data) + applyRequest(&r, applier) + } else { + if raftReq.V2 != nil { + req := raftReq.V2 + applyRequest(req, applier) + } + } + } + + return st +} + +func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) { + toTTLOptions(r) + switch r.Method { + case "POST": + applyV2.Post(r) + case "PUT": + applyV2.Put(r) + case "DELETE": + applyV2.Delete(r) + case "QGET": + applyV2.QGet(r) + case "SYNC": + applyV2.Sync(r) + default: + panic("unknown command") + } +} + +func toTTLOptions(r *pb.Request) store.TTLOptionSet { + refresh, _ := pbutil.GetBool(r.Refresh) + ttlOptions := store.TTLOptionSet{Refresh: refresh} + if r.Expiration != 0 { + ttlOptions.ExpireTime = time.Unix(0, r.Expiration) + } + return ttlOptions +} + +func writeStore(w io.Writer, st store.Store) uint64 { + all, err := st.Get("/1", true, true) + if err != nil { + ExitWithError(ExitError, err) + } + return writeKeys(w, all.Node) +} + +func writeKeys(w io.Writer, n *store.NodeExtern) uint64 { + maxIndex := n.ModifiedIndex + + nodes := n.Nodes + // remove store v2 bucket prefix + n.Key = n.Key[2:] + if n.Key == "" { + n.Key = "/" + } + if n.Dir { + n.Nodes = nil + } + b, err := json.Marshal(n) + if err != nil { + ExitWithError(ExitError, err) + } + fmt.Fprintf(w, string(b)) + for _, nn := range nodes { + max := writeKeys(w, nn) + if max > maxIndex { + maxIndex = max + } + } + return maxIndex +} + +func readKeys(r io.Reader, be backend.Backend) error { + for { + length64, err := readInt64(r) + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + buf := make([]byte, int(length64)) + if _, err = io.ReadFull(r, buf); err != nil { + return err + } + + var kv mvccpb.KeyValue + err = proto.Unmarshal(buf, &kv) + if err != nil { + return err + } + + mvcc.WriteKV(be, kv) + } +} + +func readInt64(r io.Reader) (int64, error) { + var n int64 + err := binary.Read(r, binary.LittleEndian, &n) + return n, err +} + +func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) { + cmd := exec.Command(migrateTransformer) + cmd.Stderr = os.Stderr + + writer, err := cmd.StdinPipe() + if err != nil { + ExitWithError(ExitError, err) + } + + reader, rerr := cmd.StdoutPipe() + if rerr != nil { + ExitWithError(ExitError, rerr) + } + + if err := cmd.Start(); err != nil { + ExitWithError(ExitError, err) + } + + errc := make(chan error, 1) + + go func() { + errc <- cmd.Wait() + }() + + return writer, reader, errc +} + +func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) { + // transformer decodes v2 keys from sr + sr, sw := io.Pipe() + // transformer encodes v3 keys into dw + dr, dw := io.Pipe() + + decoder := json.NewDecoder(sr) + + errc := make(chan error, 1) + + go func() { + defer func() { + sr.Close() + dw.Close() + }() + + for decoder.More() { + node := &client.Node{} + if err := decoder.Decode(node); err != nil { + errc <- err + return + } + + kv := transform(node) + if kv == nil { + continue + } + + data, err := proto.Marshal(kv) + if err != nil { + errc <- err + return + } + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(len(data))) + if _, err := dw.Write(buf); err != nil { + errc <- err + return + } + if _, err := dw.Write(data); err != nil { + errc <- err + return + } + } + + errc <- nil + }() + + return sw, dr, errc +} + +func transform(n *client.Node) *mvccpb.KeyValue { + const unKnownVersion = 1 + if n.Dir { + return nil + } + kv := &mvccpb.KeyValue{ + Key: []byte(n.Key), + Value: []byte(n.Value), + CreateRevision: int64(n.CreatedIndex), + ModRevision: int64(n.ModifiedIndex), + Version: unKnownVersion, + } + return kv +} diff --git a/etcdctl/ctlv3/ctl.go b/etcdctl/ctlv3/ctl.go index 09da7120d..823ba550f 100644 --- a/etcdctl/ctlv3/ctl.go +++ b/etcdctl/ctlv3/ctl.go @@ -74,6 +74,7 @@ func init() { command.NewMemberCommand(), command.NewSnapshotCommand(), command.NewMakeMirrorCommand(), + command.NewMigrateCommand(), command.NewLockCommand(), command.NewAuthCommand(), command.NewElectCommand(), diff --git a/etcdserver/apply_v2.go b/etcdserver/apply_v2.go index d7490d0f1..703c2e9f9 100644 --- a/etcdserver/apply_v2.go +++ b/etcdserver/apply_v2.go @@ -26,8 +26,8 @@ import ( "github.com/coreos/go-semver/semver" ) -// applierV2 is the interface for processing V2 raft messages -type applierV2 interface { +// ApplierV2 is the interface for processing V2 raft messages +type ApplierV2 interface { Delete(r *pb.Request) Response Post(r *pb.Request) Response Put(r *pb.Request) Response @@ -35,19 +35,26 @@ type applierV2 interface { Sync(r *pb.Request) Response } -type applierV2store struct{ s *EtcdServer } +func NewApplierV2(s store.Store, c *membership.RaftCluster) ApplierV2 { + return &applierV2store{store: s, cluster: c} +} + +type applierV2store struct { + store store.Store + cluster *membership.RaftCluster +} func (a *applierV2store) Delete(r *pb.Request) Response { switch { case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) + return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) default: - return toResponse(a.s.store.Delete(r.Path, r.Dir, r.Recursive)) + return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive)) } } func (a *applierV2store) Post(r *pb.Request) Response { - return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r))) + return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r))) } func (a *applierV2store) Put(r *pb.Request) Response { @@ -57,14 +64,14 @@ func (a *applierV2store) Put(r *pb.Request) Response { case existsSet: if exists { if r.PrevIndex == 0 && r.PrevValue == "" { - return toResponse(a.s.store.Update(r.Path, r.Val, ttlOptions)) + return toResponse(a.store.Update(r.Path, r.Val, ttlOptions)) } else { - return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) + return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) } } - return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) + return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) + return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) default: if storeMemberAttributeRegexp.MatchString(r.Path) { id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) @@ -72,25 +79,29 @@ func (a *applierV2store) Put(r *pb.Request) Response { if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) } - a.s.cluster.UpdateAttributes(id, attr) + if a.cluster != nil { + a.cluster.UpdateAttributes(id, attr) + } // return an empty response since there is no consumer. return Response{} } if r.Path == membership.StoreClusterVersionKey() { - a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + if a.cluster != nil { + a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + } // return an empty response since there is no consumer. return Response{} } - return toResponse(a.s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) + return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) } } func (a *applierV2store) QGet(r *pb.Request) Response { - return toResponse(a.s.store.Get(r.Path, r.Recursive, r.Sorted)) + return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted)) } func (a *applierV2store) Sync(r *pb.Request) Response { - a.s.store.DeleteExpiredKeys(time.Unix(0, r.Time)) + a.store.DeleteExpiredKeys(time.Unix(0, r.Time)) return Response{} } diff --git a/etcdserver/server.go b/etcdserver/server.go index 7dff81697..056b6f643 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -180,7 +180,7 @@ type EtcdServer struct { store store.Store - applyV2 applierV2 + applyV2 ApplierV2 applyV3 applierV3 kv mvcc.ConsistentWatchableKV @@ -391,7 +391,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.be = be srv.lessor = lease.NewLessor(srv.be) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ff77f19d2..cae030d54 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -179,7 +179,7 @@ func TestApplyRepeat(t *testing.T) { cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - s.applyV2 = &applierV2store{s} + s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} s.start() req := &pb.Request{Method: "QGET", ID: uint64(1)} ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} @@ -445,7 +445,7 @@ func TestApplyRequest(t *testing.T) { for i, tt := range tests { st := mockstore.NewRecorder() srv := &EtcdServer{store: st} - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} resp := srv.applyV2Request(&tt.req) if !reflect.DeepEqual(resp, tt.wresp) { @@ -464,7 +464,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { store: mockstore.NewRecorder(), cluster: cl, } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} req := pb.Request{ Method: "PUT", @@ -639,7 +639,7 @@ func TestDoProposal(t *testing.T) { store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.start() resp, err := srv.Do(context.Background(), tt) srv.Stop() @@ -666,7 +666,7 @@ func TestDoProposalCancelled(t *testing.T) { w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -688,7 +688,7 @@ func TestDoProposalTimeout(t *testing.T) { w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} ctx, _ := context.WithTimeout(context.Background(), 0) _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) @@ -704,7 +704,7 @@ func TestDoProposalStopped(t *testing.T) { w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.done = make(chan struct{}) close(srv.done) @@ -721,7 +721,7 @@ func TestSync(t *testing.T) { r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} // check that sync is non-blocking done := make(chan struct{}) @@ -761,7 +761,7 @@ func TestSyncTimeout(t *testing.T) { r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} // check that sync is non-blocking done := make(chan struct{}) @@ -900,7 +900,7 @@ func TestTriggerSnap(t *testing.T) { store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - srv.applyV2 = &applierV2store{srv} + srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) srv.be = be @@ -968,7 +968,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { cluster: cl, msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } - s.applyV2 = &applierV2store{s} + s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} be, tmpPath := backend.NewDefaultTmpBackend() defer func() { diff --git a/mvcc/util.go b/mvcc/util.go new file mode 100644 index 000000000..c2d1a3146 --- /dev/null +++ b/mvcc/util.go @@ -0,0 +1,57 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "encoding/binary" + "log" + + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/mvcc/mvccpb" +) + +func UpdateConsistentIndex(be backend.Backend, index uint64) { + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + + var oldi uint64 + _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + if len(vs) != 0 { + oldi = binary.BigEndian.Uint64(vs[0]) + } + + if index <= oldi { + return + } + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, index) + tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) +} + +func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { + ibytes := newRevBytes() + revToBytes(revision{main: kv.ModRevision}, ibytes) + + d, err := kv.Marshal() + if err != nil { + log.Fatalf("mvcc: cannot marshal event: %v", err) + } + + be.BatchTx().Lock() + be.BatchTx().UnsafePut(keyBucketName, ibytes, d) + be.BatchTx().Unlock() +}