|
|
|
@ -37,6 +37,7 @@
|
|
|
|
|
#define KV_RECHECK_NONE 0
|
|
|
|
|
#define KV_RECHECK_LEAF 1
|
|
|
|
|
#define KV_RECHECK_ALL 2
|
|
|
|
|
#define KV_RECHECK_WAIT 3
|
|
|
|
|
|
|
|
|
|
#define KV_CH_ADD 1
|
|
|
|
|
#define KV_CH_DEL 2
|
|
|
|
@ -48,6 +49,10 @@
|
|
|
|
|
#define LEVEL_BITS 8
|
|
|
|
|
#define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1)
|
|
|
|
|
|
|
|
|
|
#define BLK_NOCHANGE 0
|
|
|
|
|
#define BLK_RELOADED 1
|
|
|
|
|
#define BLK_UPDATING 2
|
|
|
|
|
|
|
|
|
|
struct __attribute__((__packed__)) kv_stored_block_t
|
|
|
|
|
{
|
|
|
|
|
uint64_t magic;
|
|
|
|
@ -118,6 +123,13 @@ struct kv_continue_write_t
|
|
|
|
|
std::function<void(int)> cb;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// State of one "allocation block": an inode block from which this client
// hands out new K/V blocks (see kv_db_t::alloc_block()).
//
// All members carry default initializers so that a default-constructed
// instance is well-defined (all-zero) instead of holding indeterminate
// values. The defaults equal what value-initialization and partial
// designated initializers (`{ .offset = ... }`) already produced.
struct kv_alloc_block_t
{
    // Offset of the next K/V block to hand out from this allocation block.
    // UINT64_MAX is used by kv_db_t as "no inode block assigned to this slot".
    uint64_t offset = 0;
    // Set by alloc_block() when a block is handed out from this slot and
    // cleared by stop_writing_new() once the write of that new block returns.
    bool writing = false;
    // Set by confirm_allocation_block() after the first write into the
    // inode block succeeded.
    bool confirmed = false;
};
|
|
|
|
|
|
|
|
|
|
struct kv_db_t
|
|
|
|
|
{
|
|
|
|
|
cluster_client_t *cli = NULL;
|
|
|
|
@ -132,7 +144,7 @@ struct kv_db_t
|
|
|
|
|
uint64_t evict_unused_age = 1000;
|
|
|
|
|
uint64_t evict_max_misses = 10;
|
|
|
|
|
uint64_t evict_attempts_per_level = 3;
|
|
|
|
|
uint64_t allocate_blocks = 4;
|
|
|
|
|
uint64_t max_allocate_blocks = 4;
|
|
|
|
|
uint64_t log_level = 1;
|
|
|
|
|
|
|
|
|
|
// state
|
|
|
|
@ -141,7 +153,7 @@ struct kv_db_t
|
|
|
|
|
int base_block_level = 0;
|
|
|
|
|
int usage_counter = 1;
|
|
|
|
|
int allocating_block_pos = 0;
|
|
|
|
|
std::vector<uint64_t> allocating_blocks;
|
|
|
|
|
std::vector<kv_alloc_block_t> allocating_blocks;
|
|
|
|
|
std::set<uint64_t> block_levels;
|
|
|
|
|
std::map<uint64_t, kv_block_t> block_cache;
|
|
|
|
|
std::map<uint64_t, uint64_t> known_versions;
|
|
|
|
@ -155,6 +167,8 @@ struct kv_db_t
|
|
|
|
|
|
|
|
|
|
uint64_t alloc_block();
|
|
|
|
|
void clear_allocation_block(uint64_t offset);
|
|
|
|
|
void confirm_allocation_block(uint64_t offset);
|
|
|
|
|
void stop_writing_new(uint64_t offset);
|
|
|
|
|
|
|
|
|
|
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
|
|
|
|
|
void set_config(json11::Json cfg);
|
|
|
|
@ -190,13 +204,13 @@ protected:
|
|
|
|
|
std::string prev_key_ge, prev_key_lt;
|
|
|
|
|
int cur_level = 0;
|
|
|
|
|
std::vector<kv_path_t> path;
|
|
|
|
|
bool updated = false;
|
|
|
|
|
int updating_on_path = 0;
|
|
|
|
|
int retry = 0;
|
|
|
|
|
bool skip_equal = false;
|
|
|
|
|
|
|
|
|
|
void finish(int res);
|
|
|
|
|
void get();
|
|
|
|
|
int handle_block(int res, bool updated, bool stop_on_split);
|
|
|
|
|
int handle_block(int res, int refresh, bool stop_on_split);
|
|
|
|
|
|
|
|
|
|
void update();
|
|
|
|
|
void update_find();
|
|
|
|
@ -204,7 +218,7 @@ protected:
|
|
|
|
|
void resume_split();
|
|
|
|
|
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb);
|
|
|
|
|
|
|
|
|
|
void next_handle_block(int res, bool updated);
|
|
|
|
|
void next_handle_block(int res, int refresh);
|
|
|
|
|
void next_get();
|
|
|
|
|
void next_go_up();
|
|
|
|
|
};
|
|
|
|
@ -245,6 +259,7 @@ int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
|
|
|
|
|
fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset);
|
|
|
|
|
return -EILSEQ;
|
|
|
|
|
}
|
|
|
|
|
assert(!this->type);
|
|
|
|
|
this->type = blk->type;
|
|
|
|
|
int pos = blk->data - data;
|
|
|
|
|
this->key_ge = read_string(data, size, &pos);
|
|
|
|
@ -533,7 +548,7 @@ void kv_db_t::set_config(json11::Json cfg)
|
|
|
|
|
this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
|
|
|
|
|
this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
|
|
|
|
|
this->cache_max_blocks = this->memory_limit / this->kv_block_size;
|
|
|
|
|
this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
|
|
|
|
|
this->max_allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
|
|
|
|
|
this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -561,50 +576,98 @@ void kv_db_t::close(std::function<void()> cb)
|
|
|
|
|
|
|
|
|
|
uint64_t kv_db_t::alloc_block()
|
|
|
|
|
{
|
|
|
|
|
// round robin between <allocate_blocks> blocks
|
|
|
|
|
while (allocating_blocks.size() < allocate_blocks)
|
|
|
|
|
// select from at least <max_allocate_blocks> allocation blocks
|
|
|
|
|
while (allocating_blocks.size() < max_allocate_blocks)
|
|
|
|
|
{
|
|
|
|
|
allocating_blocks.push_back(UINT64_MAX);
|
|
|
|
|
allocating_blocks.push_back({ .offset = UINT64_MAX });
|
|
|
|
|
}
|
|
|
|
|
if (allocating_blocks[allocating_block_pos] == UINT64_MAX)
|
|
|
|
|
bool found = false;
|
|
|
|
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
|
|
|
|
{
|
|
|
|
|
int next = (allocating_block_pos+i) % allocating_blocks.size();
|
|
|
|
|
if (allocating_blocks[allocating_block_pos].offset != UINT64_MAX &&
|
|
|
|
|
allocating_blocks[next].confirmed && !allocating_blocks[next].writing)
|
|
|
|
|
{
|
|
|
|
|
allocating_block_pos = next;
|
|
|
|
|
found = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!found)
|
|
|
|
|
{
|
|
|
|
|
// Allow to allocate more new blocks in parallel than <max_allocate_blocks>
|
|
|
|
|
allocating_blocks.push_back({ .offset = UINT64_MAX });
|
|
|
|
|
allocating_block_pos = allocating_blocks.size()-1;
|
|
|
|
|
}
|
|
|
|
|
if (allocating_blocks[allocating_block_pos].offset == UINT64_MAX)
|
|
|
|
|
{
|
|
|
|
|
// allocate new blocks in the end
|
|
|
|
|
auto known_max = known_versions.end();
|
|
|
|
|
while (known_max != known_versions.begin())
|
|
|
|
|
{
|
|
|
|
|
known_max--;
|
|
|
|
|
// try v0 and only v0 of a new inode block
|
|
|
|
|
while (known_versions[next_free/ino_block_size] != 0)
|
|
|
|
|
if (known_max->second != 0)
|
|
|
|
|
{
|
|
|
|
|
auto probably_unused = (known_max->first+1)*ino_block_size;
|
|
|
|
|
if (next_free < probably_unused)
|
|
|
|
|
next_free = probably_unused;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
allocating_blocks[allocating_block_pos] = {
|
|
|
|
|
.offset = next_free,
|
|
|
|
|
.writing = false,
|
|
|
|
|
.confirmed = false,
|
|
|
|
|
};
|
|
|
|
|
next_free += ino_block_size;
|
|
|
|
|
}
|
|
|
|
|
allocating_blocks[allocating_block_pos] = next_free;
|
|
|
|
|
next_free += ino_block_size;
|
|
|
|
|
}
|
|
|
|
|
auto pos = allocating_blocks[allocating_block_pos];
|
|
|
|
|
allocating_blocks[allocating_block_pos] += kv_block_size;
|
|
|
|
|
if (!(allocating_blocks[allocating_block_pos] % ino_block_size))
|
|
|
|
|
auto pos = allocating_blocks[allocating_block_pos].offset;
|
|
|
|
|
allocating_blocks[allocating_block_pos].writing = true;
|
|
|
|
|
allocating_blocks[allocating_block_pos].offset += kv_block_size;
|
|
|
|
|
if (!(allocating_blocks[allocating_block_pos].offset % ino_block_size))
|
|
|
|
|
{
|
|
|
|
|
// Allow online reconfiguration
|
|
|
|
|
if (allocating_blocks.size() > allocate_blocks)
|
|
|
|
|
// Allow to reconfigure <max_allocate_blocks> online
|
|
|
|
|
if (allocating_blocks.size() > max_allocate_blocks)
|
|
|
|
|
allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
|
|
|
|
|
else
|
|
|
|
|
allocating_blocks[allocating_block_pos] = UINT64_MAX;
|
|
|
|
|
allocating_blocks[allocating_block_pos].offset = UINT64_MAX;
|
|
|
|
|
}
|
|
|
|
|
allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size();
|
|
|
|
|
assert(block_cache.find(pos) == block_cache.end());
|
|
|
|
|
return pos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_db_t::clear_allocation_block(uint64_t offset)
|
|
|
|
|
{
|
|
|
|
|
// We want to be first when writing the first KV block into a new inode block
|
|
|
|
|
// After it other writers may modify already placed blocks and the version
|
|
|
|
|
// will increase, but they won't use free space in it for new blocks
|
|
|
|
|
if (known_versions[offset/ino_block_size] == 0)
|
|
|
|
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
|
|
|
|
{
|
|
|
|
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
|
|
|
|
{
|
|
|
|
|
allocating_blocks[i].offset = UINT64_MAX;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_db_t::confirm_allocation_block(uint64_t offset)
|
|
|
|
|
{
|
|
|
|
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
|
|
|
|
{
|
|
|
|
|
if (allocating_blocks[i]/ino_block_size == offset/ino_block_size)
|
|
|
|
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
|
|
|
|
{
|
|
|
|
|
allocating_blocks[i] = UINT64_MAX;
|
|
|
|
|
allocating_blocks[i].confirmed = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_db_t::stop_writing_new(uint64_t offset)
|
|
|
|
|
{
|
|
|
|
|
for (int i = 0; i < allocating_blocks.size(); i++)
|
|
|
|
|
{
|
|
|
|
|
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
|
|
|
|
|
{
|
|
|
|
|
allocating_blocks[i].writing = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool is_zero(void *buf, int size)
|
|
|
|
@ -703,8 +766,11 @@ static void add_block_level(kv_db_t *db, kv_block_t *blk)
|
|
|
|
|
static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
|
|
|
|
|
{
|
|
|
|
|
if (db->known_versions[offset/db->ino_block_size] < version)
|
|
|
|
|
{
|
|
|
|
|
if (db->known_versions[offset/db->ino_block_size] == 0)
|
|
|
|
|
{
|
|
|
|
|
db->clear_allocation_block(offset);
|
|
|
|
|
}
|
|
|
|
|
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
|
|
|
|
|
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
|
|
|
|
|
{
|
|
|
|
@ -764,9 +830,17 @@ static void try_evict(kv_db_t *db)
|
|
|
|
|
int misses = 0;
|
|
|
|
|
bool wrapped = false;
|
|
|
|
|
while (db->block_cache.size() > db->cache_max_blocks &&
|
|
|
|
|
(!wrapped || *random_it < random_pos) &&
|
|
|
|
|
(db->evict_max_misses <= 0 || misses < db->evict_max_misses))
|
|
|
|
|
{
|
|
|
|
|
if (random_it == db->block_levels.end() || (*random_it >> (64-LEVEL_BITS)) > evict_level)
|
|
|
|
|
{
|
|
|
|
|
if (wrapped)
|
|
|
|
|
break;
|
|
|
|
|
random_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS));
|
|
|
|
|
wrapped = true;
|
|
|
|
|
}
|
|
|
|
|
else if (wrapped && *random_it >= random_pos)
|
|
|
|
|
break;
|
|
|
|
|
auto b_it = db->block_cache.find((*random_it & NO_LEVEL_MASK) * db->kv_block_size);
|
|
|
|
|
auto blk = &b_it->second;
|
|
|
|
|
if (b_it != db->block_cache.end() && !blk->updating && blk->usage < db->usage_counter)
|
|
|
|
@ -779,13 +853,6 @@ static void try_evict(kv_db_t *db)
|
|
|
|
|
random_it++;
|
|
|
|
|
misses++;
|
|
|
|
|
}
|
|
|
|
|
if (random_it == db->block_levels.end() || (*random_it >> (64-LEVEL_BITS)) > evict_level)
|
|
|
|
|
{
|
|
|
|
|
if (wrapped)
|
|
|
|
|
break;
|
|
|
|
|
random_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS));
|
|
|
|
|
wrapped = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (db->block_cache.size() <= db->cache_max_blocks)
|
|
|
|
|
{
|
|
|
|
@ -795,16 +862,27 @@ static void try_evict(kv_db_t *db)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
|
|
|
|
|
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, int)> cb)
|
|
|
|
|
{
|
|
|
|
|
auto b_it = db->block_cache.find(offset);
|
|
|
|
|
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
|
|
|
|
|
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
|
|
|
|
|
b_it->second.updating))
|
|
|
|
|
b_it->second.updating > 0))
|
|
|
|
|
{
|
|
|
|
|
auto blk = &b_it->second;
|
|
|
|
|
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
|
|
|
|
{
|
|
|
|
|
// Wait until block update stops
|
|
|
|
|
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
|
|
|
|
{
|
|
|
|
|
get_block(db, offset, cur_level, recheck_policy, cb);
|
|
|
|
|
db->run_continue_update(blk_offset);
|
|
|
|
|
});
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// Block already in cache, we can proceed
|
|
|
|
|
b_it->second.usage = db->usage_counter;
|
|
|
|
|
cb(0, false);
|
|
|
|
|
blk->usage = db->usage_counter;
|
|
|
|
|
cb(0, BLK_UPDATING);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
cluster_op_t *op = new cluster_op_t;
|
|
|
|
@ -818,34 +896,51 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|
|
|
|
if (op->retval != op->len)
|
|
|
|
|
{
|
|
|
|
|
// error
|
|
|
|
|
cb(op->retval >= 0 ? -EIO : op->retval, false);
|
|
|
|
|
cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (db->block_cache.find(op->offset) != db->block_cache.end() &&
|
|
|
|
|
db->known_versions[op->offset/db->ino_block_size] == op->version)
|
|
|
|
|
invalidate(db, op->offset, op->version);
|
|
|
|
|
auto blk_it = db->block_cache.find(op->offset);
|
|
|
|
|
if (blk_it != db->block_cache.end() &&
|
|
|
|
|
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
|
|
|
|
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
|
|
|
|
{
|
|
|
|
|
auto blk = &db->block_cache.at(op->offset);
|
|
|
|
|
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
|
|
|
|
{
|
|
|
|
|
// Wait until block update stops
|
|
|
|
|
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
|
|
|
|
{
|
|
|
|
|
get_block(db, offset, cur_level, recheck_policy, cb);
|
|
|
|
|
db->run_continue_update(blk_offset);
|
|
|
|
|
});
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
blk->usage = db->usage_counter;
|
|
|
|
|
cb(0, false);
|
|
|
|
|
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
invalidate(db, op->offset, op->version);
|
|
|
|
|
try_evict(db);
|
|
|
|
|
auto blk = &db->block_cache[op->offset];
|
|
|
|
|
if (blk_it != db->block_cache.end())
|
|
|
|
|
{
|
|
|
|
|
del_block_level(db, blk);
|
|
|
|
|
*blk = {};
|
|
|
|
|
}
|
|
|
|
|
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
|
|
|
|
|
if (err == 0)
|
|
|
|
|
{
|
|
|
|
|
blk->level = cur_level;
|
|
|
|
|
blk->usage = db->usage_counter;
|
|
|
|
|
add_block_level(db, blk);
|
|
|
|
|
cb(0, true);
|
|
|
|
|
cb(0, BLK_RELOADED);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
db->block_cache.erase(op->offset);
|
|
|
|
|
cb(err, false);
|
|
|
|
|
cb(err, BLK_NOCHANGE);
|
|
|
|
|
}
|
|
|
|
|
try_evict(db);
|
|
|
|
|
}
|
|
|
|
|
free(op->iov.buf[0].iov_base);
|
|
|
|
|
delete op;
|
|
|
|
@ -900,9 +995,9 @@ void kv_op_t::finish(int res)
|
|
|
|
|
|
|
|
|
|
void kv_op_t::get()
|
|
|
|
|
{
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
|
|
|
|
|
{
|
|
|
|
|
res = handle_block(res, updated, false);
|
|
|
|
|
res = handle_block(res, refresh, false);
|
|
|
|
|
if (res == -EAGAIN)
|
|
|
|
|
{
|
|
|
|
|
get();
|
|
|
|
@ -939,13 +1034,13 @@ void kv_op_t::get()
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
|
|
|
|
|
{
|
|
|
|
|
if (res < 0)
|
|
|
|
|
{
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
this->updated = this->updated || updated;
|
|
|
|
|
this->updating_on_path = this->updating_on_path | refresh;
|
|
|
|
|
auto blk = &db->block_cache.at(cur_block);
|
|
|
|
|
if (opcode != KV_GET && opcode != KV_GET_CACHED)
|
|
|
|
|
{
|
|
|
|
@ -953,7 +1048,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
// and recheck parent blocks if their versions change. This is required
|
|
|
|
|
// because we have to handle parallel splits of parent blocks correctly
|
|
|
|
|
assert(path.size() > 0);
|
|
|
|
|
path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size];
|
|
|
|
|
path[path.size()-1].version = blk->invalidated ? 0 : db->known_versions[cur_block/db->ino_block_size];
|
|
|
|
|
}
|
|
|
|
|
if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt)
|
|
|
|
|
{
|
|
|
|
@ -966,16 +1061,26 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
// We may read P on step (1), get a link to A, and read A on step (4).
|
|
|
|
|
// It will miss data from [c, b).
|
|
|
|
|
// Retry once. If we don't see any updates after retrying - fail with EILSEQ.
|
|
|
|
|
bool fatal = !this->updated && this->retry > 0;
|
|
|
|
|
bool fatal = !this->updating_on_path && this->retry > 0;
|
|
|
|
|
if (fatal || db->log_level > 0)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n",
|
|
|
|
|
fatal ? "Error: " : "Warning: read/update collision: ",
|
|
|
|
|
cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str());
|
|
|
|
|
}
|
|
|
|
|
if (this->updated)
|
|
|
|
|
if (fatal)
|
|
|
|
|
{
|
|
|
|
|
this->updated = false;
|
|
|
|
|
blk->dump(db->base_block_level);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
this->recheck_policy = KV_RECHECK_ALL;
|
|
|
|
|
if (this->updating_on_path)
|
|
|
|
|
{
|
|
|
|
|
if (this->updating_on_path & BLK_UPDATING)
|
|
|
|
|
{
|
|
|
|
|
// Wait for "updating" blocks on next run
|
|
|
|
|
this->recheck_policy = KV_RECHECK_WAIT;
|
|
|
|
|
}
|
|
|
|
|
this->updating_on_path = 0;
|
|
|
|
|
this->retry = 0;
|
|
|
|
|
}
|
|
|
|
|
else if (this->retry > 0)
|
|
|
|
@ -984,7 +1089,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
this->updated = false;
|
|
|
|
|
this->updating_on_path = 0;
|
|
|
|
|
this->retry++;
|
|
|
|
|
}
|
|
|
|
|
prev_key_ge = prev_key_lt = "";
|
|
|
|
@ -995,7 +1100,6 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
path.clear();
|
|
|
|
|
path.push_back((kv_path_t){ .offset = 0 });
|
|
|
|
|
}
|
|
|
|
|
this->recheck_policy = KV_RECHECK_ALL;
|
|
|
|
|
return -EAGAIN;
|
|
|
|
|
}
|
|
|
|
|
if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) &&
|
|
|
|
@ -1028,7 +1132,9 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|
|
|
|
fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str());
|
|
|
|
|
return -EILSEQ;
|
|
|
|
|
}
|
|
|
|
|
auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first;
|
|
|
|
|
auto m = child_it == blk->data.end()
|
|
|
|
|
? (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT
|
|
|
|
|
? blk->right_half : blk->key_lt) : child_it->first;
|
|
|
|
|
child_it--;
|
|
|
|
|
if (child_it->second.size() != sizeof(uint64_t))
|
|
|
|
|
{
|
|
|
|
@ -1184,14 +1290,41 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std:
|
|
|
|
|
return blk;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb);
|
|
|
|
|
|
|
|
|
|
// Re-home a new block whose write could not be placed at its current offset:
// allocate another offset, move the cached block object under the new key
// and start writing it again via write_new_block().
//
// db  - database owning the block cache and the allocator
// blk - cached block to move; the pointer must not be used after this call
//       because its cache entry is erased
// cb  - completion callback, passed through to write_new_block() unchanged
static void place_again(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
{
    auto & cache = db->block_cache;
    const auto prev_offset = blk->offset;
    const auto fresh_offset = db->alloc_block();
    // Unregister the block while it still carries the old offset
    del_block_level(db, blk);
    // operator[] creates an empty entry at the new offset; swapping moves the
    // block contents into it and leaves the empty object under the old key
    std::swap(cache[fresh_offset], cache[prev_offset]);
    cache.erase(prev_offset);
    kv_block_t *moved = &cache[fresh_offset];
    moved->offset = fresh_offset;
    moved->invalidated = false;
    // Register again, now under the new offset
    add_block_level(db, moved);
    write_new_block(db, moved, cb);
}
|
|
|
|
|
|
|
|
|
|
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
|
|
|
|
|
{
|
|
|
|
|
write_block(db, blk, [=](int res)
|
|
|
|
|
{
|
|
|
|
|
db->stop_writing_new(blk->offset);
|
|
|
|
|
if (res == -EINTR)
|
|
|
|
|
{
|
|
|
|
|
// CAS failure => re-read, then, if not zero, find position again and retry
|
|
|
|
|
if (!(blk->offset % db->ino_block_size))
|
|
|
|
|
{
|
|
|
|
|
// Any failed write of the first block within an inode block
|
|
|
|
|
// means that someone else is already allocating blocks in it,
|
|
|
|
|
// so we HAVE to move to the next inode block immediately
|
|
|
|
|
db->clear_allocation_block(blk->offset);
|
|
|
|
|
place_again(db, blk, cb);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// On the other hand, if the block is already "ours", then live parts
|
|
|
|
|
// of it may change and we MAY recheck if the block is still zero on CAS failure
|
|
|
|
|
cluster_op_t *op = new cluster_op_t;
|
|
|
|
|
op->opcode = OSD_OP_READ;
|
|
|
|
|
op->inode = db->inode_id;
|
|
|
|
@ -1218,16 +1351,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// Block is already occupied => place again
|
|
|
|
|
auto old_offset = blk->offset;
|
|
|
|
|
auto new_offset = db->alloc_block();
|
|
|
|
|
del_block_level(db, blk);
|
|
|
|
|
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
|
|
|
|
|
db->block_cache.erase(old_offset);
|
|
|
|
|
auto new_blk = &db->block_cache[new_offset];
|
|
|
|
|
new_blk->offset = new_offset;
|
|
|
|
|
new_blk->invalidated = false;
|
|
|
|
|
add_block_level(db, new_blk);
|
|
|
|
|
write_new_block(db, new_blk, cb);
|
|
|
|
|
place_again(db, blk, cb);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
db->cli->execute(op);
|
|
|
|
@ -1235,6 +1359,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|
|
|
|
else if (res != 0)
|
|
|
|
|
{
|
|
|
|
|
// Other failure => free the new unreferenced block and die
|
|
|
|
|
db->clear_allocation_block(blk->offset);
|
|
|
|
|
del_block_level(db, blk);
|
|
|
|
|
db->block_cache.erase(blk->offset);
|
|
|
|
|
cb(res > 0 ? -EIO : res, NULL);
|
|
|
|
@ -1242,6 +1367,12 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// OK
|
|
|
|
|
if (!(blk->offset % db->ino_block_size))
|
|
|
|
|
{
|
|
|
|
|
// A successful first write into a new allocation block
|
|
|
|
|
// confirms that it's now "locked" by us
|
|
|
|
|
db->confirm_allocation_block(blk->offset);
|
|
|
|
|
}
|
|
|
|
|
cb(0, blk);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -1293,28 +1424,15 @@ void kv_op_t::update()
|
|
|
|
|
|
|
|
|
|
void kv_op_t::update_find()
|
|
|
|
|
{
|
|
|
|
|
auto blk_it = db->block_cache.find(cur_block);
|
|
|
|
|
if (blk_it != db->block_cache.end())
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, int refresh)
|
|
|
|
|
{
|
|
|
|
|
auto blk = &blk_it->second;
|
|
|
|
|
if (blk->updating)
|
|
|
|
|
{
|
|
|
|
|
// Optimisation: do not recheck the block if it's already being modified
|
|
|
|
|
db->continue_update.emplace(blk->offset, [=]() { update_find(); });
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated)
|
|
|
|
|
{
|
|
|
|
|
res = handle_block(res, updated, true);
|
|
|
|
|
res = handle_block(res, refresh, true);
|
|
|
|
|
if (res == -EAGAIN)
|
|
|
|
|
{
|
|
|
|
|
db->run_continue_update(checked_block);
|
|
|
|
|
update_find();
|
|
|
|
|
}
|
|
|
|
|
else if (res == -ENOTBLK)
|
|
|
|
|
{
|
|
|
|
|
db->run_continue_update(checked_block);
|
|
|
|
|
if (opcode == KV_SET)
|
|
|
|
|
{
|
|
|
|
|
// Check CAS callback
|
|
|
|
@ -1330,12 +1448,10 @@ void kv_op_t::update_find()
|
|
|
|
|
}
|
|
|
|
|
else if (res == -ECHILD)
|
|
|
|
|
{
|
|
|
|
|
db->run_continue_update(checked_block);
|
|
|
|
|
resume_split();
|
|
|
|
|
}
|
|
|
|
|
else if (res < 0)
|
|
|
|
|
{
|
|
|
|
|
db->run_continue_update(checked_block);
|
|
|
|
|
finish(res);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
@ -1374,6 +1490,7 @@ void kv_op_t::create_root()
|
|
|
|
|
{
|
|
|
|
|
if (res == -EINTR)
|
|
|
|
|
{
|
|
|
|
|
db->clear_allocation_block(blk->offset);
|
|
|
|
|
auto blk_offset = blk->offset;
|
|
|
|
|
del_block_level(db, blk);
|
|
|
|
|
db->block_cache.erase(blk_offset);
|
|
|
|
@ -1382,6 +1499,8 @@ void kv_op_t::create_root()
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
db->stop_writing_new(blk->offset);
|
|
|
|
|
db->confirm_allocation_block(blk->offset);
|
|
|
|
|
db->stop_updating(blk);
|
|
|
|
|
finish(res);
|
|
|
|
|
}
|
|
|
|
@ -1392,7 +1511,8 @@ void kv_op_t::resume_split()
|
|
|
|
|
{
|
|
|
|
|
// We hit a block which we started to split, but didn't finish splitting
|
|
|
|
|
// Generally it shouldn't happen, but it MAY happen if a K/V client dies
|
|
|
|
|
// In this case we want to finish the split and retry update from the beginning
|
|
|
|
|
// Also we may hit such blocks during concurrent updates
|
|
|
|
|
// In these cases we want to finish the split and retry update from the beginning
|
|
|
|
|
if (path.size() == 1)
|
|
|
|
|
{
|
|
|
|
|
// It shouldn't be the root block because we don't split it via INT_SPLIT/LEAF_SPLIT
|
|
|
|
@ -1417,7 +1537,7 @@ void kv_op_t::resume_split()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key,
|
|
|
|
|
const std::string & value, std::function<void(int)> cb)
|
|
|
|
|
const std::string & arg_value, std::function<void(int)> cb)
|
|
|
|
|
{
|
|
|
|
|
auto blk_it = db->block_cache.find(path[path_pos].offset);
|
|
|
|
|
if (blk_it == db->block_cache.end())
|
|
|
|
@ -1432,10 +1552,10 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|
|
|
|
if (blk->updating)
|
|
|
|
|
{
|
|
|
|
|
// Wait if block is being modified
|
|
|
|
|
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); });
|
|
|
|
|
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, arg_value, cb); });
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver)
|
|
|
|
|
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver || blk->invalidated)
|
|
|
|
|
{
|
|
|
|
|
// Recheck if block was modified in the meantime
|
|
|
|
|
db->run_continue_update(blk->offset);
|
|
|
|
@ -1444,6 +1564,18 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|
|
|
|
}
|
|
|
|
|
uint32_t rm_size = 0;
|
|
|
|
|
auto d_it = blk->data.find(key);
|
|
|
|
|
if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : ""))
|
|
|
|
|
{
|
|
|
|
|
// CAS failure
|
|
|
|
|
db->run_continue_update(blk->offset);
|
|
|
|
|
cb(-EAGAIN);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (path_pos == path.size()-1)
|
|
|
|
|
{
|
|
|
|
|
is_delete = op->opcode == KV_DEL;
|
|
|
|
|
}
|
|
|
|
|
const std::string & value = path_pos == path.size()-1 ? this->value : arg_value;
|
|
|
|
|
if (d_it != blk->data.end())
|
|
|
|
|
{
|
|
|
|
|
if (!is_delete && d_it->second == value)
|
|
|
|
@ -1462,13 +1594,6 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|
|
|
|
cb(0);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : ""))
|
|
|
|
|
{
|
|
|
|
|
// CAS failure
|
|
|
|
|
db->run_continue_update(blk->offset);
|
|
|
|
|
cb(-EAGAIN);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// This condition means this is a block split during previous updates
|
|
|
|
|
// Its parent is already updated because prev_key_lt <= blk->right_half
|
|
|
|
|
// That means we can safely remove the "split reference"
|
|
|
|
@ -1483,7 +1608,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|
|
|
|
{
|
|
|
|
|
blk->dump(0);
|
|
|
|
|
// Should not happen - we should have resumed the split
|
|
|
|
|
fprintf(stderr, "K/V: attempt to write into a block %lu instead of resuming the split (got here from %s..%s)\n",
|
|
|
|
|
fprintf(stderr, "K/V: attempt to write into block %lu instead of resuming the split (got here from %s..%s)\n",
|
|
|
|
|
blk->offset, prev_key_ge.c_str(), prev_key_lt.c_str());
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
@ -1493,6 +1618,13 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|
|
|
|
{
|
|
|
|
|
// New item fits.
|
|
|
|
|
// No need to split the block => just modify and write it
|
|
|
|
|
if ((blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && key >= blk->right_half)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "K/V: attempt to modify %s in unrelated split block %lu [%s..%s..%s)\n",
|
|
|
|
|
key.c_str(), blk->offset, blk->key_ge.c_str(), blk->right_half.c_str(), blk->key_lt.c_str());
|
|
|
|
|
blk->dump(db->base_block_level);
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
|
if (is_delete)
|
|
|
|
|
{
|
|
|
|
|
blk->change_type |= KV_CH_DEL;
|
|
|
|
@ -1679,15 +1811,15 @@ void kv_op_t::next()
|
|
|
|
|
{
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
|
|
|
|
|
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
|
|
|
|
|
{
|
|
|
|
|
next_handle_block(res, updated);
|
|
|
|
|
next_handle_block(res, refresh);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_op_t::next_handle_block(int res, bool updated)
|
|
|
|
|
void kv_op_t::next_handle_block(int res, int refresh)
|
|
|
|
|
{
|
|
|
|
|
res = handle_block(res, updated, false);
|
|
|
|
|
res = handle_block(res, refresh, false);
|
|
|
|
|
if (res == -EAGAIN)
|
|
|
|
|
{
|
|
|
|
|
next();
|
|
|
|
@ -1730,7 +1862,7 @@ void kv_op_t::next_get()
|
|
|
|
|
this->key = kv_it->first;
|
|
|
|
|
this->value = kv_it->second;
|
|
|
|
|
skip_equal = true;
|
|
|
|
|
callback(this);
|
|
|
|
|
(std::function<void(kv_op_t *)>(callback))(this);
|
|
|
|
|
}
|
|
|
|
|
// Find next block
|
|
|
|
|
else if (blk->type == KV_LEAF_SPLIT)
|
|
|
|
@ -1880,6 +2012,32 @@ void kv_dbw_t::del(const std::string & key, std::function<void(int res)> cb,
|
|
|
|
|
op->exec();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void kv_dbw_t::update(const std::string & key, std::function<int(int res, const std::string & old_value, std::string & new_value)> cas_cb,
|
|
|
|
|
std::function<void(int res)> cb)
|
|
|
|
|
{
|
|
|
|
|
auto *op = new kv_op_t;
|
|
|
|
|
op->db = db;
|
|
|
|
|
op->opcode = KV_SET;
|
|
|
|
|
op->key = key;
|
|
|
|
|
op->cas_cb = [cas_cb, op](int res, const std::string & old_value)
|
|
|
|
|
{
|
|
|
|
|
int action = cas_cb(res, old_value, op->value);
|
|
|
|
|
if (action == 1)
|
|
|
|
|
op->opcode = KV_SET;
|
|
|
|
|
else if (action == 2 && res != -ENOENT)
|
|
|
|
|
op->opcode = KV_DEL;
|
|
|
|
|
else
|
|
|
|
|
return false;
|
|
|
|
|
return true;
|
|
|
|
|
};
|
|
|
|
|
op->callback = [cb](kv_op_t *op)
|
|
|
|
|
{
|
|
|
|
|
cb(op->res);
|
|
|
|
|
delete op;
|
|
|
|
|
};
|
|
|
|
|
op->exec();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void* kv_dbw_t::list_start(const std::string & start)
|
|
|
|
|
{
|
|
|
|
|
auto *op = new kv_op_t;
|
|
|
|
|