forked from vitalif/vitastor
Compare commits
1 Commits
master
...
csi-use-vi
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 3089b4b800 |
|
@ -10,7 +10,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -21,8 +20,6 @@ import (
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3"
|
|
||||||
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -114,6 +111,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string,
|
||||||
return ctxVars, etcdUrl, etcdPrefix
|
return ctxVars, etcdUrl, etcdPrefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error)
|
||||||
|
{
|
||||||
|
if (ctxVars["etcdUrl"] != "")
|
||||||
|
{
|
||||||
|
args = append(args, "--etcd_address", ctxVars["etcdUrl"])
|
||||||
|
}
|
||||||
|
if (ctxVars["etcdPrefix"] != "")
|
||||||
|
{
|
||||||
|
args = append(args, "--etcd_prefix", ctxVars["etcdPrefix"])
|
||||||
|
}
|
||||||
|
if (ctxVars["configPath"] != "")
|
||||||
|
{
|
||||||
|
args = append(args, "--config_path", ctxVars["configPath"])
|
||||||
|
}
|
||||||
|
c := exec.Command("/usr/bin/vitastor-cli", args...)
|
||||||
|
var stdout, stderr bytes.Buffer
|
||||||
|
c.Stdout = &stdout
|
||||||
|
c.Stderr = &stderr
|
||||||
|
err := c.Run()
|
||||||
|
stderrStr := string(stderr.Bytes())
|
||||||
|
if (err != nil)
|
||||||
|
{
|
||||||
|
klog.Errorf("vitastor-cli %s failed: %s, status %s\n", strings.Join(args, " "), stderrStr, err)
|
||||||
|
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
|
||||||
|
}
|
||||||
|
return stdout.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Create the volume
|
// Create the volume
|
||||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
|
||||||
{
|
{
|
||||||
|
@ -146,128 +171,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||||
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
|
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: The following should PROBABLY be implemented externally in a management tool
|
ctxVars, etcdUrl, _ := GetConnectionParams(req.Parameters)
|
||||||
|
|
||||||
ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters)
|
|
||||||
if (len(etcdUrl) == 0)
|
if (len(etcdUrl) == 0)
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
|
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect to etcd
|
// Create image using vitastor-cli
|
||||||
cli, err := clientv3.New(clientv3.Config{
|
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) })
|
||||||
DialTimeout: ETCD_TIMEOUT,
|
|
||||||
Endpoints: etcdUrl,
|
|
||||||
})
|
|
||||||
if (err != nil)
|
if (err != nil)
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
|
if (strings.Index(err.Error(), "already exists") > 0)
|
||||||
}
|
|
||||||
defer cli.Close()
|
|
||||||
|
|
||||||
var imageId uint64 = 0
|
|
||||||
for
|
|
||||||
{
|
{
|
||||||
// Check if the image exists
|
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName })
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
if (err != nil)
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
return nil, err
|
||||||
}
|
}
|
||||||
if (len(resp.Kvs) > 0)
|
var inodeCfg []InodeConfig
|
||||||
{
|
err = json.Unmarshal(stat, &inodeCfg)
|
||||||
kv := resp.Kvs[0]
|
|
||||||
var v InodeIndex
|
|
||||||
err := json.Unmarshal(kv.Value, &v)
|
|
||||||
if (err != nil)
|
if (err != nil)
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
|
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
|
||||||
}
|
}
|
||||||
poolId = v.PoolId
|
if (len(inodeCfg) == 0)
|
||||||
imageId = v.Id
|
|
||||||
inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId)
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey)
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it")
|
||||||
}
|
}
|
||||||
if (len(resp.Kvs) == 0)
|
if (inodeCfg[0].Size < uint64(volSize))
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd")
|
|
||||||
}
|
|
||||||
var inodeCfg InodeConfig
|
|
||||||
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
if (inodeCfg.Size < uint64(volSize))
|
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
|
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Find a free ID
|
return nil, err
|
||||||
// Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free
|
|
||||||
maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
resp, err := cli.Get(ctx, maxIdKey)
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
var modRev int64
|
|
||||||
var nextId uint64
|
|
||||||
if (len(resp.Kvs) > 0)
|
|
||||||
{
|
|
||||||
var err error
|
|
||||||
nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID")
|
|
||||||
}
|
|
||||||
modRev = resp.Kvs[0].ModRevision
|
|
||||||
nextId++
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
nextId = 1
|
|
||||||
}
|
|
||||||
inodeIdxJson, _ := json.Marshal(InodeIndex{
|
|
||||||
Id: nextId,
|
|
||||||
PoolId: poolId,
|
|
||||||
})
|
|
||||||
inodeCfgJson, _ := json.Marshal(InodeConfig{
|
|
||||||
Name: volName,
|
|
||||||
Size: uint64(volSize),
|
|
||||||
})
|
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
txnResp, err := cli.Txn(ctx).If(
|
|
||||||
clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev),
|
|
||||||
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0),
|
|
||||||
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0),
|
|
||||||
).Then(
|
|
||||||
clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)),
|
|
||||||
clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)),
|
|
||||||
clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)),
|
|
||||||
).Commit()
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
if (txnResp.Succeeded)
|
|
||||||
{
|
|
||||||
imageId = nextId
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Start over if the transaction fails
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,97 +237,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||||
}
|
}
|
||||||
volName := ctxVars["name"]
|
volName := ctxVars["name"]
|
||||||
|
|
||||||
_, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars)
|
ctxVars, _, _ = GetConnectionParams(ctxVars)
|
||||||
if (len(etcdUrl) == 0)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
|
|
||||||
}
|
|
||||||
|
|
||||||
cli, err := clientv3.New(clientv3.Config{
|
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
|
||||||
DialTimeout: ETCD_TIMEOUT,
|
|
||||||
Endpoints: etcdUrl,
|
|
||||||
})
|
|
||||||
if (err != nil)
|
if (err != nil)
|
||||||
{
|
{
|
||||||
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
|
return nil, err
|
||||||
}
|
|
||||||
defer cli.Close()
|
|
||||||
|
|
||||||
// Find inode by name
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
if (len(resp.Kvs) == 0)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
|
|
||||||
}
|
|
||||||
var idx InodeIndex
|
|
||||||
err = json.Unmarshal(resp.Kvs[0].Value, &idx)
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get inode config
|
|
||||||
inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)
|
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
resp, err = cli.Get(ctx, inodeCfgKey)
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
if (len(resp.Kvs) == 0)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
|
|
||||||
}
|
|
||||||
var inodeCfg InodeConfig
|
|
||||||
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete inode data by invoking vitastor-cli
|
|
||||||
args := []string{
|
|
||||||
"rm-data", "--etcd_address", strings.Join(etcdUrl, ","),
|
|
||||||
"--pool", fmt.Sprintf("%d", idx.PoolId),
|
|
||||||
"--inode", fmt.Sprintf("%d", idx.Id),
|
|
||||||
}
|
|
||||||
if (ctxVars["configPath"] != "")
|
|
||||||
{
|
|
||||||
args = append(args, "--config_path", ctxVars["configPath"])
|
|
||||||
}
|
|
||||||
c := exec.Command("/usr/bin/vitastor-cli", args...)
|
|
||||||
var stderr bytes.Buffer
|
|
||||||
c.Stdout = nil
|
|
||||||
c.Stderr = &stderr
|
|
||||||
err = c.Run()
|
|
||||||
stderrStr := string(stderr.Bytes())
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
klog.Errorf("vitastor-cli rm-data failed: %s, status %s\n", stderrStr, err)
|
|
||||||
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete inode config in etcd
|
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
|
||||||
txnResp, err := cli.Txn(ctx).Then(
|
|
||||||
clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)),
|
|
||||||
clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)),
|
|
||||||
).Commit()
|
|
||||||
cancel()
|
|
||||||
if (err != nil)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error())
|
|
||||||
}
|
|
||||||
if (!txnResp.Succeeded)
|
|
||||||
{
|
|
||||||
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &csi.DeleteVolumeResponse{}, nil
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
|
|
Loading…
Reference in New Issue