Compare commits
3 Commits
521d729249
...
308492308f
Author | SHA1 | Date |
---|---|---|
|
308492308f | |
|
8804658e0a | |
|
f7a9c2099f |
lib
s3middleware
storage
data/vitastor
metadata
|
@ -1,3 +1,5 @@
|
|||
import type { WithImplicitCoercion } from 'node:buffer';
|
||||
|
||||
const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
|
||||
|
||||
export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>
|
||||
|
|
|
@ -129,8 +129,9 @@ export function validateAndFilterMpuParts(
|
|||
key: item.key,
|
||||
ETag: `"${item.value.ETag}"`,
|
||||
size: item.value.Size,
|
||||
locations: Array.isArray(item.value.partLocations) ?
|
||||
item.value.partLocations : [item.value.partLocations],
|
||||
locations: (item.value.location||item.value.partLocations) instanceof Array
|
||||
? (item.value.location||item.value.partLocations)
|
||||
: [(item.value.location||item.value.partLocations)],
|
||||
});
|
||||
});
|
||||
keysToDelete.push(mpuOverviewKey);
|
||||
|
|
|
@ -7,6 +7,8 @@ import { Readable } from 'stream';
|
|||
import { Db } from 'mongodb';
|
||||
import { MongoKVWrapper, VitastorKVWrapper } from './KVWrapper';
|
||||
|
||||
const constants = require('../../../constants');
|
||||
|
||||
const VOLUME_MAGIC = 'VstS3Vol';
|
||||
const OBJECT_MAGIC = 'VstS3Obj';
|
||||
const OBJECT_BIN_HDR_SIZE = 16;
|
||||
|
@ -965,21 +967,28 @@ class VitastorGC
|
|||
const new_get_info = await new Promise<VitastorObjectGetInfo>(
|
||||
(ok, no) => this.backend._put(read_stream, json_hdr, (err, res) => (err ? no(err) : ok(res)))
|
||||
);
|
||||
const updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
|
||||
json_hdr.bucket,
|
||||
json_hdr.loc,
|
||||
[ {
|
||||
oldKey: {
|
||||
pool: this.pool_id,
|
||||
volume: this.volume_id,
|
||||
offset: this.pos+offset,
|
||||
hdrlen: OBJECT_BIN_HDR_SIZE+json_len,
|
||||
size: json_hdr.size,
|
||||
},
|
||||
newKey: new_get_info,
|
||||
} ],
|
||||
const replacement = {
|
||||
oldKey: {
|
||||
pool: this.pool_id,
|
||||
volume: this.volume_id,
|
||||
offset: this.pos+offset,
|
||||
hdrlen: OBJECT_BIN_HDR_SIZE+json_len,
|
||||
size: json_hdr.size,
|
||||
},
|
||||
newKey: new_get_info,
|
||||
};
|
||||
let updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
|
||||
json_hdr.bucket, json_hdr.loc, [ replacement ],
|
||||
(err, res) => err ? no(err) : ok(res)
|
||||
));
|
||||
if (!updated_count && json_hdr.part_num)
|
||||
{
|
||||
// Also check the shadow bucket just in case if it's an incomplete upload
|
||||
updated_count = await new Promise<number>((ok, no) => this.meta_backend.replaceDataLocations(
|
||||
constants.mpuBucketPrefix+json_hdr.bucket, json_hdr.loc, [ replacement ],
|
||||
(err, res) => err ? no(err) : ok(res)
|
||||
));
|
||||
}
|
||||
// If updated_count is 0 then the object is unreferenced garbage. Mark it as deleted then...
|
||||
if (!updated_count)
|
||||
{
|
||||
|
|
|
@ -43,7 +43,7 @@ function _parseListEntries(entries) {
|
|||
Initiated: tmp.initiated,
|
||||
Initiator: tmp.initiator,
|
||||
EventualStorageBucket: tmp.eventualStorageBucket,
|
||||
partLocations: tmp.partLocations,
|
||||
location: tmp.location||tmp.partLocations,
|
||||
creationDate: tmp.creationDate,
|
||||
ingestion: tmp.ingestion,
|
||||
},
|
||||
|
|
|
@ -132,7 +132,8 @@ export type InternalListObjectParams = {
|
|||
mongifiedSearch?: object;
|
||||
listingType?: string;
|
||||
start?: undefined;
|
||||
gt?: undefined
|
||||
gt?: undefined;
|
||||
withLocation?: undefined;
|
||||
};
|
||||
|
||||
export interface InfostoreDocument extends Document {
|
||||
|
@ -2155,7 +2156,7 @@ class MongoClientInterface {
|
|||
});
|
||||
if (!params.secondaryStreamParams) {
|
||||
// listing masters only (DelimiterMaster)
|
||||
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
|
||||
stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
|
||||
baseStream = stream;
|
||||
if (vFormat === BUCKET_VERSIONS.v1) {
|
||||
/**
|
||||
|
@ -2216,8 +2217,8 @@ class MongoClientInterface {
|
|||
}
|
||||
} else {
|
||||
// listing both master and version keys (delimiterVersion Algo)
|
||||
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch);
|
||||
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch);
|
||||
const masterStream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch, params.withLocation);
|
||||
const versionStream = new MongoReadStream(c, params.secondaryStreamParams, params.mongifiedSearch, params.withLocation);
|
||||
stream = new MergeStream(
|
||||
versionStream, masterStream, extension.compareObjects.bind(extension));
|
||||
}
|
||||
|
@ -2306,6 +2307,7 @@ class MongoClientInterface {
|
|||
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
|
||||
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
|
||||
mongifiedSearch: params.mongifiedSearch,
|
||||
withLocation: params.withLocation,
|
||||
};
|
||||
return this.internalListObject(bucketName, internalParams, extension,
|
||||
vFormat, log, cb);
|
||||
|
@ -2342,6 +2344,7 @@ class MongoClientInterface {
|
|||
const internalParams = {
|
||||
mainStreamParams: Array.isArray(extensionParams) ? extensionParams[0] : extensionParams,
|
||||
secondaryStreamParams: Array.isArray(extensionParams) ? extensionParams[1] : null,
|
||||
withLocation: params.withLocation,
|
||||
};
|
||||
|
||||
return this.internalListObject(bucketName, internalParams, extension, vFormat, log, cb);
|
||||
|
@ -2373,6 +2376,7 @@ class MongoClientInterface {
|
|||
const internalParams = {
|
||||
mainStreamParams: extensionParams,
|
||||
mongifiedSearch: params.mongifiedSearch,
|
||||
withLocation: params.withLocation,
|
||||
};
|
||||
return this.internalListObject(bucketName, internalParams, extension,
|
||||
BUCKET_VERSIONS.v0, log, cb);
|
||||
|
|
|
@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
|
|||
const MongoUtils = require('./utils');
|
||||
|
||||
class MongoReadStream extends Readable {
|
||||
constructor(c, options, searchOptions) {
|
||||
constructor(c, options, searchOptions, withLocation) {
|
||||
super({
|
||||
objectMode: true,
|
||||
highWaterMark: 0,
|
||||
|
@ -85,7 +85,7 @@ class MongoReadStream extends Readable {
|
|||
Object.assign(query, searchOptions);
|
||||
}
|
||||
|
||||
const projection = { 'value.location': 0 };
|
||||
const projection = withLocation ? undefined : { 'value.location': 0 };
|
||||
this._cursor = c.find(query, { projection }).sort({
|
||||
_id: options.reverse ? -1 : 1,
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue