Update conssitent_index when applying fails

When clients have no permission to perform whatever operation, then
the applying may fail. We should also move consistent_index forward
in this case, otherwise the consitent_index may smaller than the
snapshot index.
dependabot/go_modules/go.uber.org/atomic-1.10.0
ahrtr 2022-04-14 09:10:51 +08:00
parent 4555fc3998
commit 6eef7ede40
2 changed files with 35 additions and 8 deletions

View File

@ -1811,18 +1811,19 @@ func (s *EtcdServer) apply(
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
defer func() {
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
var ar *applyResult
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
if !applyV3Performed || (ar != nil && ar.err != nil) {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
}
s.lg.Debug("apply entry normal",
zap.Uint64("consistent-index", index),
@ -1867,7 +1868,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
id = raftReq.Header.ID
}
var ar *applyResult
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {

View File

@ -47,6 +47,33 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
}
}
// TestV3AuthEmptyUserPut ensures that a put with an empty user will return an empty user error,
// and the consistent_index should be moved forward even the apply-->Put fails.
func TestV3AuthEmptyUserPut(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 1,
SnapshotCount: 3,
})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
api := integration.ToGRPC(clus.Client(0))
authSetupRoot(t, api.Auth)
// The SnapshotCount is 3, so there must be at least 3 new snapshot files being created.
// The VERIFY logic will check whether the consistent_index >= last snapshot index on
// cluster terminating.
for i := 0; i < 10; i++ {
_, err := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
}
}
}
// TestV3AuthTokenWithDisable tests that auth won't crash if
// given a valid token when authentication is disabled
func TestV3AuthTokenWithDisable(t *testing.T) {