Implement CSI volume snapshots

zerocopy-tcp-send
Vitaliy Filippov 2023-10-30 18:43:43 +03:00
parent 7c054c6f10
commit 37653abe4b
7 changed files with 227 additions and 24 deletions

View File

@ -35,10 +35,13 @@ rules:
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"] - apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots"] resources: ["volumesnapshots"]
verbs: ["get", "list"] verbs: ["get", "list", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["get", "list", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"] - apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"] resources: ["volumesnapshotcontents"]
verbs: ["create", "get", "list", "watch", "update", "delete"] verbs: ["create", "get", "list", "watch", "update", "delete", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"] - apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotclasses"] resources: ["volumesnapshotclasses"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
@ -53,7 +56,7 @@ rules:
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"] - apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents/status"] resources: ["volumesnapshotcontents/status"]
verbs: ["update"] verbs: ["update", "patch"]
- apiGroups: [""] - apiGroups: [""]
resources: ["configmaps"] resources: ["configmaps"]
verbs: ["get"] verbs: ["get"]

View File

@ -23,6 +23,11 @@ metadata:
name: csi-vitastor-provisioner name: csi-vitastor-provisioner
spec: spec:
replicas: 3 replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 0
selector: selector:
matchLabels: matchLabels:
app: csi-vitastor-provisioner app: csi-vitastor-provisioner
@ -46,7 +51,7 @@ spec:
priorityClassName: system-cluster-critical priorityClassName: system-cluster-critical
containers: containers:
- name: csi-provisioner - name: csi-provisioner
image: k8s.gcr.io/sig-storage/csi-provisioner:v2.2.0 image: k8s.gcr.io/sig-storage/csi-provisioner:v3.0.0
args: args:
- "--csi-address=$(ADDRESS)" - "--csi-address=$(ADDRESS)"
- "--v=5" - "--v=5"

View File

@ -0,0 +1,7 @@
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshotClass
metadata:
name: vitastor-snapclass
driver: csi.vitastor.io
deletionPolicy: Delete
parameters:

View File

@ -0,0 +1,16 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test-vitastor-clone
spec:
storageClassName: vitastor
dataSource:
name: snap1
kind: VolumeSnapshot
apiGroup: snapshot.storage.k8s.io
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi

View File

@ -0,0 +1,8 @@
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshot
metadata:
name: snap1
spec:
volumeSnapshotClassName: vitastor-snapclass
source:
persistentVolumeClaimName: test-vitastor-pvc

View File

@ -9,6 +9,7 @@ require (
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.33.1 google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.24.0
k8s.io/klog v1.0.0 k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20210305010621-2afb4311ab10 k8s.io/utils v0.0.0-20210305010621-2afb4311ab10
) )

View File

