Compare commits

...

26 Commits

Author SHA1 Message Date
dc75450f6d WIP Hide base64 encoding/decoding inside etcd_state_client 2023-01-06 17:27:09 +03:00
373f9d0387 Try to re-peer PGs on history change 2023-01-06 12:46:44 +03:00
c4516ea971 Also remove deleted OSD from PG configuration and last_clean_pgs 2023-01-06 12:46:44 +03:00
91065c80fc Try to prevent left_on_dead when deleting OSDs by removing them from PG history 2023-01-06 12:46:43 +03:00
0f6b946add Time changes with every stat change, do not schedule checks based on it 2023-01-05 13:54:16 +03:00
465cbf0b2f Do not re-schedule recheck indefinitely, run it after mon_change_timeout in any case 2023-01-05 13:48:06 +03:00
41add50e4e Track last_clean_pgs on a per-pool basis 2023-01-03 02:20:50 +03:00
02e7be7dc9 Prevent reenterability side effects during PG history operation resume 2023-01-03 02:20:50 +03:00
73940adf07 Prioritize EC (non-instantly-stable) operations under journal pressure
This reduces the probability of hitting OSD stalls with EC due to "deadlocks"
where two parallel write operations wait for each other to complete
2023-01-03 00:05:45 +03:00
e950c024d3 Do not sync peer OSDs before listing
Sync before listing was added to wait for all PG writes possibly left in queue
from the previous master to finish before listing it

But in fact it may block the cluster when EC is used and some unstable writes
are left in the queue - they block journal flushing, rollback/stabilize is
required to unblock them, but rollback/stabilize may only happen after PG is
peered. But peering needs listings, listings are requested only after sync, and
sync itself waits for currently blocked writes waiting in the queue
2023-01-03 00:05:45 +03:00
71d6d9f868 Fix possible crash on ENOSPC during operation cancel in blockstore 2023-01-03 00:05:45 +03:00
a4dfa519af Report PG history synchronously during write
This has 2 effects:
1) OSD sets aren't added into PG history until actual write attempts anymore
   which removes unneeded extra osd_sets in PG history
2) New OSD sets are reported synchronously and can't be lost on PG restarts
   happening at the same time with reconfiguration
2023-01-01 23:41:05 +03:00
37a6aff2fa Write OSD numbers always as numbers in mon 2023-01-01 23:17:42 +03:00
67019f5b02 Make OSD sort & sanitize PG history items 2023-01-01 23:17:42 +03:00
0593e5c21c Fix OSD peer config safety check 2022-12-31 02:24:42 +03:00
998e24adf8 Add a new recovery_pg_switch setting to mix all PGs during recovery 2022-12-30 02:03:33 +03:00
d7bd36dc32 Fix another rare journal flush stall 2022-12-30 02:03:33 +03:00
cf5c562800 Log all object locations when peering PGs 2022-12-30 02:03:33 +03:00
629200b0cc Return ENOSPC as the primary OSD 2022-12-30 02:03:33 +03:00
3589ccec22 Do not disconnect peer on ENOSPC during write 2022-12-30 01:54:25 +03:00
8d55a1e780 Build osd_rmw_test both with and without ISA-L 2022-12-29 19:13:57 +03:00
65f6b3a4eb Fix jerasure crashing on bitmap calculation/restoration due to the lack of 16-byte alignment 2022-12-29 19:13:57 +03:00
fd216eac77 Add a test for missing parity chunk calculation 2022-12-29 19:13:57 +03:00
61fca7c426 Fix crash when calculating a parity chunk with previous parity chunk missing (test coming shortly) 2022-12-29 19:13:57 +03:00
1c29ed80b9 Fix quote in docs :) 2022-12-28 18:08:53 +03:00
68f3fb795e Suppress warnings in vitastor-disk purge correctly 2022-12-27 11:09:19 +03:00
34 changed files with 917 additions and 247 deletions

View File

