Compare commits

...

10 Commits

Author SHA1 Message Date
JianqinWang 04520404dc remove unused module 2018-02-09 14:00:08 -08:00
JianqinWang a8848cee9a minor text edits - niantic 2018-02-08 18:01:03 -08:00
JianqinWang 934bdffa96 IT FILTERS OUT PREVIOUSLY ADDED OPLOGS!!!" 2018-02-08 17:53:36 -08:00
JianqinWang 7214ff3113 parse until last uniqID 2018-02-08 17:21:07 -08:00
JianqinWang e8b722b7bb pass uniqID 2018-02-08 16:10:15 -08:00
JianqinWang 40aef12063 start stream at beginning 2018-02-08 15:23:47 -08:00
JianqinWang c42f6bc4e2 write undefined will trigger info 2018-02-08 14:27:55 -08:00
JianqinWang 7410cf8f6a put 2018-02-08 12:37:49 -08:00
JianqinWang 15668cfd4e update new methods 2018-02-08 12:37:49 -08:00
JianqinWang 9318b7207f remove incorrect console.log 2018-02-08 12:37:49 -08:00
3 changed files with 49 additions and 20 deletions

View File

@ -1,13 +1,14 @@
'use strict'; // eslint-disable-line 'use strict'; // eslint-disable-line
const async = require('async');
const stream = require('stream'); const stream = require('stream');
const MongoClient = require('mongodb').MongoClient; const MongoClient = require('mongodb').MongoClient;
const Timestamp = require('bson').Timestamp; const { Timestamp, Long } = require('bson');
let lastEndID = undefined;
const ops = { const ops = {
i: 'insert', i: 'put',
u: 'update', u: 'put',
d: 'delete', d: 'delete',
}; };
@ -18,6 +19,10 @@ class ListRecordStream extends stream.Transform {
this.hasStarted = false; this.hasStarted = false;
this.start = undefined; this.start = undefined;
this.end = undefined; this.end = undefined;
this.lastUniqID = undefined;
// this.unpublishedListing is true once we pass the oplog that has the
// start seq timestamp and uniqID 'h'
this.unpublishedListing = undefined;
} }
_transform(itemObj, encoding, callback) { _transform(itemObj, encoding, callback) {
@ -26,21 +31,41 @@ class ListRecordStream extends stream.Transform {
this.emit('info', { this.emit('info', {
start: this.start, start: this.start,
end: this.end, end: this.end,
uniqID: this.lastUniqID,
}); });
return callback(); return callback();
} }
// always update to most recent uniqID
this.lastUniqID = itemObj.h.toString();
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
this.end = itemObj.ts.toNumber();
}
// only push to stream unpublished objects
if (!this.unpublishedListing) {
if (lastEndID === itemObj.h.toString()) {
this.unpublishedListing = true;
}
return callback();
}
if (!this.hasStarted) { if (!this.hasStarted) {
this.hasStarted = true; this.hasStarted = true;
this.start = itemObj.ts.toNumber(); this.start = itemObj.ts.toNumber();
this.emit('info', {
start: this.start,
end: this.end,
uniqId: this.lastUniqID,
});
} }
if (this.end === undefined || itemObj.ts.toNumber() > this.end) { // don't push oplogs that have already been sent
this.end = itemObj.ts.toNumber(); if (!this.unpublishedListing) {
return callback();
} }
console.log('turn back into timestamp,', Timestamp.fromNumber(timestampNum));
const streamObject = { const streamObject = {
timestamp: new Date(itemObj.ts.high_ * 1000), timestamp: new Date(itemObj.ts.high_ * 1000),
db: itemObj.ns, db: itemObj.ns,
@ -48,12 +73,22 @@ class ListRecordStream extends stream.Transform {
{ {
type: ops[itemObj.op], type: ops[itemObj.op],
key: itemObj.o._id, key: itemObj.o._id,
value: itemObj.o.value, value: JSON.stringify(itemObj.o.value),
}, },
], ],
}; };
return callback(null, streamObject); return callback(null, streamObject);
} }
_flush(callback) {
this.emit('info', {
start: this.start,
end: this.end,
uniqID: this.lastUniqID,
});
this.push(null);
callback();
}
} }
/** /**
@ -137,7 +172,9 @@ class LogConsumer {
readRecords(params, cb) { readRecords(params, cb) {
const recordStream = new ListRecordStream(this.logger); const recordStream = new ListRecordStream(this.logger);
const limit = params.limit || 10000; const limit = params.limit || 10000;
const startSeq = params.startSeq || 0; const startIDandSeq = params.startSeq.toString().split('|');
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
lastEndID = startIDandSeq[1];
// need to somehow limit entries to limit and then stop the stream // need to somehow limit entries to limit and then stop the stream
this.coll = this.db.collection('oplog.rs'); this.coll = this.db.collection('oplog.rs');
@ -158,7 +195,7 @@ class LogConsumer {
recordStream.write(data); recordStream.write(data);
}); });
stream.on('end', () => { stream.on('end', () => {
recordStream.write(); recordStream.write(undefined);
}); });
recordStream.once('info', info => { recordStream.once('info', info => {
recordStream.removeAllListeners('error'); recordStream.removeAllListeners('error');
@ -176,7 +213,7 @@ class LogConsumer {
// tester.connectMongo(() => { // tester.connectMongo(() => {
// return async.waterfall([ // return async.waterfall([
// next => { // next => {
// return tester.readRecords({ startSeq: 6519603494532415000, limit: 5 }, (err, res) => { // return tester.readRecords({ startSeq: 0, limit: 5 }, (err, res) => {
// console.log('cb called for read records'); // console.log('cb called for read records');
// res.log.on('data', data => { // res.log.on('data', data => {
// console.log('Streamed formatted data', data); // console.log('Streamed formatted data', data);

View File

@ -59,13 +59,10 @@ class MongoReadStream extends Readable {
delete query._id; delete query._id;
} }
<<<<<<< 4193394340acb6f242c036548a2860cf3766b84e
if (searchOptions) { if (searchOptions) {
Object.assign(query, searchOptions); Object.assign(query, searchOptions);
} }
=======
>>>>>>> Move mongoclient from s3 to arsenal
this._cursor = c.find(query).sort({ this._cursor = c.find(query).sort({
_id: options.reverse ? -1 : 1, _id: options.reverse ? -1 : 1,
}); });

View File

@ -3,11 +3,7 @@
"engines": { "engines": {
"node": "6.9.5" "node": "6.9.5"
}, },
<<<<<<< 703aafeb723c77937798b9f0f6dcc89014a8fd39
"version": "7.4.0", "version": "7.4.0",
=======
"version": "7.2.2",
>>>>>>> DROP ME: bump arsenal version
"description": "Common utilities for the S3 project components", "description": "Common utilities for the S3 project components",
"main": "index.js", "main": "index.js",
"repository": { "repository": {
@ -32,7 +28,6 @@
"level": "~1.6.0", "level": "~1.6.0",
"level-sublevel": "~6.6.1", "level-sublevel": "~6.6.1",
"mongodb": "^3.0.1", "mongodb": "^3.0.1",
"bson": "^1.0.4",
"node-forge": "^0.7.1", "node-forge": "^0.7.1",
"simple-glob": "^0.1", "simple-glob": "^0.1",
"socket.io": "~1.7.3", "socket.io": "~1.7.3",