Compare commits
4 Commits
developmen
...
dropme/log
Author | SHA1 | Date |
---|---|---|
Rached Ben Mustapha | bbec02c631 | |
Rached Ben Mustapha | b0b0e453a6 | |
Rached Ben Mustapha | 0a33954385 | |
Rached Ben Mustapha | c896d59482 |
|
@ -4,7 +4,7 @@ const stream = require('stream');
|
|||
const MongoClient = require('mongodb').MongoClient;
|
||||
const { Timestamp } = require('bson');
|
||||
|
||||
let lastEndID = undefined;
|
||||
let lastEndID = null;
|
||||
|
||||
const ops = {
|
||||
i: 'put',
|
||||
|
@ -17,12 +17,12 @@ class ListRecordStream extends stream.Transform {
|
|||
super({ objectMode: true });
|
||||
this.logger = logger;
|
||||
this.hasStarted = false;
|
||||
this.start = undefined;
|
||||
this.end = undefined;
|
||||
this.lastUniqID = undefined;
|
||||
this.start = null;
|
||||
this.end = null;
|
||||
this.lastUniqID = null;
|
||||
// this.unpublishedListing is true once we pass the oplog that has the
|
||||
// start seq timestamp and uniqID 'h'
|
||||
this.unpublishedListing = undefined;
|
||||
this.unpublishedListing = null;
|
||||
}
|
||||
|
||||
_transform(itemObj, encoding, callback) {
|
||||
|
@ -39,7 +39,7 @@ class ListRecordStream extends stream.Transform {
|
|||
// always update to most recent uniqID
|
||||
this.lastUniqID = itemObj.h.toString();
|
||||
|
||||
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
|
||||
if (this.end === null || itemObj.ts.toNumber() > this.end) {
|
||||
this.end = itemObj.ts.toNumber();
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ class LogConsumer {
|
|||
|
||||
this.coll = this.db.collection('oplog.rs');
|
||||
return this.coll.find({
|
||||
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
|
||||
ns: /^.*\.\w+.*/,
|
||||
ts: { $gte: Timestamp.fromNumber(startSeq) },
|
||||
}, {
|
||||
limit,
|
||||
|
@ -173,9 +173,11 @@ class LogConsumer {
|
|||
recordStream.write(data);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
console.log('READRECORDS end');
|
||||
recordStream.write(undefined);
|
||||
});
|
||||
recordStream.once('info', info => {
|
||||
console.log('READRECORDS info', JSON.stringify(info));
|
||||
recordStream.removeAllListeners('error');
|
||||
cb(null, { info, log: recordStream });
|
||||
});
|
||||
|
|
|
@ -868,7 +868,8 @@ class MongoClientInterface {
|
|||
if (value.name === METASTORE ||
|
||||
value.name === INFOSTORE ||
|
||||
value.name === USERSBUCKET ||
|
||||
value.name === PENSIEVE
|
||||
value.name === PENSIEVE ||
|
||||
value.name.startsWith(constants.mpuBucketPrefix)
|
||||
) {
|
||||
// skip
|
||||
return next();
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
"engines": {
|
||||
"node": "6.9.5"
|
||||
},
|
||||
"version": "7.4.0",
|
||||
"version": "7.4.0-mongologreader",
|
||||
"description": "Common utilities for the S3 project components",
|
||||
"main": "index.js",
|
||||
"repository": {
|
||||
|
|
Loading…
Reference in New Issue