Compare commits

..

No commits in common. "df7d30c164d8124ddff1097853f663d17939b3f6" and "a02b8749429253915d20a9ec77f2ddd93d4abece" have entirely different histories.

3 changed files with 30 additions and 31 deletions

View File

@ -279,11 +279,10 @@ class MultipleBackendGateway {
} }
objectTagging(method, key, bucket, objectMD, log, cb) { objectTagging(method, key, bucket, objectMD, log, cb) {
console.log('>> objectTagging', method, key, bucket, objectMD);
// if legacy, objectMD will not contain dataStoreName, so just return // if legacy, objectMD will not contain dataStoreName, so just return
const client = this.clients[objectMD.dataStoreName]; const client = this.clients[objectMD.dataStoreName];
if (client && client[`object${method}Tagging`]) { if (client && client[`object${method}Tagging`]) {
return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log, return client[`object${method}Tagging`](key, bucket, objectMD, log,
cb); cb);
} }
return cb(); return cb();

View File

@ -69,7 +69,6 @@ class AwsClient {
_createAwsKey(requestBucketName, requestObjectKey, _createAwsKey(requestBucketName, requestObjectKey,
bucketMatch) { bucketMatch) {
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
if (bucketMatch) { if (bucketMatch) {
return requestObjectKey; return requestObjectKey;
} }
@ -490,17 +489,14 @@ class AwsClient {
} }
objectPutTagging(key, bucketName, objectMD, log, callback) { objectPutTagging(key, bucketName, objectMD, log, callback) {
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
const awsBucket = this._awsBucketName; const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch); const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId; const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
const tagParams = { const tagParams = {
Bucket: awsBucket, Bucket: awsBucket,
Key: awsKey, Key: awsKey,
VersionId: dataStoreVersionId, VersionId: dataStoreVersionId,
}; };
console.log('2 >>', tagParams);
const keyArray = Object.keys(objectMD.tags); const keyArray = Object.keys(objectMD.tags);
tagParams.Tagging = {}; tagParams.Tagging = {};
tagParams.Tagging.TagSet = keyArray.map(key => { tagParams.Tagging.TagSet = keyArray.map(key => {
@ -509,7 +505,6 @@ class AwsClient {
}); });
return this._client.putObjectTagging(tagParams, err => { return this._client.putObjectTagging(tagParams, err => {
if (err) { if (err) {
console.log('ERROR!! >>', err);
logHelper(log, 'error', 'error from data backend on ' + logHelper(log, 'error', 'error from data backend on ' +
'putObjectTagging', err, 'putObjectTagging', err,
this._dataStoreName, this.clientType); this._dataStoreName, this.clientType);

View File

@ -156,6 +156,18 @@ class MongoClientInterface {
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) { !Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10); options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
} }
if (process.env.MONGODB_NO_NODELAY) {
options.noDelay = false;
}
if (process.env.MONGODB_RAW) {
options.raw = true;
}
if (process.env.MONGODB_TOPOLOGY) {
options.useUnifiedTopology = true;
}
if (process.env.MONGODB_DOMAIN) {
options.domainsEnabled = true;
}
this.bulked = process.env.BULK_FREQ; this.bulked = process.env.BULK_FREQ;
return MongoClient.connect(this.mongoUrl, options, (err, client) => { return MongoClient.connect(this.mongoUrl, options, (err, client) => {
if (err) { if (err) {
@ -193,27 +205,27 @@ class MongoClientInterface {
} }
// Every BULK_FREQ || 10ms, bulk write everything we have // Every BULK_FREQ || 10ms, bulk write everything we have
setInterval(() => { setInterval(() => {
console.log('INTERVAL -- ', this.bulks);
Object.keys(this.bulks).forEach(async key => { Object.keys(this.bulks).forEach(async key => {
const c = this.bulks[key]; const c = this.bulks[key];
if (c.operations.length === 0) { if (c.operations.length === 0) {
console.log(' > NO ', key); delete this.bulks[key];
} else { return null;
let globalOps = [];
let callbacks = [];
c.operations.forEach(op => {
globalOps = globalOps.concat(op.list);
callbacks = callbacks.concat(op.onDone);
});
// Empty the arrays
c.operations = [];
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
await c.client.bulkWrite(globalOps, {
ordered: true,
});
callbacks.forEach(cb => cb());
} }
let globalOps = [];
let callbacks = [];
c.operations.forEach(op => {
globalOps = globalOps.concat(op.list);
callbacks = callbacks.concat(op.onDone);
});
await c.client.bulkWrite(globalOps, {
ordered: true,
});
// TODO error handling
// https://www.mongodb.com/docs/v5.3/reference/method/db.collection.bulkWrite/#error-handling
try {
callbacks.forEach(cb => setImmediate(() => cb()));
} catch(err) {}
delete this.bulk[key];
}); });
}, Number(process.env.BULK_FREQ) || 10); }, Number(process.env.BULK_FREQ) || 10);
} }
@ -375,11 +387,6 @@ class MongoClientInterface {
* @return {undefined} * @return {undefined}
*/ */
getBucketAttributes(bucketName, log, cb) { getBucketAttributes(bucketName, log, cb) {
// Return the cached bucket if exists
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
if (cachedBucket) {
return cb(null, cachedBucket);
}
const m = this.getCollection(METASTORE); const m = this.getCollection(METASTORE);
m.findOne({ m.findOne({
_id: bucketName, _id: bucketName,
@ -397,8 +404,6 @@ class MongoClientInterface {
// that properly inits w/o JSON.parse() // that properly inits w/o JSON.parse()
const bucketMDStr = JSON.stringify(doc.value); const bucketMDStr = JSON.stringify(doc.value);
const bucketMD = BucketInfo.deSerialize(bucketMDStr); const bucketMD = BucketInfo.deSerialize(bucketMDStr);
// Cache the result
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
return cb(null, bucketMD); return cb(null, bucketMD);
}); });
return undefined; return undefined;