Compare commits


2 Commits

Author SHA1 Message Date
Vitaliy Filippov fdfd50c981 Use TS? 2024-08-05 02:26:18 +03:00
Vitaliy Filippov 329d8ef32c Add Vitastor support 2024-08-05 02:23:54 +03:00
3 changed files with 689 additions and 1 deletions

View File

@ -25,6 +25,10 @@ function parseLC(config, vault) {
if (locationObj.type === 'file') { if (locationObj.type === 'file') {
clients[location] = new DataFileBackend(config); clients[location] = new DataFileBackend(config);
} }
if (locationObj.type === 'vitastor') {
const VitastorBackend = require('./vitastor/VitastorBackend');
clients[location] = new VitastorBackend(location, locationObj.details);
if (locationObj.type === 'scality') { if (locationObj.type === 'scality') {
if (locationObj.details.connector.sproxyd) { if (locationObj.details.connector.sproxyd) {
const Sproxy = require('sproxydclient'); const Sproxy = require('sproxydclient');

View File

@ -0,0 +1,684 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see for details)
const stream = require('stream');
const vitastor = require('vitastor');
const VOLUME_MAGIC = 'VstS3Vol';
const OBJECT_MAGIC = 'VstS3Obj';
const FLAG_DELETED = 2n;
type Volume = {
id: number,
partial_sectors: {
[key: string]: {
buffer: Buffer,
refs: number,
header: {
location: string,
bucket: string,
max_size: number,
create_ts: number,
used_ts: number,
size: number,
objects: number,
removed_objects: number,
object_bytes: number,
removed_bytes: number,
type ObjectHeader = {
size: number,
key: string,
part_num?: number,
class VitastorBackend
locationName: string;
config: {
pack_objects: boolean,
[key: string]: any,
next_id: number;
alloc_id: number;
opened: boolean;
on_open: ((...args: any[]) => void)[] | null;
open_error: Error | null;
cli: any;
kv: any;
volumes: {
[bucket: string]: {
[max_size: string]: Volume,
volumes_by_id: {
[id: string]: Volume,
volume_delete_stats: {
[id: string]: {
count: number,
bytes: number,
constructor(locationName, config)
this.locationName = locationName;
this.config = config;
// validate config
this.config.pool_id = Number(this.config.pool_id) || 0;
if (!this.config.pool_id)
throw new Error('pool_id is required for Vitastor');
if (!this.config.metadata_image && (!this.config.metadata_pool_id || !this.config.metadata_inode_num))
throw new Error('metadata_image or metadata_inode is required for Vitastor');
if (!this.config.size_buckets || !this.config.size_buckets.length)
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
this.config.sector_size = Number(this.config.sector_size) || 0;
if (this.config.sector_size < 4096)
this.config.sector_size = 4096;
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 0;
if (this.config.write_chunk_size < this.config.sector_size)
this.config.write_chunk_size = 4*1024*1024; // 4 MB
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 0;
if (this.config.read_chunk_size < this.config.sector_size)
this.config.read_chunk_size = 4*1024*1024; // 4 MB
this.config.pack_objects = !!this.config.pack_objects;
// state
this.next_id = 1;
this.alloc_id = 0;
this.opened = false;
this.on_open = null;
this.open_error = null;
this.cli = new vitastor.Client(config);
this.kv = new vitastor.KV(this.cli);
// we group objects into volumes by bucket and size
this.volumes = {};
this.volumes_by_id = {};
this.volume_delete_stats = {};
async _makeVolumeId()
if (this.next_id <= this.alloc_id)
return this.next_id++;
const id_key = 'id'+this.config.pool_id;
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
if (err && err != vitastor.ENOENT)
throw new Error(err);
const new_id = (parseInt(prev) || 0) + 1;
this.next_id = new_id;
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
return this.next_id;
async _getVolume(bucketName, size)
if (!this.opened)
if (this.on_open)
await new Promise(ok => this.on_open!.push(ok));
this.on_open = [];
if (this.config.metadata_image)
const img = new vitastor.Image(this.cli, this.config.metadata_image);
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
this.config.metadata_pool_id = info.pool_id;
this.config.metadata_inode_num = info.inode_num;
const kv_config = {};
for (const key in this.config)
if (key.substr(0, 3) === 'kv_')
kv_config[key] = this.config[key];
this.open_error = await new Promise(ok =>
this.config.metadata_pool_id, this.config.metadata_inode_num,
kv_config, err => ok(err ? new Error(err) : null)
this.opened = true; => setImmediate(cb));
this.on_open = null;
if (this.open_error)
throw this.open_error;
let i;
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
let s;
if (i < this.config.size_buckets.length)
s = this.config.size_buckets[i];
else if (this.config.size_bucket_mul > 1)
while (size >= s)
s = Math.floor(this.config.size_bucket_mul * s);
if (!this.volumes[bucketName])
this.volumes[bucketName] = {};
if (this.volumes[bucketName][s])
return this.volumes[bucketName][s];
const new_id = await this._makeVolumeId();
const new_vol = this.volumes[bucketName][s] = {
id: new_id,
// FIXME: partial_sectors should be written with CAS because otherwise we may lose quick deletes
partial_sectors: {},
header: {
location: this.locationName,
bucket: bucketName,
max_size: s,
size: this.config.sector_size, // initial position is right after header
objects: 0,
removed_objects: 0,
object_bytes: 0,
removed_bytes: 0,
this.volumes_by_id[new_id] = new_vol;
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
const buf = Buffer.alloc(this.config.sector_size);
buf.write(VOLUME_MAGIC + header_text, 0);
await new Promise((ok, no) => this.cli.write(
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
await new Promise((ok, no) => this.kv.set(
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
return new_vol;
toObjectGetInfo(objectKey, bucketName, storageLocation)
return null;
_bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs)
if ((cur_pos % this.config.sector_size) ||
Math.floor((cur_pos + cur_size) / this.config.sector_size) == Math.floor(cur_pos / this.config.sector_size))
const sect_pos = Math.floor(cur_pos / this.config.sector_size) * this.config.sector_size;
const sect = vol.partial_sectors[sect_pos]
? vol.partial_sectors[sect_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
// Save only if <pack_objects>
if (!vol.partial_sectors[sect_pos])
vol.partial_sectors[sect_pos] = { buffer: sect, refs: 0 };
let off = cur_pos % this.config.sector_size;
let i = 0;
for (; i < cur_chunks.length; i++)
let copy_len = this.config.sector_size - off;
copy_len = copy_len > cur_chunks[i].length ? cur_chunks[i].length : copy_len;
cur_chunks[i].copy(sect, off, 0, copy_len);
off += copy_len;
if (copy_len < cur_chunks[i].length)
cur_chunks[i] = cur_chunks[i].slice(copy_len);
cur_size -= copy_len;
cur_size -= cur_chunks[i].length;
cur_chunks.splice(0, i, sect);
cur_size += this.config.sector_size;
cur_pos = sect_pos;
return [ cur_pos, cur_size ];
_bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, write_all)
const write_pos = cur_pos;
const write_chunks = cur_chunks;
let write_size = cur_size;
cur_chunks = [];
cur_pos += cur_size;
cur_size = 0;
let remain = (cur_pos % this.config.sector_size);
if (remain > 0)
cur_pos -= remain;
let last_sect = null;
if (write_all)
last_sect = vol.partial_sectors[cur_pos]
? vol.partial_sectors[cur_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
// Save only if <pack_objects>
if (!vol.partial_sectors[cur_pos])
vol.partial_sectors[cur_pos] = { buffer: last_sect, refs: 0 };
write_size -= remain;
if (write_size < 0)
write_size = 0;
for (let i = write_chunks.length-1; i >= 0 && remain > 0; i--)
if (write_chunks[i].length <= remain)
remain -= write_chunks[i].length;
if (write_all)
write_chunks[i].copy(last_sect, remain);
if (write_all)
write_chunks[i].copy(last_sect, 0, write_chunks[i].length - remain);
cur_chunks.unshift(write_chunks[i].slice(write_chunks[i].length - remain));
write_chunks[i] = write_chunks[i].slice(0, write_chunks[i].length - remain);
remain = 0;
if (write_all)
write_size += this.config.sector_size;
for (const chunk of cur_chunks)
cur_size += chunk.length;
return [ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ];
* reqUids: string, // request-ids for log, usually joined by ':'
* keyContext: {
* // a lot of shit, basically all metadata
* bucketName,
* objectKey,
* owner?,
* namespace?,
* partNumber?,
* uploadId?,
* metaHeaders?,
* isDeleteMarker?,
* tagging?,
* contentType?,
* cacheControl?,
* contentDisposition?,
* contentEncoding?,
* },
* callback: (error, objectGetInfo: any) => void,
put(stream, size, keyContext, reqUids, callback)
callback = once(callback);
this._getVolume(keyContext.bucketName, size)
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
_put(vol, stream, size, keyContext, reqUids, callback)
const object_header: ObjectHeader = {
key: keyContext.objectKey,
if (keyContext.partNumber)
object_header.part_num = keyContext.partNumber;
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
const hdr_begin_buf = Buffer.alloc(24);
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
const object_pos = vol.header.size;
const object_get_info = { volume:, offset: object_pos, hdrlen: object_header_buf.length, size };
let cur_pos = object_pos;
let cur_chunks = [ object_header_buf ];
let cur_size = object_header_buf.length;
let err: Error|null = null;
let waiting = 1; // 1 for end or error, 1 for each write request
vol.header.size += object_header_buf.length + size;
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
vol.header.size += this.config.sector_size - (vol.header.size % this.config.sector_size);
const writeChunk = (last) =>
const sector_refs = [];
// Handle partial beginning
[ cur_pos, cur_size ] = this._bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs);
// Handle partial end
let write_pos, write_chunks, write_size;
[ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ] = this._bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, last);
// FIXME: pool_id: maybe it should be stored in volume metadata to allow to migrate volumes?
this.cli.write(this.config.pool_id,, write_pos, write_chunks, (res) =>
for (const sect of sector_refs)
if (!vol.partial_sectors[sect].refs &&
vol.header.size >= sect+this.config.sector_size)
// Forget partial data when it's not needed anymore
if (res)
err = new Error(res);
if (!waiting)
callback(err, err ? null : object_get_info);
// Stream data
stream.on('error', (e) =>
err = e;
if (!waiting)
callback(err, null);
stream.on('end', () =>
if (err)
if (cur_size)
// write last chunk
if (!waiting)
callback(null, object_get_info);
stream.on('data', (chunk) =>
if (err)
cur_size += chunk.length;
if (cur_size >= this.config.write_chunk_size)
// got a complete chunk, write it out
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
* callback: (error, readStream) => void,
get(objectGetInfo, range, reqUids, callback)
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
const [ start, end ] = range || [];
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
throw new Error('Invalid range: '+start+'-'+end);
let offset = objectGetInfo.key.offset + objectGetInfo.key.hdrlen + (start || 0);
let len = objectGetInfo.key.size - (start || 0);
if (end)
const len2 = end - (start || 0) + 1;
if (len2 < len)
len = len2;
callback(null, new VitastorReadStream(this.cli, objectGetInfo.key.volume, offset, len, this.config));
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* callback: (error) => void,
delete(objectGetInfo, reqUids, callback)
callback = once(callback);
this._delete(objectGetInfo, reqUids)
async _delete(objectGetInfo, reqUids)
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
const sect_pos = objectGetInfo.key.offset - in_sect_pos;
const vol = this.volumes_by_id[objectGetInfo.key.volume];
if (vol && vol.partial_sectors[sect_pos])
// The sector may still be written to in corner cases
const sect = vol.partial_sectors[sect_pos];
const flags = sect.buffer.readBigInt64LE(in_sect_pos + 8);
if (!(flags & FLAG_DELETED))
const del_stat = this.volume_delete_stats[] = (this.volume_delete_stats[] || { count: 0, bytes: 0 });
del_stat.bytes += objectGetInfo.key.size;
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
if (err)
sect.buffer.writeBigInt64LE(0n, in_sect_pos + 8);
throw new Error(err);
// RMW with CAS
const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok =>
this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
(err, buf, version) => ok([ err, buf, version ])
if (err)
throw new Error(err);
// FIXME What if JSON crosses sector boundary? Prevent it if we want to pack objects
const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
const flags = buf.readBigInt64LE(in_sect_pos+8);
const json_len = Number(buf.readBigInt64LE(in_sect_pos+16));
let json_hdr;
if (in_sect_pos+24+json_len <= buf.length)
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
catch (e)
if (magic !== OBJECT_MAGIC || !json_hdr || json_hdr.size !== objectGetInfo.key.size)
throw new Error(
'header of object with size '+objectGetInfo.key.size+
' bytes not found in volume '+objectGetInfo.key.volume+' at '+objectGetInfo.key.offset
else if (!(flags & FLAG_DELETED))
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
if (err == vitastor.EINTR)
// Retry
await this._delete(objectGetInfo, reqUids);
else if (err)
throw new Error(err);
const del_stat = this.volume_delete_stats[objectGetInfo.key.volume] = (this.volume_delete_stats[objectGetInfo.key.volume] || { count: 0, bytes: 0 });
del_stat.bytes += objectGetInfo.key.size;
* config: full zenko server config,
* callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
getDiskUsage(config, reqUids, callback)
// FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
callback(null, {});
class VitastorReadStream extends stream.Readable
constructor(cli, volume_id, offset, len, config, options = undefined)
this.cli = cli;
this.volume_id = volume_id;
this.offset = offset;
this.end = offset + len;
this.pos = offset;
this.config = config;
this._reading = false;
if (this._reading)
// FIXME: Validate object header
const chunk_size = n && this.config.read_chunk_size < n ? n : this.config.read_chunk_size;
const read_offset = this.pos;
const round_offset = read_offset - (read_offset % this.config.sector_size);
let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
const round_end = (read_end % this.config.sector_size)
? read_end + this.config.sector_size - (read_end % this.config.sector_size)
: read_end;
if (round_end <= this.end)
read_end = round_end;
this.pos = read_end;
if (read_end <= read_offset)
// EOF
this._reading = true;, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
this._reading = false;
if (err)
this.destroy(new Error(err));
if (read_offset != round_offset || round_end != read_end)
buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
if (this.push(buf))
function once(callback)
let called = false;
return function()
if (!called)
called = true;
callback.apply(null, arguments);
module.exports = VitastorBackend;

View File

@ -1,6 +1,6 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "es6", "target": "es2020",
"module": "commonjs", "module": "commonjs",
"rootDir": "./", "rootDir": "./",
"resolveJsonModule": true, "resolveJsonModule": true,