forked from vitalif/vitastor
Fix replicated pool bugs
parent
242d9a42a2
commit
44973e7f27
|
@ -497,7 +497,8 @@ resume_1:
|
|||
}
|
||||
// All done
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Flushed %lx:%lx v%lu (%ld left)\n", cur.oid.inode, cur.oid.stripe, cur.version, flusher->flush_queue.size());
|
||||
printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version,
|
||||
copy_count, has_writes, has_delete, flusher->flush_queue.size());
|
||||
#endif
|
||||
flusher->active_flushers--;
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
|
@ -530,7 +531,16 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
clean_init_bitmap = false;
|
||||
while (1)
|
||||
{
|
||||
if (dirty_it->second.state == (BS_ST_SMALL_WRITE | BS_ST_STABLE) && !skip_copy)
|
||||
if (!IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
char err[1024];
|
||||
snprintf(
|
||||
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d",
|
||||
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
||||
);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy)
|
||||
{
|
||||
// First we submit all reads
|
||||
has_writes = true;
|
||||
|
@ -573,7 +583,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
}
|
||||
}
|
||||
}
|
||||
else if (dirty_it->second.state == (BS_ST_BIG_WRITE | BS_ST_STABLE) && !skip_copy)
|
||||
else if (IS_BIG_WRITE(dirty_it->second.state) && !skip_copy)
|
||||
{
|
||||
// There is an unflushed big write. Copy small writes in its position
|
||||
has_writes = true;
|
||||
|
@ -583,21 +593,12 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
clean_bitmap_len = dirty_it->second.len;
|
||||
skip_copy = true;
|
||||
}
|
||||
else if (dirty_it->second.state == (BS_ST_DELETE | BS_ST_STABLE) && !skip_copy)
|
||||
else if (IS_DELETE(dirty_it->second.state) && !skip_copy)
|
||||
{
|
||||
// There is an unflushed delete
|
||||
has_delete = true;
|
||||
skip_copy = true;
|
||||
}
|
||||
else if (!IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
char err[1024];
|
||||
snprintf(
|
||||
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d",
|
||||
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
||||
);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
dirty_start = dirty_it;
|
||||
if (dirty_it == bs->dirty_db.begin())
|
||||
{
|
||||
|
@ -663,7 +664,7 @@ void journal_flusher_co::update_clean_db()
|
|||
if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
|
||||
{
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Free block %lu\n", old_clean_loc >> bs->block_order);
|
||||
printf("Free block %lu (new location is %lu)\n", old_clean_loc >> bs->block_order, clean_loc >> bs->block_order);
|
||||
#endif
|
||||
bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
|
|||
{
|
||||
// free the previous block
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Free block %lu\n", clean_it->second.location >> bs->block_order);
|
||||
printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i >> block_order);
|
||||
#endif
|
||||
bs->data_alloc->set(clean_it->second.location >> block_order, false);
|
||||
}
|
||||
|
|
|
@ -51,6 +51,13 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
|||
op->retval = 0;
|
||||
return false;
|
||||
}
|
||||
if (op->opcode == BS_OP_WRITE_STABLE && immediate_commit != IMMEDIATE_ALL &&
|
||||
(op->len == block_size || deleted || immediate_commit != IMMEDIATE_SMALL))
|
||||
{
|
||||
// WRITE_STABLE only works with immediate commit by now
|
||||
op->retval = -EINVAL;
|
||||
return false;
|
||||
}
|
||||
if (is_inflight_big && !is_del && !deleted && op->len < block_size &&
|
||||
immediate_commit != IMMEDIATE_ALL)
|
||||
{
|
||||
|
|
|
@ -183,6 +183,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
|
|||
{
|
||||
msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
}
|
||||
st_cli.start_etcd_watcher();
|
||||
st_cli.load_pgs();
|
||||
}
|
||||
|
||||
|
|
|
@ -59,6 +59,7 @@ class cluster_client_t
|
|||
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
|
||||
uint64_t client_dirty_limit = 0;
|
||||
int log_level;
|
||||
// FIXME: Put up_wait_retry_interval into config and fix it so it could actually work
|
||||
int up_wait_retry_interval = 500; // ms
|
||||
|
||||
uint64_t op_id = 1;
|
||||
|
|
|
@ -94,7 +94,7 @@ void journal_dump_t::dump_block(void *buf)
|
|||
while (pos < journal_block)
|
||||
{
|
||||
journal_entry *je = (journal_entry*)(buf + pos);
|
||||
if (je->magic != JOURNAL_MAGIC || je->type < JE_START || je->type > JE_DELETE)
|
||||
if (je->magic != JOURNAL_MAGIC || je->type < JE_MIN || je->type > JE_MAX)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ struct sec_options
|
|||
char *etcd_prefix = NULL;
|
||||
uint64_t pool = 0;
|
||||
uint64_t inode = 0;
|
||||
int cluster_log = 0;
|
||||
int trace = 0;
|
||||
};
|
||||
|
||||
|
@ -91,6 +92,16 @@ static struct fio_option options[] = {
|
|||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = "cluster_log_level",
|
||||
.lname = "cluster log level",
|
||||
.type = FIO_OPT_BOOL,
|
||||
.off1 = offsetof(struct sec_options, cluster_log),
|
||||
.help = "Set log level for the Vitastor client",
|
||||
.def = "0",
|
||||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = "osd_trace",
|
||||
.lname = "OSD trace",
|
||||
|
@ -151,6 +162,7 @@ static int sec_init(struct thread_data *td)
|
|||
json11::Json cfg = json11::Json::object {
|
||||
{ "etcd_address", std::string(o->etcd_host) },
|
||||
{ "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
|
||||
{ "log_level", o->cluster_log },
|
||||
};
|
||||
|
||||
if (o->pool)
|
||||
|
|
|
@ -257,12 +257,13 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl)
|
|||
{
|
||||
err = true;
|
||||
printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num);
|
||||
on_connect_peer(cl.osd_num, -1);
|
||||
}
|
||||
}
|
||||
if (err)
|
||||
{
|
||||
osd_num_t osd_num = cl.osd_num;
|
||||
stop_client(op->peer_fd);
|
||||
on_connect_peer(osd_num, -1);
|
||||
delete op;
|
||||
return;
|
||||
}
|
||||
|
|
2
osd.h
2
osd.h
|
@ -193,7 +193,7 @@ class osd_t
|
|||
void add_bs_subop_stats(osd_op_t *subop);
|
||||
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
||||
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||
|
||||
|
|
|
@ -249,6 +249,7 @@ void osd_t::start_pg_peering(pg_t & pg)
|
|||
if (!pg.peering_state)
|
||||
{
|
||||
pg.peering_state = new pg_peering_state_t();
|
||||
pg.peering_state->pool_id = pg.pool_id;
|
||||
pg.peering_state->pg_num = pg.pg_num;
|
||||
}
|
||||
for (osd_num_t peer_osd: cur_peers)
|
||||
|
|
|
@ -238,7 +238,7 @@ void pg_obj_state_check_t::finish_object()
|
|||
state = OBJ_INCOMPLETE;
|
||||
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
||||
}
|
||||
else if (n_roles < pg->pg_cursize)
|
||||
else if ((replicated ? n_copies : n_roles) < pg->pg_cursize)
|
||||
{
|
||||
if (log_level > 1)
|
||||
{
|
||||
|
@ -247,7 +247,7 @@ void pg_obj_state_check_t::finish_object()
|
|||
state = OBJ_DEGRADED;
|
||||
pg->state = pg->state | PG_HAS_DEGRADED;
|
||||
}
|
||||
if (n_mismatched > 0)
|
||||
else if (n_mismatched > 0)
|
||||
{
|
||||
if (log_level > 1 && (replicated || n_roles >= pg->pg_cursize))
|
||||
{
|
||||
|
@ -256,7 +256,7 @@ void pg_obj_state_check_t::finish_object()
|
|||
state |= OBJ_MISPLACED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
}
|
||||
if (log_level > 1 && (n_roles < pg->pg_cursize || n_mismatched > 0))
|
||||
if (log_level > 1 && ((replicated ? n_copies : n_roles) < pg->pg_cursize || n_mismatched > 0))
|
||||
{
|
||||
if (log_level > 2)
|
||||
{
|
||||
|
@ -308,8 +308,11 @@ void pg_obj_state_check_t::finish_object()
|
|||
.osd_num = list[i].osd_num,
|
||||
.outdated = true,
|
||||
});
|
||||
state |= OBJ_MISPLACED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
if (!(state & (OBJ_INCOMPLETE | OBJ_DEGRADED)))
|
||||
{
|
||||
state |= OBJ_MISPLACED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -327,16 +330,34 @@ void pg_obj_state_check_t::finish_object()
|
|||
if (it == pg->state_dict.end())
|
||||
{
|
||||
std::vector<uint64_t> read_target;
|
||||
read_target.resize(pg->pg_size);
|
||||
for (int i = 0; i < pg->pg_size; i++)
|
||||
if (replicated)
|
||||
{
|
||||
read_target[i] = 0;
|
||||
}
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!o.outdated)
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
read_target[o.role] = o.osd_num;
|
||||
if (!o.outdated)
|
||||
{
|
||||
read_target.push_back(o.osd_num);
|
||||
}
|
||||
}
|
||||
while (read_target.size() < pg->pg_size)
|
||||
{
|
||||
// FIXME: This is because we then use .data() and assume it's at least <pg_size> long
|
||||
read_target.push_back(0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
read_target.resize(pg->pg_size);
|
||||
for (int i = 0; i < pg->pg_size; i++)
|
||||
{
|
||||
read_target[i] = 0;
|
||||
}
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!o.outdated)
|
||||
{
|
||||
read_target[o.role] = o.osd_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
pg->state_dict[osd_set] = {
|
||||
|
|
|
@ -43,7 +43,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
);
|
||||
op_data->pg_num = pg_num;
|
||||
op_data->oid = oid;
|
||||
op_data->stripes = pool_cfg.scheme == POOL_SCHEME_REPLICATED ? NULL : ((osd_rmw_stripe_t*)(op_data+1));
|
||||
op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
|
||||
op_data->scheme = pool_cfg.scheme;
|
||||
cur_op->op_data = op_data;
|
||||
split_stripes((pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_minsize),
|
||||
|
@ -324,7 +324,7 @@ resume_5:
|
|||
recovery_stat_count[0][recovery_type]++;
|
||||
recovery_stat_bytes[0][recovery_type] = 0;
|
||||
}
|
||||
for (int role = 0; role < pg.pg_size; role++)
|
||||
for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
|
||||
{
|
||||
recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
|
||||
}
|
||||
|
|
|
@ -362,7 +362,7 @@ static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
|
|||
return false;
|
||||
}
|
||||
|
||||
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set)
|
||||
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
|
||||
|
@ -370,7 +370,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
// ordered comparison for EC/XOR, unordered for replicated pools
|
||||
for (auto & chunk: loc_set)
|
||||
{
|
||||
if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
{
|
||||
extra_chunks++;
|
||||
}
|
||||
|
@ -386,7 +386,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
int i = 0;
|
||||
for (auto & chunk: loc_set)
|
||||
{
|
||||
if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
{
|
||||
int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
|
||||
if (chunk.osd_num == this->osd_num)
|
||||
|
|
|
@ -83,6 +83,7 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
|
|||
if (cur_op->req.sec_list.pg_count < cur_op->req.sec_list.list_pg)
|
||||
{
|
||||
// requested pg number is greater than total pg count
|
||||
printf("Invalid LIST request: pg count %u < pg number %u\n", cur_op->req.sec_list.pg_count, cur_op->req.sec_list.list_pg);
|
||||
cur_op->bs_op->retval = -EINVAL;
|
||||
secondary_op_callback(cur_op);
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue