commit
f8752f9879
|
@ -1007,4 +1007,4 @@ curl http://127.0.0.1:2379/v2/stats/store
|
|||
|
||||
See the [other etcd APIs][other-apis] for details on the cluster management.
|
||||
|
||||
[other-apis]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md
|
||||
[other-apis]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md
|
||||
|
|
|
@ -167,8 +167,8 @@ type EtcdServer struct {
|
|||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
if ver == wal.WALv0_4 {
|
||||
log.Print("Converting v0.4 log to v0.5")
|
||||
err := migrate.Migrate4To5(cfg.DataDir, cfg.Name)
|
||||
log.Print("Converting v0.4 log to v2.0")
|
||||
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed migrating data-dir: %v", err)
|
||||
return err
|
||||
|
|
|
@ -16,7 +16,7 @@ func main() {
|
|||
log.Fatal("Must provide -data-dir flag")
|
||||
}
|
||||
|
||||
err := migrate.Migrate4To5(*from, *name)
|
||||
err := migrate.Migrate4To2(*from, *name)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed migrating data-dir: %v", err)
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ type Config4 struct {
|
|||
} `json:"peers"`
|
||||
}
|
||||
|
||||
func (c *Config4) HardState5() raftpb.HardState {
|
||||
func (c *Config4) HardState2() raftpb.HardState {
|
||||
return raftpb.HardState{
|
||||
Commit: c.CommitIndex,
|
||||
Term: 0,
|
||||
|
|
|
@ -26,19 +26,19 @@ func cfgFile4(dataDir string) string {
|
|||
return path.Join(dataDir, "conf")
|
||||
}
|
||||
|
||||
func snapDir5(dataDir string) string {
|
||||
func snapDir2(dataDir string) string {
|
||||
return path.Join(dataDir, "snap")
|
||||
}
|
||||
|
||||
func walDir5(dataDir string) string {
|
||||
func walDir2(dataDir string) string {
|
||||
return path.Join(dataDir, "wal")
|
||||
}
|
||||
|
||||
func Migrate4To5(dataDir string, name string) error {
|
||||
func Migrate4To2(dataDir string, name string) error {
|
||||
// prep new directories
|
||||
sd5 := snapDir5(dataDir)
|
||||
if err := os.MkdirAll(sd5, 0700); err != nil {
|
||||
return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err)
|
||||
sd2 := snapDir2(dataDir)
|
||||
if err := os.MkdirAll(sd2, 0700); err != nil {
|
||||
return fmt.Errorf("failed creating snapshot directory %s: %v", sd2, err)
|
||||
}
|
||||
|
||||
// read v0.4 data
|
||||
|
@ -65,50 +65,50 @@ func Migrate4To5(dataDir string, name string) error {
|
|||
}
|
||||
|
||||
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: nodeID, ClusterID: 0x04add5})
|
||||
wd5 := walDir5(dataDir)
|
||||
w, err := wal.Create(wd5, metadata)
|
||||
wd2 := walDir2(dataDir)
|
||||
w, err := wal.Create(wd2, metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed initializing wal at %s: %v", wd5, err)
|
||||
return fmt.Errorf("failed initializing wal at %s: %v", wd2, err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// transform v0.4 data
|
||||
var snap5 *raftpb.Snapshot
|
||||
var snap2 *raftpb.Snapshot
|
||||
if snap4 == nil {
|
||||
log.Printf("No snapshot found")
|
||||
} else {
|
||||
log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex)
|
||||
|
||||
snap5 = snap4.Snapshot5()
|
||||
snap2 = snap4.Snapshot2()
|
||||
}
|
||||
|
||||
st5 := cfg4.HardState5()
|
||||
st2 := cfg4.HardState2()
|
||||
|
||||
// If we've got the most recent snapshot, we can use it's committed index. Still likely less than the current actual index, but worth it for the replay.
|
||||
if snap5 != nil {
|
||||
st5.Commit = snap5.Metadata.Index
|
||||
if snap2 != nil {
|
||||
st2.Commit = snap2.Metadata.Index
|
||||
}
|
||||
|
||||
ents5, err := Entries4To5(ents4)
|
||||
ents2, err := Entries4To2(ents4)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ents5Len := len(ents5)
|
||||
log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index)
|
||||
ents2Len := len(ents2)
|
||||
log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents2Len, ents2[0].Index, ents2[ents2Len-1].Index)
|
||||
|
||||
// explicitly prepend an empty entry as the WAL code expects it
|
||||
ents5 = append(make([]raftpb.Entry, 1), ents5...)
|
||||
ents2 = append(make([]raftpb.Entry, 1), ents2...)
|
||||
|
||||
if err = w.Save(st5, ents5); err != nil {
|
||||
if err = w.Save(st2, ents2); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Log migration successful")
|
||||
|
||||
// migrate snapshot (if necessary) and logs
|
||||
if snap5 != nil {
|
||||
ss := snap.New(sd5)
|
||||
if err := ss.SaveSnap(*snap5); err != nil {
|
||||
if snap2 != nil {
|
||||
ss := snap.New(sd2)
|
||||
if err := ss.SaveSnap(*snap2); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Snapshot migration successful")
|
||||
|
|
116
migrate/log.go
116
migrate/log.go
|
@ -35,7 +35,7 @@ func (l Log4) NodeIDs() map[string]uint64 {
|
|||
if e.GetCommandName() == "etcd:join" {
|
||||
cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
|
||||
if err != nil {
|
||||
log.Println("error converting an etcd:join to v0.5 format. Likely corrupt!")
|
||||
log.Println("error converting an etcd:join to v2.0 format. Likely corrupt!")
|
||||
return nil
|
||||
}
|
||||
join := cmd4.(*JoinCommand)
|
||||
|
@ -123,8 +123,8 @@ func hashName(name string) uint64 {
|
|||
}
|
||||
|
||||
type Command4 interface {
|
||||
Type5() raftpb.EntryType
|
||||
Data5() ([]byte, error)
|
||||
Type2() raftpb.EntryType
|
||||
Data2() ([]byte, error)
|
||||
}
|
||||
|
||||
func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
|
||||
|
@ -196,17 +196,17 @@ type RemoveCommand struct {
|
|||
id uint64
|
||||
}
|
||||
|
||||
func (c *RemoveCommand) Type5() raftpb.EntryType {
|
||||
func (c *RemoveCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryConfChange
|
||||
}
|
||||
|
||||
func (c *RemoveCommand) Data5() ([]byte, error) {
|
||||
req5 := raftpb.ConfChange{
|
||||
func (c *RemoveCommand) Data2() ([]byte, error) {
|
||||
req2 := raftpb.ConfChange{
|
||||
ID: 0,
|
||||
Type: raftpb.ConfChangeRemoveNode,
|
||||
NodeID: c.id,
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type JoinCommand struct {
|
||||
|
@ -216,23 +216,23 @@ type JoinCommand struct {
|
|||
memb member
|
||||
}
|
||||
|
||||
func (c *JoinCommand) Type5() raftpb.EntryType {
|
||||
func (c *JoinCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryConfChange
|
||||
}
|
||||
|
||||
func (c *JoinCommand) Data5() ([]byte, error) {
|
||||
func (c *JoinCommand) Data2() ([]byte, error) {
|
||||
b, err := json.Marshal(c.memb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req5 := &raftpb.ConfChange{
|
||||
req2 := &raftpb.ConfChange{
|
||||
ID: 0,
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: uint64(c.memb.ID),
|
||||
Context: b,
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type SetClusterConfigCommand struct {
|
||||
|
@ -243,24 +243,24 @@ type SetClusterConfigCommand struct {
|
|||
} `json:"config"`
|
||||
}
|
||||
|
||||
func (c *SetClusterConfigCommand) Type5() raftpb.EntryType {
|
||||
func (c *SetClusterConfigCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *SetClusterConfigCommand) Data5() ([]byte, error) {
|
||||
func (c *SetClusterConfigCommand) Data2() ([]byte, error) {
|
||||
b, err := json.Marshal(c.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req5 := &etcdserverpb.Request{
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "PUT",
|
||||
Path: "/v2/admin/config",
|
||||
Dir: false,
|
||||
Val: string(b),
|
||||
}
|
||||
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type CompareAndDeleteCommand struct {
|
||||
|
@ -269,18 +269,18 @@ type CompareAndDeleteCommand struct {
|
|||
PrevIndex uint64 `json:"prevIndex"`
|
||||
}
|
||||
|
||||
func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType {
|
||||
func (c *CompareAndDeleteCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *CompareAndDeleteCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *CompareAndDeleteCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "DELETE",
|
||||
Path: StorePath(c.Key),
|
||||
PrevValue: c.PrevValue,
|
||||
PrevIndex: c.PrevIndex,
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type CompareAndSwapCommand struct {
|
||||
|
@ -291,12 +291,12 @@ type CompareAndSwapCommand struct {
|
|||
PrevIndex uint64 `json:"prevIndex"`
|
||||
}
|
||||
|
||||
func (c *CompareAndSwapCommand) Type5() raftpb.EntryType {
|
||||
func (c *CompareAndSwapCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *CompareAndSwapCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "PUT",
|
||||
Path: StorePath(c.Key),
|
||||
Val: c.Value,
|
||||
|
@ -304,7 +304,7 @@ func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
|
|||
PrevIndex: c.PrevIndex,
|
||||
Expiration: UnixTimeOrPermanent(c.ExpireTime),
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type CreateCommand struct {
|
||||
|
@ -315,25 +315,25 @@ type CreateCommand struct {
|
|||
Dir bool `json:"dir"`
|
||||
}
|
||||
|
||||
func (c *CreateCommand) Type5() raftpb.EntryType {
|
||||
func (c *CreateCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *CreateCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *CreateCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Path: StorePath(c.Key),
|
||||
Dir: c.Dir,
|
||||
Val: c.Value,
|
||||
Expiration: UnixTimeOrPermanent(c.ExpireTime),
|
||||
}
|
||||
if c.Unique {
|
||||
req5.Method = "POST"
|
||||
req2.Method = "POST"
|
||||
} else {
|
||||
var prevExist = true
|
||||
req5.Method = "PUT"
|
||||
req5.PrevExist = &prevExist
|
||||
req2.Method = "PUT"
|
||||
req2.PrevExist = &prevExist
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type DeleteCommand struct {
|
||||
|
@ -342,18 +342,18 @@ type DeleteCommand struct {
|
|||
Dir bool `json:"dir"`
|
||||
}
|
||||
|
||||
func (c *DeleteCommand) Type5() raftpb.EntryType {
|
||||
func (c *DeleteCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *DeleteCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *DeleteCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "DELETE",
|
||||
Path: StorePath(c.Key),
|
||||
Dir: c.Dir,
|
||||
Recursive: c.Recursive,
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type SetCommand struct {
|
||||
|
@ -363,19 +363,19 @@ type SetCommand struct {
|
|||
Dir bool `json:"dir"`
|
||||
}
|
||||
|
||||
func (c *SetCommand) Type5() raftpb.EntryType {
|
||||
func (c *SetCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *SetCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *SetCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "PUT",
|
||||
Path: StorePath(c.Key),
|
||||
Dir: c.Dir,
|
||||
Val: c.Value,
|
||||
Expiration: UnixTimeOrPermanent(c.ExpireTime),
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type UpdateCommand struct {
|
||||
|
@ -384,36 +384,36 @@ type UpdateCommand struct {
|
|||
ExpireTime time.Time `json:"expireTime"`
|
||||
}
|
||||
|
||||
func (c *UpdateCommand) Type5() raftpb.EntryType {
|
||||
func (c *UpdateCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *UpdateCommand) Data5() ([]byte, error) {
|
||||
func (c *UpdateCommand) Data2() ([]byte, error) {
|
||||
exist := true
|
||||
req5 := &etcdserverpb.Request{
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "PUT",
|
||||
Path: StorePath(c.Key),
|
||||
Val: c.Value,
|
||||
PrevExist: &exist,
|
||||
Expiration: UnixTimeOrPermanent(c.ExpireTime),
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type SyncCommand struct {
|
||||
Time time.Time `json:"time"`
|
||||
}
|
||||
|
||||
func (c *SyncCommand) Type5() raftpb.EntryType {
|
||||
func (c *SyncCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *SyncCommand) Data5() ([]byte, error) {
|
||||
req5 := &etcdserverpb.Request{
|
||||
func (c *SyncCommand) Data2() ([]byte, error) {
|
||||
req2 := &etcdserverpb.Request{
|
||||
Method: "SYNC",
|
||||
Time: c.Time.UnixNano(),
|
||||
}
|
||||
return req5.Marshal()
|
||||
return req2.Marshal()
|
||||
}
|
||||
|
||||
type DefaultJoinCommand struct {
|
||||
|
@ -433,15 +433,15 @@ func (c NOPCommand) CommandName() string {
|
|||
return "raft:nop"
|
||||
}
|
||||
|
||||
func (c *NOPCommand) Type5() raftpb.EntryType {
|
||||
func (c *NOPCommand) Type2() raftpb.EntryType {
|
||||
return raftpb.EntryNormal
|
||||
}
|
||||
|
||||
func (c *NOPCommand) Data5() ([]byte, error) {
|
||||
func (c *NOPCommand) Data2() ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
|
||||
func Entries4To2(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
|
||||
ents4Len := len(ents4)
|
||||
|
||||
if ents4Len == 0 {
|
||||
|
@ -459,38 +459,38 @@ func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
|
|||
}
|
||||
|
||||
raftMap := make(map[string]uint64)
|
||||
ents5 := make([]raftpb.Entry, 0)
|
||||
ents2 := make([]raftpb.Entry, 0)
|
||||
for i, e := range ents4 {
|
||||
ent, err := toEntry5(e, raftMap)
|
||||
ent, err := toEntry2(e, raftMap)
|
||||
if err != nil {
|
||||
log.Fatalf("Error converting entry %d, %s", i, err)
|
||||
} else {
|
||||
ents5 = append(ents5, *ent)
|
||||
ents2 = append(ents2, *ent)
|
||||
}
|
||||
}
|
||||
|
||||
return ents5, nil
|
||||
return ents2, nil
|
||||
}
|
||||
|
||||
func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
|
||||
func toEntry2(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
|
||||
cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := cmd4.Data5()
|
||||
data, err := cmd4.Data2()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ent5 := raftpb.Entry{
|
||||
ent2 := raftpb.Entry{
|
||||
Term: ent4.GetTerm(),
|
||||
Index: ent4.GetIndex(),
|
||||
Type: cmd4.Type5(),
|
||||
Type: cmd4.Type2(),
|
||||
Data: data,
|
||||
}
|
||||
|
||||
return &ent5, nil
|
||||
return &ent2, nil
|
||||
}
|
||||
|
||||
func generateNodeMember(name, rafturl, etcdurl string) *member {
|
||||
|
|
|
@ -159,7 +159,7 @@ func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
|
|||
return pullNodesFromEtcd(etcd)
|
||||
}
|
||||
|
||||
func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
|
||||
func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
|
||||
st := &sstore{}
|
||||
if err := json.Unmarshal(s.State, st); err != nil {
|
||||
log.Fatal("Couldn't unmarshal snapshot")
|
||||
|
@ -177,7 +177,7 @@ func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
|
|||
nodeList = append(nodeList, v)
|
||||
}
|
||||
|
||||
snap5 := raftpb.Snapshot{
|
||||
snap2 := raftpb.Snapshot{
|
||||
Data: newState,
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: s.LastIndex,
|
||||
|
@ -188,7 +188,7 @@ func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
|
|||
},
|
||||
}
|
||||
|
||||
return &snap5
|
||||
return &snap2
|
||||
}
|
||||
|
||||
func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
|
||||
|
|
Loading…
Reference in New Issue