Implement PG scrub runner

test-double-alloc
Vitaliy Filippov 2023-02-21 00:21:23 +03:00
parent 43b77d7619
commit c3bd26193d
16 changed files with 437 additions and 12 deletions

View File

@ -110,6 +110,10 @@ const etcd_tree = {
print_stats_interval: 3,
slow_log_interval: 10,
inode_vanish_time: 60,
scrub_interval: '30d', // 1s/1m/1h/1d
scrub_queue_depth: 1,
scrub_sleep: 0, // milliseconds
scrub_list_limit: 1000, // objects to list on one scrub iteration
// blockstore - fixed in superblock
block_size,
disk_alignment,
@ -172,6 +176,8 @@ const etcd_tree = {
osd_tags?: 'nvme' | [ 'nvme', ... ],
// prefer to put primary on OSD with these tags
primary_affinity_tags?: 'nvme' | [ 'nvme', ... ],
// scrub interval
scrub_interval?: '30d',
},
...
}, */
@ -267,7 +273,7 @@ const etcd_tree = {
primary: osd_num_t,
state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
"degraded"|"has_corrupted"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead")[],
"has_invalid"|"left_on_dead"|"scrubbing")[],
}
}, */
},
@ -289,6 +295,7 @@ const etcd_tree = {
osd_sets: osd_num_t[][],
all_peers: osd_num_t[],
epoch: uint64_t,
scrub_ts: uint64_t,
},
}, */
},

View File

@ -299,7 +299,7 @@ add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)

View File

