Compare commits
26 Commits
move-versi...developmen
| SHA1 |
| --- |
| ab7861b3b0 |
| 8804658e0a |
| f7a9c2099f |
| 3370c44161 |
| 97f018ae77 |
| 3fa89dcb3c |
| 1e4b9a8390 |
| 9b5441e42b |
| 81a5a3aca0 |
| 5c7cf85f64 |
| c192fe7914 |
| 34be3b7030 |
| ed7a78a83f |
| 2cf6e91e76 |
| 1904dd3bf0 |
| a7eaee760d |
| be5ed75b8b |
| 646f4f1bde |
| 9057e6a507 |
| 4a2399e588 |
| 9bd6dec539 |
| 7893871a31 |
| 8f630a1d62 |
| 552d51e539 |
| 302a7c7465 |
| 3fef5da382 |
@@ -1,3 +1,5 @@
 import type { WithImplicitCoercion } from 'node:buffer';
 
+const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
+
 export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>
@@ -129,8 +129,9 @@ export function validateAndFilterMpuParts(
             key: item.key,
             ETag: `"${item.value.ETag}"`,
             size: item.value.Size,
-            locations: Array.isArray(item.value.partLocations) ?
-                item.value.partLocations : [item.value.partLocations],
+            locations: (item.value.location||item.value.partLocations) instanceof Array
+                ? (item.value.location||item.value.partLocations)
+                : [(item.value.location||item.value.partLocations)],
         });
     });
     keysToDelete.push(mpuOverviewKey);
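The rewritten `locations` expression prefers the newer `location` field over the legacy `partLocations` and normalizes the result into an array. An equivalent, more readable sketch of the same logic (variable names are illustrative, not from the commit):

```js
// Prefer the new `location` field, fall back to legacy `partLocations`,
// then wrap a single location object into a one-element array.
const loc = item.value.location || item.value.partLocations;
const locations = Array.isArray(loc) ? loc : [loc];
```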
@@ -14,7 +14,7 @@ const GcpClient = require('./external/GcpClient');
 const PfsClient = require('./external/PfsClient');
 const backendUtils = require('./external/utils');
 
-function parseLC(config, vault) {
+function parseLC(config, vault, metadata) {
     const clients = {};
 
     Object.keys(config.locationConstraints).forEach(location => {
@@ -27,7 +27,7 @@ function parseLC(config, vault) {
         }
         if (locationObj.type === 'vitastor') {
             const VitastorBackend = require('./vitastor/VitastorBackend');
-            clients[location] = new VitastorBackend(location, locationObj.details);
+            clients[location] = new VitastorBackend(location, locationObj.details, metadata);
         }
         if (locationObj.type === 'scality') {
             if (locationObj.details.connector.sproxyd) {
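The extra `metadata` argument is only consumed by the Vitastor backend, which needs the metadata layer for defragmentation (see `replaceDataLocations` further down). A minimal sketch of the call flow, assuming a config with one 'vitastor' location (names illustrative):

```js
// metadata is the MetadataWrapper instance; VitastorBackend keeps it
// to rewrite object data locations after defragmenting a volume.
const clients = parseLC(config, vault, metadata);
const vitastorClient = clients['standard']; // a VitastorBackend when type === 'vitastor'
```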
lib/storage/data/vitastor/KVWrapper.spec.ts (new file, 89 lines)
@@ -0,0 +1,89 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

const assert = require('assert');
const { MongoMemoryReplSet } = require('mongodb-memory-server');
import { MongoClient, Db } from 'mongodb';
import { MongoKVWrapper } from './KVWrapper';

const mongoserver = new MongoMemoryReplSet({
    debug: false,
    instanceOpts: [
        { port: 27021 },
    ],
    replSet: {
        name: 'customSetName',
        count: 1,
        dbName: 'mongokvtest',
        storageEngine: 'ephemeralForTest',
    },
});

let kv: MongoKVWrapper|null = null;

beforeAll(async () =>
{
    await mongoserver.start();
    await mongoserver.waitUntilRunning();
    kv = new MongoKVWrapper('mongodb://127.0.0.1:27021/?w=majority&readPreference=primary&replicaSet=customSetName',
        'mongokvtest', 'volumes');
    await kv.open();
});

afterAll(async () =>
{
    if (kv)
        await kv.close();
    await mongoserver.stop();
});

test('MongoKVWrapper', async () =>
{
    if (!kv)
        throw new Error('not connected');
    // insert a number and check that the type is preserved
    assert.strictEqual(await kv.get('id'), null);
    assert.strictEqual(await kv.update('id', null, 123), true);
    assert.strictEqual(await kv.get('id'), 123);
    // vol_1_1 not exists
    assert.strictEqual(await kv.get('vol_1_1'), null);
    const hdr = { created: Date.now(), bucket: 'helloworld' };
    // update non-existing key - should fail
    assert.strictEqual(await kv.update('vol_1_1', {}, hdr), false);
    // key is not affected by a failed operation
    assert.strictEqual(await kv.get('vol_1_1'), null);
    // create key
    assert.strictEqual(await kv.update('vol_1_1', null, hdr), true);
    assert.deepStrictEqual(await kv.get('vol_1_1'), hdr);
    // try to create a duplicate key - should fail
    assert.strictEqual(await kv.update('vol_1_1', null, hdr), false);
    // key is not affected by a failed operation
    assert.deepStrictEqual(await kv.get('vol_1_1'), hdr);
    // update key
    const hdr2 = { created: Date.now(), bucket: 'helloworld', deleted: Date.now() };
    assert.strictEqual(await kv.update('vol_1_1', hdr, hdr2), true);
    assert.deepStrictEqual(await kv.get('vol_1_1'), hdr2);
    // try to update again from the same old value - should fail
    assert.strictEqual(await kv.update('vol_1_1', hdr, hdr2), false);
    // key is not affected by a failed operation
    assert.deepStrictEqual(await kv.get('vol_1_1'), hdr2);
    // create 2 more keys
    assert.strictEqual(await kv.update('vol_1_3', null, { abc: 'def' }), true);
    assert.strictEqual(await kv.update('vol_1_2', null, { def: 'xyz' }), true);
    // check listing
    let lst: any[] = [];
    for await (const item of kv.list('vol_'))
        lst.push(item);
    assert.deepStrictEqual(lst, [ [ 'vol_1_1', hdr2 ], [ 'vol_1_2', { def: 'xyz' } ], [ 'vol_1_3', { 'abc': 'def' } ] ]);
    lst = [];
    for await (const item of kv.list('vol_', { def: 'xyz' }))
        lst.push(item);
    assert.deepStrictEqual(lst, [ [ 'vol_1_2', { def: 'xyz' } ] ]);
    // delete key
    assert.strictEqual(await kv.update('vol_1_1', hdr2, null), true);
    assert.deepStrictEqual(await kv.get('vol_1_1'), null);
});
lib/storage/data/vitastor/KVWrapper.ts (new file, 262 lines)
@@ -0,0 +1,262 @@
import { MongoClient, Db, Collection, MongoServerError } from 'mongodb';

interface MongoKV {
    _id: string;
    value: any;
}

export class MongoKVWrapper
{
    client: MongoClient | null = null;
    db: Db | null = null;
    db_name: string;
    url: string;
    collection_name: string;
    collection: Collection<MongoKV> | null = null;
    options: any;
    opened: boolean = false;
    on_open: ((...args: any[]) => void)[] | null = null;
    open_error: any;

    constructor(url: string, db_name: string, collection_name: string, options?: any)
    {
        this.url = url;
        this.db_name = db_name;
        this.collection_name = collection_name;
        this.options = options;
    }

    async open()
    {
        if (!this.collection)
        {
            if (this.on_open)
            {
                await new Promise(ok => this.on_open!.push(ok));
            }
            else
            {
                this.on_open = [];
                try
                {
                    this.client = await MongoClient.connect(this.url, this.options);
                    this.db = this.client.db(this.db_name, { ignoreUndefined: true });
                    await this.db.createCollection(this.collection_name);
                    this.collection = this.db.collection<MongoKV>(this.collection_name);
                }
                catch (e)
                {
                    if (this.client)
                    {
                        this.client.close().catch(console.error);
                        this.client = null;
                    }
                    this.db = null;
                    this.open_error = e;
                }
                this.opened = true;
                this.on_open.map(cb => setImmediate(cb));
                this.on_open = null;
            }
        }
        if (this.open_error)
        {
            throw this.open_error;
        }
    }

    async close()
    {
        if (this.collection)
        {
            this.collection = null;
        }
        if (this.client)
        {
            await this.client!.close();
            this.client = null;
        }
    }

    async get(key: string)
    {
        const doc = await this.collection!.findOne({ _id: key });
        return doc ? doc.value : null;
    }

    async update(key: string, old_value: any, value: any): Promise<boolean>
    {
        if (old_value === null)
        {
            try
            {
                const res = await this.collection!.insertOne({ _id: key, value: value });
                if (res.insertedId !== key)
                    throw new Error('mongodb: insertOne insertedId='+res.insertedId+', but should be '+key);
            }
            catch (e)
            {
                if ((e instanceof MongoServerError) && e.code == 11000)
                    return false;
                throw e;
            }
            return true;
        }
        else if (value !== null)
        {
            const doc = await this.collection!.findOneAndUpdate(
                { _id: key, value: old_value },
                { '$set': { value: value } },
            );
            return !!doc.value;
        }
        else
        {
            const res = await this.collection!.deleteOne({ _id: key, value: old_value });
            return res.deletedCount > 0;
        }
    }

    async* list(start_key: string, filter?: object)
    {
        const mongo_filter = { _id: { '$gte': start_key } };
        if (filter)
        {
            for (const key in filter)
            {
                mongo_filter["value."+key] = { '$eq': filter[key] };
            }
        }
        for await (const item of this.collection!.find(mongo_filter))
        {
            yield [ item._id, item.value ];
        }
    }
}

export class VitastorKVWrapper
{
    config: any;
    kv: any;
    cli: any;
    vitastor: any;
    opened: boolean = false;
    on_open: ((...args: any[]) => void)[] | null = null;
    open_error: any;

    constructor(config, cli, vitastor)
    {
        this.config = config;
        this.cli = cli;
        this.vitastor = vitastor;
        this.kv = new vitastor.KV(this.cli);
    }

    async open()
    {
        if (!this.opened)
        {
            if (this.on_open)
            {
                await new Promise(ok => this.on_open!.push(ok));
            }
            else
            {
                this.on_open = [];
                try
                {
                    if (this.config.metadata_image)
                    {
                        const img = new this.vitastor.Image(this.cli, this.config.metadata_image);
                        const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
                        if (!info.inode_num)
                            throw new Error('vitastorkv metadata image '+this.config.metadata_image+' does not exist');
                        this.config.metadata_pool_id = info.pool_id;
                        this.config.metadata_inode_num = info.inode_num;
                    }
                    await new Promise<void>((ok, no) => this.kv.open(
                        this.config.metadata_pool_id, this.config.metadata_inode_num,
                        this.config.vitastor || {}, err => (err ? no(new Error(err)) : ok())
                    ));
                }
                catch (e)
                {
                    this.open_error = e;
                }
                this.opened = true;
                this.on_open.map(cb => setImmediate(cb));
                this.on_open = null;
            }
        }
        if (this.open_error)
        {
            throw this.open_error;
        }
    }

    async close()
    {
        if (this.opened && !this.open_error)
        {
            await new Promise<void>((ok, no) => this.kv.close(err => (err ? no(err) : ok())));
            this.opened = false;
        }
    }

    async get(key: string)
    {
        const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(key, (err, value) => ok([ err, value ])));
        if (err == this.vitastor.ENOENT)
            return null;
        else if (err)
            throw new Error('vitastorkv get: error '+err);
        return JSON.parse(prev);
    }

    async update(key: string, old_value: any, value: any): Promise<boolean>
    {
        const cas = (old_value !== null ? (cas_old => cas_old === JSON.stringify(old_value)) : (cas_old => !cas_old));
        const err = await new Promise(ok => (value !== null
            ? this.kv.set(key, JSON.stringify(value), ok, cas)
            : this.kv.del(key, ok, cas)));
        if (err === this.vitastor.EINTR)
            return false;
        else if (err)
            throw new Error((value !== null ? 'vitastorkv set: error ' : 'vitastorkv del: error ')+err);
        return true;
    }

    async* list(start_key: string, filter?: object)
    {
        const lst = this.kv.list(start_key);
        try
        {
            next_key: while (true)
            {
                const [ err, key, value ] = await new Promise<[ number, string, string ]>(ok => lst.next((e, k, v) => ok([ e, k, v ])));
                if (err)
                {
                    if (err != this.vitastor.ENOENT)
                        throw new Error('Error listing: '+err);
                    break;
                }
                const decoded = JSON.parse(value);
                if (filter)
                {
                    for (const k in filter)
                    {
                        if (decoded[k] != filter[k])
                        {
                            continue next_key;
                        }
                    }
                }
                yield [ key, decoded ];
            }
        }
        finally
        {
            lst.close();
        }
    }
}
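Both MongoKVWrapper and VitastorKVWrapper expose the same open/get/update/list/close interface, where update() is a compare-and-swap: old_value === null means "insert only" and value === null means "delete if unchanged". A minimal usage sketch (the connection URL is hypothetical; the spec above uses a mongodb-memory-server replica set instead):

```js
const { MongoKVWrapper } = require('./KVWrapper');

async function demo() {
    const kv = new MongoKVWrapper('mongodb://127.0.0.1:27017', 'mydb', 'volumes');
    await kv.open();
    // insert-only: old_value === null, fails if the key already exists
    await kv.update('vol_1_1', null, { bucket: 'b' });
    // compare-and-swap: fails if the stored value no longer matches
    const ok = await kv.update('vol_1_1', { bucket: 'b' }, { bucket: 'b', deleted: Date.now() });
    // delete-if-unchanged: value === null
    await kv.update('vol_1_1', await kv.get('vol_1_1'), null);
    await kv.close();
}
```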
@@ -1,3 +1,7 @@
+// Zenko CloudServer Vitastor data storage backend adapter
+// Copyright (c) Vitaliy Filippov, 2019+
+// License: VNPL-1.1 (see README.md for details)
+
 const assert = require('assert');
 
 const crypto = require('crypto');
@@ -8,19 +12,31 @@ const VitastorMock = require('./VitastorMock');
 
 test('basic read and write', async () =>
 {
+    const origNow = Date.now;
+    const startTs = Date.now();
+    const metastub = {};
     const backend = new VitastorBackend('standard', {
         pool_id: 1,
         metadata_pool_id: 2,
         metadata_inode_num: 1,
         read_chunk_size: 128*1024,
         write_chunk_size: 128*1024,
-    }, null, VitastorMock);
+        open_volume_limit: 0,
+        open_volume_max_unused_sec: 30,
+        open_volume_max_garbage: 0.25,
+        closed_volume_max_garbage: 0.25,
+        volume_lock_timeout_sec: 10,
+        volume_lock_interval_sec: 5,
+        min_volume_size: 128*1024,
+        defrag_interval_sec: 600,
+        defrag_cleanup_age_sec: 20,
+    }, metastub, VitastorMock);
 
     // test put
     const size = Math.floor(128*1024+Math.random()*128*1024);
     const putData1 = crypto.randomBytes(size);
     const getInfo1 = await new Promise((ok, no) => backend.put(new MockReadStream(putData1), size,
-        { bucketName: 'test', objectKey: 'abcd/efg', partNumber: 2 },
+        { bucketName: 'testBucket', objectKey: 'abcd/efg', partNumber: 2 },
         'test:1', (err, res) => err ? no(err) : ok(res)));
     console.log('object written:', getInfo1);
 
@@ -42,7 +58,7 @@ test('basic read and write', async () =>
     const volData1 = backend.cli.inodes[1][1].data;
     console.log('object deleted OK:',
         volData1.slice(getInfo1.offset, getInfo1.offset+16).toString('hex'),
-        volData1.slice(getInfo1.offset+16, getInfo1.offset+16+getInfo1.hdrlen).toString(),
+        volData1.slice(getInfo1.offset+16, getInfo1.offset+getInfo1.hdrlen).toString(),
     );
     if (volData1.slice(getInfo1.offset, getInfo1.offset+8).toString() != VitastorBackend.OBJECT_MAGIC)
         throw new Error('invalid header magic');
@@ -51,8 +67,48 @@ test('basic read and write', async () =>
     if (volData1.readUInt32LE(getInfo1.offset+12) != (getInfo1.hdrlen-16))
         throw new Error('invalid header json length');
 
+    // test deletion statistics
+    clearTimeout(backend.volume_stats_timer_id);
+    await backend._writeVolumeStats();
+    const volHeader = JSON.parse(backend.kv.kv.data['vol_1_1']);
+    assert(volHeader.removed_objects == 1);
+    assert(volHeader.removed_bytes == size);
+    console.log('deletion statistics written ok');
+
+    // one more put
+    const size2 = Math.floor(128*1024+Math.random()*128*1024);
+    const putData2 = crypto.randomBytes(size);
+    const getInfo2 = await new Promise((ok, no) => backend.put(new MockReadStream(putData2), size,
+        { bucketName: 'testBucket', objectKey: 'hello' },
+        'test:4', (err, res) => err ? no(err) : ok(res)));
+    console.log('object written:', getInfo2);
+
+    // test closing unused volumes
+    Date.now = () => startTs+45000;
+    clearTimeout(backend.bump_timer_id);
+    await backend._bumpVolumes();
+    assert.deepEqual(backend.volumes, { testBucket: {} });
+    assert.deepEqual(backend.volumes_by_id, { '1': {} });
+    console.log('unused volume closed');
+
+    // test defrag
+    metastub.replaceDataLocations = (bucket, location, replacements, cb) => cb(null, 1);
+    await backend._autoDefrag();
+    console.log(backend.kv.kv.data);
+    assert.equal(JSON.parse(backend.kv.kv.data['vol_1_1']).defrag_ts, Date.now());
+    assert.equal(Object.keys(backend.volumes['testBucket']).length, 1);
+    console.log('basic defrag ok');
+
+    // test purging volume data
+    Date.now = () => startTs+70000;
+    await backend._autoDefrag();
+    assert(backend.cli.inodes[1][1].data.length == 0);
+    assert(backend.kv.kv.data['vol_1_1'] === undefined);
+    console.log('volume data purged ok');
+
+    // stop timers and so on
+    backend.destroy();
+    Date.now = origNow;
 });
 
 function readAll(readable)
(File diff suppressed because it is too large)
@@ -1,5 +1,10 @@
+// Zenko CloudServer Vitastor data storage backend adapter
+// Copyright (c) Vitaliy Filippov, 2019+
+// License: VNPL-1.1 (see README.md for details)
+
 const ENOENT = -2;
 const EINTR = -4;
+const EINVAL = -22;
 const IMMEDIATE_NONE = 0;
 const IMMEDIATE_SMALL = 1;
 const IMMEDIATE_ALL = 2;
@@ -13,6 +18,8 @@ class Client
     on_read: null | ((pool_id: number, inode_id: number, offset: number, length: number, callback: (number) => void) => void) = null;
     on_write: null | ((pool_id: number, inode_id: number, offset: number,
         data: Buffer|Buffer[], cas: { version: number|bigint }|null, callback: (number) => void) => void) = null;
+    on_delete: null | ((pool_id: number, inode_id: number, offset: number,
+        length: number, cas: { version: number|bigint }|null, callback: (number) => void) => void) = null;
     on_sync: null | ((callback: (number) => void) => void) = null;
 
     get_min_io_size(pool_id: number)
@@ -35,7 +42,7 @@ class Client
     {
         if (!this.on_read)
         {
-            setImmediate(() => this._read(pool_id, inode_id, offset, length, callback));
+            setImmediate(() => this._read(pool_id, inode_id, Number(offset), Number(length), callback));
         }
         else
         {
@@ -73,31 +80,44 @@ class Client
     }
 
     write(pool_id: number, inode_id: number, offset: number|bigint, data: Buffer|Buffer[],
-        cas: { version: number|bigint } | ((err: number|null) => void),
+        options: { version: number|bigint } | ((err: number|null) => void) | null,
         callback?: (err: number|null) => void)
     {
+        const cas = options instanceof Function ? null : options;
+        callback = options instanceof Function ? options : callback;
+        if (!callback)
+        {
+            throw new Error('callback is required');
+        }
         if (!this.on_write)
         {
-            setImmediate(() => this._write(pool_id, inode_id, offset, data, cas, callback));
+            setImmediate(() => this._write(pool_id, inode_id, offset, data, cas, callback!));
        }
        else
        {
-            this.on_write(pool_id, inode_id, Number(offset), data, cas instanceof Function ? null : cas, (err) =>
+            this.on_write(pool_id, inode_id, Number(offset), data, cas, (err) =>
            {
                if (err != 0)
                    callback!(err);
                else
-                    this._write(pool_id, inode_id, offset, data, cas, callback);
+                    this._write(pool_id, inode_id, offset, data, cas, callback!);
            });
        }
    }

    _write(pool_id: number, inode_id: number, offset: number|bigint, data: Buffer|Buffer[],
-        cas: { version: number|bigint } | ((err: number|null) => void),
-        callback?: (err: number|null) => void)
+        cas: { version: number|bigint } | null,
+        callback: (err: number|null) => void)
    {
        const adata: Buffer[] = (data instanceof Buffer ? [ data ] : data);
        const length = adata.reduce((a, c) => a + c.length, 0);
+        const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
+        const last_block = Math.floor((Number(offset) + length - 1) / this.max_atomic_write_size);
+        if (cas && cas.version && first_block != last_block)
+        {
+            callback!(EINVAL);
+            return;
+        }
        if (!this.inodes[pool_id])
        {
            this.inodes[pool_id] = {};
@@ -110,8 +130,11 @@ class Client
            };
        }
        const ino = this.inodes[pool_id][inode_id];
-        const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
-        const last_block = Math.floor((Number(offset) + length - 1) / this.max_atomic_write_size);
+        if (cas && cas.version && BigInt(cas.version)-1n != (ino.versions[first_block] || 0n))
+        {
+            callback!(EINTR);
+            return;
+        }
        for (let i = first_block; i <= last_block; i++)
        {
            ino.versions[i] = (ino.versions[i] || 0n) + 1n;
@@ -128,9 +151,74 @@ class Client
            buf.copy(ino.data, coff);
            coff += buf.length;
        }
-        if (cas instanceof Function)
-            callback = cas;
        callback!(0);
    }

+    delete(pool_id: number, inode_id: number, offset: number|bigint, length: number|bigint,
+        options: { version: number|bigint } | ((err: number|null) => void) | null,
+        callback?: (err: number|null) => void)
+    {
+        const cas = options instanceof Function ? null : options;
+        callback = options instanceof Function ? options : callback;
+        if (!callback)
+        {
+            throw new Error('callback is required');
+        }
+        if (!this.on_delete)
+        {
+            setImmediate(() => this._delete(pool_id, inode_id, offset, length, cas, callback!));
+        }
+        else
+        {
+            this.on_delete(pool_id, inode_id, Number(offset), Number(length), cas, (err) =>
+            {
+                if (err != 0)
+                    callback!(err);
+                else
+                    this._delete(pool_id, inode_id, offset, length, cas, callback!);
+            });
+        }
+    }
+
+    _delete(pool_id: number, inode_id: number, offset: number|bigint, length: number|bigint,
+        cas: { version: number|bigint } | null,
+        callback: (err: number|null) => void)
+    {
+        const first_block = Math.floor(Number(offset) / this.max_atomic_write_size);
+        const last_block = Math.floor((Number(offset) + Number(length) - 1) / this.max_atomic_write_size);
+        if (cas && cas.version && first_block != last_block)
+        {
+            callback!(EINVAL);
+            return;
+        }
+        if (!this.inodes[pool_id])
+        {
+            this.inodes[pool_id] = {};
+        }
+        if (!this.inodes[pool_id][inode_id])
+        {
+            this.inodes[pool_id][inode_id] = {
+                data: Buffer.alloc(this.max_atomic_write_size < Number(length) ? Number(length) : this.max_atomic_write_size),
+                versions: [],
+            };
+        }
+        const ino = this.inodes[pool_id][inode_id];
+        if (cas && cas.version && BigInt(cas.version)-1n != (ino.versions[first_block] || 0n))
+        {
+            callback!(EINTR);
+            return;
+        }
+        for (let i = first_block; i <= last_block; i++)
+        {
+            delete ino.versions[i];
+        }
+        if (ino.data.length <= (last_block+1)*this.max_atomic_write_size)
+        {
+            ino.data = ino.data.slice(0, first_block*this.max_atomic_write_size);
+        }
+        else
+        {
+            ino.data.fill(0, first_block*this.max_atomic_write_size, (last_block+1)*this.max_atomic_write_size);
+        }
+        callback!(0);
+    }
@@ -162,6 +250,7 @@ class Client
 class KV
 {
     data: { [key: string]: string } = {};
+    keys: string[] | null = null;
     size: number = 0;
 
     open(pool_id: number, inode_id: number, params: { [key: string]: string }, callback: (err: number|null) => void)
@@ -202,6 +291,10 @@ class KV
             }
         }
         this.data[key] = value;
+        if (!(key in this.data))
+        {
+            this.keys = null;
+        }
         setImmediate(() => callback(null));
     }
 
@@ -217,12 +310,80 @@
             }
         }
         delete this.data[key];
+        this.keys = null;
         setImmediate(() => callback(null));
     }
 
     list(start_key: string)
     {
-        throw new Error('not implemented');
+        return new KVListing(this, start_key);
     }
 }
 
+class KVListing
+{
+    kv: KV;
+    next_key: string;
+    ge: boolean = true;
+    keys: string[] | null = null;
+    pos: number = 0;
+
+    constructor(kv: KV, start_key: string)
+    {
+        this.kv = kv;
+        this.next_key = start_key;
+    }
+
+    next(callback: (err: number|null, key: string|null, value: string|null) => void)
+    {
+        if (this.pos < 0)
+        {
+            setImmediate(() => callback(ENOENT, null, null));
+            return;
+        }
+        if (!this.keys || this.kv.keys != this.keys)
+        {
+            if (!this.kv.keys)
+                this.kv.keys = Object.keys(this.kv.data).sort();
+            this.keys = this.kv.keys;
+            this.pos = 0;
+            if (this.next_key != '')
+            {
+                let start = 0, end = this.keys.length;
+                while (end > start+1)
+                {
+                    const mid = 0|((start+end)/2);
+                    if (this.next_key < this.keys[mid])
+                        start = mid;
+                    else
+                        end = mid;
+                }
+                if (!this.ge)
+                {
+                    while (start < this.keys.length && this.next_key == this.keys[start])
+                        start++;
+                }
+                this.pos = start;
+            }
+        }
+        if (this.pos < this.keys.length)
+        {
+            const key = this.keys[this.pos];
+            const value = this.kv.data[key];
+            this.pos++;
+            this.next_key = key;
+            this.ge = false;
+            setImmediate(() => callback(null, key, value));
+        }
+        else
+        {
+            this.pos = -1;
+            setImmediate(() => callback(ENOENT, null, null));
+        }
+    }
+
+    close()
+    {
+    }
+}
@@ -231,6 +392,7 @@ module.exports = {
     KV,
     ENOENT,
     EINTR,
+    EINVAL,
     IMMEDIATE_NONE,
     IMMEDIATE_SMALL,
     IMMEDIATE_ALL,
@@ -43,7 +43,7 @@ function _parseListEntries(entries) {
                 Initiated: tmp.initiated,
                 Initiator: tmp.initiator,
                 EventualStorageBucket: tmp.eventualStorageBucket,
-                partLocations: tmp.partLocations,
+                location: tmp.location||tmp.partLocations,
                 creationDate: tmp.creationDate,
                 ingestion: tmp.ingestion,
             },
@@ -517,6 +517,13 @@ class MetadataWrapper {
         });
     }
 
+    replaceDataLocations(bucketName, dataStoreName, replacements, cb) {
+        if (typeof this.client.replaceDataLocations !== 'function') {
+            return cb(errors.NotImplemented);
+        }
+        return this.client.replaceDataLocations(bucketName, dataStoreName, replacements, cb);
+    }
+
     /**
      * updates(insert, if missing) an object that matches the given conditions
      * @param{string} bucketName -
@@ -3,8 +3,7 @@
 const fs = require('fs');
 const assert = require('assert');
 const uuid = require('uuid');
-const level = require('level');
-const sublevel = require('level-sublevel');
+const { Level } = require('level');
 const debug = require('debug')('MetadataFileServer');
 const diskusage = require('diskusage');
 const werelogs = require('werelogs');
@@ -158,8 +157,8 @@ class MetadataFileServer {
         // /metadata namespace
         const namespace = `${constants.metadataFileNamespace}/metadata`;
         this.logger.info(`creating metadata service at ${namespace}`);
-        this.baseDb = level(`${this.path}/${ROOT_DB}`);
-        this.rootDb = sublevel(this.baseDb);
+        this.baseDb = new Level(`${this.path}/${ROOT_DB}`);
+        this.rootDb = this.baseDb;
         const dbService = new levelNet.LevelDbService({
             rootDb: this.rootDb,
             namespace,
@@ -1,11 +1,20 @@
 const errors = require('../../../errors').default;
 const list = require('../../../algos/list/exportAlgos');
 const genVID =
     require('../../../versioning/VersionID').generateVersionId;
+const { serializeCircJSON } = require('../../utils');
 
 const getMultipartUploadListing = require('./getMultipartUploadListing');
 const { metadata } = require('./metadata');
 
 // const genVID = versioning.VersionID.generateVersionId;
 
 const defaultMaxKeys = 1000;
+let uidCounter = 0;
+
+function generateVersionId(replicationGroupId) {
+    return genVID(uidCounter++, replicationGroupId);
+}
 
 function formatVersionKey(key, versionId) {
     return `${key}\0${versionId}`;
@@ -84,12 +93,12 @@ const metastore = {
         }
         /*
            valid combinations of versioning options:
-           - !params.versioning && !params.versionId: normal non-versioning put
-           - params.versioning && !params.versionId: create a new version with objVal.versionId
-           - params.versionId: update (PUT/DELETE) an existing version,
+           - !versioning && !versionId: normal non-versioning put
+           - versioning && !versionId: create a new version
+           - versionId: update (PUT/DELETE) an existing version,
             and also update master version in case the put
             version is newer or same version than master.
-           - params.versionId === '': update master version with objVal.versionId
+            if versionId === '' update master version
        */
        if (params && params.versionId) {
@@ -104,17 +113,17 @@ const metastore = {
            return cb(null, `{"versionId":"${objVal.versionId}"}`);
        }
        if (params && params.versioning) {
-            if (!objVal.versionId)
-                throw new Error('objVal.versionId is required')
+            const versionId = generateVersionId();
+            objVal.versionId = versionId; // eslint-disable-line
            metadata.keyMaps.get(bucketName).set(objName, objVal);
            // eslint-disable-next-line
-            objName = formatVersionKey(objName, objVal.versionId);
+            objName = formatVersionKey(objName, versionId);
            metadata.keyMaps.get(bucketName).set(objName, objVal);
-            return cb(null, `{"versionId":"${objVal.versionId}"}`);
+            return cb(null, `{"versionId":"${versionId}"}`);
        }
        if (params && params.versionId === '') {
-            if (!objVal.versionId)
-                throw new Error('objVal.versionId is required')
+            const versionId = generateVersionId();
+            objVal.versionId = versionId; // eslint-disable-line
            metadata.keyMaps.get(bucketName).set(objName, objVal);
            return cb(null, `{"versionId":"${objVal.versionId}"}`);
        }
@@ -24,7 +24,7 @@ import { MongoClient, Long, Db, MongoClientOptions, ReadPreferenceMode, WithId,
 import { v4 as uuidv4 } from 'uuid';
 import diskusage from 'diskusage';
 
-import { generateVersionId } from '../../../versioning/VersionID';
+import { generateVersionId as genVID } from '../../../versioning/VersionID';
 import * as listAlgos from '../../../algos/list/exportAlgos';
 import LRUCache from '../../../algos/cache/LRUCache';
 
@@ -55,9 +63,17 @@ const CONCURRENT_CURSORS = 10;
 
 const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
 
+let uidCounter = 0;
+
 const BUCKET_VERSIONS = VersioningConstants.BucketVersioningKeyFormat;
 const DB_PREFIXES = VersioningConstants.DbPrefixes;
 
+function generateVersionId(replicationGroupId) {
+    // generate a unique number for each member of the nodejs cluster
+    return genVID(`${process.pid}.${uidCounter++}`,
+        replicationGroupId);
+}
+
 function inc(str) {
     return str ? (str.slice(0, str.length - 1) +
         String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
@@ -124,7 +132,8 @@ export type InternalListObjectParams = {
     mongifiedSearch?: object;
     listingType?: string;
     start?: undefined;
-    gt?: undefined
+    gt?: undefined;
+    withLocation?: undefined;
 };
 
 export interface InfostoreDocument extends Document {
@@ -399,46 +408,41 @@ class MongoClientInterface {
             !bucketName.startsWith(constants.mpuBucketPrefix)) {
             payload.$set.vFormat = this.defaultBucketKeyFormat;
         } else {
-           payload.$set.vFormat = BUCKET_VERSIONS.v0;
+            payload.$set.vFormat = BUCKET_VERSIONS.v0;
         }
 
-        // we don't have to test bucket existence here as it is done
-        // on the upper layers
-        m.updateOne({
-            _id: bucketName,
-        }, payload, {
-            upsert: true,
-        })
-            .then(() => {
-                // caching bucket vFormat
-                this.bucketVFormatCache.add(bucketName, payload.$set.vFormat);
-                // NOTE: We do not need to create a collection for
-                // "constants.usersBucket" and "PENSIEVE" since it has already
-                // been created
-                if (bucketName !== constants.usersBucket && bucketName !== PENSIEVE) {
-                    return this.db!.createCollection(bucketName)
-                        .then(() => {
-                            if (this.shardCollections) {
-                                const cmd = {
-                                    shardCollection: `${this.database}.${bucketName}`,
-                                    key: { _id: 1 },
-                                };
-                                return this.adminDb!.command(cmd, {}).then(() => cb(null)).catch(err => {
-                                    log.error(
-                                        'createBucket: enabling sharding',
-                                        { error: err });
-                                    return cb(errors.InternalError);
-                                });
-                            }
-                            return cb(null);
-                        });
-                }
-                return cb(null);
-            })
-            .catch(err => {
-                log.error('createBucket: error creating bucket', { error: err.message });
-                return cb(errors.InternalError);
-            });
+        // we don't have to test bucket existence here as it is done on the upper layers
+        (async () => {
+            await m.updateOne({ _id: bucketName }, payload, { upsert: true });
+
+            // caching bucket vFormat
+            this.bucketVFormatCache.add(bucketName, payload.$set.vFormat);
+            // NOTE: We do not need to create a collection for
+            // "constants.usersBucket" and "PENSIEVE" since it has already
+            // been created
+            if (bucketName !== constants.usersBucket && bucketName !== PENSIEVE) {
+                await this.db!.createCollection(bucketName);
+            }
+
+            // Index for replaceDataLocations (Vitastor defrag)
+            await this.getCollection(bucketName).createIndex({
+                'value.location.dataStoreName': 1,
+                'value.location.key': 1,
+            }, { name: 'contentLocationIdx' });
+
+            if (this.shardCollections) {
+                const cmd = {
+                    shardCollection: `${this.database}.${bucketName}`,
+                    key: { _id: 1 },
+                };
+                await this.adminDb!.command(cmd, {});
+            }
+
+            cb(null);
+        })().catch(err => {
+            log.error('createBucket: error creating bucket', { error: err.message });
+            cb(errors.InternalError);
+        });
     }
 
     /**
@@ -765,7 +769,7 @@ class MongoClientInterface {
     * @param {Object} c bucket collection
     * @param {String} bucketName bucket name
     * @param {String} objName object name
-     * @param {Object} objVal object metadata (must have versionId)
+     * @param {Object} objVal object metadata
     * @param {Object} params params
     * @param {String} params.vFormat object key format
     * @param {Object} log logger
@@ -777,14 +781,16 @@ class MongoClientInterface {
         c: Collection<ObjectMetastoreDocument>,
         bucketName: string,
         objName: string,
-        objVal: ObjectMDData & { versionId: string },
+        objVal: ObjectMDData,
         params: ObjectMDOperationParams,
         log: werelogs.Logger,
         cb: ArsenalCallback<string>,
+        isRetry?: boolean,
     ) {
+        const versionId = generateVersionId(this.replicationGroupId);
         // eslint-disable-next-line
-        const versionKey = formatVersionKey(objName, objVal.versionId, params.vFormat);
+        objVal.versionId = versionId;
+        const versionKey = formatVersionKey(objName, versionId, params.vFormat);
         const masterKey = formatMasterKey(objName, params.vFormat);
         // initiating array of operations with version creation
         const ops: AnyBulkWriteOperation<ObjectMetastoreDocument>[] = [{
@@ -825,7 +831,7 @@ class MongoClientInterface {
         c.bulkWrite(ops, {
             ordered: true,
         })
-            .then(() => cb(null, `{"versionId": "${objVal.versionId}"}`))
+            .then(() => cb(null, `{"versionId": "${versionId}"}`))
             .catch((err) => {
                 /*
                  * Related to https://jira.mongodb.org/browse/SERVER-14322
@@ -863,7 +869,7 @@ class MongoClientInterface {
                 }
                 // Otherwise this error is expected, it means that two differents version was put at the
                 // same time
-                return cb(null, `{"versionId": "${objVal.versionId}"}`);
+                return cb(null, `{"versionId": "${versionId}"}`);
             }
             log.error('putObjectVerCase1: error putting object version', {
                 error: err.errmsg,
@@ -889,12 +895,14 @@ class MongoClientInterface {
         c: Collection<ObjectMetastoreDocument>,
         bucketName: string,
         objName: string,
-        objVal: ObjectMDData & { versionId: string },
+        objVal: ObjectMDData,
         params: ObjectMDOperationParams,
         log: werelogs.Logger,
         cb: ArsenalCallback<string>,
     ) {
         const versionId = generateVersionId(this.replicationGroupId);
+        // eslint-disable-next-line
+        objVal.versionId = versionId;
         const masterKey = formatMasterKey(objName, params.vFormat);
         c.updateOne({ _id: masterKey },
             { $set: { value: objVal }, $setOnInsert: { _id: masterKey } },
@@ -1257,11 +1265,9 @@ class MongoClientInterface {
             return cb(null);
         });
     }
 
     /**
-     * Returns the putObjectVerCase function to use depending on params
-     * See also in_memory/metastore.js putObject
-     *
+     * Returns the putObjectVerCase function to use
+     * depending on params
      * @param {Object} params params
     * @return {Function} suitable putObjectVerCase function
     */
@@ -1316,6 +1322,36 @@ class MongoClientInterface {
         });
     }
 
+    replaceDataLocations(
+        bucketName: string,
+        dataStoreName: string,
+        replacements: { oldKey: any, newKey: any }[],
+        cb: ArsenalCallback<number>,
+    ): void {
+        // Object versioning and completeMultipartUpload are non-transactional
+        // So in fact such replacement may be prone to inconsistency errors
+        // I.e. the old data reference may still be left in objects being updated in parallel
+        // So the called should recheck replaceDataLocations some time after completion
+        // The other way to do it would be recheck for object modifications in completeMPU and in putObjectCase**
+        const c = this.getCollection<ObjectMetastoreDocument>(bucketName);
+        // collection.findOneAndUpdate(
+        //     { 'value.location.dataStoreName': storeName, 'value.location.key': key },
+        //     { '$set': { 'value.location.$[element].key': newKey } },
+        //     { arrayFilters: [ { 'element.dataStoreName': storeName, 'element.key': key } ] }
+        // )
+        c.bulkWrite(replacements.map(repl => ({
+            updateMany: {
+                filter: { 'value.location.dataStoreName': dataStoreName, 'value.location.key': repl.oldKey },
+                update: { '$set': { 'value.location.$[element].key': repl.newKey } },
+                arrayFilters: [ { 'element.dataStoreName': dataStoreName, 'element.key': repl.oldKey } ],
+            },
+        }))).then(res => {
+            cb(null, res.modifiedCount);
+        }).catch(err => {
+            cb(err);
+        });
+    }
+
     /**
     * gets versioned and non versioned object metadata
     * @param {String} bucketName bucket name
@@ -1728,7 +1764,7 @@ class MongoClientInterface {
     ) {
         const masterKey = formatMasterKey(objName, params.vFormat);
         const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
-        const _vid = generateVersionId('tmp', this.replicationGroupId);
+        const _vid = generateVersionId(this.replicationGroupId);
         async.series([
             next => c.updateOne(
                 {
@@ -2120,7 +2156,7 @@ class MongoClientInterface {
         });
         if (!params.secondaryStreamParams) {
             // listing masters only (DelimiterMaster)
-            stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
+            stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
             baseStream = stream;
             if (vFormat === BUCKET_VERSIONS.v1) {
                 /**
@@ -2181,8 +2217,8 @@ class MongoClientInterface {
             }
         } else {
             // listing both master and version keys (delimiterVersion Algo)
-            const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
-            const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch);
+            const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
+            const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.withLocation);
             stream = new MergeStream(
                 versionStream, masterStream, extension.compareObjects.bind(extension));
         }
@@ -2271,6 +2307,7 @@ class MongoClientInterface {
             mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
             secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
             mongifiedSearch: params.mongifiedSearch,
+            withLocation: params.withLocation,
         };
         return this.internalListObject(bucketName, internalParams, extension,
             vFormat, log, cb);
@@ -2307,6 +2344,7 @@ class MongoClientInterface {
         const internalParams = {
             mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
             secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
+            withLocation: params.withLocation,
         };
 
         return this.internalListObject(bucketName, internalParams, extension, vFormat, log, cb);
@@ -2338,6 +2376,7 @@ class MongoClientInterface {
         const internalParams = {
             mainStreamParams: extensionParams,
             mongifiedSearch: params.mongifiedSearch,
+            withLocation: params.withLocation,
         };
         return this.internalListObject(bucketName, internalParams, extension,
             BUCKET_VERSIONS.v0, log, cb);
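replaceDataLocations is the metadata half of Vitastor defragmentation: it rewrites every stored location key in one bulk operation using MongoDB's positional `$[element]` operator with arrayFilters. A standalone sketch of a single such update (collection handle and key values are illustrative):

```js
// Rewrite one data location key inside the `value.location` arrays of all
// matching object metadata documents; mirrors one bulkWrite entry above.
const res = await collection.updateMany(
    { 'value.location.dataStoreName': 'standard', 'value.location.key': 'oldKey' },
    { $set: { 'value.location.$[element].key': 'newKey' } },
    { arrayFilters: [{ 'element.dataStoreName': 'standard', 'element.key': 'oldKey' }] },
);
console.log('objects updated:', res.modifiedCount);
```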
@@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
 const MongoUtils = require('./utils');
 
 class MongoReadStream extends Readable {
-    constructor(c, options, searchOptions) {
+    constructor(c, options, searchOptions, withLocation) {
         super({
             objectMode: true,
             highWaterMark: 0,
@@ -85,7 +85,7 @@ class MongoReadStream extends Readable {
             Object.assign(query, searchOptions);
         }
 
-        const projection = { 'value.location': 0 };
+        const projection = withLocation ? undefined : { 'value.location': 0 };
         this._cursor = c.find(query, { projection }).sort({
             _id: options.reverse ? -1 : 1,
         });
package.json (13 lines changed)
@@ -32,15 +32,14 @@
     "bson": "^4.0.0",
     "debug": "^4.1.0",
     "diskusage": "^1.1.1",
-    "fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git",
-    "httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0",
+    "fcntl": "../fcntl",
+    "httpagent": "../httpagent",
     "https-proxy-agent": "^2.2.0",
     "ioredis": "^4.28.5",
     "ipaddr.js": "^1.9.1",
     "joi": "^17.6.0",
     "JSONStream": "^1.0.0",
-    "level": "^5.0.1",
-    "level-sublevel": "^6.6.5",
+    "level": "^9.0.0",
     "mongodb": "^5.2.0",
     "node-forge": "^1.3.0",
     "prom-client": "^14.2.0",
@@ -49,7 +48,7 @@
     "socket.io-client": "^4.6.1",
     "utf8": "^3.0.0",
     "uuid": "^8.3.2",
-    "werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1",
+    "werelogs": "../werelogs",
     "xml2js": "^0.4.23"
   },
   "optionalDependencies": {
@@ -63,11 +62,11 @@
     "@types/utf8": "^3.0.1",
     "@types/ioredis": "^4.28.10",
     "@types/jest": "^27.4.1",
-    "@types/node": "^18.19.41",
+    "@types/node": "^22.13.10",
     "@types/xml2js": "^0.4.11",
     "eslint": "^8.14.0",
     "eslint-config-airbnb-base": "^15.0.0",
-    "eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
+    "eslint-config-scality": "../eslint-config-scality",
     "eslint-plugin-react": "^4.3.0",
     "jest": "^27.5.1",
     "mongodb-memory-server": "^8.12.2",