etcd/migrate/log.go

520 lines
11 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"time"
etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
// etcdDefaultClusterName is the cluster name handed to NewMember when
// generateNodeMember builds a member for a migrated node.
const etcdDefaultClusterName = "etcd-cluster"
// UnixTimeOrPermanent converts expireTime to Unix seconds. The
// store.Permanent sentinel is mapped to 0 instead.
//
// The sentinel is detected with Time.Equal rather than ==, so the check
// compares instants only and is not thrown off by a differing Location or
// monotonic clock reading on either value.
func UnixTimeOrPermanent(expireTime time.Time) int64 {
	if expireTime.Equal(store.Permanent) {
		return 0
	}
	return expireTime.Unix()
}
// Log4 is a sequence of v0.4 log entries.
type Log4 []*etcd4pb.LogEntry
// NodeIDs replays the membership commands in the log and returns the IDs of
// the nodes still present at the end, keyed by node name.
//
// Each "etcd:join" entry records the ID of the member generated from the
// join's name and raft URL; each "etcd:remove" entry deletes the named node.
// All other entries are ignored. If a join or remove command cannot be
// decoded, the failure is logged and nil is returned.
func (l Log4) NodeIDs() map[string]uint64 {
	out := make(map[string]uint64)
	for _, e := range l {
		name := e.GetCommandName()
		switch name {
		case "etcd:join":
			cmd4, err := NewCommand4(name, e.GetCommand(), nil)
			if err != nil {
				log.Println("error converting an etcd:join to v2.0 format. Likely corrupt!")
				return nil
			}
			join := cmd4.(*JoinCommand)
			m := generateNodeMember(join.Name, join.RaftURL, "")
			out[join.Name] = uint64(m.ID)
		case "etcd:remove":
			cmd4, err := NewCommand4(name, e.GetCommand(), nil)
			if err != nil {
				// Report the failure the same way as for joins so that a nil
				// result is never returned without an explanation.
				log.Println("error converting an etcd:remove to v2.0 format. Likely corrupt!")
				return nil
			}
			delete(out, cmd4.(*RemoveCommand).Name)
		}
	}
	return out
}
// StorePath returns key joined beneath the "/1" prefix as a cleaned,
// slash-separated path.
func StorePath(key string) string {
	const prefix = "/1"
	return path.Join(prefix, key)
}
// DecodeLog4FromFile opens the file at logpath read-only, decodes its
// contents with DecodeLog4 and closes the file before returning. An error
// from opening the file is returned unchanged.
func DecodeLog4FromFile(logpath string) (Log4, error) {
	f, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	entries, err := DecodeLog4(f)
	return entries, err
}
// DecodeLog4 reads log entries from file with DecodeNextEntry4 until it
// reports io.EOF, and returns them in the order they were read.
//
// Any other error aborts decoding: no entries are returned and the error is
// reported with added context.
func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
	entries := make([]*etcd4pb.LogEntry, 0)
	for {
		// The per-entry byte count is not needed here.
		entry, _, err := DecodeNextEntry4(file)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("failed decoding next log entry: %v", err)
		}
		entries = append(entries, entry)
	}
	return entries, nil
}
// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. An entry is
// read as a hexadecimal length of up to eight digits, a newline, and then
// that many bytes which are decoded with LogEntry.Unmarshal.
//
// It returns the entry and the number of bytes accounted to it (payload
// length plus nine for the length header). On any failure the entry is nil
// and the count is -1. An error from scanning the header, including io.EOF,
// is returned unchanged. A payload that is missing entirely after a header
// announcing a non-zero length is reported as io.ErrUnexpectedEOF.
func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
	var length int
	if _, err := fmt.Fscanf(r, "%8x\n", &length); err != nil {
		return nil, -1, err
	}
	// %x accepts a leading sign, so a corrupt header can produce a negative
	// length; reject it here instead of letting make panic.
	if length < 0 {
		return nil, -1, fmt.Errorf("invalid log entry length %d", length)
	}

	data := make([]byte, length)
	if _, err := io.ReadFull(r, data); err != nil {
		// ReadFull returns plain io.EOF when it could not read a single byte.
		// After a header that means the entry is truncated, which must not
		// look like a clean end of input to the caller.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, -1, err
	}

	ent4 := new(etcd4pb.LogEntry)
	if err := ent4.Unmarshal(data); err != nil {
		return nil, -1, err
	}

	// add width of scanner token (eight digits plus the newline) to length
	return ent4, length + 8 + 1, nil
}
// hashName folds the runes of name into a uint64 using a base-131 polynomial
// hash. Arithmetic wraps on overflow, and the empty string hashes to 0.
func hashName(name string) uint64 {
	const base = 131
	var h uint64
	for _, r := range []rune(name) {
		h = h*base + uint64(r)
	}
	return h
}
// Command4 is implemented by the decoded v0.4 commands in this file. It
// exposes the two pieces toEntry2 needs to build a raftpb.Entry.
type Command4 interface {
// Type2 returns the raftpb entry type to use for the converted command.
Type2() raftpb.EntryType
// Data2 returns the marshaled payload for the converted entry.
Data2() ([]byte, error)
}
// NewCommand4 creates the Command4 registered under the v0.4 command name
// and, when data is non-nil, fills it in by JSON-decoding data.
//
// raftMap, when non-nil, carries node name -> node ID state between calls:
// an "etcd:join" stores the generated member's ID under the node's name, and
// an "etcd:remove" takes its node ID from the map and deletes the entry,
// failing if the name is not present. With a nil raftMap neither happens.
//
// An error is returned for unregistered names, for "raft:join" and
// "raft:leave", and when data cannot be decoded.
func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
	var cmd Command4
	switch name {
	case "etcd:remove":
		cmd = new(RemoveCommand)
	case "etcd:join":
		cmd = new(JoinCommand)
	case "etcd:setClusterConfig", "raft:nop":
		cmd = new(NOPCommand)
	case "etcd:compareAndDelete":
		cmd = new(CompareAndDeleteCommand)
	case "etcd:compareAndSwap":
		cmd = new(CompareAndSwapCommand)
	case "etcd:create":
		cmd = new(CreateCommand)
	case "etcd:delete":
		cmd = new(DeleteCommand)
	case "etcd:set":
		cmd = new(SetCommand)
	case "etcd:sync":
		cmd = new(SyncCommand)
	case "etcd:update":
		cmd = new(UpdateCommand)
	case "raft:join", "raft:leave":
		// These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
		return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
	default:
		return nil, fmt.Errorf("unregistered command type %s", name)
	}

	// If data for the command was passed in, then decode it.
	if data != nil {
		dec := json.NewDecoder(bytes.NewReader(data))
		if err := dec.Decode(cmd); err != nil {
			return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
		}
	}

	// Membership commands need extra bookkeeping after decoding.
	switch c := cmd.(type) {
	case *JoinCommand:
		m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
		c.memb = *m
		if raftMap != nil {
			raftMap[c.Name] = uint64(m.ID)
		}
	case *RemoveCommand:
		if raftMap == nil {
			break
		}
		id, ok := raftMap[c.Name]
		if !ok {
			return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
		}
		c.id = id
		delete(raftMap, c.Name)
	}
	return cmd, nil
}
// RemoveCommand is the decoded form of a v0.4 "etcd:remove" command.
type RemoveCommand struct {
	Name string `json:"name"`
	// id is the ID of the node being removed. It is not part of the JSON
	// payload; NewCommand4 sets it from its raftMap when one is supplied.
	id uint64
}