@ -410,14 +410,17 @@ struct rm_osd_t
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
);
auto hist = json11::Json::object {
{ "epoch", pg_cfg.epoch },
{ "all_peers", pg_cfg.all_peers },
{ "osd_sets", pg_cfg.target_history },
};
if (pg_cfg.scrub_ts)
hist["scrub_ts"] = pg_cfg.scrub_ts;
history_updates.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", history_key },
{ "value", base64_encode(json11::Json(json11::Json::object {
{ "epoch", pg_cfg.epoch },
{ "all_peers", pg_cfg.all_peers },
{ "osd_sets", pg_cfg.target_history },
}).dump()) },
{ "value", base64_encode(json11::Json(hist).dump()) },
} },
});
history_checks.push_back(json11::Json::object {

View File

@ -7,8 +7,8 @@
#ifndef __MOCK__
#include "addr_util.h"
#include "http_client.h"
#include "str_util.h"
#endif
#include "str_util.h"
etcd_state_client_t::~etcd_state_client_t()
{
@ -777,6 +777,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
fprintf(stderr, "Pool %u has invalid bitmap_granularity (must divide block_size), skipping pool\n", pool_id);
continue;
}
// Scrub Interval
pc.scrub_interval = parse_time(pool_item.second["scrub_interval"].string_value());
if (!pc.scrub_interval)
pc.scrub_interval = 0;
// Immediate Commit Mode
pc.immediate_commit = pool_item.second["immediate_commit"].is_string()
? (pool_item.second["immediate_commit"].string_value() == "all"
@ -919,6 +923,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
// Read epoch
pg_cfg.epoch = value["epoch"].uint64_value();
// Scrub timestamp
pg_cfg.scrub_ts = parse_time(value["scrub_ts"].string_value());
if (on_change_pg_history_hook != NULL)
{
on_change_pg_history_hook(pool_id, pg_num);

View File

@ -39,6 +39,7 @@ struct pg_config_t
osd_num_t cur_primary;
int cur_state;
uint64_t epoch;
uint64_t scrub_ts;
};
struct pool_config_t
@ -55,6 +56,7 @@ struct pool_config_t
uint64_t max_osd_combinations;
uint64_t pg_stripe_size;
std::map<pg_num_t, pg_config_t> pg_config;
uint64_t scrub_interval;
};
struct inode_config_t

View File

@ -207,6 +207,16 @@ void osd_t::parse_config(bool init)
inode_vanish_time = config["inode_vanish_time"].uint64_value();
if (!inode_vanish_time)
inode_vanish_time = 60;
global_scrub_interval = config["scrub_interval"].uint64_value();
if (!global_scrub_interval)
global_scrub_interval = 30*86400;
scrub_queue_depth = config["scrub_queue_depth"].uint64_value();
if (scrub_queue_depth < 1 || scrub_queue_depth > MAX_RECOVERY_QUEUE)
scrub_queue_depth = 1;
scrub_sleep_ms = config["scrub_sleep"].uint64_value();
scrub_list_limit = config["scrub_list_limit"].uint64_value();
if (!scrub_list_limit)
scrub_list_limit = 1000;
if ((old_no_rebalance && !no_rebalance || old_no_recovery && !no_recovery) &&
!(peering_state & (OSD_RECOVERING | OSD_FLUSHING_PGS)))
{

View File

@ -28,6 +28,7 @@
#define OSD_PEERING_PGS 0x04
#define OSD_FLUSHING_PGS 0x08
#define OSD_RECOVERING 0x10
#define OSD_SCRUBBING 0x20
#define MAX_AUTOSYNC_INTERVAL 3600
#define DEFAULT_AUTOSYNC_INTERVAL 5
@ -113,6 +114,10 @@ class osd_t
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
int inode_vanish_time = 60;
int log_level = 0;
uint64_t global_scrub_interval = 30*86400;
uint64_t scrub_queue_depth = 1;
uint64_t scrub_sleep_ms = 0;
uint32_t scrub_list_limit = 1000;
// cluster state
@ -138,12 +143,21 @@ class osd_t
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0, corrupted_objects = 0;
int peering_state = 0;
std::map<object_id, osd_recovery_op_t> recovery_ops;
std::map<object_id, osd_op_t*> scrub_ops;
bool recovery_last_degraded = true;
pool_pg_num_t recovery_last_pg;
object_id recovery_last_oid;
int recovery_pg_done = 0, recovery_done = 0;
osd_op_t *autosync_op = NULL;
// Scrubbing
uint64_t scrub_nearest_ts = 0;
int scrub_timer_id = -1;
pool_pg_num_t scrub_last_pg;
osd_op_t *scrub_list_op;
pg_list_result_t scrub_cur_list = {};
uint64_t scrub_list_pos = 0;
// Unstable writes
uint64_t unstable_write_count = 0;
std::map<osd_object_id_t, uint64_t> unstable_writes;
@ -221,6 +235,13 @@ class osd_t
bool continue_recovery();
pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
// scrub
void scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid);
bool pick_next_scrub(object_id & next_oid);
void submit_scrub_op(object_id oid);
bool continue_scrub();
void schedule_scrub(pg_t & pg);
// op execution
void exec_op(osd_op_t *cur_op);
void finish_op(osd_op_t *cur_op, int retval);

View File

@ -694,6 +694,12 @@ void osd_t::apply_pg_config()
pg_it->second.all_peers == vec_all_peers)
{
// No change in osd_set and history
if (pg_it->second.scrub_ts != pg_cfg.scrub_ts)
{
pg_it->second.scrub_ts = pg_cfg.scrub_ts;
peering_state = peering_state | OSD_SCRUBBING;
ringloop->wakeup();
}
continue;
}
else
@ -745,6 +751,7 @@ void osd_t::apply_pg_config()
.reported_epoch = pg_cfg.epoch,
.target_history = pg_cfg.target_history,
.all_peers = vec_all_peers,
.scrub_ts = pg_cfg.scrub_ts,
.target_set = pg_cfg.target_set,
};
if (pg.scheme == POOL_SCHEME_EC)
@ -885,6 +892,8 @@ void osd_t::report_pg_states()
{ "all_peers", pg.all_peers },
{ "osd_sets", pg.target_history },
};
if (pg.scrub_ts)
history_value["scrub_ts"] = pg.scrub_ts;
checks.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", history_key },

