etcdserver, storage: optionally wait for Compaction completion in RPC

release-3.0
Anthony Romano 2016-03-28 16:07:56 -07:00
parent 2deed74494
commit 4b35cb9462
8 changed files with 65 additions and 16 deletions

View File

@ -37,6 +37,10 @@ const (
type applyResult struct { type applyResult struct {
resp proto.Message resp proto.Message
err error err error
// physc signals the physical effect of the request has completed in addition
// to being logically reflected by the node. Currently only used for
// Compaction requests.
physc <-chan struct{}
} }
// applierV3 is the interface for processing V3 raft messages // applierV3 is the interface for processing V3 raft messages
@ -45,7 +49,7 @@ type applierV3 interface {
Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error) Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
@ -69,7 +73,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult {
case r.Txn != nil: case r.Txn != nil:
ar.resp, ar.err = s.applyV3.Txn(r.Txn) ar.resp, ar.err = s.applyV3.Txn(r.Txn)
case r.Compaction != nil: case r.Compaction != nil:
ar.resp, ar.err = s.applyV3.Compaction(r.Compaction) ar.resp, ar.physc, ar.err = s.applyV3.Compaction(r.Compaction)
case r.LeaseCreate != nil: case r.LeaseCreate != nil:
ar.resp, ar.err = s.applyV3.LeaseCreate(r.LeaseCreate) ar.resp, ar.err = s.applyV3.LeaseCreate(r.LeaseCreate)
case r.LeaseRevoke != nil: case r.LeaseRevoke != nil:
@ -362,16 +366,16 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R
} }
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
resp := &pb.CompactionResponse{} resp := &pb.CompactionResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
err := a.s.KV().Compact(compaction.Revision) ch, err := a.s.KV().Compact(compaction.Revision)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// get the current revision. which key to get is not important. // get the current revision. which key to get is not important.
_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0) _, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
return resp, err return resp, ch, err
} }
func (a *applierV3backend) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { func (a *applierV3backend) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {

View File

@ -765,6 +765,10 @@ func (m *TxnResponse) GetResponses() []*ResponseUnion {
// revision. // revision.
type CompactionRequest struct { type CompactionRequest struct {
Revision int64 `protobuf:"varint,1,opt,name=revision,proto3" json:"revision,omitempty"` Revision int64 `protobuf:"varint,1,opt,name=revision,proto3" json:"revision,omitempty"`
// physical is set so the RPC will wait until the compaction is physically
// applied to the local database such that compacted entries are totally
// removed from the backing store.
Physical bool `protobuf:"varint,2,opt,name=physical,proto3" json:"physical,omitempty"`
} }
func (m *CompactionRequest) Reset() { *m = CompactionRequest{} } func (m *CompactionRequest) Reset() { *m = CompactionRequest{} }
@ -3411,6 +3415,16 @@ func (m *CompactionRequest) MarshalTo(data []byte) (int, error) {
i++ i++
i = encodeVarintRpc(data, i, uint64(m.Revision)) i = encodeVarintRpc(data, i, uint64(m.Revision))
} }
if m.Physical {
data[i] = 0x10
i++
if m.Physical {
data[i] = 1
} else {
data[i] = 0
}
i++
}
return i, nil return i, nil
} }
@ -5318,6 +5332,9 @@ func (m *CompactionRequest) Size() (n int) {
if m.Revision != 0 { if m.Revision != 0 {
n += 1 + sovRpc(uint64(m.Revision)) n += 1 + sovRpc(uint64(m.Revision))
} }
if m.Physical {
n += 2
}
return n return n
} }
@ -7660,6 +7677,26 @@ func (m *CompactionRequest) Unmarshal(data []byte) error {
break break
} }
} }
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Physical", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Physical = bool(v != 0)
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipRpc(data[iNdEx:]) skippy, err := skipRpc(data[iNdEx:])

View File

@ -289,6 +289,10 @@ message TxnResponse {
// revision. // revision.
message CompactionRequest { message CompactionRequest {
int64 revision = 1; int64 revision = 1;
// physical is set so the RPC will wait until the compaction is physically
// applied to the local database such that compacted entries are totally
// removed from the backing store.
bool physical = 2;
} }
message CompactionResponse { message CompactionResponse {

View File

@ -97,6 +97,9 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
if err != nil { if err != nil {
return nil, err return nil, err
} }
if r.Physical && result.physc != nil {
<-result.physc
}
resp := result.resp.(*pb.CompactionResponse) resp := result.resp.(*pb.CompactionResponse)
if resp == nil { if resp == nil {
resp = &pb.CompactionResponse{} resp = &pb.CompactionResponse{}

View File

@ -67,7 +67,7 @@ type KV interface {
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
Compact(rev int64) error Compact(rev int64) (<-chan struct{}, error)
// Hash retrieves the hash of KV state. // Hash retrieves the hash of KV state.
// This method is designed for consistency checking purpose. // This method is designed for consistency checking purpose.

View File

@ -186,7 +186,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
defer cleanup(s, b, tmpPath) defer cleanup(s, b, tmpPath)
put3TestKVs(s) put3TestKVs(s)
if err := s.Compact(4); err != nil { if _, err := s.Compact(4); err != nil {
t.Fatalf("compact error (%v)", err) t.Fatalf("compact error (%v)", err)
} }
@ -566,7 +566,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
}, },
} }
for i, tt := range tests { for i, tt := range tests {
err := s.Compact(tt.rev) _, err := s.Compact(tt.rev)
if err != nil { if err != nil {
t.Errorf("#%d: unexpect compact error %v", i, err) t.Errorf("#%d: unexpect compact error %v", i, err)
} }
@ -602,7 +602,7 @@ func TestKVCompactBad(t *testing.T) {
{100, ErrFutureRev}, {100, ErrFutureRev},
} }
for i, tt := range tests { for i, tt := range tests {
err := s.Compact(tt.rev) _, err := s.Compact(tt.rev)
if err != tt.werr { if err != tt.werr {
t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr) t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
} }

View File

@ -218,14 +218,14 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
return n, rev, nil return n, rev, nil
} }
func (s *store) Compact(rev int64) error { func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if rev <= s.compactMainRev { if rev <= s.compactMainRev {
return ErrCompacted return nil, ErrCompacted
} }
if rev > s.currentRev.main { if rev > s.currentRev.main {
return ErrFutureRev return nil, ErrFutureRev
} }
start := time.Now() start := time.Now()
@ -243,8 +243,9 @@ func (s *store) Compact(rev int64) error {
s.b.ForceCommit() s.b.ForceCommit()
keep := s.kvindex.Compact(rev) keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) { var j = func(ctx context.Context) {
defer close(ch)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -256,7 +257,7 @@ func (s *store) Compact(rev int64) error {
s.fifoSched.Schedule(j) s.fifoSched.Schedule(j)
indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond)) indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond))
return nil return ch, nil
} }
func (s *store) Hash() (uint32, error) { func (s *store) Hash() (uint32, error) {

View File

@ -234,7 +234,7 @@ func TestWatchCompacted(t *testing.T) {
for i := 0; i < maxRev; i++ { for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease) s.Put(testKey, testValue, lease.NoLease)
} }
err := s.Compact(compactRev) _, err := s.Compact(compactRev)
if err != nil { if err != nil {
t.Fatalf("failed to compact kv (%v)", err) t.Fatalf("failed to compact kv (%v)", err)
} }