2019-11-03 01:34:29 +03:00
|
|
|
#include "blockstore.h"
|
2019-11-01 02:47:57 +03:00
|
|
|
|
2019-11-05 02:43:21 +03:00
|
|
|
// Construct a blockstore attached to the given ring loop.
//
// config keys read here:
//   "block_size_order" - log2 of the block size; DEFAULT_ORDER is used when the
//                        value parses to 0 (strtoull also yields 0 for an empty
//                        or non-numeric string)
//   "flusher_count"    - number passed to journal_flusher_t; 32 when it parses to 0
// config is also handed to open_data(), open_meta(), open_journal() and calc_lengths().
//
// Throws std::runtime_error("Bad block size") when the resulting block size is
// <= 1 or >= MAX_BLOCK_SIZE, std::runtime_error when the aligned buffer can't be
// allocated, and rethrows whatever the open_*/calc_lengths/allocation steps throw.
// On any failure everything acquired so far is released before rethrowing,
// because the destructor does not run for a partially constructed object.
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop)
{
    this->ringloop = ringloop;
    ring_consumer.loop = [this]() { loop(); };
    ringloop->register_consumer(ring_consumer);
    initialized = 0;
    // Sentinels so that the failure handler releases only what was really acquired
    zero_object = NULL;
    data_alloc = NULL;
    data_fd = meta_fd = journal.fd = -1;
    try
    {
        uint64_t requested_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
        if (requested_order == 0)
        {
            requested_order = DEFAULT_ORDER;
        }
        // Reject oversized orders before shifting: shifting by >= the operand width
        // is undefined behaviour and could wrap to a size that passes the range check
        if (requested_order >= 64)
        {
            throw std::runtime_error("Bad block size");
        }
        const uint64_t requested_size = (uint64_t)1 << requested_order;
        if (requested_size <= 1 || requested_size >= MAX_BLOCK_SIZE)
        {
            throw std::runtime_error("Bad block size");
        }
        block_order = requested_order;
        block_size = requested_size;
        // NOTE(review): memalign() does not clear memory; if zero_object is expected
        // to contain zeroes it has to be filled somewhere else - confirm
        zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
        if (!zero_object)
        {
            throw std::runtime_error("Failed to allocate zero object buffer");
        }
        open_data(config);
        open_meta(config);
        open_journal(config);
        calc_lengths(config);
        data_alloc = new allocator(block_count);
        int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
        if (!flusher_count)
        {
            flusher_count = 32;
        }
        flusher = new journal_flusher_t(flusher_count, this);
    }
    catch (...)
    {
        delete data_alloc;
        data_alloc = NULL;
        // A descriptor equal to one that was already closed is skipped
        if (data_fd >= 0)
            close(data_fd);
        if (meta_fd >= 0 && meta_fd != data_fd)
            close(meta_fd);
        if (journal.fd >= 0 && journal.fd != meta_fd)
            close(journal.fd);
        // NOTE(review): journal.sector_buf / journal.sector_info (freed by the destructor)
        // are not released here because it is not visible whether they are initialized
        // before the open_*/calc_lengths steps assign them - confirm
        free(zero_object);
        zero_object = NULL;
        // Drop the registration: its callback captures `this`, which is about to die
        ringloop->unregister_consumer(ring_consumer);
        throw;
    }
}
|
2019-10-31 13:49:46 +03:00
|
|
|
|
2019-11-03 01:34:29 +03:00
|
|
|
// Release everything the blockstore owns: the block allocator, the journal
// flusher, the aligned zero_object buffer, the ring loop registration,
// the file descriptors and the journal sector buffers.
blockstore::~blockstore()
{
    delete data_alloc;
    delete flusher;
    free(zero_object);
    ringloop->unregister_consumer(ring_consumer);

    // Descriptors may coincide (presumably when several areas share one device),
    // so each distinct descriptor is closed only once
    const bool meta_has_own_fd = (meta_fd >= 0) && (meta_fd != data_fd);
    const bool journal_has_own_fd = (journal.fd >= 0) && (journal.fd != meta_fd);
    if (data_fd >= 0)
    {
        close(data_fd);
    }
    if (meta_has_own_fd)
    {
        close(meta_fd);
    }
    if (journal_has_own_fd)
    {
        close(journal.fd);
    }

    free(journal.sector_buf);
    free(journal.sector_info);
}
|
|
|
|
|
2019-11-18 02:36:53 +03:00
|
|
|
bool blockstore::is_started()
|
|
|
|
{
|
|
|
|
return initialized == 10;
|
|
|
|
}
|
|
|
|
|
2019-11-05 02:43:21 +03:00
|
|
|
// main event loop - produce requests
|
|
|
|
void blockstore::loop()
|
2019-11-05 02:12:04 +03:00
|
|
|
{
|
2019-11-05 02:43:21 +03:00
|
|
|
if (initialized != 10)
|
2019-11-05 02:12:04 +03:00
|
|
|
{
|
2019-11-05 02:43:21 +03:00
|
|
|
// read metadata, then journal
|
|
|
|
if (initialized == 0)
|
|
|
|
{
|
|
|
|
metadata_init_reader = new blockstore_init_meta(this);
|
|
|
|
initialized = 1;
|
2019-11-17 22:28:48 +03:00
|
|
|
printf("reading blockstore metadata\n");
|
2019-11-05 02:43:21 +03:00
|
|
|
}
|
2019-11-17 22:28:48 +03:00
|
|
|
if (initialized == 1)
|
2019-11-05 02:12:04 +03:00
|
|
|
{
|
2019-11-05 02:43:21 +03:00
|
|
|
int res = metadata_init_reader->loop();
|
|
|
|
if (!res)
|
2019-11-05 02:12:04 +03:00
|
|
|
{
|
2019-11-05 02:43:21 +03:00
|
|
|
delete metadata_init_reader;
|
|
|
|
metadata_init_reader = NULL;
|
|
|
|
journal_init_reader = new blockstore_init_journal(this);
|
|
|
|
initialized = 2;
|
2019-11-17 22:28:48 +03:00
|
|
|
printf("reading blockstore journal\n");
|
2019-11-05 02:12:04 +03:00
|
|
|
}
|
2019-11-05 02:43:21 +03:00
|
|
|
}
|
2019-11-17 22:28:48 +03:00
|
|
|
if (initialized == 2)
|
2019-11-05 02:43:21 +03:00
|
|
|
{
|
|
|
|
int res = journal_init_reader->loop();
|
|
|
|
if (!res)
|
2019-11-05 02:12:04 +03:00
|
|
|
{
|
2019-11-05 02:43:21 +03:00
|
|
|
delete journal_init_reader;
|
|
|
|
journal_init_reader = NULL;
|
|
|
|
initialized = 10;
|
2019-11-17 22:28:48 +03:00
|
|
|
printf("journal read\n");
|
2019-11-05 02:12:04 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-11-05 02:43:21 +03:00
|
|
|
else
|
|
|
|
{
|
2019-11-06 19:27:48 +03:00
|
|
|
// try to submit ops
|
2019-11-10 12:46:58 +03:00
|
|
|
auto cur_sync = in_progress_syncs.begin();
|
|
|
|
while (cur_sync != in_progress_syncs.end())
|
|
|
|
{
|
|
|
|
continue_sync(*cur_sync++);
|
|
|
|
}
|
2019-11-08 11:36:08 +03:00
|
|
|
auto cur = submit_queue.begin();
|
2019-11-08 14:10:24 +03:00
|
|
|
bool has_writes = false;
|
2019-11-08 11:36:08 +03:00
|
|
|
while (cur != submit_queue.end())
|
2019-11-06 19:27:48 +03:00
|
|
|
{
|
2019-11-08 11:36:08 +03:00
|
|
|
auto op_ptr = cur;
|
|
|
|
auto op = *(cur++);
|
|
|
|
if (op->wait_for)
|
2019-11-08 02:16:31 +03:00
|
|
|
{
|
2019-11-10 01:40:48 +03:00
|
|
|
check_wait(op);
|
2019-11-08 11:36:08 +03:00
|
|
|
if (op->wait_for == WAIT_SQE)
|
2019-11-10 01:40:48 +03:00
|
|
|
break;
|
|
|
|
else if (op->wait_for)
|
|
|
|
continue;
|
2019-11-08 02:16:31 +03:00
|
|
|
}
|
2019-11-16 02:12:27 +03:00
|
|
|
unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
|
|
|
|
unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
|
2019-11-08 19:54:31 +03:00
|
|
|
int dequeue_op = 0;
|
2019-11-10 13:26:56 +03:00
|
|
|
if ((op->flags & OP_TYPE_MASK) == OP_READ)
|
2019-11-06 19:27:48 +03:00
|
|
|
{
|
2019-11-08 19:54:31 +03:00
|
|
|
dequeue_op = dequeue_read(op);
|
2019-11-06 19:27:48 +03:00
|
|
|
}
|
2019-11-08 11:36:08 +03:00
|
|
|
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
|
|
|
|
(op->flags & OP_TYPE_MASK) == OP_DELETE)
|
2019-11-07 02:24:12 +03:00
|
|
|
{
|
2019-11-08 14:10:24 +03:00
|
|
|
has_writes = true;
|
2019-11-08 19:54:31 +03:00
|
|
|
dequeue_op = dequeue_write(op);
|
2019-11-07 02:24:12 +03:00
|
|
|
}
|
2019-11-08 11:36:08 +03:00
|
|
|
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
|
2019-11-07 02:24:12 +03:00
|
|
|
{
|
2019-11-08 11:36:08 +03:00
|
|
|
// wait for all small writes to be submitted
|
|
|
|
// wait for all big writes to complete, submit data device fsync
|
|
|
|
// wait for the data device fsync to complete, then submit journal writes for big writes
|
|
|
|
// then submit an fsync operation
|
2019-11-08 14:10:24 +03:00
|
|
|
if (has_writes)
|
|
|
|
{
|
|
|
|
// Can't submit SYNC before previous writes
|
|
|
|
continue;
|
|
|
|
}
|
2019-11-08 19:54:31 +03:00
|
|
|
dequeue_op = dequeue_sync(op);
|
|
|
|
}
|
|
|
|
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
|
|
|
|
{
|
2019-11-10 14:37:45 +03:00
|
|
|
dequeue_op = dequeue_stable(op);
|
2019-11-08 19:54:31 +03:00
|
|
|
}
|
|
|
|
if (dequeue_op)
|
|
|
|
{
|
|
|
|
submit_queue.erase(op_ptr);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2019-11-16 02:12:27 +03:00
|
|
|
ringloop->ring.sq.sqe_tail = prev_sqe_pos;
|
2019-11-08 19:54:31 +03:00
|
|
|
if (op->wait_for == WAIT_SQE)
|
2019-11-08 14:10:24 +03:00
|
|
|
{
|
2019-11-08 19:54:31 +03:00
|
|
|
op->wait_detail = 1 + ring_space;
|
2019-11-08 14:10:24 +03:00
|
|
|
// ring is full, stop submission
|
|
|
|
break;
|
|
|
|
}
|
2019-11-07 02:24:12 +03:00
|
|
|
}
|
2019-11-06 19:27:48 +03:00
|
|
|
}
|
2019-11-15 14:09:41 +03:00
|
|
|
flusher->loop();
|
2019-11-19 18:07:40 +03:00
|
|
|
int ret = ringloop->submit();
|
|
|
|
if (ret < 0)
|
|
|
|
{
|
|
|
|
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
|
|
|
|
}
|
2019-11-06 19:27:48 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-25 01:29:07 +03:00
|
|
|
bool blockstore::is_safe_to_stop()
|
2019-11-11 21:22:28 +03:00
|
|
|
{
|
2019-11-25 01:29:07 +03:00
|
|
|
// It's safe to stop blockstore when there are no in-flight operations,
|
|
|
|
// no in-progress syncs and flusher isn't doing anything
|
|
|
|
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || flusher->is_active())
|
|
|
|
{
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
|
|
|
|
{
|
|
|
|
if (!stop_sync_submitted)
|
|
|
|
{
|
|
|
|
// We should sync the blockstore before unmounting
|
|
|
|
blockstore_operation *op = new blockstore_operation;
|
|
|
|
op->flags = OP_SYNC;
|
|
|
|
op->buf = NULL;
|
|
|
|
op->callback = [&](blockstore_operation *op)
|
|
|
|
{
|
|
|
|
delete op;
|
|
|
|
};
|
|
|
|
enqueue_op(op);
|
|
|
|
stop_sync_submitted = true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
2019-11-11 21:22:28 +03:00
|
|
|
}
|
|
|
|
|
2019-11-10 01:40:48 +03:00
|
|
|
// Re-evaluates the condition a queued operation is waiting for and sets
// op->wait_for to 0 once that condition no longer holds; otherwise op is left
// untouched so the caller does not submit it yet.
//
// Handled wait kinds and what keeps the operation blocked:
//   WAIT_SQE            - fewer than op->wait_detail free submission queue entries
//   WAIT_IN_FLIGHT      - dirty_db entry {op->oid, op->wait_detail} exists and is in flight
//   WAIT_JOURNAL        - journal.used_start is below op->wait_detail
//   WAIT_JOURNAL_BUFFER - the sector_info entry after journal.cur_sector is still in use
// Throws std::runtime_error for any other op->wait_for value.
void blockstore::check_wait(blockstore_operation *op)
{
    bool still_blocked;
    if (op->wait_for == WAIT_SQE)
    {
        // stop submission if there's still no free space
        still_blocked = io_uring_sq_space_left(&ringloop->ring) < op->wait_detail;
    }
    else if (op->wait_for == WAIT_IN_FLIGHT)
    {
        auto awaited_it = dirty_db.find((obj_ver_id){
            .oid = op->oid,
            .version = op->wait_detail,
        });
        still_blocked = awaited_it != dirty_db.end() && IS_IN_FLIGHT(awaited_it->second.state);
    }
    else if (op->wait_for == WAIT_JOURNAL)
    {
        still_blocked = journal.used_start < op->wait_detail;
    }
    else if (op->wait_for == WAIT_JOURNAL_BUFFER)
    {
        still_blocked = journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0;
    }
    else
    {
        throw std::runtime_error("BUG: op->wait_for value is unexpected");
    }
    if (still_blocked)
    {
        // do not submit
        return;
    }
    op->wait_for = 0;
}
|
|
|
|
|
2019-11-21 02:09:12 +03:00
|
|
|
// Validates an operation and appends it to the submit queue.
//
// The operation type (op->flags & OP_TYPE_MASK) must lie in [OP_READ, OP_DELETE];
// for OP_READ and OP_WRITE the range offset..offset+len must additionally fit into
// one block and len must be a multiple of DISK_ALIGNMENT. An operation failing these
// checks gets retval = -EINVAL and its callback is called immediately.
// An accepted operation has its wait/sync/pending state reset, is queued
// (writes also go through enqueue_write()) and the ring consumer is woken up.
void blockstore::enqueue_op(blockstore_operation *op)
{
    const int op_type = op->flags & OP_TYPE_MASK;
    bool accepted = (op_type >= OP_READ) && (op_type <= OP_DELETE);
    if (accepted && (op_type == OP_READ || op_type == OP_WRITE))
    {
        // the subtraction is evaluated only after offset < block_size is established
        accepted = op->offset < block_size
            && op->len <= block_size-op->offset
            && !(op->len % DISK_ALIGNMENT);
    }
    if (!accepted)
    {
        // Basic verification not passed
        op->retval = -EINVAL;
        op->callback(op);
        return;
    }

    op->wait_for = 0;
    op->sync_state = 0;
    op->pending_ops = 0;
    submit_queue.push_back(op);
    if (op_type == OP_WRITE)
    {
        enqueue_write(op);
    }
    ringloop->wakeup(ring_consumer);
}
|