Simplified NFS proxy based on own NFS/XDR implementation

hugo-docs
Vitaliy Filippov 2022-02-12 01:30:50 +03:00
parent a2189100dd
commit 7c2379d458
34 changed files with 10868 additions and 11 deletions

View File

@ -52,6 +52,7 @@ Vitastor на данный момент находится в статусе п
- Слияние снапшотов (vitastor-cli {snap-rm,flatten,merge})
- Консольный интерфейс для управления образами (vitastor-cli {ls,create,modify})
- Плагин для Proxmox
- Упрощённая NFS-прокси для эмуляции файлового доступа к образам (подходит для VMWare)
## Планы развития
@ -59,7 +60,6 @@ Vitastor на данный момент находится в статусе п
- Другие инструменты администрирования
- Плагины для OpenNebula и других облачных систем
- iSCSI-прокси
- Упрощённый NFS прокси
- Более быстрое переключение при отказах
- Фоновая проверка целостности без контрольных сумм (сверка реплик)
- Контрольные суммы
@ -530,9 +530,48 @@ vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg
Для обращения по номеру инода, аналогично другим командам, можно использовать опции
`--pool <POOL> --inode <INODE> --size <SIZE>` вместо `--image testimg`.
### NFS
В Vitastor реализована упрощённая NFS 3.0 прокси для эмуляции файлового доступа к образам.
Это не полноценная файловая система, т.к. метаданные всех файлов (образов) сохраняются
в etcd и всё время хранятся в оперативной памяти - то есть, положить туда много файлов
не получится.
Однако в качестве способа доступа к образам виртуальных машин NFS прокси прекрасно подходит
и позволяет подключить Vitastor, например, к VMWare.
При этом, если вы используете режим immediate_commit=all (для SSD с конденсаторами или HDD
с отключённым кэшем), то NFS-сервер не имеет состояния и вы можете свободно поднять
его в нескольких экземплярах и использовать поверх них сетевой балансировщик нагрузки или
схему с отказоустойчивостью.
Использование vitastor-nfs:
```
vitastor-nfs [--etcd_address ADDR] [ДРУГИЕ ОПЦИИ]
--subdir <DIR> экспортировать "поддиректорию" - образы с префиксом имени <DIR>/ (по умолчанию пусто - экспортировать все образы)
--portmap 0 отключить сервис portmap/rpcbind на порту 111 (по умолчанию включён и требует root привилегий)
--bind <IP> принимать соединения по адресу <IP> (по умолчанию 0.0.0.0 - на всех)
--nfspath <PATH> установить путь NFS-экспорта в <PATH> (по умолчанию /)
--port <PORT> использовать порт <PORT> для NFS-сервисов (по умолчанию 2049)
--pool <POOL> исползовать пул <POOL> для новых образов (обязательно, если пул в кластере не один)
--foreground 1 не уходить в фон после запуска
```
Пример монтирования Vitastor через NFS:
```
vitastor-nfs --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool
```
```
mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
```
### Kubernetes
У Vitastor есть CSI-плагин для Kubernetes, поддерживающий RWO-тома.
У Vitastor есть CSI-плагин для Kubernetes, поддерживающий RWO, а также блочные RWX, тома.
Для установки возьмите манифесты из директории [csi/deploy/](csi/deploy/), поместите
вашу конфигурацию подключения к Vitastor в [csi/deploy/001-csi-config-map.yaml](001-csi-config-map.yaml),

View File

@ -46,6 +46,7 @@ breaking changes in the future. However, the following is implemented:
- Snapshot merge tool (vitastor-cli {snap-rm,flatten,merge})
- Image management CLI (vitastor-cli {ls,create,modify})
- Proxmox storage plugin
- Simplified NFS proxy for file-based image access emulation (suitable for VMWare)
## Roadmap
@ -53,7 +54,6 @@ breaking changes in the future. However, the following is implemented:
- Other administrative tools
- Plugins for OpenNebula and other cloud systems
- iSCSI proxy
- Simplified NFS proxy
- Faster failover
- Scrubbing without checksums (verification of replicas)
- Checksums
@ -479,9 +479,49 @@ It will output the device name, like /dev/nbd0 which you can then format and mou
Again, you can use `--pool <POOL> --inode <INODE> --size <SIZE>` insteaf of `--image <IMAGE>` if you want.
### NFS
Vitastor has a simplified NFS 3.0 proxy for file-based image access emulation. It's not
suitable as a full-featured file system, at least because all file/image metadata is stored
in etcd and kept in memory all the time - thus you can't put a lot of files in it.
However, NFS proxy is totally fine as a method to provide VM image access and allows to
plug Vitastor into, for example, VMWare. It's important to note that for VMWare it's a much
better access method than iSCSI, because with iSCSI we'd have to put all VM images into one
Vitastor image exported as a LUN to VMWare and formatted with VMFS. VMWare doesn't use VMFS
over NFS.
NFS proxy is stateless if you use immediate_commit=all mode (for SSD with capacitors or
HDDs with disabled cache), so you can run multiple NFS proxies and use a network load
balancer or any failover method you want to in that case.
vitastor-nfs usage:
```
vitastor-nfs [--etcd_address ADDR] [OTHER OPTIONS]
--subdir <DIR> export images prefixed <DIR>/ (default empty - export all images)
--portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)
--bind <IP> bind service to <IP> address (default 0.0.0.0)
--nfspath <PATH> set NFS export path to <PATH> (default is /)
--port <PORT> use port <PORT> for NFS services (default is 2049)
--pool <POOL> use <POOL> as default pool for new files (images)
--foreground 1 stay in foreground, do not daemonize
```
Example start and mount commands:
```
vitastor-nfs --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool
```
```
mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
```
### Kubernetes
Vitastor has a CSI plugin for Kubernetes which supports RWO volumes.
Vitastor has a CSI plugin for Kubernetes which supports RWO (and block RWX) volumes.
To deploy it, take manifests from [csi/deploy/](csi/deploy/) directory, put your
Vitastor configuration in [csi/deploy/001-csi-config-map.yaml](001-csi-config-map.yaml),

@ -1 +1 @@
Subproject commit 6e201464060ace53db809d65da7b0e2800673f8f
Subproject commit 903ec858bc8ab00fc0fbd44c23f0ab7770772353

View File

