Compare commits
10 Commits
057f67a6f5
...
04520404dc
Author | SHA1 | Date |
---|---|---|
JianqinWang | 04520404dc | |
JianqinWang | a8848cee9a | |
JianqinWang | 934bdffa96 | |
JianqinWang | 7214ff3113 | |
JianqinWang | e8b722b7bb | |
JianqinWang | 40aef12063 | |
JianqinWang | c42f6bc4e2 | |
JianqinWang | 7410cf8f6a | |
JianqinWang | 15668cfd4e | |
JianqinWang | 9318b7207f |
|
@ -1,13 +1,14 @@
|
||||||
'use strict'; // eslint-disable-line
|
'use strict'; // eslint-disable-line
|
||||||
|
|
||||||
const async = require('async');
|
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
const MongoClient = require('mongodb').MongoClient;
|
const MongoClient = require('mongodb').MongoClient;
|
||||||
const Timestamp = require('bson').Timestamp;
|
const { Timestamp, Long } = require('bson');
|
||||||
|
|
||||||
|
let lastEndID = undefined;
|
||||||
|
|
||||||
const ops = {
|
const ops = {
|
||||||
i: 'insert',
|
i: 'put',
|
||||||
u: 'update',
|
u: 'put',
|
||||||
d: 'delete',
|
d: 'delete',
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -18,6 +19,10 @@ class ListRecordStream extends stream.Transform {
|
||||||
this.hasStarted = false;
|
this.hasStarted = false;
|
||||||
this.start = undefined;
|
this.start = undefined;
|
||||||
this.end = undefined;
|
this.end = undefined;
|
||||||
|
this.lastUniqID = undefined;
|
||||||
|
// this.unpublishedListing is true once we pass the oplog that has the
|
||||||
|
// start seq timestamp and uniqID 'h'
|
||||||
|
this.unpublishedListing = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
_transform(itemObj, encoding, callback) {
|
_transform(itemObj, encoding, callback) {
|
||||||
|
@ -26,21 +31,41 @@ class ListRecordStream extends stream.Transform {
|
||||||
this.emit('info', {
|
this.emit('info', {
|
||||||
start: this.start,
|
start: this.start,
|
||||||
end: this.end,
|
end: this.end,
|
||||||
|
uniqID: this.lastUniqID,
|
||||||
});
|
});
|
||||||
return callback();
|
return callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// always update to most recent uniqID
|
||||||
|
this.lastUniqID = itemObj.h.toString();
|
||||||
|
|
||||||
|
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
|
||||||
|
this.end = itemObj.ts.toNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
// only push to stream unpublished objects
|
||||||
|
if (!this.unpublishedListing) {
|
||||||
|
if (lastEndID === itemObj.h.toString()) {
|
||||||
|
this.unpublishedListing = true;
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.hasStarted) {
|
if (!this.hasStarted) {
|
||||||
this.hasStarted = true;
|
this.hasStarted = true;
|
||||||
this.start = itemObj.ts.toNumber();
|
this.start = itemObj.ts.toNumber();
|
||||||
|
this.emit('info', {
|
||||||
|
start: this.start,
|
||||||
|
end: this.end,
|
||||||
|
uniqId: this.lastUniqID,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
|
// don't push oplogs that have already been sent
|
||||||
this.end = itemObj.ts.toNumber();
|
if (!this.unpublishedListing) {
|
||||||
|
return callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log('turn back into timestamp,', Timestamp.fromNumber(timestampNum));
|
|
||||||
|
|
||||||
const streamObject = {
|
const streamObject = {
|
||||||
timestamp: new Date(itemObj.ts.high_ * 1000),
|
timestamp: new Date(itemObj.ts.high_ * 1000),
|
||||||
db: itemObj.ns,
|
db: itemObj.ns,
|
||||||
|
@ -48,12 +73,22 @@ class ListRecordStream extends stream.Transform {
|
||||||
{
|
{
|
||||||
type: ops[itemObj.op],
|
type: ops[itemObj.op],
|
||||||
key: itemObj.o._id,
|
key: itemObj.o._id,
|
||||||
value: itemObj.o.value,
|
value: JSON.stringify(itemObj.o.value),
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
};
|
};
|
||||||
return callback(null, streamObject);
|
return callback(null, streamObject);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_flush(callback) {
|
||||||
|
this.emit('info', {
|
||||||
|
start: this.start,
|
||||||
|
end: this.end,
|
||||||
|
uniqID: this.lastUniqID,
|
||||||
|
});
|
||||||
|
this.push(null);
|
||||||
|
callback();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -137,7 +172,9 @@ class LogConsumer {
|
||||||
readRecords(params, cb) {
|
readRecords(params, cb) {
|
||||||
const recordStream = new ListRecordStream(this.logger);
|
const recordStream = new ListRecordStream(this.logger);
|
||||||
const limit = params.limit || 10000;
|
const limit = params.limit || 10000;
|
||||||
const startSeq = params.startSeq || 0;
|
const startIDandSeq = params.startSeq.toString().split('|');
|
||||||
|
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
|
||||||
|
lastEndID = startIDandSeq[1];
|
||||||
// need to somehow limit entries to limit and then stop the stream
|
// need to somehow limit entries to limit and then stop the stream
|
||||||
|
|
||||||
this.coll = this.db.collection('oplog.rs');
|
this.coll = this.db.collection('oplog.rs');
|
||||||
|
@ -158,7 +195,7 @@ class LogConsumer {
|
||||||
recordStream.write(data);
|
recordStream.write(data);
|
||||||
});
|
});
|
||||||
stream.on('end', () => {
|
stream.on('end', () => {
|
||||||
recordStream.write();
|
recordStream.write(undefined);
|
||||||
});
|
});
|
||||||
recordStream.once('info', info => {
|
recordStream.once('info', info => {
|
||||||
recordStream.removeAllListeners('error');
|
recordStream.removeAllListeners('error');
|
||||||
|
@ -176,7 +213,7 @@ class LogConsumer {
|
||||||
// tester.connectMongo(() => {
|
// tester.connectMongo(() => {
|
||||||
// return async.waterfall([
|
// return async.waterfall([
|
||||||
// next => {
|
// next => {
|
||||||
// return tester.readRecords({ startSeq: 6519603494532415000, limit: 5 }, (err, res) => {
|
// return tester.readRecords({ startSeq: 0, limit: 5 }, (err, res) => {
|
||||||
// console.log('cb called for read records');
|
// console.log('cb called for read records');
|
||||||
// res.log.on('data', data => {
|
// res.log.on('data', data => {
|
||||||
// console.log('Streamed formatted data', data);
|
// console.log('Streamed formatted data', data);
|
||||||
|
|
|
@ -59,13 +59,10 @@ class MongoReadStream extends Readable {
|
||||||
delete query._id;
|
delete query._id;
|
||||||
}
|
}
|
||||||
|
|
||||||
<<<<<<< 4193394340acb6f242c036548a2860cf3766b84e
|
|
||||||
if (searchOptions) {
|
if (searchOptions) {
|
||||||
Object.assign(query, searchOptions);
|
Object.assign(query, searchOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
=======
|
|
||||||
>>>>>>> Move mongoclient from s3 to arsenal
|
|
||||||
this._cursor = c.find(query).sort({
|
this._cursor = c.find(query).sort({
|
||||||
_id: options.reverse ? -1 : 1,
|
_id: options.reverse ? -1 : 1,
|
||||||
});
|
});
|
||||||
|
|
|
@ -3,11 +3,7 @@
|
||||||
"engines": {
|
"engines": {
|
||||||
"node": "6.9.5"
|
"node": "6.9.5"
|
||||||
},
|
},
|
||||||
<<<<<<< 703aafeb723c77937798b9f0f6dcc89014a8fd39
|
|
||||||
"version": "7.4.0",
|
"version": "7.4.0",
|
||||||
=======
|
|
||||||
"version": "7.2.2",
|
|
||||||
>>>>>>> DROP ME: bump arsenal version
|
|
||||||
"description": "Common utilities for the S3 project components",
|
"description": "Common utilities for the S3 project components",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"repository": {
|
"repository": {
|
||||||
|
@ -32,7 +28,6 @@
|
||||||
"level": "~1.6.0",
|
"level": "~1.6.0",
|
||||||
"level-sublevel": "~6.6.1",
|
"level-sublevel": "~6.6.1",
|
||||||
"mongodb": "^3.0.1",
|
"mongodb": "^3.0.1",
|
||||||
"bson": "^1.0.4",
|
|
||||||
"node-forge": "^0.7.1",
|
"node-forge": "^0.7.1",
|
||||||
"simple-glob": "^0.1",
|
"simple-glob": "^0.1",
|
||||||
"socket.io": "~1.7.3",
|
"socket.io": "~1.7.3",
|
||||||
|
|
Loading…
Reference in New Issue