@@ -17,6 +17,7 @@ initialization and can be changed with an OSD restart.
- [autosync_interval](#autosync_interval)
- [autosync_writes](#autosync_writes)
- [recovery_queue_depth](#recovery_queue_depth)
- [recovery_pg_switch](#recovery_pg_switch)
- [recovery_sync_batch](#recovery_sync_batch)
- [readonly](#readonly)
- [no_recovery](#no_recovery)
@@ -115,6 +116,16 @@ Maximum recovery operations per one primary OSD at any given moment of time.
Currently it's the only parameter available to tune the speed or recovery
and rebalancing, but it's planned to implement more.
## recovery_pg_switch
- Type: integer
- Default: 128
Number of recovery operations before switching to recovery of the next PG.
The idea is to mix all PGs during recovery for more even space and load
distribution but still benefit from recovery queue depth greater than 1.
Degraded PGs are anyway scanned first.
## recovery_sync_batch
- Type: integer

View File

@@ -18,6 +18,7 @@
- [autosync_interval](#autosync_interval)
- [autosync_writes](#autosync_writes)
- [recovery_queue_depth](#recovery_queue_depth)
- [recovery_pg_switch](#recovery_pg_switch)
- [recovery_sync_batch](#recovery_sync_batch)
- [readonly](#readonly)
- [no_recovery](#no_recovery)
@@ -119,6 +120,17 @@ OSD, чтобы успевать очищать журнал - без них OSD
для ускорения или замедления восстановления и перебалансировки данных, но
в планах реализация других параметров.
## recovery_pg_switch
- Тип: целое число
- Значение по умолчанию: 128
Число операций восстановления перед переключением на восстановление другой PG.
Идея заключается в том, чтобы восстанавливать все PG одновременно для более
равномерного распределения места и нагрузки, но при этом всё равно выигрывать
от глубины очереди восстановления, большей, чем 1. Деградированные PG в любом
случае сканируются первыми.
## recovery_sync_batch
- Тип: целое число

View File

@@ -102,6 +102,20 @@
момент времени. На данный момент единственный параметр, который можно менять
для ускорения или замедления восстановления и перебалансировки данных, но
в планах реализация других параметров.
- name: recovery_pg_switch
type: int
default: 128
info: |
Number of recovery operations before switching to recovery of the next PG.
The idea is to mix all PGs during recovery for more even space and load
distribution but still benefit from recovery queue depth greater than 1.
Degraded PGs are anyway scanned first.
info_ru: |
Число операций восстановления перед переключением на восстановление другой PG.
Идея заключается в том, чтобы восстанавливать все PG одновременно для более
равномерного распределения места и нагрузки, но при этом всё равно выигрывать
от глубины очереди восстановления, большей, чем 1. Деградированные PG в любом
случае сканируются первыми.
- name: recovery_sync_batch
type: int
default: 16

View File

@@ -70,7 +70,7 @@ For EC pools the configuration should look like the following:
```
etcdctl --endpoints=... put /vitastor/config/pools '{"2":{"name":"ecpool",
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}'
```
After you do this, one of the monitors will configure PGs and OSDs will start them.

View File

@@ -71,7 +71,7 @@ etcdctl --endpoints=... put /vitastor/config/pools '{"1":{"name":"testpool",
```
etcdctl --endpoints=... put /vitastor/config/pools '{"2":{"name":"ecpool",
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`
"scheme":"ec","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}'
```
После этого один из мониторов должен сконфигурировать PG, а OSD должны запустить их.

View File

@@ -21,7 +21,7 @@ function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_p
{
for (const pg of oh.osd_sets)
{
nh.osd_sets[pg.join(' ')] = pg;
nh.osd_sets[pg.join(' ')] = pg.map(osd_num => Number(osd_num));
}
}
if (oh && oh.all_peers && oh.all_peers.length)

View File

@@ -663,12 +663,15 @@ class Mon
async save_last_clean()
{
// last_clean_pgs is used to avoid extra data move when observing a series of changes in the cluster
const new_clean_pgs = { items: {} };
next_pool:
for (const pool_id in this.state.config.pools)
{
new_clean_pgs.items[pool_id] = (this.state.history.last_clean_pgs.items||{})[pool_id];
const pool_cfg = this.state.config.pools[pool_id];
if (!this.validate_pool_cfg(pool_id, pool_cfg, false))
{
continue;
continue next_pool;
}
for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
{
@@ -677,17 +680,18 @@ class Mon
!(this.state.pg.state[pool_id][pg_num].state instanceof Array))
{
// Unclean
return;
continue next_pool;
}
let st = this.state.pg.state[pool_id][pg_num].state.join(',');
if (st != 'active' && st != 'active,left_on_dead' && st != 'left_on_dead,active')
{
// Unclean
return;
continue next_pool;
}
}
new_clean_pgs.items[pool_id] = this.state.config.pgs.items[pool_id];
}
this.state.history.last_clean_pgs = JSON.parse(JSON.stringify(this.state.config.pgs));
this.state.history.last_clean_pgs = new_clean_pgs;
await this.etcd_call('/kv/txn', {
success: [ { requestPut: {
key: b64(this.etcd_prefix+'/history/last_clean_pgs'),
@@ -1374,16 +1378,14 @@ class Mon
// This is required for multiple change events to trigger at most 1 recheck in 1s
schedule_recheck()
{
if (this.recheck_timer)
if (!this.recheck_timer)
{
clearTimeout(this.recheck_timer);
this.recheck_timer = null;
this.recheck_timer = setTimeout(() =>
{
this.recheck_timer = null;
this.recheck_pgs().catch(this.die);
}, this.config.mon_change_timeout || 1000);
}
this.recheck_timer = setTimeout(() =>
{
this.recheck_timer = null;
this.recheck_pgs().catch(this.die);
}, this.config.mon_change_timeout || 1000);
}
sum_op_stats(timestamp, prev_stats)
@@ -1719,11 +1721,11 @@ class Mon
else if (key_parts[0] === 'osd' && key_parts[1] === 'stats')
{
// Recheck OSD tree on OSD addition/deletion
if ((!old) != (!kv.value) || old && kv.value && (old.size != kv.value.size || old.time != kv.value.time))
if ((!old) != (!kv.value) || old && kv.value && old.size != kv.value.size)
{
this.schedule_recheck();
}
// Recheck PGs <osd_out_time> later
// Recheck PGs <osd_out_time> after last OSD statistics report
this.schedule_next_recheck_at(
!this.state.osd.stats[key[2]] ? 0 : this.state.osd.stats[key[2]].time+this.config.osd_out_time
);

View File

@@ -235,9 +235,16 @@ add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test
# FIXME: Move to tests
add_executable(osd_rmw_test osd_rmw_test.cpp allocator.cpp)
target_link_libraries(osd_rmw_test Jerasure ${ISAL_LIBRARIES} tcmalloc_minimal)
if (ISAL_LIBRARIES)
add_executable(osd_rmw_test_je osd_rmw_test.cpp allocator.cpp)
target_compile_definitions(osd_rmw_test_je PUBLIC -DNO_ISAL)
target_link_libraries(osd_rmw_test_je Jerasure tcmalloc_minimal)
endif (ISAL_LIBRARIES)
# stub_uring_osd
add_executable(stub_uring_osd
stub_uring_osd.cpp

View File

@@ -162,7 +162,8 @@ void journal_flusher_t::mark_trim_possible()
if (trim_wanted > 0)
{
dequeuing = true;
journal_trim_counter++;
if (!journal_trim_counter)
journal_trim_counter = journal_trim_interval;
bs->ringloop->wakeup();
}
}

View File

@@ -193,6 +193,7 @@ void blockstore_impl_t::loop()
}
if (wr_st == 2)
{
submit_queue[op_idx] = NULL;
new_idx--;
}
if (wr_st == 0)
@@ -582,7 +583,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
replace_stable(dirty_it->first.oid, 0, clean_stable_count, stable_count, stable);
}
}
else if (IS_STABLE(dirty_it->second.state))
else if (IS_STABLE(dirty_it->second.state) || (dirty_it->second.state & BS_ST_INSTANT))
{
// First try to replace a clean stable version in the first part of the list
if (!replace_stable(dirty_it->first.oid, dirty_it->first.version, 0, clean_stable_count, stable))

View File

@@ -16,6 +16,7 @@
// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow
// writing more than can be stabilized afterwards
#define JOURNAL_STABILIZE_RESERVATION 65536
#define JOURNAL_INSTANT_RESERVATION 131072
// Journal entries
// Journal entries are linked to each other by their crc32 value

View File

@@ -286,7 +286,10 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
{
auto used = --journal.used_sectors[rv.journal_sector-1];
if (used == 0)
{
journal.used_sectors.erase(rv.journal_sector-1);
flusher->mark_trim_possible();
}
}
}
}

View File

@@ -127,7 +127,6 @@ resume_4:
{
mark_rolled_back(*v);
}
flusher->mark_trim_possible();
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
@@ -232,6 +231,7 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
if (used == 0)
{
journal.used_sectors.erase(dirty_it->second.journal_sector);
flusher->mark_trim_possible();
}
if (dsk.clean_entry_bitmap_size > sizeof(void*))
{

View File

@@ -89,6 +89,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
else
{
// Invalid version requested
#ifdef BLOCKSTORE_DEBUG
printf("Write %lx:%lx v%lu requested, but we already have v%lu\n", op->oid.inode, op->oid.stripe, op->version, version);
#endif
op->retval = -EEXIST;
if (!is_del && dsk.clean_entry_bitmap_size > sizeof(void*))
{
@@ -115,8 +118,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
else if (!wait_del)
printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
#endif
// FIXME No strict need to add it into dirty_db here, it's just left
// from the previous implementation where reads waited for writes
// No strict need to add it into dirty_db here except maybe for listings to return
// correct data when there are inflight operations in the queue
uint32_t state;
if (is_del)
state = BS_ST_DELETE | BS_ST_IN_FLIGHT;
@@ -182,9 +185,15 @@ void blockstore_impl_t::cancel_all_writes(blockstore_op_t *op, blockstore_dirty_
bool found = false;
for (auto other_op: submit_queue)
{
// <op> may be present in queue multiple times due to moving operations in submit_queue
if (other_op == op)
if (!other_op)
{
// freed operations during submitting are zeroed
}
else if (other_op == op)
{
// <op> may be present in queue multiple times due to moving operations in submit_queue
found = true;
}
else if (found && other_op->oid == op->oid &&
(other_op->opcode == BS_OP_WRITE || other_op->opcode == BS_OP_WRITE_STABLE))
{
@@ -252,7 +261,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, unsynced_big_write_count + 1,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
(dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))
{
return 0;
}
@@ -332,7 +342,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
!space_check.check_available(op, unsynced_big_write_count,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, 0)
|| !space_check.check_available(op, 1,
sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size, op->len + JOURNAL_STABILIZE_RESERVATION))
sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size,
op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
}
@@ -443,18 +454,19 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
{
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
{
return 0;
}
BS_SUBMIT_CHECK_SQES(1);
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
}
BS_SUBMIT_CHECK_SQES(1);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size
@@ -641,7 +653,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
});
assert(dirty_it != dirty_db.end());
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION))
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_INSTANT_RESERVATION))
{
return 0;
}

View File

@@ -34,26 +34,22 @@ struct alloc_osd_t
json11::Json::object {
{ "target", "VERSION" },
{ "version", 0 },
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+"/osd/stats/"+std::to_string(new_id)
) },
{ "key", "/osd/stats/"+std::to_string(new_id) }
},
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+"/osd/stats/"+std::to_string(new_id)
) },
{ "value", base64_encode("{}") },
{ "key", "/osd/stats/"+std::to_string(new_id) }
{ "value", "{}" },
} },
},
} },
{ "failure", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
{ "key", "/osd/stats/" },
{ "range_end", "/osd/stats0" },
{ "keys_only", true },
} },
},

View File

@@ -5,6 +5,7 @@
#include "cli.h"
#include "cluster_client.h"
#include "str_util.h"
#include "epoll_manager.h"
#include <algorithm>
@@ -14,13 +15,21 @@ struct rm_osd_t
cli_tool_t *parent;
bool dry_run, force_warning, force_dataloss;
uint64_t etcd_tx_retry_ms = 500;
uint64_t etcd_tx_retries = 10000;
std::vector<uint64_t> osd_ids;
int state = 0;
cli_result_t result;
std::set<uint64_t> to_remove;
std::set<uint64_t> to_restart;
json11::Json::array pool_effects;
json11::Json::array history_updates, history_checks;
json11::Json new_pgs, new_clean_pgs;
uint64_t new_pgs_mod_rev, new_clean_pgs_mod_rev;
uint64_t cur_retry = 0;
uint64_t retry_wait = 0;
bool is_warning, is_dataloss;
bool is_done()
@@ -32,6 +41,12 @@ struct rm_osd_t
{
if (state == 1)
goto resume_1;
else if (state == 2)
goto resume_2;
else if (state == 3)
goto resume_3;
else if (state == 4)
goto resume_4;
if (!osd_ids.size())
{
result = (cli_result_t){ .err = EINVAL, .text = "OSD numbers are not specified" };
@@ -152,14 +167,48 @@ struct rm_osd_t
result.text = error;
if (dry_run || is_dataloss && !force_dataloss || is_warning && !force_warning)
{
result.err = is_dataloss || is_warning ? EBUSY : 0;
result.err = is_dataloss && !force_dataloss || is_warning && !force_warning ? EBUSY : 0;
state = 100;
return;
}
}
parent->etcd_txn(json11::Json::object { { "success", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+"/config/pgs"
) },
} },
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+"/history/last_clean_pgs"
) },
} },
},
} } });
resume_4:
state = 4;
if (parent->waiting > 0)
return;
if (parent->etcd_err.err)
{
result = parent->etcd_err;
state = 100;
return;
}
{
auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]);
new_pgs = remove_osds_from_pgs(kv);
new_pgs_mod_rev = kv.mod_revision;
kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][1]["response_range"]["kvs"][0]);
new_clean_pgs = remove_osds_from_pgs(kv);
new_clean_pgs_mod_rev = kv.mod_revision;
}
// Remove keys from etcd
{
json11::Json::array rm_items;
json11::Json::array rm_items, rm_checks;
for (auto osd_id: osd_ids)
{
rm_items.push_back("/config/osd/"+std::to_string(osd_id));
@@ -178,7 +227,39 @@ struct rm_osd_t
} },
};
}
parent->etcd_txn(json11::Json::object { { "success", rm_items } });
if (!new_pgs.is_null())
{
auto pgs_key = base64_encode(parent->cli->st_cli.etcd_prefix+"/config/pgs");
rm_items.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", pgs_key },
{ "value", base64_encode(new_pgs.dump()) },
} },
});
rm_checks.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", pgs_key },
{ "result", "LESS" },
{ "mod_revision", new_pgs_mod_rev+1 },
});
}
if (!new_clean_pgs.is_null())
{
auto pgs_key = base64_encode(parent->cli->st_cli.etcd_prefix+"/history/last_clean_pgs");
rm_items.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", pgs_key },
{ "value", base64_encode(new_clean_pgs.dump()) },
} },
});
rm_checks.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", pgs_key },
{ "result", "LESS" },
{ "mod_revision", new_clean_pgs_mod_rev+1 },
});
}
parent->etcd_txn(json11::Json::object { { "success", rm_items }, { "checks", rm_checks } });
}
resume_1:
state = 1;
@@ -190,6 +271,46 @@ struct rm_osd_t
state = 100;
return;
}
// Remove old OSD from PG all_peers to prevent left_on_dead and from
// target_history to prevent INCOMPLETE if --allow-data-loss is specified
for (auto & rsp: parent->etcd_result["responses"].array_items())
{
if (rsp["response_delete_range"]["deleted"].uint64_value() > 0)
{
// Wait for mon_change_timeout before updating PG history, or the monitor's change will likely interfere with ours
retry_wait = parent->cli->merged_config["mon_change_timeout"].uint64_value();
if (!retry_wait)
retry_wait = 1000;
retry_wait += etcd_tx_retry_ms;
}
}
while (1)
{
resume_2:
if (!remove_osds_from_history(2))
return;
resume_3:
state = 3;
if (parent->waiting > 0)
return;
if (parent->etcd_err.err)
{
result = parent->etcd_err;
state = 100;
return;
}
if (parent->etcd_result["succeeded"].bool_value())
break;
if ((++cur_retry) >= etcd_tx_retries)
{
result.err = EAGAIN;
result.text += "Failed to remove OSDs from PG history due to update conflicts."
" Some PGs may remain left_on_dead or incomplete. Please retry later\n";
state = 100;
return;
}
retry_wait = etcd_tx_retry_ms;
}
std::string ids = "";
for (auto osd_id: osd_ids)
{
@@ -200,6 +321,138 @@ struct rm_osd_t
result.text = (result.text != "" ? ids+"\n"+result.text : ids);
result.err = 0;
}
json11::Json remove_osds_from_pgs(const etcd_kv_t & kv)
{
if (kv.value.is_null())
{
return kv.value;
}
json11::Json::object new_pgs;
for (auto & pp: kv.value["items"].object_items())
{
if (pp.second.is_object())
{
json11::Json::object new_pool;
for (auto & pgp: pp.second.object_items())
{
json11::Json::array osd_set;
for (auto & osd_json: pgp.second["osd_set"].array_items())
{
uint64_t osd_num = osd_json.uint64_value();
osd_set.push_back(osd_num == 0 || to_remove.find(osd_num) != to_remove.end() ? 0 : osd_num);
}
json11::Json::object new_pg = pgp.second.object_items();
new_pg["osd_set"] = osd_set;
new_pool[pgp.first] = new_pg;
}
new_pgs[pp.first] = new_pool;
}
else
new_pgs[pp.first] = pp.second;
}
auto res = kv.value.object_items();
res["items"] = new_pgs;
return res;
}
bool remove_osds_from_history(int base_state)
{
if (state == base_state+0)
goto resume_0;
history_updates.clear();
history_checks.clear();
for (auto & pp: parent->cli->st_cli.pool_config)
{
bool update_pg_history = false;
auto & pool_cfg = pp.second;
for (auto & pgp: pool_cfg.pg_config)
{
auto pg_num = pgp.first;
auto & pg_cfg = pgp.second;
for (int i = 0; i < pg_cfg.all_peers.size(); i++)
{
if (to_remove.find(pg_cfg.all_peers[i]) != to_remove.end())
{
update_pg_history = true;
pg_cfg.all_peers.erase(pg_cfg.all_peers.begin()+i, pg_cfg.all_peers.begin()+i+1);
i--;
}
}
for (int i = 0; i < pg_cfg.target_history.size(); i++)
{
int hist_size = 0, hist_rm = 0;
for (auto & old_osd: pg_cfg.target_history[i])
{
if (old_osd != 0)
{
hist_size++;
if (to_remove.find(old_osd) != to_remove.end())
{
hist_rm++;
old_osd = 0;
}
}
}
if (hist_rm > 0)
{
if (hist_size-hist_rm == 0)
{
pg_cfg.target_history.erase(pg_cfg.target_history.begin()+i, pg_cfg.target_history.begin()+i+1);
i--;
}
update_pg_history = true;
}
}
if (update_pg_history)
{
std::string history_key = base64_encode(
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
);
history_updates.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", history_key },
{ "value", base64_encode(json11::Json(json11::Json::object {
{ "epoch", pg_cfg.epoch },
{ "all_peers", pg_cfg.all_peers },
{ "osd_sets", pg_cfg.target_history },
}).dump()) },
} },
});
history_checks.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", history_key },
{ "result", "LESS" },
{ "mod_revision", parent->cli->st_cli.etcd_watch_revision+1 },
});
}
}
}
if (history_updates.size())
{
if (retry_wait)
{
parent->waiting++;
parent->epmgr->tfd->set_timer(retry_wait, false, [this](int timer_id)
{
parent->waiting--;
parent->ringloop->wakeup();
});
resume_0:
state = base_state+0;
if (parent->waiting > 0)
return false;
}
parent->etcd_txn(json11::Json::object {
{ "success", history_updates },
{ "compare", history_checks },
});
}
else
parent->etcd_result = json11::Json::object{ { "succeeded", true } };
return true;
}
};
std::function<bool(cli_result_t &)> cli_tool_t::start_rm_osd(json11::Json cfg)
@@ -209,6 +462,14 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_rm_osd(json11::Json cfg)
rm_osd->dry_run = cfg["dry_run"].bool_value();
rm_osd->force_dataloss = cfg["allow_data_loss"].bool_value();
rm_osd->force_warning = rm_osd->force_dataloss || cfg["force"].bool_value();
if (!cfg["etcd_tx_retries"].is_null())
rm_osd->etcd_tx_retries = cfg["etcd_tx_retries"].uint64_value();
if (!cfg["etcd_tx_retry_ms"].is_null())
{
rm_osd->etcd_tx_retry_ms = cfg["etcd_tx_retry_ms"].uint64_value();
if (rm_osd->etcd_tx_retry_ms < 100)
rm_osd->etcd_tx_retry_ms = 100;
}
if (cfg["osd_id"].is_number() || cfg["osd_id"].is_string())
rm_osd->osd_ids.push_back(cfg["osd_id"].uint64_value());
else

View File

@@ -387,6 +387,14 @@ int disk_tool_t::purge_devices(const std::vector<std::string> & devices)
rm_osd_cli.push_back(std::to_string(osd_num));
}
// Check for data loss
if (options["force"] != "")
{
rm_osd_cli.push_back("--force");
}
else if (options["allow_data_loss"] != "")
{
rm_osd_cli.push_back("--allow-data-loss");
}
rm_osd_cli.push_back("--dry-run");
std::string dry_run_ignore_stdout;
if (shell_exec(rm_osd_cli, "", &dry_run_ignore_stdout, NULL) != 0)
@@ -405,14 +413,6 @@ int disk_tool_t::purge_devices(const std::vector<std::string> & devices)
}
// Remove OSD metadata
rm_osd_cli.pop_back();
if (options["force"] != "")
{
rm_osd_cli.push_back("--force");
}
else if (options["allow_data_loss"] != "")
{
rm_osd_cli.push_back("--allow-data-loss");
}
if (shell_exec(rm_osd_cli, "", NULL, NULL) != 0)
{
return 1;

View File

@@ -54,8 +54,55 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
return kv;
}
json11::Json etcd_state_client_t::etcd_encode_actions(const json11::Json & items)
{
json11::Json::array encoded;
for (auto & v: items.array_items())
{
json11::Json::object act;
for (auto & kv: v.object_items())
{
if (kv.first == "key" || kv.first == "range_end")
act[kv.first] = base64_encode(etcd_prefix+kv.second.string_value());
else if (kv.first == "value")
act[kv.first] = base64_encode(kv.second.is_string() ? kv.second.string_value() : kv.second.dump());
else
act[kv.first] = kv.second;
}
encoded.push_back(act);
}
return encoded;
}
void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function<void(std::string, json11::Json)> callback)
{
// FIXME: json11 is immutable which is very inconvenient for such cases
json11::Json::object encoded;
if (txn["compare"].is_array())
{
json11::Json::array compare;
for (auto & v: txn["compare"].array_items())
{
json11::Json::object cmp;
for (auto & kv: v.object_items())
{
if (kv.first == "key")
cmp[kv.first] = base64_encode(etcd_prefix+kv.second.string_value());
else
cmp[kv.first] = kv.second;
}
compare.push_back(cmp);
}
encoded["compare"] = compare;
}
if (txn["failure"].is_array())
{
encoded["failure"] = etcd_encode_actions(txn["failure"]);
}
if (txn["success"].is_array())
{
encoded["success"] = etcd_encode_actions(txn["success"]);
}
etcd_call("/kv/txn", txn, timeout, retries, interval, callback);
}
@@ -871,19 +918,33 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
pg_cfg.target_history.clear();
pg_cfg.all_peers.clear();
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
for (auto hist_item: value["osd_sets"].array_items())
for (auto & hist_item: value["osd_sets"].array_items())
{
std::vector<osd_num_t> history_set;
for (auto pg_osd: hist_item.array_items())
for (auto & pg_osd: hist_item.array_items())
{
history_set.push_back(pg_osd.uint64_value());
osd_num_t pg_osd_num = pg_osd.uint64_value();
if (pg_osd_num != 0)
{
auto it = std::lower_bound(history_set.begin(), history_set.end(), pg_osd_num);
if (it == history_set.end() || *it != pg_osd_num)
history_set.insert(it, pg_osd_num);
}
}
pg_cfg.target_history.push_back(history_set);
auto it = std::lower_bound(pg_cfg.target_history.begin(), pg_cfg.target_history.end(), history_set);
if (it == pg_cfg.target_history.end() || *it != history_set)
pg_cfg.target_history.insert(it, history_set);
}
// Include these additional OSDs when peering the PG
for (auto pg_osd: value["all_peers"].array_items())
{
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
osd_num_t pg_osd_num = pg_osd.uint64_value();
if (pg_osd_num != 0)
{
auto it = std::lower_bound(pg_cfg.all_peers.begin(), pg_cfg.all_peers.end(), pg_osd_num);
if (it == pg_cfg.all_peers.end() || *it != pg_osd_num)
pg_cfg.all_peers.insert(it, pg_osd_num);
}
}
// Read epoch
pg_cfg.epoch = value["epoch"].uint64_value();

View File

@@ -93,6 +93,7 @@ protected:
bool rand_initialized = false;
void add_etcd_url(std::string);
void pick_next_etcd();
json11::Json etcd_encode_actions(const json11::Json & items);
public:
int etcd_keepalive_timeout = 30;
int etcd_ws_keepalive_interval = 30;

View File

@@ -163,6 +163,9 @@ void osd_t::parse_config(const json11::Json & config, bool allow_disk_params)
recovery_queue_depth = config["recovery_queue_depth"].uint64_value();
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
recovery_pg_switch = config["recovery_pg_switch"].uint64_value();
if (recovery_pg_switch < 1)
recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
recovery_sync_batch = config["recovery_sync_batch"].uint64_value();
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;

View File

@@ -34,6 +34,7 @@
#define DEFAULT_AUTOSYNC_WRITES 128
#define MAX_RECOVERY_QUEUE 2048
#define DEFAULT_RECOVERY_QUEUE 4
#define DEFAULT_RECOVERY_PG_SWITCH 128
#define DEFAULT_RECOVERY_BATCH 16
//#define OSD_STUB
@@ -108,6 +109,7 @@ class osd_t
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // "emergency" sync every 5 seconds
int autosync_writes = DEFAULT_AUTOSYNC_WRITES;
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
int recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
int inode_vanish_time = 60;
int log_level = 0;
@@ -135,7 +137,10 @@ class osd_t
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
int peering_state = 0;
std::map<object_id, osd_recovery_op_t> recovery_ops;
int recovery_done = 0;
bool recovery_last_degraded = true;
pool_pg_num_t recovery_last_pg;
object_id recovery_last_oid;
int recovery_pg_done = 0, recovery_done = 0;
osd_op_t *autosync_op = NULL;
// Unstable writes
@@ -200,7 +205,6 @@ class osd_t
bool check_peer_config(osd_client_t *cl, json11::Json conf);
void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_t & pg);
void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void discard_list_subop(osd_op_t *list_op);
bool stop_pg(pg_t & pg);

View File

@@ -132,7 +132,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
this->osd_num, immediate_commit == IMMEDIATE_ALL ? "all" : "small",
cl->osd_num, conf["immediate_commit"].string_value().c_str()
);
return true;
return false;
}
else if (conf["block_size"].uint64_value() != (uint64_t)this->bs_block_size)
{
@@ -140,7 +140,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
"[OSD %lu] My block_size is %u, but peer OSD %lu has %lu. We can't work together\n",
this->osd_num, this->bs_block_size, cl->osd_num, conf["block_size"].uint64_value()
);
return true;
return false;
}
else if (conf["bitmap_granularity"].uint64_value() != (uint64_t)this->bs_bitmap_granularity)
{
@@ -148,7 +148,7 @@ bool osd_t::check_peer_config(osd_client_t *cl, json11::Json conf)
"[OSD %lu] My bitmap_granularity is %u, but peer OSD %lu has %lu. We can't work together\n",
this->osd_num, this->bs_bitmap_granularity, cl->osd_num, conf["bitmap_granularity"].uint64_value()
);
return true;
return false;
}
}
return true;
@@ -382,30 +382,6 @@ void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes
}
}
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
{
auto pg_it = pgs.find({
.pool_id = pool_id,
.pg_num = pg_num,
});
if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch &&
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch)
{
pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
object_id oid = { 0 };
bool first = true;
for (auto op: pg_it->second.write_queue)
{
if (first || oid != op.first)
{
oid = op.first;
first = false;
continue_primary_write(op.second);
}
}
}
}
void osd_t::on_load_config_hook(json11::Json::object & global_config)
{
json11::Json::object osd_config = this->config;
@@ -704,13 +680,16 @@ void osd_t::apply_pg_config()
}
}
}
auto vec_all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end());
if (currently_taken)
{
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING | PG_PEERED))
{
if (pg_it->second.target_set == pg_cfg.target_set)
if (pg_it->second.target_set == pg_cfg.target_set &&
pg_it->second.target_history == pg_cfg.target_history &&
pg_it->second.all_peers == vec_all_peers)
{
// No change in osd_set; history changes are ignored
// No change in osd_set and history
continue;
}
else
@@ -761,7 +740,7 @@ void osd_t::apply_pg_config()
.pg_num = pg_num,
.reported_epoch = pg_cfg.epoch,
.target_history = pg_cfg.target_history,
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
.all_peers = vec_all_peers,
.target_set = pg_cfg.target_set,
};
if (pg.scheme == POOL_SCHEME_EC)

View File

@@ -226,42 +226,51 @@ bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
{
if (!no_recovery)
if (!pgs.size())
{
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
{
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
{
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
{
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = true;
op.oid = obj_it->first;
return true;
}
}
}
}
return false;
}
if (!no_rebalance)
// Restart scanning from the same degraded/misplaced status as the last time
for (int tried_degraded = 0; tried_degraded < 2; tried_degraded++)
{
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
if (recovery_last_degraded ? !no_recovery : !no_rebalance)
{
// Don't try to "recover" misplaced objects if "recovery" would make them degraded
if ((pg_it->second.state & (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
auto mask = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED);
auto check = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_HAS_MISPLACED);
// Restart scanning from the same PG as the last time
for (auto pg_it = pgs.lower_bound(recovery_last_pg); pg_it != pgs.end(); pg_it++)
{
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
if ((pg_it->second.state & mask) == check)
{
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
auto & src = recovery_last_degraded ? pg_it->second.degraded_objects : pg_it->second.misplaced_objects;
assert(src.size() > 0);
// Restart scanning from the next object
for (auto obj_it = src.upper_bound(recovery_last_oid); obj_it != src.end(); obj_it++)
{
op.degraded = false;
op.oid = obj_it->first;
return true;
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = recovery_last_degraded;
recovery_last_oid = op.oid = obj_it->first;
recovery_pg_done++;
// Switch to another PG after recovery_pg_switch operations
// to always mix all PGs during recovery but still benefit
// from recovery queue depth greater than 1
if (recovery_pg_done >= recovery_pg_switch)
{
recovery_pg_done = 0;
recovery_last_pg.pg_num++;
recovery_last_oid = {};
}
return true;
}
}
}
}
}
recovery_last_degraded = !recovery_last_degraded;
recovery_last_pg = {};
recovery_last_oid = {};
}
return false;
}

View File

@@ -28,3 +28,13 @@ inline bool operator < (const pool_pg_num_t & a, const pool_pg_num_t & b)
{
return a.pool_id < b.pool_id || a.pool_id == b.pool_id && a.pg_num < b.pg_num;
}
inline bool operator == (const pool_pg_num_t & a, const pool_pg_num_t & b)
{
return a.pool_id == b.pool_id && a.pg_num == b.pg_num;
}
inline bool operator != (const pool_pg_num_t & a, const pool_pg_num_t & b)
{
return a.pool_id != b.pool_id || a.pg_num != b.pg_num;
}

View File

@@ -32,7 +32,16 @@ void osd_t::handle_peers()
if (p.second.state & PG_HAS_UNCLEAN)
peering_state = peering_state | OSD_FLUSHING_PGS;
else if (p.second.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED))
{
peering_state = peering_state | OSD_RECOVERING;
if (p.second.state & PG_HAS_DEGRADED)
{
// Restart recovery from degraded objects
recovery_last_degraded = true;
recovery_last_pg = {};
recovery_last_oid = {};
}
}
ringloop->wakeup();
return;
}
@@ -302,82 +311,11 @@ void osd_t::start_pg_peering(pg_t & pg)
{
continue;
}
submit_sync_and_list_subop(peer_osd, pg.peering_state);
submit_list_subop(peer_osd, pg.peering_state);
}
ringloop->wakeup();
}
void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
// Sync before listing, if not readonly
if (readonly)
{
submit_list_subop(role_osd, ps);
}
else if (role_osd == this->osd_num)
{
// Self
osd_op_t *op = new osd_op_t();
op->op_type = 0;
op->peer_fd = SELF_FD;
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_SYNC;
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
{
if (bs_op->retval < 0)
{
printf("Local OP_SYNC failed: %d (%s)\n", bs_op->retval, strerror(-bs_op->retval));
force_stop(1);
return;
}
add_bs_subop_stats(op);
delete op->bs_op;
op->bs_op = NULL;
delete op;
ps->list_ops.erase(role_osd);
submit_list_subop(role_osd, ps);
};
ps->list_ops[role_osd] = op;
bs->enqueue_op(op->bs_op);
}
else
{
// Peer
auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd));
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = cl->peer_fd;
op->req = (osd_any_op_t){
.sec_sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_SYNC,
},
},
};
op->callback = [this, ps, role_osd](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
// FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection
printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
int fail_fd = op->peer_fd;
ps->list_ops.erase(role_osd);
delete op;
msgr.stop_client(fail_fd);
return;
}
delete op;
ps->list_ops.erase(role_osd);
submit_list_subop(role_osd, ps);
};
ps->list_ops[role_osd] = op;
msgr.outbox_push(op);
}
}
void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
if (role_osd == this->osd_num)
@@ -551,13 +489,17 @@ void osd_t::report_pg_state(pg_t & pg)
pg.history_changed = true;
pg.target_history.clear();
pg.all_peers = pg.target_set;
std::sort(pg.all_peers.begin(), pg.all_peers.end());
pg.cur_peers = pg.target_set;
}
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
{
// Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
pg.history_changed = true;
pg.target_history.clear();
if (pg.target_history.size())
{
pg.history_changed = true;
pg.target_history.clear();
}
std::set<osd_num_t> dead_peers;
for (auto pg_osd: pg.all_peers)
{
@@ -574,8 +516,12 @@ void osd_t::report_pg_state(pg_t & pg)
dead_peers.insert(pg_osd);
}
}
pg.all_peers.clear();
pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
auto new_all_peers = std::vector<osd_num_t>(dead_peers.begin(), dead_peers.end());
if (pg.all_peers != new_all_peers)
{
pg.history_changed = true;
pg.all_peers = new_all_peers;
}
pg.cur_peers.clear();
for (auto pg_osd: pg.target_set)
{

View File

@@ -86,18 +86,9 @@ void pg_obj_state_check_t::walk()
}
if (pg->pg_cursize < pg->pg_size)
{
// Report PG history and activate
// Activate as degraded
// Current OSD set will be added into target_history on first write
pg->state |= PG_DEGRADED | PG_PEERED;
std::vector<osd_num_t> history_set;
for (auto peer_osd: pg->cur_set)
{
if (peer_osd != 0)
{
history_set.push_back(peer_osd);
}
}
pg->target_history.push_back(history_set);
pg->history_changed = true;
}
else
{
@@ -435,11 +426,35 @@ void pg_t::calc_object_states(int log_level)
std::sort(st.list.begin(), st.list.end());
// Walk over it and check object states
st.walk();
if (this->state & (PG_DEGRADED|PG_LEFT_ON_DEAD))
if (this->state != PG_ACTIVE)
{
assert(epoch != (((uint64_t)1 << PG_EPOCH_BITS)-1));
epoch++;
}
if (log_level > 0)
{
std::string osd_set_desc;
for (auto & osd_num: target_set)
{
osd_set_desc += (osd_set_desc == "" ? "" : ", ")+std::to_string(osd_num);
}
printf(
"[PG %u/%u] %lu clean objects on target OSD set %s\n",
pool_id, pg_num, clean_count, osd_set_desc.c_str()
);
for (auto & stp: state_dict)
{
osd_set_desc = "";
for (auto & loc: stp.first)
{
osd_set_desc += (osd_set_desc == "" ? "" : ", ")+
std::to_string(loc.osd_num)+
(st.replicated ? "" : "("+std::to_string(loc.role)+")")+
(loc.outdated ? "(old)" : "");
}
printf("[PG %u/%u] %lu objects on OSD set %s\n", pool_id, pg_num, stp.second.object_count, osd_set_desc.c_str());
}
}
}
void pg_t::print_state()

View File

@@ -228,7 +228,7 @@ resume_1:
resume_2:
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
finish_op(cur_op, op_data->errcode);
return;
}
cur_op->reply.rw.version = op_data->fact_ver;
@@ -350,7 +350,7 @@ resume_2:
resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
// Check CAS version
@@ -371,7 +371,7 @@ resume_4:
resume_5:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
// Remove version override

View File

@@ -24,7 +24,7 @@ struct osd_primary_op_data_t
uint64_t target_ver;
uint64_t orig_ver = 0, fact_ver = 0;
uint64_t scheme = 0;
int n_subops = 0, done = 0, errors = 0, epipe = 0;
int n_subops = 0, done = 0, errors = 0, errcode = 0;
int degraded = 0, pg_size, pg_data_size;
osd_rmw_stripe_t *stripes;
osd_op_t *subops = NULL;

View File

@@ -42,7 +42,7 @@ resume_4:
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
finish_op(cur_op, op_data->errcode);
return;
}
send_chained_read_results(pg, cur_op);

View File

@@ -122,7 +122,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const ui
zero_read = -1;
osd_op_t *subops = new osd_op_t[n_subops];
op_data->fact_ver = 0;
op_data->done = op_data->errors = 0;
op_data->done = op_data->errors = op_data->errcode = 0;
op_data->n_subops = n_subops;
op_data->subops = subops;
int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
@@ -263,9 +263,11 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
blockstore_op_t *bs_op = subop->bs_op;
int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE
|| bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0;
if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ)
if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ &&
(bs_op->opcode != BS_OP_WRITE && bs_op->opcode != BS_OP_WRITE_STABLE ||
bs_op->retval != -ENOSPC))
{
// die
// die on any error except ENOSPC
throw std::runtime_error(
"local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+
" retval = "+std::to_string(bs_op->retval)+")"
@@ -276,6 +278,8 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
subop->reply.hdr.retval = bs_op->retval;
if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE)
{
subop->req.sec_rw.oid = bs_op->oid;
subop->req.sec_rw.version = bs_op->version;
subop->req.sec_rw.len = bs_op->len;
subop->reply.sec_rw.version = bs_op->version;
}
@@ -337,14 +341,17 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
osd_op_names[opcode], subop->peer_fd, retval, expected
);
}
if (retval == -EPIPE)
// Error priority: EIO > ENOSPC > EPIPE
if (op_data->errcode == 0 || retval == -EIO ||
retval == -ENOSPC && op_data->errcode == -EPIPE)
{
op_data->epipe++;
op_data->errcode = retval;
}
op_data->errors++;
if (subop->peer_fd >= 0)
if (subop->peer_fd >= 0 && (opcode != OSD_OP_SEC_WRITE && opcode != OSD_OP_SEC_WRITE_STABLE ||
retval != -ENOSPC))
{
// Drop connection on any error
// Drop connection on any error expect ENOSPC
msgr.stop_client(subop->peer_fd);
}
}
@@ -408,7 +415,8 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
// are sent to peer OSDs, so we can't just throw them away.
// Mark them with an extra EPIPE.
cur_op->op_data->errors++;
cur_op->op_data->epipe++;
if (cur_op->op_data->errcode == 0)
cur_op->op_data->errcode = -EPIPE;
cur_op->op_data->done--; // Caution: `done` must be signed because may become -1 here
}
else
@@ -460,7 +468,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
{
osd_primary_op_data_t *op_data = cur_op->op_data;
op_data->n_subops = chunks_to_delete_count;
op_data->done = op_data->errors = 0;
op_data->done = op_data->errors = op_data->errcode = 0;
if (!op_data->n_subops)
{
return;
@@ -523,7 +531,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
osd_primary_op_data_t *op_data = cur_op->op_data;
int n_osds = op_data->dirty_osd_count;
osd_op_t *subops = new osd_op_t[n_osds];
op_data->done = op_data->errors = 0;
op_data->done = op_data->errors = op_data->errcode = 0;
op_data->n_subops = n_osds;
op_data->subops = subops;
std::map<uint64_t, int>::iterator peer_it;
@@ -579,7 +587,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
osd_primary_op_data_t *op_data = cur_op->op_data;
int n_osds = op_data->unstable_write_osds->size();
osd_op_t *subops = new osd_op_t[n_osds];
op_data->done = op_data->errors = 0;
op_data->done = op_data->errors = op_data->errcode = 0;
op_data->n_subops = n_osds;
op_data->subops = subops;
for (int i = 0; i < n_osds; i++)

View File

@@ -240,7 +240,7 @@ resume_8:
}
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
finish_op(cur_op, op_data->errcode);
}
else
{

View File

@@ -93,7 +93,7 @@ resume_2:
resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
// Check CAS version
@@ -155,17 +155,36 @@ resume_3:
if (pg.epoch > pg.reported_epoch)
{
// Report newer epoch before writing
// FIXME: We may report only one PG state here...
// FIXME: We don't have to report all changed PG states here
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
pg.history_changed = true;
if (pg.state != PG_ACTIVE)
{
// Check that current OSD set is in history and/or add it there
std::vector<osd_num_t> history_set;
for (auto peer_osd: pg.cur_set)
if (peer_osd != 0)
history_set.push_back(peer_osd);
std::sort(history_set.begin(), history_set.end());
auto it = std::lower_bound(pg.target_history.begin(), pg.target_history.end(), history_set);
if (it == pg.target_history.end() || *it != history_set)
pg.target_history.insert(it, history_set);
pg.history_changed = true;
}
report_pg_states();
resume_10:
if (pg.epoch > pg.reported_epoch)
{
op_data->st = 10;
#define PG_EPOCH_WAIT_STATE 10
op_data->st = PG_EPOCH_WAIT_STATE;
return;
}
}
// Recheck PG state after reporting history - maybe it's already stopping/restarting
if (pg.state & (PG_STOPPING|PG_REPEERING))
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, -EPIPE);
return;
}
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.cur_set.data(), cur_op);
resume_4:
op_data->st = 4;
@@ -178,7 +197,7 @@ resume_5:
}
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
if (op_data->object_state)
@@ -255,7 +274,7 @@ resume_8:
resume_9:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
}
@@ -287,6 +306,50 @@ continue_others:
}
}
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
{
auto pg_it = pgs.find({
.pool_id = pool_id,
.pg_num = pg_num,
});
if (pg_it == pgs.end())
{
return;
}
auto & pg = pg_it->second;
if (pg.epoch > pg.reported_epoch &&
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg.epoch)
{
pg.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
std::vector<object_id> resume_oids;
for (auto & op: pg.write_queue)
{
if (op.second->op_data->st == PG_EPOCH_WAIT_STATE)
{
// Run separately to prevent side effects
resume_oids.push_back(op.first);
}
}
for (auto & oid: resume_oids)
{
auto pg_it = pgs.find({
.pool_id = pool_id,
.pg_num = pg_num,
});
if (pg_it != pgs.end())
{
auto & pg = pg_it->second;
auto op_it = pg.write_queue.find(oid);
if (op_it != pg.write_queue.end() &&
op_it->second->op_data->st == PG_EPOCH_WAIT_STATE)
{
continue_primary_write(op_it->second);
}
}
}
}
}
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
@@ -337,7 +400,7 @@ resume_7:
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return false;
}
}

View File

@@ -154,6 +154,8 @@ struct reed_sol_matrix_t
int refs = 0;
int *je_data;
uint8_t *isal_data;
// 32 bytes = 256/8 = max pg_size/8
std::map<std::array<uint8_t, 32>, void*> subdata;
std::map<reed_sol_erased_t, void*> decodings;
};
@@ -194,6 +196,12 @@ void use_ec(int pg_size, int pg_minsize, bool use)
free(rs_it->second.je_data);
if (rs_it->second.isal_data)
free(rs_it->second.isal_data);
for (auto sub_it = rs_it->second.subdata.begin(); sub_it != rs_it->second.subdata.end();)
{
void *data = sub_it->second;
rs_it->second.subdata.erase(sub_it++);
free(data);
}
for (auto dec_it = rs_it->second.decodings.begin(); dec_it != rs_it->second.decodings.end();)
{
void *data = dec_it->second;
@@ -294,6 +302,47 @@ static void* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size
return dec_it->second;
}
#ifndef WITH_ISAL
#define JERASURE_ALIGNMENT 16
// jerasure requires 16-byte alignment for SSE...
// FIXME: jerasure/gf-complete should probably be patched to automatically choose non-sse version for unaligned buffers
static void jerasure_matrix_encode_unaligned(int k, int m, int w, int *matrix, char **data_ptrs, char **coding_ptrs, int size)
{
bool unaligned = false;
for (int i = 0; i < k; i++)
if (((unsigned long)data_ptrs[i]) % JERASURE_ALIGNMENT)
unaligned = true;
for (int i = 0; i < m; i++)
if (((unsigned long)coding_ptrs[i]) % JERASURE_ALIGNMENT)
unaligned = true;
if (!unaligned)
{
jerasure_matrix_encode(k, m, w, matrix, data_ptrs, coding_ptrs, size);
return;
}
int aligned_size = ((size+JERASURE_ALIGNMENT-1)/JERASURE_ALIGNMENT)*JERASURE_ALIGNMENT;
int copy_size = aligned_size*(k+m);
char local_data[copy_size > 4096 ? 0 : copy_size];
char *data_copy = copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT
? (char*)memalign_or_die(JERASURE_ALIGNMENT, aligned_size*(k+m))
: local_data;
char *aligned_ptrs[k+m];
for (int i = 0; i < k; i++)
{
memcpy(data_copy + i*aligned_size, data_ptrs[i], size);
aligned_ptrs[i] = data_copy + i*aligned_size;
}
for (int i = 0; i < m; i++)
aligned_ptrs[k+i] = data_copy + (k+i)*aligned_size;
jerasure_matrix_encode(k, m, w, matrix, aligned_ptrs, aligned_ptrs+k, size);
for (int i = 0; i < m; i++)
memcpy(coding_ptrs[i], aligned_ptrs[k+i], size);
if (copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT)
free(data_copy);
}
#endif
#ifdef WITH_ISAL
void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint32_t bitmap_size)
{
@@ -357,10 +406,12 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
{
data_ptrs[role] = NULL;
}
bool recovered = false;
for (int role = 0; role < pg_minsize; role++)
{
if (stripes[role].read_end != 0 && stripes[role].missing)
{
recovered = true;
if (stripes[role].read_end > stripes[role].read_start)
{
for (int other = 0; other < pg_size; other++)
@@ -378,18 +429,64 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
);
}
for (int other = 0; other < pg_size; other++)
}
}
if (recovered && bitmap_size > 0)
{
bool unaligned = false;
for (int role = 0; role < pg_size; role++)
{
if (stripes[role].read_end != 0)
{
if (stripes[other].read_end != 0 && !stripes[other].missing)
data_ptrs[role] = (char*)stripes[role].bmp_buf;
if (((unsigned long)stripes[role].bmp_buf) % JERASURE_ALIGNMENT)
unaligned = true;
}
}
if (!unaligned)
{
for (int role = 0; role < pg_minsize; role++)
{
if (stripes[role].read_end != 0 && stripes[role].missing)
{
data_ptrs[other] = (char*)(stripes[other].bmp_buf);
jerasure_matrix_dotprod(
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
data_ptrs, data_ptrs+pg_minsize, bitmap_size
);
}
}
data_ptrs[role] = (char*)stripes[role].bmp_buf;
jerasure_matrix_dotprod(
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
data_ptrs, data_ptrs+pg_minsize, bitmap_size
);
}
else
{
// jerasure_matrix_dotprod requires 16-byte alignment for SSE...
int aligned_size = ((bitmap_size+JERASURE_ALIGNMENT-1)/JERASURE_ALIGNMENT)*JERASURE_ALIGNMENT;
int copy_size = aligned_size*pg_size;
char local_data[copy_size > 4096 ? 0 : copy_size];
bool alloc_copy = copy_size > 4096 || (unsigned long)local_data % JERASURE_ALIGNMENT;
char *data_copy = alloc_copy
? (char*)memalign_or_die(JERASURE_ALIGNMENT, copy_size)
: local_data;
for (int role = 0; role < pg_size; role++)
{
if (stripes[role].read_end != 0)
{
data_ptrs[role] = data_copy + role*aligned_size;
memcpy(data_ptrs[role], stripes[role].bmp_buf, bitmap_size);
}
}
for (int role = 0; role < pg_size; role++)
{
if (stripes[role].read_end != 0 && stripes[role].missing)
{
jerasure_matrix_dotprod(
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
data_ptrs, data_ptrs+pg_minsize, bitmap_size
);
memcpy(stripes[role].bmp_buf, data_ptrs[role], bitmap_size);
}
}
if (alloc_copy)
free(data_copy);
}
}
}
@@ -808,11 +905,56 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
if (end != 0)
{
int write_parity = 0;
for (int i = pg_minsize; i < pg_size; i++)
bool is_seq = true;
for (int i = pg_size-1; i >= pg_minsize; i--)
{
if (write_osd_set[i] != 0)
write_parity++;
else if (write_parity != 0)
is_seq = false;
}
if (write_parity > 0)
{
// First get the coding matrix or sub-matrix
void *matrix_data =
#ifdef WITH_ISAL
matrix->isal_data;
#else
matrix->je_data;
#endif
if (!is_seq)
{
// We need a coding sub-matrix
std::array<uint8_t, 32> missing_parity = {};
for (int i = pg_minsize; i < pg_size; i++)
{
if (!write_osd_set[i])
missing_parity[(i-pg_minsize) >> 3] |= (1 << ((i-pg_minsize) & 0x7));
}
auto sub_it = matrix->subdata.find(missing_parity);
if (sub_it == matrix->subdata.end())
{
int item_size =
#ifdef WITH_ISAL
32;
#else
sizeof(int);
#endif
void *subm = malloc_or_die(item_size * write_parity * pg_minsize);
for (int i = pg_minsize, j = 0; i < pg_size; i++)
{
if (write_osd_set[i])
{
memcpy(subm + item_size*pg_minsize*j, matrix_data + item_size*pg_minsize*(i-pg_minsize), item_size*pg_minsize);
j++;
}
}
matrix->subdata[missing_parity] = subm;
matrix_data = subm;
}
else
matrix_data = sub_it->second;
}
// Calculate new coding chunks
buf_len_t bufs[pg_size][3];
int nbuf[pg_size], curbuf[pg_size];
@@ -841,13 +983,13 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
while (pos < end)
{
uint32_t next_end = end;
for (int i = 0; i < pg_size; i++)
for (int i = 0, j = 0; i < pg_size; i++)
{
if (i < pg_minsize || write_osd_set[i] != 0)
{
assert(curbuf[i] < nbuf[i]);
assert(bufs[i][curbuf[i]].buf);
data_ptrs[i] = (uint8_t*)bufs[i][curbuf[i]].buf + pos-positions[i];
data_ptrs[j++] = (uint8_t*)bufs[i][curbuf[i]].buf + pos-positions[i];
uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
if (next_end > this_end)
next_end = this_end;
@@ -868,32 +1010,30 @@ void calc_rmw_parity_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
}
#ifdef WITH_ISAL
ec_encode_data(
next_end-pos, pg_minsize, write_parity, matrix->isal_data,
next_end-pos, pg_minsize, write_parity, (uint8_t*)matrix_data,
(uint8_t**)data_ptrs, (uint8_t**)data_ptrs+pg_minsize
);
#else
jerasure_matrix_encode(
pg_minsize, write_parity, OSD_JERASURE_W, matrix->je_data,
pg_minsize, write_parity, OSD_JERASURE_W, (int*)matrix_data,
(char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos
);
#endif
pos = next_end;
}
for (int i = 0; i < pg_size; i++)
for (int i = 0, j = 0; i < pg_size; i++)
{
if (i < pg_minsize || write_osd_set[i] != 0)
{
data_ptrs[i] = stripes[i].bmp_buf;
}
data_ptrs[j++] = stripes[i].bmp_buf;
}
#ifdef WITH_ISAL
ec_encode_data(
bitmap_size, pg_minsize, write_parity, matrix->isal_data,
bitmap_size, pg_minsize, write_parity, (uint8_t*)matrix_data,
(uint8_t**)data_ptrs, (uint8_t**)data_ptrs+pg_minsize
);
#else
jerasure_matrix_encode(
pg_minsize, write_parity, OSD_JERASURE_W, matrix->je_data,
jerasure_matrix_encode_unaligned(
pg_minsize, write_parity, OSD_JERASURE_W, (int*)matrix_data,
(char**)data_ptrs, (char**)data_ptrs+pg_minsize, bitmap_size
);
#endif

View File

@@ -3,6 +3,10 @@
#define RMW_DEBUG
#ifdef NO_ISAL
#undef WITH_ISAL
#endif
#include <string.h>
#include "osd_rmw.cpp"
#include "test_pattern.h"
@@ -21,6 +25,7 @@ void test12();
void test13();
void test14();
void test15();
void test16();
int main(int narg, char *args[])
{
@@ -50,6 +55,8 @@ int main(int narg, char *args[])
test14();
// Test 15
test15();
// Test 16
test16();
// End
printf("all ok\n");
return 0;
@@ -876,3 +883,106 @@ void test15()
free(write_buf);
use_ec(3, 2, false);
}
/***
16. EC 2+2 write one parity block with another missing
calc_rmw(offset=0, len=0, osd_set=[1,2,0,0], write_set=[1,2,0,3])
= {
read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ],
write: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ], [ 0, 128K ] ],
input buffer: [],
rmw buffer: [ write3, read0, read1 ],
}
***/
void test16()
{
const int bmp = 128*1024 / 4096 / 8;
use_ec(4, 2, true);
osd_num_t osd_set[4] = { 1, 2, 0, 0 };
osd_num_t write_osd_set[4] = { 1, 2, 0, 3 };
osd_rmw_stripe_t stripes[4] = {};
unsigned bitmaps[4] = { 0 };
// Test 16.0
void *write_buf = NULL;
split_stripes(2, 128*1024, 0, 0, stripes);
assert(stripes[0].req_start == 0 && stripes[0].req_end == 0);
assert(stripes[1].req_start == 0 && stripes[1].req_end == 0);
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
// Test 16.1
void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 4, 2, 3, write_osd_set, 128*1024, bmp);
for (int i = 0; i < 4; i++)
stripes[i].bmp_buf = bitmaps+i;
assert(rmw_buf);
assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024);
assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024);
assert(stripes[2].read_start == 0 && stripes[2].read_end == 0);
assert(stripes[3].read_start == 0 && stripes[3].read_end == 0);
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
assert(stripes[1].write_start == 0 && stripes[1].write_end == 0);
assert(stripes[2].write_start == 0 && stripes[2].write_end == 0);
assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024);
assert(stripes[0].read_buf == (uint8_t*)rmw_buf+128*1024);
assert(stripes[1].read_buf == (uint8_t*)rmw_buf+256*1024);
assert(stripes[2].read_buf == NULL);
assert(stripes[3].read_buf == NULL);
assert(stripes[0].write_buf == NULL);
assert(stripes[1].write_buf == NULL);
assert(stripes[2].write_buf == NULL);
assert(stripes[3].write_buf == rmw_buf);
// Test 16.2 - encode
set_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
set_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
memset(stripes[0].bmp_buf, 0xff, bmp);
memset(stripes[1].bmp_buf, 0xff, bmp);
calc_rmw_parity_ec(stripes, 4, 2, osd_set, write_osd_set, 128*1024, bmp);
assert(*(uint32_t*)stripes[2].bmp_buf == 0);
assert(*(uint32_t*)stripes[3].bmp_buf == 0xF1F1F1F1);
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
assert(stripes[1].write_start == 0 && stripes[1].write_end == 0);
assert(stripes[2].write_start == 0 && stripes[2].write_end == 0);
assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024);
assert(stripes[0].write_buf == NULL);
assert(stripes[1].write_buf == NULL);
assert(stripes[2].write_buf == NULL);
assert(stripes[3].write_buf == rmw_buf);
check_pattern(stripes[3].write_buf, 128*1024, 0x7eb9ae9cd8e652c3); // 2nd EC chunk
// Test 16.3 - decode and verify
osd_num_t read_osd_set[4] = { 0, 2, 0, 3 };
memset(stripes, 0, sizeof(stripes));
split_stripes(2, 128*1024, 0, 256*1024, stripes);
assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024);
assert(stripes[1].req_start == 0 && stripes[1].req_end == 128*1024);
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
for (int role = 0; role < 4; role++)
{
stripes[role].read_start = stripes[role].req_start;
stripes[role].read_end = stripes[role].req_end;
}
assert(extend_missing_stripes(stripes, read_osd_set, 2, 4) == 0);
assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024);
assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024);
assert(stripes[2].read_start == 0 && stripes[2].read_end == 0);
assert(stripes[3].read_start == 0 && stripes[3].read_end == 128*1024);
void *read_buf = alloc_read_buffer(stripes, 4, 0);
for (int i = 0; i < 4; i++)
stripes[i].bmp_buf = bitmaps+i;
assert(read_buf);
assert(stripes[0].read_buf == read_buf);
assert(stripes[1].read_buf == (uint8_t*)read_buf+128*1024);
assert(stripes[3].read_buf == (uint8_t*)read_buf+2*128*1024);
set_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
memcpy(stripes[3].read_buf, rmw_buf, 128*1024);
reconstruct_stripes_ec(stripes, 4, 2, bmp);
assert(bitmaps[0] == 0xFFFFFFFF);
check_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
free(read_buf);
// Done
free(rmw_buf);
free(write_buf);
use_ec(3, 2, false);
}