Compare commits

...

6 Commits

Author SHA1 Message Date
Jonathan Gramain 60f36bcd12 [squash] ZENKO-1175 test: use stream.finished() on cursor stream 2018-10-29 15:34:07 -07:00
Jonathan Gramain 4640762fb3 [squash] ZENKO-1175 + cursor error log 2018-10-29 15:05:19 -07:00
Jonathan Gramain 77ec227552 [squash] ZENKO-1175 fix cursor stream error handling
Listen to 'close' and 'error' events from the stream returned by the
cursor.stream() function, instead of from the opened cursor
itself. Also close the cursor if an error happens.
2018-10-29 12:40:09 -07:00
Jonathan Gramain 4e20c1c78c [squash] ZENKO-1175 add helper reachedUnpublishedListing() 2018-10-26 17:15:46 -07:00
Jonathan Gramain b3176ed217 ft: ZENKO-1175 one more unit test
Test when the saved uniqID is not found in the oplog.
2018-10-26 15:39:12 -07:00
Jonathan Gramain 829eba47e9 ft: ZENKO-1175 tailable cursor to consume mongo oplog
Use a tailable cursor to keep ordering guarantees for the records we
read. This also means we have to read from the beginning when we
reconnect (at startup), and start processing once we encounter the
unique ID previously stored in zookeeper.

Also removed dispatcher mode with MongoLogReader (was only used for
the short-lived Federation deployment of Zenko).
2018-10-26 14:00:21 -07:00
3 changed files with 166 additions and 105 deletions
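The approach described in commit 829eba47e9 (tailable cursor on the oplog), together with the error handling from 77ec227552 and 60f36bcd12 (stream.finished() on the cursor stream), boils down to the following minimal sketch. It is illustrative only, not the repository code: the URL, the onRecord/onTermination callbacks and the callback-style find() call (mirroring the driver call style used in this changeset) are assumptions.

// Minimal sketch: tail the 'local.oplog.rs' capped collection with a
// tailable+awaitData cursor, consume it as a stream, and use
// stream.finished() to detect both errors and premature closes so the
// caller can reconnect and resume from its saved unique ID ('h' field).
const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;

// url, onRecord and onTermination are illustrative placeholders
function tailOplog(url, onRecord, onTermination) {
    MongoClient.connect(url, { replicaSet: 'rs0' }, (err, client) => {
        if (err) {
            return onTermination(err);
        }
        // 'local' is the database holding the oplog.rs capped collection
        const coll = client.db('local').collection('oplog.rs');
        return coll.find({}, {
            tailable: true,   // cursor follows new writes instead of ending
            awaitData: true,  // wait for data rather than returning empty batches
            noCursorTimeout: true,
            numberOfRetries: Number.MAX_VALUE,
        }, (err, cursor) => {
            if (err) {
                return onTermination(err);
            }
            const cursorStream = cursor.stream();
            // fires once, on error or close; the caller then reconnects and
            // skips records until it sees its saved unique ID again
            stream.finished(cursorStream, onTermination);
            return cursorStream.on('data', onRecord);
        });
    });
}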

View File

@@ -1,32 +1,42 @@
 const stream = require('stream');
 
 class ListRecordStream extends stream.Transform {
-    constructor(logger, lastEndID) {
+    constructor(logger, lastSavedID, latestOplogID) {
         super({ objectMode: true });
         this._logger = logger;
-        this._lastEndID = lastEndID;
-        this._lastTs = 0;
+        this._lastSavedID = lastSavedID;
+        this._latestOplogID = latestOplogID;
         this._lastUniqID = null;
         // this._unpublishedListing is true once we pass the oplog
-        // that has the start seq timestamp and uniqID 'h'
+        // record that has the same uniqID 'h' than last saved. If we
+        // don't find it (e.g. log rolled over before populator could
+        // process its oldest entries), we will restart from the
+        // latest record of the oplog.
         this._unpublishedListing = false;
+        this._skipCount = 0;
     }
 
     _transform(itemObj, encoding, callback) {
         // always update to most recent uniqID
         this._lastUniqID = itemObj.h.toString();
-        if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
-            this._lastTs = itemObj.ts.toNumber();
-        }
         // only push to stream unpublished objects
         if (!this._unpublishedListing) {
             // When an oplog with a unique ID that is stored in the
             // log offset is found, all oplogs AFTER this is unpublished.
-            if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
+            if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
+                this._unpublishedListing = true;
+            } else if (this._latestOplogID === this._lastUniqID) {
+                this._logger.warn(
+                    'did not encounter the last saved offset in oplog, ' +
+                    'resuming processing right after the latest record ' +
+                    'to date; some entries may have been skipped', {
+                        lastSavedID: this._lastSavedID,
+                        latestRecordID: this._latestOplogID,
+                    });
                 this._unpublishedListing = true;
             }
+            ++this._skipCount;
             return callback();
         }
@@ -62,6 +72,7 @@ class ListRecordStream extends stream.Transform {
         } else {
             // skip other entry types as we don't need them for now
             // ('c', ...?)
+            ++this._skipCount;
             return callback();
         }
         const streamObject = {
@@ -73,16 +84,37 @@ class ListRecordStream extends stream.Transform {
         return callback(null, streamObject);
     }
 
-    _flush(callback) {
-        this.emit('info', {
-            // store both the timestamp and unique oplog id in an
-            // opaque JSON string returned to the reader
-            end: JSON.stringify({
-                ts: this._lastTs,
-                uniqID: this._lastUniqID,
-            }),
-        });
-        callback();
+    /**
+     * Get an opaque JSON blob containing the latest consumed offset
+     * from MongoDB oplog.
+     *
+     * @return {string} opaque JSON blob
+     */
+    getOffset() {
+        return JSON.stringify({
+            uniqID: this._lastUniqID,
+        });
+    }
+
+    /**
+     * Get the number of entries that have been read and skipped from
+     * MongoDB oplog since the ListRecordStream instance was created.
+     *
+     * @return {integer} number of skipped entries
+     */
+    getSkipCount() {
+        return this._skipCount;
+    }
+
+    /**
+     * Get whether the stream reached yet-unpublished records
+     * (i.e. after we reached either the saved unique ID, or the tip
+     * of the oplog)
+     *
+     * @return {boolean} true if we are now returning unpublished records
+     */
+    reachedUnpublishedListing() {
+        return this._unpublishedListing;
     }
 }
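With the 'info' event removed, a consumer now pulls the offset from the stream itself once records have been processed. A hypothetical usage sketch follows; the logger stub, require path, saved-offset handling and the latest-oplog-ID value are assumptions, not repository code.

const ListRecordStream = require('./ListRecordStream'); // path assumed

// stand-in logger and previously persisted offset, purely illustrative
const logger = { warn: () => {}, error: () => {}, debug: () => {} };
let savedOffset = JSON.stringify({ uniqID: '1234' });

const lrs = new ListRecordStream(logger,
    JSON.parse(savedOffset).uniqID,
    '9999' /* latest oplog ID, normally read from oplog.rs */);

lrs.on('data', record => {
    // ...apply the record, then persist the new opaque offset
    savedOffset = lrs.getOffset(); // e.g. '{"uniqID":"5678"}'
});
lrs.on('end', () => {
    logger.debug('oplog batch consumed', {
        skipped: lrs.getSkipCount(),
        caughtUp: lrs.reachedUnpublishedListing(),
    });
});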

View File

@@ -1,8 +1,9 @@
 'use strict'; // eslint-disable-line
 
+const stream = require('stream');
 const MongoClient = require('mongodb').MongoClient;
 
 const ListRecordStream = require('./ListRecordStream');
-const { Timestamp } = require('bson');
 
 /**
  * @class
@@ -18,12 +19,9 @@ class LogConsumer {
      */
     constructor(mongoConfig, logger) {
         const { replicaSetHosts, database } = mongoConfig;
-        // 'local' is the database where MongoDB has oplogs.rs capped collection
-        this.database = 'local';
-        this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
-        this.logger = logger;
-        this.metadataDatabase = database;
-        this.oplogNsRegExp = new RegExp(`^${database}\\.`);
+        this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
+        this._logger = logger;
+        this._oplogNsRegExp = new RegExp(`^${database}\\.`);
     }
 
     /**
@@ -35,67 +33,103 @@ class LogConsumer {
      * @return {undefined}
      */
     connectMongo(done) {
-        MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
+        MongoClient.connect(this._mongoUrl, {
+            replicaSet: 'rs0',
+            useNewUrlParser: true,
+        },
         (err, client) => {
             if (err) {
-                this.logger.error('Unable to connect to MongoDB',
+                this._logger.error('Unable to connect to MongoDB',
                                    { error: err });
                 return done(err);
             }
-            this.logger.info('connected to mongodb');
+            this._logger.info('connected to mongodb');
             this.client = client;
-            this.db = client.db(this.database, {
+            // 'local' is the database where MongoDB has oplog.rs
+            // capped collection
+            this.db = client.db('local', {
                 ignoreUndefined: true,
             });
             return done();
         });
     }
 
     /**
-     * Read a series of log records from mongo
+     * Open a tailable cursor to mongo oplog and retrieve a stream of
+     * records to read
      *
      * @param {Object} [params] - params object
      * @param {String} [params.startSeq] - fetch starting from this
     *   opaque offset returned previously by mongo ListRecordStream
     *   in an 'info' event
-     * @param {Number} [params.limit] - maximum number of log records
-     *   to return
      * @param {function} cb - callback function, called with an error
     *   object or null and an object as 2nd parameter
      *
      * @return {undefined}
      */
     readRecords(params, cb) {
-        const limit = params.limit || 10000;
-        let startSeq = { ts: 0 };
+        let startSeq = {};
         if (params.startSeq) {
             try {
                 // parse the opaque JSON string passed through from a
                 // previous 'info' event
                 startSeq = JSON.parse(params.startSeq);
             } catch (err) {
-                this.logger.error('malformed startSeq', {
+                this._logger.error('malformed startSeq', {
                     startSeq: params.startSeq,
                 });
                 // start over if malformed
             }
         }
-        const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
         this.coll = this.db.collection('oplog.rs');
-        return this.coll.find({
-            ns: this.oplogNsRegExp,
-            ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
-        }, {
-            limit,
-            tailable: false,
-            awaitData: false,
-            noCursorTimeout: true,
-            oplogReplay: true,
-            numberOfRetries: Number.MAX_VALUE,
-        }, (err, res) => {
-            res.stream().pipe(recordStream);
-            recordStream.removeAllListeners('error');
-            return cb(null, { log: recordStream });
+        this._readLatestOplogID((err, latestOplogID) => {
+            if (err) {
+                return cb(err);
+            }
+            const recordStream = new ListRecordStream(this._logger,
+                                                      startSeq.uniqID,
+                                                      latestOplogID);
+            return this.coll.find({
+                ns: this._oplogNsRegExp,
+            }, {
+                tailable: true,
+                awaitData: true,
+                noCursorTimeout: true,
+                numberOfRetries: Number.MAX_VALUE,
+            }, (err, res) => {
+                const cursorStream = res.stream();
+                stream.finished(cursorStream, err => {
+                    if (err) {
+                        this._logger.error('cursor stream error', {
+                            error: err.message,
+                        });
+                        recordStream.emit('error', err);
+                    } else {
+                        this._logger.error('cursor stream closed');
+                        recordStream.emit(
+                            'error', new Error('cursor stream closed'));
+                    }
+                });
+                cursorStream.pipe(recordStream);
+                return cb(null, { log: recordStream, tailable: true });
+            });
+        });
+    }
+
+    _readLatestOplogID(cb) {
+        this.coll.find({
+            ns: this._oplogNsRegExp,
+        }, {
+            ts: 1,
+        }).sort({
+            $natural: -1,
+        }).limit(1).toArray((err, data) => {
+            if (err) {
+                return cb(err);
+            }
+            const latestOplogID = data[0].h.toString();
+            this._logger.debug('latest oplog ID read', { latestOplogID });
+            return cb(null, latestOplogID);
         });
     }
 }
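End to end, the new LogConsumer API might be wired as below. This is a sketch under assumptions (host list, database name, logger stub, require path and how the opaque startSeq is persisted are not from this changeset); the point is that readRecords() now hands back a never-ending tailable stream rather than a bounded batch.

const LogConsumer = require('./LogConsumer'); // path assumed

const logger = { info: () => {}, warn: () => {}, error: () => {}, debug: () => {} };
const consumer = new LogConsumer({
    replicaSetHosts: 'localhost:27017,localhost:27018,localhost:27019',
    database: 'metadata',
}, logger);

consumer.connectMongo(err => {
    if (err) {
        throw err;
    }
    // startSeq is the opaque JSON previously returned by getOffset()
    consumer.readRecords({ startSeq: '{"uniqID":"1234"}' }, (err, res) => {
        if (err) {
            throw err;
        }
        // res.log is the tailable record stream; it only ends on error or
        // close, in which case the caller reconnects and reads again from
        // its last persisted offset
        res.log.on('data', record => { /* publish the record */ });
        res.log.on('error', err => { /* save offset, then reconnect */ });
    });
});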

View File

@@ -170,19 +170,17 @@ describe('mongoclient.ListRecordStream', () => {
         it(`should transform ${entryType}`, done => {
             const lrs = new ListRecordStream(logger,
                                              lastEndIDEntry.h.toString());
-            let dataReceived = false;
-            lrs.on('info', info => {
-                assert(dataReceived);
-                const parsedInfo = info;
-                parsedInfo.end = JSON.parse(parsedInfo.end);
-                assert.deepStrictEqual(parsedInfo, {
-                    end: { ts: 42, uniqID: '-42' },
-                });
-                return done();
-            });
+            let hasReceivedData = false;
             lrs.on('data', entry => {
+                assert.strictEqual(hasReceivedData, false);
+                hasReceivedData = true;
                 assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
-                dataReceived = true;
+            });
+            lrs.on('end', () => {
+                assert.strictEqual(hasReceivedData, true);
+                assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
+                                       { uniqID: '-42' });
+                done();
             });
             // first write will be ignored by ListRecordStream because
             // of the last end ID (-42), it's needed though to bootstrap it
@@ -193,20 +191,12 @@ describe('mongoclient.ListRecordStream', () => {
         });
         it('should ignore other entry types', done => {
             const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
-            let infoEmitted = false;
-            lrs.on('info', info => {
-                const parsedInfo = info;
-                parsedInfo.end = JSON.parse(parsedInfo.end);
-                assert.deepStrictEqual(parsedInfo, {
-                    end: { ts: 42, uniqID: '-42' },
-                });
-                infoEmitted = true;
-            });
             lrs.on('data', entry => {
                 assert(false, `ListRecordStream did not ignore entry ${entry}`);
             });
             lrs.on('end', () => {
-                assert(infoEmitted);
+                assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
+                                       { uniqID: '-42' });
                 done();
             });
             // first write will be ignored by ListRecordStream because
@@ -217,55 +207,60 @@ describe('mongoclient.ListRecordStream', () => {
             });
             lrs.end();
         });
-        it('should emit info even if no entry consumed', done => {
-            const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
-            let infoEmitted = false;
-            lrs.on('info', info => {
-                const parsedInfo = info;
-                parsedInfo.end = JSON.parse(parsedInfo.end);
-                assert.deepStrictEqual(parsedInfo, {
-                    end: { ts: 0, uniqID: null },
-                });
-                infoEmitted = true;
-            });
-            lrs.on('data', () => {
-                assert(false, 'did not expect data from ListRecordStream');
-            });
-            lrs.on('end', () => {
-                assert(infoEmitted);
-                done();
-            });
-            lrs.end();
-        });
         it('should skip entries until uniqID is encountered', done => {
             const logEntries = [
                 Object.assign({}, mongoProcessedLogEntries.insert,
-                              { h: 1234 }),
+                              { h: 1234, ts: Timestamp.fromNumber(45) }),
                 Object.assign({}, mongoProcessedLogEntries.insert,
-                              { h: 5678 }),
+                              { h: 5678, ts: Timestamp.fromNumber(44) }),
                 Object.assign({}, mongoProcessedLogEntries.insert,
-                              { h: -1234 }),
+                              { h: -1234, ts: Timestamp.fromNumber(42) }),
                 Object.assign({}, mongoProcessedLogEntries.insert,
-                              { h: 2345 }),
+                              { h: 2345, ts: Timestamp.fromNumber(42) }),
             ];
             const lrs = new ListRecordStream(logger, '5678');
+            assert.strictEqual(lrs.reachedUnpublishedListing(), false);
             let nbReceivedEntries = 0;
-            let infoEmitted = false;
-            lrs.on('info', info => {
-                infoEmitted = true;
-                const parsedInfo = info;
-                parsedInfo.end = JSON.parse(parsedInfo.end);
-                assert.deepStrictEqual(parsedInfo, {
-                    end: { ts: 42, uniqID: '2345' },
-                });
+            lrs.on('data', entry => {
+                assert.deepStrictEqual(entry, expectedStreamEntries.insert);
+                assert.strictEqual(lrs.reachedUnpublishedListing(), true);
+                ++nbReceivedEntries;
             });
+            lrs.on('end', () => {
+                assert.strictEqual(nbReceivedEntries, 2);
+                assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
+                                       { uniqID: '2345' });
+                assert.strictEqual(lrs.getSkipCount(), 2);
+                assert.strictEqual(lrs.reachedUnpublishedListing(), true);
+                done();
+            });
+            logEntries.forEach(entry => lrs.write(entry));
+            lrs.end();
+        });
+
+        it('should start after latest entry if uniqID is not encountered', done => {
+            const logEntries = [
+                Object.assign({}, mongoProcessedLogEntries.insert,
+                              { h: 1234, ts: Timestamp.fromNumber(45) }),
+                Object.assign({}, mongoProcessedLogEntries.insert,
+                              { h: 5678, ts: Timestamp.fromNumber(44) }),
+                Object.assign({}, mongoProcessedLogEntries.insert,
+                              { h: -1234, ts: Timestamp.fromNumber(42) }),
+                Object.assign({}, mongoProcessedLogEntries.insert,
+                              { h: 2345, ts: Timestamp.fromNumber(42) }),
+            ];
+            const lrs = new ListRecordStream(logger, '4242', '-1234');
+            let nbReceivedEntries = 0;
             lrs.on('data', entry => {
                 assert.deepStrictEqual(entry, expectedStreamEntries.insert);
                 ++nbReceivedEntries;
             });
             lrs.on('end', () => {
-                assert.strictEqual(nbReceivedEntries, 2);
-                assert(infoEmitted);
+                assert.strictEqual(nbReceivedEntries, 1);
+                assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
+                                       { uniqID: '2345' });
+                assert.strictEqual(lrs.getSkipCount(), 3);
+                assert.strictEqual(lrs.reachedUnpublishedListing(), true);
                 done();
             });
             logEntries.forEach(entry => lrs.write(entry));