Compare commits
10 Commits
55263bc6b5
...
654eaba5a1
Author | SHA1 | Date |
---|---|---|
Yicheng Qin | 654eaba5a1 | |
Yicheng Qin | 9fa3bea5a2 | |
Xiang Li | 2fc8304300 | |
Xiang Li | 087ba30a90 | |
Yicheng Qin | e1df265dc5 | |
Yicheng Qin | 8059598332 | |
Yicheng Qin | e1e2daa205 | |
Yicheng Qin | 399931cec9 | |
Yicheng Qin | 49715173cb | |
Yicheng Qin | ad4f231b40 |
|
@ -309,7 +309,6 @@ func (e *Etcd) runServer() {
|
||||||
for {
|
for {
|
||||||
if e.mode == PeerMode {
|
if e.mode == PeerMode {
|
||||||
log.Infof("%v starting in peer mode", e.Config.Name)
|
log.Infof("%v starting in peer mode", e.Config.Name)
|
||||||
go registerAvailableInternalVersions(e.Config.Name, e.Config.Addr, e.Config.EtcdTLSInfo())
|
|
||||||
// Starting peer server should be followed close by listening on its port
|
// Starting peer server should be followed close by listening on its port
|
||||||
// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
|
// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
|
||||||
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
|
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"runtime"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/log"
|
|
||||||
"github.com/coreos/etcd/server"
|
|
||||||
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
|
|
||||||
)
|
|
||||||
|
|
||||||
var defaultEtcdBinaryDir = "/usr/libexec/etcd/internal_versions/"
|
|
||||||
|
|
||||||
func registerAvailableInternalVersions(name string, addr string, tls *server.TLSInfo) {
|
|
||||||
var c *etcd.Client
|
|
||||||
if tls.Scheme() == "http" {
|
|
||||||
c = etcd.NewClient([]string{addr})
|
|
||||||
} else {
|
|
||||||
var err error
|
|
||||||
c, err = etcd.NewTLSClient([]string{addr}, tls.CertFile, tls.KeyFile, tls.CAFile)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("client TLS error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
vers, err := getInternalVersions()
|
|
||||||
if err != nil {
|
|
||||||
log.Infof("failed to get local etcd versions: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, v := range vers {
|
|
||||||
for {
|
|
||||||
_, err := c.Set("/_etcd/available-internal-versions/"+v+"/"+name, "ok", 0)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Infof("%s: available_internal_versions %s is registered into key space successfully.", name, vers)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getInternalVersions() ([]string, error) {
|
|
||||||
if runtime.GOOS != "linux" {
|
|
||||||
return nil, fmt.Errorf("unmatched os version %v", runtime.GOOS)
|
|
||||||
}
|
|
||||||
etcdBinaryDir := os.Getenv("ETCD_BINARY_DIR")
|
|
||||||
if etcdBinaryDir == "" {
|
|
||||||
etcdBinaryDir = defaultEtcdBinaryDir
|
|
||||||
}
|
|
||||||
dir, err := os.Open(etcdBinaryDir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer dir.Close()
|
|
||||||
return dir.Readdirnames(-1)
|
|
||||||
}
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -293,7 +292,6 @@ func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
|
||||||
s.startRoutine(s.monitorTimeoutThreshold)
|
s.startRoutine(s.monitorTimeoutThreshold)
|
||||||
s.startRoutine(s.monitorActiveSize)
|
s.startRoutine(s.monitorActiveSize)
|
||||||
s.startRoutine(s.monitorPeerActivity)
|
s.startRoutine(s.monitorPeerActivity)
|
||||||
s.startRoutine(s.monitorVersion)
|
|
||||||
|
|
||||||
// open the snapshot
|
// open the snapshot
|
||||||
if snapshot {
|
if snapshot {
|
||||||
|
@ -372,7 +370,6 @@ func (s *PeerServer) HTTPHandler() http.Handler {
|
||||||
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
|
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
|
||||||
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
|
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
|
||||||
router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")
|
router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")
|
||||||
router.HandleFunc("/v2/admin/next-internal-version", s.NextInternalVersionHandler).Methods("GET")
|
|
||||||
|
|
||||||
return router
|
return router
|
||||||
}
|
}
|
||||||
|
@ -898,30 +895,3 @@ func (s *PeerServer) monitorPeerActivity() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeerServer) monitorVersion() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.closeChan:
|
|
||||||
return
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := s.store.Get("/_etcd/next-internal-version", false, false)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// only support upgrading to etcd2
|
|
||||||
if *resp.Node.Value == "2" {
|
|
||||||
log.Infof("%s: detected next internal version 2, exit after 10 seconds.", s.Config.Name)
|
|
||||||
} else {
|
|
||||||
log.Infof("%s: detected invaild next internal version %s", s.Config.Name, *resp.Node.Value)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
// be nice to raft. try not to corrupt log file.
|
|
||||||
go s.raftServer.Stop()
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package server
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -310,48 +309,6 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *PeerServer) NextInternalVersionHandler(w http.ResponseWriter, req *http.Request) {
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
if ps.raftServer.State() != raft.Leader {
|
|
||||||
l := ps.raftServer.Leader()
|
|
||||||
if l == "" {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
url, _ := ps.registry.PeerURL(l)
|
|
||||||
uhttp.Redirect(url, w, req)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resp, err := ps.store.Get("/_etcd/available-internal-versions/2", true, true)
|
|
||||||
if err != nil {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
available := make(map[string]bool)
|
|
||||||
for _, n := range resp.Node.Nodes {
|
|
||||||
available[path.Base(n.Key)] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
notfound := false
|
|
||||||
for _, n := range ps.registry.Names() {
|
|
||||||
if !available[n] {
|
|
||||||
notfound = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if notfound {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
c := ps.store.CommandFactory().CreateSetCommand("/_etcd/next-internal-version", false, "2", store.Permanent)
|
|
||||||
_, err = ps.raftServer.Do(c)
|
|
||||||
if err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
w.WriteHeader(http.StatusServiceUnavailable)
|
|
||||||
}
|
|
||||||
|
|
||||||
// machineMessage represents information about a peer or standby in the registry.
|
// machineMessage represents information about a peer or standby in the registry.
|
||||||
type machineMessage struct {
|
type machineMessage struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
const ReleaseVersion = "0.4.9+git"
|
|
@ -3,8 +3,10 @@ package server
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -135,6 +137,7 @@ func (s *Server) installV2(r *mux.Router) {
|
||||||
s.handleFunc(r2, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
|
s.handleFunc(r2, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
|
||||||
s.handleFunc(r2, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
|
s.handleFunc(r2, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
|
||||||
s.handleFunc(r2, "/v2/speedTest", s.SpeedTestHandler).Methods("GET", "HEAD")
|
s.handleFunc(r2, "/v2/speedTest", s.SpeedTestHandler).Methods("GET", "HEAD")
|
||||||
|
s.handleFunc(r2, "/v2/migration/snapshot", s.SnapshotHandler).Methods("GET")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) installMod(r *mux.Router) {
|
func (s *Server) installMod(r *mux.Router) {
|
||||||
|
@ -285,7 +288,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
|
||||||
// Handler to return the current version of etcd.
|
// Handler to return the current version of etcd.
|
||||||
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
|
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
fmt.Fprintf(w, `{"releaseVersion":"%s","internalVersion":"%s"}`, ReleaseVersion, InternalVersion)
|
fmt.Fprintf(w, "etcd %s", ReleaseVersion)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,6 +362,41 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapshotHandler forces etcd store to do a snapshot. If the disk parameter is set, the snapshot
|
||||||
|
// will be written to disk at data-dir/index-migrate.snap. Or the snapshot will be returned as
|
||||||
|
// http body.
|
||||||
|
func (s *Server) SnapshotHandler(w http.ResponseWriter, req *http.Request) error {
|
||||||
|
data, err := s.Store().Save()
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "failed to create snapshot: "+err.Error(), http.StatusInternalServerError)
|
||||||
|
log.Warn("Failed to create snapshot:" + err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
disk := req.FormValue("disk")
|
||||||
|
if disk == "true" {
|
||||||
|
name := fmt.Sprintf("%d-migrate.snap", s.peerServer.RaftServer().CommitIndex())
|
||||||
|
err = ioutil.WriteFile(path.Join(s.peerServer.RaftServer().Path(), name), data, 0600)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "failed to save snapshot: "+err.Error(), http.StatusInternalServerError)
|
||||||
|
log.Warn("server: failed to save snapshot: " + err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Infof("server: saved snapshot file %s successfully", name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if disk != "" && disk != "false" {
|
||||||
|
http.Error(w, "invalid parameter: disk="+disk, http.StatusBadRequest)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = w.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("server: failed to write snapshot to %s: %v", req.RemoteAddr, err.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Retrieves metrics from bucket
|
// Retrieves metrics from bucket
|
||||||
func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
|
func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
|
||||||
(*s.metrics).Dump(w)
|
(*s.metrics).Dump(w)
|
||||||
|
|
|
@ -187,14 +187,6 @@ func (s *StandbyServer) monitorCluster() {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := s.checkMemberInternalVersionIsV2()
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("fail checking internal version(%v): %v", s.ClusterURLs(), err)
|
|
||||||
} else if ok {
|
|
||||||
log.Infof("Detect the cluster has been upgraded to v2. Exit now.")
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.syncCluster(nil); err != nil {
|
if err := s.syncCluster(nil); err != nil {
|
||||||
log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
|
log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
|
||||||
continue
|
continue
|
||||||
|
@ -224,39 +216,6 @@ func (s *StandbyServer) monitorCluster() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StandbyServer) checkMemberInternalVersionIsV2() (bool, error) {
|
|
||||||
c := &http.Client{Transport: s.client.Client.Transport}
|
|
||||||
for _, memb := range s.Cluster {
|
|
||||||
url := memb.ClientURL
|
|
||||||
resp, err := c.Get(url + "/version")
|
|
||||||
if err != nil {
|
|
||||||
log.Debugf("failed to get /version from %s", url)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.Debugf("failed to read body from %s", url)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var m map[string]string
|
|
||||||
err = json.Unmarshal(b, &m)
|
|
||||||
if err != nil {
|
|
||||||
log.Debugf("failed to unmarshal body %s from %s", b, url)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
switch m["internalVersion"] {
|
|
||||||
case "1":
|
|
||||||
return false, nil
|
|
||||||
case "2":
|
|
||||||
return true, nil
|
|
||||||
default:
|
|
||||||
log.Warnf("unrecognized internal version %s from %s", m["internalVersion"], url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, fmt.Errorf("failed to get version")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StandbyServer) syncCluster(peerURLs []string) error {
|
func (s *StandbyServer) syncCluster(peerURLs []string) error {
|
||||||
peerURLs = append(s.ClusterURLs(), peerURLs...)
|
peerURLs = append(s.ClusterURLs(), peerURLs...)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
const ReleaseVersion = "0.4.8+git"
|
|
||||||
const InternalVersion = "1"
|
|
||||||
const Version = "v2"
|
const Version = "v2"
|
||||||
|
|
Loading…
Reference in New Issue