Compare commits


41 Commits

Author SHA1 Message Date
Gyu-Ho Lee
43b75072bf version: bump up to 3.1.7
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-27 08:25:50 -07:00
Anthony Romano
78141fae60 clientv3: set current revision to create rev regardless of CreateNotify
Turns out the optimization to ignore setting the init rev for
current revision watches breaks some ordering assumptions. Since
Watch only returns a channel once it gets a response, it should
bind the revision at the time of the first create response.

Was causing TestWatchReconnInit to fail.
2017-04-25 10:54:39 -07:00
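For illustration only, a minimal Go sketch of the ordering guarantee this change preserves, assuming a hypothetical local endpoint at 127.0.0.1:2379 and the v3.1-era import path github.com/coreos/etcd/clientv3: once Watch returns a channel, a Put committed afterwards must be delivered on that channel, even if the client reconnects before the event arrives.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Hypothetical endpoint; adjust for a real cluster.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Once Watch returns, the channel is bound to the store revision at that
	// moment; a Put committed afterwards must show up on wch, even if the
	// client reconnects before the event is delivered.
	wch := cli.Watch(context.Background(), "a")
	if _, err := cli.Put(context.Background(), "a", "b"); err != nil {
		log.Fatal(err)
	}
	resp := <-wch
	fmt.Printf("%s\n", resp.Events[0].Kv.Value) // prints "b"
}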
Anthony Romano
3be37f042e integration: add pause/unpause to client bridge
Resetting connections sometimes isn't enough; need to stop/resume
accepting connections for some tests while keeping the member up.
2017-04-25 10:54:15 -07:00
Anthony Romano
7c896098d2 clientv3/integration: test watch resume with disconnect before first event 2017-04-25 10:53:58 -07:00
Anthony Romano
30f4e36de4 clientv3: only update initReq.rev == 0 with creation watch revision
Always updating the initReq.rev on watch create will resume from the wrong
revision if initReq is ever nonzero.
2017-04-25 10:53:37 -07:00
Anthony Romano
557abbe437 ctlv3: use printer for lease command results
Fixes #7783
2017-04-20 10:39:36 -07:00
Gyu-Ho Lee
4b448c209b version: bump up to 3.1.6+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-20 10:39:18 -07:00
Gyu-Ho Lee
e5b7ee2d03 version: bump up to 3.1.6
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-19 08:28:06 -07:00
Anthony Romano
a4c5731c38 ctlv3: keep lease as integer in fields printer
Output was giving %!d(string=) instead of the expected lease ID
value.
2017-04-19 08:27:52 -07:00
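A small Go sketch of the formatting bug being fixed (the lease value below is a made-up ID): converting an integer lease to a string and printing it with %d yields the %!d(string=...) artifact, while printing the integer directly gives the expected output.

package main

import "fmt"

func main() {
	var lease int64 = 0x694d71e53e9a0c57 // made-up lease ID

	// The bug: converting the integer to a string produces a meaningless rune
	// string, and %d cannot format a string, so the verb reports an error.
	fmt.Printf("lease: %d\n", string(lease)) // lease: %!d(string=...)

	// The fix: keep the lease as an integer when formatting.
	fmt.Printf("lease: %d\n", lease)
}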
Gyu-Ho Lee
1f558ae678 integration: test auth API response header revision
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-17 20:06:14 -07:00
Hitoshi Mitake
df93627bbb etcdserver: fill-in Auth API Header in apply layer
Replacing "etcdserver: fill a response header in auth RPCs"
The revision should be set at the time of "apply",
not in later RPC layer.

Fix https://github.com/coreos/etcd/issues/7691

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-17 20:06:08 -07:00
Anthony Romano
a20295c65b auth: fix race on stopping simple token keeper
The run goroutine was resetting a field for no reason and without holding a lock.
This patch cleans up the run goroutine management to make the start/stop path
less racy in general.
2017-04-14 16:52:25 -07:00
Hitoshi Mitake
9f7bb0df3a etcdserver: let Status() not require authentication
The information that can be obtained with the RPC doesn't need to be
protected.

Fix https://github.com/coreos/etcd/issues/7721
2017-04-13 15:56:26 -07:00
Gyu-Ho Lee
6a805e5222 test: do not run extra static checks on release branch
Things are usually already fixed in the master branch
but are not worth backporting.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-13 14:44:22 -07:00
Gyu-Ho Lee
38f79fa565 clientv3: fix gofmt warnings
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-13 14:44:22 -07:00
Anthony Romano
37a502cc88 integration: test requests with valid auth token but disabled auth
etcd was crashing since auth was assuming a token implies auth is enabled.
2017-04-13 14:44:22 -07:00
Anthony Romano
9be7fc5320 auth: protect simpleToken with single mutex and check if enabled
Dual locking doesn't really give a convincing performance improvement and
the lock ordering makes it impossible to safely check if the TTL keeper
is enabled or not.

Fixes #7722
2017-04-13 14:44:16 -07:00
Gyu-Ho Lee
288bccd288 pkg/transport: remove port in Certificate.IPAddresses
etcd passes 'url.URL.Host' to 'SelfCert', which still contains
the client or peer port. 'net.ParseIP("127.0.0.1:2379")' returns
'nil', so the client using this self-signed cert sees errors like
'... 127.0.0.1 because it doesn't contain any IP SANs'.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-05 04:30:26 -07:00
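A minimal Go sketch of the underlying standard-library behavior (not the etcd code itself): net.ParseIP rejects a host:port string, so the port has to be stripped with net.SplitHostPort before the address can be added to the certificate's IP SANs.

package main

import (
	"fmt"
	"net"
)

func main() {
	// Passing a host:port string straight to ParseIP yields nil, so the
	// address never ends up in the certificate's IP SANs.
	fmt.Println(net.ParseIP("127.0.0.1:2379")) // <nil>

	// Strip the port first, then parse the bare host.
	host, _, err := net.SplitHostPort("127.0.0.1:2379")
	if err != nil {
		panic(err)
	}
	fmt.Println(net.ParseIP(host)) // 127.0.0.1
}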
Anthony Romano
8cb5b48f58 clientv3: test dial timeout is respected when using auth 2017-04-04 14:14:23 -07:00
Anthony Romano
6538217528 clientv3: respect dial timeout when authenticating
Fixes #7627
2017-04-04 14:12:32 -07:00
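A rough sketch of the approach, with authenticate and dialTimeout as stand-ins rather than the actual clientv3 internals: wrap the authentication step in a context bounded by the configured dial timeout so a slow or unreachable auth server cannot block the dial forever.

package main

import (
	"context"
	"fmt"
	"time"
)

// authenticate simulates a slow auth server; it is a stand-in, not clientv3 code.
func authenticate(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// authenticateWithTimeout bounds authentication by dialTimeout when one is set.
func authenticateWithTimeout(parent context.Context, dialTimeout time.Duration) error {
	ctx := parent
	if dialTimeout > 0 {
		cctx, cancel := context.WithTimeout(parent, dialTimeout)
		defer cancel()
		ctx = cctx
	}
	return authenticate(ctx)
}

func main() {
	// The slow auth step is cut off after one second instead of hanging.
	fmt.Println(authenticateWithTimeout(context.Background(), time.Second))
}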
Gyu-Ho Lee
e983d6b343 version: bump up to 3.1.5+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-04 14:10:15 -07:00
Gyu-Ho Lee
20490caaf0 version: bump up to 3.1.5
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-27 10:20:28 -07:00
fanmin shi
e156746959 raft: use rs.req.Entries[0].Data as the key for deletion in advance()
advance() should use rs.req.Entries[0].Data as the context for deletion
instead of req.Context. Since req.Context is never set, no context ever
gets deleted from pendingReadIndex, which results in a memory leak.

FIXES #7571
2017-03-24 15:51:39 -07:00
Artem Panchenko
d84bf983cc Dockerfile-release: add nsswitch.conf into image
The file '/etc/nsswitch.conf' is created in order to
take '/etc/hosts' entries into account while resolving
domain names.
2017-03-23 15:20:49 -07:00
Anthony Romano
b44c6bff9d clientv3: use waitgroup to wait for substream goroutine teardown
When a grpc watch stream is torn down, it joins on its logical substream
goroutines by waiting for each to close a channel. This doesn't guarantee
the substream has fully exited, though, only that it is about to exit; it
can still be waiting to resume even after Watch.Close finishes. Instead,
use a waitgroup.Done at the very end of the substream defer.

Fixes #7573
2017-03-23 12:26:32 -07:00
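A minimal, self-contained Go sketch (not the etcd code) of the teardown pattern this commit adopts: closing a channel only signals that a goroutine is about to exit, while a WaitGroup Done placed as the outermost defer lets the parent wait until the goroutine has actually finished.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	donec := make(chan struct{})

	wg.Add(1)
	go func() {
		defer wg.Done()    // registered first, runs last, right before the goroutine returns
		defer close(donec) // registered second, runs first: only signals "about to exit"
		fmt.Println("substream working")
	}()

	<-donec   // the goroutine may still be unwinding its remaining defers here
	wg.Wait() // by now the goroutine has run every defer and is done
}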
Anthony Romano
8c3c1b4a9c *: use filepath.Join for files 2017-03-23 09:53:56 -07:00
Tess Rinearson
b478387a59 wal: use path/filepath instead of path
Use the path/filepath package instead of the path package. The
path package assumes slash-separated paths, which doesn't work
on Windows. But path/filepath manipulates filename paths in a way
that's compatible across OSes.
2017-03-23 09:50:41 -07:00
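A short Go sketch of the difference being relied on here: path.Join always produces slash-separated paths, while filepath.Join uses the operating system's separator.

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// path always joins with forward slashes, regardless of the OS.
	fmt.Println(path.Join("member", "wal")) // member/wal on every platform

	// path/filepath uses the OS-specific separator:
	// member\wal on Windows, member/wal elsewhere.
	fmt.Println(filepath.Join("member", "wal"))
}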
Gyu-Ho Lee
dfc1f21f9d version: bump to 3.1.4+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-23 09:49:51 -07:00
Gyu-Ho Lee
41e52ebc22 version: bump to 3.1.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-22 09:46:23 -07:00
Xiang
7bb538d4d4 backend: add FillPercent option 2017-03-21 12:12:32 -07:00
Gyu-Ho Lee
1622782e49 integration: ensure 'StopNotify' on publish error
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:12:13 -07:00
Gyu-Ho Lee
99b47e0c1e etcdmain: handle StopNotify when ErrStopped aborted publish
Fix https://github.com/coreos/etcd/issues/7512.

If a server starts and aborts due to a config error,
it is possible to get stuck waiting on ReadyNotify.
This adds a select case to get notified on the stop channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:10:36 -07:00
Anthony Romano
350d0cd211 ctlv3: have "protobuf" in output help string instead of "proto"
Fixes #7538
2017-03-20 12:40:25 -07:00
Jonathan Sokolowski
72f37ff79a embed: Clear default initial cluster
NewConfig() sets the initial cluster from the name, but we should clear it
when another discovery option has been specified.

Fixes #7516
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
3221454cab etcdserver: remove possibly compacted entry look-up
Fix https://github.com/coreos/etcd/issues/7470.

This patch removes an unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger a panic
if the raft entry at etcdProgress.appliedi has been compacted
by subsequent 'MsgSnap' messages--if a follower is slow
(in this case, from network latency spikes), it can
receive subsequent 'MsgSnap' requests from the leader.

The etcd server-side 'applyAll' routine and raft's Ready
processing routine become asynchronous after raft
entries are persisted. Given that the raft Ready routine
takes less time to finish, it is possible that a second
'MsgSnap' is being handled while the slow 'applyAll'
is still processing the first (old) 'MsgSnap'. The raft
Ready routine can then compact log entries at an index that
is still in the future from 'applyAll''s point of view. That is how
'createMergedSnapshotMessage' ended up looking up a raft term
with an outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:18 -07:00
Anthony Romano
4a1bffdbc6 clientv3: close open watch channel if substream is closing on reconnect
If a substream is closing but outc is still open while reconnecting, then outc
would only be closed once the watch client reconnects or is itself closed.
This was leading to deadlocks in the proxy tests. Instead, close outc
immediately if the context is canceled.

Fixes #7503
2017-03-18 07:56:18 -07:00
Anthony Romano
9d9be2bc86 ctlv3: ensure synced member list before printing env vars on member add
With multiple endpoints, it's possible member add would get its
member list from a member that has not yet recognized the membership
update. Instead, confirm that the member list response comes from the
member that acked the member add or from a member that has synced
with the cluster following the member add.

Fixes #7498
2017-03-18 07:56:18 -07:00
Gyu-Ho Lee
e5462f74f1 auth: get rid of deadlocking channel passing scheme in simpleTokenTTL
Cherry-picked from 1b1fabef8f.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:05 -07:00
Gyu-Ho Lee
c68c1d9344 discovery: fix print format
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:57 -07:00
Anthony Romano
6ed56cd723 auth: nil check AuthInfo when checking admin permissions
If the context does not include auth information, get authinfo will
return a nil auth info and a nil error. This is then passed to
IsAdminPermitted, which would dereference the nil auth info.
2017-03-17 14:21:39 -07:00
Gyu-Ho Lee
a3c6f6bf81 version: bump up to 3.1.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:15 -07:00
51 changed files with 713 additions and 340 deletions

View File

@@ -32,18 +32,6 @@ matrix:
- go: tip
env: TARGET=ppc64le
addons:
apt:
packages:
- libpcap-dev
- libaspell-dev
- libhunspell-dev
before_install:
- go get -v github.com/chzchzchz/goword
- go get -v honnef.co/go/simple/cmd/gosimple
- go get -v honnef.co/go/unused/cmd/unused
# disable godep restore override
install:
- pushd cmd/etcd && go get -t -v ./... && popd

View File

@@ -5,6 +5,12 @@ ADD etcdctl /usr/local/bin/
RUN mkdir -p /var/etcd/
RUN mkdir -p /var/lib/etcd/
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
# but Golang relies on /etc/nsswitch.conf to check the order of DNS resolving
# (see https://github.com/golang/go/commit/9dee7771f561cf6aee081c0af6658cc81fac3918)
# To fix this we just create /etc/nsswitch.conf and add the following line:
RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
EXPOSE 2379 2380
# Define default command.

View File

@@ -21,85 +21,92 @@ import (
"crypto/rand"
"math/big"
"strings"
"sync"
"time"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
defaultSimpleTokenLength = 16
)
// var for testing purposes
var (
simpleTokenTTL = 5 * time.Minute
simpleTokenTTLResolution = 1 * time.Second
)
type simpleTokenTTLKeeper struct {
tokens map[string]time.Time
addSimpleTokenCh chan string
resetSimpleTokenCh chan string
deleteSimpleTokenCh chan string
stopCh chan chan struct{}
deleteTokenFunc func(string)
}
func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
stk := &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
addSimpleTokenCh: make(chan string, 1),
resetSimpleTokenCh: make(chan string, 1),
deleteSimpleTokenCh: make(chan string, 1),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
}
go stk.run()
return stk
tokens map[string]time.Time
donec chan struct{}
stopc chan struct{}
deleteTokenFunc func(string)
mu *sync.Mutex
}
func (tm *simpleTokenTTLKeeper) stop() {
waitCh := make(chan struct{})
tm.stopCh <- waitCh
<-waitCh
close(tm.stopCh)
select {
case tm.stopc <- struct{}{}:
case <-tm.donec:
}
<-tm.donec
}
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
tm.addSimpleTokenCh <- token
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
tm.resetSimpleTokenCh <- token
if _, ok := tm.tokens[token]; ok {
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
}
func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
tm.deleteSimpleTokenCh <- token
delete(tm.tokens, token)
}
func (tm *simpleTokenTTLKeeper) run() {
tokenTicker := time.NewTicker(simpleTokenTTLResolution)
defer tokenTicker.Stop()
defer func() {
tokenTicker.Stop()
close(tm.donec)
}()
for {
select {
case t := <-tm.addSimpleTokenCh:
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
case t := <-tm.resetSimpleTokenCh:
if _, ok := tm.tokens[t]; ok {
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
}
case t := <-tm.deleteSimpleTokenCh:
delete(tm.tokens, t)
case <-tokenTicker.C:
nowtime := time.Now()
tm.mu.Lock()
for t, tokenendtime := range tm.tokens {
if nowtime.After(tokenendtime) {
tm.deleteTokenFunc(t)
delete(tm.tokens, t)
}
}
case waitCh := <-tm.stopCh:
tm.tokens = make(map[string]time.Time)
waitCh <- struct{}{}
tm.mu.Unlock()
case <-tm.stopc:
return
}
}
}
func (as *authStore) enable() {
delf := func(tk string) {
if username, ok := as.simpleTokens[tk]; ok {
plog.Infof("deleting token %s for user %s", tk, username)
delete(as.simpleTokens, tk)
}
}
as.simpleTokenKeeper = &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
donec: make(chan struct{}),
stopc: make(chan struct{}),
deleteTokenFunc: delf,
mu: &as.simpleTokensMu,
}
go as.simpleTokenKeeper.run()
}
func (as *authStore) GenSimpleToken() (string, error) {
ret := make([]byte, defaultSimpleTokenLength)
@@ -117,7 +124,6 @@ func (as *authStore) GenSimpleToken() (string, error) {
func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokensMu.Lock()
_, ok := as.simpleTokens[token]
if ok {
plog.Panicf("token %s is alredy used", token)
@@ -129,13 +135,15 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
}
func (as *authStore) invalidateUser(username string) {
if as.simpleTokenKeeper == nil {
return
}
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
for token, name := range as.simpleTokens {
if strings.Compare(name, username) == 0 {
delete(as.simpleTokens, token)
as.simpleTokenKeeper.deleteSimpleToken(token)
}
}
as.simpleTokensMu.Unlock()
}

View File

@@ -168,13 +168,13 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
simpleTokensMu sync.RWMutex
simpleTokens map[string]string // token -> username
simpleTokenKeeper *simpleTokenTTLKeeper
revision uint64
indexWaiter func(uint64) <-chan struct{}
// tokenSimple in v3.2+
indexWaiter func(uint64) <-chan struct{}
simpleTokenKeeper *simpleTokenTTLKeeper
simpleTokensMu sync.Mutex
simpleTokens map[string]string // token -> username
}
func newDeleterFunc(as *authStore) func(string) {
@@ -215,8 +215,7 @@ func (as *authStore) AuthEnable() error {
tx.UnsafePut(authBucketName, enableFlagKey, authEnabled)
as.enabled = true
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.enable()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
@@ -244,11 +243,12 @@ func (as *authStore) AuthDisable() {
as.enabled = false
as.simpleTokensMu.Lock()
tk := as.simpleTokenKeeper
as.simpleTokenKeeper = nil
as.simpleTokens = make(map[string]string) // invalidate all tokens
as.simpleTokensMu.Unlock()
if as.simpleTokenKeeper != nil {
as.simpleTokenKeeper.stop()
as.simpleTokenKeeper = nil
if tk != nil {
tk.stop()
}
plog.Noticef("Authentication disabled")
@@ -646,13 +646,14 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}
func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
as.simpleTokensMu.RLock()
defer as.simpleTokensMu.RUnlock()
t, ok := as.simpleTokens[token]
if ok {
// same as '(t *tokenSimple) info' in v3.2+
as.simpleTokensMu.Lock()
username, ok := as.simpleTokens[token]
if ok && as.simpleTokenKeeper != nil {
as.simpleTokenKeeper.resetSimpleToken(token)
}
return &AuthInfo{Username: t, Revision: as.revision}, ok
as.simpleTokensMu.Unlock()
return &AuthInfo{Username: username, Revision: as.revision}, ok
}
type permSlice []*authpb.Permission
@@ -764,6 +765,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
if !as.isAuthEnabled() {
return nil
}
if authInfo == nil {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx.Lock()
@@ -908,7 +912,7 @@ func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{})
}
if enabled {
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.enable()
}
if as.revision == 0 {

View File

@@ -282,8 +282,16 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
tokenMu: &sync.RWMutex{},
}
err := c.getToken(context.TODO())
if err != nil {
ctx := c.ctx
if c.cfg.DialTimeout > 0 {
cctx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
defer cancel()
ctx = cctx
}
if err := c.getToken(ctx); err != nil {
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
err = grpc.ErrClientConnTimeout
}
return nil, err
}
@@ -335,6 +343,8 @@ func newClient(cfg *Config) (*Client, error) {
client.balancer = newSimpleBalancer(cfg.Endpoints)
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
if err != nil {
client.cancel()
client.balancer.Close()
return nil, err
}
client.conn = conn
@@ -353,6 +363,7 @@ func newClient(cfg *Config) (*Client, error) {
}
if !hasConn {
client.cancel()
client.balancer.Close()
conn.Close()
return nil, grpc.ErrClientConnTimeout
}

View File

@@ -70,33 +70,45 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)
donec := make(chan error)
go func() {
// without timeout, grpc keeps redialing if connection refused
cfg := Config{
Endpoints: []string{"localhost:12345"},
DialTimeout: 2 * time.Second}
c, err := New(cfg)
if c != nil || err == nil {
t.Errorf("new client should fail")
}
donec <- err
}()
time.Sleep(10 * time.Millisecond)
select {
case err := <-donec:
t.Errorf("dial didn't wait (%v)", err)
default:
testCfgs := []Config{
{
Endpoints: []string{"http://254.0.0.1:12345"},
DialTimeout: 2 * time.Second,
},
{
Endpoints: []string{"http://254.0.0.1:12345"},
DialTimeout: time.Second,
Username: "abc",
Password: "def",
},
}
select {
case <-time.After(5 * time.Second):
t.Errorf("failed to timeout dial on time")
case err := <-donec:
if err != grpc.ErrClientConnTimeout {
t.Errorf("unexpected error %v, want %v", err, grpc.ErrClientConnTimeout)
for i, cfg := range testCfgs {
donec := make(chan error)
go func() {
// without timeout, dial continues forever on ipv4 blackhole
c, err := New(cfg)
if c != nil || err == nil {
t.Errorf("#%d: new client should fail", i)
}
donec <- err
}()
time.Sleep(10 * time.Millisecond)
select {
case err := <-donec:
t.Errorf("#%d: dial didn't wait (%v)", i, err)
default:
}
select {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
if err != grpc.ErrClientConnTimeout {
t.Errorf("#%d: unexpected error %v, want %v", i, err, grpc.ErrClientConnTimeout)
}
}
}
}

View File

@@ -347,7 +347,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
}
}
// TestWatchResumeComapcted checks that the watcher gracefully closes in case
func TestWatchResumeInitRev(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
t.Fatal(err)
}
if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
t.Fatal(err)
}
// if resume is broken, it'll pick up this key first instead of a=3
if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
t.Fatal(err)
}
wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
}
// pause wch
clus.Members[0].DropConnections()
clus.Members[0].PauseConnections()
select {
case resp, ok := <-wch:
t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
case <-time.After(100 * time.Millisecond):
}
// resume wch
clus.Members[0].UnpauseConnections()
select {
case resp, ok := <-wch:
if !ok {
t.Fatal("unexpected watch close")
}
if len(resp.Events) == 0 {
t.Fatal("expected event on watch")
}
if string(resp.Events[0].Kv.Value) != "3" {
t.Fatalf("expected value=3, got event %+v", resp.Events[0])
}
case <-time.After(5 * time.Second):
t.Fatal("watch timed out")
}
}
// TestWatchResumeCompacted checks that the watcher gracefully closes in case
// that it tries to resume to a revision that's been compacted out of the store.
// Since the watcher's server restarts with stale data, the watcher will receive
// either a compaction error or all keys by staying in sync before the compaction

View File

@@ -132,6 +132,8 @@ type watchGrpcStream struct {
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
@@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
for range closing {
w.closeSubstream(<-w.closingc)
}
w.wg.Wait()
w.owner.closeStream(w)
}()
@@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
@@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
if !resuming {
w.closingc <- ws
}
w.wg.Done()
}()
emptyWr := &WatchResponse{}
@@ -612,10 +616,24 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
if ws.initReq.createdNotify {
ws.outc <- *wr
}
// once the watch channel is returned, a current revision
// watch must resume at the store revision. This is necessary
// for the following case to work as expected:
// wch := m1.Watch("a")
// m2.Put("a", "b")
// <-wch
// If the revision is only bound on the first observed event,
// if wch is disconnected before the Put is issued, then reconnects
// after it is committed, it'll miss the Put.
if ws.initReq.rev == 0 {
nextRev = wr.Header.Revision
}
}
} else {
// current progress of watch; <= store revision
nextRev = wr.Header.Revision
}
nextRev = wr.Header.Revision
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
@@ -674,6 +692,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
continue
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
}
@@ -694,6 +713,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
go func(ws *watcherStream) {
defer wg.Done()
if ws.closing {
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
close(ws.outc)
ws.outc = nil
}
return
}
select {

View File

@@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
if ok && url.Scheme != scheme {
plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
}

View File

@@ -229,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.ACUrls = []url.URL(u)
}
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
cfg.InitialCluster = ""
}
if cfg.ClusterState == "" {
cfg.ClusterState = ClusterStateFlagNew
}

View File

@@ -19,7 +19,7 @@ import (
"fmt"
"net"
"net/http"
"path"
"path/filepath"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
@@ -166,7 +166,7 @@ func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
for i, u := range cfg.LPUrls {
phosts[i] = u.Host
}
cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
@@ -221,7 +221,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
for i, u := range cfg.LCUrls {
chosts[i] = u.Host
}
cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}

View File

@@ -15,7 +15,7 @@
package embed
import (
"path"
"path/filepath"
"github.com/coreos/etcd/wal"
)
@@ -23,7 +23,7 @@ import (
func isMemberInitialized(cfg *Config) bool {
waldir := cfg.WalDir
if waldir == "" {
waldir = path.Join(cfg.Dir, "member", "wal")
waldir = filepath.Join(cfg.Dir, "member", "wal")
}
return wal.Exist(waldir)

View File

@@ -17,7 +17,7 @@ package command
import (
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -50,19 +50,19 @@ func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
if c.String("wal-dir") != "" {
srcWAL = c.String("wal-dir")
} else {
srcWAL = path.Join(c.String("data-dir"), "member", "wal")
srcWAL = filepath.Join(c.String("data-dir"), "member", "wal")
}
if c.String("backup-wal-dir") != "" {
destWAL = c.String("backup-wal-dir")
} else {
destWAL = path.Join(c.String("backup-dir"), "member", "wal")
destWAL = filepath.Join(c.String("backup-dir"), "member", "wal")
}
if err := fileutil.CreateDirAll(destSnap); err != nil {

View File

@@ -67,7 +67,7 @@ func leaseGrantCommandFunc(cmd *cobra.Command, args []string) {
if err != nil {
ExitWithError(ExitError, fmt.Errorf("failed to grant lease (%v)\n", err))
}
fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
display.Grant(*resp)
}
// NewLeaseRevokeCommand returns the cobra command for "lease revoke".
@@ -90,12 +90,12 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
id := leaseFromArgs(args[0])
ctx, cancel := commandCtx(cmd)
_, err := mustClientFromCmd(cmd).Revoke(ctx, id)
resp, err := mustClientFromCmd(cmd).Revoke(ctx, id)
cancel()
if err != nil {
ExitWithError(ExitError, fmt.Errorf("failed to revoke lease (%v)\n", err))
}
fmt.Printf("lease %016x revoked\n", id)
display.Revoke(id, *resp)
}
var timeToLiveKeys bool
@@ -154,9 +154,12 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
}
for resp := range respc {
fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
display.KeepAlive(*resp)
}
if _, ok := (display).(*simplePrinter); ok {
fmt.Printf("lease %016x expired or revoked.\n", id)
}
fmt.Printf("lease %016x expired or revoked.\n", id)
}
func leaseFromArgs(arg string) v3.LeaseID {

View File

@@ -107,7 +107,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
ctx, cancel := commandCtx(cmd)
resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
cli := mustClientFromCmd(cmd)
resp, err := cli.MemberAdd(ctx, urls)
cancel()
if err != nil {
ExitWithError(ExitError, err)
@@ -118,12 +119,24 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if _, ok := (display).(*simplePrinter); ok {
ctx, cancel = commandCtx(cmd)
listResp, err := mustClientFromCmd(cmd).MemberList(ctx)
cancel()
if err != nil {
ExitWithError(ExitError, err)
listResp, err := cli.MemberList(ctx)
// get latest member list; if there's failover new member might have outdated list
for {
if err != nil {
ExitWithError(ExitError, err)
}
if listResp.Header.MemberId == resp.Header.MemberId {
break
}
// quorum get to sync cluster list
gresp, gerr := cli.Get(ctx, "_")
if gerr != nil {
ExitWithError(ExitError, err)
}
resp.Header.MemberId = gresp.Header.MemberId
listResp, err = cli.MemberList(ctx)
}
cancel()
conf := []string{}
for _, memb := range listResp.Members {

View File

@@ -21,7 +21,7 @@ import (
"io"
"os"
"os/exec"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/client"
@@ -103,7 +103,7 @@ func prepareBackend() backend.Backend {
var be backend.Backend
bch := make(chan struct{})
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
go func() {
defer close(bch)
be = backend.New(dbpath, time.Second, 10000)
@@ -130,9 +130,9 @@ func rebuildStoreV2() (store.Store, uint64) {
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
waldir = filepath.Join(migrateDatadir, "member", "wal")
}
snapdir := path.Join(migrateDatadir, "member", "snap")
snapdir := filepath.Join(migrateDatadir, "member", "snap")
ss := snap.New(snapdir)
snapshot, err := ss.Load()

View File

@@ -32,6 +32,9 @@ type printer interface {
Txn(v3.TxnResponse)
Watch(v3.WatchResponse)
Grant(r v3.LeaseGrantResponse)
Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse)
KeepAlive(r v3.LeaseKeepAliveResponse)
TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
MemberAdd(v3.MemberAddResponse)
@@ -81,13 +84,18 @@ type printerRPC struct {
p func(interface{})
}
func (p *printerRPC) Del(r v3.DeleteResponse) { p.p((*pb.DeleteRangeResponse)(&r)) }
func (p *printerRPC) Get(r v3.GetResponse) { p.p((*pb.RangeResponse)(&r)) }
func (p *printerRPC) Put(r v3.PutResponse) { p.p((*pb.PutResponse)(&r)) }
func (p *printerRPC) Txn(r v3.TxnResponse) { p.p((*pb.TxnResponse)(&r)) }
func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) }
func (p *printerRPC) Del(r v3.DeleteResponse) { p.p((*pb.DeleteRangeResponse)(&r)) }
func (p *printerRPC) Get(r v3.GetResponse) { p.p((*pb.RangeResponse)(&r)) }
func (p *printerRPC) Put(r v3.PutResponse) { p.p((*pb.PutResponse)(&r)) }
func (p *printerRPC) Txn(r v3.TxnResponse) { p.p((*pb.TxnResponse)(&r)) }
func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) }
func (p *printerRPC) Grant(r v3.LeaseGrantResponse) { p.p(r) }
func (p *printerRPC) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) { p.p(r) }
func (p *printerRPC) KeepAlive(r v3.LeaseKeepAliveResponse) { p.p(r) }
func (p *printerRPC) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { p.p(&r) }
func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
func (p *printerRPC) MemberRemove(id uint64, r v3.MemberRemoveResponse) {
p.p((*pb.MemberRemoveResponse)(&r))
}

View File

@@ -30,7 +30,7 @@ func (p *fieldsPrinter) kv(pfx string, kv *spb.KeyValue) {
fmt.Printf("\"%sModRevision\" : %d\n", pfx, kv.ModRevision)
fmt.Printf("\"%sVersion\" : %d\n", pfx, kv.Version)
fmt.Printf("\"%sValue\" : %q\n", pfx, string(kv.Value))
fmt.Printf("\"%sLease\" : %d\n", pfx, string(kv.Lease))
fmt.Printf("\"%sLease\" : %d\n", pfx, kv.Lease)
}
func (p *fieldsPrinter) hdr(h *pb.ResponseHeader) {
@@ -92,6 +92,22 @@ func (p *fieldsPrinter) Watch(resp v3.WatchResponse) {
}
}
func (p *fieldsPrinter) Grant(r v3.LeaseGrantResponse) {
p.hdr(r.ResponseHeader)
fmt.Println(`"ID" :`, r.ID)
fmt.Println(`"TTL" :`, r.TTL)
}
func (p *fieldsPrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
p.hdr(r.Header)
}
func (p *fieldsPrinter) KeepAlive(r v3.LeaseKeepAliveResponse) {
p.hdr(r.ResponseHeader)
fmt.Println(`"ID" :`, r.ID)
fmt.Println(`"TTL" :`, r.TTL)
}
func (p *fieldsPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
p.hdr(r.ResponseHeader)
fmt.Println(`"ID" :`, r.ID)

View File

@@ -79,6 +79,18 @@ func (s *simplePrinter) Watch(resp v3.WatchResponse) {
}
}
func (s *simplePrinter) Grant(resp v3.LeaseGrantResponse) {
fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
}
func (p *simplePrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
fmt.Printf("lease %016x revoked\n", id)
}
func (p *simplePrinter) KeepAlive(resp v3.LeaseKeepAliveResponse) {
fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
}
func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
txt := fmt.Sprintf("lease %016x granted with TTL(%ds), remaining(%ds)", resp.ID, resp.GrantedTTL, resp.TTL)
if keys {

View File

@@ -23,7 +23,7 @@ import (
"io"
"math"
"os"
"path"
"path/filepath"
"reflect"
"strings"
@@ -186,8 +186,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
basedir = restoreName + ".etcd"
}
waldir := path.Join(basedir, "member", "wal")
snapdir := path.Join(basedir, "member", "snap")
waldir := filepath.Join(basedir, "member", "wal")
snapdir := filepath.Join(basedir, "member", "snap")
if _, err := os.Stat(basedir); err == nil {
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
@@ -325,7 +325,7 @@ func makeDB(snapdir, dbfile string, commit int) {
ExitWithError(ExitIO, err)
}
dbpath := path.Join(snapdir, "db")
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)

View File

@@ -45,7 +45,7 @@ var (
func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")

View File

@@ -22,7 +22,7 @@ import (
"net"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
@@ -189,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop)
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}
@@ -208,14 +211,14 @@ func startProxy(cfg *config) error {
return err
}
cfg.Dir = path.Join(cfg.Dir, "proxy")
cfg.Dir = filepath.Join(cfg.Dir, "proxy")
err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
if err != nil {
return err
}
var peerURLs []string
clusterfile := path.Join(cfg.Dir, "cluster")
clusterfile := filepath.Join(cfg.Dir, "cluster")
b, err := ioutil.ReadFile(clusterfile)
switch {

View File

@@ -185,9 +185,5 @@ func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (
}
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Status(ctx, ar)
}

View File

@@ -520,15 +520,14 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR
if err == nil {
resp.ID = int64(l.ID)
resp.TTL = l.TTL()
resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()}
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err
return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
}
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
@@ -609,69 +608,125 @@ func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
if err != nil {
return nil, err
}
return &pb.AuthEnableResponse{}, nil
return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
}
func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
a.s.AuthStore().AuthDisable()
return &pb.AuthDisableResponse{}, nil
return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
}
func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
ctx := context.WithValue(context.WithValue(context.Background(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
return a.s.AuthStore().UserAdd(r)
resp, err := a.s.AuthStore().UserAdd(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
return a.s.AuthStore().UserDelete(r)
resp, err := a.s.AuthStore().UserDelete(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
return a.s.AuthStore().UserChangePassword(r)
resp, err := a.s.AuthStore().UserChangePassword(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
return a.s.AuthStore().UserGrantRole(r)
resp, err := a.s.AuthStore().UserGrantRole(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
return a.s.AuthStore().UserGet(r)
resp, err := a.s.AuthStore().UserGet(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
return a.s.AuthStore().UserRevokeRole(r)
resp, err := a.s.AuthStore().UserRevokeRole(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
return a.s.AuthStore().RoleAdd(r)
resp, err := a.s.AuthStore().RoleAdd(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
return a.s.AuthStore().RoleGrantPermission(r)
resp, err := a.s.AuthStore().RoleGrantPermission(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
return a.s.AuthStore().RoleGet(r)
resp, err := a.s.AuthStore().RoleGet(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
return a.s.AuthStore().RoleRevokePermission(r)
resp, err := a.s.AuthStore().RoleRevokePermission(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
return a.s.AuthStore().RoleDelete(r)
resp, err := a.s.AuthStore().RoleDelete(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
return a.s.AuthStore().UserList(r)
resp, err := a.s.AuthStore().UserList(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
return a.s.AuthStore().RoleList(r)
resp, err := a.s.AuthStore().RoleList(r)
if resp != nil {
resp.Header = newHeader(a.s)
}
return resp, err
}
type quotaApplierV3 struct {
@@ -836,3 +891,12 @@ func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
}
rr.KVs = rr.KVs[:j]
}
func newHeader(s *EtcdServer) *pb.ResponseHeader {
return &pb.ResponseHeader{
ClusterId: uint64(s.Cluster().ID()),
MemberId: uint64(s.ID()),
Revision: s.KV().Rev(),
RaftTerm: s.Term(),
}
}

View File

@@ -16,7 +16,7 @@ package etcdserver
import (
"fmt"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -118,16 +118,16 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
return nil
}
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
func (c *ServerConfig) WALDir() string {
if c.DedicatedWALDir != "" {
return c.DedicatedWALDir
}
return path.Join(c.MemberDir(), "wal")
return filepath.Join(c.MemberDir(), "wal")
}
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
@@ -263,7 +264,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
ss := snap.New(cfg.SnapDir())
bepath := path.Join(cfg.SnapDir(), databaseFilename)
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)
var be backend.Backend
@@ -594,6 +595,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
}
@@ -666,6 +668,7 @@ func (s *EtcdServer) run() {
ep := etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedt: snap.Metadata.Term,
appliedi: snap.Metadata.Index,
}
@@ -765,7 +768,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
@@ -789,7 +792,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("get database snapshot file path error: %v", err)
}
fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
@@ -867,6 +870,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
plog.Info("finished adding peers from new cluster configuration into network...")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
@@ -888,7 +892,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
return
}
var shouldstop bool
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
@@ -1242,9 +1246,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
@@ -1254,16 +1256,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
shouldstop = shouldstop || removedSelf
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, err)
default:
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
applied = e.Index
appliedt = e.Term
appliedi = e.Index
}
return applied, shouldstop
return appliedt, appliedi, shouldStop
}
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer

View File

@@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}

View File

@@ -16,7 +16,6 @@ package etcdserver
import (
"io"
"log"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
@@ -26,12 +25,7 @@ import (
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
snapt, err := s.r.raftStorage.Term(snapi)
if err != nil {
log.Panicf("get term should never fail: %v", err)
}
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()

View File

@@ -31,8 +31,9 @@ type bridge struct {
l net.Listener
conns map[*bridgeConn]struct{}
stopc chan struct{}
wg sync.WaitGroup
stopc chan struct{}
pausec chan struct{}
wg sync.WaitGroup
mu sync.Mutex
}
@@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
inaddr: addr + "0",
outaddr: addr,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}, 1),
stopc: make(chan struct{}),
pausec: make(chan struct{}),
}
close(b.pausec)
l, err := transport.NewUnixListener(b.inaddr)
if err != nil {
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
@@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }
func (b *bridge) Close() {
b.l.Close()
b.mu.Lock()
select {
case b.stopc <- struct{}{}:
case <-b.stopc:
default:
close(b.stopc)
}
b.mu.Unlock()
b.wg.Wait()
}
@@ -75,6 +82,22 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{})
}
func (b *bridge) Pause() {
b.mu.Lock()
b.pausec = make(chan struct{})
b.mu.Unlock()
}
func (b *bridge) Unpause() {
b.mu.Lock()
select {
case <-b.pausec:
default:
close(b.pausec)
}
b.mu.Unlock()
}
func (b *bridge) serveListen() {
defer func() {
b.l.Close()
@@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
if ierr != nil {
return
}
b.mu.Lock()
pausec := b.pausec
b.mu.Unlock()
select {
case <-b.stopc:
return
case <-pausec:
}
outc, oerr := net.Dial("unix", b.outaddr)
if oerr != nil {
inc.Close()
return
}
bc := &bridgeConn{inc, outc}
bc := &bridgeConn{inc, outc, make(chan struct{})}
b.wg.Add(1)
b.mu.Lock()
b.conns[bc] = struct{}{}
@@ -108,6 +140,7 @@ func (b *bridge) serveListen() {
func (b *bridge) serveConn(bc *bridgeConn) {
defer func() {
close(bc.donec)
bc.Close()
b.mu.Lock()
delete(b.conns, bc)
@@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) {
}
type bridgeConn struct {
in net.Conn
out net.Conn
in net.Conn
out net.Conn
donec chan struct{}
}
func (bc *bridgeConn) Close() {
bc.in.Close()
bc.out.Close()
<-bc.donec
}

View File

@@ -449,6 +449,8 @@ type member struct {
grpcServer *grpc.Server
grpcAddr string
grpcBridge *bridge
keepDataDirTerminate bool
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
@@ -530,7 +532,9 @@ func (m *member) electionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
}
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
@@ -746,8 +750,10 @@ func (m *member) Restart(t *testing.T) error {
func (m *member) Terminate(t *testing.T) {
plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
if !m.keepDataDirTerminate {
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
@@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
}
}
// TestRestartRemoved ensures that restarting removed member must exit
// if 'initial-cluster-state' is set 'new' and old data directory still exists
// (see https://github.com/coreos/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
defer testutil.AfterTest(t)
capnslog.SetGlobalLogLevel(capnslog.INFO)
// 1. start single-member cluster
c := NewCluster(t, 1)
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
c.Launch(t)
defer c.Terminate(t)
// 2. add a new member
c.AddMember(t)
c.WaitLeader(t)
oldm := c.Members[0]
oldm.keepDataDirTerminate = true
// 3. remove first member, shut down without deleting data
if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)
// 4. restart first member with 'initial-cluster-state=new'
// wrong config, expects exit within ReqTimeout
oldm.ServerConfig.NewCluster = false
if err := oldm.Restart(t); err != nil {
t.Fatalf("unexpected ForceRestart error: %v", err)
}
defer func() {
oldm.Close()
os.RemoveAll(oldm.ServerConfig.DataDir)
}()
select {
case <-oldm.s.StopNotify():
case <-time.After(time.Minute):
t.Fatalf("removed member didn't exit within %v", time.Minute)
}
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.

View File

@@ -18,7 +18,7 @@ import (
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
@@ -58,7 +58,7 @@ func TestEmbedEtcd(t *testing.T) {
setupEmbedCfg(&tests[5].cfg, []url.URL{urls[4]}, []url.URL{urls[5], urls[6]})
setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]})
dir := path.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(dir)
defer os.RemoveAll(dir)

View File

@@ -20,6 +20,7 @@ import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
@@ -35,23 +36,85 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
defer cancel()
api := toGRPC(clus.Client(0))
auth := api.Auth
if _, err := auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
t.Fatal(err)
}
if _, err := auth.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.AuthEnable(ctx, &pb.AuthEnableRequest{}); err != nil {
t.Fatal(err)
}
authSetupRoot(t, api.Auth)
_, err := api.KV.Range(ctx, &pb.RangeRequest{Key: []byte("abc")})
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
}
}
// TestV3AuthTokenWithDisable tests that auth won't crash if
// given a valid token when authentication is disabled
func TestV3AuthTokenWithDisable(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
if cerr != nil {
t.Fatal(cerr)
}
defer c.Close()
rctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
go func() {
defer close(donec)
for rctx.Err() == nil {
c.Put(rctx, "abc", "def")
}
}()
time.Sleep(10 * time.Millisecond)
if _, err := c.AuthDisable(context.TODO()); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
cancel()
<-donec
}
func TestV3AuthRevision(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
api := toGRPC(clus.Client(0))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
presp, perr := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
cancel()
if perr != nil {
t.Fatal(perr)
}
rev := presp.Header.Revision
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
aresp, aerr := api.Auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"})
cancel()
if aerr != nil {
t.Fatal(aerr)
}
if aresp.Header.Revision != rev {
t.Fatalf("revision expected %d, got %d", rev, aresp.Header.Revision)
}
}
func authSetupRoot(t *testing.T, auth pb.AuthClient) {
if _, err := auth.UserAdd(context.TODO(), &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
t.Fatal(err)
}
if _, err := auth.RoleAdd(context.TODO(), &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.UserGrantRole(context.TODO(), &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
t.Fatal(err)
}
if _, err := auth.AuthEnable(context.TODO(), &pb.AuthEnableRequest{}); err != nil {
t.Fatal(err)
}
}

View File

@@ -18,7 +18,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"sort"
"sync"
@@ -400,5 +400,5 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
t.Fatalf("failed to create tmpdir (%v)", err)
}
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
return tmpPath, backend.New(filepath.Join(tmpPath, "be"), time.Second, 10000)
}

View File

@@ -20,7 +20,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
@@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
@@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for seq write in for each
count = 0
}
return tmpb.Put(k, v)
@@ -334,7 +337,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
if err != nil {
plog.Fatal(err)
}
tmpPath := path.Join(dir, "database")
tmpPath := filepath.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
}

View File

@@ -19,7 +19,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"github.com/coreos/pkg/capnslog"
@@ -39,7 +39,7 @@ var (
// IsDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func IsDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
f := filepath.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), PrivateFileMode); err != nil {
return err
}

View File

@@ -16,7 +16,7 @@ package fileutil
import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -45,7 +45,7 @@ func purgeFile(dirname string, suffix string, max uint, interval time.Duration,
sort.Strings(newfnames)
fnames = newfnames
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
f := filepath.Join(dirname, newfnames[0])
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
break

View File

@@ -18,7 +18,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
"time"
@@ -33,7 +33,7 @@ func TestPurgeFile(t *testing.T) {
// minimal file set
for i := 0; i < 3; i++ {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if ferr != nil {
t.Fatal(ferr)
}
@@ -53,7 +53,7 @@ func TestPurgeFile(t *testing.T) {
// rest of the files
for i := 4; i < 10; i++ {
go func(n int) {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", n)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", n)))
if ferr != nil {
t.Fatal(ferr)
}
@@ -99,7 +99,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
for i := 0; i < 10; i++ {
var f *os.File
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if err != nil {
t.Fatal(err)
}
@@ -107,7 +107,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
}
// create a purge barrier at 5
p := path.Join(dir, fmt.Sprintf("%d.test", 5))
p := filepath.Join(dir, fmt.Sprintf("%d.test", 5))
l, err := LockFile(p, os.O_WRONLY, PrivateFileMode)
if err != nil {
t.Fatal(err)

View File

@@ -27,8 +27,7 @@ import (
"math/big"
"net"
"os"
"path"
"strings"
"path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil"
@@ -91,8 +90,8 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
return
}
certPath := path.Join(dirpath, "cert.pem")
keyPath := path.Join(dirpath, "key.pem")
certPath := filepath.Join(dirpath, "cert.pem")
keyPath := filepath.Join(dirpath, "key.pem")
_, errcert := os.Stat(certPath)
_, errkey := os.Stat(keyPath)
if errcert == nil && errkey == nil {
@@ -120,10 +119,11 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
}
for _, host := range hosts {
if ip := net.ParseIP(host); ip != nil {
h, _, _ := net.SplitHostPort(host)
if ip := net.ParseIP(h); ip != nil {
tmpl.IPAddresses = append(tmpl.IPAddresses, ip)
} else {
tmpl.DNSNames = append(tmpl.DNSNames, strings.Split(host, ":")[0])
tmpl.DNSNames = append(tmpl.DNSNames, h)
}
}
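The change above fixes SAN generation when hosts carry a port: SplitHostPort strips the port before deciding whether the remainder is an IP SAN or a DNS SAN. A small sketch of that classification on its own (sample inputs are invented; the error fallback is an addition the diff itself does not need, since its callers pass host:port values):

package main

import (
    "fmt"
    "net"
)

// classify reports whether host (possibly "host:port") should become
// an IP SAN or a DNS SAN on a self-signed certificate.
func classify(host string) string {
    h, _, err := net.SplitHostPort(host)
    if err != nil {
        h = host // no port present; use the value as-is
    }
    if ip := net.ParseIP(h); ip != nil {
        return "IP SAN: " + ip.String()
    }
    return "DNS SAN: " + h
}

func main() {
    for _, host := range []string{"10.0.1.5:2379", "etcd-0.example.com:2380"} {
        fmt.Println(classify(host))
    }
}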

View File

@@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
}
}
// TestRaftFreesReadOnlyMem ensures raft frees the read-only request state held in
// the readOnly readIndexQueue and pendingReadIndex map once the read is served.
// related issue: https://github.com/coreos/etcd/issues/7571
func TestRaftFreesReadOnlyMem(t *testing.T) {
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
sm.becomeCandidate()
sm.becomeLeader()
sm.raftLog.commitTo(sm.raftLog.lastIndex())
ctx := []byte("ctx")
// leader starts linearizable read request.
// more info: raft dissertation 6.4, step 2.
sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
msgs := sm.readMessages()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
}
if msgs[0].Type != pb.MsgHeartbeat {
t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
}
if !bytes.Equal(msgs[0].Context, ctx) {
t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
}
if len(sm.readOnly.readIndexQueue) != 1 {
t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 1 {
t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
}
// heartbeat responses from majority of followers (1 in this case)
// acknowledge the authority of the leader.
// more info: raft dissertation 6.4, step 3.
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
if len(sm.readOnly.readIndexQueue) != 0 {
t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 0 {
t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
}
}
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
// MsgAppResp.
func TestMsgAppRespWaitReset(t *testing.T) {

View File

@@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Context))
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
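The one-line fix above matters because pendingReadIndex is keyed by the data carried in the ReadIndex entry; deleting by a different key than the one used on insert leaves entries behind forever, which is exactly the leak the new raft test checks for. A reduced sketch of the bookkeeping pattern (names and types are illustrative, not the raft package's own):

package main

import "fmt"

// pendingQueue mimics the readOnly bookkeeping: an ordered queue of
// request ids plus a map for O(1) lookup. Entries leak unless removal
// uses exactly the key that insertion used.
type pendingQueue struct {
    queue   []string
    pending map[string]struct{}
}

func (p *pendingQueue) add(id string) {
    p.queue = append(p.queue, id)
    p.pending[id] = struct{}{}
}

// advance releases every request up to and including id.
func (p *pendingQueue) advance(id string) {
    for i, q := range p.queue {
        delete(p.pending, q) // must match the key used in add
        if q == id {
            p.queue = p.queue[i+1:]
            return
        }
    }
}

func main() {
    p := &pendingQueue{pending: map[string]struct{}{}}
    p.add("ctx-1")
    p.add("ctx-2")
    p.advance("ctx-2")
    fmt.Println(len(p.queue), len(p.pending)) // 0 0 once the keys line up
}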

View File

@@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -41,7 +41,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
os.Remove(f.Name())
return n, err
}
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
if fileutil.Exist(fn) {
os.Remove(f.Name())
return n, nil
@@ -67,7 +67,7 @@ func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
return path.Join(s.dir, fn), nil
return filepath.Join(s.dir, fn), nil
}
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")

View File

@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -84,13 +84,13 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
err1 := os.Remove(filepath.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
}
}
return err
@@ -114,7 +114,7 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := path.Join(dir, name)
fpath := filepath.Join(dir, name)
snap, err := Read(fpath)
if err != nil {
renameBroken(fpath)

View File

@@ -19,7 +19,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@@ -38,7 +38,7 @@ var testSnap = &raftpb.Snapshot{
}
func TestSaveAndLoad(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -60,7 +60,7 @@ func TestSaveAndLoad(t *testing.T) {
}
func TestBadCRC(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -76,14 +76,14 @@ func TestBadCRC(t *testing.T) {
// fake a crc mismatch
crcTable = crc32.MakeTable(crc32.Koopman)
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
_, err = Read(filepath.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
if err == nil || err != ErrCRCMismatch {
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
}
}
func TestFailback(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -91,7 +91,7 @@ func TestFailback(t *testing.T) {
defer os.RemoveAll(dir)
large := fmt.Sprintf("%016x-%016x-%016x.snap", 0xFFFF, 0xFFFF, 0xFFFF)
err = ioutil.WriteFile(path.Join(dir, large), []byte("bad data"), 0666)
err = ioutil.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0666)
if err != nil {
t.Fatal(err)
}
@@ -109,7 +109,7 @@ func TestFailback(t *testing.T) {
if !reflect.DeepEqual(g, testSnap) {
t.Errorf("snap = %#v, want %#v", g, testSnap)
}
if f, err := os.Open(path.Join(dir, large) + ".broken"); err != nil {
if f, err := os.Open(filepath.Join(dir, large) + ".broken"); err != nil {
t.Fatal("broken snapshot does not exist")
} else {
f.Close()
@@ -117,7 +117,7 @@ func TestFailback(t *testing.T) {
}
func TestSnapNames(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -125,7 +125,7 @@ func TestSnapNames(t *testing.T) {
defer os.RemoveAll(dir)
for i := 1; i <= 5; i++ {
var f *os.File
if f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
if f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
t.Fatal(err)
} else {
f.Close()
@@ -146,7 +146,7 @@ func TestSnapNames(t *testing.T) {
}
func TestLoadNewestSnap(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -175,7 +175,7 @@ func TestLoadNewestSnap(t *testing.T) {
}
func TestNoSnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@@ -189,19 +189,19 @@ func TestNoSnapshot(t *testing.T) {
}
func TestEmptySnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte(""), 0x700)
if err != nil {
t.Fatal(err)
}
_, err = Read(path.Join(dir, "1.snap"))
_, err = Read(filepath.Join(dir, "1.snap"))
if err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
}
@@ -210,14 +210,14 @@ func TestEmptySnapshot(t *testing.T) {
// TestAllSnapshotBroken ensures snapshotter returns
// ErrNoSnapshot if all the snapshots are broken.
func TestAllSnapshotBroken(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte("bad"), 0x700)
if err != nil {
t.Fatal(err)
}

test
View File

@@ -9,7 +9,7 @@
# PKG=./wal ./test
# PKG=snap ./test
#
# Run code coverage
# COVERDIR=coverage PASSES=cov ./test
set -e
@@ -32,10 +32,6 @@ TEST_PKGS=`find . -name \*_test.go | while read a; do dirname $a; done | sort |
FORMATTABLE=`find . -name \*.go | while read a; do echo $(dirname $a)/"*.go"; done | sort | uniq | egrep -v "$IGNORE_PKGS" | sed "s|\./||g"`
TESTABLE_AND_FORMATTABLE=`echo "$TEST_PKGS" | egrep -v "$INTEGRATION_PKGS"`
# TODO: 'client' pkg fails with gosimple from generated files
# TODO: 'rafttest' is failing with unused
GOSIMPLE_UNUSED_PATHS=`find . -name \*.go | while read a; do dirname $a; done | sort | uniq | egrep -v "$IGNORE_PKGS" | grep -v 'client'`
if [ -z "$GOARCH" ]; then
GOARCH=$(go env GOARCH);
fi
@@ -194,48 +190,6 @@ function fmt_pass {
fi
done
if which goword >/dev/null; then
echo "Checking goword..."
# get all go files to process
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
# ignore tests and protobuf files
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
# only check for broken exported godocs
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
if [ ! -z "$gowordRes" ]; then
echo -e "goword checking failed:\n${gowordRes}"
exit 255
fi
else
echo "Skipping goword..."
fi
if which gosimple >/dev/null; then
echo "Checking gosimple..."
for path in $GOSIMPLE_UNUSED_PATHS; do
simplResult=`gosimple ${path} 2>&1 || true`
if [ -n "${simplResult}" ]; then
echo -e "gosimple checking ${path} failed:\n${simplResult}"
exit 255
fi
done
else
echo "Skipping gosimple..."
fi
if which unused >/dev/null; then
echo "Checking unused..."
for path in $GOSIMPLE_UNUSED_PATHS; do
unusedResult=`unused ${path} 2>&1 || true`
if [ -n "${unusedResult}" ]; then
echo -e "unused checking ${path} failed:\n${unusedResult}"
exit 255
fi
done
else
echo "Skipping unused..."
fi
echo "Checking for license header..."
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './cmd/*' ! -path './gopath.proto/*'); do
head -n3 "${file}" | grep -Eq "(Copyright|generated|GENERATED)" || echo -e " ${file}"

View File

@@ -18,7 +18,7 @@ import (
"flag"
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -58,7 +58,7 @@ func main() {
ss := snap.New(snapDir(*from))
snapshot, err = ss.Load()
} else {
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
snapshot, err = snap.Read(filepath.Join(snapDir(*from), *snapfile))
}
switch err {
@@ -132,9 +132,9 @@ func main() {
}
}
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
func snapDir(dataDir string) string { return filepath.Join(dataDir, "member", "snap") }
func parseWALMetadata(b []byte) (id, cid types.ID) {
var metadata etcdserverpb.Metadata

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.1.3"
Version = "3.1.7"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@@ -17,7 +17,7 @@ package wal
import (
"fmt"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -65,7 +65,7 @@ func (fp *filePipeline) Close() error {
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}
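The count % 2 trick above alternates between two temp-file names so the file being pre-allocated is never the one most recently handed to the WAL. A standalone sketch of the same idea (the types and names here are invented for illustration):

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
)

// pipeline sketches the alternating temp-file naming: count % 2
// guarantees the file being prepared is never the one that was most
// recently handed out.
type pipeline struct {
    dir   string
    count int
}

func (p *pipeline) alloc() (string, error) {
    fpath := filepath.Join(p.dir, fmt.Sprintf("%d.tmp", p.count%2))
    p.count++
    f, err := os.OpenFile(fpath, os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        return "", err
    }
    return fpath, f.Close()
}

func main() {
    dir, err := ioutil.TempDir("", "pipeline")
    if err != nil {
        log.Fatal(err)
    }
    defer os.RemoveAll(dir)

    p := &pipeline{dir: dir}
    for i := 0; i < 4; i++ {
        name, aerr := p.alloc()
        if aerr != nil {
            log.Fatal(aerr)
        }
        fmt.Println(filepath.Base(name)) // 0.tmp, 1.tmp, 0.tmp, 1.tmp
    }
}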

View File

@@ -17,7 +17,7 @@ package wal
import (
"io"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal/walpb"
@@ -94,6 +94,6 @@ func openLast(dirpath string) (*fileutil.LockedFile, error) {
if err != nil {
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
last := filepath.Join(dirpath, names[len(names)-1])
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
}

View File

@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io"
"os"
"path"
"path/filepath"
"sync"
"time"
@@ -97,7 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := path.Clean(dirpath) + ".tmp"
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
@@ -107,7 +107,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}
p := path.Join(tmpdirpath, walName(0, 0))
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
@@ -143,7 +143,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil {
return nil, perr
}
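The temporary-directory handling in this file, together with the parent-directory sync just above, is the usual recipe for making directory initialization look atomic: populate <dir>.tmp, rename it into place, then fsync the parent so the rename survives a crash. A sketch of that pattern outside the WAL code (the file names are illustrative):

package main

import (
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
)

// initDirAtomically populates dir by writing into dir.tmp first and
// renaming it into place, so a crash never leaves a half-built dir.
func initDirAtomically(dir string) error {
    tmpdir := filepath.Clean(dir) + ".tmp"
    if err := os.RemoveAll(tmpdir); err != nil { // clear leftovers from an interrupted run
        return err
    }
    if err := os.MkdirAll(tmpdir, 0700); err != nil {
        return err
    }
    if err := ioutil.WriteFile(filepath.Join(tmpdir, "0.wal"), []byte("init"), 0600); err != nil {
        return err
    }
    if err := os.Rename(tmpdir, dir); err != nil {
        return err
    }
    // Sync the parent directory so the rename itself is durable.
    pdir, err := os.Open(filepath.Dir(dir))
    if err != nil {
        return err
    }
    defer pdir.Close()
    return pdir.Sync()
}

func main() {
    base, err := ioutil.TempDir("", "atomic-init")
    if err != nil {
        log.Fatal(err)
    }
    defer os.RemoveAll(base)
    if err := initDirAtomically(filepath.Join(base, "wal")); err != nil {
        log.Fatal(err)
    }
    log.Println("directory initialized")
}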
@@ -196,7 +196,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := path.Join(dirpath, name)
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
@@ -232,7 +232,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
@@ -372,7 +372,7 @@ func (w *WAL) cut() error {
return err
}
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
@@ -464,7 +464,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
if err != nil {
return err
}
@@ -611,7 +611,7 @@ func (w *WAL) seq() uint64 {
if t == nil {
return 0
}
seq, _, err := parseWalName(path.Base(t.Name()))
seq, _, err := parseWalName(filepath.Base(t.Name()))
if err != nil {
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
}

View File

@@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@@ -40,7 +40,7 @@ func TestNew(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
defer w.Close()
@@ -51,7 +51,7 @@ func TestNew(t *testing.T) {
t.Fatal(err)
}
gd := make([]byte, off)
f, err := os.Open(path.Join(p, path.Base(w.tail().Name())))
f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
if err != nil {
t.Fatal(err)
}
@@ -90,7 +90,7 @@ func TestNewForInitedDir(t *testing.T) {
}
defer os.RemoveAll(p)
os.Create(path.Join(p, walName(0, 0)))
os.Create(filepath.Join(p, walName(0, 0)))
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
t.Errorf("err = %v, want %v", err, os.ErrExist)
}
@@ -103,7 +103,7 @@ func TestOpenAtIndex(t *testing.T) {
}
defer os.RemoveAll(dir)
f, err := os.Create(path.Join(dir, walName(0, 0)))
f, err := os.Create(filepath.Join(dir, walName(0, 0)))
if err != nil {
t.Fatal(err)
}
@@ -113,7 +113,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
if w.seq() != 0 {
@@ -122,7 +122,7 @@ func TestOpenAtIndex(t *testing.T) {
w.Close()
wname := walName(2, 10)
f, err = os.Create(path.Join(dir, wname))
f, err = os.Create(filepath.Join(dir, wname))
if err != nil {
t.Fatal(err)
}
@@ -132,7 +132,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %+v, want %+v", g, wname)
}
if w.seq() != 2 {
@@ -172,7 +172,7 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname := walName(1, 1)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@@ -188,14 +188,14 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname = walName(2, 2)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
// check the state in the last WAL
// We check before closing the WAL to ensure that Cut synced the data
// to disk.
f, err := os.Open(path.Join(p, wname))
f, err := os.Open(filepath.Join(p, wname))
if err != nil {
t.Fatal(err)
}
@@ -254,7 +254,7 @@ func TestSaveWithCut(t *testing.T) {
}
defer neww.Close()
wname := walName(1, index)
if g := path.Base(neww.tail().Name()); g != wname {
if g := filepath.Base(neww.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@@ -416,7 +416,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
md.Close()
if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
t.Fatal(err)
}
@@ -570,7 +570,7 @@ func TestReleaseLockTo(t *testing.T) {
}
for i, l := range w.locks {
var lockIndex uint64
_, lockIndex, err = parseWalName(path.Base(l.Name()))
_, lockIndex, err = parseWalName(filepath.Base(l.Name()))
if err != nil {
t.Fatal(err)
}
@@ -588,7 +588,7 @@ func TestReleaseLockTo(t *testing.T) {
if len(w.locks) != 1 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
}
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
_, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
if err != nil {
t.Fatal(err)
}
@@ -673,11 +673,11 @@ func TestRestartCreateWal(t *testing.T) {
defer os.RemoveAll(p)
// make temporary directory so it looks like initialization is interrupted
tmpdir := path.Clean(p) + ".tmp"
tmpdir := filepath.Clean(p) + ".tmp"
if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
t.Fatal(err)
}
if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
t.Fatal(err)
}
@@ -729,7 +729,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}
}
fn := path.Join(p, path.Base(w.tail().Name()))
fn := filepath.Join(p, filepath.Base(w.tail().Name()))
w.Close()
// clobber some entry with 0's to simulate a torn write