Implement object list operation in blockstore

blocking-uring-test
Vitaliy Filippov 2019-12-19 20:50:20 +03:00
parent e8f7905e08
commit e88ad3f2ff
4 changed files with 101 additions and 2 deletions

View File

@ -19,15 +19,31 @@
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 512
#define BS_OP_MIN 1
#define BS_OP_READ 1
#define BS_OP_WRITE 2
#define BS_OP_SYNC 3
#define BS_OP_STABLE 4
#define BS_OP_DELETE 5
#define BS_OP_LIST 6
#define BS_OP_MAX 6
#define BS_OP_TYPE_MASK 0x7
#define BS_OP_PRIVATE_DATA_SIZE 256
/* BS_OP_LIST:
Input:
- len = divisor
- offset = modulo. object is listed if (object_id % len) == offset.
Output:
- retval = total obj_ver_id count
- version = stable obj_ver_id count
- buf = obj_ver_id array allocated by the blockstore. stable versions come first
*/
struct blockstore_op_t
{
// operation

View File

@ -280,7 +280,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{
int type = op->opcode & BS_OP_TYPE_MASK;
if (type < BS_OP_READ || type > BS_OP_DELETE || (type == BS_OP_READ || type == BS_OP_WRITE) &&
if (type < BS_OP_MIN || type > BS_OP_MAX || (type == BS_OP_READ || type == BS_OP_WRITE) &&
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
readonly && type != BS_OP_READ)
{
@ -289,15 +289,90 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
op->callback(op);
return;
}
else if (type == BS_OP_LIST)
{
// List operation is processed synchronously
process_list(op);
op->callback(op);
return;
}
// Call constructor without allocating memory. We'll call destructor before returning op back
new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0;
PRIV(op)->sync_state = 0;
PRIV(op)->pending_ops = 0;
submit_queue.push_back(op);
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE)
if (type == BS_OP_WRITE)
{
enqueue_write(op);
}
ringloop->wakeup();
}
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
// Count objects
uint64_t stable_count = 0;
if (op->len)
{
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
{
if ((it->first % op->len) == op->offset)
{
stable_count++;
}
}
}
else
{
stable_count = clean_db.size();
}
uint64_t total_count = stable_count;
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
{
if (!op->len || (it->first.oid % op->len) == op->offset)
{
if (IS_STABLE(it->second.state))
{
stable_count++;
}
total_count++;
}
}
// Allocate memory
op->version = stable_count;
op->retval = total_count;
op->buf = memalign(512, sizeof(obj_ver_id) * total_count);
if (!op->buf)
{
op->retval = -ENOMEM;
return;
}
obj_ver_id *vers = (obj_ver_id*)op->buf;
int i = 0;
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
{
if (!op->len || (it->first % op->len) == op->offset)
{
vers[i++] = {
.oid = it->first,
.version = it->second.version,
};
}
}
int j = stable_count;
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
{
if (!op->len || (it->first.oid % op->len) == op->offset)
{
if (IS_STABLE(it->second.state))
{
vers[i++] = it->first;
}
else
{
vers[j++] = it->first;
}
}
}
}

View File

@ -251,6 +251,9 @@ class blockstore_impl_t
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
// List
void process_list(blockstore_op_t *op);
public:
blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop);

View File

@ -10,6 +10,11 @@ struct __attribute__((__packed__)) object_id
uint64_t stripe;
};
inline uint64_t operator % (const object_id & a, const uint64_t b)
{
return ((a.inode % b) * (0x100000000 % b) * (0x100000000 % b) + a.stripe % b) % b;
}
inline bool operator == (const object_id & a, const object_id & b)
{
return a.inode == b.inode && a.stripe == b.stripe;