forked from vitalif/vitastor
Correct reenterability fix (now verified with a test)
It's rather funny, but 0.5.12 has to be re-published again
rel-0.5
parent
59fbcef734
commit
18c72f4835
|
@ -10,7 +10,7 @@
|
||||||
#define PART_ERROR 4
|
#define PART_ERROR 4
|
||||||
#define CACHE_DIRTY 1
|
#define CACHE_DIRTY 1
|
||||||
#define CACHE_FLUSHING 2
|
#define CACHE_FLUSHING 2
|
||||||
#define CACHE_REPEATING 4
|
#define CACHE_REPEATING 3
|
||||||
#define OP_FLUSH_BUFFER 2
|
#define OP_FLUSH_BUFFER 2
|
||||||
|
|
||||||
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
|
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
|
||||||
|
@ -36,10 +36,10 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
||||||
for (auto & wr: dirty_buffers)
|
for (auto & wr: dirty_buffers)
|
||||||
{
|
{
|
||||||
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
|
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
|
||||||
!(wr.second.state & CACHE_REPEATING))
|
wr.second.state != CACHE_REPEATING)
|
||||||
{
|
{
|
||||||
// FIXME: Flush in larger parts
|
// FIXME: Flush in larger parts
|
||||||
flush_buffer(wr.first, wr.second);
|
flush_buffer(wr.first, &wr.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue_ops();
|
continue_ops();
|
||||||
|
@ -103,21 +103,22 @@ void cluster_client_t::continue_ops(bool up_retry)
|
||||||
}
|
}
|
||||||
restart:
|
restart:
|
||||||
continuing_ops = 1;
|
continuing_ops = 1;
|
||||||
|
op_queue_pos = 0;
|
||||||
bool has_flushes = false, has_writes = false;
|
bool has_flushes = false, has_writes = false;
|
||||||
int j = 0;
|
while (op_queue_pos < op_queue.size())
|
||||||
for (int i = 0; i < op_queue.size(); i++)
|
|
||||||
{
|
{
|
||||||
bool rm = false, is_flush = op_queue[i]->flags & OP_FLUSH_BUFFER;
|
auto op = op_queue[op_queue_pos];
|
||||||
auto opcode = op_queue[i]->opcode;
|
bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
|
||||||
if (!op_queue[i]->up_wait || up_retry)
|
auto opcode = op->opcode;
|
||||||
|
if (!op->up_wait || up_retry)
|
||||||
{
|
{
|
||||||
op_queue[i]->up_wait = false;
|
op->up_wait = false;
|
||||||
if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
|
if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
|
||||||
{
|
{
|
||||||
if (is_flush || !has_flushes)
|
if (is_flush || !has_flushes)
|
||||||
{
|
{
|
||||||
// Regular writes can't proceed before buffer flushes
|
// Regular writes can't proceed before buffer flushes
|
||||||
rm = continue_rw(op_queue[i]);
|
rm = continue_rw(op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (opcode == OSD_OP_SYNC)
|
else if (opcode == OSD_OP_SYNC)
|
||||||
|
@ -125,7 +126,7 @@ restart:
|
||||||
if (!has_writes)
|
if (!has_writes)
|
||||||
{
|
{
|
||||||
// SYNC can't proceed before previous writes
|
// SYNC can't proceed before previous writes
|
||||||
rm = continue_sync(op_queue[i]);
|
rm = continue_sync(op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -143,20 +144,20 @@ restart:
|
||||||
// ...so dirty_writes can't contain anything newer than SYNC
|
// ...so dirty_writes can't contain anything newer than SYNC
|
||||||
has_flushes = has_writes || !rm;
|
has_flushes = has_writes || !rm;
|
||||||
}
|
}
|
||||||
if (!rm)
|
if (rm)
|
||||||
{
|
{
|
||||||
op_queue[j++] = op_queue[i];
|
op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
op_queue_pos++;
|
||||||
|
}
|
||||||
|
if (continuing_ops == 2)
|
||||||
|
{
|
||||||
|
goto restart;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
op_queue.resize(j);
|
continuing_ops = 0;
|
||||||
if (continuing_ops == 2)
|
|
||||||
{
|
|
||||||
goto restart;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
continuing_ops = 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint32_t is_power_of_two(uint64_t value)
|
static uint32_t is_power_of_two(uint64_t value)
|
||||||
|
@ -433,21 +434,30 @@ void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t & wr)
|
void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
|
||||||
{
|
{
|
||||||
wr.state = CACHE_DIRTY | CACHE_REPEATING;
|
wr->state = CACHE_REPEATING;
|
||||||
cluster_op_t *op = new cluster_op_t;
|
cluster_op_t *op = new cluster_op_t;
|
||||||
op->flags = OP_FLUSH_BUFFER;
|
op->flags = OP_FLUSH_BUFFER;
|
||||||
op->opcode = OSD_OP_WRITE;
|
op->opcode = OSD_OP_WRITE;
|
||||||
op->inode = oid.inode;
|
op->inode = oid.inode;
|
||||||
op->offset = oid.stripe;
|
op->offset = oid.stripe;
|
||||||
op->len = wr.len;
|
op->len = wr->len;
|
||||||
op->iov.push_back(wr.buf, wr.len);
|
op->iov.push_back(wr->buf, wr->len);
|
||||||
op->callback = [](cluster_op_t* op)
|
op->callback = [wr](cluster_op_t* op)
|
||||||
{
|
{
|
||||||
|
if (wr->state == CACHE_REPEATING)
|
||||||
|
{
|
||||||
|
wr->state = CACHE_DIRTY;
|
||||||
|
}
|
||||||
delete op;
|
delete op;
|
||||||
};
|
};
|
||||||
op_queue.push_front(op);
|
op_queue.insert(op_queue.begin(), op);
|
||||||
|
if (continuing_ops)
|
||||||
|
{
|
||||||
|
continuing_ops = 2;
|
||||||
|
op_queue_pos++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int cluster_client_t::continue_rw(cluster_op_t *op)
|
int cluster_client_t::continue_rw(cluster_op_t *op)
|
||||||
|
|
|
@ -74,7 +74,7 @@ class cluster_client_t
|
||||||
int retry_timeout_id = 0;
|
int retry_timeout_id = 0;
|
||||||
uint64_t op_id = 1;
|
uint64_t op_id = 1;
|
||||||
std::vector<cluster_op_t*> offline_ops;
|
std::vector<cluster_op_t*> offline_ops;
|
||||||
std::deque<cluster_op_t*> op_queue;
|
std::vector<cluster_op_t*> op_queue;
|
||||||
std::map<object_id, cluster_buffer_t> dirty_buffers;
|
std::map<object_id, cluster_buffer_t> dirty_buffers;
|
||||||
std::set<osd_num_t> dirty_osds;
|
std::set<osd_num_t> dirty_osds;
|
||||||
uint64_t dirty_bytes = 0, dirty_ops = 0;
|
uint64_t dirty_bytes = 0, dirty_ops = 0;
|
||||||
|
@ -83,6 +83,7 @@ class cluster_client_t
|
||||||
ring_consumer_t consumer;
|
ring_consumer_t consumer;
|
||||||
std::vector<std::function<void(void)>> on_ready_hooks;
|
std::vector<std::function<void(void)>> on_ready_hooks;
|
||||||
int continuing_ops = 0;
|
int continuing_ops = 0;
|
||||||
|
int op_queue_pos = 0;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
etcd_state_client_t st_cli;
|
etcd_state_client_t st_cli;
|
||||||
|
@ -99,7 +100,7 @@ public:
|
||||||
void continue_ops(bool up_retry = false);
|
void continue_ops(bool up_retry = false);
|
||||||
protected:
|
protected:
|
||||||
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
|
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
|
||||||
void flush_buffer(const object_id & oid, cluster_buffer_t & wr);
|
void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
|
||||||
void on_load_config_hook(json11::Json::object & config);
|
void on_load_config_hook(json11::Json::object & config);
|
||||||
void on_load_pgs_hook(bool success);
|
void on_load_pgs_hook(bool success);
|
||||||
void on_change_hook(json11::Json::object & changes);
|
void on_change_hook(json11::Json::object & changes);
|
||||||
|
|
|
@ -47,7 +47,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
|
||||||
cli->st_cli.on_change_hook(changes);
|
cli->st_cli.on_change_hook(changes);
|
||||||
}
|
}
|
||||||
|
|
||||||
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c)
|
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL)
|
||||||
{
|
{
|
||||||
printf("Post write %lx+%lx\n", offset, len);
|
printf("Post write %lx+%lx\n", offset, len);
|
||||||
int *r = new int;
|
int *r = new int;
|
||||||
|
@ -59,7 +59,7 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c)
|
||||||
op->len = len;
|
op->len = len;
|
||||||
op->iov.push_back(malloc_or_die(len), len);
|
op->iov.push_back(malloc_or_die(len), len);
|
||||||
memset(op->iov.buf[0].iov_base, c, len);
|
memset(op->iov.buf[0].iov_base, c, len);
|
||||||
op->callback = [r](cluster_op_t *op)
|
op->callback = [r, cb](cluster_op_t *op)
|
||||||
{
|
{
|
||||||
if (*r == -1)
|
if (*r == -1)
|
||||||
printf("Error: Not allowed to complete yet\n");
|
printf("Error: Not allowed to complete yet\n");
|
||||||
|
@ -68,6 +68,8 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c)
|
||||||
free(op->iov.buf[0].iov_base);
|
free(op->iov.buf[0].iov_base);
|
||||||
printf("Done write %lx+%lx r=%d\n", op->offset, op->len, op->retval);
|
printf("Done write %lx+%lx r=%d\n", op->offset, op->len, op->retval);
|
||||||
delete op;
|
delete op;
|
||||||
|
if (cb != NULL)
|
||||||
|
cb();
|
||||||
};
|
};
|
||||||
cli->execute(op);
|
cli->execute(op);
|
||||||
return r;
|
return r;
|
||||||
|
@ -310,6 +312,28 @@ void test1()
|
||||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||||
check_completed(r1);
|
check_completed(r1);
|
||||||
|
|
||||||
|
// Check disconnect inside operation callback (reenterability)
|
||||||
|
// Probably doesn't happen too often, but possible in theory
|
||||||
|
r1 = test_write(cli, 0, 0x1000, 0x60, [cli]()
|
||||||
|
{
|
||||||
|
pretend_disconnected(cli, 1);
|
||||||
|
});
|
||||||
|
r2 = test_write(cli, 0x1000, 0x1000, 0x61);
|
||||||
|
check_op_count(cli, 1, 2);
|
||||||
|
can_complete(r1);
|
||||||
|
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||||
|
check_completed(r1);
|
||||||
|
check_disconnected(cli, 1);
|
||||||
|
pretend_connected(cli, 1);
|
||||||
|
cli->continue_ops(true);
|
||||||
|
check_op_count(cli, 1, 2);
|
||||||
|
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||||
|
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
|
||||||
|
check_op_count(cli, 1, 1);
|
||||||
|
can_complete(r2);
|
||||||
|
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
|
||||||
|
check_completed(r2);
|
||||||
|
|
||||||
// Free client
|
// Free client
|
||||||
delete cli;
|
delete cli;
|
||||||
delete tfd;
|
delete tfd;
|
||||||
|
|
Loading…
Reference in New Issue