Compare commits
7 Commits
53dac8d233
...
8e259f97ef
Author | SHA1 | Date |
---|---|---|
williamlardier | 8e259f97ef | |
williamlardier | 9193f7a466 | |
williamlardier | cca2ee811d | |
williamlardier | 1219a69529 | |
williamlardier | 900d3d255a | |
williamlardier | f814fcbfd3 | |
williamlardier | b21921d315 |
|
@ -28,20 +28,32 @@ export function checkIPinRangeOrMatch(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let ipCache = {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse IP address into object representation
|
* Parse IP address into object representation
|
||||||
* @param ip - IPV4/IPV6/IPV4-mapped IPV6 address
|
* @param ip - IPV4/IPV6/IPV4-mapped IPV6 address
|
||||||
* @return parsedIp - Object representation of parsed IP
|
* @return parsedIp - Object representation of parsed IP
|
||||||
*/
|
*/
|
||||||
export function parseIp(ip: string): ipaddr.IPv4 | ipaddr.IPv6 | {} {
|
export function parseIp(ip: string): ipaddr.IPv4 | ipaddr.IPv6 | {} {
|
||||||
|
// Check if the result is cached.
|
||||||
|
if (ip in ipCache) {
|
||||||
|
return ipCache[ip];
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = {};
|
||||||
|
|
||||||
if (ipaddr.IPv4.isValid(ip)) {
|
if (ipaddr.IPv4.isValid(ip)) {
|
||||||
return ipaddr.parse(ip);
|
result = ipaddr.parse(ip);
|
||||||
}
|
}
|
||||||
if (ipaddr.IPv6.isValid(ip)) {
|
else if (ipaddr.IPv6.isValid(ip)) {
|
||||||
// also parses IPv6 mapped IPv4 addresses into IPv4 representation
|
// Also parses IPv6 mapped IPv4 addresses into IPv4 representation
|
||||||
return ipaddr.process(ip);
|
result = ipaddr.process(ip);
|
||||||
}
|
}
|
||||||
return {};
|
|
||||||
|
// Cache the result.
|
||||||
|
ipCache[ip] = result;
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -292,6 +292,19 @@ class MetadataWrapper {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getObjectsMD(bucketName, objNames, params, log, cb) {
|
||||||
|
log.debug('getting object from metadata');
|
||||||
|
this.client.getObjects(bucketName, objNames, log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
log.debug('error from metadata', { implName: this.implName,
|
||||||
|
err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
log.debug('object retrieved from metadata');
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
log.debug('deleting object from metadata');
|
log.debug('deleting object from metadata');
|
||||||
this.client.deleteObject(bucketName, objName, params, log, err => {
|
this.client.deleteObject(bucketName, objName, params, log, err => {
|
||||||
|
@ -305,6 +318,19 @@ class MetadataWrapper {
|
||||||
}, originOp);
|
}, originOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
batchDeleteObjectMD(bucketName, objArray, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
|
log.debug('deleting object from metadata');
|
||||||
|
this.client.deleteObjects(bucketName, objArray, log, err => {
|
||||||
|
if (err) {
|
||||||
|
log.debug('error from metadata', { implName: this.implName,
|
||||||
|
err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
log.debug('object deleted from metadata');
|
||||||
|
return cb(err);
|
||||||
|
}, originOp);
|
||||||
|
}
|
||||||
|
|
||||||
listObject(bucketName, listingParams, log, cb) {
|
listObject(bucketName, listingParams, log, cb) {
|
||||||
if (listingParams.listingType === undefined) {
|
if (listingParams.listingType === undefined) {
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
|
|
|
@ -37,6 +37,7 @@ const { Version } = require('../../../versioning/Version');
|
||||||
const { formatMasterKey, formatVersionKey } = require('./utils');
|
const { formatMasterKey, formatVersionKey } = require('./utils');
|
||||||
|
|
||||||
const VID_NONE = '';
|
const VID_NONE = '';
|
||||||
|
let cache = {};
|
||||||
|
|
||||||
const USERSBUCKET = '__usersbucket';
|
const USERSBUCKET = '__usersbucket';
|
||||||
const METASTORE = '__metastore';
|
const METASTORE = '__metastore';
|
||||||
|
@ -51,6 +52,7 @@ const SOCKET_TIMEOUT_MS = 360000;
|
||||||
const CONCURRENT_CURSORS = 10;
|
const CONCURRENT_CURSORS = 10;
|
||||||
|
|
||||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||||
|
const isOptim = process.env.OPTIM === 'true';
|
||||||
|
|
||||||
let uidCounter = 0;
|
let uidCounter = 0;
|
||||||
|
|
||||||
|
@ -70,6 +72,8 @@ function inc(str) {
|
||||||
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
|
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bucketCache = {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @constructor
|
* @constructor
|
||||||
*
|
*
|
||||||
|
@ -148,6 +152,9 @@ class MongoClientInterface {
|
||||||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||||
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||||
}
|
}
|
||||||
|
if (process.env.POOL) {
|
||||||
|
options.maxPoolSize = Number.parseInt(process.env.POOL, 10);
|
||||||
|
}
|
||||||
return MongoClient.connect(this.mongoUrl, options)
|
return MongoClient.connect(this.mongoUrl, options)
|
||||||
.then(client => {
|
.then(client => {
|
||||||
this.logger.info('connected to mongodb');
|
this.logger.info('connected to mongodb');
|
||||||
|
@ -354,35 +361,43 @@ class MongoClientInterface {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
getBucketAndObject(bucketName, objName, params, log, cb) {
|
getBucketAndObject(bucketName, objName, params, log, cb) {
|
||||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
const cacheKey = bucketName;
|
||||||
if (err) {
|
const cachedBucket = cache[cacheKey];
|
||||||
log.error(
|
if (cachedBucket) {
|
||||||
'getBucketAttributes: error getting bucket attributes',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
|
||||||
}
|
|
||||||
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err.is.NoSuchKey) {
|
if (err.is.NoSuchKey) {
|
||||||
return cb(null,
|
return cb(null, { bucket: cachedBucket });
|
||||||
{
|
|
||||||
bucket:
|
|
||||||
BucketInfo.fromObj(bucket).serialize(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
log.error('getObject: error getting object',
|
log.error('getObject: error getting object', { error: err.message });
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
return cb(null, {
|
return cb(null, { bucket: cachedBucket, obj: JSON.stringify(obj) });
|
||||||
bucket: BucketInfo.fromObj(bucket).serialize(),
|
});
|
||||||
obj: JSON.stringify(obj),
|
} else {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
log.error('getBucketAttributes: error getting bucket attributes', { error: err.message });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const serializedBucket = BucketInfo.fromObj(bucket).serialize();
|
||||||
|
cache[cacheKey] = serializedBucket; // Cache serialized bucket metadata
|
||||||
|
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.is.NoSuchKey) {
|
||||||
|
return cb(null, { bucket: serializedBucket });
|
||||||
|
}
|
||||||
|
log.error('getObject: error getting object', { error: err.message });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return cb(null, { bucket: serializedBucket, obj: JSON.stringify(obj) });
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
return undefined;
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||||
// FIXME: there should be a version of BucketInfo.serialize()
|
// FIXME: there should be a version of BucketInfo.serialize()
|
||||||
|
@ -1044,6 +1059,27 @@ class MongoClientInterface {
|
||||||
], cb);
|
], cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getObjects(bucketName, objectNames, log, cb) {
|
||||||
|
const c = this.getCollection(bucketName);
|
||||||
|
const keys = objectNames.map(objName => '\x7fM' + objName);
|
||||||
|
|
||||||
|
c.find({
|
||||||
|
_id: { $in: keys },
|
||||||
|
$or: [
|
||||||
|
{ 'value.deleted': { $exists: false } },
|
||||||
|
{ 'value.deleted': { $eq: false } },
|
||||||
|
],
|
||||||
|
}).toArray()
|
||||||
|
.then(docs => {
|
||||||
|
docs.forEach(doc => MongoUtils.unserialize(doc.value));
|
||||||
|
return cb(null, docs.map(doc => doc.value));
|
||||||
|
})
|
||||||
|
.catch(err => {
|
||||||
|
log.error('find: error getting objects', { bucket: bucketName, objects: objectNames, error: err.message });
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function return the latest version of an object
|
* This function return the latest version of an object
|
||||||
* by getting all keys related to an object's versions, ordering them
|
* by getting all keys related to an object's versions, ordering them
|
||||||
|
@ -1343,7 +1379,7 @@ class MongoClientInterface {
|
||||||
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
|
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
next => {
|
next => {
|
||||||
|
@ -1431,6 +1467,17 @@ class MongoClientInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
|
if (isOptim) {
|
||||||
|
// filter used when deleting object
|
||||||
|
const deleteFilter = Object.assign({
|
||||||
|
'_id': key,
|
||||||
|
'value.deleted': true,
|
||||||
|
}, filter);
|
||||||
|
|
||||||
|
return collection.deleteOne(deleteFilter)
|
||||||
|
.then(() => cb(null))
|
||||||
|
.catch(err => cb(err));
|
||||||
|
}
|
||||||
// filter used when finding and updating object
|
// filter used when finding and updating object
|
||||||
const findFilter = Object.assign({
|
const findFilter = Object.assign({
|
||||||
_id: key,
|
_id: key,
|
||||||
|
@ -1528,6 +1575,70 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** PoC */
|
||||||
|
deleteObjects(bucketName, objs, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
|
const c = this.getCollection(bucketName);
|
||||||
|
|
||||||
|
// Split objects based on whether they have a versionId
|
||||||
|
let objsWithVersion = objs.filter(obj => obj.params && obj.params.versionId);
|
||||||
|
let objsWithoutVersion = objs.filter(obj => !obj.params || !obj.params.versionId);
|
||||||
|
|
||||||
|
async.parallel([
|
||||||
|
// For objects with version
|
||||||
|
// (parallelCb) => {
|
||||||
|
// if (!objsWithVersion.length) {
|
||||||
|
// return parallelCb();
|
||||||
|
// }
|
||||||
|
|
||||||
|
// this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||||
|
// if (err) {
|
||||||
|
// return parallelCb(err);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Use the bulk API to handle all objects in one MongoDB operation
|
||||||
|
// let bulk = c.initializeOrderedBulkOp();
|
||||||
|
|
||||||
|
// objsWithVersion.forEach(obj => {
|
||||||
|
// const masterKey = formatMasterKey(obj.key, vFormat);
|
||||||
|
// // Add your MongoDB operation here, using the 'bulk' variable
|
||||||
|
// });
|
||||||
|
|
||||||
|
// bulk.execute((err, result) => {
|
||||||
|
// if (err) {
|
||||||
|
// log.error('deleteObjects: error deleting versioned objects', { error: err.message, bucket: bucketName });
|
||||||
|
// return parallelCb(errors.InternalError);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// return parallelCb(null, result);
|
||||||
|
// });
|
||||||
|
// });
|
||||||
|
// },
|
||||||
|
// For objects without version
|
||||||
|
(parallelCb) => {
|
||||||
|
if (!objsWithoutVersion.length) {
|
||||||
|
return parallelCb();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||||
|
if (err) {
|
||||||
|
return parallelCb(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the bulk API to handle all objects in one MongoDB operation
|
||||||
|
const keysToDelete = objsWithoutVersion.map(obj => formatMasterKey(obj.key, vFormat));
|
||||||
|
c.deleteMany({
|
||||||
|
_id: { $in: keysToDelete },
|
||||||
|
}).then(result => {
|
||||||
|
return parallelCb(null, result);
|
||||||
|
}).catch(err => {
|
||||||
|
log.error('deleteObjects: error deleting non-versioned objects', { error: err.message, bucket: bucketName });
|
||||||
|
return parallelCb(errors.InternalError);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
], cb);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* internal listing function for buckets
|
* internal listing function for buckets
|
||||||
* @param {String} bucketName bucket name
|
* @param {String} bucketName bucket name
|
||||||
|
|
Loading…
Reference in New Issue