Remove duplicate journal buffer submission code
parent
1f04db7d68
commit
46e96c5128
|
@ -253,6 +253,7 @@ int blockstore::enqueue_op(blockstore_operation *op)
|
||||||
}
|
}
|
||||||
op->wait_for = 0;
|
op->wait_for = 0;
|
||||||
op->sync_state = 0;
|
op->sync_state = 0;
|
||||||
|
op->pending_ops = 0;
|
||||||
submit_queue.push_back(op);
|
submit_queue.push_back(op);
|
||||||
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
|
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
|
||||||
{
|
{
|
||||||
|
|
|
@ -212,6 +212,7 @@ struct blockstore_operation
|
||||||
// FIXME: Move internal fields somewhere
|
// FIXME: Move internal fields somewhere
|
||||||
friend class blockstore;
|
friend class blockstore;
|
||||||
friend class blockstore_journal_check_t;
|
friend class blockstore_journal_check_t;
|
||||||
|
friend void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe);
|
||||||
private:
|
private:
|
||||||
// Wait status
|
// Wait status
|
||||||
int wait_for;
|
int wait_for;
|
||||||
|
|
|
@ -46,3 +46,14 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe)
|
||||||
|
{
|
||||||
|
journal.sector_info[journal.cur_sector].usage_count++;
|
||||||
|
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
|
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
|
||||||
|
data->op = op;
|
||||||
|
io_uring_prep_writev(
|
||||||
|
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
@ -155,3 +155,6 @@ inline journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t
|
||||||
je->crc32_prev = journal.crc32_last;
|
je->crc32_prev = journal.crc32_last;
|
||||||
return je;
|
return je;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: make inline
|
||||||
|
void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe);
|
||||||
|
|
|
@ -78,13 +78,12 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// There is sufficient space. Get SQEs
|
// There is sufficient space. Get SQEs
|
||||||
struct io_uring_sqe *sqe[space_check.sectors_required+1];
|
struct io_uring_sqe *sqe[space_check.sectors_required];
|
||||||
for (int i = 0; i < space_check.sectors_required+1; i++)
|
for (i = 0; i < space_check.sectors_required; i++)
|
||||||
{
|
{
|
||||||
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
|
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
|
||||||
}
|
}
|
||||||
// Prepare and submit journal entries
|
// Prepare and submit journal entries
|
||||||
op->min_used_journal_sector = 1 + journal.cur_sector;
|
|
||||||
int s = 0, cur_sector = -1;
|
int s = 0, cur_sector = -1;
|
||||||
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
||||||
{
|
{
|
||||||
|
@ -96,20 +95,14 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
if (cur_sector != journal.cur_sector)
|
if (cur_sector != journal.cur_sector)
|
||||||
{
|
{
|
||||||
|
if (cur_sector == -1)
|
||||||
|
op->min_used_journal_sector = 1 + journal.cur_sector;
|
||||||
cur_sector = journal.cur_sector;
|
cur_sector = journal.cur_sector;
|
||||||
// FIXME: Deduplicate this piece of code, too (something like write_journal)
|
prepare_journal_sector_write(op, journal, sqe[s++]);
|
||||||
journal.sector_info[journal.cur_sector].usage_count++;
|
|
||||||
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
|
|
||||||
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
|
|
||||||
data->op = op;
|
|
||||||
io_uring_prep_writev(
|
|
||||||
sqe[s], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
|
|
||||||
);
|
|
||||||
s++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
op->pending_ops = s;
|
|
||||||
op->max_used_journal_sector = 1 + journal.cur_sector;
|
op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||||
|
op->pending_ops = s;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,6 @@ int blockstore::continue_sync(blockstore_operation *op)
|
||||||
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
|
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
|
||||||
}
|
}
|
||||||
// Prepare and submit journal entries
|
// Prepare and submit journal entries
|
||||||
op->min_used_journal_sector = 1 + journal.cur_sector;
|
|
||||||
auto it = op->sync_big_writes.begin();
|
auto it = op->sync_big_writes.begin();
|
||||||
int s = 0, cur_sector = -1;
|
int s = 0, cur_sector = -1;
|
||||||
while (it != op->sync_big_writes.end())
|
while (it != op->sync_big_writes.end())
|
||||||
|
@ -86,23 +85,18 @@ int blockstore::continue_sync(blockstore_operation *op)
|
||||||
it++;
|
it++;
|
||||||
if (cur_sector != journal.cur_sector)
|
if (cur_sector != journal.cur_sector)
|
||||||
{
|
{
|
||||||
|
if (cur_sector == -1)
|
||||||
|
op->min_used_journal_sector = 1 + journal.cur_sector;
|
||||||
cur_sector = journal.cur_sector;
|
cur_sector = journal.cur_sector;
|
||||||
journal.sector_info[journal.cur_sector].usage_count++;
|
prepare_journal_sector_write(op, journal, sqe[s++]);
|
||||||
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
|
|
||||||
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
|
|
||||||
data->op = op;
|
|
||||||
io_uring_prep_writev(
|
|
||||||
sqe[s], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
|
|
||||||
);
|
|
||||||
s++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||||
// ... And a journal fsync
|
// ... And a journal fsync
|
||||||
io_uring_prep_fsync(sqe[s], journal.fd, 0);
|
io_uring_prep_fsync(sqe[s], journal.fd, 0);
|
||||||
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
|
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
|
||||||
data->op = op;
|
data->op = op;
|
||||||
op->pending_ops = 1 + s;
|
op->pending_ops = 1 + s;
|
||||||
op->max_used_journal_sector = 1 + journal.cur_sector;
|
|
||||||
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -111,12 +111,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
||||||
je->len = op->len;
|
je->len = op->len;
|
||||||
je->crc32 = je_crc32((journal_entry*)je);
|
je->crc32 = je_crc32((journal_entry*)je);
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
data1->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
|
prepare_journal_sector_write(op, journal, sqe1);
|
||||||
data1->op = op;
|
|
||||||
io_uring_prep_writev(
|
|
||||||
sqe1, journal.fd, &data1->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
|
|
||||||
);
|
|
||||||
journal.sector_info[journal.cur_sector].usage_count++;
|
|
||||||
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
|
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||||
// Prepare journal data write
|
// Prepare journal data write
|
||||||
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
|
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
|
||||||
|
|
Loading…
Reference in New Issue