Compare commits
36 Commits
developmen
...
ft/mongo-l
Author | SHA1 | Date |
---|---|---|
JianqinWang | 04520404dc | |
JianqinWang | a8848cee9a | |
JianqinWang | 934bdffa96 | |
JianqinWang | 7214ff3113 | |
JianqinWang | e8b722b7bb | |
JianqinWang | 40aef12063 | |
JianqinWang | c42f6bc4e2 | |
JianqinWang | 7410cf8f6a | |
JianqinWang | 15668cfd4e | |
JianqinWang | 9318b7207f | |
JianqinWang | 057f67a6f5 | |
JianqinWang | ee40267e2e | |
JianqinWang | fc33bc4591 | |
JianqinWang | 45bbeba92c | |
JianqinWang | b99d140db7 | |
JianqinWang | e4485f2638 | |
JianqinWang | deb87d53c8 | |
JianqinWang | bde1873c22 | |
JianqinWang | cec494d0a3 | |
JianqinWang | 2bbcddebe8 | |
JianqinWang | cc9d1d1307 | |
JianqinWang | 8653a52cb8 | |
JianqinWang | 7957db02fb | |
JianqinWang | ed98815cf2 | |
JianqinWang | 179345ea46 | |
JianqinWang | de0227231b | |
JianqinWang | d7b5fc67b3 | |
JianqinWang | 2f9e388c5e | |
JianqinWang | a79d466495 | |
JianqinWang | e159b1afd9 | |
Lauren Spiegel | dd4c149404 | |
Lauren Spiegel | fa9ce0a095 | |
Lauren Spiegel | 956dc28781 | |
Lauren Spiegel | 703aafeb72 | |
Lauren Spiegel | c023408704 | |
Lauren Spiegel | 790c3866b8 |
15
index.js
15
index.js
|
@ -72,14 +72,23 @@ module.exports = {
|
||||||
},
|
},
|
||||||
storage: {
|
storage: {
|
||||||
metadata: {
|
metadata: {
|
||||||
|
bucketclient: {
|
||||||
|
LogConsumer:
|
||||||
|
require('./lib/storage/metadata/bucketclient/LogConsumer'),
|
||||||
|
},
|
||||||
|
file: {
|
||||||
MetadataFileServer:
|
MetadataFileServer:
|
||||||
require('./lib/storage/metadata/file/MetadataFileServer'),
|
require('./lib/storage/metadata/file/MetadataFileServer'),
|
||||||
MetadataFileClient:
|
MetadataFileClient:
|
||||||
require('./lib/storage/metadata/file/MetadataFileClient'),
|
require('./lib/storage/metadata/file/MetadataFileClient'),
|
||||||
LogConsumer:
|
},
|
||||||
require('./lib/storage/metadata/bucketclient/LogConsumer'),
|
mongoclient: {
|
||||||
MongoClientInterface:
|
MongoClientInterface:
|
||||||
require('./lib/storage/metadata/mongoclient/MongoClientInterface'),
|
require('./lib/storage/metadata/mongoclient/' +
|
||||||
|
'MongoClientInterface'),
|
||||||
|
LogConsumer:
|
||||||
|
require('./lib/storage/metadata/mongoclient/LogConsumer'),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
file: {
|
file: {
|
||||||
|
|
|
@ -0,0 +1,334 @@
|
||||||
|
'use strict'; // eslint-disable-line
|
||||||
|
|
||||||
|
const stream = require('stream');
|
||||||
|
const MongoClient = require('mongodb').MongoClient;
|
||||||
|
const { Timestamp, Long } = require('bson');
|
||||||
|
|
||||||
|
let lastEndID = undefined;
|
||||||
|
|
||||||
|
const ops = {
|
||||||
|
i: 'put',
|
||||||
|
u: 'put',
|
||||||
|
d: 'delete',
|
||||||
|
};
|
||||||
|
|
||||||
|
class ListRecordStream extends stream.Transform {
|
||||||
|
constructor(logger) {
|
||||||
|
super({ objectMode: true });
|
||||||
|
this.logger = logger;
|
||||||
|
this.hasStarted = false;
|
||||||
|
this.start = undefined;
|
||||||
|
this.end = undefined;
|
||||||
|
this.lastUniqID = undefined;
|
||||||
|
// this.unpublishedListing is true once we pass the oplog that has the
|
||||||
|
// start seq timestamp and uniqID 'h'
|
||||||
|
this.unpublishedListing = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
_transform(itemObj, encoding, callback) {
|
||||||
|
if (!itemObj) {
|
||||||
|
this.push(null);
|
||||||
|
this.emit('info', {
|
||||||
|
start: this.start,
|
||||||
|
end: this.end,
|
||||||
|
uniqID: this.lastUniqID,
|
||||||
|
});
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
// always update to most recent uniqID
|
||||||
|
this.lastUniqID = itemObj.h.toString();
|
||||||
|
|
||||||
|
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
|
||||||
|
this.end = itemObj.ts.toNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
// only push to stream unpublished objects
|
||||||
|
if (!this.unpublishedListing) {
|
||||||
|
if (lastEndID === itemObj.h.toString()) {
|
||||||
|
this.unpublishedListing = true;
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.hasStarted) {
|
||||||
|
this.hasStarted = true;
|
||||||
|
this.start = itemObj.ts.toNumber();
|
||||||
|
this.emit('info', {
|
||||||
|
start: this.start,
|
||||||
|
end: this.end,
|
||||||
|
uniqId: this.lastUniqID,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't push oplogs that have already been sent
|
||||||
|
if (!this.unpublishedListing) {
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
const streamObject = {
|
||||||
|
timestamp: new Date(itemObj.ts.high_ * 1000),
|
||||||
|
db: itemObj.ns,
|
||||||
|
entries: [
|
||||||
|
{
|
||||||
|
type: ops[itemObj.op],
|
||||||
|
key: itemObj.o._id,
|
||||||
|
value: JSON.stringify(itemObj.o.value),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
return callback(null, streamObject);
|
||||||
|
}
|
||||||
|
|
||||||
|
_flush(callback) {
|
||||||
|
this.emit('info', {
|
||||||
|
start: this.start,
|
||||||
|
end: this.end,
|
||||||
|
uniqID: this.lastUniqID,
|
||||||
|
});
|
||||||
|
this.push(null);
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class
|
||||||
|
* @classdesc Class to consume mongo oplog
|
||||||
|
*/
|
||||||
|
class LogConsumer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @constructor
|
||||||
|
*
|
||||||
|
* @param {string} host - string that is MongoDB host
|
||||||
|
* @param {number} port - port at host name for MongoDB instance
|
||||||
|
* @param {string} database - name of database to get replica set oplogs
|
||||||
|
*/
|
||||||
|
constructor(mongoConfig, logger) {
|
||||||
|
const { host, port, database, writeConcern, replicaSet, readPreference } = mongoConfig;
|
||||||
|
// need a way to connect to master
|
||||||
|
// might have to use all replica set members in uri? http://mongodb.github.io/node-mongodb-native/3.0/tutorials/connect/
|
||||||
|
|
||||||
|
// can have second argument {ts: timestamp} to call only entries from some number
|
||||||
|
// this ts gets translated to find({ts: {$gt: ts }}) in mongo language
|
||||||
|
// but if have it here on instantiation, won't work. we are sending the sequence on
|
||||||
|
// each read call.
|
||||||
|
// so might be better to do query like this directly (without module)
|
||||||
|
// https://github.com/cayasso/mongo-oplog/blob/master/src/stream.js#L28
|
||||||
|
|
||||||
|
// and then use the stream method in the same way to get just the number
|
||||||
|
// of entries needed -- http://mongodb.github.io/node-mongodb-native/3.0/api/Cursor.html#stream
|
||||||
|
|
||||||
|
// 'local' is the database where MongoDB has oplogs.rs capped collection
|
||||||
|
this.database = 'local';
|
||||||
|
// this.mongoUrl = format(
|
||||||
|
// 'mongodb://%s:%s/local',
|
||||||
|
// host,
|
||||||
|
// port);
|
||||||
|
this.mongoUrl = 'mongodb://localhost:27018,localhost:27017,localhost:27019/local';
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to MongoClient using Mongo node module to access database and
|
||||||
|
* database oplogs (operation logs)
|
||||||
|
*
|
||||||
|
* @param {function} done - callback function, called with an error object
|
||||||
|
* or null and an object as 2nd parameter
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
connectMongo(done) {
|
||||||
|
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
|
||||||
|
(err, client) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('Unable to connect to MongoDB', err);
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
this.logger.info('connected to mongodb');
|
||||||
|
this.client = client;
|
||||||
|
this.db = client.db(this.database, {
|
||||||
|
ignoreUndefined: true,
|
||||||
|
});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Read a series of log records from mongo
|
||||||
|
*
|
||||||
|
* @param {Object} [params] - params object
|
||||||
|
* @param {Number} [params.startSeq] - fetch starting from this
|
||||||
|
* sequence number
|
||||||
|
* @param {Number} [params.limit] - maximum number of log records
|
||||||
|
* to return
|
||||||
|
* @param {function} cb - callback function, called with an error
|
||||||
|
* object or null and an object as 2nd parameter
|
||||||
|
* // CONTINUE HERE!!!
|
||||||
|
* object.info contains ...
|
||||||
|
* object.log contains ... (see recordstream below) and _processPrepareEntries function
|
||||||
|
* in backbeat. batchState.logRes is object.log from here.
|
||||||
|
*
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
readRecords(params, cb) {
|
||||||
|
const recordStream = new ListRecordStream(this.logger);
|
||||||
|
const limit = params.limit || 10000;
|
||||||
|
const startIDandSeq = params.startSeq.toString().split('|');
|
||||||
|
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
|
||||||
|
lastEndID = startIDandSeq[1];
|
||||||
|
// need to somehow limit entries to limit and then stop the stream
|
||||||
|
|
||||||
|
this.coll = this.db.collection('oplog.rs');
|
||||||
|
return this.coll.find({
|
||||||
|
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
|
||||||
|
ts: { $gte: Timestamp.fromNumber(startSeq) },
|
||||||
|
}, {
|
||||||
|
limit,
|
||||||
|
tailable: false,
|
||||||
|
awaitData: false,
|
||||||
|
noCursorTimeout: true,
|
||||||
|
OplogReplay: true,
|
||||||
|
numberOfRetries: Number.MAX_VALUE,
|
||||||
|
}, (err, res) => {
|
||||||
|
// console.log('LOG RESPONSE', res);
|
||||||
|
const stream = res.stream();
|
||||||
|
stream.on('data', data => {
|
||||||
|
recordStream.write(data);
|
||||||
|
});
|
||||||
|
stream.on('end', () => {
|
||||||
|
recordStream.write(undefined);
|
||||||
|
});
|
||||||
|
recordStream.once('info', info => {
|
||||||
|
recordStream.removeAllListeners('error');
|
||||||
|
cb(null, { info, log: recordStream });
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// const tester = new LogConsumer({ host: '127.0.0.1', port: ['27017'],
|
||||||
|
// database: 'local', writeConcern: 'majority', replicaSet: 'rs0',
|
||||||
|
// readPreference: 'primary' }, console);
|
||||||
|
//
|
||||||
|
// tester.connectMongo(() => {
|
||||||
|
// return async.waterfall([
|
||||||
|
// next => {
|
||||||
|
// return tester.readRecords({ startSeq: 0, limit: 5 }, (err, res) => {
|
||||||
|
// console.log('cb called for read records');
|
||||||
|
// res.log.on('data', data => {
|
||||||
|
// console.log('Streamed formatted data', data);
|
||||||
|
// });
|
||||||
|
// res.log.on('end', () => {
|
||||||
|
// console.log('ENDED');
|
||||||
|
// return next();
|
||||||
|
// });
|
||||||
|
// });
|
||||||
|
// },
|
||||||
|
// ], () => {});
|
||||||
|
// });
|
||||||
|
|
||||||
|
module.exports = LogConsumer;
|
||||||
|
|
||||||
|
//
|
||||||
|
// sample put:
|
||||||
|
//
|
||||||
|
// insert!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323641 },
|
||||||
|
// t: 1,
|
||||||
|
// h: Long { _bsontype: 'Long', low_: -1207597211, high_: -1765392211 },
|
||||||
|
// v: 2,
|
||||||
|
// op: 'i',
|
||||||
|
// ns: 'metadata.hello',
|
||||||
|
// ui:
|
||||||
|
// Binary {
|
||||||
|
// _bsontype: 'Binary',
|
||||||
|
// sub_type: 4,
|
||||||
|
// position: 16,
|
||||||
|
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||||
|
// wall: 2018-01-19T01:00:41.798Z,
|
||||||
|
// o:
|
||||||
|
// { _id: 'stuff4',
|
||||||
|
// value:
|
||||||
|
// { 'owner-display-name': 'Bart',
|
||||||
|
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||||
|
// 'content-length': 508,
|
||||||
|
// 'content-type': 'binary/octet-stream',
|
||||||
|
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
|
||||||
|
// 'x-amz-version-id': 'null',
|
||||||
|
// 'x-amz-server-version-id': '',
|
||||||
|
// 'x-amz-storage-class': 'STANDARD',
|
||||||
|
// 'x-amz-server-side-encryption': '',
|
||||||
|
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
|
||||||
|
// 'x-amz-server-side-encryption-customer-algorithm': '',
|
||||||
|
// 'x-amz-website-redirect-location': '',
|
||||||
|
// acl: [Object],
|
||||||
|
// key: '',
|
||||||
|
// location: [Object],
|
||||||
|
// isDeleteMarker: false,
|
||||||
|
// tags: {},
|
||||||
|
// replicationInfo: [Object],
|
||||||
|
// dataStoreName: 'us-east-1',
|
||||||
|
// 'last-modified': '2018-01-19T01:00:41.797Z',
|
||||||
|
// 'md-model-version': 3,
|
||||||
|
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323377/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
|
||||||
|
|
||||||
|
// sample update:
|
||||||
|
//
|
||||||
|
// update!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323702 },
|
||||||
|
// t: 1,
|
||||||
|
// h: Long { _bsontype: 'Long', low_: -1991728232, high_: -721376083 },
|
||||||
|
// v: 2,
|
||||||
|
// op: 'u',
|
||||||
|
// ns: 'metadata.hello',
|
||||||
|
// ui:
|
||||||
|
// Binary {
|
||||||
|
// _bsontype: 'Binary',
|
||||||
|
// sub_type: 4,
|
||||||
|
// position: 16,
|
||||||
|
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||||
|
// o2: { _id: 'stuff4' },
|
||||||
|
// wall: 2018-01-19T01:01:42.050Z,
|
||||||
|
// o:
|
||||||
|
// { _id: 'stuff4',
|
||||||
|
// value:
|
||||||
|
// { 'owner-display-name': 'Bart',
|
||||||
|
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||||
|
// 'content-length': 508,
|
||||||
|
// 'content-type': 'binary/octet-stream',
|
||||||
|
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
|
||||||
|
// 'x-amz-version-id': 'null',
|
||||||
|
// 'x-amz-server-version-id': '',
|
||||||
|
// 'x-amz-storage-class': 'STANDARD',
|
||||||
|
// 'x-amz-server-side-encryption': '',
|
||||||
|
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
|
||||||
|
// 'x-amz-server-side-encryption-customer-algorithm': '',
|
||||||
|
// 'x-amz-website-redirect-location': '',
|
||||||
|
// acl: [Object],
|
||||||
|
// key: '',
|
||||||
|
// location: [Object],
|
||||||
|
// isDeleteMarker: false,
|
||||||
|
// tags: {},
|
||||||
|
// replicationInfo: [Object],
|
||||||
|
// dataStoreName: 'us-east-1',
|
||||||
|
// 'last-modified': '2018-01-19T01:01:42.022Z',
|
||||||
|
// 'md-model-version': 3,
|
||||||
|
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323641/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// sample delete:
|
||||||
|
//
|
||||||
|
// delete!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323746 },
|
||||||
|
// t: 1,
|
||||||
|
// h: Long { _bsontype: 'Long', low_: 2036516563, high_: 689237179 },
|
||||||
|
// v: 2,
|
||||||
|
// op: 'd',
|
||||||
|
// ns: 'metadata.hello',
|
||||||
|
// ui:
|
||||||
|
// Binary {
|
||||||
|
// _bsontype: 'Binary',
|
||||||
|
// sub_type: 4,
|
||||||
|
// position: 16,
|
||||||
|
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||||
|
// wall: 2018-01-19T01:02:26.962Z,
|
||||||
|
// o: { _id: 'stuff4' } }
|
||||||
|
//
|
|
@ -34,8 +34,6 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||||
|
|
||||||
let uidCounter = 0;
|
let uidCounter = 0;
|
||||||
|
|
||||||
const format = require('util').format;
|
|
||||||
|
|
||||||
const VID_SEP = require('../../../versioning/constants')
|
const VID_SEP = require('../../../versioning/constants')
|
||||||
.VersioningConstants.VersionId.Separator;
|
.VersioningConstants.VersionId.Separator;
|
||||||
|
|
||||||
|
@ -70,30 +68,20 @@ function generatePHDVersion(versionId) {
|
||||||
* @param {Object} params - constructor params
|
* @param {Object} params - constructor params
|
||||||
* @param {String} params.host - host for mongo
|
* @param {String} params.host - host for mongo
|
||||||
* @param {String} params.port - port for mongo
|
* @param {String} params.port - port for mongo
|
||||||
* @param {String} params.writeConcern - writeConcern for mongo
|
* @param {String} params.replicationGroupId - replication group id
|
||||||
* @param {String} params.replicaSet - replicaSet for mongo
|
* used here to generate version id's
|
||||||
* @param {String} params.readPreference - readPreference for mongo
|
* // Does backbeat use this at all? Can we make this optional and
|
||||||
|
* // set a default so when instantiate the client elsewhere don't need?
|
||||||
* @param {String} params.database - name of database
|
* @param {String} params.database - name of database
|
||||||
* @param {werelogs.Logger} params.logger - logger instance
|
* @param {werelogs.Logger} params.logger - logger instance
|
||||||
* @param {String} [params.path] - path for mongo volume
|
* @param {String} [params.path] - path for mongo volume
|
||||||
* @param {String} [params.replicationGroupId] - replication group id
|
|
||||||
* used here to generate version id's on puts. S3 instantiates the class
|
|
||||||
* but backbeat does not
|
|
||||||
*/
|
*/
|
||||||
class MongoClientInterface {
|
class MongoClientInterface {
|
||||||
|
|
||||||
constructor(params) {
|
constructor(params) {
|
||||||
const { host, port,
|
const { host, port, path, database, logger,
|
||||||
writeConcern, replicaSet, readPreference,
|
|
||||||
path, database, logger,
|
|
||||||
replicationGroupId } = params;
|
replicationGroupId } = params;
|
||||||
const mongoUrl = format(
|
const mongoUrl =
|
||||||
'mongodb://%s:%s/w=%s&replicaSet=%s&readPreference=%s',
|
`mongodb://${host}:${port}`;
|
||||||
host,
|
|
||||||
port,
|
|
||||||
writeConcern,
|
|
||||||
replicaSet,
|
|
||||||
readPreference);
|
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.client = null;
|
this.client = null;
|
||||||
this.db = null;
|
this.db = null;
|
||||||
|
@ -106,8 +94,6 @@ class MongoClientInterface {
|
||||||
// initialize this backend
|
// initialize this backend
|
||||||
MongoClient.connect(mongoUrl, (err, client) => {
|
MongoClient.connect(mongoUrl, (err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this.logger.error('error connecting to mongo server',
|
|
||||||
{ error: err.message });
|
|
||||||
throw (errors.InternalError);
|
throw (errors.InternalError);
|
||||||
}
|
}
|
||||||
this.logger.debug('connected to mongodb');
|
this.logger.debug('connected to mongodb');
|
||||||
|
@ -171,9 +157,6 @@ class MongoClientInterface {
|
||||||
_id: bucketName,
|
_id: bucketName,
|
||||||
}, {}, (err, doc) => {
|
}, {}, (err, doc) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'getBucketAttributes: error getting bucket attributes',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (!doc) {
|
if (!doc) {
|
||||||
|
@ -190,9 +173,6 @@ class MongoClientInterface {
|
||||||
getBucketAndObject(bucketName, objName, params, log, cb) {
|
getBucketAndObject(bucketName, objName, params, log, cb) {
|
||||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'getBucketAttributes: error getting bucket attributes',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||||
|
@ -203,8 +183,6 @@ class MongoClientInterface {
|
||||||
BucketInfo.fromObj(bucket).serialize(),
|
BucketInfo.fromObj(bucket).serialize(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
log.error('getObject: error getting object',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
return cb(null, {
|
return cb(null, {
|
||||||
|
@ -242,13 +220,9 @@ class MongoClientInterface {
|
||||||
_id: bucketName,
|
_id: bucketName,
|
||||||
}, {}, (err, result) => {
|
}, {}, (err, result) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('deleteBucketStep2: error deleting bucket',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (result.ok !== 1) {
|
if (result.ok !== 1) {
|
||||||
log.error('deleteBucketStep2: failed deleting bucket',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb(null);
|
return cb(null);
|
||||||
|
@ -272,8 +246,6 @@ class MongoClientInterface {
|
||||||
if (err.codeName === 'NamespaceNotFound') {
|
if (err.codeName === 'NamespaceNotFound') {
|
||||||
return this.deleteBucketStep2(bucketName, log, cb);
|
return this.deleteBucketStep2(bucketName, log, cb);
|
||||||
}
|
}
|
||||||
log.error('deleteBucket: error deleting bucket',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return this.deleteBucketStep2(bucketName, log, cb);
|
return this.deleteBucketStep2(bucketName, log, cb);
|
||||||
|
@ -285,8 +257,6 @@ class MongoClientInterface {
|
||||||
* sequentially create the object THEN update the master
|
* sequentially create the object THEN update the master
|
||||||
*/
|
*/
|
||||||
putObjectVerCase1(c, bucketName, objName, objVal, params, log, cb) {
|
putObjectVerCase1(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
// TODO: if backbeat is doing a put, versionId should be pulled
|
|
||||||
// from existing metadata
|
|
||||||
const versionId = generateVersionId(this.replicationGroupId);
|
const versionId = generateVersionId(this.replicationGroupId);
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
objVal.versionId = versionId;
|
objVal.versionId = versionId;
|
||||||
|
@ -321,8 +291,6 @@ class MongoClientInterface {
|
||||||
* have been created with versions
|
* have been created with versions
|
||||||
*/
|
*/
|
||||||
putObjectVerCase2(c, bucketName, objName, objVal, params, log, cb) {
|
putObjectVerCase2(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
// TODO: if backbeat is doing a put, versionId should be pulled
|
|
||||||
// from existing metadata
|
|
||||||
const versionId = generateVersionId(this.replicationGroupId);
|
const versionId = generateVersionId(this.replicationGroupId);
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
objVal.versionId = versionId;
|
objVal.versionId = versionId;
|
||||||
|
@ -383,15 +351,7 @@ class MongoClientInterface {
|
||||||
value: objVal,
|
value: objVal,
|
||||||
}, {
|
}, {
|
||||||
upsert: true,
|
upsert: true,
|
||||||
}, err => {
|
}, () => cb());
|
||||||
if (err) {
|
|
||||||
log.error(
|
|
||||||
'putObjectNoVer: error putting obect with no versioning',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
|
||||||
}
|
|
||||||
return cb();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
putObject(bucketName, objName, objVal, params, log, cb) {
|
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||||
|
@ -421,8 +381,6 @@ class MongoClientInterface {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
}, {}, (err, doc) => {
|
}, {}, (err, doc) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('findOne: error getting object',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (!doc) {
|
if (!doc) {
|
||||||
|
@ -431,8 +389,7 @@ class MongoClientInterface {
|
||||||
if (doc.value.isPHD) {
|
if (doc.value.isPHD) {
|
||||||
this.getLatestVersion(c, objName, log, (err, value) => {
|
this.getLatestVersion(c, objName, log, (err, value) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('getLatestVersion: getting latest version',
|
log.error('getting latest version', err);
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
return cb(null, value);
|
return cb(null, value);
|
||||||
|
@ -461,9 +418,6 @@ class MongoClientInterface {
|
||||||
toArray(
|
toArray(
|
||||||
(err, keys) => {
|
(err, keys) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'getLatestVersion: error getting latest version',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (keys.length === 0) {
|
if (keys.length === 0) {
|
||||||
|
@ -493,13 +447,9 @@ class MongoClientInterface {
|
||||||
upsert: true,
|
upsert: true,
|
||||||
}, (err, result) => {
|
}, (err, result) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('repair: error trying to repair value',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (result.ok !== 1) {
|
if (result.ok !== 1) {
|
||||||
log.error('repair: failed trying to repair value',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb(null);
|
return cb(null);
|
||||||
|
@ -513,13 +463,12 @@ class MongoClientInterface {
|
||||||
asyncRepair(c, objName, mst, log) {
|
asyncRepair(c, objName, mst, log) {
|
||||||
this.getLatestVersion(c, objName, log, (err, value) => {
|
this.getLatestVersion(c, objName, log, (err, value) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('async-repair: getting latest version',
|
log.error('async-repair: getting latest version', err);
|
||||||
{ error: err.message });
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
this.repair(c, objName, value, mst, log, err => {
|
this.repair(c, objName, value, mst, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('async-repair failed', { error: err.message });
|
log.error('async-repair failed', err);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
log.debug('async-repair success');
|
log.debug('async-repair success');
|
||||||
|
@ -550,9 +499,6 @@ class MongoClientInterface {
|
||||||
'value.versionId': mst.versionId,
|
'value.versionId': mst.versionId,
|
||||||
}, {}, err => {
|
}, {}, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'findOneAndDelete: error finding and deleting',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
// do not test result.ok === 1 because
|
// do not test result.ok === 1 because
|
||||||
|
@ -561,8 +507,6 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
log.error('getLatestVersion: error getting latest version',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
// We have other versions available so repair:
|
// We have other versions available so repair:
|
||||||
|
@ -582,8 +526,6 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
deleteObjectVerMaster(c, bucketName, objName, params, log, cb) {
|
deleteObjectVerMaster(c, bucketName, objName, params, log, cb) {
|
||||||
const vObjName = formatVersionKey(objName, params.versionId);
|
const vObjName = formatVersionKey(objName, params.versionId);
|
||||||
// TODO: if backbeat is doing a delete, versionId should be pulled
|
|
||||||
// from existing metadata
|
|
||||||
const _vid = generateVersionId(this.replicationGroupId);
|
const _vid = generateVersionId(this.replicationGroupId);
|
||||||
const mst = generatePHDVersion(_vid);
|
const mst = generatePHDVersion(_vid);
|
||||||
c.bulkWrite([{
|
c.bulkWrite([{
|
||||||
|
@ -617,15 +559,9 @@ class MongoClientInterface {
|
||||||
_id: vObjName,
|
_id: vObjName,
|
||||||
}, {}, (err, result) => {
|
}, {}, (err, result) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'findOneAndDelete: error when version is not master',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (result.ok !== 1) {
|
if (result.ok !== 1) {
|
||||||
log.error(
|
|
||||||
'findOneAndDelete: failed when version is not master',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb(null);
|
return cb(null);
|
||||||
|
@ -643,8 +579,6 @@ class MongoClientInterface {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
}, {}, (err, mst) => {
|
}, {}, (err, mst) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('deleteObjectVer: error deleting versioned object',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (!mst) {
|
if (!mst) {
|
||||||
|
@ -668,15 +602,9 @@ class MongoClientInterface {
|
||||||
_id: objName,
|
_id: objName,
|
||||||
}, {}, (err, result) => {
|
}, {}, (err, result) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
|
||||||
'deleteObjectNoVer: error deleting object with no version',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (result.ok !== 1) {
|
if (result.ok !== 1) {
|
||||||
log.error(
|
|
||||||
'deleteObjectNoVer: failed deleting object with no version',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb(null);
|
return cb(null);
|
||||||
|
@ -699,8 +627,7 @@ class MongoClientInterface {
|
||||||
const requestParams = extension.genMDParams();
|
const requestParams = extension.genMDParams();
|
||||||
const c = this.getCollection(bucketName);
|
const c = this.getCollection(bucketName);
|
||||||
let cbDone = false;
|
let cbDone = false;
|
||||||
const stream = new MongoReadStream(c, requestParams,
|
const stream = new MongoReadStream(c, requestParams);
|
||||||
params.mongifiedSearch);
|
|
||||||
stream
|
stream
|
||||||
.on('data', e => {
|
.on('data', e => {
|
||||||
if (extension.filter(e) < 0) {
|
if (extension.filter(e) < 0) {
|
||||||
|
@ -716,8 +643,7 @@ class MongoClientInterface {
|
||||||
error: err.message,
|
error: err.message,
|
||||||
errorStack: err.stack,
|
errorStack: err.stack,
|
||||||
};
|
};
|
||||||
log.error(
|
log.error('error listing objects', logObj);
|
||||||
'internalListObject: error listing objects', logObj);
|
|
||||||
cb(errors.InternalError);
|
cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -745,8 +671,6 @@ class MongoClientInterface {
|
||||||
_id: __UUID,
|
_id: __UUID,
|
||||||
}, {}, (err, doc) => {
|
}, {}, (err, doc) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('readUUID: error reading UUID',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
if (!doc) {
|
if (!doc) {
|
||||||
|
@ -768,8 +692,6 @@ class MongoClientInterface {
|
||||||
// FIXME: define a KeyAlreadyExists error in Arsenal
|
// FIXME: define a KeyAlreadyExists error in Arsenal
|
||||||
return cb(errors.EntityAlreadyExists);
|
return cb(errors.EntityAlreadyExists);
|
||||||
}
|
}
|
||||||
log.error('writeUUIDIfNotExists: error writing UUID',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
// FIXME: shoud we check for result.ok === 1 ?
|
// FIXME: shoud we check for result.ok === 1 ?
|
||||||
|
@ -786,8 +708,6 @@ class MongoClientInterface {
|
||||||
this.writeUUIDIfNotExists(uuid, log, err => {
|
this.writeUUIDIfNotExists(uuid, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err === errors.InternalError) {
|
if (err === errors.InternalError) {
|
||||||
log.error('getUUID: error getting UUID',
|
|
||||||
{ error: err.message });
|
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
return this.readUUID(log, cb);
|
return this.readUUID(log, cb);
|
||||||
|
|
Loading…
Reference in New Issue