forked from vitalif/vitastor
Add etcd retries everywhere (they were missing in some places)
parent
31b9c683ee
commit
ba63af49b4
20
src/cli.cpp
20
src/cli.cpp
|
@ -177,7 +177,7 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent)
|
|||
new_cfg.parent_id = new_parent;
|
||||
json11::Json::object cur_cfg_json = cli->st_cli.serialize_inode_cfg(&new_cfg);
|
||||
waiting++;
|
||||
cli->st_cli.etcd_txn(json11::Json::object {
|
||||
cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
|
@ -194,7 +194,7 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent)
|
|||
} }
|
||||
},
|
||||
} },
|
||||
}, cli->st_cli.etcd_slow_timeout, [this, new_parent, cur, cur_name](std::string err, json11::Json res)
|
||||
}, [this, new_parent, cur, cur_name](std::string err, json11::Json res)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
@ -229,6 +229,22 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent)
|
|||
});
|
||||
}
|
||||
|
||||
void cli_tool_t::etcd_txn(json11::Json txn)
|
||||
{
|
||||
waiting++;
|
||||
cli->st_cli.etcd_txn_slow(txn, [this](std::string err, json11::Json res)
|
||||
{
|
||||
waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
etcd_result = res;
|
||||
ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
inode_config_t* cli_tool_t::get_inode_cfg(const std::string & name)
|
||||
{
|
||||
for (auto & ic: cli->st_cli.inode_config)
|
||||
|
|
|
@ -34,6 +34,7 @@ public:
|
|||
cluster_client_t *cli = NULL;
|
||||
|
||||
int waiting = 0;
|
||||
json11::Json etcd_result;
|
||||
ring_consumer_t consumer;
|
||||
std::function<bool(void)> action_cb;
|
||||
|
||||
|
@ -60,6 +61,8 @@ public:
|
|||
std::function<bool(void)> start_snap_rm(json11::Json);
|
||||
std::function<bool(void)> start_alloc_osd(json11::Json cfg, uint64_t *out = NULL);
|
||||
std::function<bool(void)> simple_offsets(json11::Json cfg);
|
||||
|
||||
void etcd_txn(json11::Json txn);
|
||||
};
|
||||
|
||||
uint64_t parse_size(std::string size_str);
|
||||
|
|
|
@ -13,7 +13,6 @@ struct alloc_osd_t
|
|||
{
|
||||
cli_tool_t *parent;
|
||||
|
||||
json11::Json result;
|
||||
uint64_t new_id = 1;
|
||||
|
||||
int state = 0;
|
||||
|
@ -29,7 +28,7 @@ struct alloc_osd_t
|
|||
goto resume_1;
|
||||
do
|
||||
{
|
||||
etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "VERSION" },
|
||||
|
@ -63,10 +62,10 @@ struct alloc_osd_t
|
|||
state = 1;
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (!result["succeeded"].bool_value())
|
||||
if (!parent->etcd_result["succeeded"].bool_value())
|
||||
{
|
||||
std::vector<osd_num_t> used;
|
||||
for (auto kv: result["responses"][0]["response_range"]["kvs"].array_items())
|
||||
for (auto kv: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
|
||||
{
|
||||
std::string key = base64_decode(kv["key"].string_value());
|
||||
osd_num_t cur_osd;
|
||||
|
@ -98,25 +97,9 @@ struct alloc_osd_t
|
|||
new_id = used[e-1]+1;
|
||||
}
|
||||
}
|
||||
} while (!result["succeeded"].bool_value());
|
||||
} while (!parent->etcd_result["succeeded"].bool_value());
|
||||
state = 100;
|
||||
}
|
||||
|
||||
void etcd_txn(json11::Json txn)
|
||||
{
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
this->result = res;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(void)> cli_tool_t::start_alloc_osd(json11::Json cfg, uint64_t *out)
|
||||
|
|
|
@ -31,7 +31,6 @@ struct image_creator_t
|
|||
inode_t new_parent_id = 0;
|
||||
inode_t new_id = 0, old_id = 0;
|
||||
uint64_t max_id_mod_rev = 0, cfg_mod_rev = 0, idx_mod_rev = 0;
|
||||
json11::Json result;
|
||||
|
||||
int state = 0;
|
||||
|
||||
|
@ -125,26 +124,26 @@ struct image_creator_t
|
|||
}
|
||||
do
|
||||
{
|
||||
etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array { get_next_id() } }
|
||||
});
|
||||
state = 2;
|
||||
resume_2:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
extract_next_id(result["responses"][0]);
|
||||
extract_next_id(parent->etcd_result["responses"][0]);
|
||||
attempt_create();
|
||||
state = 3;
|
||||
resume_3:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (!result["succeeded"].bool_value() &&
|
||||
result["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
|
||||
if (!parent->etcd_result["succeeded"].bool_value() &&
|
||||
parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
|
||||
{
|
||||
fprintf(stderr, "Image %s already exists\n", image_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
} while (!result["succeeded"].bool_value());
|
||||
} while (!parent->etcd_result["succeeded"].bool_value());
|
||||
if (parent->progress)
|
||||
{
|
||||
printf("Image %s created\n", image_name.c_str());
|
||||
|
@ -196,13 +195,13 @@ resume_3:
|
|||
resume_4:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (!result["succeeded"].bool_value() &&
|
||||
result["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
|
||||
if (!parent->etcd_result["succeeded"].bool_value() &&
|
||||
parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
|
||||
{
|
||||
fprintf(stderr, "Snapshot %s@%s already exists\n", image_name.c_str(), new_snap.c_str());
|
||||
exit(1);
|
||||
}
|
||||
} while (!result["succeeded"].bool_value());
|
||||
} while (!parent->etcd_result["succeeded"].bool_value());
|
||||
if (parent->progress)
|
||||
{
|
||||
printf("Snapshot %s@%s created\n", image_name.c_str(), new_snap.c_str());
|
||||
|
@ -246,7 +245,7 @@ resume_4:
|
|||
goto resume_2;
|
||||
else if (state == 3)
|
||||
goto resume_3;
|
||||
etcd_txn(json11::Json::object { { "success", json11::Json::array {
|
||||
parent->etcd_txn(json11::Json::object { { "success", json11::Json::array {
|
||||
get_next_id(),
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
|
@ -260,11 +259,11 @@ resume_4:
|
|||
resume_2:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
extract_next_id(result["responses"][0]);
|
||||
extract_next_id(parent->etcd_result["responses"][0]);
|
||||
old_id = 0;
|
||||
old_pool_id = 0;
|
||||
cfg_mod_rev = idx_mod_rev = 0;
|
||||
if (result["responses"][1]["response_range"]["kvs"].array_items().size() == 0)
|
||||
if (parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items().size() == 0)
|
||||
{
|
||||
for (auto & ic: parent->cli->st_cli.inode_config)
|
||||
{
|
||||
|
@ -283,7 +282,7 @@ resume_2:
|
|||
{
|
||||
// FIXME: Parse kvs in etcd_state_client automatically
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(result["responses"][1]["response_range"]["kvs"][0]);
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][1]["response_range"]["kvs"][0]);
|
||||
old_id = INODE_NO_POOL(kv.value["id"].uint64_value());
|
||||
old_pool_id = (pool_id_t)kv.value["pool_id"].uint64_value();
|
||||
idx_mod_rev = kv.mod_revision;
|
||||
|
@ -293,7 +292,7 @@ resume_2:
|
|||
exit(1);
|
||||
}
|
||||
}
|
||||
etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
|
@ -310,7 +309,7 @@ resume_3:
|
|||
if (parent->waiting > 0)
|
||||
return;
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(result["responses"][0]["response_range"]["kvs"][0]);
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]);
|
||||
size = kv.value["size"].uint64_value();
|
||||
new_parent_id = kv.value["parent_id"].uint64_value();
|
||||
uint64_t parent_pool_id = kv.value["parent_pool_id"].uint64_value();
|
||||
|
@ -439,28 +438,12 @@ resume_3:
|
|||
} },
|
||||
});
|
||||
};
|
||||
etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "compare", checks },
|
||||
{ "success", success },
|
||||
{ "failure", failure },
|
||||
});
|
||||
}
|
||||
|
||||
void etcd_txn(json11::Json txn)
|
||||
{
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
this->result = res;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
uint64_t parse_size(std::string size_str)
|
||||
|
|
|
@ -24,8 +24,7 @@ struct pool_lister_t
|
|||
if (state == 1)
|
||||
goto resume_1;
|
||||
// Space statistics - pool/stats/<pool>
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
|
@ -48,21 +47,12 @@ struct pool_lister_t
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
space_info = res;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
state = 1;
|
||||
resume_1:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
space_info = parent->etcd_result;
|
||||
std::map<pool_id_t, uint64_t> osd_free;
|
||||
for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items())
|
||||
{
|
||||
|
|
|
@ -84,8 +84,7 @@ struct image_lister_t
|
|||
// Space statistics
|
||||
// inode/stats/<pool>/<inode>::raw_used divided by pool/stats/<pool>::pg_real_size
|
||||
// multiplied by 1 or number of data drives
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
|
@ -112,21 +111,12 @@ struct image_lister_t
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
space_info = res;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
state = 1;
|
||||
resume_1:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
space_info = parent->etcd_result;
|
||||
std::map<pool_id_t, uint64_t> pool_pg_real_size;
|
||||
for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items())
|
||||
{
|
||||
|
|
|
@ -170,29 +170,19 @@ resume_1:
|
|||
} }
|
||||
});
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "compare", checks },
|
||||
{ "success", success },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error changing %s: %s\n", image_name.c_str(), err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
if (!res["succeeded"].bool_value())
|
||||
{
|
||||
fprintf(stderr, "Image %s was modified by someone else, please repeat your request\n", image_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
parent->waiting--;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
state = 2;
|
||||
resume_2:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (!parent->etcd_result["succeeded"].bool_value())
|
||||
{
|
||||
fprintf(stderr, "Image %s was modified by someone else, please repeat your request\n", image_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
printf("Image %s modified\n", image_name.c_str());
|
||||
state = 100;
|
||||
}
|
||||
|
|
|
@ -256,9 +256,9 @@ resume_9:
|
|||
});
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "success", reads },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
}, [this](std::string err, json11::Json data)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
@ -414,10 +414,10 @@ resume_9:
|
|||
}
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", cmp },
|
||||
{ "success", txn },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
}, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
@ -454,7 +454,7 @@ resume_9:
|
|||
"/"+std::to_string(INODE_NO_POOL(cur))
|
||||
);
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn(json11::Json::object {
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
|
@ -475,7 +475,7 @@ resume_9:
|
|||
} },
|
||||
},
|
||||
} },
|
||||
}, parent->cli->st_cli.etcd_slow_timeout, [this, cur_name](std::string err, json11::Json res)
|
||||
}, [this, cur_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
|
|
|
@ -54,12 +54,18 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
|
|||
return kv;
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
etcd_call("/kv/txn", txn, timeout, callback);
|
||||
etcd_call("/kv/txn", txn, timeout, retries, interval, callback);
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
void etcd_state_client_t::etcd_txn_slow(json11::Json txn, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
etcd_call("/kv/txn", txn, etcd_slow_timeout, max_etcd_attempts, 0, callback);
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout,
|
||||
int retries, int interval, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
if (!etcd_addresses.size() && !etcd_local.size())
|
||||
{
|
||||
|
@ -83,7 +89,8 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
|
|||
"Connection: keep-alive\r\n"
|
||||
"Keep-Alive: timeout="+std::to_string(etcd_keepalive_timeout)+"\r\n"
|
||||
"\r\n"+req;
|
||||
auto cb = [this, cur_addr = selected_etcd_address, callback](const http_response_t *response)
|
||||
auto cb = [this, api, payload, timeout, retries, interval, callback,
|
||||
cur_addr = selected_etcd_address](const http_response_t *response)
|
||||
{
|
||||
std::string err;
|
||||
json11::Json data;
|
||||
|
@ -92,8 +99,30 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
|
|||
{
|
||||
if (cur_addr == selected_etcd_address)
|
||||
selected_etcd_address = "";
|
||||
if (retries > 0)
|
||||
{
|
||||
if (this->log_level > 0)
|
||||
{
|
||||
printf(
|
||||
"Warning: etcd request failed: %s, retrying %d more times\n",
|
||||
err.c_str(), retries
|
||||
);
|
||||
}
|
||||
if (interval > 0)
|
||||
{
|
||||
tfd->set_timer(interval, false, [this, api, payload, timeout, retries, interval, callback](int)
|
||||
{
|
||||
etcd_call(api, payload, timeout, retries-1, interval, callback);
|
||||
});
|
||||
}
|
||||
else
|
||||
etcd_call(api, payload, timeout, retries-1, interval, callback);
|
||||
}
|
||||
else
|
||||
callback(err, data);
|
||||
}
|
||||
callback(err, data);
|
||||
else
|
||||
callback(err, data);
|
||||
};
|
||||
if (!keepalive_client)
|
||||
{
|
||||
|
@ -255,6 +284,8 @@ void etcd_state_client_t::start_etcd_watcher()
|
|||
http_close(etcd_watch_ws);
|
||||
etcd_watch_ws = NULL;
|
||||
}
|
||||
if (this->log_level > 1)
|
||||
printf("Trying to connect to etcd websocket at %s\n", etcd_address.c_str());
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout,
|
||||
[this, cur_addr = selected_etcd_address](const http_response_t *msg)
|
||||
{
|
||||
|
@ -438,7 +469,7 @@ void etcd_state_client_t::load_global_config()
|
|||
{
|
||||
etcd_call("/kv/range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/global") }
|
||||
}, etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
}, etcd_slow_timeout, max_etcd_attempts, 0, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
@ -511,10 +542,11 @@ void etcd_state_client_t::load_pgs()
|
|||
{
|
||||
req["compare"] = checks;
|
||||
}
|
||||
etcd_txn(req, etcd_slow_timeout, [this](std::string err, json11::Json data)
|
||||
etcd_txn_slow(req, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
// Retry indefinitely
|
||||
fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(etcd_slow_timeout, false, [this](int timer_id)
|
||||
{
|
||||
|
|
|
@ -112,8 +112,9 @@ public:
|
|||
|
||||
json11::Json::object serialize_inode_cfg(inode_config_t *cfg);
|
||||
etcd_kv_t parse_etcd_kv(const json11::Json & kv_json);
|
||||
void etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_call(std::string api, json11::Json payload, int timeout, int retries, int interval, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_txn_slow(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
|
||||
void start_etcd_watcher();
|
||||
void load_global_config();
|
||||
void load_pgs();
|
||||
|
|
|
@ -59,6 +59,7 @@ struct http_co_t
|
|||
inline void stackin() { onstack++; }
|
||||
inline void stackout() { onstack--; if (!onstack && ended) end(); }
|
||||
inline void end() { ended = true; if (!onstack) { delete this; } }
|
||||
void run_cb_and_clear();
|
||||
void start_connection();
|
||||
void close_connection();
|
||||
void handle_events();
|
||||
|
@ -119,6 +120,16 @@ void http_request(http_co_t *handler, const std::string & host, const std::strin
|
|||
handler->send_request(host, request, options, response_callback);
|
||||
}
|
||||
|
||||
void http_co_t::run_cb_and_clear()
|
||||
{
|
||||
parsed.eof = true;
|
||||
std::function<void(const http_response_t*)> cb;
|
||||
cb.swap(response_callback);
|
||||
// Call callback after clearing it because otherwise we may hit reenterability problems
|
||||
if (cb != NULL)
|
||||
cb(&parsed);
|
||||
}
|
||||
|
||||
void http_co_t::send_request(const std::string & host, const std::string & request,
|
||||
const http_options_t & options, std::function<void(const http_response_t *response)> response_callback)
|
||||
{
|
||||
|
@ -137,6 +148,10 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
|
|||
stackout();
|
||||
return;
|
||||
}
|
||||
if (state == HTTP_CO_KEEPALIVE && connected_host != host)
|
||||
{
|
||||
close_connection();
|
||||
}
|
||||
this->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout);
|
||||
this->want_streaming = options.want_streaming;
|
||||
this->keepalive = options.keepalive;
|
||||
|
@ -146,22 +161,14 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
|
|||
this->sent = 0;
|
||||
this->response_callback = response_callback;
|
||||
this->parsed = {};
|
||||
if (state == HTTP_CO_KEEPALIVE && connected_host != host)
|
||||
{
|
||||
close_connection();
|
||||
}
|
||||
if (request_timeout > 0)
|
||||
{
|
||||
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
|
||||
{
|
||||
stackin();
|
||||
close_connection();
|
||||
if (this->response_callback != NULL)
|
||||
{
|
||||
parsed = { .error = "HTTP request timed out" };
|
||||
this->response_callback(&parsed);
|
||||
this->response_callback = NULL;
|
||||
}
|
||||
parsed = { .error = "HTTP request timed out" };
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
});
|
||||
}
|
||||
|
@ -258,6 +265,7 @@ void http_co_t::close_connection()
|
|||
state = HTTP_CO_CLOSED;
|
||||
connected_host = "";
|
||||
response = "";
|
||||
epoll_events = 0;
|
||||
}
|
||||
|
||||
void http_co_t::start_connection()
|
||||
|
@ -267,8 +275,7 @@ void http_co_t::start_connection()
|
|||
if (!string_to_addr(host.c_str(), 1, 80, &addr))
|
||||
{
|
||||
parsed = { .error = "Invalid address: "+host };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
|
@ -276,8 +283,7 @@ void http_co_t::start_connection()
|
|||
if (peer_fd < 0)
|
||||
{
|
||||
parsed = { .error = std::string("socket: ")+strerror(errno) };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
|
@ -289,8 +295,7 @@ void http_co_t::start_connection()
|
|||
{
|
||||
close_connection();
|
||||
parsed = { .error = std::string("connect: ")+strerror(errno) };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
|
@ -323,12 +328,7 @@ void http_co_t::handle_events()
|
|||
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
|
||||
{
|
||||
close_connection();
|
||||
if (response_callback != NULL)
|
||||
{
|
||||
parsed.eof = true;
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
}
|
||||
run_cb_and_clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -349,8 +349,7 @@ void http_co_t::handle_connect_result()
|
|||
{
|
||||
close_connection();
|
||||
parsed = { .error = std::string("connect: ")+strerror(result) };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
|
@ -388,12 +387,8 @@ again:
|
|||
else if (res < 0)
|
||||
{
|
||||
close_connection();
|
||||
if (response_callback != NULL)
|
||||
{
|
||||
parsed = { .error = std::string("sendmsg: ")+strerror(errno) };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
}
|
||||
parsed = { .error = std::string("sendmsg: ")+strerror(errno) };
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
|
@ -442,13 +437,7 @@ void http_co_t::submit_read()
|
|||
close_connection();
|
||||
if (res < 0)
|
||||
parsed = { .error = std::string("recvmsg: ")+strerror(-res) };
|
||||
else
|
||||
parsed.eof = true;
|
||||
if (response_callback != NULL)
|
||||
{
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
}
|
||||
run_cb_and_clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -497,12 +486,8 @@ bool http_co_t::handle_read()
|
|||
{
|
||||
// Sorry, unsupported response
|
||||
close_connection();
|
||||
if (response_callback != NULL)
|
||||
{
|
||||
parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" };
|
||||
response_callback(&parsed);
|
||||
response_callback = NULL;
|
||||
}
|
||||
parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" };
|
||||
run_cb_and_clear();
|
||||
stackout();
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -277,7 +277,7 @@ void osd_t::report_statistics()
|
|||
} }
|
||||
});
|
||||
}
|
||||
st_cli.etcd_txn(json11::Json::object { { "success", txn } }, st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res)
|
||||
st_cli.etcd_txn_slow(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json res)
|
||||
{
|
||||
etcd_reporting_stats = false;
|
||||
if (err != "")
|
||||
|
@ -356,11 +356,11 @@ void osd_t::acquire_lease()
|
|||
// Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
|
||||
st_cli.etcd_call("/lease/grant", json11::Json::object {
|
||||
{ "TTL", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 }
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "" || data["ID"].string_value() == "")
|
||||
{
|
||||
printf("Error acquiring a lease from etcd: %s\n", err.c_str());
|
||||
printf("Error acquiring a lease from etcd: %s, retrying\n", err.c_str());
|
||||
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)
|
||||
{
|
||||
acquire_lease();
|
||||
|
@ -408,7 +408,7 @@ void osd_t::create_osd_state()
|
|||
} }
|
||||
},
|
||||
} },
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
@ -452,7 +452,7 @@ void osd_t::renew_lease()
|
|||
{
|
||||
st_cli.etcd_call("/lease/keepalive", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err == "" && data["result"]["TTL"].string_value() == "")
|
||||
{
|
||||
|
@ -488,7 +488,7 @@ void osd_t::force_stop(int exitcode)
|
|||
{
|
||||
st_cli.etcd_call("/kv/lease/revoke", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, st_cli.etcd_quick_timeout, [this, exitcode](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, st_cli.max_etcd_attempts, 0, [this, exitcode](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
|
@ -826,7 +826,7 @@ void osd_t::report_pg_states()
|
|||
etcd_reporting_pg_state = true;
|
||||
st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", checks }, { "success", success }, { "failure", failure }
|
||||
}, st_cli.etcd_quick_timeout, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
}, st_cli.etcd_quick_timeout, 0, 0, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
{
|
||||
etcd_reporting_pg_state = false;
|
||||
if (!data["succeeded"].bool_value())
|
||||
|
|
|
@ -31,7 +31,7 @@ else
|
|||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"xor","pg_size":'$PG_SIZE',"pg_minsize":'$PG_MINSIZE',"parity_chunks":1,"pg_count":'$PG_COUNT',"failure_domain":"osd"}}'
|
||||
fi
|
||||
|
||||
sleep 2
|
||||
sleep 3
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(. | length) != 0 and ([ .[0].items["1"][] | select(((.osd_set | select(. != 0) | sort | unique) | length) == '$PG_SIZE') ] | length) == '$PG_COUNT); then
|
||||
format_error "FAILED: $PG_COUNT PG(s) NOT CONFIGURED"
|
||||
|
|
Loading…
Reference in New Issue