forked from vitalif/vitastor
Fix BS_OP_LIST to account for deleted objects: only list the newest stable entry of each object
This allows list responses to be unaffected by journal flushes, which, in turn, fixes PG peering when a peer OSD is replaying journal and journal contains deletionssync-io-test
parent
571be0f380
commit
05ea97119f
|
@ -407,9 +407,31 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
|||
ringloop->wakeup();
|
||||
}
|
||||
|
||||
static bool replace_stable(object_id oid, uint64_t version, int search_start, int search_end, obj_ver_id* list)
|
||||
{
|
||||
while (search_start < search_end)
|
||||
{
|
||||
int pos = search_start+(search_end-search_start)/2;
|
||||
if (oid < list[pos].oid)
|
||||
{
|
||||
search_end = pos;
|
||||
}
|
||||
else if (list[pos].oid < oid)
|
||||
{
|
||||
search_start = pos+1;
|
||||
}
|
||||
else
|
||||
{
|
||||
list[pos].version = version;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||
{
|
||||
// Count objects
|
||||
// Check PG
|
||||
uint32_t list_pg = op->offset;
|
||||
uint32_t pg_count = op->len;
|
||||
uint64_t pg_stripe_size = op->oid.stripe;
|
||||
|
@ -419,70 +441,131 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
uint64_t stable_count = 0;
|
||||
if (pg_count > 0)
|
||||
{
|
||||
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
||||
{
|
||||
uint32_t pg = (it->first.inode + it->first.stripe / pg_stripe_size) % pg_count;
|
||||
if (pg == list_pg)
|
||||
{
|
||||
stable_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stable_count = clean_db.size();
|
||||
}
|
||||
uint64_t total_count = stable_count;
|
||||
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
||||
{
|
||||
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||
{
|
||||
if (IS_STABLE(it->second.state))
|
||||
{
|
||||
stable_count++;
|
||||
}
|
||||
total_count++;
|
||||
}
|
||||
}
|
||||
// Allocate memory
|
||||
op->version = stable_count;
|
||||
op->retval = total_count;
|
||||
op->buf = malloc(sizeof(obj_ver_id) * total_count);
|
||||
if (!op->buf)
|
||||
// Copy clean_db entries (sorted)
|
||||
int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
|
||||
obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||
int i = 0;
|
||||
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
||||
{
|
||||
if (!pg_count || ((it->first.inode + it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||
{
|
||||
vers[i++] = {
|
||||
if (stable_count >= stable_alloc)
|
||||
{
|
||||
stable_alloc += 32768;
|
||||
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
stable[stable_count++] = {
|
||||
.oid = it->first,
|
||||
.version = it->second.version,
|
||||
};
|
||||
}
|
||||
}
|
||||
int j = stable_count;
|
||||
int clean_stable_count = stable_count;
|
||||
// Copy dirty_db entries (sorted, too)
|
||||
int unstable_count = 0, unstable_alloc = 0;
|
||||
obj_ver_id *unstable = NULL;
|
||||
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
||||
{
|
||||
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||
{
|
||||
if (IS_STABLE(it->second.state))
|
||||
if (IS_DELETE(it->second.state))
|
||||
{
|
||||
vers[i++] = it->first;
|
||||
// Deletions are always stable, so try to zero out two possible entries
|
||||
if (!replace_stable(it->first.oid, 0, 0, clean_stable_count, stable))
|
||||
{
|
||||
replace_stable(it->first.oid, 0, clean_stable_count, stable_count, stable);
|
||||
}
|
||||
}
|
||||
else if (IS_STABLE(it->second.state))
|
||||
{
|
||||
// First try to replace a clean stable version in the first part of the list
|
||||
if (!replace_stable(it->first.oid, it->first.version, 0, clean_stable_count, stable))
|
||||
{
|
||||
// Then try to replace the last dirty stable version in the second part of the list
|
||||
if (stable[stable_count-1].oid == it->first.oid)
|
||||
{
|
||||
stable[stable_count-1].version = it->first.version;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (stable_count >= stable_alloc)
|
||||
{
|
||||
stable_alloc += 32768;
|
||||
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
if (unstable)
|
||||
free(unstable);
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
stable[stable_count++] = it->first;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
vers[j++] = it->first;
|
||||
if (unstable_count >= unstable_alloc)
|
||||
{
|
||||
unstable_alloc += 32768;
|
||||
unstable = (obj_ver_id*)realloc(unstable, sizeof(obj_ver_id) * unstable_alloc);
|
||||
if (!unstable)
|
||||
{
|
||||
if (stable)
|
||||
free(stable);
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
unstable[unstable_count++] = it->first;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remove zeroed out stable entries
|
||||
int j = 0;
|
||||
for (int i = 0; i < stable_count; i++)
|
||||
{
|
||||
if (stable[i].version != 0)
|
||||
{
|
||||
stable[j++] = stable[i];
|
||||
}
|
||||
}
|
||||
stable_count = j;
|
||||
if (stable_count+unstable_count > stable_alloc)
|
||||
{
|
||||
stable_alloc = stable_count+unstable_count;
|
||||
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
if (unstable)
|
||||
free(unstable);
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Copy unstable entries
|
||||
for (int i = 0; i < unstable_count; i++)
|
||||
{
|
||||
stable[j++] = unstable[i];
|
||||
}
|
||||
free(unstable);
|
||||
op->version = stable_count;
|
||||
op->retval = stable_count+unstable_count;
|
||||
op->buf = stable;
|
||||
FINISH_OP(op);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue