Compare commits
10 Commits
developmen
...
vitastor-p
Author | SHA1 | Date | |
---|---|---|---|
091f8238a0 | |||
ff50df6a96 | |||
52c1cfb0b4 | |||
ac3be657fc | |||
368a45c68d | |||
9b13b110ad | |||
ae81a1f5d4 | |||
0e98d781f7 | |||
922e37ba11 | |||
dba5e6397c |
12
.swcrc
Normal file
12
.swcrc
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"$schema": "https://swc.rs/schema.json",
|
||||
"jsc": {
|
||||
"parser": {
|
||||
"syntax": "typescript"
|
||||
},
|
||||
"target": "es2017"
|
||||
},
|
||||
"module": {
|
||||
"type": "commonjs"
|
||||
}
|
||||
}
|
@@ -83,7 +83,7 @@ export type ResultObject = {
|
||||
export type CommandPromise = {
|
||||
resolve: (results?: ResultObject[]) => void;
|
||||
reject: (error: Error) => void;
|
||||
timeout: NodeJS.Timer | null;
|
||||
timeout: NodeJS.Timeout | null;
|
||||
};
|
||||
export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
|
||||
export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
|
||||
@@ -254,7 +254,7 @@ export async function sendWorkerCommand(
|
||||
}
|
||||
rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
|
||||
return new Promise((resolve, reject) => {
|
||||
let timeout: NodeJS.Timer | null = null;
|
||||
let timeout: NodeJS.Timeout | null = null;
|
||||
if (timeoutMs) {
|
||||
timeout = setTimeout(() => {
|
||||
delete uidsToCommandPromise[uids];
|
||||
|
@@ -435,7 +435,6 @@ export default class Server {
|
||||
this._server.on('connection', sock => {
|
||||
// Setting no delay of the socket to the value configured
|
||||
// TODO fix this
|
||||
// @ts-expect-errors
|
||||
sock.setNoDelay(this.isNoDelay());
|
||||
sock.on('error', err => this._logger.info(
|
||||
'socket error - request rejected', { error: err }));
|
||||
|
@@ -1,10 +1,10 @@
|
||||
const { http, https } = require('httpagent');
|
||||
const url = require('url');
|
||||
const AWS = require('aws-sdk');
|
||||
const Sproxy = require('sproxydclient');
|
||||
const Hyperdrive = require('hdclient');
|
||||
const HttpsProxyAgent = require('https-proxy-agent');
|
||||
|
||||
require("aws-sdk/lib/maintenance_mode_message").suppress = true;
|
||||
|
||||
const constants = require('../../constants');
|
||||
const DataFileBackend = require('./file/DataFileInterface');
|
||||
const inMemory = require('./in_memory/datastore').backend;
|
||||
@@ -25,8 +25,13 @@ function parseLC(config, vault) {
|
||||
if (locationObj.type === 'file') {
|
||||
clients[location] = new DataFileBackend(config);
|
||||
}
|
||||
if (locationObj.type === 'vitastor') {
|
||||
const VitastorBackend = require('./vitastor/VitastorBackend');
|
||||
clients[location] = new VitastorBackend(location, locationObj.details);
|
||||
}
|
||||
if (locationObj.type === 'scality') {
|
||||
if (locationObj.details.connector.sproxyd) {
|
||||
const Sproxy = require('sproxydclient');
|
||||
clients[location] = new Sproxy({
|
||||
bootstrap: locationObj.details.connector
|
||||
.sproxyd.bootstrap,
|
||||
@@ -41,6 +46,7 @@ function parseLC(config, vault) {
|
||||
});
|
||||
clients[location].clientType = 'scality';
|
||||
} else if (locationObj.details.connector.hdclient) {
|
||||
const Hyperdrive = require('hdclient');
|
||||
clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
|
||||
locationObj.details.connector.hdclient);
|
||||
clients[location].clientType = 'scality';
|
||||
|
860
lib/storage/data/vitastor/VitastorBackend.ts
Normal file
860
lib/storage/data/vitastor/VitastorBackend.ts
Normal file
@@ -0,0 +1,860 @@
|
||||
// Zenko CloudServer Vitastor data storage backend adapter
|
||||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
import { Readable } from 'stream';
|
||||
|
||||
const vitastor = require('vitastor');
|
||||
|
||||
const VOLUME_MAGIC = 'VstS3Vol';
|
||||
const OBJECT_MAGIC = 'VstS3Obj';
|
||||
const FLAG_DELETED = 2n;
|
||||
|
||||
type Volume = {
|
||||
id: number,
|
||||
pool_id: number,
|
||||
min_io_size: number,
|
||||
max_atomic_write_size: number,
|
||||
immediate_commit: boolean,
|
||||
partial_blocks: {
|
||||
[block_num: string]: {
|
||||
sectors: { [abs_offset]: Buffer },
|
||||
version: bigint,
|
||||
queue: (() => any)[],
|
||||
},
|
||||
},
|
||||
header: {
|
||||
location: string,
|
||||
bucket: string,
|
||||
max_size: number,
|
||||
create_ts: number,
|
||||
opened_ts: number,
|
||||
size: number,
|
||||
objects: number,
|
||||
removed_objects: number,
|
||||
object_bytes: number,
|
||||
removed_bytes: number,
|
||||
},
|
||||
};
|
||||
|
||||
type ObjectHeader = {
|
||||
size: number,
|
||||
key: string,
|
||||
part_num?: number,
|
||||
};
|
||||
|
||||
// FIXME: Write some tests
|
||||
class VitastorBackend
|
||||
{
|
||||
locationName: string;
|
||||
config: {
|
||||
// FIXME: add s3_ prefix or move vitastor config into subkey?
|
||||
pool_id: number,
|
||||
metadata_image: string,
|
||||
metadata_pool_id: number,
|
||||
metadata_inode_num: number,
|
||||
size_buckets: number[],
|
||||
size_bucket_mul: number,
|
||||
id_batch_size: number,
|
||||
write_chunk_size: number,
|
||||
read_chunk_size: number,
|
||||
pack_objects: boolean,
|
||||
bump_volume_interval: number,
|
||||
// and also other parameters for vitastor itself
|
||||
};
|
||||
next_id: number;
|
||||
alloc_id: number;
|
||||
opened: boolean;
|
||||
on_open: ((...args: any[]) => void)[] | null;
|
||||
open_error: Error | null;
|
||||
cli: any;
|
||||
kv: any;
|
||||
volumes: {
|
||||
[bucket: string]: {
|
||||
[max_size: string]: Volume,
|
||||
},
|
||||
};
|
||||
volumes_by_id: {
|
||||
[id: string]: Volume,
|
||||
};
|
||||
volume_delete_stats: {
|
||||
[id: string]: {
|
||||
count: number,
|
||||
bytes: number,
|
||||
},
|
||||
};
|
||||
|
||||
constructor(locationName, config)
|
||||
{
|
||||
this.locationName = locationName;
|
||||
this.config = config;
|
||||
// validate config
|
||||
this.config.pool_id = Number(this.config.pool_id) || 0;
|
||||
if (!this.config.pool_id)
|
||||
throw new Error('default pool_id is required for Vitastor');
|
||||
if (!this.config.metadata_image && (!this.config.metadata_pool_id || !this.config.metadata_inode_num))
|
||||
throw new Error('metadata_image or metadata_inode is required for Vitastor');
|
||||
if (!this.config.size_buckets || !this.config.size_buckets.length)
|
||||
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
|
||||
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
|
||||
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
|
||||
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 4*1024*1024;
|
||||
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 4*1024*1024;
|
||||
this.config.pack_objects = !!this.config.pack_objects;
|
||||
this.config.bump_volume_interval = Number(this.config.bump_volume_interval) || 300; // 5 minutes
|
||||
// state
|
||||
this.next_id = 1;
|
||||
this.alloc_id = 0;
|
||||
this.opened = false;
|
||||
this.on_open = null;
|
||||
this.open_error = null;
|
||||
this.cli = new vitastor.Client(config);
|
||||
this.kv = new vitastor.KV(this.cli);
|
||||
// we group objects into volumes by bucket and size
|
||||
this.volumes = {};
|
||||
this.volumes_by_id = {};
|
||||
this.volume_delete_stats = {};
|
||||
}
|
||||
|
||||
async _makeVolumeId()
|
||||
{
|
||||
if (this.next_id <= this.alloc_id)
|
||||
{
|
||||
return this.next_id++;
|
||||
}
|
||||
const id_key = 'id'+this.config.pool_id;
|
||||
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
|
||||
if (err && err != vitastor.ENOENT)
|
||||
{
|
||||
throw new Error(err);
|
||||
}
|
||||
const new_id = (parseInt(prev) || 0) + 1;
|
||||
this.next_id = new_id;
|
||||
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
|
||||
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
|
||||
return this.next_id;
|
||||
}
|
||||
|
||||
async _getVolume(bucketName, size)
|
||||
{
|
||||
if (!this.opened)
|
||||
{
|
||||
if (this.on_open)
|
||||
{
|
||||
await new Promise(ok => this.on_open!.push(ok));
|
||||
}
|
||||
else
|
||||
{
|
||||
this.on_open = [];
|
||||
if (this.config.metadata_image)
|
||||
{
|
||||
const img = new vitastor.Image(this.cli, this.config.metadata_image);
|
||||
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
|
||||
this.config.metadata_pool_id = info.pool_id;
|
||||
this.config.metadata_inode_num = info.inode_num;
|
||||
}
|
||||
const kv_config = {};
|
||||
for (const key in this.config)
|
||||
{
|
||||
if (key.substr(0, 3) === 'kv_')
|
||||
kv_config[key] = this.config[key];
|
||||
}
|
||||
this.open_error = await new Promise(ok => this.kv.open(
|
||||
this.config.metadata_pool_id, this.config.metadata_inode_num,
|
||||
kv_config, err => ok(err ? new Error(err) : null)
|
||||
));
|
||||
this.opened = true;
|
||||
this.on_open.map(cb => setImmediate(cb));
|
||||
this.on_open = null;
|
||||
}
|
||||
}
|
||||
if (this.open_error)
|
||||
{
|
||||
throw this.open_error;
|
||||
}
|
||||
let i;
|
||||
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
|
||||
let s;
|
||||
if (i < this.config.size_buckets.length)
|
||||
s = this.config.size_buckets[i];
|
||||
else if (this.config.size_bucket_mul > 1)
|
||||
{
|
||||
while (size >= s)
|
||||
s = Math.floor(this.config.size_bucket_mul * s);
|
||||
}
|
||||
if (!this.volumes[bucketName])
|
||||
{
|
||||
this.volumes[bucketName] = {};
|
||||
}
|
||||
if (this.volumes[bucketName][s])
|
||||
{
|
||||
return this.volumes[bucketName][s];
|
||||
}
|
||||
const new_id = await this._makeVolumeId();
|
||||
const min_io_size = this.cli.get_min_io_size(this.config.pool_id);
|
||||
const max_atomic_write_size = this.cli.get_max_atomic_write_size(this.config.pool_id);
|
||||
const immediate_commit = this.cli.get_immediate_commit(this.config.pool_id) == vitastor.IMMEDIATE_ALL;
|
||||
if (!min_io_size)
|
||||
{
|
||||
throw new Error('Pool '+this.config.pool_id+' does not exist');
|
||||
}
|
||||
const new_vol = this.volumes[bucketName][s] = {
|
||||
id: new_id,
|
||||
pool_id: this.config.pool_id,
|
||||
min_io_size,
|
||||
max_atomic_write_size,
|
||||
immediate_commit,
|
||||
partial_blocks: {/*
|
||||
[block_num]: {
|
||||
sectors: { [absolute offset]: Buffer },
|
||||
version: BigInt,
|
||||
queue: null | function[],
|
||||
},
|
||||
*/},
|
||||
header: {
|
||||
location: this.locationName,
|
||||
bucket: bucketName,
|
||||
max_size: s,
|
||||
create_ts: Date.now(),
|
||||
// FIXME: bump opened_ts regularly while the volume is active
|
||||
opened_ts: Date.now(),
|
||||
size: min_io_size, // initial position is right after header
|
||||
objects: 0,
|
||||
removed_objects: 0,
|
||||
object_bytes: 0,
|
||||
removed_bytes: 0,
|
||||
},
|
||||
};
|
||||
this.volumes_by_id[new_id] = new_vol;
|
||||
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
|
||||
const buf = Buffer.alloc(new_vol.min_io_size);
|
||||
buf.write(VOLUME_MAGIC + header_text, 0);
|
||||
await new Promise((ok, no) => this.cli.write(
|
||||
new_vol.pool_id, new_vol.id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
|
||||
));
|
||||
await new Promise((ok, no) => this.kv.set(
|
||||
'vol_'+new_vol.pool_id+'_'+new_vol.id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
|
||||
));
|
||||
return new_vol;
|
||||
}
|
||||
|
||||
toObjectGetInfo(objectKey, bucketName, storageLocation)
|
||||
{
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
|
||||
/**
|
||||
* reqUids: string, // request-ids for log, usually joined by ':'
|
||||
* keyContext: {
|
||||
* // a lot of shit, basically all metadata
|
||||
* bucketName,
|
||||
* objectKey,
|
||||
* owner?,
|
||||
* namespace?,
|
||||
* partNumber?,
|
||||
* uploadId?,
|
||||
* metaHeaders?,
|
||||
* isDeleteMarker?,
|
||||
* tagging?,
|
||||
* contentType?,
|
||||
* cacheControl?,
|
||||
* contentDisposition?,
|
||||
* contentEncoding?,
|
||||
* },
|
||||
* callback: (error, objectGetInfo: any) => void,
|
||||
*/
|
||||
put(stream, size, keyContext, reqUids, callback)
|
||||
{
|
||||
callback = once(callback);
|
||||
this._getVolume(keyContext.bucketName, size)
|
||||
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
|
||||
.catch(callback);
|
||||
}
|
||||
|
||||
_put(vol, stream, size, keyContext, reqUids, callback)
|
||||
{
|
||||
new VitastorObjectWriter(this.cli, vol, stream, size, keyContext, this.config.write_chunk_size, this.config.pack_objects, callback);
|
||||
}
|
||||
|
||||
_checkGetInfo(objectGetInfo)
|
||||
{
|
||||
if (!(objectGetInfo instanceof Object) ||
|
||||
!objectGetInfo.key || !(objectGetInfo.key instanceof Object) ||
|
||||
!objectGetInfo.key.pool || !objectGetInfo.key.volume ||
|
||||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
|
||||
{
|
||||
throw new Error('objectGetInfo must be { key: { pool, volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* objectGetInfo: {
|
||||
* key: { volume, offset, hdrlen, size }, // from put
|
||||
* size,
|
||||
* start,
|
||||
* dataStoreName,
|
||||
* dataStoreETag,
|
||||
* range,
|
||||
* response: ServerResponse,
|
||||
* },
|
||||
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
|
||||
* callback: (error, readStream) => void,
|
||||
*/
|
||||
get(objectGetInfo, range, reqUids, callback)
|
||||
{
|
||||
this._checkGetInfo(objectGetInfo);
|
||||
const [ start, end ] = range || [];
|
||||
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
|
||||
{
|
||||
throw new Error('Invalid range: '+start+'-'+end);
|
||||
}
|
||||
let offset = objectGetInfo.key.offset + objectGetInfo.key.hdrlen + (start || 0);
|
||||
let len = objectGetInfo.key.size - (start || 0);
|
||||
if (end)
|
||||
{
|
||||
const len2 = end - (start || 0) + 1;
|
||||
if (len2 < len)
|
||||
len = len2;
|
||||
}
|
||||
callback(null, new VitastorReadStream(this.cli, objectGetInfo.key.pool,
|
||||
objectGetInfo.key.volume, offset, len, this.config.read_chunk_size));
|
||||
}
|
||||
|
||||
/**
|
||||
* objectGetInfo: {
|
||||
* key: { volume, offset, hdrlen, size }, // from put
|
||||
* size,
|
||||
* start,
|
||||
* dataStoreName,
|
||||
* dataStoreETag,
|
||||
* range,
|
||||
* response: ServerResponse,
|
||||
* },
|
||||
* callback: (error) => void,
|
||||
*/
|
||||
delete(objectGetInfo, reqUids, callback)
|
||||
{
|
||||
callback = once(callback);
|
||||
this._delete(objectGetInfo, reqUids)
|
||||
.then(callback)
|
||||
.catch(callback);
|
||||
}
|
||||
|
||||
async _delete(objectGetInfo, reqUids)
|
||||
{
|
||||
this._checkGetInfo(objectGetInfo);
|
||||
const vol = this.volumes_by_id[objectGetInfo.key.volume];
|
||||
const min_io_size = this.cli.get_min_io_size(objectGetInfo.key.pool);
|
||||
const max_atomic_write_size = this.cli.get_max_atomic_write_size(objectGetInfo.key.pool);
|
||||
const in_sect_pos = (objectGetInfo.key.offset % min_io_size);
|
||||
const sect_pos = (objectGetInfo.key.offset - in_sect_pos);
|
||||
const block_num = Math.floor(objectGetInfo.key.offset / max_atomic_write_size);
|
||||
// RMW with CAS, vol.partial_blocks is used as CAS cache
|
||||
const vol_block = vol && vol.partial_blocks[block_num];
|
||||
let version = vol_block && vol_block.version || 0;
|
||||
let buf = vol_block && vol_block.sectors[sect_pos] || null;
|
||||
if (!buf)
|
||||
{
|
||||
[ buf, version ] = await new Promise<[ Buffer, bigint ]>((ok, no) => this.cli.read(
|
||||
objectGetInfo.key.pool, objectGetInfo.key.volume, sect_pos, min_io_size,
|
||||
(err, buf, version) => (err ? no(new Error(err)) : ok([ buf, version ]))
|
||||
));
|
||||
}
|
||||
const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
|
||||
const flags = buf.readBigInt64LE(in_sect_pos+8);
|
||||
const json_len = Number(buf.readBigInt64LE(in_sect_pos+16));
|
||||
let json_hdr;
|
||||
if (in_sect_pos+24+json_len <= buf.length)
|
||||
{
|
||||
try
|
||||
{
|
||||
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
}
|
||||
}
|
||||
if (magic !== OBJECT_MAGIC || !json_hdr || json_hdr.size !== objectGetInfo.key.size)
|
||||
{
|
||||
throw new Error(
|
||||
'header of object with size '+objectGetInfo.key.size+
|
||||
' bytes not found in volume '+objectGetInfo.key.volume+' at '+objectGetInfo.key.offset
|
||||
);
|
||||
}
|
||||
else if (!(flags & FLAG_DELETED))
|
||||
{
|
||||
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
|
||||
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
|
||||
if (err == vitastor.EINTR)
|
||||
{
|
||||
// CAS failure - retry
|
||||
if (vol)
|
||||
{
|
||||
vol.clear_partial_block(sect_pos);
|
||||
}
|
||||
await this._delete(objectGetInfo, reqUids);
|
||||
}
|
||||
else if (err)
|
||||
{
|
||||
throw new Error(err);
|
||||
}
|
||||
else
|
||||
{
|
||||
// FIXME: Write deletion statistics to volumes
|
||||
// FIXME: Implement defragmentation
|
||||
const del_stat = this.volume_delete_stats[objectGetInfo.key.volume] = (this.volume_delete_stats[objectGetInfo.key.volume] || { count: 0, bytes: 0 });
|
||||
del_stat.count++;
|
||||
del_stat.bytes += objectGetInfo.key.size;
|
||||
if (vol_block)
|
||||
{
|
||||
vol_block.version = version+1n;
|
||||
vol_block.sectors[sect_pos] = buf;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* config: full zenko server config,
|
||||
* callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
|
||||
*/
|
||||
getDiskUsage(config, reqUids, callback)
|
||||
{
|
||||
// FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
|
||||
callback(null, {});
|
||||
}
|
||||
}
|
||||
|
||||
class VitastorReadStream extends Readable
|
||||
{
|
||||
cli: any;
|
||||
pool_id: number;
|
||||
volume_id: number;
|
||||
offset: number;
|
||||
end: number;
|
||||
read_chunk_size: number;
|
||||
min_io_size: number;
|
||||
pos: number;
|
||||
_reading: boolean;
|
||||
|
||||
constructor(cli, pool_id, volume_id, offset, len, read_chunk_size, stream_options = undefined)
|
||||
{
|
||||
super(stream_options);
|
||||
this.cli = cli;
|
||||
this.pool_id = pool_id;
|
||||
this.volume_id = volume_id;
|
||||
this.offset = offset;
|
||||
this.end = offset + len;
|
||||
this.pos = offset;
|
||||
this.min_io_size = this.cli.get_min_io_size(pool_id);
|
||||
this.read_chunk_size = read_chunk_size;
|
||||
this._reading = false;
|
||||
}
|
||||
|
||||
_read(n)
|
||||
{
|
||||
if (this._reading)
|
||||
{
|
||||
return;
|
||||
}
|
||||
// FIXME: Validate object header
|
||||
const chunk_size = n && this.read_chunk_size < n ? n : this.read_chunk_size;
|
||||
const read_offset = this.pos;
|
||||
const round_offset = read_offset - (read_offset % this.min_io_size);
|
||||
let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
|
||||
const round_end = (read_end % this.min_io_size)
|
||||
? read_end + this.min_io_size - (read_end % this.min_io_size)
|
||||
: read_end;
|
||||
if (round_end <= this.end)
|
||||
read_end = round_end;
|
||||
this.pos = read_end;
|
||||
if (read_end <= read_offset)
|
||||
{
|
||||
// EOF
|
||||
this.push(null);
|
||||
return;
|
||||
}
|
||||
this._reading = true;
|
||||
this.cli.read(this.pool_id, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
|
||||
{
|
||||
this._reading = false;
|
||||
if (err)
|
||||
{
|
||||
this.destroy(new Error(err));
|
||||
return;
|
||||
}
|
||||
if (read_offset != round_offset || round_end != read_end)
|
||||
{
|
||||
buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
|
||||
}
|
||||
if (this.push(buf))
|
||||
{
|
||||
this._read(n);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class VitastorObjectWriter
|
||||
{
|
||||
cli: any;
|
||||
vol: Volume;
|
||||
read_stream: Readable;
|
||||
object_size: number;
|
||||
pack_objects: boolean;
|
||||
write_chunk_size: number;
|
||||
object_header_buf: Buffer;
|
||||
callback: (Error?, any?) => any;
|
||||
|
||||
object_get_info: any;
|
||||
waiting: number = 0;
|
||||
full_size: number = 0;
|
||||
object_pos: number = 0;
|
||||
cur_pos: number = 0;
|
||||
cur_chunks: Buffer[] = [];
|
||||
cur_size: number = 0;
|
||||
err: Error|null = null;
|
||||
|
||||
constructor(cli, vol, read_stream, size, key_context, write_chunk_size, pack_objects, callback)
|
||||
{
|
||||
this.cli = cli;
|
||||
this.vol = vol;
|
||||
this.read_stream = read_stream;
|
||||
this.object_size = size;
|
||||
this.write_chunk_size = write_chunk_size;
|
||||
this.pack_objects = pack_objects;
|
||||
this.callback = callback;
|
||||
this.object_header_buf = this.make_header(size, key_context);
|
||||
this.start_write();
|
||||
}
|
||||
|
||||
make_header(size, key_context)
|
||||
{
|
||||
const vol = this.vol;
|
||||
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
|
||||
const object_header: ObjectHeader = {
|
||||
size,
|
||||
key: key_context.objectKey,
|
||||
};
|
||||
if (key_context.partNumber)
|
||||
{
|
||||
object_header.part_num = key_context.partNumber;
|
||||
}
|
||||
const hdr_begin_buf = Buffer.alloc(24);
|
||||
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
|
||||
hdr_begin_buf.write(OBJECT_MAGIC);
|
||||
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
|
||||
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
|
||||
this.full_size = object_header_buf.length + this.object_size;
|
||||
// do not allow header to cross sector boundary when packing objects
|
||||
if (this.pack_objects &&
|
||||
(vol.header.size % vol.min_io_size) != 0 &&
|
||||
((vol.header.size % vol.min_io_size) + object.header_buf.length) > vol.min_io_size)
|
||||
{
|
||||
vol.header.size += (vol.min_io_size - (vol.header.size % vol.min_io_size));
|
||||
}
|
||||
this.object_pos = vol.header.size;
|
||||
vol.header.size += this.full_size;
|
||||
if (!this.pack_objects && (vol.header.size % vol.min_io_size) != 0)
|
||||
{
|
||||
vol.header.size += vol.min_io_size - (vol.header.size % vol.min_io_size);
|
||||
}
|
||||
this.object_get_info = {
|
||||
pool: vol.pool_id,
|
||||
volume: vol.id,
|
||||
offset: this.object_pos,
|
||||
hdrlen: object_header_buf.length,
|
||||
size: this.object_size,
|
||||
};
|
||||
return object_header_buf;
|
||||
}
|
||||
|
||||
slice_list(target_size): [ Buffer[], number ]
|
||||
{
|
||||
let write_size = this.cur_size;
|
||||
let write_chunks = this.cur_chunks;
|
||||
let pos = this.cur_chunks.length;
|
||||
this.cur_size = 0;
|
||||
while (pos > 0 && write_size-write_chunks[pos-1].length >= target_size)
|
||||
{
|
||||
write_size -= write_chunks[pos-1].length;
|
||||
this.cur_size += write_chunks[pos-1].length;
|
||||
pos--;
|
||||
}
|
||||
this.cur_chunks = write_chunks.slice(pos);
|
||||
write_chunks = write_chunks.slice(0, pos);
|
||||
if (write_size > target_size)
|
||||
{
|
||||
const end_sz = (write_size - target_size);
|
||||
const begin_sz = (write_chunks[pos-1].length - end_sz);
|
||||
this.cur_chunks.unshift(write_chunks[pos-1].slice(begin_sz));
|
||||
write_chunks[pos-1] = write_chunks[pos-1].slice(0, begin_sz);
|
||||
this.cur_size += end_sz;
|
||||
write_size = target_size;
|
||||
}
|
||||
return [ write_chunks, write_size ];
|
||||
}
|
||||
|
||||
write_packed_beginning(vol, cur_pos, cur_size, cur_chunks)
|
||||
{
|
||||
// In the pack_objects mode, we always write partial beginning and end with CAS
|
||||
if ((cur_pos % vol.max_atomic_write_size) ||
|
||||
((cur_pos + cur_size) % vol.max_atomic_write_size) &&
|
||||
Math.floor((cur_pos + cur_size) / vol.max_atomic_write_size) == Math.floor(cur_pos / vol.max_atomic_write_size))
|
||||
{
|
||||
const block_num = Math.floor(cur_pos / vol.max_atomic_write_size);
|
||||
const block_pos = block_num * vol.max_atomic_write_size;
|
||||
const in_block_pos = cur_pos - block_pos;
|
||||
const [ write_chunks, write_size ] = this.slice_list(vol.max_atomic_write_size - in_block_pos);
|
||||
const write_pos = cur_pos;
|
||||
cur_pos += write_size;
|
||||
if ((write_pos+write_size) % vol.min_io_size)
|
||||
{
|
||||
// Incomplete beginning may be padded with zeros - but it should only happen with small objects
|
||||
if (this.full_size >= vol.max_atomic_write_size)
|
||||
{
|
||||
// It's a bug if it happens with a large object
|
||||
throw new Error('BUG: attempt to write incomplete beginning of a large object, probably too small write_chunk_size');
|
||||
}
|
||||
const pad = vol.min_io_size - ((write_pos+write_size) % vol.min_io_size);
|
||||
write_size += pad;
|
||||
write_chunks.push_back(Buffer.alloc(pad));
|
||||
}
|
||||
this.cas_rmw_or_wait(write_pos, write_chunks, write_size);
|
||||
}
|
||||
}
|
||||
|
||||
cas_rmw_or_wait(write_pos, write_chunks, write_size)
|
||||
{
|
||||
const block_num = Math.floor(write_pos / vol.max_atomic_write_size);
|
||||
if (!this.vol.partial_blocks[block_num])
|
||||
{
|
||||
this.vol.partial_blocks[block_num] = {
|
||||
sectors: {},
|
||||
version: 0n,
|
||||
queue: null,
|
||||
};
|
||||
}
|
||||
const vol_block = this.vol.partial_blocks[block_num];
|
||||
if (vol_block.queue)
|
||||
{
|
||||
vol_block.queue.push(() => this.cas_rmw(write_pos, write_chunks, write_size).catch(e => this.on_error(e)));
|
||||
}
|
||||
else
|
||||
{
|
||||
vol_block.queue = [];
|
||||
this.cas_rmw(write_pos, write_chunks, write_size).catch(e => this.on_error(e));
|
||||
}
|
||||
}
|
||||
|
||||
on_error(err)
|
||||
{
|
||||
if (err)
|
||||
{
|
||||
if (!this.err)
|
||||
{
|
||||
this.waiting--;
|
||||
}
|
||||
this.err = (err instanceof Error ? err : new Error(err));
|
||||
if (!this.waiting)
|
||||
{
|
||||
this.finish();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async cas_rmw(write_pos, write_chunks, write_size)
|
||||
{
|
||||
const block_num = Math.floor(write_pos / vol.max_atomic_write_size);
|
||||
if (Math.floor((write_pos+write_size-1) / vol.max_atomic_write_size) != block_num)
|
||||
{
|
||||
throw new Error('BUG: cas_rmw() is only for 1 atomic block');
|
||||
}
|
||||
const vol_block = this.vol.partial_blocks[block_num];
|
||||
let res = vitastor.EINTR;
|
||||
while (res == vitastor.EINTR)
|
||||
{
|
||||
let cas_chunks = write_chunks;
|
||||
let cas_size = write_size;
|
||||
let cas_pos = write_pos;
|
||||
// FIXME And now we want to do it in parallel
|
||||
if (cas_pos % vol.min_io_size)
|
||||
{
|
||||
// Read the beginning
|
||||
const pad = (cas_pos % vol.min_io_size);
|
||||
cas_pos -= pad;
|
||||
if (!vol_block.sectors[cas_pos])
|
||||
{
|
||||
this.waiting++;
|
||||
const [ err, buf, version ] = await new Promise(ok => this.cli.read(this.vol.pool_id, this.vol.id,
|
||||
cas_pos, vol.min_io_size, (err, buf, version) => ok([ err, buf, version ])));
|
||||
this.waiting--;
|
||||
if (err)
|
||||
{
|
||||
this.on_error(err);
|
||||
return;
|
||||
}
|
||||
vol_block.sectors[cas_pos] = buf;
|
||||
vol_block.version = version;
|
||||
}
|
||||
cas_chunks = [ vol_block.sectors[cas_pos].slice(0, pad), ...cas_chunks ];
|
||||
cas_size += pad;
|
||||
}
|
||||
if ((cas_pos+cas_size) % vol.min_io_size)
|
||||
{
|
||||
// Read the end
|
||||
const pad = vol.min_io_size - ((cas_pos+cas_size) % vol.min_io_size);
|
||||
cas_size += pad;
|
||||
const end_sect = cas_pos + cas_size - vol.min_io_size;
|
||||
if (!vol_block.sectors[end_sect])
|
||||
{
|
||||
this.waiting++;
|
||||
const [ err, buf, version ] = await new Promise(ok => this.cli.read(this.vol.pool_id, this.vol.id,
|
||||
end_sect, vol.min_io_size, (err, buf, version) => ok([ err, buf, version ])));
|
||||
this.waiting--;
|
||||
if (err)
|
||||
{
|
||||
this.on_error(err);
|
||||
return;
|
||||
}
|
||||
vol_block.sectors[end_sect] = buf;
|
||||
vol_block.version = version;
|
||||
}
|
||||
cas_chunks = [ ...cas_chunks, vol_block.sectors[end_sect].slice(vol.min_io_size-pad) ];
|
||||
}
|
||||
vol_block.version = vol_block.version+1n;
|
||||
this.waiting++;
|
||||
res = await new Promise(ok => this.cli.write(this.vol.pool_id, this.vol.id, cas_pos, cas_chunks, { version: vol_block.version }, (res) => ok(res)));
|
||||
this.waiting--;
|
||||
if (res == vitastor.EINTR)
|
||||
{
|
||||
vol.clear_partial_block(cas_pos);
|
||||
}
|
||||
}
|
||||
this.handle_write(res);
|
||||
}
|
||||
|
||||
write_unpacked_chunk(last)
|
||||
{
|
||||
if (this.err)
|
||||
{
|
||||
return;
|
||||
}
|
||||
const write_pos = this.cur_pos;
|
||||
let write_size = this.cur_size;
|
||||
let write_chunks = this.cur_chunks;
|
||||
if (!last)
|
||||
{
|
||||
[ write_chunks, write_size ] = this.slice_list(this.write_chunk_size);
|
||||
}
|
||||
else if (this.full_size % this.vol.min_io_size)
|
||||
{
|
||||
// zero pad
|
||||
const pad_sz = this.vol.min_io_size - (this.full_size % this.vol.min_io_size);
|
||||
write_chunks.push(Buffer.alloc(pad_sz));
|
||||
write_size += pad_sz;
|
||||
}
|
||||
this.cur_pos += write_size;
|
||||
this.waiting++;
|
||||
this.cli.write(this.vol.pool_id, this.vol.id, write_pos, write_chunks, (res) =>
|
||||
{
|
||||
this.waiting--;
|
||||
this.handle_write(res);
|
||||
});
|
||||
}
|
||||
|
||||
handle_write(res)
|
||||
{
|
||||
if (res)
|
||||
{
|
||||
this.on_error(res);
|
||||
return;
|
||||
}
|
||||
if (!this.waiting)
|
||||
{
|
||||
if (!this.err && !this.vol.immediate_commit)
|
||||
{
|
||||
this.cli.sync((res) =>
|
||||
{
|
||||
if (res)
|
||||
this.err = new Error(res);
|
||||
this.finish();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
this.finish();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finish()
|
||||
{
|
||||
this.callback(this.err, this.err ? null : this.object_get_info);
|
||||
}
|
||||
|
||||
start_write()
|
||||
{
|
||||
this.cur_pos = this.object_pos;
|
||||
this.cur_chunks = [ this.object_header_buf ];
|
||||
this.cur_size = this.object_header_buf.length;
|
||||
// increment this.waiting at the beginning and on each storage write
|
||||
this.waiting++;
|
||||
// Stream data
|
||||
this.read_stream.on('error', (e) =>
|
||||
{
|
||||
this.on_error(e);
|
||||
});
|
||||
this.read_stream.on('end', () =>
|
||||
{
|
||||
if (this.err)
|
||||
{
|
||||
return;
|
||||
}
|
||||
this.waiting--;
|
||||
if (this.cur_size > 0)
|
||||
{
|
||||
// write last chunk
|
||||
this.write_unpacked_chunk(true);
|
||||
}
|
||||
else if (!this.waiting)
|
||||
{
|
||||
this.finish();
|
||||
}
|
||||
});
|
||||
this.read_stream.on('data', (chunk) =>
|
||||
{
|
||||
if (this.err)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (this.cur_size + chunk.length > this.full_size)
|
||||
{
|
||||
this.on_error('data exceeds object size');
|
||||
return;
|
||||
}
|
||||
this.cur_chunks.push(chunk);
|
||||
this.cur_size += chunk.length;
|
||||
if (this.cur_size >= this.write_chunk_size)
|
||||
{
|
||||
// got a complete chunk, write it out
|
||||
this.write_unpacked_chunk(this.cur_size >= this.full_size);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function once(callback)
|
||||
{
|
||||
let called = false;
|
||||
return function()
|
||||
{
|
||||
if (!called)
|
||||
{
|
||||
called = true;
|
||||
callback.apply(null, arguments);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = VitastorBackend;
|
@@ -10,21 +10,21 @@ function trySetDirSyncFlag(path) {
|
||||
|
||||
const GETFLAGS = 2148034049;
|
||||
const SETFLAGS = 1074292226;
|
||||
const FS_DIRSYNC_FL = 65536;
|
||||
const FS_DIRSYNC_FL = 65536n;
|
||||
const buffer = Buffer.alloc(8, 0);
|
||||
const pathFD = fs.openSync(path, 'r');
|
||||
const status = ioctl(pathFD, GETFLAGS, buffer);
|
||||
assert.strictEqual(status, 0);
|
||||
const currentFlags = buffer.readUIntLE(0, 8);
|
||||
const currentFlags = buffer.readBigInt64LE(0);
|
||||
const flags = currentFlags | FS_DIRSYNC_FL;
|
||||
buffer.writeUIntLE(flags, 0, 8);
|
||||
buffer.writeBigInt64LE(flags, 0);
|
||||
const status2 = ioctl(pathFD, SETFLAGS, buffer);
|
||||
assert.strictEqual(status2, 0);
|
||||
fs.closeSync(pathFD);
|
||||
const pathFD2 = fs.openSync(path, 'r');
|
||||
const confirmBuffer = Buffer.alloc(8, 0);
|
||||
ioctl(pathFD2, GETFLAGS, confirmBuffer);
|
||||
assert.strictEqual(confirmBuffer.readUIntLE(0, 8),
|
||||
assert.strictEqual(confirmBuffer.readBigInt64LE(0),
|
||||
currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
|
||||
fs.closeSync(pathFD2);
|
||||
}
|
||||
|
57
package.json
57
package.json
@@ -20,39 +20,37 @@
|
||||
"@azure/identity": "^3.1.1",
|
||||
"@azure/storage-blob": "^12.12.0",
|
||||
"@js-sdsl/ordered-set": "^4.4.2",
|
||||
"@types/async": "^3.2.12",
|
||||
"@types/utf8": "^3.0.1",
|
||||
"JSONStream": "^1.0.0",
|
||||
"@swc/cli": "^0.4.0",
|
||||
"@swc/core": "^1.7.4",
|
||||
"agentkeepalive": "^4.1.3",
|
||||
"ajv": "6.12.3",
|
||||
"async": "~2.6.4",
|
||||
"ajv": "^6.12.3",
|
||||
"async": "^2.6.4",
|
||||
"aws-sdk": "^2.1005.0",
|
||||
"backo": "^1.1.0",
|
||||
"base-x": "3.0.8",
|
||||
"base62": "2.0.1",
|
||||
"bson": "4.0.0",
|
||||
"debug": "~4.1.0",
|
||||
"base-x": "^3.0.8",
|
||||
"base62": "^2.0.1",
|
||||
"bson": "^4.0.0",
|
||||
"debug": "^4.1.0",
|
||||
"diskusage": "^1.1.1",
|
||||
"fcntl": "github:scality/node-fcntl#0.2.2",
|
||||
"hdclient": "scality/hdclient#1.1.7",
|
||||
"httpagent": "scality/httpagent#1.0.6",
|
||||
"fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git",
|
||||
"httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0",
|
||||
"https-proxy-agent": "^2.2.0",
|
||||
"ioredis": "^4.28.5",
|
||||
"ipaddr.js": "1.9.1",
|
||||
"ipaddr.js": "^1.9.1",
|
||||
"joi": "^17.6.0",
|
||||
"level": "~5.0.1",
|
||||
"level-sublevel": "~6.6.5",
|
||||
"JSONStream": "^1.0.0",
|
||||
"level": "^5.0.1",
|
||||
"level-sublevel": "^6.6.5",
|
||||
"mongodb": "^5.2.0",
|
||||
"node-forge": "^1.3.0",
|
||||
"prom-client": "14.2.0",
|
||||
"prom-client": "^14.2.0",
|
||||
"simple-glob": "^0.2.0",
|
||||
"socket.io": "~4.6.1",
|
||||
"socket.io-client": "~4.6.1",
|
||||
"sproxydclient": "git+https://github.com/scality/sproxydclient#8.0.10",
|
||||
"utf8": "3.0.0",
|
||||
"socket.io": "^4.6.1",
|
||||
"socket.io-client": "^4.6.1",
|
||||
"utf8": "^3.0.0",
|
||||
"uuid": "^3.0.1",
|
||||
"werelogs": "scality/werelogs#8.1.4",
|
||||
"xml2js": "~0.4.23"
|
||||
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1",
|
||||
"xml2js": "^0.4.23"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"ioctl": "^2.0.2"
|
||||
@@ -61,22 +59,24 @@
|
||||
"@babel/preset-env": "^7.16.11",
|
||||
"@babel/preset-typescript": "^7.16.7",
|
||||
"@sinonjs/fake-timers": "^6.0.1",
|
||||
"@types/async": "^3.2.12",
|
||||
"@types/utf8": "^3.0.1",
|
||||
"@types/ioredis": "^4.28.10",
|
||||
"@types/jest": "^27.4.1",
|
||||
"@types/node": "^17.0.21",
|
||||
"@types/node": "^18.19.41",
|
||||
"@types/xml2js": "^0.4.11",
|
||||
"eslint": "^8.14.0",
|
||||
"eslint-config-airbnb": "6.2.0",
|
||||
"eslint-config-scality": "scality/Guidelines#ec33dfb",
|
||||
"eslint-config-airbnb-base": "^15.0.0",
|
||||
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
|
||||
"eslint-plugin-react": "^4.3.0",
|
||||
"jest": "^27.5.1",
|
||||
"mongodb-memory-server": "^8.12.2",
|
||||
"nyc": "^15.1.0",
|
||||
"sinon": "^9.0.2",
|
||||
"temp": "0.9.1",
|
||||
"temp": "^0.9.1",
|
||||
"ts-jest": "^27.1.3",
|
||||
"ts-node": "^10.6.0",
|
||||
"typescript": "^4.6.2"
|
||||
"typescript": "^4.9.5"
|
||||
},
|
||||
"scripts": {
|
||||
"lint": "eslint $(git ls-files '*.js')",
|
||||
@@ -84,7 +84,8 @@
|
||||
"lint_yml": "yamllint $(git ls-files '*.yml')",
|
||||
"test": "jest tests/unit",
|
||||
"build": "tsc",
|
||||
"prepare": "yarn build",
|
||||
"prepack": "tsc",
|
||||
"postinstall": "[ -d build ] || swc -d build --copy-files package.json index.ts lib",
|
||||
"ft_test": "jest tests/functional --testTimeout=120000 --forceExit",
|
||||
"coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit",
|
||||
"build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "es6",
|
||||
"target": "es2020",
|
||||
"module": "commonjs",
|
||||
"rootDir": "./",
|
||||
"resolveJsonModule": true,
|
||||
|
Reference in New Issue
Block a user