Implement journal write batching and slightly refactor journal writes

Slightly reduces WA. For example, in 4K T1Q128 replicated randwrite tests
WA is reduced from ~3.6 to ~3.1, in T1Q64 from ~3.8 to ~3.4.

Only effective without no_same_sector_overwrites.
sec_osd_msgr
Vitaliy Filippov 2021-12-15 02:43:12 +03:00
parent 999bed8514
commit f93491bc6c
8 changed files with 163 additions and 219 deletions

View File

@ -235,6 +235,12 @@ void blockstore_impl_t::loop()
{
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
for (auto s: journal.submitting_sectors)
{
// Mark journal sector writes as submitted
journal.sector_info[s].submit_id = 0;
}
journal.submitting_sectors.clear();
if ((initial_ring_space - ringloop->space_left()) > 0)
{
live = true;

View File

@ -54,6 +54,14 @@
#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE)
#define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE)
#define BS_SUBMIT_CHECK_SQES(n) \
if (ringloop->space_left() < (n))\
{\
/* Pause until there are more requests available */\
PRIV(op)->wait_for = WAIT_SQE;\
return 0;\
}
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
@ -170,7 +178,7 @@ struct blockstore_op_private_t
std::vector<fulfill_read_t> read_vec;
// Sync, write
uint64_t min_flushed_journal_sector, max_flushed_journal_sector;
int min_flushed_journal_sector, max_flushed_journal_sector;
// Write
struct iovec iov_zerofill[3];
@ -283,6 +291,10 @@ class blockstore_impl_t
void open_journal();
uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
// Journaling
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
void handle_journal_write(ring_data_t *data, uint64_t flush_id);
// Asynchronous init
int initialized;
int metadata_buf_size;
@ -310,21 +322,18 @@ class blockstore_impl_t
// Sync
int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
void ack_sync(blockstore_op_t *op);
// Stabilize
int dequeue_stable(blockstore_op_t *op);
int continue_stable(blockstore_op_t *op);
void mark_stable(const obj_ver_id & ov, bool forget_dirty = false);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
// Rollback
int dequeue_rollback(blockstore_op_t *op);
int continue_rollback(blockstore_op_t *op);
void mark_rolled_back(const obj_ver_id & ov);
void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);
// List

View File

@ -153,23 +153,74 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
return je;
}
void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_op_t *op)
{
journal.sector_info[cur_sector].dirty = false;
journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].flush_count++;
// Don't submit the same sector twice in the same batch
if (!journal.sector_info[cur_sector].submit_id)
{
io_uring_sqe *sqe = get_sqe();
// Caller must ensure availability of an SQE
assert(sqe != NULL);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].submit_id = ++journal.submit_id;
journal.submitting_sectors.push_back(cur_sector);
journal.sector_info[cur_sector].flush_count++;
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + journal.block_size*cur_sector),
journal.block_size
};
data->callback = cb;
data->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); };
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
);
}
journal.sector_info[cur_sector].dirty = false;
// But always remember that this operation has to wait until this exact journal write is finished
journal.flushing_ops.insert((pending_journaling_t){
.flush_id = journal.sector_info[cur_sector].submit_id,
.sector = cur_sector,
.op = op,
});
auto priv = PRIV(op);
priv->pending_ops++;
if (!priv->min_flushed_journal_sector)
priv->min_flushed_journal_sector = 1+cur_sector;
priv->max_flushed_journal_sector = 1+cur_sector;
}
void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_id)
{
live = true;
if (data->res != data->iov.iov_len)
{
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw std::runtime_error(
"journal write failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id });
if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
journal.sector_info[fl_it->sector].flush_count--;
}
while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
auto priv = PRIV(fl_it->op);
priv->pending_ops--;
assert(priv->pending_ops >= 0);
if (priv->pending_ops == 0)
{
release_journal_sectors(fl_it->op);
priv->op_state++;
ringloop->wakeup();
}
journal.flushing_ops.erase(fl_it++);
}
}
journal_t::~journal_t()
{

View File

@ -4,6 +4,7 @@
#pragma once
#include "crc32c.h"
#include <set>
#define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33
@ -145,8 +146,21 @@ struct journal_sector_info_t
uint64_t flush_count;
bool written;
bool dirty;
uint64_t submit_id;
};
struct pending_journaling_t
{
uint64_t flush_id;
int sector;
blockstore_op_t *op;
};
inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b)
{
return a.flush_id < b.flush_id || a.flush_id == b.flush_id && a.op < b.op;
}
struct journal_t
{
int fd;
@ -172,6 +186,9 @@ struct journal_t
bool no_same_sector_overwrites = false;
int cur_sector = 0;
int in_sector_pos = 0;
std::vector<int> submitting_sectors;
std::set<pending_journaling_t> flushing_ops;
uint64_t submit_id = 0;
// Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case
@ -200,5 +217,3 @@ struct blockstore_journal_check_t
};
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);

View File

