etcd-runner: add barrier, observe !ok handling, and election name arg to election-runner.
parent
fa85445ef8
commit
d57ad8ec8d
|
@ -20,32 +20,34 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/coreos/etcd/clientv3/concurrency"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// NewElectionCommand returns the cobra command for "election runner".
|
||||
func NewElectionCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "election",
|
||||
Use: "election [election name (defaults to 'elector')]",
|
||||
Short: "Performs election operation",
|
||||
Run: runElectionFunc,
|
||||
}
|
||||
cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
|
||||
cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func runElectionFunc(cmd *cobra.Command, args []string) {
|
||||
if len(args) > 0 {
|
||||
ExitWithError(ExitBadArgs, errors.New("election does not take any argument"))
|
||||
election := "elector"
|
||||
if len(args) == 1 {
|
||||
election = args[0]
|
||||
}
|
||||
if len(args) > 1 {
|
||||
ExitWithError(ExitBadArgs, errors.New("election takes at most one argument"))
|
||||
}
|
||||
|
||||
rcs := make([]roundClient, totalClientConnections)
|
||||
validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
|
||||
for range rcs {
|
||||
releasec <- struct{}{}
|
||||
}
|
||||
|
||||
validatec := make(chan struct{}, len(rcs))
|
||||
// nextc closes when election is ready for next round.
|
||||
nextc := make(chan struct{})
|
||||
eps := endpointsFromFlag(cmd)
|
||||
dialTimeout := dialTimeoutFromCmd(cmd)
|
||||
|
||||
|
@ -53,6 +55,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
|
|||
v := fmt.Sprintf("%d", i)
|
||||
observedLeader := ""
|
||||
validateWaiters := 0
|
||||
var rcNextc chan struct{}
|
||||
setRcNextc := func() {
|
||||
rcNextc = nextc
|
||||
}
|
||||
|
||||
rcs[i].c = newClient(eps, dialTimeout)
|
||||
var (
|
||||
|
@ -65,18 +71,22 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
|
|||
break
|
||||
}
|
||||
}
|
||||
e := concurrency.NewElection(s, "electors")
|
||||
|
||||
rcs[i].acquire = func() error {
|
||||
<-releasec
|
||||
e := concurrency.NewElection(s, election)
|
||||
rcs[i].acquire = func() (err error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
if ol, ok := <-e.Observe(ctx); ok {
|
||||
observedLeader = string(ol.Kvs[0].Value)
|
||||
if observedLeader != v {
|
||||
cancel()
|
||||
defer close(donec)
|
||||
for ctx.Err() == nil {
|
||||
if ol, ok := <-e.Observe(ctx); ok {
|
||||
observedLeader = string(ol.Kvs[0].Value)
|
||||
break
|
||||
}
|
||||
}
|
||||
if observedLeader != v {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
err = e.Campaign(ctx, v)
|
||||
if err == nil {
|
||||
|
@ -85,18 +95,24 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
|
|||
if observedLeader == v {
|
||||
validateWaiters = len(rcs)
|
||||
}
|
||||
cancel()
|
||||
<-donec
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
rcs[i].validate = func() error {
|
||||
if l, err := e.Leader(context.TODO()); err == nil && string(l.Kvs[0].Value) != observedLeader {
|
||||
return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
|
||||
l, err := e.Leader(context.TODO())
|
||||
if err == nil && string(l.Kvs[0].Value) != observedLeader {
|
||||
return fmt.Errorf("expected leader %q, got %q", observedLeader, l.Kvs[0].Value)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
setRcNextc()
|
||||
validatec <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
@ -113,14 +129,15 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
|
|||
return err
|
||||
}
|
||||
if observedLeader == v {
|
||||
for range rcs {
|
||||
releasec <- struct{}{}
|
||||
}
|
||||
close(nextc)
|
||||
nextc = make(chan struct{})
|
||||
}
|
||||
<-rcNextc
|
||||
observedLeader = ""
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
doRounds(rcs, rounds)
|
||||
// each client creates 1 key from Campaign() and delete it from Resign()
|
||||
// a round involves in 2*len(rcs) requests.
|
||||
doRounds(rcs, rounds, 2*len(rcs))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue