forked from vitalif/vitastor
Implement basic primary-sync-stabilize
parent
74673c761f
commit
a406c62a71
89
blockstore.h
89
blockstore.h
|
@ -35,17 +35,88 @@
|
|||
|
||||
#define BS_OP_PRIVATE_DATA_SIZE 256
|
||||
|
||||
/* BS_OP_LIST:
|
||||
/*
|
||||
|
||||
Blockstore opcode documentation:
|
||||
|
||||
## BS_OP_READ / BS_OP_WRITE
|
||||
|
||||
Read or write object data.
|
||||
|
||||
Input:
|
||||
- oid.stripe = parity block size
|
||||
- oid = requested object
|
||||
- version = requested version.
|
||||
For reads:
|
||||
- version == 0: read the last stable version,
|
||||
- version == UINT64_MAX: read the last version,
|
||||
- otherwise: read the newest version that is <= the specified version
|
||||
For writes:
|
||||
- if version == 0, a new version is assigned automatically
|
||||
- if version != 0, it is assigned for the new write if possible, otherwise -EINVAL is returned
|
||||
- offset, len = offset and length within the object. len may be zero; in that case
|
||||
a read only returns the version and a write only bumps the version
|
||||
- buf = pre-allocated buffer for data (read) / with data (write). may be NULL if len == 0.
|
||||
|
||||
Output:
|
||||
- retval = number of bytes actually read/written or negative error number (-EINVAL or -ENOSPC)
|
||||
- version = the version actually read or written
|
||||
|
||||
## BS_OP_DELETE
|
||||
|
||||
Delete an object.
|
||||
|
||||
Input:
|
||||
- oid = requested object
|
||||
- version = requested version. Treated the same as with BS_OP_WRITE
|
||||
|
||||
Output:
|
||||
- retval = 0 or negative error number (-EINVAL)
|
||||
- version = the version actually written (delete is initially written as an object version)
|
||||
|
||||
## BS_OP_SYNC
|
||||
|
||||
Make sure all previously issued modifications reach physical media.
|
||||
|
||||
Input: Nothing except opcode
|
||||
Output:
|
||||
- retval = 0 or negative error number (-EINVAL)
|
||||
|
||||
## BS_OP_STABLE / BS_OP_ROLLBACK
|
||||
|
||||
Mark objects as stable / rollback previous unstable writes.
|
||||
|
||||
Input:
|
||||
- len = count of obj_ver_id's to stabilize or rollback
|
||||
- stabilize: all object versions up to the requested version of each object are marked as stable
|
||||
- rollback: all objects are rolled back to the requested stable versions
|
||||
- buf = pre-allocated obj_ver_id array <len> units long
|
||||
|
||||
Output:
|
||||
- retval = 0 or negative error number (-EINVAL)
|
||||
|
||||
## BS_OP_SYNC_STAB_ALL
|
||||
|
||||
ONLY FOR TESTS! Sync and mark all unstable object versions as stable, at once.
|
||||
|
||||
Input: Nothing except opcode
|
||||
Output:
|
||||
- retval = 0 or negative error number (-EINVAL)
|
||||
|
||||
## BS_OP_LIST
|
||||
|
||||
Get a list of all objects in this Blockstore.
|
||||
|
||||
Input:
|
||||
- oid.stripe = PG alignment
|
||||
- len = PG count or 0 to list all objects
|
||||
- offset = PG number
|
||||
|
||||
Output:
|
||||
- retval = total obj_ver_id count
|
||||
- version = stable obj_ver_id count
|
||||
- buf = obj_ver_id array allocated by the blockstore. stable versions come first
|
||||
- buf = obj_ver_id array allocated by the blockstore. Stable versions come first.
|
||||
You must free it yourself after usage with free().
|
||||
Output includes all objects for which (((inode + stripe / <PG alignment>) % <PG count>) == <PG number>).
|
||||
|
||||
*/
|
||||
|
||||
|
@ -55,21 +126,9 @@ struct blockstore_op_t
|
|||
uint64_t opcode;
|
||||
// finish callback
|
||||
std::function<void (blockstore_op_t*)> callback;
|
||||
// For reads, writes & deletes: oid is the requested object
|
||||
object_id oid;
|
||||
// For reads:
|
||||
// version == 0 -> read the last stable version,
|
||||
// version == UINT64_MAX -> read the last version,
|
||||
// otherwise -> read the newest version that is <= the specified version
|
||||
// after execution, version is equal to the version that was read from the blockstore
|
||||
// For writes & deletes:
|
||||
// if version == 0, a new version is assigned automatically
|
||||
// if version != 0, it is assigned for the new write if possible, otherwise -EINVAL is returned
|
||||
// after execution, version is equal to the version that was written to the blockstore
|
||||
uint64_t version;
|
||||
// For reads & writes: offset & len are the requested part of the object, buf is the buffer
|
||||
uint32_t offset;
|
||||
// For stabilize requests: buf contains <len> obj_ver_id's to stabilize
|
||||
uint32_t len;
|
||||
void *buf;
|
||||
int retval;
|
||||
|
|
4
osd.cpp
4
osd.cpp
|
@ -340,6 +340,10 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
{
|
||||
continue_primary_write(cur_op);
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
|
||||
{
|
||||
continue_primary_sync(cur_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
exec_secondary(cur_op);
|
||||
|
|
25
osd.h
25
osd.h
|
@ -159,6 +159,12 @@ struct osd_client_t
|
|||
|
||||
struct osd_rmw_stripe_t;
|
||||
|
||||
struct osd_object_id_t
|
||||
{
|
||||
osd_num_t osd_num;
|
||||
object_id oid;
|
||||
};
|
||||
|
||||
class osd_t
|
||||
{
|
||||
// config
|
||||
|
@ -181,7 +187,8 @@ class osd_t
|
|||
uint64_t next_subop_id = 1;
|
||||
|
||||
// Unstable writes
|
||||
spp::sparse_hash_map<osd_num_t, spp::sparse_hash_map<object_id, uint64_t>> unstable_writes;
|
||||
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
||||
std::deque<osd_op_t*> syncs_in_progress;
|
||||
|
||||
// client & peer I/O
|
||||
|
||||
|
@ -240,12 +247,26 @@ class osd_t
|
|||
bool prepare_primary_rw(osd_op_t *cur_op);
|
||||
void continue_primary_read(osd_op_t *cur_op);
|
||||
void continue_primary_write(osd_op_t *cur_op);
|
||||
void exec_primary_sync(osd_op_t *cur_op);
|
||||
void continue_primary_sync(osd_op_t *cur_op);
|
||||
void finish_primary_op(osd_op_t *cur_op, int retval);
|
||||
void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version);
|
||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||
public:
|
||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||
~osd_t();
|
||||
bool shutdown();
|
||||
};
|
||||
|
||||
inline bool operator == (const osd_object_id_t & a, const osd_object_id_t & b)
|
||||
{
|
||||
return a.osd_num == b.osd_num && a.oid.inode == b.oid.inode && a.oid.stripe == b.oid.stripe;
|
||||
}
|
||||
|
||||
inline bool operator < (const osd_object_id_t & a, const osd_object_id_t & b)
|
||||
{
|
||||
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && (
|
||||
a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && a.oid.stripe < b.oid.stripe
|
||||
);
|
||||
}
|
||||
|
|
|
@ -103,7 +103,7 @@ struct __attribute__((__packed__)) osd_op_secondary_stabilize_t
|
|||
{
|
||||
osd_op_header_t header;
|
||||
// obj_ver_id array length in bytes
|
||||
uint32_t len;
|
||||
uint64_t len;
|
||||
};
|
||||
typedef osd_op_secondary_stabilize_t osd_op_secondary_rollback_t;
|
||||
|
||||
|
|
208
osd_primary.cpp
208
osd_primary.cpp
|
@ -14,6 +14,12 @@
|
|||
//
|
||||
// sync: sync peers, get unstable versions from somewhere, stabilize them
|
||||
|
||||
struct unstable_osd_num_t
|
||||
{
|
||||
osd_num_t osd_num;
|
||||
int start, len;
|
||||
};
|
||||
|
||||
struct osd_primary_op_data_t
|
||||
{
|
||||
int st = 0;
|
||||
|
@ -25,6 +31,9 @@ struct osd_primary_op_data_t
|
|||
int degraded = 0, pg_size, pg_minsize;
|
||||
osd_rmw_stripe_t *stripes;
|
||||
osd_op_t *subops = NULL;
|
||||
// for sync: both are allocated with new and must be freed explicitly
|
||||
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
|
||||
obj_ver_id *unstable_writes = NULL;
|
||||
};
|
||||
|
||||
void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
|
||||
|
@ -350,9 +359,12 @@ resume_5:
|
|||
{
|
||||
if (osd_set[role] != 0)
|
||||
{
|
||||
this->unstable_writes[osd_set[role]][(object_id){
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | role,
|
||||
this->unstable_writes[(osd_object_id_t){
|
||||
.osd_num = osd_set[role],
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | role,
|
||||
},
|
||||
}] = op_data->fact_ver;
|
||||
}
|
||||
}
|
||||
|
@ -374,7 +386,193 @@ resume_5:
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::exec_primary_sync(osd_op_t *cur_op)
|
||||
// Save and clear unstable_writes -> SYNC all -> STABLE all
|
||||
// FIXME: Run regular automatic syncs based on the number of unstable writes and/or system time
|
||||
// Primary-side handler for OSD_OP_SYNC, written as a resumable state machine.
//
// The function is entered several times for the same operation and
// cur_op->op_data->st selects the label to resume from:
//   st == 1: parked behind an earlier sync in syncs_in_progress
//   st == 2: reached the head of the queue, start the actual work
//   st == 3: SYNC subops submitted, waiting for them
//   st == 4: SYNC phase finished, submit the STABILIZE subops
//   st == 5: STABILIZE subops submitted, waiting for them
//   st == 6: STABILIZE phase finished, release the buffers and finish
// The 1 -> 2 transition is made at the end of this function when the previous
// sync completes. The 3 -> 4 and 5 -> 6 transitions are not made here;
// presumably handle_primary_subop() bumps st and re-enters once all subops
// have completed - TODO confirm.
//
// Work performed: snapshot and clear this->unstable_writes, SYNC every OSD
// that has unstable writes, then STABILIZE the snapshotted versions on them.
// The operation is always finished with retval 0.
// FIXME: subop errors are not checked here.
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
    if (!cur_op->op_data)
    {
        cur_op->op_data = (osd_primary_op_data_t*)calloc(1, sizeof(osd_primary_op_data_t));
    }
    if (cur_op->op_data->st == 1) goto resume_1;
    else if (cur_op->op_data->st == 2) goto resume_2;
    else if (cur_op->op_data->st == 3) goto resume_3;
    else if (cur_op->op_data->st == 4) goto resume_4;
    else if (cur_op->op_data->st == 5) goto resume_5;
    else if (cur_op->op_data->st == 6) goto resume_6;
    if (syncs_in_progress.size() > 0)
    {
        // Wait for previous syncs, if any
        // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
        syncs_in_progress.push_back(cur_op);
        cur_op->op_data->st = 1;
resume_1:
        return;
    }
    else
    {
        syncs_in_progress.push_back(cur_op);
    }
resume_2:
    // FIXME: Handle operation cancel
    if (unstable_writes.size() == 0)
    {
        // Nothing to sync
        goto finish;
    }
    // Save and clear unstable_writes
    // FIXME: It is possible to do this on a per-client basis
    // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway
    cur_op->op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
    cur_op->op_data->unstable_writes = new obj_ver_id[unstable_writes.size()];
    {
        // unstable_writes is ordered by OSD number first, so the entries of each
        // OSD form one contiguous run. Copy all entries into the flat array and
        // record one (osd_num, start, len) range per run.
        // OSD number 0 is used as "no OSD seen yet".
        osd_num_t last_osd = 0;
        // range_start = index of the first entry of the current OSD,
        // copied = number of entries copied so far (one past the last entry)
        int range_start = 0, copied = 0;
        for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++)
        {
            if (last_osd != it->first.osd_num)
            {
                if (last_osd != 0)
                {
                    // Close the range of the previous OSD
                    cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                        .osd_num = last_osd,
                        .start = range_start,
                        .len = copied - range_start,
                    });
                }
                last_osd = it->first.osd_num;
                range_start = copied;
            }
            cur_op->op_data->unstable_writes[copied] = (obj_ver_id){
                .oid = it->first.oid,
                .version = it->second,
            };
            // Only the end of the range advances here; range_start must stay put,
            // otherwise every range ends up with a zero length
            copied++;
        }
        if (last_osd != 0)
        {
            // Close the range of the last OSD
            cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                .osd_num = last_osd,
                .start = range_start,
                .len = copied - range_start,
            });
        }
    }
    unstable_writes.clear();
    // SYNC
    submit_primary_sync_subops(cur_op);
