Compare commits

...

55 Commits

Author SHA1 Message Date
Gyu-Ho Lee
e5b7ee2d03 version: bump up to 3.1.6
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-19 08:28:06 -07:00
Anthony Romano
a4c5731c38 ctlv3: keep lease as integer in fields printer
Output was giving %!d(string=) instead of the expected lease ID
value.
2017-04-19 08:27:52 -07:00
Gyu-Ho Lee
1f558ae678 integration: test auth API response header revision
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-17 20:06:14 -07:00
Hitoshi Mitake
df93627bbb etcdserver: fill-in Auth API Header in apply layer
Replacing "etcdserver: fill a response header in auth RPCs"
The revision should be set at the time of "apply",
not in later RPC layer.

Fix https://github.com/coreos/etcd/issues/7691

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-17 20:06:08 -07:00
Anthony Romano
a20295c65b auth: fix race on stopping simple token keeper
run goroutine was resetting a field for no reason and without holding a lock.
This patch cleans up the run goroutine management to make the start/stop path
less racey in general.
2017-04-14 16:52:25 -07:00
Hitoshi Mitake
9f7bb0df3a etcdserver: let Status() not require authentication
The information that can be obtained with the RPC doesn't need to be
protected.

Fix https://github.com/coreos/etcd/issues/7721
2017-04-13 15:56:26 -07:00
Gyu-Ho Lee
6a805e5222 test: do not run extra static checks on release branch
Things are usually already fixed in master branch
but not worth backporting.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-13 14:44:22 -07:00
Gyu-Ho Lee
38f79fa565 clientv3: fix gofmt warnings
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-13 14:44:22 -07:00
Anthony Romano
37a502cc88 integration: test requests with valid auth token but disabled auth
etcd was crashing since auth was assuming a token implies auth is enabled.
2017-04-13 14:44:22 -07:00
Anthony Romano
9be7fc5320 auth: protect simpleToken with single mutex and check if enabled
Dual locking doesn't really give a convincing performance improvement and
the lock ordering makes it impossible to safely check if the TTL keeper
is enabled or not.

Fixes #7722
2017-04-13 14:44:16 -07:00
Gyu-Ho Lee
288bccd288 pkg/transport: remove port in Certificate.IPAddresses
etcd passes 'url.URL.Host' to 'SelfCert' which contains
client, peer port. 'net.ParseIP("127.0.0.1:2379")' returns
'nil', and the client on this self-cert will see errors
of '127.0.0.1 because it doesn't contain any IP SANs'

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-05 04:30:26 -07:00
Anthony Romano
8cb5b48f58 clientv3: test dial timeout is respected when using auth 2017-04-04 14:14:23 -07:00
Anthony Romano
6538217528 clientv3: respect dial timeout when authenticating
Fixes #7627
2017-04-04 14:12:32 -07:00
Gyu-Ho Lee
e983d6b343 version: bump up to 3.1.5+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-04 14:10:15 -07:00
Gyu-Ho Lee
20490caaf0 version: bump up to 3.1.5
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-27 10:20:28 -07:00
fanmin shi
e156746959 raft: use rs.req.Entries[0].Data as the key for deletion in advance()
advance() should use rs.req.Entries[0].Data as the context instead of
req.Context for deletion. Since req.Context is never set, there won't be
any context being deleted from pendingReadIndex; results mem leak.

FIXES #7571
2017-03-24 15:51:39 -07:00
Artem Panchenko
d84bf983cc Dockerfile-release: add nsswitch.conf into image
The file '/etc/nsswitch.conf' is created in order to
take in account '/etc/hosts' entries while resolving
domain names.
2017-03-23 15:20:49 -07:00
Anthony Romano
b44c6bff9d clientv3: use waitgroup to wait for substream goroutine teardown
When a grpc watch stream is torn down, it will join on its logical substream
goroutines by waiting for each to close a channel. This doesn't guarantee
the substream is fully exited, though, but only about to exit and can be
waiting to resume even after Watch.Close finishes. Instead, use a
waitgroup.Done at the very end of the substream defer.

Fixes #7573
2017-03-23 12:26:32 -07:00
Anthony Romano
8c3c1b4a9c *: use filepath.Join for files 2017-03-23 09:53:56 -07:00
Tess Rinearson
b478387a59 wal: use path/filepath instead of path
Use the path/filepath package instead of the path package. The
path package assumes slash-separated paths, which doesn't work
on Windows. But path/filepath manipulates filename paths in a way
that's compatible across OSes.
2017-03-23 09:50:41 -07:00
Gyu-Ho Lee
dfc1f21f9d version: bump to 3.1.4+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-23 09:49:51 -07:00
Gyu-Ho Lee
41e52ebc22 version: bump to 3.1.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-22 09:46:23 -07:00
Xiang
7bb538d4d4 backend: add FillPercent option 2017-03-21 12:12:32 -07:00
Gyu-Ho Lee
1622782e49 integration: ensure 'StopNotify' on publish error
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:12:13 -07:00
Gyu-Ho Lee
99b47e0c1e etcdmain: handle StopNotify when ErrStopped aborted publish
Fix https://github.com/coreos/etcd/issues/7512.

If a server starts and aborts due to config error,
it is possible to get stuck in ReadyNotify waits.
This adds select case to get notified on stop channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:10:36 -07:00
Anthony Romano
350d0cd211 ctlv3: have "protobuf" in output help string instead of "proto"
Fixes #7538
2017-03-20 12:40:25 -07:00
Jonathan Sokolowski
72f37ff79a embed: Clear default initial cluster
NewConfig() should sets initial cluster from name but we should clear it
in the event that another discovery option has been specified.

Fixes #7516
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
3221454cab etcdserver: remove possibly compacted entry look-up
Fix https://github.com/coreos/etcd/issues/7470.

This patch removes unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger panic
if raft entry at etcdProgress.appliedi got compacted
by subsequent 'MsgSnap' messages--if a follower is
being (in this case, network latency spikes) slow, it
could receive subsequent 'MsgSnap' requests from leader.

etcd server-side 'applyAll' routine and raft's Ready
processing routine becomes asynchronous after raft
entries are persisted. And given that raft Ready routine
takes less time to finish, it is possible that second
'MsgSnap' is being handled, while the slow 'applyAll'
is still processing the first(old) 'MsgSnap'. Then raft
Ready routine can compact the log entries at future
index to 'applyAll'. That is how 'createMergedSnapshotMessage'
tried to look up raft term with outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:18 -07:00
Anthony Romano
4a1bffdbc6 clientv3: close open watch channel if substream is closing on reconnect
If substream is closing but outc is still open while reconnecting, then outc
would only be closed once the watch client would connect or once the watch
client is closed. This was leading to deadlocks in the proxy tests. Instead,
close immediately if the context is canceled.

Fixes #7503
2017-03-18 07:56:18 -07:00
Anthony Romano
9d9be2bc86 ctlv3: ensure synced member list before printing env vars on member add
In cases of multiple endpoints, it's possible member add would get a its
member list from a member that has not yet recognized the membership
update. Instead, confirm that the member list response is from the
member that acked the member add or from a member that has synced
with the cluster following the member add.

Fixes #7498
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
e5462f74f1 auth: get rid of deadlocking channel passing scheme in simpleTokenTTL
Cherry-picked from 1b1fabef8f.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:05 -07:00
Gyu-Ho Lee
c68c1d9344 discovery: fix print format
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:57 -07:00
Anthony Romano
6ed56cd723 auth: nil check AuthInfo when checking admin permissions
If the context does not include auth information, get authinfo will
return a nil auth info and a nil error. This is then passed to
IsAdminPermitted, which would dereference the nil auth info.
2017-03-17 14:21:39 -07:00
Gyu-Ho Lee
a3c6f6bf81 version: bump up to 3.1.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:15 -07:00
Gyu-Ho Lee
21fdcc6443 version: bump up to 3.1.3
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-10 09:05:16 -08:00
Gyu-Ho Lee
8d122e7011 etcdmain: SdNotify when gateway, grpc-proxy are ready
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-09 11:35:20 -08:00
Gyu-Ho Lee
ade1d97893 lease: guard 'Lease.itemSet' from concurrent writes
Fix https://github.com/coreos/etcd/issues/7448.

Affected if etcd builds with Go 1.8+.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-08 14:50:06 -08:00
Boris Dudelsack
1300189581 gateway: fix the dns discovery method
strip the scheme from the endpoints to have a clean hostname for TCP proxy

