Compare commits
31 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e348b1aedd | ||
![]() |
4355d91fcc | ||
![]() |
ce7b86b65a | ||
![]() |
d70a218b19 | ||
![]() |
e029de320a | ||
![]() |
863a56a998 | ||
![]() |
3282d90707 | ||
![]() |
b2d5c6c7bd | ||
![]() |
6fe7316ec4 | ||
![]() |
40e02256c7 | ||
![]() |
c9d46ab379 | ||
![]() |
d1da2023b9 | ||
![]() |
eaa0050d4d | ||
![]() |
99a12662c1 | ||
![]() |
e6d44fa3f2 | ||
![]() |
43caf2b28a | ||
![]() |
bfb7a155b4 | ||
![]() |
f76ef3ce8d | ||
![]() |
462ba8bb09 | ||
![]() |
146ed08052 | ||
![]() |
1bc974d536 | ||
![]() |
3e3468d1fa | ||
![]() |
207f19354b | ||
![]() |
bb8a5377ce | ||
![]() |
8291e16128 | ||
![]() |
a5b31087e8 | ||
![]() |
cec79dd706 | ||
![]() |
3641af83e7 | ||
![]() |
240fda5128 | ||
![]() |
d627301735 | ||
![]() |
534c31b4ca |
@@ -2,7 +2,7 @@
|
||||
|
||||
TEST_SUFFIX=$(date +%s | base64 | head -c 15)
|
||||
|
||||
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0"
|
||||
TEST_OPTS="PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.1"
|
||||
if [ "$TEST_ARCH" == "386" ]; then
|
||||
TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
|
||||
fi
|
||||
|
@@ -6,7 +6,7 @@ sudo: required
|
||||
services: docker
|
||||
|
||||
go:
|
||||
- 1.9.4
|
||||
- "1.9.4"
|
||||
- tip
|
||||
|
||||
notifications:
|
||||
@@ -30,7 +30,7 @@ matrix:
|
||||
- go: tip
|
||||
env: TARGET=amd64-go-tip
|
||||
exclude:
|
||||
- go: 1.9.4
|
||||
- go: "1.9.4"
|
||||
env: TARGET=amd64-go-tip
|
||||
- go: tip
|
||||
env: TARGET=amd64
|
||||
|
@@ -47,7 +47,7 @@ When considering features, support, and stability, new applications planning to
|
||||
|
||||
### Consul
|
||||
|
||||
Consul bills itself as an end-to-end service discovery framework. To wit, it includes services such as health checking, failure detection, and DNS. Incidentally, Consul also exposes a key value store with mediocre performance and an intricate API. As it stands in Consul 0.7, the storage system does not scales well; systems requiring millions of keys will suffer from high latencies and memory pressure. The key value API is missing, most notably, multi-version keys, conditional transactions, and reliable streaming watches.
|
||||
Consul is an end-to-end service discovery framework. It provides built-in health checking, failure detection, and DNS services. In addition, Consul exposes a key value store with RESTful HTTP APIs. [As it stands in Consul 1.0][dbtester-comparison-results], the storage system does not scale as well as other systems like etcd or Zookeeper in key-value operations; systems requiring millions of keys will suffer from high latencies and memory pressure. The key value API is missing, most notably, multi-version keys, conditional transactions, and reliable streaming watches.
|
||||
|
||||
etcd and Consul solve different problems. If looking for a distributed consistent key value store, etcd is a better choice over Consul. If looking for end-to-end cluster service discovery, etcd will not have enough features; choose Kubernetes, Consul, or SmartStack.
|
||||
|
||||
@@ -113,3 +113,4 @@ For distributed coordination, choosing etcd can help prevent operational headach
|
||||
[container-linux]: https://coreos.com/why
|
||||
[locksmith]: https://github.com/coreos/locksmith
|
||||
[kubernetes]: http://kubernetes.io/docs/whatisk8s
|
||||
[dbtester-comparison-results]: https://github.com/coreos/dbtester/tree/master/test-results/2018Q1-02-etcd-zookeeper-consul
|
||||
|
@@ -47,7 +47,9 @@ $ etcdctl defrag
|
||||
Finished defragmenting etcd member[127.0.0.1:2379]
|
||||
```
|
||||
|
||||
Note that defragmentation to a live member blocks the system from reading and writing data while rebuilding its states.
|
||||
**Note that defragmentation to a live member blocks the system from reading and writing data while rebuilding its states**.
|
||||
|
||||
**Note that defragmentation request does not get replicated over cluster. That is, the request is only applied to the local node. Specify all members in `--endpoints` flag.**
|
||||
|
||||
To defragment an etcd data directory directly, while etcd is not running, use the command:
|
||||
|
||||
|
@@ -8,6 +8,8 @@ Before [starting an upgrade](#upgrade-procedure), read through the rest of this
|
||||
|
||||
### Upgrade checklists
|
||||
|
||||
**NOTE:** When [migrating from v2 with no v3 data](https://github.com/coreos/etcd/issues/9480), etcd server v3.2+ panics when etcd restores from existing snapshots but no v3 `ETCD_DATA_DIR/member/snap/db` file. This happens when the server had migrated from v2 with no previous v3 data. This also prevents accidental v3 data loss (e.g. `db` file might have been moved). etcd requires that post v3 migration can only happen with v3 data. Do not upgrade to newer v3 versions until v3.0 server contains v3 data.
|
||||
|
||||
#### Upgrade requirements
|
||||
|
||||
To upgrade an existing etcd deployment to 3.0, the running cluster must be 2.3 or greater. If it's before 2.3, please upgrade to [2.3](https://github.com/coreos/etcd/releases/tag/v2.3.8) before upgrading to 3.0.
|
||||
|
@@ -8,6 +8,8 @@ Before [starting an upgrade](#upgrade-procedure), read through the rest of this
|
||||
|
||||
### Upgrade checklists
|
||||
|
||||
**NOTE:** When [migrating from v2 with no v3 data](https://github.com/coreos/etcd/issues/9480), etcd server v3.2+ panics when etcd restores from existing snapshots but no v3 `ETCD_DATA_DIR/member/snap/db` file. This happens when the server had migrated from v2 with no previous v3 data. This also prevents accidental v3 data loss (e.g. `db` file might have been moved). etcd requires that post v3 migration can only happen with v3 data. Do not upgrade to newer v3 versions until v3.0 server contains v3 data.
|
||||
|
||||
#### Monitoring
|
||||
|
||||
Following metrics from v3.0.x have been deprecated in favor of [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus):
|
||||
|
@@ -8,8 +8,14 @@ Before [starting an upgrade](#upgrade-procedure), read through the rest of this
|
||||
|
||||
### Upgrade checklists
|
||||
|
||||
**NOTE:** When [migrating from v2 with no v3 data](https://github.com/coreos/etcd/issues/9480), etcd server v3.2+ panics when etcd restores from existing snapshots but no v3 `ETCD_DATA_DIR/member/snap/db` file. This happens when the server had migrated from v2 with no previous v3 data. This also prevents accidental v3 data loss (e.g. `db` file might have been moved). etcd requires that post v3 migration can only happen with v3 data. Do not upgrade to newer v3 versions until v3.0 server contains v3 data.
|
||||
|
||||
Highlighted breaking changes in 3.2.
|
||||
|
||||
#### Change in default `snapshot-count` value
|
||||
|
||||
The default value of `--snapshot-count` has [changed from from 10,000 to 100,000](https://github.com/coreos/etcd/pull/7160). Higher snapshot count means it holds Raft entries in memory for longer before discarding old entries. It is a trade-off between less frequent snapshotting and [higher memory usage](https://github.com/kubernetes/kubernetes/issues/60589#issuecomment-371977156). Higher `--snapshot-count` will be manifested with higher memory usage, while retaining more Raft entries helps with the availabilities of slow followers: leader is still able to replicate its logs to followers, rather than forcing followers to rebuild its stores from leader snapshots.
|
||||
|
||||
#### Change in gRPC dependency (>=3.2.10)
|
||||
|
||||
3.2.10 or later now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.5` (<=3.2.9 requires `v1.2.1`).
|
||||
|
@@ -8,6 +8,8 @@ Before [starting an upgrade](#upgrade-procedure), read through the rest of this
|
||||
|
||||
### Upgrade checklists
|
||||
|
||||
**NOTE:** When [migrating from v2 with no v3 data](https://github.com/coreos/etcd/issues/9480), etcd server v3.2+ panics when etcd restores from existing snapshots but no v3 `ETCD_DATA_DIR/member/snap/db` file. This happens when the server had migrated from v2 with no previous v3 data. This also prevents accidental v3 data loss (e.g. `db` file might have been moved). etcd requires that post v3 migration can only happen with v3 data. Do not upgrade to newer v3 versions until v3.0 server contains v3 data.
|
||||
|
||||
Highlighted breaking changes in 3.3.
|
||||
|
||||
#### Change in `etcdserver.EtcdServer` struct
|
||||
|
171
Documentation/upgrades/upgrade_3_4.md
Normal file
171
Documentation/upgrades/upgrade_3_4.md
Normal file
@@ -0,0 +1,171 @@
|
||||
## Upgrade etcd from 3.3 to 3.4
|
||||
|
||||
In the general case, upgrading from etcd 3.3 to 3.4 can be a zero-downtime, rolling upgrade:
|
||||
- one by one, stop the etcd v3.3 processes and replace them with etcd v3.4 processes
|
||||
- after running all v3.4 processes, new features in v3.4 are available to the cluster
|
||||
|
||||
Before [starting an upgrade](#upgrade-procedure), read through the rest of this guide to prepare.
|
||||
|
||||
### Upgrade checklists
|
||||
|
||||
**NOTE:** When [migrating from v2 with no v3 data](https://github.com/coreos/etcd/issues/9480), etcd server v3.2+ panics when etcd restores from existing snapshots but no v3 `ETCD_DATA_DIR/member/snap/db` file. This happens when the server had migrated from v2 with no previous v3 data. This also prevents accidental v3 data loss (e.g. `db` file might have been moved). etcd requires that post v3 migration can only happen with v3 data. Do not upgrade to newer v3 versions until v3.0 server contains v3 data.
|
||||
|
||||
Highlighted breaking changes in 3.4.
|
||||
|
||||
#### Change in `etcd` flags
|
||||
|
||||
`--ca-file` and `--peer-ca-file` flags are deprecated; they have been deprecated since v2.1.
|
||||
|
||||
```diff
|
||||
-etcd --ca-file ca-client.crt
|
||||
+etcd --trusted-ca-file ca-client.crt
|
||||
```
|
||||
|
||||
```diff
|
||||
-etcd --peer-ca-file ca-peer.crt
|
||||
+etcd --peer-trusted-ca-file ca-peer.crt
|
||||
```
|
||||
|
||||
#### Change in ``pkg/transport`
|
||||
|
||||
Deprecated `pkg/transport.TLSInfo.CAFile` field.
|
||||
|
||||
```diff
|
||||
import "github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: "/tmp/test-certs/test.pem",
|
||||
KeyFile: "/tmp/test-certs/test-key.pem",
|
||||
- CAFile: "/tmp/test-certs/trusted-ca.pem",
|
||||
+ TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
|
||||
}
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
### Server upgrade checklists
|
||||
|
||||
#### Upgrade requirements
|
||||
|
||||
To upgrade an existing etcd deployment to 3.4, the running cluster must be 3.3 or greater. If it's before 3.3, please [upgrade to 3.3](upgrade_3_3.md) before upgrading to 3.4.
|
||||
|
||||
Also, to ensure a smooth rolling upgrade, the running cluster must be healthy. Check the health of the cluster by using the `etcdctl endpoint health` command before proceeding.
|
||||
|
||||
#### Preparation
|
||||
|
||||
Before upgrading etcd, always test the services relying on etcd in a staging environment before deploying the upgrade to the production environment.
|
||||
|
||||
Before beginning, [backup the etcd data](../op-guide/maintenance.md#snapshot-backup). Should something go wrong with the upgrade, it is possible to use this backup to [downgrade](#downgrade) back to existing etcd version. Please note that the `snapshot` command only backs up the v3 data. For v2 data, see [backing up v2 datastore](../v2/admin_guide.md#backing-up-the-datastore).
|
||||
|
||||
#### Mixed versions
|
||||
|
||||
While upgrading, an etcd cluster supports mixed versions of etcd members, and operates with the protocol of the lowest common version. The cluster is only considered upgraded once all of its members are upgraded to version 3.4. Internally, etcd members negotiate with each other to determine the overall cluster version, which controls the reported version and the supported features.
|
||||
|
||||
#### Limitations
|
||||
|
||||
Note: If the cluster only has v3 data and no v2 data, it is not subject to this limitation.
|
||||
|
||||
If the cluster is serving a v2 data set larger than 50MB, each newly upgraded member may take up to two minutes to catch up with the existing cluster. Check the size of a recent snapshot to estimate the total data size. In other words, it is safest to wait for 2 minutes between upgrading each member.
|
||||
|
||||
For a much larger total data size, 100MB or more , this one-time process might take even more time. Administrators of very large etcd clusters of this magnitude can feel free to contact the [etcd team][etcd-contact] before upgrading, and we'll be happy to provide advice on the procedure.
|
||||
|
||||
#### Downgrade
|
||||
|
||||
If all members have been upgraded to v3.4, the cluster will be upgraded to v3.4, and downgrade from this completed state is **not possible**. If any single member is still v3.3, however, the cluster and its operations remains "v3.3", and it is possible from this mixed cluster state to return to using a v3.3 etcd binary on all members.
|
||||
|
||||
Please [backup the data directory](../op-guide/maintenance.md#snapshot-backup) of all etcd members to make downgrading the cluster possible even after it has been completely upgraded.
|
||||
|
||||
### Upgrade procedure
|
||||
|
||||
This example shows how to upgrade a 3-member v3.3 ectd cluster running on a local machine.
|
||||
|
||||
#### 1. Check upgrade requirements
|
||||
|
||||
Is the cluster healthy and running v3.3.x?
|
||||
|
||||
```
|
||||
$ ETCDCTL_API=3 etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
|
||||
localhost:2379 is healthy: successfully committed proposal: took = 6.600684ms
|
||||
localhost:22379 is healthy: successfully committed proposal: took = 8.540064ms
|
||||
localhost:32379 is healthy: successfully committed proposal: took = 8.763432ms
|
||||
|
||||
$ curl http://localhost:2379/version
|
||||
{"etcdserver":"3.3.0","etcdcluster":"3.3.0"}
|
||||
```
|
||||
|
||||
#### 2. Stop the existing etcd process
|
||||
|
||||
When each etcd process is stopped, expected errors will be logged by other cluster members. This is normal since a cluster member connection has been (temporarily) broken:
|
||||
|
||||
```
|
||||
14:13:31.491746 I | raft: c89feb932daef420 [term 3] received MsgTimeoutNow from 6d4f535bae3ab960 and starts an election to get leadership.
|
||||
14:13:31.491769 I | raft: c89feb932daef420 became candidate at term 4
|
||||
14:13:31.491788 I | raft: c89feb932daef420 received MsgVoteResp from c89feb932daef420 at term 4
|
||||
14:13:31.491797 I | raft: c89feb932daef420 [logterm: 3, index: 9] sent MsgVote request to 6d4f535bae3ab960 at term 4
|
||||
14:13:31.491805 I | raft: c89feb932daef420 [logterm: 3, index: 9] sent MsgVote request to 9eda174c7df8a033 at term 4
|
||||
14:13:31.491815 I | raft: raft.node: c89feb932daef420 lost leader 6d4f535bae3ab960 at term 4
|
||||
14:13:31.524084 I | raft: c89feb932daef420 received MsgVoteResp from 6d4f535bae3ab960 at term 4
|
||||
14:13:31.524108 I | raft: c89feb932daef420 [quorum:2] has received 2 MsgVoteResp votes and 0 vote rejections
|
||||
14:13:31.524123 I | raft: c89feb932daef420 became leader at term 4
|
||||
14:13:31.524136 I | raft: raft.node: c89feb932daef420 elected leader c89feb932daef420 at term 4
|
||||
14:13:31.592650 W | rafthttp: lost the TCP streaming connection with peer 6d4f535bae3ab960 (stream MsgApp v2 reader)
|
||||
14:13:31.592825 W | rafthttp: lost the TCP streaming connection with peer 6d4f535bae3ab960 (stream Message reader)
|
||||
14:13:31.693275 E | rafthttp: failed to dial 6d4f535bae3ab960 on stream Message (dial tcp [::1]:2380: getsockopt: connection refused)
|
||||
14:13:31.693289 I | rafthttp: peer 6d4f535bae3ab960 became inactive
|
||||
14:13:31.936678 W | rafthttp: lost the TCP streaming connection with peer 6d4f535bae3ab960 (stream Message writer)
|
||||
```
|
||||
|
||||
It's a good idea at this point to [backup the etcd data](../op-guide/maintenance.md#snapshot-backup) to provide a downgrade path should any problems occur:
|
||||
|
||||
```
|
||||
$ etcdctl snapshot save backup.db
|
||||
```
|
||||
|
||||
#### 3. Drop-in etcd v3.4 binary and start the new etcd process
|
||||
|
||||
The new v3.4 etcd will publish its information to the cluster:
|
||||
|
||||
```
|
||||
14:14:25.363225 I | etcdserver: published {Name:s1 ClientURLs:[http://localhost:2379]} to cluster a9ededbffcb1b1f1
|
||||
```
|
||||
|
||||
Verify that each member, and then the entire cluster, becomes healthy with the new v3.4 etcd binary:
|
||||
|
||||
```
|
||||
$ ETCDCTL_API=3 /etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
|
||||
localhost:22379 is healthy: successfully committed proposal: took = 5.540129ms
|
||||
localhost:32379 is healthy: successfully committed proposal: took = 7.321771ms
|
||||
localhost:2379 is healthy: successfully committed proposal: took = 10.629901ms
|
||||
```
|
||||
|
||||
Upgraded members will log warnings like the following until the entire cluster is upgraded. This is expected and will cease after all etcd cluster members are upgraded to v3.4:
|
||||
|
||||
```
|
||||
14:15:17.071804 W | etcdserver: member c89feb932daef420 has a higher version 3.4.0
|
||||
14:15:21.073110 W | etcdserver: the local etcd version 3.3.0 is not up-to-date
|
||||
14:15:21.073142 W | etcdserver: member 6d4f535bae3ab960 has a higher version 3.4.0
|
||||
14:15:21.073157 W | etcdserver: the local etcd version 3.3.0 is not up-to-date
|
||||
14:15:21.073164 W | etcdserver: member c89feb932daef420 has a higher version 3.4.0
|
||||
```
|
||||
|
||||
#### 4. Repeat step 2 to step 3 for all other members
|
||||
|
||||
#### 5. Finish
|
||||
|
||||
When all members are upgraded, the cluster will report upgrading to 3.4 successfully:
|
||||
|
||||
```
|
||||
14:15:54.536901 N | etcdserver/membership: updated the cluster version from 3.3 to 3.4
|
||||
14:15:54.537035 I | etcdserver/api: enabled capabilities for version 3.4
|
||||
```
|
||||
|
||||
```
|
||||
$ ETCDCTL_API=3 /etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
|
||||
localhost:2379 is healthy: successfully committed proposal: took = 2.312897ms
|
||||
localhost:22379 is healthy: successfully committed proposal: took = 2.553476ms
|
||||
localhost:32379 is healthy: successfully committed proposal: took = 2.517902ms
|
||||
```
|
||||
|
||||
[etcd-contact]: https://groups.google.com/forum/#!forum/etcd-dev
|
@@ -55,6 +55,11 @@ func TestLeaseGrant(t *testing.T) {
|
||||
|
||||
kv := clus.RandClient()
|
||||
|
||||
_, merr := lapi.Grant(context.Background(), clientv3.MaxLeaseTTL+1)
|
||||
if merr != rpctypes.ErrLeaseTTLTooLarge {
|
||||
t.Fatalf("err = %v, want %v", merr, rpctypes.ErrLeaseTTLTooLarge)
|
||||
}
|
||||
|
||||
resp, err := lapi.Grant(context.Background(), 10)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create lease %v", err)
|
||||
|
@@ -44,3 +44,6 @@ var (
|
||||
// Some options are exposed to "clientv3.Config".
|
||||
// Defaults will be overridden by the settings in "clientv3.Config".
|
||||
var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize}
|
||||
|
||||
// MaxLeaseTTL is the maximum lease TTL value
|
||||
const MaxLeaseTTL = 9000000000
|
@@ -29,8 +29,6 @@ var (
|
||||
)
|
||||
|
||||
const (
|
||||
checkCompactionInterval = 5 * time.Minute
|
||||
|
||||
ModePeriodic = "periodic"
|
||||
ModeRevision = "revision"
|
||||
)
|
||||
|
@@ -46,30 +46,74 @@ type Periodic struct {
|
||||
// NewPeriodic creates a new instance of Periodic compactor that purges
|
||||
// the log older than h Duration.
|
||||
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
|
||||
return &Periodic{
|
||||
clock: clockwork.NewRealClock(),
|
||||
return newPeriodic(clockwork.NewRealClock(), h, rg, c)
|
||||
}
|
||||
|
||||
func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
|
||||
t := &Periodic{
|
||||
clock: clock,
|
||||
period: h,
|
||||
rg: rg,
|
||||
c: c,
|
||||
revs: make([]int64, 0),
|
||||
}
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
return t
|
||||
}
|
||||
|
||||
// periodDivisor divides Periodic.period in into checkCompactInterval duration
|
||||
const periodDivisor = 10
|
||||
/*
|
||||
Compaction period 1-hour:
|
||||
1. compute compaction period, which is 1-hour
|
||||
2. record revisions for every 1/10 of 1-hour (6-minute)
|
||||
3. keep recording revisions with no compaction for first 1-hour
|
||||
4. do compact with revs[0]
|
||||
- success? contiue on for-loop and move sliding window; revs = revs[1:]
|
||||
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
|
||||
|
||||
Compaction period 24-hour:
|
||||
1. compute compaction period, which is 1-hour
|
||||
2. record revisions for every 1/10 of 1-hour (6-minute)
|
||||
3. keep recording revisions with no compaction for first 24-hour
|
||||
4. do compact with revs[0]
|
||||
- success? contiue on for-loop and move sliding window; revs = revs[1:]
|
||||
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
|
||||
|
||||
Compaction period 59-min:
|
||||
1. compute compaction period, which is 59-min
|
||||
2. record revisions for every 1/10 of 59-min (5.9-min)
|
||||
3. keep recording revisions with no compaction for first 59-min
|
||||
4. do compact with revs[0]
|
||||
- success? contiue on for-loop and move sliding window; revs = revs[1:]
|
||||
- failure? update revs, and retry after 1/10 of 59-min (5.9-min)
|
||||
|
||||
Compaction period 5-sec:
|
||||
1. compute compaction period, which is 5-sec
|
||||
2. record revisions for every 1/10 of 5-sec (0.5-sec)
|
||||
3. keep recording revisions with no compaction for first 5-sec
|
||||
4. do compact with revs[0]
|
||||
- success? contiue on for-loop and move sliding window; revs = revs[1:]
|
||||
- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
|
||||
*/
|
||||
|
||||
// Run runs periodic compactor.
|
||||
func (t *Periodic) Run() {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.revs = make([]int64, 0)
|
||||
clock := t.clock
|
||||
checkCompactInterval := t.period / time.Duration(periodDivisor)
|
||||
compactInterval := t.getCompactInterval()
|
||||
retryInterval := t.getRetryInterval()
|
||||
retentions := t.getRetentions()
|
||||
|
||||
go func() {
|
||||
last := clock.Now()
|
||||
lastSuccess := t.clock.Now()
|
||||
baseInterval := t.period
|
||||
for {
|
||||
t.revs = append(t.revs, t.rg.Rev())
|
||||
if len(t.revs) > retentions {
|
||||
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
|
||||
}
|
||||
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
return
|
||||
case <-clock.After(checkCompactInterval):
|
||||
case <-t.clock.After(retryInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
@@ -77,46 +121,71 @@ func (t *Periodic) Run() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if clock.Now().Sub(last) < t.period {
|
||||
|
||||
if t.clock.Now().Sub(lastSuccess) < baseInterval {
|
||||
continue
|
||||
}
|
||||
rev, remaining := t.getRev()
|
||||
if rev < 0 {
|
||||
continue
|
||||
|
||||
// wait up to initial given period
|
||||
if baseInterval == t.period {
|
||||
baseInterval = compactInterval
|
||||
}
|
||||
rev := t.revs[0]
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
t.revs = remaining
|
||||
lastSuccess = t.clock.Now()
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", checkCompactInterval)
|
||||
plog.Noticef("Retry after %v", retryInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// if given compaction period x is <1-hour, compact every x duration.
|
||||
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
|
||||
// if given compaction period x is >1-hour, compact every hour.
|
||||
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
|
||||
func (t *Periodic) getCompactInterval() time.Duration {
|
||||
itv := t.period
|
||||
if itv > time.Hour {
|
||||
itv = time.Hour
|
||||
}
|
||||
return itv
|
||||
}
|
||||
|
||||
func (t *Periodic) getRetentions() int {
|
||||
return int(t.period/t.getRetryInterval()) + 1
|
||||
}
|
||||
|
||||
const retryDivisor = 10
|
||||
|
||||
func (t *Periodic) getRetryInterval() time.Duration {
|
||||
itv := t.period
|
||||
if itv > time.Hour {
|
||||
itv = time.Hour
|
||||
}
|
||||
return itv / retryDivisor
|
||||
}
|
||||
|
||||
// Stop stops periodic compactor.
|
||||
func (t *Periodic) Stop() {
|
||||
t.cancel()
|
||||
}
|
||||
|
||||
// Pause pauses periodic compactor.
|
||||
func (t *Periodic) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
}
|
||||
|
||||
// Resume resumes periodic compactor.
|
||||
func (t *Periodic) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
}
|
||||
|
||||
func (t *Periodic) getRev() (int64, []int64) {
|
||||
i := len(t.revs) - periodDivisor
|
||||
if i < 0 {
|
||||
return -1, t.revs
|
||||
}
|
||||
return t.revs[i], t.revs[i+1:]
|
||||
}
|
||||
|
@@ -21,76 +21,129 @@ import (
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
)
|
||||
|
||||
func TestPeriodic(t *testing.T) {
|
||||
func TestPeriodicHourly(t *testing.T) {
|
||||
retentionHours := 2
|
||||
retentionDuration := time.Duration(retentionHours) * time.Hour
|
||||
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
period: retentionDuration,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
|
||||
n := periodDivisor
|
||||
// simulate 5 hours worth of intervals.
|
||||
for i := 0; i < n/retentionHours*5; i++ {
|
||||
|
||||
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
||||
|
||||
// compaction doesn't happen til 2 hours elapse
|
||||
for i := 0; i < initialIntervals; i++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactInterval)
|
||||
// compaction doesn't happen til 2 hours elapses.
|
||||
if i < n {
|
||||
continue
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
}
|
||||
|
||||
// very first compaction
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedRevision := int64(1)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
|
||||
// simulate 3 hours
|
||||
// now compactor kicks in, every hour
|
||||
for i := 0; i < 3; i++ {
|
||||
// advance one hour, one revision for each interval
|
||||
for j := 0; j < intervalsPerPeriod; j++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
}
|
||||
// after 2 hours, compaction happens at every checkCompactInterval.
|
||||
a, err := compactable.Wait(1)
|
||||
|
||||
a, err = compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedRevision := int64(i + 1 - n)
|
||||
|
||||
expectedRevision = int64((i + 1) * 10)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// unblock the rev getter, so we can stop the compactor routine.
|
||||
_, err := rg.Wait(1)
|
||||
func TestPeriodicMinutes(t *testing.T) {
|
||||
retentionMinutes := 5
|
||||
retentionDuration := time.Duration(retentionMinutes) * time.Minute
|
||||
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
||||
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
||||
|
||||
// compaction doesn't happen til 5 minutes elapse
|
||||
for i := 0; i < initialIntervals; i++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
}
|
||||
|
||||
// very first compaction
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedRevision := int64(1)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
|
||||
// compaction happens at every interval
|
||||
for i := 0; i < 5; i++ {
|
||||
// advance 5-minute, one revision for each interval
|
||||
for j := 0; j < intervalsPerPeriod; j++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
}
|
||||
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedRevision = int64((i + 1) * 10)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicPause(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
retentionDuration := time.Hour
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
period: retentionDuration,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
||||
n := tb.getRetentions()
|
||||
|
||||
// tb will collect 3 hours of revisions but not compact since paused
|
||||
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
|
||||
n := periodDivisor
|
||||
for i := 0; i < 3*n; i++ {
|
||||
for i := 0; i < n*3; i++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactInterval)
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
}
|
||||
// tb ends up waiting for the clock
|
||||
// t.revs = [21 22 23 24 25 26 27 28 29 30]
|
||||
|
||||
select {
|
||||
case a := <-compactable.Chan():
|
||||
@@ -100,14 +153,17 @@ func TestPeriodicPause(t *testing.T) {
|
||||
|
||||
// tb resumes to being blocked on the clock
|
||||
tb.Resume()
|
||||
|
||||
// unblock clock, will kick off a compaction at hour 3:06
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactInterval)
|
||||
|
||||
// unblock clock, will kick off a compaction at T=3h6m by retry
|
||||
fc.Advance(tb.getRetryInterval())
|
||||
|
||||
// T=3h6m
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// compact the revision from hour 2:06
|
||||
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
|
||||
if !reflect.DeepEqual(a[0].Params[0], wreq) {
|
||||
|
@@ -17,6 +17,7 @@ package compactor
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
@@ -43,25 +44,31 @@ type Revision struct {
|
||||
// NewRevision creates a new instance of Revisonal compactor that purges
|
||||
// the log older than retention revisions from the current revision.
|
||||
func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
return &Revision{
|
||||
clock: clockwork.NewRealClock(),
|
||||
return newRevision(clockwork.NewRealClock(), retention, rg, c)
|
||||
}
|
||||
|
||||
func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
t := &Revision{
|
||||
clock: clock,
|
||||
retention: retention,
|
||||
rg: rg,
|
||||
c: c,
|
||||
}
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *Revision) Run() {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
clock := t.clock
|
||||
previous := int64(0)
|
||||
const revInterval = 5 * time.Minute
|
||||
|
||||
// Run runs revision-based compactor.
|
||||
func (t *Revision) Run() {
|
||||
prev := int64(0)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
return
|
||||
case <-clock.After(checkCompactionInterval):
|
||||
case <-t.clock.After(revInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
@@ -71,34 +78,36 @@ func (t *Revision) Run() {
|
||||
}
|
||||
|
||||
rev := t.rg.Rev() - t.retention
|
||||
|
||||
if rev <= 0 || rev == previous {
|
||||
if rev <= 0 || rev == prev {
|
||||
continue
|
||||
}
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
previous = rev
|
||||
prev = rev
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", checkCompactionInterval)
|
||||
plog.Noticef("Retry after %v", revInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops revision-based compactor.
|
||||
func (t *Revision) Stop() {
|
||||
t.cancel()
|
||||
}
|
||||
|
||||
// Pause pauses revision-based compactor.
|
||||
func (t *Revision) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
}
|
||||
|
||||
// Resume resumes revision-based compactor.
|
||||
func (t *Revision) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
)
|
||||
|
||||
@@ -28,23 +29,18 @@ func TestRevision(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := &Revision{
|
||||
clock: fc,
|
||||
retention: 10,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
tb := newRevision(fc, 10, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
||||
fc.Advance(checkCompactionInterval)
|
||||
fc.Advance(revInterval)
|
||||
rg.Wait(1)
|
||||
// nothing happens
|
||||
|
||||
rg.SetRev(99) // will be 100
|
||||
expectedRevision := int64(90)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
fc.Advance(revInterval)
|
||||
rg.Wait(1)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
@@ -61,7 +57,7 @@ func TestRevision(t *testing.T) {
|
||||
|
||||
rg.SetRev(199) // will be 200
|
||||
expectedRevision = int64(190)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
fc.Advance(revInterval)
|
||||
rg.Wait(1)
|
||||
a, err = compactable.Wait(1)
|
||||
if err != nil {
|
||||
@@ -74,22 +70,17 @@ func TestRevision(t *testing.T) {
|
||||
|
||||
func TestRevisionPause(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
|
||||
tb := &Revision{
|
||||
clock: fc,
|
||||
retention: 10,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newRevision(fc, 10, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
||||
// tb will collect 3 hours of revisions but not compact since paused
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
n := int(time.Hour / revInterval)
|
||||
for i := 0; i < 3*n; i++ {
|
||||
fc.Advance(checkCompactionInterval)
|
||||
fc.Advance(revInterval)
|
||||
}
|
||||
// tb ends up waiting for the clock
|
||||
|
||||
@@ -103,7 +94,7 @@ func TestRevisionPause(t *testing.T) {
|
||||
tb.Resume()
|
||||
|
||||
// unblock clock, will kick off a compaction at hour 3:05
|
||||
fc.Advance(checkCompactionInterval)
|
||||
fc.Advance(revInterval)
|
||||
rg.Wait(1)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
|
13
e2e/util.go
13
e2e/util.go
@@ -42,9 +42,14 @@ func spawnWithExpect(args []string, expected string) error {
|
||||
}
|
||||
|
||||
func spawnWithExpects(args []string, xs ...string) error {
|
||||
_, err := spawnWithExpectLines(args, xs...)
|
||||
return err
|
||||
}
|
||||
|
||||
func spawnWithExpectLines(args []string, xs ...string) ([]string, error) {
|
||||
proc, err := spawnCmd(args)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
// process until either stdout or stderr contains
|
||||
// the expected string
|
||||
@@ -57,7 +62,7 @@ func spawnWithExpects(args []string, xs ...string) error {
|
||||
l, lerr := proc.ExpectFunc(lineFunc)
|
||||
if lerr != nil {
|
||||
proc.Close()
|
||||
return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
|
||||
return nil, fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
|
||||
}
|
||||
lines = append(lines, l)
|
||||
if strings.Contains(l, txt) {
|
||||
@@ -67,9 +72,9 @@ func spawnWithExpects(args []string, xs ...string) error {
|
||||
}
|
||||
perr := proc.Close()
|
||||
if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
|
||||
return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
|
||||
return nil, fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
|
||||
}
|
||||
return perr
|
||||
return lines, perr
|
||||
}
|
||||
|
||||
func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
|
||||
|
@@ -15,10 +15,13 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
|
||||
@@ -271,3 +274,119 @@ func testV3CurlAuth(t *testing.T, pathPrefix string) {
|
||||
t.Fatalf("failed auth put with curl (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV3CurlCampaignAlpha(t *testing.T) { testV3CurlCampaign(t, "/v3alpha") }
|
||||
func TestV3CurlCampaignBeta(t *testing.T) { testV3CurlCampaign(t, "/v3beta") }
|
||||
func testV3CurlCampaign(t *testing.T, pathPrefix string) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
epc, err := newEtcdProcessCluster(&configNoTLS)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
defer func() {
|
||||
if cerr := epc.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
cdata, err := json.Marshal(&epb.CampaignRequest{
|
||||
Name: []byte("/election-prefix"),
|
||||
Value: []byte("v1"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cargs := cURLPrefixArgs(epc, "POST", cURLReq{
|
||||
endpoint: path.Join(pathPrefix, "/election/campaign"),
|
||||
value: string(cdata),
|
||||
})
|
||||
lines, err := spawnWithExpectLines(cargs, `"leader":{"name":"`)
|
||||
if err != nil {
|
||||
t.Fatalf("failed post campaign request (%s) (%v)", pathPrefix, err)
|
||||
}
|
||||
if len(lines) != 1 {
|
||||
t.Fatalf("len(lines) expected 1, got %+v", lines)
|
||||
}
|
||||
|
||||
var cresp campaignResponse
|
||||
if err = json.Unmarshal([]byte(lines[0]), &cresp); err != nil {
|
||||
t.Fatalf("failed to unmarshal campaign response %v", err)
|
||||
}
|
||||
ndata, err := base64.StdEncoding.DecodeString(cresp.Leader.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to decode leader key %v", err)
|
||||
}
|
||||
kdata, err := base64.StdEncoding.DecodeString(cresp.Leader.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to decode leader key %v", err)
|
||||
}
|
||||
|
||||
rev, _ := strconv.ParseInt(cresp.Leader.Rev, 10, 64)
|
||||
lease, _ := strconv.ParseInt(cresp.Leader.Lease, 10, 64)
|
||||
pdata, err := json.Marshal(&epb.ProclaimRequest{
|
||||
Leader: &epb.LeaderKey{
|
||||
Name: ndata,
|
||||
Key: kdata,
|
||||
Rev: rev,
|
||||
Lease: lease,
|
||||
},
|
||||
Value: []byte("v2"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = cURLPost(epc, cURLReq{
|
||||
endpoint: path.Join(pathPrefix, "/election/proclaim"),
|
||||
value: string(pdata),
|
||||
expected: `"revision":`,
|
||||
}); err != nil {
|
||||
t.Fatalf("failed post proclaim request (%s) (%v)", pathPrefix, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV3CurlProclaimMissiongLeaderKeyNoTLS(t *testing.T) {
|
||||
testCtl(t, testV3CurlProclaimMissiongLeaderKey, withCfg(configNoTLS))
|
||||
}
|
||||
|
||||
func testV3CurlProclaimMissiongLeaderKey(cx ctlCtx) {
|
||||
pdata, err := json.Marshal(&epb.ProclaimRequest{Value: []byte("v2")})
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
if err = cURLPost(cx.epc, cURLReq{
|
||||
endpoint: path.Join("/v3beta", "/election/proclaim"),
|
||||
value: string(pdata),
|
||||
expected: `{"error":"\"leader\" field must be provided","code":2}`,
|
||||
}); err != nil {
|
||||
cx.t.Fatalf("failed post proclaim request (%s) (%v)", "/v3beta", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestV3CurlResignMissiongLeaderKeyNoTLS(t *testing.T) {
|
||||
testCtl(t, testV3CurlResignMissiongLeaderKey, withCfg(configNoTLS))
|
||||
}
|
||||
|
||||
func testV3CurlResignMissiongLeaderKey(cx ctlCtx) {
|
||||
if err := cURLPost(cx.epc, cURLReq{
|
||||
endpoint: path.Join("/v3beta", "/election/resign"),
|
||||
value: `{}`,
|
||||
expected: `{"error":"\"leader\" field must be provided","code":2}`,
|
||||
}); err != nil {
|
||||
cx.t.Fatalf("failed post resign request (%s) (%v)", "/v3beta", err)
|
||||
}
|
||||
}
|
||||
|
||||
// to manually decode; JSON marshals integer fields with
|
||||
// string types, so can't unmarshal with epb.CampaignResponse
|
||||
type campaignResponse struct {
|
||||
Leader struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Key string `json:"key,omitempty"`
|
||||
Rev string `json:"rev,omitempty"`
|
||||
Lease string `json:"lease,omitempty"`
|
||||
} `json:"leader,omitempty"`
|
||||
}
|
||||
|
@@ -26,6 +26,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/netutil"
|
||||
@@ -90,16 +91,22 @@ func init() {
|
||||
type Config struct {
|
||||
// member
|
||||
|
||||
CorsInfo *cors.CORSInfo
|
||||
LPUrls, LCUrls []url.URL
|
||||
Dir string `json:"data-dir"`
|
||||
WalDir string `json:"wal-dir"`
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
Name string `json:"name"`
|
||||
SnapCount uint64 `json:"snapshot-count"`
|
||||
CorsInfo *cors.CORSInfo
|
||||
LPUrls, LCUrls []url.URL
|
||||
Dir string `json:"data-dir"`
|
||||
WalDir string `json:"wal-dir"`
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
Name string `json:"name"`
|
||||
SnapCount uint64 `json:"snapshot-count"`
|
||||
|
||||
// AutoCompactionMode is either 'periodic' or 'revision'.
|
||||
AutoCompactionMode string `json:"auto-compaction-mode"`
|
||||
// AutoCompactionRetention is either duration string with time unit
|
||||
// (e.g. '5m' for 5-minute), or revision unit (e.g. '5000').
|
||||
// If no time unit is provided and compaction mode is 'periodic',
|
||||
// the unit defaults to hour. For example, '5' translates into 5-hour.
|
||||
AutoCompactionRetention string `json:"auto-compaction-retention"`
|
||||
AutoCompactionMode string `json:"auto-compaction-mode"`
|
||||
|
||||
// TickMs is the number of milliseconds between heartbeat ticks.
|
||||
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
|
||||
@@ -388,6 +395,7 @@ func (cfg *configYAML) configFromFile(path string) error {
|
||||
return cfg.Validate()
|
||||
}
|
||||
|
||||
// Validate ensures that '*embed.Config' fields are properly configured.
|
||||
func (cfg *Config) Validate() error {
|
||||
if err := checkBindURLs(cfg.LPUrls); err != nil {
|
||||
return err
|
||||
@@ -449,6 +457,13 @@ func (cfg *Config) Validate() error {
|
||||
return ErrUnsetAdvertiseClientURLsFlag
|
||||
}
|
||||
|
||||
switch cfg.AutoCompactionMode {
|
||||
case "":
|
||||
case compactor.ModeRevision, compactor.ModePeriodic:
|
||||
default:
|
||||
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -148,3 +148,22 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
|
||||
}
|
||||
return tmpfile
|
||||
}
|
||||
|
||||
func TestAutoCompactionModeInvalid(t *testing.T) {
|
||||
cfg := NewConfig()
|
||||
cfg.AutoCompactionMode = "period"
|
||||
err := cfg.Validate()
|
||||
if err == nil {
|
||||
t.Errorf("expected non-nil error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutoCompactionModeParse(t *testing.T) {
|
||||
dur, err := parseCompactionRetention("revision", "1")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if dur != 1 {
|
||||
t.Fatalf("AutoCompactionRetention expected 1, got %d", dur)
|
||||
}
|
||||
}
|
||||
|
@@ -27,6 +27,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
@@ -134,22 +135,13 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
autoCompactionRetention time.Duration
|
||||
h int
|
||||
)
|
||||
// AutoCompactionRetention defaults to "0" if not set.
|
||||
if len(cfg.AutoCompactionRetention) == 0 {
|
||||
cfg.AutoCompactionRetention = "0"
|
||||
}
|
||||
h, err = strconv.Atoi(cfg.AutoCompactionRetention)
|
||||
if err == nil {
|
||||
autoCompactionRetention = time.Duration(int64(h)) * time.Hour
|
||||
} else {
|
||||
autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err)
|
||||
}
|
||||
autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
|
||||
if err != nil {
|
||||
return e, err
|
||||
}
|
||||
|
||||
srvcfg := etcdserver.ServerConfig{
|
||||
@@ -562,3 +554,22 @@ func (e *Etcd) errHandler(err error) {
|
||||
case e.errc <- err:
|
||||
}
|
||||
}
|
||||
|
||||
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
|
||||
h, err := strconv.Atoi(retention)
|
||||
if err == nil {
|
||||
switch mode {
|
||||
case compactor.ModeRevision:
|
||||
ret = time.Duration(int64(h))
|
||||
case compactor.ModePeriodic:
|
||||
ret = time.Duration(int64(h)) * time.Hour
|
||||
}
|
||||
} else {
|
||||
// periodic compaction
|
||||
ret, err = time.ParseDuration(retention)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
@@ -876,10 +876,11 @@ If NOSPACE alarm is present:
|
||||
|
||||
### DEFRAG [options]
|
||||
|
||||
DEFRAG defragments the backend database file for a set of given endpoints while etcd is running, or directly defragments an
|
||||
etcd data directory while etcd is not running. When an etcd member reclaims storage space from deleted and compacted keys, the
|
||||
space is kept in a free list and the database file remains the same size. By defragmenting the database, the etcd member
|
||||
releases this free space back to the file system.
|
||||
DEFRAG defragments the backend database file for a set of given endpoints while etcd is running, or directly defragments an etcd data directory while etcd is not running. When an etcd member reclaims storage space from deleted and compacted keys, the space is kept in a free list and the database file remains the same size. By defragmenting the database, the etcd member releases this free space back to the file system.
|
||||
|
||||
**Note that defragmentation to a live member blocks the system from reading and writing data while rebuilding its states.**
|
||||
|
||||
**Note that defragmentation request does not get replicated over cluster. That is, the request is only applied to the local node. Specify all members in `--endpoints` flag.**
|
||||
|
||||
#### Options
|
||||
|
||||
|
@@ -16,12 +16,17 @@ package v3election
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/concurrency"
|
||||
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
|
||||
)
|
||||
|
||||
// ErrMissingLeaderKey is returned when election API request
|
||||
// is missing the "leader" field.
|
||||
var ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)
|
||||
|
||||
type electionServer struct {
|
||||
c *clientv3.Client
|
||||
}
|
||||
@@ -51,6 +56,9 @@ func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest
|
||||
}
|
||||
|
||||
func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
|
||||
if req.Leader == nil {
|
||||
return nil, ErrMissingLeaderKey
|
||||
}
|
||||
s, err := es.session(ctx, req.Leader.Lease)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -98,6 +106,9 @@ func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*
|
||||
}
|
||||
|
||||
func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
|
||||
if req.Leader == nil {
|
||||
return nil, ErrMissingLeaderKey
|
||||
}
|
||||
s, err := es.session(ctx, req.Leader.Lease)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -31,8 +31,9 @@ var (
|
||||
ErrGRPCFutureRev = status.New(codes.OutOfRange, "etcdserver: mvcc: required revision is a future revision").Err()
|
||||
ErrGRPCNoSpace = status.New(codes.ResourceExhausted, "etcdserver: mvcc: database space exceeded").Err()
|
||||
|
||||
ErrGRPCLeaseNotFound = status.New(codes.NotFound, "etcdserver: requested lease not found").Err()
|
||||
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
|
||||
ErrGRPCLeaseNotFound = status.New(codes.NotFound, "etcdserver: requested lease not found").Err()
|
||||
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
|
||||
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
|
||||
|
||||
ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
|
||||
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
|
||||
@@ -80,8 +81,9 @@ var (
|
||||
ErrorDesc(ErrGRPCFutureRev): ErrGRPCFutureRev,
|
||||
ErrorDesc(ErrGRPCNoSpace): ErrGRPCNoSpace,
|
||||
|
||||
ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
|
||||
ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,
|
||||
ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
|
||||
ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,
|
||||
ErrorDesc(ErrGRPCLeaseTTLTooLarge): ErrGRPCLeaseTTLTooLarge,
|
||||
|
||||
ErrorDesc(ErrGRPCMemberExist): ErrGRPCMemberExist,
|
||||
ErrorDesc(ErrGRPCPeerURLExist): ErrGRPCPeerURLExist,
|
||||
@@ -131,8 +133,9 @@ var (
|
||||
ErrFutureRev = Error(ErrGRPCFutureRev)
|
||||
ErrNoSpace = Error(ErrGRPCNoSpace)
|
||||
|
||||
ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
|
||||
ErrLeaseExist = Error(ErrGRPCLeaseExist)
|
||||
ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
|
||||
ErrLeaseExist = Error(ErrGRPCLeaseExist)
|
||||
ErrLeaseTTLTooLarge = Error(ErrGRPCLeaseTTLTooLarge)
|
||||
|
||||
ErrMemberExist = Error(ErrGRPCMemberExist)
|
||||
ErrPeerURLExist = Error(ErrGRPCPeerURLExist)
|
||||
|
@@ -52,8 +52,9 @@ var toGRPCErrorMap = map[error]error{
|
||||
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
|
||||
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
|
||||
|
||||
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
|
||||
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
|
||||
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
|
||||
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
|
||||
lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
|
||||
|
||||
auth.ErrRootUserNotExist: rpctypes.ErrGRPCRootUserNotExist,
|
||||
auth.ErrRootRoleNotExist: rpctypes.ErrGRPCRootRoleNotExist,
|
||||
|
@@ -95,6 +95,7 @@ type raftNode struct {
|
||||
term uint64
|
||||
lead uint64
|
||||
|
||||
tickMu *sync.Mutex
|
||||
raftNodeConfig
|
||||
|
||||
// a chan to send/receive snapshot
|
||||
@@ -131,6 +132,7 @@ type raftNodeConfig struct {
|
||||
|
||||
func newRaftNode(cfg raftNodeConfig) *raftNode {
|
||||
r := &raftNode{
|
||||
tickMu: new(sync.Mutex),
|
||||
raftNodeConfig: cfg,
|
||||
// set up contention detectors for raft heartbeat message.
|
||||
// expect to send a heartbeat within 2 heartbeat intervals.
|
||||
@@ -149,6 +151,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
|
||||
return r
|
||||
}
|
||||
|
||||
// raft.Node does not have locks in Raft package
|
||||
func (r *raftNode) tick() {
|
||||
r.tickMu.Lock()
|
||||
r.Tick()
|
||||
r.tickMu.Unlock()
|
||||
}
|
||||
|
||||
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
||||
// to modify the fields after it has been started.
|
||||
func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
@@ -161,7 +170,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
for {
|
||||
select {
|
||||
case <-r.ticker.C:
|
||||
r.Tick()
|
||||
r.tick()
|
||||
case rd := <-r.Ready():
|
||||
if rd.SoftState != nil {
|
||||
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
|
||||
@@ -368,13 +377,13 @@ func (r *raftNode) resumeSending() {
|
||||
p.Resume()
|
||||
}
|
||||
|
||||
// advanceTicksForElection advances ticks to the node for fast election.
|
||||
// This reduces the time to wait for first leader election if bootstrapping the whole
|
||||
// cluster, while leaving at least 1 heartbeat for possible existing leader
|
||||
// to contact it.
|
||||
func advanceTicksForElection(n raft.Node, electionTicks int) {
|
||||
for i := 0; i < electionTicks-1; i++ {
|
||||
n.Tick()
|
||||
// advanceTicks advances ticks of Raft node.
|
||||
// This can be used for fast-forwarding election
|
||||
// ticks in multi data-center deployments, thus
|
||||
// speeding up election process.
|
||||
func (r *raftNode) advanceTicks(ticks int) {
|
||||
for i := 0; i < ticks; i++ {
|
||||
r.tick()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,7 +424,6 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
|
||||
raftStatusMu.Lock()
|
||||
raftStatus = n.Status
|
||||
raftStatusMu.Unlock()
|
||||
advanceTicksForElection(n, c.ElectionTick)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
@@ -449,7 +457,6 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
|
||||
raftStatusMu.Lock()
|
||||
raftStatus = n.Status
|
||||
raftStatusMu.Unlock()
|
||||
advanceTicksForElection(n, c.ElectionTick)
|
||||
return id, cl, n, s, w
|
||||
}
|
||||
|
||||
@@ -498,6 +505,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
|
||||
Storage: s,
|
||||
MaxSizePerMsg: maxSizePerMsg,
|
||||
MaxInflightMsgs: maxInflightMsgs,
|
||||
CheckQuorum: true,
|
||||
}
|
||||
n := raft.RestartNode(c)
|
||||
raftStatus = n.Status
|
||||
|
@@ -521,12 +521,51 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) adjustTicks() {
|
||||
clusterN := len(s.cluster.Members())
|
||||
|
||||
// single-node fresh start, or single-node recovers from snapshot
|
||||
if clusterN == 1 {
|
||||
ticks := s.Cfg.ElectionTicks - 1
|
||||
plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
|
||||
s.r.advanceTicks(ticks)
|
||||
return
|
||||
}
|
||||
|
||||
// retry up to "rafthttp.ConnReadTimeout", which is 5-sec
|
||||
// until peer connection reports; otherwise:
|
||||
// 1. all connections failed, or
|
||||
// 2. no active peers, or
|
||||
// 3. restarted single-node with no snapshot
|
||||
// then, do nothing, because advancing ticks would have no effect
|
||||
waitTime := rafthttp.ConnReadTimeout
|
||||
itv := 50 * time.Millisecond
|
||||
for i := int64(0); i < int64(waitTime/itv); i++ {
|
||||
select {
|
||||
case <-time.After(itv):
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
|
||||
peerN := s.r.transport.ActivePeers()
|
||||
if peerN > 1 {
|
||||
// multi-node received peer connection reports
|
||||
// adjust ticks, in case slow leader message receive
|
||||
ticks := s.Cfg.ElectionTicks - 2
|
||||
plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
|
||||
s.r.advanceTicks(ticks)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start performs any initialization of the Server necessary for it to
|
||||
// begin serving requests. It must be called before Do or Process.
|
||||
// Start must be non-blocking; any long-running server functionality
|
||||
// should be implemented in goroutines.
|
||||
func (s *EtcdServer) Start() {
|
||||
s.start()
|
||||
s.goAttach(func() { s.adjustTicks() })
|
||||
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||
s.goAttach(s.purgeFile)
|
||||
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
|
||||
|
@@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporterWithActiveTime) RemoveAllPeers() {}
|
||||
func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
|
||||
func (s *nopTransporterWithActiveTime) ActivePeers() int { return 0 }
|
||||
func (s *nopTransporterWithActiveTime) Stop() {}
|
||||
func (s *nopTransporterWithActiveTime) Pause() {}
|
||||
func (s *nopTransporterWithActiveTime) Resume() {}
|
||||
|
@@ -3,8 +3,11 @@
|
||||
|
||||
|
||||
# Example:
|
||||
# make clean -f ./hack/scripts-dev/Makefile
|
||||
# make build -f ./hack/scripts-dev/Makefile
|
||||
# make clean -f ./hack/scripts-dev/Makefile
|
||||
# make clean-docker -f ./hack/scripts-dev/Makefile
|
||||
# make restart-docker -f ./hack/scripts-dev/Makefile
|
||||
# make delete-docker-images -f ./hack/scripts-dev/Makefile
|
||||
|
||||
.PHONY: build
|
||||
build:
|
||||
@@ -25,9 +28,20 @@ clean:
|
||||
rm -f ./clientv3/integration/127.0.0.1:* ./clientv3/integration/localhost:*
|
||||
rm -f ./clientv3/ordering/127.0.0.1:* ./clientv3/ordering/localhost:*
|
||||
|
||||
clean-docker:
|
||||
docker images
|
||||
docker image prune --force
|
||||
|
||||
restart-docker:
|
||||
service docker restart
|
||||
|
||||
delete-docker-images:
|
||||
docker rm --force $(docker ps -a -q) || true
|
||||
docker rmi --force $(docker images -q) || true
|
||||
|
||||
|
||||
GO_VERSION ?= 1.9.4
|
||||
|
||||
GO_VERSION ?= 1.10
|
||||
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
|
||||
|
||||
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
|
||||
@@ -71,14 +85,23 @@ pull-docker-test:
|
||||
# Example:
|
||||
# make build-docker-test -f ./hack/scripts-dev/Makefile
|
||||
# make compile-with-docker-test -f ./hack/scripts-dev/Makefile
|
||||
# make compile-setup-gopath-with-docker-test -f ./hack/scripts-dev/Makefile
|
||||
|
||||
compile-with-docker-test:
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
docker run \
|
||||
--rm \
|
||||
--mount type=bind,source=`pwd`,destination=/go/src/github.com/coreos/etcd \
|
||||
gcr.io/etcd-development/etcd-test:go$(GO_VERSION) \
|
||||
/bin/bash -c "GO_BUILD_FLAGS=-v ./build && ./bin/etcd --version"
|
||||
|
||||
compile-setup-gopath-with-docker-test:
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
docker run \
|
||||
--rm \
|
||||
--mount type=bind,source=`pwd`,destination=/etcd \
|
||||
gcr.io/etcd-development/etcd-test:go$(GO_VERSION) \
|
||||
/bin/bash -c "cd /etcd && GO_BUILD_FLAGS=-v ./build && ./bin/etcd --version"
|
||||
/bin/bash -c "cd /etcd && ETCD_SETUP_GOPATH=1 GO_BUILD_FLAGS=-v ./build && ./bin/etcd --version && rm -rf ./gopath"
|
||||
|
||||
|
||||
|
||||
@@ -87,7 +110,7 @@ compile-with-docker-test:
|
||||
# Local machine:
|
||||
# TEST_OPTS="PASSES='fmt'" make test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional'" make test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="PASSES='build grpcproxy'" make test -f ./hack/scripts-dev/Makefile
|
||||
#
|
||||
# Example (test with docker):
|
||||
@@ -99,8 +122,8 @@ compile-with-docker-test:
|
||||
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
#
|
||||
# Semaphore CI (test with docker):
|
||||
# TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
# HOST_TMP_DIR=/tmp TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
# HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build unit release integration_e2e functional'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
# TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'" make docker-test -f ./hack/scripts-dev/Makefile
|
||||
#
|
||||
# grpc-proxy tests (test with docker):
|
||||
@@ -146,11 +169,12 @@ docker-test-coverage:
|
||||
|
||||
|
||||
# Example:
|
||||
# ETCD_VERSION=v3.3.0-test.0 make build-docker-release-master -f ./hack/scripts-dev/Makefile
|
||||
# ETCD_VERSION=v3.3.0-test.0 make push-docker-release-master -f ./hack/scripts-dev/Makefile
|
||||
# make compile-with-docker-test -f ./hack/scripts-dev/Makefile
|
||||
# ETCD_VERSION=v3-test make build-docker-release-master -f ./hack/scripts-dev/Makefile
|
||||
# ETCD_VERSION=v3-test make push-docker-release-master -f ./hack/scripts-dev/Makefile
|
||||
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
|
||||
|
||||
build-docker-release-master: compile-with-docker-test
|
||||
build-docker-release-master:
|
||||
$(info ETCD_VERSION: $(ETCD_VERSION))
|
||||
cp ./Dockerfile-release ./bin/Dockerfile-release
|
||||
docker build \
|
||||
@@ -234,6 +258,7 @@ docker-static-ip-test-certs-metrics-proxy-run:
|
||||
# make push-docker-dns-test -f ./hack/scripts-dev/Makefile
|
||||
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
|
||||
# make pull-docker-dns-test -f ./hack/scripts-dev/Makefile
|
||||
# make docker-dns-test-insecure-run -f ./hack/scripts-dev/Makefile
|
||||
# make docker-dns-test-certs-run -f ./hack/scripts-dev/Makefile
|
||||
# make docker-dns-test-certs-gateway-run -f ./hack/scripts-dev/Makefile
|
||||
# make docker-dns-test-certs-wildcard-run -f ./hack/scripts-dev/Makefile
|
||||
@@ -263,6 +288,20 @@ pull-docker-dns-test:
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
docker pull gcr.io/etcd-development/etcd-dns-test:go$(GO_VERSION)
|
||||
|
||||
docker-dns-test-insecure-run:
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
$(info HOST_TMP_DIR: $(HOST_TMP_DIR))
|
||||
$(info TMP_DIR_MOUNT_FLAG: $(TMP_DIR_MOUNT_FLAG))
|
||||
docker run \
|
||||
--rm \
|
||||
--tty \
|
||||
--dns 127.0.0.1 \
|
||||
$(TMP_DIR_MOUNT_FLAG) \
|
||||
--mount type=bind,source=`pwd`/bin,destination=/etcd \
|
||||
--mount type=bind,source=`pwd`/hack/scripts-dev/docker-dns/insecure,destination=/insecure \
|
||||
gcr.io/etcd-development/etcd-dns-test:go$(GO_VERSION) \
|
||||
/bin/bash -c "cd /etcd && /insecure/run.sh && rm -rf m*.etcd"
|
||||
|
||||
docker-dns-test-certs-run:
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
$(info HOST_TMP_DIR: $(HOST_TMP_DIR))
|
||||
@@ -418,7 +457,7 @@ docker-dns-srv-test-certs-wildcard-run:
|
||||
# make build-etcd-test-proxy -f ./hack/scripts-dev/Makefile
|
||||
|
||||
build-etcd-test-proxy:
|
||||
go build -v -o ./bin/etcd-test-proxy ./cmd/tools/etcd-test-proxy
|
||||
go build -v -o ./bin/etcd-test-proxy ./tools/etcd-test-proxy
|
||||
|
||||
|
||||
|
||||
|
@@ -31,3 +31,52 @@ ETCDCTL_API=3 ./etcdctl \
|
||||
--key=/certs/server.key.insecure \
|
||||
--endpoints=https://m1.etcd.local:2379,https://m2.etcd.local:22379,https://m3.etcd.local:32379 \
|
||||
get abc
|
||||
|
||||
printf "\nWriting v2 key...\n"
|
||||
curl -L https://127.0.0.1:2379/v2/keys/queue \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-X POST \
|
||||
-d value=data
|
||||
|
||||
printf "\nWriting v2 key...\n"
|
||||
curl -L https://m1.etcd.local:2379/v2/keys/queue \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-X POST \
|
||||
-d value=data
|
||||
|
||||
printf "\nWriting v3 key...\n"
|
||||
curl -L https://127.0.0.1:2379/v3/kv/put \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v", "value": "YmFy"}'
|
||||
|
||||
printf "\n\nWriting v3 key...\n"
|
||||
curl -L https://m1.etcd.local:2379/v3/kv/put \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v", "value": "YmFy"}'
|
||||
|
||||
printf "\n\nReading v3 key...\n"
|
||||
curl -L https://m1.etcd.local:2379/v3/kv/range \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v"}'
|
||||
|
||||
printf "\n\nFetching 'curl https://m1.etcd.local:2379/metrics'...\n"
|
||||
curl \
|
||||
--cacert /certs/ca.crt \
|
||||
--cert /certs/server.crt \
|
||||
--key /certs/server.key.insecure \
|
||||
-L https://m1.etcd.local:2379/metrics | grep Put | tail -3
|
||||
|
||||
printf "\n\nDone!!!\n\n"
|
||||
|
6
hack/scripts-dev/docker-dns/insecure/Procfile
Normal file
6
hack/scripts-dev/docker-dns/insecure/Procfile
Normal file
@@ -0,0 +1,6 @@
|
||||
# Use goreman to run `go get github.com/mattn/goreman`
|
||||
etcd1: ./etcd --name m1 --data-dir /tmp/m1.data --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://m1.etcd.local:2379 --listen-peer-urls http://127.0.0.1:2380 --initial-advertise-peer-urls=http://m1.etcd.local:2380 --initial-cluster-token tkn --initial-cluster=m1=http://m1.etcd.local:2380,m2=http://m2.etcd.local:22380,m3=http://m3.etcd.local:32380 --host-whitelist "localhost,127.0.0.1,m1.etcd.local"
|
||||
|
||||
etcd2: ./etcd --name m2 --data-dir /tmp/m2.data --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://m2.etcd.local:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls=http://m2.etcd.local:22380 --initial-cluster-token tkn --initial-cluster=m1=http://m1.etcd.local:2380,m2=http://m2.etcd.local:22380,m3=http://m3.etcd.local:32380 --host-whitelist "localhost,127.0.0.1,m1.etcd.local"
|
||||
|
||||
etcd3: ./etcd --name m3 --data-dir /tmp/m3.data --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://m3.etcd.local:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls=http://m3.etcd.local:32380 --initial-cluster-token tkn --initial-cluster=m1=http://m1.etcd.local:2380,m2=http://m2.etcd.local:22380,m3=http://m3.etcd.local:32380 --host-whitelist "localhost,127.0.0.1,m1.etcd.local"
|
89
hack/scripts-dev/docker-dns/insecure/run.sh
Executable file
89
hack/scripts-dev/docker-dns/insecure/run.sh
Executable file
@@ -0,0 +1,89 @@
|
||||
#!/bin/sh
|
||||
rm -rf /tmp/m1.data /tmp/m2.data /tmp/m3.data
|
||||
|
||||
/etc/init.d/bind9 start
|
||||
|
||||
# get rid of hosts so go lookup won't resolve 127.0.0.1 to localhost
|
||||
cat /dev/null >/etc/hosts
|
||||
|
||||
goreman -f /insecure/Procfile start &
|
||||
|
||||
# TODO: remove random sleeps
|
||||
sleep 7s
|
||||
|
||||
ETCDCTL_API=3 ./etcdctl \
|
||||
--endpoints=http://m1.etcd.local:2379 \
|
||||
endpoint health --cluster
|
||||
|
||||
ETCDCTL_API=3 ./etcdctl \
|
||||
--endpoints=http://m1.etcd.local:2379,http://m2.etcd.local:22379,http://m3.etcd.local:32379 \
|
||||
put abc def
|
||||
|
||||
ETCDCTL_API=3 ./etcdctl \
|
||||
--endpoints=http://m1.etcd.local:2379,http://m2.etcd.local:22379,http://m3.etcd.local:32379 \
|
||||
get abc
|
||||
|
||||
printf "\nWriting v2 key...\n"
|
||||
curl \
|
||||
-L http://127.0.0.1:2379/v2/keys/queue \
|
||||
-X POST \
|
||||
-d value=data
|
||||
|
||||
printf "\nWriting v2 key...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v2/keys/queue \
|
||||
-X POST \
|
||||
-d value=data
|
||||
|
||||
printf "\nWriting v3 key...\n"
|
||||
curl \
|
||||
-L http://127.0.0.1:2379/v3/kv/put \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v", "value": "YmFy"}'
|
||||
|
||||
printf "\n\nWriting v3 key...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v3/kv/put \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v", "value": "YmFy"}'
|
||||
|
||||
printf "\n\nReading v3 key...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v3/kv/range \
|
||||
-X POST \
|
||||
-d '{"key": "Zm9v"}'
|
||||
|
||||
printf "\n\nFetching 'curl http://m1.etcd.local:2379/metrics'...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/metrics | grep Put | tail -3
|
||||
|
||||
name1=$(base64 <<< "/election-prefix")
|
||||
val1=$(base64 <<< "v1")
|
||||
data1="{\"name\":\"${name1}\", \"value\":\"${val1}\"}"
|
||||
|
||||
printf "\n\nCampaign: ${data1}\n"
|
||||
result1=$(curl -L http://m1.etcd.local:2379/v3/election/campaign -X POST -d "${data1}")
|
||||
echo ${result1}
|
||||
|
||||
# should not panic servers
|
||||
val2=$(base64 <<< "v2")
|
||||
data2="{\"value\": \"${val2}\"}"
|
||||
printf "\n\nProclaim (wrong-format): ${data2}\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v3/election/proclaim \
|
||||
-X POST \
|
||||
-d "${data2}"
|
||||
|
||||
printf "\n\nProclaim (wrong-format)...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v3/election/proclaim \
|
||||
-X POST \
|
||||
-d '}'
|
||||
|
||||
printf "\n\nProclaim (wrong-format)...\n"
|
||||
curl \
|
||||
-L http://m1.etcd.local:2379/v3/election/proclaim \
|
||||
-X POST \
|
||||
-d '{"value": "Zm9v"}'
|
||||
|
||||
printf "\n\nDone!!!\n\n"
|
@@ -29,6 +29,9 @@ import (
|
||||
// NoLease is a special LeaseID representing the absence of a lease.
|
||||
const NoLease = LeaseID(0)
|
||||
|
||||
// MaxLeaseTTL is the maximum lease TTL value
|
||||
const MaxLeaseTTL = 9000000000
|
||||
|
||||
var (
|
||||
forever = time.Time{}
|
||||
|
||||
@@ -37,9 +40,10 @@ var (
|
||||
// maximum number of leases to revoke per second; configurable for tests
|
||||
leaseRevokeRate = 1000
|
||||
|
||||
ErrNotPrimary = errors.New("not a primary lessor")
|
||||
ErrLeaseNotFound = errors.New("lease not found")
|
||||
ErrLeaseExists = errors.New("lease already exists")
|
||||
ErrNotPrimary = errors.New("not a primary lessor")
|
||||
ErrLeaseNotFound = errors.New("lease not found")
|
||||
ErrLeaseExists = errors.New("lease already exists")
|
||||
ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
|
||||
)
|
||||
|
||||
// TxnDelete is a TxnWrite that only permits deletes. Defined here
|
||||
@@ -198,6 +202,10 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
|
||||
return nil, ErrLeaseNotFound
|
||||
}
|
||||
|
||||
if ttl > MaxLeaseTTL {
|
||||
return nil, ErrLeaseTTLTooLarge
|
||||
}
|
||||
|
||||
// TODO: when lessor is under high load, it should give out lease
|
||||
// with longer TTL to reduce renew load.
|
||||
l := &Lease{
|
||||
|
@@ -451,6 +451,20 @@ func TestLessorExpireAndDemote(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLessorMaxTTL(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer be.Close()
|
||||
|
||||
le := newLessor(be, minLeaseTTL)
|
||||
defer le.Stop()
|
||||
|
||||
_, err := le.Grant(1, MaxLeaseTTL+1)
|
||||
if err != ErrLeaseTTLTooLarge {
|
||||
t.Fatalf("grant unexpectedly succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
type fakeDeleter struct {
|
||||
deleted []string
|
||||
tx backend.BatchTx
|
||||
|
@@ -119,6 +119,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
case <-closeCh:
|
||||
atomic.StoreInt32(&requestClosed, 1)
|
||||
plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
|
||||
cancel()
|
||||
case <-completeCh:
|
||||
}
|
||||
}()
|
||||
|
@@ -230,6 +230,7 @@ func (p *peer) send(m raftpb.Message) {
|
||||
plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
|
||||
}
|
||||
plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
|
||||
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -53,6 +53,7 @@ func (g *remote) send(m raftpb.Message) {
|
||||
plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
|
||||
}
|
||||
plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
|
||||
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -85,6 +85,8 @@ type Transporter interface {
|
||||
// If the connection is active since peer was added, it returns the adding time.
|
||||
// If the connection is currently inactive, it returns zero time.
|
||||
ActiveSince(id types.ID) time.Time
|
||||
// ActivePeers returns the number of active peers.
|
||||
ActivePeers() int
|
||||
// Stop closes the connections and stops the transporter.
|
||||
Stop()
|
||||
}
|
||||
@@ -375,6 +377,20 @@ func (t *Transport) Resume() {
|
||||
}
|
||||
}
|
||||
|
||||
// ActivePeers returns a channel that closes when an initial
|
||||
// peer connection has been established. Use this to wait until the
|
||||
// first peer connection becomes active.
|
||||
func (t *Transport) ActivePeers() (cnt int) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
for _, p := range t.peers {
|
||||
if !p.activeSince().IsZero() {
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
return cnt
|
||||
}
|
||||
|
||||
type nopTransporter struct{}
|
||||
|
||||
func NewNopTransporter() Transporter {
|
||||
@@ -391,6 +407,7 @@ func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
||||
func (s *nopTransporter) ActivePeers() int { return 0 }
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
func (s *nopTransporter) Resume() {}
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.3.1"
|
||||
Version = "3.3.3"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user