Compare commits

...

4 Commits

Author SHA1 Message Date
Rached Ben Mustapha bbec02c631 debug 2018-04-01 22:24:37 -07:00
Rached Ben Mustapha b0b0e453a6 Skip MPU shadow buckets 2018-04-01 02:20:02 -07:00
Rached Ben Mustapha 0a33954385 Fix mongodb log consumer initial values
Pre-existing LogConsumer contract uses `null` for initial values,
`undefined` breaks client code assumptions.
2018-03-31 17:01:28 -07:00
Rached Ben Mustapha c896d59482 dropme: bump version 2018-03-31 17:01:28 -07:00
3 changed files with 12 additions and 9 deletions

View File

@ -4,7 +4,7 @@ const stream = require('stream');
const MongoClient = require('mongodb').MongoClient; const MongoClient = require('mongodb').MongoClient;
const { Timestamp } = require('bson'); const { Timestamp } = require('bson');
let lastEndID = undefined; let lastEndID = null;
const ops = { const ops = {
i: 'put', i: 'put',
@ -17,12 +17,12 @@ class ListRecordStream extends stream.Transform {
super({ objectMode: true }); super({ objectMode: true });
this.logger = logger; this.logger = logger;
this.hasStarted = false; this.hasStarted = false;
this.start = undefined; this.start = null;
this.end = undefined; this.end = null;
this.lastUniqID = undefined; this.lastUniqID = null;
// this.unpublishedListing is true once we pass the oplog that has the // this.unpublishedListing is true once we pass the oplog that has the
// start seq timestamp and uniqID 'h' // start seq timestamp and uniqID 'h'
this.unpublishedListing = undefined; this.unpublishedListing = null;
} }
_transform(itemObj, encoding, callback) { _transform(itemObj, encoding, callback) {
@ -39,7 +39,7 @@ class ListRecordStream extends stream.Transform {
// always update to most recent uniqID // always update to most recent uniqID
this.lastUniqID = itemObj.h.toString(); this.lastUniqID = itemObj.h.toString();
if (this.end === undefined || itemObj.ts.toNumber() > this.end) { if (this.end === null || itemObj.ts.toNumber() > this.end) {
this.end = itemObj.ts.toNumber(); this.end = itemObj.ts.toNumber();
} }
@ -158,7 +158,7 @@ class LogConsumer {
this.coll = this.db.collection('oplog.rs'); this.coll = this.db.collection('oplog.rs');
return this.coll.find({ return this.coll.find({
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/, ns: /^.*\.\w+.*/,
ts: { $gte: Timestamp.fromNumber(startSeq) }, ts: { $gte: Timestamp.fromNumber(startSeq) },
}, { }, {
limit, limit,
@ -173,9 +173,11 @@ class LogConsumer {
recordStream.write(data); recordStream.write(data);
}); });
stream.on('end', () => { stream.on('end', () => {
console.log('READRECORDS end');
recordStream.write(undefined); recordStream.write(undefined);
}); });
recordStream.once('info', info => { recordStream.once('info', info => {
console.log('READRECORDS info', JSON.stringify(info));
recordStream.removeAllListeners('error'); recordStream.removeAllListeners('error');
cb(null, { info, log: recordStream }); cb(null, { info, log: recordStream });
}); });

View File

@ -868,7 +868,8 @@ class MongoClientInterface {
if (value.name === METASTORE || if (value.name === METASTORE ||
value.name === INFOSTORE || value.name === INFOSTORE ||
value.name === USERSBUCKET || value.name === USERSBUCKET ||
value.name === PENSIEVE value.name === PENSIEVE ||
value.name.startsWith(constants.mpuBucketPrefix)
) { ) {
// skip // skip
return next(); return next();

View File

@ -3,7 +3,7 @@
"engines": { "engines": {
"node": "6.9.5" "node": "6.9.5"
}, },
"version": "7.4.0", "version": "7.4.0-mongologreader",
"description": "Common utilities for the S3 project components", "description": "Common utilities for the S3 project components",
"main": "index.js", "main": "index.js",
"repository": { "repository": {