Compare commits
1 Commits
developmen
...
ft/ARSN-65
Author | SHA1 | Date |
---|---|---|
Vianney Rancurel | 108b1b3770 |
|
@ -0,0 +1,251 @@
|
||||||
|
/*
|
||||||
|
* Main interface for bucketd oplog management
|
||||||
|
*/
|
||||||
|
const async = require('async');
|
||||||
|
const { RESTClient: BucketClient } = require('bucketclient');
|
||||||
|
const { jsutil, errors } = require('arsenal');
|
||||||
|
const LogConsumer = require('arsenal/lib/storage/metadata/bucketclient/LogConsumer');
|
||||||
|
const { isMasterKey } = require('arsenal/lib/versioning/Version');
|
||||||
|
const OplogInterface = require('./OplogInterface');
|
||||||
|
|
||||||
|
class BucketdOplogInterface extends OplogInterface {
|
||||||
|
|
||||||
|
constructor(params) {
|
||||||
|
super(params);
|
||||||
|
this.backendRetryTimes = 3;
|
||||||
|
this.backendRetryInterval = 300;
|
||||||
|
this.bucketdOplogQuerySize = 20;
|
||||||
|
this.stopAt = params?.stopAt ?? -1;
|
||||||
|
const bkBootstrap = params?.bootstrap ?? ['localhost:9000'];
|
||||||
|
this.bkClient = new BucketClient(bkBootstrap);
|
||||||
|
}
|
||||||
|
|
||||||
|
start(filter, cb) {
|
||||||
|
if (!(filter.filterType === 'bucket' ||
|
||||||
|
filter.filterType === 'raftSession')) {
|
||||||
|
return cb(errors.NotImplemented);
|
||||||
|
}
|
||||||
|
const filterName = filter.filterName;
|
||||||
|
async.waterfall([
|
||||||
|
/*
|
||||||
|
* In this step we get the raftId for filterName
|
||||||
|
*/
|
||||||
|
next => {
|
||||||
|
if (filter.filterType === 'raftSession') {
|
||||||
|
return next(null, filter.raftSession.raftId);
|
||||||
|
}
|
||||||
|
this.logger.info('obtaining raftId',
|
||||||
|
{ filterName });
|
||||||
|
async.retry(
|
||||||
|
{
|
||||||
|
times: this.backendRetryTimes,
|
||||||
|
interval: this.backendRetryInterval,
|
||||||
|
},
|
||||||
|
done => {
|
||||||
|
this.bkClient.getBucketInformation(
|
||||||
|
filter.bucket.bucketName,
|
||||||
|
null,
|
||||||
|
(err, info) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.info('retrying getBucketInformation', { err, filterName });
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
return done(null, JSON.parse(info));
|
||||||
|
});
|
||||||
|
},
|
||||||
|
(err, res) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('getBucketInformation too many failures', { err, filterName });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, res.raftSessionId);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we get the stored offset if we have it
|
||||||
|
*/
|
||||||
|
(raftId, next) => {
|
||||||
|
let cseq = undefined;
|
||||||
|
this.persist.load(filterName, this.persistData, (err, offset) => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
cseq = offset;
|
||||||
|
return next(null, raftId, cseq);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we acquire the offset if we don't already have it
|
||||||
|
*/
|
||||||
|
(raftId, cseq, next) => {
|
||||||
|
if (cseq !== undefined) {
|
||||||
|
this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
|
||||||
|
{ filterName });
|
||||||
|
return next(null, raftId, cseq, true);
|
||||||
|
}
|
||||||
|
this.logger.info('cseq acquisition',
|
||||||
|
{ filterName });
|
||||||
|
async.retry(
|
||||||
|
{
|
||||||
|
times: this.backendRetryTimes,
|
||||||
|
interval: this.backendRetryInterval,
|
||||||
|
},
|
||||||
|
done => {
|
||||||
|
this.bkClient.getRaftLog(
|
||||||
|
raftId,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
true,
|
||||||
|
null,
|
||||||
|
(err, stream) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.info('retrying getRaftLog', { err, filterName });
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
const chunks = [];
|
||||||
|
stream.on('data', chunk => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
});
|
||||||
|
stream.on('end', () => {
|
||||||
|
const info = JSON.parse(Buffer.concat(chunks));
|
||||||
|
return done(null, info);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
(err, res) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('getRaftLog too many failures', { err, filterName });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, raftId, res.info.cseq, false);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we init the state (e.g. scan)
|
||||||
|
*/
|
||||||
|
(raftId, cseq, skipInit, next) => {
|
||||||
|
if (skipInit) {
|
||||||
|
this.logger.info(`skipping state initialization cseq=${cseq}`,
|
||||||
|
{ filterName });
|
||||||
|
return next(null, raftId, cseq);
|
||||||
|
}
|
||||||
|
this.logger.info(`initializing state cseq=${cseq}`,
|
||||||
|
{ filterName });
|
||||||
|
this.persistData.initState(err => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
this.persist.save(
|
||||||
|
filterName, this.persistData, cseq, err => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, raftId, cseq);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we loop over the oplog
|
||||||
|
*/
|
||||||
|
(raftId, cseq, next) => {
|
||||||
|
this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
|
||||||
|
{ filterName });
|
||||||
|
// only way to get out of the loop in all cases
|
||||||
|
const nextOnce = jsutil.once(next);
|
||||||
|
let doStop = false;
|
||||||
|
// resume reading the oplog from cseq. changes are idempotent
|
||||||
|
const logConsumer = new LogConsumer({
|
||||||
|
bucketClient: this.bkClient,
|
||||||
|
raftSession: raftId,
|
||||||
|
});
|
||||||
|
let _cseq = cseq;
|
||||||
|
async.until(
|
||||||
|
() => doStop,
|
||||||
|
_next => {
|
||||||
|
logConsumer.readRecords({
|
||||||
|
startSeq: _cseq,
|
||||||
|
limit: this.bucketdOplogQuerySize,
|
||||||
|
}, (err, record) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('readRecords error', { err, filterName });
|
||||||
|
return setTimeout(() => _next(), 5000);
|
||||||
|
}
|
||||||
|
if (!record.log) {
|
||||||
|
// nothing to read
|
||||||
|
return setTimeout(() => _next(), 5000);
|
||||||
|
}
|
||||||
|
const seqs = [];
|
||||||
|
record.log.on('data', chunk => {
|
||||||
|
seqs.push(chunk);
|
||||||
|
});
|
||||||
|
record.log.on('end', () => {
|
||||||
|
const addQueue = [];
|
||||||
|
const delQueue = [];
|
||||||
|
for (let i = 0; i < seqs.length; i++) {
|
||||||
|
if (filter.filterType === 'raftSession' ||
|
||||||
|
(filter.filterType === 'bucket' &&
|
||||||
|
seqs[i].db === filter.bucket.bucketName)) {
|
||||||
|
for (let j = 0; j < seqs[i].entries.length; j++) {
|
||||||
|
const _item = {};
|
||||||
|
_item.bucketName = seqs[i].db;
|
||||||
|
_item.key = seqs[i].entries[j].key;
|
||||||
|
if (seqs[i].entries[j].type !== undefined &&
|
||||||
|
seqs[i].entries[j].type === 'del') {
|
||||||
|
if (!isMasterKey(_item.key)) {
|
||||||
|
// ignore for now
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
delQueue.push(_item);
|
||||||
|
} else {
|
||||||
|
_item.value = Object.assign({}, seqs[i].entries[j].value);
|
||||||
|
addQueue.push(_item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.persistData.updateState(
|
||||||
|
addQueue, delQueue, err => {
|
||||||
|
if (err) {
|
||||||
|
return _next(err);
|
||||||
|
}
|
||||||
|
_cseq += seqs.length;
|
||||||
|
this.persist.save(
|
||||||
|
filterName, this.persistData, _cseq, err => {
|
||||||
|
if (err) {
|
||||||
|
return _next(err);
|
||||||
|
}
|
||||||
|
if (_cseq > this.stopAt) {
|
||||||
|
doStop = true;
|
||||||
|
}
|
||||||
|
return _next();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}, err => {
|
||||||
|
if (err) {
|
||||||
|
return nextOnce(err);
|
||||||
|
}
|
||||||
|
return nextOnce();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
], err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info('returning',
|
||||||
|
{ filterName });
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = BucketdOplogInterface;
|
|
@ -0,0 +1,163 @@
|
||||||
|
/*
|
||||||
|
* Main interface for Mongo oplog management
|
||||||
|
*/
|
||||||
|
const MongoClient = require('mongodb').MongoClient;
|
||||||
|
const bson = require('bson');
|
||||||
|
const { jsutil, errors } = require('arsenal');
|
||||||
|
const async = require('async');
|
||||||
|
const { isMasterKey } = require('arsenal/lib/versioning/Version');
|
||||||
|
const OplogInterface = require('./OplogInterface');
|
||||||
|
|
||||||
|
class MongoOplogInterface extends OplogInterface {
|
||||||
|
|
||||||
|
constructor(params) {
|
||||||
|
super(params);
|
||||||
|
this.mongoDbUri = 'mongodb://localhost:27017';
|
||||||
|
if (params && params.mongoDbUri !== undefined) {
|
||||||
|
this.mongoDbUri = params.mongoDbUri;
|
||||||
|
}
|
||||||
|
this.databaseName = 'metadata';
|
||||||
|
if (params && params.databaseName !== undefined) {
|
||||||
|
this.databaseName = params.databaseName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start(filter, cb) {
|
||||||
|
if (filter.filterType !== 'bucket') {
|
||||||
|
return cb(errors.NotImplemented);
|
||||||
|
}
|
||||||
|
const filterName = filter.filterName;
|
||||||
|
const bucketName = filter.bucket.bucketName;
|
||||||
|
let db;
|
||||||
|
let collection;
|
||||||
|
async.waterfall([
|
||||||
|
/*
|
||||||
|
* In this step we connect to MongoDB
|
||||||
|
*/
|
||||||
|
next => {
|
||||||
|
MongoClient.connect(
|
||||||
|
this.mongoDbUri,
|
||||||
|
(err, client) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('error connecting to mongodb', { err, filterName });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
db = client.db(this.databaseName, {
|
||||||
|
ignoreUndefined: true,
|
||||||
|
});
|
||||||
|
collection = db.collection(bucketName);
|
||||||
|
return next();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we get the stored offset if we have it
|
||||||
|
*/
|
||||||
|
next => {
|
||||||
|
let resumeToken = undefined;
|
||||||
|
this.persist.load(filterName, this.persistData, (err, offset) => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
if (offset && offset._data) {
|
||||||
|
resumeToken = {};
|
||||||
|
resumeToken._data = new bson.Binary(Buffer.from(offset._data, 'base64'));
|
||||||
|
}
|
||||||
|
return next(null, resumeToken);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we acquire the offset if we don't already have it
|
||||||
|
*/
|
||||||
|
(resumeToken, next) => {
|
||||||
|
if (resumeToken !== undefined) {
|
||||||
|
this.logger.info(
|
||||||
|
`skipping resumeToken acquisition (resumeToken=${resumeToken})`,
|
||||||
|
{ filterName });
|
||||||
|
return next(null, resumeToken, true);
|
||||||
|
}
|
||||||
|
this.logger.info('resumeToken acquisition',
|
||||||
|
{ filterName });
|
||||||
|
const changeStream = collection.watch();
|
||||||
|
// big hack to extract resumeToken
|
||||||
|
changeStream.once('change', () => next(null, changeStream.resumeToken, false));
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we init the state (e.g. scan)
|
||||||
|
*/
|
||||||
|
(resumeToken, skipInit, next) => {
|
||||||
|
if (skipInit) {
|
||||||
|
this.logger.info(`skipping state initialization resumeToken=${resumeToken}`,
|
||||||
|
{ filterName });
|
||||||
|
return next(null, resumeToken);
|
||||||
|
}
|
||||||
|
this.logger.info(`initializing state resumeToken=${resumeToken}`,
|
||||||
|
{ filterName });
|
||||||
|
this.persistData.initState(
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
console.error(err);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
this.persist.save(
|
||||||
|
filterName, this.persistData, resumeToken, err => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, resumeToken);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* In this step we loop over the oplog
|
||||||
|
*/
|
||||||
|
(resumeToken, next) => {
|
||||||
|
this.logger.info(`reading oplog resumeToken=${resumeToken}`,
|
||||||
|
{ filterName });
|
||||||
|
// only way to get out of the loop in all cases
|
||||||
|
const nextOnce = jsutil.once(next);
|
||||||
|
// read the change stream
|
||||||
|
const changeStream = collection.watch({ resumeAfter: resumeToken });
|
||||||
|
// start bufferization
|
||||||
|
this.filterName = filterName;
|
||||||
|
this.startFlusher();
|
||||||
|
changeStream.on(
|
||||||
|
'change', item => {
|
||||||
|
if (item.ns.db === this.databaseName) {
|
||||||
|
const _item = {};
|
||||||
|
_item.bucketName = bucketName;
|
||||||
|
_item.key = item.documentKey._id;
|
||||||
|
if (item.operationType === 'insert' ||
|
||||||
|
item.operationType === 'replace') {
|
||||||
|
_item.value = Object.assign({}, item.fullDocument.value);
|
||||||
|
this.addEvent(_item, changeStream.resumeToken);
|
||||||
|
} else if (item.operationType === 'delete') {
|
||||||
|
if (!isMasterKey(_item.key)) {
|
||||||
|
// ignore for now
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.delEvent(_item, changeStream.resumeToken);
|
||||||
|
} else if (item.operationType === 'invalidate') {
|
||||||
|
nextOnce();
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}], err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info('returning',
|
||||||
|
{ filterName });
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = MongoOplogInterface;
|
|
@ -0,0 +1,138 @@
|
||||||
|
/*
|
||||||
|
* Interface for oplog management
|
||||||
|
*
|
||||||
|
* filter is an object with the following structure: {
|
||||||
|
* filterName: string,
|
||||||
|
* filterType: bucket|bucketList|raftSession,
|
||||||
|
* bucket: {
|
||||||
|
* bucketName: string,
|
||||||
|
* },
|
||||||
|
* bucketList: {
|
||||||
|
* bucketList: [string, ...]
|
||||||
|
* },
|
||||||
|
* raftSession: {
|
||||||
|
* raftId: number,
|
||||||
|
* },
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* persist is an interface with the following methods:
|
||||||
|
* - constructor(params)
|
||||||
|
* - load(filterName, persistData, cb(err, offset))
|
||||||
|
* - save(filterName, persistData, offset, cb(err))
|
||||||
|
|
||||||
|
* persistData is an interface with the following methods:
|
||||||
|
* - constuctor(params)
|
||||||
|
* - initState(cb(err)): initialize the structure, e.g. initial bucket scan
|
||||||
|
* - loadState(stream, cb(err)): load the state
|
||||||
|
* - saveState(stream, cb(err)): save the state
|
||||||
|
* - updateState(addQueue, delQueue, cb(err)): update the state
|
||||||
|
* item: { filterName, key, value }
|
||||||
|
*/
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class OplogInterface {
|
||||||
|
|
||||||
|
constructor(params) {
|
||||||
|
this.persist = params?.persist;
|
||||||
|
this.persistData = params?.persistData;
|
||||||
|
this.logger = new werelogs.Logger('OplogInterface');
|
||||||
|
/* for backends requiring bufferization only */
|
||||||
|
this.bufferTimeoutMs = params?.bufferTimeoutMs ?? 500;
|
||||||
|
this.addQueue = [];
|
||||||
|
this.delQueue = [];
|
||||||
|
this.pending = false;
|
||||||
|
this.prevOffset = null;
|
||||||
|
this.offset = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
addEvent(item, offset) {
|
||||||
|
this.addQueue.push(item);
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
delEvent(item, offset) {
|
||||||
|
this.delQueue.push(item);
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Optional buffer management for backends that don't bufferize.
|
||||||
|
* It avoids persisting the state at each event
|
||||||
|
*/
|
||||||
|
flushQueue(cb) {
|
||||||
|
if (this.offset === null ||
|
||||||
|
this.prevOffset === this.offset) {
|
||||||
|
if (cb) {
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
if (this.pending) {
|
||||||
|
if (cb) {
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
this.pending = true;
|
||||||
|
const addQueue = this.addQueue;
|
||||||
|
this.addQueue = [];
|
||||||
|
const delQueue = this.delQueue;
|
||||||
|
this.delQueue = [];
|
||||||
|
const offset = this.offset;
|
||||||
|
this.prevOffset = this.offset;
|
||||||
|
this.persistData.updateState(
|
||||||
|
addQueue,
|
||||||
|
delQueue,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
if (cb) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.persist.save(
|
||||||
|
this.filterName,
|
||||||
|
this.persistData,
|
||||||
|
offset,
|
||||||
|
err => {
|
||||||
|
this.pending = false;
|
||||||
|
if (err) {
|
||||||
|
if (cb) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
if (cb) {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
doFlush() {
|
||||||
|
this.flushQueue(err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('flusing buffer', { err });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.startFlusher();
|
||||||
|
}
|
||||||
|
|
||||||
|
startFlusher() {
|
||||||
|
setTimeout(this.doFlush.bind(this), this.bufferTimeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// method to be overridden
|
||||||
|
start() {
|
||||||
|
throw new Error('not implemented');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = OplogInterface;
|
|
@ -0,0 +1,91 @@
|
||||||
|
const fs = require('fs');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class PersistFileInterface {
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.folder = '/tmp';
|
||||||
|
this.logger = new werelogs.Logger('PersistFileInterface');
|
||||||
|
fs.access(this.folder, err => {
|
||||||
|
if (err) {
|
||||||
|
fs.mkdirSync(this.folder, { recursive: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getFileName(filterName) {
|
||||||
|
return `${this.folder}/${filterName}.json`;
|
||||||
|
}
|
||||||
|
|
||||||
|
getOffsetFileName(filterName) {
|
||||||
|
return `${this.folder}/${filterName}.offset.json`;
|
||||||
|
}
|
||||||
|
|
||||||
|
load(filterName, persistData, cb) {
|
||||||
|
const fileName = this.getFileName(filterName);
|
||||||
|
const offsetFileName = this.getOffsetFileName(filterName);
|
||||||
|
let obj = {};
|
||||||
|
fs.readFile(
|
||||||
|
offsetFileName,
|
||||||
|
'utf-8', (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
this.logger.info(`${offsetFileName} non-existent`);
|
||||||
|
} else {
|
||||||
|
this.logger.error('error loading', { err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
obj = JSON.parse(data);
|
||||||
|
}
|
||||||
|
if (fs.existsSync(fileName)) {
|
||||||
|
const file = fs.createReadStream(fileName);
|
||||||
|
persistData.loadState(file, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info(`${fileName} loaded: offset ${obj.offset}`);
|
||||||
|
return cb(null, obj.offset);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.logger.info(`${fileName} non-existent`);
|
||||||
|
return cb(null, obj.offset);
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
save(filterName, persistData, offset, cb) {
|
||||||
|
const fileName = this.getFileName(filterName);
|
||||||
|
const offsetFileName = this.getOffsetFileName(filterName);
|
||||||
|
const file = fs.createWriteStream(fileName);
|
||||||
|
persistData.saveState(file, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const obj = {
|
||||||
|
offset,
|
||||||
|
};
|
||||||
|
fs.writeFile(
|
||||||
|
offsetFileName, JSON.stringify(obj),
|
||||||
|
'utf-8',
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('error saving', { err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info(`${fileName} saved: offset ${offset}`);
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = PersistFileInterface;
|
|
@ -0,0 +1,73 @@
|
||||||
|
// fake backend for unit tests
|
||||||
|
const assert = require('assert');
|
||||||
|
const MemoryStream = require('memorystream');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class PersistMemInterface {
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.memoryStreams = {};
|
||||||
|
this.offsets = {};
|
||||||
|
this.logger = new werelogs.Logger('PersistMemInterface');
|
||||||
|
}
|
||||||
|
|
||||||
|
getOffset(filterName) {
|
||||||
|
if (!this.offsets[filterName]) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return this.offsets[filterName];
|
||||||
|
}
|
||||||
|
|
||||||
|
setOffset(filterName, offset) {
|
||||||
|
if (!this.memoryStreams[filterName]) {
|
||||||
|
this.memoryStreams[filterName] = new MemoryStream();
|
||||||
|
}
|
||||||
|
if (!this.offsets[filterName]) {
|
||||||
|
this.offsets[filterName] = {};
|
||||||
|
}
|
||||||
|
Object.assign(
|
||||||
|
this.offsets[filterName],
|
||||||
|
offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
load(filterName, persistData, cb) {
|
||||||
|
this.logger.info(`loading ${filterName}`);
|
||||||
|
const stream = this.memoryStreams[filterName];
|
||||||
|
const offset = this.offsets[filterName];
|
||||||
|
if (stream === undefined) {
|
||||||
|
this.logger.info(`${filterName} non-existent`);
|
||||||
|
return cb(null, undefined);
|
||||||
|
}
|
||||||
|
assert(offset !== undefined);
|
||||||
|
persistData.loadState(stream, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info(`${filterName} loaded: offset ${offset}`);
|
||||||
|
return cb(null, offset);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
save(filterName, persistData, offset, cb) {
|
||||||
|
this.logger.info(`saving ${filterName} offset ${JSON.stringify(offset)}`);
|
||||||
|
const stream = new MemoryStream();
|
||||||
|
this.memoryStreams[filterName] = stream;
|
||||||
|
persistData.saveState(stream, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.offsets[filterName] = offset;
|
||||||
|
this.logger.info(`${filterName} saved: offset ${offset}`);
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = PersistMemInterface;
|
||||||
|
|
|
@ -0,0 +1,283 @@
|
||||||
|
// Ring backend that persists on Sproxyd and offsets on ZK
|
||||||
|
const async = require('async');
|
||||||
|
const { pipeline } = require('stream');
|
||||||
|
const MemoryStream = require('memorystream');
|
||||||
|
const zlib = require('zlib');
|
||||||
|
const zookeeper = require('node-zookeeper-client');
|
||||||
|
const Sproxy = require('sproxydclient');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class PersistRingInterface {
|
||||||
|
|
||||||
|
constructor(params) {
|
||||||
|
let zkConnectionString = 'localhost:2181';
|
||||||
|
if (params && params.zkConnectionString !== undefined) {
|
||||||
|
zkConnectionString = params.zkConnectionString;
|
||||||
|
}
|
||||||
|
this.zkPath = '/persist-ring-interface';
|
||||||
|
if (params && params.zkPath !== undefined) {
|
||||||
|
this.zkPath = params.zkPath;
|
||||||
|
}
|
||||||
|
let spPath = '/proxy/DC1/'; // do not forget "/" at the end !!!
|
||||||
|
if (params && params.spPath !== undefined) {
|
||||||
|
spPath = params.spPath;
|
||||||
|
}
|
||||||
|
let spBootstrap = ['localhost:8181'];
|
||||||
|
if (params && params.spBootstrap !== undefined) {
|
||||||
|
spBootstrap = params.spBootstrap;
|
||||||
|
}
|
||||||
|
this.reqUid = 'persist-ring-interface-req-uid';
|
||||||
|
this.logger = new werelogs.Logger('PersistRingInterface');
|
||||||
|
this.zkClient = zookeeper.createClient(zkConnectionString);
|
||||||
|
this.zkClient.connect();
|
||||||
|
this.zkClient.on('error', err => {
|
||||||
|
this.logger.error('error connecting', { err });
|
||||||
|
});
|
||||||
|
this.zkClient.once('connected', () => {
|
||||||
|
this.logger.info('connected');
|
||||||
|
});
|
||||||
|
this.spClient = new Sproxy({
|
||||||
|
bootstrap: spBootstrap,
|
||||||
|
path: spPath,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getZKPath(filterName) {
|
||||||
|
return `${this.zkPath}/${filterName}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
load(filterName, persistData, cb) {
|
||||||
|
this.logger.info(`loading ${filterName}`);
|
||||||
|
async.waterfall([
|
||||||
|
/*
|
||||||
|
* Check of we have an existing Zookeeper node
|
||||||
|
*/
|
||||||
|
next => {
|
||||||
|
this.zkClient.getData(
|
||||||
|
this.getZKPath(filterName),
|
||||||
|
(err, data) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.name === 'NO_NODE') {
|
||||||
|
this.logger.info(`${filterName} non-existent`);
|
||||||
|
} else {
|
||||||
|
this.logger.error(`getData ${filterName} error`, { err });
|
||||||
|
}
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, data);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Extract the Sproxyd key from the Zookeeper node.
|
||||||
|
* Read the key from Sproxyd.
|
||||||
|
*/
|
||||||
|
(data, next) => {
|
||||||
|
const _data = JSON.parse(data.toString());
|
||||||
|
this.spClient.get(
|
||||||
|
_data.key,
|
||||||
|
undefined,
|
||||||
|
this.reqUid,
|
||||||
|
(err, stream) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`sproxyd ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, _data, stream);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Uncompress the stream in memory
|
||||||
|
*/
|
||||||
|
(_data, stream, next) => {
|
||||||
|
const ostream = new MemoryStream();
|
||||||
|
pipeline(
|
||||||
|
stream,
|
||||||
|
zlib.createGunzip(),
|
||||||
|
ostream,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`pipeline ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, _data, ostream);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Load the state from uncompressed stream
|
||||||
|
*/
|
||||||
|
(_data, stream, next) => {
|
||||||
|
persistData.loadState(stream, err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`load ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
this.logger.info(`${filterName} loaded: offset ${_data.offset}`);
|
||||||
|
return next(null, _data);
|
||||||
|
});
|
||||||
|
}], (err, _data) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.name === 'NO_NODE') {
|
||||||
|
return cb(null, undefined);
|
||||||
|
}
|
||||||
|
this.logger.error(`load ${filterName} error`, { err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return cb(null, _data.offset);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
save(filterName, persistData, offset, cb) {
|
||||||
|
this.logger.info(`saving ${filterName} offset ${offset}`);
|
||||||
|
async.waterfall([
|
||||||
|
/*
|
||||||
|
* Save the state in a memory stream
|
||||||
|
*/
|
||||||
|
next => {
|
||||||
|
const stream = new MemoryStream();
|
||||||
|
persistData.saveState(
|
||||||
|
stream, err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`save ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, stream);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Compress the state in memory
|
||||||
|
*/
|
||||||
|
(stream, next) => {
|
||||||
|
const ostream = new MemoryStream();
|
||||||
|
pipeline(
|
||||||
|
stream,
|
||||||
|
zlib.createGzip(),
|
||||||
|
ostream,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`pipeline ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, ostream);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Store the state in Sproxyd
|
||||||
|
*/
|
||||||
|
(stream, next) => {
|
||||||
|
const parameters = {
|
||||||
|
filterName,
|
||||||
|
namespace: 'persist-ring-interface',
|
||||||
|
owner: 'persist-ring-interface',
|
||||||
|
};
|
||||||
|
const size = stream._readableState.length;
|
||||||
|
this.spClient.put(
|
||||||
|
stream,
|
||||||
|
size,
|
||||||
|
parameters,
|
||||||
|
this.reqUid,
|
||||||
|
(err, key) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`sproxyd put ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
const newData = {};
|
||||||
|
newData.offset = offset;
|
||||||
|
newData.key = key;
|
||||||
|
return next(null, newData);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Check if the Zookeeper node exists
|
||||||
|
*/
|
||||||
|
(newData, next) => {
|
||||||
|
this.zkClient.exists(
|
||||||
|
this.getZKPath(filterName),
|
||||||
|
(err, stat) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`exists ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
let doesExist = false;
|
||||||
|
if (stat) {
|
||||||
|
doesExist = true;
|
||||||
|
}
|
||||||
|
return next(null, newData, doesExist);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* If the Zookeeper node exists read it.
|
||||||
|
* Else create it.
|
||||||
|
*/
|
||||||
|
(newData, doesExist, next) => {
|
||||||
|
if (doesExist) {
|
||||||
|
this.zkClient.getData(
|
||||||
|
this.getZKPath(filterName),
|
||||||
|
(err, _oldData) => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`getData ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
const oldData = JSON.parse(_oldData);
|
||||||
|
return next(null, newData, oldData);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.zkClient.mkdirp(
|
||||||
|
this.getZKPath(filterName),
|
||||||
|
null,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`mkdirp ${filterName} error`, { err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, newData, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* Store the context in the Zookeeper node and delete the old sproxyd key.
|
||||||
|
*/
|
||||||
|
(newData, oldData, next) => {
|
||||||
|
const _newData = JSON.stringify(newData);
|
||||||
|
this.zkClient.setData(
|
||||||
|
this.getZKPath(filterName),
|
||||||
|
Buffer.from(_newData),
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`setData ${filterName} error`, { err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.logger.info(`${filterName} saved: new key ${newData.key} offset ${offset}`);
|
||||||
|
if (oldData) {
|
||||||
|
this.spClient.delete(
|
||||||
|
oldData.key,
|
||||||
|
this.reqUid,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`sproxyd del ${filterName} old key ${oldData.key} error`,
|
||||||
|
{ err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next();
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
return next();
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}], err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error(`save ${filterName} error`, { err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = PersistRingInterface;
|
|
@ -3,7 +3,7 @@
|
||||||
"engines": {
|
"engines": {
|
||||||
"node": ">=16"
|
"node": ">=16"
|
||||||
},
|
},
|
||||||
"version": "7.10.13",
|
"version": "7.10.14",
|
||||||
"description": "Common utilities for the S3 project components",
|
"description": "Common utilities for the S3 project components",
|
||||||
"main": "build/index.js",
|
"main": "build/index.js",
|
||||||
"repository": {
|
"repository": {
|
||||||
|
@ -36,8 +36,10 @@
|
||||||
"ipaddr.js": "1.9.1",
|
"ipaddr.js": "1.9.1",
|
||||||
"level": "~5.0.1",
|
"level": "~5.0.1",
|
||||||
"level-sublevel": "~6.6.5",
|
"level-sublevel": "~6.6.5",
|
||||||
|
"memorystream": "^0.3.1",
|
||||||
"mongodb": "^3.0.1",
|
"mongodb": "^3.0.1",
|
||||||
"node-forge": "^0.7.1",
|
"node-forge": "^0.7.1",
|
||||||
|
"node-zookeeper-client": "^0.2.2",
|
||||||
"prom-client": "10.2.3",
|
"prom-client": "10.2.3",
|
||||||
"simple-glob": "^0.2",
|
"simple-glob": "^0.2",
|
||||||
"socket.io": "~2.3.0",
|
"socket.io": "~2.3.0",
|
||||||
|
@ -57,6 +59,7 @@
|
||||||
"@sinonjs/fake-timers": "^6.0.1",
|
"@sinonjs/fake-timers": "^6.0.1",
|
||||||
"@types/jest": "^27.4.1",
|
"@types/jest": "^27.4.1",
|
||||||
"@types/node": "^17.0.21",
|
"@types/node": "^17.0.21",
|
||||||
|
"argparse": "^2.0.1",
|
||||||
"eslint": "2.13.1",
|
"eslint": "2.13.1",
|
||||||
"eslint-config-airbnb": "6.2.0",
|
"eslint-config-airbnb": "6.2.0",
|
||||||
"eslint-config-scality": "scality/Guidelines#7.10.2",
|
"eslint-config-scality": "scality/Guidelines#7.10.2",
|
||||||
|
@ -64,6 +67,7 @@
|
||||||
"jest": "^27.5.1",
|
"jest": "^27.5.1",
|
||||||
"mocha": "8.0.1",
|
"mocha": "8.0.1",
|
||||||
"mongodb-memory-server": "^6.0.2",
|
"mongodb-memory-server": "^6.0.2",
|
||||||
|
"prando": "^6.0.1",
|
||||||
"sinon": "^9.0.2",
|
"sinon": "^9.0.2",
|
||||||
"temp": "0.9.1",
|
"temp": "0.9.1",
|
||||||
"ts-jest": "^27.1.3",
|
"ts-jest": "^27.1.3",
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
{
|
||||||
|
"acl": {
|
||||||
|
"Canned": "private",
|
||||||
|
"FULL_CONTROL": [],
|
||||||
|
"WRITE": [],
|
||||||
|
"WRITE_ACP": [],
|
||||||
|
"READ": [],
|
||||||
|
"READ_ACP": []
|
||||||
|
},
|
||||||
|
"name": "BucketName",
|
||||||
|
"owner": "9d8fe19a78974c56dceb2ea4a8f01ed0f5fecb9d29f80e9e3b84104e4a3ea520",
|
||||||
|
"ownerDisplayName": "anonymousCoward",
|
||||||
|
"creationDate": "2018-06-04T17:45:42.592Z",
|
||||||
|
"mdBucketModelVersion": 8,
|
||||||
|
"transient": false,
|
||||||
|
"deleted": false,
|
||||||
|
"serverSideEncryption": null,
|
||||||
|
"versioningConfiguration": null,
|
||||||
|
"websiteConfiguration": null,
|
||||||
|
"locationConstraint": "us-east-1",
|
||||||
|
"readLocationConstraint": "us-east-1",
|
||||||
|
"cors": null,
|
||||||
|
"replicationConfiguration": null,
|
||||||
|
"lifecycleConfiguration": null,
|
||||||
|
"uid": "fea97818-6a9a-11e8-9777-e311618cc5d4"
|
||||||
|
}
|
|
@ -0,0 +1,268 @@
|
||||||
|
const async = require('async');
|
||||||
|
const assert = require('assert');
|
||||||
|
const Prando = require('prando');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper');
|
||||||
|
const fakeBucketInfo = require('./FakeBucketInfo.json');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class Injector {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @constructor
|
||||||
|
*
|
||||||
|
* @param {Object} backend - Metadata backend to use for injection
|
||||||
|
* @param {Object} logger - Logger to use
|
||||||
|
*/
|
||||||
|
constructor(backend, logger) {
|
||||||
|
this.backend = backend;
|
||||||
|
this.rnd = new Prando(0);
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
get opPut() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
get opDelete() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
genKey(len) {
|
||||||
|
const characters =
|
||||||
|
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
|
||||||
|
return this.rnd.nextString(len, characters);
|
||||||
|
}
|
||||||
|
|
||||||
|
genBase16(len) {
|
||||||
|
const characters = 'abcdef0123456789';
|
||||||
|
return this.rnd.nextString(len, characters);
|
||||||
|
}
|
||||||
|
|
||||||
|
_zeroPad(s, n, width) {
|
||||||
|
return s + n.toString().padStart(width, '0');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deterministic injection of puts or deletes according to parameters
|
||||||
|
*
|
||||||
|
* @param {String} bucketName - bucket name to inject to
|
||||||
|
* @param {Object} params - parameters for injection
|
||||||
|
* @param {Array} inputKeys - optional keys to use as input
|
||||||
|
* @param {Array} outputKeys - optional generated keys as output
|
||||||
|
* @param {Array} outputValues - optional generated values as output
|
||||||
|
* @param {function} cb - callback when done
|
||||||
|
*
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
inject(bucketName, params, inputKeys, outputKeys, outputValues, cb) {
|
||||||
|
let maxKeyLen = 1024;
|
||||||
|
if (params.maxKeyLen !== undefined) {
|
||||||
|
maxKeyLen = params.maxKeyLen;
|
||||||
|
}
|
||||||
|
async.timesLimit(
|
||||||
|
params.numKeys,
|
||||||
|
10,
|
||||||
|
(n, next) => {
|
||||||
|
let key;
|
||||||
|
if (inputKeys) {
|
||||||
|
const idx = this.rnd.nextInt(0, inputKeys.length - 1);
|
||||||
|
key = inputKeys[idx];
|
||||||
|
inputKeys.splice(idx, 1);
|
||||||
|
} else {
|
||||||
|
if (params.randomKey) {
|
||||||
|
const len = this.rnd.nextInt(1, maxKeyLen);
|
||||||
|
key = this.genKey(len);
|
||||||
|
} else {
|
||||||
|
let x;
|
||||||
|
if (params.randomSeq) {
|
||||||
|
x = this.rnd.nextInt(0, params.maxSeq - 1);
|
||||||
|
} else {
|
||||||
|
x = n;
|
||||||
|
}
|
||||||
|
key = this._zeroPad(params.prefix, x, 10) +
|
||||||
|
params.suffix;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (outputKeys) {
|
||||||
|
outputKeys.push(key);
|
||||||
|
}
|
||||||
|
// eslint-disable-next-line
|
||||||
|
const value = {
|
||||||
|
versionId: this.genBase16(32),
|
||||||
|
'content-length': this.rnd.nextInt(0, 5000000),
|
||||||
|
'content-md5': this.genBase16(32),
|
||||||
|
};
|
||||||
|
if (outputValues) {
|
||||||
|
outputValues.push(value);
|
||||||
|
}
|
||||||
|
if (params.op === this.opPut) {
|
||||||
|
this.backend.putObjectMD(
|
||||||
|
bucketName,
|
||||||
|
key,
|
||||||
|
value,
|
||||||
|
{},
|
||||||
|
this.logger,
|
||||||
|
next);
|
||||||
|
return undefined;
|
||||||
|
} else if (params.op === this.opDelete) {
|
||||||
|
this.backend.deleteObjectMD(
|
||||||
|
bucketName,
|
||||||
|
key,
|
||||||
|
{},
|
||||||
|
this.logger,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
if (err.code !== 404) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return next();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return next(new Error('unknow op'));
|
||||||
|
},
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
console.error('inject error', err);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
if (cb) {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = Injector;
|
||||||
|
|
||||||
|
describe('Injector', () => {
|
||||||
|
const fakeBucket = 'fake';
|
||||||
|
const logger = new werelogs.Logger('Injector');
|
||||||
|
const memBackend = new MetadataWrapper(
|
||||||
|
'mem', {}, null, logger);
|
||||||
|
|
||||||
|
before(done => {
|
||||||
|
memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
after(done => {
|
||||||
|
memBackend.deleteBucket(fakeBucket, logger, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('zeropad', () => {
|
||||||
|
const injector = new Injector(memBackend, logger);
|
||||||
|
assert(injector._zeroPad('foo', 42, 10) === 'foo0000000042');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('inject inputKeys', done => {
|
||||||
|
const injector = new Injector(memBackend, logger);
|
||||||
|
const inputKeys = ['foo1', 'foo2', 'foo3'];
|
||||||
|
const outputKeys = [];
|
||||||
|
injector.inject(
|
||||||
|
fakeBucket,
|
||||||
|
{
|
||||||
|
op: injector.opPut,
|
||||||
|
numKeys: 3,
|
||||||
|
},
|
||||||
|
inputKeys,
|
||||||
|
outputKeys,
|
||||||
|
null,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
assert.deepEqual(outputKeys, ['foo2', 'foo1', 'foo3']);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('inject sequence', done => {
|
||||||
|
const injector = new Injector(memBackend, logger);
|
||||||
|
const outputKeys = [];
|
||||||
|
injector.inject(
|
||||||
|
fakeBucket,
|
||||||
|
{
|
||||||
|
prefix: 'foo',
|
||||||
|
suffix: 'x',
|
||||||
|
randomSeq: true,
|
||||||
|
maxSeq: 10,
|
||||||
|
op: injector.opPut,
|
||||||
|
numKeys: 3,
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
outputKeys,
|
||||||
|
null,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
assert.deepEqual(
|
||||||
|
outputKeys,
|
||||||
|
[
|
||||||
|
'foo0000000005x',
|
||||||
|
'foo0000000001x',
|
||||||
|
'foo0000000000x',
|
||||||
|
]);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('inject random', done => {
|
||||||
|
const injector = new Injector(memBackend, logger);
|
||||||
|
const outputKeys = [];
|
||||||
|
const outputValues = [];
|
||||||
|
injector.inject(
|
||||||
|
fakeBucket,
|
||||||
|
{
|
||||||
|
prefix: 'foo',
|
||||||
|
suffix: 'x',
|
||||||
|
randomKey: true,
|
||||||
|
maxKeyLen: 10,
|
||||||
|
op: injector.opPut,
|
||||||
|
numKeys: 3,
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
outputKeys,
|
||||||
|
outputValues,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
assert.deepEqual(
|
||||||
|
outputKeys,
|
||||||
|
[
|
||||||
|
'f5TJ6X',
|
||||||
|
'7T',
|
||||||
|
'LStNJxHS8',
|
||||||
|
]);
|
||||||
|
assert.deepEqual(
|
||||||
|
outputValues,
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'content-length': 3009012,
|
||||||
|
'content-md5': '60f7abfdc5855e00a6e6dc2918bda7a8',
|
||||||
|
'versionId': 'f13938435117885f7f88d7636a3b238e',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'content-length': 3984521,
|
||||||
|
'content-md5': '7cfc7e8b82826b83e302d18fa0e24b12',
|
||||||
|
'versionId': 'f7f5c4973bc353ad8a9d5084fc1f0dc3',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'content-length': 4112702,
|
||||||
|
'content-md5': '6d4fe78b11cfa4ff7b2efaba5c5965fe',
|
||||||
|
'versionId': '5540954e05a7910abeb72b8393c93afe',
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,216 @@
|
||||||
|
const async = require('async');
|
||||||
|
const fakeBucketInfo = require('../FakeBucketInfo.json');
|
||||||
|
const MetadataWrapper = require('../../../../../lib/storage/metadata/MetadataWrapper');
|
||||||
|
const BucketdOplogInterface = require('../../../../../lib/storage/metadata/oplog/BucketdOplogInterface');
|
||||||
|
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
|
||||||
|
const Injector = require('../Injector');
|
||||||
|
const http = require('http');
|
||||||
|
const url = require('url');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
|
werelogs.configure({
|
||||||
|
level: 'info',
|
||||||
|
dump: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
class PersistDataInterface {
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.data = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
initState(cb) {
|
||||||
|
this.data = {};
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
loadState(stream, cb) {
|
||||||
|
const chunks = [];
|
||||||
|
stream.on('data', chunk => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
});
|
||||||
|
stream.on('end', () => {
|
||||||
|
this.data = JSON.parse(Buffer.concat(chunks));
|
||||||
|
return process.nextTick(cb);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
saveState(stream, cb) {
|
||||||
|
stream.write(JSON.stringify(this.data));
|
||||||
|
stream.end();
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateState(addQueue, deleteQueue, cb) {
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('BucketdOplogInterface', () => {
|
||||||
|
const logger = new werelogs.Logger('BucketOplogInterface');
|
||||||
|
|
||||||
|
const fakePort = 9090;
|
||||||
|
const fakeBucket = 'fake';
|
||||||
|
const fakeRaftId = 2;
|
||||||
|
const numObjs = 20000;
|
||||||
|
const fakeCseq = 20001;
|
||||||
|
let oplogInjected = false;
|
||||||
|
const numOplogSeqs = 100;
|
||||||
|
const oplogBatchSize = 2;
|
||||||
|
const endCseq = fakeCseq + numOplogSeqs;
|
||||||
|
const maxLimit = 2;
|
||||||
|
const oplogKeys = [];
|
||||||
|
const oplogValues = [];
|
||||||
|
let oplogKeysIdx = 0;
|
||||||
|
|
||||||
|
const memBackend = new MetadataWrapper(
|
||||||
|
'mem', {}, null, logger);
|
||||||
|
const injector = new Injector(memBackend, logger);
|
||||||
|
|
||||||
|
const requestListener = (req, res) => {
|
||||||
|
const _url = url.parse(req.url, true);
|
||||||
|
if (_url.pathname === `/_/buckets/${fakeBucket}`) {
|
||||||
|
res.writeHead(200);
|
||||||
|
res.end(JSON.stringify(
|
||||||
|
{
|
||||||
|
raftSessionId: fakeRaftId,
|
||||||
|
creating: false,
|
||||||
|
deleting: false,
|
||||||
|
version: 0,
|
||||||
|
}));
|
||||||
|
} else if (_url.pathname === `/_/raft_sessions/${fakeRaftId}/log`) {
|
||||||
|
const begin = _url.query.begin;
|
||||||
|
const limit = _url.query.limit;
|
||||||
|
if (begin === '1' && limit === '1') {
|
||||||
|
res.writeHead(200);
|
||||||
|
res.end(JSON.stringify(
|
||||||
|
{
|
||||||
|
info: {
|
||||||
|
start: 1,
|
||||||
|
cseq: fakeCseq,
|
||||||
|
prune: 1,
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
const realLimit = Math.min(limit, maxLimit);
|
||||||
|
async.until(
|
||||||
|
() => oplogInjected,
|
||||||
|
next => {
|
||||||
|
// inject similar but different random objects
|
||||||
|
injector.inject(
|
||||||
|
fakeBucket,
|
||||||
|
{
|
||||||
|
numKeys: numOplogSeqs * oplogBatchSize,
|
||||||
|
maxSeq: numObjs,
|
||||||
|
op: injector.opPut,
|
||||||
|
randomSeq: true,
|
||||||
|
prefix: 'obj_',
|
||||||
|
suffix: '_bis',
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
oplogKeys,
|
||||||
|
oplogValues,
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
oplogInjected = true;
|
||||||
|
return next();
|
||||||
|
});
|
||||||
|
}, err => {
|
||||||
|
if (err) {
|
||||||
|
res.writeHead(404);
|
||||||
|
res.end('error', err);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
if (begin < endCseq) {
|
||||||
|
res.writeHead(200);
|
||||||
|
const resp = {};
|
||||||
|
resp.info = {
|
||||||
|
start: begin,
|
||||||
|
cseq: endCseq,
|
||||||
|
prune: 1,
|
||||||
|
};
|
||||||
|
resp.log = [];
|
||||||
|
for (let i = 0; i < realLimit; i++) {
|
||||||
|
resp.log[i] = {};
|
||||||
|
resp.log[i].db = fakeBucket;
|
||||||
|
resp.log[i].method = 8;
|
||||||
|
resp.log[i].entries = [];
|
||||||
|
for (let j = 0; j < oplogBatchSize; j++) {
|
||||||
|
resp.log[i].entries[j] = {};
|
||||||
|
resp.log[i].entries[j].key = oplogKeys[oplogKeysIdx];
|
||||||
|
resp.log[i].entries[j].value = oplogValues[oplogKeysIdx];
|
||||||
|
oplogKeysIdx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res.end(JSON.stringify(resp));
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else if (_url.pathname === `/default/bucket/${fakeBucket}`) {
|
||||||
|
const marker = _url.query.marker === '' ? null : _url.query.marker;
|
||||||
|
const maxKeys = parseInt(_url.query.maxKeys, 10);
|
||||||
|
memBackend.listObjects(fakeBucket, {
|
||||||
|
listingType: 'Delimiter',
|
||||||
|
marker,
|
||||||
|
maxKeys,
|
||||||
|
}, (err, result) => {
|
||||||
|
if (err) {
|
||||||
|
res.writeHead(404);
|
||||||
|
res.end('error', err);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
res.writeHead(200);
|
||||||
|
res.end(JSON.stringify(result));
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
before(done => {
|
||||||
|
const server = http.createServer(requestListener);
|
||||||
|
server.listen(fakePort);
|
||||||
|
async.waterfall([
|
||||||
|
next => memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, next),
|
||||||
|
next => injector.inject(
|
||||||
|
fakeBucket,
|
||||||
|
{
|
||||||
|
numKeys: numObjs,
|
||||||
|
maxSeq: numObjs,
|
||||||
|
op: injector.opPut,
|
||||||
|
randomSeq: false,
|
||||||
|
prefix: 'obj_',
|
||||||
|
suffix: '',
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
next),
|
||||||
|
], done);
|
||||||
|
});
|
||||||
|
|
||||||
|
after(done => {
|
||||||
|
memBackend.deleteBucket(fakeBucket, logger, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('simulation', done => {
|
||||||
|
const params = {
|
||||||
|
bootstrap: [`localhost:${fakePort}`],
|
||||||
|
persist: new PersistMemInterface(),
|
||||||
|
persistData: new PersistDataInterface(),
|
||||||
|
stopAt: numObjs + numOplogSeqs,
|
||||||
|
};
|
||||||
|
const bucketdOplog = new BucketdOplogInterface(params);
|
||||||
|
bucketdOplog.start(
|
||||||
|
{
|
||||||
|
filterName: fakeBucket,
|
||||||
|
filterType: 'bucket',
|
||||||
|
bucket: {
|
||||||
|
bucketName: fakeBucket,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
done);
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,55 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
|
||||||
|
|
||||||
|
class PersistDataInterface {
|
||||||
|
|
||||||
|
constructor(obj) {
|
||||||
|
this.obj = obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
loadState(stream, cb) {
|
||||||
|
const chunks = [];
|
||||||
|
stream.on('data', chunk => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
});
|
||||||
|
stream.on('end', () => {
|
||||||
|
this.obj = JSON.parse(Buffer.concat(chunks));
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
saveState(stream, cb) {
|
||||||
|
stream.write(JSON.stringify(this.obj));
|
||||||
|
stream.end();
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Persist Mem', () => {
|
||||||
|
const persist = new PersistMemInterface();
|
||||||
|
const filterName = 'foo';
|
||||||
|
|
||||||
|
it('basic operations', done => {
|
||||||
|
const pd1 = new PersistDataInterface({
|
||||||
|
foo: 'bar',
|
||||||
|
bar: {
|
||||||
|
qux: 42,
|
||||||
|
quuux: false,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const pd2 = new PersistDataInterface();
|
||||||
|
persist.save(filterName, pd1, 42, err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
persist.load(filterName, pd2, err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
assert.deepEqual(pd1.obj, pd2.obj);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,116 @@
|
||||||
|
/* eslint-disable no-console */
|
||||||
|
const fs = require('fs');
|
||||||
|
const { ArgumentParser } = require('argparse');
|
||||||
|
const BucketdOplogInterface = require('../lib/storage/metadata/oplog/BucketdOplogInterface');
|
||||||
|
const MongoOplogInterface = require('../lib/storage/metadata/oplog/MongoOplogInterface');
|
||||||
|
const PersistMemInterface = require('../lib/storage/metadata/oplog/PersistMemInterface');
|
||||||
|
const PersistFileInterface = require('../lib/storage/metadata/oplog/PersistFileInterface');
|
||||||
|
const PersistRingInterface = require('../lib/storage/metadata/oplog/PersistRingInterface');
|
||||||
|
|
||||||
|
const parser = new ArgumentParser({
|
||||||
|
description: 'Oplog CLI tool',
|
||||||
|
});
|
||||||
|
|
||||||
|
parser.add_argument('-v', '--verbose', { action: 'store_true' });
|
||||||
|
parser.add_argument('-c', '--config-file', { help: 'config file' });
|
||||||
|
parser.add_argument('--oplog-interface', { help: 'bucketd|mongo' });
|
||||||
|
parser.add_argument('--persist', { help: 'mem|file|ring' });
|
||||||
|
parser.add_argument('--bucket', { help: 'bucket' });
|
||||||
|
parser.add_argument('--start', { action: 'store_true' });
|
||||||
|
|
||||||
|
const args = parser.parse_args();
|
||||||
|
|
||||||
|
class PersistDataInterface {
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.data = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
initState(cb) {
|
||||||
|
this.data = {};
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
loadState(stream, cb) {
|
||||||
|
const chunks = [];
|
||||||
|
stream.on('data', chunk => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
});
|
||||||
|
stream.on('end', () => {
|
||||||
|
this.data = JSON.parse(Buffer.concat(chunks));
|
||||||
|
return process.nextTick(cb);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
saveState(stream, cb) {
|
||||||
|
stream.write(JSON.stringify(this.data));
|
||||||
|
stream.end();
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateState(addQueue, deleteQueue, cb) {
|
||||||
|
console.log('addQueue', addQueue, 'deleteQueue', deleteQueue);
|
||||||
|
return process.nextTick(cb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let config = {};
|
||||||
|
|
||||||
|
if (args.config_file !== undefined) {
|
||||||
|
config = JSON.parse(fs.readFileSync(args.config_file, 'utf8'));
|
||||||
|
}
|
||||||
|
|
||||||
|
let persist;
|
||||||
|
if (args.persist === 'mem') {
|
||||||
|
persist = new PersistMemInterface(config.persistMem);
|
||||||
|
} else if (args.persist === 'file') {
|
||||||
|
persist = new PersistFileInterface(config.persistFile);
|
||||||
|
} else if (args.persist === 'ring') {
|
||||||
|
persist = new PersistRingInterface(config.persistRing);
|
||||||
|
} else {
|
||||||
|
console.error(`invalid persist ${args.persist}`);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let params = {
|
||||||
|
persist,
|
||||||
|
persistData: new PersistDataInterface(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let oplogInterface;
|
||||||
|
if (args.oplog_interface === 'bucketd') {
|
||||||
|
params = Object.assign(params, config.bucketdOplog);
|
||||||
|
oplogInterface = new BucketdOplogInterface(params);
|
||||||
|
} else if (args.oplog_interface === 'mongo') {
|
||||||
|
params = Object.assign(params, config.mongoOplog);
|
||||||
|
oplogInterface = new MongoOplogInterface(params);
|
||||||
|
} else {
|
||||||
|
console.error(`invalid oplog-interface ${args.oplog_interface}`);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args.start) {
|
||||||
|
if (args.bucket === undefined) {
|
||||||
|
console.error('please provide bucket');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
oplogInterface.start(
|
||||||
|
{
|
||||||
|
filterName: args.bucket,
|
||||||
|
filterType: 'bucket',
|
||||||
|
bucket: {
|
||||||
|
bucketName: args.bucket,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
console.error(err);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
console.log('exiting...');
|
||||||
|
return;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
console.error('please provide an option');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
28
yarn.lock
28
yarn.lock
|
@ -1682,6 +1682,11 @@ async@^3.2.0:
|
||||||
resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
|
resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
|
||||||
integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
|
integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
|
||||||
|
|
||||||
|
async@~0.2.7:
|
||||||
|
version "0.2.10"
|
||||||
|
resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"
|
||||||
|
integrity sha1-trvgsGdLnXGXCMo43owjfLUmw9E=
|
||||||
|
|
||||||
async@~2.1.5:
|
async@~2.1.5:
|
||||||
version "2.1.5"
|
version "2.1.5"
|
||||||
resolved "https://registry.yarnpkg.com/async/-/async-2.1.5.tgz#e587c68580994ac67fc56ff86d3ac56bdbe810bc"
|
resolved "https://registry.yarnpkg.com/async/-/async-2.1.5.tgz#e587c68580994ac67fc56ff86d3ac56bdbe810bc"
|
||||||
|
@ -4895,6 +4900,11 @@ memory-pager@^1.0.2:
|
||||||
resolved "https://registry.yarnpkg.com/memory-pager/-/memory-pager-1.5.0.tgz#d8751655d22d384682741c972f2c3d6dfa3e66b5"
|
resolved "https://registry.yarnpkg.com/memory-pager/-/memory-pager-1.5.0.tgz#d8751655d22d384682741c972f2c3d6dfa3e66b5"
|
||||||
integrity sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==
|
integrity sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==
|
||||||
|
|
||||||
|
memorystream@^0.3.1:
|
||||||
|
version "0.3.1"
|
||||||
|
resolved "https://registry.yarnpkg.com/memorystream/-/memorystream-0.3.1.tgz#86d7090b30ce455d63fbae12dda51a47ddcaf9b2"
|
||||||
|
integrity sha1-htcJCzDORV1j+64S3aUaR93K+bI=
|
||||||
|
|
||||||
merge-stream@^2.0.0:
|
merge-stream@^2.0.0:
|
||||||
version "2.0.0"
|
version "2.0.0"
|
||||||
resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-2.0.0.tgz#52823629a14dd00c9770fb6ad47dc6310f2c1f60"
|
resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-2.0.0.tgz#52823629a14dd00c9770fb6ad47dc6310f2c1f60"
|
||||||
|
@ -5203,6 +5213,14 @@ node-releases@^2.0.2:
|
||||||
resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.2.tgz#7139fe71e2f4f11b47d4d2986aaf8c48699e0c01"
|
resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.2.tgz#7139fe71e2f4f11b47d4d2986aaf8c48699e0c01"
|
||||||
integrity sha512-XxYDdcQ6eKqp/YjI+tb2C5WM2LgjnZrfYg4vgQt49EK268b6gYCHsBLrK2qvJo4FmCtqmKezb0WZFK4fkrZNsg==
|
integrity sha512-XxYDdcQ6eKqp/YjI+tb2C5WM2LgjnZrfYg4vgQt49EK268b6gYCHsBLrK2qvJo4FmCtqmKezb0WZFK4fkrZNsg==
|
||||||
|
|
||||||
|
node-zookeeper-client@^0.2.2:
|
||||||
|
version "0.2.3"
|
||||||
|
resolved "https://registry.yarnpkg.com/node-zookeeper-client/-/node-zookeeper-client-0.2.3.tgz#48c79129c56b8e898df9bd3bdad9e27dcad63851"
|
||||||
|
integrity sha512-V4gVHxzQ42iwhkANpPryzfjmqi3Ql3xeO9E/px7W5Yi774WplU3YtqUpnvcL/eJit4UqcfuLOgZLkpf0BPhHmg==
|
||||||
|
dependencies:
|
||||||
|
async "~0.2.7"
|
||||||
|
underscore "~1.4.4"
|
||||||
|
|
||||||
nopt@^5.0.0:
|
nopt@^5.0.0:
|
||||||
version "5.0.0"
|
version "5.0.0"
|
||||||
resolved "https://registry.yarnpkg.com/nopt/-/nopt-5.0.0.tgz#530942bb58a512fccafe53fe210f13a25355dc88"
|
resolved "https://registry.yarnpkg.com/nopt/-/nopt-5.0.0.tgz#530942bb58a512fccafe53fe210f13a25355dc88"
|
||||||
|
@ -5508,6 +5526,11 @@ pluralize@^1.2.1:
|
||||||
resolved "https://registry.yarnpkg.com/pluralize/-/pluralize-1.2.1.tgz#d1a21483fd22bb41e58a12fa3421823140897c45"
|
resolved "https://registry.yarnpkg.com/pluralize/-/pluralize-1.2.1.tgz#d1a21483fd22bb41e58a12fa3421823140897c45"
|
||||||
integrity sha1-0aIUg/0iu0HlihL6NCGCMUCJfEU=
|
integrity sha1-0aIUg/0iu0HlihL6NCGCMUCJfEU=
|
||||||
|
|
||||||
|
prando@^6.0.1:
|
||||||
|
version "6.0.1"
|
||||||
|
resolved "https://registry.yarnpkg.com/prando/-/prando-6.0.1.tgz#ffa8de84c2adc4975dd9df37ae4ada0458face53"
|
||||||
|
integrity sha512-ghUWxQ1T9IJmPu6eshc3VU0OwveUtXQ33ZLXYUcz1Oc5ppKLDXKp0TBDj6b0epwhEctzcQSNGR2iHyvQSn4W5A==
|
||||||
|
|
||||||
prelude-ls@~1.1.2:
|
prelude-ls@~1.1.2:
|
||||||
version "1.1.2"
|
version "1.1.2"
|
||||||
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54"
|
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54"
|
||||||
|
@ -6726,6 +6749,11 @@ unbzip2-stream@^1.0.9:
|
||||||
buffer "^5.2.1"
|
buffer "^5.2.1"
|
||||||
through "^2.3.8"
|
through "^2.3.8"
|
||||||
|
|
||||||
|
underscore@~1.4.4:
|
||||||
|
version "1.4.4"
|
||||||
|
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.4.4.tgz#61a6a32010622afa07963bf325203cf12239d604"
|
||||||
|
integrity sha1-YaajIBBiKvoHljvzJSA88SI51gQ=
|
||||||
|
|
||||||
underscore@~1.8.3:
|
underscore@~1.8.3:
|
||||||
version "1.8.3"
|
version "1.8.3"
|
||||||
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.8.3.tgz#4f3fb53b106e6097fcf9cb4109f2a5e9bdfa5022"
|
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.8.3.tgz#4f3fb53b106e6097fcf9cb4109f2a5e9bdfa5022"
|
||||||
|
|
Loading…
Reference in New Issue