Compare commits
11 Commits
a6bf6a2cf0
...
90a56a0519
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 90a56a0519 | |
Vitaliy Filippov | d84b84f58d | |
Vitaliy Filippov | 8cfe705d7a | |
Vitaliy Filippov | 66c9271cbd | |
Vitaliy Filippov | 7b37ba921d | |
Vitaliy Filippov | 262c581400 | |
Vitaliy Filippov | ad3b6b7267 | |
Vitaliy Filippov | 1f6a061283 | |
Vitaliy Filippov | fc4d97da10 | |
Vitaliy Filippov | c7a4ce7341 | |
Vitaliy Filippov | ddea31d86d |
|
@ -288,6 +288,24 @@ jobs:
|
|||
echo ""
|
||||
done
|
||||
|
||||
test_create_halfhost:
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
container: ${{env.TEST_IMAGE}}:${{github.sha}}
|
||||
steps:
|
||||
- name: Run test
|
||||
id: test
|
||||
timeout-minutes: 3
|
||||
run: /root/vitastor/tests/test_create_halfhost.sh
|
||||
- name: Print logs
|
||||
if: always() && steps.test.outcome == 'failure'
|
||||
run: |
|
||||
for i in /root/vitastor/testdata/*.log /root/vitastor/testdata/*.txt; do
|
||||
echo "-------- $i --------"
|
||||
cat $i
|
||||
echo ""
|
||||
done
|
||||
|
||||
test_failure_domain:
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
|
|
|
@ -68,11 +68,17 @@ but they are not connected to the cluster.
|
|||
- Type: string
|
||||
|
||||
RDMA device name to use for Vitastor OSD communications (for example,
|
||||
"rocep5s0f0"). Now Vitastor supports all adapters, even ones without
|
||||
ODP support, like Mellanox ConnectX-3 and non-Mellanox cards.
|
||||
"rocep5s0f0"). If not specified, Vitastor will try to find an RoCE
|
||||
device matching [osd_network](osd.en.md#osd_network), preferring RoCEv2,
|
||||
or choose the first available RDMA device if no RoCE devices are
|
||||
found or if `osd_network` is not specified. Auto-selection is also
|
||||
unsupported with old libibverbs < v32, like in Debian 10 Buster or
|
||||
CentOS 7.
|
||||
|
||||
Versions up to Vitastor 1.2.0 required ODP which is only present in
|
||||
Mellanox ConnectX >= 4. See also [rdma_odp](#rdma_odp).
|
||||
Vitastor supports all adapters, even ones without ODP support, like
|
||||
Mellanox ConnectX-3 and non-Mellanox cards. Versions up to Vitastor
|
||||
1.2.0 required ODP which is only present in Mellanox ConnectX >= 4.
|
||||
See also [rdma_odp](#rdma_odp).
|
||||
|
||||
Run `ibv_devinfo -v` as root to list available RDMA devices and their
|
||||
features.
|
||||
|
@ -95,15 +101,17 @@ your device has.
|
|||
## rdma_gid_index
|
||||
|
||||
- Type: integer
|
||||
- Default: 0
|
||||
|
||||
Global address identifier index of the RDMA device to use. Different GID
|
||||
indexes may correspond to different protocols like RoCEv1, RoCEv2 and iWARP.
|
||||
Search for "GID" in `ibv_devinfo -v` output to determine which GID index
|
||||
you need.
|
||||
|
||||
**IMPORTANT:** If you want to use RoCEv2 (as recommended) then the correct
|
||||
rdma_gid_index is usually 1 (IPv6) or 3 (IPv4).
|
||||
If not specified, Vitastor will try to auto-select a RoCEv2 IPv4 GID, then
|
||||
RoCEv2 IPv6 GID, then RoCEv1 IPv4 GID, then RoCEv1 IPv6 GID, then IB GID.
|
||||
GID auto-selection is unsupported with libibverbs < v32.
|
||||
|
||||
A correct rdma_gid_index for RoCEv2 is usually 1 (IPv6) or 3 (IPv4).
|
||||
|
||||
## rdma_mtu
|
||||
|
||||
|
|
|
@ -71,12 +71,17 @@ RDMA может быть нужно только если у клиентов е
|
|||
- Тип: строка
|
||||
|
||||
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
|
||||
Сейчас Vitastor поддерживает все модели адаптеров, включая те, у которых
|
||||
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
|
||||
картами производства не Mellanox.
|
||||
Если не указано, Vitastor попробует найти RoCE-устройство, соответствующее
|
||||
[osd_network](osd.en.md#osd_network), предпочитая RoCEv2, или выбрать первое
|
||||
попавшееся RDMA-устройство, если RoCE-устройств нет или если сеть `osd_network`
|
||||
не задана. Также автовыбор не поддерживается со старыми версиями библиотеки
|
||||
libibverbs < v32, например в Debian 10 Buster или CentOS 7.
|
||||
|
||||
Версии Vitastor до 1.2.0 включительно требовали ODP, который есть только
|
||||
на Mellanox ConnectX 4 и более новых. См. также [rdma_odp](#rdma_odp).
|
||||
Vitastor поддерживает все модели адаптеров, включая те, у которых
|
||||
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
|
||||
картами производства не Mellanox. Версии Vitastor до 1.2.0 включительно
|
||||
требовали ODP, который есть только на Mellanox ConnectX 4 и более новых.
|
||||
См. также [rdma_odp](#rdma_odp).
|
||||
|
||||
Запустите `ibv_devinfo -v` от имени суперпользователя, чтобы посмотреть
|
||||
список доступных RDMA-устройств, их параметры и возможности.
|
||||
|
@ -101,15 +106,18 @@ Control) и ECN (Explicit Congestion Notification).
|
|||
## rdma_gid_index
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 0
|
||||
|
||||
Номер глобального идентификатора адреса RDMA-устройства, который следует
|
||||
использовать. Разным gid_index могут соответствовать разные протоколы связи:
|
||||
RoCEv1, RoCEv2, iWARP. Чтобы понять, какой нужен вам - смотрите строчки со
|
||||
словом "GID" в выводе команды `ibv_devinfo -v`.
|
||||
|
||||
**ВАЖНО:** Если вы хотите использовать RoCEv2 (как мы и рекомендуем), то
|
||||
правильный rdma_gid_index, как правило, 1 (IPv6) или 3 (IPv4).
|
||||
Если не указан, Vitastor попробует автоматически выбрать сначала GID,
|
||||
соответствующий RoCEv2 IPv4, потом RoCEv2 IPv6, потом RoCEv1 IPv4, потом
|
||||
RoCEv1 IPv6, потом IB. Авто-выбор GID не поддерживается со старыми версиями
|
||||
libibverbs < v32.
|
||||
|
||||
Правильный rdma_gid_index для RoCEv2, как правило, 1 (IPv6) или 3 (IPv4).
|
||||
|
||||
## rdma_mtu
|
||||
|
||||
|
|
|
@ -48,11 +48,17 @@
|
|||
type: string
|
||||
info: |
|
||||
RDMA device name to use for Vitastor OSD communications (for example,
|
||||
"rocep5s0f0"). Now Vitastor supports all adapters, even ones without
|
||||
ODP support, like Mellanox ConnectX-3 and non-Mellanox cards.
|
||||
"rocep5s0f0"). If not specified, Vitastor will try to find an RoCE
|
||||
device matching [osd_network](osd.en.md#osd_network), preferring RoCEv2,
|
||||
or choose the first available RDMA device if no RoCE devices are
|
||||
found or if `osd_network` is not specified. Auto-selection is also
|
||||
unsupported with old libibverbs < v32, like in Debian 10 Buster or
|
||||
CentOS 7.
|
||||
|
||||
Versions up to Vitastor 1.2.0 required ODP which is only present in
|
||||
Mellanox ConnectX >= 4. See also [rdma_odp](#rdma_odp).
|
||||
Vitastor supports all adapters, even ones without ODP support, like
|
||||
Mellanox ConnectX-3 and non-Mellanox cards. Versions up to Vitastor
|
||||
1.2.0 required ODP which is only present in Mellanox ConnectX >= 4.
|
||||
See also [rdma_odp](#rdma_odp).
|
||||
|
||||
Run `ibv_devinfo -v` as root to list available RDMA devices and their
|
||||
features.
|
||||
|
@ -64,12 +70,17 @@
|
|||
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
|
||||
info_ru: |
|
||||
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
|
||||
Сейчас Vitastor поддерживает все модели адаптеров, включая те, у которых
|
||||
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
|
||||
картами производства не Mellanox.
|
||||
Если не указано, Vitastor попробует найти RoCE-устройство, соответствующее
|
||||
[osd_network](osd.en.md#osd_network), предпочитая RoCEv2, или выбрать первое
|
||||
попавшееся RDMA-устройство, если RoCE-устройств нет или если сеть `osd_network`
|
||||
не задана. Также автовыбор не поддерживается со старыми версиями библиотеки
|
||||
libibverbs < v32, например в Debian 10 Buster или CentOS 7.
|
||||
|
||||
Версии Vitastor до 1.2.0 включительно требовали ODP, который есть только
|
||||
на Mellanox ConnectX 4 и более новых. См. также [rdma_odp](#rdma_odp).
|
||||
Vitastor поддерживает все модели адаптеров, включая те, у которых
|
||||
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
|
||||
картами производства не Mellanox. Версии Vitastor до 1.2.0 включительно
|
||||
требовали ODP, который есть только на Mellanox ConnectX 4 и более новых.
|
||||
См. также [rdma_odp](#rdma_odp).
|
||||
|
||||
Запустите `ibv_devinfo -v` от имени суперпользователя, чтобы посмотреть
|
||||
список доступных RDMA-устройств, их параметры и возможности.
|
||||
|
@ -94,23 +105,29 @@
|
|||
`ibv_devinfo -v`.
|
||||
- name: rdma_gid_index
|
||||
type: int
|
||||
default: 0
|
||||
info: |
|
||||
Global address identifier index of the RDMA device to use. Different GID
|
||||
indexes may correspond to different protocols like RoCEv1, RoCEv2 and iWARP.
|
||||
Search for "GID" in `ibv_devinfo -v` output to determine which GID index
|
||||
you need.
|
||||
|
||||
**IMPORTANT:** If you want to use RoCEv2 (as recommended) then the correct
|
||||
rdma_gid_index is usually 1 (IPv6) or 3 (IPv4).
|
||||
If not specified, Vitastor will try to auto-select a RoCEv2 IPv4 GID, then
|
||||
RoCEv2 IPv6 GID, then RoCEv1 IPv4 GID, then RoCEv1 IPv6 GID, then IB GID.
|
||||
GID auto-selection is unsupported with libibverbs < v32.
|
||||
|
||||
A correct rdma_gid_index for RoCEv2 is usually 1 (IPv6) or 3 (IPv4).
|
||||
info_ru: |
|
||||
Номер глобального идентификатора адреса RDMA-устройства, который следует
|
||||
использовать. Разным gid_index могут соответствовать разные протоколы связи:
|
||||
RoCEv1, RoCEv2, iWARP. Чтобы понять, какой нужен вам - смотрите строчки со
|
||||
словом "GID" в выводе команды `ibv_devinfo -v`.
|
||||
|
||||
**ВАЖНО:** Если вы хотите использовать RoCEv2 (как мы и рекомендуем), то
|
||||
правильный rdma_gid_index, как правило, 1 (IPv6) или 3 (IPv4).
|
||||
Если не указан, Vitastor попробует автоматически выбрать сначала GID,
|
||||
соответствующий RoCEv2 IPv4, потом RoCEv2 IPv6, потом RoCEv1 IPv4, потом
|
||||
RoCEv1 IPv6, потом IB. Авто-выбор GID не поддерживается со старыми версиями
|
||||
libibverbs < v32.
|
||||
|
||||
Правильный rdma_gid_index для RoCEv2, как правило, 1 (IPv6) или 3 (IPv4).
|
||||
- name: rdma_mtu
|
||||
type: int
|
||||
default: 4096
|
||||
|
|
|
@ -232,6 +232,7 @@ class EtcdAdapter
|
|||
async become_master()
|
||||
{
|
||||
const state = { ...this.mon.get_mon_state(), id: ''+this.mon.etcd_lease_id };
|
||||
console.log('Waiting to become master');
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (1)
|
||||
{
|
||||
|
@ -243,7 +244,6 @@ class EtcdAdapter
|
|||
{
|
||||
break;
|
||||
}
|
||||
console.log('Waiting to become master');
|
||||
await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout));
|
||||
}
|
||||
console.log('Became master');
|
||||
|
|
|
@ -56,6 +56,7 @@ const etcd_tree = {
|
|||
osd_out_time: 600, // seconds. min: 0
|
||||
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
|
||||
use_old_pg_combinator: false,
|
||||
osd_backfillfull_ratio: 0.99,
|
||||
// client and osd
|
||||
tcp_header_buffer_size: 65536,
|
||||
use_sync_send_recv: false,
|
||||
|
|
54
mon/mon.js
54
mon/mon.js
|
@ -74,6 +74,7 @@ class Mon
|
|||
this.state = JSON.parse(JSON.stringify(etcd_tree));
|
||||
this.prev_stats = { osd_stats: {}, osd_diff: {} };
|
||||
this.recheck_pgs_active = false;
|
||||
this.updating_total_stats = false;
|
||||
this.watcher_active = false;
|
||||
this.old_pg_config = false;
|
||||
this.old_pg_stats_seen = false;
|
||||
|
@ -658,7 +659,19 @@ class Mon
|
|||
this.etcd_watch_revision, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history);
|
||||
}
|
||||
new_pg_config.hash = tree_hash;
|
||||
return await this.save_pg_config(new_pg_config, etcd_request);
|
||||
const { backfillfull_pools, backfillfull_osds } = sum_object_counts(
|
||||
{ ...this.state, pg: { ...this.state.pg, config: new_pg_config } }, this.config
|
||||
);
|
||||
if (backfillfull_pools.join(',') != ((this.state.pg.config||{}).backfillfull_pools||[]).join(','))
|
||||
{
|
||||
this.log_backfillfull(backfillfull_osds, backfillfull_pools);
|
||||
}
|
||||
new_pg_config.backfillfull_pools = backfillfull_pools.length ? backfillfull_pools : undefined;
|
||||
if (!await this.save_pg_config(new_pg_config, etcd_request))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async save_pg_config(new_pg_config, etcd_request = { compare: [], success: [] })
|
||||
|
@ -730,7 +743,7 @@ class Mon
|
|||
async update_total_stats()
|
||||
{
|
||||
const txn = [];
|
||||
const { object_counts, object_bytes } = sum_object_counts(this.state, this.config);
|
||||
const { object_counts, object_bytes, backfillfull_pools, backfillfull_osds } = sum_object_counts(this.state, this.config);
|
||||
let stats = sum_op_stats(this.state.osd, this.prev_stats);
|
||||
let { inode_stats, seen_pools } = sum_inode_stats(this.state, this.prev_stats);
|
||||
stats.object_counts = object_counts;
|
||||
|
@ -783,6 +796,27 @@ class Mon
|
|||
{
|
||||
await this.etcd.etcd_call('/kv/txn', { success: txn }, this.config.etcd_mon_timeout, 0);
|
||||
}
|
||||
if (!this.recheck_pgs_active &&
|
||||
backfillfull_pools.join(',') != ((this.state.pg.config||{}).backfillfull_pools||[]).join(','))
|
||||
{
|
||||
this.log_backfillfull(backfillfull_osds, backfillfull_pools);
|
||||
const new_pg_config = { ...this.state.pg.config, backfillfull_pools: backfillfull_pools.length ? backfillfull_pools : undefined };
|
||||
await this.save_pg_config(new_pg_config);
|
||||
}
|
||||
}
|
||||
|
||||
log_backfillfull(osds, pools)
|
||||
{
|
||||
for (const osd in osds)
|
||||
{
|
||||
const bf = osds[osd];
|
||||
console.log('OSD '+osd+' may fill up during rebalance: capacity '+(bf.cap/1024n/1024n)+
|
||||
' MB, target user data '+(bf.clean/1024n/1024n)+' MB');
|
||||
}
|
||||
console.log(
|
||||
(pools.length ? 'Pool(s) '+pools.join(', ') : 'No pools')+
|
||||
' are backfillfull now, applying rebalance configuration'
|
||||
);
|
||||
}
|
||||
|
||||
schedule_update_stats()
|
||||
|
@ -794,7 +828,21 @@ class Mon
|
|||
this.stats_timer = setTimeout(() =>
|
||||
{
|
||||
this.stats_timer = null;
|
||||
this.update_total_stats().catch(console.error);
|
||||
if (this.updating_total_stats)
|
||||
{
|
||||
this.schedule_update_stats();
|
||||
return;
|
||||
}
|
||||
this.updating_total_stats = true;
|
||||
try
|
||||
{
|
||||
this.update_total_stats().catch(console.error);
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
console.error(e);
|
||||
}
|
||||
this.updating_total_stats = false;
|
||||
}, this.config.mon_stats_timeout);
|
||||
}
|
||||
|
||||
|
|
39
mon/stats.js
39
mon/stats.js
|
@ -109,6 +109,8 @@ function sum_object_counts(state, global_config)
|
|||
pgstats[pool_id] = { ...(state.pg.stats[pool_id] || {}), ...(pgstats[pool_id] || {}) };
|
||||
}
|
||||
}
|
||||
const pool_per_osd = {};
|
||||
const clean_per_osd = {};
|
||||
for (const pool_id in pgstats)
|
||||
{
|
||||
let object_size = 0;
|
||||
|
@ -143,10 +145,45 @@ function sum_object_counts(state, global_config)
|
|||
object_bytes[k] += BigInt(st[k+'_count']) * object_size;
|
||||
}
|
||||
}
|
||||
if (st.object_count)
|
||||
{
|
||||
for (const pg_osd of (((state.pg.config.items||{})[pool_id]||{})[pg_num]||{}).osd_set||[])
|
||||
{
|
||||
if (!(pg_osd in clean_per_osd))
|
||||
{
|
||||
clean_per_osd[pg_osd] = 0n;
|
||||
}
|
||||
clean_per_osd[pg_osd] += BigInt(st.object_count);
|
||||
pool_per_osd[pg_osd] = pool_per_osd[pg_osd]||{};
|
||||
pool_per_osd[pg_osd][pool_id] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return { object_counts, object_bytes };
|
||||
// If clean_per_osd[osd] is larger than osd capacity then it will fill up during rebalance
|
||||
let backfillfull_pools = {};
|
||||
let backfillfull_osds = {};
|
||||
for (const osd in clean_per_osd)
|
||||
{
|
||||
const st = state.osd.stats[osd];
|
||||
if (!st || !st.size || !st.data_block_size)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let cap = BigInt(st.size)/BigInt(st.data_block_size);
|
||||
cap = cap * BigInt((global_config.osd_backfillfull_ratio||0.99)*1000000) / 1000000n;
|
||||
if (cap < clean_per_osd[osd])
|
||||
{
|
||||
backfillfull_osds[osd] = { cap: BigInt(st.size), clean: clean_per_osd[osd]*BigInt(st.data_block_size) };
|
||||
for (const pool_id in pool_per_osd[osd])
|
||||
{
|
||||
backfillfull_pools[pool_id] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
backfillfull_pools = Object.keys(backfillfull_pools).sort();
|
||||
return { object_counts, object_bytes, backfillfull_pools, backfillfull_osds };
|
||||
}
|
||||
|
||||
// sum_inode_stats(this.state, this.prev_stats)
|
||||
|
|
|
@ -61,6 +61,10 @@ pkg_check_modules(ISAL libisal)
|
|||
if (ISAL_LIBRARIES)
|
||||
add_definitions(-DWITH_ISAL)
|
||||
endif (ISAL_LIBRARIES)
|
||||
pkg_check_modules(RDMACM librdmacm)
|
||||
if (RDMACM_LIBRARIES)
|
||||
add_definitions(-DWITH_RDMACM)
|
||||
endif (RDMACM_LIBRARIES)
|
||||
|
||||
add_custom_target(build_tests)
|
||||
add_custom_target(test
|
||||
|
|
|
@ -785,7 +785,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
|||
}
|
||||
for (auto & pool_item: value.object_items())
|
||||
{
|
||||
pool_config_t pc;
|
||||
pool_config_t pc = {};
|
||||
// ID
|
||||
pool_id_t pool_id;
|
||||
char null_byte = 0;
|
||||
|
@ -931,12 +931,28 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
|||
// Ignore old key if the new one is present
|
||||
return;
|
||||
}
|
||||
for (auto & pool_id_json: value["backfillfull_pools"].array_items())
|
||||
{
|
||||
auto pool_id = pool_id_json.uint64_value();
|
||||
auto pool_it = this->pool_config.find(pool_id);
|
||||
if (pool_it != this->pool_config.end())
|
||||
{
|
||||
pool_it->second.backfillfull |= 2;
|
||||
}
|
||||
}
|
||||
for (auto & pool_item: this->pool_config)
|
||||
{
|
||||
for (auto & pg_item: pool_item.second.pg_config)
|
||||
{
|
||||
pg_item.second.config_exists = false;
|
||||
}
|
||||
// 3 = was 1 and became 1, 0 = was 0 and became 0
|
||||
if (pool_item.second.backfillfull == 2 || pool_item.second.backfillfull == 1)
|
||||
{
|
||||
if (on_change_backfillfull_hook)
|
||||
on_change_backfillfull_hook(pool_item.first);
|
||||
}
|
||||
pool_item.second.backfillfull = pool_item.second.backfillfull >> 1;
|
||||
}
|
||||
for (auto & pool_item: value["items"].object_items())
|
||||
{
|
||||
|
|
|
@ -62,6 +62,7 @@ struct pool_config_t
|
|||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
uint64_t scrub_interval;
|
||||
std::string used_for_fs;
|
||||
int backfillfull;
|
||||
};
|
||||
|
||||
struct inode_config_t
|
||||
|
@ -131,6 +132,7 @@ public:
|
|||
std::function<json11::Json()> load_pgs_checks_hook;
|
||||
std::function<void(bool)> on_load_pgs_hook;
|
||||
std::function<void()> on_change_pool_config_hook;
|
||||
std::function<void(pool_id_t)> on_change_backfillfull_hook;
|
||||
std::function<void(pool_id_t, pg_num_t, osd_num_t)> on_change_pg_state_hook;
|
||||
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
|
||||
std::function<void(osd_num_t)> on_change_osd_state_hook;
|
||||
|
|
|
@ -70,6 +70,7 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
|
|||
send_out_size = 0;
|
||||
}
|
||||
|
||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||
static bool is_ipv4_gid(ibv_gid_entry *gidx)
|
||||
{
|
||||
return (((uint64_t*)gidx->gid.raw)[0] == 0 &&
|
||||
|
@ -131,6 +132,7 @@ static matched_dev match_device(ibv_device **dev_list, addr_mask_t *networks, in
|
|||
ibv_port_attr portinfo;
|
||||
ibv_gid_entry best_gidx;
|
||||
int res;
|
||||
bool have_non_roce = false, have_roce = false;
|
||||
for (int i = 0; dev_list[i]; ++i)
|
||||
{
|
||||
auto dev = dev_list[i];
|
||||
|
@ -161,6 +163,11 @@ static matched_dev match_device(ibv_device **dev_list, addr_mask_t *networks, in
|
|||
else
|
||||
break;
|
||||
}
|
||||
if (gidx.gid_type != IBV_GID_TYPE_ROCE_V1 &&
|
||||
gidx.gid_type != IBV_GID_TYPE_ROCE_V2)
|
||||
have_non_roce = true;
|
||||
else
|
||||
have_roce = true;
|
||||
if (match_gid(&gidx, networks, nnet))
|
||||
{
|
||||
// Prefer RoCEv2
|
||||
|
@ -186,8 +193,13 @@ cleanup:
|
|||
{
|
||||
log_rdma_dev_port_gid(dev_list[best.dev], best.port, best.gid, best_gidx);
|
||||
}
|
||||
if (best.dev < 0 && have_non_roce && !have_roce)
|
||||
{
|
||||
best.dev = -2;
|
||||
}
|
||||
return best;
|
||||
}
|
||||
#endif
|
||||
|
||||
msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, bool odp, int log_level)
|
||||
{
|
||||
|
@ -229,6 +241,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
|||
goto cleanup;
|
||||
}
|
||||
}
|
||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||
else if (osd_networks.size())
|
||||
{
|
||||
std::vector<addr_mask_t> nets;
|
||||
|
@ -237,11 +250,17 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
|||
nets.push_back(cidr_parse(netstr));
|
||||
}
|
||||
auto best = match_device(dev_list, nets.data(), nets.size(), log_level);
|
||||
if (best.dev < 0)
|
||||
if (best.dev == -2)
|
||||
{
|
||||
best.dev = 0;
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "No RoCE devices found, using first available RDMA device %s\n", ibv_get_device_name(*dev_list));
|
||||
}
|
||||
else if (best.dev < 0)
|
||||
{
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "RDMA device matching osd_network is not found, using first available device\n");
|
||||
best.dev = 0;
|
||||
fprintf(stderr, "RDMA device matching osd_network is not found, disabling RDMA\n");
|
||||
goto cleanup;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -250,6 +269,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
|||
}
|
||||
ctx->dev = dev_list[best.dev];
|
||||
}
|
||||
#endif
|
||||
else
|
||||
{
|
||||
ctx->dev = *dev_list;
|
||||
|
@ -275,18 +295,22 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
|||
goto cleanup;
|
||||
}
|
||||
|
||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||
if (gid_index != -1)
|
||||
#endif
|
||||
{
|
||||
ctx->gid_index = gid_index;
|
||||
if (ibv_query_gid_ex(ctx->context, ib_port, gid_index, &ctx->my_gid, 0))
|
||||
ctx->gid_index = gid_index < 0 ? 0 : gid_index;
|
||||
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
|
||||
{
|
||||
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||
else
|
||||
{
|
||||
// Auto-guess GID
|
||||
ibv_gid_entry best_gidx;
|
||||
for (int k = 0; k < ctx->portinfo.gid_tbl_len; k++)
|
||||
{
|
||||
ibv_gid_entry gidx;
|
||||
|
@ -301,21 +325,24 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
|||
{
|
||||
continue;
|
||||
}
|
||||
// Prefer IPv4 RoCEv2 GID by default
|
||||
// Prefer IPv4 RoCEv2 -> IPv6 RoCEv2 -> IPv4 RoCEv1 -> IPv6 RoCEv1 -> IB
|
||||
if (gid_index == -1 ||
|
||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 &&
|
||||
(ctx->my_gid.gid_type != IBV_GID_TYPE_ROCE_V2 || is_ipv4_gid(&gidx)))
|
||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 && best_gidx.gid_type != IBV_GID_TYPE_ROCE_V2 ||
|
||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V1 && best_gidx.gid_type == IBV_GID_TYPE_IB ||
|
||||
gidx.gid_type == best_gidx.gid_type && is_ipv4_gid(&gidx))
|
||||
{
|
||||
gid_index = k;
|
||||
ctx->my_gid = gidx;
|
||||
best_gidx = gidx;
|
||||
}
|
||||
}
|
||||
ctx->gid_index = gid_index = (gid_index == -1 ? 0 : gid_index);
|
||||
if (log_level > 0)
|
||||
{
|
||||
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, ctx->my_gid);
|
||||
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, best_gidx);
|
||||
}
|
||||
ctx->my_gid = best_gidx.gid;
|
||||
}
|
||||
#endif
|
||||
|
||||
ctx->pd = ibv_alloc_pd(ctx->context);
|
||||
if (!ctx->pd)
|
||||
|
@ -431,7 +458,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
|
|||
}
|
||||
|
||||
conn->addr.lid = ctx->my_lid;
|
||||
conn->addr.gid = ctx->my_gid.gid;
|
||||
conn->addr.gid = ctx->my_gid;
|
||||
conn->addr.qpn = conn->qp->qp_num;
|
||||
conn->addr.psn = lrand48() & 0xffffff;
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ struct msgr_rdma_context_t
|
|||
uint8_t ib_port;
|
||||
uint8_t gid_index;
|
||||
uint16_t my_lid;
|
||||
ibv_gid_entry my_gid;
|
||||
ibv_gid my_gid;
|
||||
uint32_t mtu;
|
||||
int max_cqe = 0;
|
||||
int used_max_cqe = 0;
|
||||
|
|
|
@ -64,7 +64,7 @@ static void netlink_sock_alloc(struct netlink_ctx *ctx)
|
|||
if (nl_driver_id < 0)
|
||||
{
|
||||
nl_socket_free(sk);
|
||||
fail("Couldn't resolve the nbd netlink family\n");
|
||||
fail("Couldn't resolve the nbd netlink family: %s (code %d)\n", nl_geterror(nl_driver_id), nl_driver_id);
|
||||
}
|
||||
|
||||
ctx->driver_id = nl_driver_id;
|
||||
|
@ -555,7 +555,12 @@ help:
|
|||
fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK);
|
||||
nbd_fd = sockfd[0];
|
||||
load_module();
|
||||
|
||||
bool bg = cfg["foreground"].is_null();
|
||||
if (cfg["logfile"].string_value() != "")
|
||||
{
|
||||
logfile = cfg["logfile"].string_value();
|
||||
}
|
||||
|
||||
if (netlink)
|
||||
{
|
||||
|
@ -588,6 +593,10 @@ help:
|
|||
}
|
||||
close(sockfd[1]);
|
||||
printf("/dev/nbd%d\n", err);
|
||||
if (bg)
|
||||
{
|
||||
daemonize_reopen_stdio();
|
||||
}
|
||||
#else
|
||||
fprintf(stderr, "netlink support is disabled in this build\n");
|
||||
exit(1);
|
||||
|
@ -631,14 +640,10 @@ help:
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cfg["logfile"].string_value() != "")
|
||||
{
|
||||
logfile = cfg["logfile"].string_value();
|
||||
}
|
||||
if (bg)
|
||||
{
|
||||
daemonize();
|
||||
if (bg)
|
||||
{
|
||||
daemonize();
|
||||
}
|
||||
}
|
||||
// Initialize read state
|
||||
read_state = CL_READ_HDR;
|
||||
|
@ -716,13 +721,17 @@ help:
|
|||
}
|
||||
}
|
||||
|
||||
void daemonize()
|
||||
void daemonize_fork()
|
||||
{
|
||||
if (fork())
|
||||
exit(0);
|
||||
setsid();
|
||||
if (fork())
|
||||
exit(0);
|
||||
}
|
||||
|
||||
void daemonize_reopen_stdio()
|
||||
{
|
||||
close(0);
|
||||
close(1);
|
||||
close(2);
|
||||
|
@ -733,6 +742,12 @@ help:
|
|||
fprintf(stderr, "Warning: Failed to chdir into /\n");
|
||||
}
|
||||
|
||||
void daemonize()
|
||||
{
|
||||
daemonize_fork();
|
||||
daemonize_reopen_stdio();
|
||||
}
|
||||
|
||||
json11::Json::object list_mapped()
|
||||
{
|
||||
const char *self_filename = exe_name;
|
||||
|
@ -783,8 +798,9 @@ help:
|
|||
if (!strcmp(pid_filename, self_filename))
|
||||
{
|
||||
json11::Json::object cfg = nbd_proxy::parse_args(argv.size(), argv.data());
|
||||
if (cfg["command"] == "map")
|
||||
if (cfg["command"] == "map" || cfg["command"] == "netlink-map")
|
||||
{
|
||||
cfg["interface"] = (cfg["command"] == "netlink-map") ? "netlink" : "nbd";
|
||||
cfg.erase("command");
|
||||
cfg["pid"] = pid;
|
||||
mapped["/dev/nbd"+std::to_string(dev_num)] = cfg;
|
||||
|
|
|
@ -261,6 +261,7 @@ struct dd_out_info_t
|
|||
else
|
||||
{
|
||||
// ok
|
||||
out_size = owatch->cfg.size;
|
||||
return true;
|
||||
}
|
||||
// Wait for sub-command
|
||||
|
@ -882,6 +883,8 @@ resume_2:
|
|||
oinfo.end_fsync = oinfo.end_fsync && oinfo.out_seekable;
|
||||
read_offset = 0;
|
||||
read_end = iinfo.in_seekable ? iinfo.in_size-iseek : 0;
|
||||
if (oinfo.out_size && (!read_end || read_end > oinfo.out_size-oseek))
|
||||
read_end = oinfo.out_size-oseek;
|
||||
if (bytelimit && (!read_end || read_end > bytelimit))
|
||||
read_end = bytelimit;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_begin);
|
||||
|
|
|
@ -35,6 +35,7 @@ struct pool_creator_t
|
|||
uint64_t new_pools_mod_rev;
|
||||
json11::Json state_node_tree;
|
||||
json11::Json new_pools;
|
||||
std::map<osd_num_t, json11::Json> osd_stats;
|
||||
|
||||
bool is_done() { return state == 100; }
|
||||
|
||||
|
@ -46,8 +47,6 @@ struct pool_creator_t
|
|||
goto resume_2;
|
||||
else if (state == 3)
|
||||
goto resume_3;
|
||||
else if (state == 4)
|
||||
goto resume_4;
|
||||
else if (state == 5)
|
||||
goto resume_5;
|
||||
else if (state == 6)
|
||||
|
@ -90,13 +89,19 @@ resume_1:
|
|||
// If not forced, check that we have enough osds for pg_size
|
||||
if (!force)
|
||||
{
|
||||
// Get node_placement configuration from etcd
|
||||
// Get node_placement configuration from etcd and OSD stats
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/node_placement") },
|
||||
} }
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
|
||||
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
|
||||
} },
|
||||
},
|
||||
} },
|
||||
});
|
||||
|
@ -112,10 +117,21 @@ resume_2:
|
|||
return;
|
||||
}
|
||||
|
||||
// Get state_node_tree based on node_placement and osd peer states
|
||||
// Get state_node_tree based on node_placement and osd stats
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]);
|
||||
state_node_tree = get_state_node_tree(kv.value.object_items());
|
||||
auto node_placement_kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]);
|
||||
timespec tv_now;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_now);
|
||||
uint64_t osd_out_time = parent->cli->config["osd_out_time"].uint64_value();
|
||||
if (!osd_out_time)
|
||||
osd_out_time = 600;
|
||||
osd_stats.clear();
|
||||
parent->iterate_kvs_1(parent->etcd_result["responses"][1]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
|
||||
{
|
||||
if ((uint64_t)value["time"].number_value()+osd_out_time >= tv_now.tv_sec)
|
||||
osd_stats[cur_osd] = value;
|
||||
});
|
||||
state_node_tree = get_state_node_tree(node_placement_kv.value.object_items(), osd_stats);
|
||||
}
|
||||
|
||||
// Skip tag checks, if pool has none
|
||||
|
@ -158,42 +174,18 @@ resume_3:
|
|||
}
|
||||
}
|
||||
|
||||
// Get stats (for block_size, bitmap_granularity, ...) of osds in state_node_tree
|
||||
{
|
||||
json11::Json::array osd_stats;
|
||||
|
||||
for (auto osd_num: state_node_tree["osds"].array_items())
|
||||
{
|
||||
osd_stats.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/"+osd_num.as_string()) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
|
||||
parent->etcd_txn(json11::Json::object{ { "success", osd_stats } });
|
||||
}
|
||||
|
||||
state = 4;
|
||||
resume_4:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (parent->etcd_err.err)
|
||||
{
|
||||
result = parent->etcd_err;
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
|
||||
// Filter osds from state_node_tree based on pool parameters and osd stats
|
||||
{
|
||||
std::vector<json11::Json> osd_stats;
|
||||
for (auto & ocr: parent->etcd_result["responses"].array_items())
|
||||
std::vector<json11::Json> filtered_osd_stats;
|
||||
for (auto & osd_num: state_node_tree["osds"].array_items())
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(ocr["response_range"]["kvs"][0]);
|
||||
osd_stats.push_back(kv.value);
|
||||
auto st_it = osd_stats.find(osd_num.uint64_value());
|
||||
if (st_it != osd_stats.end())
|
||||
{
|
||||
filtered_osd_stats.push_back(st_it->second);
|
||||
}
|
||||
}
|
||||
guess_block_size(osd_stats);
|
||||
guess_block_size(filtered_osd_stats);
|
||||
state_node_tree = filter_state_node_tree_by_stats(state_node_tree, osd_stats);
|
||||
}
|
||||
|
||||
|
@ -201,8 +193,7 @@ resume_4:
|
|||
{
|
||||
auto failure_domain = cfg["failure_domain"].string_value() == ""
|
||||
? "host" : cfg["failure_domain"].string_value();
|
||||
uint64_t max_pg_size = get_max_pg_size(state_node_tree["nodes"].object_items(),
|
||||
failure_domain, cfg["root_node"].string_value());
|
||||
uint64_t max_pg_size = get_max_pg_size(state_node_tree, failure_domain, cfg["root_node"].string_value());
|
||||
|
||||
if (cfg["pg_size"].uint64_value() > max_pg_size)
|
||||
{
|
||||
|
@ -358,56 +349,50 @@ resume_8:
|
|||
|
||||
// Returns a JSON object of form {"nodes": {...}, "osds": [...]} that
|
||||
// contains: all nodes (osds, hosts, ...) based on node_placement config
|
||||
// and current peer state, and a list of active peer osds.
|
||||
json11::Json get_state_node_tree(json11::Json::object node_placement)
|
||||
// and current osd stats.
|
||||
json11::Json get_state_node_tree(json11::Json::object node_placement, std::map<osd_num_t, json11::Json> & osd_stats)
|
||||
{
|
||||
// Erase non-peer osd nodes from node_placement
|
||||
// Erase non-existing osd nodes from node_placement
|
||||
for (auto np_it = node_placement.begin(); np_it != node_placement.end();)
|
||||
{
|
||||
// Numeric nodes are osds
|
||||
osd_num_t osd_num = stoull_full(np_it->first);
|
||||
|
||||
// If node is osd and it is not in peer states, erase it
|
||||
if (osd_num > 0 &&
|
||||
parent->cli->st_cli.peer_states.find(osd_num) == parent->cli->st_cli.peer_states.end())
|
||||
{
|
||||
// If node is osd and its stats do not exist, erase it
|
||||
if (osd_num > 0 && osd_stats.find(osd_num) == osd_stats.end())
|
||||
node_placement.erase(np_it++);
|
||||
}
|
||||
else
|
||||
np_it++;
|
||||
}
|
||||
|
||||
// List of peer osds
|
||||
std::vector<std::string> peer_osds;
|
||||
// List of osds
|
||||
std::vector<std::string> existing_osds;
|
||||
|
||||
// Record peer osds and add missing osds/hosts to np
|
||||
for (auto & ps: parent->cli->st_cli.peer_states)
|
||||
// Record osds and add missing osds/hosts to np
|
||||
for (auto & ps: osd_stats)
|
||||
{
|
||||
std::string osd_num = std::to_string(ps.first);
|
||||
|
||||
// Record peer osd
|
||||
peer_osds.push_back(osd_num);
|
||||
// Record osd
|
||||
existing_osds.push_back(osd_num);
|
||||
|
||||
// Add osd, if necessary
|
||||
if (node_placement.find(osd_num) == node_placement.end())
|
||||
// Add host if necessary
|
||||
std::string osd_host = ps.second["host"].as_string();
|
||||
if (node_placement.find(osd_host) == node_placement.end())
|
||||
{
|
||||
std::string osd_host = ps.second["host"].as_string();
|
||||
|
||||
// Add host, if necessary
|
||||
if (node_placement.find(osd_host) == node_placement.end())
|
||||
{
|
||||
node_placement[osd_host] = json11::Json::object {
|
||||
{ "level", "host" }
|
||||
};
|
||||
}
|
||||
|
||||
node_placement[osd_num] = json11::Json::object {
|
||||
{ "parent", osd_host }
|
||||
node_placement[osd_host] = json11::Json::object {
|
||||
{ "level", "host" }
|
||||
};
|
||||
}
|
||||
|
||||
// Add osd
|
||||
node_placement[osd_num] = json11::Json::object {
|
||||
{ "parent", node_placement[osd_num]["parent"].is_null() ? osd_host : node_placement[osd_num]["parent"] },
|
||||
{ "level", "osd" },
|
||||
};
|
||||
}
|
||||
|
||||
return json11::Json::object { { "osds", peer_osds }, { "nodes", node_placement } };
|
||||
return json11::Json::object { { "osds", existing_osds }, { "nodes", node_placement } };
|
||||
}
|
||||
|
||||
// Returns new state_node_tree based on given state_node_tree with osds
|
||||
|
@ -534,15 +519,13 @@ resume_8:
|
|||
// filtered out by stats parameters (block_size, bitmap_granularity) in
|
||||
// given osd_stats and current pool config.
|
||||
// Requires: state_node_tree["osds"] must match osd_stats 1-1
|
||||
json11::Json filter_state_node_tree_by_stats(const json11::Json & state_node_tree, std::vector<json11::Json> & osd_stats)
|
||||
json11::Json filter_state_node_tree_by_stats(const json11::Json & state_node_tree, std::map<osd_num_t, json11::Json> & osd_stats)
|
||||
{
|
||||
auto & osds = state_node_tree["osds"].array_items();
|
||||
|
||||
// Accepted state_node_tree nodes
|
||||
auto accepted_nodes = state_node_tree["nodes"].object_items();
|
||||
|
||||
// List of accepted osds
|
||||
std::vector<std::string> accepted_osds;
|
||||
json11::Json::array accepted_osds;
|
||||
|
||||
block_size = cfg["block_size"].uint64_value()
|
||||
? cfg["block_size"].uint64_value()
|
||||
|
@ -554,21 +537,25 @@ resume_8:
|
|||
? etcd_state_client_t::parse_immediate_commit(cfg["immediate_commit"].string_value(), IMMEDIATE_ALL)
|
||||
: parent->cli->st_cli.global_immediate_commit;
|
||||
|
||||
for (size_t i = 0; i < osd_stats.size(); i++)
|
||||
for (auto osd_num_json: state_node_tree["osds"].array_items())
|
||||
{
|
||||
auto & os = osd_stats[i];
|
||||
// Get osd number
|
||||
auto osd_num = osds[i].as_string();
|
||||
auto osd_num = osd_num_json.uint64_value();
|
||||
auto os_it = osd_stats.find(osd_num);
|
||||
if (os_it == osd_stats.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto & os = os_it->second;
|
||||
if (!os["data_block_size"].is_null() && os["data_block_size"] != block_size ||
|
||||
!os["bitmap_granularity"].is_null() && os["bitmap_granularity"] != bitmap_granularity ||
|
||||
!os["immediate_commit"].is_null() &&
|
||||
etcd_state_client_t::parse_immediate_commit(os["immediate_commit"].string_value(), IMMEDIATE_NONE) < immediate_commit)
|
||||
{
|
||||
accepted_nodes.erase(osd_num);
|
||||
accepted_nodes.erase(osd_num_json.as_string());
|
||||
}
|
||||
else
|
||||
{
|
||||
accepted_osds.push_back(osd_num);
|
||||
accepted_osds.push_back(osd_num_json);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -576,87 +563,28 @@ resume_8:
|
|||
}
|
||||
|
||||
// Returns maximum pg_size possible for given node_tree and failure_domain, starting at parent_node
|
||||
uint64_t get_max_pg_size(json11::Json::object node_tree, const std::string & level, const std::string & parent_node)
|
||||
uint64_t get_max_pg_size(json11::Json state_node_tree, const std::string & level, const std::string & root_node)
|
||||
{
|
||||
uint64_t max_pg_sz = 0;
|
||||
|
||||
std::vector<std::string> nodes;
|
||||
|
||||
// Check if parent node is an osd (numeric)
|
||||
if (parent_node != "" && stoull_full(parent_node))
|
||||
std::set<std::string> level_seen;
|
||||
for (auto & osd: state_node_tree["osds"].array_items())
|
||||
{
|
||||
// Add it to node list if osd is in node tree
|
||||
if (node_tree.find(parent_node) != node_tree.end())
|
||||
nodes.push_back(parent_node);
|
||||
}
|
||||
// If parent node given, ...
|
||||
else if (parent_node != "")
|
||||
{
|
||||
// ... look for children nodes of this parent
|
||||
for (auto & sn: node_tree)
|
||||
// find OSD parent at <level>, but stop at <root_node>
|
||||
auto cur_id = osd.string_value();
|
||||
auto cur = state_node_tree["nodes"][cur_id];
|
||||
while (!cur.is_null())
|
||||
{
|
||||
auto & props = sn.second.object_items();
|
||||
|
||||
auto parent_prop = props.find("parent");
|
||||
if (parent_prop != props.end() && (parent_prop->second.as_string() == parent_node))
|
||||
if (cur["level"] == level)
|
||||
{
|
||||
nodes.push_back(sn.first);
|
||||
|
||||
// If we're not looking for all osds, we only need a single
|
||||
// child osd node
|
||||
if (level != "osd" && stoull_full(sn.first))
|
||||
break;
|
||||
level_seen.insert(cur_id);
|
||||
break;
|
||||
}
|
||||
if (cur_id == root_node)
|
||||
break;
|
||||
cur_id = cur["parent"].string_value();
|
||||
cur = state_node_tree["nodes"][cur_id];
|
||||
}
|
||||
}
|
||||
// No parent node given, and we're not looking for all osds
|
||||
else if (level != "osd")
|
||||
{
|
||||
// ... look for all level nodes
|
||||
for (auto & sn: node_tree)
|
||||
{
|
||||
auto & props = sn.second.object_items();
|
||||
|
||||
auto level_prop = props.find("level");
|
||||
if (level_prop != props.end() && (level_prop->second.as_string() == level))
|
||||
{
|
||||
nodes.push_back(sn.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Otherwise, ...
|
||||
else
|
||||
{
|
||||
// ... we're looking for osd nodes only
|
||||
for (auto & sn: node_tree)
|
||||
{
|
||||
if (stoull_full(sn.first))
|
||||
{
|
||||
nodes.push_back(sn.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process gathered nodes
|
||||
for (auto & node: nodes)
|
||||
{
|
||||
// Check for osd node, return constant max size
|
||||
if (stoull_full(node))
|
||||
{
|
||||
max_pg_sz += 1;
|
||||
}
|
||||
// Otherwise, ...
|
||||
else
|
||||
{
|
||||
// ... exclude parent node from tree, and ...
|
||||
node_tree.erase(parent_node);
|
||||
|
||||
// ... descend onto the resulting tree
|
||||
max_pg_sz += get_max_pg_size(node_tree, level, node);
|
||||
}
|
||||
}
|
||||
|
||||
return max_pg_sz;
|
||||
return level_seen.size();
|
||||
}
|
||||
|
||||
json11::Json create_pool(const etcd_kv_t & kv)
|
||||
|
|
|
@ -315,7 +315,7 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
|||
if (aligned_count % alignment)
|
||||
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
||||
aligned_count -= aligned_offset;
|
||||
void *buf = malloc_or_die(aligned_count);
|
||||
void *buf = malloc_or_die(aligned_count); // тут нужен RDMA-доступный буфер
|
||||
xdr_add_malloc(rop->xdrs, buf);
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
|
|
|
@ -499,6 +499,19 @@ void nfs_proxy_t::check_default_pool()
|
|||
}
|
||||
}
|
||||
|
||||
void nfs_proxy_t::create_client()
|
||||
{
|
||||
auto cli = new nfs_client_t();
|
||||
cli->parent = this->proxy;
|
||||
if (kvfs)
|
||||
nfs_kv_procs(cli);
|
||||
else
|
||||
nfs_block_procs(cli);
|
||||
for (auto & fn: pmap.proc_table)
|
||||
cli->proc_table.insert(fn);
|
||||
rpc_clients.insert(cli);
|
||||
}
|
||||
|
||||
void nfs_proxy_t::do_accept(int listen_fd)
|
||||
{
|
||||
struct sockaddr_storage addr;
|
||||
|
@ -512,18 +525,8 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
|||
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
int one = 1;
|
||||
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
auto cli = new nfs_client_t();
|
||||
if (kvfs)
|
||||
nfs_kv_procs(cli);
|
||||
else
|
||||
nfs_block_procs(cli);
|
||||
cli->parent = this;
|
||||
auto cli = this->create_client();
|
||||
cli->nfs_fd = nfs_fd;
|
||||
for (auto & fn: pmap.proc_table)
|
||||
{
|
||||
cli->proc_table.insert(fn);
|
||||
}
|
||||
rpc_clients[nfs_fd] = cli;
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
||||
{
|
||||
// Handle incoming event
|
||||
|
@ -781,7 +784,7 @@ void nfs_client_t::stop()
|
|||
if (refs <= 0)
|
||||
{
|
||||
auto parent = this->parent;
|
||||
parent->rpc_clients.erase(nfs_fd);
|
||||
parent->rpc_clients.erase(this);
|
||||
parent->active_connections--;
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
close(nfs_fd);
|
||||
|
@ -885,25 +888,32 @@ void rpc_queue_reply(rpc_op_t *rop)
|
|||
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
|
||||
assert(r);
|
||||
}
|
||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||
assert(iov_count > 0);
|
||||
rop->reply_marker = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
if (!self->rdma_conn)
|
||||
{
|
||||
rop->reply_marker += iov_list[i].iov_len;
|
||||
}
|
||||
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
||||
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
||||
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
||||
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
||||
to_outbox.push_back(NULL);
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
to_send_list.push_back(iov_list[i]);
|
||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||
assert(iov_count > 0);
|
||||
rop->reply_marker = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
rop->reply_marker += iov_list[i].iov_len;
|
||||
}
|
||||
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
||||
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
||||
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
||||
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
||||
to_outbox.push_back(NULL);
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
to_send_list.push_back(iov_list[i]);
|
||||
to_outbox.push_back(NULL);
|
||||
}
|
||||
to_outbox[to_outbox.size()-1] = rop;
|
||||
self->submit_send();
|
||||
}
|
||||
else
|
||||
{
|
||||
self->rdma_conn->queue_reply(rop, iov_list, iov_count);
|
||||
}
|
||||
to_outbox[to_outbox.size()-1] = rop;
|
||||
self->submit_send();
|
||||
}
|
||||
|
||||
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
|
||||
|
@ -968,6 +978,11 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
|||
// Incoming buffer isn't needed to handle request, so return 0
|
||||
return 0;
|
||||
}
|
||||
return handle_rpc_body(xdrs, &inmsg);
|
||||
}
|
||||
|
||||
int nfs_client_t::handle_rpc_body(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg)
|
||||
{
|
||||
// Find decoder for the request
|
||||
auto proc_it = proc_table.find((rpc_service_proc_t){
|
||||
.prog = inmsg->body.cbody.prog,
|
||||
|
@ -1045,7 +1060,12 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
|||
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
|
||||
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
|
||||
};
|
||||
// FIXME: malloc and avoid copy?
|
||||
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
|
||||
if (rmsg)
|
||||
{
|
||||
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
|
||||
}
|
||||
// Try to decode the request
|
||||
// req_fn may be NULL, that means function has no arguments
|
||||
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
|
||||
|
@ -1068,8 +1088,8 @@ void nfs_proxy_t::daemonize()
|
|||
// Stop all clients because client I/O sometimes breaks during daemonize
|
||||
// I.e. the new process stops receiving events on the old FD
|
||||
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
|
||||
for (auto & clp: rpc_clients)
|
||||
clp.second->stop();
|
||||
for (auto & cli: rpc_clients)
|
||||
cli->stop();
|
||||
if (fork())
|
||||
exit(0);
|
||||
setsid();
|
||||
|
|
|
@ -55,7 +55,7 @@ public:
|
|||
vitastorkv_dbw_t *db = NULL;
|
||||
kv_fs_state_t *kvfs = NULL;
|
||||
block_fs_state_t *blockfs = NULL;
|
||||
std::map<int, nfs_client_t*> rpc_clients;
|
||||
std::set<nfs_client_t*> rpc_clients;
|
||||
|
||||
std::vector<XDR*> xdr_pool;
|
||||
|
||||
|
@ -72,6 +72,7 @@ public:
|
|||
void watch_stats();
|
||||
void parse_stats(etcd_kv_t & kv);
|
||||
void check_default_pool();
|
||||
nfs_client_t* create_client();
|
||||
void do_accept(int listen_fd);
|
||||
void daemonize();
|
||||
void write_pid();
|
||||
|
@ -101,15 +102,20 @@ struct rpc_free_buffer_t
|
|||
unsigned size;
|
||||
};
|
||||
|
||||
struct nfs_rdma_conn_t;
|
||||
|
||||
class nfs_client_t
|
||||
{
|
||||
public:
|
||||
nfs_proxy_t *parent = NULL;
|
||||
int nfs_fd;
|
||||
int epoll_events = 0;
|
||||
int refs = 0;
|
||||
bool stopped = false;
|
||||
std::set<rpc_service_proc_t> proc_table;
|
||||
nfs_rdma_conn_t *rdma_conn = NULL;
|
||||
|
||||
// <TCP>
|
||||
int nfs_fd;
|
||||
int epoll_events = 0;
|
||||
|
||||
// Read state
|
||||
rpc_cur_buffer_t cur_buffer = { 0 };
|
||||
|
@ -130,7 +136,9 @@ public:
|
|||
void submit_send();
|
||||
void handle_send(int result);
|
||||
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
|
||||
// </TCP>
|
||||
|
||||
int handle_rpc_body(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg);
|
||||
bool deref();
|
||||
void stop();
|
||||
};
|
||||
|
|
|
@ -0,0 +1,697 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
//
|
||||
// NFS RDMA support
|
||||
|
||||
#define _XOPEN_SOURCE
|
||||
#include <limits.h>
|
||||
|
||||
#include <netinet/tcp.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include "proto/nfs.h"
|
||||
#include "proto/rpc.h"
|
||||
#include "proto/portmap.h"
|
||||
|
||||
#include "addr_util.h"
|
||||
#include "str_util.h"
|
||||
#include "json_util.h"
|
||||
#include "nfs_proxy.h"
|
||||
#include "nfs_kv.h"
|
||||
#include "nfs_block.h"
|
||||
#include "nfs_common.h"
|
||||
#include "http_client.h"
|
||||
#include "cli.h"
|
||||
|
||||
#define NFS_RDMACM_PRIVATE_DATA_MAGIC_LE 0x180eabf6
|
||||
|
||||
struct __attribute__((__packed__)) nfs_rdmacm_private
|
||||
{
|
||||
uint32_t format_identifier; // magic, should be 0xf6ab0e18 in big endian
|
||||
uint8_t version; // version, 1
|
||||
uint8_t remote_invalidate; // remote invalidation flag (1 or 0)
|
||||
uint8_t max_send_size; // maximum RDMA Send operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
|
||||
uint8_t max_recv_size; // maximum RDMA Receive operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
|
||||
};
|
||||
|
||||
struct nfs_rdma_context_t
|
||||
{
|
||||
std::string bind_address;
|
||||
int rdmacm_port = 0;
|
||||
int max_send = 8, max_recv = 8; --- FIXME max_send and max_recv should probably be equal
|
||||
uint64_t max_send_size = 256*1024, max_recv_size = 256*1024;
|
||||
|
||||
nfs_proxy_t *proxy = NULL;
|
||||
epoll_manager_t *epmgr = NULL;
|
||||
|
||||
int max_cqe = 0, used_max_cqe = 0;
|
||||
rdma_event_channel *rdmacm_evch = NULL;
|
||||
rdma_cm_id *listener_id = NULL;
|
||||
ibv_comp_channel *channel = NULL;
|
||||
ibv_cq *cq = NULL;
|
||||
};
|
||||
|
||||
struct nfs_rdma_buf_t
|
||||
{
|
||||
void *buf = NULL;
|
||||
size_t len = 0;
|
||||
ibv_mr *mr = NULL;
|
||||
};
|
||||
|
||||
struct nfs_rdma_conn_t
|
||||
{
|
||||
rdma_cm_id *id = NULL;
|
||||
int max_send = 8, max_recv = 8;
|
||||
bool established = false;
|
||||
std::vector<nfs_rdma_buf_t> recv_buffers;
|
||||
int next_recv_buf = 0;
|
||||
std::vector<nfs_rdma_buf_t> send_buffers;
|
||||
std::vector<rpc_op_t*> outbox;
|
||||
int outbox_pos = 0;
|
||||
};
|
||||
|
||||
nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port)
|
||||
{
|
||||
nfs_rdma_context_t* self = new nfs_rdma_context_t;
|
||||
self->proxy = this;
|
||||
self->epmgr = epmgr;
|
||||
self->bind_address = bind_address;
|
||||
self->rdmacm_port = rdmacm_port;
|
||||
self->rdmacm_evch = rdma_create_event_channel();
|
||||
if (!self->rdmacm_evch)
|
||||
{
|
||||
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
fcntl(self->rdmacm_evch->fd, F_SETFL, fcntl(self->rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(self->rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
|
||||
{
|
||||
self->handle_rdmacm_events();
|
||||
});
|
||||
int r = rdma_create_id(self->rdmacm_evch, &self->listener_id, NULL, RDMA_PS_TCP);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
sockaddr_storage addr;
|
||||
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
|
||||
{
|
||||
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
r = rdma_bind_addr(self->listener_id, (sockaddr*)&addr);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
r = rdma_listen(self->listener_id, 128);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to listen RDMA-CM: %s (code %d)\n", strerror(errno), errno);
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
self->channel = ibv_create_comp_channel(self->listener_id->ibv_context);
|
||||
if (!self->channel)
|
||||
{
|
||||
fprintf(stderr, "Couldn't create RDMA completion channel\n");
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
fcntl(self->channel->fd, F_SETFL, fcntl(self->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(self->channel->fd, false, [this](int channel_eventfd, int epoll_events)
|
||||
{
|
||||
handle_io();
|
||||
});
|
||||
self->max_cqe = 4096;
|
||||
self->cq = ibv_create_cq(self->listener_id->ibv_context, self->max_cqe, NULL, self->channel, 0);
|
||||
if (!self->cq)
|
||||
{
|
||||
fprintf(stderr, "Couldn't create RDMA completion queue\n");
|
||||
delete self;
|
||||
return NULL;
|
||||
}
|
||||
return self;
|
||||
}
|
||||
|
||||
nfs_rdma_context_t::~nfs_rdma_context_t()
|
||||
{
|
||||
if (listener_id)
|
||||
{
|
||||
int r = rdma_destroy_id(listener_id);
|
||||
if (r != 0)
|
||||
fprintf(stderr, "Failed to destroy RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||
else
|
||||
listener_id = NULL;
|
||||
}
|
||||
if (rdmacm_evch)
|
||||
{
|
||||
epmgr->tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
|
||||
rdma_destroy_event_channel(rdmacm_evch);
|
||||
rdmacm_evch = NULL;
|
||||
}
|
||||
if (cq)
|
||||
{
|
||||
ibv_destroy_cq(cq);
|
||||
cq = NULL;
|
||||
}
|
||||
if (channel)
|
||||
{
|
||||
ibv_destroy_comp_channel(channel);
|
||||
channel = NULL;
|
||||
}
|
||||
//if (mr)
|
||||
// ibv_dereg_mr(mr);
|
||||
//if (pd)
|
||||
// ibv_dealloc_pd(pd);
|
||||
//if (context)
|
||||
// ibv_close_device(context);
|
||||
}
|
||||
|
||||
void nfs_proxy_t::handle_rdmacm_events()
|
||||
{
|
||||
rdma_cm_event *ev = NULL;
|
||||
while (1)
|
||||
{
|
||||
int r = rdma_get_cm_event(rdmacm_evch, &ev);
|
||||
if (r != 0)
|
||||
{
|
||||
if (errno == EAGAIN || errno == EINTR)
|
||||
break;
|
||||
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
|
||||
exit(1);
|
||||
}
|
||||
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
|
||||
{
|
||||
rdmacm_accept(ev);
|
||||
}
|
||||
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
|
||||
ev->event == RDMA_CM_EVENT_REJECTED ||
|
||||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
|
||||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
|
||||
{
|
||||
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
|
||||
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
|
||||
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
|
||||
auto conn_it = rdma_connections.find(ev->id);
|
||||
if (conn_it == rdma_connections.end())
|
||||
{
|
||||
fprintf(stderr, "Received %s event for an unknown connection 0x%lx - ignoring\n",
|
||||
event_type_name, (uint64_t)ev->id);
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n",
|
||||
event_type_name, (uint64_t)ev->id);
|
||||
auto conn = conn_it->second;
|
||||
delete conn;
|
||||
}
|
||||
}
|
||||
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
|
||||
{
|
||||
rdmacm_established(ev);
|
||||
}
|
||||
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
|
||||
{
|
||||
// Do nothing
|
||||
}
|
||||
else
|
||||
{
|
||||
// Other events are unexpected
|
||||
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
|
||||
}
|
||||
r = rdma_ack_cm_event(ev);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
||||
{
|
||||
ctx->used_max_cqe += max_send+max_recv;
|
||||
if (ctx->used_max_cqe > ctx->max_cqe)
|
||||
{
|
||||
// Resize CQ
|
||||
int new_max_cqe = ctx->max_cqe;
|
||||
while (ctx->used_max_cqe > new_max_cqe)
|
||||
{
|
||||
new_max_cqe *= 2;
|
||||
}
|
||||
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
|
||||
{
|
||||
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
|
||||
delete conn;
|
||||
return NULL;
|
||||
}
|
||||
ctx->max_cqe = new_max_cqe;
|
||||
}
|
||||
ibv_qp_init_attr init_attr = {
|
||||
.send_cq = ctx->cq,
|
||||
.recv_cq = ctx->cq,
|
||||
.cap = {
|
||||
.max_send_wr = max_send*2, // ?????? тут большой вопрос сколько на самом деле
|
||||
.max_recv_wr = max_recv,
|
||||
.max_send_sge = max_sge,
|
||||
.max_recv_sge = max_sge,
|
||||
},
|
||||
.qp_type = IBV_QPT_RC,
|
||||
};
|
||||
r = rdma_create_qp(ev->id, NULL, &init_attr);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
|
||||
exit(1);
|
||||
}
|
||||
nfs_rdmacm_private private_data = {
|
||||
.format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE,
|
||||
.version = 1,
|
||||
.remote_invalidate = ?,
|
||||
.max_send_size = (max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
|
||||
.max_recv_size = (max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
|
||||
};
|
||||
rdma_conn_param conn_params = {
|
||||
.private_data = &private_data,
|
||||
.private_data_len = sizeof(private_data),
|
||||
//.responder_resources = max_qp_rd_atom of the device?,
|
||||
//.initiator_depth = max_qp_init_rd_atom of the device?,
|
||||
.rnr_retry_count = 7,
|
||||
//.qp_num = manually created QP number?,
|
||||
};
|
||||
r = rdma_accept(ev->id, &conn_params);
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
|
||||
rdma_destroy_qp(ev->id);
|
||||
rdma_destroy_id(ev->id);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto conn = new nfs_rdma_conn_t();
|
||||
conn->id = ctx->id;
|
||||
rdma_connections[ctx->id] = conn;
|
||||
rdma_connections_by_qp[conn->id->qp->qp_num];
|
||||
auto cli = this->proxy->create_client();
|
||||
cli->rdma_conn = conn;
|
||||
}
|
||||
}
|
||||
|
||||
nfs_rdma_conn_t::~nfs_rdma_conn_t()
|
||||
{
|
||||
if (id)
|
||||
{
|
||||
parent->rdma_connections.erase(id);
|
||||
if (id->qp)
|
||||
{
|
||||
parent->rdma_connections_by_qp.erase(id->qp->qp_num);
|
||||
rdma_destroy_qp(id);
|
||||
}
|
||||
rdma_destroy_id(id);
|
||||
}
|
||||
}
|
||||
|
||||
void nfs_rdma_context_t::rdmacm_established(rdma_cm_event *ev)
|
||||
{
|
||||
auto conn_it = rdma_connections.find(ev->id);
|
||||
if (conn_it == rdma_connections.end())
|
||||
{
|
||||
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for an unknown connection 0x%lx - ignoring\n", (uint64_t)ev->id);
|
||||
return;
|
||||
}
|
||||
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for connection 0x%lx - connection established\n", (uint64_t)ev->id);
|
||||
auto conn = conn_it->second;
|
||||
conn->established = true;
|
||||
// Handle NFS private_data
|
||||
if (ev->private_data_len >= sizeof(nfs_rdmacm_private))
|
||||
{
|
||||
nfs_rdmacm_private *private_data = (nfs_rdmacm_private *)ev->private_data;
|
||||
if (private_data->format_identifier == NFS_RDMACM_PRIVATE_DATA_MAGIC_LE &&
|
||||
private_data->version == 1)
|
||||
{
|
||||
conn->remote_invalidate = private_data->remote_invalidate;
|
||||
conn->remote_max_send = (private_data->max_send_size+1) * 1024;
|
||||
conn->remote_max_recv = (private_data->max_recv_size+1) * 1024;
|
||||
if (conn->remote_max_recv < conn->max_send)
|
||||
conn->max_send = conn->remote_max_recv;
|
||||
}
|
||||
}
|
||||
// Post initial receive requests
|
||||
conn->post_initial_receives();
|
||||
}
|
||||
|
||||
void nfs_rdma_conn_t::post_initial_receives()
|
||||
{
|
||||
while (cur_recv < max_recv)
|
||||
{
|
||||
auto b = create_buf(max_recv_size);
|
||||
recv_buffers.push_back(b);
|
||||
post_recv(b);
|
||||
}
|
||||
}
|
||||
|
||||
nfs_rdma_buf_t nfs_rdma_conn_t::create_buf(size_t len)
|
||||
{
|
||||
nfs_rdma_buf_t b;
|
||||
b.buf = malloc_or_die(len);
|
||||
b.len = len;
|
||||
b.mr = ibv_reg_mr(id->pd, b.buf, len, IBV_ACCESS_LOCAL_WRITE);
|
||||
if (!b.mr)
|
||||
{
|
||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
|
||||
{
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)b.buf,
|
||||
.length = (uint32_t)b.len,
|
||||
.lkey = b.mr->lkey,
|
||||
};
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
.wr_id = 1, // 1 is any read, 2 is any write :)
|
||||
.sg_list = &sge,
|
||||
.num_sge = 1,
|
||||
};
|
||||
int err = ibv_post_recv(id->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cur_recv++;
|
||||
}
|
||||
|
||||
void nfs_rdma_conn_t::queue_reply(rpc_op_t *rop)
|
||||
{
|
||||
outbox.push_back(rop);
|
||||
post_send();
|
||||
}
|
||||
|
||||
void nfs_rdma_conn_t::post_send()
|
||||
{
|
||||
while (outbox.size() > outbox_pos)
|
||||
{
|
||||
auto rop = outbox[outbox_pos];
|
||||
// Check that exactly 1 write chunk is provided for READ3 and READLINK3
|
||||
if (rop->in_msg.body.cbody.prog == NFS_PROGRAM &&
|
||||
(rop->in_msg.body.cbody.proc == NFS3_READ || rop->in_msg.body.cbody.proc == NFS3_READLINK) &&
|
||||
(!rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes ||
|
||||
rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->next))
|
||||
{
|
||||
rop->rdma_error = ERR_CHUNK;
|
||||
...
|
||||
}
|
||||
iovec *iov_list = NULL;
|
||||
unsigned iov_count = 0;
|
||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||
assert(iov_count > 0);
|
||||
nfs_rdma_buf_t b;
|
||||
if (send_buffers.size())
|
||||
{
|
||||
b = send_buffers.back();
|
||||
send_buffers.pop_back();
|
||||
}
|
||||
else
|
||||
{
|
||||
b = create_buf(max_send_size);
|
||||
}
|
||||
// READ3resok and READLINK3resok - extract last byte buffer from iovecs and send it in a "write chunk"
|
||||
iovec *chunk_iov = NULL;
|
||||
if (rop->in_msg.body.cbody.prog == NFS_PROGRAM &&
|
||||
(rop->in_msg.body.cbody.proc == NFS3_READ && ((READ3res*)rop->reply)->status == NFS3_OK ||
|
||||
rop->in_msg.body.cbody.proc == NFS3_READLINK && ((READLINK3res*)rop->reply)->status == NFS3_OK))
|
||||
{
|
||||
assert(iov_count > 1);
|
||||
iov_count--;
|
||||
chunk_iov = &iov_list[iov_count];
|
||||
}
|
||||
// FIXME: Avoid extra copy - to do that we have to initially encode into nfs_rdma_buf_t
|
||||
size_t pos = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
assert(pos + iov_list[i].iov_len <= b.len);
|
||||
memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len);
|
||||
pos += iov_list[i].iov_len;
|
||||
}
|
||||
ibv_sge chunk_sge;
|
||||
ibv_send_wr chunk_wr;
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)b.buf,
|
||||
.length = (uint32_t)pos,
|
||||
.lkey = b.mr->lkey,
|
||||
};
|
||||
ibv_send_wr *bad_wr = NULL;
|
||||
ibv_send_wr wr = {
|
||||
.wr_id = 2, // 2 is send
|
||||
.sg_list = &sge,
|
||||
.num_sge = 1,
|
||||
.opcode = IBV_WR_SEND,
|
||||
.send_flags = IBV_SEND_SIGNALED,
|
||||
};
|
||||
ibv_send_wr *send_wr = ≀
|
||||
if (chunk_iov != NULL)
|
||||
{
|
||||
auto & wr_chunk = rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->entry.target;
|
||||
chunk_sge = {
|
||||
.addr = (uintptr_t)chunk_iov->iov_base,
|
||||
.length = (uint32_t)chunk_iov->iov_len,
|
||||
.lkey = parent->get_rdma_data_lkey(chunk_iov->iov_base),
|
||||
};
|
||||
chunk_wr = {
|
||||
.wr_id = 4, // 2 is chunk write
|
||||
.sg_list = &chunk_sge,
|
||||
.num_sge = 1,
|
||||
.opcode = IBV_WR_RDMA_WRITE,
|
||||
.wr = {
|
||||
.rdma = {
|
||||
.remote_addr = wr_chunk.offset,
|
||||
.rkey = wr_chunk.handle,
|
||||
},
|
||||
},
|
||||
};
|
||||
// send chunk_wr first, then normal wr
|
||||
chunk_wr.next = ≀
|
||||
send_wr = &chunk_wr;
|
||||
}
|
||||
int err = ibv_post_send(id->qp, send_wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cl->rdma_conn->cur_send++;
|
||||
}
|
||||
}
|
||||
|
||||
#define RDMA_EVENTS_AT_ONCE 32
|
||||
|
||||
void nfs_rdma_context_t::handle_io()
|
||||
{
|
||||
// Request next notification
|
||||
ibv_cq *ev_cq;
|
||||
void *ev_ctx;
|
||||
// FIXME: This is inefficient as it calls read()...
|
||||
if (ibv_get_cq_event(channel, &ev_cq, &ev_ctx) == 0)
|
||||
{
|
||||
ibv_ack_cq_events(cq, 1);
|
||||
}
|
||||
if (ibv_req_notify_cq(cq, 0) != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to request RDMA completion notification, exiting\n");
|
||||
exit(1);
|
||||
}
|
||||
ibv_wc wc[RDMA_EVENTS_AT_ONCE];
|
||||
int event_count;
|
||||
do
|
||||
{
|
||||
event_count = ibv_poll_cq(cq, RDMA_EVENTS_AT_ONCE, wc);
|
||||
for (int i = 0; i < event_count; i++)
|
||||
{
|
||||
auto conn_it = rdma_connections_by_qp.find(wc[i].qp_num);
|
||||
if (conn_it == rdma_connections_by_qp.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto conn = conn_it->second;
|
||||
if (wc[i].status != IBV_WC_SUCCESS)
|
||||
{
|
||||
fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status));
|
||||
delete conn;
|
||||
continue;
|
||||
}
|
||||
//auto read_buf_it = conn->buffers.find(wc[i].wr_id);
|
||||
//auto read_buf = read_buf_it != conn->buffers.end() ? read_buf_it->second : NULL;
|
||||
auto is_send = wc[i].wr_id == 2;//conn->sends.at(wc[i].wr_id);
|
||||
if (!is_send)
|
||||
{
|
||||
conn->cur_recv--;
|
||||
auto & b = conn->recv_buffers[conn->next_recv_buf];
|
||||
auto is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
|
||||
if (is_continued)
|
||||
{
|
||||
// Buffer is required to handle request
|
||||
// Due to the credit-based flow control in RPC-RDMA, we can just remove that buffer and reuse it later
|
||||
used_buffers[b.buf] = b;
|
||||
conn->recv_buffers.erase(conn->recv_buffers.begin()+conn->next_recv_buf, conn->recv_buffers.begin()+conn->next_recv_buf+1);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Buffer is not required to handle request and can be reused immediately
|
||||
conn->post_recv(b);
|
||||
conn->next_recv_buf = (conn->next_recv_buf+1) % conn->recv_buffers.size();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto rop = conn->outbox[0];
|
||||
conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1);
|
||||
xdr_reset(rop->xdrs);
|
||||
parent->xdr_pool.push_back(rop->xdrs);
|
||||
if (rop->buffer && rop->referenced)
|
||||
{
|
||||
// Reuse the buffer
|
||||
auto & ub = conn->used_buffers.at(rop->buffer);
|
||||
conn->recv_buffers.push_back(ub);
|
||||
conn->post_recv(ub);
|
||||
}
|
||||
free(rop);
|
||||
conn->post_send();
|
||||
}
|
||||
}
|
||||
} while (event_count > 0);
|
||||
}
|
||||
|
||||
// returns false if handling is done, returns true if handling is continued asynchronously
|
||||
bool nfs_rdma_conn_t::handle_recv(uint8_t *buf, size_t len)
|
||||
{
|
||||
// Take an XDR object from the pool
|
||||
XDR *xdrs;
|
||||
if (parent->xdr_pool.size())
|
||||
{
|
||||
xdrs = parent->xdr_pool.back();
|
||||
parent->xdr_pool.pop_back();
|
||||
}
|
||||
else
|
||||
{
|
||||
xdrs = xdr_create();
|
||||
}
|
||||
// Decode the RDMA-RPC header
|
||||
rdma_msg rmsg;
|
||||
if (!xdr_decode(xdrs, buf, len, (xdrproc_t)xdr_rdma_msg, &rmsg))
|
||||
{
|
||||
// Invalid message, ignore it
|
||||
xdr_reset(xdrs);
|
||||
parent->xdr_pool.push_back(xdrs);
|
||||
return 0;
|
||||
}
|
||||
if (rmsg.rdma_vers != 1 || rmsg.rdma_body.proc != RDMA_MSG /*&& rmsg.rdma_body.proc != RDMA_NOMSG*/)
|
||||
{
|
||||
// Bad RDMA-RPC version
|
||||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
|
||||
*rop = (rpc_op_t){
|
||||
.client = this,
|
||||
.xdrs = xdrs,
|
||||
.rdma_error = ERR_VERS,
|
||||
/*
|
||||
uint32_t x = 1;
|
||||
.out_rdma_msg = (rdma_msg){
|
||||
.rdma_xid = rmsg.rdma_xid,
|
||||
.rdma_vers = rmsg.rdma_vers,
|
||||
.rdma_credit = rmsg.rdma_credit,
|
||||
.rdma_body = (rdma_body){
|
||||
.proc = RDMA_ERROR,
|
||||
.rdma_error = (rpc_rdma_error){
|
||||
.err = ERR_VERS,
|
||||
.range = (rpc_rdma_errvers){
|
||||
.rdma_vers_low = x,
|
||||
.rdma_vers_high = x,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
*/
|
||||
};
|
||||
rpc_queue_reply(rop);
|
||||
// Incoming buffer isn't needed to handle request, so return 0
|
||||
return 0;
|
||||
}
|
||||
rpc_msg inmsg = { .xid = rmsg.rdma_xid };
|
||||
if (!xdr_rpc_msg_body(xdrs, &inmsg.body) || inmsg.body.dir != RPC_CALL)
|
||||
{
|
||||
// Invalid message, ignore it
|
||||
xdr_reset(xdrs);
|
||||
parent->xdr_pool.push_back(xdrs);
|
||||
return 0;
|
||||
}
|
||||
// Check that exactly 1 read chunk is provided for WRITE3 and SYMLINK3
|
||||
if (inmsg.body.cbody.prog == NFS_PROGRAM &&
|
||||
(inmsg.body.cbody.proc == NFS3_WRITE || inmsg.body.cbody.proc == NFS3_SYMLINK) &&
|
||||
(!rmsg.rdma_body.rdma_msg.rdma_reads || rmsg.rdma_body.rdma_msg.rdma_reads->next))
|
||||
{
|
||||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
|
||||
*rop = (rpc_op_t){
|
||||
.client = this,
|
||||
.xdrs = xdrs,
|
||||
.rdma_error = ERR_CHUNK,
|
||||
};
|
||||
rpc_queue_reply(rop);
|
||||
return 0;
|
||||
}
|
||||
// Read that chunk
|
||||
if (inmsg.body.cbody.prog == NFS_PROGRAM && inmsg.body.cbody.proc == NFS3_WRITE)
|
||||
{
|
||||
auto & rd_chunk = rmsg.rdma_body.rdma_msg.rdma_reads->entry.target;
|
||||
auto buf = parent->malloc_rdma(rd_chunk.length);
|
||||
ibv_sge chunk_sge = {
|
||||
.addr = (uintptr_t)buf,
|
||||
.length = rd_chunk.length,
|
||||
.lkey = parent->get_rdma_data_lkey(buf),
|
||||
};
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
.wr_id = 3, // 3 is chunk read
|
||||
.sg_list = &chunk_sge,
|
||||
.num_sge = 1,
|
||||
.opcode = IBV_WR_RDMA_READ,
|
||||
.wr = {
|
||||
.rdma = {
|
||||
.remote_addr = rd_chunk.offset,
|
||||
.rkey = rd_chunk.handle,
|
||||
},
|
||||
},
|
||||
};
|
||||
int err = ibv_post_recv(id->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cur_recv++;
|
||||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
|
||||
*rop = (rpc_op_t){
|
||||
.client = this,
|
||||
.xdrs = xdrs,
|
||||
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
|
||||
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
|
||||
};
|
||||
chunk_inbox.push_back();
|
||||
return 1;
|
||||
}
|
||||
return client->handle_rpc_body(xdrs, &inmsg, &rmsg);
|
||||
}
|
|
@ -168,7 +168,7 @@ struct WRITE3args {
|
|||
offset3 offset;
|
||||
count3 count;
|
||||
stable_how stable;
|
||||
opaque data<>;
|
||||
opaque data<>; /* RDMA DDP-eligible */
|
||||
};
|
||||
|
||||
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
|
||||
|
@ -409,7 +409,7 @@ struct READ3resok {
|
|||
post_op_attr file_attributes;
|
||||
count3 count;
|
||||
bool eof;
|
||||
opaque data<>;
|
||||
opaque data<>; /* RDMA DDP-eligible */
|
||||
};
|
||||
|
||||
struct READ3resfail {
|
||||
|
@ -514,7 +514,7 @@ typedef string nfspath3<>;
|
|||
|
||||
struct symlinkdata3 {
|
||||
sattr3 symlink_attributes;
|
||||
nfspath3 symlink_data;
|
||||
nfspath3 symlink_data; /* RDMA DDP-eligible */
|
||||
};
|
||||
|
||||
struct SYMLINK3args {
|
||||
|
@ -546,7 +546,7 @@ struct READLINK3args {
|
|||
|
||||
struct READLINK3resok {
|
||||
post_op_attr symlink_attributes;
|
||||
nfspath3 data;
|
||||
nfspath3 data; /* RDMA DDP-eligible */
|
||||
};
|
||||
|
||||
struct READLINK3resfail {
|
||||
|
|
|
@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
|
|||
return FALSE;
|
||||
if (!xdr_stable_how (xdrs, &objp->stable))
|
||||
return FALSE;
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
|
|||
return FALSE;
|
||||
if (!xdr_bool (xdrs, &objp->eof))
|
||||
return FALSE;
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
|
|||
}
|
||||
|
||||
bool_t
|
||||
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
|
||||
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
|
||||
{
|
||||
|
||||
if (!xdr_string (xdrs, objp, ~0))
|
||||
if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
|
|||
|
||||
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
|
||||
if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
|
|||
|
||||
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
if (!xdr_nfspath3 (xdrs, &objp->data))
|
||||
if (!xdr_nfspath3 (xdrs, &objp->data, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "rpc.h"
|
||||
#include "rpc_rdma.h"
|
||||
|
||||
struct rpc_op_t;
|
||||
|
||||
|
@ -27,12 +28,15 @@ inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t &
|
|||
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
|
||||
}
|
||||
|
||||
struct rdma_msg;
|
||||
|
||||
struct rpc_op_t
|
||||
{
|
||||
void *client;
|
||||
uint8_t *buffer;
|
||||
XDR *xdrs;
|
||||
rpc_msg in_msg, out_msg;
|
||||
rdma_msg in_rdma_msg;
|
||||
void *request;
|
||||
void *reply;
|
||||
xdrproc_t reply_fn;
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
* Please do not edit this file.
|
||||
* It was generated using rpcgen.
|
||||
*/
|
||||
|
||||
#ifndef _RPC_RDMA_H_RPCGEN
|
||||
#define _RPC_RDMA_H_RPCGEN
|
||||
|
||||
#include "xdr_impl.h"
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
struct xdr_rdma_segment {
|
||||
uint32 handle;
|
||||
uint32 length;
|
||||
uint64 offset;
|
||||
};
|
||||
typedef struct xdr_rdma_segment xdr_rdma_segment;
|
||||
|
||||
struct xdr_read_chunk {
|
||||
uint32 position;
|
||||
struct xdr_rdma_segment target;
|
||||
};
|
||||
typedef struct xdr_read_chunk xdr_read_chunk;
|
||||
|
||||
struct xdr_read_list {
|
||||
struct xdr_read_chunk entry;
|
||||
struct xdr_read_list *next;
|
||||
};
|
||||
typedef struct xdr_read_list xdr_read_list;
|
||||
|
||||
struct xdr_write_chunk {
|
||||
struct {
|
||||
u_int target_len;
|
||||
struct xdr_rdma_segment *target_val;
|
||||
} target;
|
||||
};
|
||||
typedef struct xdr_write_chunk xdr_write_chunk;
|
||||
|
||||
struct xdr_write_list {
|
||||
struct xdr_write_chunk entry;
|
||||
struct xdr_write_list *next;
|
||||
};
|
||||
typedef struct xdr_write_list xdr_write_list;
|
||||
|
||||
struct rpc_rdma_header {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header rpc_rdma_header;
|
||||
|
||||
struct rpc_rdma_header_nomsg {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header_nomsg rpc_rdma_header_nomsg;
|
||||
|
||||
struct rpc_rdma_header_padded {
|
||||
uint32 rdma_align;
|
||||
uint32 rdma_thresh;
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header_padded rpc_rdma_header_padded;
|
||||
|
||||
enum rpc_rdma_errcode {
|
||||
ERR_VERS = 1,
|
||||
ERR_CHUNK = 2,
|
||||
};
|
||||
typedef enum rpc_rdma_errcode rpc_rdma_errcode;
|
||||
|
||||
struct rpc_rdma_errvers {
|
||||
uint32 rdma_vers_low;
|
||||
uint32 rdma_vers_high;
|
||||
};
|
||||
typedef struct rpc_rdma_errvers rpc_rdma_errvers;
|
||||
|
||||
struct rpc_rdma_error {
|
||||
rpc_rdma_errcode err;
|
||||
union {
|
||||
rpc_rdma_errvers range;
|
||||
};
|
||||
};
|
||||
typedef struct rpc_rdma_error rpc_rdma_error;
|
||||
|
||||
enum rdma_proc {
|
||||
RDMA_MSG = 0,
|
||||
RDMA_NOMSG = 1,
|
||||
RDMA_MSGP = 2,
|
||||
RDMA_DONE = 3,
|
||||
RDMA_ERROR = 4,
|
||||
};
|
||||
typedef enum rdma_proc rdma_proc;
|
||||
|
||||
struct rdma_body {
|
||||
rdma_proc proc;
|
||||
union {
|
||||
rpc_rdma_header rdma_msg;
|
||||
rpc_rdma_header_nomsg rdma_nomsg;
|
||||
rpc_rdma_header_padded rdma_msgp;
|
||||
rpc_rdma_error rdma_error;
|
||||
};
|
||||
};
|
||||
typedef struct rdma_body rdma_body;
|
||||
|
||||
struct rdma_msg {
|
||||
uint32 rdma_xid;
|
||||
uint32 rdma_vers;
|
||||
uint32 rdma_credit;
|
||||
rdma_body rdma_body;
|
||||
};
|
||||
typedef struct rdma_msg rdma_msg;
|
||||
|
||||
/* the xdr functions */
|
||||
|
||||
|
||||
extern bool_t xdr_xdr_rdma_segment (XDR *, xdr_rdma_segment*);
|
||||
extern bool_t xdr_xdr_read_chunk (XDR *, xdr_read_chunk*);
|
||||
extern bool_t xdr_xdr_read_list (XDR *, xdr_read_list*);
|
||||
extern bool_t xdr_xdr_write_chunk (XDR *, xdr_write_chunk*);
|
||||
extern bool_t xdr_xdr_write_list (XDR *, xdr_write_list*);
|
||||
extern bool_t xdr_rpc_rdma_header (XDR *, rpc_rdma_header*);
|
||||
extern bool_t xdr_rpc_rdma_header_nomsg (XDR *, rpc_rdma_header_nomsg*);
|
||||
extern bool_t xdr_rpc_rdma_header_padded (XDR *, rpc_rdma_header_padded*);
|
||||
extern bool_t xdr_rpc_rdma_errcode (XDR *, rpc_rdma_errcode*);
|
||||
extern bool_t xdr_rpc_rdma_errvers (XDR *, rpc_rdma_errvers*);
|
||||
extern bool_t xdr_rpc_rdma_error (XDR *, rpc_rdma_error*);
|
||||
extern bool_t xdr_rdma_proc (XDR *, rdma_proc*);
|
||||
extern bool_t xdr_rdma_body (XDR *, rdma_body*);
|
||||
extern bool_t xdr_rdma_msg (XDR *, rdma_msg*);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* !_RPC_RDMA_H_RPCGEN */
|
|
@ -0,0 +1,166 @@
|
|||
/* RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1 */
|
||||
|
||||
/*
|
||||
* Copyright (c) 2010-2017 IETF Trust and the persons
|
||||
* identified as authors of the code. All rights reserved.
|
||||
*
|
||||
* The authors of the code are:
|
||||
* B. Callaghan, T. Talpey, and C. Lever
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with
|
||||
* or without modification, are permitted provided that the
|
||||
* following conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the
|
||||
* following disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the
|
||||
* following disclaimer in the documentation and/or other
|
||||
* materials provided with the distribution.
|
||||
*
|
||||
* - Neither the name of Internet Society, IETF or IETF
|
||||
* Trust, nor the names of specific contributors, may be
|
||||
* used to endorse or promote products derived from this
|
||||
* software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS
|
||||
* AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
|
||||
* EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Plain RDMA segment (Section 3.4.3)
|
||||
*/
|
||||
struct xdr_rdma_segment {
|
||||
uint32 handle; /* Registered memory handle */
|
||||
uint32 length; /* Length of the chunk in bytes */
|
||||
uint64 offset; /* Chunk virtual address or offset */
|
||||
};
|
||||
|
||||
/*
|
||||
* RDMA read segment (Section 3.4.5)
|
||||
*/
|
||||
struct xdr_read_chunk {
|
||||
uint32 position; /* Position in XDR stream */
|
||||
struct xdr_rdma_segment target;
|
||||
};
|
||||
|
||||
/*
|
||||
* Read list (Section 4.3.1)
|
||||
*/
|
||||
struct xdr_read_list {
|
||||
struct xdr_read_chunk entry;
|
||||
struct xdr_read_list *next;
|
||||
};
|
||||
|
||||
/*
|
||||
* Write chunk (Section 3.4.6)
|
||||
*/
|
||||
struct xdr_write_chunk {
|
||||
struct xdr_rdma_segment target<>;
|
||||
};
|
||||
|
||||
/*
|
||||
* Write list (Section 4.3.2)
|
||||
*/
|
||||
struct xdr_write_list {
|
||||
struct xdr_write_chunk entry;
|
||||
struct xdr_write_list *next;
|
||||
};
|
||||
|
||||
/*
|
||||
* Chunk lists (Section 4.3)
|
||||
*/
|
||||
struct rpc_rdma_header {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
/* rpc body follows */
|
||||
};
|
||||
|
||||
struct rpc_rdma_header_nomsg {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
|
||||
/* Not to be used */
|
||||
struct rpc_rdma_header_padded {
|
||||
uint32 rdma_align;
|
||||
uint32 rdma_thresh;
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
/* rpc body follows */
|
||||
};
|
||||
|
||||
/*
|
||||
* Error handling (Section 4.5)
|
||||
*/
|
||||
enum rpc_rdma_errcode {
|
||||
ERR_VERS = 1, /* Value fixed for all versions */
|
||||
ERR_CHUNK = 2
|
||||
};
|
||||
|
||||
/* Structure fixed for all versions */
|
||||
struct rpc_rdma_errvers {
|
||||
uint32 rdma_vers_low;
|
||||
uint32 rdma_vers_high;
|
||||
};
|
||||
|
||||
union rpc_rdma_error switch (rpc_rdma_errcode err) {
|
||||
case ERR_VERS:
|
||||
rpc_rdma_errvers range;
|
||||
case ERR_CHUNK:
|
||||
void;
|
||||
};
|
||||
|
||||
/*
|
||||
* Procedures (Section 4.2.4)
|
||||
*/
|
||||
enum rdma_proc {
|
||||
RDMA_MSG = 0, /* Value fixed for all versions */
|
||||
RDMA_NOMSG = 1, /* Value fixed for all versions */
|
||||
RDMA_MSGP = 2, /* Not to be used */
|
||||
RDMA_DONE = 3, /* Not to be used */
|
||||
RDMA_ERROR = 4 /* Value fixed for all versions */
|
||||
};
|
||||
|
||||
/* The position of the proc discriminator field is
|
||||
* fixed for all versions */
|
||||
union rdma_body switch (rdma_proc proc) {
|
||||
case RDMA_MSG:
|
||||
rpc_rdma_header rdma_msg;
|
||||
case RDMA_NOMSG:
|
||||
rpc_rdma_header_nomsg rdma_nomsg;
|
||||
case RDMA_MSGP: /* Not to be used */
|
||||
rpc_rdma_header_padded rdma_msgp;
|
||||
case RDMA_DONE: /* Not to be used */
|
||||
void;
|
||||
case RDMA_ERROR:
|
||||
rpc_rdma_error rdma_error;
|
||||
};
|
||||
|
||||
/*
|
||||
* Fixed header fields (Section 4.2)
|
||||
*/
|
||||
struct rdma_msg {
|
||||
uint32 rdma_xid; /* Position fixed for all versions */
|
||||
uint32 rdma_vers; /* Position fixed for all versions */
|
||||
uint32 rdma_credit; /* Position fixed for all versions */
|
||||
rdma_body rdma_body;
|
||||
};
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Please do not edit this file.
|
||||
* It was generated using rpcgen.
|
||||
*/
|
||||
|
||||
#include "rpc_rdma.h"
|
||||
#include "xdr_impl_inline.h"
|
||||
|
||||
bool_t
|
||||
xdr_xdr_rdma_segment (XDR *xdrs, xdr_rdma_segment *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32 (xdrs, &objp->handle))
|
||||
return FALSE;
|
||||
if (!xdr_uint32 (xdrs, &objp->length))
|
||||
return FALSE;
|
||||
if (!xdr_uint64 (xdrs, &objp->offset))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_read_chunk (XDR *xdrs, xdr_read_chunk *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32 (xdrs, &objp->position))
|
||||
return FALSE;
|
||||
if (!xdr_xdr_rdma_segment (xdrs, &objp->target))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_read_list (XDR *xdrs, xdr_read_list *objp)
|
||||
{
|
||||
|
||||
if (!xdr_xdr_read_chunk (xdrs, &objp->entry))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_write_chunk (XDR *xdrs, xdr_write_chunk *objp)
|
||||
{
|
||||
|
||||
if (!xdr_array (xdrs, (char **)&objp->target.target_val, (u_int *) &objp->target.target_len, ~0,
|
||||
sizeof (xdr_rdma_segment), (xdrproc_t) xdr_xdr_rdma_segment))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_write_list (XDR *xdrs, xdr_write_list *objp)
|
||||
{
|
||||
|
||||
if (!xdr_xdr_write_chunk (xdrs, &objp->entry))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header (XDR *xdrs, rpc_rdma_header *objp)
|
||||
{
|
||||
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header_nomsg (XDR *xdrs, rpc_rdma_header_nomsg *objp)
|
||||
{
|
||||
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header_padded (XDR *xdrs, rpc_rdma_header_padded *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_align))
|
||||
return FALSE;
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_thresh))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_errcode (XDR *xdrs, rpc_rdma_errcode *objp)
|
||||
{
|
||||
|
||||
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_errvers (XDR *xdrs, rpc_rdma_errvers *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_vers_low))
|
||||
return FALSE;
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_vers_high))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_error (XDR *xdrs, rpc_rdma_error *objp)
|
||||
{
|
||||
|
||||
if (!xdr_rpc_rdma_errcode (xdrs, &objp->err))
|
||||
return FALSE;
|
||||
switch (objp->err) {
|
||||
case ERR_VERS:
|
||||
if (!xdr_rpc_rdma_errvers (xdrs, &objp->range))
|
||||
return FALSE;
|
||||
break;
|
||||
case ERR_CHUNK:
|
||||
break;
|
||||
default:
|
||||
return FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_proc (XDR *xdrs, rdma_proc *objp)
|
||||
{
|
||||
|
||||
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_body (XDR *xdrs, rdma_body *objp)
|
||||
{
|
||||
|
||||
if (!xdr_rdma_proc (xdrs, &objp->proc))
|
||||
return FALSE;
|
||||
switch (objp->proc) {
|
||||
case RDMA_MSG:
|
||||
if (!xdr_rpc_rdma_header (xdrs, &objp->rdma_msg))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_NOMSG:
|
||||
if (!xdr_rpc_rdma_header_nomsg (xdrs, &objp->rdma_nomsg))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_MSGP:
|
||||
if (!xdr_rpc_rdma_header_padded (xdrs, &objp->rdma_msgp))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_DONE:
|
||||
break;
|
||||
case RDMA_ERROR:
|
||||
if (!xdr_rpc_rdma_error (xdrs, &objp->rdma_error))
|
||||
return FALSE;
|
||||
break;
|
||||
default:
|
||||
return FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_msg (XDR *xdrs, rdma_msg *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_xid))
|
||||
return FALSE;
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_vers))
|
||||
return FALSE;
|
||||
if (!xdr_uint32 (xdrs, &objp->rdma_credit))
|
||||
return FALSE;
|
||||
if (!xdr_rdma_body (xdrs, &objp->rdma_body))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
|
@ -46,3 +46,4 @@ run_rpcgen() {
|
|||
run_rpcgen nfs
|
||||
run_rpcgen rpc
|
||||
run_rpcgen portmap
|
||||
run_rpcgen rpc_rdma
|
||||
|
|
|
@ -28,6 +28,19 @@
|
|||
// RPC over TCP:
|
||||
//
|
||||
// BE 32bit length, then rpc_msg, then the procedure message itself
|
||||
//
|
||||
// RPC over RDMA:
|
||||
// RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1
|
||||
// RFC 8267 - Network File System (NFS) Upper-Layer Binding to RPC-over-RDMA Version 1
|
||||
// RFC 8797 - Remote Direct Memory Access - Connection Manager (RDMA-CM) Private Data for RPC-over-RDMA Version 1
|
||||
// message is received in an RDMA Receive operation
|
||||
// message: list of read chunks, list of write chunks, optional reply write chunk, then actual RPC body if present
|
||||
// read chunk: BE 32bit position, BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||
// write chunk: BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||
// in reality for NFS 3.0: only 1 read chunk in write3 and symlink3, only 1 write chunk in read3 and readlink3
|
||||
// read chunk is read by the server using RDMA Read from the client memory after receiving RPC request
|
||||
// write chunk is pushed by the server using RDMA Write to the client memory before sending RPC reply
|
||||
// connection is established using RDMA-CM at default port 20049
|
||||
|
||||
#pragma once
|
||||
|
||||
|
@ -106,13 +119,19 @@ inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
|
|||
return 1;
|
||||
}
|
||||
|
||||
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||
{
|
||||
if (xdrs->x_op == XDR_DECODE)
|
||||
{
|
||||
if (xdrs->avail < 4)
|
||||
return 0;
|
||||
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
|
||||
if (rdma_chunk && xdrs->rdma)
|
||||
{
|
||||
// Skip RDMA chunks while decoding
|
||||
data->size = len;
|
||||
return 1;
|
||||
}
|
||||
uint32_t padded = len_pad4(len);
|
||||
if (xdrs->avail < 4+padded)
|
||||
return 0;
|
||||
|
@ -123,7 +142,8 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
|||
}
|
||||
else
|
||||
{
|
||||
if (data->size < XDR_COPY_LENGTH)
|
||||
// Always encode RDMA chunks as separate iovecs
|
||||
if (data->size < XDR_COPY_LENGTH && (!rdma_chunk || !xdrs->rdma))
|
||||
{
|
||||
unsigned old = xdrs->cur_out.size();
|
||||
xdrs->cur_out.resize(old + 4+data->size);
|
||||
|
@ -146,8 +166,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
|||
.iov_len = data->size,
|
||||
});
|
||||
}
|
||||
if (data->size & 3)
|
||||
if ((data->size & 3) && (!rdma_chunk || !xdrs->rdma))
|
||||
{
|
||||
// No padding for RDMA chunks
|
||||
int pad = 4-(data->size & 3);
|
||||
unsigned old = xdrs->cur_out.size();
|
||||
xdrs->cur_out.resize(old+pad);
|
||||
|
@ -158,9 +179,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
|||
return 1;
|
||||
}
|
||||
|
||||
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||
{
|
||||
return xdr_bytes(xdrs, data, maxlen);
|
||||
return xdr_bytes(xdrs, data, maxlen, rdma_chunk);
|
||||
}
|
||||
|
||||
inline int xdr_u_int(XDR *xdrs, void *data)
|
||||
|
|
|
@ -226,6 +226,7 @@ class osd_t
|
|||
void parse_config(bool init);
|
||||
void init_cluster();
|
||||
void on_change_osd_state_hook(osd_num_t peer_osd);
|
||||
void on_change_backfillfull_hook(pool_id_t pool_id);
|
||||
void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
|
||||
void on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes);
|
||||
void on_load_config_hook(json11::Json::object & changes);
|
||||
|
|
|
@ -65,6 +65,7 @@ void osd_t::init_cluster()
|
|||
st_cli.tfd = tfd;
|
||||
st_cli.log_level = log_level;
|
||||
st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); };
|
||||
st_cli.on_change_backfillfull_hook = [this](pool_id_t pool_id) { on_change_backfillfull_hook(pool_id); };
|
||||
st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); };
|
||||
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_etcd_state_hook(changes); };
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
|
@ -414,6 +415,14 @@ void osd_t::on_change_osd_state_hook(osd_num_t peer_osd)
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_backfillfull_hook(pool_id_t pool_id)
|
||||
{
|
||||
if (!(peering_state & (OSD_RECOVERING | OSD_FLUSHING_PGS)))
|
||||
{
|
||||
peering_state = peering_state | OSD_RECOVERING;
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes)
|
||||
{
|
||||
if (changes.find(st_cli.etcd_prefix+"/config/global") != changes.end())
|
||||
|
|
|
@ -252,10 +252,18 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
|
|||
auto mask = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_DEGRADED | PG_HAS_MISPLACED);
|
||||
auto check = recovery_last_degraded ? (PG_ACTIVE | PG_HAS_DEGRADED) : (PG_ACTIVE | PG_HAS_MISPLACED);
|
||||
// Restart scanning from the same PG as the last time
|
||||
restart:
|
||||
for (auto pg_it = pgs.lower_bound(recovery_last_pg); pg_it != pgs.end(); pg_it++)
|
||||
{
|
||||
if ((pg_it->second.state & mask) == check)
|
||||
{
|
||||
auto pool_it = st_cli.pool_config.find(pg_it->first.pool_id);
|
||||
if (pool_it != st_cli.pool_config.end() && pool_it->second.backfillfull)
|
||||
{
|
||||
// Skip the pool
|
||||
recovery_last_pg.pool_id++;
|
||||
goto restart;
|
||||
}
|
||||
auto & src = recovery_last_degraded ? pg_it->second.degraded_objects : pg_it->second.misplaced_objects;
|
||||
assert(src.size() > 0);
|
||||
// Restart scanning from the next object
|
||||
|
|
|
@ -19,9 +19,12 @@ ANTIETCD=1 ./test_etcd_fail.sh
|
|||
|
||||
./test_interrupted_rebalance.sh
|
||||
IMMEDIATE_COMMIT=1 ./test_interrupted_rebalance.sh
|
||||
|
||||
SCHEME=ec ./test_interrupted_rebalance.sh
|
||||
SCHEME=ec IMMEDIATE_COMMIT=1 ./test_interrupted_rebalance.sh
|
||||
|
||||
./test_create_halfhost.sh
|
||||
|
||||
./test_failure_domain.sh
|
||||
|
||||
./test_snapshot.sh
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
#!/bin/bash -ex
|
||||
|
||||
. `dirname $0`/common.sh
|
||||
|
||||
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
|
||||
MON_PID=$!
|
||||
wait_etcd
|
||||
|
||||
TIME=$(date '+%s')
|
||||
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"dc":10,"host":100,"half":105,"osd":110}}'
|
||||
$ETCDCTL put /vitastor/config/node_placement '{
|
||||
"h11":{"level":"half","parent":"host1"},
|
||||
"h12":{"level":"half","parent":"host1"},
|
||||
"h21":{"level":"half","parent":"host2"},
|
||||
"h22":{"level":"half","parent":"host2"},
|
||||
"h31":{"level":"half","parent":"host3"},
|
||||
"h32":{"level":"half","parent":"host3"},
|
||||
"1":{"parent":"h11"},
|
||||
"2":{"parent":"h12"},
|
||||
"3":{"parent":"h21"},
|
||||
"4":{"parent":"h22"},
|
||||
"5":{"parent":"h31"},
|
||||
"6":{"parent":"h32"}
|
||||
}'
|
||||
$ETCDCTL put /vitastor/osd/stats/1 '{"host":"host1","size":1073741824,"time":"'$TIME'"}'
|
||||
$ETCDCTL put /vitastor/osd/stats/2 '{"host":"host1","size":1073741824,"time":"'$TIME'"}'
|
||||
$ETCDCTL put /vitastor/osd/stats/3 '{"host":"host2","size":1073741824,"time":"'$TIME'"}'
|
||||
$ETCDCTL put /vitastor/osd/stats/4 '{"host":"host2","size":1073741824,"time":"'$TIME'"}'
|
||||
$ETCDCTL put /vitastor/osd/stats/5 '{"host":"host3","size":1073741824,"time":"'$TIME'"}'
|
||||
$ETCDCTL put /vitastor/osd/stats/6 '{"host":"host3","size":1073741824,"time":"'$TIME'"}'
|
||||
build/src/cmd/vitastor-cli --etcd_address $ETCD_URL osd-tree
|
||||
# check that it doesn't fail
|
||||
build/src/cmd/vitastor-cli --etcd_address $ETCD_URL create-pool testpool --ec 2+1 -n 32
|
||||
|
||||
format_green OK
|
Loading…
Reference in New Issue