diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 5455b23a7..db0a47d86 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -37,6 +37,10 @@ const ( type applyResult struct { resp proto.Message err error + // physc signals the physical effect of the request has completed in addition + // to being logically reflected by the node. Currently only used for + // Compaction requests. + physc <-chan struct{} } // applierV3 is the interface for processing V3 raft messages @@ -45,7 +49,7 @@ type applierV3 interface { Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) - Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) + Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error) @@ -69,7 +73,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult { case r.Txn != nil: ar.resp, ar.err = s.applyV3.Txn(r.Txn) case r.Compaction != nil: - ar.resp, ar.err = s.applyV3.Compaction(r.Compaction) + ar.resp, ar.physc, ar.err = s.applyV3.Compaction(r.Compaction) case r.LeaseCreate != nil: ar.resp, ar.err = s.applyV3.LeaseCreate(r.LeaseCreate) case r.LeaseRevoke != nil: @@ -362,16 +366,16 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R } -func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { +func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} - err := a.s.KV().Compact(compaction.Revision) + ch, err := a.s.KV().Compact(compaction.Revision) if err != nil { - return nil, err + return nil, nil, err } // get the current revision. which key to get is not important. _, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0) - return resp, err + return resp, ch, err } func (a *applierV3backend) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 0f5b47851..0da5800c3 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -765,6 +765,10 @@ func (m *TxnResponse) GetResponses() []*ResponseUnion { // revision. type CompactionRequest struct { Revision int64 `protobuf:"varint,1,opt,name=revision,proto3" json:"revision,omitempty"` + // physical is set so the RPC will wait until the compaction is physically + // applied to the local database such that compacted entries are totally + // removed from the backing store. + Physical bool `protobuf:"varint,2,opt,name=physical,proto3" json:"physical,omitempty"` } func (m *CompactionRequest) Reset() { *m = CompactionRequest{} } @@ -3411,6 +3415,16 @@ func (m *CompactionRequest) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintRpc(data, i, uint64(m.Revision)) } + if m.Physical { + data[i] = 0x10 + i++ + if m.Physical { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } return i, nil } @@ -5318,6 +5332,9 @@ func (m *CompactionRequest) Size() (n int) { if m.Revision != 0 { n += 1 + sovRpc(uint64(m.Revision)) } + if m.Physical { + n += 2 + } return n } @@ -7660,6 +7677,26 @@ func (m *CompactionRequest) Unmarshal(data []byte) error { break } } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Physical", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Physical = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRpc(data[iNdEx:]) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index bd5ad7fe5..e60cc77a9 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -289,6 +289,10 @@ message TxnResponse { // revision. message CompactionRequest { int64 revision = 1; + // physical is set so the RPC will wait until the compaction is physically + // applied to the local database such that compacted entries are totally + // removed from the backing store. + bool physical = 2; } message CompactionResponse { diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 0d4da0149..393dbdc52 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -97,6 +97,9 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. if err != nil { return nil, err } + if r.Physical && result.physc != nil { + <-result.physc + } resp := result.resp.(*pb.CompactionResponse) if resp == nil { resp = &pb.CompactionResponse{} diff --git a/storage/kv.go b/storage/kv.go index cd63a354c..9f934b2e7 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -67,7 +67,7 @@ type KV interface { TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) - Compact(rev int64) error + Compact(rev int64) (<-chan struct{}, error) // Hash retrieves the hash of KV state. // This method is designed for consistency checking purpose. diff --git a/storage/kv_test.go b/storage/kv_test.go index 0ed0cf8b2..0d4469a05 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -186,7 +186,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) { defer cleanup(s, b, tmpPath) put3TestKVs(s) - if err := s.Compact(4); err != nil { + if _, err := s.Compact(4); err != nil { t.Fatalf("compact error (%v)", err) } @@ -566,7 +566,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { }, } for i, tt := range tests { - err := s.Compact(tt.rev) + _, err := s.Compact(tt.rev) if err != nil { t.Errorf("#%d: unexpect compact error %v", i, err) } @@ -602,7 +602,7 @@ func TestKVCompactBad(t *testing.T) { {100, ErrFutureRev}, } for i, tt := range tests { - err := s.Compact(tt.rev) + _, err := s.Compact(tt.rev) if err != tt.werr { t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr) } diff --git a/storage/kvstore.go b/storage/kvstore.go index a86aa8e26..3760fde61 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -218,14 +218,14 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -func (s *store) Compact(rev int64) error { +func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() if rev <= s.compactMainRev { - return ErrCompacted + return nil, ErrCompacted } if rev > s.currentRev.main { - return ErrFutureRev + return nil, ErrFutureRev } start := time.Now() @@ -243,8 +243,9 @@ func (s *store) Compact(rev int64) error { s.b.ForceCommit() keep := s.kvindex.Compact(rev) - + ch := make(chan struct{}) var j = func(ctx context.Context) { + defer close(ch) select { case <-ctx.Done(): return @@ -256,7 +257,7 @@ func (s *store) Compact(rev int64) error { s.fifoSched.Schedule(j) indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond)) - return nil + return ch, nil } func (s *store) Hash() (uint32, error) { diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 8c701b6d0..95f133e6a 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -234,7 +234,7 @@ func TestWatchCompacted(t *testing.T) { for i := 0; i < maxRev; i++ { s.Put(testKey, testValue, lease.NoLease) } - err := s.Compact(compactRev) + _, err := s.Compact(compactRev) if err != nil { t.Fatalf("failed to compact kv (%v)", err) } diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index d12c754e0..36a8ff4ca 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -322,6 +322,11 @@ func (c *cluster) compactKV(rev int64) error { conn *grpc.ClientConn err error ) + + if rev <= 0 { + return nil + } + for _, u := range c.GRPCURLs { conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { @@ -329,7 +334,7 @@ func (c *cluster) compactKV(rev int64) error { } kvc := pb.NewKVClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev}) + _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}) cancel() conn.Close() if err == nil { @@ -338,3 +343,33 @@ func (c *cluster) compactKV(rev int64) error { } return err } + +func (c *cluster) checkCompact(rev int64) error { + if rev == 0 { + return nil + } + for _, u := range c.GRPCURLs { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{u}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1)) + wr, ok := <-wch + cancel() + + cli.Close() + + if !ok { + return fmt.Errorf("watch channel terminated") + } + if wr.CompactRevision != rev { + return fmt.Errorf("got compact revision %v, wanted %v", wr.CompactRevision, rev) + } + } + return nil +} diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 2284724f9..79d916798 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -149,8 +149,15 @@ func (tt *tester) runLoop() { } plog.Printf("[round#%d] compacted storage", i) - // TODO: make sure compaction is finished - time.Sleep(30 * time.Second) + plog.Printf("[round#%d] check compaction at %d", i, revToCompact) + if err := tt.cluster.checkCompact(revToCompact); err != nil { + plog.Printf("[round#%d] checkCompact error (%v)", i, err) + if err := tt.cleanup(i, 0); err != nil { + plog.Printf("[round#%d] cleanup error: %v", i, err) + return + } + } + plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact) } }