Compare commits

...

6 Commits

Author SHA1 Message Date
Jonathan Gramain 60f36bcd12 [squash] ZENKO-1175 test: use stream.finished() on cursor stream 2018-10-29 15:34:07 -07:00
Jonathan Gramain 4640762fb3 [squash] ZENKO-1175 + cursor error log 2018-10-29 15:05:19 -07:00
Jonathan Gramain 77ec227552 [squash] ZENKO-1175 fix cursor stream error handling
listen to 'close' and 'error' events from the stream returned by
cursor.stream() function, instead of from the opened cursor
itself. Also close the cursor if an error happens.
2018-10-29 12:40:09 -07:00
Jonathan Gramain 4e20c1c78c [squash] ZENKO-1175 add helper reachedUnpublishedListing() 2018-10-26 17:15:46 -07:00
Jonathan Gramain b3176ed217 ft: ZENKO-1175 one more unit test
Test when the saved uniqID is not found in the oplog.
2018-10-26 15:39:12 -07:00
Jonathan Gramain 829eba47e9 ft: ZENKO-1175 tailable cursor to consume mongo oplog
Use a tailable custor to keep ordering guarantees for the records we
read. This also means we have to read from the beginning when we
reconnect (at startup), and start processing when we encountered the
unique ID previously stored in zookeeper.

Also removed dispatcher mode with MongoLogReader (was only used for
the short-lived Federation deployment of Zenko).
2018-10-26 14:00:21 -07:00
3 changed files with 166 additions and 105 deletions

View File

