Compare commits
10 Commits
a02b874942
...
df7d30c164
Author | SHA1 | Date |
---|---|---|
williamlardier | df7d30c164 | |
williamlardier | ad19118d2c | |
williamlardier | dbd6f439db | |
williamlardier | b2b133b824 | |
williamlardier | 9ee0da7084 | |
williamlardier | 61f26ffe02 | |
williamlardier | 952fe9197a | |
williamlardier | 5ba1cd697b | |
williamlardier | 497c76dfd7 | |
williamlardier | 3595e7ad7b |
|
@ -279,10 +279,11 @@ class MultipleBackendGateway {
|
|||
}
|
||||
|
||||
objectTagging(method, key, bucket, objectMD, log, cb) {
|
||||
console.log('>> objectTagging', method, key, bucket, objectMD);
|
||||
// if legacy, objectMD will not contain dataStoreName, so just return
|
||||
const client = this.clients[objectMD.dataStoreName];
|
||||
if (client && client[`object${method}Tagging`]) {
|
||||
return client[`object${method}Tagging`](key, bucket, objectMD, log,
|
||||
return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log,
|
||||
cb);
|
||||
}
|
||||
return cb();
|
||||
|
|
|
@ -69,6 +69,7 @@ class AwsClient {
|
|||
|
||||
_createAwsKey(requestBucketName, requestObjectKey,
|
||||
bucketMatch) {
|
||||
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
|
||||
if (bucketMatch) {
|
||||
return requestObjectKey;
|
||||
}
|
||||
|
@ -489,14 +490,17 @@ class AwsClient {
|
|||
}
|
||||
|
||||
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
||||
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
|
||||
const awsBucket = this._awsBucketName;
|
||||
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
||||
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
||||
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
|
||||
const tagParams = {
|
||||
Bucket: awsBucket,
|
||||
Key: awsKey,
|
||||
VersionId: dataStoreVersionId,
|
||||
};
|
||||
console.log('2 >>', tagParams);
|
||||
const keyArray = Object.keys(objectMD.tags);
|
||||
tagParams.Tagging = {};
|
||||
tagParams.Tagging.TagSet = keyArray.map(key => {
|
||||
|
@ -505,6 +509,7 @@ class AwsClient {
|
|||
});
|
||||
return this._client.putObjectTagging(tagParams, err => {
|
||||
if (err) {
|
||||
console.log('ERROR!! >>', err);
|
||||
logHelper(log, 'error', 'error from data backend on ' +
|
||||
'putObjectTagging', err,
|
||||
this._dataStoreName, this.clientType);
|
||||
|
|
|
@ -156,18 +156,6 @@ class MongoClientInterface {
|
|||
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
|
||||
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
|
||||
}
|
||||
if (process.env.MONGODB_NO_NODELAY) {
|
||||
options.noDelay = false;
|
||||
}
|
||||
if (process.env.MONGODB_RAW) {
|
||||
options.raw = true;
|
||||
}
|
||||
if (process.env.MONGODB_TOPOLOGY) {
|
||||
options.useUnifiedTopology = true;
|
||||
}
|
||||
if (process.env.MONGODB_DOMAIN) {
|
||||
options.domainsEnabled = true;
|
||||
}
|
||||
this.bulked = process.env.BULK_FREQ;
|
||||
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
||||
if (err) {
|
||||
|
@ -205,27 +193,27 @@ class MongoClientInterface {
|
|||
}
|
||||
// Every BULK_FREQ || 10ms, bulk write everything we have
|
||||
setInterval(() => {
|
||||
|
||||
console.log('INTERVAL -- ', this.bulks);
|
||||
Object.keys(this.bulks).forEach(async key => {
|
||||
const c = this.bulks[key];
|
||||
if (c.operations.length === 0) {
|
||||
delete this.bulks[key];
|
||||
return null;
|
||||
console.log(' > NO ', key);
|
||||
} else {
|
||||
let globalOps = [];
|
||||
let callbacks = [];
|
||||
c.operations.forEach(op => {
|
||||
globalOps = globalOps.concat(op.list);
|
||||
callbacks = callbacks.concat(op.onDone);
|
||||
});
|
||||
// Empty the arrays
|
||||
c.operations = [];
|
||||
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
|
||||
await c.client.bulkWrite(globalOps, {
|
||||
ordered: true,
|
||||
});
|
||||
callbacks.forEach(cb => cb());
|
||||
}
|
||||
let globalOps = [];
|
||||
let callbacks = [];
|
||||
c.operations.forEach(op => {
|
||||
globalOps = globalOps.concat(op.list);
|
||||
callbacks = callbacks.concat(op.onDone);
|
||||
});
|
||||
await c.client.bulkWrite(globalOps, {
|
||||
ordered: true,
|
||||
});
|
||||
// TODO error handling
|
||||
// https://www.mongodb.com/docs/v5.3/reference/method/db.collection.bulkWrite/#error-handling
|
||||
try {
|
||||
callbacks.forEach(cb => setImmediate(() => cb()));
|
||||
} catch(err) {}
|
||||
delete this.bulk[key];
|
||||
});
|
||||
}, Number(process.env.BULK_FREQ) || 10);
|
||||
}
|
||||
|
@ -387,6 +375,11 @@ class MongoClientInterface {
|
|||
* @return {undefined}
|
||||
*/
|
||||
getBucketAttributes(bucketName, log, cb) {
|
||||
// Return the cached bucket if exists
|
||||
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
|
||||
if (cachedBucket) {
|
||||
return cb(null, cachedBucket);
|
||||
}
|
||||
const m = this.getCollection(METASTORE);
|
||||
m.findOne({
|
||||
_id: bucketName,
|
||||
|
@ -404,6 +397,8 @@ class MongoClientInterface {
|
|||
// that properly inits w/o JSON.parse()
|
||||
const bucketMDStr = JSON.stringify(doc.value);
|
||||
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
||||
// Cache the result
|
||||
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
|
||||
return cb(null, bucketMD);
|
||||
});
|
||||
return undefined;
|
||||
|
|
Loading…
Reference in New Issue