Compare commits

...

36 Commits

Author SHA1 Message Date
JianqinWang 04520404dc remove unused module 2018-02-09 14:00:08 -08:00
JianqinWang a8848cee9a minor text edits - niantic 2018-02-08 18:01:03 -08:00
JianqinWang 934bdffa96 IT FILTERS OUT PREVIOUSLY ADDED OPLOGS!!!" 2018-02-08 17:53:36 -08:00
JianqinWang 7214ff3113 parse until last uniqID 2018-02-08 17:21:07 -08:00
JianqinWang e8b722b7bb pass uniqID 2018-02-08 16:10:15 -08:00
JianqinWang 40aef12063 start stream at beginning 2018-02-08 15:23:47 -08:00
JianqinWang c42f6bc4e2 write undefined will trigger info 2018-02-08 14:27:55 -08:00
JianqinWang 7410cf8f6a put 2018-02-08 12:37:49 -08:00
JianqinWang 15668cfd4e update new methods 2018-02-08 12:37:49 -08:00
JianqinWang 9318b7207f remove incorrect console.log 2018-02-08 12:37:49 -08:00
JianqinWang 057f67a6f5 fix method 2018-02-08 12:37:49 -08:00
JianqinWang ee40267e2e send info after finish stream 2018-02-08 12:37:49 -08:00
JianqinWang fc33bc4591 include bson npm dependency 2018-02-08 12:37:49 -08:00
JianqinWang 45bbeba92c works for backbeat 2018-02-08 12:37:49 -08:00
JianqinWang b99d140db7 update stream format 2018-02-08 12:37:49 -08:00
JianqinWang e4485f2638 fix info 2018-02-08 12:37:49 -08:00
JianqinWang deb87d53c8 base off of RecordLog 2018-02-08 12:37:49 -08:00
JianqinWang bde1873c22 don't close the record stream 2018-02-08 12:37:49 -08:00
JianqinWang cec494d0a3 readd stuff 2018-02-08 12:37:49 -08:00
JianqinWang 2bbcddebe8 slight change in format 2018-02-08 12:37:49 -08:00
JianqinWang cc9d1d1307 don't print out r es 2018-02-08 12:37:49 -08:00
JianqinWang 8653a52cb8 console.log statements 2018-02-08 12:37:49 -08:00
JianqinWang 7957db02fb hide console.log 2018-02-08 12:37:49 -08:00
JianqinWang ed98815cf2 slight change in format 2018-02-08 12:37:49 -08:00
JianqinWang 179345ea46 update index.js file 2018-02-08 12:37:49 -08:00
JianqinWang de0227231b more cleanup 2018-02-08 12:37:49 -08:00
JianqinWang d7b5fc67b3 wip: cleaning p 2018-02-08 12:37:49 -08:00
JianqinWang 2f9e388c5e update logconsumer readRecord 2018-02-08 12:37:49 -08:00
JianqinWang a79d466495 wip: add working mongodb tailer to LogConsumer 2018-02-08 12:37:49 -08:00
JianqinWang e159b1afd9 working oplog tailer! 2018-02-08 12:37:49 -08:00
Lauren Spiegel dd4c149404 SQUASH - play with oplog 2018-02-08 12:37:49 -08:00
Lauren Spiegel fa9ce0a095 WIP: Implement mongo log reader 2018-02-08 12:37:49 -08:00
Lauren Spiegel 956dc28781 DROP ME: bump arsenal version 2018-02-08 12:37:49 -08:00
Lauren Spiegel 703aafeb72 CHORE: Change filename 2018-02-08 12:37:31 -08:00
Lauren Spiegel c023408704 Changes to client due to move 2018-02-08 12:37:10 -08:00
Lauren Spiegel 790c3866b8 Move mongoclient from s3 to arsenal 2018-02-08 12:37:10 -08:00
3 changed files with 364 additions and 101 deletions

View File

@ -72,14 +72,23 @@ module.exports = {
}, },
storage: { storage: {
metadata: { metadata: {
MetadataFileServer: bucketclient: {
require('./lib/storage/metadata/file/MetadataFileServer'), LogConsumer:
MetadataFileClient: require('./lib/storage/metadata/bucketclient/LogConsumer'),
require('./lib/storage/metadata/file/MetadataFileClient'), },
LogConsumer: file: {
require('./lib/storage/metadata/bucketclient/LogConsumer'), MetadataFileServer:
MongoClientInterface: require('./lib/storage/metadata/file/MetadataFileServer'),
require('./lib/storage/metadata/mongoclient/MongoClientInterface'), MetadataFileClient:
require('./lib/storage/metadata/file/MetadataFileClient'),
},
mongoclient: {
MongoClientInterface:
require('./lib/storage/metadata/mongoclient/' +
'MongoClientInterface'),
LogConsumer:
require('./lib/storage/metadata/mongoclient/LogConsumer'),
},
}, },
data: { data: {
file: { file: {

View File

@ -0,0 +1,334 @@
'use strict'; // eslint-disable-line
const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;
const { Timestamp, Long } = require('bson');
let lastEndID = undefined;
const ops = {
i: 'put',
u: 'put',
d: 'delete',
};
class ListRecordStream extends stream.Transform {
constructor(logger) {
super({ objectMode: true });
this.logger = logger;
this.hasStarted = false;
this.start = undefined;
this.end = undefined;
this.lastUniqID = undefined;
// this.unpublishedListing is true once we pass the oplog that has the
// start seq timestamp and uniqID 'h'
this.unpublishedListing = undefined;
}
_transform(itemObj, encoding, callback) {
if (!itemObj) {
this.push(null);
this.emit('info', {
start: this.start,
end: this.end,
uniqID: this.lastUniqID,
});
return callback();
}
// always update to most recent uniqID
this.lastUniqID = itemObj.h.toString();
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
this.end = itemObj.ts.toNumber();
}
// only push to stream unpublished objects
if (!this.unpublishedListing) {
if (lastEndID === itemObj.h.toString()) {
this.unpublishedListing = true;
}
return callback();
}
if (!this.hasStarted) {
this.hasStarted = true;
this.start = itemObj.ts.toNumber();
this.emit('info', {
start: this.start,
end: this.end,
uniqId: this.lastUniqID,
});
}
// don't push oplogs that have already been sent
if (!this.unpublishedListing) {
return callback();
}
const streamObject = {
timestamp: new Date(itemObj.ts.high_ * 1000),
db: itemObj.ns,
entries: [
{
type: ops[itemObj.op],
key: itemObj.o._id,
value: JSON.stringify(itemObj.o.value),
},
],
};
return callback(null, streamObject);
}
_flush(callback) {
this.emit('info', {
start: this.start,
end: this.end,
uniqID: this.lastUniqID,
});
this.push(null);
callback();
}
}
/**
* @class
* @classdesc Class to consume mongo oplog
*/
class LogConsumer {
/**
* @constructor
*
* @param {string} host - string that is MongoDB host
* @param {number} port - port at host name for MongoDB instance
* @param {string} database - name of database to get replica set oplogs
*/
constructor(mongoConfig, logger) {
const { host, port, database, writeConcern, replicaSet, readPreference } = mongoConfig;
// need a way to connect to master
// might have to use all replica set members in uri? http://mongodb.github.io/node-mongodb-native/3.0/tutorials/connect/
// can have second argument {ts: timestamp} to call only entries from some number
// this ts gets translated to find({ts: {$gt: ts }}) in mongo language
// but if have it here on instantiation, won't work. we are sending the sequence on
// each read call.
// so might be better to do query like this directly (without module)
// https://github.com/cayasso/mongo-oplog/blob/master/src/stream.js#L28
// and then use the stream method in the same way to get just the number
// of entries needed -- http://mongodb.github.io/node-mongodb-native/3.0/api/Cursor.html#stream
// 'local' is the database where MongoDB has oplogs.rs capped collection
this.database = 'local';
// this.mongoUrl = format(
// 'mongodb://%s:%s/local',
// host,
// port);
this.mongoUrl = 'mongodb://localhost:27018,localhost:27017,localhost:27019/local';
this.logger = logger;
}
/**
* Connect to MongoClient using Mongo node module to access database and
* database oplogs (operation logs)
*
* @param {function} done - callback function, called with an error object
* or null and an object as 2nd parameter
* @return {undefined}
*/
connectMongo(done) {
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
(err, client) => {
if (err) {
this.logger.error('Unable to connect to MongoDB', err);
return done(err);
}
this.logger.info('connected to mongodb');
this.client = client;
this.db = client.db(this.database, {
ignoreUndefined: true,
});
return done();
});
}
/**
* Read a series of log records from mongo
*
* @param {Object} [params] - params object
* @param {Number} [params.startSeq] - fetch starting from this
* sequence number
* @param {Number} [params.limit] - maximum number of log records
* to return
* @param {function} cb - callback function, called with an error
* object or null and an object as 2nd parameter
* // CONTINUE HERE!!!
* object.info contains ...
* object.log contains ... (see recordstream below) and _processPrepareEntries function
* in backbeat. batchState.logRes is object.log from here.
*
* @return {undefined}
*/
readRecords(params, cb) {
const recordStream = new ListRecordStream(this.logger);
const limit = params.limit || 10000;
const startIDandSeq = params.startSeq.toString().split('|');
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
lastEndID = startIDandSeq[1];
// need to somehow limit entries to limit and then stop the stream
this.coll = this.db.collection('oplog.rs');
return this.coll.find({
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
ts: { $gte: Timestamp.fromNumber(startSeq) },
}, {
limit,
tailable: false,
awaitData: false,
noCursorTimeout: true,
OplogReplay: true,
numberOfRetries: Number.MAX_VALUE,
}, (err, res) => {
// console.log('LOG RESPONSE', res);
const stream = res.stream();
stream.on('data', data => {
recordStream.write(data);
});
stream.on('end', () => {
recordStream.write(undefined);
});
recordStream.once('info', info => {
recordStream.removeAllListeners('error');
cb(null, { info, log: recordStream });
});
return undefined;
});
}
}
// const tester = new LogConsumer({ host: '127.0.0.1', port: ['27017'],
// database: 'local', writeConcern: 'majority', replicaSet: 'rs0',
// readPreference: 'primary' }, console);
//
// tester.connectMongo(() => {
// return async.waterfall([
// next => {
// return tester.readRecords({ startSeq: 0, limit: 5 }, (err, res) => {
// console.log('cb called for read records');
// res.log.on('data', data => {
// console.log('Streamed formatted data', data);
// });
// res.log.on('end', () => {
// console.log('ENDED');
// return next();
// });
// });
// },
// ], () => {});
// });
module.exports = LogConsumer;
//
// sample put:
//
// insert!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323641 },
// t: 1,
// h: Long { _bsontype: 'Long', low_: -1207597211, high_: -1765392211 },
// v: 2,
// op: 'i',
// ns: 'metadata.hello',
// ui:
// Binary {
// _bsontype: 'Binary',
// sub_type: 4,
// position: 16,
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
// wall: 2018-01-19T01:00:41.798Z,
// o:
// { _id: 'stuff4',
// value:
// { 'owner-display-name': 'Bart',
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
// 'content-length': 508,
// 'content-type': 'binary/octet-stream',
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
// 'x-amz-version-id': 'null',
// 'x-amz-server-version-id': '',
// 'x-amz-storage-class': 'STANDARD',
// 'x-amz-server-side-encryption': '',
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
// 'x-amz-server-side-encryption-customer-algorithm': '',
// 'x-amz-website-redirect-location': '',
// acl: [Object],
// key: '',
// location: [Object],
// isDeleteMarker: false,
// tags: {},
// replicationInfo: [Object],
// dataStoreName: 'us-east-1',
// 'last-modified': '2018-01-19T01:00:41.797Z',
// 'md-model-version': 3,
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323377/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
// sample update:
//
// update!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323702 },
// t: 1,
// h: Long { _bsontype: 'Long', low_: -1991728232, high_: -721376083 },
// v: 2,
// op: 'u',
// ns: 'metadata.hello',
// ui:
// Binary {
// _bsontype: 'Binary',
// sub_type: 4,
// position: 16,
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
// o2: { _id: 'stuff4' },
// wall: 2018-01-19T01:01:42.050Z,
// o:
// { _id: 'stuff4',
// value:
// { 'owner-display-name': 'Bart',
// 'owner-id': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
// 'content-length': 508,
// 'content-type': 'binary/octet-stream',
// 'content-md5': '656c3fe292407ccfdc9b125b37cc1930',
// 'x-amz-version-id': 'null',
// 'x-amz-server-version-id': '',
// 'x-amz-storage-class': 'STANDARD',
// 'x-amz-server-side-encryption': '',
// 'x-amz-server-side-encryption-aws-kms-key-id': '',
// 'x-amz-server-side-encryption-customer-algorithm': '',
// 'x-amz-website-redirect-location': '',
// acl: [Object],
// key: '',
// location: [Object],
// isDeleteMarker: false,
// tags: {},
// replicationInfo: [Object],
// dataStoreName: 'us-east-1',
// 'last-modified': '2018-01-19T01:01:42.022Z',
// 'md-model-version': 3,
// 'x-amz-meta-s3cmd-attrs': 'uid:501/gname:staff/uname:lhs/gid:20/mode:33188/mtime:1508801827/atime:1516323641/md5:656c3fe292407ccfdc9b125b37cc1930/ctime:1508801827' } } }
//
//
//
// sample delete:
//
// delete!! { ts: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1516323746 },
// t: 1,
// h: Long { _bsontype: 'Long', low_: 2036516563, high_: 689237179 },
// v: 2,
// op: 'd',
// ns: 'metadata.hello',
// ui:
// Binary {
// _bsontype: 'Binary',
// sub_type: 4,
// position: 16,
// buffer: <Buffer 66 bd 4e 7d 0b 92 4a 12 b8 cc 77 b1 26 8a 94 cb> },
// wall: 2018-01-19T01:02:26.962Z,
// o: { _id: 'stuff4' } }
//

View File

@ -34,8 +34,6 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
let uidCounter = 0; let uidCounter = 0;
const format = require('util').format;
const VID_SEP = require('../../../versioning/constants') const VID_SEP = require('../../../versioning/constants')
.VersioningConstants.VersionId.Separator; .VersioningConstants.VersionId.Separator;
@ -70,30 +68,20 @@ function generatePHDVersion(versionId) {
* @param {Object} params - constructor params * @param {Object} params - constructor params
* @param {String} params.host - host for mongo * @param {String} params.host - host for mongo
* @param {String} params.port - port for mongo * @param {String} params.port - port for mongo
* @param {String} params.writeConcern - writeConcern for mongo * @param {String} params.replicationGroupId - replication group id
* @param {String} params.replicaSet - replicaSet for mongo * used here to generate version id's
* @param {String} params.readPreference - readPreference for mongo * // Does backbeat use this at all? Can we make this optional and
* // set a default so when instantiate the client elsewhere don't need?
* @param {String} params.database - name of database * @param {String} params.database - name of database
* @param {werelogs.Logger} params.logger - logger instance * @param {werelogs.Logger} params.logger - logger instance
* @param {String} [params.path] - path for mongo volume * @param {String} [params.path] - path for mongo volume
* @param {String} [params.replicationGroupId] - replication group id
* used here to generate version id's on puts. S3 instantiates the class
* but backbeat does not
*/ */
class MongoClientInterface { class MongoClientInterface {
constructor(params) { constructor(params) {
const { host, port, const { host, port, path, database, logger,
writeConcern, replicaSet, readPreference,
path, database, logger,
replicationGroupId } = params; replicationGroupId } = params;
const mongoUrl = format( const mongoUrl =
'mongodb://%s:%s/w=%s&replicaSet=%s&readPreference=%s', `mongodb://${host}:${port}`;
host,
port,
writeConcern,
replicaSet,
readPreference);
this.logger = logger; this.logger = logger;
this.client = null; this.client = null;
this.db = null; this.db = null;
@ -106,8 +94,6 @@ class MongoClientInterface {
// initialize this backend // initialize this backend
MongoClient.connect(mongoUrl, (err, client) => { MongoClient.connect(mongoUrl, (err, client) => {
if (err) { if (err) {
this.logger.error('error connecting to mongo server',
{ error: err.message });
throw (errors.InternalError); throw (errors.InternalError);
} }
this.logger.debug('connected to mongodb'); this.logger.debug('connected to mongodb');
@ -171,9 +157,6 @@ class MongoClientInterface {
_id: bucketName, _id: bucketName,
}, {}, (err, doc) => { }, {}, (err, doc) => {
if (err) { if (err) {
log.error(
'getBucketAttributes: error getting bucket attributes',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (!doc) { if (!doc) {
@ -190,9 +173,6 @@ class MongoClientInterface {
getBucketAndObject(bucketName, objName, params, log, cb) { getBucketAndObject(bucketName, objName, params, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => { this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) { if (err) {
log.error(
'getBucketAttributes: error getting bucket attributes',
{ error: err.message });
return cb(err); return cb(err);
} }
this.getObject(bucketName, objName, params, log, (err, obj) => { this.getObject(bucketName, objName, params, log, (err, obj) => {
@ -203,8 +183,6 @@ class MongoClientInterface {
BucketInfo.fromObj(bucket).serialize(), BucketInfo.fromObj(bucket).serialize(),
}); });
} }
log.error('getObject: error getting object',
{ error: err.message });
return cb(err); return cb(err);
} }
return cb(null, { return cb(null, {
@ -242,13 +220,9 @@ class MongoClientInterface {
_id: bucketName, _id: bucketName,
}, {}, (err, result) => { }, {}, (err, result) => {
if (err) { if (err) {
log.error('deleteBucketStep2: error deleting bucket',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (result.ok !== 1) { if (result.ok !== 1) {
log.error('deleteBucketStep2: failed deleting bucket',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
@ -272,8 +246,6 @@ class MongoClientInterface {
if (err.codeName === 'NamespaceNotFound') { if (err.codeName === 'NamespaceNotFound') {
return this.deleteBucketStep2(bucketName, log, cb); return this.deleteBucketStep2(bucketName, log, cb);
} }
log.error('deleteBucket: error deleting bucket',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return this.deleteBucketStep2(bucketName, log, cb); return this.deleteBucketStep2(bucketName, log, cb);
@ -285,8 +257,6 @@ class MongoClientInterface {
* sequentially create the object THEN update the master * sequentially create the object THEN update the master
*/ */
putObjectVerCase1(c, bucketName, objName, objVal, params, log, cb) { putObjectVerCase1(c, bucketName, objName, objVal, params, log, cb) {
// TODO: if backbeat is doing a put, versionId should be pulled
// from existing metadata
const versionId = generateVersionId(this.replicationGroupId); const versionId = generateVersionId(this.replicationGroupId);
// eslint-disable-next-line // eslint-disable-next-line
objVal.versionId = versionId; objVal.versionId = versionId;
@ -321,8 +291,6 @@ class MongoClientInterface {
* have been created with versions * have been created with versions
*/ */
putObjectVerCase2(c, bucketName, objName, objVal, params, log, cb) { putObjectVerCase2(c, bucketName, objName, objVal, params, log, cb) {
// TODO: if backbeat is doing a put, versionId should be pulled
// from existing metadata
const versionId = generateVersionId(this.replicationGroupId); const versionId = generateVersionId(this.replicationGroupId);
// eslint-disable-next-line // eslint-disable-next-line
objVal.versionId = versionId; objVal.versionId = versionId;
@ -383,15 +351,7 @@ class MongoClientInterface {
value: objVal, value: objVal,
}, { }, {
upsert: true, upsert: true,
}, err => { }, () => cb());
if (err) {
log.error(
'putObjectNoVer: error putting obect with no versioning',
{ error: err.message });
return cb(errors.InternalError);
}
return cb();
});
} }
putObject(bucketName, objName, objVal, params, log, cb) { putObject(bucketName, objName, objVal, params, log, cb) {
@ -421,8 +381,6 @@ class MongoClientInterface {
_id: objName, _id: objName,
}, {}, (err, doc) => { }, {}, (err, doc) => {
if (err) { if (err) {
log.error('findOne: error getting object',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (!doc) { if (!doc) {
@ -431,8 +389,7 @@ class MongoClientInterface {
if (doc.value.isPHD) { if (doc.value.isPHD) {
this.getLatestVersion(c, objName, log, (err, value) => { this.getLatestVersion(c, objName, log, (err, value) => {
if (err) { if (err) {
log.error('getLatestVersion: getting latest version', log.error('getting latest version', err);
{ error: err.message });
return cb(err); return cb(err);
} }
return cb(null, value); return cb(null, value);
@ -461,9 +418,6 @@ class MongoClientInterface {
toArray( toArray(
(err, keys) => { (err, keys) => {
if (err) { if (err) {
log.error(
'getLatestVersion: error getting latest version',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (keys.length === 0) { if (keys.length === 0) {
@ -493,13 +447,9 @@ class MongoClientInterface {
upsert: true, upsert: true,
}, (err, result) => { }, (err, result) => {
if (err) { if (err) {
log.error('repair: error trying to repair value',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (result.ok !== 1) { if (result.ok !== 1) {
log.error('repair: failed trying to repair value',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
@ -513,13 +463,12 @@ class MongoClientInterface {
asyncRepair(c, objName, mst, log) { asyncRepair(c, objName, mst, log) {
this.getLatestVersion(c, objName, log, (err, value) => { this.getLatestVersion(c, objName, log, (err, value) => {
if (err) { if (err) {
log.error('async-repair: getting latest version', log.error('async-repair: getting latest version', err);
{ error: err.message });
return undefined; return undefined;
} }
this.repair(c, objName, value, mst, log, err => { this.repair(c, objName, value, mst, log, err => {
if (err) { if (err) {
log.error('async-repair failed', { error: err.message }); log.error('async-repair failed', err);
return undefined; return undefined;
} }
log.debug('async-repair success'); log.debug('async-repair success');
@ -550,9 +499,6 @@ class MongoClientInterface {
'value.versionId': mst.versionId, 'value.versionId': mst.versionId,
}, {}, err => { }, {}, err => {
if (err) { if (err) {
log.error(
'findOneAndDelete: error finding and deleting',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
// do not test result.ok === 1 because // do not test result.ok === 1 because
@ -561,8 +507,6 @@ class MongoClientInterface {
}); });
return undefined; return undefined;
} }
log.error('getLatestVersion: error getting latest version',
{ error: err.message });
return cb(err); return cb(err);
} }
// We have other versions available so repair: // We have other versions available so repair:
@ -582,8 +526,6 @@ class MongoClientInterface {
*/ */
deleteObjectVerMaster(c, bucketName, objName, params, log, cb) { deleteObjectVerMaster(c, bucketName, objName, params, log, cb) {
const vObjName = formatVersionKey(objName, params.versionId); const vObjName = formatVersionKey(objName, params.versionId);
// TODO: if backbeat is doing a delete, versionId should be pulled
// from existing metadata
const _vid = generateVersionId(this.replicationGroupId); const _vid = generateVersionId(this.replicationGroupId);
const mst = generatePHDVersion(_vid); const mst = generatePHDVersion(_vid);
c.bulkWrite([{ c.bulkWrite([{
@ -617,15 +559,9 @@ class MongoClientInterface {
_id: vObjName, _id: vObjName,
}, {}, (err, result) => { }, {}, (err, result) => {
if (err) { if (err) {
log.error(
'findOneAndDelete: error when version is not master',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (result.ok !== 1) { if (result.ok !== 1) {
log.error(
'findOneAndDelete: failed when version is not master',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
@ -643,8 +579,6 @@ class MongoClientInterface {
_id: objName, _id: objName,
}, {}, (err, mst) => { }, {}, (err, mst) => {
if (err) { if (err) {
log.error('deleteObjectVer: error deleting versioned object',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (!mst) { if (!mst) {
@ -668,15 +602,9 @@ class MongoClientInterface {
_id: objName, _id: objName,
}, {}, (err, result) => { }, {}, (err, result) => {
if (err) { if (err) {
log.error(
'deleteObjectNoVer: error deleting object with no version',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (result.ok !== 1) { if (result.ok !== 1) {
log.error(
'deleteObjectNoVer: failed deleting object with no version',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
@ -699,8 +627,7 @@ class MongoClientInterface {
const requestParams = extension.genMDParams(); const requestParams = extension.genMDParams();
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
let cbDone = false; let cbDone = false;
const stream = new MongoReadStream(c, requestParams, const stream = new MongoReadStream(c, requestParams);
params.mongifiedSearch);
stream stream
.on('data', e => { .on('data', e => {
if (extension.filter(e) < 0) { if (extension.filter(e) < 0) {
@ -716,8 +643,7 @@ class MongoClientInterface {
error: err.message, error: err.message,
errorStack: err.stack, errorStack: err.stack,
}; };
log.error( log.error('error listing objects', logObj);
'internalListObject: error listing objects', logObj);
cb(errors.InternalError); cb(errors.InternalError);
} }
}) })
@ -745,8 +671,6 @@ class MongoClientInterface {
_id: __UUID, _id: __UUID,
}, {}, (err, doc) => { }, {}, (err, doc) => {
if (err) { if (err) {
log.error('readUUID: error reading UUID',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (!doc) { if (!doc) {
@ -768,8 +692,6 @@ class MongoClientInterface {
// FIXME: define a KeyAlreadyExists error in Arsenal // FIXME: define a KeyAlreadyExists error in Arsenal
return cb(errors.EntityAlreadyExists); return cb(errors.EntityAlreadyExists);
} }
log.error('writeUUIDIfNotExists: error writing UUID',
{ error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
// FIXME: shoud we check for result.ok === 1 ? // FIXME: shoud we check for result.ok === 1 ?
@ -786,8 +708,6 @@ class MongoClientInterface {
this.writeUUIDIfNotExists(uuid, log, err => { this.writeUUIDIfNotExists(uuid, log, err => {
if (err) { if (err) {
if (err === errors.InternalError) { if (err === errors.InternalError) {
log.error('getUUID: error getting UUID',
{ error: err.message });
return cb(err); return cb(err);
} }
return this.readUUID(log, cb); return this.readUUID(log, cb);