Compare commits
6 Commits
18dfc6b4fa
...
60f36bcd12
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | 60f36bcd12 | |
Jonathan Gramain | 4640762fb3 | |
Jonathan Gramain | 77ec227552 | |
Jonathan Gramain | 4e20c1c78c | |
Jonathan Gramain | b3176ed217 | |
Jonathan Gramain | 829eba47e9 |
|
@ -1,32 +1,42 @@
|
|||
const stream = require('stream');
|
||||
|
||||
class ListRecordStream extends stream.Transform {
|
||||
constructor(logger, lastEndID) {
|
||||
constructor(logger, lastSavedID, latestOplogID) {
|
||||
super({ objectMode: true });
|
||||
this._logger = logger;
|
||||
this._lastEndID = lastEndID;
|
||||
this._lastTs = 0;
|
||||
this._lastSavedID = lastSavedID;
|
||||
this._latestOplogID = latestOplogID;
|
||||
this._lastUniqID = null;
|
||||
// this._unpublishedListing is true once we pass the oplog
|
||||
// that has the start seq timestamp and uniqID 'h'
|
||||
// record that has the same uniqID 'h' than last saved. If we
|
||||
// don't find it (e.g. log rolled over before populator could
|
||||
// process its oldest entries), we will restart from the
|
||||
// latest record of the oplog.
|
||||
this._unpublishedListing = false;
|
||||
this._skipCount = 0;
|
||||
}
|
||||
|
||||
_transform(itemObj, encoding, callback) {
|
||||
// always update to most recent uniqID
|
||||
this._lastUniqID = itemObj.h.toString();
|
||||
|
||||
if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
|
||||
this._lastTs = itemObj.ts.toNumber();
|
||||
}
|
||||
|
||||
// only push to stream unpublished objects
|
||||
if (!this._unpublishedListing) {
|
||||
// When an oplog with a unique ID that is stored in the
|
||||
// log offset is found, all oplogs AFTER this is unpublished.
|
||||
if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
|
||||
if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
|
||||
this._unpublishedListing = true;
|
||||
} else if (this._latestOplogID === this._lastUniqID) {
|
||||
this._logger.warn(
|
||||
'did not encounter the last saved offset in oplog, ' +
|
||||
'resuming processing right after the latest record ' +
|
||||
'to date; some entries may have been skipped', {
|
||||
lastSavedID: this._lastSavedID,
|
||||
latestRecordID: this._latestOplogID,
|
||||
});
|
||||
this._unpublishedListing = true;
|
||||
}
|
||||
++this._skipCount;
|
||||
return callback();
|
||||
}
|
||||
|
||||
|
@ -62,6 +72,7 @@ class ListRecordStream extends stream.Transform {
|
|||
} else {
|
||||
// skip other entry types as we don't need them for now
|
||||
// ('c', ...?)
|
||||
++this._skipCount;
|
||||
return callback();
|
||||
}
|
||||
const streamObject = {
|
||||
|
@ -73,16 +84,37 @@ class ListRecordStream extends stream.Transform {
|
|||
return callback(null, streamObject);
|
||||
}
|
||||
|
||||
_flush(callback) {
|
||||
this.emit('info', {
|
||||
// store both the timestamp and unique oplog id in an
|
||||
// opaque JSON string returned to the reader
|
||||
end: JSON.stringify({
|
||||
ts: this._lastTs,
|
||||
/**
|
||||
* Get an opaque JSON blob containing the latest consumed offset
|
||||
* from MongoDB oplog.
|
||||
*
|
||||
* @return {string} opaque JSON blob
|
||||
*/
|
||||
getOffset() {
|
||||
return JSON.stringify({
|
||||
uniqID: this._lastUniqID,
|
||||
}),
|
||||
});
|
||||
callback();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of entries that have been read and skipped from
|
||||
* MongoDB oplog since the ListRecordStream instance was created.
|
||||
*
|
||||
* @return {integer} number of skipped entries
|
||||
*/
|
||||
getSkipCount() {
|
||||
return this._skipCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get whether the stream reached yet-unpublished records
|
||||
* (i.e. after we reached either the saved unique ID, or the tip
|
||||
* of the oplog)
|
||||
*
|
||||
* @return {boolean} true if we are now returning unpublished records
|
||||
*/
|
||||
reachedUnpublishedListing() {
|
||||
return this._unpublishedListing;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
'use strict'; // eslint-disable-line
|
||||
|
||||
const stream = require('stream');
|
||||
|
||||
const MongoClient = require('mongodb').MongoClient;
|
||||
const ListRecordStream = require('./ListRecordStream');
|
||||
const { Timestamp } = require('bson');
|
||||
|
||||
/**
|
||||
* @class
|
||||
|
@ -18,12 +19,9 @@ class LogConsumer {
|
|||
*/
|
||||
constructor(mongoConfig, logger) {
|
||||
const { replicaSetHosts, database } = mongoConfig;
|
||||
// 'local' is the database where MongoDB has oplogs.rs capped collection
|
||||
this.database = 'local';
|
||||
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
|
||||
this.logger = logger;
|
||||
this.metadataDatabase = database;
|
||||
this.oplogNsRegExp = new RegExp(`^${database}\\.`);
|
||||
this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
|
||||
this._logger = logger;
|
||||
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -35,67 +33,103 @@ class LogConsumer {
|
|||
* @return {undefined}
|
||||
*/
|
||||
connectMongo(done) {
|
||||
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
|
||||
MongoClient.connect(this._mongoUrl, {
|
||||
replicaSet: 'rs0',
|
||||
useNewUrlParser: true,
|
||||
},
|
||||
(err, client) => {
|
||||
if (err) {
|
||||
this.logger.error('Unable to connect to MongoDB',
|
||||
this._logger.error('Unable to connect to MongoDB',
|
||||
{ error: err });
|
||||
return done(err);
|
||||
}
|
||||
this.logger.info('connected to mongodb');
|
||||
this._logger.info('connected to mongodb');
|
||||
this.client = client;
|
||||
this.db = client.db(this.database, {
|
||||
// 'local' is the database where MongoDB has oplog.rs
|
||||
// capped collection
|
||||
this.db = client.db('local', {
|
||||
ignoreUndefined: true,
|
||||
});
|
||||
return done();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a series of log records from mongo
|
||||
* Open a tailable cursor to mongo oplog and retrieve a stream of
|
||||
* records to read
|
||||
*
|
||||
* @param {Object} [params] - params object
|
||||
* @param {String} [params.startSeq] - fetch starting from this
|
||||
* opaque offset returned previously by mongo ListRecordStream
|
||||
* in an 'info' event
|
||||
* @param {Number} [params.limit] - maximum number of log records
|
||||
* to return
|
||||
* @param {function} cb - callback function, called with an error
|
||||
* object or null and an object as 2nd parameter
|
||||
*
|
||||
* @return {undefined}
|
||||
*/
|
||||
readRecords(params, cb) {
|
||||
const limit = params.limit || 10000;
|
||||
let startSeq = { ts: 0 };
|
||||
let startSeq = {};
|
||||
if (params.startSeq) {
|
||||
try {
|
||||
// parse the opaque JSON string passed through from a
|
||||
// previous 'info' event
|
||||
startSeq = JSON.parse(params.startSeq);
|
||||
} catch (err) {
|
||||
this.logger.error('malformed startSeq', {
|
||||
this._logger.error('malformed startSeq', {
|
||||
startSeq: params.startSeq,
|
||||
});
|
||||
// start over if malformed
|
||||
}
|
||||
}
|
||||
const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
|
||||
|
||||
this.coll = this.db.collection('oplog.rs');
|
||||
this._readLatestOplogID((err, latestOplogID) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const recordStream = new ListRecordStream(this._logger,
|
||||
startSeq.uniqID,
|
||||
latestOplogID);
|
||||
return this.coll.find({
|
||||
ns: this.oplogNsRegExp,
|
||||
ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
|
||||
ns: this._oplogNsRegExp,
|
||||
}, {
|
||||
limit,
|
||||
tailable: false,
|
||||
awaitData: false,
|
||||
tailable: true,
|
||||
awaitData: true,
|
||||
noCursorTimeout: true,
|
||||
oplogReplay: true,
|
||||
numberOfRetries: Number.MAX_VALUE,
|
||||
}, (err, res) => {
|
||||
res.stream().pipe(recordStream);
|
||||
recordStream.removeAllListeners('error');
|
||||
return cb(null, { log: recordStream });
|
||||
const cursorStream = res.stream();
|
||||
stream.finished(cursorStream, err => {
|
||||
if (err) {
|
||||
this._logger.error('cursor stream error', {
|
||||
error: err.message,
|
||||
});
|
||||
recordStream.emit('error', err);
|
||||
} else {
|
||||
this._logger.error('cursor stream closed');
|
||||
recordStream.emit(
|
||||
'error', new Error('cursor stream closed'));
|
||||
}
|
||||
});
|
||||
cursorStream.pipe(recordStream);
|
||||
return cb(null, { log: recordStream, tailable: true });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
_readLatestOplogID(cb) {
|
||||
this.coll.find({
|
||||
ns: this._oplogNsRegExp,
|
||||
}, {
|
||||
ts: 1,
|
||||
}).sort({
|
||||
$natural: -1,
|
||||
}).limit(1).toArray((err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const latestOplogID = data[0].h.toString();
|
||||
this._logger.debug('latest oplog ID read', { latestOplogID });
|
||||
return cb(null, latestOplogID);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,19 +170,17 @@ describe('mongoclient.ListRecordStream', () => {
|
|||
it(`should transform ${entryType}`, done => {
|
||||
const lrs = new ListRecordStream(logger,
|
||||
lastEndIDEntry.h.toString());
|
||||
let dataReceived = false;
|
||||
lrs.on('info', info => {
|
||||
assert(dataReceived);
|
||||
const parsedInfo = info;
|
||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
||||
assert.deepStrictEqual(parsedInfo, {
|
||||
end: { ts: 42, uniqID: '-42' },
|
||||
});
|
||||
return done();
|
||||
});
|
||||
let hasReceivedData = false;
|
||||
lrs.on('data', entry => {
|
||||
assert.strictEqual(hasReceivedData, false);
|
||||
hasReceivedData = true;
|
||||
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
|
||||
dataReceived = true;
|
||||
});
|
||||
lrs.on('end', () => {
|
||||
assert.strictEqual(hasReceivedData, true);
|
||||
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||
{ uniqID: '-42' });
|
||||
done();
|
||||
});
|
||||
// first write will be ignored by ListRecordStream because
|
||||
// of the last end ID (-42), it's needed though to bootstrap it
|
||||
|
@ -193,20 +191,12 @@ describe('mongoclient.ListRecordStream', () => {
|
|||
});
|
||||
it('should ignore other entry types', done => {
|
||||
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
|
||||
let infoEmitted = false;
|
||||
lrs.on('info', info => {
|
||||
const parsedInfo = info;
|
||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
||||
assert.deepStrictEqual(parsedInfo, {
|
||||
end: { ts: 42, uniqID: '-42' },
|
||||
});
|
||||
infoEmitted = true;
|
||||
});
|
||||
lrs.on('data', entry => {
|
||||
assert(false, `ListRecordStream did not ignore entry ${entry}`);
|
||||
});
|
||||
lrs.on('end', () => {
|
||||
assert(infoEmitted);
|
||||
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||
{ uniqID: '-42' });
|
||||
done();
|
||||
});
|
||||
// first write will be ignored by ListRecordStream because
|
||||
|
@ -217,55 +207,60 @@ describe('mongoclient.ListRecordStream', () => {
|
|||
});
|
||||
lrs.end();
|
||||
});
|
||||
it('should emit info even if no entry consumed', done => {
|
||||
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
|
||||
let infoEmitted = false;
|
||||
lrs.on('info', info => {
|
||||
const parsedInfo = info;
|
||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
||||
assert.deepStrictEqual(parsedInfo, {
|
||||
end: { ts: 0, uniqID: null },
|
||||
});
|
||||
infoEmitted = true;
|
||||
});
|
||||
lrs.on('data', () => {
|
||||
assert(false, 'did not expect data from ListRecordStream');
|
||||
});
|
||||
lrs.on('end', () => {
|
||||
assert(infoEmitted);
|
||||
done();
|
||||
});
|
||||
lrs.end();
|
||||
});
|
||||
it('should skip entries until uniqID is encountered', done => {
|
||||
const logEntries = [
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 1234 }),
|
||||
{ h: 1234, ts: Timestamp.fromNumber(45) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 5678 }),
|
||||
{ h: 5678, ts: Timestamp.fromNumber(44) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: -1234 }),
|
||||
{ h: -1234, ts: Timestamp.fromNumber(42) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 2345 }),
|
||||
{ h: 2345, ts: Timestamp.fromNumber(42) }),
|
||||
];
|
||||
const lrs = new ListRecordStream(logger, '5678');
|
||||
assert.strictEqual(lrs.reachedUnpublishedListing(), false);
|
||||
let nbReceivedEntries = 0;
|
||||
let infoEmitted = false;
|
||||
lrs.on('info', info => {
|
||||
infoEmitted = true;
|
||||
const parsedInfo = info;
|
||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
||||
assert.deepStrictEqual(parsedInfo, {
|
||||
end: { ts: 42, uniqID: '2345' },
|
||||
lrs.on('data', entry => {
|
||||
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
|
||||
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||
++nbReceivedEntries;
|
||||
});
|
||||
lrs.on('end', () => {
|
||||
assert.strictEqual(nbReceivedEntries, 2);
|
||||
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||
{ uniqID: '2345' });
|
||||
assert.strictEqual(lrs.getSkipCount(), 2);
|
||||
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||
done();
|
||||
});
|
||||
logEntries.forEach(entry => lrs.write(entry));
|
||||
lrs.end();
|
||||
});
|
||||
|
||||
it('should start after latest entry if uniqID is not encountered', done => {
|
||||
const logEntries = [
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 1234, ts: Timestamp.fromNumber(45) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 5678, ts: Timestamp.fromNumber(44) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: -1234, ts: Timestamp.fromNumber(42) }),
|
||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||
{ h: 2345, ts: Timestamp.fromNumber(42) }),
|
||||
];
|
||||
const lrs = new ListRecordStream(logger, '4242', '-1234');
|
||||
let nbReceivedEntries = 0;
|
||||
lrs.on('data', entry => {
|
||||
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
|
||||
++nbReceivedEntries;
|
||||
});
|
||||
lrs.on('end', () => {
|
||||
assert.strictEqual(nbReceivedEntries, 2);
|
||||
assert(infoEmitted);
|
||||
assert.strictEqual(nbReceivedEntries, 1);
|
||||
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||
{ uniqID: '2345' });
|
||||
assert.strictEqual(lrs.getSkipCount(), 3);
|
||||
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||
done();
|
||||
});
|
||||
logEntries.forEach(entry => lrs.write(entry));
|
||||
|
|
Loading…
Reference in New Issue