Compare commits

...

3 Commits

6 changed files with 38 additions and 22 deletions

View File

@@ -1,3 +1,5 @@
import type { WithImplicitCoercion } from 'node:buffer';
const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>

View File

@@ -129,8 +129,9 @@ export function validateAndFilterMpuParts(
key: item.key,
ETag: `"${item.value.ETag}"`,
size: item.value.Size,
locations: Array.isArray(item.value.partLocations) ?
item.value.partLocations : [item.value.partLocations],
locations: (item.value.location||item.value.partLocations) instanceof Array
? (item.value.location||item.value.partLocations)
: [(item.value.location||item.value.partLocations)],
});
});
keysToDelete.push(mpuOverviewKey);

View File

@@ -7,6 +7,8 @@ import { Readable } from 'stream';
import { Db } from 'mongodb';
import { MongoKVWrapper, VitastorKVWrapper } from './KVWrapper';
const constants = require('../../../constants');
const VOLUME_MAGIC = 'VstS3Vol';
const OBJECT_MAGIC = 'VstS3Obj';
const OBJECT_BIN_HDR_SIZE = 16;
@@ -965,21 +967,28 @@ class VitastorGC
const new_get_info = await new Promise<VitastorObjectGetInfo>(
(ok, no) => this.backend._put(read_stream, json_hdr, (err, res) => (err ? no(err) : ok(res)))
);
const updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
json_hdr.bucket,
json_hdr.loc,
[ {
oldKey: {
pool: this.pool_id,
volume: this.volume_id,
offset: this.pos+offset,
hdrlen: OBJECT_BIN_HDR_SIZE+json_len,
size: json_hdr.size,
},
newKey: new_get_info,
} ],
const replacement = {
oldKey: {
pool: this.pool_id,
volume: this.volume_id,
offset: this.pos+offset,
hdrlen: OBJECT_BIN_HDR_SIZE+json_len,
size: json_hdr.size,
},
newKey: new_get_info,
};
let updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
json_hdr.bucket, json_hdr.loc, [ replacement ],
(err, res) => err ? no(err) : ok(res)
));
if (!updated_count && json_hdr.part_num)
{
// Also check the shadow bucket just in case if it's an incomplete upload
updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
constants.mpuBucketPrefix+json_hdr.bucket, json_hdr.loc, [ replacement ],
(err, res) => err ? no(err) : ok(res)
));
}
// If updated_count is 0 then the object is unreferenced garbage. Mark it as deleted then...
if (!updated_count)
{

View File

@@ -43,7 +43,7 @@ function _parseListEntries(entries) {
Initiated: tmp.initiated,
Initiator: tmp.initiator,
EventualStorageBucket: tmp.eventualStorageBucket,
partLocations: tmp.partLocations,
location: tmp.location||tmp.partLocations,
creationDate: tmp.creationDate,
ingestion: tmp.ingestion,
},

View File

@@ -132,7 +132,8 @@ export type InternalListObjectParams = {
mongifiedSearch?: object;
listingType?: string;
start?: undefined;
gt?: undefined
gt?: undefined;
withLocation?: undefined;
};
export interface InfostoreDocument extends Document {
@@ -2155,7 +2156,7 @@ class MongoClientInterface {
});
if (!params.secondaryStreamParams) {
// listing masters only (DelimiterMaster)
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
baseStream = stream;
if (vFormat === BUCKET_VERSIONS.v1) {
/**
@@ -2216,8 +2217,8 @@
}
} else {
// listing both master and version keys (delimiterVersion Algo)
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch);
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.withLocation);
stream = new MergeStream(
versionStream, masterStream, extension.compareObjects.bind(extension));
}
@@ -2306,6 +2307,7 @@
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
mongifiedSearch: params.mongifiedSearch,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension,
vFormat, log, cb);
@@ -2342,6 +2344,7 @@
const internalParams = {
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension, vFormat, log, cb);
@@ -2373,6 +2376,7 @@
const internalParams = {
mainStreamParams: extensionParams,
mongifiedSearch: params.mongifiedSearch,
withLocation: params.withLocation,
};
return this.internalListObject(bucketName, internalParams, extension,
BUCKET_VERSIONS.v0, log, cb);

View File

@@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
const MongoUtils = require('./utils');
class MongoReadStream extends Readable {
constructor(c, options, searchOptions) {
constructor(c, options, searchOptions, withLocation) {
super({
objectMode: true,
highWaterMark: 0,
@@ -85,7 +85,7 @@ class MongoReadStream extends Readable {
Object.assign(query, searchOptions);
}
const projection = { 'value.location': 0 };
const projection = withLocation ? undefined : { 'value.location': 0 };
this._cursor = c.find(query, { projection }).sort({
_id: options.reverse ? -1 : 1,
});