Compare commits


No commits in common. "60f36bcd12a477e14993998578138ef765877c8d" and "18dfc6b4fa93c77f3389ff7ea2abe920dd71dfec" have entirely different histories.

3 changed files with 106 additions and 167 deletions


@@ -1,42 +1,32 @@
 const stream = require('stream');

 class ListRecordStream extends stream.Transform {
-    constructor(logger, lastSavedID, latestOplogID) {
+    constructor(logger, lastEndID) {
         super({ objectMode: true });
         this._logger = logger;
-        this._lastSavedID = lastSavedID;
-        this._latestOplogID = latestOplogID;
+        this._lastEndID = lastEndID;
+        this._lastTs = 0;
         this._lastUniqID = null;
         // this._unpublishedListing is true once we pass the oplog
-        // record that has the same uniqID 'h' than last saved. If we
-        // don't find it (e.g. log rolled over before populator could
-        // process its oldest entries), we will restart from the
-        // latest record of the oplog.
+        // that has the start seq timestamp and uniqID 'h'
         this._unpublishedListing = false;
-        this._skipCount = 0;
     }

     _transform(itemObj, encoding, callback) {
         // always update to most recent uniqID
         this._lastUniqID = itemObj.h.toString();
+        if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
+            this._lastTs = itemObj.ts.toNumber();
+        }

         // only push to stream unpublished objects
         if (!this._unpublishedListing) {
             // When an oplog with a unique ID that is stored in the
             // log offset is found, all oplogs AFTER this is unpublished.
-            if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
-                this._unpublishedListing = true;
-            } else if (this._latestOplogID === this._lastUniqID) {
-                this._logger.warn(
-                    'did not encounter the last saved offset in oplog, ' +
-                    'resuming processing right after the latest record ' +
-                    'to date; some entries may have been skipped', {
-                        lastSavedID: this._lastSavedID,
-                        latestRecordID: this._latestOplogID,
-                    });
+            if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
                 this._unpublishedListing = true;
             }
-            ++this._skipCount;
             return callback();
         }
@@ -72,7 +62,6 @@ class ListRecordStream extends stream.Transform {
         } else {
             // skip other entry types as we don't need them for now
             // ('c', ...?)
-            ++this._skipCount;
             return callback();
         }
         const streamObject = {
@@ -84,37 +73,16 @@ class ListRecordStream extends stream.Transform {
         return callback(null, streamObject);
     }

-    /**
-     * Get an opaque JSON blob containing the latest consumed offset
-     * from MongoDB oplog.
-     *
-     * @return {string} opaque JSON blob
-     */
-    getOffset() {
-        return JSON.stringify({
-            uniqID: this._lastUniqID,
-        });
-    }
-
-    /**
-     * Get the number of entries that have been read and skipped from
-     * MongoDB oplog since the ListRecordStream instance was created.
-     *
-     * @return {integer} number of skipped entries
-     */
-    getSkipCount() {
-        return this._skipCount;
-    }
-
-    /**
-     * Get whether the stream reached yet-unpublished records
-     * (i.e. after we reached either the saved unique ID, or the tip
-     * of the oplog)
-     *
-     * @return {boolean} true if we are now returning unpublished records
-     */
-    reachedUnpublishedListing() {
-        return this._unpublishedListing;
+    _flush(callback) {
+        this.emit('info', {
+            // store both the timestamp and unique oplog id in an
+            // opaque JSON string returned to the reader
+            end: JSON.stringify({
+                ts: this._lastTs,
+                uniqID: this._lastUniqID,
+            }),
+        });
+        callback();
     }
 }

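The practical difference between the two ListRecordStream versions above is how the consumed oplog offset reaches the caller: one side exposes it through the getOffset()/getSkipCount() accessors, the other pushes it once as an 'info' event when the stream flushes. Below is a minimal consumer sketch that tolerates either style; the entries array and the saveOffset callback are hypothetical placeholders, not part of either version.

const ListRecordStream = require('./ListRecordStream');

// Drain a batch of raw oplog documents through a ListRecordStream instance
// and persist the resulting offset; `entries` and `saveOffset` are
// hypothetical stand-ins for a real oplog source and offset store.
function drainOnce(lrs, entries, saveOffset) {
    lrs.on('data', record => {
        // each 'data' event is one transformed log record pushed by _transform
    });
    // offset reported as a single 'info' event on flush (one version above)
    lrs.on('info', info => saveOffset(info.end));
    lrs.on('end', () => {
        // offset polled from the stream after it ends (the other version)
        if (typeof lrs.getOffset === 'function') {
            saveOffset(lrs.getOffset());
        }
    });
    entries.forEach(entry => lrs.write(entry));
    lrs.end();
}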

@@ -1,9 +1,8 @@
 'use strict'; // eslint-disable-line

-const stream = require('stream');
 const MongoClient = require('mongodb').MongoClient;
 const ListRecordStream = require('./ListRecordStream');
+const { Timestamp } = require('bson');

 /**
  * @class
@@ -19,9 +18,12 @@ class LogConsumer {
      */
     constructor(mongoConfig, logger) {
         const { replicaSetHosts, database } = mongoConfig;
-        this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
-        this._logger = logger;
-        this._oplogNsRegExp = new RegExp(`^${database}\\.`);
+        // 'local' is the database where MongoDB has oplogs.rs capped collection
+        this.database = 'local';
+        this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
+        this.logger = logger;
+        this.metadataDatabase = database;
+        this.oplogNsRegExp = new RegExp(`^${database}\\.`);
     }

     /**
@@ -33,103 +35,67 @@ class LogConsumer {
      * @return {undefined}
      */
     connectMongo(done) {
-        MongoClient.connect(this._mongoUrl, {
-            replicaSet: 'rs0',
-            useNewUrlParser: true,
-        },
+        MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
         (err, client) => {
             if (err) {
-                this._logger.error('Unable to connect to MongoDB',
+                this.logger.error('Unable to connect to MongoDB',
                     { error: err });
                 return done(err);
             }
-            this._logger.info('connected to mongodb');
+            this.logger.info('connected to mongodb');
             this.client = client;
-            // 'local' is the database where MongoDB has oplog.rs
-            // capped collection
-            this.db = client.db('local', {
+            this.db = client.db(this.database, {
                 ignoreUndefined: true,
             });
             return done();
         });
     }

     /**
-     * Open a tailable cursor to mongo oplog and retrieve a stream of
-     * records to read
+     * Read a series of log records from mongo
      *
      * @param {Object} [params] - params object
     * @param {String} [params.startSeq] - fetch starting from this
     *   opaque offset returned previously by mongo ListRecordStream
     *   in an 'info' event
+     * @param {Number} [params.limit] - maximum number of log records
+     *   to return
     * @param {function} cb - callback function, called with an error
     *   object or null and an object as 2nd parameter
      *
      * @return {undefined}
      */
     readRecords(params, cb) {
-        let startSeq = {};
+        const limit = params.limit || 10000;
+        let startSeq = { ts: 0 };
         if (params.startSeq) {
             try {
                 // parse the opaque JSON string passed through from a
                 // previous 'info' event
                 startSeq = JSON.parse(params.startSeq);
             } catch (err) {
-                this._logger.error('malformed startSeq', {
+                this.logger.error('malformed startSeq', {
                     startSeq: params.startSeq,
                 });
                 // start over if malformed
             }
         }
+        const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
         this.coll = this.db.collection('oplog.rs');
-        this._readLatestOplogID((err, latestOplogID) => {
-            if (err) {
-                return cb(err);
-            }
-            const recordStream = new ListRecordStream(this._logger,
-                                                      startSeq.uniqID,
-                                                      latestOplogID);
-            return this.coll.find({
-                ns: this._oplogNsRegExp,
-            }, {
-                tailable: true,
-                awaitData: true,
-                noCursorTimeout: true,
-                oplogReplay: true,
-                numberOfRetries: Number.MAX_VALUE,
-            }, (err, res) => {
-                const cursorStream = res.stream();
-                stream.finished(cursorStream, err => {
-                    if (err) {
-                        this._logger.error('cursor stream error', {
-                            error: err.message,
-                        });
-                        recordStream.emit('error', err);
-                    } else {
-                        this._logger.error('cursor stream closed');
-                        recordStream.emit(
-                            'error', new Error('cursor stream closed'));
-                    }
-                });
-                cursorStream.pipe(recordStream);
-                return cb(null, { log: recordStream, tailable: true });
-            });
-        });
-    }
-
-    _readLatestOplogID(cb) {
-        this.coll.find({
-            ns: this._oplogNsRegExp,
-        }, {
-            ts: 1,
-        }).sort({
-            $natural: -1,
-        }).limit(1).toArray((err, data) => {
-            if (err) {
-                return cb(err);
-            }
-            const latestOplogID = data[0].h.toString();
-            this._logger.debug('latest oplog ID read', { latestOplogID });
-            return cb(null, latestOplogID);
+        return this.coll.find({
+            ns: this.oplogNsRegExp,
+            ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
+        }, {
+            limit,
+            tailable: false,
+            awaitData: false,
+            noCursorTimeout: true,
+            numberOfRetries: Number.MAX_VALUE,
+        }, (err, res) => {
+            res.stream().pipe(recordStream);
+            recordStream.removeAllListeners('error');
+            return cb(null, { log: recordStream });
         });
     }
 }

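With the non-tailable variant of readRecords() above, a reader is expected to call it repeatedly and feed the end blob from each 'info' event back in as params.startSeq on the next call. Below is a rough sketch of one such polling pass, assuming the class is exported from ./LogConsumer.js; mongoConfig, logger, startSeq and persistOffset are hypothetical placeholders supplied by the caller.

const LogConsumer = require('./LogConsumer');

// Hypothetical single polling pass over the oplog using the limit-based
// readRecords(); the offset saved here becomes startSeq on the next pass.
function pollOnce(mongoConfig, logger, startSeq, persistOffset) {
    const consumer = new LogConsumer(mongoConfig, logger);
    consumer.connectMongo(err => {
        if (err) {
            logger.error('could not connect', { error: err });
            return;
        }
        consumer.readRecords({ startSeq, limit: 1000 }, (readErr, res) => {
            if (readErr) {
                logger.error('readRecords failed', { error: readErr });
                return;
            }
            res.log.on('data', record => {
                // publish or process each log record here
            });
            // save the opaque offset so the next call can resume from it
            res.log.on('info', info => persistOffset(info.end));
        });
    });
}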

@@ -170,17 +170,19 @@ describe('mongoclient.ListRecordStream', () => {
         it(`should transform ${entryType}`, done => {
             const lrs = new ListRecordStream(logger,
                 lastEndIDEntry.h.toString());
-            let hasReceivedData = false;
-            lrs.on('data', entry => {
-                assert.strictEqual(hasReceivedData, false);
-                hasReceivedData = true;
-                assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
-            });
-            lrs.on('end', () => {
-                assert.strictEqual(hasReceivedData, true);
-                assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
-                    { uniqID: '-42' });
-                done();
+            let dataReceived = false;
+            lrs.on('info', info => {
+                assert(dataReceived);
+                const parsedInfo = info;
+                parsedInfo.end = JSON.parse(parsedInfo.end);
+                assert.deepStrictEqual(parsedInfo, {
+                    end: { ts: 42, uniqID: '-42' },
+                });
+                return done();
+            });
+            lrs.on('data', entry => {
+                assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
+                dataReceived = true;
             });
             // first write will be ignored by ListRecordStream because
             // of the last end ID (-42), it's needed though to bootstrap it
@@ -191,12 +193,20 @@ describe('mongoclient.ListRecordStream', () => {
     });

     it('should ignore other entry types', done => {
         const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
+        let infoEmitted = false;
+        lrs.on('info', info => {
+            const parsedInfo = info;
+            parsedInfo.end = JSON.parse(parsedInfo.end);
+            assert.deepStrictEqual(parsedInfo, {
+                end: { ts: 42, uniqID: '-42' },
+            });
+            infoEmitted = true;
+        });
         lrs.on('data', entry => {
             assert(false, `ListRecordStream did not ignore entry ${entry}`);
         });
         lrs.on('end', () => {
-            assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
-                { uniqID: '-42' });
+            assert(infoEmitted);
             done();
         });
         // first write will be ignored by ListRecordStream because
@@ -207,60 +217,55 @@ describe('mongoclient.ListRecordStream', () => {
         });
         lrs.end();
     });

+    it('should emit info even if no entry consumed', done => {
+        const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
+        let infoEmitted = false;
+        lrs.on('info', info => {
+            const parsedInfo = info;
+            parsedInfo.end = JSON.parse(parsedInfo.end);
+            assert.deepStrictEqual(parsedInfo, {
+                end: { ts: 0, uniqID: null },
+            });
+            infoEmitted = true;
+        });
+        lrs.on('data', () => {
+            assert(false, 'did not expect data from ListRecordStream');
+        });
+        lrs.on('end', () => {
+            assert(infoEmitted);
+            done();
+        });
+        lrs.end();
+    });
+
     it('should skip entries until uniqID is encountered', done => {
         const logEntries = [
             Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 1234, ts: Timestamp.fromNumber(45) }),
+                { h: 1234 }),
             Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 5678, ts: Timestamp.fromNumber(44) }),
+                { h: 5678 }),
             Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: -1234, ts: Timestamp.fromNumber(42) }),
+                { h: -1234 }),
             Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 2345, ts: Timestamp.fromNumber(42) }),
+                { h: 2345 }),
         ];
         const lrs = new ListRecordStream(logger, '5678');
-        assert.strictEqual(lrs.reachedUnpublishedListing(), false);
         let nbReceivedEntries = 0;
+        let infoEmitted = false;
+        lrs.on('info', info => {
+            infoEmitted = true;
+            const parsedInfo = info;
+            parsedInfo.end = JSON.parse(parsedInfo.end);
+            assert.deepStrictEqual(parsedInfo, {
+                end: { ts: 42, uniqID: '2345' },
+            });
+        });
         lrs.on('data', entry => {
             assert.deepStrictEqual(entry, expectedStreamEntries.insert);
-            assert.strictEqual(lrs.reachedUnpublishedListing(), true);
             ++nbReceivedEntries;
         });
         lrs.on('end', () => {
             assert.strictEqual(nbReceivedEntries, 2);
-            assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
-                { uniqID: '2345' });
-            assert.strictEqual(lrs.getSkipCount(), 2);
-            assert.strictEqual(lrs.reachedUnpublishedListing(), true);
+            assert(infoEmitted);
             done();
         });
         logEntries.forEach(entry => lrs.write(entry));
         lrs.end();
     });
-
-    it('should start after latest entry if uniqID is not encountered', done => {
-        const logEntries = [
-            Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 1234, ts: Timestamp.fromNumber(45) }),
-            Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 5678, ts: Timestamp.fromNumber(44) }),
-            Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: -1234, ts: Timestamp.fromNumber(42) }),
-            Object.assign({}, mongoProcessedLogEntries.insert,
-                { h: 2345, ts: Timestamp.fromNumber(42) }),
-        ];
-        const lrs = new ListRecordStream(logger, '4242', '-1234');
-        let nbReceivedEntries = 0;
-        lrs.on('data', entry => {
-            assert.deepStrictEqual(entry, expectedStreamEntries.insert);
-            ++nbReceivedEntries;
-        });
-        lrs.on('end', () => {
-            assert.strictEqual(nbReceivedEntries, 1);
-            assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
-                { uniqID: '2345' });
-            assert.strictEqual(lrs.getSkipCount(), 3);
-            assert.strictEqual(lrs.reachedUnpublishedListing(), true);
-            done();
-        });
-        logEntries.forEach(entry => lrs.write(entry));