@ -1,32 +1,42 @@
const stream = require('stream');
class ListRecordStream extends stream.Transform {
constructor(logger, lastEndID) {
constructor(logger, lastSavedID, latestOplogID) {
super({ objectMode: true });
this._logger = logger;
this._lastEndID = lastEndID;
this._lastTs = 0;
this._lastSavedID = lastSavedID;
this._latestOplogID = latestOplogID;
this._lastUniqID = null;
// this._unpublishedListing is true once we pass the oplog
// that has the start seq timestamp and uniqID 'h'
// record that has the same uniqID 'h' than last saved. If we
// don't find it (e.g. log rolled over before populator could
// process its oldest entries), we will restart from the
// latest record of the oplog.
this._unpublishedListing = false;
this._skipCount = 0;
}
_transform(itemObj, encoding, callback) {
// always update to most recent uniqID
this._lastUniqID = itemObj.h.toString();
if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
this._lastTs = itemObj.ts.toNumber();
}
// only push to stream unpublished objects
if (!this._unpublishedListing) {
// When an oplog with a unique ID that is stored in the
// log offset is found, all oplogs AFTER this is unpublished.
if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
this._unpublishedListing = true;
} else if (this._latestOplogID === this._lastUniqID) {
this._logger.warn(
'did not encounter the last saved offset in oplog, ' +
'resuming processing right after the latest record ' +
'to date; some entries may have been skipped', {
lastSavedID: this._lastSavedID,
latestRecordID: this._latestOplogID,
});
this._unpublishedListing = true;
}
++this._skipCount;
return callback();
}
@ -62,6 +72,7 @@ class ListRecordStream extends stream.Transform {
} else {
// skip other entry types as we don't need them for now
// ('c', ...?)
++this._skipCount;
return callback();
}
const streamObject = {
@ -73,16 +84,37 @@ class ListRecordStream extends stream.Transform {
return callback(null, streamObject);
}
_flush(callback) {
this.emit('info', {
// store both the timestamp and unique oplog id in an
// opaque JSON string returned to the reader
end: JSON.stringify({
ts: this._lastTs,
/**
* Get an opaque JSON blob containing the latest consumed offset
* from MongoDB oplog.
*
* @return {string} opaque JSON blob
*/
getOffset() {
return JSON.stringify({
uniqID: this._lastUniqID,
}),
});
callback();
}
/**
* Get the number of entries that have been read and skipped from
* MongoDB oplog since the ListRecordStream instance was created.
*
* @return {integer} number of skipped entries
*/
getSkipCount() {
return this._skipCount;
}
/**
* Get whether the stream reached yet-unpublished records
* (i.e. after we reached either the saved unique ID, or the tip
* of the oplog)
*
* @return {boolean} true if we are now returning unpublished records
*/
reachedUnpublishedListing() {
return this._unpublishedListing;
}
}

View File

@ -1,8 +1,9 @@
'use strict'; // eslint-disable-line
const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;
const ListRecordStream = require('./ListRecordStream');
const { Timestamp } = require('bson');
/**
* @class
@ -18,12 +19,9 @@ class LogConsumer {
*/
constructor(mongoConfig, logger) {
const { replicaSetHosts, database } = mongoConfig;
// 'local' is the database where MongoDB has oplogs.rs capped collection
this.database = 'local';
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
this.logger = logger;
this.metadataDatabase = database;
this.oplogNsRegExp = new RegExp(`^${database}\\.`);
this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
this._logger = logger;
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
}
/**
@ -35,67 +33,103 @@ class LogConsumer {
* @return {undefined}
*/
connectMongo(done) {
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
MongoClient.connect(this._mongoUrl, {
replicaSet: 'rs0',
useNewUrlParser: true,
},
(err, client) => {
if (err) {
this.logger.error('Unable to connect to MongoDB',
this._logger.error('Unable to connect to MongoDB',
{ error: err });
return done(err);
}
this.logger.info('connected to mongodb');
this._logger.info('connected to mongodb');
this.client = client;
this.db = client.db(this.database, {
// 'local' is the database where MongoDB has oplog.rs
// capped collection
this.db = client.db('local', {
ignoreUndefined: true,
});
return done();
});
}
/**
* Read a series of log records from mongo
* Open a tailable cursor to mongo oplog and retrieve a stream of
* records to read
*
* @param {Object} [params] - params object
* @param {String} [params.startSeq] - fetch starting from this
* opaque offset returned previously by mongo ListRecordStream
* in an 'info' event
* @param {Number} [params.limit] - maximum number of log records
* to return
* @param {function} cb - callback function, called with an error
* object or null and an object as 2nd parameter
*
* @return {undefined}
*/
readRecords(params, cb) {
const limit = params.limit || 10000;
let startSeq = { ts: 0 };
let startSeq = {};
if (params.startSeq) {
try {
// parse the opaque JSON string passed through from a
// previous 'info' event
startSeq = JSON.parse(params.startSeq);
} catch (err) {
this.logger.error('malformed startSeq', {
this._logger.error('malformed startSeq', {
startSeq: params.startSeq,
});
// start over if malformed
}
}
const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
this.coll = this.db.collection('oplog.rs');
this._readLatestOplogID((err, latestOplogID) => {
if (err) {
return cb(err);
}
const recordStream = new ListRecordStream(this._logger,
startSeq.uniqID,
latestOplogID);
return this.coll.find({
ns: this.oplogNsRegExp,
ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
ns: this._oplogNsRegExp,
}, {
limit,
tailable: false,
awaitData: false,
tailable: true,
awaitData: true,
noCursorTimeout: true,
oplogReplay: true,
numberOfRetries: Number.MAX_VALUE,
}, (err, res) => {
res.stream().pipe(recordStream);
recordStream.removeAllListeners('error');
return cb(null, { log: recordStream });
const cursorStream = res.stream();
stream.finished(cursorStream, err => {
if (err) {
this._logger.error('cursor stream error', {
error: err.message,
});
recordStream.emit('error', err);
} else {
this._logger.error('cursor stream closed');
recordStream.emit(
'error', new Error('cursor stream closed'));
}
});
cursorStream.pipe(recordStream);
return cb(null, { log: recordStream, tailable: true });
});
});
}
_readLatestOplogID(cb) {
this.coll.find({
ns: this._oplogNsRegExp,
}, {
ts: 1,
}).sort({
$natural: -1,
}).limit(1).toArray((err, data) => {
if (err) {
return cb(err);
}
const latestOplogID = data[0].h.toString();
this._logger.debug('latest oplog ID read', { latestOplogID });
return cb(null, latestOplogID);
});
}
}

View File

@ -170,19 +170,17 @@ describe('mongoclient.ListRecordStream', () => {
it(`should transform ${entryType}`, done => {
const lrs = new ListRecordStream(logger,
lastEndIDEntry.h.toString());
let dataReceived = false;
lrs.on('info', info => {
assert(dataReceived);
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '-42' },
});
return done();
});
let hasReceivedData = false;
lrs.on('data', entry => {
assert.strictEqual(hasReceivedData, false);
hasReceivedData = true;
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
dataReceived = true;
});
lrs.on('end', () => {
assert.strictEqual(hasReceivedData, true);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '-42' });
done();
});
// first write will be ignored by ListRecordStream because
// of the last end ID (-42), it's needed though to bootstrap it
@ -193,20 +191,12 @@ describe('mongoclient.ListRecordStream', () => {
});
it('should ignore other entry types', done => {
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
let infoEmitted = false;
lrs.on('info', info => {
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '-42' },
});
infoEmitted = true;
});
lrs.on('data', entry => {
assert(false, `ListRecordStream did not ignore entry ${entry}`);
});
lrs.on('end', () => {
assert(infoEmitted);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '-42' });
done();
});
// first write will be ignored by ListRecordStream because
@ -217,55 +207,60 @@ describe('mongoclient.ListRecordStream', () => {
});
lrs.end();
});
it('should emit info even if no entry consumed', done => {
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
let infoEmitted = false;
lrs.on('info', info => {
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 0, uniqID: null },
});
infoEmitted = true;
});
lrs.on('data', () => {
assert(false, 'did not expect data from ListRecordStream');
});
lrs.on('end', () => {
assert(infoEmitted);
done();
});
lrs.end();
});
it('should skip entries until uniqID is encountered', done => {
const logEntries = [
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 1234 }),
{ h: 1234, ts: Timestamp.fromNumber(45) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 5678 }),
{ h: 5678, ts: Timestamp.fromNumber(44) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: -1234 }),
{ h: -1234, ts: Timestamp.fromNumber(42) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 2345 }),
{ h: 2345, ts: Timestamp.fromNumber(42) }),
];
const lrs = new ListRecordStream(logger, '5678');
assert.strictEqual(lrs.reachedUnpublishedListing(), false);
let nbReceivedEntries = 0;
let infoEmitted = false;
lrs.on('info', info => {
infoEmitted = true;
const parsedInfo = info;
parsedInfo.end = JSON.parse(parsedInfo.end);
assert.deepStrictEqual(parsedInfo, {
end: { ts: 42, uniqID: '2345' },
lrs.on('data', entry => {
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
++nbReceivedEntries;
});
lrs.on('end', () => {
assert.strictEqual(nbReceivedEntries, 2);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '2345' });
assert.strictEqual(lrs.getSkipCount(), 2);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
done();
});
logEntries.forEach(entry => lrs.write(entry));
lrs.end();
});
it('should start after latest entry if uniqID is not encountered', done => {
const logEntries = [
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 1234, ts: Timestamp.fromNumber(45) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 5678, ts: Timestamp.fromNumber(44) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: -1234, ts: Timestamp.fromNumber(42) }),
Object.assign({}, mongoProcessedLogEntries.insert,
{ h: 2345, ts: Timestamp.fromNumber(42) }),
];
const lrs = new ListRecordStream(logger, '4242', '-1234');
let nbReceivedEntries = 0;
lrs.on('data', entry => {
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
++nbReceivedEntries;
});
lrs.on('end', () => {
assert.strictEqual(nbReceivedEntries, 2);
assert(infoEmitted);
assert.strictEqual(nbReceivedEntries, 1);
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
{ uniqID: '2345' });
assert.strictEqual(lrs.getSkipCount(), 3);
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
done();
});
logEntries.forEach(entry => lrs.write(entry));