Compare commits
No commits in common. "eaac1fc5d15dc083ef954f186d297537d5376316" and "c467acc38800d8ce49fb8d171337c07255b533e6" have entirely different histories.
eaac1fc5d1
...
c467acc388
|
@ -35,7 +35,7 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
|
||||||
kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
|
kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
|
||||||
if (json_err != "")
|
if (json_err != "")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
|
printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
|
||||||
kv.key = "";
|
kv.key = "";
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -81,7 +81,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
|
||||||
addr = addr.substr(7);
|
addr = addr.substr(7);
|
||||||
else if (strtolower(addr.substr(0, 8)) == "https://")
|
else if (strtolower(addr.substr(0, 8)) == "https://")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n");
|
printf("HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if (addr.find('/') == std::string::npos)
|
if (addr.find('/') == std::string::npos)
|
||||||
|
@ -149,7 +149,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
json11::Json data = json11::Json::parse(msg->body, json_err);
|
json11::Json data = json11::Json::parse(msg->body, json_err);
|
||||||
if (json_err != "")
|
if (json_err != "")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
|
printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -175,7 +175,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{
|
{
|
||||||
if (this->log_level > 3)
|
if (this->log_level > 3)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str());
|
printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str());
|
||||||
}
|
}
|
||||||
parse_state(kv.second);
|
parse_state(kv.second);
|
||||||
}
|
}
|
||||||
|
@ -250,7 +250,7 @@ void etcd_state_client_t::load_global_config()
|
||||||
{
|
{
|
||||||
if (err != "")
|
if (err != "")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Error reading OSD configuration from etcd: %s\n", err.c_str());
|
printf("Error reading OSD configuration from etcd: %s\n", err.c_str());
|
||||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||||
{
|
{
|
||||||
load_global_config();
|
load_global_config();
|
||||||
|
@ -323,7 +323,7 @@ void etcd_state_client_t::load_pgs()
|
||||||
{
|
{
|
||||||
if (err != "")
|
if (err != "")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str());
|
printf("Error loading PGs from etcd: %s\n", err.c_str());
|
||||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||||
{
|
{
|
||||||
load_pgs();
|
load_pgs();
|
||||||
|
@ -386,7 +386,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
|
sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
|
||||||
if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
|
if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
printf("Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
pc.id = pool_id;
|
pc.id = pool_id;
|
||||||
|
@ -394,7 +394,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
pc.name = pool_item.second["name"].string_value();
|
pc.name = pool_item.second["name"].string_value();
|
||||||
if (pc.name == "")
|
if (pc.name == "")
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has empty name, skipping pool\n", pool_id);
|
printf("Pool %u has empty name, skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Failure Domain
|
// Failure Domain
|
||||||
|
@ -408,7 +408,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
pc.scheme = POOL_SCHEME_JERASURE;
|
pc.scheme = POOL_SCHEME_JERASURE;
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id);
|
printf("Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// PG Size
|
// PG Size
|
||||||
|
@ -418,7 +418,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
(pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) ||
|
(pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) ||
|
||||||
pool_item.second["pg_size"].uint64_value() > 256)
|
pool_item.second["pg_size"].uint64_value() > 256)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid pg_size, skipping pool\n", pool_id);
|
printf("Pool %u has invalid pg_size, skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Parity Chunks
|
// Parity Chunks
|
||||||
|
@ -427,7 +427,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
{
|
{
|
||||||
if (pc.parity_chunks > 1)
|
if (pc.parity_chunks > 1)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id);
|
printf("Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
pc.parity_chunks = 1;
|
pc.parity_chunks = 1;
|
||||||
|
@ -435,7 +435,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
if (pc.scheme == POOL_SCHEME_JERASURE &&
|
if (pc.scheme == POOL_SCHEME_JERASURE &&
|
||||||
(pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2))
|
(pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id);
|
printf("Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// PG MinSize
|
// PG MinSize
|
||||||
|
@ -444,14 +444,14 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
(pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) &&
|
(pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) &&
|
||||||
pc.pg_minsize < (pc.pg_size-pc.parity_chunks))
|
pc.pg_minsize < (pc.pg_size-pc.parity_chunks))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid pg_minsize, skipping pool\n", pool_id);
|
printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// PG Count
|
// PG Count
|
||||||
pc.pg_count = pool_item.second["pg_count"].uint64_value();
|
pc.pg_count = pool_item.second["pg_count"].uint64_value();
|
||||||
if (pc.pg_count < 1)
|
if (pc.pg_count < 1)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid pg_count, skipping pool\n", pool_id);
|
printf("Pool %u has invalid pg_count, skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Max OSD Combinations
|
// Max OSD Combinations
|
||||||
|
@ -460,7 +460,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
pc.max_osd_combinations = 10000;
|
pc.max_osd_combinations = 10000;
|
||||||
if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100)
|
if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
|
printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// PG Stripe Size
|
// PG Stripe Size
|
||||||
|
@ -478,7 +478,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
{
|
{
|
||||||
if (pg_item.second.target_set.size() != parsed_cfg.pg_size)
|
if (pg_item.second.target_set.size() != parsed_cfg.pg_size)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
|
printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
|
||||||
pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size);
|
pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size);
|
||||||
pg_item.second.pause = true;
|
pg_item.second.pause = true;
|
||||||
}
|
}
|
||||||
|
@ -501,7 +501,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
|
sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
|
||||||
if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
|
if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
printf("Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for (auto & pg_item: pool_item.second.object_items())
|
for (auto & pg_item: pool_item.second.object_items())
|
||||||
|
@ -510,7 +510,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(pg_item.first.c_str(), "%u%c", &pg_num, &null_byte);
|
sscanf(pg_item.first.c_str(), "%u%c", &pg_num, &null_byte);
|
||||||
if (!pg_num || null_byte != 0)
|
if (!pg_num || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
|
printf("Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
||||||
|
@ -524,7 +524,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
}
|
}
|
||||||
if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size)
|
if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
|
printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
|
||||||
pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size);
|
pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size);
|
||||||
parsed_cfg.pause = true;
|
parsed_cfg.pause = true;
|
||||||
}
|
}
|
||||||
|
@ -537,8 +537,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
{
|
{
|
||||||
if (pg_it->second.exists && pg_it->first != ++n)
|
if (pg_it->second.exists && pg_it->first != ++n)
|
||||||
{
|
{
|
||||||
fprintf(
|
printf(
|
||||||
stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
|
"Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
|
||||||
pool_item.second.id, pool_item.second.pg_config.size()
|
pool_item.second.id, pool_item.second.pg_config.size()
|
||||||
);
|
);
|
||||||
for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
|
for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
|
||||||
|
@ -561,7 +561,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte);
|
sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte);
|
||||||
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
|
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -600,7 +600,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
|
sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
|
||||||
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
|
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||||
}
|
}
|
||||||
else if (value.is_null())
|
else if (value.is_null())
|
||||||
{
|
{
|
||||||
|
@ -624,7 +624,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
}
|
}
|
||||||
if (i >= pg_state_bit_count)
|
if (i >= pg_state_bit_count)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str());
|
printf("Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -633,7 +633,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
(state & PG_PEERING) && state != PG_PEERING ||
|
(state & PG_PEERING) && state != PG_PEERING ||
|
||||||
(state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
|
(state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
|
printf("Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
|
this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
|
||||||
|
@ -671,7 +671,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
sscanf(key.c_str() + etcd_prefix.length()+14, "%lu/%lu%c", &pool_id, &inode_num, &null_byte);
|
sscanf(key.c_str() + etcd_prefix.length()+14, "%lu/%lu%c", &pool_id, &inode_num, &null_byte);
|
||||||
if (!pool_id || pool_id >= POOL_ID_MAX || !inode_num || (inode_num >> (64-POOL_ID_BITS)) || null_byte != 0)
|
if (!pool_id || pool_id >= POOL_ID_MAX || !inode_num || (inode_num >> (64-POOL_ID_BITS)) || null_byte != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
|
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -706,8 +706,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||||
parent_inode_num |= pool_id << (64-POOL_ID_BITS);
|
parent_inode_num |= pool_id << (64-POOL_ID_BITS);
|
||||||
else if (parent_pool_id >= POOL_ID_MAX)
|
else if (parent_pool_id >= POOL_ID_MAX)
|
||||||
{
|
{
|
||||||
fprintf(
|
printf(
|
||||||
stderr, "Inode %lu/%lu parent_pool value is invalid, ignoring parent setting\n",
|
"Inode %lu/%lu parent_pool value is invalid, ignoring parent setting\n",
|
||||||
inode_num >> (64-POOL_ID_BITS), inode_num & ((1l << (64-POOL_ID_BITS)) - 1)
|
inode_num >> (64-POOL_ID_BITS), inode_num & ((1l << (64-POOL_ID_BITS)) - 1)
|
||||||
);
|
);
|
||||||
parent_inode_num = 0;
|
parent_inode_num = 0;
|
||||||
|
|
|
@ -10,7 +10,6 @@
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <sys/epoll.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
@ -201,10 +200,9 @@ public:
|
||||||
fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK);
|
fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK);
|
||||||
nbd_fd = sockfd[0];
|
nbd_fd = sockfd[0];
|
||||||
load_module();
|
load_module();
|
||||||
bool bg = cfg["foreground"].is_null();
|
|
||||||
if (!cfg["dev_num"].is_null())
|
if (!cfg["dev_num"].is_null())
|
||||||
{
|
{
|
||||||
if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30, bg) < 0)
|
if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30) < 0)
|
||||||
{
|
{
|
||||||
perror("run_nbd");
|
perror("run_nbd");
|
||||||
exit(1);
|
exit(1);
|
||||||
|
@ -216,7 +214,7 @@ public:
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30, bg);
|
int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30);
|
||||||
if (r == 0)
|
if (r == 0)
|
||||||
{
|
{
|
||||||
printf("/dev/nbd%d\n", i);
|
printf("/dev/nbd%d\n", i);
|
||||||
|
@ -239,7 +237,7 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (bg)
|
if (cfg["foreground"].is_null())
|
||||||
{
|
{
|
||||||
daemonize();
|
daemonize();
|
||||||
}
|
}
|
||||||
|
@ -256,42 +254,17 @@ public:
|
||||||
};
|
};
|
||||||
ringloop->register_consumer(&consumer);
|
ringloop->register_consumer(&consumer);
|
||||||
// Add FD to epoll
|
// Add FD to epoll
|
||||||
bool stop = false;
|
epmgr->tfd->set_fd_handler(sockfd[0], false, [this](int peer_fd, int epoll_events)
|
||||||
epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events)
|
|
||||||
{
|
{
|
||||||
if (epoll_events & EPOLLRDHUP)
|
read_ready++;
|
||||||
{
|
submit_read();
|
||||||
close(peer_fd);
|
|
||||||
stop = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
read_ready++;
|
|
||||||
submit_read();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
while (!stop)
|
while (1)
|
||||||
{
|
{
|
||||||
ringloop->loop();
|
ringloop->loop();
|
||||||
ringloop->wait();
|
ringloop->wait();
|
||||||
}
|
}
|
||||||
stop = false;
|
// FIXME: Cleanup when exiting
|
||||||
cluster_op_t *close_sync = new cluster_op_t;
|
|
||||||
close_sync->opcode = OSD_OP_SYNC;
|
|
||||||
close_sync->callback = [this, &stop](cluster_op_t *op)
|
|
||||||
{
|
|
||||||
stop = true;
|
|
||||||
delete op;
|
|
||||||
};
|
|
||||||
cli->execute(close_sync);
|
|
||||||
while (!stop)
|
|
||||||
{
|
|
||||||
ringloop->loop();
|
|
||||||
ringloop->wait();
|
|
||||||
}
|
|
||||||
delete cli;
|
|
||||||
delete epmgr;
|
|
||||||
delete ringloop;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void load_module()
|
void load_module()
|
||||||
|
@ -438,7 +411,7 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout, bool bg)
|
int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout)
|
||||||
{
|
{
|
||||||
// Check handle size
|
// Check handle size
|
||||||
assert(sizeof(cur_req.handle) == 8);
|
assert(sizeof(cur_req.handle) == 8);
|
||||||
|
@ -486,14 +459,11 @@ protected:
|
||||||
{
|
{
|
||||||
// Run in child
|
// Run in child
|
||||||
close(sockfd[0]);
|
close(sockfd[0]);
|
||||||
if (bg)
|
|
||||||
{
|
|
||||||
daemonize();
|
|
||||||
}
|
|
||||||
r = ioctl(nbd, NBD_DO_IT);
|
r = ioctl(nbd, NBD_DO_IT);
|
||||||
if (r < 0)
|
if (r < 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "NBD device terminated with error: %s\n", strerror(errno));
|
fprintf(stderr, "NBD device terminated with error: %s\n", strerror(errno));
|
||||||
|
kill(getppid(), SIGTERM);
|
||||||
}
|
}
|
||||||
close(sockfd[1]);
|
close(sockfd[1]);
|
||||||
ioctl(nbd, NBD_CLEAR_QUE);
|
ioctl(nbd, NBD_CLEAR_QUE);
|
||||||
|
|
Loading…
Reference in New Issue