Compare commits

...

3 Commits

Author SHA1 Message Date
Vitaliy Filippov 0fcbf56431 Add an alternative RDMA implementation via RDMA-CM
Required for non-RoCE cards: iWARP and, possibly, Infiniband
2025-03-31 20:39:58 +03:00
Vitaliy Filippov 737c4805a1 Support multiple RDMA networks 2025-03-31 20:39:58 +03:00
Vitaliy Filippov f11db37b17 Support multiple OSD networks and separate OSD cluster network 2025-03-31 20:39:55 +03:00
31 changed files with 1392 additions and 499 deletions

View File

@ -12,6 +12,8 @@ between clients, OSDs and etcd.
- [tcp_header_buffer_size](#tcp_header_buffer_size)
- [use_sync_send_recv](#use_sync_send_recv)
- [use_rdma](#use_rdma)
- [use_rdmacm](#use_rdmacm)
- [disable_tcp](#disable_tcp)
- [rdma_device](#rdma_device)
- [rdma_port_num](#rdma_port_num)
- [rdma_gid_index](#rdma_gid_index)
@ -59,10 +61,33 @@ but may be required for clients with old kernel versions.
- Type: boolean
- Default: true
Try to use RDMA for communication if it's available. Disable if you don't
want Vitastor to use RDMA. TCP-only clients can also talk to an RDMA-enabled
cluster, so disabling RDMA may be needed if clients have RDMA devices,
but they are not connected to the cluster.
Try to use RDMA through libibverbs for communication if it's available.
Disable if you don't want Vitastor to use RDMA. TCP-only clients can also
talk to an RDMA-enabled cluster, so disabling RDMA may be needed if clients
have RDMA devices, but they are not connected to the cluster.
`use_rdma` works with RoCEv1/RoCEv2 networks, but not with iWARP and,
maybe, with some Infiniband configurations which require RDMA-CM.
Consider `use_rdmacm` for such networks.
## use_rdmacm
- Type: boolean
- Default: true
Use an alternative implementation of RDMA through RDMA-CM (Connection
Manager). Works with all RDMA networks: Infiniband, iWARP and
RoCEv1/RoCEv2, and even allows to disable TCP and run only with RDMA.
When enabled, OSDs listen on the same address(es) and port(s) using
TCP and RDMA-CM. `use_rdma` is automatically disabled when `use_rdmacm`
is enabled.
## disable_tcp
- Type: boolean
- Default: true
Fully disable TCP and only use RDMA-CM for OSD communication.
## rdma_device
@ -93,12 +118,13 @@ PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
## rdma_port_num
- Type: integer
- Default: 1
RDMA device port number to use. Only for devices that have more than 1 port.
See `phys_port_cnt` in `ibv_devinfo -v` output to determine how many ports
your device has.
Not relevant for RDMA-CM (use_rdmacm).
## rdma_gid_index
- Type: integer
@ -114,13 +140,14 @@ GID auto-selection is unsupported with libibverbs < v32.
A correct rdma_gid_index for RoCEv2 is usually 1 (IPv6) or 3 (IPv4).
Not relevant for RDMA-CM (use_rdmacm).
## rdma_mtu
- Type: integer
- Default: 4096
RDMA Path MTU to use. Must be 1024, 2048 or 4096. There is usually no
sense to change it from the default 4096.
RDMA Path MTU to use. Must be 1024, 2048 or 4096. Default is to use the
RDMA device's MTU.
## rdma_max_sge

View File

@ -12,6 +12,8 @@
- [tcp_header_buffer_size](#tcp_header_buffer_size)
- [use_sync_send_recv](#use_sync_send_recv)
- [use_rdma](#use_rdma)
- [use_rdmacm](#use_rdmacm)
- [disable_tcp](#disable_tcp)
- [rdma_device](#rdma_device)
- [rdma_port_num](#rdma_port_num)
- [rdma_gid_index](#rdma_gid_index)
@ -61,11 +63,34 @@ Vitastor содержат 128-байтные заголовки, за котор
- Тип: булево (да/нет)
- Значение по умолчанию: true
Пытаться использовать RDMA для связи при наличии доступных устройств.
Отключите, если вы не хотите, чтобы Vitastor использовал RDMA.
TCP-клиенты также могут работать с RDMA-кластером, так что отключать
RDMA может быть нужно только если у клиентов есть RDMA-устройства,
но они не имеют соединения с кластером Vitastor.
Попробовать использовать RDMA через libibverbs для связи при наличии
доступных устройств. Отключите, если вы не хотите, чтобы Vitastor
использовал RDMA. TCP-клиенты также могут работать с RDMA-кластером,
так что отключать RDMA может быть нужно, только если у клиентов есть
RDMA-устройства, но они не имеют соединения с кластером Vitastor.
`use_rdma` работает с RoCEv1/RoCEv2 сетями, но не работает с iWARP и
может не работать с частью конфигураций Infiniband, требующих RDMA-CM.
Рассмотрите включение `use_rdmacm` для таких сетей.
## use_rdmacm
- Тип: булево (да/нет)
- Значение по умолчанию: true
Использовать альтернативную реализацию RDMA на основе RDMA-CM (Connection
Manager). Работает со всеми типами RDMA-сетей: Infiniband, iWARP и
RoCEv1/RoCEv2, и даже позволяет полностью отключить TCP и работать
только на RDMA. Когда опция включена, OSD слушают один и тот же порт
на одних и тех же адресах через TCP и RDMA-CM. Также при включении
автоматически отключается опция `use_rdma`.
## disable_tcp
- Тип: булево (да/нет)
- Значение по умолчанию: true
Полностью отключить TCP и использовать только RDMA-CM для соединений с OSD.
## rdma_device
@ -97,13 +122,14 @@ Control) и ECN (Explicit Congestion Notification).
## rdma_port_num
- Тип: целое число
- Значение по умолчанию: 1
Номер порта RDMA-устройства, который следует использовать. Имеет смысл
только для устройств, у которых более 1 порта. Чтобы узнать, сколько портов
у вашего адаптера, посмотрите `phys_port_cnt` в выводе команды
`ibv_devinfo -v`.
Опция неприменима к RDMA-CM (use_rdmacm).
## rdma_gid_index
- Тип: целое число
@ -120,13 +146,14 @@ libibverbs < v32.
Правильный rdma_gid_index для RoCEv2, как правило, 1 (IPv6) или 3 (IPv4).
Опция неприменима к RDMA-CM (use_rdmacm).
## rdma_mtu
- Тип: целое число
- Значение по умолчанию: 4096
Максимальная единица передачи (Path MTU) для RDMA. Должно быть равно 1024,
2048 или 4096. Обычно нет смысла менять значение по умолчанию, равное 4096.
2048 или 4096. По умолчанию используется значение MTU RDMA-устройства.
## rdma_max_sge

View File

@ -10,13 +10,14 @@ These parameters only apply to OSDs, are not fixed at the moment of OSD drive
initialization and can be changed - in /etc/vitastor/vitastor.conf or [vitastor-disk update-sb](../usage/disk.en.md#update-sb)
with an OSD restart or, for some of them, even without restarting by updating configuration in etcd.
- [osd_network](#osd_network)
- [osd_cluster_network](#osd_cluster_network)
- [bind_address](#bind_address)
- [bind_port](#bind_port)
- [osd_iothread_count](#osd_iothread_count)
- [etcd_report_interval](#etcd_report_interval)
- [etcd_stats_interval](#etcd_stats_interval)
- [run_primary](#run_primary)
- [osd_network](#osd_network)
- [bind_address](#bind_address)
- [bind_port](#bind_port)
- [autosync_interval](#autosync_interval)
- [autosync_writes](#autosync_writes)
- [recovery_queue_depth](#recovery_queue_depth)
@ -65,6 +66,45 @@ with an OSD restart or, for some of them, even without restarting by updating co
- [min_discard_size](#min_discard_size)
- [allow_net_split](#allow_net_split)
## osd_network
- Type: string or array of strings
Network mask of public OSD network(s) (IPv4 or IPv6). Each OSD listens on all
addresses of UP + RUNNING interfaces matching one of these networks, on the
same port. Port is auto-selected except if [bind_port](#bind_port) is
explicitly specified. Bind address(es) may also be overridden manually by
specifying [bind_address](#bind_address). If OSD networks are not specified
at all, OSD just listens on a wildcard address (0.0.0.0).
## osd_cluster_network
- Type: string or array of strings
Network mask of the separate network (IPv4 or IPv6) to use for OSD
cluster connections. Note that
although it's possible to specify multiple networks here, this does not
mean that OSDs will create multiple listening sockets - they'll only
pick the first matching address of an UP + RUNNING interface. Separate
networks for cluster and client connections are also not implemented, but
they are mostly useless anyway, so it's not a big deal.
## bind_address
- Type: string or array of strings
Instead of the network mask, you can also set OSD listen addresses explicitly
using this parameter. May be useful if you want to start OSDs on interfaces
that are not UP + RUNNING.
## bind_port
- Type: integer
By default, OSDs pick random ports to use for incoming connections
automatically. With this option you can set a specific port for a specific
OSD by hand.
## osd_iothread_count
- Type: integer
@ -107,34 +147,6 @@ debugging purposes. It's possible to implement additional feature for the
monitor which may allow to separate primary and secondary OSDs, but it's
unclear why anyone could need it, so it's not implemented.
## osd_network
- Type: string or array of strings
Network mask of the network (IPv4 or IPv6) to use for OSDs. Note that
although it's possible to specify multiple networks here, this does not
mean that OSDs will create multiple listening sockets - they'll only
pick the first matching address of an UP + RUNNING interface. Separate
networks for cluster and client connections are also not implemented, but
they are mostly useless anyway, so it's not a big deal.
## bind_address
- Type: string
- Default: 0.0.0.0
Instead of the network mask, you can also set OSD listen address explicitly
using this parameter. May be useful if you want to start OSDs on interfaces
that are not UP + RUNNING.
## bind_port
- Type: integer
By default, OSDs pick random ports to use for incoming connections
automatically. With this option you can set a specific port for a specific
OSD by hand.
## autosync_interval
- Type: seconds

View File

@ -11,13 +11,14 @@
момент с перезапуском OSD в /etc/vitastor/vitastor.conf или [vitastor-disk update-sb](../usage/disk.ru.md#update-sb),
а некоторые и без перезапуска, с помощью изменения конфигурации в etcd.
- [osd_network](#osd_network)
- [osd_cluster_network](#osd_cluster_network)
- [bind_address](#bind_address)
- [bind_port](#bind_port)
- [osd_iothread_count](#osd_iothread_count)
- [etcd_report_interval](#etcd_report_interval)
- [etcd_stats_interval](#etcd_stats_interval)
- [run_primary](#run_primary)
- [osd_network](#osd_network)
- [bind_address](#bind_address)
- [bind_port](#bind_port)
- [autosync_interval](#autosync_interval)
- [autosync_writes](#autosync_writes)
- [recovery_queue_depth](#recovery_queue_depth)
@ -66,6 +67,44 @@
- [min_discard_size](#min_discard_size)
- [allow_net_split](#allow_net_split)
## osd_network
- Тип: строка или массив строк
Маска подсети (IPv4 или IPv6) публичной сети или сетей OSD. Каждый OSD слушает
один и тот же порт на всех адресах поднятых (UP + RUNNING) сетевых интерфейсов,
соответствующих одной из указанных сетей. Порт выбирается автоматически, если
только [bind_port](#bind_port) не задан явно. Адреса для подключений можно
также переопределить явно, задав [bind_address](#bind_address). Если сети OSD
не заданы вообще, OSD слушает все адреса (0.0.0.0).
## osd_cluster_network
- Тип: строка или массив строк
Маска подсети (IPv4 или IPv6) для использования для соединений с OSD.
Имейте в виду, что хотя сейчас и можно передать в этот параметр несколько
подсетей, это не означает, что OSD будут создавать несколько слушающих
сокетов - они лишь будут выбирать адрес первого поднятого (состояние UP +
RUNNING), подходящий под заданную маску. Также не реализовано разделение
кластерной и публичной сетей OSD. Правда, от него обычно всё равно довольно
мало толку, так что особенной проблемы в этом нет.
## bind_address
- Тип: строка или массив строк
Этим параметром можно явным образом задать адрес(а), на котором будет ожидать
соединений OSD (вместо использования маски подсети). Может быть полезно,
например, чтобы запускать OSD на неподнятых интерфейсах (не UP + RUNNING).
## bind_port
- Тип: целое число
По умолчанию OSD сами выбирают случайные порты для входящих подключений.
С помощью данной опции вы можете задать порт для отдельного OSD вручную.
## osd_iothread_count
- Тип: целое число
@ -110,34 +149,6 @@ max_etcd_attempts * etcd_quick_timeout.
первичные OSD от вторичных, но пока не понятно, зачем это может кому-то
понадобиться, поэтому это не реализовано.
## osd_network
- Тип: строка или массив строк
Маска подсети (IPv4 или IPv6) для использования для соединений с OSD.
Имейте в виду, что хотя сейчас и можно передать в этот параметр несколько
подсетей, это не означает, что OSD будут создавать несколько слушающих
сокетов - они лишь будут выбирать адрес первого поднятого (состояние UP +
RUNNING), подходящий под заданную маску. Также не реализовано разделение
кластерной и публичной сетей OSD. Правда, от него обычно всё равно довольно
мало толку, так что особенной проблемы в этом нет.
## bind_address
- Тип: строка
- Значение по умолчанию: 0.0.0.0
Этим параметром можно явным образом задать адрес, на котором будет ожидать
соединений OSD (вместо использования маски подсети). Может быть полезно,
например, чтобы запускать OSD на неподнятых интерфейсах (не UP + RUNNING).
## bind_port
- Тип: целое число
По умолчанию OSD сами выбирают случайные порты для входящих подключений.
С помощью данной опции вы можете задать порт для отдельного OSD вручную.
## autosync_interval
- Тип: секунды

View File

@ -34,16 +34,48 @@
type: bool
default: true
info: |
Try to use RDMA for communication if it's available. Disable if you don't
want Vitastor to use RDMA. TCP-only clients can also talk to an RDMA-enabled
cluster, so disabling RDMA may be needed if clients have RDMA devices,
but they are not connected to the cluster.
Try to use RDMA through libibverbs for communication if it's available.
Disable if you don't want Vitastor to use RDMA. TCP-only clients can also
talk to an RDMA-enabled cluster, so disabling RDMA may be needed if clients
have RDMA devices, but they are not connected to the cluster.
`use_rdma` works with RoCEv1/RoCEv2 networks, but not with iWARP and,
maybe, with some Infiniband configurations which require RDMA-CM.
Consider `use_rdmacm` for such networks.
info_ru: |
Пытаться использовать RDMA для связи при наличии доступных устройств.
Отключите, если вы не хотите, чтобы Vitastor использовал RDMA.
TCP-клиенты также могут работать с RDMA-кластером, так что отключать
RDMA может быть нужно только если у клиентов есть RDMA-устройства,
но они не имеют соединения с кластером Vitastor.
Попробовать использовать RDMA через libibverbs для связи при наличии
доступных устройств. Отключите, если вы не хотите, чтобы Vitastor
использовал RDMA. TCP-клиенты также могут работать с RDMA-кластером,
так что отключать RDMA может быть нужно, только если у клиентов есть
RDMA-устройства, но они не имеют соединения с кластером Vitastor.
`use_rdma` работает с RoCEv1/RoCEv2 сетями, но не работает с iWARP и
может не работать с частью конфигураций Infiniband, требующих RDMA-CM.
Рассмотрите включение `use_rdmacm` для таких сетей.
- name: use_rdmacm
type: bool
default: true
info: |
Use an alternative implementation of RDMA through RDMA-CM (Connection
Manager). Works with all RDMA networks: Infiniband, iWARP and
RoCEv1/RoCEv2, and even allows to disable TCP and run only with RDMA.
When enabled, OSDs listen on the same address(es) and port(s) using
TCP and RDMA-CM. `use_rdma` is automatically disabled when `use_rdmacm`
is enabled.
info_ru: |
Использовать альтернативную реализацию RDMA на основе RDMA-CM (Connection
Manager). Работает со всеми типами RDMA-сетей: Infiniband, iWARP и
RoCEv1/RoCEv2, и даже позволяет полностью отключить TCP и работать
только на RDMA. Когда опция включена, OSD слушают один и тот же порт
на одних и тех же адресах через TCP и RDMA-CM. Также при включении
автоматически отключается опция `use_rdma`.
- name: disable_tcp
type: bool
default: true
info: |
Fully disable TCP and only use RDMA-CM for OSD communication.
info_ru: |
Полностью отключить TCP и использовать только RDMA-CM для соединений с OSD.
- name: rdma_device
type: string
info: |
@ -93,16 +125,19 @@
Control) и ECN (Explicit Congestion Notification).
- name: rdma_port_num
type: int
default: 1
info: |
RDMA device port number to use. Only for devices that have more than 1 port.
See `phys_port_cnt` in `ibv_devinfo -v` output to determine how many ports
your device has.
Not relevant for RDMA-CM (use_rdmacm).
info_ru: |
Номер порта RDMA-устройства, который следует использовать. Имеет смысл
только для устройств, у которых более 1 порта. Чтобы узнать, сколько портов
у вашего адаптера, посмотрите `phys_port_cnt` в выводе команды
`ibv_devinfo -v`.
Опция неприменима к RDMA-CM (use_rdmacm).
- name: rdma_gid_index
type: int
info: |
@ -116,6 +151,8 @@
GID auto-selection is unsupported with libibverbs < v32.
A correct rdma_gid_index for RoCEv2 is usually 1 (IPv6) or 3 (IPv4).
Not relevant for RDMA-CM (use_rdmacm).
info_ru: |
Номер глобального идентификатора адреса RDMA-устройства, который следует
использовать. Разным gid_index могут соответствовать разные протоколы связи:
@ -128,15 +165,16 @@
libibverbs < v32.
Правильный rdma_gid_index для RoCEv2, как правило, 1 (IPv6) или 3 (IPv4).
Опция неприменима к RDMA-CM (use_rdmacm).
- name: rdma_mtu
type: int
default: 4096
info: |
RDMA Path MTU to use. Must be 1024, 2048 or 4096. There is usually no
sense to change it from the default 4096.
RDMA Path MTU to use. Must be 1024, 2048 or 4096. Default is to use the
RDMA device's MTU.
info_ru: |
Максимальная единица передачи (Path MTU) для RDMA. Должно быть равно 1024,
2048 или 4096. Обычно нет смысла менять значение по умолчанию, равное 4096.
2048 или 4096. По умолчанию используется значение MTU RDMA-устройства.
- name: rdma_max_sge
type: int
default: 128

View File

@ -1,3 +1,59 @@
- name: osd_network
type: string or array of strings
type_ru: строка или массив строк
info: |
Network mask of public OSD network(s) (IPv4 or IPv6). Each OSD listens on all
addresses of UP + RUNNING interfaces matching one of these networks, on the
same port. Port is auto-selected except if [bind_port](#bind_port) is
explicitly specified. Bind address(es) may also be overridden manually by
specifying [bind_address](#bind_address). If OSD networks are not specified
at all, OSD just listens on a wildcard address (0.0.0.0).
info_ru: |
Маска подсети (IPv4 или IPv6) публичной сети или сетей OSD. Каждый OSD слушает
один и тот же порт на всех адресах поднятых (UP + RUNNING) сетевых интерфейсов,
соответствующих одной из указанных сетей. Порт выбирается автоматически, если
только [bind_port](#bind_port) не задан явно. Адреса для подключений можно
также переопределить явно, задав [bind_address](#bind_address). Если сети OSD
не заданы вообще, OSD слушает все адреса (0.0.0.0).
- name: osd_cluster_network
type: string or array of strings
type_ru: строка или массив строк
info: |
Network mask of the separate network (IPv4 or IPv6) to use for OSD
cluster connections. Note that
although it's possible to specify multiple networks here, this does not
mean that OSDs will create multiple listening sockets - they'll only
pick the first matching address of an UP + RUNNING interface. Separate
networks for cluster and client connections are also not implemented, but
they are mostly useless anyway, so it's not a big deal.
info_ru: |
Маска подсети (IPv4 или IPv6) для использования для соединений с OSD.
Имейте в виду, что хотя сейчас и можно передать в этот параметр несколько
подсетей, это не означает, что OSD будут создавать несколько слушающих
сокетов - они лишь будут выбирать адрес первого поднятого (состояние UP +
RUNNING), подходящий под заданную маску. Также не реализовано разделение
кластерной и публичной сетей OSD. Правда, от него обычно всё равно довольно
мало толку, так что особенной проблемы в этом нет.
- name: bind_address
type: string or array of strings
type_ru: строка или массив строк
info: |
Instead of the network mask, you can also set OSD listen addresses explicitly
using this parameter. May be useful if you want to start OSDs on interfaces
that are not UP + RUNNING.
info_ru: |
Этим параметром можно явным образом задать адрес(а), на котором будет ожидать
соединений OSD (вместо использования маски подсети). Может быть полезно,
например, чтобы запускать OSD на неподнятых интерфейсах (не UP + RUNNING).
- name: bind_port
type: int
info: |
By default, OSDs pick random ports to use for incoming connections
automatically. With this option you can set a specific port for a specific
OSD by hand.
info_ru: |
По умолчанию OSD сами выбирают случайные порты для входящих подключений.
С помощью данной опции вы можете задать порт для отдельного OSD вручную.
- name: osd_iothread_count
type: int
default: 0
@ -56,44 +112,6 @@
реализовать дополнительный режим для монитора, который позволит отделять
первичные OSD от вторичных, но пока не понятно, зачем это может кому-то
понадобиться, поэтому это не реализовано.
- name: osd_network
type: string or array of strings
type_ru: строка или массив строк
info: |
Network mask of the network (IPv4 or IPv6) to use for OSDs. Note that
although it's possible to specify multiple networks here, this does not
mean that OSDs will create multiple listening sockets - they'll only
pick the first matching address of an UP + RUNNING interface. Separate
networks for cluster and client connections are also not implemented, but
they are mostly useless anyway, so it's not a big deal.
info_ru: |
Маска подсети (IPv4 или IPv6) для использования для соединений с OSD.
Имейте в виду, что хотя сейчас и можно передать в этот параметр несколько
подсетей, это не означает, что OSD будут создавать несколько слушающих
сокетов - они лишь будут выбирать адрес первого поднятого (состояние UP +
RUNNING), подходящий под заданную маску. Также не реализовано разделение
кластерной и публичной сетей OSD. Правда, от него обычно всё равно довольно
мало толку, так что особенной проблемы в этом нет.
- name: bind_address
type: string
default: "0.0.0.0"
info: |
Instead of the network mask, you can also set OSD listen address explicitly
using this parameter. May be useful if you want to start OSDs on interfaces
that are not UP + RUNNING.
info_ru: |
Этим параметром можно явным образом задать адрес, на котором будет ожидать
соединений OSD (вместо использования маски подсети). Может быть полезно,
например, чтобы запускать OSD на неподнятых интерфейсах (не UP + RUNNING).
- name: bind_port
type: int
info: |
By default, OSDs pick random ports to use for incoming connections
automatically. With this option you can set a specific port for a specific
OSD by hand.
info_ru: |
По умолчанию OSD сами выбирают случайные порты для входящих подключений.
С помощью данной опции вы можете задать порт для отдельного OSD вручную.
- name: autosync_interval
type: sec
default: 5

View File

@ -16,7 +16,7 @@
designated initializers support from C++20
- CMake
- liburing, jerasure headers and libraries
- ISA-L, libibverbs headers and libraries (optional)
- ISA-L, libibverbs and librdmacm headers and libraries (optional)
- tcmalloc (google-perftools-dev)
## Basic instructions

View File

@ -16,7 +16,7 @@
назначенных инициализаторов (designated initializers) из C++20
- CMake
- Заголовки и библиотеки liburing, jerasure
- Опционально - заголовки и библиотеки ISA-L, libibverbs
- Опционально - заголовки и библиотеки ISA-L, libibverbs, librdmacm
- tcmalloc (google-perftools-dev)
## Базовая инструкция

View File

@ -28,7 +28,7 @@
- Per-OSD and per-image I/O and space usage statistics in etcd
- Snapshots and copy-on-write image clones
- [Write throttling to smooth random write workloads in SSD+HDD configurations](../config/osd.en.md#throttle_small_writes)
- [RDMA/RoCEv2 support via libibverbs](../config/network.en.md#rdma_device)
- RDMA/RoCEv2 support [via libibverbs](../config/network.en.md#use_rdma) or [RDMA-CM](../config/network.en.md#use_rdmacm)
- [Scrubbing](../config/osd.en.md#auto_scrub) (verification of copies)
- [Checksums](../config/layout-osd.en.md#data_csum_type)
- [Client write-back cache](../config/client.en.md#client_enable_writeback)

View File

@ -30,7 +30,7 @@
- Именование инодов через хранение их метаданных в etcd
- Снапшоты и copy-on-write клоны
- [Сглаживание производительности случайной записи в SSD+HDD конфигурациях](../config/osd.ru.md#throttle_small_writes)
- [Поддержка RDMA/RoCEv2 через libibverbs](../config/network.ru.md#rdma_device)
- Поддержка RDMA/RoCEv2 [через libibverbs](../config/network.ru.md#use_rdma) или [RDMA-CM](../config/network.ru.md#use_rdmacm)
- [Фоновая проверка целостности](../config/osd.ru.md#auto_scrub) (сверка копий)
- [Контрольные суммы](../config/layout-osd.ru.md#data_csum_type)
- [Буферизация записи на стороне клиента](../config/client.ru.md#client_enable_writeback)

View File

@ -7,10 +7,14 @@ set(MSGR_RDMA "")
if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
set(MSGR_RDMACM "")
if (RDMACM_LIBRARIES)
set(MSGR_RDMACM "msgr_rdmacm.cpp")
endif (RDMACM_LIBRARIES)
add_library(vitastor_common STATIC
../util/epoll_manager.cpp etcd_state_client.cpp messenger.cpp ../util/addr_util.cpp
msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ../util/ringloop.cpp ../../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp ../util/timerfd_manager.cpp ../util/str_util.cpp ../util/json_util.cpp ${MSGR_RDMA}
http_client.cpp osd_ops.cpp pg_states.cpp ../util/timerfd_manager.cpp ../util/str_util.cpp ../util/json_util.cpp ${MSGR_RDMA} ${MSGR_RDMACM}
)
target_link_libraries(vitastor_common pthread)
target_compile_options(vitastor_common PUBLIC -fPIC)
@ -28,6 +32,7 @@ target_link_libraries(vitastor_client
vitastor_cli
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
configure_file(vitastor.pc.in vitastor.pc @ONLY)

View File

@ -182,7 +182,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
exit(1);
}
if (!local_ips.size())
local_ips = getifaddr_list(std::vector<std::string>(), true);
local_ips = getifaddr_list(std::vector<addr_mask_t>(), true);
std::string check_addr;
int pos = addr.find('/');
int pos2 = addr.find(':');

View File

@ -117,29 +117,53 @@ void msgr_iothread_t::run()
void osd_messenger_t::init()
{
#ifdef WITH_RDMACM
if (use_rdmacm)
{
// RDMA-CM only requires the event channel. All the remaining work is done separately
rdmacm_evch = rdma_create_event_channel();
if (!rdmacm_evch)
{
// ENODEV means that the client doesn't have RDMA devices available
if (errno != ENODEV || log_level > 0)
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
}
else
{
fcntl(rdmacm_evch->fd, F_SETFL, fcntl(rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
{
handle_rdmacm_events();
});
}
}
else
#endif
#ifdef WITH_RDMA
if (use_rdma)
{
rdma_context = msgr_rdma_context_t::create(
osd_networks, rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_contexts = msgr_rdma_context_t::create_all(
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
);
if (!rdma_context)
if (!rdma_contexts.size())
{
if (log_level > 0)
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
}
else
{
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
for (msgr_rdma_context_t* rdma_context: rdma_contexts)
{
handle_rdma_events();
});
handle_rdma_events();
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
}
}
}
#endif
@ -247,10 +271,19 @@ osd_messenger_t::~osd_messenger_t()
iothreads.clear();
}
#ifdef WITH_RDMA
if (rdma_context)
for (auto rdma_context: rdma_contexts)
{
delete rdma_context;
}
rdma_contexts.clear();
#endif
#ifdef WITH_RDMACM
if (rdmacm_evch)
{
tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
rdma_destroy_event_channel(rdmacm_evch);
rdmacm_evch = NULL;
}
#endif
}
@ -262,10 +295,14 @@ void osd_messenger_t::parse_config(const json11::Json & config)
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
// FIXME: Only parse during start
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
this->rdma_port_num = 1;
if (!config["rdma_gid_index"].is_null())
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
@ -282,15 +319,6 @@ void osd_messenger_t::parse_config(const json11::Json & config)
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 129*1024;
this->rdma_odp = config["rdma_odp"].bool_value();
std::vector<std::string> mask;
if (config["bind_address"].is_string())
mask.push_back(config["bind_address"].string_value());
else if (config["osd_network"].is_string())
mask.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
this->osd_networks = mask;
#endif
if (!osd_num)
this->iothread_count = (uint32_t)config["client_iothread_count"].uint64_value();
@ -314,23 +342,87 @@ void osd_messenger_t::parse_config(const json11::Json & config)
if (!this->osd_ping_timeout)
this->osd_ping_timeout = 5;
this->log_level = config["log_level"].uint64_value();
// OSD public & cluster networks
this->osd_networks.clear();
if (config["osd_network"].is_string())
this->osd_networks.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
this->osd_networks.push_back(v.string_value());
this->osd_cluster_networks.clear();
if (config["osd_cluster_network"].is_string())
this->osd_cluster_networks.push_back(config["osd_cluster_network"].string_value());
else
for (auto v: config["osd_cluster_network"].array_items())
this->osd_cluster_networks.push_back(v.string_value());
if (this->osd_cluster_networks.size())
for (auto & net: this->osd_cluster_networks)
for (int i = this->osd_networks.size()-1; i >= 0; i--)
if (this->osd_networks[i] == net)
this->osd_networks.erase(this->osd_networks.begin()+i, this->osd_networks.begin()+i+1);
this->osd_network_masks.clear();
for (auto & netstr: this->osd_networks)
this->osd_network_masks.push_back(cidr_parse(netstr));
this->osd_cluster_network_masks.clear();
for (auto & netstr: this->osd_cluster_networks)
this->osd_cluster_network_masks.push_back(cidr_parse(netstr));
this->all_osd_networks.clear();
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_networks.begin(), this->osd_networks.end());
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_cluster_networks.begin(), this->osd_cluster_networks.end());
this->all_osd_network_masks.clear();
this->all_osd_network_masks.insert(this->all_osd_network_masks.end(), this->osd_network_masks.begin(), this->osd_network_masks.end());
this->all_osd_network_masks.insert(this->all_osd_network_masks.end(), this->osd_cluster_network_masks.begin(), this->osd_cluster_network_masks.end());
if (!this->osd_networks.size())
{
this->osd_networks = this->osd_cluster_networks;
this->osd_network_masks = this->osd_cluster_network_masks;
}
}
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
{
if (wanted_peers.find(peer_osd) == wanted_peers.end())
if (wanted_peers[peer_osd].raw_address_list != peer_state["addresses"])
{
wanted_peers[peer_osd] = (osd_wanted_peer_t){
.address_list = peer_state["addresses"],
.port = (int)peer_state["port"].int64_value(),
};
wanted_peers[peer_osd].raw_address_list = peer_state["addresses"];
// We are an OSD -> try to select a cluster address
// We are a client -> try to select a public address
// OSD only has 1 address -> don't try anything, it's pointless
// FIXME: Maybe support optional fallback from cluster to public network?
auto & match_masks = (this->osd_num ? osd_cluster_network_masks : osd_network_masks);
if (peer_state["addresses"].array_items().size() > 1 && match_masks.size())
{
json11::Json::array address_list;
for (auto json_addr: peer_state["addresses"].array_items())
{
struct sockaddr_storage addr;
auto ok = string_to_addr(json_addr.string_value(), false, 0, &addr);
if (ok)
{
bool matches = false;
for (auto & mask: match_masks)
{
if (cidr_sockaddr_match(addr, mask))
{
matches = true;
break;
}
}
if (matches)
address_list.push_back(json_addr);
}
}
if (!address_list.size())
address_list = peer_state["addresses"].array_items();
wanted_peers[peer_osd].address_list = address_list;
}
else
wanted_peers[peer_osd].address_list = peer_state["addresses"];
wanted_peers[peer_osd].address_changed = true;
}
else
{
wanted_peers[peer_osd].address_list = peer_state["addresses"];
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
}
wanted_peers[peer_osd].address_changed = true;
#ifdef WITH_RDMACM
wanted_peers[peer_osd].peer_rdmacm = peer_state["rdmacm"].bool_value();
#endif
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
try_connect_peer(peer_osd);
}
@ -355,12 +447,24 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
wp.cur_addr = wp.address_list[wp.address_index].string_value();
wp.cur_port = wp.port;
wp.connecting = true;
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
#ifdef WITH_RDMACM
if (use_rdmacm && wp.peer_rdmacm)
rdmacm_try_connect_peer(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
else
#endif
try_connect_peer_tcp(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
}
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
#ifdef WITH_RDMACM
if (disable_tcp)
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
#endif
struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
@ -524,20 +628,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
},
};
#ifdef WITH_RDMA
if (rdma_context)
if (rdma_contexts.size())
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
// Choose the right context for the selected network
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
if (log_level > 0)
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
}
else
{
cl->rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
{
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
}
}
}
#endif
@ -667,9 +781,29 @@ void osd_messenger_t::accept_connections(int listen_fd)
}
#ifdef WITH_RDMA
msgr_rdma_context_t* osd_messenger_t::choose_rdma_context(osd_client_t *cl)
{
// Choose the right context for the selected network
msgr_rdma_context_t *selected_ctx = NULL;
for (auto rdma_ctx: rdma_contexts)
{
if (!rdma_ctx->net_mask.family && !selected_ctx ||
rdma_ctx->net_mask.family && cidr_sockaddr_match(cl->peer_addr, rdma_ctx->net_mask))
{
selected_ctx = rdma_ctx;
}
}
return selected_ctx;
}
bool osd_messenger_t::is_rdma_enabled()
{
return rdma_context != NULL;
return rdma_contexts.size() > 0;
}
bool osd_messenger_t::is_use_rdmacm()
{
return use_rdmacm;
}
#endif

View File

@ -16,6 +16,7 @@
#include "json11/json11.hpp"
#include "msgr_op.h"
#include "timerfd_manager.h"
#include "addr_util.h"
#include <ringloop.h>
#define CL_READ_HDR 1
@ -49,10 +50,10 @@ struct osd_client_t
{
int refs = 0;
sockaddr_storage peer_addr;
int peer_port;
sockaddr_storage peer_addr = {};
int peer_port = 0;
int peer_fd = -1;
int peer_state;
int peer_state = 0;
int connect_timeout_id = -1;
int ping_time_remaining = 0;
int idle_time_remaining = 0;
@ -93,13 +94,15 @@ struct osd_client_t
struct osd_wanted_peer_t
{
json11::Json raw_address_list;
json11::Json address_list;
int port;
time_t last_connect_attempt;
bool connecting, address_changed;
int address_index;
bool peer_rdmacm = false;
int port = 0;
time_t last_connect_attempt = 0;
bool connecting = false, address_changed = false;
int address_index = 0;
std::string cur_addr;
int cur_port;
int cur_port = 0;
};
struct osd_op_stats_t
@ -149,6 +152,15 @@ public:
};
#endif
#ifdef WITH_RDMA
struct rdma_event_channel;
struct rdma_cm_id;
struct rdma_cm_event;
struct ibv_context;
struct osd_messenger_t;
struct rdmacm_connecting_t;
#endif
struct osd_messenger_t
{
protected:
@ -165,14 +177,19 @@ protected:
#ifdef WITH_RDMA
bool use_rdma = true;
std::vector<std::string> osd_networks;
bool use_rdmacm = false;
bool disable_tcp = false;
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_mtu = 0;
uint64_t rdma_port_num = 1;
int rdma_mtu = 0;
int rdma_gid_index = -1;
msgr_rdma_context_t *rdma_context = NULL;
std::vector<msgr_rdma_context_t *> rdma_contexts;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0;
bool rdma_odp = false;
rdma_event_channel *rdmacm_evch = NULL;
std::map<rdma_cm_id*, osd_client_t*> rdmacm_connections;
std::map<rdma_cm_id*, rdmacm_connecting_t*> rdmacm_connecting;
#endif
std::vector<msgr_iothread_t*> iothreads;
@ -190,6 +207,12 @@ public:
std::map<int, osd_client_t*> clients;
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
std::map<uint64_t, int> osd_peer_fds;
std::vector<std::string> osd_networks;
std::vector<addr_mask_t> osd_network_masks;
std::vector<std::string> osd_cluster_networks;
std::vector<addr_mask_t> osd_cluster_network_masks;
std::vector<std::string> all_osd_networks;
std::vector<addr_mask_t> all_osd_network_masks;
// op statistics
osd_op_stats_t stats, recovery_stats;
@ -216,13 +239,18 @@ public:
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
#endif
#ifdef WITH_RDMACM
bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
void measure_exec(osd_op_t *cur_op);
protected:
void try_connect_peer(uint64_t osd_num);
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
void try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port);
void handle_peer_epoll(int peer_fd, int epoll_events);
void handle_connect_epoll(int peer_fd);
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
@ -247,6 +275,18 @@ protected:
void try_send_rdma_odp(osd_client_t *cl);
void try_send_rdma_nodp(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
void handle_rdma_events(msgr_rdma_context_t *rdma_context);
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
#endif
#ifdef WITH_RDMACM
void handle_rdmacm_events();
msgr_rdma_context_t* rdmacm_get_context(ibv_context *verbs);
msgr_rdma_context_t* rdmacm_create_qp(rdma_cm_id *cmid);
void rdmacm_accept(rdma_cm_event *ev);
void rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port);
void rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res);
void rdmacm_address_resolved(rdma_cm_event *ev);
void rdmacm_route_resolved(rdma_cm_event *ev);
void rdmacm_established(rdma_cm_event *ev);
#endif
};

View File

@ -3,10 +3,35 @@
#include <stdio.h>
#include <stdlib.h>
#include "addr_util.h"
#include "msgr_rdma.h"
#include "messenger.h"
static uint32_t ibv_mtu_to_bytes(ibv_mtu mtu)
{
switch (mtu)
{
case IBV_MTU_256: return 256;
case IBV_MTU_512: return 512;
case IBV_MTU_1024: return 1024;
case IBV_MTU_2048: return 2048;
case IBV_MTU_4096: return 4096;
}
return 4096;
}
static ibv_mtu bytes_to_ibv_mtu(uint32_t mtu)
{
switch (mtu)
{
case 256: return IBV_MTU_256;
case 512: return IBV_MTU_512;
case 1024: return IBV_MTU_1024;
case 2048: return IBV_MTU_2048;
case 4096: return IBV_MTU_4096;
}
return IBV_MTU_4096;
}
std::string msgr_rdma_address_t::to_string()
{
char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"];
@ -38,15 +63,22 @@ msgr_rdma_context_t::~msgr_rdma_context_t()
ibv_dereg_mr(mr);
if (pd)
ibv_dealloc_pd(pd);
if (context)
if (context && !is_cm)
ibv_close_device(context);
}
msgr_rdma_connection_t::~msgr_rdma_connection_t()
{
ctx->used_max_cqe -= max_send+max_recv;
if (qp)
ctx->reserve_cqe(-max_send-max_recv);
if (qp && !cmid)
ibv_destroy_qp(qp);
if (cmid)
{
ctx->cm_refs--;
if (cmid->qp)
rdma_destroy_qp(cmid);
rdma_destroy_id(cmid);
}
if (recv_buffers.size())
{
for (auto b: recv_buffers)
@ -77,21 +109,21 @@ static bool is_ipv4_gid(ibv_gid_entry *gidx)
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
}
static bool match_gid(ibv_gid_entry *gidx, addr_mask_t *networks, int nnet)
static int match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
{
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
((uint64_t*)gidx->gid.raw)[0] == 0 &&
((uint64_t*)gidx->gid.raw)[1] == 0)
{
return false;
return -1;
}
if (is_ipv4_gid(gidx))
{
for (int i = 0; i < nnet; i++)
{
if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits))
return true;
return i;
}
}
else
@ -99,119 +131,67 @@ static bool match_gid(ibv_gid_entry *gidx, addr_mask_t *networks, int nnet)
for (int i = 0; i < nnet; i++)
{
if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits))
return true;
return i;
}
}
return false;
return -1;
}
struct matched_dev
{
int dev = -1;
int port = -1;
int gid = -1;
bool rocev2 = false;
};
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, ibv_gid_entry & gidx)
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, int mtu, ibv_gid_entry & gidx)
{
bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000;
char buf[256];
inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf));
fprintf(
stderr, "Auto-selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s\n",
stderr, "Selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s, MTU %d\n",
ibv_get_device_name(dev), ib_port, gid_index,
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf, mtu
);
}
static matched_dev match_device(ibv_device **dev_list, addr_mask_t *networks, int nnet, int log_level)
static int match_port_gid(const std::vector<addr_mask_t> & osd_network_masks, ibv_context *context,
int port_num, int gid_count, int log_level, ibv_gid_entry *best_gidx, int *net_num)
{
matched_dev best;
ibv_device_attr attr;
ibv_port_attr portinfo;
ibv_gid_entry best_gidx;
int res;
bool have_non_roce = false, have_roce = false;
for (int i = 0; dev_list[i]; ++i)
// Try to find a port with matching address
int best_gid_idx = -1, res = 0;
for (int k = 0; k < gid_count; k++)
{
auto dev = dev_list[i];
ibv_context *context = ibv_open_device(dev_list[i]);
if ((res = ibv_query_device(context, &attr)) != 0)
ibv_gid_entry gidx;
if ((res = ibv_query_gid_ex(context, port_num, k, &gidx, 0)) != 0)
{
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev_list[i]), strerror(res));
goto cleanup;
}
for (int j = 1; j <= attr.phys_port_cnt; j++)
{
// Try to find a port with matching address
if ((res = ibv_query_port(context, j, &portinfo)) != 0)
if (res != ENODATA)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), j, strerror(res));
goto cleanup;
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(context->device), k, strerror(res));
continue;
}
for (int k = 0; k < portinfo.gid_tbl_len; k++)
else
break;
}
if ((res = match_gid(&gidx, osd_network_masks.data(), osd_network_masks.size())) >= 0)
{
// Prefer RoCEv2
if (best_gid_idx < 0 || best_gidx->gid_type != IBV_GID_TYPE_ROCE_V2 && gidx.gid_type == IBV_GID_TYPE_ROCE_V2)
{
ibv_gid_entry gidx;
if ((res = ibv_query_gid_ex(context, j, k, &gidx, 0)) != 0)
{
if (res != ENODATA)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(dev), k, strerror(res));
goto cleanup;
}
else
break;
}
if (gidx.gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx.gid_type != IBV_GID_TYPE_ROCE_V2)
have_non_roce = true;
else
have_roce = true;
if (match_gid(&gidx, networks, nnet))
{
// Prefer RoCEv2
if (!best.rocev2)
{
best.dev = i;
best.port = j;
best.gid = k;
best.rocev2 = (gidx.gid_type == IBV_GID_TYPE_ROCE_V2);
best_gidx = gidx;
}
}
best_gid_idx = k;
*best_gidx = gidx;
*net_num = res;
}
}
cleanup:
ibv_close_device(context);
if (best.rocev2)
{
break;
}
}
if (best.dev >= 0 && log_level > 0)
{
log_rdma_dev_port_gid(dev_list[best.dev], best.port, best.gid, best_gidx);
}
if (best.dev < 0 && have_non_roce && !have_roce)
{
best.dev = -2;
}
return best;
return best_gid_idx;
}
#endif
msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
std::vector<msgr_rdma_context_t*> msgr_rdma_context_t::create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level)
{
int res;
std::vector<msgr_rdma_context_t*> ret;
ibv_device **raw_dev_list = NULL;
ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu;
ibv_device *single_list[2] = {};
timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
dev_list = ibv_get_device_list(NULL);
raw_dev_list = dev_list = ibv_get_device_list(NULL);
if (!dev_list || !*dev_list)
{
if (errno == -ENOSYS || errno == ENOSYS)
@ -228,121 +208,131 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
goto cleanup;
}
if (ib_devname)
if (sel_dev_name)
{
int i;
for (i = 0; dev_list[i]; ++i)
if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
if (!strcmp(ibv_get_device_name(dev_list[i]), sel_dev_name))
break;
ctx->dev = dev_list[i];
if (!ctx->dev)
if (!dev_list[i])
{
fprintf(stderr, "RDMA device %s not found\n", ib_devname);
fprintf(stderr, "RDMA device %s not found\n", sel_dev_name);
goto cleanup;
}
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
else if (osd_networks.size())
{
std::vector<addr_mask_t> nets;
for (auto & netstr: osd_networks)
{
nets.push_back(cidr_parse(netstr));
}
auto best = match_device(dev_list, nets.data(), nets.size(), log_level);
if (best.dev == -2)
{
best.dev = 0;
if (log_level > 0)
fprintf(stderr, "No RoCE devices found, using first available RDMA device %s\n", ibv_get_device_name(*dev_list));
}
else if (best.dev < 0)
{
if (log_level > 0)
fprintf(stderr, "RDMA device matching osd_network is not found, disabling RDMA\n");
goto cleanup;
}
else
{
ib_port = best.port;
gid_index = best.gid;
}
ctx->dev = dev_list[best.dev];
}
#endif
else
{
ctx->dev = *dev_list;
single_list[0] = dev_list[i];
dev_list = single_list;
}
ctx->context = ibv_open_device(ctx->dev);
if (!ctx->context)
for (int i = 0; dev_list[i]; ++i)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
ctx->ib_port = ib_port;
if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
goto cleanup;
}
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (gid_index != -1)
#endif
{
ctx->gid_index = gid_index < 0 ? 0 : gid_index;
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
auto dev = dev_list[i];
ibv_context *context = ibv_open_device(dev);
if (!context)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
goto cleanup;
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
continue;
}
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
else
{
// Auto-guess GID
ibv_gid_entry best_gidx;
for (int k = 0; k < ctx->portinfo.gid_tbl_len; k++)
ibv_device_attr attr;
if ((res = ibv_query_device(context, &attr)) != 0)
{
ibv_gid_entry gidx;
if (ibv_query_gid_ex(ctx->context, ib_port, k, &gidx, 0) != 0)
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev), strerror(res));
goto cleanup_dev;
}
if (sel_port_num && sel_port_num > attr.phys_port_cnt)
{
fprintf(stderr, "RDMA device %s port %d does not exist\n", ibv_get_device_name(dev), sel_port_num);
goto cleanup_dev;
}
for (int port_num = (sel_port_num ? sel_port_num : 1); port_num <= (sel_port_num ? sel_port_num : attr.phys_port_cnt); port_num++)
{
ibv_port_attr portinfo;
if ((res = ibv_query_port(context, port_num, &portinfo)) != 0)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), k);
goto cleanup;
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), port_num, strerror(res));
continue;
}
// Skip empty GID
if (((uint64_t*)gidx.gid.raw)[0] == 0 &&
((uint64_t*)gidx.gid.raw)[1] == 0)
if (portinfo.state != IBV_PORT_ACTIVE)
{
continue;
}
// Prefer IPv4 RoCEv2 -> IPv6 RoCEv2 -> IPv4 RoCEv1 -> IPv6 RoCEv1 -> IB
if (gid_index == -1 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 && best_gidx.gid_type != IBV_GID_TYPE_ROCE_V2 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V1 && best_gidx.gid_type == IBV_GID_TYPE_IB ||
gidx.gid_type == best_gidx.gid_type && is_ipv4_gid(&gidx))
if (sel_gid_index >= (int)portinfo.gid_tbl_len)
{
gid_index = k;
best_gidx = gidx;
fprintf(stderr, "RDMA device %s port %d GID %d does not exist\n", ibv_get_device_name(dev), port_num, sel_gid_index);
continue;
}
uint32_t port_mtu = sel_mtu ? sel_mtu : ibv_mtu_to_bytes(portinfo.active_mtu);
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (sel_gid_index < 0)
{
ibv_gid_entry best_gidx;
int net_num = 0;
int best_gid_idx = match_port_gid(osd_network_masks, context, port_num, portinfo.gid_tbl_len, log_level, &best_gidx, &net_num);
if (best_gid_idx >= 0)
{
if (log_level > 0)
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, best_gidx);
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
{
ctx->net_mask = osd_network_masks[net_num];
ret.push_back(ctx);
}
}
}
else
#endif
{
int best_gid_idx = sel_gid_index >= 0 ? sel_gid_index : 0;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (log_level > 0)
{
ibv_gid_entry gidx;
ibv_query_gid_ex(context, port_num, best_gid_idx, &gidx, 0);
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, gidx);
}
#endif
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
ret.push_back(ctx);
}
}
ctx->gid_index = gid_index = (gid_index == -1 ? 0 : gid_index);
if (log_level > 0)
{
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, best_gidx);
}
ctx->my_gid = best_gidx.gid;
cleanup_dev:
ibv_close_device(context);
}
cleanup:
if (raw_dev_list)
ibv_free_device_list(raw_dev_list);
return ret;
}
msgr_rdma_context_t *msgr_rdma_context_t::create(ibv_device *dev, ibv_port_attr & portinfo, int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
{
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ibv_context *context = ibv_open_device(dev);
if (!context)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->mtu = mtu;
ctx->context = context;
ctx->ib_port = ib_port;
ctx->portinfo = portinfo;
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->gid_index = gid_index;
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(dev), gid_index);
goto cleanup;
}
#endif
ctx->pd = ibv_alloc_pd(ctx->context);
if (!ctx->pd)
@ -351,18 +341,19 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
goto cleanup;
}
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup;
}
ctx->odp = odp;
if (ctx->odp &&
(!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup;
}
ctx->odp = odp;
if (ctx->odp)
{
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)))
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
ctx->odp = false;
if (log_level > 0)
@ -395,20 +386,43 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
goto cleanup;
}
if (dev_list)
ibv_free_device_list(dev_list);
return ctx;
cleanup:
if (context)
ibv_close_device(context);
delete ctx;
if (dev_list)
ibv_free_device_list(dev_list);
return NULL;
}
bool msgr_rdma_context_t::reserve_cqe(int n)
{
this->used_max_cqe += n;
if (this->used_max_cqe > this->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = this->max_cqe;
while (this->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
return false;
}
this->max_cqe = new_max_cqe;
}
return true;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
{
if (!ctx->reserve_cqe(max_send+max_recv))
return NULL;
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
@ -419,25 +433,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = ctx->max_cqe;
while (ctx->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
delete conn;
return NULL;
}
ctx->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
@ -480,25 +475,12 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
return conn;
}
static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu)
{
switch (mtu)
{
case 256: return IBV_MTU_256;
case 512: return IBV_MTU_512;
case 1024: return IBV_MTU_1024;
case 2048: return IBV_MTU_2048;
case 4096: return IBV_MTU_4096;
}
return IBV_MTU_4096;
}
int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
{
auto conn = this;
ibv_qp_attr attr = {
.qp_state = IBV_QPS_RTR,
.path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu),
.path_mtu = bytes_to_ibv_mtu(conn->ctx->mtu),
.rq_psn = dest->psn,
.sq_psn = conn->addr.psn,
.dest_qp_num = dest->qpn,
@ -550,7 +532,15 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
{
client_max_msg = rdma_max_msg;
}
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
auto cl = clients.at(peer_fd);
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{
if (log_level > 0)
fprintf(stderr, "No RDMA context for peer %d, using only TCP\n", cl->peer_fd);
return false;
}
msgr_rdma_connection_t *rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
if (rdma_conn)
{
int r = rdma_conn->connect(&addr);
@ -588,7 +578,7 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
fprintf(stderr, "RDMA send failed: %s (code %d)\n", strerror(err), err);
exit(1);
}
cl->rdma_conn->cur_send++;
@ -669,9 +659,9 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
// Allocate send ring buffer, if not yet
rc->send_out_size = rc->max_msg*rdma_max_send;
rc->send_out.buf = malloc_or_die(rc->send_out_size);
if (!rdma_context->odp)
if (!rc->ctx->odp)
{
rc->send_out.mr = ibv_reg_mr(rdma_context->pd, rc->send_out.buf, rc->send_out_size, 0);
rc->send_out.mr = ibv_reg_mr(rc->ctx->pd, rc->send_out.buf, rc->send_out_size, 0);
if (!rc->send_out.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -701,7 +691,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
ibv_sge sge = {
.addr = (uintptr_t)dst,
.length = (uint32_t)copied,
.lkey = rdma_context->odp ? rdma_context->mr->lkey : rc->send_out.mr->lkey,
.lkey = rc->ctx->odp ? rc->ctx->mr->lkey : rc->send_out.mr->lkey,
};
try_send_rdma_wr(cl, &sge, 1);
rc->send_sizes.push_back(copied);
@ -711,7 +701,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
if (rdma_context->odp)
if (cl->rdma_conn->ctx->odp)
try_send_rdma_odp(cl);
else
try_send_rdma_nodp(cl);
@ -746,9 +736,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{
msgr_rdma_buf_t b;
b.buf = malloc_or_die(rc->max_msg);
if (!rdma_context->odp)
if (!rc->ctx->odp)
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
b.mr = ibv_reg_mr(rc->ctx->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -763,7 +753,7 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
#define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events()
void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
{
// Request next notification
ibv_cq *ev_cq;

View File

@ -2,9 +2,13 @@
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#ifdef WITH_RDMACM
#include <rdma/rdma_cma.h>
#endif
#include <infiniband/verbs.h>
#include <string>
#include <vector>
#include "addr_util.h"
struct msgr_rdma_address_t
{
@ -20,7 +24,6 @@ struct msgr_rdma_address_t
struct msgr_rdma_context_t
{
ibv_context *context = NULL;
ibv_device *dev = NULL;
ibv_device_attr_ex attrx;
ibv_pd *pd = NULL;
bool odp = false;
@ -35,8 +38,17 @@ struct msgr_rdma_context_t
uint32_t mtu;
int max_cqe = 0;
int used_max_cqe = 0;
addr_mask_t net_mask = {};
bool is_cm = false;
int cm_refs = 0;
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t* create_cm(ibv_context *ctx);
bool reserve_cqe(int n);
static msgr_rdma_context_t *create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
~msgr_rdma_context_t();
};
@ -50,11 +62,14 @@ struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
ibv_qp *qp = NULL;
#ifdef WITH_RDMACM
rdma_cm_id *cmid = NULL;
#endif
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
int cur_send = 0, cur_recv = 0;
int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0;
std::vector<msgr_rdma_buf_t> recv_buffers;

525
src/client/msgr_rdmacm.cpp Normal file
View File

@ -0,0 +1,525 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include "msgr_rdma.h"
#include "messenger.h"
struct rdmacm_connecting_t
{
rdma_cm_id *cmid = NULL;
int peer_fd = -1;
osd_num_t peer_osd = 0;
std::string addr;
sockaddr_storage parsed_addr = {};
int peer_port = 0;
int timeout_ms = 0;
int timeout_id = -1;
msgr_rdma_context_t *rdma_context = NULL;
};
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
{
sockaddr_storage addr = {};
rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0)
{
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
goto fail;
}
r = rdma_bind_addr(listener, (sockaddr*)&addr);
if (r != 0)
{
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
r = rdma_listen(listener, 128);
if (r != 0)
{
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
if (bound_port)
{
*bound_port = ntohs(rdma_get_src_port(listener));
}
if (log_level > 0)
{
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
}
return listener;
fail:
rdma_destroy_id(listener);
return NULL;
}
void osd_messenger_t::rdmacm_destroy_listener(rdma_cm_id *listener)
{
rdma_destroy_id(listener);
}
void osd_messenger_t::handle_rdmacm_events()
{
// rdma_destroy_id infinitely waits for pthread_cond if called before all events are acked :-(
std::vector<rdma_cm_event> events_copy;
while (1)
{
rdma_cm_event *ev = NULL;
int r = rdma_get_cm_event(rdmacm_evch, &ev);
if (r != 0)
{
if (errno == EAGAIN || errno == EINTR)
break;
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
events_copy.push_back(*ev);
r = rdma_ack_cm_event(ev);
if (r != 0)
{
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
}
for (auto & evl: events_copy)
{
auto ev = &evl;
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
rdmacm_accept(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
ev->event == RDMA_CM_EVENT_REJECTED ||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
{
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
auto cli_it = rdmacm_connections.find(ev->id);
if (cli_it != rdmacm_connections.end())
{
fprintf(stderr, "Received %s event for peer %d, closing connection\n",
event_type_name, cli_it->second->peer_fd);
stop_client(cli_it->second->peer_fd);
}
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
{
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
rdmacm_established(ev);
}
else
{
fprintf(stderr, "Received %s event for an unknown RDMA-CM connection 0x%jx - ignoring\n",
event_type_name, (uint64_t)ev->id);
}
}
else if (ev->event == RDMA_CM_EVENT_ADDR_RESOLVED || ev->event == RDMA_CM_EVENT_ADDR_ERROR)
{
rdmacm_address_resolved(ev);
}
else if (ev->event == RDMA_CM_EVENT_ROUTE_RESOLVED || ev->event == RDMA_CM_EVENT_ROUTE_ERROR)
{
rdmacm_route_resolved(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_RESPONSE)
{
// Just OK
}
else if (ev->event == RDMA_CM_EVENT_UNREACHABLE || ev->event == RDMA_CM_EVENT_REJECTED)
{
// Handle error
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
{
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
// Do nothing
}
else
{
// Other events are unexpected
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
}
}
}
msgr_rdma_context_t* msgr_rdma_context_t::create_cm(ibv_context *ctx)
{
auto rdma_context = new msgr_rdma_context_t;
rdma_context->is_cm = true;
rdma_context->context = ctx;
rdma_context->pd = ibv_alloc_pd(ctx);
if (!rdma_context->pd)
{
fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
delete rdma_context;
return NULL;
}
rdma_context->odp = false;
rdma_context->channel = ibv_create_comp_channel(rdma_context->context);
if (!rdma_context->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
delete rdma_context;
return NULL;
}
rdma_context->max_cqe = 4096;
rdma_context->cq = ibv_create_cq(rdma_context->context, rdma_context->max_cqe, NULL, rdma_context->channel, 0);
if (!rdma_context->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
delete rdma_context;
return NULL;
}
if (ibv_query_device_ex(rdma_context->context, NULL, &rdma_context->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
delete rdma_context;
return NULL;
}
return rdma_context;
}
msgr_rdma_context_t* osd_messenger_t::rdmacm_get_context(ibv_context *verbs)
{
// Find the context by device
// We assume that RDMA_CM ev->id->verbs is always the same for the same device (but PD for example isn't)
msgr_rdma_context_t *rdma_context = NULL;
for (auto ctx: rdma_contexts)
{
if (ctx->context == verbs)
{
rdma_context = ctx;
break;
}
}
if (!rdma_context)
{
// Wrap into a new msgr_rdma_context_t
rdma_context = msgr_rdma_context_t::create_cm(verbs);
if (!rdma_context)
return NULL;
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
rdma_contexts.push_back(rdma_context);
}
return rdma_context;
}
msgr_rdma_context_t* osd_messenger_t::rdmacm_create_qp(rdma_cm_id *cmid)
{
auto rdma_context = rdmacm_get_context(cmid->verbs);
if (!rdma_context)
{
return NULL;
}
rdma_context->reserve_cqe(rdma_max_send+rdma_max_recv);
auto max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
ibv_qp_init_attr init_attr = {
.send_cq = rdma_context->cq,
.recv_cq = rdma_context->cq,
.cap = {
.max_send_wr = (uint32_t)rdma_max_send,
.max_recv_wr = (uint32_t)rdma_max_recv,
.max_send_sge = (uint32_t)max_sge,
.max_recv_sge = (uint32_t)max_sge,
},
.qp_type = IBV_QPT_RC,
};
int r = rdma_create_qp(cmid, rdma_context->pd, &init_attr);
if (r != 0)
{
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
return NULL;
}
return rdma_context;
}
void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
{
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0)
{
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
rdma_destroy_id(ev->id);
return;
}
auto rdma_context = rdmacm_create_qp(ev->id);
if (!rdma_context)
{
rdma_destroy_id(ev->id);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_accept(ev->id, &conn_params) != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_qp(ev->id);
rdma_destroy_id(ev->id);
return;
}
rdma_context->cm_refs++;
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
conn->ctx = rdma_context;
conn->max_send = rdma_max_send;
conn->max_recv = rdma_max_recv;
conn->max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
conn->max_msg = rdma_max_msg;
conn->cmid = ev->id;
conn->qp = ev->id->qp;
auto cl = new osd_client_t();
cl->peer_fd = fake_fd;
cl->peer_state = PEER_RDMA;
cl->peer_addr = *(sockaddr_storage*)rdma_get_peer_addr(ev->id);
cl->in_buf = malloc_or_die(receive_buffer_size);
cl->rdma_conn = conn;
clients[fake_fd] = cl;
rdmacm_connections[ev->id] = cl;
// Add initial receive request(s)
try_recv_rdma(cl);
fprintf(stderr, "[OSD %ju] new client %d: connection from %s via RDMA-CM\n", this->osd_num, fake_fd,
addr_to_string(cl->peer_addr).c_str());
}
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
{
auto conn = rdmacm_connecting.at(cmid);
auto addr = conn->addr;
auto peer_port = conn->peer_port;
auto peer_osd = conn->peer_osd;
if (conn->timeout_id >= 0)
tfd->clear_timer(conn->timeout_id);
if (conn->peer_fd >= 0)
close(conn->peer_fd);
if (conn->rdma_context)
conn->rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
if (conn->cmid)
{
if (conn->cmid->qp)
rdma_destroy_qp(conn->cmid);
rdma_destroy_id(conn->cmid);
}
rdmacm_connecting.erase(cmid);
delete conn;
if (!disable_tcp)
{
// Fall back to TCP instead of just reporting the error to on_connect_peer()
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
}
else
{
// TCP is disabled
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
}
}
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
{
struct sockaddr_storage sa = {};
if (!string_to_addr(addr, false, peer_port, &sa))
{
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
on_connect_peer(peer_osd, -EINVAL);
return;
}
rdma_cm_id *cmid = NULL;
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
if (!disable_tcp)
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
else
on_connect_peer(peer_osd, res);
return;
}
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0)
{
int res = -errno;
rdma_destroy_id(cmid);
// Can't create socket, pointless to try TCP
on_connect_peer(peer_osd, res);
return;
}
auto conn = new rdmacm_connecting_t;
rdmacm_connecting[cmid] = conn;
conn->cmid = cmid;
conn->peer_fd = fake_fd;
conn->peer_osd = peer_osd;
conn->addr = addr;
conn->parsed_addr = sa;
conn->peer_port = peer_port;
conn->timeout_ms = peer_connect_timeout*1000;
conn->timeout_id = -1;
if (peer_connect_timeout > 0)
{
conn->timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, cmid](int timer_id)
{
auto conn = rdmacm_connecting.at(cmid);
conn->timeout_id = -1;
fprintf(stderr, "RDMA-CM connection to %s timed out\n", conn->addr.c_str());
rdmacm_on_connect_peer_error(cmid, -EPIPE);
return;
});
}
if (rdma_resolve_addr(cmid, NULL, (sockaddr*)&conn->parsed_addr, conn->timeout_ms) != 0)
{
auto res = -errno;
// ENODEV means that the client doesn't have an RDMA device for this address
if (res != -ENODEV || log_level > 0)
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", addr.c_str(), strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
{
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
auto rdma_context = rdmacm_create_qp(cmid);
if (!rdma_context)
{
rdmacm_on_connect_peer_error(cmid, -EIO);
return;
}
conn->rdma_context = rdma_context;
if (rdma_resolve_route(cmid, conn->timeout_ms) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
{
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_connect(cmid, &conn_params) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
auto peer_osd = conn->peer_osd;
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
{
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *rc = new msgr_rdma_connection_t;
rc->ctx = conn->rdma_context;
rc->ctx->cm_refs++;
rc->max_send = rdma_max_send;
rc->max_recv = rdma_max_recv;
rc->max_sge = rdma_max_sge > rc->ctx->attrx.orig_attr.max_sge
? rc->ctx->attrx.orig_attr.max_sge : rdma_max_sge;
rc->max_msg = rdma_max_msg;
rc->cmid = conn->cmid;
rc->qp = conn->cmid->qp;
// And an osd_client_t
auto cl = new osd_client_t();
cl->peer_addr = conn->parsed_addr;
cl->peer_port = conn->peer_port;
cl->peer_fd = conn->peer_fd;
cl->peer_state = PEER_RDMA;
cl->connect_timeout_id = -1;
cl->osd_num = peer_osd;
cl->in_buf = malloc_or_die(receive_buffer_size);
cl->rdma_conn = rc;
clients[conn->peer_fd] = cl;
if (conn->timeout_id >= 0)
tfd->clear_timer(conn->timeout_id);
delete conn;
rdmacm_connecting.erase(cmid);
rdmacm_connections[cmid] = cl;
if (log_level > 0)
fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
// Add initial receive request(s)
try_recv_rdma(cl);
osd_peer_fds[peer_osd] = cl->peer_fd;
on_connect_peer(peer_osd, cl->peer_fd);
}

View File

@ -98,5 +98,3 @@ std::string format_lat(uint64_t lat);
std::string format_q(double depth);
bool stupid_glob(const std::string str, const std::string glob);
std::string implode(const std::string & sep, json11::Json array);

View File

@ -7,6 +7,7 @@
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct placement_osd_t
{

View File

@ -5,6 +5,7 @@
#include "cluster_client.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct pg_lister_t
{

View File

@ -10,6 +10,7 @@
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct pool_creator_t
{

View File

@ -5,6 +5,7 @@
#include "cli.h"
#include "cluster_client.h"
#include "str_util.h"
#include "json_util.h"
#include "pg_states.h"
// List pools with space statistics
@ -665,19 +666,3 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_pool_ls(json11::Json cfg)
return false;
};
}
std::string implode(const std::string & sep, json11::Json array)
{
if (array.is_number() || array.is_bool() || array.is_string())
{
return array.as_string();
}
std::string res;
bool first = true;
for (auto & item: array.array_items())
{
res += (first ? item.as_string() : sep+item.as_string());
first = false;
}
return res;
}

View File

@ -14,6 +14,7 @@ target_link_libraries(vitastor-osd
Jerasure
${ISAL_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
# osd_rmw_test

View File

@ -120,7 +120,14 @@ osd_t::~osd_t()
delete epmgr;
if (bs)
delete bs;
close(listen_fd);
#ifdef WITH_RDMACM
for (rdma_cm_id *listener: rdmacm_listeners)
msgr.rdmacm_destroy_listener(listener);
rdmacm_listeners.clear();
#endif
for (auto listen_fd: listen_fds)
close(listen_fd);
listen_fds.clear();
free(zero_buffer);
}
@ -162,12 +169,23 @@ void osd_t::parse_config(bool init)
else
immediate_commit = IMMEDIATE_NONE;
// Bind address
bind_address = config["bind_address"].string_value();
if (bind_address == "")
bind_address = "0.0.0.0";
cfg_bind_addresses.clear();
if (config.find("bind_address") != config.end())
{
if (config["bind_address"].is_string())
cfg_bind_addresses.push_back(config["bind_address"].string_value());
else if (config["bind_address"].is_array())
for (auto & addr: config["bind_address"].array_items())
cfg_bind_addresses.push_back(addr.string_value());
}
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
// OSD configuration
etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0)
@ -322,41 +340,53 @@ void osd_t::parse_config(bool init)
void osd_t::bind_socket()
{
if (config["osd_network"].is_string() ||
config["osd_network"].is_array())
if (cfg_bind_addresses.size())
{
std::vector<std::string> mask;
if (config["osd_network"].is_string())
mask.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
auto matched_addrs = getifaddr_list(mask);
if (matched_addrs.size() > 1)
bind_addresses = cfg_bind_addresses;
}
else if (msgr.all_osd_network_masks.size())
{
bind_addresses = getifaddr_list(msgr.all_osd_network_masks);
if (!bind_addresses.size())
{
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
force_stop(1);
}
if (!matched_addrs.size())
{
std::string nets;
for (auto v: mask)
nets += (nets == "" ? v : ","+v);
auto nets = implode(", ", msgr.all_osd_networks);
fprintf(stderr, "Addresses matching osd_network(s) %s not found\n", nets.c_str());
force_stop(1);
}
bind_address = matched_addrs[0];
}
// FIXME Support multiple listening sockets
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
else
{
msgr.accept_connections(listen_fd);
});
bind_addresses.push_back("0.0.0.0");
}
if (!disable_tcp)
{
for (auto & bind_address: bind_addresses)
{
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
}
#ifdef WITH_RDMACM
if (use_rdmacm)
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
if (listener)
rdmacm_listeners.push_back(listener);
}
if (!rdmacm_listeners.size() && disable_tcp)
{
fprintf(stderr, "Failed to create RDMA-CM listeners, exiting\n");
force_stop(1);
}
}
#endif
}
bool osd_t::shutdown()

View File

@ -107,8 +107,10 @@ class osd_t
bool no_recovery = false;
bool no_scrub = false;
bool allow_net_split = false;
std::string bind_address;
std::vector<std::string> cfg_bind_addresses;
int bind_port, listen_backlog = 128;
bool use_rdmacm = false;
bool disable_tcp = false;
// FIXME: Implement client queue depth limit
int client_queue_depth = 128;
bool allow_test_ops = false;
@ -200,7 +202,11 @@ class osd_t
epoll_manager_t *epmgr = NULL;
int listening_port = 0;
int listen_fd = 0;
std::vector<std::string> bind_addresses;
std::vector<int> listen_fds;
#ifdef WITH_RDMACM
std::vector<rdma_cm_id *> rdmacm_listeners;
#endif
ring_consumer_t consumer;
// op statistics

View File

@ -165,13 +165,17 @@ json11::Json osd_t::get_osd_state()
hostname.resize(strnlen(hostname.data(), hostname.size()));
json11::Json::object st;
st["state"] = "up";
if (bind_address != "0.0.0.0")
st["addresses"] = json11::Json::array { bind_address };
if (bind_addresses.size() != 1 || bind_addresses[0] != "0.0.0.0")
st["addresses"] = bind_addresses;
else
st["addresses"] = getifaddr_list();
st["host"] = std::string(hostname.data(), hostname.size());
st["version"] = VITASTOR_VERSION;
st["port"] = listening_port;
#ifdef WITH_RDMACM
if (rdmacm_listeners.size())
st["rdmacm"] = true;
#endif
st["primary_enabled"] = run_primary;
st["blockstore_enabled"] = bs ? true : false;
return st;

View File

@ -25,6 +25,7 @@ target_link_libraries(stub_uring_osd
vitastor_common
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
tcmalloc_minimal
)

View File

@ -93,6 +93,13 @@ bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
return true;
}
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask)
{
return mask.family == addr.ss_family && (mask.family == AF_INET
? cidr_match(((sockaddr_in*)&addr)->sin_addr, mask.ipv4, mask.bits)
: cidr6_match(((sockaddr_in6*)&addr)->sin6_addr, mask.ipv6, mask.bits));
}
addr_mask_t cidr_parse(std::string mask)
{
unsigned bits = 255;
@ -126,13 +133,11 @@ addr_mask_t cidr_parse(std::string mask)
}
}
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool include_v6)
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks, bool include_v6)
{
std::vector<addr_mask_t> masks;
for (auto mask: mask_cfg)
for (auto & mask: masks)
{
masks.push_back(cidr_parse(mask));
if (masks[masks.size()-1].family == AF_INET6)
if (mask.family == AF_INET6)
{
// Auto-enable IPv6 addresses
include_v6 = true;

View File

@ -18,5 +18,6 @@ std::string addr_to_string(const sockaddr_storage &addr);
addr_mask_t cidr_parse(std::string mask);
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask);
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);

View File

@ -33,3 +33,19 @@ bool json_is_false(const json11::Json & val)
return !val.bool_value();
return false;
}
std::string implode(const std::string & sep, json11::Json array)
{
if (array.is_number() || array.is_bool() || array.is_string())
{
return array.as_string();
}
std::string res;
bool first = true;
for (auto & item: array.array_items())
{
res += (first ? item.as_string() : sep+item.as_string());
first = false;
}
return res;
}

View File

@ -11,3 +11,4 @@
std::map<std::string, std::string> json_to_string_map(const json11::Json::object & config);
bool json_is_true(const json11::Json & val);
bool json_is_false(const json11::Json & val);
std::string implode(const std::string & sep, json11::Json array);