forked from vitalif/vitastor
Implement PG epochs to prevent the "version split"
The "version split" happens when:
- A block is written to only 1 OSD out of 3, and then all of them die.
- OSDs 2 and 3 come up, and the same block is written to both of them.
- The remaining OSD (OSD 1) comes up.
Now all 3 OSDs have the same version of the same object, but with different data.
parent
e680d6c1c3
commit
a7929931eb
|
@ -351,6 +351,12 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
|
|||
{
|
||||
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
// Read epoch
|
||||
pg_cfg.epoch = value["epoch"].uint64_value();
|
||||
if (on_change_pg_history_hook != NULL)
|
||||
{
|
||||
on_change_pg_history_hook(pg_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
|
||||
|
|
|
@ -23,6 +23,7 @@ struct pg_config_t
|
|||
bool pause;
|
||||
osd_num_t cur_primary;
|
||||
int cur_state;
|
||||
uint64_t epoch;
|
||||
};
|
||||
|
||||
struct json_kv_t
|
||||
|
@ -48,7 +49,8 @@ struct etcd_state_client_t
|
|||
std::function<void(json11::Json::object &)> on_load_config_hook;
|
||||
std::function<json11::Json()> load_pgs_checks_hook;
|
||||
std::function<void(bool)> on_load_pgs_hook;
|
||||
std::function<void(uint64_t)> on_change_osd_state_hook;
|
||||
std::function<void(pg_num_t)> on_change_pg_history_hook;
|
||||
std::function<void(osd_num_t)> on_change_osd_state_hook;
|
||||
|
||||
json_kv_t parse_etcd_kv(const json11::Json & kv_json);
|
||||
void etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
|
|
12
lp/mon.js
12
lp/mon.js
|
@ -371,6 +371,7 @@ class Mon
|
|||
pg_history[i] = {
|
||||
osd_sets: [],
|
||||
all_peers: [],
|
||||
epoch: 0,
|
||||
};
|
||||
for (let j = 0; j < mul; j++)
|
||||
{
|
||||
|
@ -384,6 +385,10 @@ class Mon
|
|||
{
|
||||
Array.prototype.push.apply(pg_history[i].all_peers, hist.all_peers);
|
||||
}
|
||||
if (hist && hist.epoch)
|
||||
{
|
||||
pg_history[i].epoch = pg_history[i].epoch < hist.epoch ? hist.epoch : pg_history[i].epoch;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -393,6 +398,7 @@ class Mon
|
|||
// So, merge ALL PGs history
|
||||
let all_sets = {};
|
||||
let all_peers = {};
|
||||
let max_epoch = 0;
|
||||
for (const pg of prev_pgs)
|
||||
{
|
||||
all_sets[pg.join(' ')] = pg;
|
||||
|
@ -414,12 +420,16 @@ class Mon
|
|||
all_peers[osd_num] = Number(osd_num);
|
||||
}
|
||||
}
|
||||
if (hist && hist.epoch)
|
||||
{
|
||||
max_epoch = max_epoch < hist.epoch ? hist.epoch : max_epoch;
|
||||
}
|
||||
}
|
||||
all_sets = Object.values(all_sets);
|
||||
all_peers = Object.values(all_peers);
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
pg_history[i] = { osd_sets: all_sets, all_peers };
|
||||
pg_history[i] = { osd_sets: all_sets, all_peers, epoch: max_epoch };
|
||||
}
|
||||
}
|
||||
// Mark history keys for removed PGs as removed
|
||||
|
|
5
osd.h
5
osd.h
|
@ -127,7 +127,8 @@ class osd_t
|
|||
// cluster connection
|
||||
void parse_config(blockstore_config_t & config);
|
||||
void init_cluster();
|
||||
void on_change_osd_state_hook(uint64_t osd_num);
|
||||
void on_change_osd_state_hook(osd_num_t peer_osd);
|
||||
void on_change_pg_history_hook(pg_num_t pg_num);
|
||||
void on_change_etcd_state_hook(json11::Json::object & changes);
|
||||
void on_load_config_hook(json11::Json::object & changes);
|
||||
json11::Json on_load_pgs_checks_hook();
|
||||
|
@ -194,7 +195,7 @@ class osd_t
|
|||
void handle_primary_bs_subop(osd_op_t *subop);
|
||||
void add_bs_subop_stats(osd_op_t *subop);
|
||||
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||
|
|
|
@ -43,7 +43,8 @@ void osd_t::init_cluster()
|
|||
{
|
||||
st_cli.tfd = tfd;
|
||||
st_cli.log_level = log_level;
|
||||
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
|
||||
st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); };
|
||||
st_cli.on_change_pg_history_hook = [this](pg_num_t pg_num) { on_change_pg_history_hook(pg_num); };
|
||||
st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); };
|
||||
|
@ -207,7 +208,7 @@ void osd_t::report_statistics()
|
|||
});
|
||||
}
|
||||
|
||||
void osd_t::on_change_osd_state_hook(uint64_t peer_osd)
|
||||
void osd_t::on_change_osd_state_hook(osd_num_t peer_osd)
|
||||
{
|
||||
if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end())
|
||||
{
|
||||
|
@ -222,6 +223,27 @@ void osd_t::on_change_etcd_state_hook(json11::Json::object & changes)
|
|||
apply_pg_config();
|
||||
}
|
||||
|
||||
// Hook invoked by the etcd state client after it has parsed the epoch of PG <pg_num>
// (see etcd_state_client_t::parse_state).
//
// If this OSD holds the PG, its local epoch is newer than the last reported one, and the
// epoch now known from etcd has caught up with the local epoch, the etcd epoch is
// remembered as reported and queued writes are resumed through continue_primary_write().
void osd_t::on_change_pg_history_hook(pg_num_t pg_num)
{
    auto found = pgs.find(pg_num);
    if (found == pgs.end())
    {
        // This OSD does not track the PG
        return;
    }
    auto & pg = found->second;
    if (pg.epoch <= pg.reported_epoch)
    {
        // Nothing is pending to be reported for this PG
        return;
    }
    if (st_cli.pg_config[pg_num].epoch < pg.epoch)
    {
        // etcd has not caught up with our epoch yet
        return;
    }
    pg.reported_epoch = st_cli.pg_config[pg_num].epoch;
    // Resume only the first entry of every run of equal object ids in the write queue
    // (presumably the queue is ordered by object id - verify against pg_t::write_queue)
    bool have_prev = false;
    object_id prev_oid = { 0 };
    for (auto queued: pg.write_queue)
    {
        if (!have_prev || prev_oid != queued.first)
        {
            have_prev = true;
            prev_oid = queued.first;
            continue_primary_write(queued.second);
        }
    }
}
|
||||
|
||||
void osd_t::on_load_config_hook(json11::Json::object & global_config)
|
||||
{
|
||||
blockstore_config_t osd_config = this->config;
|
||||
|
@ -549,6 +571,7 @@ void osd_t::apply_pg_config()
|
|||
.state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = pg_num,
|
||||
.reported_epoch = pg_cfg.epoch,
|
||||
.target_history = pg_cfg.target_history,
|
||||
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
|
||||
.target_set = pg_cfg.target_set,
|
||||
|
@ -584,7 +607,6 @@ void osd_t::report_pg_states()
|
|||
{
|
||||
return;
|
||||
}
|
||||
etcd_reporting_pg_state = true;
|
||||
std::vector<std::pair<pg_num_t,bool>> reporting_pgs;
|
||||
json11::Json::array checks;
|
||||
json11::Json::array success;
|
||||
|
@ -651,28 +673,28 @@ void osd_t::report_pg_states()
|
|||
});
|
||||
if (pg.history_changed)
|
||||
{
|
||||
// Prevent race conditions (for the case when the monitor is updating this key at the same time)
|
||||
pg.history_changed = false;
|
||||
if (pg.state == PG_ACTIVE)
|
||||
{
|
||||
success.push_back(json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
|
||||
} }
|
||||
std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num));
|
||||
json11::Json::object history_value = {
|
||||
{ "epoch", pg.epoch },
|
||||
{ "all_peers", pg.all_peers },
|
||||
{ "osd_sets", pg.target_history },
|
||||
};
|
||||
checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", history_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", st_cli.etcd_watch_revision+1 },
|
||||
});
|
||||
}
|
||||
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
success.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
|
||||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "all_peers", pg.all_peers },
|
||||
}).dump()) },
|
||||
{ "key", history_key },
|
||||
{ "value", base64_encode(json11::Json(history_value).dump()) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
failure.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", state_key_base64 },
|
||||
|
@ -680,6 +702,7 @@ void osd_t::report_pg_states()
|
|||
});
|
||||
}
|
||||
pg_state_dirty.clear();
|
||||
etcd_reporting_pg_state = true;
|
||||
st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", checks }, { "success", success }, { "failure", failure }
|
||||
}, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
|
@ -705,6 +728,8 @@ void osd_t::report_pg_states()
|
|||
if (res["kvs"].array_items().size())
|
||||
{
|
||||
auto kv = st_cli.parse_etcd_kv(res["kvs"][0]);
|
||||
if (kv.key.substr(st_cli.etcd_prefix.length()+10) == st_cli.etcd_prefix+"/pg/state/")
|
||||
{
|
||||
pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10));
|
||||
auto pg_it = pgs.find(pg_num);
|
||||
if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
|
||||
|
@ -716,6 +741,7 @@ void osd_t::report_pg_states()
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Retry after a short pause (hope we'll get some updates and update PG states accordingly)
|
||||
tfd->set_timer(500, false, [this](int) { report_pg_states(); });
|
||||
}
|
||||
|
|
|
@ -344,6 +344,7 @@ void pg_t::calc_object_states(int log_level)
|
|||
st.log_level = log_level;
|
||||
st.pg = this;
|
||||
auto ps = peering_state;
|
||||
epoch = 0;
|
||||
for (auto it: ps->list_results)
|
||||
{
|
||||
auto nstab = it.second.stable_count;
|
||||
|
@ -354,6 +355,10 @@ void pg_t::calc_object_states(int log_level)
|
|||
obj_ver_id *ov = it.second.buf;
|
||||
for (uint64_t i = 0; i < n; i++, ov++)
|
||||
{
|
||||
if ((ov->version >> (64-PG_EPOCH_BITS)) > epoch)
|
||||
{
|
||||
epoch = (ov->version >> (64-PG_EPOCH_BITS));
|
||||
}
|
||||
st.list[start+i] = {
|
||||
.oid = ov->oid,
|
||||
.version = ov->version,
|
||||
|
@ -369,6 +374,11 @@ void pg_t::calc_object_states(int log_level)
|
|||
std::sort(st.list.begin(), st.list.end());
|
||||
// Walk over it and check object states
|
||||
st.walk();
|
||||
if (this->state & (PG_DEGRADED|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
assert(epoch != ((1ul << PG_EPOCH_BITS)-1));
|
||||
epoch++;
|
||||
}
|
||||
}
|
||||
|
||||
void pg_t::print_state()
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
#include "osd_ops.h"
|
||||
#include "pg_states.h"
|
||||
|
||||
// Number of high-order bits of a 64-bit object version that hold the PG epoch.
// The epoch is extracted as (version >> (64-PG_EPOCH_BITS)); the remaining
// low-order bits are the version counter within that epoch.
#define PG_EPOCH_BITS 48
|
||||
|
||||
struct pg_obj_loc_t
|
||||
{
|
||||
uint64_t role;
|
||||
|
@ -72,6 +74,8 @@ struct pg_t
|
|||
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
||||
pg_num_t pg_num;
|
||||
uint64_t clean_count = 0, total_count = 0;
|
||||
// epoch number - should increase with each non-clean activation of the PG
|
||||
uint64_t epoch = 0, reported_epoch = 0;
|
||||
// target history and all potential peers
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
|
|
|
@ -99,7 +99,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
|||
{
|
||||
// Fast happy-path
|
||||
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0);
|
||||
submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op);
|
||||
submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_minsize, pg.cur_set.data(), cur_op);
|
||||
op_data->st = 1;
|
||||
}
|
||||
else
|
||||
|
@ -116,7 +116,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
|||
op_data->pg_size = pg.pg_size;
|
||||
op_data->degraded = 1;
|
||||
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
|
||||
submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op);
|
||||
submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op);
|
||||
op_data->st = 1;
|
||||
}
|
||||
}
|
||||
|
@ -200,6 +200,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
else if (op_data->st == 7) goto resume_7;
|
||||
else if (op_data->st == 8) goto resume_8;
|
||||
else if (op_data->st == 9) goto resume_9;
|
||||
else if (op_data->st == 10) goto resume_10;
|
||||
assert(op_data->st == 0);
|
||||
if (!check_write_queue(cur_op, pg))
|
||||
{
|
||||
|
@ -213,7 +214,7 @@ resume_1:
|
|||
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
|
||||
pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
|
||||
// Read required blocks
|
||||
submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op);
|
||||
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
|
||||
resume_2:
|
||||
op_data->st = 2;
|
||||
return;
|
||||
|
@ -228,7 +229,34 @@ resume_3:
|
|||
// Recover missing stripes, calculate parity
|
||||
calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
|
||||
// Send writes
|
||||
submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op);
|
||||
if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
|
||||
{
|
||||
op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1))
|
||||
{
|
||||
assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
|
||||
pg.epoch++;
|
||||
}
|
||||
op_data->target_ver = op_data->fact_ver + 1;
|
||||
}
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
// Report newer epoch before writing
|
||||
// FIXME: We may report only one PG state here...
|
||||
this->pg_state_dirty.insert(pg.pg_num);
|
||||
pg.history_changed = true;
|
||||
report_pg_states();
|
||||
resume_10:
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
op_data->st = 10;
|
||||
return;
|
||||
}
|
||||
}
|
||||
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
|
||||
resume_4:
|
||||
op_data->st = 4;
|
||||
return;
|
||||
|
@ -614,7 +642,7 @@ resume_1:
|
|||
// Determine which OSDs contain this object and delete it
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
// Submit 1 read to determine the actual version number
|
||||
submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op);
|
||||
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
|
||||
resume_2:
|
||||
op_data->st = 2;
|
||||
return;
|
||||
|
|
|
@ -76,7 +76,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||
void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||
{
|
||||
bool w = submit_type == SUBMIT_WRITE;
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
@ -102,7 +102,6 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
|||
{
|
||||
zero_read = -1;
|
||||
}
|
||||
uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver);
|
||||
osd_op_t *subops = new osd_op_t[n_subops];
|
||||
op_data->fact_ver = 0;
|
||||
op_data->done = op_data->errors = 0;
|
||||
|
|
Loading…
Reference in New Issue