diff --git a/etcdctlv3/command/compaction.go b/etcdctlv3/command/compaction.go new file mode 100644 index 000000000..6b697c1de --- /dev/null +++ b/etcdctlv3/command/compaction.go @@ -0,0 +1,55 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "strconv" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewCompactionCommand returns the CLI command for "compaction". +func NewCompactionCommand() cli.Command { + return cli.Command{ + Name: "compaction", + Action: func(c *cli.Context) { + compactionCommandFunc(c) + }, + } +} + +// compactionCommandFunc executes the "compaction" command. +func compactionCommandFunc(c *cli.Context) { + if len(c.Args()) != 1 { + panic("bad arg") + } + + rev, err := strconv.ParseInt(c.Args()[0], 10, 64) + if err != nil { + panic("bad arg") + } + + conn, err := grpc.Dial(c.GlobalString("endpoint")) + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.CompactionRequest{Revision: rev} + + etcd.Compact(context.Background(), req) +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 00b162d92..bb9fadacd 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -35,6 +35,7 @@ func main() { command.NewPutCommand(), command.NewDeleteRangeCommand(), command.NewTxnCommand(), + command.NewCompactionCommand(), } app.Run(os.Args) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 513d25db2..d00cc6645 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -84,7 +84,12 @@ func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e } func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { - panic("not implemented") + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r}) + if err != nil { + err = togRPCError(err) + } + + return resp.(*pb.CompactionResponse), nil } func checkRangeRequest(r *pb.RangeRequest) error { diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index fb79e9399..25773a69b 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -23,6 +23,7 @@ type InternalRaftRequest struct { Put *PutRequest `protobuf:"bytes,4,opt,name=put" json:"put,omitempty"` DeleteRange *DeleteRangeRequest `protobuf:"bytes,5,opt,name=delete_range" json:"delete_range,omitempty"` Txn *TxnRequest `protobuf:"bytes,6,opt,name=txn" json:"txn,omitempty"` + Compaction *CompactionRequest `protobuf:"bytes,7,opt,name=compaction" json:"compaction,omitempty"` } func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } @@ -106,6 +107,16 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) { } i += n5 } + if m.Compaction != nil { + data[i] = 0x3a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Compaction.Size())) + n6, err := m.Compaction.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n6 + } return i, nil } @@ -180,6 +191,10 @@ func (m *InternalRaftRequest) Size() (n int) { l = m.Txn.Size() n += 1 + l + sovRaftInternal(uint64(l)) } + if m.Compaction != nil { + l = m.Compaction.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } return n } @@ -387,6 +402,36 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Compaction", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Compaction == nil { + m.Compaction = &CompactionRequest{} + } + if err := m.Compaction.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: var sizeOfWire int for { diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto index b774b6cb4..6b9adc6c7 100644 --- a/etcdserver/etcdserverpb/raft_internal.proto +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -19,6 +19,7 @@ message InternalRaftRequest { PutRequest put = 4; DeleteRangeRequest delete_range = 5; TxnRequest txn = 6; + CompactionRequest compaction = 7; } message EmptyResponse { diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 957ac5cbc..c90d4e5db 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -60,6 +60,8 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { return applyDeleteRange(s.kv, r.DeleteRange) case r.Txn != nil: return applyTxn(s.kv, r.Txn) + case r.Compaction != nil: + return applyCompaction(s.kv, r.Compaction) default: panic("not implemented") } @@ -128,6 +130,18 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { return txnResp } +func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) *pb.CompactionResponse { + resp := &pb.CompactionResponse{} + resp.Header = &pb.ResponseHeader{} + err := kv.Compact(compaction.Revision) + if err != nil { + panic("handle error") + } + // get the current revision. which key to get is not important. + _, resp.Header.Revision, _ = kv.Range([]byte("compaction"), nil, 1, 0) + return resp +} + func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { switch { case union.RequestRange != nil: