Fix journal sequencing: make each journal write wait for all previous journal writes
Test / buildenv (push) Successful in 10s Details
Test / build (push) Successful in 12s Details
Test / test_cas (push) Successful in 8s Details
Test / make_test (push) Successful in 36s Details
Test / test_change_pg_count (push) Successful in 38s Details
Test / test_change_pg_size (push) Successful in 9s Details
Test / test_change_pg_count_ec (push) Successful in 35s Details
Test / test_create_nomaxid (push) Successful in 8s Details
Test / test_etcd_fail (push) Successful in 56s Details
Test / test_interrupted_rebalance_imm (push) Successful in 2m7s Details
Test / test_interrupted_rebalance (push) Successful in 2m14s Details
Test / test_failure_domain (push) Successful in 38s Details
Test / test_interrupted_rebalance_ec (push) Successful in 2m17s Details
Test / test_snapshot (push) Successful in 18s Details
Test / test_add_osd (push) Successful in 3m58s Details
Test / test_minsize_1 (push) Successful in 14s Details
Test / test_move_reappear (push) Successful in 20s Details
Test / test_interrupted_rebalance_ec_imm (push) Successful in 1m24s Details
Test / test_snapshot_ec (push) Successful in 31s Details
Test / test_rm (push) Successful in 15s Details
Test / test_snapshot_down (push) Successful in 30s Details
Test / test_snapshot_down_ec (push) Successful in 32s Details
Test / test_splitbrain (push) Successful in 25s Details
Test / test_snapshot_chain (push) Successful in 2m11s Details
Test / test_snapshot_chain_ec (push) Successful in 3m1s Details
Test / test_rebalance_verify_imm (push) Successful in 2m35s Details
Test / test_rebalance_verify (push) Successful in 3m10s Details
Test / test_switch_primary (push) Successful in 39s Details
Test / test_write (push) Successful in 43s Details
Test / test_write_no_same (push) Successful in 18s Details
Test / test_write_xor (push) Successful in 1m3s Details
Test / test_rebalance_verify_ec (push) Successful in 4m38s Details
Test / test_heal_pg_size_2 (push) Successful in 3m22s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 5m11s Details
Test / test_heal_ec (push) Successful in 4m23s Details
Test / test_heal_csum_32k_dmj (push) Successful in 4m55s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m31s Details
Test / test_heal_csum_32k (push) Successful in 6m29s Details
Test / test_heal_csum_4k_dmj (push) Successful in 7m18s Details
Test / test_scrub_zero_osd_2 (push) Successful in 1m0s Details
Test / test_scrub (push) Failing after 3m19s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m39s Details
Test / test_scrub_xor (push) Successful in 58s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m13s Details
Test / test_scrub_ec (push) Successful in 50s Details
Test / test_scrub_pg_size_3 (push) Successful in 1m51s Details
Test / test_heal_csum_4k (push) Successful in 5m13s Details
Test / test_nfs (push) Successful in 23s Details

antietcd
Vitaliy Filippov 2024-04-06 02:23:35 +03:00
parent f36d7eb76c
commit c5195666cd
3 changed files with 46 additions and 23 deletions

View File

@ -177,6 +177,7 @@ void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_
ring_data_t *data = ((ring_data_t*)sqe->user_data); ring_data_t *data = ((ring_data_t*)sqe->user_data);
journal.sector_info[cur_sector].written = true; journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].submit_id = ++journal.submit_id; journal.sector_info[cur_sector].submit_id = ++journal.submit_id;
assert(journal.submit_id != 0); // check overflow
journal.submitting_sectors.push_back(cur_sector); journal.submitting_sectors.push_back(cur_sector);
journal.sector_info[cur_sector].flush_count++; journal.sector_info[cur_sector].flush_count++;
data->iov = (struct iovec){ data->iov = (struct iovec){
@ -192,8 +193,8 @@ void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_
} }
journal.sector_info[cur_sector].dirty = false; journal.sector_info[cur_sector].dirty = false;
// But always remember that this operation has to wait until this exact journal write is finished // But always remember that this operation has to wait until this exact journal write is finished
journal.flushing_ops.insert((pending_journaling_t){ journal.flushing_ops.emplace(journal.sector_info[cur_sector].submit_id, (pending_journaling_t){
.flush_id = journal.sector_info[cur_sector].submit_id, .pending = 1,
.sector = cur_sector, .sector = cur_sector,
.op = op, .op = op,
}); });
@ -213,23 +214,43 @@ void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_i
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
disk_error_abort("journal write", data->res, data->iov.iov_len); disk_error_abort("journal write", data->res, data->iov.iov_len);
} }
auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id }); auto fl_it = journal.flushing_ops.lower_bound(flush_id);
if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) if (fl_it != journal.flushing_ops.end() && fl_it->first == flush_id && fl_it->second.sector >= 0)
{ {
journal.sector_info[fl_it->sector].flush_count--; journal.sector_info[fl_it->second.sector].flush_count--;
} }
while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) auto is_first = fl_it == journal.flushing_ops.begin();
while (fl_it != journal.flushing_ops.end())
{ {
auto priv = PRIV(fl_it->op); bool del = false;
priv->pending_ops--; if (fl_it->first == flush_id)
assert(priv->pending_ops >= 0);
if (priv->pending_ops == 0)
{ {
release_journal_sectors(fl_it->op); fl_it->second.pending = 0;
priv->op_state++; del = is_first;
ringloop->wakeup(); }
else
{
del = !fl_it->second.pending;
}
if (del)
{
// Do not complete this operation if previous writes are unfinished
// Otherwise also complete following operations waiting for this one
auto priv = PRIV(fl_it->second.op);
priv->pending_ops--;
assert(priv->pending_ops >= 0);
if (priv->pending_ops == 0)
{
release_journal_sectors(fl_it->second.op);
priv->op_state++;
ringloop->wakeup();
}
journal.flushing_ops.erase(fl_it++);
}
else
{
fl_it++;
} }
journal.flushing_ops.erase(fl_it++);
} }
} }

View File

@ -156,16 +156,11 @@ struct journal_sector_info_t
struct pending_journaling_t struct pending_journaling_t
{ {
uint64_t flush_id; int pending;
int sector; int sector;
blockstore_op_t *op; blockstore_op_t *op;
}; };
inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b)
{
return a.flush_id < b.flush_id || a.flush_id == b.flush_id && a.op < b.op;
}
struct journal_t struct journal_t
{ {
int fd; int fd;
@ -191,7 +186,7 @@ struct journal_t
int cur_sector = 0; int cur_sector = 0;
int in_sector_pos = 0; int in_sector_pos = 0;
std::vector<int> submitting_sectors; std::vector<int> submitting_sectors;
std::set<pending_journaling_t> flushing_ops; std::multimap<uint64_t, pending_journaling_t> flushing_ops;
uint64_t submit_id = 0; uint64_t submit_id = 0;
// Used sector map // Used sector map

View File

@ -427,7 +427,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
); );
write_iodepth++; write_iodepth++;
// Got SQEs. Prepare previous journal sector write if required // Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (immediate_commit == IMMEDIATE_NONE && if (immediate_commit == IMMEDIATE_NONE &&
!journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size)) !journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size))
{ {
@ -503,7 +502,15 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
} }
BS_SUBMIT_GET_SQE(sqe2, data2); BS_SUBMIT_GET_SQE(sqe2, data2);
data2->iov = (struct iovec){ op->buf, op->len }; data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb; ++journal.submit_id;
assert(journal.submit_id != 0); // check overflow
// Make subsequent journal writes wait for our data write
journal.flushing_ops.emplace(journal.submit_id, (pending_journaling_t){
.pending = 1,
.sector = -1,
.op = op,
});
data2->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); };
my_uring_prep_writev( my_uring_prep_writev(
sqe2, dsk.journal_fd, &data2->iov, 1, journal.offset + journal.next_free sqe2, dsk.journal_fd, &data2->iov, 1, journal.offset + journal.next_free
); );