From 6b2dd50f27922ede9e8f2483dec17199e2b550b1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 8 May 2021 18:20:43 +0300 Subject: [PATCH 1/9] Fix build without RDMA --- src/messenger.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/messenger.cpp b/src/messenger.cpp index 59dc0c21..64862bbd 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -516,10 +516,12 @@ void osd_messenger_t::accept_connections(int listen_fd) } } +#ifdef WITH_RDMA bool osd_messenger_t::is_rdma_enabled() { return rdma_context != NULL; } +#endif json11::Json osd_messenger_t::read_config(const json11::Json & config) { From 699a0fbbc7bf5bf7ebb9ea8a7ba2a5281eb36fab Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 15 May 2021 19:20:38 +0300 Subject: [PATCH 2/9] Log to stderr instead of stdout in client --- src/cluster_client.cpp | 6 +++--- src/messenger.cpp | 32 ++++++++++++++++---------------- src/msgr_op.h | 8 ++++---- src/msgr_rdma.cpp | 16 ++++++++-------- src/msgr_receive.cpp | 8 ++++---- src/msgr_send.cpp | 4 ++-- src/msgr_stop.cpp | 4 ++-- 7 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 408e30a7..6bc2ff7e 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -51,7 +51,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd msgr.exec_op = [this](osd_op_t *op) { // Garbage in - printf("Incoming garbage from peer %d\n", op->peer_fd); + fprintf(stderr, "Incoming garbage from peer %d\n", op->peer_fd); msgr.stop_client(op->peer_fd); delete op; }; @@ -1072,8 +1072,8 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) if (part->op.reply.hdr.retval != expected) { // Operation failed, retry - printf( - "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", + fprintf( + stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected ); if (part->op.reply.hdr.retval == -EPIPE) diff --git a/src/messenger.cpp b/src/messenger.cpp index 64862bbd..d25bfe1a 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -21,13 +21,13 @@ void osd_messenger_t::init() ); if (!rdma_context) { - printf("[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num); + fprintf(stderr, "[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num); } else { rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge ? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge; - printf("[OSD %lu] RDMA initialized successfully\n", osd_num); + fprintf(stderr, "[OSD %lu] RDMA initialized successfully\n", osd_num); fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK); tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events) { @@ -55,7 +55,7 @@ void osd_messenger_t::init() if (!cl->ping_time_remaining) { // Ping timed out, stop the client - printf("Ping timed out for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd); + fprintf(stderr, "Ping timed out for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd); to_stop.push_back(cl->peer_fd); } } @@ -82,7 +82,7 @@ void osd_messenger_t::init() delete op; if (fail_fd >= 0) { - printf("Ping failed for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd); + fprintf(stderr, "Ping failed for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd); stop_client(fail_fd, true); } }; @@ -305,7 +305,7 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events) if (epoll_events & EPOLLRDHUP) { // Stop client - printf("[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd); + fprintf(stderr, "[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd); stop_client(peer_fd, true); } else if (epoll_events & EPOLLIN) @@ -330,7 +330,7 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) wp.connecting = false; if (peer_fd < 0) { - printf("Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd)); + fprintf(stderr, "Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd)); if (wp.address_changed) { wp.address_changed = false; @@ -357,7 +357,7 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) } if (log_level > 0) { - printf("[OSD %lu] Connected with peer OSD %lu (client %d)\n", osd_num, peer_osd, peer_fd); + fprintf(stderr, "[OSD %lu] Connected with peer OSD %lu (client %d)\n", osd_num, peer_osd, peer_fd); } wanted_peers.erase(peer_osd); repeer_pgs(peer_osd); @@ -403,7 +403,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) if (op->reply.hdr.retval < 0) { err = true; - printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval); + fprintf(stderr, "Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval); } else { @@ -411,18 +411,18 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) if (json_err != "") { err = true; - printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str()); + fprintf(stderr, "Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str()); } else if (config["osd_num"].uint64_value() != cl->osd_num) { err = true; - printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num); + fprintf(stderr, "Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num); } else if (config["protocol_version"].uint64_value() != OSD_PROTOCOL_VERSION) { err = true; - printf( - "OSD %lu protocol version is %lu, but only version %u is supported.\n" + fprintf( + stderr, "OSD %lu protocol version is %lu, but only version %u is supported.\n" " If you need to upgrade from 0.5.x please request it via the issue tracker.\n", cl->osd_num, config["protocol_version"].uint64_value(), OSD_PROTOCOL_VERSION ); @@ -443,8 +443,8 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || cl->rdma_conn->connect(&addr) != 0) { - printf( - "Failed to connect to OSD %lu (address %s) using RDMA\n", + fprintf( + stderr, "Failed to connect to OSD %lu (address %s) using RDMA\n", cl->osd_num, config["rdma_address"].string_value().c_str() ); delete cl->rdma_conn; @@ -465,7 +465,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) } if (log_level > 0) { - printf("Connected to OSD %lu using RDMA\n", cl->osd_num); + fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num); } cl->peer_state = PEER_RDMA; tfd->set_fd_handler(cl->peer_fd, false, NULL); @@ -491,7 +491,7 @@ void osd_messenger_t::accept_connections(int listen_fd) { assert(peer_fd != 0); char peer_str[256]; - printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, + fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; diff --git a/src/msgr_op.h b/src/msgr_op.h index 3a75b117..fb571901 100644 --- a/src/msgr_op.h +++ b/src/msgr_op.h @@ -76,7 +76,7 @@ struct osd_op_buf_list_t buf = (iovec*)malloc(sizeof(iovec) * alloc); if (!buf) { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + fprintf(stderr, "Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); exit(1); } memcpy(buf, inline_buf, sizeof(iovec) * old); @@ -87,7 +87,7 @@ struct osd_op_buf_list_t buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); if (!buf) { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + fprintf(stderr, "Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); exit(1); } } @@ -109,7 +109,7 @@ struct osd_op_buf_list_t buf = (iovec*)malloc(sizeof(iovec) * alloc); if (!buf) { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + fprintf(stderr, "Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); exit(1); } memcpy(buf, inline_buf, sizeof(iovec)*old); @@ -120,7 +120,7 @@ struct osd_op_buf_list_t buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); if (!buf) { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + fprintf(stderr, "Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); exit(1); } } diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index e082bf32..57276d98 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -315,8 +315,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64 if (r != 0) { delete rdma_conn; - printf( - "Failed to connect RDMA queue pair to %s (client %d)\n", + fprintf( + stderr, "Failed to connect RDMA queue pair to %s (client %d)\n", addr.to_string().c_str(), peer_fd ); } @@ -346,7 +346,7 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { - printf("RDMA send failed: %s\n", strerror(err)); + fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); exit(1); } cl->rdma_conn->cur_send++; @@ -408,7 +408,7 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { - printf("RDMA receive failed: %s\n", strerror(err)); + fprintf(stderr, "RDMA receive failed: %s\n", strerror(err)); exit(1); } cl->rdma_conn->cur_recv++; @@ -445,7 +445,7 @@ void osd_messenger_t::handle_rdma_events() } if (ibv_req_notify_cq(rdma_context->cq, 0) != 0) { - printf("Failed to request RDMA completion notification, exiting\n"); + fprintf(stderr, "Failed to request RDMA completion notification, exiting\n"); exit(1); } ibv_wc wc[RDMA_EVENTS_AT_ONCE]; @@ -465,12 +465,12 @@ void osd_messenger_t::handle_rdma_events() osd_client_t *cl = cl_it->second; if (wc[i].status != IBV_WC_SUCCESS) { - printf("RDMA work request failed for client %d", client_id); + fprintf(stderr, "RDMA work request failed for client %d", client_id); if (cl->osd_num) { - printf(" (OSD %lu)", cl->osd_num); + fprintf(stderr, " (OSD %lu)", cl->osd_num); } - printf(" with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); + fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); stop_client(client_id); continue; } diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp index 873527b4..d90ace26 100644 --- a/src/msgr_receive.cpp +++ b/src/msgr_receive.cpp @@ -72,7 +72,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl) // this is a client socket, so don't panic on error. just disconnect it if (result != 0) { - printf("Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); + fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); } stop_client(cl->peer_fd); return false; @@ -177,7 +177,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl) handle_op_hdr(cl); else { - printf("Received garbage: magic=%lx id=%lu opcode=%lx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd); + fprintf(stderr, "Received garbage: magic=%lx id=%lu opcode=%lx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd); stop_client(cl->peer_fd); return false; } @@ -292,7 +292,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) if (req_it == cl->sent_ops.end()) { // Command out of sync. Drop connection - printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cl->read_op->req.hdr.id); + fprintf(stderr, "Client %d command out of sync: id %lu\n", cl->peer_fd, cl->read_op->req.hdr.id); stop_client(cl->peer_fd); return false; } @@ -307,7 +307,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) if (op->reply.hdr.retval >= 0 && (op->reply.hdr.retval != expected_size || bmp_len > op->bitmap_len)) { // Check reply length to not overflow the buffer - printf("Client %d read reply of different length: expected %u+%u, got %ld+%u\n", + fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %ld+%u\n", cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len); cl->sent_ops[op->req.hdr.id] = op; stop_client(cl->peer_fd); diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp index 972ec0bd..44ee7570 100644 --- a/src/msgr_send.cpp +++ b/src/msgr_send.cpp @@ -227,7 +227,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) if (result < 0 && result != -EAGAIN) { // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); + fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); stop_client(cl->peer_fd); return; } @@ -274,7 +274,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) // FIXME: Ignore pings during RDMA state transition if (log_level > 0) { - printf("Successfully connected with client %d using RDMA\n", cl->peer_fd); + fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd); } cl->peer_state = PEER_RDMA; tfd->set_fd_handler(cl->peer_fd, false, NULL); diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index ae15a923..cf8de660 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -58,11 +58,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force) { if (cl->osd_num) { - printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); + fprintf(stderr, "[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); } else { - printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); + fprintf(stderr, "[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); } } // First set state to STOPPED so another stop_client() call doesn't try to free it again From bf591ba3ee7b22aac1ec63c56378dd3c0f1f07c6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 15 May 2021 19:21:01 +0300 Subject: [PATCH 3/9] Fix nbd module load check --- src/nbd_proxy.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nbd_proxy.cpp b/src/nbd_proxy.cpp index 2a0c5811..8d827a64 100644 --- a/src/nbd_proxy.cpp +++ b/src/nbd_proxy.cpp @@ -269,7 +269,7 @@ public: void load_module() { - if (access("/sys/module/nbd", F_OK)) + if (access("/sys/module/nbd", F_OK) == 0) { return; } From c467acc38800d8ce49fb8d171337c07255b533e6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 15 May 2021 19:21:38 +0300 Subject: [PATCH 4/9] Fix /v3 appendage to etcd URLs without /v3 --- src/etcd_state_client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 15b7c833..527047ef 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -84,7 +84,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr) printf("HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n"); exit(1); } - if (addr.find('/') < 0) + if (addr.find('/') == std::string::npos) addr += "/v3"; this->etcd_addresses.push_back(addr); } From 57be1923d3ef298642c9bd0a1954311cb3ad93cf Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 15 May 2021 22:14:57 +0300 Subject: [PATCH 5/9] Daemonize NBD_DO_IT process, correctly cleanup unmounted NBD clients --- src/nbd_proxy.cpp | 50 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/src/nbd_proxy.cpp b/src/nbd_proxy.cpp index 8d827a64..f1ae3794 100644 --- a/src/nbd_proxy.cpp +++ b/src/nbd_proxy.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -200,9 +201,10 @@ public: fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK); nbd_fd = sockfd[0]; load_module(); + bool bg = cfg["foreground"].is_null(); if (!cfg["dev_num"].is_null()) { - if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30) < 0) + if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30, bg) < 0) { perror("run_nbd"); exit(1); @@ -214,7 +216,7 @@ public: int i = 0; while (true) { - int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30); + int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30, bg); if (r == 0) { printf("/dev/nbd%d\n", i); @@ -237,7 +239,7 @@ public: } } } - if (cfg["foreground"].is_null()) + if (bg) { daemonize(); } @@ -254,17 +256,42 @@ public: }; ringloop->register_consumer(&consumer); // Add FD to epoll - epmgr->tfd->set_fd_handler(sockfd[0], false, [this](int peer_fd, int epoll_events) + bool stop = false; + epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events) { - read_ready++; - submit_read(); + if (epoll_events & EPOLLRDHUP) + { + close(peer_fd); + stop = true; + } + else + { + read_ready++; + submit_read(); + } }); - while (1) + while (!stop) { ringloop->loop(); ringloop->wait(); } - // FIXME: Cleanup when exiting + stop = false; + cluster_op_t *close_sync = new cluster_op_t; + close_sync->opcode = OSD_OP_SYNC; + close_sync->callback = [this, &stop](cluster_op_t *op) + { + stop = true; + delete op; + }; + cli->execute(close_sync); + while (!stop) + { + ringloop->loop(); + ringloop->wait(); + } + delete cli; + delete epmgr; + delete ringloop; } void load_module() @@ -411,7 +438,7 @@ public: } protected: - int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout) + int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout, bool bg) { // Check handle size assert(sizeof(cur_req.handle) == 8); @@ -459,11 +486,14 @@ protected: { // Run in child close(sockfd[0]); + if (bg) + { + daemonize(); + } r = ioctl(nbd, NBD_DO_IT); if (r < 0) { fprintf(stderr, "NBD device terminated with error: %s\n", strerror(errno)); - kill(getppid(), SIGTERM); } close(sockfd[1]); ioctl(nbd, NBD_CLEAR_QUE); From eaac1fc5d15dc083ef954f186d297537d5376316 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 16 May 2021 01:04:59 +0300 Subject: [PATCH 6/9] Log to stderr in etcd_state_client, too --- src/etcd_state_client.cpp | 56 +++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 527047ef..1dfa5440 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -35,7 +35,7 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err); if (json_err != "") { - printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); + fprintf(stderr, "Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); kv.key = ""; } else @@ -81,7 +81,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr) addr = addr.substr(7); else if (strtolower(addr.substr(0, 8)) == "https://") { - printf("HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n"); + fprintf(stderr, "HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n"); exit(1); } if (addr.find('/') == std::string::npos) @@ -149,7 +149,7 @@ void etcd_state_client_t::start_etcd_watcher() json11::Json data = json11::Json::parse(msg->body, json_err); if (json_err != "") { - printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str()); + fprintf(stderr, "Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str()); } else { @@ -175,7 +175,7 @@ void etcd_state_client_t::start_etcd_watcher() { if (this->log_level > 3) { - printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str()); + fprintf(stderr, "Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str()); } parse_state(kv.second); } @@ -250,7 +250,7 @@ void etcd_state_client_t::load_global_config() { if (err != "") { - printf("Error reading OSD configuration from etcd: %s\n", err.c_str()); + fprintf(stderr, "Error reading OSD configuration from etcd: %s\n", err.c_str()); tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { load_global_config(); @@ -323,7 +323,7 @@ void etcd_state_client_t::load_pgs() { if (err != "") { - printf("Error loading PGs from etcd: %s\n", err.c_str()); + fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str()); tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { load_pgs(); @@ -386,7 +386,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0) { - printf("Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX); + fprintf(stderr, "Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX); continue; } pc.id = pool_id; @@ -394,7 +394,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) pc.name = pool_item.second["name"].string_value(); if (pc.name == "") { - printf("Pool %u has empty name, skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has empty name, skipping pool\n", pool_id); continue; } // Failure Domain @@ -408,7 +408,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) pc.scheme = POOL_SCHEME_JERASURE; else { - printf("Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id); continue; } // PG Size @@ -418,7 +418,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) || pool_item.second["pg_size"].uint64_value() > 256) { - printf("Pool %u has invalid pg_size, skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid pg_size, skipping pool\n", pool_id); continue; } // Parity Chunks @@ -427,7 +427,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { if (pc.parity_chunks > 1) { - printf("Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id); continue; } pc.parity_chunks = 1; @@ -435,7 +435,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) if (pc.scheme == POOL_SCHEME_JERASURE && (pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2)) { - printf("Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id); continue; } // PG MinSize @@ -444,14 +444,14 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) && pc.pg_minsize < (pc.pg_size-pc.parity_chunks)) { - printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid pg_minsize, skipping pool\n", pool_id); continue; } // PG Count pc.pg_count = pool_item.second["pg_count"].uint64_value(); if (pc.pg_count < 1) { - printf("Pool %u has invalid pg_count, skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid pg_count, skipping pool\n", pool_id); continue; } // Max OSD Combinations @@ -460,7 +460,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) pc.max_osd_combinations = 10000; if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100) { - printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); + fprintf(stderr, "Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); continue; } // PG Stripe Size @@ -478,7 +478,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { if (pg_item.second.target_set.size() != parsed_cfg.pg_size) { - printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", + fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size); pg_item.second.pause = true; } @@ -501,7 +501,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0) { - printf("Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX); + fprintf(stderr, "Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX); continue; } for (auto & pg_item: pool_item.second.object_items()) @@ -510,7 +510,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(pg_item.first.c_str(), "%u%c", &pg_num, &null_byte); if (!pg_num || null_byte != 0) { - printf("Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str()); + fprintf(stderr, "Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str()); continue; } auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num]; @@ -524,7 +524,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) } if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size) { - printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", + fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size); parsed_cfg.pause = true; } @@ -537,8 +537,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { if (pg_it->second.exists && pg_it->first != ++n) { - printf( - "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n", + fprintf( + stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n", pool_item.second.id, pool_item.second.pg_config.size() ); for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) @@ -561,7 +561,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0) { - printf("Bad etcd key %s, ignoring\n", key.c_str()); + fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str()); } else { @@ -600,7 +600,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0) { - printf("Bad etcd key %s, ignoring\n", key.c_str()); + fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str()); } else if (value.is_null()) { @@ -624,7 +624,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) } if (i >= pg_state_bit_count) { - printf("Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str()); + fprintf(stderr, "Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str()); return; } } @@ -633,7 +633,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) (state & PG_PEERING) && state != PG_PEERING || (state & PG_INCOMPLETE) && state != PG_INCOMPLETE) { - printf("Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str()); + fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str()); return; } this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary; @@ -671,7 +671,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) sscanf(key.c_str() + etcd_prefix.length()+14, "%lu/%lu%c", &pool_id, &inode_num, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || !inode_num || (inode_num >> (64-POOL_ID_BITS)) || null_byte != 0) { - printf("Bad etcd key %s, ignoring\n", key.c_str()); + fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str()); } else { @@ -706,8 +706,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) parent_inode_num |= pool_id << (64-POOL_ID_BITS); else if (parent_pool_id >= POOL_ID_MAX) { - printf( - "Inode %lu/%lu parent_pool value is invalid, ignoring parent setting\n", + fprintf( + stderr, "Inode %lu/%lu parent_pool value is invalid, ignoring parent setting\n", inode_num >> (64-POOL_ID_BITS), inode_num & ((1l << (64-POOL_ID_BITS)) - 1) ); parent_inode_num = 0; From fd8244699b83e8cae54d155cd54437b49d5bccee Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 16 May 2021 01:15:43 +0300 Subject: [PATCH 7/9] Implement basic CSI driver Currently can create and remove volumes, but resizing and snapshots is not supported yet --- csi/.dockerignore | 3 + csi/Dockerfile | 32 ++ csi/Makefile | 9 + csi/deploy/000-csi-namespace.yaml | 5 + csi/deploy/001-csi-config-map.yaml | 9 + csi/deploy/002-csi-nodeplugin-rbac.yaml | 37 ++ csi/deploy/003-csi-nodeplugin-psp.yaml | 72 +++ csi/deploy/004-csi-nodeplugin.yaml | 140 ++++++ csi/deploy/005-csi-provisioner-rbac.yaml | 102 +++++ csi/deploy/006-csi-provisioner-psp.yaml | 60 +++ csi/deploy/007-csi-provisioner.yaml | 159 +++++++ csi/deploy/008-csi-driver.yaml | 11 + csi/deploy/example-pvc.yaml | 12 + csi/deploy/storage-class.yaml | 19 + csi/go.mod | 35 ++ csi/src/config.go | 22 + csi/src/controllerserver.go | 530 +++++++++++++++++++++++ csi/src/grpc.go | 137 ++++++ csi/src/identityserver.go | 60 +++ csi/src/nodeserver.go | 279 ++++++++++++ csi/src/server.go | 36 ++ csi/vitastor-csi.go | 39 ++ 22 files changed, 1808 insertions(+) create mode 100644 csi/.dockerignore create mode 100644 csi/Dockerfile create mode 100644 csi/Makefile create mode 100644 csi/deploy/000-csi-namespace.yaml create mode 100644 csi/deploy/001-csi-config-map.yaml create mode 100644 csi/deploy/002-csi-nodeplugin-rbac.yaml create mode 100644 csi/deploy/003-csi-nodeplugin-psp.yaml create mode 100644 csi/deploy/004-csi-nodeplugin.yaml create mode 100644 csi/deploy/005-csi-provisioner-rbac.yaml create mode 100644 csi/deploy/006-csi-provisioner-psp.yaml create mode 100644 csi/deploy/007-csi-provisioner.yaml create mode 100644 csi/deploy/008-csi-driver.yaml create mode 100644 csi/deploy/example-pvc.yaml create mode 100644 csi/deploy/storage-class.yaml create mode 100644 csi/go.mod create mode 100644 csi/src/config.go create mode 100644 csi/src/controllerserver.go create mode 100644 csi/src/grpc.go create mode 100644 csi/src/identityserver.go create mode 100644 csi/src/nodeserver.go create mode 100644 csi/src/server.go create mode 100644 csi/vitastor-csi.go diff --git a/csi/.dockerignore b/csi/.dockerignore new file mode 100644 index 00000000..2d1d3bab --- /dev/null +++ b/csi/.dockerignore @@ -0,0 +1,3 @@ +vitastor-csi +go.sum +Dockerfile diff --git a/csi/Dockerfile b/csi/Dockerfile new file mode 100644 index 00000000..eea29631 --- /dev/null +++ b/csi/Dockerfile @@ -0,0 +1,32 @@ +# Compile stage +FROM golang:buster AS build + +ADD go.mod /app/ +RUN cd /app; CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go mod download -x +ADD . /app +RUN perl -i -e '$/ = undef; while(<>) { s/\n\s*(\{\s*\n)/$1\n/g; s/\}(\s*\n\s*)else\b/$1} else/g; print; }' `find /app -name '*.go'` +RUN cd /app; CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o vitastor-csi + +# Final stage +FROM debian:buster + +LABEL maintainers="Vitaliy Filippov " +LABEL description="Vitastor CSI Driver" + +ENV NODE_ID="" +ENV CSI_ENDPOINT="" + +RUN apt-get update && \ + apt-get install -y wget && \ + wget -q -O /etc/apt/trusted.gpg.d/vitastor.gpg https://vitastor.io/debian/pubkey.gpg && \ + (echo deb http://vitastor.io/debian buster main > /etc/apt/sources.list.d/vitastor.list) && \ + (echo deb http://deb.debian.org/debian buster-backports main > /etc/apt/sources.list.d/backports.list) && \ + (echo "APT::Install-Recommends false;" > /etc/apt/apt.conf) && \ + apt-get update && \ + apt-get install -y e2fsprogs xfsprogs vitastor kmod && \ + apt-get clean && \ + (echo options nbd nbds_max=128 > /etc/modprobe.d/nbd.conf) + +COPY --from=build /app/vitastor-csi /bin/ + +ENTRYPOINT ["/bin/vitastor-csi"] diff --git a/csi/Makefile b/csi/Makefile new file mode 100644 index 00000000..726f71c4 --- /dev/null +++ b/csi/Makefile @@ -0,0 +1,9 @@ +VERSION ?= v0.6.3 + +all: build push + +build: + @docker build --rm -t vitalif/vitastor-csi:$(VERSION) . + +push: + @docker push vitalif/vitastor-csi:$(VERSION) diff --git a/csi/deploy/000-csi-namespace.yaml b/csi/deploy/000-csi-namespace.yaml new file mode 100644 index 00000000..786b6553 --- /dev/null +++ b/csi/deploy/000-csi-namespace.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: vitastor-system diff --git a/csi/deploy/001-csi-config-map.yaml b/csi/deploy/001-csi-config-map.yaml new file mode 100644 index 00000000..7a39523b --- /dev/null +++ b/csi/deploy/001-csi-config-map.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: v1 +kind: ConfigMap +data: + vitastor.conf: |- + {"etcd_address":"http://192.168.7.2:2379","etcd_prefix":"/vitastor"} +metadata: + namespace: vitastor-system + name: vitastor-config diff --git a/csi/deploy/002-csi-nodeplugin-rbac.yaml b/csi/deploy/002-csi-nodeplugin-rbac.yaml new file mode 100644 index 00000000..de5bd721 --- /dev/null +++ b/csi/deploy/002-csi-nodeplugin-rbac.yaml @@ -0,0 +1,37 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin +rules: + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get"] + # allow to read Vault Token and connection options from the Tenants namespace + - apiGroups: [""] + resources: ["secrets"] + verbs: ["get"] + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin +subjects: + - kind: ServiceAccount + name: vitastor-csi-nodeplugin + namespace: vitastor-system +roleRef: + kind: ClusterRole + name: vitastor-csi-nodeplugin + apiGroup: rbac.authorization.k8s.io diff --git a/csi/deploy/003-csi-nodeplugin-psp.yaml b/csi/deploy/003-csi-nodeplugin-psp.yaml new file mode 100644 index 00000000..b0d60197 --- /dev/null +++ b/csi/deploy/003-csi-nodeplugin-psp.yaml @@ -0,0 +1,72 @@ +--- +apiVersion: policy/v1beta1 +kind: PodSecurityPolicy +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin-psp +spec: + allowPrivilegeEscalation: true + allowedCapabilities: + - 'SYS_ADMIN' + fsGroup: + rule: RunAsAny + privileged: true + hostNetwork: true + hostPID: true + runAsUser: + rule: RunAsAny + seLinux: + rule: RunAsAny + supplementalGroups: + rule: RunAsAny + volumes: + - 'configMap' + - 'emptyDir' + - 'projected' + - 'secret' + - 'downwardAPI' + - 'hostPath' + allowedHostPaths: + - pathPrefix: '/dev' + readOnly: false + - pathPrefix: '/run/mount' + readOnly: false + - pathPrefix: '/sys' + readOnly: false + - pathPrefix: '/lib/modules' + readOnly: true + - pathPrefix: '/var/lib/kubelet/pods' + readOnly: false + - pathPrefix: '/var/lib/kubelet/plugins/csi.vitastor.io' + readOnly: false + - pathPrefix: '/var/lib/kubelet/plugins_registry' + readOnly: false + - pathPrefix: '/var/lib/kubelet/plugins' + readOnly: false + +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin-psp +rules: + - apiGroups: ['policy'] + resources: ['podsecuritypolicies'] + verbs: ['use'] + resourceNames: ['vitastor-csi-nodeplugin-psp'] + +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-nodeplugin-psp +subjects: + - kind: ServiceAccount + name: vitastor-csi-nodeplugin + namespace: vitastor-system +roleRef: + kind: Role + name: vitastor-csi-nodeplugin-psp + apiGroup: rbac.authorization.k8s.io diff --git a/csi/deploy/004-csi-nodeplugin.yaml b/csi/deploy/004-csi-nodeplugin.yaml new file mode 100644 index 00000000..5dcf2cc6 --- /dev/null +++ b/csi/deploy/004-csi-nodeplugin.yaml @@ -0,0 +1,140 @@ +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + namespace: vitastor-system + name: csi-vitastor +spec: + selector: + matchLabels: + app: csi-vitastor + template: + metadata: + namespace: vitastor-system + labels: + app: csi-vitastor + spec: + serviceAccountName: vitastor-csi-nodeplugin + hostNetwork: true + hostPID: true + priorityClassName: system-node-critical + # to use e.g. Rook orchestrated cluster, and mons' FQDN is + # resolved through k8s service, set dns policy to cluster first + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: driver-registrar + # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container. + securityContext: + privileged: true + image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.2.0 + args: + - "--v=5" + - "--csi-address=/csi/csi.sock" + - "--kubelet-registration-path=/var/lib/kubelet/plugins/csi.vitastor.io/csi.sock" + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: registration-dir + mountPath: /registration + - name: csi-vitastor + securityContext: + privileged: true + capabilities: + add: ["SYS_ADMIN"] + allowPrivilegeEscalation: true + image: vitalif/vitastor-csi:v0.6.3 + args: + - "--node=$(NODE_ID)" + - "--endpoint=$(CSI_ENDPOINT)" + env: + - name: NODE_ID + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + imagePullPolicy: "IfNotPresent" + ports: + - containerPort: 9898 + name: healthz + protocol: TCP + livenessProbe: + failureThreshold: 5 + httpGet: + path: /healthz + port: healthz + initialDelaySeconds: 10 + timeoutSeconds: 3 + periodSeconds: 2 + volumeMounts: + - name: socket-dir + mountPath: /csi + - mountPath: /dev + name: host-dev + - mountPath: /sys + name: host-sys + - mountPath: /run/mount + name: host-mount + - mountPath: /lib/modules + name: lib-modules + readOnly: true + - name: vitastor-config + mountPath: /etc/vitastor + - name: plugin-dir + mountPath: /var/lib/kubelet/plugins + mountPropagation: "Bidirectional" + - name: mountpoint-dir + mountPath: /var/lib/kubelet/pods + mountPropagation: "Bidirectional" + - name: liveness-probe + securityContext: + privileged: true + image: quay.io/k8scsi/livenessprobe:v1.1.0 + args: + - "--csi-address=$(CSI_ENDPOINT)" + - "--health-port=9898" + env: + - name: CSI_ENDPOINT + value: unix://csi/csi.sock + volumeMounts: + - mountPath: /csi + name: socket-dir + volumes: + - name: socket-dir + hostPath: + path: /var/lib/kubelet/plugins/csi.vitastor.io + type: DirectoryOrCreate + - name: plugin-dir + hostPath: + path: /var/lib/kubelet/plugins + type: Directory + - name: mountpoint-dir + hostPath: + path: /var/lib/kubelet/pods + type: DirectoryOrCreate + - name: registration-dir + hostPath: + path: /var/lib/kubelet/plugins_registry/ + type: Directory + - name: host-dev + hostPath: + path: /dev + - name: host-sys + hostPath: + path: /sys + - name: host-mount + hostPath: + path: /run/mount + - name: lib-modules + hostPath: + path: /lib/modules + - name: vitastor-config + configMap: + name: vitastor-config diff --git a/csi/deploy/005-csi-provisioner-rbac.yaml b/csi/deploy/005-csi-provisioner-rbac.yaml new file mode 100644 index 00000000..d289dc7e --- /dev/null +++ b/csi/deploy/005-csi-provisioner-rbac.yaml @@ -0,0 +1,102 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + namespace: vitastor-system + name: vitastor-csi-provisioner + +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-external-provisioner-runner +rules: + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["list", "watch", "create", "update", "patch"] + - apiGroups: [""] + resources: ["persistentvolumes"] + verbs: ["get", "list", "watch", "create", "update", "delete", "patch"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch", "update"] + - apiGroups: [""] + resources: ["persistentvolumeclaims/status"] + verbs: ["update", "patch"] + - apiGroups: ["storage.k8s.io"] + resources: ["storageclasses"] + verbs: ["get", "list", "watch"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshots"] + verbs: ["get", "list"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshotcontents"] + verbs: ["create", "get", "list", "watch", "update", "delete"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshotclasses"] + verbs: ["get", "list", "watch"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattachments"] + verbs: ["get", "list", "watch", "update", "patch"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattachments/status"] + verbs: ["patch"] + - apiGroups: ["storage.k8s.io"] + resources: ["csinodes"] + verbs: ["get", "list", "watch"] + - apiGroups: ["snapshot.storage.k8s.io"] + resources: ["volumesnapshotcontents/status"] + verbs: ["update"] + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-provisioner-role +subjects: + - kind: ServiceAccount + name: vitastor-csi-provisioner + namespace: vitastor-system +roleRef: + kind: ClusterRole + name: vitastor-external-provisioner-runner + apiGroup: rbac.authorization.k8s.io + +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-external-provisioner-cfg +rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list", "watch", "create", "update", "delete"] + - apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get", "watch", "list", "delete", "update", "create"] + +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vitastor-csi-provisioner-role-cfg + namespace: vitastor-system +subjects: + - kind: ServiceAccount + name: vitastor-csi-provisioner + namespace: vitastor-system +roleRef: + kind: Role + name: vitastor-external-provisioner-cfg + apiGroup: rbac.authorization.k8s.io diff --git a/csi/deploy/006-csi-provisioner-psp.yaml b/csi/deploy/006-csi-provisioner-psp.yaml new file mode 100644 index 00000000..7cb812c8 --- /dev/null +++ b/csi/deploy/006-csi-provisioner-psp.yaml @@ -0,0 +1,60 @@ +--- +apiVersion: policy/v1beta1 +kind: PodSecurityPolicy +metadata: + namespace: vitastor-system + name: vitastor-csi-provisioner-psp +spec: + allowPrivilegeEscalation: true + allowedCapabilities: + - 'SYS_ADMIN' + fsGroup: + rule: RunAsAny + privileged: true + runAsUser: + rule: RunAsAny + seLinux: + rule: RunAsAny + supplementalGroups: + rule: RunAsAny + volumes: + - 'configMap' + - 'emptyDir' + - 'projected' + - 'secret' + - 'downwardAPI' + - 'hostPath' + allowedHostPaths: + - pathPrefix: '/dev' + readOnly: false + - pathPrefix: '/sys' + readOnly: false + - pathPrefix: '/lib/modules' + readOnly: true + +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + namespace: vitastor-system + name: vitastor-csi-provisioner-psp +rules: + - apiGroups: ['policy'] + resources: ['podsecuritypolicies'] + verbs: ['use'] + resourceNames: ['vitastor-csi-provisioner-psp'] + +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vitastor-csi-provisioner-psp + namespace: vitastor-system +subjects: + - kind: ServiceAccount + name: vitastor-csi-provisioner + namespace: vitastor-system +roleRef: + kind: Role + name: vitastor-csi-provisioner-psp + apiGroup: rbac.authorization.k8s.io diff --git a/csi/deploy/007-csi-provisioner.yaml b/csi/deploy/007-csi-provisioner.yaml new file mode 100644 index 00000000..07a721ea --- /dev/null +++ b/csi/deploy/007-csi-provisioner.yaml @@ -0,0 +1,159 @@ +--- +kind: Service +apiVersion: v1 +metadata: + namespace: vitastor-system + name: csi-vitastor-provisioner + labels: + app: csi-metrics +spec: + selector: + app: csi-vitastor-provisioner + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8680 + +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + namespace: vitastor-system + name: csi-vitastor-provisioner +spec: + replicas: 3 + selector: + matchLabels: + app: csi-vitastor-provisioner + template: + metadata: + namespace: vitastor-system + labels: + app: csi-vitastor-provisioner + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - csi-vitastor-provisioner + topologyKey: "kubernetes.io/hostname" + serviceAccountName: vitastor-csi-provisioner + priorityClassName: system-cluster-critical + containers: + - name: csi-provisioner + image: k8s.gcr.io/sig-storage/csi-provisioner:v2.2.0 + args: + - "--csi-address=$(ADDRESS)" + - "--v=5" + - "--timeout=150s" + - "--retry-interval-start=500ms" + - "--leader-election=true" + # set it to true to use topology based provisioning + - "--feature-gates=Topology=false" + # if fstype is not specified in storageclass, ext4 is default + - "--default-fstype=ext4" + - "--extra-create-metadata=true" + env: + - name: ADDRESS + value: unix:///csi/csi-provisioner.sock + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-snapshotter + image: k8s.gcr.io/sig-storage/csi-snapshotter:v4.0.0 + args: + - "--csi-address=$(ADDRESS)" + - "--v=5" + - "--timeout=150s" + - "--leader-election=true" + env: + - name: ADDRESS + value: unix:///csi/csi-provisioner.sock + imagePullPolicy: "IfNotPresent" + securityContext: + privileged: true + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-attacher + image: k8s.gcr.io/sig-storage/csi-attacher:v3.1.0 + args: + - "--v=5" + - "--csi-address=$(ADDRESS)" + - "--leader-election=true" + - "--retry-interval-start=500ms" + env: + - name: ADDRESS + value: /csi/csi-provisioner.sock + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-resizer + image: k8s.gcr.io/sig-storage/csi-resizer:v1.1.0 + args: + - "--csi-address=$(ADDRESS)" + - "--v=5" + - "--timeout=150s" + - "--leader-election" + - "--retry-interval-start=500ms" + - "--handle-volume-inuse-error=false" + env: + - name: ADDRESS + value: unix:///csi/csi-provisioner.sock + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: socket-dir + mountPath: /csi + - name: csi-vitastor + securityContext: + privileged: true + capabilities: + add: ["SYS_ADMIN"] + image: vitalif/vitastor-csi:v0.6.3 + args: + - "--node=$(NODE_ID)" + - "--endpoint=$(CSI_ENDPOINT)" + env: + - name: NODE_ID + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: CSI_ENDPOINT + value: unix:///csi/csi-provisioner.sock + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: socket-dir + mountPath: /csi + - mountPath: /dev + name: host-dev + - mountPath: /sys + name: host-sys + - mountPath: /lib/modules + name: lib-modules + readOnly: true + - name: vitastor-config + mountPath: /etc/vitastor + volumes: + - name: host-dev + hostPath: + path: /dev + - name: host-sys + hostPath: + path: /sys + - name: lib-modules + hostPath: + path: /lib/modules + - name: socket-dir + emptyDir: { + medium: "Memory" + } + - name: vitastor-config + configMap: + name: vitastor-config diff --git a/csi/deploy/008-csi-driver.yaml b/csi/deploy/008-csi-driver.yaml new file mode 100644 index 00000000..747283b4 --- /dev/null +++ b/csi/deploy/008-csi-driver.yaml @@ -0,0 +1,11 @@ +--- +# if Kubernetes version is less than 1.18 change +# apiVersion to storage.k8s.io/v1betav1 +apiVersion: storage.k8s.io/v1 +kind: CSIDriver +metadata: + namespace: vitastor-system + name: csi.vitastor.io +spec: + attachRequired: true + podInfoOnMount: false diff --git a/csi/deploy/example-pvc.yaml b/csi/deploy/example-pvc.yaml new file mode 100644 index 00000000..fc9c3171 --- /dev/null +++ b/csi/deploy/example-pvc.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: test-vitastor-pvc +spec: + storageClassName: vitastor-csi-storage-class + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi diff --git a/csi/deploy/storage-class.yaml b/csi/deploy/storage-class.yaml new file mode 100644 index 00000000..fcf46286 --- /dev/null +++ b/csi/deploy/storage-class.yaml @@ -0,0 +1,19 @@ +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + namespace: vitastor-system + name: vitastor-csi-storage-class + annotations: + storageclass.kubernetes.io/is-default-class: "true" +provisioner: csi.vitastor.io +volumeBindingMode: Immediate +parameters: + etcdVolumePrefix: "" + poolId: "1" + # you can choose other configuration file if you have it in the config map + #configPath: "/etc/vitastor/vitastor.conf" + # you can also specify etcdUrl here, maybe to connect to another Vitastor cluster + # multiple etcdUrls may be specified, delimited by comma + #etcdUrl: "http://192.168.7.2:2379" + #etcdPrefix: "/vitastor" diff --git a/csi/go.mod b/csi/go.mod new file mode 100644 index 00000000..7c9143e3 --- /dev/null +++ b/csi/go.mod @@ -0,0 +1,35 @@ +module vitastor.io/csi + +go 1.15 + +require ( + github.com/container-storage-interface/spec v1.4.0 + github.com/coreos/bbolt v0.0.0-00010101000000-000000000000 // indirect + github.com/coreos/etcd v3.3.25+incompatible // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b + github.com/gorilla/websocket v1.4.2 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/jonboulle/clockwork v0.2.2 // indirect + github.com/kubernetes-csi/csi-lib-utils v0.9.1 + github.com/soheilhy/cmux v0.1.5 // indirect + github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect + github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/bbolt v0.0.0-00010101000000-000000000000 // indirect + go.etcd.io/etcd v3.3.25+incompatible + golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb + google.golang.org/grpc v1.33.1 + k8s.io/klog v1.0.0 + k8s.io/utils v0.0.0-20210305010621-2afb4311ab10 +) + +replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5 + +replace go.etcd.io/bbolt => github.com/coreos/bbolt v1.3.5 + +replace google.golang.org/grpc => google.golang.org/grpc v1.25.1 diff --git a/csi/src/config.go b/csi/src/config.go new file mode 100644 index 00000000..024afde1 --- /dev/null +++ b/csi/src/config.go @@ -0,0 +1,22 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package vitastor + +const ( + vitastorCSIDriverName = "csi.vitastor.io" + vitastorCSIDriverVersion = "0.6.3" +) + +// Config struct fills the parameters of request or user input +type Config struct +{ + Endpoint string + NodeID string +} + +// NewConfig returns config struct to initialize new driver +func NewConfig() *Config +{ + return &Config{} +} diff --git a/csi/src/controllerserver.go b/csi/src/controllerserver.go new file mode 100644 index 00000000..78b199bf --- /dev/null +++ b/csi/src/controllerserver.go @@ -0,0 +1,530 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package vitastor + +import ( + "context" + "encoding/json" + "strings" + "bytes" + "strconv" + "time" + "fmt" + "os" + "os/exec" + "io/ioutil" + + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "k8s.io/klog" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.etcd.io/etcd/clientv3" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +const ( + KB int64 = 1024 + MB int64 = 1024 * KB + GB int64 = 1024 * MB + TB int64 = 1024 * GB + ETCD_TIMEOUT time.Duration = 15*time.Second +) + +type InodeIndex struct +{ + Id uint64 `json:"id"` + PoolId uint64 `json:"pool_id"` +} + +type InodeConfig struct +{ + Name string `json:"name"` + Size uint64 `json:"size,omitempty"` + ParentPool uint64 `json:"parent_pool,omitempty"` + ParentId uint64 `json:"parent_id,omitempty"` + Readonly bool `json:"readonly,omitempty"` +} + +type ControllerServer struct +{ + *Driver +} + +// NewControllerServer create new instance controller +func NewControllerServer(driver *Driver) *ControllerServer +{ + return &ControllerServer{ + Driver: driver, + } +} + +func GetConnectionParams(params map[string]string) (map[string]string, []string, string) +{ + ctxVars := make(map[string]string) + configPath := params["configPath"] + if (configPath == "") + { + configPath = "/etc/vitastor/vitastor.conf" + } + else + { + ctxVars["configPath"] = configPath + } + config := make(map[string]interface{}) + if configFD, err := os.Open(configPath); err == nil + { + defer configFD.Close() + data, _ := ioutil.ReadAll(configFD) + json.Unmarshal(data, &config) + } + // Try to load prefix & etcd URL from the config + var etcdUrl []string + if (params["etcdUrl"] != "") + { + ctxVars["etcdUrl"] = params["etcdUrl"] + etcdUrl = strings.Split(params["etcdUrl"], ",") + } + if (len(etcdUrl) == 0) + { + switch config["etcd_address"].(type) + { + case string: + etcdUrl = strings.Split(config["etcd_address"].(string), ",") + case []string: + etcdUrl = config["etcd_address"].([]string) + } + } + etcdPrefix := params["etcdPrefix"] + if (etcdPrefix == "") + { + etcdPrefix, _ = config["etcd_prefix"].(string) + if (etcdPrefix == "") + { + etcdPrefix = "/vitastor" + } + } + else + { + ctxVars["etcdPrefix"] = etcdPrefix + } + return ctxVars, etcdUrl, etcdPrefix +} + +// Create the volume +func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) +{ + klog.Infof("received controller create volume request %+v", protosanitizer.StripSecrets(req)) + if (req == nil) + { + return nil, status.Errorf(codes.InvalidArgument, "request cannot be empty") + } + if (req.GetName() == "") + { + return nil, status.Error(codes.InvalidArgument, "name is a required field") + } + volumeCapabilities := req.GetVolumeCapabilities() + if (volumeCapabilities == nil) + { + return nil, status.Error(codes.InvalidArgument, "volume capabilities is a required field") + } + + etcdVolumePrefix := req.Parameters["etcdVolumePrefix"] + poolId, _ := strconv.ParseUint(req.Parameters["poolId"], 10, 64) + if (poolId == 0) + { + return nil, status.Error(codes.InvalidArgument, "poolId is missing in storage class configuration") + } + + volName := etcdVolumePrefix + req.GetName() + volSize := 1 * GB + if capRange := req.GetCapacityRange(); capRange != nil + { + volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB + } + + // FIXME: The following should PROBABLY be implemented externally in a management tool + + ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters) + if (len(etcdUrl) == 0) + { + return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") + } + + // Connect to etcd + cli, err := clientv3.New(clientv3.Config{ + DialTimeout: ETCD_TIMEOUT, + Endpoints: etcdUrl, + }) + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) + } + defer cli.Close() + + var imageId uint64 = 0 + for + { + // Check if the image exists + ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) + resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + } + if (len(resp.Kvs) > 0) + { + kv := resp.Kvs[0] + var v InodeIndex + err := json.Unmarshal(kv.Value, &v) + if (err != nil) + { + return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) + } + poolId = v.PoolId + imageId = v.Id + inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId) + ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) + resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey) + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + } + if (len(resp.Kvs) == 0) + { + return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd") + } + var inodeCfg InodeConfig + err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) + if (err != nil) + { + return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) + } + if (inodeCfg.Size < uint64(volSize)) + { + return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected") + } + } + else + { + // Find a free ID + // Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free + maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId) + ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) + resp, err := cli.Get(ctx, maxIdKey) + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + } + var modRev int64 + var nextId uint64 + if (len(resp.Kvs) > 0) + { + var err error + nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) + if (err != nil) + { + return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID") + } + modRev = resp.Kvs[0].ModRevision + nextId++ + } + else + { + nextId = 1 + } + inodeIdxJson, _ := json.Marshal(InodeIndex{ + Id: nextId, + PoolId: poolId, + }) + inodeCfgJson, _ := json.Marshal(InodeConfig{ + Name: volName, + Size: uint64(volSize), + }) + ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) + txnResp, err := cli.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev), + clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0), + clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0), + ).Then( + clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)), + clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)), + clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)), + ).Commit() + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error()) + } + if (txnResp.Succeeded) + { + imageId = nextId + break + } + // Start over if the transaction fails + } + } + + ctxVars["name"] = volName + volumeIdJson, _ := json.Marshal(ctxVars) + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + // Ugly, but VolumeContext isn't passed to DeleteVolume :-( + VolumeId: string(volumeIdJson), + CapacityBytes: volSize, + }, + }, nil +} + +// DeleteVolume deletes the given volume +func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) +{ + klog.Infof("received controller delete volume request %+v", protosanitizer.StripSecrets(req)) + if (req == nil) + { + return nil, status.Error(codes.InvalidArgument, "request cannot be empty") + } + + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + volName := ctxVars["name"] + + _, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars) + if (len(etcdUrl) == 0) + { + return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") + } + + cli, err := clientv3.New(clientv3.Config{ + DialTimeout: ETCD_TIMEOUT, + Endpoints: etcdUrl, + }) + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) + } + defer cli.Close() + + // Find inode by name + ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) + resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + } + if (len(resp.Kvs) == 0) + { + return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") + } + var idx InodeIndex + err = json.Unmarshal(resp.Kvs[0].Value, &idx) + if (err != nil) + { + return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) + } + + // Get inode config + inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id) + ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) + resp, err = cli.Get(ctx, inodeCfgKey) + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + } + if (len(resp.Kvs) == 0) + { + return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") + } + var inodeCfg InodeConfig + err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) + if (err != nil) + { + return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) + } + + // Delete inode data by invoking vitastor-rm + args := []string{ + "--etcd_address", strings.Join(etcdUrl, ","), + "--pool", fmt.Sprintf("%d", idx.PoolId), + "--inode", fmt.Sprintf("%d", idx.Id), + } + if (ctxVars["configPath"] != "") + { + args = append(args, "--config_path", ctxVars["configPath"]) + } + c := exec.Command("/usr/bin/vitastor-rm", args...) + var stderr bytes.Buffer + c.Stdout = nil + c.Stderr = &stderr + err = c.Run() + stderrStr := string(stderr.Bytes()) + if (err != nil) + { + klog.Errorf("vitastor-rm failed: %s, status %s\n", stderrStr, err) + return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") + } + + // Delete inode config in etcd + ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) + txnResp, err := cli.Txn(ctx).Then( + clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), + clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)), + ).Commit() + cancel() + if (err != nil) + { + return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error()) + } + if (!txnResp.Succeeded) + { + return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed") + } + + return &csi.DeleteVolumeResponse{}, nil +} + +// ControllerPublishVolume return Unimplemented error +func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerUnpublishVolume return Unimplemented error +func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ValidateVolumeCapabilities checks whether the volume capabilities requested are supported. +func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) +{ + klog.Infof("received controller validate volume capability request %+v", protosanitizer.StripSecrets(req)) + if (req == nil) + { + return nil, status.Errorf(codes.InvalidArgument, "request is nil") + } + volumeID := req.GetVolumeId() + if (volumeID == "") + { + return nil, status.Error(codes.InvalidArgument, "volumeId is nil") + } + volumeCapabilities := req.GetVolumeCapabilities() + if (volumeCapabilities == nil) + { + return nil, status.Error(codes.InvalidArgument, "volumeCapabilities is nil") + } + + var volumeCapabilityAccessModes []*csi.VolumeCapability_AccessMode + for _, mode := range []csi.VolumeCapability_AccessMode_Mode{ + csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + } { + volumeCapabilityAccessModes = append(volumeCapabilityAccessModes, &csi.VolumeCapability_AccessMode{Mode: mode}) + } + + capabilitySupport := false + for _, capability := range volumeCapabilities + { + for _, volumeCapabilityAccessMode := range volumeCapabilityAccessModes + { + if (volumeCapabilityAccessMode.Mode == capability.AccessMode.Mode) + { + capabilitySupport = true + } + } + } + + if (!capabilitySupport) + { + return nil, status.Errorf(codes.NotFound, "%v not supported", req.GetVolumeCapabilities()) + } + + return &csi.ValidateVolumeCapabilitiesResponse{ + Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ + VolumeCapabilities: req.VolumeCapabilities, + }, + }, nil +} + +// ListVolumes returns a list of volumes +func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// GetCapacity returns the capacity of the storage pool +func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerGetCapabilities returns the capabilities of the controller service. +func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) +{ + functionControllerServerCapabilities := func(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability + { + return &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: cap, + }, + }, + } + } + + var controllerServerCapabilities []*csi.ControllerServiceCapability + for _, capability := range []csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_LIST_VOLUMES, + csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, + csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + } { + controllerServerCapabilities = append(controllerServerCapabilities, functionControllerServerCapabilities(capability)) + } + + return &csi.ControllerGetCapabilitiesResponse{ + Capabilities: controllerServerCapabilities, + }, nil +} + +// CreateSnapshot create snapshot of an existing PV +func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// DeleteSnapshot delete provided snapshot of a PV +func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ListSnapshots list the snapshots of a PV +func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerExpandVolume resizes a volume +func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerGetVolume get volume info +func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} diff --git a/csi/src/grpc.go b/csi/src/grpc.go new file mode 100644 index 00000000..9db30e27 --- /dev/null +++ b/csi/src/grpc.go @@ -0,0 +1,137 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vitastor + +import ( + "fmt" + "net" + "os" + "strings" + "sync" + + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" +) + +// Defines Non blocking GRPC server interfaces +type NonBlockingGRPCServer interface { + // Start services at the endpoint + Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) + // Waits for the service to stop + Wait() + // Stops the service gracefully + Stop() + // Stops the service forcefully + ForceStop() +} + +func NewNonBlockingGRPCServer() NonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + s.wg.Add(1) + + go s.serve(endpoint, ids, cs, ns) + + return +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() +} + +func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + proto, addr, err := ParseEndpoint(endpoint) + if err != nil { + glog.Fatal(err.Error()) + } + + if proto == "unix" { + addr = "/" + addr + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error()) + } + } + + listener, err := net.Listen(proto, addr) + if err != nil { + glog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) + s.server = server + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if cs != nil { + csi.RegisterControllerServer(server, cs) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + glog.Infof("Listening for connections on address: %#v", listener.Addr()) + + server.Serve(listener) +} + +func ParseEndpoint(ep string) (string, string, error) { + if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") { + s := strings.SplitN(ep, "://", 2) + if s[1] != "" { + return s[0], s[1], nil + } + } + return "", "", fmt.Errorf("Invalid endpoint: %v", ep) +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + glog.V(3).Infof("GRPC call: %s", info.FullMethod) + glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) + resp, err := handler(ctx, req) + if err != nil { + glog.Errorf("GRPC error: %v", err) + } else { + glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp)) + } + return resp, err +} diff --git a/csi/src/identityserver.go b/csi/src/identityserver.go new file mode 100644 index 00000000..b1050411 --- /dev/null +++ b/csi/src/identityserver.go @@ -0,0 +1,60 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package vitastor + +import ( + "context" + + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "k8s.io/klog" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// IdentityServer struct of Vitastor CSI driver with supported methods of CSI identity server spec. +type IdentityServer struct +{ + *Driver +} + +// NewIdentityServer create new instance identity +func NewIdentityServer(driver *Driver) *IdentityServer +{ + return &IdentityServer{ + Driver: driver, + } +} + +// GetPluginInfo returns metadata of the plugin +func (is *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) +{ + klog.Infof("received identity plugin info request %+v", protosanitizer.StripSecrets(req)) + return &csi.GetPluginInfoResponse{ + Name: vitastorCSIDriverName, + VendorVersion: vitastorCSIDriverVersion, + }, nil +} + +// GetPluginCapabilities returns available capabilities of the plugin +func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) +{ + klog.Infof("received identity plugin capabilities request %+v", protosanitizer.StripSecrets(req)) + return &csi.GetPluginCapabilitiesResponse{ + Capabilities: []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }, + }, + }, nil +} + +// Probe returns the health and readiness of the plugin +func (is *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) +{ + return &csi.ProbeResponse{}, nil +} diff --git a/csi/src/nodeserver.go b/csi/src/nodeserver.go new file mode 100644 index 00000000..01fa526c --- /dev/null +++ b/csi/src/nodeserver.go @@ -0,0 +1,279 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package vitastor + +import ( + "context" + "os" + "os/exec" + "encoding/json" + "strings" + "bytes" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/utils/mount" + utilexec "k8s.io/utils/exec" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "k8s.io/klog" +) + +// NodeServer struct of Vitastor CSI driver with supported methods of CSI node server spec. +type NodeServer struct +{ + *Driver + mounter mount.Interface +} + +// NewNodeServer create new instance node +func NewNodeServer(driver *Driver) *NodeServer +{ + return &NodeServer{ + Driver: driver, + mounter: mount.New(""), + } +} + +// NodeStageVolume mounts the volume to a staging path on the node. +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) +{ + return &csi.NodeStageVolumeResponse{}, nil +} + +// NodeUnstageVolume unstages the volume from the staging path +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) +{ + return &csi.NodeUnstageVolumeResponse{}, nil +} + +func Contains(list []string, s string) bool +{ + for i := 0; i < len(list); i++ + { + if (list[i] == s) + { + return true + } + } + return false +} + +// NodePublishVolume mounts the volume mounted to the staging path to the target path +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) +{ + klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req)) + + targetPath := req.GetTargetPath() + + // Check that it's not already mounted + free, error := mount.IsNotMountPoint(ns.mounter, targetPath) + if (error != nil) + { + if (os.IsNotExist(error)) + { + error := os.MkdirAll(targetPath, 0777) + if (error != nil) + { + return nil, status.Error(codes.Internal, error.Error()) + } + free = true + } + else + { + return nil, status.Error(codes.Internal, error.Error()) + } + } + if (!free) + { + return &csi.NodePublishVolumeResponse{}, nil + } + + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + volName := ctxVars["name"] + + _, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars) + if (len(etcdUrl) == 0) + { + return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") + } + + // Map NBD device + // FIXME: Check if already mapped + args := []string{ + "map", "--etcd_address", strings.Join(etcdUrl, ","), + "--etcd_prefix", etcdPrefix, + "--image", volName, + }; + if (ctxVars["configPath"] != "") + { + args = append(args, "--config_path", ctxVars["configPath"]) + } + if (req.GetReadonly()) + { + args = append(args, "--readonly", "1") + } + c := exec.Command("/usr/bin/vitastor-nbd", args...) + var stdout, stderr bytes.Buffer + c.Stdout, c.Stderr = &stdout, &stderr + err = c.Run() + stdoutStr, stderrStr := string(stdout.Bytes()), string(stderr.Bytes()) + if (err != nil) + { + klog.Errorf("vitastor-nbd map failed: %s, status %s\n", stdoutStr+stderrStr, err) + return nil, status.Error(codes.Internal, stdoutStr+stderrStr+" (status "+err.Error()+")") + } + devicePath := strings.TrimSpace(stdoutStr) + + // Check existing format + diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()} + existingFormat, err := diskMounter.GetDiskFormat(devicePath) + if (err != nil) + { + klog.Errorf("failed to get disk format for path %s, error: %v", err) + // unmap NBD device + unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() + if (unmapErr != nil) + { + klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) + } + return nil, err + } + + // Format the device (ext4 or xfs) + fsType := req.GetVolumeCapability().GetMount().GetFsType() + isBlock := req.GetVolumeCapability().GetBlock() != nil + opt := req.GetVolumeCapability().GetMount().GetMountFlags() + opt = append(opt, "_netdev") + if ((req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY || + req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY) && + !Contains(opt, "ro")) + { + opt = append(opt, "ro") + } + if (fsType == "xfs") + { + opt = append(opt, "nouuid") + } + readOnly := Contains(opt, "ro") + if (existingFormat == "" && !readOnly) + { + args := []string{} + switch fsType + { + case "ext4": + args = []string{"-m0", "-Enodiscard,lazy_itable_init=1,lazy_journal_init=1", devicePath} + case "xfs": + args = []string{"-K", devicePath} + } + if (len(args) > 0) + { + cmdOut, cmdErr := diskMounter.Exec.Command("mkfs."+fsType, args...).CombinedOutput() + if (cmdErr != nil) + { + klog.Errorf("failed to run mkfs error: %v, output: %v", cmdErr, string(cmdOut)) + // unmap NBD device + unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() + if (unmapErr != nil) + { + klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) + } + return nil, status.Error(codes.Internal, cmdErr.Error()) + } + } + } + if (isBlock) + { + opt = append(opt, "bind") + err = diskMounter.Mount(devicePath, targetPath, fsType, opt) + } + else + { + err = diskMounter.FormatAndMount(devicePath, targetPath, fsType, opt) + } + if (err != nil) + { + klog.Errorf( + "failed to mount device path (%s) to path (%s) for volume (%s) error: %s", + devicePath, targetPath, volName, err, + ) + // unmap NBD device + unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() + if (unmapErr != nil) + { + klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) + } + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.NodePublishVolumeResponse{}, nil +} + +// NodeUnpublishVolume unmounts the volume from the target path +func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) +{ + klog.Infof("received node unpublish volume request %+v", protosanitizer.StripSecrets(req)) + targetPath := req.GetTargetPath() + devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) + if (err != nil) + { + if (os.IsNotExist(err)) + { + return nil, status.Error(codes.NotFound, "Target path not found") + } + return nil, status.Error(codes.Internal, err.Error()) + } + if (devicePath == "") + { + return nil, status.Error(codes.NotFound, "Volume not mounted") + } + // unmount + err = mount.CleanupMountPoint(targetPath, ns.mounter, false) + if (err != nil) + { + return nil, status.Error(codes.Internal, err.Error()) + } + // unmap NBD device + if (refCount == 1) + { + unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() + if (unmapErr != nil) + { + klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) + } + } + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +// NodeGetVolumeStats returns volume capacity statistics available for the volume +func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// NodeExpandVolume expanding the file system on the node +func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) +{ + return nil, status.Error(codes.Unimplemented, "") +} + +// NodeGetCapabilities returns the supported capabilities of the node server +func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) +{ + return &csi.NodeGetCapabilitiesResponse{}, nil +} + +// NodeGetInfo returns NodeGetInfoResponse for CO. +func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) +{ + klog.Infof("received node get info request %+v", protosanitizer.StripSecrets(req)) + return &csi.NodeGetInfoResponse{ + NodeId: ns.NodeID, + }, nil +} diff --git a/csi/src/server.go b/csi/src/server.go new file mode 100644 index 00000000..b097184e --- /dev/null +++ b/csi/src/server.go @@ -0,0 +1,36 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package vitastor + +import ( + "k8s.io/klog" +) + +type Driver struct +{ + *Config +} + +// NewDriver create new instance driver +func NewDriver(config *Config) (*Driver, error) +{ + if (config == nil) + { + klog.Errorf("Vitastor CSI driver initialization failed") + return nil, nil + } + driver := &Driver{ + Config: config, + } + klog.Infof("Vitastor CSI driver initialized") + return driver, nil +} + +// Start server +func (driver *Driver) Run() +{ + server := NewNonBlockingGRPCServer() + server.Start(driver.Endpoint, NewIdentityServer(driver), NewControllerServer(driver), NewNodeServer(driver)) + server.Wait() +} diff --git a/csi/vitastor-csi.go b/csi/vitastor-csi.go new file mode 100644 index 00000000..a160ac19 --- /dev/null +++ b/csi/vitastor-csi.go @@ -0,0 +1,39 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +package main + +import ( + "flag" + "fmt" + "os" + "k8s.io/klog" + "vitastor.io/csi/src" +) + +func main() +{ + var config = vitastor.NewConfig() + flag.StringVar(&config.Endpoint, "endpoint", "", "CSI endpoint") + flag.StringVar(&config.NodeID, "node", "", "Node ID") + flag.Parse() + if (config.Endpoint == "") + { + config.Endpoint = os.Getenv("CSI_ENDPOINT") + } + if (config.NodeID == "") + { + config.NodeID = os.Getenv("NODE_ID") + } + if (config.Endpoint == "" && config.NodeID == "") + { + fmt.Fprintf(os.Stderr, "Please set -endpoint and -node / CSI_ENDPOINT & NODE_ID env vars\n") + os.Exit(1) + } + drv, err := vitastor.NewDriver(config) + if (err != nil) + { + klog.Fatalln(err) + } + drv.Run() +} From 10ee4f7c1d8dbdfa45f1ff78baf4ca5f4d56fe07 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 16 May 2021 01:37:51 +0300 Subject: [PATCH 8/9] Add notes about CSI to README --- README-ru.md | 21 +++++++++++++++---- README.md | 21 +++++++++++++++---- ...rage-class.yaml => 009-storage-class.yaml} | 2 +- csi/deploy/example-pvc.yaml | 2 +- 4 files changed, 36 insertions(+), 10 deletions(-) rename csi/deploy/{storage-class.yaml => 009-storage-class.yaml} (94%) diff --git a/README-ru.md b/README-ru.md index a18db93e..5824f4e9 100644 --- a/README-ru.md +++ b/README-ru.md @@ -22,7 +22,6 @@ Vitastor на данный момент находится в статусе п Однако следующее уже реализовано: -0.5.x (стабильная версия): - Базовая часть - надёжное кластерное блочное хранилище без единой точки отказа - Производительность ;-D - Несколько схем отказоустойчивости: репликация, XOR n+1 (1 диск чётности), коды коррекции ошибок @@ -43,19 +42,18 @@ Vitastor на данный момент находится в статусе п - NBD-прокси для монтирования образов ядром ("блочное устройство в режиме пользователя") - Утилита удаления образов/инодов (vitastor-rm) - Пакеты для Debian и CentOS - -0.6.x (master-ветка): - Статистика операций ввода/вывода и занятого места в разрезе инодов - Именование инодов через хранение их метаданных в etcd - Снапшоты и copy-on-write клоны - Сглаживание производительности случайной записи в SSD+HDD конфигурациях - Поддержка RDMA/RoCEv2 через libibverbs +- CSI-плагин для Kubernetes ## Планы развития - Более корректные скрипты разметки дисков и автоматического запуска OSD - Другие инструменты администрирования -- Плагины для OpenStack, Kubernetes, OpenNebula, Proxmox и других облачных систем +- Плагины для OpenStack, OpenNebula, Proxmox и других облачных систем - iSCSI-прокси - Более быстрое переключение при отказах - Фоновая проверка целостности без контрольных сумм (сверка реплик) @@ -511,6 +509,21 @@ vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg Для обращения по номеру инода, аналогично другим командам, можно использовать опции `--pool --inode --size ` вместо `--image testimg`. +### Kubernetes + +У Vitastor есть CSI-плагин для Kubernetes, поддерживающий RWO-тома. + +Для установки возьмите манифесты из директории [csi/deploy/](csi/deploy/), поместите +вашу конфигурацию подключения к Vitastor в [csi/deploy/001-csi-config-map.yaml](001-csi-config-map.yaml), +настройте StorageClass в [csi/deploy/009-storage-class.yaml](009-storage-class.yaml) +и примените все `NNN-*.yaml` к вашей инсталляции Kubernetes. + +``` +for i in ./???-*.yaml; do kubectl apply -f $i; done +``` + +После этого вы сможете создавать PersistentVolume. Пример смотрите в файле [csi/deploy/example-pvc.yaml](csi/deploy/example-pvc.yaml). + ## Известные проблемы - Запросы удаления объектов могут в данный момент приводить к "неполным" объектам в EC-пулах, diff --git a/README.md b/README.md index 66f0c06d..c3ab9e2a 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,6 @@ with configurable redundancy (replication or erasure codes/XOR). Vitastor is currently a pre-release, a lot of features are missing and you can still expect breaking changes in the future. However, the following is implemented: -0.5.x (stable): - Basic part: highly-available block storage with symmetric clustering and no SPOF - Performance ;-D - Multiple redundancy schemes: Replication, XOR n+1, Reed-Solomon erasure codes @@ -37,19 +36,18 @@ breaking changes in the future. However, the following is implemented: - NBD proxy for kernel mounts - Inode removal tool (vitastor-rm) - Packaging for Debian and CentOS - -0.6.x (master): - Per-inode I/O and space usage statistics - Inode metadata storage in etcd - Snapshots and copy-on-write image clones - Write throttling to smooth random write workloads in SSD+HDD configurations - RDMA/RoCEv2 support via libibverbs +- CSI plugin for Kubernetes ## Roadmap - Better OSD creation and auto-start tools - Other administrative tools -- Plugins for OpenStack, Kubernetes, OpenNebula, Proxmox and other cloud systems +- Plugins for OpenStack, OpenNebula, Proxmox and other cloud systems - iSCSI proxy - Faster failover - Scrubbing without checksums (verification of replicas) @@ -461,6 +459,21 @@ It will output the device name, like /dev/nbd0 which you can then format and mou Again, you can use `--pool --inode --size ` insteaf of `--image ` if you want. +### Kubernetes + +Vitastor has a CSI plugin for Kubernetes which supports RWO volumes. + +To deploy it, take manifests from [csi/deploy/](csi/deploy/) directory, put your +Vitastor configuration in [csi/deploy/001-csi-config-map.yaml](001-csi-config-map.yaml), +configure storage class in [csi/deploy/009-storage-class.yaml](009-storage-class.yaml) +and apply all `NNN-*.yaml` manifests to your Kubernetes installation: + +``` +for i in ./???-*.yaml; do kubectl apply -f $i; done +``` + +After that you'll be able to create PersistentVolumes. See example in [csi/deploy/example-pvc.yaml](csi/deploy/example-pvc.yaml). + ## Known Problems - Object deletion requests may currently lead to 'incomplete' objects in EC pools diff --git a/csi/deploy/storage-class.yaml b/csi/deploy/009-storage-class.yaml similarity index 94% rename from csi/deploy/storage-class.yaml rename to csi/deploy/009-storage-class.yaml index fcf46286..77749766 100644 --- a/csi/deploy/storage-class.yaml +++ b/csi/deploy/009-storage-class.yaml @@ -3,7 +3,7 @@ apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: namespace: vitastor-system - name: vitastor-csi-storage-class + name: vitastor annotations: storageclass.kubernetes.io/is-default-class: "true" provisioner: csi.vitastor.io diff --git a/csi/deploy/example-pvc.yaml b/csi/deploy/example-pvc.yaml index fc9c3171..ba00729c 100644 --- a/csi/deploy/example-pvc.yaml +++ b/csi/deploy/example-pvc.yaml @@ -4,7 +4,7 @@ kind: PersistentVolumeClaim metadata: name: test-vitastor-pvc spec: - storageClassName: vitastor-csi-storage-class + storageClassName: vitastor accessModes: - ReadWriteOnce resources: From f9fe72d40acbcba116ec844599f3da5dd806f214 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 16 May 2021 01:17:54 +0300 Subject: [PATCH 9/9] Release 0.6.4 - Implement a basic Kubernetes CSI driver - Minor fixes for vitastor-nbd - Fix build without RDMA broken in 0.6.3 --- CMakeLists.txt | 2 +- csi/Makefile | 2 +- csi/deploy/004-csi-nodeplugin.yaml | 2 +- csi/deploy/007-csi-provisioner.yaml | 2 +- csi/src/config.go | 2 +- debian/changelog | 2 +- debian/vitastor.Dockerfile | 12 ++++++------ rpm/build-tarball.sh | 2 +- rpm/vitastor-el7.Dockerfile | 2 +- rpm/vitastor-el7.spec | 4 ++-- rpm/vitastor-el8.Dockerfile | 2 +- rpm/vitastor-el8.spec | 4 ++-- src/CMakeLists.txt | 2 +- 13 files changed, 20 insertions(+), 20 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 63adc707..3edbb140 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8) project(vitastor) -set(VERSION "0.6.3") +set(VERSION "0.6.4") add_subdirectory(src) diff --git a/csi/Makefile b/csi/Makefile index 726f71c4..0d017ad6 100644 --- a/csi/Makefile +++ b/csi/Makefile @@ -1,4 +1,4 @@ -VERSION ?= v0.6.3 +VERSION ?= v0.6.4 all: build push diff --git a/csi/deploy/004-csi-nodeplugin.yaml b/csi/deploy/004-csi-nodeplugin.yaml index 5dcf2cc6..78b8af78 100644 --- a/csi/deploy/004-csi-nodeplugin.yaml +++ b/csi/deploy/004-csi-nodeplugin.yaml @@ -49,7 +49,7 @@ spec: capabilities: add: ["SYS_ADMIN"] allowPrivilegeEscalation: true - image: vitalif/vitastor-csi:v0.6.3 + image: vitalif/vitastor-csi:v0.6.4 args: - "--node=$(NODE_ID)" - "--endpoint=$(CSI_ENDPOINT)" diff --git a/csi/deploy/007-csi-provisioner.yaml b/csi/deploy/007-csi-provisioner.yaml index 07a721ea..65786518 100644 --- a/csi/deploy/007-csi-provisioner.yaml +++ b/csi/deploy/007-csi-provisioner.yaml @@ -116,7 +116,7 @@ spec: privileged: true capabilities: add: ["SYS_ADMIN"] - image: vitalif/vitastor-csi:v0.6.3 + image: vitalif/vitastor-csi:v0.6.4 args: - "--node=$(NODE_ID)" - "--endpoint=$(CSI_ENDPOINT)" diff --git a/csi/src/config.go b/csi/src/config.go index 024afde1..cb56a43e 100644 --- a/csi/src/config.go +++ b/csi/src/config.go @@ -5,7 +5,7 @@ package vitastor const ( vitastorCSIDriverName = "csi.vitastor.io" - vitastorCSIDriverVersion = "0.6.3" + vitastorCSIDriverVersion = "0.6.4" ) // Config struct fills the parameters of request or user input diff --git a/debian/changelog b/debian/changelog index 73df54df..4c84a0c6 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,4 +1,4 @@ -vitastor (0.6.3-1) unstable; urgency=medium +vitastor (0.6.4-1) unstable; urgency=medium * RDMA support * Bugfixes diff --git a/debian/vitastor.Dockerfile b/debian/vitastor.Dockerfile index bea70ae4..7f224bb7 100644 --- a/debian/vitastor.Dockerfile +++ b/debian/vitastor.Dockerfile @@ -40,10 +40,10 @@ RUN set -e -x; \ mkdir -p /root/packages/vitastor-$REL; \ rm -rf /root/packages/vitastor-$REL/*; \ cd /root/packages/vitastor-$REL; \ - cp -r /root/vitastor vitastor-0.6.3; \ - ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.3/qemu; \ - ln -s /root/fio-build/fio-*/ vitastor-0.6.3/fio; \ - cd vitastor-0.6.3; \ + cp -r /root/vitastor vitastor-0.6.4; \ + ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.4/qemu; \ + ln -s /root/fio-build/fio-*/ vitastor-0.6.4/fio; \ + cd vitastor-0.6.4; \ FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ sh copy-qemu-includes.sh; \ @@ -59,8 +59,8 @@ RUN set -e -x; \ echo "dep:fio=$FIO" > debian/substvars; \ echo "dep:qemu=$QEMU" >> debian/substvars; \ cd /root/packages/vitastor-$REL; \ - tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.3.orig.tar.xz vitastor-0.6.3; \ - cd vitastor-0.6.3; \ + tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.4.orig.tar.xz vitastor-0.6.4; \ + cd vitastor-0.6.4; \ V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ DEBFULLNAME="Vitaliy Filippov " dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \ DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \ diff --git a/rpm/build-tarball.sh b/rpm/build-tarball.sh index 4d36500a..3074b6ee 100755 --- a/rpm/build-tarball.sh +++ b/rpm/build-tarball.sh @@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'` perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec -tar --transform 's#^#vitastor-0.6.3/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.3$(rpm --eval '%dist').tar.gz * +tar --transform 's#^#vitastor-0.6.4/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.4$(rpm --eval '%dist').tar.gz * diff --git a/rpm/vitastor-el7.Dockerfile b/rpm/vitastor-el7.Dockerfile index bd88fee4..6df2805c 100644 --- a/rpm/vitastor-el7.Dockerfile +++ b/rpm/vitastor-el7.Dockerfile @@ -38,7 +38,7 @@ ADD . /root/vitastor RUN set -e; \ cd /root/vitastor/rpm; \ sh build-tarball.sh; \ - cp /root/vitastor-0.6.3.el7.tar.gz ~/rpmbuild/SOURCES; \ + cp /root/vitastor-0.6.4.el7.tar.gz ~/rpmbuild/SOURCES; \ cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \ cd ~/rpmbuild/SPECS/; \ rpmbuild -ba vitastor.spec; \ diff --git a/rpm/vitastor-el7.spec b/rpm/vitastor-el7.spec index 6d1e6532..3b0fa823 100644 --- a/rpm/vitastor-el7.spec +++ b/rpm/vitastor-el7.spec @@ -1,11 +1,11 @@ Name: vitastor -Version: 0.6.3 +Version: 0.6.4 Release: 1%{?dist} Summary: Vitastor, a fast software-defined clustered block storage License: Vitastor Network Public License 1.1 URL: https://vitastor.io/ -Source0: vitastor-0.6.3.el7.tar.gz +Source0: vitastor-0.6.4.el7.tar.gz BuildRequires: liburing-devel >= 0.6 BuildRequires: gperftools-devel diff --git a/rpm/vitastor-el8.Dockerfile b/rpm/vitastor-el8.Dockerfile index 8c1712ac..ce519336 100644 --- a/rpm/vitastor-el8.Dockerfile +++ b/rpm/vitastor-el8.Dockerfile @@ -36,7 +36,7 @@ ADD . /root/vitastor RUN set -e; \ cd /root/vitastor/rpm; \ sh build-tarball.sh; \ - cp /root/vitastor-0.6.3.el8.tar.gz ~/rpmbuild/SOURCES; \ + cp /root/vitastor-0.6.4.el8.tar.gz ~/rpmbuild/SOURCES; \ cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \ cd ~/rpmbuild/SPECS/; \ rpmbuild -ba vitastor.spec; \ diff --git a/rpm/vitastor-el8.spec b/rpm/vitastor-el8.spec index f0e5ec54..106b1ab1 100644 --- a/rpm/vitastor-el8.spec +++ b/rpm/vitastor-el8.spec @@ -1,11 +1,11 @@ Name: vitastor -Version: 0.6.3 +Version: 0.6.4 Release: 1%{?dist} Summary: Vitastor, a fast software-defined clustered block storage License: Vitastor Network Public License 1.1 URL: https://vitastor.io/ -Source0: vitastor-0.6.3.el8.tar.gz +Source0: vitastor-0.6.4.el8.tar.gz BuildRequires: liburing-devel >= 0.6 BuildRequires: gperftools-devel diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b6cad35c..cc5d4e58 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,7 +13,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$") set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}") endif() -add_definitions(-DVERSION="0.6.3") +add_definitions(-DVERSION="0.6.4") add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src) if (${WITH_ASAN}) add_definitions(-fsanitize=address -fno-omit-frame-pointer)