Compare commits
6 Commits
18dfc6b4fa
...
60f36bcd12
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | 60f36bcd12 | |
Jonathan Gramain | 4640762fb3 | |
Jonathan Gramain | 77ec227552 | |
Jonathan Gramain | 4e20c1c78c | |
Jonathan Gramain | b3176ed217 | |
Jonathan Gramain | 829eba47e9 |
|
@ -1,32 +1,42 @@
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
|
|
||||||
class ListRecordStream extends stream.Transform {
|
class ListRecordStream extends stream.Transform {
|
||||||
constructor(logger, lastEndID) {
|
constructor(logger, lastSavedID, latestOplogID) {
|
||||||
super({ objectMode: true });
|
super({ objectMode: true });
|
||||||
this._logger = logger;
|
this._logger = logger;
|
||||||
this._lastEndID = lastEndID;
|
this._lastSavedID = lastSavedID;
|
||||||
this._lastTs = 0;
|
this._latestOplogID = latestOplogID;
|
||||||
this._lastUniqID = null;
|
this._lastUniqID = null;
|
||||||
// this._unpublishedListing is true once we pass the oplog
|
// this._unpublishedListing is true once we pass the oplog
|
||||||
// that has the start seq timestamp and uniqID 'h'
|
// record that has the same uniqID 'h' than last saved. If we
|
||||||
|
// don't find it (e.g. log rolled over before populator could
|
||||||
|
// process its oldest entries), we will restart from the
|
||||||
|
// latest record of the oplog.
|
||||||
this._unpublishedListing = false;
|
this._unpublishedListing = false;
|
||||||
|
this._skipCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
_transform(itemObj, encoding, callback) {
|
_transform(itemObj, encoding, callback) {
|
||||||
// always update to most recent uniqID
|
// always update to most recent uniqID
|
||||||
this._lastUniqID = itemObj.h.toString();
|
this._lastUniqID = itemObj.h.toString();
|
||||||
|
|
||||||
if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
|
|
||||||
this._lastTs = itemObj.ts.toNumber();
|
|
||||||
}
|
|
||||||
|
|
||||||
// only push to stream unpublished objects
|
// only push to stream unpublished objects
|
||||||
if (!this._unpublishedListing) {
|
if (!this._unpublishedListing) {
|
||||||
// When an oplog with a unique ID that is stored in the
|
// When an oplog with a unique ID that is stored in the
|
||||||
// log offset is found, all oplogs AFTER this is unpublished.
|
// log offset is found, all oplogs AFTER this is unpublished.
|
||||||
if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
|
if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) {
|
||||||
|
this._unpublishedListing = true;
|
||||||
|
} else if (this._latestOplogID === this._lastUniqID) {
|
||||||
|
this._logger.warn(
|
||||||
|
'did not encounter the last saved offset in oplog, ' +
|
||||||
|
'resuming processing right after the latest record ' +
|
||||||
|
'to date; some entries may have been skipped', {
|
||||||
|
lastSavedID: this._lastSavedID,
|
||||||
|
latestRecordID: this._latestOplogID,
|
||||||
|
});
|
||||||
this._unpublishedListing = true;
|
this._unpublishedListing = true;
|
||||||
}
|
}
|
||||||
|
++this._skipCount;
|
||||||
return callback();
|
return callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +72,7 @@ class ListRecordStream extends stream.Transform {
|
||||||
} else {
|
} else {
|
||||||
// skip other entry types as we don't need them for now
|
// skip other entry types as we don't need them for now
|
||||||
// ('c', ...?)
|
// ('c', ...?)
|
||||||
|
++this._skipCount;
|
||||||
return callback();
|
return callback();
|
||||||
}
|
}
|
||||||
const streamObject = {
|
const streamObject = {
|
||||||
|
@ -73,16 +84,37 @@ class ListRecordStream extends stream.Transform {
|
||||||
return callback(null, streamObject);
|
return callback(null, streamObject);
|
||||||
}
|
}
|
||||||
|
|
||||||
_flush(callback) {
|
/**
|
||||||
this.emit('info', {
|
* Get an opaque JSON blob containing the latest consumed offset
|
||||||
// store both the timestamp and unique oplog id in an
|
* from MongoDB oplog.
|
||||||
// opaque JSON string returned to the reader
|
*
|
||||||
end: JSON.stringify({
|
* @return {string} opaque JSON blob
|
||||||
ts: this._lastTs,
|
*/
|
||||||
uniqID: this._lastUniqID,
|
getOffset() {
|
||||||
}),
|
return JSON.stringify({
|
||||||
|
uniqID: this._lastUniqID,
|
||||||
});
|
});
|
||||||
callback();
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of entries that have been read and skipped from
|
||||||
|
* MongoDB oplog since the ListRecordStream instance was created.
|
||||||
|
*
|
||||||
|
* @return {integer} number of skipped entries
|
||||||
|
*/
|
||||||
|
getSkipCount() {
|
||||||
|
return this._skipCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get whether the stream reached yet-unpublished records
|
||||||
|
* (i.e. after we reached either the saved unique ID, or the tip
|
||||||
|
* of the oplog)
|
||||||
|
*
|
||||||
|
* @return {boolean} true if we are now returning unpublished records
|
||||||
|
*/
|
||||||
|
reachedUnpublishedListing() {
|
||||||
|
return this._unpublishedListing;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
'use strict'; // eslint-disable-line
|
'use strict'; // eslint-disable-line
|
||||||
|
|
||||||
|
const stream = require('stream');
|
||||||
|
|
||||||
const MongoClient = require('mongodb').MongoClient;
|
const MongoClient = require('mongodb').MongoClient;
|
||||||
const ListRecordStream = require('./ListRecordStream');
|
const ListRecordStream = require('./ListRecordStream');
|
||||||
const { Timestamp } = require('bson');
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @class
|
* @class
|
||||||
|
@ -18,12 +19,9 @@ class LogConsumer {
|
||||||
*/
|
*/
|
||||||
constructor(mongoConfig, logger) {
|
constructor(mongoConfig, logger) {
|
||||||
const { replicaSetHosts, database } = mongoConfig;
|
const { replicaSetHosts, database } = mongoConfig;
|
||||||
// 'local' is the database where MongoDB has oplogs.rs capped collection
|
this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
|
||||||
this.database = 'local';
|
this._logger = logger;
|
||||||
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
|
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
|
||||||
this.logger = logger;
|
|
||||||
this.metadataDatabase = database;
|
|
||||||
this.oplogNsRegExp = new RegExp(`^${database}\\.`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,67 +33,103 @@ class LogConsumer {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
connectMongo(done) {
|
connectMongo(done) {
|
||||||
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
|
MongoClient.connect(this._mongoUrl, {
|
||||||
|
replicaSet: 'rs0',
|
||||||
|
useNewUrlParser: true,
|
||||||
|
},
|
||||||
(err, client) => {
|
(err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this.logger.error('Unable to connect to MongoDB',
|
this._logger.error('Unable to connect to MongoDB',
|
||||||
{ error: err });
|
{ error: err });
|
||||||
return done(err);
|
return done(err);
|
||||||
}
|
}
|
||||||
this.logger.info('connected to mongodb');
|
this._logger.info('connected to mongodb');
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.db = client.db(this.database, {
|
// 'local' is the database where MongoDB has oplog.rs
|
||||||
|
// capped collection
|
||||||
|
this.db = client.db('local', {
|
||||||
ignoreUndefined: true,
|
ignoreUndefined: true,
|
||||||
});
|
});
|
||||||
return done();
|
return done();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read a series of log records from mongo
|
* Open a tailable cursor to mongo oplog and retrieve a stream of
|
||||||
|
* records to read
|
||||||
*
|
*
|
||||||
* @param {Object} [params] - params object
|
* @param {Object} [params] - params object
|
||||||
* @param {String} [params.startSeq] - fetch starting from this
|
* @param {String} [params.startSeq] - fetch starting from this
|
||||||
* opaque offset returned previously by mongo ListRecordStream
|
* opaque offset returned previously by mongo ListRecordStream
|
||||||
* in an 'info' event
|
* in an 'info' event
|
||||||
* @param {Number} [params.limit] - maximum number of log records
|
|
||||||
* to return
|
|
||||||
* @param {function} cb - callback function, called with an error
|
* @param {function} cb - callback function, called with an error
|
||||||
* object or null and an object as 2nd parameter
|
* object or null and an object as 2nd parameter
|
||||||
*
|
*
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
readRecords(params, cb) {
|
readRecords(params, cb) {
|
||||||
const limit = params.limit || 10000;
|
let startSeq = {};
|
||||||
let startSeq = { ts: 0 };
|
|
||||||
if (params.startSeq) {
|
if (params.startSeq) {
|
||||||
try {
|
try {
|
||||||
// parse the opaque JSON string passed through from a
|
// parse the opaque JSON string passed through from a
|
||||||
// previous 'info' event
|
// previous 'info' event
|
||||||
startSeq = JSON.parse(params.startSeq);
|
startSeq = JSON.parse(params.startSeq);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error('malformed startSeq', {
|
this._logger.error('malformed startSeq', {
|
||||||
startSeq: params.startSeq,
|
startSeq: params.startSeq,
|
||||||
});
|
});
|
||||||
// start over if malformed
|
// start over if malformed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
|
|
||||||
|
|
||||||
this.coll = this.db.collection('oplog.rs');
|
this.coll = this.db.collection('oplog.rs');
|
||||||
return this.coll.find({
|
this._readLatestOplogID((err, latestOplogID) => {
|
||||||
ns: this.oplogNsRegExp,
|
if (err) {
|
||||||
ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
|
return cb(err);
|
||||||
|
}
|
||||||
|
const recordStream = new ListRecordStream(this._logger,
|
||||||
|
startSeq.uniqID,
|
||||||
|
latestOplogID);
|
||||||
|
return this.coll.find({
|
||||||
|
ns: this._oplogNsRegExp,
|
||||||
|
}, {
|
||||||
|
tailable: true,
|
||||||
|
awaitData: true,
|
||||||
|
noCursorTimeout: true,
|
||||||
|
numberOfRetries: Number.MAX_VALUE,
|
||||||
|
}, (err, res) => {
|
||||||
|
const cursorStream = res.stream();
|
||||||
|
stream.finished(cursorStream, err => {
|
||||||
|
if (err) {
|
||||||
|
this._logger.error('cursor stream error', {
|
||||||
|
error: err.message,
|
||||||
|
});
|
||||||
|
recordStream.emit('error', err);
|
||||||
|
} else {
|
||||||
|
this._logger.error('cursor stream closed');
|
||||||
|
recordStream.emit(
|
||||||
|
'error', new Error('cursor stream closed'));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cursorStream.pipe(recordStream);
|
||||||
|
return cb(null, { log: recordStream, tailable: true });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_readLatestOplogID(cb) {
|
||||||
|
this.coll.find({
|
||||||
|
ns: this._oplogNsRegExp,
|
||||||
}, {
|
}, {
|
||||||
limit,
|
ts: 1,
|
||||||
tailable: false,
|
}).sort({
|
||||||
awaitData: false,
|
$natural: -1,
|
||||||
noCursorTimeout: true,
|
}).limit(1).toArray((err, data) => {
|
||||||
oplogReplay: true,
|
if (err) {
|
||||||
numberOfRetries: Number.MAX_VALUE,
|
return cb(err);
|
||||||
}, (err, res) => {
|
}
|
||||||
res.stream().pipe(recordStream);
|
const latestOplogID = data[0].h.toString();
|
||||||
recordStream.removeAllListeners('error');
|
this._logger.debug('latest oplog ID read', { latestOplogID });
|
||||||
return cb(null, { log: recordStream });
|
return cb(null, latestOplogID);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,19 +170,17 @@ describe('mongoclient.ListRecordStream', () => {
|
||||||
it(`should transform ${entryType}`, done => {
|
it(`should transform ${entryType}`, done => {
|
||||||
const lrs = new ListRecordStream(logger,
|
const lrs = new ListRecordStream(logger,
|
||||||
lastEndIDEntry.h.toString());
|
lastEndIDEntry.h.toString());
|
||||||
let dataReceived = false;
|
let hasReceivedData = false;
|
||||||
lrs.on('info', info => {
|
|
||||||
assert(dataReceived);
|
|
||||||
const parsedInfo = info;
|
|
||||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
|
||||||
assert.deepStrictEqual(parsedInfo, {
|
|
||||||
end: { ts: 42, uniqID: '-42' },
|
|
||||||
});
|
|
||||||
return done();
|
|
||||||
});
|
|
||||||
lrs.on('data', entry => {
|
lrs.on('data', entry => {
|
||||||
|
assert.strictEqual(hasReceivedData, false);
|
||||||
|
hasReceivedData = true;
|
||||||
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
|
assert.deepStrictEqual(entry, expectedStreamEntries[entryType]);
|
||||||
dataReceived = true;
|
});
|
||||||
|
lrs.on('end', () => {
|
||||||
|
assert.strictEqual(hasReceivedData, true);
|
||||||
|
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||||
|
{ uniqID: '-42' });
|
||||||
|
done();
|
||||||
});
|
});
|
||||||
// first write will be ignored by ListRecordStream because
|
// first write will be ignored by ListRecordStream because
|
||||||
// of the last end ID (-42), it's needed though to bootstrap it
|
// of the last end ID (-42), it's needed though to bootstrap it
|
||||||
|
@ -193,20 +191,12 @@ describe('mongoclient.ListRecordStream', () => {
|
||||||
});
|
});
|
||||||
it('should ignore other entry types', done => {
|
it('should ignore other entry types', done => {
|
||||||
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
|
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
|
||||||
let infoEmitted = false;
|
|
||||||
lrs.on('info', info => {
|
|
||||||
const parsedInfo = info;
|
|
||||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
|
||||||
assert.deepStrictEqual(parsedInfo, {
|
|
||||||
end: { ts: 42, uniqID: '-42' },
|
|
||||||
});
|
|
||||||
infoEmitted = true;
|
|
||||||
});
|
|
||||||
lrs.on('data', entry => {
|
lrs.on('data', entry => {
|
||||||
assert(false, `ListRecordStream did not ignore entry ${entry}`);
|
assert(false, `ListRecordStream did not ignore entry ${entry}`);
|
||||||
});
|
});
|
||||||
lrs.on('end', () => {
|
lrs.on('end', () => {
|
||||||
assert(infoEmitted);
|
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||||
|
{ uniqID: '-42' });
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
// first write will be ignored by ListRecordStream because
|
// first write will be ignored by ListRecordStream because
|
||||||
|
@ -217,55 +207,60 @@ describe('mongoclient.ListRecordStream', () => {
|
||||||
});
|
});
|
||||||
lrs.end();
|
lrs.end();
|
||||||
});
|
});
|
||||||
it('should emit info even if no entry consumed', done => {
|
|
||||||
const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString());
|
|
||||||
let infoEmitted = false;
|
|
||||||
lrs.on('info', info => {
|
|
||||||
const parsedInfo = info;
|
|
||||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
|
||||||
assert.deepStrictEqual(parsedInfo, {
|
|
||||||
end: { ts: 0, uniqID: null },
|
|
||||||
});
|
|
||||||
infoEmitted = true;
|
|
||||||
});
|
|
||||||
lrs.on('data', () => {
|
|
||||||
assert(false, 'did not expect data from ListRecordStream');
|
|
||||||
});
|
|
||||||
lrs.on('end', () => {
|
|
||||||
assert(infoEmitted);
|
|
||||||
done();
|
|
||||||
});
|
|
||||||
lrs.end();
|
|
||||||
});
|
|
||||||
it('should skip entries until uniqID is encountered', done => {
|
it('should skip entries until uniqID is encountered', done => {
|
||||||
const logEntries = [
|
const logEntries = [
|
||||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
{ h: 1234 }),
|
{ h: 1234, ts: Timestamp.fromNumber(45) }),
|
||||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
{ h: 5678 }),
|
{ h: 5678, ts: Timestamp.fromNumber(44) }),
|
||||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
{ h: -1234 }),
|
{ h: -1234, ts: Timestamp.fromNumber(42) }),
|
||||||
Object.assign({}, mongoProcessedLogEntries.insert,
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
{ h: 2345 }),
|
{ h: 2345, ts: Timestamp.fromNumber(42) }),
|
||||||
];
|
];
|
||||||
const lrs = new ListRecordStream(logger, '5678');
|
const lrs = new ListRecordStream(logger, '5678');
|
||||||
|
assert.strictEqual(lrs.reachedUnpublishedListing(), false);
|
||||||
let nbReceivedEntries = 0;
|
let nbReceivedEntries = 0;
|
||||||
let infoEmitted = false;
|
lrs.on('data', entry => {
|
||||||
lrs.on('info', info => {
|
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
|
||||||
infoEmitted = true;
|
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||||
const parsedInfo = info;
|
++nbReceivedEntries;
|
||||||
parsedInfo.end = JSON.parse(parsedInfo.end);
|
|
||||||
assert.deepStrictEqual(parsedInfo, {
|
|
||||||
end: { ts: 42, uniqID: '2345' },
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
lrs.on('end', () => {
|
||||||
|
assert.strictEqual(nbReceivedEntries, 2);
|
||||||
|
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||||
|
{ uniqID: '2345' });
|
||||||
|
assert.strictEqual(lrs.getSkipCount(), 2);
|
||||||
|
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
logEntries.forEach(entry => lrs.write(entry));
|
||||||
|
lrs.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should start after latest entry if uniqID is not encountered', done => {
|
||||||
|
const logEntries = [
|
||||||
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
|
{ h: 1234, ts: Timestamp.fromNumber(45) }),
|
||||||
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
|
{ h: 5678, ts: Timestamp.fromNumber(44) }),
|
||||||
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
|
{ h: -1234, ts: Timestamp.fromNumber(42) }),
|
||||||
|
Object.assign({}, mongoProcessedLogEntries.insert,
|
||||||
|
{ h: 2345, ts: Timestamp.fromNumber(42) }),
|
||||||
|
];
|
||||||
|
const lrs = new ListRecordStream(logger, '4242', '-1234');
|
||||||
|
let nbReceivedEntries = 0;
|
||||||
lrs.on('data', entry => {
|
lrs.on('data', entry => {
|
||||||
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
|
assert.deepStrictEqual(entry, expectedStreamEntries.insert);
|
||||||
++nbReceivedEntries;
|
++nbReceivedEntries;
|
||||||
});
|
});
|
||||||
lrs.on('end', () => {
|
lrs.on('end', () => {
|
||||||
assert.strictEqual(nbReceivedEntries, 2);
|
assert.strictEqual(nbReceivedEntries, 1);
|
||||||
assert(infoEmitted);
|
assert.deepStrictEqual(JSON.parse(lrs.getOffset()),
|
||||||
|
{ uniqID: '2345' });
|
||||||
|
assert.strictEqual(lrs.getSkipCount(), 3);
|
||||||
|
assert.strictEqual(lrs.reachedUnpublishedListing(), true);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
logEntries.forEach(entry => lrs.write(entry));
|
logEntries.forEach(entry => lrs.write(entry));
|
||||||
|
|
Loading…
Reference in New Issue