Compare commits
26 Commits
v0.8.3
...
etcd-hide-
Author | SHA1 | Date | |
---|---|---|---|
dc75450f6d | |||
373f9d0387 | |||
c4516ea971 | |||
91065c80fc | |||
0f6b946add | |||
465cbf0b2f | |||
41add50e4e | |||
02e7be7dc9 | |||
73940adf07 | |||
e950c024d3 | |||
71d6d9f868 | |||
a4dfa519af | |||
37a6aff2fa | |||
67019f5b02 | |||
0593e5c21c | |||
998e24adf8 | |||
d7bd36dc32 | |||
cf5c562800 | |||
629200b0cc | |||
3589ccec22 | |||
8d55a1e780 | |||
65f6b3a4eb | |||
fd216eac77 | |||
61fca7c426 | |||
1c29ed80b9 | |||
68f3fb795e |
@@ -17,6 +17,7 @@ initialization and can be changed with an OSD restart.
|
||||
- [autosync_interval](#autosync_interval)
|
||||
- [autosync_writes](#autosync_writes)
|
||||
- [recovery_queue_depth](#recovery_queue_depth)
|
||||
- [recovery_pg_switch](#recovery_pg_switch)
|
||||
- [recovery_sync_batch](#recovery_sync_batch)
|
||||
- [readonly](#readonly)
|
||||
- [no_recovery](#no_recovery)
|
||||
@@ -115,6 +116,16 @@ Maximum recovery operations per one primary OSD at any given moment of time.
|
||||
Currently it's the only parameter available to tune the speed or recovery
|
||||
and rebalancing, but it's planned to implement more.
|
||||
|
||||
## recovery_pg_switch
|
||||
|
||||
- Type: integer
|
||||
- Default: 128
|
||||
|
||||
Number of recovery operations before switching to recovery of the next PG.
|
||||
The idea is to mix all PGs during recovery for more even space and load
|
||||
distribution but still benefit from recovery queue depth greater than 1.
|
||||
Degraded PGs are anyway scanned first.
|
||||
|
||||
## recovery_sync_batch
|
||||
|
||||
- Type: integer
|
||||
|
@@ -18,6 +18,7 @@
|
||||
- [autosync_interval](#autosync_interval)
|
||||
- [autosync_writes](#autosync_writes)
|
||||
- [recovery_queue_depth](#recovery_queue_depth)
|
||||
- [recovery_pg_switch](#recovery_pg_switch)
|
||||
- [recovery_sync_batch](#recovery_sync_batch)
|
||||
- [readonly](#readonly)
|
||||
- [no_recovery](#no_recovery)
|
||||
@@ -119,6 +120,17 @@ OSD, чтобы успевать очищать журнал - без них OSD
|
||||
для ускорения или замедления восстановления и перебалансировки данных, но
|
||||
в планах реализация других параметров.
|
||||
|
||||
## recovery_pg_switch
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 128
|
||||
|
||||
Число операций восстановления перед переключением на восстановление другой PG.
|
||||
Идея заключается в том, чтобы восстанавливать все PG одновременно для более
|
||||
равномерного распределения места и нагрузки, но при этом всё равно выигрывать
|
||||
от глубины очереди восстановления, большей, чем 1. Деградированные PG в любом
|
||||
случае сканируются первыми.
|
||||
|
||||
## recovery_sync_batch
|
||||
|
||||
- Тип: целое число
|
||||
|
@@ -102,6 +102,20 @@
|
||||
момент времени. На данный момент единственный параметр, который можно менять
|
||||
для ускорения или замедления восстановления и перебалансировки данных, но
|
||||
в планах реализация других параметров.
|
||||
- name: recovery_pg_switch
|
||||
type: int
|
||||
default: 128
|
||||
info: |
|
||||
Number of recovery operations before switching to recovery of the next PG.
|
||||
The idea is to mix all PGs during recovery for more even space and load
|
||||
distribution but still benefit from recovery queue depth greater than 1.
|
||||
Degraded PGs are anyway scanned first.
|
||||
info_ru: |
|
||||
Число операций восстановления перед переключением на восстановление другой PG.
|
||||
Идея заключается в том, чтобы восстанавливать все PG одновременно для более
|
||||
равномерного распределения места и нагрузки, но при этом всё равно выигрывать
|
||||
от глубины очереди восстановления, большей, чем 1. Деградированные PG в любом
|
||||
случае сканируются первыми.
|
||||
- name: recovery_sync_batch
|
||||
type: int
|
||||
default: 16
|
||||
|
@@ -70,7 +70,7 @@ For EC pools the configuration should look like the following:
|
||||
|
||||
```
|
||||
etcdctl --endpoints=... put /vitastor/config/pools '{"2":{"name":"ecpool",
|
||||
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`
|
||||
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}'
|
||||
```
|
||||
|
||||
After you do this, one of the monitors will configure PGs and OSDs will start them.
|
||||
|
@@ -71,7 +71,7 @@ etcdctl --endpoints=... put /vitastor/config/pools '{"1":{"name":"testpool",
|
||||
|
||||
```
|
||||
etcdctl --endpoints=... put /vitastor/config/pools '{"2":{"name":"ecpool",
|
||||
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`
|
||||
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}'
|
||||
```
|
||||
|
||||
После этого один из мониторов должен сконфигурировать PG, а OSD должны запустить их.
|
||||
|
@@ -21,7 +21,7 @@ function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_p
|
||||
{
|
||||
for (const pg of oh.osd_sets)
|
||||
{
|
||||
nh.osd_sets[pg.join(' ')] = pg;
|
||||
nh.osd_sets[pg.join(' ')] = pg.map(osd_num => Number(osd_num));
|
||||
}
|
||||
}
|
||||
if (oh && oh.all_peers && oh.all_peers.length)
|
||||
|
30
mon/mon.js
30
mon/mon.js
@@ -663,12 +663,15 @@ class Mon
|
||||
async save_last_clean()
|
||||
{
|
||||
// last_clean_pgs is used to avoid extra data move when observing a series of changes in the cluster
|
||||
const new_clean_pgs = { items: {} };
|
||||
next_pool:
|
||||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
new_clean_pgs.items[pool_id] = (this.state.history.last_clean_pgs.items||{})[pool_id];
|
||||
const pool_cfg = this.state.config.pools[pool_id];
|
||||
if (!this.validate_pool_cfg(pool_id, pool_cfg, false))
|
||||
{
|
||||
continue;
|
||||
continue next_pool;
|
||||
}
|
||||
for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
|
||||
{
|
||||
@@ -677,17 +680,18 @@ class Mon
|
||||
!(this.state.pg.state[pool_id][pg_num].state instanceof Array))
|
||||
{
|
||||
// Unclean
|
||||
return;
|
||||
continue next_pool;
|
||||
}
|
||||
let st = this.state.pg.state[pool_id][pg_num].state.join(',');
|
||||
if (st != 'active' && st != 'active,left_on_dead' && st != 'left_on_dead,active')
|
||||
{
|
||||
// Unclean
|
||||
return;
|
||||
continue next_pool;
|
||||
}
|
||||
}
|
||||
new_clean_pgs.items[pool_id] = this.state.config.pgs.items[pool_id];
|
||||
}
|
||||
this.state.history.last_clean_pgs = JSON.parse(JSON.stringify(this.state.config.pgs));
|
||||
this.state.history.last_clean_pgs = new_clean_pgs;
|
||||
await this.etcd_call('/kv/txn', {
|
||||
success: [ { requestPut: {
|
||||
key: b64(this.etcd_prefix+'/history/last_clean_pgs'),
|
||||
@@ -1374,16 +1378,14 @@ class Mon
|
||||
// This is required for multiple change events to trigger at most 1 recheck in 1s
|
||||
schedule_recheck()
|
||||
{
|
||||
if (this.recheck_timer)
|
||||
if (!this.recheck_timer)
|
||||
{
|
||||
clearTimeout(this.recheck_timer);
|
||||
this.recheck_timer = null;
|
||||
this.recheck_timer = setTimeout(() =>
|
||||
{
|
||||
this.recheck_timer = null;
|
||||
this.recheck_pgs().catch(this.die);
|
||||
}, this.config.mon_change_timeout || 1000);
|
||||
}
|
||||
this.recheck_timer = setTimeout(() =>
|
||||
{
|
||||
this.recheck_timer = null;
|
||||
this.recheck_pgs().catch(this.die);
|
||||
}, this.config.mon_change_timeout || 1000);
|
||||
}
|
||||
|
||||
sum_op_stats(timestamp, prev_stats)
|
||||
@@ -1719,11 +1721,11 @@ class Mon
|
||||
else if (key_parts[0] === 'osd' && key_parts[1] === 'stats')
|
||||
{
|
||||
// Recheck OSD tree on OSD addition/deletion
|
||||
if ((!old) != (!kv.value) || old && kv.value && (old.size != kv.value.size || old.time != kv.value.time))
|
||||
if ((!old) != (!kv.value) || old && kv.value && old.size != kv.value.size)
|
||||
{
|
||||
this.schedule_recheck();
|
||||
}
|
||||
// Recheck PGs <osd_out_time> later
|
||||
// Recheck PGs <osd_out_time> after last OSD statistics report
|
||||
this.schedule_next_recheck_at(
|
||||
!this.state.osd.stats[key[2]] ? 0 : this.state.osd.stats[key[2]].time+this.config.osd_out_time
|
||||
);
|
||||
|
@@ -235,9 +235,16 @@ add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
|
||||
target_link_libraries(osd_test tcmalloc_minimal)
|
||||
|
||||
# osd_rmw_test
|
||||
# FIXME: Move to tests
|
||||
add_executable(osd_rmw_test osd_rmw_test.cpp allocator.cpp)
|
||||
target_link_libraries(osd_rmw_test Jerasure ${ISAL_LIBRARIES} tcmalloc_minimal)
|
||||
|
||||
if (ISAL_LIBRARIES)
|
||||
add_executable(osd_rmw_test_je osd_rmw_test.cpp allocator.cpp)
|
||||
target_compile_definitions(osd_rmw_test_je PUBLIC -DNO_ISAL)
|
||||
target_link_libraries(osd_rmw_test_je Jerasure tcmalloc_minimal)
|
||||
endif (ISAL_LIBRARIES)
|
||||
|
||||
# stub_uring_osd
|
||||
add_executable(stub_uring_osd
|
||||
stub_uring_osd.cpp
|
||||
|
@@ -162,7 +162,8 @@ void journal_flusher_t::mark_trim_possible()
|
||||
if (trim_wanted > 0)
|
||||
{
|
||||
dequeuing = true;
|
||||
journal_trim_counter++;
|
||||
if (!journal_trim_counter)
|
||||
journal_trim_counter = journal_trim_interval;
|
||||
bs->ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
@@ -193,6 +193,7 @@ void blockstore_impl_t::loop()
|
||||
}
|
||||
if (wr_st == 2)
|
||||
{
|
||||
submit_queue[op_idx] = NULL;
|
||||
new_idx--;
|
||||
}
|
||||
if (wr_st == 0)
|
||||
@@ -582,7 +583,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||
replace_stable(dirty_it->first.oid, 0, clean_stable_count, stable_count, stable);
|
||||
}
|
||||
}
|
||||
else if (IS_STABLE(dirty_it->second.state))
|
||||
else if (IS_STABLE(dirty_it->second.state) || (dirty_it->second.state & BS_ST_INSTANT))
|
||||
{
|
||||
// First try to replace a clean stable version in the first part of the list
|
||||
if (!replace_stable(dirty_it->first.oid, dirty_it->first.version, 0, clean_stable_count, stable))
|
||||
|
@@ -16,6 +16,7 @@
|
||||
// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow
|
||||
// writing more than can be stabilized afterwards
|
||||
#define JOURNAL_STABILIZE_RESERVATION 65536
|
||||
#define JOURNAL_INSTANT_RESERVATION 131072
|
||||
|
||||
// Journal entries
|
||||
// Journal entries are linked to each other by their crc32 value
|
||||
|
@@ -286,7 +286,10 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
|
||||
{
|
||||
auto used = --journal.used_sectors[rv.journal_sector-1];
|
||||
if (used == 0)
|
||||
{
|
||||
journal.used_sectors.erase(rv.journal_sector-1);
|
||||
flusher->mark_trim_possible();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -127,7 +127,6 @@ resume_4:
|
||||
{
|
||||
mark_rolled_back(*v);
|
||||
}
|
||||
flusher->mark_trim_possible();
|
||||
// Acknowledge op
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
@@ -232,6 +231,7 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
|
||||
if (used == 0)
|
||||
{
|
||||
journal.used_sectors.erase(dirty_it->second.journal_sector);
|
||||
flusher->mark_trim_possible();
|
||||
}
|
||||
if (dsk.clean_entry_bitmap_size > sizeof(void*))
|
||||
{
|
||||
|
@@ -89,6 +89,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||
else
|
||||
{
|
||||
// Invalid version requested
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Write %lx:%lx v%lu requested, but we already have v%lu\n", op->oid.inode, op->oid.stripe, op->version, version);
|
||||
#endif
|
||||
op->retval = -EEXIST;
|
||||
if (!is_del && dsk.clean_entry_bitmap_size > sizeof(void*))
|
||||
{
|
||||
@@ -115,8 +118,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||
else if (!wait_del)
|
||||
printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
|
||||
#endif
|
||||
// FIXME No strict need to add it into dirty_db here, it's just left
|
||||
// from the previous implementation where reads waited for writes
|
||||
// No strict need to add it into dirty_db here except maybe for listings to return
|
||||
// correct data when there are inflight operations in the queue
|
||||
uint32_t state;
|
||||
if (is_del)
|
||||
state = BS_ST_DELETE | BS_ST_IN_FLIGHT;
|
||||
@@ -182,9 +185,15 @@ void blockstore_impl_t::cancel_all_writes(blockstore_op_t *op, blockstore_dirty_
|
||||
bool found = false;
|
||||
for (auto other_op: submit_queue)
|
||||
{
|
||||
// <op> may be present in queue multiple times due to moving operations in submit_queue
|
||||
if (other_op == op)
|
||||
if (!other_op)
|
||||
{
|
||||
// freed operations during submitting are zeroed
|
||||
}
|
||||
else if (other_op == op)
|
||||
{
|
||||
// <op> may be present in queue multiple times due to moving operations in submit_queue
|
||||
found = true;
|
||||
}
|
||||
else if (found && other_op->oid == op->oid &&
|
||||
(other_op->opcode == BS_OP_WRITE || other_op->opcode == BS_OP_WRITE_STABLE))
|
||||
{
|
||||
@@ -252,7 +261,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
{
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, unsynced_big_write_count + 1,
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
|
||||
(dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@@ -332,7 +342,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
!space_check.check_available(op, unsynced_big_write_count,
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, 0)
|
||||
|| !space_check.check_available(op, 1,
|
||||
sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size, op->len + JOURNAL_STABILIZE_RESERVATION))
|
||||
sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size,
|
||||
op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@@ -443,18 +454,19 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
|
||||
resume_2:
|
||||
// Only for the immediate_commit mode: prepare and submit big_write journal entry
|
||||
{
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, 1,
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
BS_SUBMIT_CHECK_SQES(1);
|
||||
auto dirty_it = dirty_db.find((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, 1,
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
|
||||
((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
BS_SUBMIT_CHECK_SQES(1);
|
||||
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
|
||||
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
|
||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size
|
||||
@@ -641,7 +653,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION))
|
||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_INSTANT_RESERVATION))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
@@ -34,26 +34,22 @@ struct alloc_osd_t
|
||||
json11::Json::object {
|
||||
{ "target", "VERSION" },
|
||||
{ "version", 0 },
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/osd/stats/"+std::to_string(new_id)
|
||||
) },
|
||||
{ "key", "/osd/stats/"+std::to_string(new_id) }
|
||||
},
|
||||
} },
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/osd/stats/"+std::to_string(new_id)
|
||||
) },
|
||||
{ "value", base64_encode("{}") },
|
||||
{ "key", "/osd/stats/"+std::to_string(new_id) }
|
||||
{ "value", "{}" },
|
||||
} },
|
||||
},
|
||||
} },
|
||||
{ "failure", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
|
||||
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
|
||||
{ "key", "/osd/stats/" },
|
||||
{ "range_end", "/osd/stats0" },
|
||||
{ "keys_only", true },
|
||||
} },
|
||||
},
|
||||
|
@@ -5,6 +5,7 @@
|
||||
#include "cli.h"
|
||||
#include "cluster_client.h"
|
||||
#include "str_util.h"
|
||||
#include "epoll_manager.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
@@ -14,13 +15,21 @@ struct rm_osd_t
|
||||
cli_tool_t *parent;
|
||||
|
||||
bool dry_run, force_warning, force_dataloss;
|
||||
uint64_t etcd_tx_retry_ms = 500;
|
||||
uint64_t etcd_tx_retries = 10000;
|
||||
std::vector<uint64_t> osd_ids;
|
||||
|
||||
int state = 0;
|
||||
cli_result_t result;
|
||||
|
||||
std::set<uint64_t> to_remove;
|
||||
std::set<uint64_t> to_restart;
|
||||
json11::Json::array pool_effects;
|
||||
json11::Json::array history_updates, history_checks;
|
||||
json11::Json new_pgs, new_clean_pgs;
|
||||
uint64_t new_pgs_mod_rev, new_clean_pgs_mod_rev;
|
||||
uint64_t cur_retry = 0;
|
||||
uint64_t retry_wait = 0;
|
||||
bool is_warning, is_dataloss;
|
||||
|
||||
bool is_done()
|
||||
@@ -32,6 +41,12 @@ struct rm_osd_t
|
||||
{
|
||||
if (state == 1)
|
||||
goto resume_1;
|
||||
else if (state == 2)
|
||||
goto resume_2;
|
||||
else if (state == 3)
|
||||
goto resume_3;
|
||||
else if (state == 4)
|
||||
goto resume_4;
|
||||
if (!osd_ids.size())
|
||||
{
|
||||
result = (cli_result_t){ .err = EINVAL, .text = "OSD numbers are not specified" };
|
||||
@@ -152,14 +167,48 @@ struct rm_osd_t
|
||||
result.text = error;
|
||||
if (dry_run || is_dataloss && !force_dataloss || is_warning && !force_warning)
|
||||
{
|
||||
result.err = is_dataloss || is_warning ? EBUSY : 0;
|
||||
result.err = is_dataloss && !force_dataloss || is_warning && !force_warning ? EBUSY : 0;
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
}
|
||||
parent->etcd_txn(json11::Json::object { { "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/config/pgs"
|
||||
) },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/history/last_clean_pgs"
|
||||
) },
|
||||
} },
|
||||
},
|
||||
} } });
|
||||
resume_4:
|
||||
state = 4;
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (parent->etcd_err.err)
|
||||
{
|
||||
result = parent->etcd_err;
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]);
|
||||
new_pgs = remove_osds_from_pgs(kv);
|
||||
new_pgs_mod_rev = kv.mod_revision;
|
||||
kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][1]["response_range"]["kvs"][0]);
|
||||
new_clean_pgs = remove_osds_from_pgs(kv);
|
||||
new_clean_pgs_mod_rev = kv.mod_revision;
|
||||
}
|
||||
// Remove keys from etcd
|
||||
{
|
||||
json11::Json::array rm_items;
|
||||
json11::Json::array rm_items, rm_checks;
|
||||
for (auto osd_id: osd_ids)
|
||||
{
|
||||
rm_items.push_back("/config/osd/"+std::to_string(osd_id));
|
||||
@@ -178,7 +227,39 @@ struct rm_osd_t
|
||||
} },
|
||||
};
|
||||
}
|
||||
parent->etcd_txn(json11::Json::object { { "success", rm_items } });
|
||||
if (!new_pgs.is_null())
|
||||
{
|
||||
auto pgs_key = base64_encode(parent->cli->st_cli.etcd_prefix+"/config/pgs");
|
||||
rm_items.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", pgs_key },
|
||||
{ "value", base64_encode(new_pgs.dump()) },
|
||||
} },
|
||||
});
|
||||
rm_checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", pgs_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", new_pgs_mod_rev+1 },
|
||||
});
|
||||
}
|
||||
if (!new_clean_pgs.is_null())
|
||||
{
|
||||
auto pgs_key = base64_encode(parent->cli->st_cli.etcd_prefix+"/history/last_clean_pgs");
|
||||
rm_items.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", pgs_key },
|
||||
{ "value", base64_encode(new_clean_pgs.dump()) },
|
||||
} },
|
||||
});
|
||||
rm_checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", pgs_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", new_clean_pgs_mod_rev+1 },
|
||||
});
|
||||
}
|
||||
parent->etcd_txn(json11::Json::object { { "success", rm_items }, { "checks", rm_checks } });
|
||||
}
|
||||
resume_1:
|
||||
state = 1;
|
||||
@@ -190,6 +271,46 @@ struct rm_osd_t
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
// Remove old OSD from PG all_peers to prevent left_on_dead and from
|
||||
// target_history to prevent INCOMPLETE if --allow-data-loss is specified
|
||||
for (auto & rsp: parent->etcd_result["responses"].array_items())
|
||||
{
|
||||
if (rsp["response_delete_range"]["deleted"].uint64_value() > 0)
|
||||
{
|
||||
// Wait for mon_change_timeout before updating PG history, or the monitor's change will likely interfere with ours
|
||||
retry_wait = parent->cli->merged_config["mon_change_timeout"].uint64_value();
|
||||
if (!retry_wait)
|
||||
retry_wait = 1000;
|
||||
retry_wait += etcd_tx_retry_ms;
|
||||
}
|
||||
}
|
||||
while (1)
|
||||
{
|
||||
resume_2:
|
||||
if (!remove_osds_from_history(2))
|
||||
return;
|
||||
resume_3:
|
||||
state = 3;
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (parent->etcd_err.err)
|
||||
{
|
||||
result = parent->etcd_err;
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
if (parent->etcd_result["succeeded"].bool_value())
|
||||
break;
|
||||
if ((++cur_retry) >= etcd_tx_retries)
|
||||
{
|
||||
result.err = EAGAIN;
|
||||
result.text += "Failed to remove OSDs from PG history due to update conflicts."
|
||||
" Some PGs may remain left_on_dead or incomplete. Please retry later\n";
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
retry_wait = etcd_tx_retry_ms;
|
||||
}
|
||||
std::string ids = "";
|
||||
for (auto osd_id: osd_ids)
|
||||
{
|
||||
@@ -200,6 +321,138 @@ struct rm_osd_t
|
||||
result.text = (result.text != "" ? ids+"\n"+result.text : ids);
|
||||
result.err = 0;
|
||||
}
|
||||
|
||||
json11::Json remove_osds_from_pgs(const etcd_kv_t & kv)
|
||||
{
|
||||
if (kv.value.is_null())
|
||||
{
|
||||
return kv.value;
|
||||
}
|
||||
json11::Json::object new_pgs;
|
||||
for (auto & pp: kv.value["items"].object_items())
|
||||
{
|
||||
if (pp.second.is_object())
|
||||
{
|
||||
json11::Json::object new_pool;
|
||||
for (auto & pgp: pp.second.object_items())
|
||||
{
|
||||
json11::Json::array osd_set;
|
||||
for (auto & osd_json: pgp.second["osd_set"].array_items())
|
||||
{
|
||||
uint64_t osd_num = osd_json.uint64_value();
|
||||
osd_set.push_back(osd_num == 0 || to_remove.find(osd_num) != to_remove.end() ? 0 : osd_num);
|
||||
}
|
||||
json11::Json::object new_pg = pgp.second.object_items();
|
||||
new_pg["osd_set"] = osd_set;
|
||||
new_pool[pgp.first] = new_pg;
|
||||
}
|
||||
new_pgs[pp.first] = new_pool;
|
||||
}
|
||||
else
|
||||
new_pgs[pp.first] = pp.second;
|
||||
}
|
||||
auto res = kv.value.object_items();
|
||||
res["items"] = new_pgs;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool remove_osds_from_history(int base_state)
|
||||
{
|
||||
if (state == base_state+0)
|
||||
goto resume_0;
|
||||
history_updates.clear();
|
||||
history_checks.clear();
|
||||
for (auto & pp: parent->cli->st_cli.pool_config)
|
||||
{
|
||||
bool update_pg_history = false;
|
||||
auto & pool_cfg = pp.second;
|
||||
for (auto & pgp: pool_cfg.pg_config)
|
||||
{
|
||||
auto pg_num = pgp.first;
|
||||
auto & pg_cfg = pgp.second;
|
||||
for (int i = 0; i < pg_cfg.all_peers.size(); i++)
|
||||
{
|
||||
if (to_remove.find(pg_cfg.all_peers[i]) != to_remove.end())
|
||||
{
|
||||
update_pg_history = true;
|
||||
pg_cfg.all_peers.erase(pg_cfg.all_peers.begin()+i, pg_cfg.all_peers.begin()+i+1);
|
||||
i--;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pg_cfg.target_history.size(); i++)
|
||||
{
|
||||
int hist_size = 0, hist_rm = 0;
|
||||
for (auto & old_osd: pg_cfg.target_history[i])
|
||||
{
|
||||
if (old_osd != 0)
|
||||
{
|
||||
hist_size++;
|
||||
if (to_remove.find(old_osd) != to_remove.end())
|
||||
{
|
||||
hist_rm++;
|
||||
old_osd = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hist_rm > 0)
|
||||
{
|
||||
if (hist_size-hist_rm == 0)
|
||||
{
|
||||
pg_cfg.target_history.erase(pg_cfg.target_history.begin()+i, pg_cfg.target_history.begin()+i+1);
|
||||
i--;
|
||||
}
|
||||
update_pg_history = true;
|
||||
}
|
||||
}
|
||||
if (update_pg_history)
|
||||
{
|
||||
std::string history_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
|
||||
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
|
||||
);
|
||||
history_updates.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", history_key },
|
||||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "epoch", pg_cfg.epoch },
|
||||
{ "all_peers", pg_cfg.all_peers },
|
||||
{ "osd_sets", pg_cfg.target_history },
|
||||
}).dump()) },
|
||||
} },
|
||||
});
|
||||
history_checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", history_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", parent->cli->st_cli.etcd_watch_revision+1 },
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
if (history_updates.size())
|
||||
{
|
||||
if (retry_wait)
|
||||
{
|
||||
parent->waiting++;
|
||||
parent->epmgr->tfd->set_timer(retry_wait, false, [this](int timer_id)
|
||||
{
|
||||
parent->waiting--;
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
resume_0:
|
||||
state = base_state+0;
|
||||
if (parent->waiting > 0)
|
||||
return false;
|
||||
}
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", history_updates },
|
||||
{ "compare", history_checks },
|
||||
});
|
||||
}
|
||||
else
|
||||
parent->etcd_result = json11::Json::object{ { "succeeded", true } };
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(cli_result_t &)> cli_tool_t::start_rm_osd(json11::Json cfg)
|
||||
@@ -209,6 +462,14 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_rm_osd(json11::Json cfg)
|
||||
rm_osd->dry_run = cfg["dry_run"].bool_value();
|
||||
rm_osd->force_dataloss = cfg["allow_data_loss"].bool_value();
|
||||
rm_osd->force_warning = rm_osd->force_dataloss || cfg["force"].bool_value();
|
||||
if (!cfg["etcd_tx_retries"].is_null())
|
||||
rm_osd->etcd_tx_retries = cfg["etcd_tx_retries"].uint64_value();
|
||||
if (!cfg["etcd_tx_retry_ms"].is_null())
|
||||
{
|
||||
rm_osd->etcd_tx_retry_ms = cfg["etcd_tx_retry_ms"].uint64_value();
|
||||
if (rm_osd->etcd_tx_retry_ms < 100)
|
||||
rm_osd->etcd_tx_retry_ms = 100;
|
||||
}
|
||||
if (cfg["osd_id"].is_number() || cfg["osd_id"].is_string())
|
||||
rm_osd->osd_ids.push_back(cfg["osd_id"].uint64_value());
|
||||
else
|
||||
|
@@ -387,6 +387,14 @@ int disk_tool_t::purge_devices(const std::vector<std::string> & devices)
|
||||
rm_osd_cli.push_back(std::to_string(osd_num));
|
||||
}
|
||||
// Check for data loss
|
||||
if (options["force"] != "")
|
||||
{
|
||||
rm_osd_cli.push_back("--force");
|
||||
}
|
||||
else if (options["allow_data_loss"] != "")
|
||||
{
|
||||
rm_osd_cli.push_back("--allow-data-loss");
|
||||
}
|
||||
rm_osd_cli.push_back("--dry-run");
|
||||
std::string dry_run_ignore_stdout;
|
||||
if (shell_exec(rm_osd_cli, "", &dry_run_ignore_stdout, NULL) != 0)
|
||||
@@ -405,14 +413,6 @@ int disk_tool_t::purge_devices(const std::vector<std::string> & devices)
|
||||
}
|
||||
// Remove OSD metadata
|
||||
rm_osd_cli.pop_back();
|
||||
if (options["force"] != "")
|
||||
{
|
||||
rm_osd_cli.push_back("--force");
|
||||
}
|
||||
else if (options["allow_data_loss"] != "")
|
||||
{
|
||||
rm_osd_cli.push_back("--allow-data-loss");
|
||||
}
|
||||
if (shell_exec(rm_osd_cli, "", NULL, NULL) != 0)
|
||||
{
|
||||
return 1;
|
||||
|
@@ -54,8 +54,55 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
|
||||
return kv;
|
||||
}
|
||||
|
||||
json11::Json etcd_state_client_t::etcd_encode_actions(const json11::Json & items)
|
||||
{
|
||||
json11::Json::array encoded;
|
||||
for (auto & v: items.array_items())
|
||||
{
|
||||
json11::Json::object act;
|
||||
for (auto & kv: v.object_items())
|
||||
{
|
||||
if (kv.first == "key" || kv.first == "range_end")
|
||||
act[kv.first] = base64_encode(etcd_prefix+kv.second.string_value());
|
||||
else if (kv.first == "value")
|
||||
act[kv.first] = base64_encode(kv.second.is_string() ? kv.second.string_value() : kv.second.dump());
|
||||
else
|
||||
act[kv.first] = kv.second;
|
||||
}
|
||||
encoded.push_back(act);
|
||||
}
|
||||
return encoded;
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
// FIXME: json11 is immutable which is very inconvenient for such cases
|
||||
json11::Json::object encoded;
|
||||
if (txn["compare"].is_array())
|
||||
{
|
||||
json11::Json::array compare;
|
||||
for (auto & v: txn["compare"].array_items())
|
||||
{
|
||||
json11::Json::object cmp;
|
||||
for (auto & kv: v.object_items())
|
||||
{
|
||||
if (kv.first == "key")
|
||||
cmp[kv.first] = base64_encode(etcd_prefix+kv.second.string_value());
|
||||
else
|
||||
cmp[kv.first] = kv.second;
|
||||
}
|
||||
compare.push_back(cmp);
|
||||
}
|
||||
encoded["compare"] = compare;
|
||||
}
|
||||
if (txn["failure"].is_array())
|
||||
{
|
||||
encoded["failure"] = etcd_encode_actions(txn["failure"]);
|
||||
}
|
||||
if (txn["success"].is_array())
|
||||
{
|
||||
encoded["success"] = etcd_encode_actions(txn["success"]);
|
||||
}
|
||||
etcd_call("/kv/txn", txn, timeout, retries, interval, callback);
|
||||
}
|
||||
|
||||
@@ -871,19 +918,33 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||
pg_cfg.target_history.clear();
|
||||
pg_cfg.all_peers.clear();
|
||||
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
|
||||
for (auto hist_item: value["osd_sets"].array_items())
|
||||
for (auto & hist_item: value["osd_sets"].array_items())
|
||||
{
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto pg_osd: hist_item.array_items())
|
||||
for (auto & pg_osd: hist_item.array_items())
|
||||
{
|
||||
history_set.push_back(pg_osd.uint64_value());
|
||||
osd_num_t pg_osd_num = pg_osd.uint64_value();
|
||||
if (pg_osd_num != 0)
|
||||
{
|
||||
auto it = std::lower_bound(history_set.begin(), history_set.end(), pg_osd_num);
|
||||
if (it == history_set.end() || *it != pg_osd_num)
|
||||
history_set.insert(it, pg_osd_num);
|
||||
}
|
||||
}
|
||||
pg_cfg.target_history.push_back(history_set);
|
||||
auto it = std::lower_bound(pg_cfg.target_history.begin(), pg_cfg.target_history.end(), history_set);
|
||||
if (it == pg_cfg.target_history.end() || *it != history_set)
|
||||
pg_cfg.target_history.insert(it, history_set);
|
||||
}
|
||||
// Include these additional OSDs when peering the PG
|
||||
for (auto pg_osd: value["all_peers"].array_items())
|
||||
{
|
||||
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
|
||||
osd_num_t pg_osd_num = pg_osd.uint64_value();
|
||||
if (pg_osd_num != 0)
|
||||
{
|
||||
auto it = std::lower_bound(pg_cfg.all_peers.begin(), pg_cfg.all_peers.end(), pg_osd_num);
|
||||
if (it == pg_cfg.all_peers.end() || *it != pg_osd_num)
|
||||
pg_cfg.all_peers.insert(it, pg_osd_num);
|
||||
}
|
||||
}
|
||||
// Read epoch
|
||||
pg_cfg.epoch = value["epoch"].uint64_value();
|
||||
|
@@ -93,6 +93,7 @@ protected:
|
||||
bool rand_initialized = false;
|
||||
void add_etcd_url(std::string);
|
||||
void pick_next_etcd();
|
||||
json11::Json etcd_encode_actions(const json11::Json & items);
|
||||
public:
|
||||
int etcd_keepalive_timeout = 30;
|
||||
int etcd_ws_keepalive_interval = 30;
|
||||
|
@@ -163,6 +163,9 @@ void osd_t::parse_config(const json11::Json & config, bool allow_disk_params)
|
||||
recovery_queue_depth = config["recovery_queue_depth"].uint64_value();
|
||||
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
|
||||
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
recovery_pg_switch = config["recovery_pg_switch"].uint64_value();
|
||||
if (recovery_pg_switch < 1)
|
||||
recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
|
||||
recovery_sync_batch = config["recovery_sync_batch"].uint64_value();
|
||||
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
|
||||
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
|
@@ -34,6 +34,7 @@
|
||||
#define DEFAULT_AUTOSYNC_WRITES 128
|
||||
#define MAX_RECOVERY_QUEUE 2048
|
||||
#define DEFAULT_RECOVERY_QUEUE 4
|
||||
#define DEFAULT_RECOVERY_PG_SWITCH 128
|
||||
#define DEFAULT_RECOVERY_BATCH 16
|
||||
|
||||
//#define OSD_STUB
|
||||
@@ -108,6 +109,7 @@ class osd_t
|
||||
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // "emergency" sync every 5 seconds
|
||||
int autosync_writes = DEFAULT_AUTOSYNC_WRITES;
|
||||
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
int recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
|
||||
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
int inode_vanish_time = 60;
|
||||
int log_level = 0;
|
||||
@@ -135,7 +137,10 @@ class osd_t
|
||||
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
|
||||
int peering_state = 0;
|
||||
std::map<object_id, osd_recovery_op_t> recovery_ops;
|
||||
int recovery_done = 0;
|
||||
bool recovery_last_degraded = true;
|
||||
pool_pg_num_t recovery_last_pg;
|
||||
object_id recovery_last_oid;
|
||||
int recovery_pg_done = 0, recovery_done = 0;
|
||||
osd_op_t *autosync_op = NULL;
|
||||
|
||||
// Unstable writes
|
||||
@@ -200,7 +205,6 @@ class osd_t
|
||||
bool check_peer_config(osd_client_t *cl, json11::Json conf);
|
||||
void repeer_pgs(osd_num_t osd_num);
|
||||
void start_pg_peering(pg_t & pg);
|
||||
void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
|
||||
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
|
||||
void discard_list_subop(osd_op_t *list_op);
|
||||
bool stop_pg(pg_t & pg);
|
||||
|
@@ -132,7 +132,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
|
||||
this->osd_num, immediate_commit == IMMEDIATE_ALL ? "all" : "small",
|
||||
cl->osd_num, conf["immediate_commit"].string_value().c_str()
|
||||
);
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (conf["block_size"].uint64_value() != (uint64_t)this->bs_block_size)
|
||||
{
|
||||
@@ -140,7 +140,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
|
||||
"[OSD %lu] My block_size is %u, but peer OSD %lu has %lu. We can't work together\n",
|
||||
this->osd_num, this->bs_block_size, cl->osd_num, conf["block_size"].uint64_value()
|
||||
);
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (conf["bitmap_granularity"].uint64_value() != (uint64_t)this->bs_bitmap_granularity)
|
||||
{
|
||||
@@ -148,7 +148,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
|
||||
"[OSD %lu] My bitmap_granularity is %u, but peer OSD %lu has %lu. We can't work together\n",
|
||||
this->osd_num, this->bs_bitmap_granularity, cl->osd_num, conf["bitmap_granularity"].uint64_value()
|
||||
);
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@@ -382,30 +382,6 @@ void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch &&
|
||||
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch)
|
||||
{
|
||||
pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
|
||||
object_id oid = { 0 };
|
||||
bool first = true;
|
||||
for (auto op: pg_it->second.write_queue)
|
||||
{
|
||||
if (first || oid != op.first)
|
||||
{
|
||||
oid = op.first;
|
||||
first = false;
|
||||
continue_primary_write(op.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::on_load_config_hook(json11::Json::object & global_config)
|
||||
{
|
||||
json11::Json::object osd_config = this->config;
|
||||
@@ -704,13 +680,16 @@ void osd_t::apply_pg_config()
|
||||
}
|
||||
}
|
||||
}
|
||||
auto vec_all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end());
|
||||
if (currently_taken)
|
||||
{
|
||||
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING | PG_PEERED))
|
||||
{
|
||||
if (pg_it->second.target_set == pg_cfg.target_set)
|
||||
if (pg_it->second.target_set == pg_cfg.target_set &&
|
||||
pg_it->second.target_history == pg_cfg.target_history &&
|
||||
pg_it->second.all_peers == vec_all_peers)
|
||||
{
|
||||
// No change in osd_set; history changes are ignored
|
||||
// No change in osd_set and history
|
||||
continue;
|
||||
}
|
||||
else
|
||||
@@ -761,7 +740,7 @@ void osd_t::apply_pg_config()
|
||||
.pg_num = pg_num,
|
||||
.reported_epoch = pg_cfg.epoch,
|
||||
.target_history = pg_cfg.target_history,
|
||||
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
|
||||
.all_peers = vec_all_peers,
|
||||
.target_set = pg_cfg.target_set,
|
||||
};
|
||||
if (pg.scheme == POOL_SCHEME_EC)
|
||||
|
@@ -226,42 +226,51 @@ bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
|
||||
|
||||
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
|
||||
{
|
||||
if (!no_recovery)
|
||||
if (!pgs.size())
|
||||
{
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
{
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
|
||||
{
|
||||
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
|
||||
{
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
{
|
||||
op.degraded = true;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (!no_rebalance)
|
||||
// Restart scanning from the same degraded/misplaced status as the last time
|
||||
for (int tried_degraded = 0; tried_degraded < 2; tried_degraded++)
|
||||
{
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
if (recovery_last_degraded ? !no_recovery : !no_rebalance)
|
||||
{
|
||||
// Don't try to "recover" misplaced objects if "recovery" would make them degraded
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
|
||||
auto mask = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED);
|
||||
auto check = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_HAS_MISPLACED);
|
||||
// Restart scanning from the same PG as the last time
|
||||
for (auto pg_it = pgs.lower_bound(recovery_last_pg); pg_it != pgs.end(); pg_it++)
|
||||
{
|
||||
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
|
||||
if ((pg_it->second.state & mask) == check)
|
||||
{
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
auto & src = recovery_last_degraded ? pg_it->second.degraded_objects : pg_it->second.misplaced_objects;
|
||||
assert(src.size() > 0);
|
||||
// Restart scanning from the next object
|
||||
for (auto obj_it = src.upper_bound(recovery_last_oid); obj_it != src.end(); obj_it++)
|
||||
{
|
||||
op.degraded = false;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
{
|
||||
op.degraded = recovery_last_degraded;
|
||||
recovery_last_oid = op.oid = obj_it->first;
|
||||
recovery_pg_done++;
|
||||
// Switch to another PG after recovery_pg_switch operations
|
||||
// to always mix all PGs during recovery but still benefit
|
||||
// from recovery queue depth greater than 1
|
||||
if (recovery_pg_done >= recovery_pg_switch)
|
||||
{
|
||||
recovery_pg_done = 0;
|
||||
recovery_last_pg.pg_num++;
|
||||
recovery_last_oid = {};
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
recovery_last_degraded = !recovery_last_degraded;
|
||||
recovery_last_pg = {};
|
||||
recovery_last_oid = {};
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
10
src/osd_id.h
10
src/osd_id.h
@@ -28,3 +28,13 @@ inline bool operator < (const pool_pg_num_t & a, const pool_pg_num_t & b)
|
||||
{
|
||||
return a.pool_id < b.pool_id || a.pool_id == b.pool_id && a.pg_num < b.pg_num;
|
||||
}
|
||||
|
||||
inline bool operator == (const pool_pg_num_t & a, const pool_pg_num_t & b)
|
||||
{
|
||||
return a.pool_id == b.pool_id && a.pg_num == b.pg_num;
|
||||
}
|
||||
|
||||
inline bool operator != (const pool_pg_num_t & a, const pool_pg_num_t & b)
|
||||
{
|
||||
return a.pool_id != b.pool_id || a.pg_num != b.pg_num;
|
||||
}
|
||||
|
@@ -32,7 +32,16 @@ void osd_t::handle_peers()
|
||||
if (p.second.state & PG_HAS_UNCLEAN)
|
||||
peering_state = peering_state | OSD_FLUSHING_PGS;
|
||||
else if (p.second.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED))
|
||||
{
|
||||
peering_state = peering_state | OSD_RECOVERING;
|
||||
if (p.second.state & PG_HAS_DEGRADED)
|
||||
{
|
||||
// Restart recovery from degraded objects
|
||||
recovery_last_degraded = true;
|
||||
recovery_last_pg = {};
|
||||
recovery_last_oid = {};
|
||||
}
|
||||
}
|
||||
ringloop->wakeup();
|
||||
return;
|
||||
}
|
||||
@@ -302,82 +311,11 @@ void osd_t::start_pg_peering(pg_t & pg)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
submit_sync_and_list_subop(peer_osd, pg.peering_state);
|
||||
submit_list_subop(peer_osd, pg.peering_state);
|
||||
}
|
||||
ringloop->wakeup();
|
||||
}
|
||||
|
||||
void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
|
||||
{
|
||||
// Sync before listing, if not readonly
|
||||
if (readonly)
|
||||
{
|
||||
submit_list_subop(role_osd, ps);
|
||||
}
|
||||
else if (role_osd == this->osd_num)
|
||||
{
|
||||
// Self
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = 0;
|
||||
op->peer_fd = SELF_FD;
|
||||
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
|
||||
op->bs_op = new blockstore_op_t();
|
||||
op->bs_op->opcode = BS_OP_SYNC;
|
||||
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
|
||||
{
|
||||
if (bs_op->retval < 0)
|
||||
{
|
||||
printf("Local OP_SYNC failed: %d (%s)\n", bs_op->retval, strerror(-bs_op->retval));
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
add_bs_subop_stats(op);
|
||||
delete op->bs_op;
|
||||
op->bs_op = NULL;
|
||||
delete op;
|
||||
ps->list_ops.erase(role_osd);
|
||||
submit_list_subop(role_osd, ps);
|
||||
};
|
||||
ps->list_ops[role_osd] = op;
|
||||
bs->enqueue_op(op->bs_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Peer
|
||||
auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd));
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = cl->peer_fd;
|
||||
op->req = (osd_any_op_t){
|
||||
.sec_sync = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = msgr.next_subop_id++,
|
||||
.opcode = OSD_OP_SEC_SYNC,
|
||||
},
|
||||
},
|
||||
};
|
||||
op->callback = [this, ps, role_osd](osd_op_t *op)
|
||||
{
|
||||
if (op->reply.hdr.retval < 0)
|
||||
{
|
||||
// FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection
|
||||
printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
|
||||
int fail_fd = op->peer_fd;
|
||||
ps->list_ops.erase(role_osd);
|
||||
delete op;
|
||||
msgr.stop_client(fail_fd);
|
||||
return;
|
||||
}
|
||||
delete op;
|
||||
ps->list_ops.erase(role_osd);
|
||||
submit_list_subop(role_osd, ps);
|
||||
};
|
||||
ps->list_ops[role_osd] = op;
|
||||
msgr.outbox_push(op);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
|
||||
{
|
||||
if (role_osd == this->osd_num)
|
||||
@@ -551,13 +489,17 @@ void osd_t::report_pg_state(pg_t & pg)
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
pg.all_peers = pg.target_set;
|
||||
std::sort(pg.all_peers.begin(), pg.all_peers.end());
|
||||
pg.cur_peers = pg.target_set;
|
||||
}
|
||||
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
// Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
if (pg.target_history.size())
|
||||
{
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
}
|
||||
std::set<osd_num_t> dead_peers;
|
||||
for (auto pg_osd: pg.all_peers)
|
||||
{
|
||||
@@ -574,8 +516,12 @@ void osd_t::report_pg_state(pg_t & pg)
|
||||
dead_peers.insert(pg_osd);
|
||||
}
|
||||
}
|
||||
pg.all_peers.clear();
|
||||
pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
|
||||
auto new_all_peers = std::vector<osd_num_t>(dead_peers.begin(), dead_peers.end());
|
||||
if (pg.all_peers != new_all_peers)
|
||||
{
|
||||
pg.history_changed = true;
|
||||
pg.all_peers = new_all_peers;
|
||||
}
|
||||
pg.cur_peers.clear();
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
|
@@ -86,18 +86,9 @@ void pg_obj_state_check_t::walk()
|
||||
}
|
||||
if (pg->pg_cursize < pg->pg_size)
|
||||
{
|
||||
// Report PG history and activate
|
||||
// Activate as degraded
|
||||
// Current OSD set will be added into target_history on first write
|
||||
pg->state |= PG_DEGRADED | PG_PEERED;
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto peer_osd: pg->cur_set)
|
||||
{
|
||||
if (peer_osd != 0)
|
||||
{
|
||||
history_set.push_back(peer_osd);
|
||||
}
|
||||
}
|
||||
pg->target_history.push_back(history_set);
|
||||
pg->history_changed = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -435,11 +426,35 @@ void pg_t::calc_object_states(int log_level)
|
||||
std::sort(st.list.begin(), st.list.end());
|
||||
// Walk over it and check object states
|
||||
st.walk();
|
||||
if (this->state & (PG_DEGRADED|PG_LEFT_ON_DEAD))
|
||||
if (this->state != PG_ACTIVE)
|
||||
{
|
||||
assert(epoch != (((uint64_t)1 << PG_EPOCH_BITS)-1));
|
||||
epoch++;
|
||||
}
|
||||
if (log_level > 0)
|
||||
{
|
||||
std::string osd_set_desc;
|
||||
for (auto & osd_num: target_set)
|
||||
{
|
||||
osd_set_desc += (osd_set_desc == "" ? "" : ", ")+std::to_string(osd_num);
|
||||
}
|
||||
printf(
|
||||
"[PG %u/%u] %lu clean objects on target OSD set %s\n",
|
||||
pool_id, pg_num, clean_count, osd_set_desc.c_str()
|
||||
);
|
||||
for (auto & stp: state_dict)
|
||||
{
|
||||
osd_set_desc = "";
|
||||
for (auto & loc: stp.first)
|
||||
{
|
||||
osd_set_desc += (osd_set_desc == "" ? "" : ", ")+
|
||||
std::to_string(loc.osd_num)+
|
||||
(st.replicated ? "" : "("+std::to_string(loc.role)+")")+
|
||||
(loc.outdated ? "(old)" : "");
|
||||
}
|
||||
printf("[PG %u/%u] %lu objects on OSD set %s\n", pool_id, pg_num, stp.second.object_count, osd_set_desc.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void pg_t::print_state()
|
||||
|
@@ -228,7 +228,7 @@ resume_1:
|
||||
resume_2:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
finish_op(cur_op, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
cur_op->reply.rw.version = op_data->fact_ver;
|
||||
@@ -350,7 +350,7 @@ resume_2:
|
||||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
// Check CAS version
|
||||
@@ -371,7 +371,7 @@ resume_4:
|
||||
resume_5:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
// Remove version override
|
||||
|
@@ -24,7 +24,7 @@ struct osd_primary_op_data_t
|
||||
uint64_t target_ver;
|
||||
uint64_t orig_ver = 0, fact_ver = 0;
|
||||
uint64_t scheme = 0;
|
||||
int n_subops = 0, done = 0, errors = 0, epipe = 0;
|
||||
int n_subops = 0, done = 0, errors = 0, errcode = 0;
|
||||
int degraded = 0, pg_size, pg_data_size;
|
||||
osd_rmw_stripe_t *stripes;
|
||||
osd_op_t *subops = NULL;
|
||||
|
@@ -42,7 +42,7 @@ resume_4:
|
||||
{
|
||||
free(op_data->chain_reads);
|
||||
op_data->chain_reads = NULL;
|
||||
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
finish_op(cur_op, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
send_chained_read_results(pg, cur_op);
|
||||
|
@@ -122,7 +122,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const ui
|
||||
zero_read = -1;
|
||||
osd_op_t *subops = new osd_op_t[n_subops];
|
||||
op_data->fact_ver = 0;
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->done = op_data->errors = op_data->errcode = 0;
|
||||
op_data->n_subops = n_subops;
|
||||
op_data->subops = subops;
|
||||
int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
|
||||
@@ -263,9 +263,11 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
|
||||
blockstore_op_t *bs_op = subop->bs_op;
|
||||
int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE
|
||||
|| bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0;
|
||||
if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ)
|
||||
if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ &&
|
||||
(bs_op->opcode != BS_OP_WRITE && bs_op->opcode != BS_OP_WRITE_STABLE ||
|
||||
bs_op->retval != -ENOSPC))
|
||||
{
|
||||
// die
|
||||
// die on any error except ENOSPC
|
||||
throw std::runtime_error(
|
||||
"local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+
|
||||
" retval = "+std::to_string(bs_op->retval)+")"
|
||||
@@ -276,6 +278,8 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
|
||||
subop->reply.hdr.retval = bs_op->retval;
|
||||
if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE)
|
||||
{
|
||||
subop->req.sec_rw.oid = bs_op->oid;
|
||||
subop->req.sec_rw.version = bs_op->version;
|
||||
subop->req.sec_rw.len = bs_op->len;
|
||||
subop->reply.sec_rw.version = bs_op->version;
|
||||
}
|
||||
@@ -337,14 +341,17 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
|
||||
osd_op_names[opcode], subop->peer_fd, retval, expected
|
||||
);
|
||||
}
|
||||
if (retval == -EPIPE)
|
||||
// Error priority: EIO > ENOSPC > EPIPE
|
||||
if (op_data->errcode == 0 || retval == -EIO ||
|
||||
retval == -ENOSPC && op_data->errcode == -EPIPE)
|
||||
{
|
||||
op_data->epipe++;
|
||||
op_data->errcode = retval;
|
||||
}
|
||||
op_data->errors++;
|
||||
if (subop->peer_fd >= 0)
|
||||
if (subop->peer_fd >= 0 && (opcode != OSD_OP_SEC_WRITE && opcode != OSD_OP_SEC_WRITE_STABLE ||
|
||||
retval != -ENOSPC))
|
||||
{
|
||||
// Drop connection on any error
|
||||
// Drop connection on any error expect ENOSPC
|
||||
msgr.stop_client(subop->peer_fd);
|
||||
}
|
||||
}
|
||||
@@ -408,7 +415,8 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
|
||||
// are sent to peer OSDs, so we can't just throw them away.
|
||||
// Mark them with an extra EPIPE.
|
||||
cur_op->op_data->errors++;
|
||||
cur_op->op_data->epipe++;
|
||||
if (cur_op->op_data->errcode == 0)
|
||||
cur_op->op_data->errcode = -EPIPE;
|
||||
cur_op->op_data->done--; // Caution: `done` must be signed because may become -1 here
|
||||
}
|
||||
else
|
||||
@@ -460,7 +468,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
op_data->n_subops = chunks_to_delete_count;
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->done = op_data->errors = op_data->errcode = 0;
|
||||
if (!op_data->n_subops)
|
||||
{
|
||||
return;
|
||||
@@ -523,7 +531,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
int n_osds = op_data->dirty_osd_count;
|
||||
osd_op_t *subops = new osd_op_t[n_osds];
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->done = op_data->errors = op_data->errcode = 0;
|
||||
op_data->n_subops = n_osds;
|
||||
op_data->subops = subops;
|
||||
std::map<uint64_t, int>::iterator peer_it;
|
||||
@@ -579,7 +587,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
int n_osds = op_data->unstable_write_osds->size();
|
||||
osd_op_t *subops = new osd_op_t[n_osds];
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->done = op_data->errors = op_data->errcode = 0;
|
||||
op_data->n_subops = n_osds;
|
||||
op_data->subops = subops;
|
||||
for (int i = 0; i < n_osds; i++)
|
||||
|
@@ -240,7 +240,7 @@ resume_8:
|
||||
}
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
finish_op(cur_op, op_data->errcode);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@@ -93,7 +93,7 @@ resume_2:
|
||||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
// Check CAS version
|
||||
@@ -155,17 +155,36 @@ resume_3:
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
// Report newer epoch before writing
|
||||
// FIXME: We may report only one PG state here...
|
||||
// FIXME: We don't have to report all changed PG states here
|
||||
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
pg.history_changed = true;
|
||||
if (pg.state != PG_ACTIVE)
|
||||
{
|
||||
// Check that current OSD set is in history and/or add it there
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto peer_osd: pg.cur_set)
|
||||
if (peer_osd != 0)
|
||||
history_set.push_back(peer_osd);
|
||||
std::sort(history_set.begin(), history_set.end());
|
||||
auto it = std::lower_bound(pg.target_history.begin(), pg.target_history.end(), history_set);
|
||||
if (it == pg.target_history.end() || *it != history_set)
|
||||
pg.target_history.insert(it, history_set);
|
||||
pg.history_changed = true;
|
||||
}
|
||||
report_pg_states();
|
||||
resume_10:
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
op_data->st = 10;
|
||||
#define PG_EPOCH_WAIT_STATE 10
|
||||
op_data->st = PG_EPOCH_WAIT_STATE;
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Recheck PG state after reporting history - maybe it's already stopping/restarting
|
||||
if (pg.state & (PG_STOPPING|PG_REPEERING))
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, -EPIPE);
|
||||
return;
|
||||
}
|
||||
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.cur_set.data(), cur_op);
|
||||
resume_4:
|
||||
op_data->st = 4;
|
||||
@@ -178,7 +197,7 @@ resume_5:
|
||||
}
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
if (op_data->object_state)
|
||||
@@ -255,7 +274,7 @@ resume_8:
|
||||
resume_9:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -287,6 +306,50 @@ continue_others:
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it == pgs.end())
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto & pg = pg_it->second;
|
||||
if (pg.epoch > pg.reported_epoch &&
|
||||
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg.epoch)
|
||||
{
|
||||
pg.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
|
||||
std::vector<object_id> resume_oids;
|
||||
for (auto & op: pg.write_queue)
|
||||
{
|
||||
if (op.second->op_data->st == PG_EPOCH_WAIT_STATE)
|
||||
{
|
||||
// Run separately to prevent side effects
|
||||
resume_oids.push_back(op.first);
|
||||
}
|
||||
}
|
||||
for (auto & oid: resume_oids)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it != pgs.end())
|
||||
{
|
||||
auto & pg = pg_it->second;
|
||||
auto op_it = pg.write_queue.find(oid);
|
||||
if (op_it != pg.write_queue.end() &&
|
||||
op_it->second->op_data->st == PG_EPOCH_WAIT_STATE)
|
||||
{
|
||||
continue_primary_write(op_it->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
@@ -337,7 +400,7 @@ resume_7:
|
||||
op_data->unstable_write_osds = NULL;
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
180
src/osd_rmw.cpp
180
src/osd_rmw.cpp
@@ -154,6 +154,8 @@ struct reed_sol_matrix_t
|
||||
int refs = 0;
|
||||
int *je_data;
|
||||
uint8_t *isal_data;
|
||||
// 32 bytes = 256/8 = max pg_size/8
|
||||
std::map<std::array<uint8_t, 32>, void*> subdata;
|
||||
std::map<reed_sol_erased_t, void*> decodings;
|
||||
};
|
||||
|
||||
@@ -194,6 +196,12 @@ void use_ec(int pg_size, int pg_minsize, bool use)
|
||||
free(rs_it->second.je_data);
|
||||
if (rs_it->second.isal_data)
|
||||
free(rs_it->second.isal_data);
|
||||
for (auto sub_it = rs_it->second.subdata.begin(); sub_it != rs_it->second.subdata.end();)
|
||||
{
|
||||
void *data = sub_it->second;
|
||||
rs_it->second.subdata.erase(sub_it++);
|
||||
free(data);
|
||||
}
|
||||
for (auto dec_it = rs_it->second.decodings.begin(); dec_it != rs_it->second.decodings.end();)
|
||||
{
|
||||
void *data = dec_it->second;
|
||||
@@ -294,6 +302,47 @@ static void* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size
|
||||
return dec_it->second;
|
||||
}
|
||||
|
||||
#ifndef WITH_ISAL
|
||||
#define JERASURE_ALIGNMENT 16
|
||||
|
||||
// jerasure requires 16-byte alignment for SSE...
|
||||
// FIXME: jerasure/gf-complete should probably be patched to automatically choose non-sse version for unaligned buffers
|
||||
static void jerasure_matrix_encode_unaligned(int k, int m, int w, int *matrix, char **data_ptrs, char **coding_ptrs, int size)
|
||||
{
|
||||
bool unaligned = false;
|
||||
for (int i = 0; i < k; i++)
|
||||
if (((unsigned long)data_ptrs[i]) % JERASURE_ALIGNMENT)
|
||||
unaligned = true;
|
||||
for (int i = 0; i < m; i++)
|
||||
if (((unsigned long)coding_ptrs[i]) % JERASURE_ALIGNMENT)
|
||||
unaligned = true;
|
||||
if (!unaligned)
|
||||
{
|
||||
jerasure_matrix_encode(k, m, w, matrix, data_ptrs, coding_ptrs, size);
|
||||
return;
|
||||
}
|
||||
int aligned_size = ((size+JERASURE_ALIGNMENT-1)/JERASURE_ALIGNMENT)*JERASURE_ALIGNMENT;
|
||||
int copy_size = aligned_size*(k+m);
|
||||
char local_data[copy_size > 4096 ? 0 : copy_size];
|
||||
char *data_copy = copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT
|
||||
? (char*)memalign_or_die(JERASURE_ALIGNMENT, aligned_size*(k+m))
|
||||
: local_data;
|
||||
char *aligned_ptrs[k+m];
|
||||
for (int i = 0; i < k; i++)
|
||||
{
|
||||
memcpy(data_copy + i*aligned_size, data_ptrs[i], size);
|
||||
aligned_ptrs[i] = data_copy + i*aligned_size;
|
||||
}
|
||||
for (int i = 0; i < m; i++)
|
||||
aligned_ptrs[k+i] = data_copy + (k+i)*aligned_size;
|
||||
jerasure_matrix_encode(k, m, w, matrix, aligned_ptrs, aligned_ptrs+k, size);
|
||||
for (int i = 0; i < m; i++)
|
||||
memcpy(coding_ptrs[i], aligned_ptrs[k+i], size);
|
||||
if (copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT)
|
||||
free(data_copy);
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_ISAL
|
||||
void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint32_t bitmap_size)
|
||||
{
|
||||
@@ -357,10 +406,12 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
|
||||
{
|
||||
data_ptrs[role] = NULL;
|
||||
}
|
||||
bool recovered = false;
|
||||
for (int role = 0; role < pg_minsize; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 && stripes[role].missing)
|
||||
{
|
||||
recovered = true;
|
||||
if (stripes[role].read_end > stripes[role].read_start)
|
||||
{
|
||||
for (int other = 0; other < pg_size; other++)
|
||||
@@ -378,18 +429,64 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
|
||||
data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
|
||||
);
|
||||
}
|
||||
for (int other = 0; other < pg_size; other++)
|
||||
}
|
||||
}
|
||||
if (recovered && bitmap_size > 0)
|
||||
{
|
||||
bool unaligned = false;
|
||||
for (int role = 0; role < pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0)
|
||||
{
|
||||
if (stripes[other].read_end != 0 && !stripes[other].missing)
|
||||
data_ptrs[role] = (char*)stripes[role].bmp_buf;
|
||||
if (((unsigned long)stripes[role].bmp_buf) % JERASURE_ALIGNMENT)
|
||||
unaligned = true;
|
||||
}
|
||||
}
|
||||
if (!unaligned)
|
||||
{
|
||||
for (int role = 0; role < pg_minsize; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 && stripes[role].missing)
|
||||
{
|
||||
data_ptrs[other] = (char*)(stripes[other].bmp_buf);
|
||||
jerasure_matrix_dotprod(
|
||||
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
|
||||
data_ptrs, data_ptrs+pg_minsize, bitmap_size
|
||||
);
|
||||
}
|
||||
}
|
||||
data_ptrs[role] = (char*)stripes[role].bmp_buf;
|
||||
jerasure_matrix_dotprod(
|
||||
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
|
||||
data_ptrs, data_ptrs+pg_minsize, bitmap_size
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
// jerasure_matrix_dotprod requires 16-byte alignment for SSE...
|
||||
int aligned_size = ((bitmap_size+JERASURE_ALIGNMENT-1)/JERASURE_ALIGNMENT)*JERASURE_ALIGNMENT;
|
||||
int copy_size = aligned_size*pg_size;
|
||||
char local_data[copy_size > 4096 ? 0 : copy_size];
|
||||
bool alloc_copy = copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT;
|
||||
char *data_copy = alloc_copy
|
||||
? (char*)memalign_or_die(JERASURE_ALIGNMENT, copy_size)
|
||||
: local_data;
|
||||
for (int role = 0; role < pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0)
|
||||
{
|
||||
data_ptrs[role] = data_copy + role*aligned_size;
|
||||
memcpy(data_ptrs[role], stripes[role].bmp_buf, bitmap_size);
|
||||
}
|
||||
}
|
||||
for (int role = 0; role < pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 && stripes[role].missing)
|
||||
{
|
||||
jerasure_matrix_dotprod(
|
||||
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
|
||||
data_ptrs, data_ptrs+pg_minsize, bitmap_size
|
||||
);
|
||||
memcpy(stripes[role].bmp_buf, data_ptrs[role], bitmap_size);
|
||||
}
|
||||
}
|
||||
if (alloc_copy)
|
||||
free(data_copy);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -808,11 +905,56 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
|
||||
if (end != 0)
|
||||
{
|
||||
int write_parity = 0;
|
||||
for (int i = pg_minsize; i < pg_size; i++)
|
||||
bool is_seq = true;
|
||||
for (int i = pg_size-1; i >= pg_minsize; i--)
|
||||
{
|
||||
if (write_osd_set[i] != 0)
|
||||
write_parity++;
|
||||
else if (write_parity != 0)
|
||||
is_seq = false;
|
||||
}
|
||||
if (write_parity > 0)
|
||||
{
|
||||
// First get the coding matrix or sub-matrix
|
||||
void *matrix_data =
|
||||
#ifdef WITH_ISAL
|
||||
matrix->isal_data;
|
||||
#else
|
||||
matrix->je_data;
|
||||
#endif
|
||||
if (!is_seq)
|
||||
{
|
||||
// We need a coding sub-matrix
|
||||
std::array<uint8_t, 32> missing_parity = {};
|
||||
for (int i = pg_minsize; i < pg_size; i++)
|
||||
{
|
||||
if (!write_osd_set[i])
|
||||
missing_parity[(i-pg_minsize) >> 3] |= (1 << ((i-pg_minsize) & 0x7));
|
||||
}
|
||||
auto sub_it = matrix->subdata.find(missing_parity);
|
||||
if (sub_it == matrix->subdata.end())
|
||||
{
|
||||
int item_size =
|
||||
#ifdef WITH_ISAL
|
||||
32;
|
||||
#else
|
||||
sizeof(int);
|
||||
#endif
|
||||
void *subm = malloc_or_die(item_size * write_parity * pg_minsize);
|
||||
for (int i = pg_minsize, j = 0; i < pg_size; i++)
|
||||
{
|
||||
if (write_osd_set[i])
|
||||
{
|
||||
memcpy(subm + item_size*pg_minsize*j, matrix_data + item_size*pg_minsize*(i-pg_minsize), item_size*pg_minsize);
|
||||
j++;
|
||||
}
|
||||
}
|
||||
matrix->subdata[missing_parity] = subm;
|
||||
matrix_data = subm;
|
||||
}
|
||||
else
|
||||
matrix_data = sub_it->second;
|
||||
}
|
||||
// Calculate new coding chunks
|
||||
buf_len_t bufs[pg_size][3];
|
||||
int nbuf[pg_size], curbuf[pg_size];
|
||||
@@ -841,13 +983,13 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
|
||||
while (pos < end)
|
||||
{
|
||||
uint32_t next_end = end;
|
||||
for (int i = 0; i < pg_size; i++)
|
||||
for (int i = 0, j = 0; i < pg_size; i++)
|
||||
{
|
||||
if (i < pg_minsize || write_osd_set[i] != 0)
|
||||
{
|
||||
assert(curbuf[i] < nbuf[i]);
|
||||
assert(bufs[i][curbuf[i]].buf);
|
||||
data_ptrs[i] = (uint8_t*)bufs[i][curbuf[i]].buf + pos-positions[i];
|
||||
data_ptrs[j++] = (uint8_t*)bufs[i][curbuf[i]].buf + pos-positions[i];
|
||||
uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
|
||||
if (next_end > this_end)
|
||||
next_end = this_end;
|
||||
@@ -868,32 +1010,30 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
|
||||
}
|
||||
#ifdef WITH_ISAL
|
||||
ec_encode_data(
|
||||
next_end-pos, pg_minsize, write_parity, matrix->isal_data,
|
||||
next_end-pos, pg_minsize, write_parity, (uint8_t*)matrix_data,
|
||||
(uint8_t**)data_ptrs, (uint8_t**)data_ptrs+pg_minsize
|
||||
);
|
||||
#else
|
||||
jerasure_matrix_encode(
|
||||
pg_minsize, write_parity, OSD_JERASURE_W, matrix->je_data,
|
||||
pg_minsize, write_parity, OSD_JERASURE_W, (int*)matrix_data,
|
||||
(char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos
|
||||
);
|
||||
#endif
|
||||
pos = next_end;
|
||||
}
|
||||
for (int i = 0; i < pg_size; i++)
|
||||
for (int i = 0, j = 0; i < pg_size; i++)
|
||||
{
|
||||
if (i < pg_minsize || write_osd_set[i] != 0)
|
||||
{
|
||||
data_ptrs[i] = stripes[i].bmp_buf;
|
||||
}
|
||||
data_ptrs[j++] = stripes[i].bmp_buf;
|
||||
}
|
||||
#ifdef WITH_ISAL
|
||||
ec_encode_data(
|
||||
bitmap_size, pg_minsize, write_parity, matrix->isal_data,
|
||||
bitmap_size, pg_minsize, write_parity, (uint8_t*)matrix_data,
|
||||
(uint8_t**)data_ptrs, (uint8_t**)data_ptrs+pg_minsize
|
||||
);
|
||||
#else
|
||||
jerasure_matrix_encode(
|
||||
pg_minsize, write_parity, OSD_JERASURE_W, matrix->je_data,
|
||||
jerasure_matrix_encode_unaligned(
|
||||
pg_minsize, write_parity, OSD_JERASURE_W, (int*)matrix_data,
|
||||
(char**)data_ptrs, (char**)data_ptrs+pg_minsize, bitmap_size
|
||||
);
|
||||
#endif
|
||||
|
@@ -3,6 +3,10 @@
|
||||
|
||||
#define RMW_DEBUG
|
||||
|
||||
#ifdef NO_ISAL
|
||||
#undef WITH_ISAL
|
||||
#endif
|
||||
|
||||
#include <string.h>
|
||||
#include "osd_rmw.cpp"
|
||||
#include "test_pattern.h"
|
||||
@@ -21,6 +25,7 @@ void test12();
|
||||
void test13();
|
||||
void test14();
|
||||
void test15();
|
||||
void test16();
|
||||
|
||||
int main(int narg, char *args[])
|
||||
{
|
||||
@@ -50,6 +55,8 @@ int main(int narg, char *args[])
|
||||
test14();
|
||||
// Test 15
|
||||
test15();
|
||||
// Test 16
|
||||
test16();
|
||||
// End
|
||||
printf("all ok\n");
|
||||
return 0;
|
||||
@@ -876,3 +883,106 @@ void test15()
|
||||
free(write_buf);
|
||||
use_ec(3, 2, false);
|
||||
}
|
||||
|
||||
/***
|
||||
|
||||
16. EC 2+2 write one parity block with another missing
|
||||
calc_rmw(offset=0, len=0, osd_set=[1,2,0,0], write_set=[1,2,0,3])
|
||||
= {
|
||||
read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ],
|
||||
write: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ], [ 0, 128K ] ],
|
||||
input buffer: [],
|
||||
rmw buffer: [ write3, read0, read1 ],
|
||||
}
|
||||
|
||||
***/
|
||||
|
||||
void test16()
|
||||
{
|
||||
const int bmp = 128*1024 / 4096 / 8;
|
||||
use_ec(4, 2, true);
|
||||
osd_num_t osd_set[4] = { 1, 2, 0, 0 };
|
||||
osd_num_t write_osd_set[4] = { 1, 2, 0, 3 };
|
||||
osd_rmw_stripe_t stripes[4] = {};
|
||||
unsigned bitmaps[4] = { 0 };
|
||||
// Test 16.0
|
||||
void *write_buf = NULL;
|
||||
split_stripes(2, 128*1024, 0, 0, stripes);
|
||||
assert(stripes[0].req_start == 0 && stripes[0].req_end == 0);
|
||||
assert(stripes[1].req_start == 0 && stripes[1].req_end == 0);
|
||||
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
|
||||
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
|
||||
// Test 16.1
|
||||
void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 4, 2, 3, write_osd_set, 128*1024, bmp);
|
||||
for (int i = 0; i < 4; i++)
|
||||
stripes[i].bmp_buf = bitmaps+i;
|
||||
assert(rmw_buf);
|
||||
assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024);
|
||||
assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024);
|
||||
assert(stripes[2].read_start == 0 && stripes[2].read_end == 0);
|
||||
assert(stripes[3].read_start == 0 && stripes[3].read_end == 0);
|
||||
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
|
||||
assert(stripes[1].write_start == 0 && stripes[1].write_end == 0);
|
||||
assert(stripes[2].write_start == 0 && stripes[2].write_end == 0);
|
||||
assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024);
|
||||
assert(stripes[0].read_buf == (uint8_t*)rmw_buf+128*1024);
|
||||
assert(stripes[1].read_buf == (uint8_t*)rmw_buf+256*1024);
|
||||
assert(stripes[2].read_buf == NULL);
|
||||
assert(stripes[3].read_buf == NULL);
|
||||
assert(stripes[0].write_buf == NULL);
|
||||
assert(stripes[1].write_buf == NULL);
|
||||
assert(stripes[2].write_buf == NULL);
|
||||
assert(stripes[3].write_buf == rmw_buf);
|
||||
// Test 16.2 - encode
|
||||
set_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
|
||||
set_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
|
||||
memset(stripes[0].bmp_buf, 0xff, bmp);
|
||||
memset(stripes[1].bmp_buf, 0xff, bmp);
|
||||
calc_rmw_parity_ec(stripes, 4, 2, osd_set, write_osd_set, 128*1024, bmp);
|
||||
assert(*(uint32_t*)stripes[2].bmp_buf == 0);
|
||||
assert(*(uint32_t*)stripes[3].bmp_buf == 0xF1F1F1F1);
|
||||
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
|
||||
assert(stripes[1].write_start == 0 && stripes[1].write_end == 0);
|
||||
assert(stripes[2].write_start == 0 && stripes[2].write_end == 0);
|
||||
assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024);
|
||||
assert(stripes[0].write_buf == NULL);
|
||||
assert(stripes[1].write_buf == NULL);
|
||||
assert(stripes[2].write_buf == NULL);
|
||||
assert(stripes[3].write_buf == rmw_buf);
|
||||
check_pattern(stripes[3].write_buf, 128*1024, 0x7eb9ae9cd8e652c3); // 2nd EC chunk
|
||||
// Test 16.3 - decode and verify
|
||||
osd_num_t read_osd_set[4] = { 0, 2, 0, 3 };
|
||||
memset(stripes, 0, sizeof(stripes));
|
||||
split_stripes(2, 128*1024, 0, 256*1024, stripes);
|
||||
assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024);
|
||||
assert(stripes[1].req_start == 0 && stripes[1].req_end == 128*1024);
|
||||
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
|
||||
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
|
||||
for (int role = 0; role < 4; role++)
|
||||
{
|
||||
stripes[role].read_start = stripes[role].req_start;
|
||||
stripes[role].read_end = stripes[role].req_end;
|
||||
}
|
||||
assert(extend_missing_stripes(stripes, read_osd_set, 2, 4) == 0);
|
||||
assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024);
|
||||
assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024);
|
||||
assert(stripes[2].read_start == 0 && stripes[2].read_end == 0);
|
||||
assert(stripes[3].read_start == 0 && stripes[3].read_end == 128*1024);
|
||||
void *read_buf = alloc_read_buffer(stripes, 4, 0);
|
||||
for (int i = 0; i < 4; i++)
|
||||
stripes[i].bmp_buf = bitmaps+i;
|
||||
assert(read_buf);
|
||||
assert(stripes[0].read_buf == read_buf);
|
||||
assert(stripes[1].read_buf == (uint8_t*)read_buf+128*1024);
|
||||
assert(stripes[3].read_buf == (uint8_t*)read_buf+2*128*1024);
|
||||
set_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
|
||||
memcpy(stripes[3].read_buf, rmw_buf, 128*1024);
|
||||
reconstruct_stripes_ec(stripes, 4, 2, bmp);
|
||||
assert(bitmaps[0] == 0xFFFFFFFF);
|
||||
check_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
|
||||
free(read_buf);
|
||||
// Done
|
||||
free(rmw_buf);
|
||||
free(write_buf);
|
||||
use_ec(3, 2, false);
|
||||
}
|
||||
|
Reference in New Issue
Block a user