Implement PG scrub runner
parent
2389b49a16
commit
726c6d3470
|
@ -107,6 +107,10 @@ const etcd_tree = {
|
|||
slow_log_interval: 10,
|
||||
inode_vanish_time: 60,
|
||||
osd_memlock: false,
|
||||
scrub_interval: '30d', // 1s/1m/1h/1d
|
||||
scrub_queue_depth: 1,
|
||||
scrub_sleep: 0, // milliseconds
|
||||
scrub_list_limit: 1000, // objects to list on one scrub iteration
|
||||
// blockstore - fixed in superblock
|
||||
block_size,
|
||||
disk_alignment,
|
||||
|
@ -168,6 +172,8 @@ const etcd_tree = {
|
|||
osd_tags?: 'nvme' | [ 'nvme', ... ],
|
||||
// prefer to put primary on OSD with these tags
|
||||
primary_affinity_tags?: 'nvme' | [ 'nvme', ... ],
|
||||
// scrub interval
|
||||
scrub_interval?: '30d',
|
||||
},
|
||||
...
|
||||
}, */
|
||||
|
@ -263,7 +269,7 @@ const etcd_tree = {
|
|||
primary: osd_num_t,
|
||||
state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
|
||||
"degraded"|"has_corrupted"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
|
||||
"has_invalid"|"left_on_dead")[],
|
||||
"has_invalid"|"left_on_dead"|"scrubbing")[],
|
||||
}
|
||||
}, */
|
||||
},
|
||||
|
@ -285,6 +291,7 @@ const etcd_tree = {
|
|||
osd_sets: osd_num_t[][],
|
||||
all_peers: osd_num_t[],
|
||||
epoch: uint64_t,
|
||||
scrub_ts: uint64_t,
|
||||
},
|
||||
}, */
|
||||
},
|
||||
|
|
|
@ -299,7 +299,7 @@ add_executable(test_cluster_client
|
|||
EXCLUDE_FROM_ALL
|
||||
test_cluster_client.cpp
|
||||
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
|
||||
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
|
||||
etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp
|
||||
)
|
||||
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
|
||||
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)
|
||||
|
|
|
@ -410,14 +410,17 @@ struct rm_osd_t
|
|||
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
|
||||
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
|
||||
);
|
||||
auto hist = json11::Json::object {
|
||||
{ "epoch", pg_cfg.epoch },
|
||||
{ "all_peers", pg_cfg.all_peers },
|
||||
{ "osd_sets", pg_cfg.target_history },
|
||||
};
|
||||
if (pg_cfg.scrub_ts)
|
||||
hist["scrub_ts"] = pg_cfg.scrub_ts;
|
||||
history_updates.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", history_key },
|
||||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "epoch", pg_cfg.epoch },
|
||||
{ "all_peers", pg_cfg.all_peers },
|
||||
{ "osd_sets", pg_cfg.target_history },
|
||||
}).dump()) },
|
||||
{ "value", base64_encode(json11::Json(hist).dump()) },
|
||||
} },
|
||||
});
|
||||
history_checks.push_back(json11::Json::object {
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
#ifndef __MOCK__
|
||||
#include "addr_util.h"
|
||||
#include "http_client.h"
|
||||
#include "str_util.h"
|
||||
#endif
|
||||
#include "str_util.h"
|
||||
|
||||
etcd_state_client_t::~etcd_state_client_t()
|
||||
{
|
||||
|
@ -759,6 +759,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
|||
fprintf(stderr, "Pool %u has invalid bitmap_granularity (must divide block_size), skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
// Scrub Interval
|
||||
pc.scrub_interval = parse_time(pool_item.second["scrub_interval"].string_value());
|
||||
if (!pc.scrub_interval)
|
||||
pc.scrub_interval = 0;
|
||||
// Immediate Commit Mode
|
||||
pc.immediate_commit = pool_item.second["immediate_commit"].is_string()
|
||||
? (pool_item.second["immediate_commit"].string_value() == "all"
|
||||
|
@ -901,6 +905,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
|||
}
|
||||
// Read epoch
|
||||
pg_cfg.epoch = value["epoch"].uint64_value();
|
||||
// Scrub timestamp
|
||||
pg_cfg.scrub_ts = parse_time(value["scrub_ts"].string_value());
|
||||
if (on_change_pg_history_hook != NULL)
|
||||
{
|
||||
on_change_pg_history_hook(pool_id, pg_num);
|
||||
|
|
|
@ -39,6 +39,7 @@ struct pg_config_t
|
|||
osd_num_t cur_primary;
|
||||
int cur_state;
|
||||
uint64_t epoch;
|
||||
uint64_t scrub_ts;
|
||||
};
|
||||
|
||||
struct pool_config_t
|
||||
|
@ -55,6 +56,7 @@ struct pool_config_t
|
|||
uint64_t max_osd_combinations;
|
||||
uint64_t pg_stripe_size;
|
||||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
uint64_t scrub_interval;
|
||||
};
|
||||
|
||||
struct inode_config_t
|
||||
|
|
10
src/osd.cpp
10
src/osd.cpp
|
@ -178,6 +178,16 @@ void osd_t::parse_config(const json11::Json & config, bool allow_disk_params)
|
|||
inode_vanish_time = config["inode_vanish_time"].uint64_value();
|
||||
if (!inode_vanish_time)
|
||||
inode_vanish_time = 60;
|
||||
global_scrub_interval = config["scrub_interval"].uint64_value();
|
||||
if (!global_scrub_interval)
|
||||
global_scrub_interval = 30*86400;
|
||||
scrub_queue_depth = config["scrub_queue_depth"].uint64_value();
|
||||
if (scrub_queue_depth < 1 || scrub_queue_depth > MAX_RECOVERY_QUEUE)
|
||||
scrub_queue_depth = 1;
|
||||
scrub_sleep_ms = config["scrub_sleep"].uint64_value();
|
||||
scrub_list_limit = config["scrub_list_limit"].uint64_value();
|
||||
if (!scrub_list_limit)
|
||||
scrub_list_limit = 1000;
|
||||
}
|
||||
|
||||
void osd_t::bind_socket()
|
||||
|
|
21
src/osd.h
21
src/osd.h
|
@ -28,6 +28,7 @@
|
|||
#define OSD_PEERING_PGS 0x04
|
||||
#define OSD_FLUSHING_PGS 0x08
|
||||
#define OSD_RECOVERING 0x10
|
||||
#define OSD_SCRUBBING 0x20
|
||||
|
||||
#define MAX_AUTOSYNC_INTERVAL 3600
|
||||
#define DEFAULT_AUTOSYNC_INTERVAL 5
|
||||
|
@ -113,6 +114,10 @@ class osd_t
|
|||
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
int inode_vanish_time = 60;
|
||||
int log_level = 0;
|
||||
uint64_t global_scrub_interval = 30*86400;
|
||||
uint64_t scrub_queue_depth = 1;
|
||||
uint64_t scrub_sleep_ms = 0;
|
||||
uint32_t scrub_list_limit = 1000;
|
||||
|
||||
// cluster state
|
||||
|
||||
|
@ -137,12 +142,21 @@ class osd_t
|
|||
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0, corrupted_objects = 0;
|
||||
int peering_state = 0;
|
||||
std::map<object_id, osd_recovery_op_t> recovery_ops;
|
||||
std::map<object_id, osd_op_t*> scrub_ops;
|
||||
bool recovery_last_degraded = true;
|
||||
pool_pg_num_t recovery_last_pg;
|
||||
object_id recovery_last_oid;
|
||||
int recovery_pg_done = 0, recovery_done = 0;
|
||||
osd_op_t *autosync_op = NULL;
|
||||
|
||||
// Scrubbing
|
||||
uint64_t scrub_nearest_ts = 0;
|
||||
int scrub_timer_id = -1;
|
||||
pool_pg_num_t scrub_last_pg;
|
||||
osd_op_t *scrub_list_op;
|
||||
pg_list_result_t scrub_cur_list = {};
|
||||
uint64_t scrub_list_pos = 0;
|
||||
|
||||
// Unstable writes
|
||||
uint64_t unstable_write_count = 0;
|
||||
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
||||
|
@ -220,6 +234,13 @@ class osd_t
|
|||
bool continue_recovery();
|
||||
pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
|
||||
|
||||
// scrub
|
||||
void scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid);
|
||||
bool pick_next_scrub(object_id & next_oid);
|
||||
void submit_scrub_op(object_id oid);
|
||||
bool continue_scrub();
|
||||
void schedule_scrub(pg_t & pg);
|
||||
|
||||
// op execution
|
||||
void exec_op(osd_op_t *cur_op);
|
||||
void finish_op(osd_op_t *cur_op, int retval);
|
||||
|
|
|
@ -692,6 +692,12 @@ void osd_t::apply_pg_config()
|
|||
pg_it->second.all_peers == vec_all_peers)
|
||||
{
|
||||
// No change in osd_set and history
|
||||
if (pg_it->second.scrub_ts != pg_cfg.scrub_ts)
|
||||
{
|
||||
pg_it->second.scrub_ts = pg_cfg.scrub_ts;
|
||||
peering_state = peering_state | OSD_SCRUBBING;
|
||||
ringloop->wakeup();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
else
|
||||
|
@ -743,6 +749,7 @@ void osd_t::apply_pg_config()
|
|||
.reported_epoch = pg_cfg.epoch,
|
||||
.target_history = pg_cfg.target_history,
|
||||
.all_peers = vec_all_peers,
|
||||
.scrub_ts = pg_cfg.scrub_ts,
|
||||
.target_set = pg_cfg.target_set,
|
||||
};
|
||||
if (pg.scheme == POOL_SCHEME_EC)
|
||||
|
@ -873,6 +880,8 @@ void osd_t::report_pg_states()
|
|||
{ "all_peers", pg.all_peers },
|
||||
{ "osd_sets", pg.target_history },
|
||||
};
|
||||
if (pg.scrub_ts)
|
||||
history_value["scrub_ts"] = pg.scrub_ts;
|
||||
checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", history_key },
|
||||
|
|
|
@ -24,6 +24,7 @@ void osd_t::handle_peers()
|
|||
if (!p.second.peering_state->list_ops.size())
|
||||
{
|
||||
p.second.calc_object_states(log_level);
|
||||
schedule_scrub(p.second);
|
||||
report_pg_state(p.second);
|
||||
incomplete_objects += p.second.incomplete_objects.size();
|
||||
misplaced_objects += p.second.misplaced_objects.size();
|
||||
|
@ -83,6 +84,13 @@ void osd_t::handle_peers()
|
|||
peering_state = peering_state & ~OSD_RECOVERING;
|
||||
}
|
||||
}
|
||||
if (peering_state & OSD_SCRUBBING)
|
||||
{
|
||||
if (!continue_scrub())
|
||||
{
|
||||
peering_state = peering_state & ~OSD_SCRUBBING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::repeer_pgs(osd_num_t peer_osd)
|
||||
|
|
|
@ -463,7 +463,7 @@ void pg_t::calc_object_states(int log_level)
|
|||
void pg_t::print_state()
|
||||
{
|
||||
printf(
|
||||
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
|
||||
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
|
||||
(state & PG_STARTING) ? "starting" : "",
|
||||
(state & PG_OFFLINE) ? "offline" : "",
|
||||
(state & PG_PEERING) ? "peering" : "",
|
||||
|
@ -479,6 +479,7 @@ void pg_t::print_state()
|
|||
(state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
|
||||
(state & PG_HAS_INVALID) ? " + has_invalid" : "",
|
||||
(state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "",
|
||||
(state & PG_SCRUBBING) ? " + scrubbing" : "",
|
||||
total_count
|
||||
);
|
||||
}
|
||||
|
|
|
@ -95,6 +95,8 @@ struct pg_t
|
|||
// target history and all potential peers
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
// last scrub time
|
||||
uint64_t scrub_ts = 0;
|
||||
bool history_changed = false;
|
||||
// peer list from the last peering event
|
||||
std::vector<osd_num_t> cur_peers;
|
||||
|
|
|
@ -3,6 +3,326 @@
|
|||
|
||||
#include "osd_primary.h"
|
||||
|
||||
#define SELF_FD -1
|
||||
|
||||
void osd_t::scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid)
|
||||
{
|
||||
pool_id_t pool_id = pg_id.pool_id;
|
||||
pg_num_t pg_num = pg_id.pg_num;
|
||||
assert(!scrub_list_op);
|
||||
if (role_osd == this->osd_num)
|
||||
{
|
||||
// Self
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = 0;
|
||||
op->peer_fd = SELF_FD;
|
||||
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
|
||||
op->bs_op = new blockstore_op_t();
|
||||
op->bs_op->opcode = BS_OP_LIST;
|
||||
op->bs_op->pg_alignment = st_cli.pool_config[pool_id].pg_stripe_size;
|
||||
if (min_oid.inode != 0 || min_oid.stripe != 0)
|
||||
op->bs_op->min_oid = min_oid;
|
||||
else
|
||||
op->bs_op->min_oid.inode = ((uint64_t)pool_id << (64 - POOL_ID_BITS));
|
||||
op->bs_op->max_oid.inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1;
|
||||
op->bs_op->max_oid.stripe = UINT64_MAX;
|
||||
op->bs_op->list_stable_limit = scrub_list_limit;
|
||||
op->bs_op->pg_count = pg_counts[pool_id];
|
||||
op->bs_op->pg_number = pg_num-1;
|
||||
op->bs_op->callback = [this, op](blockstore_op_t *bs_op)
|
||||
{
|
||||
scrub_list_op = NULL;
|
||||
if (op->bs_op->retval < 0)
|
||||
{
|
||||
printf("Local OP_LIST failed: retval=%d\n", op->bs_op->retval);
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
add_bs_subop_stats(op);
|
||||
scrub_cur_list = {
|
||||
.buf = (obj_ver_id*)op->bs_op->buf,
|
||||
.total_count = (uint64_t)op->bs_op->retval,
|
||||
.stable_count = op->bs_op->version,
|
||||
};
|
||||
delete op->bs_op;
|
||||
op->bs_op = NULL;
|
||||
delete op;
|
||||
continue_scrub();
|
||||
};
|
||||
scrub_list_op = op;
|
||||
bs->enqueue_op(op->bs_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Peer
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = msgr.osd_peer_fds.at(role_osd);
|
||||
op->req = (osd_any_op_t){
|
||||
.sec_list = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = msgr.next_subop_id++,
|
||||
.opcode = OSD_OP_SEC_LIST,
|
||||
},
|
||||
.list_pg = pg_num,
|
||||
.pg_count = pg_counts[pool_id],
|
||||
.pg_stripe_size = st_cli.pool_config[pool_id].pg_stripe_size,
|
||||
.min_inode = min_oid.inode ? min_oid.inode : ((uint64_t)(pool_id) << (64 - POOL_ID_BITS)),
|
||||
.max_inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1,
|
||||
.min_stripe = min_oid.stripe,
|
||||
.stable_limit = scrub_list_limit,
|
||||
},
|
||||
};
|
||||
op->callback = [this, role_osd](osd_op_t *op)
|
||||
{
|
||||
scrub_list_op = NULL;
|
||||
if (op->reply.hdr.retval < 0)
|
||||
{
|
||||
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
|
||||
int fail_fd = op->peer_fd;
|
||||
delete op;
|
||||
msgr.stop_client(fail_fd);
|
||||
return;
|
||||
}
|
||||
scrub_cur_list = {
|
||||
.buf = (obj_ver_id*)op->buf,
|
||||
.total_count = (uint64_t)op->reply.hdr.retval,
|
||||
.stable_count = op->reply.sec_list.stable_count,
|
||||
};
|
||||
// set op->buf to NULL so it doesn't get freed
|
||||
op->buf = NULL;
|
||||
delete op;
|
||||
continue_scrub();
|
||||
};
|
||||
scrub_list_op = op;
|
||||
msgr.outbox_push(op);
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_t::pick_next_scrub(object_id & next_oid)
|
||||
{
|
||||
if (!pgs.size())
|
||||
{
|
||||
if (scrub_cur_list.buf)
|
||||
{
|
||||
free(scrub_cur_list.buf);
|
||||
scrub_cur_list = {};
|
||||
scrub_last_pg = {};
|
||||
}
|
||||
return false;
|
||||
}
|
||||
timespec tv_now;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_now);
|
||||
bool rescan = scrub_last_pg.pool_id != 0 || scrub_last_pg.pg_num != 0;
|
||||
// Restart scanning from the same PG as the last time
|
||||
auto pg_it = pgs.lower_bound(scrub_last_pg);
|
||||
while (pg_it != pgs.end())
|
||||
{
|
||||
if (pg_it->second.state & PG_ACTIVE)
|
||||
{
|
||||
auto & pool_cfg = st_cli.pool_config.at(pg_it->first.pool_id);
|
||||
auto interval = pool_cfg.scrub_interval ? pool_cfg.scrub_interval : global_scrub_interval;
|
||||
if (pg_it->second.scrub_ts < tv_now.tv_sec-interval)
|
||||
{
|
||||
// Continue scrubbing from the next object
|
||||
if (scrub_last_pg == pg_it->first)
|
||||
{
|
||||
while (scrub_list_pos < scrub_cur_list.total_count)
|
||||
{
|
||||
auto oid = scrub_cur_list.buf[scrub_list_pos].oid;
|
||||
oid.stripe &= ~STRIPE_MASK;
|
||||
scrub_list_pos++;
|
||||
if (recovery_ops.find(oid) == recovery_ops.end() &&
|
||||
scrub_ops.find(oid) == scrub_ops.end())
|
||||
{
|
||||
next_oid = oid;
|
||||
if (!(pg_it->second.state & PG_SCRUBBING))
|
||||
{
|
||||
// Currently scrubbing this PG
|
||||
pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
|
||||
report_pg_state(pg_it->second);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (scrub_last_pg == pg_it->first &&
|
||||
scrub_cur_list.total_count && scrub_list_pos >= scrub_cur_list.total_count &&
|
||||
scrub_cur_list.stable_count < scrub_list_limit)
|
||||
{
|
||||
// End of the list, mark this PG as scrubbed and go to the next PG
|
||||
}
|
||||
else
|
||||
{
|
||||
// Continue listing
|
||||
object_id scrub_last_oid;
|
||||
if (scrub_last_pg != pg_it->first)
|
||||
scrub_last_oid = (object_id){};
|
||||
else if (scrub_cur_list.stable_count > 0)
|
||||
{
|
||||
scrub_last_oid = scrub_cur_list.buf[scrub_cur_list.stable_count-1].oid;
|
||||
scrub_last_oid.stripe++;
|
||||
}
|
||||
osd_num_t scrub_osd = 0;
|
||||
for (osd_num_t pg_osd: pg_it->second.cur_set)
|
||||
{
|
||||
if (pg_osd == this->osd_num || scrub_osd == 0)
|
||||
scrub_osd = pg_osd;
|
||||
}
|
||||
if (!(pg_it->second.state & PG_SCRUBBING))
|
||||
{
|
||||
// Currently scrubbing this PG
|
||||
pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
|
||||
report_pg_state(pg_it->second);
|
||||
}
|
||||
if (scrub_cur_list.buf)
|
||||
{
|
||||
free(scrub_cur_list.buf);
|
||||
scrub_cur_list = {};
|
||||
scrub_last_oid = {};
|
||||
}
|
||||
scrub_last_pg = pg_it->first;
|
||||
scrub_list(pg_it->first, scrub_osd, scrub_last_oid);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (pg_it->second.state & PG_SCRUBBING)
|
||||
{
|
||||
pg_it->second.scrub_ts = tv_now.tv_sec;
|
||||
pg_it->second.state = pg_it->second.state & ~PG_SCRUBBING;
|
||||
pg_it->second.history_changed = true;
|
||||
report_pg_state(pg_it->second);
|
||||
schedule_scrub(pg_it->second);
|
||||
}
|
||||
// The list is definitely not needed anymore
|
||||
if (scrub_cur_list.buf)
|
||||
{
|
||||
free(scrub_cur_list.buf);
|
||||
scrub_cur_list = {};
|
||||
}
|
||||
}
|
||||
pg_it++;
|
||||
if (pg_it == pgs.end() && rescan)
|
||||
{
|
||||
// Scan one more time to guarantee that there are no PGs to scrub
|
||||
pg_it = pgs.begin();
|
||||
rescan = false;
|
||||
}
|
||||
}
|
||||
// Scanned all PGs - no more scrubs to do
|
||||
return false;
|
||||
}
|
||||
|
||||
void osd_t::submit_scrub_op(object_id oid)
|
||||
{
|
||||
auto osd_op = new osd_op_t();
|
||||
osd_op->op_type = OSD_OP_OUT;
|
||||
osd_op->req = (osd_any_op_t){
|
||||
.rw = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = 1,
|
||||
.opcode = OSD_OP_SCRUB,
|
||||
},
|
||||
.inode = oid.inode,
|
||||
.offset = oid.stripe,
|
||||
.len = 0,
|
||||
},
|
||||
};
|
||||
if (log_level > 2)
|
||||
{
|
||||
printf("Submitting scrub for %lx:%lx\n", oid.inode, oid.stripe);
|
||||
}
|
||||
osd_op->callback = [this](osd_op_t *osd_op)
|
||||
{
|
||||
object_id oid = { .inode = osd_op->req.rw.inode, .stripe = osd_op->req.rw.offset };
|
||||
if (osd_op->reply.hdr.retval < 0 && osd_op->reply.hdr.retval != -ENOENT)
|
||||
{
|
||||
// Scrub error
|
||||
printf(
|
||||
"Scrub failed with object %lx:%lx (PG %u/%u): error %ld\n",
|
||||
oid.inode, oid.stripe, INODE_POOL(oid.inode),
|
||||
map_to_pg(oid, st_cli.pool_config.at(INODE_POOL(oid.inode)).pg_stripe_size),
|
||||
osd_op->reply.hdr.retval
|
||||
);
|
||||
}
|
||||
else if (log_level > 2)
|
||||
{
|
||||
printf("Scrubbed %lx:%lx OK\n", oid.inode, oid.stripe);
|
||||
}
|
||||
delete osd_op;
|
||||
if (scrub_sleep_ms)
|
||||
{
|
||||
this->tfd->set_timer(scrub_sleep_ms, false, [this, oid](int timer_id)
|
||||
{
|
||||
scrub_ops.erase(oid);
|
||||
continue_scrub();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
scrub_ops.erase(oid);
|
||||
continue_scrub();
|
||||
}
|
||||
};
|
||||
scrub_ops[oid] = osd_op;
|
||||
exec_op(osd_op);
|
||||
}
|
||||
|
||||
// Triggers scrub requests
|
||||
// Scrub reads data from all replicas and compares it
|
||||
// To scrub first we need to read objects listings
|
||||
bool osd_t::continue_scrub()
|
||||
{
|
||||
if (scrub_list_op)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
while (scrub_ops.size() < scrub_queue_depth)
|
||||
{
|
||||
object_id oid;
|
||||
if (pick_next_scrub(oid))
|
||||
submit_scrub_op(oid);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void osd_t::schedule_scrub(pg_t & pg)
|
||||
{
|
||||
auto & pool_cfg = st_cli.pool_config.at(pg.pool_id);
|
||||
auto interval = pool_cfg.scrub_interval ? pool_cfg.scrub_interval : global_scrub_interval;
|
||||
if (!scrub_nearest_ts || scrub_nearest_ts > pg.scrub_ts+interval)
|
||||
{
|
||||
scrub_nearest_ts = pg.scrub_ts+interval;
|
||||
timespec tv_now;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_now);
|
||||
if (scrub_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(scrub_timer_id);
|
||||
scrub_timer_id = -1;
|
||||
}
|
||||
if (tv_now.tv_sec > scrub_nearest_ts)
|
||||
{
|
||||
scrub_nearest_ts = 0;
|
||||
peering_state = peering_state | OSD_SCRUBBING;
|
||||
ringloop->wakeup();
|
||||
}
|
||||
else
|
||||
{
|
||||
scrub_timer_id = tfd->set_timer((scrub_nearest_ts-tv_now.tv_sec)*1000, false, [this](int timer_id)
|
||||
{
|
||||
scrub_timer_id = -1;
|
||||
scrub_nearest_ts = 0;
|
||||
peering_state = peering_state | OSD_SCRUBBING;
|
||||
ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::continue_primary_scrub(osd_op_t *cur_op)
|
||||
{
|
||||
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
|
||||
#include "pg_states.h"
|
||||
|
||||
const int pg_state_bit_count = 15;
|
||||
const int pg_state_bit_count = 16;
|
||||
|
||||
const int pg_state_bits[15] = {
|
||||
const int pg_state_bits[16] = {
|
||||
PG_STARTING,
|
||||
PG_PEERING,
|
||||
PG_INCOMPLETE,
|
||||
|
@ -21,9 +21,10 @@ const int pg_state_bits[15] = {
|
|||
PG_HAS_UNCLEAN,
|
||||
PG_HAS_INVALID,
|
||||
PG_LEFT_ON_DEAD,
|
||||
PG_SCRUBBING,
|
||||
};
|
||||
|
||||
const char *pg_state_names[15] = {
|
||||
const char *pg_state_names[16] = {
|
||||
"starting",
|
||||
"peering",
|
||||
"incomplete",
|
||||
|
@ -39,4 +40,5 @@ const char *pg_state_names[15] = {
|
|||
"has_unclean",
|
||||
"has_invalid",
|
||||
"left_on_dead",
|
||||
"scrubbing",
|
||||
};
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#define PG_HAS_INVALID (1<<12)
|
||||
#define PG_HAS_CORRUPTED (1<<13)
|
||||
#define PG_LEFT_ON_DEAD (1<<14)
|
||||
#define PG_SCRUBBING (1<<15)
|
||||
|
||||
// Lower bits that represent object role (EC 0/1/2... or always 0 with replication)
|
||||
// 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size
|
||||
|
|
|
@ -249,3 +249,35 @@ void print_help(const char *help_text, std::string exe_name, std::string cmd, bo
|
|||
fwrite(filtered_text.data(), filtered_text.size(), 1, stdout);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
uint64_t parse_time(std::string time_str, bool *ok)
|
||||
{
|
||||
if (!time_str.length())
|
||||
{
|
||||
if (ok)
|
||||
*ok = false;
|
||||
return 0;
|
||||
}
|
||||
uint64_t mul = 1;
|
||||
char type_char = tolower(time_str[time_str.length()-1]);
|
||||
if (type_char == 's' || type_char == 'm' || type_char == 'h' || type_char == 'd' || type_char == 'y')
|
||||
{
|
||||
if (type_char == 's')
|
||||
mul = 1;
|
||||
else if (time_str[time_str.length()-1] == 'M')
|
||||
mul = 30*86400;
|
||||
else if (type_char == 'm')
|
||||
mul = 60;
|
||||
else if (type_char == 'h')
|
||||
mul = 3600;
|
||||
else if (type_char == 'd')
|
||||
mul = 86400;
|
||||
else /*if (type_char == 'y')*/
|
||||
mul = 86400*365;
|
||||
time_str = time_str.substr(0, time_str.length()-1);
|
||||
}
|
||||
uint64_t ts = stoull_full(time_str, 0) * mul;
|
||||
if (ok)
|
||||
*ok = !(ts == 0 && time_str != "0" && (time_str != "" || mul != 1));
|
||||
return ts;
|
||||
}
|
||||
|
|
|
@ -15,3 +15,4 @@ std::string str_replace(const std::string & in, const std::string & needle, cons
|
|||
uint64_t stoull_full(const std::string & str, int base = 0);
|
||||
std::string format_size(uint64_t size, bool nobytes = false);
|
||||
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
|
||||
uint64_t parse_time(std::string time_str, bool *ok = NULL);
|
||||
|
|
Loading…
Reference in New Issue