resume_3:
    cur_op->op_data->st = 3;
    return;
resume_4:
    // Stabilize version sets
    submit_primary_stab_subops(cur_op);
resume_5:
    cur_op->op_data->st = 5;
    return;
resume_6:
    // FIXME: Free them correctly (via a destructor or so)
    delete cur_op->op_data->unstable_write_osds;
    // Allocated with new[], so it has to be released with delete[]
    delete[] cur_op->op_data->unstable_writes;
    cur_op->op_data->unstable_writes = NULL;
    cur_op->op_data->unstable_write_osds = NULL;
finish:
    assert(syncs_in_progress.front() == cur_op);
    syncs_in_progress.pop_front();
    finish_primary_op(cur_op, 0);
    if (syncs_in_progress.size() > 0)
    {
        // Wake up the next queued sync: it is parked with st == 1, move it to st == 2
        osd_op_t *next_op = syncs_in_progress.front();
        next_op->op_data->st++;
        continue_primary_sync(next_op);
    }
}
|
||||
|
||||
// Submits one SYNC sub-operation for every entry of op_data->unstable_write_osds.
// The local OSD (osd_num == this->osd_num) is synced through the blockstore with
// BS_OP_SYNC; every other OSD receives an OSD_OP_SECONDARY_SYNC request via its
// peer connection. Each completion is reported to handle_primary_subop() with
// ok = (retval == 0) and version 0.
// Resets op_data->done / errors and stores the new subop array in op_data->subops.
void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & targets = *(op_data->unstable_write_osds);
    int subop_count = targets.size();
    osd_op_t *subop_arr = new osd_op_t[subop_count];
    op_data->errors = 0;
    op_data->done = 0;
    op_data->n_subops = subop_count;
    op_data->subops = subop_arr;
    for (int idx = 0; idx < subop_count; idx++)
    {
        osd_op_t & subop = subop_arr[idx];
        osd_num_t target_osd = targets[idx].osd_num;
        if (target_osd != this->osd_num)
        {
            // Remote OSD: send a secondary sync request to the peer
            subop.op_type = OSD_OP_OUT;
            subop.peer_fd = osd_peer_fds.at(target_osd);
            subop.req.sec_sync = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = this->next_subop_id++,
                    .opcode = OSD_OP_SECONDARY_SYNC,
                },
            };
            subop.callback = [cur_op, this](osd_op_t *finished)
            {
                handle_primary_subop(cur_op, finished->reply.hdr.retval == 0, 0);
            };
            outbox_push(clients[subop.peer_fd], &subop);
            continue;
        }
        // Local OSD: sync our own blockstore
        subop.bs_op = new blockstore_op_t({
            .opcode = BS_OP_SYNC,
            .callback = [cur_op, this](blockstore_op_t *finished)
            {
                handle_primary_subop(cur_op, finished->retval == 0, 0);
            },
        });
        bs->enqueue_op(subop.bs_op);
    }
}
|
||||
|
||||
// Submits one STABILIZE sub-operation for every entry of op_data->unstable_write_osds.
// Each entry names an OSD and a [start, start + len) slice of op_data->unstable_writes.
// The local OSD is handled through the blockstore with BS_OP_STABLE (len = number of
// obj_ver_id entries); every other OSD receives OSD_OP_SECONDARY_STABILIZE with the
// slice attached as payload (len = payload size in bytes).
// Each completion is reported to handle_primary_subop() with ok = (retval == 0)
// and version 0.
// Resets op_data->done / errors and stores the new subop array in op_data->subops.
void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & targets = *(op_data->unstable_write_osds);
    int subop_count = targets.size();
    osd_op_t *subop_arr = new osd_op_t[subop_count];
    op_data->errors = 0;
    op_data->done = 0;
    op_data->n_subops = subop_count;
    op_data->subops = subop_arr;
    for (int idx = 0; idx < subop_count; idx++)
    {
        osd_op_t & subop = subop_arr[idx];
        auto & target = targets[idx];
        if (target.osd_num != this->osd_num)
        {
            // Remote OSD: send the slice of versions to stabilize to the peer
            subop.op_type = OSD_OP_OUT;
            subop.peer_fd = osd_peer_fds.at(target.osd_num);
            subop.req.sec_stab = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = this->next_subop_id++,
                    .opcode = OSD_OP_SECONDARY_STABILIZE,
                },
                .len = (uint64_t)(target.len * sizeof(obj_ver_id)),
            };
            subop.send_list.push_back(op_data->unstable_writes + target.start, target.len * sizeof(obj_ver_id));
            subop.callback = [cur_op, this](osd_op_t *finished)
            {
                handle_primary_subop(cur_op, finished->reply.hdr.retval == 0, 0);
            };
            outbox_push(clients[subop.peer_fd], &subop);
            continue;
        }
        // Local OSD: stabilize directly in our own blockstore
        subop.bs_op = new blockstore_op_t({
            .opcode = BS_OP_STABLE,
            .callback = [cur_op, this](blockstore_op_t *finished)
            {
                handle_primary_subop(cur_op, finished->retval == 0, 0);
            },
            .len = (uint32_t)target.len,
            .buf = (void*)(op_data->unstable_writes + target.start),
        });
        bs->enqueue_op(subop.bs_op);
    }
}
|
||||
|
|
Loading…
Reference in New Issue