Compare commits
7 Commits
53dac8d233
...
8e259f97ef
Author | SHA1 | Date |
---|---|---|
williamlardier | 8e259f97ef | |
williamlardier | 9193f7a466 | |
williamlardier | cca2ee811d | |
williamlardier | 1219a69529 | |
williamlardier | 900d3d255a | |
williamlardier | f814fcbfd3 | |
williamlardier | b21921d315 |
|
@ -28,20 +28,32 @@ export function checkIPinRangeOrMatch(
|
|||
}
|
||||
}
|
||||
|
||||
let ipCache = {};
|
||||
|
||||
/**
|
||||
* Parse IP address into object representation
|
||||
* @param ip - IPV4/IPV6/IPV4-mapped IPV6 address
|
||||
* @return parsedIp - Object representation of parsed IP
|
||||
*/
|
||||
export function parseIp(ip: string): ipaddr.IPv4 | ipaddr.IPv6 | {} {
|
||||
// Check if the result is cached.
|
||||
if (ip in ipCache) {
|
||||
return ipCache[ip];
|
||||
}
|
||||
|
||||
let result = {};
|
||||
|
||||
if (ipaddr.IPv4.isValid(ip)) {
|
||||
return ipaddr.parse(ip);
|
||||
result = ipaddr.parse(ip);
|
||||
}
|
||||
if (ipaddr.IPv6.isValid(ip)) {
|
||||
// also parses IPv6 mapped IPv4 addresses into IPv4 representation
|
||||
return ipaddr.process(ip);
|
||||
else if (ipaddr.IPv6.isValid(ip)) {
|
||||
// Also parses IPv6 mapped IPv4 addresses into IPv4 representation
|
||||
result = ipaddr.process(ip);
|
||||
}
|
||||
return {};
|
||||
|
||||
// Cache the result.
|
||||
ipCache[ip] = result;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -292,6 +292,19 @@ class MetadataWrapper {
|
|||
});
|
||||
}
|
||||
|
||||
getObjectsMD(bucketName, objNames, params, log, cb) {
|
||||
log.debug('getting object from metadata');
|
||||
this.client.getObjects(bucketName, objNames, log, (err, data) => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('object retrieved from metadata');
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
log.debug('deleting object from metadata');
|
||||
this.client.deleteObject(bucketName, objName, params, log, err => {
|
||||
|
@ -305,6 +318,19 @@ class MetadataWrapper {
|
|||
}, originOp);
|
||||
}
|
||||
|
||||
batchDeleteObjectMD(bucketName, objArray, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
log.debug('deleting object from metadata');
|
||||
this.client.deleteObjects(bucketName, objArray, log, err => {
|
||||
if (err) {
|
||||
log.debug('error from metadata', { implName: this.implName,
|
||||
err });
|
||||
return cb(err);
|
||||
}
|
||||
log.debug('object deleted from metadata');
|
||||
return cb(err);
|
||||
}, originOp);
|
||||
}
|
||||
|
||||
listObject(bucketName, listingParams, log, cb) {
|
||||
if (listingParams.listingType === undefined) {
|
||||
// eslint-disable-next-line
|
||||
|
|
|
@ -37,6 +37,7 @@ const { Version } = require('../../../versioning/Version');
|
|||
const { formatMasterKey, formatVersionKey } = require('./utils');
|
||||
|
||||
const VID_NONE = '';
|
||||
let cache = {};
|
||||
|
||||
const USERSBUCKET = '__usersbucket';
|
||||
const METASTORE = '__metastore';
|
||||
|
@ -51,6 +52,7 @@ const SOCKET_TIMEOUT_MS = 360000;
|
|||
const CONCURRENT_CURSORS = 10;
|
||||
|
||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||
const isOptim = process.env.OPTIM === 'true';
|
||||
|
||||
let uidCounter = 0;
|
||||
|
||||
|
@ -70,6 +72,8 @@ function inc(str) {
|
|||
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
|
||||
}
|
||||
|
||||
const bucketCache = {};
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
*
|
||||
|
@ -148,6 +152,9 @@ class MongoClientInterface {
|
|||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||
}
|
||||
if (process.env.POOL) {
|
||||
options.maxPoolSize = Number.parseInt(process.env.POOL, 10);
|
||||
}
|
||||
return MongoClient.connect(this.mongoUrl, options)
|
||||
.then(client => {
|
||||
this.logger.info('connected to mongodb');
|
||||
|
@ -354,35 +361,43 @@ class MongoClientInterface {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
|
||||
getBucketAndObject(bucketName, objName, params, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'getBucketAttributes: error getting bucket attributes',
|
||||
{ error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
const cacheKey = bucketName;
|
||||
const cachedBucket = cache[cacheKey];
|
||||
if (cachedBucket) {
|
||||
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||
if (err) {
|
||||
if (err.is.NoSuchKey) {
|
||||
return cb(null,
|
||||
{
|
||||
bucket:
|
||||
BucketInfo.fromObj(bucket).serialize(),
|
||||
});
|
||||
return cb(null, { bucket: cachedBucket });
|
||||
}
|
||||
log.error('getObject: error getting object',
|
||||
{ error: err.message });
|
||||
log.error('getObject: error getting object', { error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, {
|
||||
bucket: BucketInfo.fromObj(bucket).serialize(),
|
||||
obj: JSON.stringify(obj),
|
||||
return cb(null, { bucket: cachedBucket, obj: JSON.stringify(obj) });
|
||||
});
|
||||
} else {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
log.error('getBucketAttributes: error getting bucket attributes', { error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
const serializedBucket = BucketInfo.fromObj(bucket).serialize();
|
||||
cache[cacheKey] = serializedBucket; // Cache serialized bucket metadata
|
||||
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||
if (err) {
|
||||
if (err.is.NoSuchKey) {
|
||||
return cb(null, { bucket: serializedBucket });
|
||||
}
|
||||
log.error('getObject: error getting object', { error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, { bucket: serializedBucket, obj: JSON.stringify(obj) });
|
||||
});
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||
// FIXME: there should be a version of BucketInfo.serialize()
|
||||
|
@ -1044,6 +1059,27 @@ class MongoClientInterface {
|
|||
], cb);
|
||||
}
|
||||
|
||||
getObjects(bucketName, objectNames, log, cb) {
|
||||
const c = this.getCollection(bucketName);
|
||||
const keys = objectNames.map(objName => '\x7fM' + objName);
|
||||
|
||||
c.find({
|
||||
_id: { $in: keys },
|
||||
$or: [
|
||||
{ 'value.deleted': { $exists: false } },
|
||||
{ 'value.deleted': { $eq: false } },
|
||||
],
|
||||
}).toArray()
|
||||
.then(docs => {
|
||||
docs.forEach(doc => MongoUtils.unserialize(doc.value));
|
||||
return cb(null, docs.map(doc => doc.value));
|
||||
})
|
||||
.catch(err => {
|
||||
log.error('find: error getting objects', { bucket: bucketName, objects: objectNames, error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This function return the latest version of an object
|
||||
* by getting all keys related to an object's versions, ordering them
|
||||
|
@ -1343,7 +1379,7 @@ class MongoClientInterface {
|
|||
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
|
||||
* @return {undefined}
|
||||
*/
|
||||
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||
async.waterfall([
|
||||
next => {
|
||||
|
@ -1431,6 +1467,17 @@ class MongoClientInterface {
|
|||
* @return {undefined}
|
||||
*/
|
||||
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
if (isOptim) {
|
||||
// filter used when deleting object
|
||||
const deleteFilter = Object.assign({
|
||||
'_id': key,
|
||||
'value.deleted': true,
|
||||
}, filter);
|
||||
|
||||
return collection.deleteOne(deleteFilter)
|
||||
.then(() => cb(null))
|
||||
.catch(err => cb(err));
|
||||
}
|
||||
// filter used when finding and updating object
|
||||
const findFilter = Object.assign({
|
||||
_id: key,
|
||||
|
@ -1528,6 +1575,70 @@ class MongoClientInterface {
|
|||
});
|
||||
}
|
||||
|
||||
/** PoC */
|
||||
deleteObjects(bucketName, objs, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||
const c = this.getCollection(bucketName);
|
||||
|
||||
// Split objects based on whether they have a versionId
|
||||
let objsWithVersion = objs.filter(obj => obj.params && obj.params.versionId);
|
||||
let objsWithoutVersion = objs.filter(obj => !obj.params || !obj.params.versionId);
|
||||
|
||||
async.parallel([
|
||||
// For objects with version
|
||||
// (parallelCb) => {
|
||||
// if (!objsWithVersion.length) {
|
||||
// return parallelCb();
|
||||
// }
|
||||
|
||||
// this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||
// if (err) {
|
||||
// return parallelCb(err);
|
||||
// }
|
||||
|
||||
// // Use the bulk API to handle all objects in one MongoDB operation
|
||||
// let bulk = c.initializeOrderedBulkOp();
|
||||
|
||||
// objsWithVersion.forEach(obj => {
|
||||
// const masterKey = formatMasterKey(obj.key, vFormat);
|
||||
// // Add your MongoDB operation here, using the 'bulk' variable
|
||||
// });
|
||||
|
||||
// bulk.execute((err, result) => {
|
||||
// if (err) {
|
||||
// log.error('deleteObjects: error deleting versioned objects', { error: err.message, bucket: bucketName });
|
||||
// return parallelCb(errors.InternalError);
|
||||
// }
|
||||
|
||||
// return parallelCb(null, result);
|
||||
// });
|
||||
// });
|
||||
// },
|
||||
// For objects without version
|
||||
(parallelCb) => {
|
||||
if (!objsWithoutVersion.length) {
|
||||
return parallelCb();
|
||||
}
|
||||
|
||||
this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||
if (err) {
|
||||
return parallelCb(err);
|
||||
}
|
||||
|
||||
// Use the bulk API to handle all objects in one MongoDB operation
|
||||
const keysToDelete = objsWithoutVersion.map(obj => formatMasterKey(obj.key, vFormat));
|
||||
c.deleteMany({
|
||||
_id: { $in: keysToDelete },
|
||||
}).then(result => {
|
||||
return parallelCb(null, result);
|
||||
}).catch(err => {
|
||||
log.error('deleteObjects: error deleting non-versioned objects', { error: err.message, bucket: bucketName });
|
||||
return parallelCb(errors.InternalError);
|
||||
});
|
||||
});
|
||||
}
|
||||
], cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* internal listing function for buckets
|
||||
* @param {String} bucketName bucket name
|
||||
|
|
Loading…
Reference in New Issue