Compare commits
7 Commits
03079e2121
...
3ec47c6897
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 3ec47c6897 | |
Vitaliy Filippov | 71ed7ed1c0 | |
Vitaliy Filippov | 96de82af24 | |
Vitaliy Filippov | 220d464ed6 | |
Vitaliy Filippov | 0bb684d1ba | |
Vitaliy Filippov | 6114c58288 | |
Vitaliy Filippov | 85a73e3fdd |
|
@ -0,0 +1,12 @@
|
||||||
|
{
|
||||||
|
"$schema": "https://swc.rs/schema.json",
|
||||||
|
"jsc": {
|
||||||
|
"parser": {
|
||||||
|
"syntax": "typescript"
|
||||||
|
},
|
||||||
|
"target": "es5"
|
||||||
|
},
|
||||||
|
"module": {
|
||||||
|
"type": "commonjs"
|
||||||
|
}
|
||||||
|
}
|
|
@ -83,7 +83,7 @@ export type ResultObject = {
|
||||||
export type CommandPromise = {
|
export type CommandPromise = {
|
||||||
resolve: (results?: ResultObject[]) => void;
|
resolve: (results?: ResultObject[]) => void;
|
||||||
reject: (error: Error) => void;
|
reject: (error: Error) => void;
|
||||||
timeout: NodeJS.Timer | null;
|
timeout: NodeJS.Timeout | null;
|
||||||
};
|
};
|
||||||
export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
|
export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
|
||||||
export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
|
export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
|
||||||
|
@ -254,7 +254,7 @@ export async function sendWorkerCommand(
|
||||||
}
|
}
|
||||||
rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
|
rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let timeout: NodeJS.Timer | null = null;
|
let timeout: NodeJS.Timeout | null = null;
|
||||||
if (timeoutMs) {
|
if (timeoutMs) {
|
||||||
timeout = setTimeout(() => {
|
timeout = setTimeout(() => {
|
||||||
delete uidsToCommandPromise[uids];
|
delete uidsToCommandPromise[uids];
|
||||||
|
|
|
@ -435,7 +435,6 @@ export default class Server {
|
||||||
this._server.on('connection', sock => {
|
this._server.on('connection', sock => {
|
||||||
// Setting no delay of the socket to the value configured
|
// Setting no delay of the socket to the value configured
|
||||||
// TODO fix this
|
// TODO fix this
|
||||||
// @ts-expect-errors
|
|
||||||
sock.setNoDelay(this.isNoDelay());
|
sock.setNoDelay(this.isNoDelay());
|
||||||
sock.on('error', err => this._logger.info(
|
sock.on('error', err => this._logger.info(
|
||||||
'socket error - request rejected', { error: err }));
|
'socket error - request rejected', { error: err }));
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
const { http, https } = require('httpagent');
|
const { http, https } = require('httpagent');
|
||||||
const url = require('url');
|
const url = require('url');
|
||||||
const AWS = require('aws-sdk');
|
const AWS = require('aws-sdk');
|
||||||
const Sproxy = require('sproxydclient');
|
|
||||||
const Hyperdrive = require('hdclient');
|
|
||||||
const HttpsProxyAgent = require('https-proxy-agent');
|
const HttpsProxyAgent = require('https-proxy-agent');
|
||||||
|
|
||||||
|
require("aws-sdk/lib/maintenance_mode_message").suppress = true;
|
||||||
|
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
const DataFileBackend = require('./file/DataFileInterface');
|
const DataFileBackend = require('./file/DataFileInterface');
|
||||||
const inMemory = require('./in_memory/datastore').backend;
|
const inMemory = require('./in_memory/datastore').backend;
|
||||||
|
@ -25,8 +25,13 @@ function parseLC(config, vault) {
|
||||||
if (locationObj.type === 'file') {
|
if (locationObj.type === 'file') {
|
||||||
clients[location] = new DataFileBackend(config);
|
clients[location] = new DataFileBackend(config);
|
||||||
}
|
}
|
||||||
|
if (locationObj.type === 'vitastor') {
|
||||||
|
const VitastorBackend = require('./vitastor/VitastorBackend');
|
||||||
|
clients[location] = new VitastorBackend(location, locationObj.details);
|
||||||
|
}
|
||||||
if (locationObj.type === 'scality') {
|
if (locationObj.type === 'scality') {
|
||||||
if (locationObj.details.connector.sproxyd) {
|
if (locationObj.details.connector.sproxyd) {
|
||||||
|
const Sproxy = require('sproxydclient');
|
||||||
clients[location] = new Sproxy({
|
clients[location] = new Sproxy({
|
||||||
bootstrap: locationObj.details.connector
|
bootstrap: locationObj.details.connector
|
||||||
.sproxyd.bootstrap,
|
.sproxyd.bootstrap,
|
||||||
|
@ -41,6 +46,7 @@ function parseLC(config, vault) {
|
||||||
});
|
});
|
||||||
clients[location].clientType = 'scality';
|
clients[location].clientType = 'scality';
|
||||||
} else if (locationObj.details.connector.hdclient) {
|
} else if (locationObj.details.connector.hdclient) {
|
||||||
|
const Hyperdrive = require('hdclient');
|
||||||
clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
|
clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
|
||||||
locationObj.details.connector.hdclient);
|
locationObj.details.connector.hdclient);
|
||||||
clients[location].clientType = 'scality';
|
clients[location].clientType = 'scality';
|
||||||
|
|
|
@ -0,0 +1,574 @@
|
||||||
|
// Zenko CloudServer Vitastor data storage backend adapter
|
||||||
|
// Copyright (c) Vitaliy Filippov, 2019+
|
||||||
|
// License: VNPL-1.1 (see README.md for details)
|
||||||
|
|
||||||
|
const stream = require('stream');
|
||||||
|
|
||||||
|
const vitastor = require('vitastor');
|
||||||
|
|
||||||
|
const VOLUME_MAGIC = 'VstS3Vol';
|
||||||
|
const OBJECT_MAGIC = 'VstS3Obj';
|
||||||
|
const FLAG_DELETED = 2n;
|
||||||
|
|
||||||
|
class VitastorBackend
|
||||||
|
{
|
||||||
|
constructor(locationName, config)
|
||||||
|
{
|
||||||
|
this.locationName = locationName;
|
||||||
|
this.config = config;
|
||||||
|
// validate config
|
||||||
|
this.config.pool_id = Number(this.config.pool_id) || 0;
|
||||||
|
if (!this.config.pool_id)
|
||||||
|
throw new Error('pool_id is required for Vitastor');
|
||||||
|
if (!this.config.metadata_image && !this.config.metadata_inode)
|
||||||
|
throw new Error('metadata_image or metadata_inode is required for Vitastor');
|
||||||
|
if (!this.config.size_buckets || !this.config.size_buckets.length)
|
||||||
|
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
|
||||||
|
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
|
||||||
|
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
|
||||||
|
this.config.sector_size = Number(this.config.sector_size) || 0;
|
||||||
|
if (this.config.sector_size < 4096)
|
||||||
|
this.config.sector_size = 4096;
|
||||||
|
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 0;
|
||||||
|
if (this.config.write_chunk_size < this.config.sector_size)
|
||||||
|
this.config.write_chunk_size = 4*1024*1024; // 4 MB
|
||||||
|
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 0;
|
||||||
|
if (this.config.read_chunk_size < this.config.sector_size)
|
||||||
|
this.config.read_chunk_size = 4*1024*1024; // 4 MB
|
||||||
|
this.config.align_objects = !!this.config.align_objects;
|
||||||
|
// state
|
||||||
|
this.next_id = 1;
|
||||||
|
this.alloc_max_id = 0;
|
||||||
|
this.opened = false;
|
||||||
|
this.on_open = null;
|
||||||
|
this.open_error = null;
|
||||||
|
this.cli = new vitastor.Client(config);
|
||||||
|
this.kv = new vitastor.KV(this.cli);
|
||||||
|
// we group objects into volumes by bucket and size
|
||||||
|
this.volumes = {};
|
||||||
|
this.volumes_by_id = {};
|
||||||
|
this.volume_delete_stats = {};
|
||||||
|
}
|
||||||
|
|
||||||
|
async _makeVolumeId()
|
||||||
|
{
|
||||||
|
if (this.next_id <= this.alloc_id)
|
||||||
|
{
|
||||||
|
return this.next_id++;
|
||||||
|
}
|
||||||
|
const id_key = 'id'+this.config.pool_id;
|
||||||
|
const [ err, prev ] = await new Promise(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
|
||||||
|
if (err && err != vitastor.ENOENT)
|
||||||
|
{
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const new_id = (parseInt(prev) || 0) + 1;
|
||||||
|
this.next_id = new_id;
|
||||||
|
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
|
||||||
|
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(err) : ok()), cas_old => cas_old === prev));
|
||||||
|
return this.next_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
async _getVolume(bucketName, size)
|
||||||
|
{
|
||||||
|
if (!this.opened)
|
||||||
|
{
|
||||||
|
if (this.on_open)
|
||||||
|
{
|
||||||
|
await new Promise(ok => this.on_open.push(ok));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
this.on_open = [];
|
||||||
|
if (this.config.metadata_image)
|
||||||
|
{
|
||||||
|
const img = new vitastor.Image(this.cli, this.config.metadata_image);
|
||||||
|
const info = await new Promise(ok => img.get_info(ok));
|
||||||
|
this.config.metadata_inode = info.num;
|
||||||
|
}
|
||||||
|
const kv_config = {};
|
||||||
|
for (const key in this.config)
|
||||||
|
{
|
||||||
|
if (key.substr(0, 3) === 'kv_')
|
||||||
|
kv_config[key] = this.config[key];
|
||||||
|
}
|
||||||
|
this.open_error = await new Promise(ok => this.kv.open(this.config.metadata_inode, kv_config, ok));
|
||||||
|
this.opened = true;
|
||||||
|
this.on_open.map(cb => setImmediate(cb));
|
||||||
|
this.on_open = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this.open_error)
|
||||||
|
{
|
||||||
|
throw this.open_error;
|
||||||
|
}
|
||||||
|
let i;
|
||||||
|
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
|
||||||
|
let s;
|
||||||
|
if (i < this.config.size_buckets.length)
|
||||||
|
s = this.config.size_buckets[i];
|
||||||
|
else if (this.config.size_bucket_mul > 1)
|
||||||
|
{
|
||||||
|
while (size >= s)
|
||||||
|
s = Math.floor(this.config.size_bucket_mul * s);
|
||||||
|
}
|
||||||
|
if (!this.volumes[bucketName])
|
||||||
|
{
|
||||||
|
this.volumes[bucketName] = {};
|
||||||
|
}
|
||||||
|
if (this.volumes[bucketName][s])
|
||||||
|
{
|
||||||
|
return this.volumes[bucketName][s];
|
||||||
|
}
|
||||||
|
const new_id = await this._makeVolumeId();
|
||||||
|
const new_vol = this.volumes[bucketName][s] = {
|
||||||
|
id: new_id,
|
||||||
|
// FIXME: partial_sectors should be written with CAS because otherwise we may lose quick deletes
|
||||||
|
partial_sectors: {},
|
||||||
|
header: {
|
||||||
|
location: this.locationName,
|
||||||
|
bucket: bucketName,
|
||||||
|
max_size: s,
|
||||||
|
create_ts: Date.now(),
|
||||||
|
used_ts: Date.now(),
|
||||||
|
size: this.config.sector_size, // initial position is right after header
|
||||||
|
objects: 0,
|
||||||
|
removed_objects: 0,
|
||||||
|
object_bytes: 0,
|
||||||
|
removed_bytes: 0,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.volumes_by_id[new_id] = new_vol;
|
||||||
|
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
|
||||||
|
const buf = Buffer.alloc(this.config.sector_size);
|
||||||
|
buf.write(VOLUME_MAGIC + header_text, 0);
|
||||||
|
await new Promise((ok, no) => this.cli.write(
|
||||||
|
this.config.pool_id, new_id, 0, buf, (err) => err ? no(err) : ok()
|
||||||
|
));
|
||||||
|
await new Promise((ok, no) => this.kv.set(
|
||||||
|
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(err) : ok()), cas_old => !cas_old
|
||||||
|
));
|
||||||
|
return new_vol;
|
||||||
|
}
|
||||||
|
|
||||||
|
toObjectGetInfo(objectKey, bucketName, storageLocation)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
_bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs)
|
||||||
|
{
|
||||||
|
if ((cur_pos % this.config.sector_size) ||
|
||||||
|
Math.floor((cur_pos + cur_size) / this.config.sector_size) == Math.floor(cur_pos / this.config.sector_size))
|
||||||
|
{
|
||||||
|
const sect_pos = Math.floor(cur_pos / this.config.sector_size) * this.config.sector_size;
|
||||||
|
const sect = vol.partial_sectors[sect_pos]
|
||||||
|
? vol.partial_sectors[sect_pos].buffer
|
||||||
|
: Buffer.alloc(this.config.sector_size);
|
||||||
|
if (!this.config.align_objects)
|
||||||
|
{
|
||||||
|
// Save only if not <align_objects>
|
||||||
|
if (!vol.partial_sectors[sect_pos])
|
||||||
|
vol.partial_sectors[sect_pos] = { buffer: sect, refs: 0 };
|
||||||
|
vol.partial_sectors[sect_pos].refs++;
|
||||||
|
sector_refs.push(sect_pos);
|
||||||
|
}
|
||||||
|
let off = cur_pos % this.config.sector_size;
|
||||||
|
let i = 0;
|
||||||
|
for (; i < cur_chunks.length; i++)
|
||||||
|
{
|
||||||
|
let copy_len = this.config.sector_size - off;
|
||||||
|
copy_len = copy_len > cur_chunks[i].length ? cur_chunks[i].length : copy_len;
|
||||||
|
cur_chunks[i].copy(sect, off, 0, copy_len);
|
||||||
|
off += copy_len;
|
||||||
|
if (copy_len < cur_chunks[i].length)
|
||||||
|
{
|
||||||
|
cur_chunks[i] = cur_chunks[i].slice(copy_len);
|
||||||
|
cur_size -= copy_len;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
cur_size -= cur_chunks[i].length;
|
||||||
|
}
|
||||||
|
cur_chunks.splice(0, i, sect);
|
||||||
|
cur_size += this.config.sector_size;
|
||||||
|
cur_pos = sect_pos;
|
||||||
|
}
|
||||||
|
return [ cur_pos, cur_size ];
|
||||||
|
}
|
||||||
|
|
||||||
|
_bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, write_all)
|
||||||
|
{
|
||||||
|
const write_pos = cur_pos;
|
||||||
|
const write_chunks = cur_chunks;
|
||||||
|
cur_chunks = [];
|
||||||
|
cur_pos += cur_size;
|
||||||
|
cur_size = 0;
|
||||||
|
let remain = (cur_pos % this.config.sector_size);
|
||||||
|
if (remain > 0)
|
||||||
|
{
|
||||||
|
cur_pos -= remain;
|
||||||
|
let last_sect = null;
|
||||||
|
if (write_all)
|
||||||
|
{
|
||||||
|
last_sect = vol.partial_sectors[cur_pos]
|
||||||
|
? vol.partial_sectors[cur_pos].buffer
|
||||||
|
: Buffer.alloc(this.config.sector_size);
|
||||||
|
if (!this.config.align_objects)
|
||||||
|
{
|
||||||
|
// Save only if not <align_objects>
|
||||||
|
if (!vol.partial_sectors[cur_pos])
|
||||||
|
vol.partial_sectors[cur_pos] = { buffer: sect, refs: 0 };
|
||||||
|
vol.partial_sectors[cur_pos].refs++;
|
||||||
|
sector_refs.push(cur_pos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (let i = write_chunks.length-1; i >= 0 && remain > 0; i--)
|
||||||
|
{
|
||||||
|
if (write_chunks[i].length <= remain)
|
||||||
|
{
|
||||||
|
remain -= write_chunks[i].length;
|
||||||
|
if (write_all)
|
||||||
|
write_chunks[i].copy(last_sect, remain);
|
||||||
|
else
|
||||||
|
cur_chunks.unshift(write_chunks[i]);
|
||||||
|
write_chunks.pop();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (write_all)
|
||||||
|
write_chunks[i].copy(last_sect, 0, write_chunks[i].length - remain);
|
||||||
|
else
|
||||||
|
cur_chunks.unshift(write_chunks[i].slice(write_chunks[i].length - remain));
|
||||||
|
write_chunks[i] = write_chunks[i].slice(0, write_chunks[i].length - remain);
|
||||||
|
remain = 0;
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (last)
|
||||||
|
{
|
||||||
|
write_chunks.push(last_sect);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const chunk of cur_chunks.length)
|
||||||
|
{
|
||||||
|
cur_size += chunk.size;
|
||||||
|
}
|
||||||
|
return [ write_pos, write_chunks, cur_pos, cur_size, cur_chunks ];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reqUids: string, // request-ids for log, usually joined by ':'
|
||||||
|
* keyContext: {
|
||||||
|
* // a lot of shit, basically all metadata
|
||||||
|
* bucketName,
|
||||||
|
* objectKey,
|
||||||
|
* owner?,
|
||||||
|
* namespace?,
|
||||||
|
* partNumber?,
|
||||||
|
* uploadId?,
|
||||||
|
* metaHeaders?,
|
||||||
|
* isDeleteMarker?,
|
||||||
|
* tagging?,
|
||||||
|
* contentType?,
|
||||||
|
* cacheControl?,
|
||||||
|
* contentDisposition?,
|
||||||
|
* contentEncoding?,
|
||||||
|
* },
|
||||||
|
* callback: (error, objectGetInfo: any) => void,
|
||||||
|
*/
|
||||||
|
put(stream, size, keyContext, reqUids, callback)
|
||||||
|
{
|
||||||
|
this._getVolume(keyContext.bucketName, size)
|
||||||
|
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
|
||||||
|
.catch(err => callback(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
_put(vol, stream, size, keyContext, reqUids, callback)
|
||||||
|
{
|
||||||
|
const object_header = {
|
||||||
|
size,
|
||||||
|
key: keyContext.objectKey,
|
||||||
|
};
|
||||||
|
if (keyContext.partNumber)
|
||||||
|
{
|
||||||
|
object_header.part_num = keyContext.partNumber;
|
||||||
|
}
|
||||||
|
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
|
||||||
|
const hdr_begin_buf = Buffer.alloc(24);
|
||||||
|
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
|
||||||
|
hdr_begin_buf.write(OBJECT_MAGIC);
|
||||||
|
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
|
||||||
|
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
|
||||||
|
const object_pos = vol.header.size;
|
||||||
|
const object_get_info = { volume: vol.id, offset: object_pos, hdrlen: object_header_buf.length, size };
|
||||||
|
let cur_pos = object_pos;
|
||||||
|
let cur_chunks = [ object_header_buf ];
|
||||||
|
let cur_size = object_header_buf.length;
|
||||||
|
let err = null;
|
||||||
|
let waiting = 1; // 1 for end or error, 1 for each write request
|
||||||
|
vol.header.size += object_header_buf.length + size;
|
||||||
|
if (this.config.align_objects && (vol.header.size % this.config.sector_size))
|
||||||
|
{
|
||||||
|
vol.header.size += this.config.sector_size - (vol.header.size % this.config.sector_size);
|
||||||
|
}
|
||||||
|
const writeChunk = (last) =>
|
||||||
|
{
|
||||||
|
const sector_refs = [];
|
||||||
|
// Handle partial beginning
|
||||||
|
[ cur_pos, cur_size ] = this._bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs);
|
||||||
|
// Handle partial end
|
||||||
|
let write_pos, write_chunks;
|
||||||
|
[ write_pos, write_chunks, cur_pos, cur_size, cur_chunks ] = this._bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, last);
|
||||||
|
waiting++;
|
||||||
|
// FIXME: pool_id: maybe it should be stored in volume metadata to allow to migrate volumes?
|
||||||
|
this.cli.write(this.config.pool_id, vol.id, write_pos, write_chunks, (e) =>
|
||||||
|
{
|
||||||
|
for (const sect of sector_refs)
|
||||||
|
{
|
||||||
|
vol.partial_sectors[sect].refs--;
|
||||||
|
if (!vol.partial_sectors[sect].refs &&
|
||||||
|
vol.header.size >= sect+this.config.sector_size)
|
||||||
|
{
|
||||||
|
// Forget partial data when it's not needed anymore
|
||||||
|
delete(vol.partial_sectors[sect]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waiting--;
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
err = e;
|
||||||
|
waiting--;
|
||||||
|
}
|
||||||
|
if (!waiting)
|
||||||
|
{
|
||||||
|
callback(err, err ? null : object_get_info);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
// Stream data
|
||||||
|
stream.on('error', (e) =>
|
||||||
|
{
|
||||||
|
err = e;
|
||||||
|
waiting--;
|
||||||
|
if (!waiting)
|
||||||
|
{
|
||||||
|
callback(err, null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
stream.on('end', () =>
|
||||||
|
{
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
waiting--;
|
||||||
|
if (cur_size)
|
||||||
|
{
|
||||||
|
// write last chunk
|
||||||
|
writeChunk(true);
|
||||||
|
}
|
||||||
|
if (!waiting)
|
||||||
|
{
|
||||||
|
callback(null, object_get_info);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
stream.on('data', (chunk) =>
|
||||||
|
{
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cur_chunks.push(chunk);
|
||||||
|
cur_size += chunk.size;
|
||||||
|
if (cur_size >= this.config.write_chunk_size)
|
||||||
|
{
|
||||||
|
// got a complete chunk, write it out
|
||||||
|
writeChunk(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* objectGetInfo: { volume, offset, hdrlen, size }, // from put
|
||||||
|
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
|
||||||
|
* callback: (error, readStream) => void,
|
||||||
|
*/
|
||||||
|
get(objectGetInfo, range, reqUids, callback)
|
||||||
|
{
|
||||||
|
if (!(objectGetInfo instanceof Object) || !objectGetInfo.volume ||
|
||||||
|
!objectGetInfo.offset || !objectGetInfo.hdrlen || !objectGetInfo.size)
|
||||||
|
{
|
||||||
|
throw new Error('objectGetInfo must be an object, but is '+objectGetInfo);
|
||||||
|
}
|
||||||
|
const [ start, end ] = range || [];
|
||||||
|
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.size)
|
||||||
|
{
|
||||||
|
throw new Error('Invalid range: '+start+'-'+end);
|
||||||
|
}
|
||||||
|
let offset = objectGetInfo.offset + objectGetInfo.hdrlen + (start || 0);
|
||||||
|
let len = objectGetInfo.size - (start || 0);
|
||||||
|
if (end)
|
||||||
|
{
|
||||||
|
const len2 = end - (start || 0) + 1;
|
||||||
|
if (len2 < len)
|
||||||
|
len = len2;
|
||||||
|
}
|
||||||
|
callback(null, new VitastorReadStream(this.cli, objectGetInfo.volume, offset, this.config, len));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* objectGetInfo: { volume, offset, hdrlen, size }, // from put
|
||||||
|
* callback: (error) => void,
|
||||||
|
*/
|
||||||
|
delete(objectGetInfo, reqUids, callback)
|
||||||
|
{
|
||||||
|
if (!(objectGetInfo instanceof Object) || !objectGetInfo.volume ||
|
||||||
|
!objectGetInfo.offset || !objectGetInfo.hdrlen || !objectGetInfo.size)
|
||||||
|
{
|
||||||
|
throw new Error('objectGetInfo must be an object, but is '+objectGetInfo);
|
||||||
|
}
|
||||||
|
const in_sect_pos = (objectGetInfo.offset % this.config.sector_size);
|
||||||
|
const sect_pos = objectGetInfo.offset - in_sect_pos;
|
||||||
|
const vol = this.volumes_by_id[objectGetInfo.volume];
|
||||||
|
if (vol && vol.partial_sectors[sect_pos])
|
||||||
|
{
|
||||||
|
// The sector may still be written to in corner cases
|
||||||
|
const sect = vol.partial_sectors[sect_pos];
|
||||||
|
const flags = sect.buffer.readBigInt64LE(in_sect_pos + 8);
|
||||||
|
if (flags & FLAG_DELETED)
|
||||||
|
{
|
||||||
|
callback(null);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
const del_stat = this.volume_delete_stats[vol.id] = (this.volume_delete_stats[vol.id] || { count: 0, bytes: 0 });
|
||||||
|
del_stat.count++;
|
||||||
|
del_stat.bytes += objectGetInfo.size;
|
||||||
|
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
|
||||||
|
sect.refs++;
|
||||||
|
this.cli.write(this.config.pool_id, objectGetInfo.volume, sect_pos, sect.buffer, (err) =>
|
||||||
|
{
|
||||||
|
if (err)
|
||||||
|
sect.buffer.writeBigInt64LE(0n, in_sect_pos + 8);
|
||||||
|
sect.refs--;
|
||||||
|
callback(err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// RMW with CAS
|
||||||
|
this.cli.read(this.config.pool_id, objectGetInfo.volume, sect_pos, sect.buffer, (err, buf, version) =>
|
||||||
|
{
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
callback(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
|
||||||
|
const flags = buf.readBigInt64LE(in_sect_pos+8);
|
||||||
|
const json_len = buf.readBigInt64LE(in_sect_pos+16);
|
||||||
|
if (magic !== OBJECT_MAGIC || json_len !== objectGetInfo.size)
|
||||||
|
{
|
||||||
|
callback(new Error('object header not found in volume '+objectGetInfo.volume+' at '+objectGetInfo.offset));
|
||||||
|
}
|
||||||
|
if (flags & FLAG_DELETED)
|
||||||
|
{
|
||||||
|
callback(null);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
|
||||||
|
this.cli.write(this.config.pool_id, objectGetInfo.volume, sect_pos, sect.buffer, { version: version+1n }, (err) =>
|
||||||
|
{
|
||||||
|
if (err == vitastor.EINTR)
|
||||||
|
{
|
||||||
|
// Retry
|
||||||
|
this.delete(objectGetInfo, reqUids, callback);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!err)
|
||||||
|
{
|
||||||
|
const del_stat = this.volume_delete_stats[vol.id] = (this.volume_delete_stats[vol.id] || { count: 0, bytes: 0 });
|
||||||
|
del_stat.count++;
|
||||||
|
del_stat.bytes += objectGetInfo.size;
|
||||||
|
}
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* config: full zenko server config,
|
||||||
|
* callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
|
||||||
|
*/
|
||||||
|
getDiskUsage(config, reqUids, callback)
|
||||||
|
{
|
||||||
|
// FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
|
||||||
|
callback(null, {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class VitastorReadStream extends stream.Readable
|
||||||
|
{
|
||||||
|
constructor(cli, volume_id, offset, len, config, options)
|
||||||
|
{
|
||||||
|
super(options);
|
||||||
|
this.cli = cli;
|
||||||
|
this.volume_id = volume_id;
|
||||||
|
this.offset = offset;
|
||||||
|
this.end = offset + len;
|
||||||
|
this.pos = offset;
|
||||||
|
this.config = config;
|
||||||
|
this._reading = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_read(n)
|
||||||
|
{
|
||||||
|
if (this._reading)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const chunk_size = this.config.read_chunk_size < n ? n : this.config.read_chunk_size;
|
||||||
|
const read_offset = this.pos;
|
||||||
|
const round_offset = read_offset - (read_offset % this.config.sector_size);
|
||||||
|
let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
|
||||||
|
const round_end = (read_end % this.config.sector_size)
|
||||||
|
? read_end + this.config.sector_size - (read_end % this.config.sector_size)
|
||||||
|
: read_end;
|
||||||
|
if (round_end <= this.end)
|
||||||
|
read_end = round_end;
|
||||||
|
this.pos = read_end;
|
||||||
|
if (read_end <= read_offset)
|
||||||
|
{
|
||||||
|
// EOF
|
||||||
|
this.push(null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this._reading = true;
|
||||||
|
this.cli.read(this.config.pool_id, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
|
||||||
|
{
|
||||||
|
this.reading = false;
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
this.destroy(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (read_offset != round_offset || round_end != read_end)
|
||||||
|
{
|
||||||
|
buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
|
||||||
|
}
|
||||||
|
if (this.push(buf))
|
||||||
|
{
|
||||||
|
this._read(n);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = VitastorBackend;
|
|
@ -10,21 +10,21 @@ function trySetDirSyncFlag(path) {
|
||||||
|
|
||||||
const GETFLAGS = 2148034049;
|
const GETFLAGS = 2148034049;
|
||||||
const SETFLAGS = 1074292226;
|
const SETFLAGS = 1074292226;
|
||||||
const FS_DIRSYNC_FL = 65536;
|
const FS_DIRSYNC_FL = 65536n;
|
||||||
const buffer = Buffer.alloc(8, 0);
|
const buffer = Buffer.alloc(8, 0);
|
||||||
const pathFD = fs.openSync(path, 'r');
|
const pathFD = fs.openSync(path, 'r');
|
||||||
const status = ioctl(pathFD, GETFLAGS, buffer);
|
const status = ioctl(pathFD, GETFLAGS, buffer);
|
||||||
assert.strictEqual(status, 0);
|
assert.strictEqual(status, 0);
|
||||||
const currentFlags = buffer.readUIntLE(0, 8);
|
const currentFlags = buffer.readBigInt64LE(0);
|
||||||
const flags = currentFlags | FS_DIRSYNC_FL;
|
const flags = currentFlags | FS_DIRSYNC_FL;
|
||||||
buffer.writeUIntLE(flags, 0, 8);
|
buffer.writeBigInt64LE(flags, 0);
|
||||||
const status2 = ioctl(pathFD, SETFLAGS, buffer);
|
const status2 = ioctl(pathFD, SETFLAGS, buffer);
|
||||||
assert.strictEqual(status2, 0);
|
assert.strictEqual(status2, 0);
|
||||||
fs.closeSync(pathFD);
|
fs.closeSync(pathFD);
|
||||||
const pathFD2 = fs.openSync(path, 'r');
|
const pathFD2 = fs.openSync(path, 'r');
|
||||||
const confirmBuffer = Buffer.alloc(8, 0);
|
const confirmBuffer = Buffer.alloc(8, 0);
|
||||||
ioctl(pathFD2, GETFLAGS, confirmBuffer);
|
ioctl(pathFD2, GETFLAGS, confirmBuffer);
|
||||||
assert.strictEqual(confirmBuffer.readUIntLE(0, 8),
|
assert.strictEqual(confirmBuffer.readBigInt64LE(0),
|
||||||
currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
|
currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
|
||||||
fs.closeSync(pathFD2);
|
fs.closeSync(pathFD2);
|
||||||
}
|
}
|
||||||
|
|
57
package.json
57
package.json
|
@ -20,39 +20,37 @@
|
||||||
"@azure/identity": "^3.1.1",
|
"@azure/identity": "^3.1.1",
|
||||||
"@azure/storage-blob": "^12.12.0",
|
"@azure/storage-blob": "^12.12.0",
|
||||||
"@js-sdsl/ordered-set": "^4.4.2",
|
"@js-sdsl/ordered-set": "^4.4.2",
|
||||||
"@types/async": "^3.2.12",
|
"@swc/cli": "^0.4.0",
|
||||||
"@types/utf8": "^3.0.1",
|
"@swc/core": "^1.7.4",
|
||||||
"JSONStream": "^1.0.0",
|
|
||||||
"agentkeepalive": "^4.1.3",
|
"agentkeepalive": "^4.1.3",
|
||||||
"ajv": "6.12.3",
|
"ajv": "^6.12.3",
|
||||||
"async": "~2.6.4",
|
"async": "^2.6.4",
|
||||||
"aws-sdk": "^2.1005.0",
|
"aws-sdk": "^2.1005.0",
|
||||||
"backo": "^1.1.0",
|
"backo": "^1.1.0",
|
||||||
"base-x": "3.0.8",
|
"base-x": "^3.0.8",
|
||||||
"base62": "2.0.1",
|
"base62": "^2.0.1",
|
||||||
"bson": "4.0.0",
|
"bson": "^4.0.0",
|
||||||
"debug": "~4.1.0",
|
"debug": "^4.1.0",
|
||||||
"diskusage": "^1.1.1",
|
"diskusage": "^1.1.1",
|
||||||
"fcntl": "github:scality/node-fcntl#0.2.2",
|
"fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git",
|
||||||
"hdclient": "scality/hdclient#1.1.7",
|
"httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0",
|
||||||
"httpagent": "scality/httpagent#1.0.6",
|
|
||||||
"https-proxy-agent": "^2.2.0",
|
"https-proxy-agent": "^2.2.0",
|
||||||
"ioredis": "^4.28.5",
|
"ioredis": "^4.28.5",
|
||||||
"ipaddr.js": "1.9.1",
|
"ipaddr.js": "^1.9.1",
|
||||||
"joi": "^17.6.0",
|
"joi": "^17.6.0",
|
||||||
"level": "~5.0.1",
|
"JSONStream": "^1.0.0",
|
||||||
"level-sublevel": "~6.6.5",
|
"level": "^5.0.1",
|
||||||
|
"level-sublevel": "^6.6.5",
|
||||||
"mongodb": "^5.2.0",
|
"mongodb": "^5.2.0",
|
||||||
"node-forge": "^1.3.0",
|
"node-forge": "^1.3.0",
|
||||||
"prom-client": "14.2.0",
|
"prom-client": "^14.2.0",
|
||||||
"simple-glob": "^0.2.0",
|
"simple-glob": "^0.2.0",
|
||||||
"socket.io": "~4.6.1",
|
"socket.io": "^4.6.1",
|
||||||
"socket.io-client": "~4.6.1",
|
"socket.io-client": "^4.6.1",
|
||||||
"sproxydclient": "git+https://github.com/scality/sproxydclient#8.0.10",
|
"utf8": "^3.0.0",
|
||||||
"utf8": "3.0.0",
|
|
||||||
"uuid": "^3.0.1",
|
"uuid": "^3.0.1",
|
||||||
"werelogs": "scality/werelogs#8.1.4",
|
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1",
|
||||||
"xml2js": "~0.4.23"
|
"xml2js": "^0.4.23"
|
||||||
},
|
},
|
||||||
"optionalDependencies": {
|
"optionalDependencies": {
|
||||||
"ioctl": "^2.0.2"
|
"ioctl": "^2.0.2"
|
||||||
|
@ -61,22 +59,24 @@
|
||||||
"@babel/preset-env": "^7.16.11",
|
"@babel/preset-env": "^7.16.11",
|
||||||
"@babel/preset-typescript": "^7.16.7",
|
"@babel/preset-typescript": "^7.16.7",
|
||||||
"@sinonjs/fake-timers": "^6.0.1",
|
"@sinonjs/fake-timers": "^6.0.1",
|
||||||
|
"@types/async": "^3.2.12",
|
||||||
|
"@types/utf8": "^3.0.1",
|
||||||
"@types/ioredis": "^4.28.10",
|
"@types/ioredis": "^4.28.10",
|
||||||
"@types/jest": "^27.4.1",
|
"@types/jest": "^27.4.1",
|
||||||
"@types/node": "^17.0.21",
|
"@types/node": "^18.19.41",
|
||||||
"@types/xml2js": "^0.4.11",
|
"@types/xml2js": "^0.4.11",
|
||||||
"eslint": "^8.14.0",
|
"eslint": "^8.14.0",
|
||||||
"eslint-config-airbnb": "6.2.0",
|
"eslint-config-airbnb-base": "^15.0.0",
|
||||||
"eslint-config-scality": "scality/Guidelines#ec33dfb",
|
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
|
||||||
"eslint-plugin-react": "^4.3.0",
|
"eslint-plugin-react": "^4.3.0",
|
||||||
"jest": "^27.5.1",
|
"jest": "^27.5.1",
|
||||||
"mongodb-memory-server": "^8.12.2",
|
"mongodb-memory-server": "^8.12.2",
|
||||||
"nyc": "^15.1.0",
|
"nyc": "^15.1.0",
|
||||||
"sinon": "^9.0.2",
|
"sinon": "^9.0.2",
|
||||||
"temp": "0.9.1",
|
"temp": "^0.9.1",
|
||||||
"ts-jest": "^27.1.3",
|
"ts-jest": "^27.1.3",
|
||||||
"ts-node": "^10.6.0",
|
"ts-node": "^10.6.0",
|
||||||
"typescript": "^4.6.2"
|
"typescript": "^4.9.5"
|
||||||
},
|
},
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"lint": "eslint $(git ls-files '*.js')",
|
"lint": "eslint $(git ls-files '*.js')",
|
||||||
|
@ -84,7 +84,8 @@
|
||||||
"lint_yml": "yamllint $(git ls-files '*.yml')",
|
"lint_yml": "yamllint $(git ls-files '*.yml')",
|
||||||
"test": "jest tests/unit",
|
"test": "jest tests/unit",
|
||||||
"build": "tsc",
|
"build": "tsc",
|
||||||
"prepare": "yarn build",
|
"prepack": "tsc",
|
||||||
|
"postinstall": "[ -d build ] || swc -d build --copy-files package.json index.ts lib",
|
||||||
"ft_test": "jest tests/functional --testTimeout=120000 --forceExit",
|
"ft_test": "jest tests/functional --testTimeout=120000 --forceExit",
|
||||||
"coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit",
|
"coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit",
|
||||||
"build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg"
|
"build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg"
|
||||||
|
|
Loading…
Reference in New Issue