View File

@ -24,6 +24,7 @@ void osd_t::handle_peers()
if (!p.second.peering_state->list_ops.size())
{
p.second.calc_object_states(log_level);
schedule_scrub(p.second);
report_pg_state(p.second);
incomplete_objects += p.second.incomplete_objects.size();
misplaced_objects += p.second.misplaced_objects.size();
@ -83,6 +84,13 @@ void osd_t::handle_peers()
peering_state = peering_state & ~OSD_RECOVERING;
}
}
if (peering_state & OSD_SCRUBBING)
{
if (!continue_scrub())
{
peering_state = peering_state & ~OSD_SCRUBBING;
}
}
}
void osd_t::repeer_pgs(osd_num_t peer_osd)

View File

@ -463,7 +463,7 @@ void pg_t::calc_object_states(int log_level)
void pg_t::print_state()
{
printf(
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
(state & PG_STARTING) ? "starting" : "",
(state & PG_OFFLINE) ? "offline" : "",
(state & PG_PEERING) ? "peering" : "",
@ -479,6 +479,7 @@ void pg_t::print_state()
(state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
(state & PG_HAS_INVALID) ? " + has_invalid" : "",
(state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "",
(state & PG_SCRUBBING) ? " + scrubbing" : "",
total_count
);
}

View File

@ -95,6 +95,8 @@ struct pg_t
// target history and all potential peers
std::vector<std::vector<osd_num_t>> target_history;
std::vector<osd_num_t> all_peers;
// last scrub time
uint64_t scrub_ts = 0;
bool history_changed = false;
// peer list from the last peering event
std::vector<osd_num_t> cur_peers;

View File

@ -3,6 +3,326 @@
#include "osd_primary.h"
#define SELF_FD -1
// Start listing a portion of objects of PG <pg_id> for scrubbing, beginning at <min_oid>.
// The listing is taken from <role_osd>: from the local blockstore when it is this OSD,
// otherwise from the peer with an OSD_OP_SEC_LIST request.
// The result is stored in scrub_cur_list and continue_scrub() is called from the completion callback.
// Only one listing may be in flight at a time (tracked in scrub_list_op).
void osd_t::scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid)
{
pool_id_t pool_id = pg_id.pool_id;
pg_num_t pg_num = pg_id.pg_num;
assert(!scrub_list_op);
if (role_osd == this->osd_num)
{
// Self
osd_op_t *op = new osd_op_t();
op->op_type = 0;
op->peer_fd = SELF_FD;
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->pg_alignment = st_cli.pool_config[pool_id].pg_stripe_size;
// Start from <min_oid> if it's non-zero, otherwise from the first inode of the pool
if (min_oid.inode != 0 || min_oid.stripe != 0)
op->bs_op->min_oid = min_oid;
else
op->bs_op->min_oid.inode = ((uint64_t)pool_id << (64 - POOL_ID_BITS));
// The upper bound is the last possible object of the pool
op->bs_op->max_oid.inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1;
op->bs_op->max_oid.stripe = UINT64_MAX;
op->bs_op->list_stable_limit = scrub_list_limit;
op->bs_op->pg_count = pg_counts[pool_id];
op->bs_op->pg_number = pg_num-1;
op->bs_op->callback = [this, op](blockstore_op_t *bs_op)
{
scrub_list_op = NULL;
if (op->bs_op->retval < 0)
{
// A failed local listing stops the OSD
printf("Local OP_LIST failed: retval=%d\n", op->bs_op->retval);
force_stop(1);
return;
}
add_bs_subop_stats(op);
// The result buffer is now referenced by scrub_cur_list; retval is the total number
// of entries and the stable entry count is taken from bs_op->version
scrub_cur_list = {
.buf = (obj_ver_id*)op->bs_op->buf,
.total_count = (uint64_t)op->bs_op->retval,
.stable_count = op->bs_op->version,
};
delete op->bs_op;
op->bs_op = NULL;
delete op;
continue_scrub();
};
// Remember the operation before submitting it
scrub_list_op = op;
bs->enqueue_op(op->bs_op);
}
else
{
// Peer
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
// NOTE(review): .at() presumably throws if there is no connection to role_osd - confirm callers guarantee it
op->peer_fd = msgr.osd_peer_fds.at(role_osd);
op->req = (osd_any_op_t){
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_LIST,
},
.list_pg = pg_num,
.pg_count = pg_counts[pool_id],
.pg_stripe_size = st_cli.pool_config[pool_id].pg_stripe_size,
.min_inode = min_oid.inode ? min_oid.inode : ((uint64_t)(pool_id) << (64 - POOL_ID_BITS)),
.max_inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1,
.min_stripe = min_oid.stripe,
.stable_limit = scrub_list_limit,
},
};
op->callback = [this, role_osd](osd_op_t *op)
{
scrub_list_op = NULL;
if (op->reply.hdr.retval < 0)
{
// On failure the peer is disconnected and continue_scrub() is not called from here
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
int fail_fd = op->peer_fd;
delete op;
msgr.stop_client(fail_fd);
return;
}
scrub_cur_list = {
.buf = (obj_ver_id*)op->buf,
.total_count = (uint64_t)op->reply.hdr.retval,
.stable_count = op->reply.sec_list.stable_count,
};
// set op->buf to NULL so it doesn't get freed
op->buf = NULL;
delete op;
continue_scrub();
};
// Remember the operation before sending it
scrub_list_op = op;
msgr.outbox_push(op);
}
}
// Select the next piece of scrub work.
//
// Scans PGs starting from the one scrubbed last (scrub_last_pg) and, for the first
// active PG whose scrub_ts is older than its scrub interval, either:
// - takes the next object from the current listing (scrub_cur_list) that has no
//   recovery or scrub operation in progress, stores it in <next_oid> and returns true, or
// - starts listing the next portion of objects with scrub_list() and returns true
//   WITHOUT touching <next_oid> (the listing callback resumes scrubbing later).
//
// A PG whose listing is exhausted is marked as scrubbed: scrub_ts is updated,
// PG_SCRUBBING is cleared and the PG state/history is reported.
// Returns false when there are no PGs at all or no PG needs scrubbing.
bool osd_t::pick_next_scrub(object_id & next_oid)
{
    if (!pgs.size())
    {
        if (scrub_cur_list.buf)
        {
            free(scrub_cur_list.buf);
            scrub_cur_list = {};
            scrub_last_pg = {};
            scrub_list_pos = 0;
        }
        return false;
    }
    timespec tv_now;
    clock_gettime(CLOCK_REALTIME, &tv_now);
    bool rescan = scrub_last_pg.pool_id != 0 || scrub_last_pg.pg_num != 0;
    // Restart scanning from the same PG as the last time
    auto pg_it = pgs.lower_bound(scrub_last_pg);
    while (pg_it != pgs.end())
    {
        if (pg_it->second.state & PG_ACTIVE)
        {
            auto & pool_cfg = st_cli.pool_config.at(pg_it->first.pool_id);
            // Pool-level interval overrides the global one when it's non-zero
            auto interval = pool_cfg.scrub_interval ? pool_cfg.scrub_interval : global_scrub_interval;
            if (pg_it->second.scrub_ts < tv_now.tv_sec-interval)
            {
                // Continue scrubbing from the next object
                if (scrub_last_pg == pg_it->first)
                {
                    while (scrub_list_pos < scrub_cur_list.total_count)
                    {
                        auto oid = scrub_cur_list.buf[scrub_list_pos].oid;
                        oid.stripe &= ~STRIPE_MASK;
                        scrub_list_pos++;
                        // Skip objects which are already being recovered or scrubbed
                        if (recovery_ops.find(oid) == recovery_ops.end() &&
                            scrub_ops.find(oid) == scrub_ops.end())
                        {
                            next_oid = oid;
                            if (!(pg_it->second.state & PG_SCRUBBING))
                            {
                                // Currently scrubbing this PG
                                pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
                                report_pg_state(pg_it->second);
                            }
                            return true;
                        }
                    }
                }
                if (scrub_last_pg == pg_it->first &&
                    scrub_cur_list.total_count && scrub_list_pos >= scrub_cur_list.total_count &&
                    scrub_cur_list.stable_count < scrub_list_limit)
                {
                    // End of the list, mark this PG as scrubbed and go to the next PG
                    // NOTE(review): a listing that returns zero objects never satisfies this
                    // condition (total_count == 0), so an empty PG seems to be re-listed - confirm
                }
                else
                {
                    // Continue listing
                    // Start from the beginning unless there is a previous portion of the same PG.
                    // The object ID is always initialized, it was left indeterminate before
                    // when the same PG was listed again with stable_count == 0
                    object_id scrub_last_oid = {};
                    if (scrub_last_pg == pg_it->first && scrub_cur_list.stable_count > 0)
                    {
                        scrub_last_oid = scrub_cur_list.buf[scrub_cur_list.stable_count-1].oid;
                        scrub_last_oid.stripe++;
                    }
                    // Prefer listing from the local OSD, otherwise take the first OSD of the current set
                    osd_num_t scrub_osd = 0;
                    for (osd_num_t pg_osd: pg_it->second.cur_set)
                    {
                        if (pg_osd == this->osd_num || scrub_osd == 0)
                            scrub_osd = pg_osd;
                    }
                    if (!(pg_it->second.state & PG_SCRUBBING))
                    {
                        // Currently scrubbing this PG
                        pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
                        report_pg_state(pg_it->second);
                    }
                    if (scrub_cur_list.buf)
                    {
                        // Free the previous portion, but keep the continuation object ID:
                        // resetting it here made every listing restart from the beginning of the PG
                        free(scrub_cur_list.buf);
                        scrub_cur_list = {};
                    }
                    // The new portion must be consumed from its first entry,
                    // otherwise it's skipped and the PG is marked as scrubbed prematurely
                    scrub_list_pos = 0;
                    scrub_last_pg = pg_it->first;
                    scrub_list(pg_it->first, scrub_osd, scrub_last_oid);
                    return true;
                }
            }
            if (pg_it->second.state & PG_SCRUBBING)
            {
                // Scrubbing of this PG is finished, remember the time and report it
                pg_it->second.scrub_ts = tv_now.tv_sec;
                pg_it->second.state = pg_it->second.state & ~PG_SCRUBBING;
                pg_it->second.history_changed = true;
                report_pg_state(pg_it->second);
                schedule_scrub(pg_it->second);
            }
            // The list is definitely not needed anymore
            if (scrub_cur_list.buf)
            {
                free(scrub_cur_list.buf);
                scrub_cur_list = {};
                scrub_list_pos = 0;
            }
        }
        pg_it++;
        if (pg_it == pgs.end() && rescan)
        {
            // Scan one more time to guarantee that there are no PGs to scrub
            pg_it = pgs.begin();
            rescan = false;
        }
    }
    // Scanned all PGs - no more scrubs to do
    return false;
}
// Create an OSD_OP_SCRUB request for object <oid>, register it in scrub_ops
// and pass it to exec_op().
// When the request completes, its result is logged, the entry is removed from scrub_ops
// (after a delay of scrub_sleep_ms milliseconds if it's non-zero) and continue_scrub() is called.
void osd_t::submit_scrub_op(object_id oid)
{
    if (log_level > 2)
    {
        printf("Submitting scrub for %lx:%lx\n", oid.inode, oid.stripe);
    }
    osd_op_t *scrub_op = new osd_op_t();
    scrub_op->op_type = OSD_OP_OUT;
    scrub_op->req = (osd_any_op_t){
        .rw = {
            .header = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = 1,
                .opcode = OSD_OP_SCRUB,
            },
            .inode = oid.inode,
            .offset = oid.stripe,
            .len = 0,
        },
    };
    scrub_op->callback = [this](osd_op_t *done_op)
    {
        // The object ID is restored from the request because only <this> is captured
        object_id done_oid = { .inode = done_op->req.rw.inode, .stripe = done_op->req.rw.offset };
        auto retval = done_op->reply.hdr.retval;
        // -ENOENT is not reported as a scrub error
        bool failed = retval < 0 && retval != -ENOENT;
        if (failed)
        {
            // Scrub error
            printf(
                "Scrub failed with object %lx:%lx (PG %u/%u): error %ld\n",
                done_oid.inode, done_oid.stripe, INODE_POOL(done_oid.inode),
                map_to_pg(done_oid, st_cli.pool_config.at(INODE_POOL(done_oid.inode)).pg_stripe_size),
                retval
            );
        }
        else if (log_level > 2)
        {
            printf("Scrubbed %lx:%lx OK\n", done_oid.inode, done_oid.stripe);
        }
        delete done_op;
        if (!scrub_sleep_ms)
        {
            // No throttling - free the queue slot and proceed immediately
            scrub_ops.erase(done_oid);
            continue_scrub();
            return;
        }
        // Keep the queue slot occupied during the sleep so the next scrub starts only after it
        this->tfd->set_timer(scrub_sleep_ms, false, [this, done_oid](int timer_id)
        {
            scrub_ops.erase(done_oid);
            continue_scrub();
        });
    };
    scrub_ops[oid] = scrub_op;
    exec_op(scrub_op);
}
// Triggers scrub requests
// Scrub reads data from all replicas and compares it
// To scrub objects we first need to read their listings
//
// Fills the scrub queue up to scrub_queue_depth operations.
// Returns true while scrubbing is in progress (a listing is in flight or the queue is full)
// and false when pick_next_scrub() reports that there is nothing left to scrub.
bool osd_t::continue_scrub()
{
    if (scrub_list_op)
    {
        // A listing is in flight, its callback calls continue_scrub() again
        return true;
    }
    while (scrub_ops.size() < scrub_queue_depth)
    {
        object_id oid = {};
        if (!pick_next_scrub(oid))
        {
            return false;
        }
        if (scrub_list_op)
        {
            // pick_next_scrub() returns true also when it only started a new listing.
            // <oid> is not set in this case, so it must not be submitted for scrubbing
            // NOTE(review): assumes the listing doesn't complete synchronously inside scrub_list() - confirm
            return true;
        }
        submit_scrub_op(oid);
    }
    return true;
}
// Take the next scrub time of <pg> (scrub_ts + scrub interval) into account.
// If it's earlier than the currently scheduled nearest scrub time (or nothing is scheduled),
// the scrub timer is re-armed for it. If that time has already passed,
// OSD_SCRUBBING is set and the ring loop is woken up right away instead.
void osd_t::schedule_scrub(pg_t & pg)
{
    auto & pool = st_cli.pool_config.at(pg.pool_id);
    // Pool-level interval overrides the global one when it's non-zero
    auto period = pool.scrub_interval ? pool.scrub_interval : global_scrub_interval;
    if (scrub_nearest_ts && scrub_nearest_ts <= pg.scrub_ts+period)
    {
        // An earlier or equal wakeup is already scheduled
        return;
    }
    scrub_nearest_ts = pg.scrub_ts+period;
    timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    // Drop the previous timer, it was set for a later moment
    if (scrub_timer_id >= 0)
    {
        tfd->clear_timer(scrub_timer_id);
        scrub_timer_id = -1;
    }
    if (now.tv_sec > scrub_nearest_ts)
    {
        // Already overdue - start scrubbing on the next loop iteration
        scrub_nearest_ts = 0;
        peering_state = peering_state | OSD_SCRUBBING;
        ringloop->wakeup();
        return;
    }
    // The timer interval is passed in milliseconds
    scrub_timer_id = tfd->set_timer((scrub_nearest_ts-now.tv_sec)*1000, false, [this](int timer_id)
    {
        scrub_timer_id = -1;
        scrub_nearest_ts = 0;
        peering_state = peering_state | OSD_SCRUBBING;
        ringloop->wakeup();
    });
}
void osd_t::continue_primary_scrub(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))

