Compare commits

..

1 Commits

Author SHA1 Message Date
Maha Benzekri 0745ae2d13
logs 2024-05-24 16:24:11 +02:00
17 changed files with 7489 additions and 905 deletions

View File

@ -46,9 +46,7 @@ jobs:
run: yarn --silent coverage run: yarn --silent coverage
- name: run functional tests - name: run functional tests
run: yarn ft_test run: yarn ft_test
- uses: codecov/codecov-action@v4 - uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
- name: run executables tests - name: run executables tests
run: yarn install && yarn test run: yarn install && yarn test
working-directory: 'lib/executables/pensieveCreds/' working-directory: 'lib/executables/pensieveCreds/'

12
.swcrc
View File

@ -1,12 +0,0 @@
{
"$schema": "https://swc.rs/schema.json",
"jsc": {
"parser": {
"syntax": "typescript"
},
"target": "es2017"
},
"module": {
"type": "commonjs"
}
}

View File

@ -196,9 +196,6 @@ export class Delimiter extends Extension {
} }
getCommonPrefix(key: string): string | undefined { getCommonPrefix(key: string): string | undefined {
if (!this.delimiter) {
return undefined;
}
const baseIndex = this.prefix ? this.prefix.length : 0; const baseIndex = this.prefix ? this.prefix.length : 0;
const delimiterIndex = key.indexOf(this.delimiter, baseIndex); const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
if (delimiterIndex === -1) { if (delimiterIndex === -1) {

View File

@ -83,7 +83,7 @@ export type ResultObject = {
export type CommandPromise = { export type CommandPromise = {
resolve: (results?: ResultObject[]) => void; resolve: (results?: ResultObject[]) => void;
reject: (error: Error) => void; reject: (error: Error) => void;
timeout: NodeJS.Timeout | null; timeout: NodeJS.Timer | null;
}; };
export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void; export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void; export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
@ -254,7 +254,7 @@ export async function sendWorkerCommand(
} }
rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload }); rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeout: NodeJS.Timeout | null = null; let timeout: NodeJS.Timer | null = null;
if (timeoutMs) { if (timeoutMs) {
timeout = setTimeout(() => { timeout = setTimeout(() => {
delete uidsToCommandPromise[uids]; delete uidsToCommandPromise[uids];

View File

@ -435,6 +435,7 @@ export default class Server {
this._server.on('connection', sock => { this._server.on('connection', sock => {
// Setting no delay of the socket to the value configured // Setting no delay of the socket to the value configured
// TODO fix this // TODO fix this
// @ts-expect-errors
sock.setNoDelay(this.isNoDelay()); sock.setNoDelay(this.isNoDelay());
sock.on('error', err => this._logger.info( sock.on('error', err => this._logger.info(
'socket error - request rejected', { error: err })); 'socket error - request rejected', { error: err }));

View File

@ -168,6 +168,7 @@ export function meetConditions(
// reference_policies_multi-value-conditions.html) // reference_policies_multi-value-conditions.html)
let keyBasedOnRequestContext = let keyBasedOnRequestContext =
findConditionKey(transformedKey, requestContext); findConditionKey(transformedKey, requestContext);
log.trace(`keyBasedOnRequestContext ${keyBasedOnRequestContext}`);
// Handle IfExists and negation operators // Handle IfExists and negation operators
if ((keyBasedOnRequestContext === undefined || if ((keyBasedOnRequestContext === undefined ||
keyBasedOnRequestContext === null) && keyBasedOnRequestContext === null) &&
@ -179,6 +180,7 @@ export function meetConditions(
// If no IfExists qualifier, the key does not exist and the // If no IfExists qualifier, the key does not exist and the
// condition operator is not Null, the // condition operator is not Null, the
// condition is not met so return false. // condition is not met so return false.
log.trace(`bareOperator ${bareOperator}`);
if ((keyBasedOnRequestContext === null || if ((keyBasedOnRequestContext === null ||
keyBasedOnRequestContext === undefined) && keyBasedOnRequestContext === undefined) &&
bareOperator !== 'Null') { bareOperator !== 'Null') {

View File

@ -71,7 +71,6 @@ const actionMapRQ = {
initiateMultipartUpload: 's3:PutObject', initiateMultipartUpload: 's3:PutObject',
objectDeleteVersion: 's3:DeleteObjectVersion', objectDeleteVersion: 's3:DeleteObjectVersion',
objectDeleteTaggingVersion: 's3:DeleteObjectVersionTagging', objectDeleteTaggingVersion: 's3:DeleteObjectVersionTagging',
objectGetArchiveInfo: 'scality:GetObjectArchiveInfo',
objectGetVersion: 's3:GetObjectVersion', objectGetVersion: 's3:GetObjectVersion',
objectGetACLVersion: 's3:GetObjectVersionAcl', objectGetACLVersion: 's3:GetObjectVersionAcl',
objectGetTaggingVersion: 's3:GetObjectVersionTagging', objectGetTaggingVersion: 's3:GetObjectVersionTagging',

View File

@ -1,10 +1,10 @@
const { http, https } = require('httpagent'); const { http, https } = require('httpagent');
const url = require('url'); const url = require('url');
const AWS = require('aws-sdk'); const AWS = require('aws-sdk');
const Sproxy = require('sproxydclient');
const Hyperdrive = require('hdclient');
const HttpsProxyAgent = require('https-proxy-agent'); const HttpsProxyAgent = require('https-proxy-agent');
require("aws-sdk/lib/maintenance_mode_message").suppress = true;
const constants = require('../../constants'); const constants = require('../../constants');
const DataFileBackend = require('./file/DataFileInterface'); const DataFileBackend = require('./file/DataFileInterface');
const inMemory = require('./in_memory/datastore').backend; const inMemory = require('./in_memory/datastore').backend;
@ -25,13 +25,8 @@ function parseLC(config, vault) {
if (locationObj.type === 'file') { if (locationObj.type === 'file') {
clients[location] = new DataFileBackend(config); clients[location] = new DataFileBackend(config);
} }
if (locationObj.type === 'vitastor') {
const VitastorBackend = require('./vitastor/VitastorBackend');
clients[location] = new VitastorBackend(location, locationObj.details);
}
if (locationObj.type === 'scality') { if (locationObj.type === 'scality') {
if (locationObj.details.connector.sproxyd) { if (locationObj.details.connector.sproxyd) {
const Sproxy = require('sproxydclient');
clients[location] = new Sproxy({ clients[location] = new Sproxy({
bootstrap: locationObj.details.connector bootstrap: locationObj.details.connector
.sproxyd.bootstrap, .sproxyd.bootstrap,
@ -46,7 +41,6 @@ function parseLC(config, vault) {
}); });
clients[location].clientType = 'scality'; clients[location].clientType = 'scality';
} else if (locationObj.details.connector.hdclient) { } else if (locationObj.details.connector.hdclient) {
const Hyperdrive = require('hdclient');
clients[location] = new Hyperdrive.hdcontroller.HDProxydClient( clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
locationObj.details.connector.hdclient); locationObj.details.connector.hdclient);
clients[location].clientType = 'scality'; clients[location].clientType = 'scality';

View File

@ -5,7 +5,6 @@ const { parseTagFromQuery } = require('../../s3middleware/tagging');
const { externalBackendHealthCheckInterval } = require('../../constants'); const { externalBackendHealthCheckInterval } = require('../../constants');
const DataFileBackend = require('./file/DataFileInterface'); const DataFileBackend = require('./file/DataFileInterface');
const { createLogger, checkExternalBackend } = require('./external/utils'); const { createLogger, checkExternalBackend } = require('./external/utils');
const jsutil = require('../../jsutil');
class MultipleBackendGateway { class MultipleBackendGateway {
constructor(clients, metadata, locStorageCheckFn) { constructor(clients, metadata, locStorageCheckFn) {
@ -200,12 +199,11 @@ class MultipleBackendGateway {
uploadPart(request, streamingV4Params, stream, size, location, key, uploadPart(request, streamingV4Params, stream, size, location, key,
uploadId, partNumber, bucketName, log, cb) { uploadId, partNumber, bucketName, log, cb) {
const client = this.clients[location]; const client = this.clients[location];
const cbOnce = jsutil.once(cb);
if (client.uploadPart) { if (client.uploadPart) {
return this.locStorageCheckFn(location, size, log, err => { return this.locStorageCheckFn(location, size, log, err => {
if (err) { if (err) {
return cbOnce(err); return cb(err);
} }
return client.uploadPart(request, streamingV4Params, stream, return client.uploadPart(request, streamingV4Params, stream,
size, key, uploadId, partNumber, bucketName, log, size, key, uploadId, partNumber, bucketName, log,
@ -219,14 +217,14 @@ class MultipleBackendGateway {
'metric following object PUT failure', 'metric following object PUT failure',
{ error: error.message }); { error: error.message });
} }
return cbOnce(err); return cb(err);
}); });
} }
return cbOnce(null, partInfo); return cb(null, partInfo);
}); });
}); });
} }
return cbOnce(); return cb();
} }
listParts(key, uploadId, location, bucketName, partNumberMarker, maxParts, listParts(key, uploadId, location, bucketName, partNumberMarker, maxParts,

View File

@ -8,7 +8,6 @@ const getMetaHeaders =
const { prepareStream } = require('../../../s3middleware/prepareStream'); const { prepareStream } = require('../../../s3middleware/prepareStream');
const { createLogger, logHelper, removeQuotes, trimXMetaPrefix } = const { createLogger, logHelper, removeQuotes, trimXMetaPrefix } =
require('./utils'); require('./utils');
const jsutil = require('../../../jsutil');
const missingVerIdInternalError = errors.InternalError.customizeDescription( const missingVerIdInternalError = errors.InternalError.customizeDescription(
'Invalid state. Please ensure versioning is enabled ' + 'Invalid state. Please ensure versioning is enabled ' +
@ -318,11 +317,9 @@ class AwsClient {
uploadPart(request, streamingV4Params, stream, size, key, uploadId, uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) { partNumber, bucketName, log, callback) {
let hashedStream = stream; let hashedStream = stream;
const cbOnce = jsutil.once(callback);
if (request) { if (request) {
const partStream = prepareStream(request, streamingV4Params, const partStream = prepareStream(request, streamingV4Params,
this._vault, log, cbOnce); this._vault, log, callback);
hashedStream = new MD5Sum(); hashedStream = new MD5Sum();
partStream.pipe(hashedStream); partStream.pipe(hashedStream);
} }
@ -336,7 +333,7 @@ class AwsClient {
if (err) { if (err) {
logHelper(log, 'error', 'err from data backend ' + logHelper(log, 'error', 'err from data backend ' +
'on uploadPart', err, this._dataStoreName, this.clientType); 'on uploadPart', err, this._dataStoreName, this.clientType);
return cbOnce(errors.ServiceUnavailable return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' + .customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`), `${this.type}: ${err.message}`),
); );
@ -350,7 +347,7 @@ class AwsClient {
dataStoreName: this._dataStoreName, dataStoreName: this._dataStoreName,
dataStoreETag: noQuotesETag, dataStoreETag: noQuotesETag,
}; };
return cbOnce(null, dataRetrievalInfo); return callback(null, dataRetrievalInfo);
}); });
} }

View File

@ -1,696 +0,0 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const stream = require('stream');
const vitastor = require('vitastor');
const VOLUME_MAGIC = 'VstS3Vol';
const OBJECT_MAGIC = 'VstS3Obj';
const FLAG_DELETED = 2n;
type Volume = {
id: number,
partial_sectors: {
[key: string]: {
buffer: Buffer,
refs: number,
},
},
header: {
location: string,
bucket: string,
max_size: number,
create_ts: number,
used_ts: number,
size: number,
objects: number,
removed_objects: number,
object_bytes: number,
removed_bytes: number,
},
};
type ObjectHeader = {
size: number,
key: string,
part_num?: number,
};
class VitastorBackend
{
locationName: string;
config: {
pool_id: number,
metadata_image: string,
metadata_pool_id: number,
metadata_inode_num: number,
size_buckets: number[],
size_bucket_mul: number,
id_batch_size: number,
sector_size: number,
write_chunk_size: number,
read_chunk_size: number,
pack_objects: boolean,
// and also other parameters for vitastor itself
};
next_id: number;
alloc_id: number;
opened: boolean;
on_open: ((...args: any[]) => void)[] | null;
open_error: Error | null;
cli: any;
kv: any;
volumes: {
[bucket: string]: {
[max_size: string]: Volume,
},
};
volumes_by_id: {
[id: string]: Volume,
};
volume_delete_stats: {
[id: string]: {
count: number,
bytes: number,
},
};
constructor(locationName, config)
{
this.locationName = locationName;
this.config = config;
// validate config
this.config.pool_id = Number(this.config.pool_id) || 0;
if (!this.config.pool_id)
throw new Error('pool_id is required for Vitastor');
if (!this.config.metadata_image && (!this.config.metadata_pool_id || !this.config.metadata_inode_num))
throw new Error('metadata_image or metadata_inode is required for Vitastor');
if (!this.config.size_buckets || !this.config.size_buckets.length)
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
this.config.sector_size = Number(this.config.sector_size) || 0;
if (this.config.sector_size < 4096)
this.config.sector_size = 4096;
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 0;
if (this.config.write_chunk_size < this.config.sector_size)
this.config.write_chunk_size = 4*1024*1024; // 4 MB
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 0;
if (this.config.read_chunk_size < this.config.sector_size)
this.config.read_chunk_size = 4*1024*1024; // 4 MB
this.config.pack_objects = !!this.config.pack_objects;
// state
this.next_id = 1;
this.alloc_id = 0;
this.opened = false;
this.on_open = null;
this.open_error = null;
this.cli = new vitastor.Client(config);
this.kv = new vitastor.KV(this.cli);
// we group objects into volumes by bucket and size
this.volumes = {};
this.volumes_by_id = {};
this.volume_delete_stats = {};
}
async _makeVolumeId()
{
if (this.next_id <= this.alloc_id)
{
return this.next_id++;
}
const id_key = 'id'+this.config.pool_id;
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
if (err && err != vitastor.ENOENT)
{
throw new Error(err);
}
const new_id = (parseInt(prev) || 0) + 1;
this.next_id = new_id;
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
return this.next_id;
}
async _getVolume(bucketName, size)
{
if (!this.opened)
{
if (this.on_open)
{
await new Promise(ok => this.on_open!.push(ok));
}
else
{
this.on_open = [];
if (this.config.metadata_image)
{
const img = new vitastor.Image(this.cli, this.config.metadata_image);
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
this.config.metadata_pool_id = info.pool_id;
this.config.metadata_inode_num = info.inode_num;
}
const kv_config = {};
for (const key in this.config)
{
if (key.substr(0, 3) === 'kv_')
kv_config[key] = this.config[key];
}
this.open_error = await new Promise(ok => this.kv.open(
this.config.metadata_pool_id, this.config.metadata_inode_num,
kv_config, err => ok(err ? new Error(err) : null)
));
this.opened = true;
this.on_open.map(cb => setImmediate(cb));
this.on_open = null;
}
}
if (this.open_error)
{
throw this.open_error;
}
let i;
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
let s;
if (i < this.config.size_buckets.length)
s = this.config.size_buckets[i];
else if (this.config.size_bucket_mul > 1)
{
while (size >= s)
s = Math.floor(this.config.size_bucket_mul * s);
}
if (!this.volumes[bucketName])
{
this.volumes[bucketName] = {};
}
if (this.volumes[bucketName][s])
{
return this.volumes[bucketName][s];
}
const new_id = await this._makeVolumeId();
const new_vol = this.volumes[bucketName][s] = {
id: new_id,
// FIXME: partial_sectors should be written with CAS because otherwise we may lose quick deletes
partial_sectors: {},
header: {
location: this.locationName,
bucket: bucketName,
max_size: s,
create_ts: Date.now(),
used_ts: Date.now(),
size: this.config.sector_size, // initial position is right after header
objects: 0,
removed_objects: 0,
object_bytes: 0,
removed_bytes: 0,
},
};
this.volumes_by_id[new_id] = new_vol;
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
const buf = Buffer.alloc(this.config.sector_size);
buf.write(VOLUME_MAGIC + header_text, 0);
await new Promise((ok, no) => this.cli.write(
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
));
await new Promise((ok, no) => this.kv.set(
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
));
return new_vol;
}
toObjectGetInfo(objectKey, bucketName, storageLocation)
{
return null;
}
_bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs)
{
if ((cur_pos % this.config.sector_size) ||
Math.floor((cur_pos + cur_size) / this.config.sector_size) == Math.floor(cur_pos / this.config.sector_size))
{
const sect_pos = Math.floor(cur_pos / this.config.sector_size) * this.config.sector_size;
const sect = vol.partial_sectors[sect_pos]
? vol.partial_sectors[sect_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
{
// Save only if <pack_objects>
if (!vol.partial_sectors[sect_pos])
vol.partial_sectors[sect_pos] = { buffer: sect, refs: 0 };
vol.partial_sectors[sect_pos].refs++;
sector_refs.push(sect_pos);
}
let off = cur_pos % this.config.sector_size;
let i = 0;
for (; i < cur_chunks.length; i++)
{
let copy_len = this.config.sector_size - off;
copy_len = copy_len > cur_chunks[i].length ? cur_chunks[i].length : copy_len;
cur_chunks[i].copy(sect, off, 0, copy_len);
off += copy_len;
if (copy_len < cur_chunks[i].length)
{
cur_chunks[i] = cur_chunks[i].slice(copy_len);
cur_size -= copy_len;
break;
}
else
cur_size -= cur_chunks[i].length;
}
cur_chunks.splice(0, i, sect);
cur_size += this.config.sector_size;
cur_pos = sect_pos;
}
return [ cur_pos, cur_size ];
}
_bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, write_all)
{
const write_pos = cur_pos;
const write_chunks = cur_chunks;
let write_size = cur_size;
cur_chunks = [];
cur_pos += cur_size;
cur_size = 0;
let remain = (cur_pos % this.config.sector_size);
if (remain > 0)
{
cur_pos -= remain;
let last_sect = null;
if (write_all)
{
last_sect = vol.partial_sectors[cur_pos]
? vol.partial_sectors[cur_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
{
// Save only if <pack_objects>
if (!vol.partial_sectors[cur_pos])
vol.partial_sectors[cur_pos] = { buffer: last_sect, refs: 0 };
vol.partial_sectors[cur_pos].refs++;
sector_refs.push(cur_pos);
}
}
write_size -= remain;
if (write_size < 0)
write_size = 0;
for (let i = write_chunks.length-1; i >= 0 && remain > 0; i--)
{
if (write_chunks[i].length <= remain)
{
remain -= write_chunks[i].length;
if (write_all)
write_chunks[i].copy(last_sect, remain);
else
cur_chunks.unshift(write_chunks[i]);
write_chunks.pop();
}
else
{
if (write_all)
write_chunks[i].copy(last_sect, 0, write_chunks[i].length - remain);
else
cur_chunks.unshift(write_chunks[i].slice(write_chunks[i].length - remain));
write_chunks[i] = write_chunks[i].slice(0, write_chunks[i].length - remain);
remain = 0;
i++;
}
}
if (write_all)
{
write_chunks.push(last_sect);
write_size += this.config.sector_size;
}
}
for (const chunk of cur_chunks)
{
cur_size += chunk.length;
}
return [ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ];
}
/**
* reqUids: string, // request-ids for log, usually joined by ':'
* keyContext: {
* // a lot of shit, basically all metadata
* bucketName,
* objectKey,
* owner?,
* namespace?,
* partNumber?,
* uploadId?,
* metaHeaders?,
* isDeleteMarker?,
* tagging?,
* contentType?,
* cacheControl?,
* contentDisposition?,
* contentEncoding?,
* },
* callback: (error, objectGetInfo: any) => void,
*/
put(stream, size, keyContext, reqUids, callback)
{
callback = once(callback);
this._getVolume(keyContext.bucketName, size)
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
.catch(callback);
}
_put(vol, stream, size, keyContext, reqUids, callback)
{
const object_header: ObjectHeader = {
size,
key: keyContext.objectKey,
};
if (keyContext.partNumber)
{
object_header.part_num = keyContext.partNumber;
}
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
const hdr_begin_buf = Buffer.alloc(24);
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
hdr_begin_buf.write(OBJECT_MAGIC);
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
const object_pos = vol.header.size;
const object_get_info = { volume: vol.id, offset: object_pos, hdrlen: object_header_buf.length, size };
let cur_pos = object_pos;
let cur_chunks = [ object_header_buf ];
let cur_size = object_header_buf.length;
let err: Error|null = null;
let waiting = 1; // 1 for end or error, 1 for each write request
vol.header.size += object_header_buf.length + size;
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
{
vol.header.size += this.config.sector_size - (vol.header.size % this.config.sector_size);
}
const writeChunk = (last) =>
{
const sector_refs = [];
// Handle partial beginning
[ cur_pos, cur_size ] = this._bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs);
// Handle partial end
let write_pos, write_chunks, write_size;
[ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ] = this._bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, last);
waiting++;
// FIXME: pool_id: maybe it should be stored in volume metadata to allow to migrate volumes?
this.cli.write(this.config.pool_id, vol.id, write_pos, write_chunks, (res) =>
{
for (const sect of sector_refs)
{
vol.partial_sectors[sect].refs--;
if (!vol.partial_sectors[sect].refs &&
vol.header.size >= sect+this.config.sector_size)
{
// Forget partial data when it's not needed anymore
delete(vol.partial_sectors[sect]);
}
}
waiting--;
if (res)
{
err = new Error(res);
waiting--;
}
if (!waiting)
{
callback(err, err ? null : object_get_info);
}
});
};
// Stream data
stream.on('error', (e) =>
{
err = e;
waiting--;
if (!waiting)
{
callback(err, null);
}
});
stream.on('end', () =>
{
if (err)
{
return;
}
waiting--;
if (cur_size)
{
// write last chunk
writeChunk(true);
}
if (!waiting)
{
callback(null, object_get_info);
}
});
stream.on('data', (chunk) =>
{
if (err)
{
return;
}
cur_chunks.push(chunk);
cur_size += chunk.length;
if (cur_size >= this.config.write_chunk_size)
{
// got a complete chunk, write it out
writeChunk(false);
}
});
}
/**
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
* callback: (error, readStream) => void,
*/
get(objectGetInfo, range, reqUids, callback)
{
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
}
const [ start, end ] = range || [];
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
{
throw new Error('Invalid range: '+start+'-'+end);
}
let offset = objectGetInfo.key.offset + objectGetInfo.key.hdrlen + (start || 0);
let len = objectGetInfo.key.size - (start || 0);
if (end)
{
const len2 = end - (start || 0) + 1;
if (len2 < len)
len = len2;
}
callback(null, new VitastorReadStream(this.cli, objectGetInfo.key.volume, offset, len, this.config));
}
/**
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* callback: (error) => void,
*/
delete(objectGetInfo, reqUids, callback)
{
callback = once(callback);
this._delete(objectGetInfo, reqUids)
.then(callback)
.catch(callback);
}
async _delete(objectGetInfo, reqUids)
{
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
}
const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
const sect_pos = objectGetInfo.key.offset - in_sect_pos;
const vol = this.volumes_by_id[objectGetInfo.key.volume];
if (vol && vol.partial_sectors[sect_pos])
{
// The sector may still be written to in corner cases
const sect = vol.partial_sectors[sect_pos];
const flags = sect.buffer.readBigInt64LE(in_sect_pos + 8);
if (!(flags & FLAG_DELETED))
{
const del_stat = this.volume_delete_stats[vol.id] = (this.volume_delete_stats[vol.id] || { count: 0, bytes: 0 });
del_stat.count++;
del_stat.bytes += objectGetInfo.key.size;
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
sect.refs++;
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
sect.refs--;
if (err)
{
sect.buffer.writeBigInt64LE(0n, in_sect_pos + 8);
throw new Error(err);
}
}
}
else
{
// RMW with CAS
const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok => this.cli.read(
this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
(err, buf, version) => ok([ err, buf, version ])
));
if (err)
{
throw new Error(err);
}
// FIXME What if JSON crosses sector boundary? Prevent it if we want to pack objects
const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
const flags = buf.readBigInt64LE(in_sect_pos+8);
const json_len = Number(buf.readBigInt64LE(in_sect_pos+16));
let json_hdr;
if (in_sect_pos+24+json_len <= buf.length)
{
try
{
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
}
catch (e)
{
}
}
if (magic !== OBJECT_MAGIC || !json_hdr || json_hdr.size !== objectGetInfo.key.size)
{
throw new Error(
'header of object with size '+objectGetInfo.key.size+
' bytes not found in volume '+objectGetInfo.key.volume+' at '+objectGetInfo.key.offset
);
}
else if (!(flags & FLAG_DELETED))
{
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
if (err == vitastor.EINTR)
{
// Retry
await this._delete(objectGetInfo, reqUids);
}
else if (err)
{
throw new Error(err);
}
else
{
// FIXME: Write deletion statistics to volumes
// FIXME: Implement defragmentation
const del_stat = this.volume_delete_stats[objectGetInfo.key.volume] = (this.volume_delete_stats[objectGetInfo.key.volume] || { count: 0, bytes: 0 });
del_stat.count++;
del_stat.bytes += objectGetInfo.key.size;
}
}
}
}
/**
* config: full zenko server config,
* callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
*/
getDiskUsage(config, reqUids, callback)
{
// FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
callback(null, {});
}
}
class VitastorReadStream extends stream.Readable
{
constructor(cli, volume_id, offset, len, config, options = undefined)
{
super(options);
this.cli = cli;
this.volume_id = volume_id;
this.offset = offset;
this.end = offset + len;
this.pos = offset;
this.config = config;
this._reading = false;
}
_read(n)
{
if (this._reading)
{
return;
}
// FIXME: Validate object header
const chunk_size = n && this.config.read_chunk_size < n ? n : this.config.read_chunk_size;
const read_offset = this.pos;
const round_offset = read_offset - (read_offset % this.config.sector_size);
let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
const round_end = (read_end % this.config.sector_size)
? read_end + this.config.sector_size - (read_end % this.config.sector_size)
: read_end;
if (round_end <= this.end)
read_end = round_end;
this.pos = read_end;
if (read_end <= read_offset)
{
// EOF
this.push(null);
return;
}
this._reading = true;
this.cli.read(this.config.pool_id, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
{
this._reading = false;
if (err)
{
this.destroy(new Error(err));
return;
}
if (read_offset != round_offset || round_end != read_end)
{
buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
}
if (this.push(buf))
{
this._read(n);
}
});
}
}
function once(callback)
{
let called = false;
return function()
{
if (!called)
{
called = true;
callback.apply(null, arguments);
}
};
}
module.exports = VitastorBackend;

View File

@ -899,130 +899,35 @@ class MongoClientInterface {
return cb(errors.InternalError); return cb(errors.InternalError);
}); });
} }
/** /**
* Puts an object into a MongoDB collection. * Put object when versioning is not enabled
* Depending on the parameters, the object is either directly put into the collection * @param {Object} c bucket collection
* or the existing object is marked as deleted and a new object is inserted. * @param {String} bucketName bucket name
* * @param {String} objName object name
* @param {Object} collection - The MongoDB collection to put the object into. * @param {Object} objVal object metadata
* @param {string} bucketName - The name of the bucket the object belongs to. * @param {Object} params params
* @param {string} objName - The name of the object. * @param {Object} log logger
* @param {Object} value - The value of the object. * @param {Function} cb callback
* @param {Object} params - Additional parameters. * @return {undefined}
* @param {string} params.vFormat - object key format.
* @param {boolean} params.needOplogUpdate - If true, the object is directly put into the collection
* with updating the operation log.
* @param {Object} log - The logger to use.
* @param {Function} cb - The callback function to call when the operation is complete. It is called with an error
* if there is an issue with the operation.
* @returns {Promise} A promise that resolves when the operation is complete. The promise is rejected with an error
* if there is an issue with the operation.
*/ */
putObjectNoVer(collection, bucketName, objName, value, params, log, cb) { putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
if (params?.needOplogUpdate) { const masterKey = formatMasterKey(objName, params.vFormat);
return this.putObjectNoVerWithOplogUpdate(collection, bucketName, objName, value, params, log, cb); c.updateOne({
} _id: masterKey,
const key = formatMasterKey(objName, params.vFormat); }, {
const putFilter = { _id: key };
return collection.updateOne(putFilter, {
$set: { $set: {
_id: key, _id: masterKey,
value, value: objVal,
}, },
}, { }, {
upsert: true, upsert: true,
}).then(() => cb()).catch(err => { }).then(() => cb()).catch((err) => {
log.error('putObjectNoVer: error putting obect with no versioning', { error: err.message }); log.error('putObjectNoVer: error putting obect with no versioning', { error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
}); });
} }
/**
* Updates an object in a MongoDB collection without changing its version.
* If the object doesn't exist, it will be created (upsert is true for the second update operation).
* The operation is logged in the oplog.
*
* @param {Object} collection - The MongoDB collection to update the object in.
* @param {string} bucketName - The name of the bucket the object belongs to.
* @param {string} objName - The name of the object.
* @param {Object} value - The new value of the object.
* @param {Object} params - Additional parameters.
* @param {string} params.vFormat - object key format
* @param {string} params.originOp - origin operation
* @param {Object} log - The logger to use.
* @param {Function} cb - The callback function to call when the operation is complete.
* It is called with an error if there is an issue with the operation.
* @returns {void}
*/
putObjectNoVerWithOplogUpdate(collection, bucketName, objName, value, params, log, cb) {
const key = formatMasterKey(objName, params.vFormat);
const putFilter = { _id: key };
// filter used when finding and updating object
const findFilter = {
...putFilter,
$or: [
{ 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } },
],
};
const updateDeleteFilter = {
...putFilter,
'value.deleted': true,
};
return async.waterfall([
// Adding delete flag when getting the object
// to avoid having race conditions.
next => collection.findOneAndUpdate(findFilter, {
$set: updateDeleteFilter,
}, {
upsert: false,
}).then(doc => {
if (!doc.value) {
log.error('internalPutObject: unable to find target object to update',
{ bucket: bucketName, object: key });
return next(errors.NoSuchKey);
}
const obj = doc.value;
const objMetadata = new ObjectMD(obj.value);
objMetadata.setOriginOp(params.originOp);
objMetadata.setDeleted(true);
return next(null, objMetadata.getValue());
}).catch(err => {
log.error('internalPutObject: error getting object',
{ bucket: bucketName, object: key, error: err.message });
return next(errors.InternalError);
}),
// We update the full object to get the whole object metadata
// in the oplog update event
(objMetadata, next) => collection.bulkWrite([
{
updateOne: {
filter: updateDeleteFilter,
update: {
$set: { _id: key, value: objMetadata },
},
upsert: false,
},
},
{
updateOne: {
filter: putFilter,
update: {
$set: { _id: key, value },
},
upsert: true,
},
},
], { ordered: true }).then(() => next(null)).catch(next),
], (err) => {
if (err) {
log.error('internalPutObject: error updating object',
{ bucket: bucketName, object: key, error: err.message });
return cb(errors.InternalError);
}
return cb();
});
}
/** /**
* Returns the putObjectVerCase function to use * Returns the putObjectVerCase function to use
* depending on params * depending on params
@ -1068,7 +973,8 @@ class MongoClientInterface {
return putObjectVer(c, bucketName, objName, objVal, _params, log, return putObjectVer(c, bucketName, objName, objVal, _params, log,
cb); cb);
} }
return this.putObjectNoVer(c, bucketName, objName, objVal, _params, log, cb); return this.putObjectNoVer(c, bucketName, objName, objVal,
_params, log, cb);
}); });
} }

View File

@ -10,21 +10,21 @@ function trySetDirSyncFlag(path) {
const GETFLAGS = 2148034049; const GETFLAGS = 2148034049;
const SETFLAGS = 1074292226; const SETFLAGS = 1074292226;
const FS_DIRSYNC_FL = 65536n; const FS_DIRSYNC_FL = 65536;
const buffer = Buffer.alloc(8, 0); const buffer = Buffer.alloc(8, 0);
const pathFD = fs.openSync(path, 'r'); const pathFD = fs.openSync(path, 'r');
const status = ioctl(pathFD, GETFLAGS, buffer); const status = ioctl(pathFD, GETFLAGS, buffer);
assert.strictEqual(status, 0); assert.strictEqual(status, 0);
const currentFlags = buffer.readBigInt64LE(0); const currentFlags = buffer.readUIntLE(0, 8);
const flags = currentFlags | FS_DIRSYNC_FL; const flags = currentFlags | FS_DIRSYNC_FL;
buffer.writeBigInt64LE(flags, 0); buffer.writeUIntLE(flags, 0, 8);
const status2 = ioctl(pathFD, SETFLAGS, buffer); const status2 = ioctl(pathFD, SETFLAGS, buffer);
assert.strictEqual(status2, 0); assert.strictEqual(status2, 0);
fs.closeSync(pathFD); fs.closeSync(pathFD);
const pathFD2 = fs.openSync(path, 'r'); const pathFD2 = fs.openSync(path, 'r');
const confirmBuffer = Buffer.alloc(8, 0); const confirmBuffer = Buffer.alloc(8, 0);
ioctl(pathFD2, GETFLAGS, confirmBuffer); ioctl(pathFD2, GETFLAGS, confirmBuffer);
assert.strictEqual(confirmBuffer.readBigInt64LE(0), assert.strictEqual(confirmBuffer.readUIntLE(0, 8),
currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set'); currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
fs.closeSync(pathFD2); fs.closeSync(pathFD2);
} }

View File

@ -3,7 +3,7 @@
"engines": { "engines": {
"node": ">=16" "node": ">=16"
}, },
"version": "8.1.134", "version": "8.1.131",
"description": "Common utilities for the S3 project components", "description": "Common utilities for the S3 project components",
"main": "build/index.js", "main": "build/index.js",
"repository": { "repository": {
@ -20,37 +20,39 @@
"@azure/identity": "^3.1.1", "@azure/identity": "^3.1.1",
"@azure/storage-blob": "^12.12.0", "@azure/storage-blob": "^12.12.0",
"@js-sdsl/ordered-set": "^4.4.2", "@js-sdsl/ordered-set": "^4.4.2",
"@swc/cli": "^0.4.0", "@types/async": "^3.2.12",
"@swc/core": "^1.7.4", "@types/utf8": "^3.0.1",
"JSONStream": "^1.0.0",
"agentkeepalive": "^4.1.3", "agentkeepalive": "^4.1.3",
"ajv": "^6.12.3", "ajv": "6.12.3",
"async": "^2.6.4", "async": "~2.6.4",
"aws-sdk": "^2.1005.0", "aws-sdk": "^2.1005.0",
"backo": "^1.1.0", "backo": "^1.1.0",
"base-x": "^3.0.8", "base-x": "3.0.8",
"base62": "^2.0.1", "base62": "2.0.1",
"bson": "^4.0.0", "bson": "4.0.0",
"debug": "^4.1.0", "debug": "~4.1.0",
"diskusage": "^1.1.1", "diskusage": "^1.1.1",
"fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git", "fcntl": "github:scality/node-fcntl#0.2.2",
"httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0", "hdclient": "scality/hdclient#1.1.7",
"httpagent": "scality/httpagent#1.0.6",
"https-proxy-agent": "^2.2.0", "https-proxy-agent": "^2.2.0",
"ioredis": "^4.28.5", "ioredis": "^4.28.5",
"ipaddr.js": "^1.9.1", "ipaddr.js": "1.9.1",
"joi": "^17.6.0", "joi": "^17.6.0",
"JSONStream": "^1.0.0", "level": "~5.0.1",
"level": "^5.0.1", "level-sublevel": "~6.6.5",
"level-sublevel": "^6.6.5",
"mongodb": "^5.2.0", "mongodb": "^5.2.0",
"node-forge": "^1.3.0", "node-forge": "^1.3.0",
"prom-client": "^14.2.0", "prom-client": "14.2.0",
"simple-glob": "^0.2.0", "simple-glob": "^0.2.0",
"socket.io": "^4.6.1", "socket.io": "~4.6.1",
"socket.io-client": "^4.6.1", "socket.io-client": "~4.6.1",
"utf8": "^3.0.0", "sproxydclient": "git+https://github.com/scality/sproxydclient#8.0.10",
"utf8": "3.0.0",
"uuid": "^3.0.1", "uuid": "^3.0.1",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1", "werelogs": "scality/werelogs#8.1.4",
"xml2js": "^0.4.23" "xml2js": "~0.4.23"
}, },
"optionalDependencies": { "optionalDependencies": {
"ioctl": "^2.0.2" "ioctl": "^2.0.2"
@ -59,24 +61,22 @@
"@babel/preset-env": "^7.16.11", "@babel/preset-env": "^7.16.11",
"@babel/preset-typescript": "^7.16.7", "@babel/preset-typescript": "^7.16.7",
"@sinonjs/fake-timers": "^6.0.1", "@sinonjs/fake-timers": "^6.0.1",
"@types/async": "^3.2.12",
"@types/utf8": "^3.0.1",
"@types/ioredis": "^4.28.10", "@types/ioredis": "^4.28.10",
"@types/jest": "^27.4.1", "@types/jest": "^27.4.1",
"@types/node": "^18.19.41", "@types/node": "^17.0.21",
"@types/xml2js": "^0.4.11", "@types/xml2js": "^0.4.11",
"eslint": "^8.14.0", "eslint": "^8.14.0",
"eslint-config-airbnb-base": "^15.0.0", "eslint-config-airbnb": "6.2.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git", "eslint-config-scality": "scality/Guidelines#ec33dfb",
"eslint-plugin-react": "^4.3.0", "eslint-plugin-react": "^4.3.0",
"jest": "^27.5.1", "jest": "^27.5.1",
"mongodb-memory-server": "^8.12.2", "mongodb-memory-server": "^8.12.2",
"nyc": "^15.1.0", "nyc": "^15.1.0",
"sinon": "^9.0.2", "sinon": "^9.0.2",
"temp": "^0.9.1", "temp": "0.9.1",
"ts-jest": "^27.1.3", "ts-jest": "^27.1.3",
"ts-node": "^10.6.0", "ts-node": "^10.6.0",
"typescript": "^4.9.5" "typescript": "^4.6.2"
}, },
"scripts": { "scripts": {
"lint": "eslint $(git ls-files '*.js')", "lint": "eslint $(git ls-files '*.js')",
@ -84,8 +84,7 @@
"lint_yml": "yamllint $(git ls-files '*.yml')", "lint_yml": "yamllint $(git ls-files '*.yml')",
"test": "jest tests/unit", "test": "jest tests/unit",
"build": "tsc", "build": "tsc",
"prepack": "tsc", "prepare": "yarn build",
"postinstall": "[ -d build ] || swc -d build --copy-files package.json index.ts lib",
"ft_test": "jest tests/functional --testTimeout=120000 --forceExit", "ft_test": "jest tests/functional --testTimeout=120000 --forceExit",
"coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit", "coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit",
"build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg" "build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg"

View File

@ -466,25 +466,6 @@ function getListingKey(key, vFormat) {
`${inc(DbPrefixes.Replay)}foo/bar${inc(VID_SEP)}`); `${inc(DbPrefixes.Replay)}foo/bar${inc(VID_SEP)}`);
}); });
}); });
it('should not crash if key contains "undefined" with no delimiter', () => {
const delimiter = new DelimiterMaster({}, fakeLogger, vFormat);
const listingKey = getListingKey('undefinedfoo', vFormat);
assert.strictEqual(
delimiter.filter({
key: listingKey,
value: '{}',
}),
FILTER_ACCEPT);
assert.deepStrictEqual(delimiter.result(), {
CommonPrefixes: [],
Contents: [{ key: 'undefinedfoo', value: '{}' }],
IsTruncated: false,
NextMarker: undefined,
Delimiter: undefined,
});
});
} }
}); });
}); });

View File

@ -1,6 +1,6 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "es2020", "target": "es6",
"module": "commonjs", "module": "commonjs",
"rootDir": "./", "rootDir": "./",
"resolveJsonModule": true, "resolveJsonModule": true,

7420
yarn.lock Normal file

File diff suppressed because it is too large Load Diff