integration/clientv3: fix 4 API misusage in test functions
parent
53f15caf73
commit
f00394e384
|
@ -16,6 +16,7 @@ package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -392,18 +393,22 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
donec := make(chan struct{})
|
errMsgCh := make(chan string, 1)
|
||||||
go func() {
|
go func() {
|
||||||
_, err := cli.Revoke(context.TODO(), leaseID)
|
_, err := cli.Revoke(context.TODO(), leaseID)
|
||||||
if !clientv3.IsConnCanceled(err) {
|
if !clientv3.IsConnCanceled(err) {
|
||||||
t.Fatalf("expected %v or server unavailable, got %v", context.Canceled, err)
|
errMsgCh <- fmt.Sprintf("expected %v or server unavailable, got %v", context.Canceled, err)
|
||||||
|
} else {
|
||||||
|
errMsgCh <- ""
|
||||||
}
|
}
|
||||||
close(donec)
|
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-time.After(integration.RequestWaitTimeout):
|
case <-time.After(integration.RequestWaitTimeout):
|
||||||
t.Fatal("le.Revoke took too long")
|
t.Fatal("le.Revoke took too long")
|
||||||
case <-donec:
|
case errMsg := <-errMsgCh:
|
||||||
|
if errMsg != "" {
|
||||||
|
t.Fatalf(errMsg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1575,12 +1575,16 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
|
||||||
var wgPutters, wgGetters sync.WaitGroup
|
var wgPutters, wgGetters sync.WaitGroup
|
||||||
wgPutters.Add(numPutters)
|
wgPutters.Add(numPutters)
|
||||||
wgGetters.Add(numGetters)
|
wgGetters.Add(numGetters)
|
||||||
|
txnerrCh := make(chan error, 1)
|
||||||
|
|
||||||
f := func() {
|
f := func() {
|
||||||
defer wgPutters.Done()
|
defer wgPutters.Done()
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if _, txnerr := lkv.Txn(context.TODO()).Then(puts...).Commit(); err != nil {
|
if _, txnerr := lkv.Txn(context.TODO()).Then(puts...).Commit(); txnerr != nil {
|
||||||
t.Fatal(txnerr)
|
select {
|
||||||
|
case txnerrCh <- txnerr:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1619,6 +1623,11 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wgPutters.Wait()
|
wgPutters.Wait()
|
||||||
|
select {
|
||||||
|
case txnerr := <-txnerrCh:
|
||||||
|
t.Fatal(txnerr)
|
||||||
|
default:
|
||||||
|
}
|
||||||
close(donec)
|
close(donec)
|
||||||
wgGetters.Wait()
|
wgGetters.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,13 +140,11 @@ func TestElectionFailover(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// next leader
|
// next leader
|
||||||
electedc := make(chan struct{})
|
electedErrC := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
ee := concurrency.NewElection(ss[1], "test-election")
|
ee := concurrency.NewElection(ss[1], "test-election")
|
||||||
if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
|
eer := ee.Campaign(context.TODO(), "bar")
|
||||||
t.Fatal(eer)
|
electedErrC <- eer // If eer != nil, the test will fail by calling t.Fatal(eer)
|
||||||
}
|
|
||||||
electedc <- struct{}{}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// invoke leader failover
|
// invoke leader failover
|
||||||
|
@ -166,7 +164,10 @@ func TestElectionFailover(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// leader must ack election (otherwise, Campaign may see closed conn)
|
// leader must ack election (otherwise, Campaign may see closed conn)
|
||||||
<-electedc
|
eer := <-electedErrC
|
||||||
|
if eer != nil {
|
||||||
|
t.Fatal(eer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestElectionSessionRelock ensures that campaigning twice on the same election
|
// TestElectionSessionRelock ensures that campaigning twice on the same election
|
||||||
|
|
|
@ -456,12 +456,14 @@ func TestV3TxnCmpHeaderRev(t *testing.T) {
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
// Concurrently put a key with a txn comparing on it.
|
// Concurrently put a key with a txn comparing on it.
|
||||||
revc := make(chan int64, 1)
|
revc := make(chan int64, 1)
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(revc)
|
defer close(revc)
|
||||||
pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
|
pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
|
||||||
presp, err := kvc.Put(context.TODO(), pr)
|
presp, err := kvc.Put(context.TODO(), pr)
|
||||||
|
errCh <- err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
return
|
||||||
}
|
}
|
||||||
revc <- presp.Header.Revision
|
revc <- presp.Header.Revision
|
||||||
}()
|
}()
|
||||||
|
@ -485,6 +487,9 @@ func TestV3TxnCmpHeaderRev(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
prev := <-revc
|
prev := <-revc
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
// put followed txn; should eval to false
|
// put followed txn; should eval to false
|
||||||
if prev > tresp.Header.Revision && !tresp.Succeeded {
|
if prev > tresp.Header.Revision && !tresp.Succeeded {
|
||||||
t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
|
t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
|
||||||
|
|
Loading…
Reference in New Issue