@ -2,5 +2,6 @@ usr/bin/vita
usr/bin/vitastor-cli
usr/bin/vitastor-rm
usr/bin/vitastor-nbd
usr/bin/vitastor-nfs
usr/lib/*/libvitastor*.so*
mon/make-osd.sh /usr/lib/vitastor

View File

@ -119,6 +119,7 @@ cp -r mon %buildroot/usr/lib/vitastor
%files -n vitastor-client
%_bindir/vitastor-nbd
%_bindir/vitastor-nfs
%_bindir/vitastor-cli
%_bindir/vitastor-rm
%_bindir/vita

View File

@ -116,6 +116,7 @@ cp -r mon %buildroot/usr/lib/vitastor
%files -n vitastor-client
%_bindir/vitastor-nbd
%_bindir/vitastor-nfs
%_bindir/vitastor-cli
%_bindir/vitastor-rm
%_bindir/vita

View File

@ -164,6 +164,21 @@ target_link_libraries(vitastor-nbd
vitastor_client
)
# vitastor-nfs
add_executable(vitastor-nfs
nfs_proxy.cpp
nfs_conn.cpp
nfs_portmap.cpp
sha256.c
nfs/xdr_impl.cpp
nfs/rpc_xdr.cpp
nfs/portmap_xdr.cpp
nfs/nfs_xdr.cpp
)
target_link_libraries(vitastor-nfs
vitastor_client
)
# vitastor-cli
add_executable(vitastor-cli
cli.cpp

View File

@ -25,7 +25,9 @@ struct image_creator_t
pool_id_t new_pool_id = 0;
std::string new_pool_name;
std::string image_name, new_snap, new_parent;
json11::Json new_meta;
uint64_t size;
bool force_size = false;
pool_id_t old_pool_id = 0;
inode_t new_parent_id = 0;
@ -137,7 +139,7 @@ struct image_creator_t
state = 100;
return;
}
if (!size)
if (!size && !force_size)
{
result = (cli_result_t){ .err = EINVAL, .text = "Image size is missing" };
state = 100;
@ -387,6 +389,7 @@ resume_3:
.size = size,
.parent_id = (new_snap != "" ? INODE_WITH_POOL(old_pool_id, old_id) : new_parent_id),
.readonly = false,
.meta = new_meta,
};
json11::Json::array checks = json11::Json::array {
json11::Json::object {
@ -538,6 +541,11 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_create(json11::Json cfg)
image_creator->image_name = cfg["image"].string_value();
image_creator->new_pool_id = cfg["pool"].uint64_value();
image_creator->new_pool_name = cfg["pool"].string_value();
image_creator->force_size = cfg["force_size"].bool_value();
if (cfg["image_meta"].is_object())
{
image_creator->new_meta = cfg["image-meta"];
}
if (cfg["snapshot"].string_value() != "")
{
image_creator->new_snap = cfg["snapshot"].string_value();
@ -554,7 +562,7 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_create(json11::Json cfg)
return true;
};
}
if (image_creator->size % 4096)
if ((image_creator->size % 4096) && !cfg["force_size"].bool_value())
{
delete image_creator;
return [](cli_result_t & result)

View File

@ -81,14 +81,14 @@ struct image_changer_t
}
if ((!set_readwrite || !cfg.readonly) &&
(!set_readonly || cfg.readonly) &&
(!new_size || cfg.size == new_size) &&
(!new_size && !force_size || cfg.size == new_size) &&
(new_name == "" || new_name == image_name))
{
result = (cli_result_t){ .text = "No change" };
state = 100;
return;
}
if (new_size != 0)
if (new_size != 0 || force_size)
{
if (cfg.size >= new_size)
{

View File

@ -374,6 +374,11 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
continue_ops();
}
bool cluster_client_t::get_immediate_commit()
{
return immediate_commit;
}
void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
{
if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())

View File

@ -118,6 +118,8 @@ public:
bool is_ready();
void on_ready(std::function<void(void)> fn);
bool get_immediate_commit();
static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
void continue_ops(bool up_retry = false);
inode_list_t *list_inode_start(inode_t inode,

View File

@ -338,9 +338,14 @@ void etcd_state_client_t::start_etcd_watcher()
{
if (data["result"]["created"].bool_value())
{
if (etcd_watches_initialised == 3 && this->log_level > 0)
fprintf(stderr, "Successfully subscribed to etcd at %s\n", selected_etcd_address.c_str());
uint64_t watch_id = data["result"]["watch_id"].uint64_value();
if (watch_id == ETCD_CONFIG_WATCH_ID ||
watch_id == ETCD_PG_STATE_WATCH_ID ||
watch_id == ETCD_PG_HISTORY_WATCH_ID ||
watch_id == ETCD_OSD_STATE_WATCH_ID)
etcd_watches_initialised++;
if (etcd_watches_initialised == 4 && this->log_level > 0)
fprintf(stderr, "Successfully subscribed to etcd at %s\n", selected_etcd_address.c_str());
}
if (data["result"]["canceled"].bool_value())
{
@ -469,6 +474,10 @@ void etcd_state_client_t::start_etcd_watcher()
{ "progress_notify", true },
} }
}).dump());
if (on_start_watcher_hook)
{
on_start_watcher_hook(etcd_watch_ws);
}
if (ws_keepalive_timer < 0)
{
ws_keepalive_timer = tfd->set_timer(etcd_ws_keepalive_interval*1000, true, [this](int)
@ -954,6 +963,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
if (!value.is_object())
{
if (on_inode_change_hook != NULL)
{
on_inode_change_hook(inode_num, true);
}
this->inode_config.erase(inode_num);
}
else
@ -981,6 +994,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
.size = value["size"].uint64_value(),
.parent_id = parent_inode_num,
.readonly = value["readonly"].bool_value(),
.meta = value["meta"],
.mod_revision = kv.mod_revision,
});
}
@ -1002,6 +1016,10 @@ void etcd_state_client_t::insert_inode_config(const inode_config_t & cfg)
}
}
}
if (on_inode_change_hook != NULL)
{
on_inode_change_hook(cfg.num, false);
}
}
inode_watch_t* etcd_state_client_t::watch_inode(std::string name)
@ -1046,6 +1064,10 @@ json11::Json::object etcd_state_client_t::serialize_inode_cfg(inode_config_t *cf
{
new_cfg["readonly"] = true;
}
if (cfg->meta.is_object())
{
new_cfg["meta"] = cfg->meta;
}
return new_cfg;
}

View File

@ -56,6 +56,8 @@ struct inode_config_t
uint64_t size;
inode_t parent_id;
bool readonly;
// Arbitrary metadata
json11::Json meta;
// Change revision of the metadata in etcd
uint64_t mod_revision;
};
@ -109,6 +111,8 @@ public:
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
std::function<void(osd_num_t)> on_change_osd_state_hook;
std::function<void()> on_reload_hook;
std::function<void(inode_t, bool)> on_inode_change_hook;
std::function<void(http_co_t *)> on_start_watcher_hook;
json11::Json::object serialize_inode_cfg(inode_config_t *cfg);
etcd_kv_t parse_etcd_kv(const json11::Json & kv_json);

1690
src/nfs/nfs.h Normal file

File diff suppressed because it is too large Load Diff

1380
src/nfs/nfs.x Normal file

File diff suppressed because it is too large Load Diff

2954
src/nfs/nfs_xdr.cpp Normal file

File diff suppressed because it is too large Load Diff

190
src/nfs/portmap.h Normal file
View File

@ -0,0 +1,190 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#ifndef _PORTMAP_H_RPCGEN
#define _PORTMAP_H_RPCGEN
#include "xdr_impl.h"
#ifdef __cplusplus
extern "C" {
#endif
#define PMAP_PORT 111
struct pmap2_mapping {
u_int prog;
u_int vers;
u_int prot;
u_int port;
};
typedef struct pmap2_mapping pmap2_mapping;
struct pmap2_call_args {
u_int prog;
u_int vers;
u_int proc;
xdr_string_t args;
};
typedef struct pmap2_call_args pmap2_call_args;
struct pmap2_call_result {
u_int port;
xdr_string_t res;
};
typedef struct pmap2_call_result pmap2_call_result;
struct pmap2_mapping_list {
pmap2_mapping map;
struct pmap2_mapping_list *next;
};
typedef struct pmap2_mapping_list pmap2_mapping_list;
struct pmap2_dump_result {
struct pmap2_mapping_list *list;
};
typedef struct pmap2_dump_result pmap2_dump_result;
struct pmap3_string_result {
xdr_string_t addr;
};
typedef struct pmap3_string_result pmap3_string_result;
struct pmap3_mapping {
u_int prog;
u_int vers;
xdr_string_t netid;
xdr_string_t addr;
xdr_string_t owner;
};
typedef struct pmap3_mapping pmap3_mapping;
struct pmap3_mapping_list {
pmap3_mapping map;
struct pmap3_mapping_list *next;
};
typedef struct pmap3_mapping_list pmap3_mapping_list;
struct pmap3_dump_result {
struct pmap3_mapping_list *list;
};
typedef struct pmap3_dump_result pmap3_dump_result;
struct pmap3_call_args {
u_int prog;
u_int vers;
u_int proc;
xdr_string_t args;
};
typedef struct pmap3_call_args pmap3_call_args;
struct pmap3_call_result {
u_int port;
xdr_string_t res;
};
typedef struct pmap3_call_result pmap3_call_result;
struct pmap3_netbuf {
u_int maxlen;
xdr_string_t buf;
};
typedef struct pmap3_netbuf pmap3_netbuf;
typedef pmap2_mapping PMAP2SETargs;
typedef pmap2_mapping PMAP2UNSETargs;
typedef pmap2_mapping PMAP2GETPORTargs;
typedef pmap2_call_args PMAP2CALLITargs;
typedef pmap2_call_result PMAP2CALLITres;
typedef pmap2_dump_result PMAP2DUMPres;
typedef pmap3_mapping PMAP3SETargs;
typedef pmap3_mapping PMAP3UNSETargs;
typedef pmap3_mapping PMAP3GETADDRargs;
typedef pmap3_string_result PMAP3GETADDRres;
typedef pmap3_dump_result PMAP3DUMPres;
typedef pmap3_call_result PMAP3CALLITargs;
typedef pmap3_call_result PMAP3CALLITres;
typedef pmap3_netbuf PMAP3UADDR2TADDRres;
typedef pmap3_netbuf PMAP3TADDR2UADDRargs;
typedef pmap3_string_result PMAP3TADDR2UADDRres;
#define PMAP_PROGRAM 100000
#define PMAP_V2 2
#define PMAP2_NULL 0
#define PMAP2_SET 1
#define PMAP2_UNSET 2
#define PMAP2_GETPORT 3
#define PMAP2_DUMP 4
#define PMAP2_CALLIT 5
#define PMAP_V3 3
#define PMAP3_NULL 0
#define PMAP3_SET 1
#define PMAP3_UNSET 2
#define PMAP3_GETADDR 3
#define PMAP3_DUMP 4
#define PMAP3_CALLIT 5
#define PMAP3_GETTIME 6
#define PMAP3_UADDR2TADDR 7
#define PMAP3_TADDR2UADDR 8
/* the xdr functions */
extern bool_t xdr_pmap2_mapping (XDR *, pmap2_mapping*);
extern bool_t xdr_pmap2_call_args (XDR *, pmap2_call_args*);
extern bool_t xdr_pmap2_call_result (XDR *, pmap2_call_result*);
extern bool_t xdr_pmap2_mapping_list (XDR *, pmap2_mapping_list*);
extern bool_t xdr_pmap2_dump_result (XDR *, pmap2_dump_result*);
extern bool_t xdr_pmap3_string_result (XDR *, pmap3_string_result*);
extern bool_t xdr_pmap3_mapping (XDR *, pmap3_mapping*);
extern bool_t xdr_pmap3_mapping_list (XDR *, pmap3_mapping_list*);
extern bool_t xdr_pmap3_dump_result (XDR *, pmap3_dump_result*);
extern bool_t xdr_pmap3_call_args (XDR *, pmap3_call_args*);
extern bool_t xdr_pmap3_call_result (XDR *, pmap3_call_result*);
extern bool_t xdr_pmap3_netbuf (XDR *, pmap3_netbuf*);
extern bool_t xdr_PMAP2SETargs (XDR *, PMAP2SETargs*);
extern bool_t xdr_PMAP2UNSETargs (XDR *, PMAP2UNSETargs*);
extern bool_t xdr_PMAP2GETPORTargs (XDR *, PMAP2GETPORTargs*);
extern bool_t xdr_PMAP2CALLITargs (XDR *, PMAP2CALLITargs*);
extern bool_t xdr_PMAP2CALLITres (XDR *, PMAP2CALLITres*);
extern bool_t xdr_PMAP2DUMPres (XDR *, PMAP2DUMPres*);
extern bool_t xdr_PMAP3SETargs (XDR *, PMAP3SETargs*);
extern bool_t xdr_PMAP3UNSETargs (XDR *, PMAP3UNSETargs*);
extern bool_t xdr_PMAP3GETADDRargs (XDR *, PMAP3GETADDRargs*);
extern bool_t xdr_PMAP3GETADDRres (XDR *, PMAP3GETADDRres*);
extern bool_t xdr_PMAP3DUMPres (XDR *, PMAP3DUMPres*);
extern bool_t xdr_PMAP3CALLITargs (XDR *, PMAP3CALLITargs*);
extern bool_t xdr_PMAP3CALLITres (XDR *, PMAP3CALLITres*);
extern bool_t xdr_PMAP3UADDR2TADDRres (XDR *, PMAP3UADDR2TADDRres*);
extern bool_t xdr_PMAP3TADDR2UADDRargs (XDR *, PMAP3TADDR2UADDRargs*);
extern bool_t xdr_PMAP3TADDR2UADDRres (XDR *, PMAP3TADDR2UADDRres*);
#ifdef __cplusplus
}
#endif
#endif /* !_PORTMAP_H_RPCGEN */

168
src/nfs/portmap.x Normal file
View File

@ -0,0 +1,168 @@
/*
Copyright (c) 2014, Ronnie Sahlberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of the FreeBSD Project.
*/
const PMAP_PORT = 111; /* portmapper port number */
struct pmap2_mapping {
unsigned int prog;
unsigned int vers;
unsigned int prot;
unsigned int port;
};
struct pmap2_call_args {
unsigned int prog;
unsigned int vers;
unsigned int proc;
opaque args<>;
};
struct pmap2_call_result {
unsigned int port;
opaque res<>;
};
struct pmap2_mapping_list {
pmap2_mapping map;
pmap2_mapping_list *next;
};
struct pmap2_dump_result {
struct pmap2_mapping_list *list;
};
struct pmap3_string_result {
string addr<>;
};
struct pmap3_mapping {
unsigned int prog;
unsigned int vers;
string netid<>;
string addr<>;
string owner<>;
};
struct pmap3_mapping_list {
pmap3_mapping map;
pmap3_mapping_list *next;
};
struct pmap3_dump_result {
struct pmap3_mapping_list *list;
};
struct pmap3_call_args {
unsigned int prog;
unsigned int vers;
unsigned int proc;
opaque args<>;
};
struct pmap3_call_result {
unsigned int port;
opaque res<>;
};
struct pmap3_netbuf {
unsigned int maxlen;
/* This pretty much contains a sockaddr_storage.
* Beware differences in endianess for ss_family
* and whether or not ss_len exists.
*/
opaque buf<>;
};
typedef pmap2_mapping PMAP2SETargs;
typedef pmap2_mapping PMAP2UNSETargs;
typedef pmap2_mapping PMAP2GETPORTargs;
typedef pmap2_call_args PMAP2CALLITargs;
typedef pmap2_call_result PMAP2CALLITres;
typedef pmap2_dump_result PMAP2DUMPres;
typedef pmap3_mapping PMAP3SETargs;
typedef pmap3_mapping PMAP3UNSETargs;
typedef pmap3_mapping PMAP3GETADDRargs;
typedef pmap3_string_result PMAP3GETADDRres;
typedef pmap3_dump_result PMAP3DUMPres;
typedef pmap3_call_result PMAP3CALLITargs;
typedef pmap3_call_result PMAP3CALLITres;
typedef pmap3_netbuf PMAP3UADDR2TADDRres;
typedef pmap3_netbuf PMAP3TADDR2UADDRargs;
typedef pmap3_string_result PMAP3TADDR2UADDRres;
program PMAP_PROGRAM {
version PMAP_V2 {
void
PMAP2_NULL(void) = 0;
uint32_t
PMAP2_SET(PMAP2SETargs) = 1;
uint32_t
PMAP2_UNSET(PMAP2UNSETargs) = 2;
uint32_t
PMAP2_GETPORT(PMAP2GETPORTargs) = 3;
PMAP2DUMPres
PMAP2_DUMP(void) = 4;
PMAP2CALLITres
PMAP2_CALLIT(PMAP2CALLITargs) = 5;
} = 2;
version PMAP_V3 {
void
PMAP3_NULL(void) = 0;
uint32_t
PMAP3_SET(PMAP3SETargs) = 1;
uint32_t
PMAP3_UNSET(PMAP3UNSETargs) = 2;
PMAP3GETADDRres
PMAP3_GETADDR(PMAP3GETADDRargs) = 3;
PMAP3DUMPres
PMAP3_DUMP(void) = 4;
PMAP3CALLITres
PMAP3_CALLIT(PMAP3CALLITargs) = 5;
uint32_t
PMAP3_GETTIME(void) = 6;
PMAP3UADDR2TADDRres
PMAP3_UADDR2TADDR(string) = 7;
PMAP3TADDR2UADDRres
PMAP3_TADDR2UADDR(PMAP3TADDR2UADDRargs) = 8;
} = 3;
} = 100000;

406
src/nfs/portmap_xdr.cpp Normal file
View File