// Type2 reports that a removal is converted to a configuration-change entry.
func (c *RemoveCommand) Type2() raftpb.EntryType {
	return raftpb.EntryConfChange
}

// Data2 marshals a ConfChangeRemoveNode configuration change for c.id.
func (c *RemoveCommand) Data2() ([]byte, error) {
	var cc raftpb.ConfChange
	cc.Type = raftpb.ConfChangeRemoveNode
	cc.NodeID = c.id
	return cc.Marshal()
}
// JoinCommand is the decoded form of a v0.4 "etcd:join" command.
type JoinCommand struct {
	Name    string `json:"name"`
	RaftURL string `json:"raftURL"`
	EtcdURL string `json:"etcdURL"`
	// memb is the member generated for this node; NewCommand4 fills it in
	// after decoding.
	memb member
}

// Type2 reports that a join is converted to a configuration-change entry.
func (c *JoinCommand) Type2() raftpb.EntryType {
	return raftpb.EntryConfChange
}

// Data2 marshals a ConfChangeAddNode configuration change for the member's
// ID, carrying the JSON encoding of the member as its context.
func (c *JoinCommand) Data2() ([]byte, error) {
	ctx, err := json.Marshal(c.memb)
	if err != nil {
		return nil, err
	}

	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  uint64(c.memb.ID),
		Context: ctx,
	}
	return cc.Marshal()
}
// SetClusterConfigCommand holds the JSON payload of a cluster configuration
// command. Note that NewCommand4 maps "etcd:setClusterConfig" to NOPCommand,
// not to this type.
type SetClusterConfigCommand struct {
	Config *struct {
		ActiveSize   int     `json:"activeSize"`
		RemoveDelay  float64 `json:"removeDelay"`
		SyncInterval float64 `json:"syncInterval"`
	} `json:"config"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *SetClusterConfigCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a PUT request to "/v2/admin/config" whose value is the JSON
// encoding of c.Config.
func (c *SetClusterConfigCommand) Data2() ([]byte, error) {
	cfg, err := json.Marshal(c.Config)
	if err != nil {
		return nil, err
	}

	var req etcdserverpb.Request
	req.Method = "PUT"
	req.Path = "/v2/admin/config"
	req.Val = string(cfg)
	return req.Marshal()
}
// CompareAndDeleteCommand is the decoded form of a v0.4
// "etcd:compareAndDelete" command.
type CompareAndDeleteCommand struct {
	Key       string `json:"key"`
	PrevValue string `json:"prevValue"`
	PrevIndex uint64 `json:"prevIndex"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *CompareAndDeleteCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a DELETE request for the key's store path that carries the
// command's previous value and previous index.
func (c *CompareAndDeleteCommand) Data2() ([]byte, error) {
	var req etcdserverpb.Request
	req.Method = "DELETE"
	req.Path = StorePath(c.Key)
	req.PrevValue = c.PrevValue
	req.PrevIndex = c.PrevIndex
	return req.Marshal()
}
// CompareAndSwapCommand is the decoded form of a v0.4 "etcd:compareAndSwap"
// command.
type CompareAndSwapCommand struct {
	Key        string    `json:"key"`
	Value      string    `json:"value"`
	ExpireTime time.Time `json:"expireTime"`
	PrevValue  string    `json:"prevValue"`
	PrevIndex  uint64    `json:"prevIndex"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *CompareAndSwapCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a PUT request for the key's store path that carries the new
// value, the previous value and index, and the expiration produced by
// UnixTimeOrPermanent.
func (c *CompareAndSwapCommand) Data2() ([]byte, error) {
	var req etcdserverpb.Request
	req.Method = "PUT"
	req.Path = StorePath(c.Key)
	req.Val = c.Value
	req.PrevValue = c.PrevValue
	req.PrevIndex = c.PrevIndex
	req.Expiration = UnixTimeOrPermanent(c.ExpireTime)
	return req.Marshal()
}
// CreateCommand is the decoded form of a v0.4 "etcd:create" command.
type CreateCommand struct {
	Key        string    `json:"key"`
	Value      string    `json:"value"`
	ExpireTime time.Time `json:"expireTime"`
	Unique     bool      `json:"unique"`
	Dir        bool      `json:"dir"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *CreateCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals the request equivalent to this create.
//
// A unique create becomes a POST. A non-unique create becomes a PUT with
// PrevExist set to false.
func (c *CreateCommand) Data2() ([]byte, error) {
	req := &etcdserverpb.Request{
		Path:       StorePath(c.Key),
		Dir:        c.Dir,
		Val:        c.Value,
		Expiration: UnixTimeOrPermanent(c.ExpireTime),
	}
	if c.Unique {
		req.Method = "POST"
		return req.Marshal()
	}

	// A create must only succeed when the key does not exist yet, so the PUT
	// carries prevExist=false. With prevExist=true the request would be the
	// same one UpdateCommand emits, i.e. an update of an existing key.
	prevExist := false
	req.Method = "PUT"
	req.PrevExist = &prevExist
	return req.Marshal()
}
// DeleteCommand is the decoded form of a v0.4 "etcd:delete" command.
type DeleteCommand struct {
	Key       string `json:"key"`
	Recursive bool   `json:"recursive"`
	Dir       bool   `json:"dir"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *DeleteCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a DELETE request for the key's store path, passing along
// the Dir and Recursive flags.
func (c *DeleteCommand) Data2() ([]byte, error) {
	var req etcdserverpb.Request
	req.Method = "DELETE"
	req.Path = StorePath(c.Key)
	req.Dir = c.Dir
	req.Recursive = c.Recursive
	return req.Marshal()
}
// SetCommand is the decoded form of a v0.4 "etcd:set" command.
type SetCommand struct {
	Key        string    `json:"key"`
	Value      string    `json:"value"`
	ExpireTime time.Time `json:"expireTime"`
	Dir        bool      `json:"dir"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *SetCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a PUT request for the key's store path with the command's
// Dir flag, value, and the expiration produced by UnixTimeOrPermanent.
func (c *SetCommand) Data2() ([]byte, error) {
	var req etcdserverpb.Request
	req.Method = "PUT"
	req.Path = StorePath(c.Key)
	req.Dir = c.Dir
	req.Val = c.Value
	req.Expiration = UnixTimeOrPermanent(c.ExpireTime)
	return req.Marshal()
}
// UpdateCommand is the decoded form of a v0.4 "etcd:update" command.
type UpdateCommand struct {
	Key        string    `json:"key"`
	Value      string    `json:"value"`
	ExpireTime time.Time `json:"expireTime"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *UpdateCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a PUT request for the key's store path with PrevExist set
// to true, the new value, and the expiration produced by UnixTimeOrPermanent.
func (c *UpdateCommand) Data2() ([]byte, error) {
	mustExist := true

	var req etcdserverpb.Request
	req.Method = "PUT"
	req.Path = StorePath(c.Key)
	req.Val = c.Value
	req.PrevExist = &mustExist
	req.Expiration = UnixTimeOrPermanent(c.ExpireTime)
	return req.Marshal()
}
// SyncCommand is the decoded form of a v0.4 "etcd:sync" command.
type SyncCommand struct {
	Time time.Time `json:"time"`
}

// Type2 reports that the command is converted to a normal entry.
func (c *SyncCommand) Type2() raftpb.EntryType {
	return raftpb.EntryNormal
}

// Data2 marshals a SYNC request whose Time is c.Time in Unix nanoseconds.
func (c *SyncCommand) Data2() ([]byte, error) {
	var req etcdserverpb.Request
	req.Method = "SYNC"
	req.Time = c.Time.UnixNano()
	return req.Marshal()
}
// DefaultJoinCommand describes a JSON payload with a node name and a
// connection string. It has no methods in this part of the file and is not
// created by NewCommand4.
// NOTE(review): presumably the payload of "raft:join" -- confirm.
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// DefaultLeaveCommand describes a JSON payload with a node name. It has no
// methods in this part of the file and is not created by NewCommand4.
// NOTE(review): presumably the payload of "raft:leave" -- confirm.
type DefaultLeaveCommand struct {
Name string `json:"name"`
// id is unexported and therefore not populated by JSON decoding.
id uint64
}
// NOPCommand is a command with no payload. NewCommand4 uses it for both
// "raft:nop" and "etcd:setClusterConfig".
type NOPCommand struct{}
//TODO(bcwaldon): Why is CommandName here?

// CommandName returns the v0.4 command name "raft:nop".
func (c NOPCommand) CommandName() string {
return "raft:nop"
}

// Type2 reports that the command is converted to a normal entry.
func (c *NOPCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}

// Data2 returns no data and no error.
func (c *NOPCommand) Data2() ([]byte, error) {
return nil, nil
}
// Entries4To2 converts a run of v0.4 log entries into raftpb entries.
//
// The indexes of ents4 must be contiguous: every entry's index has to be
// exactly one greater than its predecessor's, otherwise an error naming the
// first missing index is returned. An empty input yields (nil, nil).
//
// Entries are converted in order and share one node name -> node ID map, so
// an "etcd:remove" is resolved against the "etcd:join" that preceded it. If
// an entry cannot be converted, the error is returned to the caller together
// with the position of the offending entry; the process is not terminated.
func Entries4To2(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
	if len(ents4) == 0 {
		return nil, nil
	}

	// ensure indexes are monotonically increasing, without gaps
	startIndex := ents4[0].GetIndex()
	for i, e := range ents4[1:] {
		wantIndex := startIndex + uint64(i+1)
		if e.GetIndex() != wantIndex {
			return nil, fmt.Errorf("skipped log index %d", wantIndex)
		}
	}

	raftMap := make(map[string]uint64)
	ents2 := make([]raftpb.Entry, 0, len(ents4))
	for i, e := range ents4 {
		ent, err := toEntry2(e, raftMap)
		if err != nil {
			return nil, fmt.Errorf("error converting entry %d: %v", i, err)
		}
		ents2 = append(ents2, *ent)
	}
	return ents2, nil
}
// toEntry2 converts a single v0.4 log entry into a raftpb.Entry.
//
// The entry's command is decoded with NewCommand4 (which may read and update
// raftMap) and supplies the new entry's type and data. The index is copied
// unchanged and the term is shifted by termOffset4to2. Errors from decoding
// the command or building its data are returned as-is.
func toEntry2(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
	cmd, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
	if err != nil {
		return nil, err
	}

	payload, err := cmd.Data2()
	if err != nil {
		return nil, err
	}

	ent2 := new(raftpb.Entry)
	ent2.Term = ent4.GetTerm() + termOffset4to2
	ent2.Index = ent4.GetIndex()
	ent2.Type = cmd.Type2()
	ent2.Data = payload
	return ent2, nil
}
// generateNodeMember builds the member for a node from its name and raft
// URL, using etcdDefaultClusterName as the cluster name, and sets etcdurl as
// the member's only client URL.
//
// If rafturl is rejected by types.NewURLs the process is terminated via
// log.Fatalf.
func generateNodeMember(name, rafturl, etcdurl string) *member {
	peerURLs, err := types.NewURLs([]string{rafturl})
	if err != nil {
		log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
	}

	memb := NewMember(name, peerURLs, etcdDefaultClusterName)
	memb.ClientURLs = []string{etcdurl}
	return memb
}