forked from vitalif/vitastor
Debug OP_STABLE so the basic case passes without problem
parent
299b7288d5
commit
2b12428cb1
|
@ -229,14 +229,16 @@ void blockstore::check_wait(blockstore_operation *op)
|
|||
}
|
||||
}
|
||||
|
||||
int blockstore::enqueue_op(blockstore_operation *op)
|
||||
void blockstore::enqueue_op(blockstore_operation *op)
|
||||
{
|
||||
if (op->offset >= block_size || op->len >= block_size-op->offset ||
|
||||
(op->len % DISK_ALIGNMENT) ||
|
||||
(op->flags & OP_TYPE_MASK) < OP_READ || (op->flags & OP_TYPE_MASK) > OP_DELETE)
|
||||
int type = op->flags & OP_TYPE_MASK;
|
||||
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&
|
||||
(op->offset >= block_size || op->len >= block_size-op->offset || (op->len % DISK_ALIGNMENT)))
|
||||
{
|
||||
// Basic verification not passed
|
||||
return -EINVAL;
|
||||
op->retval = -EINVAL;
|
||||
op->callback(op);
|
||||
return;
|
||||
}
|
||||
op->wait_for = 0;
|
||||
op->sync_state = 0;
|
||||
|
@ -247,5 +249,4 @@ int blockstore::enqueue_op(blockstore_operation *op)
|
|||
enqueue_write(op);
|
||||
}
|
||||
ringloop->wakeup(ring_consumer);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -333,5 +333,5 @@ public:
|
|||
bool stop();
|
||||
|
||||
// Submission
|
||||
int enqueue_op(blockstore_operation *op);
|
||||
void enqueue_op(blockstore_operation *op);
|
||||
};
|
||||
|
|
|
@ -151,7 +151,7 @@ resume_0:
|
|||
wait_count = 0;
|
||||
clean_loc = UINT64_MAX;
|
||||
skip_copy = false;
|
||||
do
|
||||
while (1)
|
||||
{
|
||||
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
|
||||
{
|
||||
|
@ -166,9 +166,9 @@ resume_0:
|
|||
break;
|
||||
if (it == v.end() || it->offset > offset)
|
||||
{
|
||||
submit_len = it->offset >= offset+len ? len : it->offset-offset;
|
||||
submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset;
|
||||
await_sqe(1);
|
||||
v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
|
||||
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
|
||||
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
|
||||
data->callback = simple_callback;
|
||||
my_uring_prep_readv(
|
||||
|
@ -185,6 +185,7 @@ resume_0:
|
|||
else if (dirty_it->second.state == ST_D_STABLE)
|
||||
{
|
||||
// There is an unflushed big write. Copy small writes in its position
|
||||
printf("found ");
|
||||
if (!skip_copy)
|
||||
{
|
||||
clean_loc = dirty_it->second.location;
|
||||
|
@ -195,8 +196,16 @@ resume_0:
|
|||
{
|
||||
throw std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
|
||||
}
|
||||
if (dirty_it == bs->dirty_db.begin())
|
||||
{
|
||||
break;
|
||||
}
|
||||
dirty_it--;
|
||||
} while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
|
||||
if (dirty_it->first.oid != cur.oid)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (wait_count == 0 && clean_loc == UINT64_MAX)
|
||||
{
|
||||
// Nothing to flush
|
||||
|
@ -219,11 +228,13 @@ resume_0:
|
|||
if (clean_it == bs->clean_db.end())
|
||||
{
|
||||
// Object not present at all. This is a bug.
|
||||
throw std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
|
||||
throw std::runtime_error("BUG: Object we are trying to flush is not allocated on the data device");
|
||||
}
|
||||
else
|
||||
clean_loc = clean_it->second.location;
|
||||
}
|
||||
else
|
||||
clean_it = bs->clean_db.end();
|
||||
// Also we need to submit the metadata read. We do a read-modify-write for every operation.
|
||||
// But we must check if the same sector is already in memory.
|
||||
// Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
|
||||
|
|
|
@ -334,6 +334,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.oid = je->small_write.oid,
|
||||
.version = je->small_write.version,
|
||||
};
|
||||
printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len);
|
||||
bs->dirty_db.emplace(ov, (dirty_entry){
|
||||
.state = ST_J_SYNCED,
|
||||
.flags = 0,
|
||||
|
@ -351,6 +352,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.oid = je->big_write.oid,
|
||||
.version = je->big_write.version,
|
||||
};
|
||||
printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
|
||||
bs->dirty_db.emplace(ov, (dirty_entry){
|
||||
.state = ST_D_META_SYNCED,
|
||||
.flags = 0,
|
||||
|
@ -377,9 +379,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
}
|
||||
else
|
||||
{
|
||||
it->second.state = (it->second.state == ST_D_META_SYNCED
|
||||
? ST_D_STABLE
|
||||
: (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
|
||||
printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
|
||||
while (1)
|
||||
{
|
||||
it->second.state = (it->second.state == ST_D_META_SYNCED
|
||||
? ST_D_STABLE
|
||||
: (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
|
||||
if (it == bs->dirty_db.begin())
|
||||
break;
|
||||
it--;
|
||||
if (it->first.oid != ov.oid || IS_STABLE(it->second.state))
|
||||
break;
|
||||
}
|
||||
bs->flusher->queue_flush(ov);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
|||
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
|
||||
{
|
||||
// No such object version
|
||||
op->retval = EINVAL;
|
||||
op->retval = -EINVAL;
|
||||
op->callback(op);
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ int main(int narg, char *args[])
|
|||
|
||||
blockstore_operation op;
|
||||
int main_state = 0;
|
||||
uint64_t version = 0;
|
||||
ring_consumer_t main_cons;
|
||||
op.callback = [&](blockstore_operation *op)
|
||||
{
|
||||
|
@ -93,6 +94,8 @@ int main(int narg, char *args[])
|
|||
main_state = 2;
|
||||
else if (main_state == 3)
|
||||
main_state = 4;
|
||||
else if (main_state == 5)
|
||||
main_state = 6;
|
||||
};
|
||||
main_cons.loop = [&]()
|
||||
{
|
||||
|
@ -114,11 +117,24 @@ int main(int narg, char *args[])
|
|||
}
|
||||
else if (main_state == 2)
|
||||
{
|
||||
printf("syncing\n");
|
||||
printf("version %u written, syncing\n", op.version);
|
||||
version = op.version;
|
||||
op.flags = OP_SYNC;
|
||||
bs->enqueue_op(&op);
|
||||
main_state = 3;
|
||||
}
|
||||
else if (main_state == 4)
|
||||
{
|
||||
printf("stabilizing version %u\n", version);
|
||||
op.flags = OP_STABLE;
|
||||
op.len = 1;
|
||||
*((obj_ver_id*)op.buf) = {
|
||||
.oid = { .inode = 1, .stripe = 0 },
|
||||
.version = version,
|
||||
};
|
||||
bs->enqueue_op(&op);
|
||||
main_state = 5;
|
||||
}
|
||||
};
|
||||
|
||||
ringloop->register_consumer(main_cons);
|
||||
|
|
Loading…
Reference in New Issue