Fixes #7452
2017-03-08 14:49:50 -08:00
Anthony Romano
1971517806 etcdctl: correctly batch revisions in make-mirror
Fixes #7410
2017-03-06 14:55:47 -08:00
Gyu-Ho Lee
d614bb0799 etcdmain: log machine default host after update check
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:31 -08:00
Gyu-Ho Lee
059dc91d4c embed: use machine default host only for default value, 0.0.0.0
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:24 -08:00
Gyu-Ho Lee
5fdbaee761 version: bump up to 3.1.2+git 2017-02-24 10:34:53 -08:00
Gyu-Ho Lee
714e7ec8db version: bump up to 3.1.2 2017-02-22 10:45:48 -08:00
Anthony Romano
2cdaf6d661 netutil: use ipv4 host by default
Was non-deterministic.
2017-02-22 10:45:38 -08:00
Gyu-Ho Lee
77a51e0dbf pkg/netutil: name GetDefaultInterfaces consistent 2017-02-22 10:45:29 -08:00
felixoid
d96d3aa0ed netutil: add dualstack to linux_route
in v3.1.0 netutil couldn't get default interface for ipv6only hosts

Fixes #7219
2017-02-22 10:45:19 -08:00
Anthony Romano
66e7532f57 pkg/netutil: use native byte ordering for route information
Fixes #7199
2017-02-22 10:45:07 -08:00
Anthony Romano
3eff360e79 pkg/cpuutil: add cpuutil
A package for unsafe cpu-ish things.
2017-02-22 10:44:59 -08:00
Gyu-Ho Lee
1487071966 integration: add 'TestV3HashRestart' 2017-02-22 10:39:49 -08:00
Gyu-Ho Lee
5d62bba9c7 auth: keep old revision in 'NewAuthStore'
When there's no changes yet (right after auth
store initialization), we should commit old revision.

Fix https://github.com/coreos/etcd/issues/7359.
2017-02-22 10:39:28 -08:00
Anthony Romano
114e293119 integration: test keepalives for short TTLs 2017-02-22 10:38:38 -08:00
Anthony Romano
1439955536 clientv3: do not set next keepalive time <= now+TTL 2017-02-22 10:38:28 -08:00
Anthony Romano
2c8ecc7e13 tcpproxy: don't use range variable in reactivate goroutine
Ends up trying to reactivate only the last endpoint.
2017-02-22 10:38:19 -08:00
Dylan.Wen
7b4d622a7e raft: fix read index request for #7331 2017-02-22 10:38:09 -08:00
Gyu-Ho Lee
db8abbd975 version: bump to 3.1.1+git 2017-02-21 17:00:02 -08:00
64 changed files with 1138 additions and 472 deletions

View File

@@ -32,18 +32,6 @@ matrix:
- go: tip
env: TARGET=ppc64le
addons:
apt:
packages:
- libpcap-dev
- libaspell-dev
- libhunspell-dev
before_install:
- go get -v github.com/chzchzchz/goword
- go get -v honnef.co/go/simple/cmd/gosimple
- go get -v honnef.co/go/unused/cmd/unused
# disable godep restore override
install:
- pushd cmd/etcd && go get -t -v ./... && popd

View File

@@ -5,6 +5,12 @@ ADD etcdctl /usr/local/bin/
RUN mkdir -p /var/etcd/
RUN mkdir -p /var/lib/etcd/
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
# but Golang relies on /etc/nsswitch.conf to check the order of DNS resolving
# (see https://github.com/golang/go/commit/9dee7771f561cf6aee081c0af6658cc81fac3918)
# To fix this we just create /etc/nsswitch.conf and add the following line:
RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
EXPOSE 2379 2380
# Define default command.

View File

@@ -21,85 +21,92 @@ import (
"crypto/rand"
"math/big"
"strings"
"sync"
"time"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
defaultSimpleTokenLength = 16
)
// var for testing purposes
var (
simpleTokenTTL = 5 * time.Minute
simpleTokenTTLResolution = 1 * time.Second
)
type simpleTokenTTLKeeper struct {
tokens map[string]time.Time
addSimpleTokenCh chan string
resetSimpleTokenCh chan string
deleteSimpleTokenCh chan string
stopCh chan chan struct{}
deleteTokenFunc func(string)
}
func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
stk := &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
addSimpleTokenCh: make(chan string, 1),
resetSimpleTokenCh: make(chan string, 1),
deleteSimpleTokenCh: make(chan string, 1),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
}
go stk.run()
return stk
tokens map[string]time.Time
donec chan struct{}
stopc chan struct{}
deleteTokenFunc func(string)
mu *sync.Mutex
}
func (tm *simpleTokenTTLKeeper) stop() {
waitCh := make(chan struct{})
tm.stopCh <- waitCh
<-waitCh
close(tm.stopCh)
select {
case tm.stopc <- struct{}{}:
case <-tm.donec:
}
<-tm.donec
}
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
tm.addSimpleTokenCh <- token
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
tm.resetSimpleTokenCh <- token
if _, ok := tm.tokens[token]; ok {
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
}
func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
tm.deleteSimpleTokenCh <- token
delete(tm.tokens, token)
}
func (tm *simpleTokenTTLKeeper) run() {
tokenTicker := time.NewTicker(simpleTokenTTLResolution)
defer tokenTicker.Stop()
defer func() {
tokenTicker.Stop()
close(tm.donec)
}()
for {
select {
case t := <-tm.addSimpleTokenCh:
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
case t := <-tm.resetSimpleTokenCh:
if _, ok := tm.tokens[t]; ok {
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
}
case t := <-tm.deleteSimpleTokenCh:
delete(tm.tokens, t)
case <-tokenTicker.C:
nowtime := time.Now()
tm.mu.Lock()
for t, tokenendtime := range tm.tokens {
if nowtime.After(tokenendtime) {
tm.deleteTokenFunc(t)
delete(tm.tokens, t)
}
}
case waitCh := <-tm.stopCh:
tm.tokens = make(map[string]time.Time)
waitCh <- struct{}{}
tm.mu.Unlock()
case <-tm.stopc:
return
}
}
}
func (as *authStore) enable() {
delf := func(tk string) {
if username, ok := as.simpleTokens[tk]; ok {
plog.Infof("deleting token %s for user %s", tk, username)
delete(as.simpleTokens, tk)
}
}
as.simpleTokenKeeper = &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
donec: make(chan struct{}),
stopc: make(chan struct{}),
deleteTokenFunc: delf,
mu: &as.simpleTokensMu,
}
go as.simpleTokenKeeper.run()
}
func (as *authStore) GenSimpleToken() (string, error) {
ret := make([]byte, defaultSimpleTokenLength)
@@ -117,7 +124,6 @@ func (as *authStore) GenSimpleToken() (string, error) {
func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokensMu.Lock()
_, ok := as.simpleTokens[token]
if ok {
plog.Panicf("token %s is alredy used", token)
@@ -129,13 +135,15 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
}
func (as *authStore) invalidateUser(username string) {
if as.simpleTokenKeeper == nil {
return
}
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
for token, name := range as.simpleTokens {
if strings.Compare(name, username) == 0 {
delete(as.simpleTokens, token)
as.simpleTokenKeeper.deleteSimpleToken(token)
}
}
as.simpleTokensMu.Unlock()
}

View File

@@ -168,13 +168,13 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
simpleTokensMu sync.RWMutex
simpleTokens map[string]string // token -> username
simpleTokenKeeper *simpleTokenTTLKeeper
revision uint64
indexWaiter func(uint64) <-chan struct{}
// tokenSimple in v3.2+
indexWaiter func(uint64) <-chan struct{}
simpleTokenKeeper *simpleTokenTTLKeeper
simpleTokensMu sync.Mutex
simpleTokens map[string]string // token -> username
}
func newDeleterFunc(as *authStore) func(string) {
@@ -215,8 +215,7 @@ func (as *authStore) AuthEnable() error {
tx.UnsafePut(authBucketName, enableFlagKey, authEnabled)
as.enabled = true
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.enable()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
@@ -244,11 +243,12 @@ func (as *authStore) AuthDisable() {
as.enabled = false
as.simpleTokensMu.Lock()
tk := as.simpleTokenKeeper
as.simpleTokenKeeper = nil
as.simpleTokens = make(map[string]string) // invalidate all tokens
as.simpleTokensMu.Unlock()
if as.simpleTokenKeeper != nil {
as.simpleTokenKeeper.stop()
as.simpleTokenKeeper = nil
if tk != nil {
tk.stop()
}
plog.Noticef("Authentication disabled")
@@ -646,13 +646,14 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}
func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
as.simpleTokensMu.RLock()
defer as.simpleTokensMu.RUnlock()
t, ok := as.simpleTokens[token]
if ok {
// same as '(t *tokenSimple) info' in v3.2+
as.simpleTokensMu.Lock()
username, ok := as.simpleTokens[token]
if ok && as.simpleTokenKeeper != nil {
as.simpleTokenKeeper.resetSimpleToken(token)
}
return &AuthInfo{Username: t, Revision: as.revision}, ok
as.simpleTokensMu.Unlock()
return &AuthInfo{Username: username, Revision: as.revision}, ok
}
type permSlice []*authpb.Permission
@@ -764,6 +765,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
if !as.isAuthEnabled() {
return nil
}
if authInfo == nil {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx.Lock()
@@ -908,10 +912,12 @@ func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{})
}
if enabled {
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.enable()
}
as.commitRevision(tx)
if as.revision == 0 {
as.commitRevision(tx)
}
tx.Unlock()
be.ForceCommit()

View File

@@ -34,31 +34,30 @@ func dummyIndexWaiter(index uint64) <-chan struct{} {
return ch
}
func TestUserAdd(t *testing.T) {
// TestNewAuthStoreRevision ensures newly auth store
// keeps the old revision when there are no changes.
func TestNewAuthStoreRevision(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer func() {
b.Close()
os.Remove(tPath)
}()
defer os.Remove(tPath)
as := NewAuthStore(b, dummyIndexWaiter)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add a non-existing user
err := enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
_, err = as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
if err != ErrUserAlreadyExist {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
old := as.Revision()
b.Close()
as.Close()
ua = &pb.AuthUserAddRequest{Name: ""}
_, err = as.UserAdd(ua) // add a user with empty name
if err != ErrUserEmpty {
t.Fatal(err)
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(b2, dummyIndexWaiter)
new := as.Revision()
b2.Close()
as.Close()
if old != new {
t.Fatalf("expected revision %d, got %d", old, new)
}
}

View File

@@ -282,8 +282,16 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
tokenMu: &sync.RWMutex{},
}
err := c.getToken(context.TODO())
if err != nil {
ctx := c.ctx
if c.cfg.DialTimeout > 0 {
cctx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
defer cancel()
ctx = cctx
}
if err := c.getToken(ctx); err != nil {
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
err = grpc.ErrClientConnTimeout
}
return nil, err
}
@@ -335,6 +343,8 @@ func newClient(cfg *Config) (*Client, error) {
client.balancer = newSimpleBalancer(cfg.Endpoints)
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
if err != nil {
client.cancel()
client.balancer.Close()
return nil, err
}
client.conn = conn
@@ -353,6 +363,7 @@ func newClient(cfg *Config) (*Client, error) {
}
if !hasConn {
client.cancel()
client.balancer.Close()
conn.Close()
return nil, grpc.ErrClientConnTimeout
}

View File

@@ -70,33 +70,45 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)
donec := make(chan error)
go func() {
// without timeout, grpc keeps redialing if connection refused
cfg := Config{
Endpoints: []string{"localhost:12345"},
DialTimeout: 2 * time.Second}
c, err := New(cfg)
if c != nil || err == nil {
t.Errorf("new client should fail")
}
donec <- err
}()
time.Sleep(10 * time.Millisecond)
select {
case err := <-donec:
t.Errorf("dial didn't wait (%v)", err)
default:
testCfgs := []Config{
{
Endpoints: []string{"http://254.0.0.1:12345"},
DialTimeout: 2 * time.Second,
},
{
Endpoints: []string{"http://254.0.0.1:12345"},
DialTimeout: time.Second,
Username: "abc",
Password: "def",
},
}
select {
case <-time.After(5 * time.Second):
t.Errorf("failed to timeout dial on time")
case err := <-donec:
if err != grpc.ErrClientConnTimeout {
t.Errorf("unexpected error %v, want %v", err, grpc.ErrClientConnTimeout)
for i, cfg := range testCfgs {
donec := make(chan error)
go func() {
// without timeout, dial continues forever on ipv4 blackhole
c, err := New(cfg)
if c != nil || err == nil {
t.Errorf("#%d: new client should fail", i)
}
donec <- err
}()
time.Sleep(10 * time.Millisecond)
select {
case err := <-donec:
t.Errorf("#%d: dial didn't wait (%v)", i, err)
default:
}
select {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
if err != grpc.ErrClientConnTimeout {
t.Errorf("#%d: unexpected error %v, want %v", i, err, grpc.ErrClientConnTimeout)
}
}
}
}

View File

@@ -156,6 +156,30 @@ func TestLeaseKeepAlive(t *testing.T) {
}
}
func TestLeaseKeepAliveOneSecond(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
resp, err := cli.Grant(context.Background(), 1)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
for i := 0; i < 3; i++ {
if _, ok := <-rc; !ok {
t.Errorf("chan is closed, want not closed")
}
}
}
// TODO: add a client that can connect to all the members of cluster via unix sock.
// TODO: test handle more complicated failures.
func TestLeaseKeepAliveHandleFailure(t *testing.T) {

View File

@@ -416,7 +416,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
}
// send update to all channels
nextKeepAlive := time.Now().Add(time.Duration(karesp.TTL+2) / 3 * time.Second)
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
for _, ch := range ka.chs {
select {

View File

@@ -132,6 +132,8 @@ type watchGrpcStream struct {
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
@@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
for range closing {
w.closeSubstream(<-w.closingc)
}
w.wg.Wait()
w.owner.closeStream(w)
}()
@@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
@@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
if !resuming {
w.closingc <- ws
}
w.wg.Done()
}()
emptyWr := &WatchResponse{}
@@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
continue
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
}
@@ -694,6 +699,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
go func(ws *watcherStream) {
defer wg.Done()
if ws.closing {
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
close(ws.outc)
ws.outc = nil
}
return
}
select {

View File

@@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
if ok && url.Scheme != scheme {
plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
}

View File

@@ -55,20 +55,12 @@ var (
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
DefaultAdvertiseClientURLs = "http://localhost:2379"
defaultHostname string = "localhost"
defaultHostname string
defaultHostStatus error
)
func init() {
ip, err := netutil.GetDefaultHost()
if err != nil {
defaultHostStatus = err
return
}
// found default host, advertise on it
DefaultInitialAdvertisePeerURLs = "http://" + ip + ":2380"
DefaultAdvertiseClientURLs = "http://" + ip + ":2379"
defaultHostname = ip
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
}
// Config holds the arguments for configuring an etcd server.
@@ -237,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.ACUrls = []url.URL(u)
}
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
cfg.InitialCluster = ""
}
if cfg.ClusterState == "" {
cfg.ClusterState = ClusterStateFlagNew
}
@@ -346,34 +341,52 @@ func (cfg Config) InitialClusterFromName(name string) (ret string) {
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// IsDefaultHost returns the default hostname, if used, and the error, if any,
// from getting the machine's default host.
func (cfg Config) IsDefaultHost() (string, error) {
if len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs {
return defaultHostname, defaultHostStatus
}
if len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs {
return defaultHostname, defaultHostStatus
}
return "", defaultHostStatus
func (cfg Config) defaultPeerHost() bool {
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with default host.
func (cfg Config) defaultClientHost() bool {
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
// then the advertise peer host would be updated with machine's default host,
// while keeping the listen URL's port.
// User can work around this by explicitly setting URL with 127.0.0.1.
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
// TODO: check whether fields are set instead of whether fields have default value
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) {
defaultHost, defaultHostErr := cfg.IsDefaultHost()
defaultHostOverride := defaultHost == "" || defaultHostErr == nil
if (defaultHostOverride || cfg.Name != DefaultName) && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
ip, _, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
// if client-listen-url is 0.0.0.0, just use detected default host
// otherwise, rewrite advertise-client-url with localhost
if ip != "0.0.0.0" {
_, acPort, _ := net.SplitHostPort(cfg.ACUrls[0].Host)
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("localhost:%s", acPort)}
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
if defaultHostname == "" || defaultHostStatus != nil {
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
return "", defaultHostStatus
}
used := false
pip, pport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
used = true
}
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
cip, cport, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
if cfg.defaultClientHost() && cip == "0.0.0.0" {
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
used = true
}
dhost := defaultHostname
if !used {
dhost = ""
}
return dhost, defaultHostStatus
}
// checkBindURLs returns an error if any URL uses a domain name.

View File

@@ -15,11 +15,15 @@
package embed
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"testing"
"github.com/coreos/etcd/pkg/transport"
"github.com/ghodss/yaml"
)
@@ -61,6 +65,70 @@ func TestConfigFileOtherFields(t *testing.T) {
}
}
// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
func TestUpdateDefaultClusterFromName(t *testing.T) {
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origpeer := cfg.APUrls[0].String()
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
// in case of 'etcd --name=abc'
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if exp != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
}
// advertise peer URL should not be affected
if origpeer != cfg.APUrls[0].String() {
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
// TestUpdateDefaultClusterFromNameOverwrite ensures that machine's default host is only used
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
if defaultHostname == "" {
t.Skip("machine's default host not found")
}
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if dhost != defaultHostname {
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
}
aphost, apport, _ := net.SplitHostPort(cfg.APUrls[0].Host)
if apport != lpport {
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
}
if aphost != defaultHostname {
t.Fatalf("advertise peer url expected machine default host %q, got %q", defaultHostname, aphost)
}
expected := fmt.Sprintf("%s=%s://%s:%s", cfg.Name, oldscheme, defaultHostname, lpport)
if expected != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", expected, cfg.InitialCluster)
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
return s.CAFile == t.CAFile &&
s.CertFile == t.CertFile &&

View File

@@ -19,7 +19,7 @@ import (
"fmt"
"net"
"net/http"
"path"
"path/filepath"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
@@ -166,7 +166,7 @@ func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
for i, u := range cfg.LPUrls {
phosts[i] = u.Host
}
cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
@@ -221,7 +221,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
for i, u := range cfg.LCUrls {
chosts[i] = u.Host
}
cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}

View File

@@ -15,7 +15,7 @@
package embed
import (
"path"
"path/filepath"
"github.com/coreos/etcd/wal"
)
@@ -23,7 +23,7 @@ import (
func isMemberInitialized(cfg *Config) bool {
waldir := cfg.WalDir
if waldir == "" {
waldir = path.Join(cfg.Dir, "member", "wal")
waldir = filepath.Join(cfg.Dir, "member", "wal")
}
return wal.Exist(waldir)

View File

@@ -17,7 +17,7 @@ package command
import (
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -50,19 +50,19 @@ func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
if c.String("wal-dir") != "" {
srcWAL = c.String("wal-dir")
} else {
srcWAL = path.Join(c.String("data-dir"), "member", "wal")
srcWAL = filepath.Join(c.String("data-dir"), "member", "wal")
}
if c.String("backup-wal-dir") != "" {
destWAL = c.String("backup-wal-dir")
} else {
destWAL = path.Join(c.String("backup-dir"), "member", "wal")
destWAL = filepath.Join(c.String("backup-dir"), "member", "wal")
}
if err := fileutil.CreateDirAll(destSnap); err != nil {

View File

@@ -125,18 +125,19 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
return rpctypes.ErrCompacted
}
var rev int64
var lastRev int64
ops := []clientv3.Op{}
for _, ev := range wr.Events {
nrev := ev.Kv.ModRevision
if rev != 0 && nrev > rev {
nextRev := ev.Kv.ModRevision
if lastRev != 0 && nextRev > lastRev {
_, err := dc.Txn(ctx).Then(ops...).Commit()
if err != nil {
return err
}
ops = []clientv3.Op{}
}
lastRev = nextRev
switch ev.Type {
case mvccpb.PUT:
ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))

View File

@@ -107,7 +107,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
ctx, cancel := commandCtx(cmd)
resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
cli := mustClientFromCmd(cmd)
resp, err := cli.MemberAdd(ctx, urls)
cancel()
if err != nil {
ExitWithError(ExitError, err)
@@ -118,12 +119,24 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if _, ok := (display).(*simplePrinter); ok {
ctx, cancel = commandCtx(cmd)
listResp, err := mustClientFromCmd(cmd).MemberList(ctx)
cancel()
if err != nil {
ExitWithError(ExitError, err)
listResp, err := cli.MemberList(ctx)
// get latest member list; if there's failover new member might have outdated list
for {
if err != nil {
ExitWithError(ExitError, err)
}
if listResp.Header.MemberId == resp.Header.MemberId {
break
}
// quorum get to sync cluster list
gresp, gerr := cli.Get(ctx, "_")
if gerr != nil {
ExitWithError(ExitError, err)
}
resp.Header.MemberId = gresp.Header.MemberId
listResp, err = cli.MemberList(ctx)
}
cancel()
conf := []string{}
for _, memb := range listResp.Members {

View File

@@ -21,7 +21,7 @@ import (
"io"
"os"
"os/exec"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/client"
@@ -103,7 +103,7 @@ func prepareBackend() backend.Backend {
var be backend.Backend
bch := make(chan struct{})
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
go func() {
defer close(bch)
be = backend.New(dbpath, time.Second, 10000)
@@ -130,9 +130,9 @@ func rebuildStoreV2() (store.Store, uint64) {
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
waldir = filepath.Join(migrateDatadir, "member", "wal")
}
snapdir := path.Join(migrateDatadir, "member", "snap")
snapdir := filepath.Join(migrateDatadir, "member", "snap")
ss := snap.New(snapdir)
snapshot, err := ss.Load()

View File

@@ -30,7 +30,7 @@ func (p *fieldsPrinter) kv(pfx string, kv *spb.KeyValue) {
fmt.Printf("\"%sModRevision\" : %d\n", pfx, kv.ModRevision)
fmt.Printf("\"%sVersion\" : %d\n", pfx, kv.Version)
fmt.Printf("\"%sValue\" : %q\n", pfx, string(kv.Value))
fmt.Printf("\"%sLease\" : %d\n", pfx, string(kv.Lease))
fmt.Printf("\"%sLease\" : %d\n", pfx, kv.Lease)
}
func (p *fieldsPrinter) hdr(h *pb.ResponseHeader) {

View File

@@ -23,7 +23,7 @@ import (
"io"
"math"
"os"
"path"
"path/filepath"
"reflect"
"strings"
@@ -186,8 +186,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
basedir = restoreName + ".etcd"
}
waldir := path.Join(basedir, "member", "wal")
snapdir := path.Join(basedir, "member", "snap")
waldir := filepath.Join(basedir, "member", "wal")
snapdir := filepath.Join(basedir, "member", "snap")
if _, err := os.Stat(basedir); err == nil {
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
@@ -325,7 +325,7 @@ func makeDB(snapdir, dbfile string, commit int) {
ExitWithError(ExitIO, err)
}
dbpath := path.Join(snapdir, "db")
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)

View File

@@ -45,7 +45,7 @@ var (
func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")

View File

@@ -22,7 +22,7 @@ import (
"net"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
@@ -39,8 +39,6 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy/httpproxy"
"github.com/coreos/etcd/version"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
@@ -85,7 +83,13 @@ func startEtcdOrProxyV2() {
GoMaxProcs := runtime.GOMAXPROCS(0)
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
(&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
defaultHost, dhErr := (&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" {
plog.Infof("advertising using detected default host %q", defaultHost)
}
if dhErr != nil {
plog.Noticef("failed to detect default host (%v)", dhErr)
}
if cfg.Dir == "" {
cfg.Dir = fmt.Sprintf("%v.etcd", cfg.Name)
@@ -157,20 +161,12 @@ func startEtcdOrProxyV2() {
osutil.HandleInterrupts()
if systemdutil.IsRunningSystemd() {
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
notifySystemd()
select {
case lerr := <-errc:
@@ -184,15 +180,6 @@ func startEtcdOrProxyV2() {
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
defaultHost, dhErr := cfg.IsDefaultHost()
if defaultHost != "" {
if dhErr == nil {
plog.Infof("advertising using detected default host %q", defaultHost)
} else {
plog.Noticef("failed to detect default host, advertise falling back to %q (%v)", defaultHost, dhErr)
}
}
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
@@ -202,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop)
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}
@@ -221,14 +211,14 @@ func startProxy(cfg *config) error {
return err
}
cfg.Dir = path.Join(cfg.Dir, "proxy")
cfg.Dir = filepath.Join(cfg.Dir, "proxy")
err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
if err != nil {
return err
}
var peerURLs []string
clusterfile := path.Join(cfg.Dir, "cluster")
clusterfile := filepath.Join(cfg.Dir, "cluster")
b, err := ioutil.ReadFile(clusterfile)
switch {

View File

@@ -17,12 +17,14 @@ package etcdmain
import (
"fmt"
"net"
"net/url"
"os"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/tcpproxy"
"github.com/spf13/cobra"
)
@@ -77,6 +79,20 @@ func newGatewayStartCommand() *cobra.Command {
return &cmd
}
func stripSchema(eps []string) []string {
var endpoints []string
for _, ep := range eps {
if u, err := url.Parse(ep); err == nil && u.Host != "" {
ep = u.Host
}
endpoints = append(endpoints, ep)
}
return endpoints
}
func startGateway(cmd *cobra.Command, args []string) {
endpoints := gatewayEndpoints
if gatewayDNSCluster != "" {
@@ -101,6 +117,9 @@ func startGateway(cmd *cobra.Command, args []string) {
}
}
// Strip the schema from the endpoints because we start just a TCP proxy
endpoints = stripSchema(endpoints)
if len(endpoints) == 0 {
plog.Fatalf("no endpoints found")
}
@@ -117,5 +136,8 @@ func startGateway(cmd *cobra.Command, args []string) {
MonitorInterval: getewayRetryDelay,
}
// At this point, etcd gateway listener is initialized
notifySystemd()
tp.Run()
}

View File

@@ -144,6 +144,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
go func() { errc <- m.Serve() }()
// grpc-proxy is initialized, ready to serve
notifySystemd()
fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}

View File

@@ -17,6 +17,9 @@ package etcdmain
import (
"fmt"
"os"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
)
func Main() {
@@ -35,3 +38,16 @@ func Main() {
startEtcdOrProxyV2()
}
func notifySystemd() {
if !systemdutil.IsRunningSystemd() {
return
}
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}

View File

@@ -185,9 +185,5 @@ func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (
}
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Status(ctx, ar)
}

View File

@@ -520,15 +520,14 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR
if err == nil {
resp.ID = int64(l.ID)
resp.TTL = l.TTL()
resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()}
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err
return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
}
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
@@ -609,69 +608,125 @@ func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
if err != nil {
return nil, err
}
return &pb.AuthEnableResponse{}, nil
return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
}
func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
a.s.AuthStore().AuthDisable()
return &pb.AuthDisableResponse{}, nil
return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
}
func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
ctx := context.WithValue(context.WithValue(context.Background(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
return a.s.AuthStore().UserAdd(r)
resp, err := a.s.AuthStore().UserAdd(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
return a.s.AuthStore().UserDelete(r)
resp, err := a.s.AuthStore().UserDelete(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
return a.s.AuthStore().UserChangePassword(r)
resp, err := a.s.AuthStore().UserChangePassword(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
return a.s.AuthStore().UserGrantRole(r)
resp, err := a.s.AuthStore().UserGrantRole(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
return a.s.AuthStore().UserGet(r)
resp, err := a.s.AuthStore().UserGet(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
return a.s.AuthStore().UserRevokeRole(r)
resp, err := a.s.AuthStore().UserRevokeRole(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
return a.s.AuthStore().RoleAdd(r)
resp, err := a.s.AuthStore().RoleAdd(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
return a.s.AuthStore().RoleGrantPermission(r)
resp, err := a.s.AuthStore().RoleGrantPermission(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
return a.s.AuthStore().RoleGet(r)
resp, err := a.s.AuthStore().RoleGet(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
return a.s.AuthStore().RoleRevokePermission(r)
resp, err := a.s.AuthStore().RoleRevokePermission(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
return a.s.AuthStore().RoleDelete(r)
resp, err := a.s.AuthStore().RoleDelete(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
return a.s.AuthStore().UserList(r)
resp, err := a.s.AuthStore().UserList(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
return a.s.AuthStore().RoleList(r)
resp, err := a.s.AuthStore().RoleList(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
type quotaApplierV3 struct {
@@ -836,3 +891,12 @@ func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
}
rr.KVs = rr.KVs[:j]
}
func newHeader(s *EtcdServer) *pb.ResponseHeader {
return &pb.ResponseHeader{
ClusterId: uint64(s.Cluster().ID()),
MemberId: uint64(s.ID()),
Revision: s.KV().Rev(),
RaftTerm: s.Term(),
}
}

View File

@@ -16,7 +16,7 @@ package etcdserver
import (
"fmt"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -118,16 +118,16 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
return nil
}
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
func (c *ServerConfig) WALDir() string {
if c.DedicatedWALDir != "" {
return c.DedicatedWALDir
}
return path.Join(c.MemberDir(), "wal")
return filepath.Join(c.MemberDir(), "wal")
}
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
@@ -263,7 +264,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
ss := snap.New(cfg.SnapDir())
bepath := path.Join(cfg.SnapDir(), databaseFilename)
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)
var be backend.Backend
@@ -594,6 +595,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
}
@@ -666,6 +668,7 @@ func (s *EtcdServer) run() {
ep := etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedt: snap.Metadata.Term,
appliedi: snap.Metadata.Index,
}
@@ -765,7 +768,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
@@ -789,7 +792,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("get database snapshot file path error: %v", err)
}
fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
@@ -867,6 +870,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
plog.Info("finished adding peers from new cluster configuration into network...")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
@@ -888,7 +892,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
return
}
var shouldstop bool
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
@@ -1242,9 +1246,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
@@ -1254,16 +1256,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
shouldstop = shouldstop || removedSelf
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, err)
default:
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
applied = e.Index
appliedt = e.Term
appliedi = e.Index
}
return applied, shouldstop
return appliedt, appliedi, shouldStop
}
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer

View File

@@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}

View File

@@ -16,7 +16,6 @@ package etcdserver
import (
"io"
"log"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
@@ -26,12 +25,7 @@ import (
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
snapt, err := s.r.raftStorage.Term(snapi)
if err != nil {
log.Panicf("get term should never fail: %v", err)
}
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()

View File

@@ -449,6 +449,8 @@ type member struct {
grpcServer *grpc.Server
grpcAddr string
grpcBridge *bridge
keepDataDirTerminate bool
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
@@ -746,8 +748,10 @@ func (m *member) Restart(t *testing.T) error {
func (m *member) Terminate(t *testing.T) {
plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
if !m.keepDataDirTerminate {
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
@@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
}
}
// TestRestartRemoved ensures that restarting removed member must exit
// if 'initial-cluster-state' is set 'new' and old data directory still exists
// (see https://github.com/coreos/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
defer testutil.AfterTest(t)
capnslog.SetGlobalLogLevel(capnslog.INFO)
// 1. start single-member cluster
c := NewCluster(t, 1)
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
c.Launch(t)
defer c.Terminate(t)
// 2. add a new member
c.AddMember(t)
c.WaitLeader(t)
oldm := c.Members[0]
oldm.keepDataDirTerminate = true
// 3. remove first member, shut down without deleting data
if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)
// 4. restart first member with 'initial-cluster-state=new'
// wrong config, expects exit within ReqTimeout
oldm.ServerConfig.NewCluster = false
if err := oldm.Restart(t); err != nil {
t.Fatalf("unexpected ForceRestart error: %v", err)
}
defer func() {
oldm.Close()
os.RemoveAll(oldm.ServerConfig.DataDir)
}()
select {
case <-oldm.s.StopNotify():
case <-time.After(time.Minute):
t.Fatalf("removed member didn't exit within %v", time.Minute)
}
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.

View File

@@ -18,7 +18,7 @@ import (
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
@@ -58,7 +58,7 @@ func TestEmbedEtcd(t *testing.T) {
setupEmbedCfg(&tests[5].cfg, []url.URL{urls[4]}, []url.URL{urls[5], urls[6]})
setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]})
dir := path.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(dir)
defer os.RemoveAll(dir)

View File

@@ -20,6 +20,7 @@ import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
@@ -35,23 +36,85 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
defer cancel()
api := toGRPC(clus.Client(0))
auth := api.Auth
if _, err := auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
t.Fatal(err)
}
if _, err := auth.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.AuthEnable(ctx, &pb.AuthEnableRequest{}); err != nil {
t.Fatal(err)
}
authSetupRoot(t, api.Auth)
_, err := api.KV.Range(ctx, &pb.RangeRequest{Key: []byte("abc")})
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
}
}
// TestV3AuthTokenWithDisable tests that auth won't crash if
// given a valid token when authentication is disabled
func TestV3AuthTokenWithDisable(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
if cerr != nil {
t.Fatal(cerr)
}
defer c.Close()
rctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
go func() {
defer close(donec)
for rctx.Err() == nil {
c.Put(rctx, "abc", "def")
}
}()
time.Sleep(10 * time.Millisecond)
if _, err := c.AuthDisable(context.TODO()); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
cancel()
<-donec
}
func TestV3AuthRevision(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
api := toGRPC(clus.Client(0))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
presp, perr := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
cancel()
if perr != nil {
t.Fatal(perr)
}
rev := presp.Header.Revision
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
aresp, aerr := api.Auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"})
cancel()
if aerr != nil {
t.Fatal(aerr)
}
if aresp.Header.Revision != rev {
t.Fatalf("revision expected %d, got %d", rev, aresp.Header.Revision)
}
}
func authSetupRoot(t *testing.T, auth pb.AuthClient) {
if _, err := auth.UserAdd(context.TODO(), &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
t.Fatal(err)
}
if _, err := auth.RoleAdd(context.TODO(), &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.UserGrantRole(context.TODO(), &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.AuthEnable(context.TODO(), &pb.AuthEnableRequest{}); err != nil {
t.Fatal(err)
}
}

View File

@@ -581,6 +581,37 @@ func TestV3Hash(t *testing.T) {
}
}
// TestV3HashRestart ensures that hash stays the same after restart.
func TestV3HashRestart(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash1 := resp.Hash
clus.Members[0].Stop(t)
clus.Members[0].Restart(t)
clus.waitLeader(t, clus.Members)
kvc := toGRPC(clus.Client(0)).KV
waitForRestart(t, kvc)
cli = clus.RandClient()
resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash2 := resp.Hash
if hash1 != hash2 {
t.Fatalf("hash expected %d, got %d", hash1, hash2)
}
}
// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
func TestV3StorageQuotaAPI(t *testing.T) {
defer testutil.AfterTest(t)

View File

@@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// sort keys so deletes are in same order among all members,
// otherwise the backened hashes will be different
keys := make([]string, 0, len(l.itemSet))
for item := range l.itemSet {
keys = append(keys, item.Key)
}
keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys {
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
@@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
l.mu.Unlock()
return nil
}
@@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
delete(l.itemSet, it)
delete(le.itemMap, it)
}
l.mu.Unlock()
return nil
}
@@ -506,6 +507,8 @@ type Lease struct {
// expiry is time when lease should expire; must be 64-bit aligned.
expiry monotime.Time
// mu protects concurrent accesses to itemSet
mu sync.RWMutex
itemSet map[LeaseItem]struct{}
revokec chan struct{}
}
@@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
keys := make([]string, 0, len(l.itemSet))
for k := range l.itemSet {
keys = append(keys, k.Key)
}
l.mu.RUnlock()
return keys
}

View File

@@ -15,11 +15,13 @@
package lease
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"sort"
"sync"
"testing"
"time"
@@ -76,6 +78,53 @@ func TestLessorGrant(t *testing.T) {
be.BatchTx().Unlock()
}
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
// from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
fd := &fakeDeleter{}
le := newLessor(be, minLeaseTTL)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}
itemn := 10
items := make([]LeaseItem, itemn)
for i := 0; i < itemn; i++ {
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
}
if err = le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
donec := make(chan struct{})
go func() {
le.Detach(l.ID, items)
close(donec)
}()
var wg sync.WaitGroup
wg.Add(itemn)
for i := 0; i < itemn; i++ {
go func() {
defer wg.Done()
l.Keys()
}()
}
<-donec
wg.Wait()
}
// TestLessorRevoke ensures Lessor can revoke a lease.
// The items in the revoked lease should be removed from
// the backend.
@@ -351,5 +400,5 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
t.Fatalf("failed to create tmpdir (%v)", err)
}
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
return tmpPath, backend.New(filepath.Join(tmpPath, "be"), time.Second, 10000)
}

View File

@@ -20,7 +20,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
@@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
@@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for seq write in for each
count = 0
}
return tmpb.Put(k, v)
@@ -334,7 +337,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
if err != nil {
plog.Fatal(err)
}
tmpPath := path.Join(dir, "database")
tmpPath := filepath.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
}

16
pkg/cpuutil/doc.go Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cpuutil provides facilities for detecting cpu-specific features.
package cpuutil

36
pkg/cpuutil/endian.go Normal file
View File

@@ -0,0 +1,36 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cpuutil
import (
"encoding/binary"
"unsafe"
)
const intWidth int = int(unsafe.Sizeof(0))
var byteOrder binary.ByteOrder
// ByteOrder returns the byte order for the CPU's native endianness.
func ByteOrder() binary.ByteOrder { return byteOrder }
func init() {
var i int = 0x1
if v := (*[intWidth]byte)(unsafe.Pointer(&i)); v[0] == 0 {
byteOrder = binary.BigEndian
} else {
byteOrder = binary.LittleEndian
}
}

View File

@@ -19,7 +19,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"github.com/coreos/pkg/capnslog"
@@ -39,7 +39,7 @@ var (
// IsDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func IsDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
f := filepath.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), PrivateFileMode); err != nil {
return err
}

View File

@@ -16,7 +16,7 @@ package fileutil
import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -45,7 +45,7 @@ func purgeFile(dirname string, suffix string, max uint, interval time.Duration,
sort.Strings(newfnames)
fnames = newfnames
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
f := filepath.Join(dirname, newfnames[0])
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
break

View File

@@ -18,7 +18,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
"time"
@@ -33,7 +33,7 @@ func TestPurgeFile(t *testing.T) {
// minimal file set
for i := 0; i < 3; i++ {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if ferr != nil {
t.Fatal(err)
}
@@ -53,7 +53,7 @@ func TestPurgeFile(t *testing.T) {
// rest of the files
for i := 4; i < 10; i++ {
go func(n int) {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", n)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", n)))
if ferr != nil {
t.Fatal(err)
}
@@ -99,7 +99,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
for i := 0; i < 10; i++ {
var f *os.File
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if err != nil {
t.Fatal(err)
}
@@ -107,7 +107,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
}
// create a purge barrier at 5
p := path.Join(dir, fmt.Sprintf("%d.test", 5))
p := filepath.Join(dir, fmt.Sprintf("%d.test", 5))
l, err := LockFile(p, os.O_WRONLY, PrivateFileMode)
if err != nil {
t.Fatal(err)

View File

@@ -43,7 +43,7 @@ func RecoverPort(port int) error {
// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
@@ -51,14 +51,16 @@ func SetLatency(ms, rv int) error {
if rv > ms {
rv = 1
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
for ifce := range ifces {
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}
}
return nil
@@ -66,10 +68,15 @@ func SetLatency(ms, rv int) error {
// RemoveLatency resets latency configurations.
func RemoveLatency() error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
return err
for ifce := range ifces {
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
if err != nil {
return err
}
}
return nil
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !linux !386,!amd64
// +build !linux
package netutil
@@ -27,7 +27,7 @@ func GetDefaultHost() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}
// GetDefaultInterface fetches the device name of default routable interface.
func GetDefaultInterface() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
// GetDefaultInterfaces fetches the device name of default routable interface.
func GetDefaultInterfaces() (map[string]uint8, error) {
return nil, fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}

View File

@@ -13,9 +13,6 @@
// limitations under the License.
// +build linux
// +build 386 amd64
// TODO support native endian but without using "unsafe"
package netutil
@@ -24,27 +21,57 @@ import (
"encoding/binary"
"fmt"
"net"
"sort"
"syscall"
"github.com/coreos/etcd/pkg/cpuutil"
)
var errNoDefaultRoute = fmt.Errorf("could not find default route")
var errNoDefaultHost = fmt.Errorf("could not find default host")
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
// GetDefaultHost obtains the first IP address of machine from the routing table and returns the IP address as string.
// An IPv4 address is preferred to an IPv6 address for backward compatibility.
func GetDefaultHost() (string, error) {
rmsg, rerr := getDefaultRoute()
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return "", rerr
}
host, oif, err := parsePREFSRC(rmsg)
if err != nil {
return "", err
// prioritize IPv4
if rmsg, ok := rmsgs[syscall.AF_INET]; ok {
if host, err := chooseHost(syscall.AF_INET, rmsg); host != "" || err != nil {
return host, err
}
delete(rmsgs, syscall.AF_INET)
}
if host != "" {
return host, nil
// sort so choice is deterministic
var families []int
for family := range rmsgs {
families = append(families, int(family))
}
sort.Ints(families)
for _, f := range families {
family := uint8(f)
if host, err := chooseHost(family, rmsgs[family]); host != "" || err != nil {
return host, err
}
}
return "", errNoDefaultHost
}
func chooseHost(family uint8, rmsg *syscall.NetlinkMessage) (string, error) {
host, oif, err := parsePREFSRC(rmsg)
if host != "" || err != nil {
return host, err
}
// prefsrc not detected, fall back to getting address from iface
ifmsg, ierr := getIface(oif)
ifmsg, ierr := getIfaceAddr(oif, family)
if ierr != nil {
return "", ierr
}
@@ -55,15 +82,16 @@ func GetDefaultHost() (string, error) {
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.RTA_SRC {
// search for RTA_DST because ipv6 doesn't have RTA_SRC
if attr.Attr.Type == syscall.RTA_DST {
return net.IP(attr.Value).String(), nil
}
}
return "", errNoDefaultRoute
return "", nil
}
func getDefaultRoute() (*syscall.NetlinkMessage, error) {
func getDefaultRoutes() (map[uint8]*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETROUTE, syscall.AF_UNSPEC)
if err != nil {
return nil, err
@@ -74,26 +102,33 @@ func getDefaultRoute() (*syscall.NetlinkMessage, error) {
return nil, msgErr
}
routes := make(map[uint8]*syscall.NetlinkMessage)
rtmsg := syscall.RtMsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWROUTE {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofRtMsg])
if rerr := binary.Read(buf, binary.LittleEndian, &rtmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &rtmsg); rerr != nil {
continue
}
if rtmsg.Dst_len == 0 {
if rtmsg.Dst_len == 0 && rtmsg.Table == syscall.RT_TABLE_MAIN {
// zero-length Dst_len implies default route
return &m, nil
msg := m
routes[rtmsg.Family] = &msg
}
}
if len(routes) > 0 {
return routes, nil
}
return nil, errNoDefaultRoute
}
func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, syscall.AF_UNSPEC)
// Used to get an address of interface.
func getIfaceAddr(idx uint32, family uint8) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, int(family))
if err != nil {
return nil, err
}
@@ -109,7 +144,7 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfAddrmsg])
if rerr := binary.Read(buf, binary.LittleEndian, &ifaddrmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifaddrmsg); rerr != nil {
continue
}
if ifaddrmsg.Index == idx {
@@ -117,38 +152,75 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
}
}
return nil, errNoDefaultRoute
return nil, fmt.Errorf("could not find address for interface index %v", idx)
}
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
func GetDefaultInterface() (string, error) {
rmsg, rerr := getDefaultRoute()
if rerr != nil {
return "", rerr
}
_, oif, err := parsePREFSRC(rmsg)
// Used to get a name of interface.
func getIfaceLink(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETLINK, syscall.AF_UNSPEC)
if err != nil {
return "", err
return nil, err
}
ifmsg, ierr := getIface(oif)
if ierr != nil {
return "", ierr
msgs, msgErr := syscall.ParseNetlinkMessage(dat)
if msgErr != nil {
return nil, msgErr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return "", aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
return string(attr.Value[:len(attr.Value)-1]), nil
ifinfomsg := syscall.IfInfomsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWLINK {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfInfomsg])
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifinfomsg); rerr != nil {
continue
}
if ifinfomsg.Index == int32(idx) {
return &m, nil
}
}
return "", errNoDefaultInterface
return nil, fmt.Errorf("could not find link for interface index %v", idx)
}
// GetDefaultInterfaces gets names of interfaces and returns a map[interface]families.
func GetDefaultInterfaces() (map[string]uint8, error) {
interfaces := make(map[string]uint8)
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return interfaces, rerr
}
for family, rmsg := range rmsgs {
_, oif, err := parsePREFSRC(rmsg)
if err != nil {
return interfaces, err
}
ifmsg, ierr := getIfaceLink(oif)
if ierr != nil {
return interfaces, ierr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return interfaces, aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
// key is an interface name
// possible values: 2 - AF_INET, 10 - AF_INET6, 12 - dualstack
interfaces[string(attr.Value[:len(attr.Value)-1])] += family
}
}
}
if len(interfaces) > 0 {
return interfaces, nil
}
return interfaces, errNoDefaultInterface
}
// parsePREFSRC returns preferred source address and output interface index (RTA_OIF).
@@ -164,7 +236,7 @@ func parsePREFSRC(m *syscall.NetlinkMessage) (host string, oif uint32, err error
host = net.IP(attr.Value).String()
}
if attr.Attr.Type == syscall.RTA_OIF {
oif = binary.LittleEndian.Uint32(attr.Value)
oif = cpuutil.ByteOrder().Uint32(attr.Value)
}
if host != "" && oif != uint32(0) {
break

View File

@@ -19,9 +19,17 @@ package netutil
import "testing"
func TestGetDefaultInterface(t *testing.T) {
ifc, err := GetDefaultInterface()
ifc, err := GetDefaultInterfaces()
if err != nil {
t.Fatal(err)
}
t.Logf("default network interface: %q\n", ifc)
t.Logf("default network interfaces: %+v\n", ifc)
}
func TestGetDefaultHost(t *testing.T) {
ip, err := GetDefaultHost()
if err != nil {
t.Fatal(err)
}
t.Logf("default ip: %v", ip)
}

View File

@@ -27,8 +27,7 @@ import (
"math/big"
"net"
"os"
"path"
"strings"
"path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil"
@@ -91,8 +90,8 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
return
}
certPath := path.Join(dirpath, "cert.pem")
keyPath := path.Join(dirpath, "key.pem")
certPath := filepath.Join(dirpath, "cert.pem")
keyPath := filepath.Join(dirpath, "key.pem")
_, errcert := os.Stat(certPath)
_, errkey := os.Stat(keyPath)
if errcert == nil && errkey == nil {
@@ -120,10 +119,11 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
}
for _, host := range hosts {
if ip := net.ParseIP(host); ip != nil {
h, _, _ := net.SplitHostPort(host)
if ip := net.ParseIP(h); ip != nil {
tmpl.IPAddresses = append(tmpl.IPAddresses, ip)
} else {
tmpl.DNSNames = append(tmpl.DNSNames, strings.Split(host, ":")[0])
tmpl.DNSNames = append(tmpl.DNSNames, h)
}
}

View File

@@ -147,16 +147,17 @@ func (tp *TCPProxy) runMonitor() {
select {
case <-time.After(tp.MonitorInterval):
tp.mu.Lock()
for _, r := range tp.remotes {
if !r.isActive() {
go func() {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}()
for _, rem := range tp.remotes {
if rem.isActive() {
continue
}
go func(r *remote) {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}(rem)
}
tp.mu.Unlock()
case <-tp.donec:

View File

@@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) {
return
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.

View File

@@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
}
}
// TestRaftFreesReadOnlyMem ensures raft will free read request from
// readOnly readIndexQueue and pendingReadIndex map.
// related issue: https://github.com/coreos/etcd/issues/7571
func TestRaftFreesReadOnlyMem(t *testing.T) {
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
sm.becomeCandidate()
sm.becomeLeader()
sm.raftLog.commitTo(sm.raftLog.lastIndex())
ctx := []byte("ctx")
// leader starts linearizable read request.
// more info: raft dissertation 6.4, step 2.
sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
msgs := sm.readMessages()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
}
if msgs[0].Type != pb.MsgHeartbeat {
t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
}
if !bytes.Equal(msgs[0].Context, ctx) {
t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
}
if len(sm.readOnly.readIndexQueue) != 1 {
t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 1 {
t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
}
// heartbeat responses from majority of followers (1 in this case)
// acknowledge the authority of the leader.
// more info: raft dissertation 6.4, step 3.
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
if len(sm.readOnly.readIndexQueue) != 0 {
t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 0 {
t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
}
}
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
// MsgAppResp.
func TestMsgAppRespWaitReset(t *testing.T) {
@@ -1856,6 +1905,77 @@ func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
}
}
// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
// when it commits at least one log entry at it term.
func TestReadOnlyForNewLeader(t *testing.T) {
cfg := newTestConfig(1, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 1, Term: 1},
})
cfg.Applied = 1
a := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
b := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
c := newRaft(cfg)
nt := newNetwork(a, b, c)
// Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader.
nt.ignore(pb.MsgApp)
// Force peer a to become leader.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Fatalf("state = %s, want %s", a.state, StateLeader)
}
// Ensure peer a drops read only request.
var windex uint64 = 4
wctx := []byte("ctx")
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 0 {
t.Fatalf("len(readStates) = %d, want zero", len(a.readStates))
}
nt.recover()
// Force peer a to commit a log entry at its term
for i := 0; i < a.heartbeatTimeout; i++ {
a.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if a.raftLog.committed != 4 {
t.Fatalf("committed = %d, want 4", a.raftLog.committed)
}
lastLogTerm := a.raftLog.zeroTermOnErrCompacted(a.raftLog.term(a.raftLog.committed))
if lastLogTerm != a.Term {
t.Fatalf("last log term = %d, want %d", lastLogTerm, a.Term)
}
// Ensure peer a accepts read only request after it commits a entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 1 {
t.Fatalf("len(readStates) = %d, want 1", len(a.readStates))
}
rs := a.readStates[0]
if rs.Index != windex {
t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
}
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}
}
func TestLeaderAppResp(t *testing.T) {
// initial progress: match = 0; next = 3
tests := []struct {

View File

@@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Context))
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}

View File

@@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -41,7 +41,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
os.Remove(f.Name())
return n, err
}
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
if fileutil.Exist(fn) {
os.Remove(f.Name())
return n, nil
@@ -67,7 +67,7 @@ func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
return path.Join(s.dir, fn), nil
return filepath.Join(s.dir, fn), nil
}
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")

View File

@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -84,13 +84,13 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
err1 := os.Remove(filepath.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
}
}
return err
@@ -114,7 +114,7 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := path.Join(dir, name)
fpath := filepath.Join(dir, name)
snap, err := Read(fpath)
if err != nil {
renameBroken(fpath)

View File

@@ -19,7 +19,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@@ -38,7 +38,7 @@ var testSnap = &raftpb.Snapshot{
}
func TestSaveAndLoad(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -60,7 +60,7 @@ func TestSaveAndLoad(t *testing.T) {
}
func TestBadCRC(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -76,14 +76,14 @@ func TestBadCRC(t *testing.T) {
// fake a crc mismatch
crcTable = crc32.MakeTable(crc32.Koopman)
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
_, err = Read(filepath.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
if err == nil || err != ErrCRCMismatch {
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
}
}
func TestFailback(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -91,7 +91,7 @@ func TestFailback(t *testing.T) {
defer os.RemoveAll(dir)
large := fmt.Sprintf("%016x-%016x-%016x.snap", 0xFFFF, 0xFFFF, 0xFFFF)
err = ioutil.WriteFile(path.Join(dir, large), []byte("bad data"), 0666)
err = ioutil.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0666)
if err != nil {
t.Fatal(err)
}
@@ -109,7 +109,7 @@ func TestFailback(t *testing.T) {
if !reflect.DeepEqual(g, testSnap) {
t.Errorf("snap = %#v, want %#v", g, testSnap)
}
if f, err := os.Open(path.Join(dir, large) + ".broken"); err != nil {
if f, err := os.Open(filepath.Join(dir, large) + ".broken"); err != nil {
t.Fatal("broken snapshot does not exist")
} else {
f.Close()
@@ -117,7 +117,7 @@ func TestFailback(t *testing.T) {
}
func TestSnapNames(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -125,7 +125,7 @@ func TestSnapNames(t *testing.T) {
defer os.RemoveAll(dir)
for i := 1; i <= 5; i++ {
var f *os.File
if f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
if f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
t.Fatal(err)
} else {
f.Close()
@@ -146,7 +146,7 @@ func TestSnapNames(t *testing.T) {
}
func TestLoadNewestSnap(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -175,7 +175,7 @@ func TestLoadNewestSnap(t *testing.T) {
}
func TestNoSnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -189,19 +189,19 @@ func TestNoSnapshot(t *testing.T) {
}
func TestEmptySnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte(""), 0x700)
if err != nil {
t.Fatal(err)
}
_, err = Read(path.Join(dir, "1.snap"))
_, err = Read(filepath.Join(dir, "1.snap"))
if err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
}
@@ -210,14 +210,14 @@ func TestEmptySnapshot(t *testing.T) {
// TestAllSnapshotBroken ensures snapshotter returns
// ErrNoSnapshot if all the snapshots are broken.
func TestAllSnapshotBroken(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte("bad"), 0x700)
if err != nil {
t.Fatal(err)
}

48
test
View File

@@ -9,7 +9,7 @@
# PKG=./wal ./test
# PKG=snap ./test
#
# Run code coverage
# Run code coverage
# COVERDIR=coverage PASSES=cov ./test
set -e
@@ -32,10 +32,6 @@ TEST_PKGS=`find . -name \*_test.go | while read a; do dirname $a; done | sort |
FORMATTABLE=`find . -name \*.go | while read a; do echo $(dirname $a)/"*.go"; done | sort | uniq | egrep -v "$IGNORE_PKGS" | sed "s|\./||g"`
TESTABLE_AND_FORMATTABLE=`echo "$TEST_PKGS" | egrep -v "$INTEGRATION_PKGS"`
# TODO: 'client' pkg fails with gosimple from generated files
# TODO: 'rafttest' is failing with unused
GOSIMPLE_UNUSED_PATHS=`find . -name \*.go | while read a; do dirname $a; done | sort | uniq | egrep -v "$IGNORE_PKGS" | grep -v 'client'`
if [ -z "$GOARCH" ]; then
GOARCH=$(go env GOARCH);
fi
@@ -194,48 +190,6 @@ function fmt_pass {
fi
done
if which goword >/dev/null; then
echo "Checking goword..."
# get all go files to process
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
# ignore tests and protobuf files
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
# only check for broken exported godocs
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
if [ ! -z "$gowordRes" ]; then
echo -e "goword checking failed:\n${gowordRes}"
exit 255
fi
else
echo "Skipping goword..."
fi
if which gosimple >/dev/null; then
echo "Checking gosimple..."
for path in $GOSIMPLE_UNUSED_PATHS; do
simplResult=`gosimple ${path} 2>&1 || true`
if [ -n "${simplResult}" ]; then
echo -e "gosimple checking ${path} failed:\n${simplResult}"
exit 255
fi
done
else
echo "Skipping gosimple..."
fi
if which unused >/dev/null; then
echo "Checking unused..."
for path in $GOSIMPLE_UNUSED_PATHS; do
unusedResult=`unused ${path} 2>&1 || true`
if [ -n "${unusedResult}" ]; then
echo -e "unused checking ${path} failed:\n${unusedResult}"
exit 255
fi
done
else
echo "Skipping unused..."
fi
echo "Checking for license header..."
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './cmd/*' ! -path './gopath.proto/*'); do
head -n3 "${file}" | grep -Eq "(Copyright|generated|GENERATED)" || echo -e " ${file}"

View File

@@ -18,7 +18,7 @@ import (
"flag"
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -58,7 +58,7 @@ func main() {
ss := snap.New(snapDir(*from))
snapshot, err = ss.Load()
} else {
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
snapshot, err = snap.Read(filepath.Join(snapDir(*from), *snapfile))
}
switch err {
@@ -132,9 +132,9 @@ func main() {
}
}
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
func snapDir(dataDir string) string { return filepath.Join(dataDir, "member", "snap") }
func parseWALMetadata(b []byte) (id, cid types.ID) {
var metadata etcdserverpb.Metadata

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.1.1"
Version = "3.1.6"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@@ -17,7 +17,7 @@ package wal
import (
"fmt"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -65,7 +65,7 @@ func (fp *filePipeline) Close() error {
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}

View File

@@ -17,7 +17,7 @@ package wal
import (
"io"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal/walpb"
@@ -94,6 +94,6 @@ func openLast(dirpath string) (*fileutil.LockedFile, error) {
if err != nil {
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
last := filepath.Join(dirpath, names[len(names)-1])
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
}

View File

@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io"
"os"
"path"
"path/filepath"
"sync"
"time"
@@ -97,7 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := path.Clean(dirpath) + ".tmp"
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
@@ -107,7 +107,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}
p := path.Join(tmpdirpath, walName(0, 0))
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
@@ -143,7 +143,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil {
return nil, perr
}
@@ -196,7 +196,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := path.Join(dirpath, name)
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
@@ -232,7 +232,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
@@ -372,7 +372,7 @@ func (w *WAL) cut() error {
return err
}
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
@@ -464,7 +464,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
if err != nil {
return err
}
@@ -611,7 +611,7 @@ func (w *WAL) seq() uint64 {
if t == nil {
return 0
}
seq, _, err := parseWalName(path.Base(t.Name()))
seq, _, err := parseWalName(filepath.Base(t.Name()))
if err != nil {
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
}

View File

@@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@@ -40,7 +40,7 @@ func TestNew(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
defer w.Close()
@@ -51,7 +51,7 @@ func TestNew(t *testing.T) {
t.Fatal(err)
}
gd := make([]byte, off)
f, err := os.Open(path.Join(p, path.Base(w.tail().Name())))
f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
if err != nil {
t.Fatal(err)
}
@@ -90,7 +90,7 @@ func TestNewForInitedDir(t *testing.T) {
}
defer os.RemoveAll(p)
os.Create(path.Join(p, walName(0, 0)))
os.Create(filepath.Join(p, walName(0, 0)))
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
t.Errorf("err = %v, want %v", err, os.ErrExist)
}
@@ -103,7 +103,7 @@ func TestOpenAtIndex(t *testing.T) {
}
defer os.RemoveAll(dir)
f, err := os.Create(path.Join(dir, walName(0, 0)))
f, err := os.Create(filepath.Join(dir, walName(0, 0)))
if err != nil {
t.Fatal(err)
}
@@ -113,7 +113,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
if w.seq() != 0 {
@@ -122,7 +122,7 @@ func TestOpenAtIndex(t *testing.T) {
w.Close()
wname := walName(2, 10)
f, err = os.Create(path.Join(dir, wname))
f, err = os.Create(filepath.Join(dir, wname))
if err != nil {
t.Fatal(err)
}
@@ -132,7 +132,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %+v, want %+v", g, wname)
}
if w.seq() != 2 {
@@ -172,7 +172,7 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname := walName(1, 1)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@@ -188,14 +188,14 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname = walName(2, 2)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
// check the state in the last WAL
// We do check before closing the WAL to ensure that Cut syncs the data
// into the disk.
f, err := os.Open(path.Join(p, wname))
f, err := os.Open(filepath.Join(p, wname))
if err != nil {
t.Fatal(err)
}
@@ -254,7 +254,7 @@ func TestSaveWithCut(t *testing.T) {
}
defer neww.Close()
wname := walName(1, index)
if g := path.Base(neww.tail().Name()); g != wname {
if g := filepath.Base(neww.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@@ -416,7 +416,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
md.Close()
if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
t.Fatal(err)
}
@@ -570,7 +570,7 @@ func TestReleaseLockTo(t *testing.T) {
}
for i, l := range w.locks {
var lockIndex uint64
_, lockIndex, err = parseWalName(path.Base(l.Name()))
_, lockIndex, err = parseWalName(filepath.Base(l.Name()))
if err != nil {
t.Fatal(err)
}
@@ -588,7 +588,7 @@ func TestReleaseLockTo(t *testing.T) {
if len(w.locks) != 1 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
}
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
_, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
if err != nil {
t.Fatal(err)
}
@@ -673,11 +673,11 @@ func TestRestartCreateWal(t *testing.T) {
defer os.RemoveAll(p)
// make temporary directory so it looks like initialization is interrupted
tmpdir := path.Clean(p) + ".tmp"
tmpdir := filepath.Clean(p) + ".tmp"
if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
t.Fatal(err)
}
if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
t.Fatal(err)
}
@@ -729,7 +729,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}
}
fn := path.Join(p, path.Base(w.tail().Name()))
fn := filepath.Join(p, filepath.Base(w.tail().Name()))
w.Close()
// clobber some entry with 0's to simulate a torn write