@ -20,6 +20,7 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
) )
@ -45,6 +46,7 @@ type InodeConfig struct
ParentPool uint64 `json:"parent_pool,omitempty"` ParentPool uint64 `json:"parent_pool,omitempty"`
ParentId uint64 `json:"parent_id,omitempty"` ParentId uint64 `json:"parent_id,omitempty"`
Readonly bool `json:"readonly,omitempty"` Readonly bool `json:"readonly,omitempty"`
CreateTs uint64 `json:"create_ts,omitempty"`
} }
type ControllerServer struct type ControllerServer struct
@ -178,27 +180,43 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
} }
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) }
// Support creation from snapshot
var src *csi.VolumeContentSource
if (req.VolumeContentSource.GetSnapshot() != nil)
{
snapId := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
if (snapId != "")
{
snapVars := make(map[string]string)
err := json.Unmarshal([]byte(snapId), &snapVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
args = append(args, "--parent", snapVars["name"]+"@"+snapVars["snapshot"])
src = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: &csi.VolumeContentSource_SnapshotSource{
SnapshotId: snapId,
},
},
}
}
}
// Create image using vitastor-cli // Create image using vitastor-cli
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) }) _, err := invokeCLI(ctxVars, args)
if (err != nil) if (err != nil)
{ {
if (strings.Index(err.Error(), "already exists") > 0) if (strings.Index(err.Error(), "already exists") > 0)
{ {
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName }) inodeCfg, err := invokeList(ctxVars, volName, true)
if (err != nil) if (err != nil)
{ {
return nil, err return nil, err
} }
var inodeCfg []InodeConfig
err = json.Unmarshal(stat, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
}
if (len(inodeCfg) == 0)
{
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it")
}
if (inodeCfg[0].Size < uint64(volSize)) if (inodeCfg[0].Size < uint64(volSize))
{ {
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected") return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
@ -217,6 +235,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Ugly, but VolumeContext isn't passed to DeleteVolume :-( // Ugly, but VolumeContext isn't passed to DeleteVolume :-(
VolumeId: string(volumeIdJson), VolumeId: string(volumeIdJson),
CapacityBytes: volSize, CapacityBytes: volSize,
ContentSource: src,
}, },
}, nil }, nil
} }
@ -230,15 +249,15 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, "request cannot be empty") return nil, status.Error(codes.InvalidArgument, "request cannot be empty")
} }
ctxVars := make(map[string]string) volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) err := json.Unmarshal([]byte(req.VolumeId), &volVars)
if (err != nil) if (err != nil)
{ {
return nil, status.Error(codes.Internal, "volume ID not in JSON format") return nil, status.Error(codes.Internal, "volume ID not in JSON format")
} }
volName := ctxVars["name"] volName := volVars["name"]
ctxVars, _, _ = GetConnectionParams(ctxVars) ctxVars, _, _ := GetConnectionParams(volVars)
_, err = invokeCLI(ctxVars, []string{ "rm", volName }) _, err = invokeCLI(ctxVars, []string{ "rm", volName })
if (err != nil) if (err != nil)
@ -342,8 +361,9 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *
for _, capability := range []csi.ControllerServiceCapability_RPC_Type{ for _, capability := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES, csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
// TODO: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
} { } {
controllerServerCapabilities = append(controllerServerCapabilities, functionControllerServerCapabilities(capability)) controllerServerCapabilities = append(controllerServerCapabilities, functionControllerServerCapabilities(capability))
} }
@ -353,22 +373,165 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *
}, nil }, nil
} }
func invokeList(ctxVars map[string]string, pattern string, expectExist bool) ([]InodeConfig, error)
{
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", pattern })
if (err != nil)
{
return nil, err
}
var inodeCfg []InodeConfig
err = json.Unmarshal(stat, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
}
if (expectExist && len(inodeCfg) == 0)
{
return nil, status.Error(codes.Internal, "Can't find expected image "+pattern+" via vitastor-cli ls")
}
return inodeCfg, nil
}
// CreateSnapshot create snapshot of an existing PV // CreateSnapshot create snapshot of an existing PV
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error)
{ {
return nil, status.Error(codes.Unimplemented, "") klog.Infof("received controller create snapshot request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Errorf(codes.InvalidArgument, "request cannot be empty")
}
if (req.SourceVolumeId == "" || req.Name == "")
{
return nil, status.Error(codes.InvalidArgument, "source volume ID and snapshot name are required fields")
}
// snapshot name
snapName := req.Name
// req.VolumeId is an ugly json string in our case :)
ctxVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SourceVolumeId), &ctxVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := ctxVars["name"]
// Create image using vitastor-cli
_, err = invokeCLI(ctxVars, []string{ "create", "--snapshot", snapName, volName })
if (err != nil && strings.Index(err.Error(), "already exists") <= 0)
{
return nil, err
}
// Check created snapshot
inodeCfg, err := invokeList(ctxVars, volName+"@"+snapName, true)
if (err != nil)
{
return nil, err
}
// Use ugly JSON snapshot ID again, DeleteSnapshot doesn't have context :-(
ctxVars["snapshot"] = snapName
snapIdJson, _ := json.Marshal(ctxVars)
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(inodeCfg[0].Size),
SnapshotId: string(snapIdJson),
SourceVolumeId: req.SourceVolumeId,
CreationTime: &timestamppb.Timestamp{ Seconds: int64(inodeCfg[0].CreateTs) },
ReadyToUse: true,
},
}, nil
} }
// DeleteSnapshot delete provided snapshot of a PV // DeleteSnapshot delete provided snapshot of a PV
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error)
{ {
return nil, status.Error(codes.Unimplemented, "") klog.Infof("received controller delete snapshot request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Errorf(codes.InvalidArgument, "request cannot be empty")
}
if (req.SnapshotId == "")
{
return nil, status.Error(codes.InvalidArgument, "snapshot ID is a required field")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SnapshotId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "snapshot ID not in JSON format")
}
volName := volVars["name"]
snapName := volVars["snapshot"]
ctxVars, _, _ := GetConnectionParams(volVars)
_, err = invokeCLI(ctxVars, []string{ "rm", volName+"@"+snapName })
if (err != nil)
{
return nil, err
}
return &csi.DeleteSnapshotResponse{}, nil
} }
// ListSnapshots list the snapshots of a PV // ListSnapshots list the snapshots of a PV
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error)
{ {
return nil, status.Error(codes.Unimplemented, "") klog.Infof("received controller list snapshots request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Error(codes.InvalidArgument, "request cannot be empty")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SourceVolumeId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := volVars["name"]
ctxVars, _, _ := GetConnectionParams(volVars)
inodeCfg, err := invokeList(ctxVars, volName+"@*", false)
if (err != nil)
{
return nil, err
}
resp := &csi.ListSnapshotsResponse{}
for _, ino := range inodeCfg
{
snapName := ino.Name[len(volName)+1:]
if (len(req.StartingToken) > 0 && snapName < req.StartingToken)
{
}
else if (req.MaxEntries == 0 || len(resp.Entries) < int(req.MaxEntries))
{
volVars["snapshot"] = snapName
snapIdJson, _ := json.Marshal(volVars)
resp.Entries = append(resp.Entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: int64(ino.Size),
SnapshotId: string(snapIdJson),
SourceVolumeId: req.SourceVolumeId,
CreationTime: &timestamppb.Timestamp{ Seconds: int64(ino.CreateTs) },
ReadyToUse: true,
},
})
}
else
{
resp.NextToken = snapName
break
}
}
return resp, nil
} }
// ControllerExpandVolume resizes a volume // ControllerExpandVolume resizes a volume