forked from vitalif/vitastor
Remove etcd timeout and keepalive interval hardcode
parent
9c3653b1e1
commit
a5cf06acd0
|
@ -85,7 +85,11 @@ const etcd_tree = {
|
|||
up_wait_retry_interval: 500, // ms. min: 50
|
||||
// osd
|
||||
etcd_report_interval: 5, // seconds
|
||||
etcd_keepalive_interval: 10, // seconds, default is etcd_report_interval*2
|
||||
max_etcd_attempts: 5,
|
||||
etcd_quick_timeout: 1000, // ms
|
||||
etcd_slow_timeout: 5000, // ms
|
||||
etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2)
|
||||
etcd_ws_keepalive_interval: 30, // seconds
|
||||
run_primary: true,
|
||||
osd_network: null, // "192.168.7.0/24" or an array of masks
|
||||
bind_address: "0.0.0.0",
|
||||
|
|
|
@ -194,7 +194,7 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent)
|
|||
} }
|
||||
},
|
||||
} },
|
||||
}, ETCD_SLOW_TIMEOUT, [this, new_parent, cur, cur_name](std::string err, json11::Json res)
|
||||
}, cli->st_cli.etcd_slow_timeout, [this, new_parent, cur, cur_name](std::string err, json11::Json res)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
|
|
@ -105,7 +105,7 @@ struct alloc_osd_t
|
|||
void etcd_txn(json11::Json txn)
|
||||
{
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(txn, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -449,7 +449,7 @@ resume_3:
|
|||
void etcd_txn(json11::Json txn)
|
||||
{
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(txn, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -48,7 +48,7 @@ struct pool_lister_t
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -112,7 +112,7 @@ struct image_lister_t
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -174,7 +174,7 @@ resume_1:
|
|||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", checks },
|
||||
{ "success", success },
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
|
|
@ -258,7 +258,7 @@ resume_9:
|
|||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
{ "success", reads },
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
@ -417,7 +417,7 @@ resume_9:
|
|||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", cmp },
|
||||
{ "success", txn },
|
||||
}, ETCD_SLOW_TIMEOUT, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
@ -475,7 +475,7 @@ resume_9:
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, ETCD_SLOW_TIMEOUT, [this, cur_name](std::string err, json11::Json res)
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this, cur_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -81,7 +81,7 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
|
|||
"Content-Type: application/json\r\n"
|
||||
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
||||
"Connection: keep-alive\r\n"
|
||||
"Keep-Alive: timeout="+std::to_string(etcd_keepalive_interval)+"\r\n"
|
||||
"Keep-Alive: timeout="+std::to_string(etcd_keepalive_timeout)+"\r\n"
|
||||
"\r\n"+req;
|
||||
auto cb = [this, cur_addr = selected_etcd_address, callback](const http_response_t *response)
|
||||
{
|
||||
|
@ -173,12 +173,32 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
|
|||
this->etcd_prefix = "/"+this->etcd_prefix;
|
||||
}
|
||||
this->log_level = config["log_level"].int64_value();
|
||||
this->etcd_keepalive_interval = config["etcd_keepalive_interval"].uint64_value();
|
||||
if (this->etcd_keepalive_interval <= 0)
|
||||
this->etcd_keepalive_timeout = config["etcd_keepalive_timeout"].uint64_value();
|
||||
if (this->etcd_keepalive_timeout <= 0)
|
||||
{
|
||||
this->etcd_keepalive_interval = config["etcd_report_interval"].uint64_value() * 2;
|
||||
if (this->etcd_keepalive_interval <= 0)
|
||||
this->etcd_keepalive_interval = 10;
|
||||
this->etcd_keepalive_timeout = config["etcd_report_interval"].uint64_value() * 2;
|
||||
if (this->etcd_keepalive_timeout < 30)
|
||||
this->etcd_keepalive_timeout = 30;
|
||||
}
|
||||
this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value();
|
||||
if (this->etcd_ws_keepalive_interval <= 0)
|
||||
{
|
||||
this->etcd_ws_keepalive_interval = 30;
|
||||
}
|
||||
this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value();
|
||||
if (this->max_etcd_attempts <= 0)
|
||||
{
|
||||
this->max_etcd_attempts = 5;
|
||||
}
|
||||
this->etcd_slow_timeout = config["etcd_slow_timeout"].uint64_value();
|
||||
if (this->etcd_slow_timeout <= 0)
|
||||
{
|
||||
this->etcd_slow_timeout = 5000;
|
||||
}
|
||||
this->etcd_quick_timeout = config["etcd_quick_timeout"].uint64_value();
|
||||
if (this->etcd_quick_timeout <= 0)
|
||||
{
|
||||
this->etcd_quick_timeout = 1000;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,7 +255,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
|||
http_close(etcd_watch_ws);
|
||||
etcd_watch_ws = NULL;
|
||||
}
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT,
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout,
|
||||
[this, cur_addr = selected_etcd_address](const http_response_t *msg)
|
||||
{
|
||||
if (msg->body.length())
|
||||
|
@ -327,8 +347,8 @@ void etcd_state_client_t::start_etcd_watcher()
|
|||
etcd_watch_ws = NULL;
|
||||
if (etcd_watches_initialised == 0)
|
||||
{
|
||||
// Connection not established, retry in <ETCD_QUICK_TIMEOUT>
|
||||
tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int)
|
||||
// Connection not established, retry in <etcd_quick_timeout>
|
||||
tfd->set_timer(etcd_quick_timeout, false, [this](int)
|
||||
{
|
||||
start_etcd_watcher();
|
||||
});
|
||||
|
@ -378,7 +398,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
|||
}).dump());
|
||||
if (ws_keepalive_timer < 0)
|
||||
{
|
||||
ws_keepalive_timer = tfd->set_timer(ETCD_KEEPALIVE_TIMEOUT, true, [this](int)
|
||||
ws_keepalive_timer = tfd->set_timer(etcd_ws_keepalive_interval*1000, true, [this](int)
|
||||
{
|
||||
if (!etcd_watch_ws)
|
||||
{
|
||||
|
@ -409,12 +429,12 @@ void etcd_state_client_t::load_global_config()
|
|||
{
|
||||
etcd_call("/kv/range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/global") }
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
}, etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading OSD configuration from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(etcd_slow_timeout, false, [this](int timer_id)
|
||||
{
|
||||
load_global_config();
|
||||
});
|
||||
|
@ -482,12 +502,12 @@ void etcd_state_client_t::load_pgs()
|
|||
{
|
||||
req["compare"] = checks;
|
||||
}
|
||||
etcd_txn(req, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
etcd_txn(req, etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(etcd_slow_timeout, false, [this](int timer_id)
|
||||
{
|
||||
load_pgs();
|
||||
});
|
||||
|
|
|
@ -12,12 +12,6 @@
|
|||
#define ETCD_PG_HISTORY_WATCH_ID 3
|
||||
#define ETCD_OSD_STATE_WATCH_ID 4
|
||||
|
||||
// FIXME: Remove hardcode
|
||||
#define MAX_ETCD_ATTEMPTS 5
|
||||
#define ETCD_SLOW_TIMEOUT 5000
|
||||
#define ETCD_QUICK_TIMEOUT 1000
|
||||
#define ETCD_KEEPALIVE_TIMEOUT 30000
|
||||
|
||||
#define DEFAULT_BLOCK_SIZE 128*1024
|
||||
|
||||
struct etcd_kv_t
|
||||
|
@ -88,10 +82,15 @@ protected:
|
|||
int ws_alive = 0;
|
||||
bool rand_initialized = false;
|
||||
uint64_t bs_block_size = DEFAULT_BLOCK_SIZE;
|
||||
int etcd_keepalive_interval = 10;
|
||||
void add_etcd_url(std::string);
|
||||
void pick_next_etcd();
|
||||
public:
|
||||
int etcd_keepalive_timeout = 30;
|
||||
int etcd_ws_keepalive_interval = 30;
|
||||
int max_etcd_attempts = 5;
|
||||
int etcd_quick_timeout = 1000;
|
||||
int etcd_slow_timeout = 5000;
|
||||
|
||||
std::string etcd_prefix;
|
||||
int log_level = 0;
|
||||
timerfd_manager_t *tfd = NULL;
|
||||
|
|
|
@ -277,14 +277,14 @@ void osd_t::report_statistics()
|
|||
} }
|
||||
});
|
||||
}
|
||||
st_cli.etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
st_cli.etcd_txn(json11::Json::object { { "success", txn } }, st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
etcd_reporting_stats = false;
|
||||
if (err != "")
|
||||
{
|
||||
printf("[OSD %lu] Error reporting state to etcd: %s\n", this->osd_num, err.c_str());
|
||||
// Retry indefinitely
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(st_cli.etcd_slow_timeout, false, [this](int timer_id)
|
||||
{
|
||||
report_statistics();
|
||||
});
|
||||
|
@ -355,13 +355,13 @@ void osd_t::acquire_lease()
|
|||
{
|
||||
// Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
|
||||
st_cli.etcd_call("/lease/grant", json11::Json::object {
|
||||
{ "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }
|
||||
}, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{ "TTL", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 }
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "" || data["ID"].string_value() == "")
|
||||
{
|
||||
printf("Error acquiring a lease from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)
|
||||
{
|
||||
acquire_lease();
|
||||
});
|
||||
|
@ -408,19 +408,19 @@ void osd_t::create_osd_state()
|
|||
} }
|
||||
},
|
||||
} },
|
||||
}, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
etcd_failed_attempts++;
|
||||
printf("Error creating OSD state key: %s\n", err.c_str());
|
||||
if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
|
||||
if (etcd_failed_attempts > st_cli.max_etcd_attempts)
|
||||
{
|
||||
// Die
|
||||
throw std::runtime_error("Cluster connection failed");
|
||||
}
|
||||
// Retry
|
||||
tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)
|
||||
{
|
||||
create_osd_state();
|
||||
});
|
||||
|
@ -452,7 +452,7 @@ void osd_t::renew_lease()
|
|||
{
|
||||
st_cli.etcd_call("/lease/keepalive", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err == "" && data["result"]["TTL"].string_value() == "")
|
||||
{
|
||||
|
@ -463,13 +463,13 @@ void osd_t::renew_lease()
|
|||
{
|
||||
etcd_failed_attempts++;
|
||||
printf("Error renewing etcd lease: %s\n", err.c_str());
|
||||
if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
|
||||
if (etcd_failed_attempts > st_cli.max_etcd_attempts)
|
||||
{
|
||||
// Die
|
||||
throw std::runtime_error("Cluster connection failed");
|
||||
}
|
||||
// Retry
|
||||
tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
|
||||
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)
|
||||
{
|
||||
renew_lease();
|
||||
});
|
||||
|
@ -488,7 +488,7 @@ void osd_t::force_stop(int exitcode)
|
|||
{
|
||||
st_cli.etcd_call("/kv/lease/revoke", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, ETCD_QUICK_TIMEOUT, [this, exitcode](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, [this, exitcode](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
@ -826,7 +826,7 @@ void osd_t::report_pg_states()
|
|||
etcd_reporting_pg_state = true;
|
||||
st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", checks }, { "success", success }, { "failure", failure }
|
||||
}, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
{
|
||||
etcd_reporting_pg_state = false;
|
||||
if (!data["succeeded"].bool_value())
|
||||
|
|
|
@ -159,7 +159,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
|||
{ "readonly", readonly },
|
||||
{ "immediate_commit", (immediate_commit == IMMEDIATE_ALL ? "all" :
|
||||
(immediate_commit == IMMEDIATE_SMALL ? "small" : "none")) },
|
||||
{ "lease_timeout", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 },
|
||||
{ "lease_timeout", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 },
|
||||
};
|
||||
#ifdef WITH_RDMA
|
||||
if (msgr.is_rdma_enabled())
|
||||
|
|
Loading…
Reference in New Issue