forked from vitalif/vitastor
Zero-fill new objects and write them to the main storage
parent
4afa95b0e3
commit
bb55a7fbf4
|
@ -13,6 +13,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
|
||||||
{
|
{
|
||||||
throw new std::runtime_error("Bad block size");
|
throw new std::runtime_error("Bad block size");
|
||||||
}
|
}
|
||||||
|
zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
|
||||||
data_fd = meta_fd = journal.fd = -1;
|
data_fd = meta_fd = journal.fd = -1;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -38,6 +39,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
|
||||||
|
|
||||||
blockstore::~blockstore()
|
blockstore::~blockstore()
|
||||||
{
|
{
|
||||||
|
free(zero_object);
|
||||||
ringloop->unregister_consumer(ring_consumer.number);
|
ringloop->unregister_consumer(ring_consumer.number);
|
||||||
if (data_fd >= 0)
|
if (data_fd >= 0)
|
||||||
close(data_fd);
|
close(data_fd);
|
||||||
|
|
11
blockstore.h
11
blockstore.h
|
@ -69,14 +69,17 @@
|
||||||
#define STRIPE_REPLICA(oid) ((oid) & 0xf)
|
#define STRIPE_REPLICA(oid) ((oid) & 0xf)
|
||||||
|
|
||||||
#define BS_SUBMIT_GET_SQE(sqe, data) \
|
#define BS_SUBMIT_GET_SQE(sqe, data) \
|
||||||
|
BS_SUBMIT_GET_ONLY_SQE(sqe); \
|
||||||
|
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
|
||||||
|
|
||||||
|
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
|
||||||
struct io_uring_sqe *sqe = get_sqe();\
|
struct io_uring_sqe *sqe = get_sqe();\
|
||||||
if (!sqe)\
|
if (!sqe)\
|
||||||
{\
|
{\
|
||||||
/* Pause until there are more requests available */\
|
/* Pause until there are more requests available */\
|
||||||
op->wait_for = WAIT_SQE;\
|
op->wait_for = WAIT_SQE;\
|
||||||
return 0;\
|
return 0;\
|
||||||
}\
|
}
|
||||||
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
|
|
||||||
|
|
||||||
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
|
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
|
||||||
sqe = get_sqe();\
|
sqe = get_sqe();\
|
||||||
|
@ -230,6 +233,9 @@ private:
|
||||||
// Sync, write
|
// Sync, write
|
||||||
uint64_t min_used_journal_sector, max_used_journal_sector;
|
uint64_t min_used_journal_sector, max_used_journal_sector;
|
||||||
|
|
||||||
|
// Write
|
||||||
|
struct iovec iov_zerofill[3];
|
||||||
|
|
||||||
// Sync
|
// Sync
|
||||||
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
|
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
|
||||||
std::list<blockstore_operation*>::iterator in_progress_ptr;
|
std::list<blockstore_operation*>::iterator in_progress_ptr;
|
||||||
|
@ -251,6 +257,7 @@ class blockstore
|
||||||
uint32_t block_order, block_size;
|
uint32_t block_order, block_size;
|
||||||
uint64_t block_count;
|
uint64_t block_count;
|
||||||
allocator *data_alloc;
|
allocator *data_alloc;
|
||||||
|
uint8_t *zero_object;
|
||||||
|
|
||||||
int meta_fd;
|
int meta_fd;
|
||||||
int data_fd;
|
int data_fd;
|
||||||
|
|
|
@ -174,7 +174,6 @@ class journal_flusher_t
|
||||||
std::vector<copy_buffer_t> v;
|
std::vector<copy_buffer_t> v;
|
||||||
std::vector<copy_buffer_t>::iterator it;
|
std::vector<copy_buffer_t>::iterator it;
|
||||||
uint64_t offset, len, submit_len, clean_loc;
|
uint64_t offset, len, submit_len, clean_loc;
|
||||||
bool allocated;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
journal_flusher_t(int flush_count);
|
journal_flusher_t(int flush_count);
|
||||||
|
@ -208,7 +207,6 @@ void journal_flusher_t::loop()
|
||||||
v.clear();
|
v.clear();
|
||||||
wait_count = 0;
|
wait_count = 0;
|
||||||
clean_loc = UINT64_MAX;
|
clean_loc = UINT64_MAX;
|
||||||
allocated = false;
|
|
||||||
skip_copy = false;
|
skip_copy = false;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
@ -249,7 +247,7 @@ void journal_flusher_t::loop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// So subsequent stabilizers don't flush the entry again
|
// So subsequent stabilizers don't flush the entry again
|
||||||
dirty_it->second.state = ST_J_READ_SUBMITTED;
|
dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
|
||||||
}
|
}
|
||||||
else if (dirty_it->second.state == ST_D_STABLE)
|
else if (dirty_it->second.state == ST_D_STABLE)
|
||||||
{
|
{
|
||||||
|
@ -272,15 +270,8 @@ void journal_flusher_t::loop()
|
||||||
auto clean_it = bs->clean_db.find(cur.oid);
|
auto clean_it = bs->clean_db.find(cur.oid);
|
||||||
if (clean_it == bs->clean_db.end())
|
if (clean_it == bs->clean_db.end())
|
||||||
{
|
{
|
||||||
// Object not present at all. We must allocate and zero it.
|
// Object not present at all. This is a bug.
|
||||||
clean_loc = allocator_find_free(bs->data_alloc);
|
throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
|
||||||
if (clean_loc == UINT64_MAX)
|
|
||||||
{
|
|
||||||
throw new std::runtime_error("No space on the data device while trying to flush journal");
|
|
||||||
}
|
|
||||||
// This is an interesting part. Flushing journal results in an allocation we don't know where to put O_o.
|
|
||||||
allocator_set(bs->data_alloc, clean_loc, true);
|
|
||||||
allocated = true;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
clean_loc = clean_it->second.location;
|
clean_loc = clean_it->second.location;
|
||||||
|
|
|
@ -37,7 +37,7 @@ void blockstore::enqueue_write(blockstore_operation *op)
|
||||||
});
|
});
|
||||||
// Remember write as unsynced here, so external consumers could get
|
// Remember write as unsynced here, so external consumers could get
|
||||||
// the list of dirty objects to sync just before issuing a SYNC request
|
// the list of dirty objects to sync just before issuing a SYNC request
|
||||||
if (op->len == block_size)
|
if (op->len == block_size || op->version == 1)
|
||||||
{
|
{
|
||||||
// Remember big write as unsynced
|
// Remember big write as unsynced
|
||||||
unsynced_big_writes.push_back((obj_ver_id){
|
unsynced_big_writes.push_back((obj_ver_id){
|
||||||
|
@ -62,7 +62,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
||||||
.oid = op->oid,
|
.oid = op->oid,
|
||||||
.version = op->version,
|
.version = op->version,
|
||||||
});
|
});
|
||||||
if (op->len == block_size)
|
if (op->len == block_size || op->version == 1)
|
||||||
{
|
{
|
||||||
// Big (redirect) write
|
// Big (redirect) write
|
||||||
uint64_t loc = allocator_find_free(data_alloc);
|
uint64_t loc = allocator_find_free(data_alloc);
|
||||||
|
@ -77,10 +77,26 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
||||||
dirty_it->second.location = loc << block_order;
|
dirty_it->second.location = loc << block_order;
|
||||||
dirty_it->second.state = ST_D_SUBMITTED;
|
dirty_it->second.state = ST_D_SUBMITTED;
|
||||||
allocator_set(data_alloc, loc, true);
|
allocator_set(data_alloc, loc, true);
|
||||||
data->iov = (struct iovec){ op->buf, op->len };
|
int vcnt = 0;
|
||||||
|
if (op->version == 1 && op->len != block_size)
|
||||||
|
{
|
||||||
|
// zero fill newly allocated object
|
||||||
|
// FIXME: it's not so good because it turns new small writes into big writes
|
||||||
|
// but it's the first and the simplest implementation
|
||||||
|
if (op->offset > 0)
|
||||||
|
op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset };
|
||||||
|
op->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
|
||||||
|
if (op->offset+op->len < block_size)
|
||||||
|
op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) };
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
vcnt = 1;
|
||||||
|
op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
|
||||||
|
}
|
||||||
data->op = op;
|
data->op = op;
|
||||||
io_uring_prep_writev(
|
io_uring_prep_writev(
|
||||||
sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order)
|
sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
|
||||||
);
|
);
|
||||||
op->pending_ops = 1;
|
op->pending_ops = 1;
|
||||||
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
||||||
|
@ -100,7 +116,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// There is sufficient space. Get SQE(s)
|
// There is sufficient space. Get SQE(s)
|
||||||
BS_SUBMIT_GET_SQE(sqe1, data1);
|
BS_SUBMIT_GET_ONLY_SQE(sqe1);
|
||||||
BS_SUBMIT_GET_SQE(sqe2, data2);
|
BS_SUBMIT_GET_SQE(sqe2, data2);
|
||||||
// Got SQEs. Prepare journal sector write
|
// Got SQEs. Prepare journal sector write
|
||||||
journal_entry_small_write *je = (journal_entry_small_write*)
|
journal_entry_small_write *je = (journal_entry_small_write*)
|
||||||
|
|
Loading…
Reference in New Issue