Compare commits
10 Commits
a02b874942
...
df7d30c164
Author | SHA1 | Date |
---|---|---|
williamlardier | df7d30c164 | |
williamlardier | ad19118d2c | |
williamlardier | dbd6f439db | |
williamlardier | b2b133b824 | |
williamlardier | 9ee0da7084 | |
williamlardier | 61f26ffe02 | |
williamlardier | 952fe9197a | |
williamlardier | 5ba1cd697b | |
williamlardier | 497c76dfd7 | |
williamlardier | 3595e7ad7b |
|
@ -279,10 +279,11 @@ class MultipleBackendGateway {
|
||||||
}
|
}
|
||||||
|
|
||||||
objectTagging(method, key, bucket, objectMD, log, cb) {
|
objectTagging(method, key, bucket, objectMD, log, cb) {
|
||||||
|
console.log('>> objectTagging', method, key, bucket, objectMD);
|
||||||
// if legacy, objectMD will not contain dataStoreName, so just return
|
// if legacy, objectMD will not contain dataStoreName, so just return
|
||||||
const client = this.clients[objectMD.dataStoreName];
|
const client = this.clients[objectMD.dataStoreName];
|
||||||
if (client && client[`object${method}Tagging`]) {
|
if (client && client[`object${method}Tagging`]) {
|
||||||
return client[`object${method}Tagging`](key, bucket, objectMD, log,
|
return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log,
|
||||||
cb);
|
cb);
|
||||||
}
|
}
|
||||||
return cb();
|
return cb();
|
||||||
|
|
|
@ -69,6 +69,7 @@ class AwsClient {
|
||||||
|
|
||||||
_createAwsKey(requestBucketName, requestObjectKey,
|
_createAwsKey(requestBucketName, requestObjectKey,
|
||||||
bucketMatch) {
|
bucketMatch) {
|
||||||
|
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
|
||||||
if (bucketMatch) {
|
if (bucketMatch) {
|
||||||
return requestObjectKey;
|
return requestObjectKey;
|
||||||
}
|
}
|
||||||
|
@ -489,14 +490,17 @@ class AwsClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
||||||
|
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
|
||||||
const awsBucket = this._awsBucketName;
|
const awsBucket = this._awsBucketName;
|
||||||
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
||||||
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
||||||
|
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
|
||||||
const tagParams = {
|
const tagParams = {
|
||||||
Bucket: awsBucket,
|
Bucket: awsBucket,
|
||||||
Key: awsKey,
|
Key: awsKey,
|
||||||
VersionId: dataStoreVersionId,
|
VersionId: dataStoreVersionId,
|
||||||
};
|
};
|
||||||
|
console.log('2 >>', tagParams);
|
||||||
const keyArray = Object.keys(objectMD.tags);
|
const keyArray = Object.keys(objectMD.tags);
|
||||||
tagParams.Tagging = {};
|
tagParams.Tagging = {};
|
||||||
tagParams.Tagging.TagSet = keyArray.map(key => {
|
tagParams.Tagging.TagSet = keyArray.map(key => {
|
||||||
|
@ -505,6 +509,7 @@ class AwsClient {
|
||||||
});
|
});
|
||||||
return this._client.putObjectTagging(tagParams, err => {
|
return this._client.putObjectTagging(tagParams, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
console.log('ERROR!! >>', err);
|
||||||
logHelper(log, 'error', 'error from data backend on ' +
|
logHelper(log, 'error', 'error from data backend on ' +
|
||||||
'putObjectTagging', err,
|
'putObjectTagging', err,
|
||||||
this._dataStoreName, this.clientType);
|
this._dataStoreName, this.clientType);
|
||||||
|
|
|
@ -156,18 +156,6 @@ class MongoClientInterface {
|
||||||
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
|
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
|
||||||
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
|
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
|
||||||
}
|
}
|
||||||
if (process.env.MONGODB_NO_NODELAY) {
|
|
||||||
options.noDelay = false;
|
|
||||||
}
|
|
||||||
if (process.env.MONGODB_RAW) {
|
|
||||||
options.raw = true;
|
|
||||||
}
|
|
||||||
if (process.env.MONGODB_TOPOLOGY) {
|
|
||||||
options.useUnifiedTopology = true;
|
|
||||||
}
|
|
||||||
if (process.env.MONGODB_DOMAIN) {
|
|
||||||
options.domainsEnabled = true;
|
|
||||||
}
|
|
||||||
this.bulked = process.env.BULK_FREQ;
|
this.bulked = process.env.BULK_FREQ;
|
||||||
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -205,27 +193,27 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
// Every BULK_FREQ || 10ms, bulk write everything we have
|
// Every BULK_FREQ || 10ms, bulk write everything we have
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
|
|
||||||
|
console.log('INTERVAL -- ', this.bulks);
|
||||||
Object.keys(this.bulks).forEach(async key => {
|
Object.keys(this.bulks).forEach(async key => {
|
||||||
const c = this.bulks[key];
|
const c = this.bulks[key];
|
||||||
if (c.operations.length === 0) {
|
if (c.operations.length === 0) {
|
||||||
delete this.bulks[key];
|
console.log(' > NO ', key);
|
||||||
return null;
|
} else {
|
||||||
|
let globalOps = [];
|
||||||
|
let callbacks = [];
|
||||||
|
c.operations.forEach(op => {
|
||||||
|
globalOps = globalOps.concat(op.list);
|
||||||
|
callbacks = callbacks.concat(op.onDone);
|
||||||
|
});
|
||||||
|
// Empty the arrays
|
||||||
|
c.operations = [];
|
||||||
|
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
|
||||||
|
await c.client.bulkWrite(globalOps, {
|
||||||
|
ordered: true,
|
||||||
|
});
|
||||||
|
callbacks.forEach(cb => cb());
|
||||||
}
|
}
|
||||||
let globalOps = [];
|
|
||||||
let callbacks = [];
|
|
||||||
c.operations.forEach(op => {
|
|
||||||
globalOps = globalOps.concat(op.list);
|
|
||||||
callbacks = callbacks.concat(op.onDone);
|
|
||||||
});
|
|
||||||
await c.client.bulkWrite(globalOps, {
|
|
||||||
ordered: true,
|
|
||||||
});
|
|
||||||
// TODO error handling
|
|
||||||
// https://www.mongodb.com/docs/v5.3/reference/method/db.collection.bulkWrite/#error-handling
|
|
||||||
try {
|
|
||||||
callbacks.forEach(cb => setImmediate(() => cb()));
|
|
||||||
} catch(err) {}
|
|
||||||
delete this.bulk[key];
|
|
||||||
});
|
});
|
||||||
}, Number(process.env.BULK_FREQ) || 10);
|
}, Number(process.env.BULK_FREQ) || 10);
|
||||||
}
|
}
|
||||||
|
@ -387,6 +375,11 @@ class MongoClientInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getBucketAttributes(bucketName, log, cb) {
|
getBucketAttributes(bucketName, log, cb) {
|
||||||
|
// Return the cached bucket if exists
|
||||||
|
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
|
||||||
|
if (cachedBucket) {
|
||||||
|
return cb(null, cachedBucket);
|
||||||
|
}
|
||||||
const m = this.getCollection(METASTORE);
|
const m = this.getCollection(METASTORE);
|
||||||
m.findOne({
|
m.findOne({
|
||||||
_id: bucketName,
|
_id: bucketName,
|
||||||
|
@ -404,6 +397,8 @@ class MongoClientInterface {
|
||||||
// that properly inits w/o JSON.parse()
|
// that properly inits w/o JSON.parse()
|
||||||
const bucketMDStr = JSON.stringify(doc.value);
|
const bucketMDStr = JSON.stringify(doc.value);
|
||||||
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
||||||
|
// Cache the result
|
||||||
|
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
|
||||||
return cb(null, bucketMD);
|
return cb(null, bucketMD);
|
||||||
});
|
});
|
||||||
return undefined;
|
return undefined;
|
||||||
|
|
Loading…
Reference in New Issue