Compare commits

...

10 Commits

Author SHA1 Message Date
Vitaliy Filippov a9ef9a86c0 Add update() API to kv_db 2024-01-06 17:14:42 +03:00
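The new call wraps the existing CAS machinery into a read-modify-write helper: the first callback receives the current value (or -ENOENT) and fills in the new one, returning 1 to write it, 2 to delete the key, or anything else to abort, as the cas_cb handling in the diff below shows. A hedged usage sketch following the definition's signature in the diff (std::function<void(int res)> for the completion callback); the key name and the increment logic are purely illustrative:

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>
// assumes the kv_dbw_t declaration from the header changed in this diff

void increment_counter(kv_dbw_t *db)
{
    db->update("example_counter",
        [](int res, const std::string & old_value, std::string & new_value) -> int
        {
            // res == -ENOENT and old_value == "" when the key does not exist yet
            uint64_t n = res == -ENOENT ? 0 : std::stoull(old_value);
            new_value = std::to_string(n + 1);
            return 1; // 1 = write new_value, 2 = delete the key, anything else = abort
        },
        [](int res)
        {
            if (res < 0)
                fprintf(stderr, "update failed: %d (%s)\n", res, strerror(-res));
        });
}
```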
Vitaliy Filippov 25832cb7e4 Fix eviction when random_pos selects the end 2024-01-02 13:24:15 +03:00
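The fix reorders the try_evict() loop (see the diff below) so the end-of-set and eviction-level checks happen before *random_it is dereferenced; previously the loop condition could dereference end() when the random starting position fell past the last cached block. A simplified sketch of the corrected wrap-around scan over a plain std::set; the real code additionally packs the eviction level into the set keys and only counts failed eviction attempts as misses:

```cpp
#include <cstddef>
#include <cstdint>
#include <set>

// Visit up to max_misses entries starting at random_pos, wrapping around once.
// The end() check comes BEFORE the dereference, as in the fixed try_evict().
size_t scan_from(const std::set<uint64_t> & levels, uint64_t random_pos, int max_misses)
{
    size_t visited = 0;
    auto it = levels.lower_bound(random_pos);
    bool wrapped = false;
    for (int misses = 0; misses < max_misses; misses++)
    {
        if (it == levels.end())
        {
            if (wrapped)
                break;           // scanned the whole set once
            it = levels.begin(); // wrap around and continue from the start
            wrapped = true;
            continue;
        }
        if (wrapped && *it >= random_pos)
            break;               // came back to the starting point
        visited++;               // the real code tries to evict the cached block *it refers to
        it++;
    }
    return visited;
}
```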
Vitaliy Filippov e6326c6539 Implement min/max list_count to make listings during performance test reasonable 2024-01-02 13:24:15 +03:00
Vitaliy Filippov e32f382815 Fix and improve parallel allocation
- Do not try to allocate more DB blocks in an inode block until it's "confirmed" and "locked" by the first write
- Do not recheck for new zero DB blocks on first write into an inode block - a CAS failure means someone else is already writing into it
- On a CAS failure, throw the new allocation block away regardless of whether known_version is 0
2024-01-02 13:24:15 +03:00
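The strategy in the bullets above is tracked per inode block with a small state record, visible further down in the diff as kv_alloc_block_t: the first KV-block write into a fresh inode block acts as the lock, and only confirmed blocks with no first write in flight hand out further KV blocks. A minimal sketch of that state and of the selection rule; the helper function is illustrative:

```cpp
#include <cstdint>

// Per-allocation-block state, mirroring kv_alloc_block_t from the diff below
struct kv_alloc_block_t
{
    uint64_t offset = UINT64_MAX; // next free KV block offset, UINT64_MAX = unused slot
    bool writing = false;         // the first KV-block write is still in flight
    bool confirmed = false;       // the first write succeeded => the inode block is "locked" by us
};

// Illustrative helper: further KV blocks may only be carved out of an inode block
// whose first write has been confirmed and which has no first write in flight
static bool can_allocate_from(const kv_alloc_block_t & b)
{
    return b.offset != UINT64_MAX && b.confirmed && !b.writing;
}
```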
Vitaliy Filippov fb23d94000 Implement key_prefix for K/V stress test 2024-01-02 13:24:15 +03:00
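Together with the min/max list_count change above, key_prefix bounds the stress-test listings in two ways (both visible in the third changed file): every listing picks a random target count in [min_list_count, max_list_count), and with a prefix set it stops as soon as a returned key no longer starts with it. A small sketch of the two checks; the helper names are illustrative:

```cpp
#include <cstdint>
#include <cstdlib>
#include <string>

// Random listing length in [min_list_count, max_list_count), as in the stress test
uint64_t pick_list_count(uint64_t min_list_count, uint64_t max_list_count)
{
    return min_list_count + (max_list_count > min_list_count
        ? lrand48() % (max_list_count - min_list_count) : 0);
}

// Stop when the key leaves the prefix or enough keys have been listed
bool listing_done(const std::string & key, const std::string & key_prefix,
    uint64_t done, uint64_t count)
{
    bool left_prefix = key_prefix.size() &&
        key.compare(0, key_prefix.size(), key_prefix) != 0;
    return left_prefix || (count > 0 && done >= count);
}
```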
Vitaliy Filippov ee462c2dad More fixes
- do not overwrite a block with older version if known version is newer
  (read may start before update and end after update)
- invalidated block versions can't be remembered and trusted
- right boundary for split blocks is right_half when diving down, not key_lt
- restart update also when block is "invalidated", not just on version mismatch
- copy callback in listings to avoid closure destruction bugs too
2024-01-02 13:24:15 +03:00
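The first two bullets reduce to a version rule that reappears in get_block() later in the diff: a read may be answered with data older than a write that completed while the read was in flight, so a cached block is only kept if the version we already know is at least the version the read returned, and never if it was explicitly invalidated in between. A one-function sketch of the rule, with illustrative parameter names:

```cpp
#include <cstdint>

// True if the cached copy of an inode block can be kept after a read completed:
// the already-known version must be >= the version the read returned, and the cached
// block must not have been invalidated in the meantime (otherwise it gets re-parsed).
bool cached_copy_still_valid(uint64_t known_version, uint64_t read_version, bool invalidated)
{
    return known_version >= read_version && !invalidated;
}
```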
Vitaliy Filippov 16e4c767f1 Add logging and one more assert 2024-01-02 13:24:15 +03:00
Vitaliy Filippov 9e2b677499 Make get_block() wait for updating when unrelated block is found along the path 2024-01-02 13:24:15 +03:00
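This commit introduces the KV_RECHECK_WAIT policy seen in the diff: when a traversal lands on a block that is currently being rewritten, get_block() parks itself in the continue_update queue keyed by the block offset and is resumed once the writer calls run_continue_update(). A minimal sketch of that park/resume pattern, assuming continue_update is a multimap from offset to callback (its exact type is not shown in this diff); as in the lambdas in the diff, a resumed callback re-invokes run_continue_update() itself so the queue keeps draining:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

std::multimap<uint64_t, std::function<void()>> continue_update;

// Park a retry until the in-flight update of the block at <offset> finishes
void wait_for_block(uint64_t offset, std::function<void()> retry)
{
    continue_update.emplace(offset, std::move(retry));
}

// Called when the update of <offset> finishes: resume one waiter; the waiter
// calls run_continue_update(offset) again after retrying, so all waiters run in turn
void run_continue_update(uint64_t offset)
{
    auto it = continue_update.find(offset);
    if (it == continue_update.end())
        return;
    auto cb = std::move(it->second);
    continue_update.erase(it);
    cb();
}
```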
Vitaliy Filippov fd57096d2d Fix a race condition where changed blocks were parsed over existing cached blocks, producing a mix of old and new data 2024-01-02 13:24:15 +03:00
Vitaliy Filippov e5ae907256 Simplify code by removing an unneeded "optimisation" 2024-01-02 13:24:15 +03:00
3 changed files with 307 additions and 116 deletions

View File

@ -37,6 +37,7 @@
#define KV_RECHECK_NONE 0 #define KV_RECHECK_NONE 0
#define KV_RECHECK_LEAF 1 #define KV_RECHECK_LEAF 1
#define KV_RECHECK_ALL 2 #define KV_RECHECK_ALL 2
#define KV_RECHECK_WAIT 3
#define KV_CH_ADD 1 #define KV_CH_ADD 1
#define KV_CH_DEL 2 #define KV_CH_DEL 2
@ -48,6 +49,10 @@
#define LEVEL_BITS 8 #define LEVEL_BITS 8
#define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1) #define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1)
#define BLK_NOCHANGE 0
#define BLK_RELOADED 1
#define BLK_UPDATING 2
struct __attribute__((__packed__)) kv_stored_block_t struct __attribute__((__packed__)) kv_stored_block_t
{ {
uint64_t magic; uint64_t magic;
@ -118,6 +123,13 @@ struct kv_continue_write_t
std::function<void(int)> cb; std::function<void(int)> cb;
}; };
struct kv_alloc_block_t
{
uint64_t offset;
bool writing;
bool confirmed;
};
struct kv_db_t struct kv_db_t
{ {
cluster_client_t *cli = NULL; cluster_client_t *cli = NULL;
@ -132,7 +144,7 @@ struct kv_db_t
uint64_t evict_unused_age = 1000; uint64_t evict_unused_age = 1000;
uint64_t evict_max_misses = 10; uint64_t evict_max_misses = 10;
uint64_t evict_attempts_per_level = 3; uint64_t evict_attempts_per_level = 3;
uint64_t allocate_blocks = 4; uint64_t max_allocate_blocks = 4;
uint64_t log_level = 1; uint64_t log_level = 1;
// state // state
@ -141,7 +153,7 @@ struct kv_db_t
int base_block_level = 0; int base_block_level = 0;
int usage_counter = 1; int usage_counter = 1;
int allocating_block_pos = 0; int allocating_block_pos = 0;
std::vector<uint64_t> allocating_blocks; std::vector<kv_alloc_block_t> allocating_blocks;
std::set<uint64_t> block_levels; std::set<uint64_t> block_levels;
std::map<uint64_t, kv_block_t> block_cache; std::map<uint64_t, kv_block_t> block_cache;
std::map<uint64_t, uint64_t> known_versions; std::map<uint64_t, uint64_t> known_versions;
@ -155,6 +167,8 @@ struct kv_db_t
uint64_t alloc_block(); uint64_t alloc_block();
void clear_allocation_block(uint64_t offset); void clear_allocation_block(uint64_t offset);
void confirm_allocation_block(uint64_t offset);
void stop_writing_new(uint64_t offset);
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb); void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
void set_config(json11::Json cfg); void set_config(json11::Json cfg);
@ -190,13 +204,13 @@ protected:
std::string prev_key_ge, prev_key_lt; std::string prev_key_ge, prev_key_lt;
int cur_level = 0; int cur_level = 0;
std::vector<kv_path_t> path; std::vector<kv_path_t> path;
bool updated = false; int updating_on_path = 0;
int retry = 0; int retry = 0;
bool skip_equal = false; bool skip_equal = false;
void finish(int res); void finish(int res);
void get(); void get();
int handle_block(int res, bool updated, bool stop_on_split); int handle_block(int res, int refresh, bool stop_on_split);
void update(); void update();
void update_find(); void update_find();
@ -204,7 +218,7 @@ protected:
void resume_split(); void resume_split();
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb); void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb);
void next_handle_block(int res, bool updated); void next_handle_block(int res, int refresh);
void next_get(); void next_get();
void next_go_up(); void next_go_up();
}; };
@ -245,6 +259,7 @@ int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset); fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset);
return -EILSEQ; return -EILSEQ;
} }
assert(!this->type);
this->type = blk->type; this->type = blk->type;
int pos = blk->data - data; int pos = blk->data - data;
this->key_ge = read_string(data, size, &pos); this->key_ge = read_string(data, size, &pos);
@ -533,7 +548,7 @@ void kv_db_t::set_config(json11::Json cfg)
this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value(); this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value(); this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
this->cache_max_blocks = this->memory_limit / this->kv_block_size; this->cache_max_blocks = this->memory_limit / this->kv_block_size;
this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4; this->max_allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1; this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1;
} }
@ -561,50 +576,98 @@ void kv_db_t::close(std::function<void()> cb)
uint64_t kv_db_t::alloc_block() uint64_t kv_db_t::alloc_block()
{ {
// round robin between <allocate_blocks> blocks // select from at least <max_allocate_blocks> allocation blocks
while (allocating_blocks.size() < allocate_blocks) while (allocating_blocks.size() < max_allocate_blocks)
{ {
allocating_blocks.push_back(UINT64_MAX); allocating_blocks.push_back({ .offset = UINT64_MAX });
} }
if (allocating_blocks[allocating_block_pos] == UINT64_MAX) bool found = false;
for (int i = 0; i < allocating_blocks.size(); i++)
{ {
int next = (allocating_block_pos+i) % allocating_blocks.size();
if (allocating_blocks[allocating_block_pos].offset != UINT64_MAX &&
allocating_blocks[next].confirmed && !allocating_blocks[next].writing)
{
allocating_block_pos = next;
found = true;
break;
}
}
if (!found)
{
// Allow to allocate more new blocks in parallel than <max_allocate_blocks>
allocating_blocks.push_back({ .offset = UINT64_MAX });
allocating_block_pos = allocating_blocks.size()-1;
}
if (allocating_blocks[allocating_block_pos].offset == UINT64_MAX)
{
// allocate new blocks in the end
auto known_max = known_versions.end();
while (known_max != known_versions.begin())
{
known_max--;
// try v0 and only v0 of a new inode block // try v0 and only v0 of a new inode block
while (known_versions[next_free/ino_block_size] != 0) if (known_max->second != 0)
{ {
auto probably_unused = (known_max->first+1)*ino_block_size;
if (next_free < probably_unused)
next_free = probably_unused;
break;
}
}
allocating_blocks[allocating_block_pos] = {
.offset = next_free,
.writing = false,
.confirmed = false,
};
next_free += ino_block_size; next_free += ino_block_size;
} }
allocating_blocks[allocating_block_pos] = next_free; auto pos = allocating_blocks[allocating_block_pos].offset;
next_free += ino_block_size; allocating_blocks[allocating_block_pos].writing = true;
} allocating_blocks[allocating_block_pos].offset += kv_block_size;
auto pos = allocating_blocks[allocating_block_pos]; if (!(allocating_blocks[allocating_block_pos].offset % ino_block_size))
allocating_blocks[allocating_block_pos] += kv_block_size;
if (!(allocating_blocks[allocating_block_pos] % ino_block_size))
{ {
// Allow online reconfiguration // Allow to reconfigure <max_allocate_blocks> online
if (allocating_blocks.size() > allocate_blocks) if (allocating_blocks.size() > max_allocate_blocks)
allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1); allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
else else
allocating_blocks[allocating_block_pos] = UINT64_MAX; allocating_blocks[allocating_block_pos].offset = UINT64_MAX;
} }
allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size(); assert(block_cache.find(pos) == block_cache.end());
return pos; return pos;
} }
void kv_db_t::clear_allocation_block(uint64_t offset) void kv_db_t::clear_allocation_block(uint64_t offset)
{ {
// We want to be first when writing the first KV block into a new inode block for (int i = 0; i < allocating_blocks.size(); i++)
// After it other writers may modify already placed blocks and the version {
// will increase, but they won't use free space in it for new blocks if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
if (known_versions[offset/ino_block_size] == 0) {
allocating_blocks[i].offset = UINT64_MAX;
}
}
}
void kv_db_t::confirm_allocation_block(uint64_t offset)
{ {
for (int i = 0; i < allocating_blocks.size(); i++) for (int i = 0; i < allocating_blocks.size(); i++)
{ {
if (allocating_blocks[i]/ino_block_size == offset/ino_block_size) if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
{ {
allocating_blocks[i] = UINT64_MAX; allocating_blocks[i].confirmed = true;
} }
} }
} }
void kv_db_t::stop_writing_new(uint64_t offset)
{
for (int i = 0; i < allocating_blocks.size(); i++)
{
if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
{
allocating_blocks[i].writing = false;
}
}
} }
static bool is_zero(void *buf, int size) static bool is_zero(void *buf, int size)
@ -703,8 +766,11 @@ static void add_block_level(kv_db_t *db, kv_block_t *blk)
static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
{ {
if (db->known_versions[offset/db->ino_block_size] < version) if (db->known_versions[offset/db->ino_block_size] < version)
{
if (db->known_versions[offset/db->ino_block_size] == 0)
{ {
db->clear_allocation_block(offset); db->clear_allocation_block(offset);
}
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size); auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size) while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
{ {
@ -764,9 +830,17 @@ static void try_evict(kv_db_t *db)
int misses = 0; int misses = 0;
bool wrapped = false; bool wrapped = false;
while (db->block_cache.size() > db->cache_max_blocks && while (db->block_cache.size() > db->cache_max_blocks &&
(!wrapped || *random_it < random_pos) &&
(db->evict_max_misses <= 0 || misses < db->evict_max_misses)) (db->evict_max_misses <= 0 || misses < db->evict_max_misses))
{ {
if (random_it == db->block_levels.end() || (*random_it >> (64-LEVEL_BITS)) > evict_level)
{
if (wrapped)
break;
random_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS));
wrapped = true;
}
else if (wrapped && *random_it >= random_pos)
break;
auto b_it = db->block_cache.find((*random_it & NO_LEVEL_MASK) * db->kv_block_size); auto b_it = db->block_cache.find((*random_it & NO_LEVEL_MASK) * db->kv_block_size);
auto blk = &b_it->second; auto blk = &b_it->second;
if (b_it != db->block_cache.end() && !blk->updating && blk->usage < db->usage_counter) if (b_it != db->block_cache.end() && !blk->updating && blk->usage < db->usage_counter)
@ -779,13 +853,6 @@ static void try_evict(kv_db_t *db)
random_it++; random_it++;
misses++; misses++;
} }
if (random_it == db->block_levels.end() || (*random_it >> (64-LEVEL_BITS)) > evict_level)
{
if (wrapped)
break;
random_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS));
wrapped = true;
}
} }
if (db->block_cache.size() <= db->cache_max_blocks) if (db->block_cache.size() <= db->cache_max_blocks)
{ {
@ -795,16 +862,27 @@ static void try_evict(kv_db_t *db)
} }
} }
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb) static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, int)> cb)
{ {
auto b_it = db->block_cache.find(offset); auto b_it = db->block_cache.find(offset);
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated || if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated || recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
b_it->second.updating)) b_it->second.updating > 0))
{ {
auto blk = &b_it->second;
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
{
// Wait until block update stops
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
// Block already in cache, we can proceed // Block already in cache, we can proceed
b_it->second.usage = db->usage_counter; blk->usage = db->usage_counter;
cb(0, false); cb(0, BLK_UPDATING);
return; return;
} }
cluster_op_t *op = new cluster_op_t; cluster_op_t *op = new cluster_op_t;
@ -818,34 +896,51 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
if (op->retval != op->len) if (op->retval != op->len)
{ {
// error // error
cb(op->retval >= 0 ? -EIO : op->retval, false); cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE);
return; return;
} }
if (db->block_cache.find(op->offset) != db->block_cache.end() && invalidate(db, op->offset, op->version);
db->known_versions[op->offset/db->ino_block_size] == op->version) auto blk_it = db->block_cache.find(op->offset);
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{ {
auto blk = &db->block_cache.at(op->offset); auto blk = &db->block_cache.at(op->offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
{
// Wait until block update stops
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
blk->usage = db->usage_counter; blk->usage = db->usage_counter;
cb(0, false); cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
} }
else else
{ {
invalidate(db, op->offset, op->version);
try_evict(db);
auto blk = &db->block_cache[op->offset]; auto blk = &db->block_cache[op->offset];
if (blk_it != db->block_cache.end())
{
del_block_level(db, blk);
*blk = {};
}
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len); int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
if (err == 0) if (err == 0)
{ {
blk->level = cur_level; blk->level = cur_level;
blk->usage = db->usage_counter; blk->usage = db->usage_counter;
add_block_level(db, blk); add_block_level(db, blk);
cb(0, true); cb(0, BLK_RELOADED);
} }
else else
{ {
db->block_cache.erase(op->offset); db->block_cache.erase(op->offset);
cb(err, false); cb(err, BLK_NOCHANGE);
} }
try_evict(db);
} }
free(op->iov.buf[0].iov_base); free(op->iov.buf[0].iov_base);
delete op; delete op;
@ -900,9 +995,9 @@ void kv_op_t::finish(int res)
void kv_op_t::get() void kv_op_t::get()
{ {
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
{ {
res = handle_block(res, updated, false); res = handle_block(res, refresh, false);
if (res == -EAGAIN) if (res == -EAGAIN)
{ {
get(); get();
@ -939,13 +1034,13 @@ void kv_op_t::get()
}); });
} }
int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
{ {
if (res < 0) if (res < 0)
{ {
return res; return res;
} }
this->updated = this->updated || updated; this->updating_on_path = this->updating_on_path | refresh;
auto blk = &db->block_cache.at(cur_block); auto blk = &db->block_cache.at(cur_block);
if (opcode != KV_GET && opcode != KV_GET_CACHED) if (opcode != KV_GET && opcode != KV_GET_CACHED)
{ {
@ -953,7 +1048,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
// and recheck parent blocks if their versions change. This is required // and recheck parent blocks if their versions change. This is required
// because we have to handle parallel splits of parent blocks correctly // because we have to handle parallel splits of parent blocks correctly
assert(path.size() > 0); assert(path.size() > 0);
path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size]; path[path.size()-1].version = blk->invalidated ? 0 : db->known_versions[cur_block/db->ino_block_size];
} }
if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt) if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt)
{ {
@ -966,16 +1061,26 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
// We may read P on step (1), get a link to A, and read A on step (4). // We may read P on step (1), get a link to A, and read A on step (4).
// It will miss data from [c, b). // It will miss data from [c, b).
// Retry once. If we don't see any updates after retrying - fail with EILSEQ. // Retry once. If we don't see any updates after retrying - fail with EILSEQ.
bool fatal = !this->updated && this->retry > 0; bool fatal = !this->updating_on_path && this->retry > 0;
if (fatal || db->log_level > 0) if (fatal || db->log_level > 0)
{ {
fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n", fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n",
fatal ? "Error: " : "Warning: read/update collision: ", fatal ? "Error: " : "Warning: read/update collision: ",
cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str()); cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str());
} if (fatal)
if (this->updated)
{ {
this->updated = false; blk->dump(db->base_block_level);
}
}
this->recheck_policy = KV_RECHECK_ALL;
if (this->updating_on_path)
{
if (this->updating_on_path & BLK_UPDATING)
{
// Wait for "updating" blocks on next run
this->recheck_policy = KV_RECHECK_WAIT;
}
this->updating_on_path = 0;
this->retry = 0; this->retry = 0;
} }
else if (this->retry > 0) else if (this->retry > 0)
@ -984,7 +1089,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
} }
else else
{ {
this->updated = false; this->updating_on_path = 0;
this->retry++; this->retry++;
} }
prev_key_ge = prev_key_lt = ""; prev_key_ge = prev_key_lt = "";
@ -995,7 +1100,6 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
path.clear(); path.clear();
path.push_back((kv_path_t){ .offset = 0 }); path.push_back((kv_path_t){ .offset = 0 });
} }
this->recheck_policy = KV_RECHECK_ALL;
return -EAGAIN; return -EAGAIN;
} }
if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) &&
@ -1028,7 +1132,9 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str()); fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str());
return -EILSEQ; return -EILSEQ;
} }
auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first; auto m = child_it == blk->data.end()
? (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT
? blk->right_half : blk->key_lt) : child_it->first;
child_it--; child_it--;
if (child_it->second.size() != sizeof(uint64_t)) if (child_it->second.size() != sizeof(uint64_t))
{ {
@ -1184,14 +1290,41 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std:
return blk; return blk;
} }
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb);
static void place_again(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
{
auto old_offset = blk->offset;
auto new_offset = db->alloc_block();
del_block_level(db, blk);
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
db->block_cache.erase(old_offset);
auto new_blk = &db->block_cache[new_offset];
new_blk->offset = new_offset;
new_blk->invalidated = false;
add_block_level(db, new_blk);
write_new_block(db, new_blk, cb);
}
static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb) static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
{ {
write_block(db, blk, [=](int res) write_block(db, blk, [=](int res)
{ {
db->stop_writing_new(blk->offset);
if (res == -EINTR) if (res == -EINTR)
{ {
// CAS failure => re-read, then, if not zero, find position again and retry // CAS failure => re-read, then, if not zero, find position again and retry
if (!(blk->offset % db->ino_block_size))
{
// Any failed write of the first block within an inode block
// means that someone else is already allocating blocks in it,
// so we HAVE to move to the next inode block immediately
db->clear_allocation_block(blk->offset); db->clear_allocation_block(blk->offset);
place_again(db, blk, cb);
return;
}
// On the other hand, if the block is already "ours", then live parts
// of it may change and we MAY recheck if the block is still zero on CAS failure
cluster_op_t *op = new cluster_op_t; cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_READ; op->opcode = OSD_OP_READ;
op->inode = db->inode_id; op->inode = db->inode_id;
@ -1218,16 +1351,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
else else
{ {
// Block is already occupied => place again // Block is already occupied => place again
auto old_offset = blk->offset; place_again(db, blk, cb);
auto new_offset = db->alloc_block();
del_block_level(db, blk);
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
db->block_cache.erase(old_offset);
auto new_blk = &db->block_cache[new_offset];
new_blk->offset = new_offset;
new_blk->invalidated = false;
add_block_level(db, new_blk);
write_new_block(db, new_blk, cb);
} }
}; };
db->cli->execute(op); db->cli->execute(op);
@ -1235,6 +1359,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
else if (res != 0) else if (res != 0)
{ {
// Other failure => free the new unreferenced block and die // Other failure => free the new unreferenced block and die
db->clear_allocation_block(blk->offset);
del_block_level(db, blk); del_block_level(db, blk);
db->block_cache.erase(blk->offset); db->block_cache.erase(blk->offset);
cb(res > 0 ? -EIO : res, NULL); cb(res > 0 ? -EIO : res, NULL);
@ -1242,6 +1367,12 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
else else
{ {
// OK // OK
if (!(blk->offset % db->ino_block_size))
{
// A successful first write into a new allocation block
// confirms that it's now "locked" by us
db->confirm_allocation_block(blk->offset);
}
cb(0, blk); cb(0, blk);
} }
}); });
@ -1293,28 +1424,15 @@ void kv_op_t::update()
void kv_op_t::update_find() void kv_op_t::update_find()
{ {
auto blk_it = db->block_cache.find(cur_block); get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, int refresh)
if (blk_it != db->block_cache.end())
{ {
auto blk = &blk_it->second; res = handle_block(res, refresh, true);
if (blk->updating)
{
// Optimisation: do not recheck the block if it's already being modified
db->continue_update.emplace(blk->offset, [=]() { update_find(); });
return;
}
}
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated)
{
res = handle_block(res, updated, true);
if (res == -EAGAIN) if (res == -EAGAIN)
{ {
db->run_continue_update(checked_block);
update_find(); update_find();
} }
else if (res == -ENOTBLK) else if (res == -ENOTBLK)
{ {
db->run_continue_update(checked_block);
if (opcode == KV_SET) if (opcode == KV_SET)
{ {
// Check CAS callback // Check CAS callback
@ -1330,12 +1448,10 @@ void kv_op_t::update_find()
} }
else if (res == -ECHILD) else if (res == -ECHILD)
{ {
db->run_continue_update(checked_block);
resume_split(); resume_split();
} }
else if (res < 0) else if (res < 0)
{ {
db->run_continue_update(checked_block);
finish(res); finish(res);
} }
else else
@ -1374,6 +1490,7 @@ void kv_op_t::create_root()
{ {
if (res == -EINTR) if (res == -EINTR)
{ {
db->clear_allocation_block(blk->offset);
auto blk_offset = blk->offset; auto blk_offset = blk->offset;
del_block_level(db, blk); del_block_level(db, blk);
db->block_cache.erase(blk_offset); db->block_cache.erase(blk_offset);
@ -1382,6 +1499,8 @@ void kv_op_t::create_root()
} }
else else
{ {
db->stop_writing_new(blk->offset);
db->confirm_allocation_block(blk->offset);
db->stop_updating(blk); db->stop_updating(blk);
finish(res); finish(res);
} }
@ -1392,7 +1511,8 @@ void kv_op_t::resume_split()
{ {
// We hit a block which we started to split, but didn't finish splitting // We hit a block which we started to split, but didn't finish splitting
// Generally it shouldn't happen, but it MAY happen if a K/V client dies // Generally it shouldn't happen, but it MAY happen if a K/V client dies
// In this case we want to finish the split and retry update from the beginning // Also we may hit such blocks during concurrent updates
// In these cases we want to finish the split and retry update from the beginning
if (path.size() == 1) if (path.size() == 1)
{ {
// It shouldn't be the root block because we don't split it via INT_SPLIT/LEAF_SPLIT // It shouldn't be the root block because we don't split it via INT_SPLIT/LEAF_SPLIT
@ -1417,7 +1537,7 @@ void kv_op_t::resume_split()
} }
void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key, void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key,
const std::string & value, std::function<void(int)> cb) const std::string & arg_value, std::function<void(int)> cb)
{ {
auto blk_it = db->block_cache.find(path[path_pos].offset); auto blk_it = db->block_cache.find(path[path_pos].offset);
if (blk_it == db->block_cache.end()) if (blk_it == db->block_cache.end())
@ -1432,10 +1552,10 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
if (blk->updating) if (blk->updating)
{ {
// Wait if block is being modified // Wait if block is being modified
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); }); db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, arg_value, cb); });
return; return;
} }
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver) if (db->known_versions[blk->offset/db->ino_block_size] != block_ver || blk->invalidated)
{ {
// Recheck if block was modified in the meantime // Recheck if block was modified in the meantime
db->run_continue_update(blk->offset); db->run_continue_update(blk->offset);
@ -1444,6 +1564,18 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
} }
uint32_t rm_size = 0; uint32_t rm_size = 0;
auto d_it = blk->data.find(key); auto d_it = blk->data.find(key);
if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : ""))
{
// CAS failure
db->run_continue_update(blk->offset);
cb(-EAGAIN);
return;
}
if (path_pos == path.size()-1)
{
is_delete = op->opcode == KV_DEL;
}
const std::string & value = path_pos == path.size()-1 ? this->value : arg_value;
if (d_it != blk->data.end()) if (d_it != blk->data.end())
{ {
if (!is_delete && d_it->second == value) if (!is_delete && d_it->second == value)
@ -1462,13 +1594,6 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
cb(0); cb(0);
return; return;
} }
if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : ""))
{
// CAS failure
db->run_continue_update(blk->offset);
cb(-EAGAIN);
return;
}
// This condition means this is a block split during previous updates // This condition means this is a block split during previous updates
// Its parent is already updated because prev_key_lt <= blk->right_half // Its parent is already updated because prev_key_lt <= blk->right_half
// That means we can safely remove the "split reference" // That means we can safely remove the "split reference"
@ -1483,7 +1608,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
{ {
blk->dump(0); blk->dump(0);
// Should not happen - we should have resumed the split // Should not happen - we should have resumed the split
fprintf(stderr, "K/V: attempt to write into a block %lu instead of resuming the split (got here from %s..%s)\n", fprintf(stderr, "K/V: attempt to write into block %lu instead of resuming the split (got here from %s..%s)\n",
blk->offset, prev_key_ge.c_str(), prev_key_lt.c_str()); blk->offset, prev_key_ge.c_str(), prev_key_lt.c_str());
abort(); abort();
} }
@ -1493,6 +1618,13 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
{ {
// New item fits. // New item fits.
// No need to split the block => just modify and write it // No need to split the block => just modify and write it
if ((blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && key >= blk->right_half)
{
fprintf(stderr, "K/V: attempt to modify %s in unrelated split block %lu [%s..%s..%s)\n",
key.c_str(), blk->offset, blk->key_ge.c_str(), blk->right_half.c_str(), blk->key_lt.c_str());
blk->dump(db->base_block_level);
abort();
}
if (is_delete) if (is_delete)
{ {
blk->change_type |= KV_CH_DEL; blk->change_type |= KV_CH_DEL;
@ -1679,15 +1811,15 @@ void kv_op_t::next()
{ {
return; return;
} }
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
{ {
next_handle_block(res, updated); next_handle_block(res, refresh);
}); });
} }
void kv_op_t::next_handle_block(int res, bool updated) void kv_op_t::next_handle_block(int res, int refresh)
{ {
res = handle_block(res, updated, false); res = handle_block(res, refresh, false);
if (res == -EAGAIN) if (res == -EAGAIN)
{ {
next(); next();
@ -1730,7 +1862,7 @@ void kv_op_t::next_get()
this->key = kv_it->first; this->key = kv_it->first;
this->value = kv_it->second; this->value = kv_it->second;
skip_equal = true; skip_equal = true;
callback(this); (std::function<void(kv_op_t *)>(callback))(this);
} }
// Find next block // Find next block
else if (blk->type == KV_LEAF_SPLIT) else if (blk->type == KV_LEAF_SPLIT)
@ -1880,6 +2012,32 @@ void kv_dbw_t::del(const std::string & key, std::function<void(int res)> cb,
op->exec(); op->exec();
} }
void kv_dbw_t::update(const std::string & key, std::function<int(int res, const std::string & old_value, std::string & new_value)> cas_cb,
std::function<void(int res)> cb)
{
auto *op = new kv_op_t;
op->db = db;
op->opcode = KV_SET;
op->key = key;
op->cas_cb = [cas_cb, op](int res, const std::string & old_value)
{
int action = cas_cb(res, old_value, op->value);
if (action == 1)
op->opcode = KV_SET;
else if (action == 2 && res != -ENOENT)
op->opcode = KV_DEL;
else
return false;
return true;
};
op->callback = [cb](kv_op_t *op)
{
cb(op->res);
delete op;
};
op->exec();
}
void* kv_dbw_t::list_start(const std::string & start) void* kv_dbw_t::list_start(const std::string & start)
{ {
auto *op = new kv_op_t; auto *op = new kv_op_t;

View File

@ -27,6 +27,9 @@ struct kv_dbw_t
std::function<bool(int res, const std::string & value)> cas_compare = NULL); std::function<bool(int res, const std::string & value)> cas_compare = NULL);
void del(const std::string & key, std::function<void(int res)> cb, void del(const std::string & key, std::function<void(int res)> cb,
std::function<bool(int res, const std::string & value)> cas_compare = NULL); std::function<bool(int res, const std::string & value)> cas_compare = NULL);
void update(const std::string & key,
std::function<int(int res, const std::string & old_value, std::string & new_value)> cas_compare,
std::function<bool(int res)> cb);
void* list_start(const std::string & start); void* list_start(const std::string & start);
void list_next(void *handle, std::function<void(int res, const std::string & key, const std::string & value)> cb); void list_next(void *handle, std::function<void(int res, const std::string & key, const std::string & value)> cb);

View File

@ -20,6 +20,7 @@ const char *exe_name = NULL;
struct kv_test_listing_t struct kv_test_listing_t
{ {
uint64_t count = 0, done = 0;
void *handle = NULL; void *handle = NULL;
std::string next_after; std::string next_after;
std::set<std::string> inflights; std::set<std::string> inflights;
@ -44,7 +45,7 @@ class kv_test_t
public: public:
// Config // Config
json11::Json::object kv_cfg; json11::Json::object kv_cfg;
std::string key_suffix; std::string key_prefix, key_suffix;
uint64_t inode_id = 0; uint64_t inode_id = 0;
uint64_t op_count = 1000000; uint64_t op_count = 1000000;
uint64_t runtime_sec = 0; uint64_t runtime_sec = 0;
@ -59,8 +60,11 @@ public:
uint64_t max_key_len = 70; uint64_t max_key_len = 70;
uint64_t min_value_len = 50; uint64_t min_value_len = 50;
uint64_t max_value_len = 300; uint64_t max_value_len = 300;
uint64_t min_list_count = 10;
uint64_t max_list_count = 1000;
uint64_t print_stats_interval = 1; uint64_t print_stats_interval = 1;
bool json_output = false; bool json_output = false;
uint64_t log_level = 1;
bool trace = false; bool trace = false;
bool stop_on_error = false; bool stop_on_error = false;
// FIXME: Multiple clients // FIXME: Multiple clients
@ -125,8 +129,10 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[])
"USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n" "USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n"
" --op_count 1000000\n" " --op_count 1000000\n"
" Total operations to run during test. 0 means unlimited\n" " Total operations to run during test. 0 means unlimited\n"
" --key_prefix \"\"\n"
" Prefix for all keys read or written (to avoid collisions)\n"
" --key_suffix \"\"\n" " --key_suffix \"\"\n"
" Suffix for all keys read or written (to avoid collisions)\n" " Suffix for all keys read or written (to avoid collisions, but scan all DB)\n"
" --runtime 0\n" " --runtime 0\n"
" Run for this number of seconds. 0 means unlimited\n" " Run for this number of seconds. 0 means unlimited\n"
" --parallelism 4\n" " --parallelism 4\n"
@ -149,6 +155,10 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[])
" Minimum value size in bytes\n" " Minimum value size in bytes\n"
" --max_value_len 300\n" " --max_value_len 300\n"
" Maximum value size in bytes\n" " Maximum value size in bytes\n"
" --min_list_count 10\n"
" Minimum number of keys read in listing (0 = all keys)\n"
" --max_list_count 1000\n"
" Maximum number of keys read in listing\n"
" --print_stats 1\n" " --print_stats 1\n"
" Print operation statistics every this number of seconds\n" " Print operation statistics every this number of seconds\n"
" --json\n" " --json\n"
@ -187,6 +197,7 @@ void kv_test_t::parse_config(json11::Json cfg)
inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value()); inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value());
if (cfg["op_count"].uint64_value() > 0) if (cfg["op_count"].uint64_value() > 0)
op_count = cfg["op_count"].uint64_value(); op_count = cfg["op_count"].uint64_value();
key_prefix = cfg["key_prefix"].string_value();
key_suffix = cfg["key_suffix"].string_value(); key_suffix = cfg["key_suffix"].string_value();
if (cfg["runtime"].uint64_value() > 0) if (cfg["runtime"].uint64_value() > 0)
runtime_sec = cfg["runtime"].uint64_value(); runtime_sec = cfg["runtime"].uint64_value();
@ -212,6 +223,10 @@ void kv_test_t::parse_config(json11::Json cfg)
min_value_len = cfg["min_value_len"].uint64_value(); min_value_len = cfg["min_value_len"].uint64_value();
if (cfg["max_value_len"].uint64_value() > 0) if (cfg["max_value_len"].uint64_value() > 0)
max_value_len = cfg["max_value_len"].uint64_value(); max_value_len = cfg["max_value_len"].uint64_value();
if (!cfg["min_list_count"].is_null())
min_list_count = cfg["min_list_count"].uint64_value();
if (!cfg["max_list_count"].is_null())
max_list_count = cfg["max_list_count"].uint64_value();
if (!cfg["print_stats"].is_null()) if (!cfg["print_stats"].is_null())
print_stats_interval = cfg["print_stats"].uint64_value(); print_stats_interval = cfg["print_stats"].uint64_value();
if (!cfg["json"].is_null()) if (!cfg["json"].is_null())
@ -230,7 +245,8 @@ void kv_test_t::parse_config(json11::Json cfg)
kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"]; kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"];
if (!cfg["kv_log_level"].is_null()) if (!cfg["kv_log_level"].is_null())
{ {
trace = cfg["kv_log_level"].uint64_value() >= 10; log_level = cfg["kv_log_level"].uint64_value();
trace = log_level >= 10;
kv_cfg["kv_log_level"] = cfg["kv_log_level"]; kv_cfg["kv_log_level"] = cfg["kv_log_level"];
} }
total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob; total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob;
@ -396,7 +412,7 @@ void kv_test_t::loop()
// add // add
is_add = true; is_add = true;
uint64_t key_len = min_key_len + (max_key_len > min_key_len ? lrand48() % (max_key_len-min_key_len) : 0); uint64_t key_len = min_key_len + (max_key_len > min_key_len ? lrand48() % (max_key_len-min_key_len) : 0);
key = random_str(key_len) + key_suffix; key = key_prefix + random_str(key_len) + key_suffix;
} }
else else
{ {
@ -480,9 +496,10 @@ void kv_test_t::loop()
in_progress++; in_progress++;
auto key = random_str(max_key_len); auto key = random_str(max_key_len);
auto lst = new kv_test_listing_t; auto lst = new kv_test_listing_t;
lst->handle = db->list_start(key);
auto k_it = values.lower_bound(key); auto k_it = values.lower_bound(key);
lst->next_after = k_it == values.begin() ? "" : key; lst->count = min_list_count + (max_list_count > min_list_count ? lrand48() % (max_list_count-min_list_count) : 0);
lst->handle = db->list_start(k_it == values.begin() ? key_prefix : key);
lst->next_after = k_it == values.begin() ? key_prefix : key;
lst->inflights = changing_keys; lst->inflights = changing_keys;
listings.insert(lst); listings.insert(lst);
if (trace) if (trace)
@ -490,10 +507,22 @@ void kv_test_t::loop()
clock_gettime(CLOCK_REALTIME, &lst->tv_begin); clock_gettime(CLOCK_REALTIME, &lst->tv_begin);
db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value) db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value)
{ {
if (res < 0) if (log_level >= 11)
printf("list: %s = %s\n", key.c_str(), value.c_str());
if (res >= 0 && key_prefix.size() && (key.size() < key_prefix.size() ||
key.substr(0, key_prefix.size()) != key_prefix))
{
// stop at this key
res = -ENOENT;
}
if (res < 0 || (lst->count > 0 && lst->done >= lst->count))
{ {
add_stat(stat.list, lst->tv_begin); add_stat(stat.list, lst->tv_begin);
if (res != -ENOENT) if (res == 0)
{
// ok (done >= count)
}
else if (res != -ENOENT)
{ {
fprintf(stderr, "ERROR: list: %d (%s)\n", res, strerror(-res)); fprintf(stderr, "ERROR: list: %d (%s)\n", res, strerror(-res));
lst->error = true; lst->error = true;
@ -530,6 +559,7 @@ void kv_test_t::loop()
key.substr(key.size()-key_suffix.size()) == key_suffix) && key.substr(key.size()-key_suffix.size()) == key_suffix) &&
lst->inflights.find(key) == lst->inflights.end()) lst->inflights.find(key) == lst->inflights.end())
{ {
lst->done++;
auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after);
while (true) while (true)
{ {