@ -74,24 +74,17 @@ skip_ov:
{
return 0;
}
// There is sufficient space. Get SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// There is sufficient space. Check SQEs
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
int s = 0, cur_sector = -1;
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
journal_entry_rollback *je = (journal_entry_rollback*)
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
@ -100,12 +93,9 @@ skip_ov:
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
@ -114,30 +104,23 @@ int blockstore_impl_t::continue_rollback(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else if (PRIV(op)->op_state == 4)
goto resume_4;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
PRIV(op)->op_state = 3;
return 1;
}
resume_5:
resume_4:
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
@ -196,24 +179,6 @@ void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
}
}
void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{
if (dirty_end == dirty_start)

View File

@ -97,25 +97,18 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
{
return 0;
}
// There is sufficient space. Get SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// There is sufficient space. Check SQEs
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
int s = 0, cur_sector = -1;
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// FIXME: Only stabilize versions that aren't stable yet
if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
journal_entry_stable *je = (journal_entry_stable*)
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
@ -124,12 +117,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
@ -138,30 +128,23 @@ int blockstore_impl_t::continue_stable(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else if (PRIV(op)->op_state == 4)
goto resume_4;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
PRIV(op)->op_state = 3;
return 1;
}
resume_5:
resume_4:
// Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;
@ -257,21 +240,3 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
unstable_writes.erase(unstab_it);
}
}
void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}

View File

@ -44,10 +44,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
if (journal.sector_info[journal.cur_sector].dirty)
{
// Write out the last journal sector if it happens to be dirty
BS_SUBMIT_GET_ONLY_SQE(sqe);
prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
BS_SUBMIT_CHECK_SQES(1);
prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
@ -64,7 +62,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
@ -85,24 +83,18 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
{
return 0;
}
// Get SQEs. Don't bother about merging, submit each journal sector as a separate request
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (int i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Check SQEs. Don't bother about merging, submit each journal sector as a separate request
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto it = PRIV(op)->sync_big_writes.begin();
int s = 0, cur_sector = -1;
int s = 0;
while (it != PRIV(op)->sync_big_writes.end())
{
if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
auto & dirty_entry = dirty_db.at(*it);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
@ -129,12 +121,9 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
journal.crc32_last = je->crc32;
it++;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
@ -145,7 +134,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
@ -164,42 +153,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
return 1;
}
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
release_journal_sectors(op);
// Handle states
if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
{
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DONE;
}
else
{
throw std::runtime_error("BUG: unexpected sync op state");
}
ringloop->wakeup();
}
}
void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
// Handle states

View File

@ -268,8 +268,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
cancel_all_writes(op, dirty_it, -ENOSPC);
return 2;
}
write_iodepth++;
BS_SUBMIT_GET_SQE(sqe, data);
write_iodepth++;
dirty_it->second.location = loc << block_order;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
@ -324,29 +324,21 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
return 0;
}
write_iodepth++;
// There is sufficient space. Get SQE(s)
struct io_uring_sqe *sqe1 = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{
// There is sufficient space. Check SQE(s)
BS_SUBMIT_CHECK_SQES(
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe1);
}
struct io_uring_sqe *sqe2 = NULL;
if (op->len > 0)
{
BS_SUBMIT_GET_SQE_DECL(sqe2);
}
(immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size) ? 1 : 0) +
(op->len > 0 ? 1 : 0)
);
write_iodepth++;
// Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (immediate_commit == IMMEDIATE_NONE)
{
if (sqe1)
if (!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
else
{
@ -380,9 +372,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
journal.crc32_last = je->crc32;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
if (op->len > 0)
{
@ -392,7 +382,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Copy data
memcpy(journal.buffer + journal.next_free, op->buf, op->len);
}
ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
BS_SUBMIT_GET_SQE(sqe2, data2);
data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb;
my_uring_prep_writev(
@ -441,13 +431,12 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
{
BS_SUBMIT_CHECK_SQES(1);
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
io_uring_sqe *sqe = NULL;
BS_SUBMIT_GET_SQE_DECL(sqe);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + clean_entry_bitmap_size
@ -469,10 +458,7 @@ resume_2:
memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->op_state = 3;
return 1;
}
@ -587,6 +573,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
);
}
PRIV(op)->pending_ops--;
assert(PRIV(op)->pending_ops >= 0);
if (PRIV(op)->pending_ops == 0)
{
release_journal_sectors(op);
@ -604,7 +591,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
uint64_t s = PRIV(op)->min_flushed_journal_sector;
while (1)
{
journal.sector_info[s-1].flush_count--;
if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0)
{
// We know for sure that we won't write into this sector anymore
@ -644,23 +630,19 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
return 0;
}
write_iodepth++;
io_uring_sqe *sqe = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe);
}
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
BS_SUBMIT_CHECK_SQES(
(immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty) ? 1 : 0
);
// Prepare journal sector write
if (immediate_commit == IMMEDIATE_NONE)
{
if (sqe)
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
else
{
@ -687,9 +669,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
if (!PRIV(op)->pending_ops)
{