Fix and improve parallel allocation
- Do not try to allocate more DB blocks in an inode block until it's "confirmed" and "locked" by the first write
- Do not recheck for new zero DB blocks on first write into an inode block - a CAS failure means someone else is already writing into it
- Throw new allocation blocks away on a CAS failure regardless of whether the known_version is 0
antietcd
parent
a64f0d1f73
commit
511bc3df1c
165
src/kv_db.cpp
165
src/kv_db.cpp
|
@ -123,6 +123,13 @@ struct kv_continue_write_t
|
||||||
std::function<void(int)> cb;
|
std::function<void(int)> cb;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct kv_alloc_block_t
|
||||||
|
{
|
||||||
|
uint64_t offset;
|
||||||
|
bool writing;
|
||||||
|
bool confirmed;
|
||||||
|
};
|
||||||
|
|
||||||
struct kv_db_t
|
struct kv_db_t
|
||||||
{
|
{
|
||||||
cluster_client_t *cli = NULL;
|
cluster_client_t *cli = NULL;
|
||||||
|
@ -137,7 +144,7 @@ struct kv_db_t
|
||||||
uint64_t evict_unused_age = 1000;
|
uint64_t evict_unused_age = 1000;
|
||||||
uint64_t evict_max_misses = 10;
|
uint64_t evict_max_misses = 10;
|
||||||
uint64_t evict_attempts_per_level = 3;
|
uint64_t evict_attempts_per_level = 3;
|
||||||
uint64_t allocate_blocks = 4;
|
uint64_t max_allocate_blocks = 4;
|
||||||
uint64_t log_level = 1;
|
uint64_t log_level = 1;
|
||||||
|
|
||||||
// state
|
// state
|
||||||
|
@ -146,7 +153,7 @@ struct kv_db_t
|
||||||
int base_block_level = 0;
|
int base_block_level = 0;
|
||||||
int usage_counter = 1;
|
int usage_counter = 1;
|
||||||
int allocating_block_pos = 0;
|
int allocating_block_pos = 0;
|
||||||
std::vector<uint64_t> allocating_blocks;
|
std::vector<kv_alloc_block_t> allocating_blocks;
|
||||||
std::set<uint64_t> block_levels;
|
std::set<uint64_t> block_levels;
|
||||||
std::map<uint64_t, kv_block_t> block_cache;
|
std::map<uint64_t, kv_block_t> block_cache;
|
||||||
std::map<uint64_t, uint64_t> known_versions;
|
std::map<uint64_t, uint64_t> known_versions;
|
||||||
|
@ -160,6 +167,8 @@ struct kv_db_t
|
||||||
|
|
||||||
uint64_t alloc_block();
|
uint64_t alloc_block();
|
||||||
void clear_allocation_block(uint64_t offset);
|
void clear_allocation_block(uint64_t offset);
|
||||||
|
void confirm_allocation_block(uint64_t offset);
|
||||||
|
void stop_writing_new(uint64_t offset);
|
||||||
|
|
||||||
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
|
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
|
||||||
void set_config(json11::Json cfg);
|
void set_config(json11::Json cfg);
|
||||||
|
@ -539,7 +548,7 @@ void kv_db_t::set_config(json11::Json cfg)
|
||||||
this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
|
this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
|
||||||
this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
|
this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
|
||||||
this->cache_max_blocks = this->memory_limit / this->kv_block_size;
|
this->cache_max_blocks = this->memory_limit / this->kv_block_size;
|
||||||
this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
|
this->max_allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
|
||||||
this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1;
|
this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,49 +576,96 @@ void kv_db_t::close(std::function<void()> cb)
|
||||||
|
|
||||||
uint64_t kv_db_t::alloc_block()
|
uint64_t kv_db_t::alloc_block()
|
||||||
{
|
{
|
||||||
// round robin between <allocate_blocks> blocks
|
// select from at least <max_allocate_blocks> allocation blocks
|
||||||
while (allocating_blocks.size() < allocate_blocks)
|
while (allocating_blocks.size() < max_allocate_blocks)
|
||||||
{
|
{
|
||||||
allocating_blocks.push_back(UINT64_MAX);
|
allocating_blocks.push_back({ .offset = UINT64_MAX });
|
||||||
}
|
}
|
||||||
if (allocating_blocks[allocating_block_pos] == UINT64_MAX)
|
bool found = false;
|
||||||
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
||||||
{
|
{
|
||||||
// try v0 and only v0 of a new inode block
|
int next = (allocating_block_pos+i) % allocating_blocks.size();
|
||||||
while (known_versions[next_free/ino_block_size] != 0)
|
if (allocating_blocks[allocating_block_pos].offset != UINT64_MAX &&
|
||||||
|
allocating_blocks[next].confirmed && !allocating_blocks[next].writing)
|
||||||
{
|
{
|
||||||
next_free += ino_block_size;
|
allocating_block_pos = next;
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
allocating_blocks[allocating_block_pos] = next_free;
|
}
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
// Allow to allocate more new blocks in parallel than <max_allocate_blocks>
|
||||||
|
allocating_blocks.push_back({ .offset = UINT64_MAX });
|
||||||
|
allocating_block_pos = allocating_blocks.size()-1;
|
||||||
|
}
|
||||||
|
if (allocating_blocks[allocating_block_pos].offset == UINT64_MAX)
|
||||||
|
{
|
||||||
|
// allocate new blocks in the end
|
||||||
|
auto known_max = known_versions.end();
|
||||||
|
while (known_max != known_versions.begin())
|
||||||
|
{
|
||||||
|
known_max--;
|
||||||
|
// try v0 and only v0 of a new inode block
|
||||||
|
if (known_max->second != 0)
|
||||||
|
{
|
||||||
|
auto probably_unused = (known_max->first+1)*ino_block_size;
|
||||||
|
if (next_free < probably_unused)
|
||||||
|
next_free = probably_unused;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
allocating_blocks[allocating_block_pos] = {
|
||||||
|
.offset = next_free,
|
||||||
|
.writing = false,
|
||||||
|
.confirmed = false,
|
||||||
|
};
|
||||||
next_free += ino_block_size;
|
next_free += ino_block_size;
|
||||||
}
|
}
|
||||||
auto pos = allocating_blocks[allocating_block_pos];
|
auto pos = allocating_blocks[allocating_block_pos].offset;
|
||||||
allocating_blocks[allocating_block_pos] += kv_block_size;
|
allocating_blocks[allocating_block_pos].writing = true;
|
||||||
if (!(allocating_blocks[allocating_block_pos] % ino_block_size))
|
allocating_blocks[allocating_block_pos].offset += kv_block_size;
|
||||||
|
if (!(allocating_blocks[allocating_block_pos].offset % ino_block_size))
|
||||||
{
|
{
|
||||||
// Allow online reconfiguration
|
// Allow to reconfigure <max_allocate_blocks> online
|
||||||
if (allocating_blocks.size() > allocate_blocks)
|
if (allocating_blocks.size() > max_allocate_blocks)
|
||||||
allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
|
allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
|
||||||
else
|
else
|
||||||
allocating_blocks[allocating_block_pos] = UINT64_MAX;
|
allocating_blocks[allocating_block_pos].offset = UINT64_MAX;
|
||||||
}
|
}
|
||||||
allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size();
|
|
||||||
assert(block_cache.find(pos) == block_cache.end());
|
assert(block_cache.find(pos) == block_cache.end());
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
void kv_db_t::clear_allocation_block(uint64_t offset)
|
void kv_db_t::clear_allocation_block(uint64_t offset)
|
||||||
{
|
{
|
||||||
// We want to be first when writing the first KV block into a new inode block
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
||||||
// After it other writers may modify already placed blocks and the version
|
|
||||||
// will increase, but they won't use free space in it for new blocks
|
|
||||||
if (known_versions[offset/ino_block_size] == 0)
|
|
||||||
{
|
{
|
||||||
for (int i = 0; i < allocating_blocks.size(); i++)
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
||||||
{
|
{
|
||||||
if (allocating_blocks[i]/ino_block_size == offset/ino_block_size)
|
allocating_blocks[i].offset = UINT64_MAX;
|
||||||
{
|
}
|
||||||
allocating_blocks[i] = UINT64_MAX;
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void kv_db_t::confirm_allocation_block(uint64_t offset)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
||||||
|
{
|
||||||
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
||||||
|
{
|
||||||
|
allocating_blocks[i].confirmed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void kv_db_t::stop_writing_new(uint64_t offset)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
||||||
|
{
|
||||||
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
||||||
|
{
|
||||||
|
allocating_blocks[i].writing = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -711,7 +767,10 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
|
||||||
{
|
{
|
||||||
if (db->known_versions[offset/db->ino_block_size] < version)
|
if (db->known_versions[offset/db->ino_block_size] < version)
|
||||||
{
|
{
|
||||||
db->clear_allocation_block(offset);
|
if (db->known_versions[offset/db->ino_block_size] == 0)
|
||||||
|
{
|
||||||
|
db->clear_allocation_block(offset);
|
||||||
|
}
|
||||||
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
|
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
|
||||||
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
|
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
|
||||||
{
|
{
|
||||||
|
@ -1230,14 +1289,41 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std:
|
||||||
return blk;
|
return blk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb);
|
||||||
|
|
||||||
|
static void place_again(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
|
||||||
|
{
|
||||||
|
auto old_offset = blk->offset;
|
||||||
|
auto new_offset = db->alloc_block();
|
||||||
|
del_block_level(db, blk);
|
||||||
|
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
|
||||||
|
db->block_cache.erase(old_offset);
|
||||||
|
auto new_blk = &db->block_cache[new_offset];
|
||||||
|
new_blk->offset = new_offset;
|
||||||
|
new_blk->invalidated = false;
|
||||||
|
add_block_level(db, new_blk);
|
||||||
|
write_new_block(db, new_blk, cb);
|
||||||
|
}
|
||||||
|
|
||||||
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
|
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
|
||||||
{
|
{
|
||||||
write_block(db, blk, [=](int res)
|
write_block(db, blk, [=](int res)
|
||||||
{
|
{
|
||||||
|
db->stop_writing_new(blk->offset);
|
||||||
if (res == -EINTR)
|
if (res == -EINTR)
|
||||||
{
|
{
|
||||||
// CAS failure => re-read, then, if not zero, find position again and retry
|
// CAS failure => re-read, then, if not zero, find position again and retry
|
||||||
db->clear_allocation_block(blk->offset);
|
if (!(blk->offset % db->ino_block_size))
|
||||||
|
{
|
||||||
|
// Any failed write of the first block within an inode block
|
||||||
|
// means that someone else is already allocating blocks in it,
|
||||||
|
// so we HAVE to move to the next inode block immediately
|
||||||
|
db->clear_allocation_block(blk->offset);
|
||||||
|
place_again(db, blk, cb);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// On the other hand, if the block is already "ours", then live parts
|
||||||
|
// of it may change and we MAY recheck if the block is still zero on CAS failure
|
||||||
cluster_op_t *op = new cluster_op_t;
|
cluster_op_t *op = new cluster_op_t;
|
||||||
op->opcode = OSD_OP_READ;
|
op->opcode = OSD_OP_READ;
|
||||||
op->inode = db->inode_id;
|
op->inode = db->inode_id;
|
||||||
|
@ -1264,16 +1350,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Block is already occupied => place again
|
// Block is already occupied => place again
|
||||||
auto old_offset = blk->offset;
|
place_again(db, blk, cb);
|
||||||
auto new_offset = db->alloc_block();
|
|
||||||
del_block_level(db, blk);
|
|
||||||
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
|
|
||||||
db->block_cache.erase(old_offset);
|
|
||||||
auto new_blk = &db->block_cache[new_offset];
|
|
||||||
new_blk->offset = new_offset;
|
|
||||||
new_blk->invalidated = false;
|
|
||||||
add_block_level(db, new_blk);
|
|
||||||
write_new_block(db, new_blk, cb);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
db->cli->execute(op);
|
db->cli->execute(op);
|
||||||
|
@ -1281,6 +1358,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
||||||
else if (res != 0)
|
else if (res != 0)
|
||||||
{
|
{
|
||||||
// Other failure => free the new unreferenced block and die
|
// Other failure => free the new unreferenced block and die
|
||||||
|
db->clear_allocation_block(blk->offset);
|
||||||
del_block_level(db, blk);
|
del_block_level(db, blk);
|
||||||
db->block_cache.erase(blk->offset);
|
db->block_cache.erase(blk->offset);
|
||||||
cb(res > 0 ? -EIO : res, NULL);
|
cb(res > 0 ? -EIO : res, NULL);
|
||||||
|
@ -1288,6 +1366,12 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// OK
|
// OK
|
||||||
|
if (!(blk->offset % db->ino_block_size))
|
||||||
|
{
|
||||||
|
// A successful first write into a new allocation block
|
||||||
|
// confirms that it's now "locked" by us
|
||||||
|
db->confirm_allocation_block(blk->offset);
|
||||||
|
}
|
||||||
cb(0, blk);
|
cb(0, blk);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -1405,6 +1489,7 @@ void kv_op_t::create_root()
|
||||||
{
|
{
|
||||||
if (res == -EINTR)
|
if (res == -EINTR)
|
||||||
{
|
{
|
||||||
|
db->clear_allocation_block(blk->offset);
|
||||||
auto blk_offset = blk->offset;
|
auto blk_offset = blk->offset;
|
||||||
del_block_level(db, blk);
|
del_block_level(db, blk);
|
||||||
db->block_cache.erase(blk_offset);
|
db->block_cache.erase(blk_offset);
|
||||||
|
@ -1413,6 +1498,8 @@ void kv_op_t::create_root()
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
db->stop_writing_new(blk->offset);
|
||||||
|
db->confirm_allocation_block(blk->offset);
|
||||||
db->stop_updating(blk);
|
db->stop_updating(blk);
|
||||||
finish(res);
|
finish(res);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue