Compare commits

...

13 Commits

Author SHA1 Message Date
Gyu-Ho Lee
41e52ebc22 version: bump to 3.1.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-22 09:46:23 -07:00
Xiang
7bb538d4d4 backend: add FillPercent option 2017-03-21 12:12:32 -07:00
Gyu-Ho Lee
1622782e49 integration: ensure 'StopNotify' on publish error
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:12:13 -07:00
Gyu-Ho Lee
99b47e0c1e etcdmain: handle StopNotify when ErrStopped aborted publish
Fix https://github.com/coreos/etcd/issues/7512.

If a server starts and then aborts due to a config error,
it is possible to get stuck waiting on ReadyNotify.
This adds a select case to also get notified on the stop channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:10:36 -07:00
Anthony Romano
350d0cd211 ctlv3: have "protobuf" in output help string instead of "proto"
Fixes #7538
2017-03-20 12:40:25 -07:00
Jonathan Sokolowski
72f37ff79a embed: Clear default initial cluster
NewConfig() sets the initial cluster from the member name, but we should
clear it in the event that another discovery option has been specified.

Fixes #7516
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
3221454cab etcdserver: remove possibly compacted entry look-up
Fix https://github.com/coreos/etcd/issues/7470.

This patch removes an unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger a panic
if the raft entry at etcdProgress.appliedi has been compacted
by a subsequent 'MsgSnap' message; if a follower is slow
(in this case, from network latency spikes), it can receive
multiple 'MsgSnap' requests from the leader.

The etcd server-side 'applyAll' routine and raft's Ready
processing routine become asynchronous after raft entries
are persisted. And given that the raft Ready routine takes
less time to finish, it is possible that a second 'MsgSnap'
is being handled while the slow 'applyAll' is still processing
the first (old) 'MsgSnap'. The raft Ready routine can then
compact log entries at an index that is still in the future
for 'applyAll'. That is how 'createMergedSnapshotMessage'
ended up looking up a raft term with an outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:18 -07:00
Anthony Romano
4a1bffdbc6 clientv3: close open watch channel if substream is closing on reconnect
If a substream is closing but its outc is still open while reconnecting, then outc
would only be closed once the watch client reconnected or was itself closed. This
was leading to deadlocks in the proxy tests. Instead, close outc immediately if
the context is canceled.

Fixes #7503
2017-03-18 07:56:18 -07:00
Anthony Romano
9d9be2bc86 ctlv3: ensure synced member list before printing env vars on member add
In cases of multiple endpoints, it's possible member add would get its
member list from a member that has not yet recognized the membership
update. Instead, confirm that the member list response comes from the
member that acked the member add, or from a member that has synced
with the cluster following the member add.

Fixes #7498
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
e5462f74f1 auth: get rid of deadlocking channel passing scheme in simpleTokenTTL
Cherry-picked from 1b1fabef8f.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:05 -07:00
Gyu-Ho Lee
c68c1d9344 discovery: fix print format
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:57 -07:00
Anthony Romano
6ed56cd723 auth: nil check AuthInfo when checking admin permissions
If the context does not include auth information, getting the AuthInfo will
return a nil AuthInfo and a nil error. This is then passed to
IsAdminPermitted, which would dereference the nil AuthInfo.
2017-03-17 14:21:39 -07:00
Gyu-Ho Lee
a3c6f6bf81 version: bump up to 3.1.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:15 -07:00
15 changed files with 143 additions and 62 deletions

auth/simple_token.go

@@ -21,33 +21,33 @@ import (
 	"crypto/rand"
 	"math/big"
 	"strings"
+	"sync"
 	"time"
 )
 const (
 	letters                  = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
 	defaultSimpleTokenLength = 16
 )
 // var for testing purposes
 var (
 	simpleTokenTTL           = 5 * time.Minute
 	simpleTokenTTLResolution = 1 * time.Second
 )
 type simpleTokenTTLKeeper struct {
-	tokens              map[string]time.Time
-	addSimpleTokenCh    chan string
-	resetSimpleTokenCh  chan string
-	deleteSimpleTokenCh chan string
-	stopCh              chan chan struct{}
-	deleteTokenFunc     func(string)
+	tokensMu        sync.Mutex
+	tokens          map[string]time.Time
+	stopCh          chan chan struct{}
+	deleteTokenFunc func(string)
 }
 func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
 	stk := &simpleTokenTTLKeeper{
-		tokens:              make(map[string]time.Time),
-		addSimpleTokenCh:    make(chan string, 1),
-		resetSimpleTokenCh:  make(chan string, 1),
-		deleteSimpleTokenCh: make(chan string, 1),
-		stopCh:              make(chan chan struct{}),
-		deleteTokenFunc:     deletefunc,
+		tokens:          make(map[string]time.Time),
+		stopCh:          make(chan chan struct{}),
+		deleteTokenFunc: deletefunc,
 	}
 	go stk.run()
 	return stk
@@ -61,37 +61,34 @@ func (tm *simpleTokenTTLKeeper) stop() {
 }
 func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
-	tm.addSimpleTokenCh <- token
+	tm.tokens[token] = time.Now().Add(simpleTokenTTL)
 }
 func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
-	tm.resetSimpleTokenCh <- token
+	if _, ok := tm.tokens[token]; ok {
+		tm.tokens[token] = time.Now().Add(simpleTokenTTL)
+	}
 }
 func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
-	tm.deleteSimpleTokenCh <- token
+	delete(tm.tokens, token)
 }
 func (tm *simpleTokenTTLKeeper) run() {
 	tokenTicker := time.NewTicker(simpleTokenTTLResolution)
 	defer tokenTicker.Stop()
 	for {
 		select {
-		case t := <-tm.addSimpleTokenCh:
-			tm.tokens[t] = time.Now().Add(simpleTokenTTL)
-		case t := <-tm.resetSimpleTokenCh:
-			if _, ok := tm.tokens[t]; ok {
-				tm.tokens[t] = time.Now().Add(simpleTokenTTL)
-			}
-		case t := <-tm.deleteSimpleTokenCh:
-			delete(tm.tokens, t)
 		case <-tokenTicker.C:
 			nowtime := time.Now()
+			tm.tokensMu.Lock()
 			for t, tokenendtime := range tm.tokens {
 				if nowtime.After(tokenendtime) {
 					tm.deleteTokenFunc(t)
 					delete(tm.tokens, t)
 				}
 			}
+			tm.tokensMu.Unlock()
 		case waitCh := <-tm.stopCh:
 			tm.tokens = make(map[string]time.Time)
 			waitCh <- struct{}{}
@@ -116,6 +113,7 @@ func (as *authStore) GenSimpleToken() (string, error) {
 }
 func (as *authStore) assignSimpleTokenToUser(username, token string) {
+	as.simpleTokenKeeper.tokensMu.Lock()
 	as.simpleTokensMu.Lock()
 	_, ok := as.simpleTokens[token]
@@ -126,16 +124,21 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
 	as.simpleTokens[token] = username
 	as.simpleTokenKeeper.addSimpleToken(token)
 	as.simpleTokensMu.Unlock()
+	as.simpleTokenKeeper.tokensMu.Unlock()
 }
 func (as *authStore) invalidateUser(username string) {
+	if as.simpleTokenKeeper == nil {
+		return
+	}
+	as.simpleTokenKeeper.tokensMu.Lock()
 	as.simpleTokensMu.Lock()
-	defer as.simpleTokensMu.Unlock()
 	for token, name := range as.simpleTokens {
 		if strings.Compare(name, username) == 0 {
 			delete(as.simpleTokens, token)
 			as.simpleTokenKeeper.deleteSimpleToken(token)
 		}
 	}
+	as.simpleTokensMu.Unlock()
+	as.simpleTokenKeeper.tokensMu.Unlock()
 }
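The shape of the deadlock the channels allowed, reduced to a self-contained sketch (hypothetical names, not the etcd code): the keeper goroutine invokes the delete callback, which needs the same mutex a producer is holding while blocked on a full channel send.

    package main

    import (
    	"sync"
    	"time"
    )

    func main() {
    	var mu sync.Mutex // plays the role of simpleTokensMu
    	tokens := map[string]string{}
    	addCh := make(chan string, 1) // like addSimpleTokenCh: capacity 1

    	// keeper goroutine, like simpleTokenTTLKeeper.run
    	go func() {
    		for {
    			select {
    			case <-addCh:
    			case <-time.After(10 * time.Millisecond):
    				mu.Lock() // like deleteTokenFunc: takes the store mutex
    				delete(tokens, "expired")
    				mu.Unlock()
    			}
    		}
    	}()

    	// producer, like assignSimpleTokenToUser: sends while holding the mutex
    	mu.Lock()
    	addCh <- "t1"
    	addCh <- "t2" // if the keeper is blocked in mu.Lock() above, this send
    	mu.Unlock()   // never completes and neither goroutine makes progress
    }

Replacing the channels with a plain mutex (taken in a fixed order: tokensMu before simpleTokensMu) removes the cycle.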

auth/store.go

@@ -168,13 +168,13 @@ type authStore struct {
 	rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
-	simpleTokensMu    sync.RWMutex
-	simpleTokens      map[string]string // token -> username
-	simpleTokenKeeper *simpleTokenTTLKeeper
 	revision uint64
-	indexWaiter func(uint64) <-chan struct{}
+	// tokenSimple in v3.2+
+	indexWaiter       func(uint64) <-chan struct{}
+	simpleTokenKeeper *simpleTokenTTLKeeper
+	simpleTokensMu    sync.Mutex
+	simpleTokens      map[string]string // token -> username
 }
 func newDeleterFunc(as *authStore) func(string) {
@@ -646,13 +646,16 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
 }
 func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
-	as.simpleTokensMu.RLock()
-	defer as.simpleTokensMu.RUnlock()
-	t, ok := as.simpleTokens[token]
+	// same as '(t *tokenSimple) info' in v3.2+
+	as.simpleTokenKeeper.tokensMu.Lock()
+	as.simpleTokensMu.Lock()
+	username, ok := as.simpleTokens[token]
 	if ok {
 		as.simpleTokenKeeper.resetSimpleToken(token)
 	}
-	return &AuthInfo{Username: t, Revision: as.revision}, ok
+	as.simpleTokensMu.Unlock()
+	as.simpleTokenKeeper.tokensMu.Unlock()
+	return &AuthInfo{Username: username, Revision: as.revision}, ok
 }
 type permSlice []*authpb.Permission
@@ -764,6 +767,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 	if !as.isAuthEnabled() {
 		return nil
 	}
+	if authInfo == nil {
+		return ErrUserEmpty
+	}
 	tx := as.be.BatchTx()
 	tx.Lock()

clientv3/watch.go

@@ -694,6 +694,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
 		go func(ws *watcherStream) {
 			defer wg.Done()
 			if ws.closing {
+				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
+					close(ws.outc)
+					ws.outc = nil
+				}
 				return
 			}
 			select {

discovery/srv.go

@@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
 		shortHost := strings.TrimSuffix(srv.Target, ".")
 		urlHost := net.JoinHostPort(shortHost, port)
 		stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
-		plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
+		plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
 		if ok && url.Scheme != scheme {
 			plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
 		}

embed/config.go

@@ -229,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
 		cfg.ACUrls = []url.URL(u)
 	}
+	if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
+		cfg.InitialCluster = ""
+	}
 	if cfg.ClusterState == "" {
 		cfg.ClusterState = ClusterStateFlagNew
 	}
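In effect (a sketch using the embed API; the discovery token is a placeholder): NewConfig() seeds InitialCluster from the member name, and this hunk discards that derived value whenever the file also selects discovery, so the two bootstrap modes no longer conflict.

    package main

    import (
    	"fmt"

    	"github.com/coreos/etcd/embed"
    )

    func main() {
    	cfg := embed.NewConfig()
    	// NewConfig derives a static bootstrap list from the member name:
    	fmt.Println(cfg.InitialCluster) // "default=http://localhost:2380"

    	// If the config file also sets a discovery URL (or DNS cluster), the
    	// name-derived InitialCluster is now cleared instead of conflicting:
    	cfg.Durl = "https://discovery.etcd.io/<token>" // placeholder token
    	cfg.InitialCluster = ""
    }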

etcdctl/ctlv3/command/member_command.go

@@ -107,7 +107,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
 	urls := strings.Split(memberPeerURLs, ",")
 	ctx, cancel := commandCtx(cmd)
-	resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
+	cli := mustClientFromCmd(cmd)
+	resp, err := cli.MemberAdd(ctx, urls)
 	cancel()
 	if err != nil {
 		ExitWithError(ExitError, err)
@@ -118,12 +119,24 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
 	if _, ok := (display).(*simplePrinter); ok {
 		ctx, cancel = commandCtx(cmd)
-		listResp, err := mustClientFromCmd(cmd).MemberList(ctx)
-		cancel()
-		if err != nil {
-			ExitWithError(ExitError, err)
+		listResp, err := cli.MemberList(ctx)
+		// get latest member list; if there's failover new member might have outdated list
+		for {
+			if err != nil {
+				ExitWithError(ExitError, err)
+			}
+			if listResp.Header.MemberId == resp.Header.MemberId {
+				break
+			}
+			// quorum get to sync cluster list
+			gresp, gerr := cli.Get(ctx, "_")
+			if gerr != nil {
+				ExitWithError(ExitError, err)
+			}
+			resp.Header.MemberId = gresp.Header.MemberId
+			listResp, err = cli.MemberList(ctx)
 		}
+		cancel()
 		conf := []string{}
 		for _, memb := range listResp.Members {
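A note on the design: the quorum Get on the placeholder key "_" is a linearizable read, so the contacted member cannot answer until it has applied every entry committed before the read, including the membership change; the MemberList that follows is therefore guaranteed to reflect the add.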

etcdctl/ctlv3/ctl.go

@@ -45,7 +45,7 @@ var (
 func init() {
 	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
-	rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
+	rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
 	rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
 	rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")

etcdmain/etcd.go

@@ -189,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
 		return nil, nil, err
 	}
 	osutil.RegisterInterruptHandler(e.Server.Stop)
-	<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
+	select {
+	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
+	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
+	}
 	return e.Server.StopNotify(), e.Err(), nil
 }
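The same guard matters for anyone embedding etcd directly. A minimal sketch against the public embed API (assumed usage, not part of this diff):

    package main

    import (
    	"log"

    	"github.com/coreos/etcd/embed"
    )

    func main() {
    	cfg := embed.NewConfig()
    	e, err := embed.StartEtcd(cfg)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer e.Close()
    	select {
    	case <-e.Server.ReadyNotify():
    		// joined the cluster; safe to serve
    	case <-e.Server.StopNotify():
    		// publish aborted (e.g. config error); the server is already
    		// stopped, so waiting only on ReadyNotify would hang forever
    		log.Fatal("etcd server stopped before becoming ready")
    	}
    }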

etcdserver/server.go

@@ -594,6 +594,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 type etcdProgress struct {
 	confState raftpb.ConfState
 	snapi     uint64
+	appliedt  uint64
 	appliedi  uint64
 }
@@ -666,6 +667,7 @@ func (s *EtcdServer) run() {
 	ep := etcdProgress{
 		confState: snap.Metadata.ConfState,
 		snapi:     snap.Metadata.Index,
+		appliedt:  snap.Metadata.Term,
 		appliedi:  snap.Metadata.Index,
 	}
@@ -765,7 +767,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	select {
 	// snapshot requested via send()
 	case m := <-s.r.msgSnapC:
-		merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
 		s.sendMergedSnap(merged)
 	default:
 	}
@@ -867,6 +869,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 	plog.Info("finished adding peers from new cluster configuration into network...")
+	ep.appliedt = apply.snapshot.Metadata.Term
 	ep.appliedi = apply.snapshot.Metadata.Index
 	ep.snapi = ep.appliedi
 	ep.confState = apply.snapshot.Metadata.ConfState
@@ -888,7 +891,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 		return
 	}
 	var shouldstop bool
-	if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
+	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
 		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
 	}
 }
@@ -1242,9 +1245,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 // apply takes entries received from Raft (after it has been committed) and
 // applies them to the current state of the EtcdServer.
 // The given entries should not be empty.
-func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
-	var applied uint64
-	var shouldstop bool
+func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
 	for i := range es {
 		e := es[i]
 		switch e.Type {
@@ -1254,16 +1255,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			removedSelf, err := s.applyConfChange(cc, confState)
-			shouldstop = shouldstop || removedSelf
+			shouldStop = shouldStop || removedSelf
 			s.w.Trigger(cc.ID, err)
 		default:
 			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
 		atomic.StoreUint64(&s.r.index, e.Index)
 		atomic.StoreUint64(&s.r.term, e.Term)
-		applied = e.Index
+		appliedt = e.Term
+		appliedi = e.Index
 	}
-	return applied, shouldstop
+	return appliedt, appliedi, shouldStop
 }
 // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer

etcdserver/server_test.go

@@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 		ents = append(ents, ent)
 	}
-	_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
+	_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
 	if !shouldStop {
 		t.Errorf("shouldStop = %t, want %t", shouldStop, true)
 	}

etcdserver/snapshot_merge.go

@@ -16,7 +16,6 @@ package etcdserver
 import (
 	"io"
-	"log"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -26,12 +25,7 @@ import (
 // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
 // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
 // as ReadCloser.
-func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
-	snapt, err := s.r.raftStorage.Term(snapi)
-	if err != nil {
-		log.Panicf("get term should never fail: %v", err)
-	}
+func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
 	// get a snapshot of v2 store as []byte
 	clone := s.store.Clone()
 	d, err := clone.SaveNoCopy()
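Why the removed look-up could fail: once a second 'MsgSnap' compacts the log past the applier's index, Term() returns ErrCompacted and the deleted log.Panicf fired. A minimal sketch of that failure mode using raft's MemoryStorage directly (illustrative only, not the server code path):

    package main

    import (
    	"fmt"

    	"github.com/coreos/etcd/raft"
    	"github.com/coreos/etcd/raft/raftpb"
    )

    func main() {
    	ms := raft.NewMemoryStorage()
    	ms.Append([]raftpb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: 2}})

    	appliedi := uint64(2) // the slow applyAll is still at index 2...
    	ms.Compact(3)         // ...when a second MsgSnap compacts past it

    	if _, err := ms.Term(appliedi); err != nil {
    		fmt.Println(err) // raft.ErrCompacted; the old code panicked here
    	}
    }

Threading appliedt through apply() sidesteps the storage look-up entirely.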

integration/cluster.go

@@ -449,6 +449,8 @@ type member struct {
 	grpcServer *grpc.Server
 	grpcAddr   string
 	grpcBridge *bridge
+
+	keepDataDirTerminate bool
 }
 func (m *member) GRPCAddr() string { return m.grpcAddr }
@@ -746,8 +748,10 @@ func (m *member) Restart(t *testing.T) error {
 func (m *member) Terminate(t *testing.T) {
 	plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
 	m.Close()
-	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
-		t.Fatal(err)
+	if !m.keepDataDirTerminate {
+		if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
+			t.Fatal(err)
+		}
 	}
 	plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
 }

integration/cluster_test.go

@@ -27,6 +27,7 @@ import (
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/pkg/capnslog"
 	"golang.org/x/net/context"
 )
@@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
 	}
 }
+// TestRestartRemoved ensures that restarting removed member must exit
+// if 'initial-cluster-state' is set 'new' and old data directory still exists
+// (see https://github.com/coreos/etcd/issues/7512 for more).
+func TestRestartRemoved(t *testing.T) {
+	defer testutil.AfterTest(t)
+	capnslog.SetGlobalLogLevel(capnslog.INFO)
+
+	// 1. start single-member cluster
+	c := NewCluster(t, 1)
+	for _, m := range c.Members {
+		m.ServerConfig.StrictReconfigCheck = true
+	}
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// 2. add a new member
+	c.AddMember(t)
+	c.WaitLeader(t)
+
+	oldm := c.Members[0]
+	oldm.keepDataDirTerminate = true
+
+	// 3. remove first member, shut down without deleting data
+	if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
+		t.Fatalf("expected to remove member, got error %v", err)
+	}
+	c.WaitLeader(t)
+
+	// 4. restart first member with 'initial-cluster-state=new'
+	// wrong config, expects exit within ReqTimeout
+	oldm.ServerConfig.NewCluster = false
+	if err := oldm.Restart(t); err != nil {
+		t.Fatalf("unexpected ForceRestart error: %v", err)
+	}
+	defer func() {
+		oldm.Close()
+		os.RemoveAll(oldm.ServerConfig.DataDir)
+	}()
+	select {
+	case <-oldm.s.StopNotify():
+	case <-time.After(time.Minute):
+		t.Fatalf("removed member didn't exit within %v", time.Minute)
+	}
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.

mvcc/backend/backend.go

@@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 		}
 		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
+		tmpb.FillPercent = 0.9 // for seq write in for each
 		if berr != nil {
 			return berr
 		}
@@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 				return err
 			}
 			tmpb = tmptx.Bucket(next)
+			tmpb.FillPercent = 0.9 // for seq write in for each
+
 			count = 0
 		}
 		return tmpb.Put(k, v)
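Bolt buckets default to FillPercent 0.5, which suits random inserts; defragdb copies keys in sorted order, so 0.9 packs leaf pages tighter and produces a smaller file. A standalone sketch of the option (assuming the boltdb import path of this era):

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/boltdb/bolt"
    )

    func main() {
    	db, err := bolt.Open("demo.db", 0600, nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	err = db.Update(func(tx *bolt.Tx) error {
    		b, err := tx.CreateBucketIfNotExists([]byte("key"))
    		if err != nil {
    			return err
    		}
    		b.FillPercent = 0.9 // safe because the inserts below are in sorted order
    		for i := 0; i < 1000; i++ {
    			k := []byte(fmt.Sprintf("%08d", i))
    			if err := b.Put(k, []byte("v")); err != nil {
    				return err
    			}
    		}
    		return nil
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    }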

version/version.go

@@ -26,7 +26,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "3.0.0"
-	Version           = "3.1.3"
+	Version           = "3.1.4"
 	APIVersion        = "unknown"
 	// Git SHA Value will be set during build