Compare commits
1 Commits
developmen
...
ft/ARSN-65
Author | SHA1 | Date |
---|---|---|
Vianney Rancurel | 108b1b3770 |
|
@ -0,0 +1,251 @@
|
|||
/*
|
||||
* Main interface for bucketd oplog management
|
||||
*/
|
||||
const async = require('async');
|
||||
const { RESTClient: BucketClient } = require('bucketclient');
|
||||
const { jsutil, errors } = require('arsenal');
|
||||
const LogConsumer = require('arsenal/lib/storage/metadata/bucketclient/LogConsumer');
|
||||
const { isMasterKey } = require('arsenal/lib/versioning/Version');
|
||||
const OplogInterface = require('./OplogInterface');
|
||||
|
||||
class BucketdOplogInterface extends OplogInterface {
|
||||
|
||||
constructor(params) {
|
||||
super(params);
|
||||
this.backendRetryTimes = 3;
|
||||
this.backendRetryInterval = 300;
|
||||
this.bucketdOplogQuerySize = 20;
|
||||
this.stopAt = params?.stopAt ?? -1;
|
||||
const bkBootstrap = params?.bootstrap ?? ['localhost:9000'];
|
||||
this.bkClient = new BucketClient(bkBootstrap);
|
||||
}
|
||||
|
||||
start(filter, cb) {
|
||||
if (!(filter.filterType === 'bucket' ||
|
||||
filter.filterType === 'raftSession')) {
|
||||
return cb(errors.NotImplemented);
|
||||
}
|
||||
const filterName = filter.filterName;
|
||||
async.waterfall([
|
||||
/*
|
||||
* In this step we get the raftId for filterName
|
||||
*/
|
||||
next => {
|
||||
if (filter.filterType === 'raftSession') {
|
||||
return next(null, filter.raftSession.raftId);
|
||||
}
|
||||
this.logger.info('obtaining raftId',
|
||||
{ filterName });
|
||||
async.retry(
|
||||
{
|
||||
times: this.backendRetryTimes,
|
||||
interval: this.backendRetryInterval,
|
||||
},
|
||||
done => {
|
||||
this.bkClient.getBucketInformation(
|
||||
filter.bucket.bucketName,
|
||||
null,
|
||||
(err, info) => {
|
||||
if (err) {
|
||||
this.logger.info('retrying getBucketInformation', { err, filterName });
|
||||
return done(err);
|
||||
}
|
||||
return done(null, JSON.parse(info));
|
||||
});
|
||||
},
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
this.logger.error('getBucketInformation too many failures', { err, filterName });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, res.raftSessionId);
|
||||
});
|
||||
return undefined;
|
||||
},
|
||||
/*
|
||||
* In this step we get the stored offset if we have it
|
||||
*/
|
||||
(raftId, next) => {
|
||||
let cseq = undefined;
|
||||
this.persist.load(filterName, this.persistData, (err, offset) => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
cseq = offset;
|
||||
return next(null, raftId, cseq);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* In this step we acquire the offset if we don't already have it
|
||||
*/
|
||||
(raftId, cseq, next) => {
|
||||
if (cseq !== undefined) {
|
||||
this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
|
||||
{ filterName });
|
||||
return next(null, raftId, cseq, true);
|
||||
}
|
||||
this.logger.info('cseq acquisition',
|
||||
{ filterName });
|
||||
async.retry(
|
||||
{
|
||||
times: this.backendRetryTimes,
|
||||
interval: this.backendRetryInterval,
|
||||
},
|
||||
done => {
|
||||
this.bkClient.getRaftLog(
|
||||
raftId,
|
||||
1,
|
||||
1,
|
||||
true,
|
||||
null,
|
||||
(err, stream) => {
|
||||
if (err) {
|
||||
this.logger.info('retrying getRaftLog', { err, filterName });
|
||||
return done(err);
|
||||
}
|
||||
const chunks = [];
|
||||
stream.on('data', chunk => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
const info = JSON.parse(Buffer.concat(chunks));
|
||||
return done(null, info);
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
},
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
this.logger.error('getRaftLog too many failures', { err, filterName });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, raftId, res.info.cseq, false);
|
||||
});
|
||||
return undefined;
|
||||
},
|
||||
/*
|
||||
* In this step we init the state (e.g. scan)
|
||||
*/
|
||||
(raftId, cseq, skipInit, next) => {
|
||||
if (skipInit) {
|
||||
this.logger.info(`skipping state initialization cseq=${cseq}`,
|
||||
{ filterName });
|
||||
return next(null, raftId, cseq);
|
||||
}
|
||||
this.logger.info(`initializing state cseq=${cseq}`,
|
||||
{ filterName });
|
||||
this.persistData.initState(err => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
this.persist.save(
|
||||
filterName, this.persistData, cseq, err => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
return next(null, raftId, cseq);
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
},
|
||||
/*
|
||||
* In this step we loop over the oplog
|
||||
*/
|
||||
(raftId, cseq, next) => {
|
||||
this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
|
||||
{ filterName });
|
||||
// only way to get out of the loop in all cases
|
||||
const nextOnce = jsutil.once(next);
|
||||
let doStop = false;
|
||||
// resume reading the oplog from cseq. changes are idempotent
|
||||
const logConsumer = new LogConsumer({
|
||||
bucketClient: this.bkClient,
|
||||
raftSession: raftId,
|
||||
});
|
||||
let _cseq = cseq;
|
||||
async.until(
|
||||
() => doStop,
|
||||
_next => {
|
||||
logConsumer.readRecords({
|
||||
startSeq: _cseq,
|
||||
limit: this.bucketdOplogQuerySize,
|
||||
}, (err, record) => {
|
||||
if (err) {
|
||||
this.logger.error('readRecords error', { err, filterName });
|
||||
return setTimeout(() => _next(), 5000);
|
||||
}
|
||||
if (!record.log) {
|
||||
// nothing to read
|
||||
return setTimeout(() => _next(), 5000);
|
||||
}
|
||||
const seqs = [];
|
||||
record.log.on('data', chunk => {
|
||||
seqs.push(chunk);
|
||||
});
|
||||
record.log.on('end', () => {
|
||||
const addQueue = [];
|
||||
const delQueue = [];
|
||||
for (let i = 0; i < seqs.length; i++) {
|
||||
if (filter.filterType === 'raftSession' ||
|
||||
(filter.filterType === 'bucket' &&
|
||||
seqs[i].db === filter.bucket.bucketName)) {
|
||||
for (let j = 0; j < seqs[i].entries.length; j++) {
|
||||
const _item = {};
|
||||
_item.bucketName = seqs[i].db;
|
||||
_item.key = seqs[i].entries[j].key;
|
||||
if (seqs[i].entries[j].type !== undefined &&
|
||||
seqs[i].entries[j].type === 'del') {
|
||||
if (!isMasterKey(_item.key)) {
|
||||
// ignore for now
|
||||
return;
|
||||
}
|
||||
delQueue.push(_item);
|
||||
} else {
|
||||
_item.value = Object.assign({}, seqs[i].entries[j].value);
|
||||
addQueue.push(_item);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.persistData.updateState(
|
||||
addQueue, delQueue, err => {
|
||||
if (err) {
|
||||
return _next(err);
|
||||
}
|
||||
_cseq += seqs.length;
|
||||
this.persist.save(
|
||||
filterName, this.persistData, _cseq, err => {
|
||||
if (err) {
|
||||
return _next(err);
|
||||
}
|
||||
if (_cseq > this.stopAt) {
|
||||
doStop = true;
|
||||
}
|
||||
return _next();
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
}, err => {
|
||||
if (err) {
|
||||
return nextOnce(err);
|
||||
}
|
||||
return nextOnce();
|
||||
});
|
||||
},
|
||||
], err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info('returning',
|
||||
{ filterName });
|
||||
return cb();
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = BucketdOplogInterface;
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Main interface for Mongo oplog management
|
||||
*/
|
||||
const MongoClient = require('mongodb').MongoClient;
|
||||
const bson = require('bson');
|
||||
const { jsutil, errors } = require('arsenal');
|
||||
const async = require('async');
|
||||
const { isMasterKey } = require('arsenal/lib/versioning/Version');
|
||||
const OplogInterface = require('./OplogInterface');
|
||||
|
||||
class MongoOplogInterface extends OplogInterface {
|
||||
|
||||
constructor(params) {
|
||||
super(params);
|
||||
this.mongoDbUri = 'mongodb://localhost:27017';
|
||||
if (params && params.mongoDbUri !== undefined) {
|
||||
this.mongoDbUri = params.mongoDbUri;
|
||||
}
|
||||
this.databaseName = 'metadata';
|
||||
if (params && params.databaseName !== undefined) {
|
||||
this.databaseName = params.databaseName;
|
||||
}
|
||||
}
|
||||
|
||||
start(filter, cb) {
|
||||
if (filter.filterType !== 'bucket') {
|
||||
return cb(errors.NotImplemented);
|
||||
}
|
||||
const filterName = filter.filterName;
|
||||
const bucketName = filter.bucket.bucketName;
|
||||
let db;
|
||||
let collection;
|
||||
async.waterfall([
|
||||
/*
|
||||
* In this step we connect to MongoDB
|
||||
*/
|
||||
next => {
|
||||
MongoClient.connect(
|
||||
this.mongoDbUri,
|
||||
(err, client) => {
|
||||
if (err) {
|
||||
this.logger.error('error connecting to mongodb', { err, filterName });
|
||||
return next(err);
|
||||
}
|
||||
db = client.db(this.databaseName, {
|
||||
ignoreUndefined: true,
|
||||
});
|
||||
collection = db.collection(bucketName);
|
||||
return next();
|
||||
});
|
||||
},
|
||||
/*
|
||||
* In this step we get the stored offset if we have it
|
||||
*/
|
||||
next => {
|
||||
let resumeToken = undefined;
|
||||
this.persist.load(filterName, this.persistData, (err, offset) => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
if (offset && offset._data) {
|
||||
resumeToken = {};
|
||||
resumeToken._data = new bson.Binary(Buffer.from(offset._data, 'base64'));
|
||||
}
|
||||
return next(null, resumeToken);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* In this step we acquire the offset if we don't already have it
|
||||
*/
|
||||
(resumeToken, next) => {
|
||||
if (resumeToken !== undefined) {
|
||||
this.logger.info(
|
||||
`skipping resumeToken acquisition (resumeToken=${resumeToken})`,
|
||||
{ filterName });
|
||||
return next(null, resumeToken, true);
|
||||
}
|
||||
this.logger.info('resumeToken acquisition',
|
||||
{ filterName });
|
||||
const changeStream = collection.watch();
|
||||
// big hack to extract resumeToken
|
||||
changeStream.once('change', () => next(null, changeStream.resumeToken, false));
|
||||
return undefined;
|
||||
},
|
||||
/*
|
||||
* In this step we init the state (e.g. scan)
|
||||
*/
|
||||
(resumeToken, skipInit, next) => {
|
||||
if (skipInit) {
|
||||
this.logger.info(`skipping state initialization resumeToken=${resumeToken}`,
|
||||
{ filterName });
|
||||
return next(null, resumeToken);
|
||||
}
|
||||
this.logger.info(`initializing state resumeToken=${resumeToken}`,
|
||||
{ filterName });
|
||||
this.persistData.initState(
|
||||
err => {
|
||||
if (err) {
|
||||
// eslint-disable-next-line
|
||||
console.error(err);
|
||||
process.exit(1);
|
||||
}
|
||||
this.persist.save(
|
||||
filterName, this.persistData, resumeToken, err => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
return next(null, resumeToken);
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
},
|
||||
/*
|
||||
* In this step we loop over the oplog
|
||||
*/
|
||||
(resumeToken, next) => {
|
||||
this.logger.info(`reading oplog resumeToken=${resumeToken}`,
|
||||
{ filterName });
|
||||
// only way to get out of the loop in all cases
|
||||
const nextOnce = jsutil.once(next);
|
||||
// read the change stream
|
||||
const changeStream = collection.watch({ resumeAfter: resumeToken });
|
||||
// start bufferization
|
||||
this.filterName = filterName;
|
||||
this.startFlusher();
|
||||
changeStream.on(
|
||||
'change', item => {
|
||||
if (item.ns.db === this.databaseName) {
|
||||
const _item = {};
|
||||
_item.bucketName = bucketName;
|
||||
_item.key = item.documentKey._id;
|
||||
if (item.operationType === 'insert' ||
|
||||
item.operationType === 'replace') {
|
||||
_item.value = Object.assign({}, item.fullDocument.value);
|
||||
this.addEvent(_item, changeStream.resumeToken);
|
||||
} else if (item.operationType === 'delete') {
|
||||
if (!isMasterKey(_item.key)) {
|
||||
// ignore for now
|
||||
return;
|
||||
}
|
||||
this.delEvent(_item, changeStream.resumeToken);
|
||||
} else if (item.operationType === 'invalidate') {
|
||||
nextOnce();
|
||||
return;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
}], err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info('returning',
|
||||
{ filterName });
|
||||
return cb();
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = MongoOplogInterface;
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Interface for oplog management
|
||||
*
|
||||
* filter is an object with the following structure: {
|
||||
* filterName: string,
|
||||
* filterType: bucket|bucketList|raftSession,
|
||||
* bucket: {
|
||||
* bucketName: string,
|
||||
* },
|
||||
* bucketList: {
|
||||
* bucketList: [string, ...]
|
||||
* },
|
||||
* raftSession: {
|
||||
* raftId: number,
|
||||
* },
|
||||
* }
|
||||
*
|
||||
* persist is an interface with the following methods:
|
||||
* - constructor(params)
|
||||
* - load(filterName, persistData, cb(err, offset))
|
||||
* - save(filterName, persistData, offset, cb(err))
|
||||
|
||||
* persistData is an interface with the following methods:
|
||||
* - constuctor(params)
|
||||
* - initState(cb(err)): initialize the structure, e.g. initial bucket scan
|
||||
* - loadState(stream, cb(err)): load the state
|
||||
* - saveState(stream, cb(err)): save the state
|
||||
* - updateState(addQueue, delQueue, cb(err)): update the state
|
||||
* item: { filterName, key, value }
|
||||
*/
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class OplogInterface {
|
||||
|
||||
constructor(params) {
|
||||
this.persist = params?.persist;
|
||||
this.persistData = params?.persistData;
|
||||
this.logger = new werelogs.Logger('OplogInterface');
|
||||
/* for backends requiring bufferization only */
|
||||
this.bufferTimeoutMs = params?.bufferTimeoutMs ?? 500;
|
||||
this.addQueue = [];
|
||||
this.delQueue = [];
|
||||
this.pending = false;
|
||||
this.prevOffset = null;
|
||||
this.offset = null;
|
||||
}
|
||||
|
||||
addEvent(item, offset) {
|
||||
this.addQueue.push(item);
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
delEvent(item, offset) {
|
||||
this.delQueue.push(item);
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
/*
|
||||
* Optional buffer management for backends that don't bufferize.
|
||||
* It avoids persisting the state at each event
|
||||
*/
|
||||
flushQueue(cb) {
|
||||
if (this.offset === null ||
|
||||
this.prevOffset === this.offset) {
|
||||
if (cb) {
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
if (this.pending) {
|
||||
if (cb) {
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
this.pending = true;
|
||||
const addQueue = this.addQueue;
|
||||
this.addQueue = [];
|
||||
const delQueue = this.delQueue;
|
||||
this.delQueue = [];
|
||||
const offset = this.offset;
|
||||
this.prevOffset = this.offset;
|
||||
this.persistData.updateState(
|
||||
addQueue,
|
||||
delQueue,
|
||||
err => {
|
||||
if (err) {
|
||||
if (cb) {
|
||||
return cb(err);
|
||||
}
|
||||
}
|
||||
this.persist.save(
|
||||
this.filterName,
|
||||
this.persistData,
|
||||
offset,
|
||||
err => {
|
||||
this.pending = false;
|
||||
if (err) {
|
||||
if (cb) {
|
||||
return cb(err);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
if (cb) {
|
||||
return cb();
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
doFlush() {
|
||||
this.flushQueue(err => {
|
||||
if (err) {
|
||||
this.logger.error('flusing buffer', { err });
|
||||
}
|
||||
});
|
||||
this.startFlusher();
|
||||
}
|
||||
|
||||
startFlusher() {
|
||||
setTimeout(this.doFlush.bind(this), this.bufferTimeoutMs);
|
||||
}
|
||||
|
||||
// method to be overridden
|
||||
start() {
|
||||
throw new Error('not implemented');
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = OplogInterface;
|
|
@ -0,0 +1,91 @@
|
|||
const fs = require('fs');
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class PersistFileInterface {
|
||||
|
||||
constructor() {
|
||||
this.folder = '/tmp';
|
||||
this.logger = new werelogs.Logger('PersistFileInterface');
|
||||
fs.access(this.folder, err => {
|
||||
if (err) {
|
||||
fs.mkdirSync(this.folder, { recursive: true });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
getFileName(filterName) {
|
||||
return `${this.folder}/${filterName}.json`;
|
||||
}
|
||||
|
||||
getOffsetFileName(filterName) {
|
||||
return `${this.folder}/${filterName}.offset.json`;
|
||||
}
|
||||
|
||||
load(filterName, persistData, cb) {
|
||||
const fileName = this.getFileName(filterName);
|
||||
const offsetFileName = this.getOffsetFileName(filterName);
|
||||
let obj = {};
|
||||
fs.readFile(
|
||||
offsetFileName,
|
||||
'utf-8', (err, data) => {
|
||||
if (err) {
|
||||
if (err.code === 'ENOENT') {
|
||||
this.logger.info(`${offsetFileName} non-existent`);
|
||||
} else {
|
||||
this.logger.error('error loading', { err });
|
||||
return cb(err);
|
||||
}
|
||||
} else {
|
||||
obj = JSON.parse(data);
|
||||
}
|
||||
if (fs.existsSync(fileName)) {
|
||||
const file = fs.createReadStream(fileName);
|
||||
persistData.loadState(file, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info(`${fileName} loaded: offset ${obj.offset}`);
|
||||
return cb(null, obj.offset);
|
||||
});
|
||||
} else {
|
||||
this.logger.info(`${fileName} non-existent`);
|
||||
return cb(null, obj.offset);
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
|
||||
save(filterName, persistData, offset, cb) {
|
||||
const fileName = this.getFileName(filterName);
|
||||
const offsetFileName = this.getOffsetFileName(filterName);
|
||||
const file = fs.createWriteStream(fileName);
|
||||
persistData.saveState(file, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const obj = {
|
||||
offset,
|
||||
};
|
||||
fs.writeFile(
|
||||
offsetFileName, JSON.stringify(obj),
|
||||
'utf-8',
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error('error saving', { err });
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info(`${fileName} saved: offset ${offset}`);
|
||||
return cb();
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PersistFileInterface;
|
|
@ -0,0 +1,73 @@
|
|||
// fake backend for unit tests
|
||||
const assert = require('assert');
|
||||
const MemoryStream = require('memorystream');
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class PersistMemInterface {
|
||||
|
||||
constructor() {
|
||||
this.memoryStreams = {};
|
||||
this.offsets = {};
|
||||
this.logger = new werelogs.Logger('PersistMemInterface');
|
||||
}
|
||||
|
||||
getOffset(filterName) {
|
||||
if (!this.offsets[filterName]) {
|
||||
return {};
|
||||
}
|
||||
return this.offsets[filterName];
|
||||
}
|
||||
|
||||
setOffset(filterName, offset) {
|
||||
if (!this.memoryStreams[filterName]) {
|
||||
this.memoryStreams[filterName] = new MemoryStream();
|
||||
}
|
||||
if (!this.offsets[filterName]) {
|
||||
this.offsets[filterName] = {};
|
||||
}
|
||||
Object.assign(
|
||||
this.offsets[filterName],
|
||||
offset);
|
||||
}
|
||||
|
||||
load(filterName, persistData, cb) {
|
||||
this.logger.info(`loading ${filterName}`);
|
||||
const stream = this.memoryStreams[filterName];
|
||||
const offset = this.offsets[filterName];
|
||||
if (stream === undefined) {
|
||||
this.logger.info(`${filterName} non-existent`);
|
||||
return cb(null, undefined);
|
||||
}
|
||||
assert(offset !== undefined);
|
||||
persistData.loadState(stream, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info(`${filterName} loaded: offset ${offset}`);
|
||||
return cb(null, offset);
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
save(filterName, persistData, offset, cb) {
|
||||
this.logger.info(`saving ${filterName} offset ${JSON.stringify(offset)}`);
|
||||
const stream = new MemoryStream();
|
||||
this.memoryStreams[filterName] = stream;
|
||||
persistData.saveState(stream, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.offsets[filterName] = offset;
|
||||
this.logger.info(`${filterName} saved: offset ${offset}`);
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PersistMemInterface;
|
||||
|
|
@ -0,0 +1,283 @@
|
|||
// Ring backend that persists on Sproxyd and offsets on ZK
|
||||
const async = require('async');
|
||||
const { pipeline } = require('stream');
|
||||
const MemoryStream = require('memorystream');
|
||||
const zlib = require('zlib');
|
||||
const zookeeper = require('node-zookeeper-client');
|
||||
const Sproxy = require('sproxydclient');
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class PersistRingInterface {
|
||||
|
||||
constructor(params) {
|
||||
let zkConnectionString = 'localhost:2181';
|
||||
if (params && params.zkConnectionString !== undefined) {
|
||||
zkConnectionString = params.zkConnectionString;
|
||||
}
|
||||
this.zkPath = '/persist-ring-interface';
|
||||
if (params && params.zkPath !== undefined) {
|
||||
this.zkPath = params.zkPath;
|
||||
}
|
||||
let spPath = '/proxy/DC1/'; // do not forget "/" at the end !!!
|
||||
if (params && params.spPath !== undefined) {
|
||||
spPath = params.spPath;
|
||||
}
|
||||
let spBootstrap = ['localhost:8181'];
|
||||
if (params && params.spBootstrap !== undefined) {
|
||||
spBootstrap = params.spBootstrap;
|
||||
}
|
||||
this.reqUid = 'persist-ring-interface-req-uid';
|
||||
this.logger = new werelogs.Logger('PersistRingInterface');
|
||||
this.zkClient = zookeeper.createClient(zkConnectionString);
|
||||
this.zkClient.connect();
|
||||
this.zkClient.on('error', err => {
|
||||
this.logger.error('error connecting', { err });
|
||||
});
|
||||
this.zkClient.once('connected', () => {
|
||||
this.logger.info('connected');
|
||||
});
|
||||
this.spClient = new Sproxy({
|
||||
bootstrap: spBootstrap,
|
||||
path: spPath,
|
||||
});
|
||||
}
|
||||
|
||||
getZKPath(filterName) {
|
||||
return `${this.zkPath}/${filterName}`;
|
||||
}
|
||||
|
||||
load(filterName, persistData, cb) {
|
||||
this.logger.info(`loading ${filterName}`);
|
||||
async.waterfall([
|
||||
/*
|
||||
* Check of we have an existing Zookeeper node
|
||||
*/
|
||||
next => {
|
||||
this.zkClient.getData(
|
||||
this.getZKPath(filterName),
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
if (err.name === 'NO_NODE') {
|
||||
this.logger.info(`${filterName} non-existent`);
|
||||
} else {
|
||||
this.logger.error(`getData ${filterName} error`, { err });
|
||||
}
|
||||
return next(err);
|
||||
}
|
||||
return next(null, data);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Extract the Sproxyd key from the Zookeeper node.
|
||||
* Read the key from Sproxyd.
|
||||
*/
|
||||
(data, next) => {
|
||||
const _data = JSON.parse(data.toString());
|
||||
this.spClient.get(
|
||||
_data.key,
|
||||
undefined,
|
||||
this.reqUid,
|
||||
(err, stream) => {
|
||||
if (err) {
|
||||
this.logger.error(`sproxyd ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, _data, stream);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Uncompress the stream in memory
|
||||
*/
|
||||
(_data, stream, next) => {
|
||||
const ostream = new MemoryStream();
|
||||
pipeline(
|
||||
stream,
|
||||
zlib.createGunzip(),
|
||||
ostream,
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error(`pipeline ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, _data, ostream);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Load the state from uncompressed stream
|
||||
*/
|
||||
(_data, stream, next) => {
|
||||
persistData.loadState(stream, err => {
|
||||
if (err) {
|
||||
this.logger.error(`load ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
this.logger.info(`${filterName} loaded: offset ${_data.offset}`);
|
||||
return next(null, _data);
|
||||
});
|
||||
}], (err, _data) => {
|
||||
if (err) {
|
||||
if (err.name === 'NO_NODE') {
|
||||
return cb(null, undefined);
|
||||
}
|
||||
this.logger.error(`load ${filterName} error`, { err });
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, _data.offset);
|
||||
});
|
||||
}
|
||||
|
||||
save(filterName, persistData, offset, cb) {
|
||||
this.logger.info(`saving ${filterName} offset ${offset}`);
|
||||
async.waterfall([
|
||||
/*
|
||||
* Save the state in a memory stream
|
||||
*/
|
||||
next => {
|
||||
const stream = new MemoryStream();
|
||||
persistData.saveState(
|
||||
stream, err => {
|
||||
if (err) {
|
||||
this.logger.error(`save ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, stream);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Compress the state in memory
|
||||
*/
|
||||
(stream, next) => {
|
||||
const ostream = new MemoryStream();
|
||||
pipeline(
|
||||
stream,
|
||||
zlib.createGzip(),
|
||||
ostream,
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error(`pipeline ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, ostream);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Store the state in Sproxyd
|
||||
*/
|
||||
(stream, next) => {
|
||||
const parameters = {
|
||||
filterName,
|
||||
namespace: 'persist-ring-interface',
|
||||
owner: 'persist-ring-interface',
|
||||
};
|
||||
const size = stream._readableState.length;
|
||||
this.spClient.put(
|
||||
stream,
|
||||
size,
|
||||
parameters,
|
||||
this.reqUid,
|
||||
(err, key) => {
|
||||
if (err) {
|
||||
this.logger.error(`sproxyd put ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
const newData = {};
|
||||
newData.offset = offset;
|
||||
newData.key = key;
|
||||
return next(null, newData);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* Check if the Zookeeper node exists
|
||||
*/
|
||||
(newData, next) => {
|
||||
this.zkClient.exists(
|
||||
this.getZKPath(filterName),
|
||||
(err, stat) => {
|
||||
if (err) {
|
||||
this.logger.error(`exists ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
let doesExist = false;
|
||||
if (stat) {
|
||||
doesExist = true;
|
||||
}
|
||||
return next(null, newData, doesExist);
|
||||
});
|
||||
},
|
||||
/*
|
||||
* If the Zookeeper node exists read it.
|
||||
* Else create it.
|
||||
*/
|
||||
(newData, doesExist, next) => {
|
||||
if (doesExist) {
|
||||
this.zkClient.getData(
|
||||
this.getZKPath(filterName),
|
||||
(err, _oldData) => {
|
||||
if (err) {
|
||||
this.logger.error(`getData ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
const oldData = JSON.parse(_oldData);
|
||||
return next(null, newData, oldData);
|
||||
});
|
||||
} else {
|
||||
this.zkClient.mkdirp(
|
||||
this.getZKPath(filterName),
|
||||
null,
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error(`mkdirp ${filterName} error`, { err });
|
||||
return next(err);
|
||||
}
|
||||
return next(null, newData, null);
|
||||
});
|
||||
}
|
||||
},
|
||||
/*
|
||||
* Store the context in the Zookeeper node and delete the old sproxyd key.
|
||||
*/
|
||||
(newData, oldData, next) => {
|
||||
const _newData = JSON.stringify(newData);
|
||||
this.zkClient.setData(
|
||||
this.getZKPath(filterName),
|
||||
Buffer.from(_newData),
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error(`setData ${filterName} error`, { err });
|
||||
return cb(err);
|
||||
}
|
||||
this.logger.info(`${filterName} saved: new key ${newData.key} offset ${offset}`);
|
||||
if (oldData) {
|
||||
this.spClient.delete(
|
||||
oldData.key,
|
||||
this.reqUid,
|
||||
err => {
|
||||
if (err) {
|
||||
this.logger.error(
|
||||
`sproxyd del ${filterName} old key ${oldData.key} error`,
|
||||
{ err });
|
||||
return next(err);
|
||||
}
|
||||
return next();
|
||||
});
|
||||
} else {
|
||||
return next();
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
}], err => {
|
||||
if (err) {
|
||||
this.logger.error(`save ${filterName} error`, { err });
|
||||
return cb(err);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PersistRingInterface;
|
|
@ -3,7 +3,7 @@
|
|||
"engines": {
|
||||
"node": ">=16"
|
||||
},
|
||||
"version": "7.10.13",
|
||||
"version": "7.10.14",
|
||||
"description": "Common utilities for the S3 project components",
|
||||
"main": "build/index.js",
|
||||
"repository": {
|
||||
|
@ -36,8 +36,10 @@
|
|||
"ipaddr.js": "1.9.1",
|
||||
"level": "~5.0.1",
|
||||
"level-sublevel": "~6.6.5",
|
||||
"memorystream": "^0.3.1",
|
||||
"mongodb": "^3.0.1",
|
||||
"node-forge": "^0.7.1",
|
||||
"node-zookeeper-client": "^0.2.2",
|
||||
"prom-client": "10.2.3",
|
||||
"simple-glob": "^0.2",
|
||||
"socket.io": "~2.3.0",
|
||||
|
@ -57,6 +59,7 @@
|
|||
"@sinonjs/fake-timers": "^6.0.1",
|
||||
"@types/jest": "^27.4.1",
|
||||
"@types/node": "^17.0.21",
|
||||
"argparse": "^2.0.1",
|
||||
"eslint": "2.13.1",
|
||||
"eslint-config-airbnb": "6.2.0",
|
||||
"eslint-config-scality": "scality/Guidelines#7.10.2",
|
||||
|
@ -64,6 +67,7 @@
|
|||
"jest": "^27.5.1",
|
||||
"mocha": "8.0.1",
|
||||
"mongodb-memory-server": "^6.0.2",
|
||||
"prando": "^6.0.1",
|
||||
"sinon": "^9.0.2",
|
||||
"temp": "0.9.1",
|
||||
"ts-jest": "^27.1.3",
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
{
|
||||
"acl": {
|
||||
"Canned": "private",
|
||||
"FULL_CONTROL": [],
|
||||
"WRITE": [],
|
||||
"WRITE_ACP": [],
|
||||
"READ": [],
|
||||
"READ_ACP": []
|
||||
},
|
||||
"name": "BucketName",
|
||||
"owner": "9d8fe19a78974c56dceb2ea4a8f01ed0f5fecb9d29f80e9e3b84104e4a3ea520",
|
||||
"ownerDisplayName": "anonymousCoward",
|
||||
"creationDate": "2018-06-04T17:45:42.592Z",
|
||||
"mdBucketModelVersion": 8,
|
||||
"transient": false,
|
||||
"deleted": false,
|
||||
"serverSideEncryption": null,
|
||||
"versioningConfiguration": null,
|
||||
"websiteConfiguration": null,
|
||||
"locationConstraint": "us-east-1",
|
||||
"readLocationConstraint": "us-east-1",
|
||||
"cors": null,
|
||||
"replicationConfiguration": null,
|
||||
"lifecycleConfiguration": null,
|
||||
"uid": "fea97818-6a9a-11e8-9777-e311618cc5d4"
|
||||
}
|
|
@ -0,0 +1,268 @@
|
|||
const async = require('async');
|
||||
const assert = require('assert');
|
||||
const Prando = require('prando');
|
||||
const werelogs = require('werelogs');
|
||||
const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper');
|
||||
const fakeBucketInfo = require('./FakeBucketInfo.json');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class Injector {
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
*
|
||||
* @param {Object} backend - Metadata backend to use for injection
|
||||
* @param {Object} logger - Logger to use
|
||||
*/
|
||||
constructor(backend, logger) {
|
||||
this.backend = backend;
|
||||
this.rnd = new Prando(0);
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
get opPut() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
get opDelete() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
genKey(len) {
|
||||
const characters =
|
||||
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
|
||||
return this.rnd.nextString(len, characters);
|
||||
}
|
||||
|
||||
genBase16(len) {
|
||||
const characters = 'abcdef0123456789';
|
||||
return this.rnd.nextString(len, characters);
|
||||
}
|
||||
|
||||
_zeroPad(s, n, width) {
|
||||
return s + n.toString().padStart(width, '0');
|
||||
}
|
||||
|
||||
/**
|
||||
* Deterministic injection of puts or deletes according to parameters
|
||||
*
|
||||
* @param {String} bucketName - bucket name to inject to
|
||||
* @param {Object} params - parameters for injection
|
||||
* @param {Array} inputKeys - optional keys to use as input
|
||||
* @param {Array} outputKeys - optional generated keys as output
|
||||
* @param {Array} outputValues - optional generated values as output
|
||||
* @param {function} cb - callback when done
|
||||
*
|
||||
* @return {undefined}
|
||||
*/
|
||||
inject(bucketName, params, inputKeys, outputKeys, outputValues, cb) {
|
||||
let maxKeyLen = 1024;
|
||||
if (params.maxKeyLen !== undefined) {
|
||||
maxKeyLen = params.maxKeyLen;
|
||||
}
|
||||
async.timesLimit(
|
||||
params.numKeys,
|
||||
10,
|
||||
(n, next) => {
|
||||
let key;
|
||||
if (inputKeys) {
|
||||
const idx = this.rnd.nextInt(0, inputKeys.length - 1);
|
||||
key = inputKeys[idx];
|
||||
inputKeys.splice(idx, 1);
|
||||
} else {
|
||||
if (params.randomKey) {
|
||||
const len = this.rnd.nextInt(1, maxKeyLen);
|
||||
key = this.genKey(len);
|
||||
} else {
|
||||
let x;
|
||||
if (params.randomSeq) {
|
||||
x = this.rnd.nextInt(0, params.maxSeq - 1);
|
||||
} else {
|
||||
x = n;
|
||||
}
|
||||
key = this._zeroPad(params.prefix, x, 10) +
|
||||
params.suffix;
|
||||
}
|
||||
}
|
||||
if (outputKeys) {
|
||||
outputKeys.push(key);
|
||||
}
|
||||
// eslint-disable-next-line
|
||||
const value = {
|
||||
versionId: this.genBase16(32),
|
||||
'content-length': this.rnd.nextInt(0, 5000000),
|
||||
'content-md5': this.genBase16(32),
|
||||
};
|
||||
if (outputValues) {
|
||||
outputValues.push(value);
|
||||
}
|
||||
if (params.op === this.opPut) {
|
||||
this.backend.putObjectMD(
|
||||
bucketName,
|
||||
key,
|
||||
value,
|
||||
{},
|
||||
this.logger,
|
||||
next);
|
||||
return undefined;
|
||||
} else if (params.op === this.opDelete) {
|
||||
this.backend.deleteObjectMD(
|
||||
bucketName,
|
||||
key,
|
||||
{},
|
||||
this.logger,
|
||||
err => {
|
||||
if (err) {
|
||||
if (err.code !== 404) {
|
||||
return next(err);
|
||||
}
|
||||
}
|
||||
return next();
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
return next(new Error('unknow op'));
|
||||
},
|
||||
err => {
|
||||
if (err) {
|
||||
// eslint-disable-next-line
|
||||
console.error('inject error', err);
|
||||
process.exit(1);
|
||||
}
|
||||
if (cb) {
|
||||
return cb();
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Injector;
|
||||
|
||||
describe('Injector', () => {
|
||||
const fakeBucket = 'fake';
|
||||
const logger = new werelogs.Logger('Injector');
|
||||
const memBackend = new MetadataWrapper(
|
||||
'mem', {}, null, logger);
|
||||
|
||||
before(done => {
|
||||
memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, done);
|
||||
});
|
||||
|
||||
after(done => {
|
||||
memBackend.deleteBucket(fakeBucket, logger, done);
|
||||
});
|
||||
|
||||
it('zeropad', () => {
|
||||
const injector = new Injector(memBackend, logger);
|
||||
assert(injector._zeroPad('foo', 42, 10) === 'foo0000000042');
|
||||
});
|
||||
|
||||
it('inject inputKeys', done => {
|
||||
const injector = new Injector(memBackend, logger);
|
||||
const inputKeys = ['foo1', 'foo2', 'foo3'];
|
||||
const outputKeys = [];
|
||||
injector.inject(
|
||||
fakeBucket,
|
||||
{
|
||||
op: injector.opPut,
|
||||
numKeys: 3,
|
||||
},
|
||||
inputKeys,
|
||||
outputKeys,
|
||||
null,
|
||||
err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
assert.deepEqual(outputKeys, ['foo2', 'foo1', 'foo3']);
|
||||
return done();
|
||||
});
|
||||
});
|
||||
|
||||
it('inject sequence', done => {
|
||||
const injector = new Injector(memBackend, logger);
|
||||
const outputKeys = [];
|
||||
injector.inject(
|
||||
fakeBucket,
|
||||
{
|
||||
prefix: 'foo',
|
||||
suffix: 'x',
|
||||
randomSeq: true,
|
||||
maxSeq: 10,
|
||||
op: injector.opPut,
|
||||
numKeys: 3,
|
||||
},
|
||||
null,
|
||||
outputKeys,
|
||||
null,
|
||||
err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
assert.deepEqual(
|
||||
outputKeys,
|
||||
[
|
||||
'foo0000000005x',
|
||||
'foo0000000001x',
|
||||
'foo0000000000x',
|
||||
]);
|
||||
return done();
|
||||
});
|
||||
});
|
||||
|
||||
it('inject random', done => {
|
||||
const injector = new Injector(memBackend, logger);
|
||||
const outputKeys = [];
|
||||
const outputValues = [];
|
||||
injector.inject(
|
||||
fakeBucket,
|
||||
{
|
||||
prefix: 'foo',
|
||||
suffix: 'x',
|
||||
randomKey: true,
|
||||
maxKeyLen: 10,
|
||||
op: injector.opPut,
|
||||
numKeys: 3,
|
||||
},
|
||||
null,
|
||||
outputKeys,
|
||||
outputValues,
|
||||
err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
assert.deepEqual(
|
||||
outputKeys,
|
||||
[
|
||||
'f5TJ6X',
|
||||
'7T',
|
||||
'LStNJxHS8',
|
||||
]);
|
||||
assert.deepEqual(
|
||||
outputValues,
|
||||
[
|
||||
{
|
||||
'content-length': 3009012,
|
||||
'content-md5': '60f7abfdc5855e00a6e6dc2918bda7a8',
|
||||
'versionId': 'f13938435117885f7f88d7636a3b238e',
|
||||
},
|
||||
{
|
||||
'content-length': 3984521,
|
||||
'content-md5': '7cfc7e8b82826b83e302d18fa0e24b12',
|
||||
'versionId': 'f7f5c4973bc353ad8a9d5084fc1f0dc3',
|
||||
},
|
||||
{
|
||||
'content-length': 4112702,
|
||||
'content-md5': '6d4fe78b11cfa4ff7b2efaba5c5965fe',
|
||||
'versionId': '5540954e05a7910abeb72b8393c93afe',
|
||||
},
|
||||
]);
|
||||
return done();
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,216 @@
|
|||
const async = require('async');
|
||||
const fakeBucketInfo = require('../FakeBucketInfo.json');
|
||||
const MetadataWrapper = require('../../../../../lib/storage/metadata/MetadataWrapper');
|
||||
const BucketdOplogInterface = require('../../../../../lib/storage/metadata/oplog/BucketdOplogInterface');
|
||||
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
|
||||
const Injector = require('../Injector');
|
||||
const http = require('http');
|
||||
const url = require('url');
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
werelogs.configure({
|
||||
level: 'info',
|
||||
dump: 'error',
|
||||
});
|
||||
|
||||
class PersistDataInterface {
|
||||
|
||||
constructor() {
|
||||
this.data = null;
|
||||
}
|
||||
|
||||
initState(cb) {
|
||||
this.data = {};
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
|
||||
loadState(stream, cb) {
|
||||
const chunks = [];
|
||||
stream.on('data', chunk => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
this.data = JSON.parse(Buffer.concat(chunks));
|
||||
return process.nextTick(cb);
|
||||
});
|
||||
}
|
||||
|
||||
saveState(stream, cb) {
|
||||
stream.write(JSON.stringify(this.data));
|
||||
stream.end();
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
|
||||
updateState(addQueue, deleteQueue, cb) {
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
}
|
||||
|
||||
describe('BucketdOplogInterface', () => {
|
||||
const logger = new werelogs.Logger('BucketOplogInterface');
|
||||
|
||||
const fakePort = 9090;
|
||||
const fakeBucket = 'fake';
|
||||
const fakeRaftId = 2;
|
||||
const numObjs = 20000;
|
||||
const fakeCseq = 20001;
|
||||
let oplogInjected = false;
|
||||
const numOplogSeqs = 100;
|
||||
const oplogBatchSize = 2;
|
||||
const endCseq = fakeCseq + numOplogSeqs;
|
||||
const maxLimit = 2;
|
||||
const oplogKeys = [];
|
||||
const oplogValues = [];
|
||||
let oplogKeysIdx = 0;
|
||||
|
||||
const memBackend = new MetadataWrapper(
|
||||
'mem', {}, null, logger);
|
||||
const injector = new Injector(memBackend, logger);
|
||||
|
||||
const requestListener = (req, res) => {
|
||||
const _url = url.parse(req.url, true);
|
||||
if (_url.pathname === `/_/buckets/${fakeBucket}`) {
|
||||
res.writeHead(200);
|
||||
res.end(JSON.stringify(
|
||||
{
|
||||
raftSessionId: fakeRaftId,
|
||||
creating: false,
|
||||
deleting: false,
|
||||
version: 0,
|
||||
}));
|
||||
} else if (_url.pathname === `/_/raft_sessions/${fakeRaftId}/log`) {
|
||||
const begin = _url.query.begin;
|
||||
const limit = _url.query.limit;
|
||||
if (begin === '1' && limit === '1') {
|
||||
res.writeHead(200);
|
||||
res.end(JSON.stringify(
|
||||
{
|
||||
info: {
|
||||
start: 1,
|
||||
cseq: fakeCseq,
|
||||
prune: 1,
|
||||
},
|
||||
}));
|
||||
} else {
|
||||
const realLimit = Math.min(limit, maxLimit);
|
||||
async.until(
|
||||
() => oplogInjected,
|
||||
next => {
|
||||
// inject similar but different random objects
|
||||
injector.inject(
|
||||
fakeBucket,
|
||||
{
|
||||
numKeys: numOplogSeqs * oplogBatchSize,
|
||||
maxSeq: numObjs,
|
||||
op: injector.opPut,
|
||||
randomSeq: true,
|
||||
prefix: 'obj_',
|
||||
suffix: '_bis',
|
||||
},
|
||||
null,
|
||||
oplogKeys,
|
||||
oplogValues,
|
||||
err => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
oplogInjected = true;
|
||||
return next();
|
||||
});
|
||||
}, err => {
|
||||
if (err) {
|
||||
res.writeHead(404);
|
||||
res.end('error', err);
|
||||
return undefined;
|
||||
}
|
||||
if (begin < endCseq) {
|
||||
res.writeHead(200);
|
||||
const resp = {};
|
||||
resp.info = {
|
||||
start: begin,
|
||||
cseq: endCseq,
|
||||
prune: 1,
|
||||
};
|
||||
resp.log = [];
|
||||
for (let i = 0; i < realLimit; i++) {
|
||||
resp.log[i] = {};
|
||||
resp.log[i].db = fakeBucket;
|
||||
resp.log[i].method = 8;
|
||||
resp.log[i].entries = [];
|
||||
for (let j = 0; j < oplogBatchSize; j++) {
|
||||
resp.log[i].entries[j] = {};
|
||||
resp.log[i].entries[j].key = oplogKeys[oplogKeysIdx];
|
||||
resp.log[i].entries[j].value = oplogValues[oplogKeysIdx];
|
||||
oplogKeysIdx++;
|
||||
}
|
||||
}
|
||||
res.end(JSON.stringify(resp));
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
} else if (_url.pathname === `/default/bucket/${fakeBucket}`) {
|
||||
const marker = _url.query.marker === '' ? null : _url.query.marker;
|
||||
const maxKeys = parseInt(_url.query.maxKeys, 10);
|
||||
memBackend.listObjects(fakeBucket, {
|
||||
listingType: 'Delimiter',
|
||||
marker,
|
||||
maxKeys,
|
||||
}, (err, result) => {
|
||||
if (err) {
|
||||
res.writeHead(404);
|
||||
res.end('error', err);
|
||||
return undefined;
|
||||
}
|
||||
res.writeHead(200);
|
||||
res.end(JSON.stringify(result));
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
before(done => {
|
||||
const server = http.createServer(requestListener);
|
||||
server.listen(fakePort);
|
||||
async.waterfall([
|
||||
next => memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, next),
|
||||
next => injector.inject(
|
||||
fakeBucket,
|
||||
{
|
||||
numKeys: numObjs,
|
||||
maxSeq: numObjs,
|
||||
op: injector.opPut,
|
||||
randomSeq: false,
|
||||
prefix: 'obj_',
|
||||
suffix: '',
|
||||
},
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
next),
|
||||
], done);
|
||||
});
|
||||
|
||||
after(done => {
|
||||
memBackend.deleteBucket(fakeBucket, logger, done);
|
||||
});
|
||||
|
||||
it('simulation', done => {
|
||||
const params = {
|
||||
bootstrap: [`localhost:${fakePort}`],
|
||||
persist: new PersistMemInterface(),
|
||||
persistData: new PersistDataInterface(),
|
||||
stopAt: numObjs + numOplogSeqs,
|
||||
};
|
||||
const bucketdOplog = new BucketdOplogInterface(params);
|
||||
bucketdOplog.start(
|
||||
{
|
||||
filterName: fakeBucket,
|
||||
filterType: 'bucket',
|
||||
bucket: {
|
||||
bucketName: fakeBucket,
|
||||
},
|
||||
},
|
||||
done);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,55 @@
|
|||
const assert = require('assert');
|
||||
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
|
||||
|
||||
class PersistDataInterface {
|
||||
|
||||
constructor(obj) {
|
||||
this.obj = obj;
|
||||
}
|
||||
|
||||
loadState(stream, cb) {
|
||||
const chunks = [];
|
||||
stream.on('data', chunk => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
this.obj = JSON.parse(Buffer.concat(chunks));
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
||||
saveState(stream, cb) {
|
||||
stream.write(JSON.stringify(this.obj));
|
||||
stream.end();
|
||||
return cb();
|
||||
}
|
||||
}
|
||||
|
||||
describe('Persist Mem', () => {
|
||||
const persist = new PersistMemInterface();
|
||||
const filterName = 'foo';
|
||||
|
||||
it('basic operations', done => {
|
||||
const pd1 = new PersistDataInterface({
|
||||
foo: 'bar',
|
||||
bar: {
|
||||
qux: 42,
|
||||
quuux: false,
|
||||
},
|
||||
});
|
||||
const pd2 = new PersistDataInterface();
|
||||
persist.save(filterName, pd1, 42, err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
persist.load(filterName, pd2, err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
assert.deepEqual(pd1.obj, pd2.obj);
|
||||
return done();
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,116 @@
|
|||
/* eslint-disable no-console */
|
||||
const fs = require('fs');
|
||||
const { ArgumentParser } = require('argparse');
|
||||
const BucketdOplogInterface = require('../lib/storage/metadata/oplog/BucketdOplogInterface');
|
||||
const MongoOplogInterface = require('../lib/storage/metadata/oplog/MongoOplogInterface');
|
||||
const PersistMemInterface = require('../lib/storage/metadata/oplog/PersistMemInterface');
|
||||
const PersistFileInterface = require('../lib/storage/metadata/oplog/PersistFileInterface');
|
||||
const PersistRingInterface = require('../lib/storage/metadata/oplog/PersistRingInterface');
|
||||
|
||||
const parser = new ArgumentParser({
|
||||
description: 'Oplog CLI tool',
|
||||
});
|
||||
|
||||
parser.add_argument('-v', '--verbose', { action: 'store_true' });
|
||||
parser.add_argument('-c', '--config-file', { help: 'config file' });
|
||||
parser.add_argument('--oplog-interface', { help: 'bucketd|mongo' });
|
||||
parser.add_argument('--persist', { help: 'mem|file|ring' });
|
||||
parser.add_argument('--bucket', { help: 'bucket' });
|
||||
parser.add_argument('--start', { action: 'store_true' });
|
||||
|
||||
const args = parser.parse_args();
|
||||
|
||||
class PersistDataInterface {
|
||||
|
||||
constructor() {
|
||||
this.data = null;
|
||||
}
|
||||
|
||||
initState(cb) {
|
||||
this.data = {};
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
|
||||
loadState(stream, cb) {
|
||||
const chunks = [];
|
||||
stream.on('data', chunk => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
this.data = JSON.parse(Buffer.concat(chunks));
|
||||
return process.nextTick(cb);
|
||||
});
|
||||
}
|
||||
|
||||
saveState(stream, cb) {
|
||||
stream.write(JSON.stringify(this.data));
|
||||
stream.end();
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
|
||||
updateState(addQueue, deleteQueue, cb) {
|
||||
console.log('addQueue', addQueue, 'deleteQueue', deleteQueue);
|
||||
return process.nextTick(cb);
|
||||
}
|
||||
}
|
||||
|
||||
let config = {};
|
||||
|
||||
if (args.config_file !== undefined) {
|
||||
config = JSON.parse(fs.readFileSync(args.config_file, 'utf8'));
|
||||
}
|
||||
|
||||
let persist;
|
||||
if (args.persist === 'mem') {
|
||||
persist = new PersistMemInterface(config.persistMem);
|
||||
} else if (args.persist === 'file') {
|
||||
persist = new PersistFileInterface(config.persistFile);
|
||||
} else if (args.persist === 'ring') {
|
||||
persist = new PersistRingInterface(config.persistRing);
|
||||
} else {
|
||||
console.error(`invalid persist ${args.persist}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
let params = {
|
||||
persist,
|
||||
persistData: new PersistDataInterface(),
|
||||
};
|
||||
|
||||
let oplogInterface;
|
||||
if (args.oplog_interface === 'bucketd') {
|
||||
params = Object.assign(params, config.bucketdOplog);
|
||||
oplogInterface = new BucketdOplogInterface(params);
|
||||
} else if (args.oplog_interface === 'mongo') {
|
||||
params = Object.assign(params, config.mongoOplog);
|
||||
oplogInterface = new MongoOplogInterface(params);
|
||||
} else {
|
||||
console.error(`invalid oplog-interface ${args.oplog_interface}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (args.start) {
|
||||
if (args.bucket === undefined) {
|
||||
console.error('please provide bucket');
|
||||
process.exit(1);
|
||||
}
|
||||
oplogInterface.start(
|
||||
{
|
||||
filterName: args.bucket,
|
||||
filterType: 'bucket',
|
||||
bucket: {
|
||||
bucketName: args.bucket,
|
||||
},
|
||||
},
|
||||
err => {
|
||||
if (err) {
|
||||
console.error(err);
|
||||
process.exit(1);
|
||||
}
|
||||
console.log('exiting...');
|
||||
return;
|
||||
});
|
||||
} else {
|
||||
console.error('please provide an option');
|
||||
process.exit(1);
|
||||
}
|
28
yarn.lock
28
yarn.lock
|
@ -1682,6 +1682,11 @@ async@^3.2.0:
|
|||
resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
|
||||
integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
|
||||
|
||||
async@~0.2.7:
|
||||
version "0.2.10"
|
||||
resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"
|
||||
integrity sha1-trvgsGdLnXGXCMo43owjfLUmw9E=
|
||||
|
||||
async@~2.1.5:
|
||||
version "2.1.5"
|
||||
resolved "https://registry.yarnpkg.com/async/-/async-2.1.5.tgz#e587c68580994ac67fc56ff86d3ac56bdbe810bc"
|
||||
|
@ -4895,6 +4900,11 @@ memory-pager@^1.0.2:
|
|||
resolved "https://registry.yarnpkg.com/memory-pager/-/memory-pager-1.5.0.tgz#d8751655d22d384682741c972f2c3d6dfa3e66b5"
|
||||
integrity sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==
|
||||
|
||||
memorystream@^0.3.1:
|
||||
version "0.3.1"
|
||||
resolved "https://registry.yarnpkg.com/memorystream/-/memorystream-0.3.1.tgz#86d7090b30ce455d63fbae12dda51a47ddcaf9b2"
|
||||
integrity sha1-htcJCzDORV1j+64S3aUaR93K+bI=
|
||||
|
||||
merge-stream@^2.0.0:
|
||||
version "2.0.0"
|
||||
resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-2.0.0.tgz#52823629a14dd00c9770fb6ad47dc6310f2c1f60"
|
||||
|
@ -5203,6 +5213,14 @@ node-releases@^2.0.2:
|
|||
resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.2.tgz#7139fe71e2f4f11b47d4d2986aaf8c48699e0c01"
|
||||
integrity sha512-XxYDdcQ6eKqp/YjI+tb2C5WM2LgjnZrfYg4vgQt49EK268b6gYCHsBLrK2qvJo4FmCtqmKezb0WZFK4fkrZNsg==
|
||||
|
||||
node-zookeeper-client@^0.2.2:
|
||||
version "0.2.3"
|
||||
resolved "https://registry.yarnpkg.com/node-zookeeper-client/-/node-zookeeper-client-0.2.3.tgz#48c79129c56b8e898df9bd3bdad9e27dcad63851"
|
||||
integrity sha512-V4gVHxzQ42iwhkANpPryzfjmqi3Ql3xeO9E/px7W5Yi774WplU3YtqUpnvcL/eJit4UqcfuLOgZLkpf0BPhHmg==
|
||||
dependencies:
|
||||
async "~0.2.7"
|
||||
underscore "~1.4.4"
|
||||
|
||||
nopt@^5.0.0:
|
||||
version "5.0.0"
|
||||
resolved "https://registry.yarnpkg.com/nopt/-/nopt-5.0.0.tgz#530942bb58a512fccafe53fe210f13a25355dc88"
|
||||
|
@ -5508,6 +5526,11 @@ pluralize@^1.2.1:
|
|||
resolved "https://registry.yarnpkg.com/pluralize/-/pluralize-1.2.1.tgz#d1a21483fd22bb41e58a12fa3421823140897c45"
|
||||
integrity sha1-0aIUg/0iu0HlihL6NCGCMUCJfEU=
|
||||
|
||||
prando@^6.0.1:
|
||||
version "6.0.1"
|
||||
resolved "https://registry.yarnpkg.com/prando/-/prando-6.0.1.tgz#ffa8de84c2adc4975dd9df37ae4ada0458face53"
|
||||
integrity sha512-ghUWxQ1T9IJmPu6eshc3VU0OwveUtXQ33ZLXYUcz1Oc5ppKLDXKp0TBDj6b0epwhEctzcQSNGR2iHyvQSn4W5A==
|
||||
|
||||
prelude-ls@~1.1.2:
|
||||
version "1.1.2"
|
||||
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54"
|
||||
|
@ -6726,6 +6749,11 @@ unbzip2-stream@^1.0.9:
|
|||
buffer "^5.2.1"
|
||||
through "^2.3.8"
|
||||
|
||||
underscore@~1.4.4:
|
||||
version "1.4.4"
|
||||
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.4.4.tgz#61a6a32010622afa07963bf325203cf12239d604"
|
||||
integrity sha1-YaajIBBiKvoHljvzJSA88SI51gQ=
|
||||
|
||||
underscore@~1.8.3:
|
||||
version "1.8.3"
|
||||
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.8.3.tgz#4f3fb53b106e6097fcf9cb4109f2a5e9bdfa5022"
|
||||
|
|
Loading…
Reference in New Issue