Compare commits

...

7 Commits

Author SHA1 Message Date
williamlardier 8e259f97ef
wip 2023-05-24 00:25:57 +02:00
williamlardier 9193f7a466
wip 2023-05-24 00:18:20 +02:00
williamlardier cca2ee811d
wip 2023-05-23 23:48:14 +02:00
williamlardier 1219a69529
wip 2023-05-23 23:40:24 +02:00
williamlardier 900d3d255a
wip 2023-05-23 17:21:53 +02:00
williamlardier f814fcbfd3
wip 2023-05-23 16:26:15 +02:00
williamlardier b21921d315
wip 2023-05-23 12:27:17 +02:00
3 changed files with 174 additions and 25 deletions

View File

@ -28,20 +28,32 @@ export function checkIPinRangeOrMatch(
}
}
let ipCache = {};
/**
* Parse IP address into object representation
* @param ip - IPV4/IPV6/IPV4-mapped IPV6 address
* @return parsedIp - Object representation of parsed IP
*/
export function parseIp(ip: string): ipaddr.IPv4 | ipaddr.IPv6 | {} {
// Check if the result is cached.
if (ip in ipCache) {
return ipCache[ip];
}
let result = {};
if (ipaddr.IPv4.isValid(ip)) {
return ipaddr.parse(ip);
result = ipaddr.parse(ip);
}
if (ipaddr.IPv6.isValid(ip)) {
// also parses IPv6 mapped IPv4 addresses into IPv4 representation
return ipaddr.process(ip);
else if (ipaddr.IPv6.isValid(ip)) {
// Also parses IPv6 mapped IPv4 addresses into IPv4 representation
result = ipaddr.process(ip);
}
return {};
// Cache the result.
ipCache[ip] = result;
return result;
}
/**

View File

@ -292,6 +292,19 @@ class MetadataWrapper {
});
}
getObjectsMD(bucketName, objNames, params, log, cb) {
log.debug('getting object from metadata');
this.client.getObjects(bucketName, objNames, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
err });
return cb(err);
}
log.debug('object retrieved from metadata');
return cb(err, data);
});
}
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
log.debug('deleting object from metadata');
this.client.deleteObject(bucketName, objName, params, log, err => {
@ -305,6 +318,19 @@ class MetadataWrapper {
}, originOp);
}
batchDeleteObjectMD(bucketName, objArray, log, cb, originOp = 's3:ObjectRemoved:Delete') {
log.debug('deleting object from metadata');
this.client.deleteObjects(bucketName, objArray, log, err => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
err });
return cb(err);
}
log.debug('object deleted from metadata');
return cb(err);
}, originOp);
}
listObject(bucketName, listingParams, log, cb) {
if (listingParams.listingType === undefined) {
// eslint-disable-next-line

View File

@ -37,6 +37,7 @@ const { Version } = require('../../../versioning/Version');
const { formatMasterKey, formatVersionKey } = require('./utils');
const VID_NONE = '';
let cache = {};
const USERSBUCKET = '__usersbucket';
const METASTORE = '__metastore';
@ -51,6 +52,7 @@ const SOCKET_TIMEOUT_MS = 360000;
const CONCURRENT_CURSORS = 10;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
const isOptim = process.env.OPTIM === 'true';
let uidCounter = 0;
@ -70,6 +72,8 @@ function inc(str) {
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
}
const bucketCache = {};
/**
* @constructor
*
@ -148,6 +152,9 @@ class MongoClientInterface {
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
}
if (process.env.POOL) {
options.maxPoolSize = Number.parseInt(process.env.POOL, 10);
}
return MongoClient.connect(this.mongoUrl, options)
.then(client => {
this.logger.info('connected to mongodb');
@ -354,35 +361,43 @@ class MongoClientInterface {
return undefined;
}
getBucketAndObject(bucketName, objName, params, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
log.error(
'getBucketAttributes: error getting bucket attributes',
{ error: err.message });
return cb(err);
}
const cacheKey = bucketName;
const cachedBucket = cache[cacheKey];
if (cachedBucket) {
this.getObject(bucketName, objName, params, log, (err, obj) => {
if (err) {
if (err.is.NoSuchKey) {
return cb(null,
{
bucket:
BucketInfo.fromObj(bucket).serialize(),
});
return cb(null, { bucket: cachedBucket });
}
log.error('getObject: error getting object',
{ error: err.message });
log.error('getObject: error getting object', { error: err.message });
return cb(err);
}
return cb(null, {
bucket: BucketInfo.fromObj(bucket).serialize(),
obj: JSON.stringify(obj),
return cb(null, { bucket: cachedBucket, obj: JSON.stringify(obj) });
});
} else {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
log.error('getBucketAttributes: error getting bucket attributes', { error: err.message });
return cb(err);
}
const serializedBucket = BucketInfo.fromObj(bucket).serialize();
cache[cacheKey] = serializedBucket; // Cache serialized bucket metadata
this.getObject(bucketName, objName, params, log, (err, obj) => {
if (err) {
if (err.is.NoSuchKey) {
return cb(null, { bucket: serializedBucket });
}
log.error('getObject: error getting object', { error: err.message });
return cb(err);
}
return cb(null, { bucket: serializedBucket, obj: JSON.stringify(obj) });
});
});
return undefined;
});
}
}
putBucketAttributes(bucketName, bucketMD, log, cb) {
// FIXME: there should be a version of BucketInfo.serialize()
@ -1044,6 +1059,27 @@ class MongoClientInterface {
], cb);
}
getObjects(bucketName, objectNames, log, cb) {
const c = this.getCollection(bucketName);
const keys = objectNames.map(objName => '\x7fM' + objName);
c.find({
_id: { $in: keys },
$or: [
{ 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } },
],
}).toArray()
.then(docs => {
docs.forEach(doc => MongoUtils.unserialize(doc.value));
return cb(null, docs.map(doc => doc.value));
})
.catch(err => {
log.error('find: error getting objects', { bucket: bucketName, objects: objectNames, error: err.message });
return cb(errors.InternalError);
});
}
/**
* This function return the latest version of an object
* by getting all keys related to an object's versions, ordering them
@ -1343,7 +1379,7 @@ class MongoClientInterface {
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
* @return {undefined}
*/
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
deleteObjectVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
const masterKey = formatMasterKey(objName, params.vFormat);
async.waterfall([
next => {
@ -1431,6 +1467,17 @@ class MongoClientInterface {
* @return {undefined}
*/
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
if (isOptim) {
// filter used when deleting object
const deleteFilter = Object.assign({
'_id': key,
'value.deleted': true,
}, filter);
return collection.deleteOne(deleteFilter)
.then(() => cb(null))
.catch(err => cb(err));
}
// filter used when finding and updating object
const findFilter = Object.assign({
_id: key,
@ -1528,6 +1575,70 @@ class MongoClientInterface {
});
}
/** PoC */
deleteObjects(bucketName, objs, log, cb, originOp = 's3:ObjectRemoved:Delete') {
const c = this.getCollection(bucketName);
// Split objects based on whether they have a versionId
let objsWithVersion = objs.filter(obj => obj.params && obj.params.versionId);
let objsWithoutVersion = objs.filter(obj => !obj.params || !obj.params.versionId);
async.parallel([
// For objects with version
// (parallelCb) => {
// if (!objsWithVersion.length) {
// return parallelCb();
// }
// this.getBucketVFormat(bucketName, log, (err, vFormat) => {
// if (err) {
// return parallelCb(err);
// }
// // Use the bulk API to handle all objects in one MongoDB operation
// let bulk = c.initializeOrderedBulkOp();
// objsWithVersion.forEach(obj => {
// const masterKey = formatMasterKey(obj.key, vFormat);
// // Add your MongoDB operation here, using the 'bulk' variable
// });
// bulk.execute((err, result) => {
// if (err) {
// log.error('deleteObjects: error deleting versioned objects', { error: err.message, bucket: bucketName });
// return parallelCb(errors.InternalError);
// }
// return parallelCb(null, result);
// });
// });
// },
// For objects without version
(parallelCb) => {
if (!objsWithoutVersion.length) {
return parallelCb();
}
this.getBucketVFormat(bucketName, log, (err, vFormat) => {
if (err) {
return parallelCb(err);
}
// Use the bulk API to handle all objects in one MongoDB operation
const keysToDelete = objsWithoutVersion.map(obj => formatMasterKey(obj.key, vFormat));
c.deleteMany({
_id: { $in: keysToDelete },
}).then(result => {
return parallelCb(null, result);
}).catch(err => {
log.error('deleteObjects: error deleting non-versioned objects', { error: err.message, bucket: bucketName });
return parallelCb(errors.InternalError);
});
});
}
], cb);
}
/**
* internal listing function for buckets
* @param {String} bucketName bucket name