@ -0,0 +1,406 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "portmap.h"
#include "xdr_impl_inline.h"
bool_t
xdr_pmap2_mapping (XDR *xdrs, pmap2_mapping *objp)
{
if (xdrs->x_op == XDR_ENCODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prot))
return FALSE;
if (!xdr_u_int (xdrs, &objp->port))
return FALSE;
} else {
IXDR_PUT_U_LONG(buf, objp->prog);
IXDR_PUT_U_LONG(buf, objp->vers);
IXDR_PUT_U_LONG(buf, objp->prot);
IXDR_PUT_U_LONG(buf, objp->port);
}
return TRUE;
} else if (xdrs->x_op == XDR_DECODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prot))
return FALSE;
if (!xdr_u_int (xdrs, &objp->port))
return FALSE;
} else {
objp->prog = IXDR_GET_U_LONG(buf);
objp->vers = IXDR_GET_U_LONG(buf);
objp->prot = IXDR_GET_U_LONG(buf);
objp->port = IXDR_GET_U_LONG(buf);
}
return TRUE;
}
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prot))
return FALSE;
if (!xdr_u_int (xdrs, &objp->port))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap2_call_args (XDR *xdrs, pmap2_call_args *objp)
{
if (xdrs->x_op == XDR_ENCODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
IXDR_PUT_U_LONG(buf, objp->prog);
IXDR_PUT_U_LONG(buf, objp->vers);
IXDR_PUT_U_LONG(buf, objp->proc);
}
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
} else if (xdrs->x_op == XDR_DECODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
objp->prog = IXDR_GET_U_LONG(buf);
objp->vers = IXDR_GET_U_LONG(buf);
objp->proc = IXDR_GET_U_LONG(buf);
}
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
}
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap2_call_result (XDR *xdrs, pmap2_call_result *objp)
{
if (!xdr_u_int (xdrs, &objp->port))
return FALSE;
if (!xdr_bytes(xdrs, &objp->res, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap2_mapping_list (XDR *xdrs, pmap2_mapping_list *objp)
{
if (!xdr_pmap2_mapping (xdrs, &objp->map))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (pmap2_mapping_list), (xdrproc_t) xdr_pmap2_mapping_list))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap2_dump_result (XDR *xdrs, pmap2_dump_result *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->list, sizeof (pmap2_mapping_list), (xdrproc_t) xdr_pmap2_mapping_list))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_string_result (XDR *xdrs, pmap3_string_result *objp)
{
if (!xdr_string (xdrs, &objp->addr, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_mapping (XDR *xdrs, pmap3_mapping *objp)
{
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_string (xdrs, &objp->netid, ~0))
return FALSE;
if (!xdr_string (xdrs, &objp->addr, ~0))
return FALSE;
if (!xdr_string (xdrs, &objp->owner, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_mapping_list (XDR *xdrs, pmap3_mapping_list *objp)
{
if (!xdr_pmap3_mapping (xdrs, &objp->map))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (pmap3_mapping_list), (xdrproc_t) xdr_pmap3_mapping_list))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_dump_result (XDR *xdrs, pmap3_dump_result *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->list, sizeof (pmap3_mapping_list), (xdrproc_t) xdr_pmap3_mapping_list))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_call_args (XDR *xdrs, pmap3_call_args *objp)
{
if (xdrs->x_op == XDR_ENCODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
IXDR_PUT_U_LONG(buf, objp->prog);
IXDR_PUT_U_LONG(buf, objp->vers);
IXDR_PUT_U_LONG(buf, objp->proc);
}
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
} else if (xdrs->x_op == XDR_DECODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
objp->prog = IXDR_GET_U_LONG(buf);
objp->vers = IXDR_GET_U_LONG(buf);
objp->proc = IXDR_GET_U_LONG(buf);
}
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
}
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
if (!xdr_bytes(xdrs, &objp->args, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_call_result (XDR *xdrs, pmap3_call_result *objp)
{
if (!xdr_u_int (xdrs, &objp->port))
return FALSE;
if (!xdr_bytes(xdrs, &objp->res, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_pmap3_netbuf (XDR *xdrs, pmap3_netbuf *objp)
{
if (!xdr_u_int (xdrs, &objp->maxlen))
return FALSE;
if (!xdr_bytes(xdrs, &objp->buf, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2SETargs (XDR *xdrs, PMAP2SETargs *objp)
{
if (!xdr_pmap2_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2UNSETargs (XDR *xdrs, PMAP2UNSETargs *objp)
{
if (!xdr_pmap2_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2GETPORTargs (XDR *xdrs, PMAP2GETPORTargs *objp)
{
if (!xdr_pmap2_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2CALLITargs (XDR *xdrs, PMAP2CALLITargs *objp)
{
if (!xdr_pmap2_call_args (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2CALLITres (XDR *xdrs, PMAP2CALLITres *objp)
{
if (!xdr_pmap2_call_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP2DUMPres (XDR *xdrs, PMAP2DUMPres *objp)
{
if (!xdr_pmap2_dump_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3SETargs (XDR *xdrs, PMAP3SETargs *objp)
{
if (!xdr_pmap3_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3UNSETargs (XDR *xdrs, PMAP3UNSETargs *objp)
{
if (!xdr_pmap3_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3GETADDRargs (XDR *xdrs, PMAP3GETADDRargs *objp)
{
if (!xdr_pmap3_mapping (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3GETADDRres (XDR *xdrs, PMAP3GETADDRres *objp)
{
if (!xdr_pmap3_string_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3DUMPres (XDR *xdrs, PMAP3DUMPres *objp)
{
if (!xdr_pmap3_dump_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3CALLITargs (XDR *xdrs, PMAP3CALLITargs *objp)
{
if (!xdr_pmap3_call_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3CALLITres (XDR *xdrs, PMAP3CALLITres *objp)
{
if (!xdr_pmap3_call_result (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3UADDR2TADDRres (XDR *xdrs, PMAP3UADDR2TADDRres *objp)
{
if (!xdr_pmap3_netbuf (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3TADDR2UADDRargs (XDR *xdrs, PMAP3TADDR2UADDRargs *objp)
{
if (!xdr_pmap3_netbuf (xdrs, objp))
return FALSE;
return TRUE;
}
bool_t
xdr_PMAP3TADDR2UADDRres (XDR *xdrs, PMAP3TADDR2UADDRres *objp)
{
if (!xdr_pmap3_string_result (xdrs, objp))
return FALSE;
return TRUE;
}

160
src/nfs/rpc.h Normal file
View File

@ -0,0 +1,160 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#ifndef _RPC_H_RPCGEN
#define _RPC_H_RPCGEN
#include "xdr_impl.h"
#ifdef __cplusplus
extern "C" {
#endif
#define RPC_MSG_VERSION 2
enum rpc_auth_flavor {
RPC_AUTH_NONE = 0,
RPC_AUTH_SYS = 1,
RPC_AUTH_SHORT = 2,
RPC_AUTH_DH = 3,
RPC_RPCSEC_GSS = 6,
};
typedef enum rpc_auth_flavor rpc_auth_flavor;
enum rpc_msg_type {
RPC_CALL = 0,
RPC_REPLY = 1,
};
typedef enum rpc_msg_type rpc_msg_type;
enum rpc_reply_stat {
RPC_MSG_ACCEPTED = 0,
RPC_MSG_DENIED = 1,
};
typedef enum rpc_reply_stat rpc_reply_stat;
enum rpc_accept_stat {
RPC_SUCCESS = 0,
RPC_PROG_UNAVAIL = 1,
RPC_PROG_MISMATCH = 2,
RPC_PROC_UNAVAIL = 3,
RPC_GARBAGE_ARGS = 4,
RPC_SYSTEM_ERR = 5,
};
typedef enum rpc_accept_stat rpc_accept_stat;
enum rpc_reject_stat {
RPC_MISMATCH = 0,
RPC_AUTH_ERROR = 1,
};
typedef enum rpc_reject_stat rpc_reject_stat;
enum rpc_auth_stat {
RPC_AUTH_OK = 0,
RPC_AUTH_BADCRED = 1,
RPC_AUTH_REJECTEDCRED = 2,
RPC_AUTH_BADVERF = 3,
RPC_AUTH_REJECTEDVERF = 4,
RPC_AUTH_TOOWEAK = 5,
RPC_AUTH_INVALIDRESP = 6,
RPC_AUTH_FAILED = 7,
};
typedef enum rpc_auth_stat rpc_auth_stat;
struct rpc_opaque_auth {
rpc_auth_flavor flavor;
xdr_string_t body;
};
typedef struct rpc_opaque_auth rpc_opaque_auth;
struct rpc_call_body {
u_int rpcvers;
u_int prog;
u_int vers;
u_int proc;
rpc_opaque_auth cred;
rpc_opaque_auth verf;
};
typedef struct rpc_call_body rpc_call_body;
struct rpc_mismatch_info {
u_int min_version;
u_int max_version;
};
typedef struct rpc_mismatch_info rpc_mismatch_info;
struct rpc_accepted_reply_body {
rpc_accept_stat stat;
union {
rpc_mismatch_info mismatch_info;
};
};
typedef struct rpc_accepted_reply_body rpc_accepted_reply_body;
struct rpc_accepted_reply {
rpc_opaque_auth verf;
rpc_accepted_reply_body reply_data;
};
typedef struct rpc_accepted_reply rpc_accepted_reply;
struct rpc_rejected_reply {
rpc_reject_stat stat;
union {
rpc_mismatch_info mismatch_info;
rpc_auth_stat auth_stat;
};
};
typedef struct rpc_rejected_reply rpc_rejected_reply;
struct rpc_reply_body {
rpc_reply_stat stat;
union {
rpc_accepted_reply areply;
rpc_rejected_reply rreply;
};
};
typedef struct rpc_reply_body rpc_reply_body;
struct rpc_msg_body {
rpc_msg_type dir;
union {
rpc_call_body cbody;
rpc_reply_body rbody;
};
};
typedef struct rpc_msg_body rpc_msg_body;
struct rpc_msg {
u_int xid;
rpc_msg_body body;
};
typedef struct rpc_msg rpc_msg;
/* the xdr functions */
extern bool_t xdr_rpc_auth_flavor (XDR *, rpc_auth_flavor*);
extern bool_t xdr_rpc_msg_type (XDR *, rpc_msg_type*);
extern bool_t xdr_rpc_reply_stat (XDR *, rpc_reply_stat*);
extern bool_t xdr_rpc_accept_stat (XDR *, rpc_accept_stat*);
extern bool_t xdr_rpc_reject_stat (XDR *, rpc_reject_stat*);
extern bool_t xdr_rpc_auth_stat (XDR *, rpc_auth_stat*);
extern bool_t xdr_rpc_opaque_auth (XDR *, rpc_opaque_auth*);
extern bool_t xdr_rpc_call_body (XDR *, rpc_call_body*);
extern bool_t xdr_rpc_mismatch_info (XDR *, rpc_mismatch_info*);
extern bool_t xdr_rpc_accepted_reply_body (XDR *, rpc_accepted_reply_body*);
extern bool_t xdr_rpc_accepted_reply (XDR *, rpc_accepted_reply*);
extern bool_t xdr_rpc_rejected_reply (XDR *, rpc_rejected_reply*);
extern bool_t xdr_rpc_reply_body (XDR *, rpc_reply_body*);
extern bool_t xdr_rpc_msg_body (XDR *, rpc_msg_body*);
extern bool_t xdr_rpc_msg (XDR *, rpc_msg*);
#ifdef __cplusplus
}
#endif
#endif /* !_RPC_H_RPCGEN */

113
src/nfs/rpc.x Normal file
View File

@ -0,0 +1,113 @@
/* Based on RFC 5531 - RPC: Remote Procedure Call Protocol Specification Version 2 */
const RPC_MSG_VERSION = 2;
enum rpc_auth_flavor {
RPC_AUTH_NONE = 0,
RPC_AUTH_SYS = 1,
RPC_AUTH_SHORT = 2,
RPC_AUTH_DH = 3,
RPC_RPCSEC_GSS = 6
};
enum rpc_msg_type {
RPC_CALL = 0,
RPC_REPLY = 1
};
enum rpc_reply_stat {
RPC_MSG_ACCEPTED = 0,
RPC_MSG_DENIED = 1
};
enum rpc_accept_stat {
RPC_SUCCESS = 0,
RPC_PROG_UNAVAIL = 1,
RPC_PROG_MISMATCH = 2,
RPC_PROC_UNAVAIL = 3,
RPC_GARBAGE_ARGS = 4,
RPC_SYSTEM_ERR = 5
};
enum rpc_reject_stat {
RPC_MISMATCH = 0,
RPC_AUTH_ERROR = 1
};
enum rpc_auth_stat {
RPC_AUTH_OK = 0,
/*
* failed at remote end
*/
RPC_AUTH_BADCRED = 1, /* bogus credentials (seal broken) */
RPC_AUTH_REJECTEDCRED = 2, /* client should begin new session */
RPC_AUTH_BADVERF = 3, /* bogus verifier (seal broken) */
RPC_AUTH_REJECTEDVERF = 4, /* verifier expired or was replayed */
RPC_AUTH_TOOWEAK = 5, /* rejected due to security reasons */
/*
* failed locally
*/
RPC_AUTH_INVALIDRESP = 6, /* bogus response verifier */
RPC_AUTH_FAILED = 7 /* some unknown reason */
};
struct rpc_opaque_auth {
rpc_auth_flavor flavor;
opaque body<400>;
};
struct rpc_call_body {
u_int rpcvers;
u_int prog;
u_int vers;
u_int proc;
rpc_opaque_auth cred;
rpc_opaque_auth verf;
/* procedure-specific parameters start here */
};
struct rpc_mismatch_info {
unsigned int min_version;
unsigned int max_version;
};
union rpc_accepted_reply_body switch (rpc_accept_stat stat) {
case RPC_SUCCESS:
void;
/* procedure-specific results start here */
case RPC_PROG_MISMATCH:
rpc_mismatch_info mismatch_info;
default:
void;
};
struct rpc_accepted_reply {
rpc_opaque_auth verf;
rpc_accepted_reply_body reply_data;
};
union rpc_rejected_reply switch (rpc_reject_stat stat) {
case RPC_MISMATCH:
rpc_mismatch_info mismatch_info;
case RPC_AUTH_ERROR:
rpc_auth_stat auth_stat;
};
union rpc_reply_body switch (rpc_reply_stat stat) {
case RPC_MSG_ACCEPTED:
rpc_accepted_reply areply;
case RPC_MSG_DENIED:
rpc_rejected_reply rreply;
};
union rpc_msg_body switch (rpc_msg_type dir) {
case RPC_CALL:
rpc_call_body cbody;
case RPC_REPLY:
rpc_reply_body rbody;
};
struct rpc_msg {
u_int xid;
rpc_msg_body body;
};

43
src/nfs/rpc_impl.h Normal file
View File

@ -0,0 +1,43 @@
#pragma once
#include "rpc.h"
struct rpc_op_t;
// Handler should return 1 if the request is processed asynchronously
// and requires the incoming message to not be freed until processing ends,
// 0 otherwise.
typedef int (*rpc_handler_t)(void *opaque, rpc_op_t *rop);
struct rpc_service_proc_t
{
uint32_t prog;
uint32_t vers;
uint32_t proc;
rpc_handler_t handler_fn;
xdrproc_t req_fn;
uint32_t req_size;
xdrproc_t resp_fn;
uint32_t resp_size;
void *opaque;
};
inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t & b)
{
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
}
struct rpc_op_t
{
void *client;
uint8_t *buffer;
XDR *xdrs;
rpc_msg in_msg, out_msg;
void *request;
void *reply;
xdrproc_t reply_fn;
uint32_t reply_marker;
bool referenced;
};
void rpc_queue_reply(rpc_op_t *rop);

253
src/nfs/rpc_xdr.cpp Normal file
View File

@ -0,0 +1,253 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "rpc.h"
#include "xdr_impl_inline.h"
bool_t
xdr_rpc_auth_flavor (XDR *xdrs, rpc_auth_flavor *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_msg_type (XDR *xdrs, rpc_msg_type *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_reply_stat (XDR *xdrs, rpc_reply_stat *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_accept_stat (XDR *xdrs, rpc_accept_stat *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_reject_stat (XDR *xdrs, rpc_reject_stat *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_auth_stat (XDR *xdrs, rpc_auth_stat *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_opaque_auth (XDR *xdrs, rpc_opaque_auth *objp)
{
if (!xdr_rpc_auth_flavor (xdrs, &objp->flavor))
return FALSE;
if (!xdr_bytes(xdrs, &objp->body, 400))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_call_body (XDR *xdrs, rpc_call_body *objp)
{
if (xdrs->x_op == XDR_ENCODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->rpcvers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
IXDR_PUT_U_LONG(buf, objp->rpcvers);
IXDR_PUT_U_LONG(buf, objp->prog);
IXDR_PUT_U_LONG(buf, objp->vers);
IXDR_PUT_U_LONG(buf, objp->proc);
}
if (!xdr_rpc_opaque_auth (xdrs, &objp->cred))
return FALSE;
if (!xdr_rpc_opaque_auth (xdrs, &objp->verf))
return FALSE;
return TRUE;
} else if (xdrs->x_op == XDR_DECODE) {
if (1) {
if (!xdr_u_int (xdrs, &objp->rpcvers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
} else {
objp->rpcvers = IXDR_GET_U_LONG(buf);
objp->prog = IXDR_GET_U_LONG(buf);
objp->vers = IXDR_GET_U_LONG(buf);
objp->proc = IXDR_GET_U_LONG(buf);
}
if (!xdr_rpc_opaque_auth (xdrs, &objp->cred))
return FALSE;
if (!xdr_rpc_opaque_auth (xdrs, &objp->verf))
return FALSE;
return TRUE;
}
if (!xdr_u_int (xdrs, &objp->rpcvers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->prog))
return FALSE;
if (!xdr_u_int (xdrs, &objp->vers))
return FALSE;
if (!xdr_u_int (xdrs, &objp->proc))
return FALSE;
if (!xdr_rpc_opaque_auth (xdrs, &objp->cred))
return FALSE;
if (!xdr_rpc_opaque_auth (xdrs, &objp->verf))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_mismatch_info (XDR *xdrs, rpc_mismatch_info *objp)
{
if (!xdr_u_int (xdrs, &objp->min_version))
return FALSE;
if (!xdr_u_int (xdrs, &objp->max_version))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_accepted_reply_body (XDR *xdrs, rpc_accepted_reply_body *objp)
{
if (!xdr_rpc_accept_stat (xdrs, &objp->stat))
return FALSE;
switch (objp->stat) {
case RPC_SUCCESS:
break;
case RPC_PROG_MISMATCH:
if (!xdr_rpc_mismatch_info (xdrs, &objp->mismatch_info))
return FALSE;
break;
default:
break;
}
return TRUE;
}
bool_t
xdr_rpc_accepted_reply (XDR *xdrs, rpc_accepted_reply *objp)
{
if (!xdr_rpc_opaque_auth (xdrs, &objp->verf))
return FALSE;
if (!xdr_rpc_accepted_reply_body (xdrs, &objp->reply_data))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rejected_reply (XDR *xdrs, rpc_rejected_reply *objp)
{
if (!xdr_rpc_reject_stat (xdrs, &objp->stat))
return FALSE;
switch (objp->stat) {
case RPC_MISMATCH:
if (!xdr_rpc_mismatch_info (xdrs, &objp->mismatch_info))
return FALSE;
break;
case RPC_AUTH_ERROR:
if (!xdr_rpc_auth_stat (xdrs, &objp->auth_stat))
return FALSE;
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rpc_reply_body (XDR *xdrs, rpc_reply_body *objp)
{
if (!xdr_rpc_reply_stat (xdrs, &objp->stat))
return FALSE;
switch (objp->stat) {
case RPC_MSG_ACCEPTED:
if (!xdr_rpc_accepted_reply (xdrs, &objp->areply))
return FALSE;
break;
case RPC_MSG_DENIED:
if (!xdr_rpc_rejected_reply (xdrs, &objp->rreply))
return FALSE;
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rpc_msg_body (XDR *xdrs, rpc_msg_body *objp)
{
if (!xdr_rpc_msg_type (xdrs, &objp->dir))
return FALSE;
switch (objp->dir) {
case RPC_CALL:
if (!xdr_rpc_call_body (xdrs, &objp->cbody))
return FALSE;
break;
case RPC_REPLY:
if (!xdr_rpc_reply_body (xdrs, &objp->rbody))
return FALSE;
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rpc_msg (XDR *xdrs, rpc_msg *objp)
{
if (!xdr_u_int (xdrs, &objp->xid))
return FALSE;
if (!xdr_rpc_msg_body (xdrs, &objp->body))
return FALSE;
return TRUE;
}

48
src/nfs/run-rpcgen.sh Executable file
View File

@ -0,0 +1,48 @@
#!/bin/bash
set -e
# 1) remove all extern non-xdr functions (service, client)
# 2) use xdr_string_t for strings instead of char*
# 3) remove K&R #ifdefs
# 4) remove register int32_t* buf
# 5) remove union names
# 6) use xdr_string_t for opaques instead of u_int + char*
# 7) TODO: generate normal procedure stubs
run_rpcgen() {
rpcgen -h $1.x | \
perl -e '
{ local $/ = undef; $_ = <>; }
s/^extern(?!.*"C"|.*bool_t xdr.*XDR).*\n//gm;
s/#include <rpc\/rpc.h>/#include "xdr_impl.h"/;
s/^typedef char \*/typedef xdr_string_t /gm;
s/^(\s*)char \*(?!.*_val)/$1xdr_string_t /gm;
# remove union names
s/ \w+_u;/;/gs;
# use xdr_string_t for opaques
s/struct\s*\{\s*u_int\s+\w+_len;\s*char\s+\*\w+_val;\s*\}\s*/xdr_string_t /gs;
# remove stdc/k&r
s/^#if.*__STDC__.*//gm;
s/\n#else[^\n]*K&R.*?\n#endif[^\n]*K&R[^\n]*//gs;
print;' > $1.h
rpcgen -c $1.x | \
perl -pe '
s/register int32_t \*buf;\s*//g;
s/\bbuf\s*=[^;]+;\s*//g;
s/\bbuf\s*==\s*NULL/1/g;
# remove union names
s/(\.|->)\w+_u\./$1/g;
# use xdr_string_t for opaques
# xdr_bytes(xdrs, (char**)&objp->data.data_val, (char**)&objp->data.data_len, 400)
# -> xdr_bytes(xdrs, &objp->data, 400)
# xdr_bytes(xdrs, (char**)&objp->data_val, (char**)&objp->data_len, 400)
# -> xdr_bytes(xdrs, objp, 400)
s/xdr_bytes\s*\(\s*xdrs,\s*\(\s*char\s*\*\*\s*\)\s*([^()]+?)\.\w+_val\s*,\s*\(\s*u_int\s*\*\s*\)\s*\1\.\w+_len,/xdr_bytes(xdrs, $1,/gs;
s/xdr_bytes\s*\(\s*xdrs,\s*\(\s*char\s*\*\*\s*\)\s*&\s*([^()]+?)->\w+_val\s*,\s*\(\s*u_int\s*\*\s*\)\s*&\s*\1->\w+_len,/xdr_bytes(xdrs, $1,/gs;
# add include
if (/#include/) { $_ .= "#include \"xdr_impl_inline.h\"\n"; }' > ${1}_xdr.cpp
}
run_rpcgen nfs
run_rpcgen rpc
run_rpcgen portmap

107
src/nfs/xdr_impl.cpp Normal file
View File

@ -0,0 +1,107 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Efficient XDR implementation almost compatible with rpcgen (see run-rpcgen.sh)
#include "xdr_impl_inline.h"
XDR* xdr_create()
{
return new XDR;
}
void xdr_destroy(XDR* xdrs)
{
xdr_reset(xdrs);
delete xdrs;
}
void xdr_reset(XDR *xdrs)
{
for (auto buf: xdrs->allocs)
{
free(buf);
}
xdrs->buf = NULL;
xdrs->avail = 0;
xdrs->allocs.resize(0);
xdrs->in_linked_list.resize(0);
xdrs->cur_out.resize(0);
xdrs->last_end = 0;
xdrs->buf_list.resize(0);
}
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data)
{
xdrs->x_op = XDR_DECODE;
xdrs->buf = (uint8_t*)buf;
xdrs->avail = size;
return fn(xdrs, data);
}
int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data)
{
xdrs->x_op = XDR_ENCODE;
return fn(xdrs, data);
}
void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count)
{
if (xdrs->last_end < xdrs->cur_out.size())
{
xdrs->buf_list.push_back((iovec){
.iov_base = 0,
.iov_len = xdrs->cur_out.size() - xdrs->last_end,
});
xdrs->last_end = xdrs->cur_out.size();
}
uint8_t *cur_buf = xdrs->cur_out.data();
for (auto & buf: xdrs->buf_list)
{
if (!buf.iov_base)
{
buf.iov_base = cur_buf;
cur_buf += buf.iov_len;
}
}
*iov_list = xdrs->buf_list.data();
*iov_count = xdrs->buf_list.size();
}
void xdr_dump_encoded(XDR *xdrs)
{
for (auto & buf: xdrs->buf_list)
{
for (int i = 0; i < buf.iov_len; i++)
printf("%02x", ((uint8_t*)buf.iov_base)[i]);
}
printf("\n");
}
void xdr_add_malloc(XDR *xdrs, void *buf)
{
xdrs->allocs.push_back(buf);
}
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str)
{
char *cp = (char*)malloc_or_die(str.size()+1);
memcpy(cp, str.data(), str.size());
cp[str.size()] = 0;
xdr_add_malloc(xdrs, cp);
return (xdr_string_t){ str.size(), cp };
}
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str)
{
return xdr_copy_string(xdrs, str, strlen(str));
}
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str, size_t len)
{
char *cp = (char*)malloc_or_die(len+1);
memcpy(cp, str, len);
cp[len] = 0;
xdr_add_malloc(xdrs, cp);
return (xdr_string_t){ len, cp };
}

83
src/nfs/xdr_impl.h Normal file
View File

@ -0,0 +1,83 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Efficient XDR implementation almost compatible with rpcgen (see run-rpcgen.sh)
#pragma once
#include <sys/uio.h>
#include <stdint.h>
#include <string>
#define XDR_COPY_LENGTH 128
struct xdr_string_t
{
size_t size;
char *data;
operator std::string()
{
return std::string(data, size);
}
bool operator == (const char *str)
{
if (!str)
return false;
int i;
for (i = 0; i < size; i++)
if (!str[i] || str[i] != data[i])
return false;
if (str[i])
return false;
return true;
}
bool operator != (const char *str)
{
return !(*this == str);
}
};
typedef uint32_t u_int;
typedef uint32_t enum_t;
typedef uint32_t bool_t;
struct XDR;
typedef int (*xdrproc_t)(XDR *xdrs, void *data);
// Create an empty XDR object
XDR* xdr_create();
// Destroy the XDR object
void xdr_destroy(XDR* xdrs);
// Free resources from any previous xdr_decode/xdr_encode calls
void xdr_reset(XDR *xdrs);
// Try to decode <size> bytes from buffer <buf> using <fn>
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
// Try to encode <data> using <fn>
// May be mixed with xdr_decode
// May be called multiple times to encode multiple parts of the same message
int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data);
// Get the result of previous xdr_encodes as a list of <struct iovec>'s
// in <iov_list> (start) and <iov_count> (count).
// The resulting iov_list is valid until the next call to xdr_{reset,destroy}.
// It may contain references to the original data, so original data must not
// be freed until the result is fully processed (sent).
void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count);
// Remember an allocated buffer to free it later on xdr_reset() or xdr_destroy()
void xdr_add_malloc(XDR *xdrs, void *buf);
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str);
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str);
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str, size_t len);
void xdr_dump_encoded(XDR *xdrs);

309
src/nfs/xdr_impl_inline.h Normal file
View File

@ -0,0 +1,309 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Efficient XDR implementation almost compatible with rpcgen (see run-rpcgen.sh)
// XDR in a nutshell:
//
// int: big endian 32bit
// unsigned: BE 32bit
// enum: BE 32bit
// bool: BE 32bit 0/1
// hyper: BE 64bit
// unsigned hyper: BE 64bit
// float: BE float
// double: BE double
// quadruple: BE long double
// opaque[n] (fixed-length): bytes, padded to !(n%4)
// opaque (variable-length): BE 32bit length, then n bytes, padded to !(n%4)
// string: same as opaque
// array<T>[n] (fixed-length): n items of type T
// vector<T> (variable-length): BE 32bit length, then n items of type T
// struct: components in the same order as specified
// union: BE 32bit variant id, then variant of the union
// void: nothing (empty, 0 byte data)
// optional (XDR T*): BE 32bit 1/0, then T or nothing
// linked list: sequence of optional entries
//
// RPC over TCP:
//
// BE 32bit length, then rpc_msg, then the procedure message itself
#pragma once
#include "xdr_impl.h"
#include <string.h>
#include <endian.h>
#include <vector>
#include "../malloc_or_die.h"
#define FALSE 0
#define TRUE 1
#define XDR_ENCODE 0
#define XDR_DECODE 1
#define BYTES_PER_XDR_UNIT 4
#define IXDR_PUT_U_LONG(a, b)
#define IXDR_GET_U_LONG(a) 0
#define IXDR_PUT_BOOL(a, b)
#define IXDR_GET_BOOL(a) 0
#define XDR_INLINE(xdrs, len) NULL
struct xdr_linked_list_t
{
xdrproc_t fn;
unsigned entry_size, size, cap;
void *base;
unsigned has_next, link_offset;
};
struct XDR
{
int x_op;
// For decoding:
uint8_t *buf = NULL;
unsigned avail = 0;
std::vector<void*> allocs;
std::vector<xdr_linked_list_t> in_linked_list;
// For encoding:
std::vector<uint8_t> cur_out;
unsigned last_end = 0;
std::vector<iovec> buf_list;
};
uint32_t inline len_pad4(uint32_t len)
{
return ((len+3)/4) * 4;
}
inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
{
if (len <= 0)
{
return 1;
}
if (xdrs->x_op == XDR_DECODE)
{
uint32_t padded = len_pad4(len);
if (xdrs->avail < padded)
return 0;
memcpy(data, xdrs->buf, len);
xdrs->buf += padded;
xdrs->avail -= padded;
}
else
{
unsigned old = xdrs->cur_out.size();
uint32_t pad = (len & 3) ? (4 - (len & 3)) : 0;
xdrs->cur_out.resize(old + len + pad);
memcpy(xdrs->cur_out.data()+old, data, len);
for (uint32_t i = 0; i < pad; i++)
xdrs->cur_out[old+i] = 0;
}
return 1;
}
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
uint32_t padded = len_pad4(len);
if (xdrs->avail < 4+padded)
return 0;
data->size = len;
data->data = (char*)(xdrs->buf+4);
xdrs->buf += 4+padded;
xdrs->avail -= 4+padded;
}
else
{
if (data->size < XDR_COPY_LENGTH)
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4+data->size);
*(uint32_t*)(xdrs->cur_out.data() + old) = htobe32(data->size);
memcpy(xdrs->cur_out.data()+old+4, data->data, data->size);
}
else
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4);
*(uint32_t*)(xdrs->cur_out.data() + old) = htobe32(data->size);
xdrs->buf_list.push_back((iovec){
.iov_base = 0,
.iov_len = xdrs->cur_out.size() - xdrs->last_end,
});
xdrs->last_end = xdrs->cur_out.size();
xdrs->buf_list.push_back((iovec)
{
.iov_base = (void*)data->data,
.iov_len = data->size,
});
}
if (data->size & 3)
{
int pad = 4-(data->size & 3);
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old+pad);
for (int i = 0; i < pad; i++)
xdrs->cur_out[old+i] = 0;
}
}
return 1;
}
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
{
return xdr_bytes(xdrs, data, maxlen);
}
inline int xdr_u_int(XDR *xdrs, void *data)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
*((uint32_t*)data) = be32toh(*((uint32_t*)xdrs->buf));
xdrs->buf += 4;
xdrs->avail -= 4;
}
else
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4);
*(uint32_t*)(xdrs->cur_out.data() + old) = htobe32(*(uint32_t*)data);
}
return 1;
}
inline int xdr_enum(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);
}
inline int xdr_bool(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);
}
inline int xdr_uint64_t(XDR *xdrs, void *data)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 8)
return 0;
*((uint64_t*)data) = be64toh(*((uint64_t*)xdrs->buf));
xdrs->buf += 8;
xdrs->avail -= 8;
}
else
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 8);
*(uint64_t*)(xdrs->cur_out.data() + old) = htobe64(*(uint64_t*)data);
}
return 1;
}
// Parse inconvenient shitty linked lists as arrays
inline int xdr_pointer(XDR *xdrs, char **data, unsigned entry_size, xdrproc_t entry_fn)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
uint32_t has_next = be32toh(*((uint32_t*)xdrs->buf));
xdrs->buf += 4;
xdrs->avail -= 4;
*data = NULL;
if (!xdrs->in_linked_list.size() ||
xdrs->in_linked_list.back().fn != entry_fn)
{
if (has_next)
{
unsigned cap = 2;
void *base = malloc_or_die(entry_size * cap);
xdrs->in_linked_list.push_back((xdr_linked_list_t){
.fn = entry_fn,
.entry_size = entry_size,
.size = 1,
.cap = cap,
.base = base,
.has_next = 0,
.link_offset = 0,
});
*data = (char*)base;
if (!entry_fn(xdrs, base))
return 0;
auto & ll = xdrs->in_linked_list.back();
while (ll.has_next)
{
ll.has_next = 0;
if (ll.size >= ll.cap)
{
ll.cap *= 2;
ll.base = realloc_or_die(ll.base, ll.entry_size * ll.cap);
}
if (!entry_fn(xdrs, (uint8_t*)ll.base + ll.entry_size*ll.size))
return 0;
ll.size++;
}
for (unsigned i = 0; i < ll.size-1; i++)
{
*(void**)((uint8_t*)ll.base + i*ll.entry_size + ll.link_offset) =
(uint8_t*)ll.base + (i+1)*ll.entry_size;
}
xdrs->allocs.push_back(ll.base);
xdrs->in_linked_list.pop_back();
}
}
else
{
auto & ll = xdrs->in_linked_list.back();
xdrs->in_linked_list.back().has_next = has_next;
xdrs->in_linked_list.back().link_offset = (uint8_t*)data - (uint8_t*)ll.base - ll.entry_size*ll.size;
}
}
else
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4);
*(uint32_t*)(xdrs->cur_out.data() + old) = htobe32(*data ? 1 : 0);
if (*data)
entry_fn(xdrs, *data);
}
return 1;
}
inline int xdr_array(XDR *xdrs, char **data, uint32_t* len, uint32_t maxlen, uint32_t entry_size, xdrproc_t fn)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
*len = be32toh(*((uint32_t*)xdrs->buf));
if (*len > maxlen)
return 0;
xdrs->buf += 4;
xdrs->avail -= 4;
*data = (char*)malloc_or_die(entry_size * (*len));
for (uint32_t i = 0; i < *len; i++)
fn(xdrs, *data + entry_size*i);
xdrs->allocs.push_back(*data);
}
else
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4);
*(uint32_t*)(xdrs->cur_out.data() + old) = htobe32(*len);
for (uint32_t i = 0; i < *len; i++)
fn(xdrs, *data + entry_size*i);
}
return 1;
}

1297
src/nfs_conn.cpp Normal file

File diff suppressed because it is too large Load Diff

184
src/nfs_portmap.cpp Normal file
View File

@ -0,0 +1,184 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Portmap service for NFS proxy
#include <netinet/in.h>
#include <string.h>
#include "nfs/portmap.h"
#include "nfs/xdr_impl_inline.h"
#include "malloc_or_die.h"
#include "nfs_portmap.h"
#include "sha256.h"
#include "base64.h"
/*
* The NULL procedure. All protocols/versions must provide a NULL procedure
* as index 0.
* It is used by clients, and rpcinfo, to "ping" a service and verify that
* the service is available and that it does support the indicated version.
*/
static int pmap2_null_proc(struct rpc_context *rpc, rpc_op_t *rop)
{
rpc_queue_reply(rop);
return 0;
}
/*
* v2 GETPORT.
* This is the lookup function for portmapper version 2.
* A client provides program, version and protocol (tcp or udp)
* and portmapper returns which port that service is available on,
* (or 0 if no such program is registered.)
*/
static int pmap2_getport_proc(portmap_service_t *self, rpc_op_t *rop)
{
PMAP2GETPORTargs *args = (PMAP2GETPORTargs *)rop->request;
uint32_t *reply = (uint32_t *)rop->reply;
auto it = self->reg_ports.lower_bound((portmap_id_t){
.prog = args->prog,
.vers = args->vers,
.udp = args->prot == IPPROTO_UDP,
.ipv6 = false,
});
if (it != self->reg_ports.end() &&
it->prog == args->prog && it->vers == args->vers &&
it->udp == (args->prot == IPPROTO_UDP))
{
*reply = it->port;
}
else
{
*reply = 0;
}
rpc_queue_reply(rop);
return 0;
}
/*
* v2 DUMP.
* This RPC returns a list of all endpoints that are registered with
* portmapper.
*/
static int pmap2_dump_proc(portmap_service_t *self, rpc_op_t *rop)
{
pmap2_mapping_list *list = (pmap2_mapping_list*)malloc_or_die(sizeof(pmap2_mapping_list) * self->reg_ports.size());
xdr_add_malloc(rop->xdrs, list);
PMAP2DUMPres *reply = (PMAP2DUMPres *)rop->reply;
int i = 0;
for (auto it = self->reg_ports.begin(); it != self->reg_ports.end(); it++)
{
if (it->ipv6)
continue;
list[i] = {
.map = {
.prog = it->prog,
.vers = it->vers,
.prot = it->udp ? IPPROTO_UDP : IPPROTO_TCP,
.port = it->port,
},
.next = list+i+1,
};
i++;
}
list[i-1].next = NULL;
// Send reply
reply->list = list;
rpc_queue_reply(rop);
return 0;
}
/*
* v3 GETADDR.
* This is the lookup function for portmapper version 3.
*/
static int pmap3_getaddr_proc(portmap_service_t *self, rpc_op_t *rop)
{
PMAP3GETADDRargs *args = (PMAP3GETADDRargs *)rop->request;
PMAP3GETADDRres *reply = (PMAP3GETADDRres *)rop->reply;
portmap_id_t ref = (portmap_id_t){
.prog = args->prog,
.vers = args->vers,
.udp = args->netid == "udp" || args->netid == "udp6",
.ipv6 = args->netid == "tcp6" || args->netid == "udp6",
};
auto it = self->reg_ports.lower_bound(ref);
if (it != self->reg_ports.end() &&
it->prog == ref.prog && it->vers == ref.vers &&
it->udp == ref.udp && it->ipv6 == ref.ipv6)
{
reply->addr = xdr_copy_string(rop->xdrs, it->addr);
}
else
{
reply->addr = {};
}
rpc_queue_reply(rop);
return 0;
}
/*
* v3 DUMP.
* This RPC returns a list of all endpoints that are registered with
* portmapper.
*/
static std::string netid_udp = "udp";
static std::string netid_udp6 = "udp6";
static std::string netid_tcp = "tcp";
static std::string netid_tcp6 = "tcp6";
static int pmap3_dump_proc(portmap_service_t *self, rpc_op_t *rop)
{
PMAP3DUMPres *reply = (PMAP3DUMPres *)rop->reply;
pmap3_mapping_list *list = (pmap3_mapping_list*)malloc(sizeof(pmap3_mapping_list*) * self->reg_ports.size());
xdr_add_malloc(rop->xdrs, list);
int i = 0;
for (auto it = self->reg_ports.begin(); it != self->reg_ports.end(); it++)
{
list[i] = (pmap3_mapping_list){
.map = (pmap3_mapping){
.prog = it->prog,
.vers = it->vers,
.netid = xdr_copy_string(rop->xdrs, it->ipv6
? (it->udp ? netid_udp6 : netid_tcp6)
: (it->udp ? netid_udp : netid_tcp)),
.addr = xdr_copy_string(rop->xdrs, it->addr), // 0.0.0.0.port
.owner = xdr_copy_string(rop->xdrs, it->owner),
},
.next = list+i+1,
};
i++;
}
list[i-1].next = NULL;
reply->list = list;
rpc_queue_reply(rop);
return 0;
}
portmap_service_t::portmap_service_t()
{
struct rpc_service_proc_t pt[] = {
{PMAP_PROGRAM, PMAP_V2, PMAP2_NULL, (rpc_handler_t)pmap2_null_proc, NULL, 0, NULL, 0, this},
{PMAP_PROGRAM, PMAP_V2, PMAP2_GETPORT, (rpc_handler_t)pmap2_getport_proc, (xdrproc_t)xdr_PMAP2GETPORTargs, sizeof(PMAP2GETPORTargs), (xdrproc_t)xdr_u_int, sizeof(u_int), this},
{PMAP_PROGRAM, PMAP_V2, PMAP2_DUMP, (rpc_handler_t)pmap2_dump_proc, NULL, 0, (xdrproc_t)xdr_PMAP2DUMPres, sizeof(PMAP2DUMPres), this},
{PMAP_PROGRAM, PMAP_V3, PMAP3_NULL, (rpc_handler_t)pmap2_null_proc, NULL, 0, NULL, 0, this},
{PMAP_PROGRAM, PMAP_V3, PMAP3_GETADDR, (rpc_handler_t)pmap3_getaddr_proc, (xdrproc_t)xdr_PMAP3GETADDRargs, sizeof(PMAP3GETADDRargs), (xdrproc_t)xdr_string, sizeof(xdr_string_t), this},
{PMAP_PROGRAM, PMAP_V3, PMAP3_DUMP, (rpc_handler_t)pmap3_dump_proc, NULL, 0, (xdrproc_t)xdr_PMAP3DUMPres, sizeof(PMAP3DUMPres), this},
};
for (int i = 0; i < sizeof(pt)/sizeof(pt[0]); i++)
{
proc_table.push_back(pt[i]);
}
}
std::string sha256(const std::string & str)
{
std::string hash;
hash.resize(32);
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx, (uint8_t*)str.data(), str.size());
sha256_final(&ctx, (uint8_t*)hash.data());
return hash;
}

39
src/nfs_portmap.h Normal file
View File

@ -0,0 +1,39 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Portmap service for NFS proxy
#pragma once
#include <string>
#include <set>
#include <vector>
#include "nfs/rpc_impl.h"
struct portmap_id_t
{
unsigned prog, vers;
bool udp;
bool ipv6;
unsigned port;
std::string owner;
std::string addr;
};
class portmap_service_t
{
public:
std::set<portmap_id_t> reg_ports;
std::vector<rpc_service_proc_t> proc_table;
portmap_service_t();
};
inline bool operator < (const portmap_id_t &a, const portmap_id_t &b)
{
return a.prog < b.prog || a.prog == b.prog && a.vers < b.vers ||
a.prog == b.prog && a.vers == b.vers && a.udp < b.udp ||
a.prog == b.prog && a.vers == b.vers && a.udp == b.udp && a.ipv6 < b.ipv6;
}
std::string sha256(const std::string & str);

972
src/nfs_proxy.cpp Normal file
View File

@ -0,0 +1,972 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simplified NFS proxy
// Presents all images as files
// Keeps image/file list in memory and is thus unsuitable for a large number of files
#define _XOPEN_SOURCE
#include <limits.h>
#include <sys/random.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
//#include <signal.h>
#include "nfs/nfs.h"
#include "nfs/rpc.h"
#include "nfs/portmap.h"
#include "addr_util.h"
#include "base64.h"
#include "nfs_proxy.h"
#include "http_client.h"
#include "cli.h"
#define ETCD_INODE_STATS_WATCH_ID 101
#define ETCD_POOL_STATS_WATCH_ID 102
const char *exe_name = NULL;
nfs_proxy_t::~nfs_proxy_t()
{
if (cmd)
delete cmd;
if (cli)
delete cli;
if (epmgr)
delete epmgr;
if (ringloop)
delete ringloop;
}
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
{
json11::Json::object cfg;
for (int i = 1; i < narg; i++)
{
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
{
printf(
"Vitastor NFS 3.0 proxy\n"
"(c) Vitaliy Filippov, 2021-2022 (VNPL-1.1)\n"
"\n"
"USAGE:\n"
" %s [--etcd_address ADDR] [OTHER OPTIONS]\n"
" --subdir <DIR> export images prefixed <DIR>/ (default empty - export all images)\n"
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
" --nfspath <PATH> set NFS export path to <PATH> (default is /)\n"
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
" --pool <POOL> use <POOL> as default pool for new files (images)\n"
" --foreground 1 stay in foreground, do not daemonize\n"
"\n"
"NFS proxy is stateless if you use immediate_commit=all in your cluster, so\n"
"you can freely use multiple NFS proxies with L3 load balancing in this case.\n"
"\n"
"Example start and mount commands for a custom NFS port:\n"
" %s --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool\n"
" mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp\n",
exe_name, exe_name
);
exit(0);
}
else if (args[i][0] == '-' && args[i][1] == '-')
{
const char *opt = args[i]+2;
cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
}
}
return cfg;
}
void nfs_proxy_t::run(json11::Json cfg)
{
while (getrandom(&server_id, sizeof(server_id), 0) != sizeof(server_id)) {}
// Parse options
bind_address = cfg["bind"].string_value();
if (bind_address == "")
bind_address = "0.0.0.0";
default_pool = cfg["pool"].as_string();
portmap_enabled = cfg.object_items().find("portmap") == cfg.object_items().end() ||
cfg["portmap"].uint64_value() ||
cfg["portmap"].string_value() == "yes" ||
cfg["portmap"].string_value() == "true";
nfs_port = cfg["port"].uint64_value() & 0xffff;
if (!nfs_port)
nfs_port = 2049;
export_root = cfg["nfspath"].string_value();
if (!export_root.size())
export_root = "/";
name_prefix = cfg["subdir"].string_value();
{
int e = name_prefix.size();
while (e > 0 && name_prefix[e-1] == '/')
e--;
int s = 0;
while (s < e && name_prefix[s] == '/')
s++;
name_prefix = name_prefix.substr(s, e-s);
if (name_prefix.size())
name_prefix += "/";
}
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
cmd = new cli_tool_t();
cmd->ringloop = ringloop;
cmd->epmgr = epmgr;
cmd->cli = cli;
// We need inode name hashes for NFS handles to remain stateless and <= 64 bytes long
dir_info[""] = (nfs_dir_t){
.id = 1,
.mod_rev = 0,
};
clock_gettime(CLOCK_REALTIME, &dir_info[""].mtime);
watch_stats();
assert(cli->st_cli.on_inode_change_hook == NULL);
cli->st_cli.on_inode_change_hook = [this](inode_t changed_inode, bool removed)
{
auto inode_cfg_it = cli->st_cli.inode_config.find(changed_inode);
if (inode_cfg_it == cli->st_cli.inode_config.end())
{
return;
}
auto & inode_cfg = inode_cfg_it->second;
std::string full_name = inode_cfg.name;
if (name_prefix != "" && full_name.substr(0, name_prefix.size()) != name_prefix)
{
return;
}
// Calculate directory modification time and revision (used as "cookie verifier")
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
dir_info[""].mod_rev = dir_info[""].mod_rev < inode_cfg.mod_revision ? inode_cfg.mod_revision : dir_info[""].mod_rev;
dir_info[""].mtime = now;
int pos = full_name.find('/', name_prefix.size());
while (pos >= 0)
{
std::string dir = full_name.substr(0, pos);
auto & dinf = dir_info[dir];
if (!dinf.id)
dinf.id = next_dir_id++;
dinf.mod_rev = dinf.mod_rev < inode_cfg.mod_revision ? inode_cfg.mod_revision : dinf.mod_rev;
dinf.mtime = now;
dir_by_hash["S"+base64_encode(sha256(dir))] = dir;
pos = full_name.find('/', pos+1);
}
// Alter inode_by_hash
if (removed)
{
auto ino_it = hash_by_inode.find(changed_inode);
if (ino_it != hash_by_inode.end())
{
inode_by_hash.erase(ino_it->second);
hash_by_inode.erase(ino_it);
}
}
else
{
std::string hash = "S"+base64_encode(sha256(full_name));
auto hbi_it = hash_by_inode.find(changed_inode);
if (hbi_it != hash_by_inode.end() && hbi_it->second != hash)
{
// inode had a different name, remove old hash=>inode pointer
inode_by_hash.erase(hbi_it->second);
}
inode_by_hash[hash] = changed_inode;
hash_by_inode[changed_inode] = hash;
}
};
// Load image metadata
while (!cli->is_ready())
{
ringloop->loop();
if (cli->is_ready())
break;
ringloop->wait();
}
// Check default pool
check_default_pool();
// Self-register portmap and NFS
pmap.reg_ports.insert((portmap_id_t){
.prog = PMAP_PROGRAM,
.vers = PMAP_V2,
.port = portmap_enabled ? 111 : nfs_port,
.owner = "portmapper-service",
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = PMAP_PROGRAM,
.vers = PMAP_V3,
.port = portmap_enabled ? 111 : nfs_port,
.owner = "portmapper-service",
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = NFS_PROGRAM,
.vers = NFS_V3,
.port = nfs_port,
.owner = "nfs-server",
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = MOUNT_PROGRAM,
.vers = MOUNT_V3,
.port = nfs_port,
.owner = "rpc.mountd",
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
});
// Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, NULL);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
exit(1);
}
else
{
do_accept(nfs_socket);
}
});
if (portmap_enabled)
{
// Create portmap socket and add it to epoll
int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int portmap_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
exit(1);
}
else
{
do_accept(portmap_socket);
}
});
}
if (cfg["foreground"].is_null())
{
daemonize();
}
while (true)
{
ringloop->loop();
ringloop->wait();
}
/*// Sync at the end
cluster_op_t *close_sync = new cluster_op_t;
close_sync->opcode = OSD_OP_SYNC;
close_sync->callback = [&stop](cluster_op_t *op)
{
stop = true;
delete op;
};
cli->execute(close_sync);*/
// Destroy the client
delete cli;
delete epmgr;
delete ringloop;
cli = NULL;
epmgr = NULL;
ringloop = NULL;
}
void nfs_proxy_t::watch_stats()
{
assert(cli->st_cli.on_start_watcher_hook == NULL);
cli->st_cli.on_start_watcher_hook = [this](http_co_t *etcd_watch_ws)
{
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
{ "create_request", json11::Json::object {
{ "key", base64_encode(cli->st_cli.etcd_prefix+"/inode/stats/") },
{ "range_end", base64_encode(cli->st_cli.etcd_prefix+"/inode/stats0") },
{ "start_revision", cli->st_cli.etcd_watch_revision },
{ "watch_id", ETCD_INODE_STATS_WATCH_ID },
{ "progress_notify", true },
} }
}).dump());
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
{ "create_request", json11::Json::object {
{ "key", base64_encode(cli->st_cli.etcd_prefix+"/pool/stats/") },
{ "range_end", base64_encode(cli->st_cli.etcd_prefix+"/pool/stats0") },
{ "start_revision", cli->st_cli.etcd_watch_revision },
{ "watch_id", ETCD_POOL_STATS_WATCH_ID },
{ "progress_notify", true },
} }
}).dump());
cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "success", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(cli->st_cli.etcd_prefix+"/inode/stats/") },
{ "range_end", base64_encode(cli->st_cli.etcd_prefix+"/inode/stats0") },
} }
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(cli->st_cli.etcd_prefix+"/pool/stats/") },
{ "range_end", base64_encode(cli->st_cli.etcd_prefix+"/pool/stats0") },
} }
},
} },
}, [this](std::string err, json11::Json res)
{
for (auto & rsp: res["responses"].array_items())
{
for (auto & item: rsp["response_range"]["kvs"].array_items())
{
etcd_kv_t kv = cli->st_cli.parse_etcd_kv(item);
parse_stats(kv);
}
}
});
};
cli->st_cli.on_change_hook = [this, old_hook = cli->st_cli.on_change_hook](std::map<std::string, etcd_kv_t> & changes)
{
for (auto & p: changes)
{
parse_stats(p.second);
}
};
}
void nfs_proxy_t::parse_stats(etcd_kv_t & kv)
{
auto & key = kv.key;
if (key.substr(0, cli->st_cli.etcd_prefix.length()+13) == cli->st_cli.etcd_prefix+"/inode/stats/")
{
pool_id_t pool_id = 0;
inode_t inode_num = 0;
char null_byte = 0;
sscanf(key.c_str() + cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode_num, &null_byte);
if (!pool_id || pool_id >= POOL_ID_MAX || !inode_num || null_byte != 0)
{
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
}
else
{
inode_stats[INODE_WITH_POOL(pool_id, inode_num)] = kv.value;
}
}
else if (key.substr(0, cli->st_cli.etcd_prefix.length()+12) == cli->st_cli.etcd_prefix+"/pool/stats/")
{
pool_id_t pool_id = 0;
char null_byte = 0;
sscanf(key.c_str() + cli->st_cli.etcd_prefix.length()+12, "%u%c", &pool_id, &null_byte);
if (!pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
}
else
{
pool_stats[pool_id] = kv.value;
}
}
}
void nfs_proxy_t::check_default_pool()
{
if (default_pool == "")
{
if (cli->st_cli.pool_config.size() == 1)
{
default_pool = cli->st_cli.pool_config.begin()->second.name;
default_pool_id = cli->st_cli.pool_config.begin()->first;
}
else
{
fprintf(stderr, "There are %lu pools. Please select default pool with --pool option\n", cli->st_cli.pool_config.size());
exit(1);
}
}
else
{
for (auto & p: cli->st_cli.pool_config)
{
if (p.second.name == default_pool)
{
default_pool_id = p.first;
break;
}
}
if (!default_pool_id)
{
fprintf(stderr, "Pool %s is not found\n", default_pool.c_str());
exit(1);
}
}
}
void nfs_proxy_t::do_accept(int listen_fd)
{
struct sockaddr_storage addr;
socklen_t addr_size = sizeof(addr);
int nfs_fd = 0;
while ((nfs_fd = accept(listen_fd, (struct sockaddr *)&addr, &addr_size)) >= 0)
{
fprintf(stderr, "New client %d: connection from %s\n", nfs_fd, addr_to_string(addr).c_str());
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
auto cli = new nfs_client_t();
cli->parent = this;
cli->nfs_fd = nfs_fd;
for (auto & fn: pmap.proc_table)
{
cli->proc_table.insert(fn);
}
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
{
// Handle incoming event
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Client %d disconnected\n", nfs_fd);
cli->stop();
return;
}
cli->epoll_events |= epoll_events;
if (epoll_events & EPOLLIN)
{
// Something is available for reading
cli->submit_read(0);
}
if (epoll_events & EPOLLOUT)
{
cli->submit_send();
}
});
}
if (nfs_fd < 0 && errno != EAGAIN)
{
fprintf(stderr, "Failed to accept connection: %s\n", strerror(errno));
exit(1);
}
}
// FIXME Move these functions to "rpc_context"
void nfs_client_t::select_read_buffer(unsigned wanted_size)
{
if (free_buffers.size())
{
auto & b = free_buffers.back();
if (b.size < wanted_size)
{
cur_buffer = {
.buf = (uint8_t*)malloc_or_die(wanted_size),
.size = wanted_size,
};
}
else
{
cur_buffer = {
.buf = b.buf,
.size = b.size,
};
}
free_buffers.pop_back();
}
else
{
unsigned sz = RPC_INIT_BUF_SIZE;
if (sz < wanted_size)
{
sz = wanted_size;
}
cur_buffer = {
.buf = (uint8_t*)malloc_or_die(sz),
.size = sz,
};
}
}
void nfs_client_t::submit_read(unsigned wanted_size)
{
if (read_msg.msg_iovlen)
{
return;
}
io_uring_sqe* sqe = parent->ringloop->get_sqe();
if (!sqe)
{
read_msg.msg_iovlen = 0;
parent->ringloop->wakeup();
return;
}
if (!cur_buffer.buf || cur_buffer.size <= cur_buffer.read_pos)
{
assert(!wanted_size);
if (cur_buffer.buf)
{
if (cur_buffer.refs > 0)
{
used_buffers[cur_buffer.buf] = (rpc_used_buffer_t){
.size = cur_buffer.size,
.refs = cur_buffer.refs,
};
}
else
{
free_buffers.push_back((rpc_free_buffer_t){
.buf = cur_buffer.buf,
.size = cur_buffer.size,
});
}
}
select_read_buffer(wanted_size);
}
assert(wanted_size <= cur_buffer.size-cur_buffer.read_pos);
read_iov = {
.iov_base = cur_buffer.buf+cur_buffer.read_pos,
.iov_len = wanted_size ? wanted_size : cur_buffer.size-cur_buffer.read_pos,
};
read_msg.msg_iov = &read_iov;
read_msg.msg_iovlen = 1;
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this](ring_data_t *data) { handle_read(data->res); };
my_uring_prep_recvmsg(sqe, nfs_fd, &read_msg, 0);
refs++;
}
void nfs_client_t::handle_read(int result)
{
read_msg.msg_iovlen = 0;
if (deref())
return;
if (result <= 0 && result != -EAGAIN && result != -EINTR)
{
printf("Failed read from client %d: %d (%s)\n", nfs_fd, result, strerror(-result));
stop();
return;
}
if (result > 0)
{
cur_buffer.read_pos += result;
assert(cur_buffer.read_pos <= cur_buffer.size);
// Try to parse incoming RPC messages
uint8_t *data = cur_buffer.buf + cur_buffer.parsed_pos;
unsigned left = cur_buffer.read_pos - cur_buffer.parsed_pos;
while (left > 0)
{
// Assemble all fragments
unsigned fragments = 0;
uint32_t wanted = 0;
while (1)
{
fragments++;
wanted += 4;
if (left < wanted)
{
break;
}
// FIXME: Limit message size
uint32_t frag_size = be32toh(*(uint32_t*)(data + wanted - 4));
wanted += (frag_size & 0x7FFFFFFF);
if (left < wanted || (frag_size & 0x80000000))
{
break;
}
}
if (left >= wanted)
{
if (fragments > 1)
{
// Merge fragments. Fragmented messages are probably not that common,
// so it's probably fine to do an additional memory copy
unsigned frag_offset = 8+be32toh(*(uint32_t*)(data));
unsigned dest_offset = 4+be32toh(*(uint32_t*)(data));
unsigned frag_num = 1;
while (frag_num < fragments)
{
uint32_t frag_size = be32toh(*(uint32_t*)(data + frag_offset - 4)) & 0x7FFFFFFF;
memmove(data + dest_offset, data + frag_offset, frag_size);
frag_offset += 4+frag_size;
dest_offset += frag_size;
frag_num++;
}
}
// Handle full message
int referenced = handle_rpc_message(cur_buffer.buf, data+4, wanted-4*fragments);
cur_buffer.refs += referenced ? 1 : 0;
cur_buffer.parsed_pos += 4+wanted-4*fragments;
data += wanted;
left -= wanted;
}
else if (cur_buffer.size >= (data - cur_buffer.buf + wanted))
{
// Read the tail and come back
submit_read(wanted-left);
break;
}
else
{
// No place to put the whole tail
if (cur_buffer.refs > 0)
{
used_buffers[cur_buffer.buf] = (rpc_used_buffer_t){
.size = cur_buffer.size,
.refs = cur_buffer.refs,
};
select_read_buffer(wanted);
memcpy(cur_buffer.buf, data, left);
}
else if (cur_buffer.size < wanted)
{
uint8_t *old_buf = cur_buffer.buf;
select_read_buffer(wanted);
memcpy(cur_buffer.buf, data, left);
free(old_buf);
}
else
{
memmove(cur_buffer.buf, data, left);
}
cur_buffer.read_pos = left;
cur_buffer.parsed_pos = 0;
// Restart from the beginning
submit_read(wanted-left);
break;
}
}
}
}
void nfs_client_t::submit_send()
{
if (write_msg.msg_iovlen || !send_list.size())
{
return;
}
io_uring_sqe* sqe = parent->ringloop->get_sqe();
if (!sqe)
{
write_msg.msg_iovlen = 0;
parent->ringloop->wakeup();
return;
}
write_msg.msg_iov = send_list.data();
write_msg.msg_iovlen = send_list.size() < IOV_MAX ? send_list.size() : IOV_MAX;
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this](ring_data_t *data) { handle_send(data->res); };
my_uring_prep_sendmsg(sqe, nfs_fd, &write_msg, 0);
refs++;
}
bool nfs_client_t::deref()
{
refs--;
if (stopped && refs <= 0)
{
stop();
return true;
}
return false;
}
void nfs_client_t::stop()
{
stopped = true;
if (refs <= 0)
{
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
delete this;
}
}
void nfs_client_t::handle_send(int result)
{
write_msg.msg_iovlen = 0;
if (deref())
return;
if (result <= 0 && result != -EAGAIN && result != -EINTR)
{
printf("Failed send to client %d: %d (%s)\n", nfs_fd, result, strerror(-result));
stop();
return;
}
if (result > 0)
{
int done = 0;
while (result > 0 && done < send_list.size())
{
iovec & iov = send_list[done];
if (iov.iov_len <= result)
{
auto rop = outbox[done];
if (rop)
{
// Reply fully sent
xdr_reset(rop->xdrs);
parent->xdr_pool.push_back(rop->xdrs);
if (rop->buffer && rop->referenced)
{
// Dereference the buffer
if (rop->buffer == cur_buffer.buf)
{
cur_buffer.refs--;
}
else
{
auto & ub = used_buffers.at(rop->buffer);
assert(ub.refs > 0);
ub.refs--;
if (ub.refs == 0)
{
// FIXME Maybe put free_buffers into parent
free_buffers.push_back((rpc_free_buffer_t){
.buf = rop->buffer,
.size = ub.size,
});
used_buffers.erase(rop->buffer);
}
}
}
free(rop);
}
result -= iov.iov_len;
done++;
}
else
{
iov.iov_len -= result;
iov.iov_base = (uint8_t*)iov.iov_base + result;
break;
}
}
if (done > 0)
{
send_list.erase(send_list.begin(), send_list.begin()+done);
outbox.erase(outbox.begin(), outbox.begin()+done);
}
if (next_send_list.size())
{
send_list.insert(send_list.end(), next_send_list.begin(), next_send_list.end());
outbox.insert(outbox.end(), next_outbox.begin(), next_outbox.end());
next_send_list.clear();
next_outbox.clear();
}
if (outbox.size() > 0)
{
submit_send();
}
}
}
void rpc_queue_reply(rpc_op_t *rop)
{
nfs_client_t *self = (nfs_client_t*)rop->client;
iovec *iov_list = NULL;
unsigned iov_count = 0;
int r = xdr_encode(rop->xdrs, (xdrproc_t)xdr_rpc_msg, &rop->out_msg);
assert(r);
if (rop->reply_fn != NULL)
{
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
assert(r);
}
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
rop->reply_marker = 0;
for (unsigned i = 0; i < iov_count; i++)
{
rop->reply_marker += iov_list[i].iov_len;
}
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
to_outbox.push_back(NULL);
for (unsigned i = 0; i < iov_count; i++)
{
to_send_list.push_back(iov_list[i]);
to_outbox.push_back(NULL);
}
to_outbox[to_outbox.size()-1] = rop;
self->submit_send();
}
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
{
// Take an XDR object from the pool
XDR *xdrs;
if (parent->xdr_pool.size())
{
xdrs = parent->xdr_pool.back();
parent->xdr_pool.pop_back();
}
else
{
xdrs = xdr_create();
}
// Decode the RPC header
char inmsg_data[sizeof(rpc_msg)];
rpc_msg *inmsg = (rpc_msg*)&inmsg_data;
if (!xdr_decode(xdrs, msg_buf, msg_len, (xdrproc_t)xdr_rpc_msg, inmsg))
{
// Invalid message, ignore it
xdr_reset(xdrs);
parent->xdr_pool.push_back(xdrs);
return 0;
}
if (inmsg->body.dir != RPC_CALL)
{
// Reply sent to the server? Strange thing. Also ignore it
xdr_reset(xdrs);
parent->xdr_pool.push_back(xdrs);
return 0;
}
if (inmsg->body.cbody.rpcvers != RPC_MSG_VERSION)
{
// Bad RPC version
rpc_op_t *rop = (rpc_op_t*)malloc(sizeof(rpc_op_t));
*rop = {
.client = this,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
.body = (rpc_msg_body){
.dir = RPC_REPLY,
.rbody = (rpc_reply_body){
.stat = RPC_MSG_DENIED,
.rreply = (rpc_rejected_reply){
.stat = RPC_MISMATCH,
.mismatch_info = (rpc_mismatch_info){
.min_version = RPC_MSG_VERSION,
.max_version = RPC_MSG_VERSION,
},
},
},
},
},
};
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
// Find decoder for the request
auto proc_it = proc_table.find((rpc_service_proc_t){
.prog = inmsg->body.cbody.prog,
.vers = inmsg->body.cbody.vers,
.proc = inmsg->body.cbody.proc,
});
if (proc_it == proc_table.end())
{
// Procedure not implemented
uint32_t min_vers = 0, max_vers = 0;
auto prog_it = proc_table.lower_bound((rpc_service_proc_t){
.prog = inmsg->body.cbody.prog,
});
if (prog_it != proc_table.end())
{
min_vers = prog_it->vers;
auto max_vers_it = proc_table.lower_bound((rpc_service_proc_t){
.prog = inmsg->body.cbody.prog+1,
});
assert(max_vers_it != proc_table.begin());
max_vers_it--;
assert(max_vers_it->prog == inmsg->body.cbody.prog);
max_vers = max_vers_it->vers;
}
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
*rop = {
.client = this,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
.body = (rpc_msg_body){
.dir = RPC_REPLY,
.rbody = (rpc_reply_body){
.stat = RPC_MSG_ACCEPTED,
.areply = (rpc_accepted_reply){
.reply_data = (rpc_accepted_reply_body){
.stat = (min_vers == 0
? RPC_PROG_UNAVAIL
: (min_vers <= inmsg->body.cbody.vers &&
max_vers >= inmsg->body.cbody.vers
? RPC_PROC_UNAVAIL
: RPC_PROG_MISMATCH)),
.mismatch_info = (rpc_mismatch_info){ .min_version = min_vers, .max_version = max_vers },
},
},
},
},
},
};
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
// Allocate memory
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
sizeof(rpc_op_t) + proc_it->req_size + proc_it->resp_size
);
*rop = (rpc_op_t){
.client = this,
.buffer = (uint8_t*)base_buf,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
.body = (rpc_msg_body){
.dir = RPC_REPLY,
.rbody = (rpc_reply_body){
.stat = RPC_MSG_ACCEPTED,
},
},
},
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
};
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
// Try to decode the request
// req_fn may be NULL, that means function has no arguments
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
{
// Invalid request
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_SUCCESS;
rop->reply_fn = proc_it->resp_fn;
int ref = proc_it->handler_fn(proc_it->opaque, rop);
rop->referenced = ref ? 1 : 0;
return ref;
}
void nfs_proxy_t::daemonize()
{
if (fork())
exit(0);
setsid();
if (fork())
exit(0);
if (chdir("/") != 0)
fprintf(stderr, "Warning: Failed to chdir into /\n");
close(0);
close(1);
close(2);
open("/dev/null", O_RDONLY);
open("/dev/null", O_WRONLY);
open("/dev/null", O_WRONLY);
}
int main(int narg, const char *args[])
{
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
exe_name = args[0];
nfs_proxy_t *p = new nfs_proxy_t();
p->run(nfs_proxy_t::parse_args(narg, args));
delete p;
return 0;
}

