Compare commits
36 Commits
developmen
...
ft/mongo-l
Author | SHA1 | Date |
---|---|---|
JianqinWang | 04520404dc | |
JianqinWang | a8848cee9a | |
JianqinWang | 934bdffa96 | |
JianqinWang | 7214ff3113 | |
JianqinWang | e8b722b7bb | |
JianqinWang | 40aef12063 | |
JianqinWang | c42f6bc4e2 | |
JianqinWang | 7410cf8f6a | |
JianqinWang | 15668cfd4e | |
JianqinWang | 9318b7207f | |
JianqinWang | 057f67a6f5 | |
JianqinWang | ee40267e2e | |
JianqinWang | fc33bc4591 | |
JianqinWang | 45bbeba92c | |
JianqinWang | b99d140db7 | |
JianqinWang | e4485f2638 | |
JianqinWang | deb87d53c8 | |
JianqinWang | bde1873c22 | |
JianqinWang | cec494d0a3 | |
JianqinWang | 2bbcddebe8 | |
JianqinWang | cc9d1d1307 | |
JianqinWang | 8653a52cb8 | |
JianqinWang | 7957db02fb | |
JianqinWang | ed98815cf2 | |
JianqinWang | 179345ea46 | |
JianqinWang | de0227231b | |
JianqinWang | d7b5fc67b3 | |
JianqinWang | 2f9e388c5e | |
JianqinWang | a79d466495 | |
JianqinWang | e159b1afd9 | |
Lauren Spiegel | dd4c149404 | |
Lauren Spiegel | fa9ce0a095 | |
Lauren Spiegel | 956dc28781 | |
Lauren Spiegel | 703aafeb72 | |
Lauren Spiegel | c023408704 | |
Lauren Spiegel | 790c3866b8 |
25
index.js
25
index.js
|
@ -72,14 +72,23 @@ module.exports = {
|
|||
},
|
||||
storage: {
|
||||
metadata: {
|
||||
MetadataFileServer:
|
||||
require('./lib/storage/metadata/file/MetadataFileServer'),
|
||||
MetadataFileClient:
|
||||
require('./lib/storage/metadata/file/MetadataFileClient'),
|
||||
LogConsumer:
|
||||
require('./lib/storage/metadata/bucketclient/LogConsumer'),
|
||||
MongoClientInterface:
|
||||
require('./lib/storage/metadata/mongoclient/MongoClientInterface'),
|
||||
bucketclient: {
|
||||
LogConsumer:
|
||||
require('./lib/storage/metadata/bucketclient/LogConsumer'),
|
||||
},
|
||||
file: {
|
||||
MetadataFileServer:
|
||||
require('./lib/storage/metadata/file/MetadataFileServer'),
|
||||
MetadataFileClient:
|
||||
require('./lib/storage/metadata/file/MetadataFileClient'),
|
||||
},
|
||||
mongoclient: {
|
||||
MongoClientInterface:
|
||||
require('./lib/storage/metadata/mongoclient/' +
|
||||
'MongoClientInterface'),
|
||||
LogConsumer:
|
||||
require('./lib/storage/metadata/mongoclient/LogConsumer'),
|
||||
},
|
||||
},
|
||||
data: {
|
||||
file: {
|
||||
|
|
|
@ -0,0 +1,334 @@
|
|||
'use strict'; // eslint-disable-line
|
||||
|
||||
const stream = require('stream');
|
||||
const MongoClient = require('mongodb').MongoClient;
|
||||
const { Timestamp, Long } = require('bson');
|
||||
|
||||
let lastEndID = undefined;
|
||||
|
||||
const ops = {
|
||||
i: 'put',
|
||||
u: 'put',
|
||||
d: 'delete',
|
||||
};
|
||||
|
||||
class ListRecordStream extends stream.Transform {
|
||||
constructor(logger) {
|
||||
super({ objectMode: true });
|
||||
this.logger = logger;
|
||||
this.hasStarted = false;
|
||||
this.start = undefined;
|
||||
this.end = undefined;
|
||||
this.lastUniqID = undefined;
|
||||
// this.unpublishedListing is true once we pass the oplog that has the
|
||||
// start seq timestamp and uniqID 'h'
|
||||
this.unpublishedListing = undefined;
|
||||
}
|
||||
|
||||
_transform(itemObj, encoding, callback) {
|
||||
if (!itemObj) {
|
||||
this.push(null);
|
||||
this.emit('info', {
|
||||
start: this.start,
|
||||
end: this.end,
|
||||
uniqID: this.lastUniqID,
|
||||
});
|
||||
return callback();
|
||||
}
|
||||
|
||||
// always update to most recent uniqID
|
||||
this.lastUniqID = itemObj.h.toString();
|
||||
|
||||
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
|
||||
this.end = itemObj.ts.toNumber();
|
||||
}
|
||||
|
||||
// only push to stream unpublished objects
|
||||
if (!this.unpublishedListing) {
|
||||
if (lastEndID === itemObj.h.toString()) {
|
||||
this.unpublishedListing = true;
|
||||
}
|
||||
return callback();
|
||||
}
|
||||
|
||||
if (!this.hasStarted) {
|
||||
this.hasStarted = true;
|
||||
this.start = itemObj.ts.toNumber();
|
||||
this.emit('info', {
|
||||
start: this.start,
|
||||
end: this.end,
|
||||
uniqId: this.lastUniqID,
|
||||
});
|
||||
}
|
||||
|
||||
// don't push oplogs that have already been sent
|
||||
if (!this.unpublishedListing) {
|
||||
return callback();
|
||||
}
|
||||
|
||||
const streamObject = {
|
||||
timestamp: new Date(itemObj.ts.high_ * 1000),
|
||||
db: itemObj.ns,
|
||||
entries: [
|
||||
{
|
||||
type: ops[itemObj.op],
|
||||
key: itemObj.o._id,
|
||||
value: JSON.stringify(itemObj.o.value),
|
||||
},
|
||||
],
|
||||
};
|
||||
return callback(null, streamObject);
|
||||
}
|
||||
|
||||
_flush(callback) {
|
||||
this.emit('info', {
|
||||
start: this.start,
|
||||
end: this.end,
|
||||
uniqID: this.lastUniqID,
|
||||
});
|
||||
this.push(null);
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @class
|
||||
* @classdesc Class to consume mongo oplog
|
||||
*/
|
||||
class LogConsumer {
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
*
|
||||
* @param {string} host - string that is MongoDB host
|
||||
* @param {number} port - port at host name for MongoDB instance
|
||||
* @param {string} database - name of database to get replica set oplogs
|
||||
*/
|
||||
constructor(mongoConfig, logger) {
|
||||
const { host, port, database, writeConcern, replicaSet, readPreference } = mongoConfig;
|
||||
// need a way to connect to master
|
||||
// might have to use all replica set members in uri? http://mongodb.github.io/node-mongodb-native/3.0/tutorials/connect/
|
||||
|
||||
// can have second argument {ts: timestamp} to call only entries from some number
|
||||
// this ts gets translated to find({ts: {$gt: ts }}) in mongo language
|
||||
// but if have it here on instantiation, won't work. we are sending the sequence on
|
||||
// each read call.
|
||||
// so might be better to do query like this directly (without module)
|
||||
// https://github.com/cayasso/mongo-oplog/blob/master/src/stream.js#L28
|
||||
|
||||
// and then use the stream method in the same way to get just the number
|
||||
// of entries needed -- http://mongodb.github.io/node-mongodb-native/3.0/api/Cursor.html#stream
|
||||
|
||||
// 'local' is the database where MongoDB has oplogs.rs capped collection
|
||||
this.database = 'local';
|
||||
// this.mongoUrl = format(
|
||||
// 'mongodb://%s:%s/local',
|
||||
// host,
|
||||
// port);
|
||||
this.mongoUrl = 'mongodb://localhost:27018,localhost:27017,localhost:27019/local';
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to MongoClient using Mongo node module to access database and
|
||||
* database oplogs (operation logs)
|
||||
*
|
||||
* @param {function} done - callback function, called with an error object
|
||||
* or null and an object as 2nd parameter
|
||||
* @return {undefined}
|
||||
*/
|
||||
connectMongo(done) {
|
||||
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
|
||||
(err, client) => {
|
||||
if (err) {
|
||||
this.logger.error('Unable to connect to MongoDB', err);
|
||||
return done(err);
|
||||
}
|
||||
this.logger.info('connected to mongodb');
|
||||
this.client = client;
|
||||
this.db = client.db(this.database, {
|
||||
ignoreUndefined: true,
|
||||
});
|
||||
return done();
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Read a series of log records from mongo
|
||||
*
|
||||
* @param {Object} [params] - params object
|
||||
* @param {Number} [params.startSeq] - fetch starting from this
|
||||
* sequence number
|
||||
* @param {Number} [params.limit] - maximum number of log records
|
||||
* to return
|
||||
* @param {function} cb - callback function, called with an error
|
||||
* object or null and an object as 2nd parameter
|
||||
* // CONTINUE HERE!!!
|
||||
* object.info contains ...
|
||||
* object.log contains ... (see recordstream below) and _processPrepareEntries function
|
||||
* in backbeat. batchState.logRes is object.log from here.
|
||||
*
|
||||
* @return {undefined}
|
||||
*/
|
||||
readRecords(params, cb) {
|
||||
const recordStream = new ListRecordStream(this.logger);
|
||||
const limit = params.limit || 10000;
|
||||
const startIDandSeq = params.startSeq.toString().split('|');
|
||||
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
|
||||
lastEndID = startIDandSeq[1];
|
||||
// need to somehow limit entries to limit and then stop the stream
|
||||
|
||||
this.coll = this.db.collection('oplog.rs');
|
||||
return this.coll.find({
|
||||
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
|
||||
ts: { $gte: Timestamp.fromNumber(startSeq) },
|
||||
}, {
|
||||
limit,
|
||||
tailable: false,
|
||||
awaitData: false,
|
||||
noCursorTimeout: true,
|
||||
OplogReplay: true,
|
||||
numberOfRetries: Number.MAX_VALUE,
|
||||
}, (err, res) => {
|
||||
// console.log('LOG RESPONSE', res);
|
||||
const stream = res.stream();
|
||||
stream.on('data', data => {
|
||||
recordStream.write(data);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
recordStream.write(undefined);
|
||||
});
|
||||
recordStream.once('info', info => {
|
||||
recordStream.removeAllListeners('error');
|
||||
cb(null, { info, log: recordStream });
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// const tester = new LogConsumer({ host: '127.0.0.1', port: ['27017'],
|
||||
// database: 'local', writeConcern: 'majority', replicaSet: 'rs0',
|
||||
// readPreference: 'primary' }, console);
|
||||
//
|
||||
// tester.connectMongo(() => {
|
||||
// return async.waterfall([
|
||||
// next => {
|
||||
// return tester.readRecords({ startSeq: 0, limit: 5 }, (err, res) => {
|
||||
// console.log('cb called for read records');
|
||||
// res.log.on('data', data => {
|
||||
// console.log('Streamed formatted data', data);
|
||||
// });
|
||||
// res.log.on('end', () => {
|
||||
// console.log('ENDED');
|
||||
// return next();
|
||||
// });
|
||||
// });
|
||||
// },
|
||||
// ], () => {});
|
||||
// });
|
||||
|
||||
module.exports = LogConsumer;
|
||||
|
||||
//
|
||||
// sample put:
|
||||
//
|
||||
// insert!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323641 },
|
||||
// t: 1,
|
||||
// h: Long { _bsontype: 'Long', low_: -1207597211, high_: -1765392211 },
|
||||
// v: 2,
|
||||
// op: 'i',
|
||||
// ns: 'metadata.hello',
|
||||
// ui:
|
||||
// Binary {
|
||||
// _bsontype: 'Binary',
|
||||
// sub_type: 4,
|
||||
// position: 16,
|
||||
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||
// wall: 2018-01-19T01:00:41.798Z,
|
||||
// o:
|
||||
// { _id: 'stuff4',
|
||||
// value:
|
||||
// { 'owner-display-name': 'Bart',
|
||||
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||
// 'content-length': 508,
|
||||
// 'content-type': 'binary/octet-stream',
|
||||
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
|
||||
// 'x-amz-version-id': 'null',
|
||||
// 'x-amz-server-version-id': '',
|
||||
// 'x-amz-storage-class': 'STANDARD',
|
||||
// 'x-amz-server-side-encryption': '',
|
||||
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
|
||||
// 'x-amz-server-side-encryption-customer-algorithm': '',
|
||||
// 'x-amz-website-redirect-location': '',
|
||||
// acl: [Object],
|
||||
// key: '',
|
||||
// location: [Object],
|
||||
// isDeleteMarker: false,
|
||||
// tags: {},
|
||||
// replicationInfo: [Object],
|
||||
// dataStoreName: 'us-east-1',
|
||||
// 'last-modified': '2018-01-19T01:00:41.797Z',
|
||||
// 'md-model-version': 3,
|
||||
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323377/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
|
||||
|
||||
// sample update:
|
||||
//
|
||||
// update!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323702 },
|
||||
// t: 1,
|
||||
// h: Long { _bsontype: 'Long', low_: -1991728232, high_: -721376083 },
|
||||
// v: 2,
|
||||
// op: 'u',
|
||||
// ns: 'metadata.hello',
|
||||
// ui:
|
||||
// Binary {
|
||||
// _bsontype: 'Binary',
|
||||
// sub_type: 4,
|
||||
// position: 16,
|
||||
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||
// o2: { _id: 'stuff4' },
|
||||
// wall: 2018-01-19T01:01:42.050Z,
|
||||
// o:
|
||||
// { _id: 'stuff4',
|
||||
// value:
|
||||
// { 'owner-display-name': 'Bart',
|
||||
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||
// 'content-length': 508,
|
||||
// 'content-type': 'binary/octet-stream',
|
||||
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
|
||||
// 'x-amz-version-id': 'null',
|
||||
// 'x-amz-server-version-id': '',
|
||||
// 'x-amz-storage-class': 'STANDARD',
|
||||
// 'x-amz-server-side-encryption': '',
|
||||
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
|
||||
// 'x-amz-server-side-encryption-customer-algorithm': '',
|
||||
// 'x-amz-website-redirect-location': '',
|
||||
// acl: [Object],
|
||||
// key: '',
|
||||
// location: [Object],
|
||||
// isDeleteMarker: false,
|
||||
// tags: {},
|
||||
// replicationInfo: [Object],
|
||||
// dataStoreName: 'us-east-1',
|
||||
// 'last-modified': '2018-01-19T01:01:42.022Z',
|
||||
// 'md-model-version': 3,
|
||||
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323641/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
|
||||
//
|
||||
//
|
||||
//
|
||||
// sample delete:
|
||||
//
|
||||
// delete!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323746 },
|
||||
// t: 1,
|
||||
// h: Long { _bsontype: 'Long', low_: 2036516563, high_: 689237179 },
|
||||
// v: 2,
|
||||
// op: 'd',
|
||||
// ns: 'metadata.hello',
|
||||
// ui:
|
||||
// Binary {
|
||||
// _bsontype: 'Binary',
|
||||
// sub_type: 4,
|
||||
// position: 16,
|
||||
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
|
||||
// wall: 2018-01-19T01:02:26.962Z,
|
||||
// o: { _id: 'stuff4' } }
|
||||
//
|
|
@ -34,8 +34,6 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
|
|||
|
||||
let uidCounter = 0;
|
||||
|
||||
const format = require('util').format;
|
||||
|
||||
const VID_SEP = require('../../../versioning/constants')
|
||||
.VersioningConstants.VersionId.Separator;
|
||||
|
||||
|
@ -70,30 +68,20 @@ function generatePHDVersion(versionId) {
|
|||
* @param {Object} params - constructor params
|
||||
* @param {String} params.host - host for mongo
|
||||
* @param {String} params.port - port for mongo
|
||||
* @param {String} params.writeConcern - writeConcern for mongo
|
||||
* @param {String} params.replicaSet - replicaSet for mongo
|
||||
* @param {String} params.readPreference - readPreference for mongo
|
||||
* @param {String} params.replicationGroupId - replication group id
|
||||
* used here to generate version id's
|
||||
* // Does backbeat use this at all? Can we make this optional and
|
||||
* // set a default so when instantiate the client elsewhere don't need?
|
||||
* @param {String} params.database - name of database
|
||||
* @param {werelogs.Logger} params.logger - logger instance
|
||||
* @param {String} [params.path] - path for mongo volume
|
||||
* @param {String} [params.replicationGroupId] - replication group id
|
||||
* used here to generate version id's on puts. S3 instantiates the class
|
||||
* but backbeat does not
|
||||
*/
|
||||
class MongoClientInterface {
|
||||
|
||||
constructor(params) {
|
||||
const { host, port,
|
||||
writeConcern, replicaSet, readPreference,
|
||||
path, database, logger,
|
||||
const { host, port, path, database, logger,
|
||||
replicationGroupId } = params;
|
||||
const mongoUrl = format(
|
||||
'mongodb://%s:%s/w=%s&replicaSet=%s&readPreference=%s',
|
||||
host,
|
||||
port,
|
||||
writeConcern,
|
||||
replicaSet,
|
||||
readPreference);
|
||||
const mongoUrl =
|
||||
`mongodb://${host}:${port}`;
|
||||
this.logger = logger;
|
||||
this.client = null;
|
||||
this.db = null;
|
||||
|
@ -106,8 +94,6 @@ class MongoClientInterface {
|
|||
// initialize this backend
|
||||
MongoClient.connect(mongoUrl, (err, client) => {
|
||||
if (err) {
|
||||
this.logger.error('error connecting to mongo server',
|
||||
{ error: err.message });
|
||||
throw (errors.InternalError);
|
||||
}
|
||||
this.logger.debug('connected to mongodb');
|
||||
|
@ -171,9 +157,6 @@ class MongoClientInterface {
|
|||
_id: bucketName,
|
||||
}, {}, (err, doc) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'getBucketAttributes: error getting bucket attributes',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!doc) {
|
||||
|
@ -190,9 +173,6 @@ class MongoClientInterface {
|
|||
getBucketAndObject(bucketName, objName, params, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'getBucketAttributes: error getting bucket attributes',
|
||||
{ error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
this.getObject(bucketName, objName, params, log, (err, obj) => {
|
||||
|
@ -203,8 +183,6 @@ class MongoClientInterface {
|
|||
BucketInfo.fromObj(bucket).serialize(),
|
||||
});
|
||||
}
|
||||
log.error('getObject: error getting object',
|
||||
{ error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, {
|
||||
|
@ -242,13 +220,9 @@ class MongoClientInterface {
|
|||
_id: bucketName,
|
||||
}, {}, (err, result) => {
|
||||
if (err) {
|
||||
log.error('deleteBucketStep2: error deleting bucket',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (result.ok !== 1) {
|
||||
log.error('deleteBucketStep2: failed deleting bucket',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null);
|
||||
|
@ -272,8 +246,6 @@ class MongoClientInterface {
|
|||
if (err.codeName === 'NamespaceNotFound') {
|
||||
return this.deleteBucketStep2(bucketName, log, cb);
|
||||
}
|
||||
log.error('deleteBucket: error deleting bucket',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return this.deleteBucketStep2(bucketName, log, cb);
|
||||
|
@ -285,8 +257,6 @@ class MongoClientInterface {
|
|||
* sequentially create the object THEN update the master
|
||||
*/
|
||||
putObjectVerCase1(c, bucketName, objName, objVal, params, log, cb) {
|
||||
// TODO: if backbeat is doing a put, versionId should be pulled
|
||||
// from existing metadata
|
||||
const versionId = generateVersionId(this.replicationGroupId);
|
||||
// eslint-disable-next-line
|
||||
objVal.versionId = versionId;
|
||||
|
@ -321,8 +291,6 @@ class MongoClientInterface {
|
|||
* have been created with versions
|
||||
*/
|
||||
putObjectVerCase2(c, bucketName, objName, objVal, params, log, cb) {
|
||||
// TODO: if backbeat is doing a put, versionId should be pulled
|
||||
// from existing metadata
|
||||
const versionId = generateVersionId(this.replicationGroupId);
|
||||
// eslint-disable-next-line
|
||||
objVal.versionId = versionId;
|
||||
|
@ -383,15 +351,7 @@ class MongoClientInterface {
|
|||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'putObjectNoVer: error putting obect with no versioning',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}, () => cb());
|
||||
}
|
||||
|
||||
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||
|
@ -421,8 +381,6 @@ class MongoClientInterface {
|
|||
_id: objName,
|
||||
}, {}, (err, doc) => {
|
||||
if (err) {
|
||||
log.error('findOne: error getting object',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!doc) {
|
||||
|
@ -431,8 +389,7 @@ class MongoClientInterface {
|
|||
if (doc.value.isPHD) {
|
||||
this.getLatestVersion(c, objName, log, (err, value) => {
|
||||
if (err) {
|
||||
log.error('getLatestVersion: getting latest version',
|
||||
{ error: err.message });
|
||||
log.error('getting latest version', err);
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, value);
|
||||
|
@ -461,9 +418,6 @@ class MongoClientInterface {
|
|||
toArray(
|
||||
(err, keys) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'getLatestVersion: error getting latest version',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (keys.length === 0) {
|
||||
|
@ -493,13 +447,9 @@ class MongoClientInterface {
|
|||
upsert: true,
|
||||
}, (err, result) => {
|
||||
if (err) {
|
||||
log.error('repair: error trying to repair value',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (result.ok !== 1) {
|
||||
log.error('repair: failed trying to repair value',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null);
|
||||
|
@ -513,13 +463,12 @@ class MongoClientInterface {
|
|||
asyncRepair(c, objName, mst, log) {
|
||||
this.getLatestVersion(c, objName, log, (err, value) => {
|
||||
if (err) {
|
||||
log.error('async-repair: getting latest version',
|
||||
{ error: err.message });
|
||||
log.error('async-repair: getting latest version', err);
|
||||
return undefined;
|
||||
}
|
||||
this.repair(c, objName, value, mst, log, err => {
|
||||
if (err) {
|
||||
log.error('async-repair failed', { error: err.message });
|
||||
log.error('async-repair failed', err);
|
||||
return undefined;
|
||||
}
|
||||
log.debug('async-repair success');
|
||||
|
@ -550,9 +499,6 @@ class MongoClientInterface {
|
|||
'value.versionId': mst.versionId,
|
||||
}, {}, err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'findOneAndDelete: error finding and deleting',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
// do not test result.ok === 1 because
|
||||
|
@ -561,8 +507,6 @@ class MongoClientInterface {
|
|||
});
|
||||
return undefined;
|
||||
}
|
||||
log.error('getLatestVersion: error getting latest version',
|
||||
{ error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
// We have other versions available so repair:
|
||||
|
@ -582,8 +526,6 @@ class MongoClientInterface {
|
|||
*/
|
||||
deleteObjectVerMaster(c, bucketName, objName, params, log, cb) {
|
||||
const vObjName = formatVersionKey(objName, params.versionId);
|
||||
// TODO: if backbeat is doing a delete, versionId should be pulled
|
||||
// from existing metadata
|
||||
const _vid = generateVersionId(this.replicationGroupId);
|
||||
const mst = generatePHDVersion(_vid);
|
||||
c.bulkWrite([{
|
||||
|
@ -617,15 +559,9 @@ class MongoClientInterface {
|
|||
_id: vObjName,
|
||||
}, {}, (err, result) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'findOneAndDelete: error when version is not master',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (result.ok !== 1) {
|
||||
log.error(
|
||||
'findOneAndDelete: failed when version is not master',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null);
|
||||
|
@ -643,8 +579,6 @@ class MongoClientInterface {
|
|||
_id: objName,
|
||||
}, {}, (err, mst) => {
|
||||
if (err) {
|
||||
log.error('deleteObjectVer: error deleting versioned object',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!mst) {
|
||||
|
@ -668,15 +602,9 @@ class MongoClientInterface {
|
|||
_id: objName,
|
||||
}, {}, (err, result) => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'deleteObjectNoVer: error deleting object with no version',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (result.ok !== 1) {
|
||||
log.error(
|
||||
'deleteObjectNoVer: failed deleting object with no version',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null);
|
||||
|
@ -699,8 +627,7 @@ class MongoClientInterface {
|
|||
const requestParams = extension.genMDParams();
|
||||
const c = this.getCollection(bucketName);
|
||||
let cbDone = false;
|
||||
const stream = new MongoReadStream(c, requestParams,
|
||||
params.mongifiedSearch);
|
||||
const stream = new MongoReadStream(c, requestParams);
|
||||
stream
|
||||
.on('data', e => {
|
||||
if (extension.filter(e) < 0) {
|
||||
|
@ -716,8 +643,7 @@ class MongoClientInterface {
|
|||
error: err.message,
|
||||
errorStack: err.stack,
|
||||
};
|
||||
log.error(
|
||||
'internalListObject: error listing objects', logObj);
|
||||
log.error('error listing objects', logObj);
|
||||
cb(errors.InternalError);
|
||||
}
|
||||
})
|
||||
|
@ -745,8 +671,6 @@ class MongoClientInterface {
|
|||
_id: __UUID,
|
||||
}, {}, (err, doc) => {
|
||||
if (err) {
|
||||
log.error('readUUID: error reading UUID',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!doc) {
|
||||
|
@ -768,8 +692,6 @@ class MongoClientInterface {
|
|||
// FIXME: define a KeyAlreadyExists error in Arsenal
|
||||
return cb(errors.EntityAlreadyExists);
|
||||
}
|
||||
log.error('writeUUIDIfNotExists: error writing UUID',
|
||||
{ error: err.message });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
// FIXME: shoud we check for result.ok === 1 ?
|
||||
|
@ -786,8 +708,6 @@ class MongoClientInterface {
|
|||
this.writeUUIDIfNotExists(uuid, log, err => {
|
||||
if (err) {
|
||||
if (err === errors.InternalError) {
|
||||
log.error('getUUID: error getting UUID',
|
||||
{ error: err.message });
|
||||
return cb(err);
|
||||
}
|
||||
return this.readUUID(log, cb);
|
||||
|
|
Loading…
Reference in New Issue