Compare commits

...

2 Commits

Author SHA1 Message Date
Vitaliy Filippov dd4b0aed2b Support scattered write in node.js binding 2024-07-31 01:17:06 +03:00
Vitaliy Filippov 42851a061c Always continue operations to not miss resuming after the lack of PG primary
Should fix spurious client hangs during PG primary switchover
2024-07-31 01:17:03 +03:00
3 changed files with 38 additions and 15 deletions

View File

@ -24,6 +24,7 @@ public:
}
iovec iov;
std::vector<iovec> iov_list;
NodeVitastorImage *img = NULL;
int op = 0;
uint64_t offset = 0, len = 0, version = 0;
@ -141,10 +142,8 @@ NAN_METHOD(NodeVitastor::Read)
static NodeVitastorRequest* getWriteRequest(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
{
uint64_t offset = Nan::To<int64_t>(info[argpos+0]).FromJust();
char *buf = node::Buffer::Data(info[argpos+1]);
uint64_t len = node::Buffer::Length(info[argpos+1]);
const auto & bufarg = info[argpos+1];
uint64_t version = 0;
if (!info[argpos+2].IsEmpty() && info[argpos+2]->IsObject())
{
auto key = Nan::New<v8::String>("version").ToLocalChecked();
@ -159,14 +158,33 @@ static NodeVitastorRequest* getWriteRequest(const Nan::FunctionCallbackInfo<v8::
auto req = new NodeVitastorRequest(callback);
req->offset = offset;
req->len = len;
req->version = version;
req->iov = { .iov_base = buf, .iov_len = req->len };
if (bufarg->IsArray())
{
auto buffers = bufarg.As<v8::Array>();
req->len = 0;
for (uint32_t i = 0; i < buffers->Length(); i++)
{
auto buffer_obj = Nan::Get(buffers, i).ToLocalChecked();
char *buf = node::Buffer::Data(buffer_obj);
uint64_t len = node::Buffer::Length(buffer_obj);
req->iov_list.push_back({ .iov_base = buf, .iov_len = len });
req->len += len;
}
}
else
{
char *buf = node::Buffer::Data(bufarg);
uint64_t len = node::Buffer::Length(bufarg);
req->iov = { .iov_base = buf, .iov_len = req->len };
req->len = len;
}
return req;
}
// write(pool, inode, offset, buffer, { version }?, callback(err))
// write(pool, inode, offset, buf: Buffer | Buffer[], { version }?, callback(err))
NAN_METHOD(NodeVitastor::Write)
{
TRACE("NodeVitastor::Write");
@ -179,7 +197,10 @@ NAN_METHOD(NodeVitastor::Write)
auto req = getWriteRequest(info, 2);
std::unique_lock<std::mutex> lock(self->mu);
vitastor_c_write(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, req->version, &req->iov, 1, on_write_finish, req);
vitastor_c_write(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, req->version,
req->iov_list.size() ? req->iov_list.data() : &req->iov,
req->iov_list.size() ? req->iov_list.size() : 1,
on_write_finish, req);
}
// sync(callback(err))
@ -421,7 +442,10 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
else if (req->op == NODE_VITASTOR_WRITE)
{
uint64_t ino = vitastor_c_inode_get_num(watch);
vitastor_c_write(cli->c, ino, req->offset, req->len, req->version, &req->iov, 1, NodeVitastor::on_write_finish, req);
vitastor_c_write(cli->c, ino, req->offset, req->len, req->version,
req->iov_list.size() ? req->iov_list.data() : &req->iov,
req->iov_list.size() ? req->iov_list.size() : 1,
NodeVitastor::on_write_finish, req);
}
else if (req->op == NODE_VITASTOR_SYNC)
{

View File

@ -19,7 +19,7 @@ public:
static NAN_METHOD(Create);
// read(pool, inode, offset, len, callback(err, buffer, version))
static NAN_METHOD(Read);
// write(pool, inode, offset, buffer, { version }?, callback(err))
// write(pool, inode, offset, buf: Buffer | Buffer[], { version }?, callback(err))
static NAN_METHOD(Write);
// sync(callback(err))
static NAN_METHOD(Sync);
@ -56,7 +56,7 @@ public:
static NAN_METHOD(Create);
// read(offset, len, callback(err, buffer, version))
static NAN_METHOD(Read);
// write(offset, buffer, { version }?, callback(err))
// write(offset, buf: Buffer | Buffer[], { version }?, callback(err))
static NAN_METHOD(Write);
// sync(callback(err))
static NAN_METHOD(Sync);

View File

@ -452,11 +452,10 @@ void cluster_client_t::on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_nu
if (pg_cfg.cur_primary != prev_primary)
{
// Repeat this PG operations because an OSD which stopped being primary may not fsync operations
if (wb->repeat_ops_for(this, 0, pool_id, pg_num) > 0)
{
continue_ops();
}
wb->repeat_ops_for(this, 0, pool_id, pg_num);
}
// Always continue to resume operations hung because of lack of the primary OSD
continue_ops();
}
bool cluster_client_t::get_immediate_commit(uint64_t inode)
@ -1066,11 +1065,11 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
!pg_it->second.pause && pg_it->second.cur_primary)
{
osd_num_t primary_osd = pg_it->second.cur_primary;
part->osd_num = primary_osd;
auto peer_it = msgr.osd_peer_fds.find(primary_osd);
if (peer_it != msgr.osd_peer_fds.end())
{
int peer_fd = peer_it->second;
part->osd_num = primary_osd;
part->flags |= PART_SENT;
op->inflight_count++;
uint64_t pg_bitmap_size = (pool_cfg.data_block_size / pool_cfg.bitmap_granularity / 8) * (