124
src/nfs_proxy.h Normal file
View File

@ -0,0 +1,124 @@
#pragma once
#include "cluster_client.h"
#include "epoll_manager.h"
#include "nfs_portmap.h"
#include "nfs/xdr_impl.h"
#define RPC_INIT_BUF_SIZE 32768
class cli_tool_t;
struct nfs_dir_t
{
uint64_t id;
uint64_t mod_rev;
timespec mtime;
};
class nfs_proxy_t
{
public:
std::string bind_address;
std::string name_prefix;
uint64_t fsid = 1;
uint64_t server_id = 0;
std::string default_pool;
std::string export_root;
bool portmap_enabled;
unsigned nfs_port;
pool_id_t default_pool_id;
portmap_service_t pmap;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
cli_tool_t *cmd = NULL;
std::vector<XDR*> xdr_pool;
// filehandle = "S"+base64(sha256(full name with prefix)) or "roothandle" for mount root)
uint64_t next_dir_id = 2;
// filehandle => dir with name_prefix
std::map<std::string, std::string> dir_by_hash;
// dir with name_prefix => dir info
std::map<std::string, nfs_dir_t> dir_info;
// filehandle => inode ID
std::map<std::string, inode_t> inode_by_hash;
// inode ID => filehandle
std::map<inode_t, std::string> hash_by_inode;
// inode ID => statistics
std::map<inode_t, json11::Json> inode_stats;
// pool ID => statistics
std::map<pool_id_t, json11::Json> pool_stats;
~nfs_proxy_t();
static json11::Json::object parse_args(int narg, const char *args[]);
void run(json11::Json cfg);
void watch_stats();
void parse_stats(etcd_kv_t & kv);
void check_default_pool();
void do_accept(int listen_fd);
void daemonize();
};
struct rpc_cur_buffer_t
{
uint8_t *buf;
unsigned size;
unsigned read_pos;
unsigned parsed_pos;
int refs;
};
struct rpc_used_buffer_t
{
unsigned size;
int refs;
};
struct rpc_free_buffer_t
{
uint8_t *buf;
unsigned size;
};
class nfs_client_t
{
public:
nfs_proxy_t *parent = NULL;
int nfs_fd;
int epoll_events = 0;
int refs = 0;
bool stopped = false;
std::set<rpc_service_proc_t> proc_table;
// Read state
rpc_cur_buffer_t cur_buffer = { 0 };
std::map<uint8_t*, rpc_used_buffer_t> used_buffers;
std::vector<rpc_free_buffer_t> free_buffers;
iovec read_iov;
msghdr read_msg = { 0 };
// Write state
msghdr write_msg = { 0 };
std::vector<iovec> send_list, next_send_list;
std::vector<rpc_op_t*> outbox, next_outbox;
nfs_client_t();
~nfs_client_t();
void select_read_buffer(unsigned wanted_size);
void submit_read(unsigned wanted_size);
void handle_read(int result);
void submit_send();
void handle_send(int result);
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
bool deref();
void stop();
};

