Compare commits


8 Commits

SHA1 Message Date
64bbf121b6 Experiment: zero-copy TCP send
Some checks failed
Test / test_minsize_1 (push) Successful in 13s
Test / test_snapshot_ec (push) Successful in 32s
Test / test_move_reappear (push) Successful in 20s
Test / test_rm (push) Successful in 17s
Test / test_snapshot_down (push) Successful in 29s
Test / test_snapshot_down_ec (push) Successful in 31s
Test / test_splitbrain (push) Successful in 25s
Test / test_snapshot_chain (push) Successful in 2m32s
Test / test_snapshot_chain_ec (push) Failing after 3m5s
Test / test_rebalance_verify_imm (push) Successful in 2m52s
Test / test_write (push) Successful in 33s
Test / test_rebalance_verify (push) Successful in 3m48s
Test / test_write_no_same (push) Successful in 13s
Test / test_rebalance_verify_ec_imm (push) Successful in 3m11s
Test / test_rebalance_verify_ec (push) Successful in 4m11s
Test / test_write_xor (push) Failing after 3m10s
Test / test_heal_pg_size_2 (push) Successful in 3m50s
Test / test_heal_csum_32k_dmj (push) Successful in 5m15s
Test / test_heal_ec (push) Successful in 6m34s
Test / test_heal_csum_32k_dj (push) Successful in 6m19s
Test / test_heal_csum_32k (push) Successful in 6m26s
Test / test_scrub (push) Successful in 1m16s
Test / test_scrub_zero_osd_2 (push) Successful in 1m15s
Test / test_scrub_xor (push) Successful in 1m24s
Test / test_heal_csum_4k_dmj (push) Successful in 7m0s
Test / test_heal_csum_4k_dj (push) Successful in 6m18s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m4s
Test / test_heal_csum_4k (push) Successful in 6m8s
Test / test_scrub_ec (push) Successful in 57s
Test / test_scrub_pg_size_3 (push) Successful in 1m17s
2023-11-04 01:34:18 +03:00
30dff8893f Fix EC recovery in the ISA-L version when the first missing data chunk is not read
Some checks failed
Test / test_snapshot (push) Successful in 44s
Test / test_snapshot_ec (push) Successful in 28s
Test / test_move_reappear (push) Successful in 19s
Test / test_rm (push) Successful in 16s
Test / test_snapshot_down (push) Successful in 30s
Test / test_snapshot_down_ec (push) Successful in 31s
Test / test_splitbrain (push) Successful in 24s
Test / test_snapshot_chain (push) Successful in 2m20s
Test / test_snapshot_chain_ec (push) Successful in 3m1s
Test / test_rebalance_verify_imm (push) Successful in 2m49s
Test / test_rebalance_verify (push) Successful in 3m37s
Test / test_write (push) Successful in 42s
Test / test_write_no_same (push) Successful in 14s
Test / test_write_xor (push) Successful in 54s
Test / test_rebalance_verify_ec (push) Successful in 4m55s
Test / test_rebalance_verify_ec_imm (push) Successful in 4m13s
Test / test_heal_pg_size_2 (push) Successful in 4m4s
Test / test_heal_ec (push) Successful in 5m2s
Test / test_heal_csum_32k_dmj (push) Failing after 5m54s
Test / test_heal_csum_32k_dj (push) Successful in 6m6s
Test / test_heal_csum_32k (push) Successful in 6m59s
Test / test_scrub (push) Successful in 1m16s
Test / test_heal_csum_4k_dmj (push) Successful in 6m56s
Test / test_scrub_xor (push) Successful in 51s
Test / test_scrub_zero_osd_2 (push) Successful in 1m1s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m25s
Test / test_heal_csum_4k (push) Successful in 6m9s
Test / test_heal_csum_4k_dj (push) Successful in 6m33s
Test / test_scrub_pg_size_3 (push) Successful in 1m37s
Test / test_scrub_ec (push) Successful in 26s
(Yes, all users of EC n+k with k >= 2 should upgrade as soon as possible)
2023-11-04 01:34:18 +03:00
becf14a705 Add a test for EC with multiple missing data chunks, but without recovery of the first of them 2023-11-04 01:34:18 +03:00
64388788c1 Implement CSI volume expansion
Some checks failed
Test / test_snapshot_ec (push) Successful in 35s
Test / test_minsize_1 (push) Successful in 16s
Test / test_rm (push) Successful in 15s
Test / test_snapshot_down (push) Successful in 22s
Test / test_move_reappear (push) Failing after 50s
Test / test_snapshot_down_ec (push) Successful in 23s
Test / test_splitbrain (push) Successful in 19s
Test / test_snapshot_chain (push) Successful in 2m24s
Test / test_snapshot_chain_ec (push) Failing after 3m6s
Test / test_rebalance_verify_imm (push) Successful in 3m15s
Test / test_write (push) Successful in 41s
Test / test_rebalance_verify (push) Successful in 4m13s
Test / test_write_no_same (push) Successful in 13s
Test / test_write_xor (push) Successful in 50s
Test / test_rebalance_verify_ec_imm (push) Successful in 4m28s
Test / test_rebalance_verify_ec (push) Successful in 5m30s
Test / test_heal_pg_size_2 (push) Successful in 4m5s
Test / test_heal_ec (push) Successful in 4m57s
Test / test_heal_csum_32k_dmj (push) Successful in 6m13s
Test / test_heal_csum_32k_dj (push) Successful in 6m10s
Test / test_heal_csum_32k (push) Successful in 6m40s
Test / test_heal_csum_4k_dmj (push) Successful in 6m24s
Test / test_scrub (push) Successful in 1m7s
Test / test_scrub_xor (push) Successful in 47s
Test / test_scrub_zero_osd_2 (push) Successful in 53s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m25s
Test / test_scrub_pg_size_3 (push) Successful in 1m59s
Test / test_heal_csum_4k_dj (push) Successful in 6m10s
Test / test_heal_csum_4k (push) Successful in 6m2s
Test / test_scrub_ec (push) Successful in 40s
2023-11-01 12:46:20 +03:00
37653abe4b Implement CSI volume snapshots 2023-11-01 12:46:20 +03:00
7c054c6f10 Add "id" to df --json output 2023-11-01 12:46:16 +03:00
bb7709e824 Support listening on non-127.0.0.1 in tests 2023-11-01 12:45:27 +03:00
ebeace5a2d Add cmake and pkg-config to debian build depends 2023-11-01 12:45:27 +03:00
21 changed files with 624 additions and 115 deletions

View File

@@ -35,10 +35,13 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots"]
verbs: ["get", "list"]
verbs: ["get", "list", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["get", "list", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"]
verbs: ["create", "get", "list", "watch", "update", "delete"]
verbs: ["create", "get", "list", "watch", "update", "delete", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotclasses"]
verbs: ["get", "list", "watch"]
@@ -53,7 +56,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents/status"]
verbs: ["update"]
verbs: ["update", "patch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get"]

View File

@@ -23,6 +23,11 @@ metadata:
name: csi-vitastor-provisioner
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 0
selector:
matchLabels:
app: csi-vitastor-provisioner
@@ -46,7 +51,7 @@ spec:
priorityClassName: system-cluster-critical
containers:
- name: csi-provisioner
image: k8s.gcr.io/sig-storage/csi-provisioner:v2.2.0
image: k8s.gcr.io/sig-storage/csi-provisioner:v3.0.0
args:
- "--csi-address=$(ADDRESS)"
- "--v=5"

View File

@@ -17,3 +17,4 @@ parameters:
# multiple etcdUrls may be specified, delimited by comma
#etcdUrl: "http://192.168.7.2:2379"
#etcdPrefix: "/vitastor"
allowVolumeExpansion: true

View File

@@ -0,0 +1,7 @@
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshotClass
metadata:
name: vitastor-snapclass
driver: csi.vitastor.io
deletionPolicy: Delete
parameters:

View File

@@ -0,0 +1,16 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test-vitastor-clone
spec:
storageClassName: vitastor
dataSource:
name: snap1
kind: VolumeSnapshot
apiGroup: snapshot.storage.k8s.io
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi

View File

@@ -0,0 +1,8 @@
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshot
metadata:
name: snap1
spec:
volumeSnapshotClassName: vitastor-snapclass
source:
persistentVolumeClaimName: test-vitastor-pvc

View File

@@ -9,6 +9,7 @@ require (
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.24.0
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20210305010621-2afb4311ab10
)

View File

@@ -20,6 +20,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/container-storage-interface/spec/lib/go/csi"
)
@@ -45,6 +46,7 @@ type InodeConfig struct
ParentPool uint64 `json:"parent_pool,omitempty"`
ParentId uint64 `json:"parent_id,omitempty"`
Readonly bool `json:"readonly,omitempty"`
CreateTs uint64 `json:"create_ts,omitempty"`
}
type ControllerServer struct
@@ -178,27 +180,43 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
}
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) }
// Support creation from snapshot
var src *csi.VolumeContentSource
if (req.VolumeContentSource.GetSnapshot() != nil)
{
snapId := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
if (snapId != "")
{
snapVars := make(map[string]string)
err := json.Unmarshal([]byte(snapId), &snapVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
args = append(args, "--parent", snapVars["name"]+"@"+snapVars["snapshot"])
src = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: &csi.VolumeContentSource_SnapshotSource{
SnapshotId: snapId,
},
},
}
}
}
// Create image using vitastor-cli
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) })
_, err := invokeCLI(ctxVars, args)
if (err != nil)
{
if (strings.Index(err.Error(), "already exists") > 0)
{
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName })
inodeCfg, err := invokeList(ctxVars, volName, true)
if (err != nil)
{
return nil, err
}
var inodeCfg []InodeConfig
err = json.Unmarshal(stat, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
}
if (len(inodeCfg) == 0)
{
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it")
}
if (inodeCfg[0].Size < uint64(volSize))
{
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
@@ -217,6 +235,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Ugly, but VolumeContext isn't passed to DeleteVolume :-(
VolumeId: string(volumeIdJson),
CapacityBytes: volSize,
ContentSource: src,
},
}, nil
}
@@ -230,15 +249,15 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, "request cannot be empty")
}
ctxVars := make(map[string]string)
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.VolumeId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := ctxVars["name"]
volName := volVars["name"]
ctxVars, _, _ = GetConnectionParams(ctxVars)
ctxVars, _, _ := GetConnectionParams(volVars)
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
if (err != nil)
@@ -344,6 +363,8 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
// TODO: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
} {
controllerServerCapabilities = append(controllerServerCapabilities, functionControllerServerCapabilities(capability))
}
@@ -353,28 +374,214 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *
}, nil
}
func invokeList(ctxVars map[string]string, pattern string, expectExist bool) ([]InodeConfig, error)
{
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", pattern })
if (err != nil)
{
return nil, err
}
var inodeCfg []InodeConfig
err = json.Unmarshal(stat, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
}
if (expectExist && len(inodeCfg) == 0)
{
return nil, status.Error(codes.Internal, "Can't find expected image "+pattern+" via vitastor-cli ls")
}
return inodeCfg, nil
}
// CreateSnapshot create snapshot of an existing PV
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error)
{
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("received controller create snapshot request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Errorf(codes.InvalidArgument, "request cannot be empty")
}
if (req.SourceVolumeId == "" || req.Name == "")
{
return nil, status.Error(codes.InvalidArgument, "source volume ID and snapshot name are required fields")
}
// snapshot name
snapName := req.Name
// req.VolumeId is an ugly json string in our case :)
ctxVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SourceVolumeId), &ctxVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := ctxVars["name"]
// Create image using vitastor-cli
_, err = invokeCLI(ctxVars, []string{ "create", "--snapshot", snapName, volName })
if (err != nil && strings.Index(err.Error(), "already exists") <= 0)
{
return nil, err
}
// Check created snapshot
inodeCfg, err := invokeList(ctxVars, volName+"@"+snapName, true)
if (err != nil)
{
return nil, err
}
// Use ugly JSON snapshot ID again, DeleteSnapshot doesn't have context :-(
ctxVars["snapshot"] = snapName
snapIdJson, _ := json.Marshal(ctxVars)
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(inodeCfg[0].Size),
SnapshotId: string(snapIdJson),
SourceVolumeId: req.SourceVolumeId,
CreationTime: &timestamppb.Timestamp{ Seconds: int64(inodeCfg[0].CreateTs) },
ReadyToUse: true,
},
}, nil
}
// DeleteSnapshot delete provided snapshot of a PV
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error)
{
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("received controller delete snapshot request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Errorf(codes.InvalidArgument, "request cannot be empty")
}
if (req.SnapshotId == "")
{
return nil, status.Error(codes.InvalidArgument, "snapshot ID is a required field")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SnapshotId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "snapshot ID not in JSON format")
}
volName := volVars["name"]
snapName := volVars["snapshot"]
ctxVars, _, _ := GetConnectionParams(volVars)
_, err = invokeCLI(ctxVars, []string{ "rm", volName+"@"+snapName })
if (err != nil)
{
return nil, err
}
return &csi.DeleteSnapshotResponse{}, nil
}
// ListSnapshots list the snapshots of a PV
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error)
{
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("received controller list snapshots request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Error(codes.InvalidArgument, "request cannot be empty")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.SourceVolumeId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := volVars["name"]
ctxVars, _, _ := GetConnectionParams(volVars)
inodeCfg, err := invokeList(ctxVars, volName+"@*", false)
if (err != nil)
{
return nil, err
}
resp := &csi.ListSnapshotsResponse{}
for _, ino := range inodeCfg
{
snapName := ino.Name[len(volName)+1:]
if (len(req.StartingToken) > 0 && snapName < req.StartingToken)
{
}
else if (req.MaxEntries == 0 || len(resp.Entries) < int(req.MaxEntries))
{
volVars["snapshot"] = snapName
snapIdJson, _ := json.Marshal(volVars)
resp.Entries = append(resp.Entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: int64(ino.Size),
SnapshotId: string(snapIdJson),
SourceVolumeId: req.SourceVolumeId,
CreationTime: &timestamppb.Timestamp{ Seconds: int64(ino.CreateTs) },
ReadyToUse: true,
},
})
}
else
{
resp.NextToken = snapName
break
}
}
return resp, nil
}
// ControllerExpandVolume resizes a volume
// ControllerExpandVolume increases the size of a volume
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error)
{
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("received controller expand volume request %+v", protosanitizer.StripSecrets(req))
if (req == nil)
{
return nil, status.Error(codes.InvalidArgument, "request cannot be empty")
}
if (req.VolumeId == "" || req.CapacityRange == nil || req.CapacityRange.RequiredBytes == 0)
{
return nil, status.Error(codes.InvalidArgument, "VolumeId, CapacityRange and RequiredBytes are required fields")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(req.VolumeId), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
volName := volVars["name"]
ctxVars, _, _ := GetConnectionParams(volVars)
inodeCfg, err := invokeList(ctxVars, volName, true)
if (err != nil)
{
return nil, err
}
if (req.CapacityRange.RequiredBytes > 0 && inodeCfg[0].Size < uint64(req.CapacityRange.RequiredBytes))
{
sz := ((req.CapacityRange.RequiredBytes+4095)/4096)*4096
_, err := invokeCLI(ctxVars, []string{ "modify", "--inc_size", "1", "--resize", fmt.Sprintf("%d", sz), volName })
if (err != nil)
{
return nil, err
}
inodeCfg, err = invokeList(ctxVars, volName, true)
if (err != nil)
{
return nil, err
}
}
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: int64(inodeCfg[0].Size),
NodeExpansionRequired: false,
}, nil
}
// ControllerGetVolume get volume info

View File

@@ -49,6 +49,13 @@ func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.Ge
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
},
}, nil
}

View File

@@ -70,10 +70,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
isBlock := req.GetVolumeCapability().GetBlock() != nil
// Check that it's not already mounted
_, error := mount.IsNotMountPoint(ns.mounter, targetPath)
if (error != nil)
_, err := mount.IsNotMountPoint(ns.mounter, targetPath)
if (err != nil)
{
if (os.IsNotExist(error))
if (os.IsNotExist(err))
{
if (isBlock)
{
@@ -102,12 +102,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
else
{
return nil, status.Error(codes.Internal, error.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
ctxVars := make(map[string]string)
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
err = json.Unmarshal([]byte(req.VolumeId), &ctxVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
@@ -147,70 +147,74 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
devicePath := strings.TrimSpace(stdoutStr)
// Check existing format
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
existingFormat, err := diskMounter.GetDiskFormat(devicePath)
if (err != nil)
{
klog.Errorf("failed to get disk format for path %s, error: %v", err)
// unmap NBD device
unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput()
if (unmapErr != nil)
{
klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr)
}
return nil, err
}
// Format the device (ext4 or xfs)
fsType := req.GetVolumeCapability().GetMount().GetFsType()
opt := req.GetVolumeCapability().GetMount().GetMountFlags()
opt = append(opt, "_netdev")
if ((req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY) &&
!Contains(opt, "ro"))
{
opt = append(opt, "ro")
}
if (fsType == "xfs")
{
opt = append(opt, "nouuid")
}
readOnly := Contains(opt, "ro")
if (existingFormat == "" && !readOnly)
{
args := []string{}
switch fsType
{
case "ext4":
args = []string{"-m0", "-Enodiscard,lazy_itable_init=1,lazy_journal_init=1", devicePath}
case "xfs":
args = []string{"-K", devicePath}
}
if (len(args) > 0)
{
cmdOut, cmdErr := diskMounter.Exec.Command("mkfs."+fsType, args...).CombinedOutput()
if (cmdErr != nil)
{
klog.Errorf("failed to run mkfs error: %v, output: %v", cmdErr, string(cmdOut))
// unmap NBD device
unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput()
if (unmapErr != nil)
{
klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr)
}
return nil, status.Error(codes.Internal, cmdErr.Error())
}
}
}
if (isBlock)
{
opt = append(opt, "bind")
err = diskMounter.Mount(devicePath, targetPath, fsType, opt)
err = diskMounter.Mount(devicePath, targetPath, "", []string{"bind"})
}
else
{
// Check existing format
existingFormat, err := diskMounter.GetDiskFormat(devicePath)
if (err != nil)
{
klog.Errorf("failed to get disk format for path %s, error: %v", err)
goto unmap
}
// Format the device (ext4 or xfs)
fsType := req.GetVolumeCapability().GetMount().GetFsType()
opt := req.GetVolumeCapability().GetMount().GetMountFlags()
opt = append(opt, "_netdev")
if ((req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY) &&
!Contains(opt, "ro"))
{
opt = append(opt, "ro")
}
if (fsType == "xfs")
{
opt = append(opt, "nouuid")
}
readOnly := Contains(opt, "ro")
if (existingFormat == "" && !readOnly)
{
var cmdOut []byte
switch fsType
{
case "ext4":
args := []string{"-m0", "-Enodiscard,lazy_itable_init=1,lazy_journal_init=1", devicePath}
cmdOut, err = diskMounter.Exec.Command("mkfs.ext4", args...).CombinedOutput()
case "xfs":
cmdOut, err = diskMounter.Exec.Command("mkfs.xfs", "-K", devicePath).CombinedOutput()
}
if (err != nil)
{
klog.Errorf("failed to run mkfs error: %v, output: %v", err, string(cmdOut))
goto unmap
}
}
err = diskMounter.FormatAndMount(devicePath, targetPath, fsType, opt)
// Try to run online resize on mount.
// FIXME: Implement online resize. It requires online resize support in vitastor-nbd.
if (err == nil && existingFormat != "" && !readOnly)
{
var cmdOut []byte
switch (fsType)
{
case "ext4":
cmdOut, err = diskMounter.Exec.Command("resize2fs", devicePath).CombinedOutput()
case "xfs":
cmdOut, err = diskMounter.Exec.Command("xfs_growfs", devicePath).CombinedOutput()
}
if (err != nil)
{
klog.Errorf("failed to run resizefs error: %v, output: %v", err, string(cmdOut))
goto unmap
}
}
}
if (err != nil)
{
@@ -218,15 +222,18 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
"failed to mount device path (%s) to path (%s) for volume (%s) error: %s",
devicePath, targetPath, volName, err,
)
// unmap NBD device
unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput()
if (unmapErr != nil)
{
klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr)
}
return nil, status.Error(codes.Internal, err.Error())
goto unmap
}
return &csi.NodePublishVolumeResponse{}, nil
unmap:
// unmap NBD device
unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput()
if (unmapErr != nil)
{
klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr)
}
return nil, status.Error(codes.Internal, err.Error())
}
// NodeUnpublishVolume unmounts the volume from the target path

debian/control (vendored, 2 changed lines)
View File

@@ -2,7 +2,7 @@ Source: vitastor
Section: admin
Priority: optional
Maintainer: Vitaliy Filippov <vitalif@yourcmc.ru>
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev, libibverbs-dev, libisal-dev
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev, libibverbs-dev, libisal-dev, cmake, pkg-config
Standards-Version: 4.5.0
Homepage: https://vitastor.io/
Rules-Requires-Root: no

View File

@@ -30,6 +30,18 @@
будут использоваться обычные синхронные системные вызовы send/recv. Для OSD
это бессмысленно, так как OSD в любом случае нуждается в io_uring, но, в
принципе, это может применяться для клиентов со старыми версиями ядра.
- name: use_zerocopy_send
type: bool
default: false
info: |
If true, OSDs and clients will attempt to use TCP zero-copy send
(MSG_ZEROCOPY) for big buffers. It's recommended to raise net.ipv4.tcp_wmem
and net.core.wmem_max sysctls when using this mode.
info_ru: |
Если установлено в true, то OSD и клиенты будут стараться использовать
TCP-отправку без копирования (MSG_ZEROCOPY) для больших буферов данных.
Рекомендуется поднять значения sysctl net.ipv4.tcp_wmem и net.core.wmem_max
при использовании этого режима.
- name: use_rdma
type: bool
default: true
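
The use_zerocopy_send option documented above maps to the Linux MSG_ZEROCOPY mechanism: a socket first has to opt in with SO_ZEROCOPY, after which sends flagged with MSG_ZEROCOPY transmit directly from the caller's pages instead of copying them into kernel buffers. A rough standalone sketch of the send side follows; it is not taken from the Vitastor code (in the patch SO_ZEROCOPY is enabled once per client in set_socket_options()), and fd is assumed to be an already connected TCP socket.

// Rough sketch of an MSG_ZEROCOPY send; the fallback defines match current
// Linux headers for the case where older headers lack these constants.
#include <sys/types.h>
#include <sys/socket.h>

#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif

static ssize_t send_zerocopy(int fd, const void *buf, size_t len)
{
    int one = 1;
    // Zero-copy has to be enabled per socket, otherwise the MSG_ZEROCOPY
    // flag is not honoured by the kernel
    if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) != 0)
    {
        // Old kernel or unsupported socket type: plain copying send
        return send(fd, buf, len, MSG_NOSIGNAL);
    }
    // The kernel pins the pages of buf and transmits directly from them, so
    // the caller must keep buf intact until a completion notification for
    // this send arrives on the socket error queue (MSG_ERRQUEUE)
    return send(fd, buf, len, MSG_NOSIGNAL | MSG_ZEROCOPY);
}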


Submodule json11 updated: fd37016cf8...52a3af664f

View File

@@ -65,6 +65,7 @@ const etcd_tree = {
// client and osd
tcp_header_buffer_size: 65536,
use_sync_send_recv: false,
use_zerocopy_send: false,
use_rdma: true,
rdma_device: null, // for example, "rocep5s0f0"
rdma_port_num: 1,

View File

@@ -124,6 +124,7 @@ resume_1:
pool_avail *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
pool_stats[pool_cfg.id] = json11::Json::object {
{ "id", (uint64_t)pool_cfg.id },
{ "name", pool_cfg.name },
{ "pg_count", pool_cfg.pg_count },
{ "scheme", pool_cfg.scheme == POOL_SCHEME_REPLICATED ? "replicated" : "ec" },

View File

@@ -42,6 +42,12 @@ void osd_messenger_t::init()
handle_rdma_events();
}
}
#endif
#ifndef SO_ZEROCOPY
if (log_level > 0)
{
fprintf(stderr, "Zero-copy TCP send is not supported in this build, ignoring\n");
}
#endif
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
@@ -173,6 +179,8 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->receive_buffer_size = 65536;
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
this->use_zerocopy_send = config["use_zerocopy_send"].bool_value() ||
config["use_zerocopy_send"].uint64_value();
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
if (!this->peer_connect_interval)
this->peer_connect_interval = 5;
@@ -303,8 +311,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
on_connect_peer(peer_osd, -result);
return;
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
set_socket_options(cl);
cl->peer_state = PEER_CONNECTED;
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{
@@ -314,6 +321,23 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
check_peer_config(cl);
}
void osd_messenger_t::set_socket_options(osd_client_t *cl)
{
int one = 1;
setsockopt(cl->peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
#ifdef SO_ZEROCOPY
if (!use_zerocopy_send)
cl->zerocopy_send = false;
else if (setsockopt(cl->peer_fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) != 0)
{
if (log_level > 0)
fprintf(stderr, "[OSD %lu] Failed to enable zero-copy send for client %d: %s\n", this->osd_num, cl->peer_fd, strerror(errno));
}
else
cl->zerocopy_send = true;
#endif
}
void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
{
// Mark client as ready (i.e. some data is available)
@@ -515,14 +539,13 @@ void osd_messenger_t::accept_connections(int listen_fd)
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,
addr_to_string(addr).c_str());
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t();
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
auto cl = clients[peer_fd] = new osd_client_t();
cl->peer_addr = addr;
cl->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
cl->peer_fd = peer_fd;
cl->peer_state = PEER_CONNECTED;
cl->in_buf = malloc_or_die(receive_buffer_size);
set_socket_options(cl);
// Add FD to epoll
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{

View File

@@ -45,6 +45,12 @@ struct msgr_rdma_connection_t;
struct msgr_rdma_context_t;
#endif
struct msgr_zc_not_t
{
osd_op_t *op;
uint32_t nsend;
};
struct osd_client_t
{
int refs = 0;
@@ -57,6 +63,7 @@ struct osd_client_t
int ping_time_remaining = 0;
int idle_time_remaining = 0;
osd_num_t osd_num = 0;
bool zerocopy_send = false;
void *in_buf = NULL;
@@ -87,6 +94,12 @@ struct osd_client_t
int write_state = 0;
std::vector<iovec> send_list, next_send_list;
std::vector<msgr_sendp_t> outbox, next_outbox;
std::vector<msgr_zc_not_t> zerocopy_sent;
uint64_t outbox_size = 0, next_outbox_size = 0;
uint32_t zerocopy_notification_idx = 0;
uint32_t zerocopy_notification_prev = 0;
uint8_t zerocopy_notification_buf[256];
struct msghdr zerocopy_notification_msg;
~osd_client_t();
};
@@ -123,6 +136,7 @@ protected:
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
bool use_zerocopy_send = false;
#ifdef WITH_RDMA
bool use_rdma = true;
@@ -185,9 +199,11 @@ protected:
void check_peer_config(osd_client_t *cl);
void cancel_osd_ops(osd_client_t *cl);
void cancel_op(osd_op_t *op);
void set_socket_options(osd_client_t *cl);
bool try_send(osd_client_t *cl);
void handle_send(int result, osd_client_t *cl);
void handle_zerocopy_notification(osd_client_t *cl, int res);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);

View File

@@ -6,6 +6,12 @@
#include "messenger.h"
#include <linux/errqueue.h>
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0
#endif
void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{
assert(cur_op->peer_fd);
@@ -36,6 +42,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
}
auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
auto & to_size = cl->write_msg.msg_iovlen ? cl->next_outbox_size : cl->outbox_size;
if (cur_op->op_type == OSD_OP_IN)
{
measure_exec(cur_op);
@@ -46,6 +53,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
}
to_size += OSD_PACKET_SIZE;
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR });
// Bitmap
if (cur_op->op_type == OSD_OP_IN &&
@@ -57,6 +65,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_len = cur_op->reply.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->reply.sec_rw.attr_len;
}
else if (cur_op->op_type == OSD_OP_OUT &&
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
@@ -67,6 +76,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_len = cur_op->req.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->req.sec_rw.attr_len;
}
// Operation data
if ((cur_op->op_type == OSD_OP_IN
@@ -89,15 +99,22 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
assert(cur_op->iov.buf[i].iov_base);
to_send_list.push_back(cur_op->iov.buf[i]);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->iov.buf[i].iov_len;
}
}
}
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
if (cur_op->op_type == OSD_OP_IN && cur_op->reply.hdr.retval > 0)
{
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
to_size += cur_op->reply.hdr.retval;
}
else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
{
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
to_size += cur_op->req.sec_read_bmp.len;
}
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
if (cur_op->op_type == OSD_OP_IN)
@@ -183,17 +200,19 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
}
cl->write_msg.msg_iov = cl->send_list.data();
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->write_msg.msg_flags = (cl->zerocopy_send && (cl->outbox_size/cl->send_list.size()) >= 4096 ? MSG_ZEROCOPY : 0);
cl->refs++;
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, cl->write_msg.msg_flags);
}
else
{
cl->write_msg.msg_iov = cl->send_list.data();
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->write_msg.msg_flags = (cl->zerocopy_send && (cl->outbox_size/cl->send_list.size()) >= 4096 ? MSG_ZEROCOPY : 0);
cl->refs++;
int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL);
int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL | cl->write_msg.msg_flags);
if (result < 0)
{
result = -errno;
@@ -203,6 +222,62 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
return true;
}
void osd_messenger_t::handle_zerocopy_notification(osd_client_t *cl, int res)
{
cl->refs--;
if (cl->peer_state == PEER_STOPPED)
{
if (cl->refs <= 0)
{
delete cl;
}
return;
}
if (res != 0)
{
return;
}
if (cl->zerocopy_notification_msg.msg_flags & MSG_CTRUNC)
{
fprintf(stderr, "zero-copy send notification truncated on client socket %d\n", cl->peer_fd);
return;
}
for (struct cmsghdr *cm = CMSG_FIRSTHDR(&cl->zerocopy_notification_msg); cm; cm = CMSG_NXTHDR(&cl->zerocopy_notification_msg, cm))
{
if (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR)
{
struct sock_extended_err *serr = (struct sock_extended_err*)CMSG_DATA(cm);
if (serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY)
{
// completed sends numbered serr->ee_info .. serr->ee_data
int start = 0;
while (start < cl->zerocopy_sent.size() && cl->zerocopy_sent[start].nsend < serr->ee_info)
start++;
int end = start;
if (serr->ee_data < serr->ee_info)
{
// counter has wrapped around
while (end < cl->zerocopy_sent.size() && cl->zerocopy_sent[end].nsend >= cl->zerocopy_sent[start].nsend)
end++;
}
while (end < cl->zerocopy_sent.size() && cl->zerocopy_sent[end].nsend <= serr->ee_data)
end++;
if (end > start)
{
for (int i = start; i < end; i++)
{
delete cl->zerocopy_sent[i].op;
}
cl->zerocopy_sent.erase(
cl->zerocopy_sent.begin() + start,
cl->zerocopy_sent.begin() + end
);
}
}
}
}
}
void osd_messenger_t::send_replies()
{
for (int i = 0; i < write_ready_clients.size(); i++)
@@ -230,16 +305,19 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
}
return;
}
if (result < 0 && result != -EAGAIN && result != -EINTR)
if (result < 0 && result != -EAGAIN && result != -EINTR && result != -ENOBUFS)
{
// this is a client socket, so don't panic. just disconnect it
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
stop_client(cl->peer_fd);
return;
}
bool used_zerocopy = false;
if (result >= 0)
{
used_zerocopy = (cl->write_msg.msg_flags & MSG_ZEROCOPY) ? true : false;
int done = 0;
int bytes_written = result;
while (result > 0 && done < cl->send_list.size())
{
iovec & iov = cl->send_list[done];
@@ -248,7 +326,19 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[done].op;
if (!used_zerocopy)
{
delete cl->outbox[done].op;
}
else
{
// With zero-copy send the difference is that we must keep the buffer (i.e. the operation)
// allocated until we get send notification from MSG_ERRQUEUE
cl->zerocopy_sent.push_back((msgr_zc_not_t){
.op = cl->outbox[done].op,
.nsend = cl->zerocopy_notification_idx,
});
}
}
result -= iov.iov_len;
done++;
@@ -260,6 +350,11 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
break;
}
}
if (used_zerocopy)
{
cl->zerocopy_notification_idx++;
}
cl->outbox_size -= bytes_written;
if (done > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
@@ -269,8 +364,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end());
cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end());
cl->outbox_size += cl->next_outbox_size;
cl->next_send_list.clear();
cl->next_outbox.clear();
cl->next_outbox_size = 0;
}
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
#ifdef WITH_RDMA
@@ -293,4 +390,34 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
write_ready_clients.push_back(cl->peer_fd);
}
if (used_zerocopy && (cl->zerocopy_notification_idx-cl->zerocopy_notification_prev) >= 16 &&
cl->zerocopy_sent.size() > 0)
{
cl->zerocopy_notification_prev = cl->zerocopy_notification_idx;
cl->zerocopy_notification_msg = {
.msg_control = cl->zerocopy_notification_buf,
.msg_controllen = sizeof(cl->zerocopy_notification_buf),
};
cl->refs++;
io_uring_sqe* sqe = NULL;
if (ringloop && !use_sync_send_recv)
{
sqe = ringloop->get_sqe();
}
if (!sqe)
{
int res = recvmsg(cl->peer_fd, &cl->zerocopy_notification_msg, MSG_ERRQUEUE|MSG_DONTWAIT);
if (res < 0)
{
res = -errno;
}
handle_zerocopy_notification(cl, res);
}
else
{
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_zerocopy_notification(cl, data->res); };
my_uring_prep_recvmsg(sqe, cl->peer_fd, &cl->zerocopy_notification_msg, MSG_ERRQUEUE);
}
}
}
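
The notification handling above follows how the kernel reports MSG_ZEROCOPY completions: every zero-copy send on a socket gets an increasing 32-bit sequence number, and completed sends are announced on the socket error queue as an inclusive range ee_info..ee_data, which may wrap around. That is why handle_send() parks each fully sent operation in zerocopy_sent together with its send number instead of deleting it, and why handle_zerocopy_notification() only frees operations once a reported range covers them. MSG_ZEROCOPY itself is only requested when the average entry in the current batch is at least 4 KiB (outbox_size/send_list.size() >= 4096), presumably because page pinning and the extra notification round trip do not pay off for small writes. Below is a reduced sketch of reading one such notification, not taken from the patch.

// Reduced sketch: read one MSG_ZEROCOPY completion notification and return
// the inclusive range of completed send numbers (may wrap around 2^32).
#include <stdint.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/errqueue.h>

static bool reap_zerocopy_completion(int fd, uint32_t *first, uint32_t *last)
{
    char control[256];
    struct msghdr msg = {};
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);
    // Completions are delivered as error queue messages, not as normal data
    if (recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT) < 0)
        return false;
    for (struct cmsghdr *cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm))
    {
        if (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR)
        {
            struct sock_extended_err *serr = (struct sock_extended_err*)CMSG_DATA(cm);
            if (serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY)
            {
                *first = serr->ee_info; // lowest completed send number
                *last = serr->ee_data;  // highest completed send number
                // If serr->ee_code has SO_EE_CODE_ZEROCOPY_COPIED set, the
                // kernel actually fell back to copying for this range
                return true;
            }
        }
    }
    return false;
}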

View File

@@ -239,8 +239,9 @@ static void* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size
{
int edd = 0;
int erased[pg_size];
// we should distinguish between stripes that are not read at all and missing stripes
for (int i = 0; i < pg_size; i++)
erased[i] = (stripes[i].read_end == 0 || stripes[i].missing ? 1 : 0);
erased[i] = (stripes[i].read_end == 0 ? 2 : (stripes[i].missing ? 1 : 0));
for (int i = 0; i < pg_minsize; i++)
if (stripes[i].read_end != 0 && stripes[i].missing)
edd++;
@@ -253,7 +254,7 @@ static void* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size
#ifdef WITH_ISAL
int smrow = 0;
uint8_t *submatrix = (uint8_t*)malloc_or_die(pg_minsize*pg_minsize*2);
for (int i = 0; i < pg_size; i++)
for (int i = 0; i < pg_size && smrow < pg_minsize; i++)
{
if (!erased[i])
{
@@ -279,7 +280,7 @@ static void* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size
smrow = 0;
for (int i = 0; i < pg_minsize; i++)
{
if (erased[i])
if (erased[i] == 1)
{
memcpy(submatrix + pg_minsize*smrow, submatrix + (pg_minsize+i)*pg_minsize, pg_minsize);
smrow++;
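
The essential change above is that erased[] now distinguishes chunks that are simply not read in this request (value 2) from chunks that are read but missing and therefore have to be decoded (value 1): only the latter get a row of the inverted matrix, whereas the old code emitted a row for every non-zero entry. A standalone illustration with assumed inputs that mirror the new test_recover_53_d5 below (EC 5+3, data chunks 3 and 4 on down OSDs, only chunk 4 requested):

// Standalone illustration of the new erased[] encoding; the inputs are
// assumed values mirroring test_recover_53_d5, not real rmw structures.
#include <cstdio>

int main()
{
    const int pg_size = 8, pg_minsize = 5;
    // Chunk 3 is never read (its range isn't requested), chunk 4 is requested
    // but its OSD is down, parity chunks 5 and 6 are read for recovery
    bool is_read[pg_size] = { true, true, true, false, true, true, true, false };
    bool missing[pg_size] = { false, false, false, true, true, false, false, false };
    int erased[pg_size];
    for (int i = 0; i < pg_size; i++)
    {
        // old code: erased[i] = (!is_read[i] || missing[i]) ? 1 : 0;
        erased[i] = !is_read[i] ? 2 : (missing[i] ? 1 : 0);
    }
    for (int i = 0; i < pg_minsize; i++)
    {
        if (erased[i] == 1)
            printf("data chunk %d gets a decode matrix row\n", i); // only chunk 4
        else if (erased[i] == 2)
            printf("data chunk %d is skipped entirely\n", i); // chunk 3: the old code also gave it a row
    }
    return 0;
}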

View File

@@ -29,6 +29,7 @@ void test15(bool second);
void test16();
void test_recover_22_d2();
void test_ec43_error_bruteforce();
void test_recover_53_d5();
int main(int narg, char *args[])
{
@@ -67,6 +68,8 @@ int main(int narg, char *args[])
test_recover_22_d2();
// Error bruteforce
test_ec43_error_bruteforce();
// Test 19
test_recover_53_d5();
// End
printf("all ok\n");
return 0;
@@ -1112,7 +1115,7 @@ void test_recover_22_d2()
/***
EC 4+2 error location bruteforce
18. EC 4+2 error location bruteforce
***/
@@ -1178,3 +1181,66 @@ void test_ec43_error_bruteforce()
free(write_buf);
use_ec(7, 4, false);
}
/***
19. EC 5+3 recover 5th data block but not 4th
***/
void test_recover_53_d5()
{
const int bmp = 128*1024 / 4096 / 8;
use_ec(8, 5, true);
osd_num_t osd_set[8] = { 1, 2, 3, 0, 0, 6, 7, 8 };
osd_rmw_stripe_t stripes[8] = {};
unsigned bitmaps[8] = { 0 };
// Read 512+128K
split_stripes(5, 128*1024, 512*1024, 128*1024, stripes);
assert(stripes[0].req_start == 0 && stripes[0].req_end == 0);
assert(stripes[1].req_start == 0 && stripes[1].req_end == 0);
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
assert(stripes[4].req_start == 0 && stripes[4].req_end == 128*1024);
uint8_t *data_buf = (uint8_t*)malloc_or_die(128*1024*8);
for (int i = 0; i < 8; i++)
{
stripes[i].read_start = stripes[i].req_start;
stripes[i].read_end = stripes[i].req_end;
stripes[i].read_buf = data_buf + i*128*1024;
stripes[i].bmp_buf = bitmaps + i;
}
// Read using parity
assert(extend_missing_stripes(stripes, osd_set, 5, 8) == 0);
assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024);
assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024);
assert(stripes[2].read_start == 0 && stripes[2].read_end == 128*1024);
assert(stripes[3].read_start == 0 && stripes[3].read_end == 0);
assert(stripes[4].read_start == 0 && stripes[4].read_end == 128*1024);
assert(stripes[5].read_start == 0 && stripes[5].read_end == 128*1024);
assert(stripes[6].read_start == 0 && stripes[6].read_end == 128*1024);
assert(stripes[7].read_start == 0 && stripes[7].read_end == 0);
bitmaps[0] = 0xffffffff;
bitmaps[1] = 0xffffffff;
bitmaps[2] = 0xffffffff;
bitmaps[3] = 0;
bitmaps[4] = 0;
bitmaps[5] = 0xffffffff;
bitmaps[6] = 0x64646464;
bitmaps[7] = 0;
set_pattern(stripes[0].read_buf, 128*1024, 0x70a549add9a2280a);
set_pattern(stripes[1].read_buf, 128*1024, 0xa70a549add9a2280);
set_pattern(stripes[2].read_buf, 128*1024, 0x0a70a549add9a228);
set_pattern(stripes[3].read_buf, 128*1024, 0); // 0x80a70a549add9a22
set_pattern(stripes[4].read_buf, 128*1024, 0); // 0x280a70a549add9a2
set_pattern(stripes[5].read_buf, 128*1024, 0x7572c28f7a91eb22); // xor
set_pattern(stripes[6].read_buf, 128*1024, 0xb4542b32a560fe26); // 2nd EC chunk
set_pattern(stripes[7].read_buf, 128*1024, 0);
// Reconstruct
reconstruct_stripes_ec(stripes, 8, 5, bmp);
check_pattern(stripes[4].read_buf, 128*1024, 0x280a70a549add9a2);
assert(bitmaps[4] == 0xFFFFFFFF);
free(data_buf);
// Done
use_ec(8, 5, false);
}

View File

@@ -29,7 +29,7 @@ start_osd_on()
{
local i=$1
local dev=$2
build/src/vitastor-osd --osd_num $i --bind_address 127.0.0.1 $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL \
build/src/vitastor-osd --osd_num $i --bind_address $ETCD_IP $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL \
$(build/src/vitastor-disk simple-offsets --format options $OFFSET_ARGS $dev $OFFSET_ARGS 2>/dev/null) \
>>./testdata/osd$i.log 2>&1 &
eval OSD${i}_PID=$!