diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 63da35d0..25401648 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -497,7 +497,8 @@ resume_1: } // All done #ifdef BLOCKSTORE_DEBUG - printf("Flushed %lx:%lx v%lu (%ld left)\n", cur.oid.inode, cur.oid.stripe, cur.version, flusher->flush_queue.size()); + printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version, + copy_count, has_writes, has_delete, flusher->flush_queue.size()); #endif flusher->active_flushers--; repeat_it = flusher->sync_to_repeat.find(cur.oid); @@ -530,7 +531,16 @@ bool journal_flusher_co::scan_dirty(int wait_base) clean_init_bitmap = false; while (1) { - if (dirty_it->second.state == (BS_ST_SMALL_WRITE | BS_ST_STABLE) && !skip_copy) + if (!IS_STABLE(dirty_it->second.state)) + { + char err[1024]; + snprintf( + err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d", + dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state + ); + throw std::runtime_error(err); + } + else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy) { // First we submit all reads has_writes = true; @@ -573,7 +583,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) } } } - else if (dirty_it->second.state == (BS_ST_BIG_WRITE | BS_ST_STABLE) && !skip_copy) + else if (IS_BIG_WRITE(dirty_it->second.state) && !skip_copy) { // There is an unflushed big write. 
Copy small writes in its position has_writes = true; @@ -583,21 +593,12 @@ bool journal_flusher_co::scan_dirty(int wait_base) clean_bitmap_len = dirty_it->second.len; skip_copy = true; } - else if (dirty_it->second.state == (BS_ST_DELETE | BS_ST_STABLE) && !skip_copy) + else if (IS_DELETE(dirty_it->second.state) && !skip_copy) { // There is an unflushed delete has_delete = true; skip_copy = true; } - else if (!IS_STABLE(dirty_it->second.state)) - { - char err[1024]; - snprintf( - err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d", - dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state - ); - throw std::runtime_error(err); - } dirty_start = dirty_it; if (dirty_it == bs->dirty_db.begin()) { @@ -663,7 +664,7 @@ void journal_flusher_co::update_clean_db() if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) { #ifdef BLOCKSTORE_DEBUG - printf("Free block %lu\n", old_clean_loc >> bs->block_order); + printf("Free block %lu (new location is %lu)\n", old_clean_loc >> bs->block_order, clean_loc >> bs->block_order); #endif bs->data_alloc->set(old_clean_loc >> bs->block_order, false); } diff --git a/blockstore_init.cpp b/blockstore_init.cpp index cabd21bd..0750cdd2 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -108,7 +108,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo { // free the previous block #ifdef BLOCKSTORE_DEBUG - printf("Free block %lu\n", clean_it->second.location >> bs->block_order); + printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i >> block_order); #endif bs->data_alloc->set(clean_it->second.location >> block_order, false); } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index a7ffcd41..f34c5f51 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -51,6 +51,13 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) op->retval = 0; return false; } 
+ if (op->opcode == BS_OP_WRITE_STABLE && immediate_commit != IMMEDIATE_ALL && + (op->len == block_size || deleted || immediate_commit != IMMEDIATE_SMALL)) + { + // WRITE_STABLE only works with immediate commit by now + op->retval = -EINVAL; + return false; + } if (is_inflight_big && !is_del && !deleted && op->len < block_size && immediate_commit != IMMEDIATE_ALL) { diff --git a/cluster_client.cpp b/cluster_client.cpp index 45bb8572..64465bc4 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -183,6 +183,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) { msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; } + st_cli.start_etcd_watcher(); st_cli.load_pgs(); } diff --git a/cluster_client.h b/cluster_client.h index 7571c65b..4358b3d6 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -59,6 +59,7 @@ class cluster_client_t // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. uint64_t client_dirty_limit = 0; int log_level; + // FIXME: Put up_wait_retry_interval into config and fix it so it could actually work int up_wait_retry_interval = 500; // ms uint64_t op_id = 1; diff --git a/dump_journal.cpp b/dump_journal.cpp index 78b2a728..51e82c63 100644 --- a/dump_journal.cpp +++ b/dump_journal.cpp @@ -94,7 +94,7 @@ void journal_dump_t::dump_block(void *buf) while (pos < journal_block) { journal_entry *je = (journal_entry*)(buf + pos); - if (je->magic != JOURNAL_MAGIC || je->type < JE_START || je->type > JE_DELETE) + if (je->magic != JOURNAL_MAGIC || je->type < JE_MIN || je->type > JE_MAX) { break; } diff --git a/fio_cluster.cpp b/fio_cluster.cpp index d5ef1d61..02a5e51e 100644 --- a/fio_cluster.cpp +++ b/fio_cluster.cpp @@ -51,6 +51,7 @@ struct sec_options char *etcd_prefix = NULL; uint64_t pool = 0; uint64_t inode = 0; + int cluster_log = 0; int trace = 0; }; @@ -91,6 +92,16 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, 
}, + { + .name = "cluster_log_level", + .lname = "cluster log level", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, cluster_log), + .help = "Set log level for the Vitastor client", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = "osd_trace", .lname = "OSD trace", @@ -151,6 +162,7 @@ static int sec_init(struct thread_data *td) json11::Json cfg = json11::Json::object { { "etcd_address", std::string(o->etcd_host) }, { "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") }, + { "log_level", o->cluster_log }, }; if (o->pool) diff --git a/messenger.cpp b/messenger.cpp index 1bb4fdd1..c7ba6b02 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -257,12 +257,13 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl) { err = true; printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); - on_connect_peer(cl.osd_num, -1); } } if (err) { + osd_num_t osd_num = cl.osd_num; stop_client(op->peer_fd); + on_connect_peer(osd_num, -1); delete op; return; } diff --git a/osd.h b/osd.h index 564a7f74..d3d7e4cc 100644 --- a/osd.h +++ b/osd.h @@ -193,7 +193,7 @@ class osd_t void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op); - void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set); + void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_peering.cpp b/osd_peering.cpp index cee57df8..6a5f5021 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -249,6 +249,7 @@ void osd_t::start_pg_peering(pg_t & 
pg) if (!pg.peering_state) { pg.peering_state = new pg_peering_state_t(); + pg.peering_state->pool_id = pg.pool_id; pg.peering_state->pg_num = pg.pg_num; } for (osd_num_t peer_osd: cur_peers) diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 1926d6e9..33339c5d 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -238,7 +238,7 @@ void pg_obj_state_check_t::finish_object() state = OBJ_INCOMPLETE; pg->state = pg->state | PG_HAS_INCOMPLETE; } - else if (n_roles < pg->pg_cursize) + else if ((replicated ? n_copies : n_roles) < pg->pg_cursize) { if (log_level > 1) { @@ -247,7 +247,7 @@ void pg_obj_state_check_t::finish_object() state = OBJ_DEGRADED; pg->state = pg->state | PG_HAS_DEGRADED; } - if (n_mismatched > 0) + else if (n_mismatched > 0) { if (log_level > 1 && (replicated || n_roles >= pg->pg_cursize)) { @@ -256,7 +256,7 @@ void pg_obj_state_check_t::finish_object() state |= OBJ_MISPLACED; pg->state = pg->state | PG_HAS_MISPLACED; } - if (log_level > 1 && (n_roles < pg->pg_cursize || n_mismatched > 0)) + if (log_level > 1 && ((replicated ? 
n_copies : n_roles) < pg->pg_cursize || n_mismatched > 0)) { if (log_level > 2) { @@ -308,8 +308,11 @@ void pg_obj_state_check_t::finish_object() .osd_num = list[i].osd_num, .outdated = true, }); - state |= OBJ_MISPLACED; - pg->state = pg->state | PG_HAS_MISPLACED; + if (!(state & (OBJ_INCOMPLETE | OBJ_DEGRADED))) + { + state |= OBJ_MISPLACED; + pg->state = pg->state | PG_HAS_MISPLACED; + } } } } @@ -327,16 +330,34 @@ void pg_obj_state_check_t::finish_object() if (it == pg->state_dict.end()) { std::vector read_target; - read_target.resize(pg->pg_size); - for (int i = 0; i < pg->pg_size; i++) + if (replicated) { - read_target[i] = 0; - } - for (auto & o: osd_set) - { - if (!o.outdated) + for (auto & o: osd_set) { - read_target[o.role] = o.osd_num; + if (!o.outdated) + { + read_target.push_back(o.osd_num); + } + } + while (read_target.size() < pg->pg_size) + { + // FIXME: This is because we then use .data() and assume it's at least pg_size long + read_target.push_back(0); + } + } + else + { + read_target.resize(pg->pg_size); + for (int i = 0; i < pg->pg_size; i++) + { + read_target[i] = 0; + } + for (auto & o: osd_set) + { + if (!o.outdated) + { + read_target[o.role] = o.osd_num; + } } } pg->state_dict[osd_set] = { diff --git a/osd_primary.cpp b/osd_primary.cpp index ff27f09f..e0c72de0 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -43,7 +43,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) ); op_data->pg_num = pg_num; op_data->oid = oid; - op_data->stripes = pool_cfg.scheme == POOL_SCHEME_REPLICATED ? NULL : ((osd_rmw_stripe_t*)(op_data+1)); + op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1)); op_data->scheme = pool_cfg.scheme; cur_op->op_data = op_data; split_stripes((pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 
1 : pg_it->second.pg_minsize), @@ -324,7 +324,7 @@ resume_5: recovery_stat_count[0][recovery_type]++; recovery_stat_bytes[0][recovery_type] = 0; } - for (int role = 0; role < pg.pg_size; role++) + for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++) { recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start; } diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index 12c7471f..5b95a3a8 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -362,7 +362,7 @@ static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num) return false; } -void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set) +void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set) { osd_primary_op_data_t *op_data = cur_op->op_data; bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; @@ -370,7 +370,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint // ordered comparison for EC/XOR, unordered for replicated pools for (auto & chunk: loc_set) { - if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) + if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) { extra_chunks++; } @@ -386,7 +386,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint int i = 0; for (auto & chunk: loc_set) { - if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) + if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) { int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 
0 : chunk.role; if (chunk.osd_num == this->osd_num) diff --git a/osd_secondary.cpp b/osd_secondary.cpp index 2736f3ed..239d0d5f 100644 --- a/osd_secondary.cpp +++ b/osd_secondary.cpp @@ -83,6 +83,7 @@ void osd_t::exec_secondary(osd_op_t *cur_op) if (cur_op->req.sec_list.pg_count < cur_op->req.sec_list.list_pg) { // requested pg number is greater than total pg count + printf("Invalid LIST request: pg count %u < pg number %u\n", cur_op->req.sec_list.pg_count, cur_op->req.sec_list.list_pg); cur_op->bs_op->retval = -EINVAL; secondary_op_callback(cur_op); return;