158
src/sha256.c Normal file
View File

@ -0,0 +1,158 @@
/*********************************************************************
* Filename: sha256.c
* Author: Brad Conte (brad AT bradconte.com)
* Copyright:
* Disclaimer: This code is presented "as is" without any guarantees.
* Details: Implementation of the SHA-256 hashing algorithm.
SHA-256 is one of the three algorithms in the SHA2
specification. The others, SHA-384 and SHA-512, are not
offered in this implementation.
Algorithm specification can be found here:
* http://csrc.nist.gov/publications/fips/fips180-2/fips180-2withchangenotice.pdf
This implementation uses little endian byte order.
*********************************************************************/
/*************************** HEADER FILES ***************************/
#include <stdlib.h>
#include <memory.h>
#include "sha256.h"
/****************************** MACROS ******************************/
#define ROTLEFT(a,b) (((a) << (b)) | ((a) >> (32-(b))))
#define ROTRIGHT(a,b) (((a) >> (b)) | ((a) << (32-(b))))
#define CH(x,y,z) (((x) & (y)) ^ (~(x) & (z)))
#define MAJ(x,y,z) (((x) & (y)) ^ ((x) & (z)) ^ ((y) & (z)))
#define EP0(x) (ROTRIGHT(x,2) ^ ROTRIGHT(x,13) ^ ROTRIGHT(x,22))
#define EP1(x) (ROTRIGHT(x,6) ^ ROTRIGHT(x,11) ^ ROTRIGHT(x,25))
#define SIG0(x) (ROTRIGHT(x,7) ^ ROTRIGHT(x,18) ^ ((x) >> 3))
#define SIG1(x) (ROTRIGHT(x,17) ^ ROTRIGHT(x,19) ^ ((x) >> 10))
/**************************** VARIABLES *****************************/
static const WORD k[64] = {
0x428a2f98,0x71374491,0xb5c0fbcf,0xe9b5dba5,0x3956c25b,0x59f111f1,0x923f82a4,0xab1c5ed5,
0xd807aa98,0x12835b01,0x243185be,0x550c7dc3,0x72be5d74,0x80deb1fe,0x9bdc06a7,0xc19bf174,
0xe49b69c1,0xefbe4786,0x0fc19dc6,0x240ca1cc,0x2de92c6f,0x4a7484aa,0x5cb0a9dc,0x76f988da,
0x983e5152,0xa831c66d,0xb00327c8,0xbf597fc7,0xc6e00bf3,0xd5a79147,0x06ca6351,0x14292967,
0x27b70a85,0x2e1b2138,0x4d2c6dfc,0x53380d13,0x650a7354,0x766a0abb,0x81c2c92e,0x92722c85,
0xa2bfe8a1,0xa81a664b,0xc24b8b70,0xc76c51a3,0xd192e819,0xd6990624,0xf40e3585,0x106aa070,
0x19a4c116,0x1e376c08,0x2748774c,0x34b0bcb5,0x391c0cb3,0x4ed8aa4a,0x5b9cca4f,0x682e6ff3,
0x748f82ee,0x78a5636f,0x84c87814,0x8cc70208,0x90befffa,0xa4506ceb,0xbef9a3f7,0xc67178f2
};
/*********************** FUNCTION DEFINITIONS ***********************/
void sha256_transform(SHA256_CTX *ctx, const BYTE data[])
{
WORD a, b, c, d, e, f, g, h, i, j, t1, t2, m[64];
for (i = 0, j = 0; i < 16; ++i, j += 4)
m[i] = (data[j] << 24) | (data[j + 1] << 16) | (data[j + 2] << 8) | (data[j + 3]);
for ( ; i < 64; ++i)
m[i] = SIG1(m[i - 2]) + m[i - 7] + SIG0(m[i - 15]) + m[i - 16];
a = ctx->state[0];
b = ctx->state[1];
c = ctx->state[2];
d = ctx->state[3];
e = ctx->state[4];
f = ctx->state[5];
g = ctx->state[6];
h = ctx->state[7];
for (i = 0; i < 64; ++i) {
t1 = h + EP1(e) + CH(e,f,g) + k[i] + m[i];
t2 = EP0(a) + MAJ(a,b,c);
h = g;
g = f;
f = e;
e = d + t1;
d = c;
c = b;
b = a;
a = t1 + t2;
}
ctx->state[0] += a;
ctx->state[1] += b;
ctx->state[2] += c;
ctx->state[3] += d;
ctx->state[4] += e;
ctx->state[5] += f;
ctx->state[6] += g;
ctx->state[7] += h;
}
void sha256_init(SHA256_CTX *ctx)
{
ctx->datalen = 0;
ctx->bitlen = 0;
ctx->state[0] = 0x6a09e667;
ctx->state[1] = 0xbb67ae85;
ctx->state[2] = 0x3c6ef372;
ctx->state[3] = 0xa54ff53a;
ctx->state[4] = 0x510e527f;
ctx->state[5] = 0x9b05688c;
ctx->state[6] = 0x1f83d9ab;
ctx->state[7] = 0x5be0cd19;
}
void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len)
{
WORD i;
for (i = 0; i < len; ++i) {
ctx->data[ctx->datalen] = data[i];
ctx->datalen++;
if (ctx->datalen == 64) {
sha256_transform(ctx, ctx->data);
ctx->bitlen += 512;
ctx->datalen = 0;
}
}
}
void sha256_final(SHA256_CTX *ctx, BYTE hash[])
{
WORD i;
i = ctx->datalen;
// Pad whatever data is left in the buffer.
if (ctx->datalen < 56) {
ctx->data[i++] = 0x80;
while (i < 56)
ctx->data[i++] = 0x00;
}
else {
ctx->data[i++] = 0x80;
while (i < 64)
ctx->data[i++] = 0x00;
sha256_transform(ctx, ctx->data);
memset(ctx->data, 0, 56);
}
// Append to the padding the total message's length in bits and transform.
ctx->bitlen += ctx->datalen * 8;
ctx->data[63] = ctx->bitlen;
ctx->data[62] = ctx->bitlen >> 8;
ctx->data[61] = ctx->bitlen >> 16;
ctx->data[60] = ctx->bitlen >> 24;
ctx->data[59] = ctx->bitlen >> 32;
ctx->data[58] = ctx->bitlen >> 40;
ctx->data[57] = ctx->bitlen >> 48;
ctx->data[56] = ctx->bitlen >> 56;
sha256_transform(ctx, ctx->data);
// Since this implementation uses little endian byte ordering and SHA uses big endian,
// reverse all the bytes when copying the final state to the output hash.
for (i = 0; i < 4; ++i) {
hash[i] = (ctx->state[0] >> (24 - i * 8)) & 0x000000ff;
hash[i + 4] = (ctx->state[1] >> (24 - i * 8)) & 0x000000ff;
hash[i + 8] = (ctx->state[2] >> (24 - i * 8)) & 0x000000ff;
hash[i + 12] = (ctx->state[3] >> (24 - i * 8)) & 0x000000ff;
hash[i + 16] = (ctx->state[4] >> (24 - i * 8)) & 0x000000ff;
hash[i + 20] = (ctx->state[5] >> (24 - i * 8)) & 0x000000ff;
hash[i + 24] = (ctx->state[6] >> (24 - i * 8)) & 0x000000ff;
hash[i + 28] = (ctx->state[7] >> (24 - i * 8)) & 0x000000ff;
}
}

41
src/sha256.h Normal file
View File

@ -0,0 +1,41 @@
/*********************************************************************
* Filename: sha256.h
* Author: Brad Conte (brad AT bradconte.com)
* Copyright:
* Disclaimer: This code is presented "as is" without any guarantees.
* Details: Defines the API for the corresponding SHA1 implementation.
*********************************************************************/
#ifndef SHA256_H
#define SHA256_H
/*************************** HEADER FILES ***************************/
#include <stddef.h>
/****************************** MACROS ******************************/
#define SHA256_BLOCK_SIZE 32 // SHA256 outputs a 32 byte digest
#ifdef __cplusplus
extern "C" {
#endif
/**************************** DATA TYPES ****************************/
typedef unsigned char BYTE; // 8-bit byte
typedef unsigned int WORD; // 32-bit word, change to "long" for 16-bit machines
typedef struct {
BYTE data[64];
WORD datalen;
unsigned long long bitlen;
WORD state[8];
} SHA256_CTX;
/*********************** FUNCTION DECLARATIONS **********************/
void sha256_init(SHA256_CTX *ctx);
void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len);
void sha256_final(SHA256_CTX *ctx, BYTE hash[]);
#ifdef __cplusplus
};
#endif
#endif // SHA256_H