Create inode, then direntry, not direntry, then inode; retry ID collisions

master
Vitaliy Filippov 2024-03-03 00:21:05 +03:00
parent 6213fbd8c6
commit 88f9d18be3
1 changed files with 98 additions and 84 deletions

View File

@ -76,6 +76,7 @@ struct kv_create_state
uint64_t verf = 0;
uint64_t dir_ino = 0;
std::string filename;
int res = 0;
uint64_t new_id = 0;
json11::Json::object attrobj;
json11::Json attrs;
@ -84,8 +85,14 @@ struct kv_create_state
std::function<void(int res)> cb;
};
static void kv_do_create(kv_create_state *st)
static void kv_continue_create(kv_create_state *st, int state)
{
if (state == 0) {}
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
if (st->self->parent->trace)
fprintf(stderr, "[%d] CREATE %ju/%s ATTRS %s\n", st->self->nfs_fd, st->dir_ino, st->filename.c_str(), json11::Json(st->attrobj).dump().c_str());
if (st->filename == "" || st->filename.find("/") != std::string::npos)
@ -98,101 +105,108 @@ static void kv_do_create(kv_create_state *st)
st->attrobj["mtime"] = nfstime_now_str();
if (st->attrobj.find("atime") == st->attrobj.end())
st->attrobj["atime"] = st->attrobj["mtime"];
st->attrs = std::move(st->attrobj);
resume_1:
// Generate inode ID
allocate_new_id(st->self, [st](int res, uint64_t new_id)
{
if (res < 0)
{
auto cb = std::move(st->cb);
cb(res);
return;
}
st->res = res;
st->new_id = new_id;
kv_continue_create(st, 2);
});
return;
resume_2:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
{
st->res = res;
kv_continue_create(st, 3);
}, [st](int res, const std::string & value)
{
return res == -ENOENT;
});
return;
resume_3:
if (st->res == -EAGAIN)
{
// Inode ID generator failure - retry
goto resume_1;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
{
auto direntry = json11::Json::object{ { "ino", st->new_id } };
if (st->attrobj.find("type") != st->attrobj.end() &&
st->attrobj["type"].string_value() == "dir")
if (st->attrs["type"].string_value() == "dir")
{
direntry["type"] = "dir";
}
st->attrs = std::move(st->attrobj);
st->direntry_text = json11::Json(direntry).dump().c_str();
// Set direntry
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
}
// Set direntry
st->dup_ino = 0;
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
{
st->res = res;
kv_continue_create(st, 4);
}, [st](int res, const std::string & value)
{
// CAS compare - check that the key doesn't exist
if (res == 0)
{
if (res < 0)
std::string err;
auto direntry = json11::Json::parse(value, err);
if (err != "")
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
if (res == -EAGAIN)
{
if (st->dup_ino)
{
st->new_id = st->dup_ino;
res = 0;
}
else
res = -EEXIST;
}
else
fprintf(stderr, "create %ju/%s failed: %s (code %d)\n", st->dir_ino, st->filename.c_str(), strerror(-res), res);
auto cb = std::move(st->cb);
cb(res);
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s, overwriting\n",
kv_direntry_key(st->dir_ino, st->filename).c_str(), value.c_str(), err.c_str());
return true;
}
else
if (st->exclusive && direntry["verf"].uint64_value() == st->verf)
{
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
{
if (res == -EAGAIN)
{
res = -EEXIST;
}
if (res < 0)
{
st->self->parent->db->del(kv_direntry_key(st->dir_ino, st->filename), [st, res](int del_res)
{
if (!del_res)
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
}
auto cb = std::move(st->cb);
cb(res);
}, [st](int res, const std::string & value)
{
return res != -ENOENT && value == st->direntry_text;
});
}
else
{
auto cb = std::move(st->cb);
cb(0);
}
}, [st](int res, const std::string & value)
{
return res == -ENOENT;
});
}
}, [st](int res, const std::string & value)
{
// CAS compare - check that the key doesn't exist
if (res == 0)
{
std::string err;
auto direntry = json11::Json::parse(value, err);
if (err != "")
{
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s, overwriting\n",
kv_direntry_key(st->dir_ino, st->filename).c_str(), value.c_str(), err.c_str());
return true;
}
if (st->exclusive && direntry["verf"].uint64_value() == st->verf)
{
st->dup_ino = direntry["ino"].uint64_value();
return false;
}
st->dup_ino = direntry["ino"].uint64_value();
return false;
}
return true;
});
return false;
}
return true;
});
return;
resume_4:
if (st->res == -EAGAIN)
{
// Direntry already exists
st->self->parent->db->del(kv_inode_key(st->new_id), [st](int res)
{
st->res = res;
kv_continue_create(st, 5);
});
resume_5:
if (st->res < 0)
{
fprintf(stderr, "failed to delete duplicate inode %ju left from create %s (code %d)\n", st->new_id, strerror(-st->res), st->res);
}
else
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
}
if (st->dup_ino)
{
// Successfully created by the previous "exclusive" request
st->new_id = st->dup_ino;
}
st->res = st->dup_ino ? 0 : -EEXIST;
}
auto cb = std::move(st->cb);
cb(st->res);
}
static void kv_create_setattr(json11::Json::object & attrobj, sattr3 & sattr)
@ -260,7 +274,7 @@ int kv_nfs3_create_proc(void *opaque, rpc_op_t *rop)
}
}
st->cb = [st](int res) { kv_create_reply<CREATE3res, CREATE3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -276,7 +290,7 @@ int kv_nfs3_mkdir_proc(void *opaque, rpc_op_t *rop)
st->attrobj["parent_ino"] = st->dir_ino;
kv_create_setattr(st->attrobj, args->attributes);
st->cb = [st](int res) { kv_create_reply<MKDIR3res, MKDIR3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -292,7 +306,7 @@ int kv_nfs3_symlink_proc(void *opaque, rpc_op_t *rop)
st->attrobj["symlink"] = (std::string)args->symlink.symlink_data;
kv_create_setattr(st->attrobj, args->symlink.symlink_attributes);
st->cb = [st](int res) { kv_create_reply<SYMLINK3res, SYMLINK3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -324,6 +338,6 @@ int kv_nfs3_mknod_proc(void *opaque, rpc_op_t *rop)
return 0;
}
st->cb = [st](int res) { kv_create_reply<MKNOD3res, MKNOD3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}