Compare commits

..

No commits in common. "60f36bcd12a477e14993998578138ef765877c8d" and "18dfc6b4fa93c77f3389ff7ea2abe920dd71dfec" have entirely different histories.

3 changed files with 106 additions and 167 deletions

View File

@ -1,42 +1,32 @@
const stream = require('stream');
class ListRecordStream extends stream.Transform {
constructor(logger, lastSavedID, latestOplogID) {
constructor(logger, lastEndID) {
super({ objectMode: true });
this._logger = logger;
this._lastSavedID = lastSavedID;
this._latestOplogID = latestOplogID;
this._lastEndID = lastEndID;
this._lastTs = 0;
this._lastUniqID = null;
// this._unpublishedListing is true once we pass the oplog
// record that has the same uniqID 'h' than last saved. If we
// don't find it (e.g. log rolled over before populator could
// process its oldest entries), we will restart from the
// latest record of the oplog.
// that has the start seq timestamp and uniqID 'h'
this._unpublishedListing = false;
this._skipCount = 0;
}
_transform(itemObj, encoding, callback) {
// always update to most recent uniqID
this._lastUniqID = itemObj.h.toString();
if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
this._lastTs = itemObj.ts.toNumber();
}
// only push to stream unpublished objects
if (!this._unpublishedListing) {
// When an oplog with a unique ID that is stored in the
// log offset is found, all oplogs AFTER this is unpublished.
if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
this._unpublishedListing = true;
} else if (this._latestOplogID === this._lastUniqID) {
this._logger.warn(
'did not encounter the last saved offset in oplog, ' +
'resuming processing right after the latest record ' +
'to date; some entries may have been skipped', {
lastSavedID: this._lastSavedID,
latestRecordID: this._latestOplogID,
});
if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
this._unpublishedListing = true;
}
++this._skipCount;
return callback();
}
@ -72,7 +62,6 @@ class ListRecordStream extends stream.Transform {
} else {
// skip other entry types as we don't need them for now
// ('c', ...?)
++this._skipCount;
return callback();
}
const streamObject = {
@ -84,37 +73,16 @@ class ListRecordStream extends stream.Transform {
return callback(null, streamObject);
}
/**
* Get an opaque JSON blob containing the latest consumed offset
* from MongoDB oplog.
*
* @return {string} opaque JSON blob
*/
getOffset() {
return JSON.stringify({
_flush(callback) {
this.emit('info', {
// store both the timestamp and unique oplog id in an
// opaque JSON string returned to the reader
end: JSON.stringify({
ts: this._lastTs,
uniqID: this._lastUniqID,
}),
});
}
/**
* Get the number of entries that have been read and skipped from
* MongoDB oplog since the ListRecordStream instance was created.
*
* @return {integer} number of skipped entries
*/
getSkipCount() {
return this._skipCount;
}
/**
* Get whether the stream reached yet-unpublished records
* (i.e. after we reached either the saved unique ID, or the tip
* of the oplog)
*
* @return {boolean} true if we are now returning unpublished records
*/
reachedUnpublishedListing() {
return this._unpublishedListing;
callback();
}
}

View File

@ -1,9 +1,8 @@
'use strict'; // eslint-disable-line
const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;
const ListRecordStream = require('./ListRecordStream');
const { Timestamp } = require('bson');
/**
* @class
@ -19,9 +18,12 @@ class LogConsumer {
*/
constructor(mongoConfig, logger) {
const { replicaSetHosts, database } = mongoConfig;
this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
this._logger = logger;
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
// 'local' is the database where MongoDB has oplogs.rs capped collection
this.database = 'local';
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
this.logger = logger;
this.metadataDatabase = database;
this.oplogNsRegExp = new RegExp(`^${database}\\.`);
}
/**
@ -33,103 +35,67 @@ class LogConsumer {
* @return {undefined}
*/
connectMongo(done) {
MongoClient.connect(this._mongoUrl, {
replicaSet: 'rs0',
useNewUrlParser: true,
},
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
(err, client) => {
if (err) {
this._logger.error('Unable to connect to MongoDB',
this.logger.error('Unable to connect to MongoDB',
{ error: err });
return done(err);
}
this._logger.info('connected to mongodb');
this.logger.info('connected to mongodb');
this.client = client;
// 'local' is the database where MongoDB has oplog.rs
// capped collection
this.db = client.db('local', {
this.db = client.db(this.database, {
ignoreUndefined: true,
});
return done();
});
}
/**
* Open a tailable cursor to mongo oplog and retrieve a stream of
* records to read
* Read a series of log records from mongo
*
* @param {Object} [params] - params object
* @param {String} [params.startSeq] - fetch starting from this
* opaque offset returned previously by mongo ListRecordStream
* in an 'info' event
* @param {Number} [params.limit] - maximum number of log records
* to return
* @param {function} cb - callback function, called with an error
* object or null and an object as 2nd parameter
*
* @return {undefined}
*/
readRecords(params, cb) {
let startSeq = {};
const limit = params.limit || 10000;
let startSeq = { ts: 0 };
if (params.startSeq) {
try {
// parse the opaque JSON string passed through from a
// previous 'info' event
startSeq = JSON.parse(params.startSeq);
} catch (err) {
this._logger.error('malformed startSeq', {
this.logger.error('malformed startSeq', {
startSeq: params.startSeq,
});
// start over if malformed
}
}
const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
this.coll = this.db.collection('oplog.rs');
this._readLatestOplogID((err, latestOplogID) => {
if (err) {
return cb(err);
}
const recordStream = new ListRecordStream(this._logger,
startSeq.uniqID,
latestOplogID);
return this.coll.find({
ns: this._oplogNsRegExp,
ns: this.oplogNsRegExp,
ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
}, {
tailable: true,
awaitData: true,
limit,
tailable: false,
awaitData: false,
noCursorTimeout: true,
oplogReplay: true,
numberOfRetries: Number.MAX_VALUE,
}, (err, res) => {
const cursorStream = res.stream();
stream.finished(cursorStream, err => {
if (err) {
this._logger.error('cursor stream error', {
error: err.message,
});
recordStream.emit('error', err);
} else {
this._logger.error('cursor stream closed');
recordStream.emit(
'error', new Error('cursor stream closed'));
}
});
cursorStream.pipe(recordStream);
return cb(null, { log: recordStream, tailable: true });
});
});
}
_readLatestOplogID(cb) {
this.coll.find({
ns: this._oplogNsRegExp,
}, {
ts: 1,
}).sort({
$natural: -1,
}).limit(1).toArray((err, data) => {
if (err) {
return cb(err);
}
const latestOplogID = data[0].h.toString();
this._logger.debug('latest oplog ID read', { latestOplogID });
return cb(null, latestOplogID);
res.stream().pipe(recordStream);
recordStream.removeAllListeners('error');
return cb(null, { log: recordStream });
});
}
}

View File

@ -170,17 +170,19 @@ describe('mongoclient.ListRecordStream', () => {
it(`should transform ${entryType}`, done => {
const lrs = new ListRecordStream(logger,
lastEndIDEntry.h.toString());
let hasReceivedData = false;
lrs.on('data', entry => {
assert.strictEqual(hasReceivedData, false);
hasReceivedData = true;
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
let dataReceived = false;
lrs.on('info', info => {
assert(dataReceived);
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '-42' },
});
lrs.on('end', () => {
assert.strictEqual(hasReceivedData, true);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '-42' });
done();
return done();
});
lrs.on('data', entry => {
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
dataReceived = true;
});
// first write will be ignored by ListRecordStream because
// of the last end ID (-42), it's needed though to bootstrap it
@ -191,12 +193,20 @@ describe('mongoclient.ListRecordStream', () => {
});
it('should ignore other entry types', done => {
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
let infoEmitted = false;
lrs.on('info', info => {
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '-42' },
});
infoEmitted = true;
});
lrs.on('data', entry => {
assert(false, `ListRecordStream did not ignore entry ${entry}`);
});
lrs.on('end', () => {
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '-42' });
assert(infoEmitted);
done();
});
// first write will be ignored by ListRecordStream because
@ -207,60 +217,55 @@ describe('mongoclient.ListRecordStream', () => {
});
lrs.end();
});
it('should emit info even if no entry consumed', done => {
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
let infoEmitted = false;
lrs.on('info', info => {
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 0, uniqID: null },
});
infoEmitted = true;
});
lrs.on('data', () => {
assert(false, 'did not expect data from ListRecordStream');
});
lrs.on('end', () => {
assert(infoEmitted);
done();
});
lrs.end();
});
it('should skip entries until uniqID is encountered', done => {
const logEntries = [
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 1234, ts: Timestamp.fromNumber(45) }),
{ h: 1234 }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 5678, ts: Timestamp.fromNumber(44) }),
{ h: 5678 }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: -1234, ts: Timestamp.fromNumber(42) }),
{ h: -1234 }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 2345, ts: Timestamp.fromNumber(42) }),
{ h: 2345 }),
];
const lrs = new ListRecordStream(logger, '5678');
assert.strictEqual(lrs.reachedUnpublishedListing(), false);
let nbReceivedEntries = 0;
let infoEmitted = false;
lrs.on('info', info => {
infoEmitted = true;
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '2345' },
});
});
lrs.on('data', entry => {
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
++nbReceivedEntries;
});
lrs.on('end', () => {
assert.strictEqual(nbReceivedEntries, 2);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '2345' });
assert.strictEqual(lrs.getSkipCount(), 2);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
done();
});
logEntries.forEach(entry => lrs.write(entry));
lrs.end();
});
it('should start after latest entry if uniqID is not encountered', done => {
const logEntries = [
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 1234, ts: Timestamp.fromNumber(45) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 5678, ts: Timestamp.fromNumber(44) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: -1234, ts: Timestamp.fromNumber(42) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 2345, ts: Timestamp.fromNumber(42) }),
];
const lrs = new ListRecordStream(logger, '4242', '-1234');
let nbReceivedEntries = 0;
lrs.on('data', entry => {
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
++nbReceivedEntries;
});
lrs.on('end', () => {
assert.strictEqual(nbReceivedEntries, 1);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '2345' });
assert.strictEqual(lrs.getSkipCount(), 3);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
assert(infoEmitted);
done();
});
logEntries.forEach(entry => lrs.write(entry));