View File

@ -3,9 +3,9 @@
#include "pg_states.h"
const int pg_state_bit_count = 15;
const int pg_state_bit_count = 16;
const int pg_state_bits[15] = {
const int pg_state_bits[16] = {
PG_STARTING,
PG_PEERING,
PG_INCOMPLETE,
@ -21,9 +21,10 @@ const int pg_state_bits[15] = {
PG_HAS_UNCLEAN,
PG_HAS_INVALID,
PG_LEFT_ON_DEAD,
PG_SCRUBBING,
};
const char *pg_state_names[15] = {
const char *pg_state_names[16] = {
"starting",
"peering",
"incomplete",
@ -39,4 +40,5 @@ const char *pg_state_names[15] = {
"has_unclean",
"has_invalid",
"left_on_dead",
"scrubbing",
};

View File

@ -24,6 +24,7 @@
#define PG_HAS_INVALID (1<<12)
#define PG_HAS_CORRUPTED (1<<13)
#define PG_LEFT_ON_DEAD (1<<14)
#define PG_SCRUBBING (1<<15)
// Lower bits that represent object role (EC 0/1/2... or always 0 with replication)
// 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size

View File

@ -249,3 +249,35 @@ void print_help(const char *help_text, std::string exe_name, std::string cmd, bo
fwrite(filtered_text.data(), filtered_text.size(), 1, stdout);
exit(0);
}
// Parse a time interval like "30d" into seconds.
//
// The number is parsed with stoull_full(), an optional one-character suffix selects the unit:
//   s - seconds, m - minutes, h - hours, d - days, M - 30 days, y - 365 days.
// Suffixes are case-insensitive except for "M": uppercase means 30 days, lowercase means minutes.
// Without a suffix the value is treated as seconds. Multiplication overflow is not checked.
//
// If <ok> is not NULL, it receives false for an empty string and for inputs which
// result in 0 without the numeric part being exactly "0". As an exception, a bare "s"/"S"
// (empty numeric part with the seconds unit) is reported as valid and gives 0.
uint64_t parse_time(std::string time_str, bool *ok)
{
    if (!time_str.length())
    {
        if (ok)
            *ok = false;
        return 0;
    }
    uint64_t mul = 1;
    const char last_char = time_str[time_str.length()-1];
    // tolower() requires a value representable as unsigned char (or EOF),
    // passing a negative plain char (non-ASCII byte) is undefined behaviour
    const char type_char = (char)tolower((unsigned char)last_char);
    if (type_char == 's' || type_char == 'm' || type_char == 'h' || type_char == 'd' || type_char == 'y')
    {
        if (type_char == 's')
            mul = 1;
        else if (last_char == 'M')
            mul = 30*86400;
        else if (type_char == 'm')
            mul = 60;
        else if (type_char == 'h')
            mul = 3600;
        else if (type_char == 'd')
            mul = 86400;
        else /*if (type_char == 'y')*/
            mul = 86400*365;
        // Strip the unit suffix, only the numeric part remains
        time_str = time_str.substr(0, time_str.length()-1);
    }
    uint64_t ts = stoull_full(time_str, 0) * mul;
    if (ok)
        *ok = !(ts == 0 && time_str != "0" && (time_str != "" || mul != 1));
    return ts;
}

View File

@ -15,3 +15,4 @@ std::string str_replace(const std::string & in, const std::string & needle, cons
uint64_t stoull_full(const std::string & str, int base = 0);
std::string format_size(uint64_t size, bool nobytes = false);
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
uint64_t parse_time(std::string time_str, bool *ok = NULL);