26 Commits

SHA1 Message Date
ab7861b3b0 Fix WithImplicitCoercion type error with newer @types/node 2025-03-14 20:33:33 +03:00
8804658e0a Also check the shadow bucket during defrag 2025-03-14 20:13:13 +03:00
f7a9c2099f Store MPU part locations also in "location", not "partLocations" 2025-03-14 20:13:13 +03:00
3370c44161 Fix vitastor error messages 2025-03-11 00:58:20 +03:00
97f018ae77 Fix error in volume stats if it is already dropped 2025-03-10 21:00:37 +03:00
3fa89dcb3c Fix repeated write after end of the object 2025-03-10 01:16:58 +03:00
1e4b9a8390 Remove event handlers to fix the memory leak 2025-03-09 16:01:24 +03:00
9b5441e42b Use relative dependency URLs 2025-03-08 17:31:46 +03:00
81a5a3aca0 Filter volumes requiring defrag by want_defrag field 2025-03-08 17:12:37 +03:00
5c7cf85f64 Improve configuration and defaults 2025-03-07 01:20:21 +03:00
c192fe7914 Fix defrag bugs, improve error handling, write volume stats regularly 2025-03-04 01:55:42 +03:00
34be3b7030 "Lock" volumes during creation 2025-03-03 13:10:19 +03:00
ed7a78a83f Use a separate MongoDB connection for volume metadata 2025-03-01 16:27:26 +03:00
2cf6e91e76 Plug metadata backend into parseLC wrapper 2025-03-01 13:59:00 +03:00
1904dd3bf0 Use level 9.x 2025-03-01 01:49:53 +03:00
a7eaee760d Support storing volume metadata in MongoDB 2025-02-18 17:24:29 +03:00
be5ed75b8b Purge volume data after defrag 2025-02-16 16:22:24 +03:00
646f4f1bde Add delete() to VitastorMock, implement forgotten CAS write & delete 2025-02-16 16:22:24 +03:00
9057e6a507 Add a basic defrag test 2025-02-16 16:22:24 +03:00
4a2399e588 Document configuration, close unused volumes, test delete statistics 2025-02-16 16:22:24 +03:00
9bd6dec539 Implement VitastorKV mock 2025-02-16 16:22:24 +03:00
7893871a31 Implement basic Vitastor defragmentation 2025-02-16 16:22:24 +03:00
8f630a1d62 Support replaceDataLocations 2025-02-16 16:22:24 +03:00
552d51e539 Force ipv4 in mongo tests 2025-02-12 02:27:06 +03:00
302a7c7465 Use newer uuid/v4 (same as in the main zenko-cloudserver repo) 2025-02-12 02:27:06 +03:00
3fef5da382 NodeJS.Timer breaks build 2025-02-12 02:27:06 +03:00
14 changed files with 1348 additions and 356 deletions

View File

@@ -1,3 +1,5 @@
import type { WithImplicitCoercion } from 'node:buffer';
const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>

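The hunk above is cut off after the arrow; a minimal sketch of the resulting helper follows, assuming its body simply decodes the base64 digest (newer @types/node, 22.x per package.json below, types Buffer.from's first argument as WithImplicitCoercion<string>, hence the widened parameter):

import type { WithImplicitCoercion } from 'node:buffer';

// Assumed body: decode a base64 MD5 digest into bytes, passing Uint8Array through unchanged.
export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array): Uint8Array =>
    base64MD5 instanceof Uint8Array ? base64MD5 : Buffer.from(base64MD5, 'base64');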
View File

@@ -129,8 +129,9 @@ export function validateAndFilterMpuParts(
key: item.key,
ETag: `"${item.value.ETag}"`,
size: item.value.Size,
locations: Array.isArray(item.value.partLocations) ?
item.value.partLocations : [item.value.partLocations],
locations: (item.value.location||item.value.partLocations) instanceof Array
? (item.value.location||item.value.partLocations)
: [(item.value.location||item.value.partLocations)],
});
});
keysToDelete.push(mpuOverviewKey);
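The repeated (item.value.location||item.value.partLocations) expression boils down to a small normalization; restated as a standalone helper for clarity (the helper name is illustrative, not part of the change):

// Prefer the new "location" field, fall back to the legacy "partLocations" field,
// and always return an array of part locations.
function getPartLocations(value: { location?: any; partLocations?: any }): any[] {
    const loc = value.location || value.partLocations;
    return Array.isArray(loc) ? loc : [loc];
}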

View File

@@ -14,7 +14,7 @@ const GcpClient = require('./external/GcpClient');
const PfsClient = require('./external/PfsClient');
const backendUtils = require('./external/utils');
function parseLC(config, vault) {
function parseLC(config, vault, metadata) {
const clients = {};
Object.keys(config.locationConstraints).forEach(location => {
@@ -27,7 +27,7 @@ function parseLC(config, vault) {
}
if (locationObj.type === 'vitastor') {
const VitastorBackend = require('./vitastor/VitastorBackend');
clients[location] = new VitastorBackend(location, locationObj.details);
clients[location] = new VitastorBackend(location, locationObj.details, metadata);
}
if (locationObj.type === 'scality') {
if (locationObj.details.connector.sproxyd) {

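A hedged wiring sketch of the new signature (variable names assumed): the metadata backend is threaded through parseLC so that vitastor locations can later rewrite object locations during defrag.

// metadata is the MetadataWrapper instance CloudServer already uses elsewhere (assumed).
const clients = parseLC(config, vault, metadata);
// For a location of type 'vitastor', clients[location] is now constructed as
// new VitastorBackend(location, locationObj.details, metadata).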
View File

@@ -0,0 +1,89 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const assert = require('assert');
const { MongoMemoryReplSet } = require('mongodb-memory-server');
import { MongoClient, Db } from 'mongodb';
import { MongoKVWrapper } from './KVWrapper';
const mongoserver = new MongoMemoryReplSet({
debug: false,
instanceOpts: [
{ port: 27021 },
],
replSet: {
name: 'customSetName',
count: 1,
dbName: 'mongokvtest',
storageEngine: 'ephemeralForTest',
},
});
let kv: MongoKVWrapper|null = null;
beforeAll(async () =>
{
await mongoserver.start();
await mongoserver.waitUntilRunning();
kv = new MongoKVWrapper('mongodb://127.0.0.1:27021/?w=majority&readPreference=primary&replicaSet=customSetName',
'mongokvtest', 'volumes');
await kv.open();
});
afterAll(async () =>
{
if (kv)
await kv.close();
await mongoserver.stop();
});
test('MongoKVWrapper', async () =>
{
if (!kv)
throw new Error('not connected');
// insert a number and check that the type is preserved
assert.strictEqual(await kv.get('id'), null);
assert.strictEqual(await kv.update('id', null, 123), true);
assert.strictEqual(await kv.get('id'), 123);
// vol_1_1 not exists
assert.strictEqual(await kv.get('vol_1_1'), null);
const hdr = { created: Date.now(), bucket: 'helloworld' };
// update non-existing key - should fail
assert.strictEqual(await kv.update('vol_1_1', {}, hdr), false);
// key is not affected by a failed operation
assert.strictEqual(await kv.get('vol_1_1'), null);
// create key
assert.strictEqual(await kv.update('vol_1_1', null, hdr), true);
assert.deepStrictEqual(await kv.get('vol_1_1'), hdr);
// try to create a duplicate key - should fail
assert.strictEqual(await kv.update('vol_1_1', null, hdr), false);
// key is not affected by a failed operation
assert.deepStrictEqual(await kv.get('vol_1_1'), hdr);
// update key
const hdr2 = { created: Date.now(), bucket: 'helloworld', deleted: Date.now() };
assert.strictEqual(await kv.update('vol_1_1', hdr, hdr2), true);
assert.deepStrictEqual(await kv.get('vol_1_1'), hdr2);
// try to update again from the same old value - should fail
assert.strictEqual(await kv.update('vol_1_1', hdr, hdr2), false);
// key is not affected by a failed operation
assert.deepStrictEqual(await kv.get('vol_1_1'), hdr2);
// create 2 more keys
assert.strictEqual(await kv.update('vol_1_3', null, { abc: 'def' }), true);
assert.strictEqual(await kv.update('vol_1_2', null, { def: 'xyz' }), true);
// check listing
let lst: any[] = [];
for await (const item of kv.list('vol_'))
lst.push(item);
assert.deepStrictEqual(lst, [ [ 'vol_1_1', hdr2 ], [ 'vol_1_2', { def: 'xyz' } ], [ 'vol_1_3', { 'abc': 'def' } ] ]);
lst = [];
for await (const item of kv.list('vol_', { def: 'xyz' }))
lst.push(item);
assert.deepStrictEqual(lst, [ [ 'vol_1_2', { def: 'xyz' } ] ]);
// delete key
assert.strictEqual(await kv.update('vol_1_1', hdr2, null), true);
assert.deepStrictEqual(await kv.get('vol_1_1'), null);
});

View File

@@ -0,0 +1,262 @@
import { MongoClient, Db, Collection, MongoServerError } from 'mongodb';
interface MongoKV {
_id: string;
value: any;
}
export class MongoKVWrapper
{
client: MongoClient | null = null;
db: Db | null = null;
db_name: string;
url: string;
collection_name: string;
collection: Collection<MongoKV> | null = null;
options: any;
opened: boolean = false;
on_open: ((...args: any[]) => void)[] | null = null;
open_error: any;
constructor(url: string, db_name: string, collection_name: string, options?: any)
{
this.url = url;
this.db_name = db_name;
this.collection_name = collection_name;
this.options = options;
}
async open()
{
if (!this.collection)
{
if (this.on_open)
{
await new Promise(ok => this.on_open!.push(ok));
}
else
{
this.on_open = [];
try
{
this.client = await MongoClient.connect(this.url, this.options);
this.db = this.client.db(this.db_name, { ignoreUndefined: true });
await this.db.createCollection(this.collection_name);
this.collection = this.db.collection<MongoKV>(this.collection_name);
}
catch (e)
{
if (this.client)
{
this.client.close().catch(console.error);
this.client = null;
}
this.db = null;
this.open_error = e;
}
this.opened = true;
this.on_open.map(cb => setImmediate(cb));
this.on_open = null;
}
}
if (this.open_error)
{
throw this.open_error;
}
}
async close()
{
if (this.collection)
{
this.collection = null;
}
if (this.client)
{
await this.client!.close();
this.client = null;
}
}
async get(key: string)
{
const doc = await this.collection!.findOne({ _id: key });
return doc ? doc.value : null;
}
async update(key: string, old_value: any, value: any): Promise<boolean>
{
if (old_value === null)
{
try
{
const res = await this.collection!.insertOne({ _id: key, value: value });
if (res.insertedId !== key)
throw new Error('mongodb: insertOne insertedId='+res.insertedId+', but should be '+key);
}
catch (e)
{
if ((e instanceof MongoServerError) && e.code == 11000)
return false;
throw e;
}
return true;
}
else if (value !== null)
{
const doc = await this.collection!.findOneAndUpdate(
{ _id: key, value: old_value },
{ '$set': { value: value } },
);
return !!doc.value;
}
else
{
const res = await this.collection!.deleteOne({ _id: key, value: old_value });
return res.deletedCount > 0;
}
}
async* list(start_key: string, filter?: object)
{
const mongo_filter = { _id: { '$gte': start_key } };
if (filter)
{
for (const key in filter)
{
mongo_filter["value."+key] = { '$eq': filter[key] };
}
}
for await (const item of this.collection!.find(mongo_filter))
{
yield [ item._id, item.value ];
}
}
}
export class VitastorKVWrapper
{
config: any;
kv: any;
cli: any;
vitastor: any;
opened: boolean = false;
on_open: ((...args: any[]) => void)[] | null = null;
open_error: any;
constructor(config, cli, vitastor)
{
this.config = config;
this.cli = cli;
this.vitastor = vitastor;
this.kv = new vitastor.KV(this.cli);
}
async open()
{
if (!this.opened)
{
if (this.on_open)
{
await new Promise(ok => this.on_open!.push(ok));
}
else
{
this.on_open = [];
try
{
if (this.config.metadata_image)
{
const img = new this.vitastor.Image(this.cli, this.config.metadata_image);
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
if (!info.inode_num)
throw new Error('vitastorkv metadata image '+this.config.metadata_image+' does not exist');
this.config.metadata_pool_id = info.pool_id;
this.config.metadata_inode_num = info.inode_num;
}
await new Promise<void>((ok, no) => this.kv.open(
this.config.metadata_pool_id, this.config.metadata_inode_num,
this.config.vitastor || {}, err => (err ? no(new Error(err)) : ok())
));
}
catch (e)
{
this.open_error = e;
}
this.opened = true;
this.on_open.map(cb => setImmediate(cb));
this.on_open = null;
}
}
if (this.open_error)
{
throw this.open_error;
}
}
async close()
{
if (this.opened && !this.open_error)
{
await new Promise<void>((ok, no) => this.kv.close(err => (err ? no(err) : ok())));
this.opened = false;
}
}
async get(key: string)
{
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(key, (err, value) => ok([ err, value ])));
if (err == this.vitastor.ENOENT)
return null;
else if (err)
throw new Error('vitastorkv get: error '+err);
return JSON.parse(prev);
}
async update(key: string, old_value: any, value: any): Promise<boolean>
{
const cas = (old_value !== null ? (cas_old => cas_old === JSON.stringify(old_value)) : (cas_old => !cas_old));
const err = await new Promise(ok => (value !== null
? this.kv.set(key, JSON.stringify(value), ok, cas)
: this.kv.del(key, ok, cas)));
if (err === this.vitastor.EINTR)
return false;
else if (err)
throw new Error((value !== null ? 'vitastorkv set: error ' : 'vitastorkv del: error ')+err);
return true;
}
async* list(start_key: string, filter?: object)
{
const lst = this.kv.list(start_key);
try
{
next_key: while (true)
{
const [ err, key, value ] = await new Promise<[ number, string, string ]>(ok => lst.next((e, k, v) => ok([ e, k, v ])));
if (err)
{
if (err != this.vitastor.ENOENT)
throw new Error('Error listing: '+err);
break;
}
const decoded = JSON.parse(value);
if (filter)
{
for (const k in filter)
{
if (decoded[k] != filter[k])
{
continue next_key;
}
}
}
yield [ key, decoded ];
}
}
finally
{
lst.close();
}
}
}
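Both wrappers implement the same contract: get(key) returns the stored value or null; update(key, old, new) creates the key when old === null, deletes it when new === null, and otherwise performs a compare-and-swap, returning false on conflict; list(prefix, filter?) yields [key, value] pairs. A minimal optimistic-retry sketch against that contract (the counter field is illustrative, not taken from the backend):

async function bumpRemovedBytes(kv: MongoKVWrapper, volKey: string, bytes: number)
{
    while (true)
    {
        const old = await kv.get(volKey);
        if (old === null)
            return; // volume header was deleted concurrently, nothing to update
        const next = { ...old, removed_bytes: (old.removed_bytes || 0) + bytes };
        if (await kv.update(volKey, old, next))
            return; // CAS succeeded
        // another writer changed the header between get() and update(): re-read and retry
    }
}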

View File

@@ -1,3 +1,7 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const assert = require('assert');
const crypto = require('crypto');
@@ -8,19 +12,31 @@ const VitastorMock = require('./VitastorMock');
test('basic read and write', async () =>
{
const origNow = Date.now;
const startTs = Date.now();
const metastub = {};
const backend = new VitastorBackend('standard', {
pool_id: 1,
metadata_pool_id: 2,
metadata_inode_num: 1,
read_chunk_size: 128*1024,
write_chunk_size: 128*1024,
}, null, VitastorMock);
open_volume_limit: 0,
open_volume_max_unused_sec: 30,
open_volume_max_garbage: 0.25,
closed_volume_max_garbage: 0.25,
volume_lock_timeout_sec: 10,
volume_lock_interval_sec: 5,
min_volume_size: 128*1024,
defrag_interval_sec: 600,
defrag_cleanup_age_sec: 20,
}, metastub, VitastorMock);
// test put
const size = Math.floor(128*1024+Math.random()*128*1024);
const putData1 = crypto.randomBytes(size);
const getInfo1 = await new Promise((ok, no) => backend.put(new MockReadStream(putData1), size,
{ bucketName: 'test', objectKey: 'abcd/efg', partNumber: 2 },
{ bucketName: 'testBucket', objectKey: 'abcd/efg', partNumber: 2 },
'test:1', (err, res) => err ? no(err) : ok(res)));
console.log('object written:', getInfo1);
@@ -42,7 +58,7 @@ test('basic read and write', async () =>
const volData1 = backend.cli.inodes[1][1].data;
console.log('object deleted OK:',
volData1.slice(getInfo1.offset, getInfo1.offset+16).toString('hex'),
volData1.slice(getInfo1.offset+16, getInfo1.offset+16+getInfo1.hdrlen).toString(),
volData1.slice(getInfo1.offset+16, getInfo1.offset+getInfo1.hdrlen).toString(),
);
if (volData1.slice(getInfo1.offset, getInfo1.offset+8).toString() != VitastorBackend.OBJECT_MAGIC)
throw new Error('invalid header magic');
@@ -51,8 +67,48 @@ test('basic read and write', async () =>
if (volData1.readUInt32LE(getInfo1.offset+12) != (getInfo1.hdrlen-16))
throw new Error('invalid header json length');
// test deletion statistics
clearTimeout(backend.volume_stats_timer_id);
await backend._writeVolumeStats();
const volHeader = JSON.parse(backend.kv.kv.data['vol_1_1']);
assert(volHeader.removed_objects == 1);
assert(volHeader.removed_bytes == size);
console.log('deletion statistics written ok');
// one more put
const size2 = Math.floor(128*1024+Math.random()*128*1024);
const putData2 = crypto.randomBytes(size2);
const getInfo2 = await new Promise((ok, no) => backend.put(new MockReadStream(putData2), size2,
{ bucketName: 'testBucket', objectKey: 'hello' },
'test:4', (err, res) => err ? no(err) : ok(res)));
console.log('object written:', getInfo2);
// test closing unused volumes
Date.now = () => startTs+45000;
clearTimeout(backend.bump_timer_id);
await backend._bumpVolumes();
assert.deepEqual(backend.volumes, { testBucket: {} });
assert.deepEqual(backend.volumes_by_id, { '1': {} });
console.log('unused volume closed');
// test defrag
metastub.replaceDataLocations = (bucket, location, replacements, cb) => cb(null, 1);
await backend._autoDefrag();
console.log(backend.kv.kv.data);
assert.equal(JSON.parse(backend.kv.kv.data['vol_1_1']).defrag_ts, Date.now());
assert.equal(Object.keys(backend.volumes['testBucket']).length, 1);
console.log('basic defrag ok');
// test purging volume data
Date.now = () => startTs+70000;
await backend._autoDefrag();
assert(backend.cli.inodes[1][1].data.length == 0);
assert(backend.kv.kv.data['vol_1_1'] === undefined);
console.log('volume data purged ok');
// stop timers and so on
backend.destroy();
Date.now = origNow;
});
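For reference, a hedged locationConstraints entry using the options exercised by the test above (option names come from the test and KVWrapper; the values and the location name are illustrative):

const config = {
    locationConstraints: {
        'my-vitastor-location': {
            type: 'vitastor',
            details: {
                pool_id: 1,
                // either a named metadata image...
                metadata_image: 'zenko-volume-metadata',
                // ...or an explicit metadata_pool_id + metadata_inode_num pair
                read_chunk_size: 128*1024,
                write_chunk_size: 128*1024,
                open_volume_limit: 100,
                open_volume_max_unused_sec: 30,
                open_volume_max_garbage: 0.25,
                closed_volume_max_garbage: 0.25,
                volume_lock_timeout_sec: 10,
                volume_lock_interval_sec: 5,
                min_volume_size: 32*1024*1024,
                defrag_interval_sec: 600,
                defrag_cleanup_age_sec: 20,
            },
        },
    },
};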
function readAll(readable)

File diff suppressed because it is too large

View File

@@ -1,5 +1,10 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const ENOENT = -2;
const EINTR = -4;
const EINVAL = -22;
const IMMEDIATE_NONE = 0;
const IMMEDIATE_SMALL = 1;
const IMMEDIATE_ALL = 2;
@@ -13,6 +18,8 @@ class Client
on_read: null | ((pool_id: number, inode_id: number, offset: number, length: number, callback: (number) => void) => void) = null;
on_write: null | ((pool_id: number, inode_id: number, offset: number,
data: Buffer|Buffer[], cas: { version: number|bigint }|null, callback: (number) => void) => void) = null;
on_delete: null | ((pool_id: number, inode_id: number, offset: number,
length: number, cas: { version: number|bigint }|null, callback: (number) => void) => void) = null;
on_sync: null | ((callback: (number) => void) => void) = null;
get_min_io_size(pool_id: number)
@@ -35,7 +42,7 @@ class Client
{
if (!this.on_read)
{
setImmediate(() => this._read(pool_id, inode_id, offset, length, callback));
setImmediate(() => this._read(pool_id, inode_id, Number(offset), Number(length), callback));
}
else
{
@@ -73,31 +80,44 @@ class Client
}
write(pool_id: number, inode_id: number, offset: number|bigint, data: Buffer|Buffer[],
cas: { version: number|bigint } | ((err: number|null) => void),
options: { version: number|bigint } | ((err: number|null) => void) | null,
callback?: (err: number|null) => void)
{
const cas = options instanceof Function ? null : options;
callback = options instanceof Function ? options : callback;
if (!callback)
{
throw new Error('callback is required');
}
if (!this.on_write)
{
setImmediate(() => this._write(pool_id, inode_id, offset, data, cas, callback));
setImmediate(() => this._write(pool_id, inode_id, offset, data, cas, callback!));
}
else
{
this.on_write(pool_id, inode_id, Number(offset), data, cas instanceof Function ? null : cas, (err) =>
this.on_write(pool_id, inode_id, Number(offset), data, cas, (err) =>
{
if (err != 0)
callback!(err);
else
this._write(pool_id, inode_id, offset, data, cas, callback);
this._write(pool_id, inode_id, offset, data, cas, callback!);
});
}
}
_write(pool_id: number, inode_id: number, offset: number|bigint, data: Buffer|Buffer[],
cas: { version: number|bigint } | ((err: number|null) => void),
callback?: (err: number|null) => void)
cas: { version: number|bigint } | null,
callback: (err: number|null) => void)
{
const adata: Buffer[] = (data instanceof Buffer ? [ data ] : data);
const length = adata.reduce((a, c) => a + c.length, 0);
const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
const last_block = Math.floor((Number(offset) + length - 1) / this.max_atomic_write_size);
if (cas && cas.version && first_block != last_block)
{
callback!(EINVAL);
return;
}
if (!this.inodes[pool_id])
{
this.inodes[pool_id] = {};
@@ -110,8 +130,11 @@ class Client
};
}
const ino = this.inodes[pool_id][inode_id];
const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
const last_block = Math.floor((Number(offset) + length - 1) / this.max_atomic_write_size);
if (cas && cas.version && BigInt(cas.version)-1n != (ino.versions[first_block] || 0n))
{
callback!(EINTR);
return;
}
for (let i = first_block; i <= last_block; i++)
{
ino.versions[i] = (ino.versions[i] || 0n) + 1n;
@@ -128,9 +151,74 @@ class Client
buf.copy(ino.data, coff);
coff += buf.length;
}
if (cas instanceof Function)
callback!(0);
}
delete(pool_id: number, inode_id: number, offset: number|bigint, length: number|bigint,
options: { version: number|bigint } | ((err: number|null) => void) | null,
callback?: (err: number|null) => void)
{
const cas = options instanceof Function ? null : options;
callback = options instanceof Function ? options : callback;
if (!callback)
{
throw new Error('callback is required');
}
if (!this.on_delete)
{
setImmediate(() => this._delete(pool_id, inode_id, offset, length, cas, callback!));
}
else
{
this.on_delete(pool_id, inode_id, Number(offset), Number(length), cas, (err) =>
{
if (err != 0)
callback!(err);
else
this._delete(pool_id, inode_id, offset, length, cas, callback!);
});
}
}
_delete(pool_id: number, inode_id: number, offset: number|bigint, length: number|bigint,
cas: { version: number|bigint } | null,
callback: (err: number|null) => void)
{
const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
const last_block = Math.floor((Number(offset) + Number(length) - 1) / this.max_atomic_write_size);
if (cas && cas.version && first_block != last_block)
{
callback!(EINVAL);
return;
}
if (!this.inodes[pool_id])
{
this.inodes[pool_id] = {};
}
if (!this.inodes[pool_id][inode_id])
{
this.inodes[pool_id][inode_id] = {
data: Buffer.alloc(this.max_atomic_write_size < Number(length) ? Number(length) : this.max_atomic_write_size),
versions: [],
};
}
const ino = this.inodes[pool_id][inode_id];
if (cas && cas.version && BigInt(cas.version)-1n != (ino.versions[first_block] || 0n))
{
callback!(EINTR);
return;
}
for (let i = first_block; i <= last_block; i++)
{
delete ino.versions[i];
}
if (ino.data.length <= (last_block+1)*this.max_atomic_write_size)
{
ino.data = ino.data.slice(0, first_block*this.max_atomic_write_size);
}
else
{
ino.data.fill(0, first_block*this.max_atomic_write_size, (last_block+1)*this.max_atomic_write_size);
}
callback!(0);
}
@@ -162,6 +250,7 @@ class Client
class KV
{
data: { [key: string]: string } = {};
keys: string[] | null = null;
size: number = 0;
open(pool_id: number, inode_id: number, params: { [key: string]: string }, callback: (err: number|null) => void)
@@ -202,6 +291,10 @@ class KV
}
}
if (!(key in this.data))
{
this.keys = null;
}
this.data[key] = value;
setImmediate(() => callback(null));
}
@@ -217,12 +310,80 @@ class KV
}
}
delete this.data[key];
this.keys = null;
setImmediate(() => callback(null));
}
list(start_key: string)
{
throw new Error('not implemented');
return new KVListing(this, start_key);
}
}
class KVListing
{
kv: KV;
next_key: string;
ge: boolean = true;
keys: string[] | null = null;
pos: number = 0;
constructor(kv: KV, start_key: string)
{
this.kv = kv;
this.next_key = start_key;
}
next(callback: (err: number|null, key: string|null, value: string|null) => void)
{
if (this.pos < 0)
{
setImmediate(() => callback(ENOENT, null, null));
return;
}
if (!this.keys || this.kv.keys != this.keys)
{
if (!this.kv.keys)
this.kv.keys = Object.keys(this.kv.data).sort();
this.keys = this.kv.keys;
this.pos = 0;
if (this.next_key != '')
{
let start = 0, end = this.keys.length;
while (end > start+1)
{
const mid = 0|((start+end)/2);
if (this.next_key < this.keys[mid])
start = mid;
else
end = mid;
}
if (!this.ge)
{
while (start < this.keys.length && this.next_key == this.keys[start])
start++;
}
this.pos = start;
}
}
if (this.pos < this.keys.length)
{
const key = this.keys[this.pos];
const value = this.kv.data[key];
this.pos++;
this.next_key = key;
this.ge = false;
setImmediate(() => callback(null, key, value));
}
else
{
this.pos = -1;
setImmediate(() => callback(ENOENT, null, null));
}
}
close()
{
}
}
@@ -231,6 +392,7 @@ module.exports = {
KV,
ENOENT,
EINTR,
EINVAL,
IMMEDIATE_NONE,
IMMEDIATE_SMALL,
IMMEDIATE_ALL,

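The mock mirrors the real client's CAS behaviour: a write or delete carrying { version } succeeds only when the target block's current version equals version - 1, fails with EINTR otherwise, and fails with EINVAL when a CAS operation spans more than one atomic block. A small hedged illustration (the Client export is assumed to sit alongside KV in module.exports):

const VitastorMock = require('./VitastorMock');

const cli = new VitastorMock.Client();
const buf = Buffer.from('hello');
// First CAS write: the block's version is 0, so { version: 1 } is accepted.
cli.write(1, 1, 0, buf, { version: 1 }, err => {
    // err === 0 and the block's version becomes 1.
    // Replaying the same CAS write must fail: expected previous version 0, found 1.
    cli.write(1, 1, 0, buf, { version: 1 }, err2 => {
        // err2 === VitastorMock.EINTR; the caller is expected to re-read and retry.
    });
});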
View File

@@ -43,7 +43,7 @@ function _parseListEntries(entries) {
Initiated: tmp.initiated,
Initiator: tmp.initiator,
EventualStorageBucket: tmp.eventualStorageBucket,
partLocations: tmp.partLocations,
location: tmp.location||tmp.partLocations,
creationDate: tmp.creationDate,
ingestion: tmp.ingestion,
},
@@ -517,6 +517,13 @@ class MetadataWrapper {
});
}
replaceDataLocations(bucketName, dataStoreName, replacements, cb) {
if (typeof this.client.replaceDataLocations !== 'function') {
return cb(errors.NotImplemented);
}
return this.client.replaceDataLocations(bucketName, dataStoreName, replacements, cb);
}
/**
* updates(insert, if missing) an object that matches the given conditions
* @param{string} bucketName -

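A hedged sketch of how the defragmenter is expected to call the new wrapper method; the key values are illustrative (real keys come from VitastorBackend) and `log` is assumed to be a werelogs logger:

// Repoint every metadata reference from an old data key to its post-defrag key.
// Backends without replaceDataLocations support get errors.NotImplemented instead.
metadata.replaceDataLocations('testBucket', 'my-vitastor-location',
    [{ oldKey: 'old-data-key', newKey: 'new-data-key' }],
    (err, modifiedCount) => {
        if (err) {
            return log.error('defrag: failed to relocate object references', { error: err });
        }
        return log.info('defrag: relocated object references', { modifiedCount });
    });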
View File

@@ -3,8 +3,7 @@
const fs = require('fs');
const assert = require('assert');
const uuid = require('uuid');
const level = require('level');
const sublevel = require('level-sublevel');
const { Level } = require('level');
const debug = require('debug')('MetadataFileServer');
const diskusage = require('diskusage');
const werelogs = require('werelogs');
@@ -158,8 +157,8 @@ class MetadataFileServer {
// /metadata namespace
const namespace = `${constants.metadataFileNamespace}/metadata`;
this.logger.info(`creating metadata service at ${namespace}`);
this.baseDb = level(`${this.path}/${ROOT_DB}`);
this.rootDb = sublevel(this.baseDb);
this.baseDb = new Level(`${this.path}/${ROOT_DB}`);
this.rootDb = this.baseDb;
const dbService = new levelNet.LevelDbService({
rootDb: this.rootDb,
namespace,

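A hedged note on the level 9.x migration: the classic level + level-sublevel pair is replaced by the single Level class, and namespacing (if still needed) can use the sublevels built into abstract-level; whether LevelDbService still prefixes keys itself is not shown in this hunk.

import { Level } from 'level';

// level >= 9 bundles sublevel support, so the separate level-sublevel wrapper is gone.
const baseDb = new Level<string, string>('./root_db');
const metadataDb = baseDb.sublevel('metadata'); // optional namespaced view
metadataDb.put('some-key', 'some-value').catch(console.error);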
View File

@@ -1,11 +1,20 @@
const errors = require('../../../errors').default;
const list = require('../../../algos/list/exportAlgos');
const genVID =
require('../../../versioning/VersionID').generateVersionId;
const { serializeCircJSON } = require('../../utils');
const getMultipartUploadListing = require('./getMultipartUploadListing');
const { metadata } = require('./metadata');
// const genVID = versioning.VersionID.generateVersionId;
const defaultMaxKeys = 1000;
let uidCounter = 0;
function generateVersionId(replicationGroupId) {
return genVID(uidCounter++, replicationGroupId);
}
function formatVersionKey(key, versionId) {
return `${key}\0${versionId}`;
@@ -84,12 +93,12 @@ const metastore = {
}
/*
valid combinations of versioning options:
- !params.versioning && !params.versionId: normal non-versioning put
- params.versioning && !params.versionId: create a new version with objVal.versionId
- params.versionId: update (PUT/DELETE) an existing version,
- !versioning && !versionId: normal non-versioning put
- versioning && !versionId: create a new version
- versionId: update (PUT/DELETE) an existing version,
and also update master version in case the put
version is newer or same version than master.
- params.versionId === '': update master version with objVal.versionId
if versionId === '' update master version
*/
if (params && params.versionId) {
@@ -104,17 +113,17 @@ const metastore = {
return cb(null, `{"versionId":"${objVal.versionId}"}`);
}
if (params && params.versioning) {
if (!objVal.versionId)
throw new Error('objVal.versionId is required')
const versionId = generateVersionId();
objVal.versionId = versionId; // eslint-disable-line
metadata.keyMaps.get(bucketName).set(objName, objVal);
// eslint-disable-next-line
objName = formatVersionKey(objName, objVal.versionId);
objName = formatVersionKey(objName, versionId);
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null, `{"versionId":"${objVal.versionId}"}`);
return cb(null, `{"versionId":"${versionId}"}`);
}
if (params && params.versionId === '') {
if (!objVal.versionId)
throw new Error('objVal.versionId is required')
const versionId = generateVersionId();
objVal.versionId = versionId; // eslint-disable-line
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null, `{"versionId":"${objVal.versionId}"}`);
}

View File

@@ -24,7 +24,7 @@ import { MongoClient, Long, Db, MongoClientOptions, ReadPreferenceMode, WithId,
import { v4 as uuidv4 } from 'uuid';
import diskusage from 'diskusage';
import { generateVersionId } from '../../../versioning/VersionID';
import { generateVersionId as genVID } from '../../../versioning/VersionID';
import * as listAlgos from '../../../algos/list/exportAlgos';
import LRUCache from '../../../algos/cache/LRUCache';
@@ -55,9 +55,17 @@ const CONCURRENT_CURSORS = 10;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
let uidCounter = 0;
const BUCKET_VERSIONS = VersioningConstants.BucketVersioningKeyFormat;
const DB_PREFIXES = VersioningConstants.DbPrefixes;
function generateVersionId(replicationGroupId) {
// generate a unique number for each member of the nodejs cluster
return genVID(`${process.pid}.${uidCounter++}`,
replicationGroupId);
}
function inc(str) {
return str ? (str.slice(0, str.length - 1) +
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
@@ -124,7 +132,8 @@ export type InternalListObjectParams = {
mongifiedSearch?: object;
listingType?: string;
start?: undefined;
gt?: undefined
gt?: undefined;
withLocation?: undefined;
};
export interface InfostoreDocument extends Document {
@@ -399,46 +408,41 @@ class MongoClientInterface {
!bucketName.startsWith(constants.mpuBucketPrefix)) {
payload.$set.vFormat = this.defaultBucketKeyFormat;
} else {
payload.$set.vFormat = BUCKET_VERSIONS.v0;
payload.$set.vFormat = BUCKET_VERSIONS.v0;
}
// we don't have to test bucket existence here as it is done
// on the upper layers
m.updateOne({
_id: bucketName,
}, payload, {
upsert: true,
})
.then(() => {
// caching bucket vFormat
this.bucketVFormatCache.add(bucketName, payload.$set.vFormat);
// NOTE: We do not need to create a collection for
// "constants.usersBucket" and "PENSIEVE" since it has already
// been created
if (bucketName !== constants.usersBucket && bucketName !== PENSIEVE) {
return this.db!.createCollection(bucketName)
.then(() => {
if (this.shardCollections) {
const cmd = {
shardCollection: `${this.database}.${bucketName}`,
key: { _id: 1 },
};
return this.adminDb!.command(cmd, {}).then(() => cb(null)).catch(err => {
log.error(
'createBucket: enabling sharding',
{ error: err });
return cb(errors.InternalError);
});
}
return cb(null);
});
}
return cb(null);
})
.catch(err => {
log.error('createBucket: error creating bucket', { error: err.message });
return cb(errors.InternalError);
});
// we don't have to test bucket existence here as it is done on the upper layers
(async () => {
await m.updateOne({ _id: bucketName }, payload, { upsert: true });
// caching bucket vFormat
this.bucketVFormatCache.add(bucketName, payload.$set.vFormat);
// NOTE: We do not need to create a collection for
// "constants.usersBucket" and "PENSIEVE" since it has already
// been created
if (bucketName !== constants.usersBucket && bucketName !== PENSIEVE) {
await this.db!.createCollection(bucketName);
}
// Index for replaceDataLocations (Vitastor defrag)
await this.getCollection(bucketName).createIndex({
'value.location.dataStoreName': 1,
'value.location.key': 1,
}, { name: 'contentLocationIdx' });
if (this.shardCollections) {
const cmd = {
shardCollection: `${this.database}.${bucketName}`,
key: { _id: 1 },
};
await this.adminDb!.command(cmd, {});
}
cb(null);
})().catch(err => {
log.error('createBucket: error creating bucket', { error: err.message });
cb(errors.InternalError);
});
}
/**
@@ -765,7 +769,7 @@ class MongoClientInterface {
* @param {Object} c bucket collection
* @param {String} bucketName bucket name
* @param {String} objName object name
* @param {Object} objVal object metadata (must have versionId)
* @param {Object} objVal object metadata
* @param {Object} params params
* @param {String} params.vFormat object key format
* @param {Object} log logger
@@ -777,14 +781,16 @@ class MongoClientInterface {
c: Collection<ObjectMetastoreDocument>,
bucketName: string,
objName: string,
objVal: ObjectMDData & { versionId: string },
objVal: ObjectMDData,
params: ObjectMDOperationParams,
log: werelogs.Logger,
cb: ArsenalCallback<string>,
isRetry?: boolean,
) {
const versionId = generateVersionId(this.replicationGroupId);
// eslint-disable-next-line
const versionKey = formatVersionKey(objName, objVal.versionId, params.vFormat);
objVal.versionId = versionId;
const versionKey = formatVersionKey(objName, versionId, params.vFormat);
const masterKey = formatMasterKey(objName, params.vFormat);
// initiating array of operations with version creation
const ops: AnyBulkWriteOperation<ObjectMetastoreDocument>[] = [{
@@ -825,7 +831,7 @@ class MongoClientInterface {
c.bulkWrite(ops, {
ordered: true,
})
.then(() => cb(null, `{"versionId": "${objVal.versionId}"}`))
.then(() => cb(null, `{"versionId": "${versionId}"}`))
.catch((err) => {
/*
* Related to https://jira.mongodb.org/browse/SERVER-14322
@@ -863,7 +869,7 @@ class MongoClientInterface {
}
// Otherwise this error is expected, it means that two different versions were put at the
// same time
return cb(null, `{"versionId": "${objVal.versionId}"}`);
return cb(null, `{"versionId": "${versionId}"}`);
}
log.error('putObjectVerCase1: error putting object version', {
error: err.errmsg,
@@ -889,12 +895,14 @@ class MongoClientInterface {
c: Collection<ObjectMetastoreDocument>,
bucketName: string,
objName: string,
objVal: ObjectMDData & { versionId: string },
objVal: ObjectMDData,
params: ObjectMDOperationParams,
log: werelogs.Logger,
cb: ArsenalCallback<string>,
) {
const versionId = generateVersionId(this.replicationGroupId);
// eslint-disable-next-line
objVal.versionId = versionId;
const masterKey = formatMasterKey(objName, params.vFormat);
c.updateOne({ _id: masterKey },
{ $set: { value: objVal }, $setOnInsert: { _id: masterKey } },
@@ -1257,11 +1265,9 @@ class MongoClientInterface {
return cb(null);
});
}
/**
* Returns the putObjectVerCase function to use depending on params
* See also in_memory/metastore.js putObject
*
* Returns the putObjectVerCase function to use
* depending on params
* @param {Object} params params
* @return {Function} suitable putObjectVerCase function
*/
@@ -1316,6 +1322,36 @@ class MongoClientInterface {
});
}
replaceDataLocations(
bucketName: string,
dataStoreName: string,
replacements: { oldKey: any, newKey: any }[],
cb: ArsenalCallback<number>,
): void {
// Object versioning and completeMultipartUpload are non-transactional,
// so such a replacement may be prone to inconsistency errors,
// i.e. an old data reference may still be left in objects that are updated in parallel.
// The caller should therefore re-run replaceDataLocations some time after completion.
// The alternative would be to recheck for object modifications in completeMPU and in the putObjectVerCase* methods.
const c = this.getCollection<ObjectMetastoreDocument>(bucketName);
// collection.findOneAndUpdate(
// { 'value.location.dataStoreName': storeName, 'value.location.key': key },
// { '$set': { 'value.location.$[element].key': newKey } },
// { arrayFilters: [ { 'element.dataStoreName': storeName, 'element.key': key } ] }
// )
c.bulkWrite(replacements.map(repl => ({
updateMany: {
filter: { 'value.location.dataStoreName': dataStoreName, 'value.location.key': repl.oldKey },
update: { '$set': { 'value.location.$[element].key': repl.newKey } },
arrayFilters: [ { 'element.dataStoreName': dataStoreName, 'element.key': repl.oldKey } ],
},
}))).then(res => {
cb(null, res.modifiedCount);
}).catch(err => {
cb(err);
});
}
/**
* gets versioned and non versioned object metadata
* @param {String} bucketName bucket name
@@ -1728,7 +1764,7 @@ class MongoClientInterface {
) {
const masterKey = formatMasterKey(objName, params.vFormat);
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
const _vid = generateVersionId('tmp', this.replicationGroupId);
const _vid = generateVersionId(this.replicationGroupId);
async.series([
next => c.updateOne(
{
@@ -2120,7 +2156,7 @@ class MongoClientInterface {
});
if (!params.secondaryStreamParams) {
// listing masters only (DelimiterMaster)
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
baseStream = stream;
if (vFormat === BUCKET_VERSIONS.v1) {
/**
@@ -2181,8 +2217,8 @@ class MongoClientInterface {
}
} else {
// listing both master and version keys (delimiterVersion Algo)
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch);
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.withLocation);
stream = new MergeStream(
versionStream, masterStream, extension.compareObjects.bind(extension));
}
@@ -2271,6 +2307,7 @@ class MongoClientInterface {
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
mongifiedSearch: params.mongifiedSearch,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension,
vFormat, log, cb);
@@ -2307,6 +2344,7 @@ class MongoClientInterface {
const internalParams = {
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension, vFormat, log, cb);
@@ -2338,6 +2376,7 @@ class MongoClientInterface {
const internalParams = {
mainStreamParams: extensionParams,
mongifiedSearch: params.mongifiedSearch,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension,
BUCKET_VERSIONS.v0, log, cb);

View File

@@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
const MongoUtils = require('./utils');
class MongoReadStream extends Readable {
constructor(c, options, searchOptions) {
constructor(c, options, searchOptions, withLocation) {
super({
objectMode: true,
highWaterMark: 0,
@@ -85,7 +85,7 @@ class MongoReadStream extends Readable {
Object.assign(query, searchOptions);
}
const projection = { 'value.location': 0 };
const projection = withLocation ? undefined : { 'value.location': 0 };
this._cursor = c.find(query, { projection }).sort({
_id: options.reverse ? -1 : 1,
});
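A hedged usage sketch: a caller that needs data locations in a listing (e.g. the defrag scan) is assumed to pass withLocation through the listing params, which stops the 'value.location' projection from being stripped; the other params shown are placeholders:

mongoClientInterface.listObject('testBucket',
    { listingType: 'DelimiterMaster', maxKeys: 1000, withLocation: true },
    log, (err, listing) => {
        // with withLocation set, each listed entry's value keeps its "location" array
    });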

View File

@@ -32,15 +32,14 @@
"bson": "^4.0.0",
"debug": "^4.1.0",
"diskusage": "^1.1.1",
"fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git",
"httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0",
"fcntl": "../fcntl",
"httpagent": "../httpagent",
"https-proxy-agent": "^2.2.0",
"ioredis": "^4.28.5",
"ipaddr.js": "^1.9.1",
"joi": "^17.6.0",
"JSONStream": "^1.0.0",
"level": "^5.0.1",
"level-sublevel": "^6.6.5",
"level": "^9.0.0",
"mongodb": "^5.2.0",
"node-forge": "^1.3.0",
"prom-client": "^14.2.0",
@@ -49,7 +48,7 @@
"socket.io-client": "^4.6.1",
"utf8": "^3.0.0",
"uuid": "^8.3.2",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1",
"werelogs": "../werelogs",
"xml2js": "^0.4.23"
},
"optionalDependencies": {
@@ -63,11 +62,11 @@
"@types/utf8": "^3.0.1",
"@types/ioredis": "^4.28.10",
"@types/jest": "^27.4.1",
"@types/node": "^18.19.41",
"@types/node": "^22.13.10",
"@types/xml2js": "^0.4.11",
"eslint": "^8.14.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
"eslint-config-scality": "../eslint-config-scality",
"eslint-plugin-react": "^4.3.0",
"jest": "^27.5.1",
"mongodb-memory-server": "^8.12.2",