Compare commits

...

1 Commits

Author SHA1 Message Date
Vianney Rancurel 108b1b3770 ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications
requiring reading the oplog
2022-04-11 18:12:59 -07:00
13 changed files with 1713 additions and 1 deletions

View File

@ -0,0 +1,251 @@
/*
* Main interface for bucketd oplog management
*/
const async = require('async');
const { RESTClient: BucketClient } = require('bucketclient');
const { jsutil, errors } = require('arsenal');
const LogConsumer = require('arsenal/lib/storage/metadata/bucketclient/LogConsumer');
const { isMasterKey } = require('arsenal/lib/versioning/Version');
const OplogInterface = require('./OplogInterface');
/*
 * Oplog interface over bucketd raft sessions: resolves the raft session
 * backing a bucket (or uses the one provided), persists/restores the cseq
 * offset, optionally initializes the state (e.g. initial scan), then tails
 * the raft log and feeds add/delete queues to persistData.updateState().
 */
class BucketdOplogInterface extends OplogInterface {
/*
 * @constructor
 *
 * @param {Object} [params] - optional parameters
 * @param {Number} [params.stopAt=-1] - cseq after which the loop stops
 * @param {Array} [params.bootstrap=['localhost:9000']] - bucketd endpoints
 */
constructor(params) {
super(params);
// retry policy for bucketd requests
this.backendRetryTimes = 3;
this.backendRetryInterval = 300;
// max number of raft log records fetched per query
this.bucketdOplogQuerySize = 20;
// NOTE(review): with the default of -1, "_cseq > this.stopAt" below is
// true as soon as the first batch is persisted, stopping the loop almost
// immediately unless a stopAt is provided -- confirm this is intended
this.stopAt = params?.stopAt ?? -1;
const bkBootstrap = params?.bootstrap ?? ['localhost:9000'];
this.bkClient = new BucketClient(bkBootstrap);
}
/*
 * Start following the oplog matching the filter.
 *
 * @param {Object} filter - filter (see OplogInterface); only the 'bucket'
 * and 'raftSession' filter types are supported
 * @param {function} cb - callback(err) called on error or once the oplog
 * loop terminates
 * @return {undefined}
 */
start(filter, cb) {
if (!(filter.filterType === 'bucket' ||
filter.filterType === 'raftSession')) {
return cb(errors.NotImplemented);
}
const filterName = filter.filterName;
async.waterfall([
/*
 * In this step we get the raftId for filterName
 */
next => {
if (filter.filterType === 'raftSession') {
return next(null, filter.raftSession.raftId);
}
this.logger.info('obtaining raftId',
{ filterName });
async.retry(
{
times: this.backendRetryTimes,
interval: this.backendRetryInterval,
},
done => {
this.bkClient.getBucketInformation(
filter.bucket.bucketName,
null,
(err, info) => {
if (err) {
this.logger.info('retrying getBucketInformation', { err, filterName });
return done(err);
}
return done(null, JSON.parse(info));
});
},
(err, res) => {
if (err) {
this.logger.error('getBucketInformation too many failures', { err, filterName });
return next(err);
}
return next(null, res.raftSessionId);
});
return undefined;
},
/*
 * In this step we get the stored offset if we have it
 */
(raftId, next) => {
let cseq = undefined;
this.persist.load(filterName, this.persistData, (err, offset) => {
if (err) {
return next(err);
}
cseq = offset;
return next(null, raftId, cseq);
});
},
/*
 * In this step we acquire the offset if we don't already have it
 */
(raftId, cseq, next) => {
if (cseq !== undefined) {
this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
{ filterName });
return next(null, raftId, cseq, true);
}
this.logger.info('cseq acquisition',
{ filterName });
async.retry(
{
times: this.backendRetryTimes,
interval: this.backendRetryInterval,
},
done => {
// getRaftLog(raftId, begin=1, limit=1) is only used to read the
// current cseq from the log info header
this.bkClient.getRaftLog(
raftId,
1,
1,
true,
null,
(err, stream) => {
if (err) {
this.logger.info('retrying getRaftLog', { err, filterName });
return done(err);
}
const chunks = [];
stream.on('data', chunk => {
chunks.push(chunk);
});
stream.on('end', () => {
const info = JSON.parse(Buffer.concat(chunks));
return done(null, info);
});
return undefined;
});
},
(err, res) => {
if (err) {
this.logger.error('getRaftLog too many failures', { err, filterName });
return next(err);
}
return next(null, raftId, res.info.cseq, false);
});
return undefined;
},
/*
 * In this step we init the state (e.g. scan)
 */
(raftId, cseq, skipInit, next) => {
if (skipInit) {
this.logger.info(`skipping state initialization cseq=${cseq}`,
{ filterName });
return next(null, raftId, cseq);
}
this.logger.info(`initializing state cseq=${cseq}`,
{ filterName });
this.persistData.initState(err => {
if (err) {
return next(err);
}
this.persist.save(
filterName, this.persistData, cseq, err => {
if (err) {
return next(err);
}
return next(null, raftId, cseq);
});
return undefined;
});
return undefined;
},
/*
 * In this step we loop over the oplog
 */
(raftId, cseq, next) => {
this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
{ filterName });
// only way to get out of the loop in all cases
const nextOnce = jsutil.once(next);
let doStop = false;
// resume reading the oplog from cseq. changes are idempotent
const logConsumer = new LogConsumer({
bucketClient: this.bkClient,
raftSession: raftId,
});
let _cseq = cseq;
async.until(
() => doStop,
_next => {
logConsumer.readRecords({
startSeq: _cseq,
limit: this.bucketdOplogQuerySize,
}, (err, record) => {
if (err) {
// transient backend error: retry after a delay
this.logger.error('readRecords error', { err, filterName });
return setTimeout(() => _next(), 5000);
}
if (!record.log) {
// nothing to read
return setTimeout(() => _next(), 5000);
}
const seqs = [];
record.log.on('data', chunk => {
seqs.push(chunk);
});
record.log.on('end', () => {
const addQueue = [];
const delQueue = [];
for (let i = 0; i < seqs.length; i++) {
if (filter.filterType === 'raftSession' ||
(filter.filterType === 'bucket' &&
seqs[i].db === filter.bucket.bucketName)) {
for (let j = 0; j < seqs[i].entries.length; j++) {
const _item = {};
_item.bucketName = seqs[i].db;
_item.key = seqs[i].entries[j].key;
if (seqs[i].entries[j].type !== undefined &&
seqs[i].entries[j].type === 'del') {
if (!isMasterKey(_item.key)) {
// skip version deletes for now: previously this
// was a "return" out of the whole 'end' handler,
// which never called _next() and stalled the
// async.until() loop on the first such key
continue;
}
delQueue.push(_item);
} else {
_item.value = Object.assign({}, seqs[i].entries[j].value);
addQueue.push(_item);
}
}
}
}
this.persistData.updateState(
addQueue, delQueue, err => {
if (err) {
return _next(err);
}
_cseq += seqs.length;
this.persist.save(
filterName, this.persistData, _cseq, err => {
if (err) {
return _next(err);
}
if (_cseq > this.stopAt) {
doStop = true;
}
return _next();
});
return undefined;
});
});
return undefined;
});
}, err => {
if (err) {
return nextOnce(err);
}
return nextOnce();
});
},
], err => {
if (err) {
return cb(err);
}
this.logger.info('returning',
{ filterName });
return cb();
});
return undefined;
}
}
module.exports = BucketdOplogInterface;

View File

@ -0,0 +1,163 @@
/*
* Main interface for Mongo oplog management
*/
const MongoClient = require('mongodb').MongoClient;
const bson = require('bson');
const { jsutil, errors } = require('arsenal');
const async = require('async');
const { isMasterKey } = require('arsenal/lib/versioning/Version');
const OplogInterface = require('./OplogInterface');
/*
 * Oplog interface over a MongoDB change stream: connects to the metadata
 * database, persists/restores the change stream resumeToken, optionally
 * initializes the state (e.g. initial scan), then buffers change events
 * through the OplogInterface flusher.
 */
class MongoOplogInterface extends OplogInterface {
/*
 * @constructor
 *
 * @param {Object} [params] - optional parameters
 * @param {String} [params.mongoDbUri='mongodb://localhost:27017'] - URI
 * @param {String} [params.databaseName='metadata'] - database name
 */
constructor(params) {
super(params);
this.mongoDbUri = 'mongodb://localhost:27017';
if (params && params.mongoDbUri !== undefined) {
this.mongoDbUri = params.mongoDbUri;
}
this.databaseName = 'metadata';
if (params && params.databaseName !== undefined) {
this.databaseName = params.databaseName;
}
}
/*
 * Start following the oplog matching the filter.
 *
 * @param {Object} filter - filter (see OplogInterface); only the 'bucket'
 * filter type is supported
 * @param {function} cb - callback(err) called on error or when the change
 * stream is invalidated
 * @return {undefined}
 */
start(filter, cb) {
if (filter.filterType !== 'bucket') {
return cb(errors.NotImplemented);
}
const filterName = filter.filterName;
const bucketName = filter.bucket.bucketName;
let db;
let collection;
async.waterfall([
/*
 * In this step we connect to MongoDB
 */
next => {
MongoClient.connect(
this.mongoDbUri,
(err, client) => {
if (err) {
this.logger.error('error connecting to mongodb', { err, filterName });
return next(err);
}
db = client.db(this.databaseName, {
ignoreUndefined: true,
});
collection = db.collection(bucketName);
return next();
});
},
/*
 * In this step we get the stored offset if we have it
 */
next => {
let resumeToken = undefined;
this.persist.load(filterName, this.persistData, (err, offset) => {
if (err) {
return next(err);
}
if (offset && offset._data) {
// rebuild the BSON binary resumeToken from its stored base64 form
resumeToken = {};
resumeToken._data = new bson.Binary(Buffer.from(offset._data, 'base64'));
}
return next(null, resumeToken);
});
},
/*
 * In this step we acquire the offset if we don't already have it
 */
(resumeToken, next) => {
if (resumeToken !== undefined) {
this.logger.info(
`skipping resumeToken acquisition (resumeToken=${resumeToken})`,
{ filterName });
return next(null, resumeToken, true);
}
this.logger.info('resumeToken acquisition',
{ filterName });
const changeStream = collection.watch();
// big hack to extract resumeToken
changeStream.once('change', () => next(null, changeStream.resumeToken, false));
return undefined;
},
/*
 * In this step we init the state (e.g. scan)
 */
(resumeToken, skipInit, next) => {
if (skipInit) {
this.logger.info(`skipping state initialization resumeToken=${resumeToken}`,
{ filterName });
return next(null, resumeToken);
}
this.logger.info(`initializing state resumeToken=${resumeToken}`,
{ filterName });
this.persistData.initState(
err => {
if (err) {
// propagate through the waterfall like the other backends
// (previously console.error + process.exit(1), which killed
// the whole process on a recoverable error)
return next(err);
}
this.persist.save(
filterName, this.persistData, resumeToken, err => {
if (err) {
return next(err);
}
return next(null, resumeToken);
});
return undefined;
});
return undefined;
},
/*
 * In this step we loop over the oplog
 */
(resumeToken, next) => {
this.logger.info(`reading oplog resumeToken=${resumeToken}`,
{ filterName });
// only way to get out of the loop in all cases
const nextOnce = jsutil.once(next);
// read the change stream
const changeStream = collection.watch({ resumeAfter: resumeToken });
// start bufferization
this.filterName = filterName;
this.startFlusher();
changeStream.on(
'change', item => {
if (item.ns.db === this.databaseName) {
const _item = {};
_item.bucketName = bucketName;
_item.key = item.documentKey._id;
if (item.operationType === 'insert' ||
item.operationType === 'replace') {
_item.value = Object.assign({}, item.fullDocument.value);
this.addEvent(_item, changeStream.resumeToken);
} else if (item.operationType === 'delete') {
if (!isMasterKey(_item.key)) {
// ignore for now
return;
}
this.delEvent(_item, changeStream.resumeToken);
} else if (item.operationType === 'invalidate') {
nextOnce();
return;
} else {
return;
}
}
});
}], err => {
if (err) {
return cb(err);
}
this.logger.info('returning',
{ filterName });
return cb();
});
return undefined;
}
}
module.exports = MongoOplogInterface;

View File

@ -0,0 +1,138 @@
/*
* Interface for oplog management
*
* filter is an object with the following structure: {
* filterName: string,
* filterType: bucket|bucketList|raftSession,
* bucket: {
* bucketName: string,
* },
* bucketList: {
* bucketList: [string, ...]
* },
* raftSession: {
* raftId: number,
* },
* }
*
* persist is an interface with the following methods:
* - constructor(params)
* - load(filterName, persistData, cb(err, offset))
* - save(filterName, persistData, offset, cb(err))
* persistData is an interface with the following methods:
* - constructor(params)
* - initState(cb(err)): initialize the structure, e.g. initial bucket scan
* - loadState(stream, cb(err)): load the state
* - saveState(stream, cb(err)): save the state
* - updateState(addQueue, delQueue, cb(err)): update the state
* item: { filterName, key, value }
*/
const werelogs = require('werelogs');
werelogs.configure({
level: 'info',
dump: 'error',
});
/*
 * Base class for oplog backends: holds the persistence hooks and an
 * optional bufferization layer that batches add/delete events and
 * periodically persists the state together with the current offset.
 */
class OplogInterface {
/*
 * @constructor
 *
 * @param {Object} [params] - optional parameters
 * @param {Object} [params.persist] - offset/state persistence backend
 * @param {Object} [params.persistData] - state (de)serialization object
 * @param {Number} [params.bufferTimeoutMs=500] - flush period in ms
 */
constructor(params) {
this.persist = params?.persist;
this.persistData = params?.persistData;
this.logger = new werelogs.Logger('OplogInterface');
/* for backends requiring bufferization only */
this.bufferTimeoutMs = params?.bufferTimeoutMs ?? 500;
this.addQueue = [];
this.delQueue = [];
// true while a flush is in flight, to avoid concurrent flushes
this.pending = false;
this.prevOffset = null;
this.offset = null;
}
// buffer a put/replace event observed at the given offset
addEvent(item, offset) {
this.addQueue.push(item);
this.offset = offset;
}
// buffer a delete event observed at the given offset
delEvent(item, offset) {
this.delQueue.push(item);
this.offset = offset;
}
/*
 * Optional buffer management for backends that don't bufferize.
 * It avoids persisting the state at each event
 *
 * @param {function} [cb] - optional callback(err)
 * @return {undefined}
 */
flushQueue(cb) {
// nothing new since the last flush
if (this.offset === null ||
this.prevOffset === this.offset) {
if (cb) {
return process.nextTick(cb);
}
return undefined;
}
// a flush is already in flight
if (this.pending) {
if (cb) {
return process.nextTick(cb);
}
return undefined;
}
this.pending = true;
const addQueue = this.addQueue;
this.addQueue = [];
const delQueue = this.delQueue;
this.delQueue = [];
const offset = this.offset;
const lastFlushedOffset = this.prevOffset;
this.prevOffset = this.offset;
this.persistData.updateState(
addQueue,
delQueue,
err => {
if (err) {
// requeue the events, rewind prevOffset and release the
// in-flight flag so the next flush retries them; previously
// the error either left pending=true forever (deadlocking
// the flusher) or fell through and persisted an offset for
// events that were never applied
this.addQueue = addQueue.concat(this.addQueue);
this.delQueue = delQueue.concat(this.delQueue);
this.prevOffset = lastFlushedOffset;
this.pending = false;
if (cb) {
return cb(err);
}
return undefined;
}
// NOTE(review): this.filterName is expected to be set by the
// subclass start() before the flusher runs -- confirm
this.persist.save(
this.filterName,
this.persistData,
offset,
err => {
this.pending = false;
if (err) {
if (cb) {
return cb(err);
}
return undefined;
}
if (cb) {
return cb();
}
return undefined;
});
return undefined;
});
return undefined;
}
// flush once, then re-arm the timer
doFlush() {
this.flushQueue(err => {
if (err) {
this.logger.error('flushing buffer', { err });
}
});
this.startFlusher();
}
// arm the periodic flusher
startFlusher() {
setTimeout(this.doFlush.bind(this), this.bufferTimeoutMs);
}
// method to be overridden
start() {
throw new Error('not implemented');
}
}
module.exports = OplogInterface;

View File

@ -0,0 +1,91 @@
const fs = require('fs');
const werelogs = require('werelogs');
werelogs.configure({
level: 'info',
dump: 'error',
});
/*
 * Persistence backend storing the state and the offset as local files:
 * one "<filterName>.json" state file and one "<filterName>.offset.json"
 * offset file per filter.
 */
class PersistFileInterface {
/*
 * @constructor
 *
 * @param {Object} [params] - optional parameters
 * @param {String} [params.folder='/tmp'] - folder storing the files
 */
constructor(params) {
this.folder = params?.folder ?? '/tmp';
this.logger = new werelogs.Logger('PersistFileInterface');
// mkdirSync with recursive:true is a no-op if the folder already
// exists; the previous async fs.access() callback raced with the
// first load()/save() using the folder
fs.mkdirSync(this.folder, { recursive: true });
}
// path of the state file for filterName
getFileName(filterName) {
return `${this.folder}/${filterName}.json`;
}
// path of the offset file for filterName
getOffsetFileName(filterName) {
return `${this.folder}/${filterName}.offset.json`;
}
/*
 * Load the state into persistData and return the stored offset.
 *
 * @param {String} filterName - filter name
 * @param {Object} persistData - object implementing loadState(stream, cb)
 * @param {function} cb - callback(err, offset); offset is undefined when
 * nothing has been stored yet
 * @return {undefined}
 */
load(filterName, persistData, cb) {
const fileName = this.getFileName(filterName);
const offsetFileName = this.getOffsetFileName(filterName);
let obj = {};
fs.readFile(
offsetFileName,
'utf-8', (err, data) => {
if (err) {
// a missing offset file is not an error: start from scratch
if (err.code === 'ENOENT') {
this.logger.info(`${offsetFileName} non-existent`);
} else {
this.logger.error('error loading', { err });
return cb(err);
}
} else {
obj = JSON.parse(data);
}
if (fs.existsSync(fileName)) {
const file = fs.createReadStream(fileName);
persistData.loadState(file, err => {
if (err) {
return cb(err);
}
this.logger.info(`${fileName} loaded: offset ${obj.offset}`);
return cb(null, obj.offset);
});
} else {
this.logger.info(`${fileName} non-existent`);
return cb(null, obj.offset);
}
return undefined;
});
}
/*
 * Save the state from persistData, then store the offset.
 *
 * @param {String} filterName - filter name
 * @param {Object} persistData - object implementing saveState(stream, cb)
 * @param {Object} offset - opaque offset to store alongside the state
 * @param {function} cb - callback(err)
 * @return {undefined}
 */
save(filterName, persistData, offset, cb) {
const fileName = this.getFileName(filterName);
const offsetFileName = this.getOffsetFileName(filterName);
const file = fs.createWriteStream(fileName);
persistData.saveState(file, err => {
if (err) {
return cb(err);
}
const obj = {
offset,
};
fs.writeFile(
offsetFileName, JSON.stringify(obj),
'utf-8',
err => {
if (err) {
this.logger.error('error saving', { err });
return cb(err);
}
this.logger.info(`${fileName} saved: offset ${offset}`);
return cb();
});
return undefined;
});
return undefined;
}
}
module.exports = PersistFileInterface;

View File

@ -0,0 +1,73 @@
// fake backend for unit tests
const assert = require('assert');
const MemoryStream = require('memorystream');
const werelogs = require('werelogs');
werelogs.configure({
level: 'info',
dump: 'error',
});
/*
 * In-memory persistence backend (unit tests only): states are kept in
 * MemoryStreams and offsets in a plain object, both keyed by filter name.
 */
class PersistMemInterface {
constructor() {
this.memoryStreams = {};
this.offsets = {};
this.logger = new werelogs.Logger('PersistMemInterface');
}
// return the stored offset for filterName, or {} when none is stored
getOffset(filterName) {
return this.offsets[filterName] || {};
}
// merge the given offset into the stored one, creating entries on demand
setOffset(filterName, offset) {
if (!this.memoryStreams[filterName]) {
this.memoryStreams[filterName] = new MemoryStream();
}
if (!this.offsets[filterName]) {
this.offsets[filterName] = {};
}
Object.assign(this.offsets[filterName], offset);
}
// load the state into persistData and return the stored offset
// (cb(null, undefined) when nothing was saved for filterName)
load(filterName, persistData, cb) {
this.logger.info(`loading ${filterName}`);
const stream = this.memoryStreams[filterName];
if (stream === undefined) {
this.logger.info(`${filterName} non-existent`);
return cb(null, undefined);
}
const offset = this.offsets[filterName];
// a stored stream always comes with a stored offset
assert(offset !== undefined);
return persistData.loadState(stream, err => {
if (err) {
return cb(err);
}
this.logger.info(`${filterName} loaded: offset ${offset}`);
return cb(null, offset);
});
}
// save the state from persistData into a fresh stream, then the offset
save(filterName, persistData, offset, cb) {
this.logger.info(`saving ${filterName} offset ${JSON.stringify(offset)}`);
const target = new MemoryStream();
this.memoryStreams[filterName] = target;
return persistData.saveState(target, err => {
if (err) {
return cb(err);
}
this.offsets[filterName] = offset;
this.logger.info(`${filterName} saved: offset ${offset}`);
return cb();
});
}
}
module.exports = PersistMemInterface;

View File

@ -0,0 +1,283 @@
// Ring backend that persists on Sproxyd and offsets on ZK
const async = require('async');
const { pipeline } = require('stream');
const MemoryStream = require('memorystream');
const zlib = require('zlib');
const zookeeper = require('node-zookeeper-client');
const Sproxy = require('sproxydclient');
const werelogs = require('werelogs');
werelogs.configure({
level: 'info',
dump: 'error',
});
/*
 * Persistence backend storing the compressed state in Sproxyd and the
 * {offset, sproxyd key} context in a Zookeeper node per filter.
 */
class PersistRingInterface {
/*
 * @constructor
 *
 * @param {Object} [params] - optional parameters
 * @param {String} [params.zkConnectionString='localhost:2181'] - ZK endpoint
 * @param {String} [params.zkPath='/persist-ring-interface'] - ZK base path
 * @param {String} [params.spPath='/proxy/DC1/'] - sproxyd path (trailing /)
 * @param {Array} [params.spBootstrap=['localhost:8181']] - sproxyd endpoints
 */
constructor(params) {
let zkConnectionString = 'localhost:2181';
if (params && params.zkConnectionString !== undefined) {
zkConnectionString = params.zkConnectionString;
}
this.zkPath = '/persist-ring-interface';
if (params && params.zkPath !== undefined) {
this.zkPath = params.zkPath;
}
let spPath = '/proxy/DC1/'; // do not forget "/" at the end !!!
if (params && params.spPath !== undefined) {
spPath = params.spPath;
}
let spBootstrap = ['localhost:8181'];
if (params && params.spBootstrap !== undefined) {
spBootstrap = params.spBootstrap;
}
this.reqUid = 'persist-ring-interface-req-uid';
this.logger = new werelogs.Logger('PersistRingInterface');
this.zkClient = zookeeper.createClient(zkConnectionString);
this.zkClient.connect();
this.zkClient.on('error', err => {
this.logger.error('error connecting', { err });
});
this.zkClient.once('connected', () => {
this.logger.info('connected');
});
this.spClient = new Sproxy({
bootstrap: spBootstrap,
path: spPath,
});
}
// Zookeeper node path for filterName
getZKPath(filterName) {
return `${this.zkPath}/${filterName}`;
}
/*
 * Load the state into persistData and return the stored offset.
 *
 * @param {String} filterName - filter name
 * @param {Object} persistData - object implementing loadState(stream, cb)
 * @param {function} cb - callback(err, offset); offset is undefined when
 * no Zookeeper node exists yet
 * @return {undefined}
 */
load(filterName, persistData, cb) {
this.logger.info(`loading ${filterName}`);
async.waterfall([
/*
 * Check if we have an existing Zookeeper node
 */
next => {
this.zkClient.getData(
this.getZKPath(filterName),
(err, data) => {
if (err) {
if (err.name === 'NO_NODE') {
this.logger.info(`${filterName} non-existent`);
} else {
this.logger.error(`getData ${filterName} error`, { err });
}
return next(err);
}
return next(null, data);
});
},
/*
 * Extract the Sproxyd key from the Zookeeper node.
 * Read the key from Sproxyd.
 */
(data, next) => {
const _data = JSON.parse(data.toString());
this.spClient.get(
_data.key,
undefined,
this.reqUid,
(err, stream) => {
if (err) {
this.logger.error(`sproxyd ${filterName} error`, { err });
return next(err);
}
return next(null, _data, stream);
});
},
/*
 * Uncompress the stream in memory
 */
(_data, stream, next) => {
const ostream = new MemoryStream();
pipeline(
stream,
zlib.createGunzip(),
ostream,
err => {
if (err) {
this.logger.error(`pipeline ${filterName} error`, { err });
return next(err);
}
return next(null, _data, ostream);
});
},
/*
 * Load the state from uncompressed stream
 */
(_data, stream, next) => {
persistData.loadState(stream, err => {
if (err) {
this.logger.error(`load ${filterName} error`, { err });
return next(err);
}
this.logger.info(`${filterName} loaded: offset ${_data.offset}`);
return next(null, _data);
});
}], (err, _data) => {
if (err) {
// missing node means nothing was persisted yet: not an error
if (err.name === 'NO_NODE') {
return cb(null, undefined);
}
this.logger.error(`load ${filterName} error`, { err });
return cb(err);
}
return cb(null, _data.offset);
});
}
/*
 * Save the compressed state in Sproxyd, then atomically update the
 * Zookeeper node and delete the previous Sproxyd key.
 *
 * @param {String} filterName - filter name
 * @param {Object} persistData - object implementing saveState(stream, cb)
 * @param {Object} offset - opaque offset to store alongside the state
 * @param {function} cb - callback(err)
 * @return {undefined}
 */
save(filterName, persistData, offset, cb) {
this.logger.info(`saving ${filterName} offset ${offset}`);
async.waterfall([
/*
 * Save the state in a memory stream
 */
next => {
const stream = new MemoryStream();
persistData.saveState(
stream, err => {
if (err) {
this.logger.error(`save ${filterName} error`, { err });
return next(err);
}
return next(null, stream);
});
},
/*
 * Compress the state in memory
 */
(stream, next) => {
const ostream = new MemoryStream();
pipeline(
stream,
zlib.createGzip(),
ostream,
err => {
if (err) {
this.logger.error(`pipeline ${filterName} error`, { err });
return next(err);
}
return next(null, ostream);
});
},
/*
 * Store the state in Sproxyd
 */
(stream, next) => {
const parameters = {
filterName,
namespace: 'persist-ring-interface',
owner: 'persist-ring-interface',
};
// NOTE(review): _readableState is a Node streams internal; this
// relies on the whole compressed state being buffered in the
// MemoryStream at this point -- confirm
const size = stream._readableState.length;
this.spClient.put(
stream,
size,
parameters,
this.reqUid,
(err, key) => {
if (err) {
this.logger.error(`sproxyd put ${filterName} error`, { err });
return next(err);
}
const newData = {};
newData.offset = offset;
newData.key = key;
return next(null, newData);
});
},
/*
 * Check if the Zookeeper node exists
 */
(newData, next) => {
this.zkClient.exists(
this.getZKPath(filterName),
(err, stat) => {
if (err) {
this.logger.error(`exists ${filterName} error`, { err });
return next(err);
}
let doesExist = false;
if (stat) {
doesExist = true;
}
return next(null, newData, doesExist);
});
},
/*
 * If the Zookeeper node exists read it.
 * Else create it.
 */
(newData, doesExist, next) => {
if (doesExist) {
this.zkClient.getData(
this.getZKPath(filterName),
(err, _oldData) => {
if (err) {
this.logger.error(`getData ${filterName} error`, { err });
return next(err);
}
const oldData = JSON.parse(_oldData);
return next(null, newData, oldData);
});
} else {
this.zkClient.mkdirp(
this.getZKPath(filterName),
null,
err => {
if (err) {
this.logger.error(`mkdirp ${filterName} error`, { err });
return next(err);
}
return next(null, newData, null);
});
}
},
/*
 * Store the context in the Zookeeper node and delete the old sproxyd key.
 */
(newData, oldData, next) => {
const _newData = JSON.stringify(newData);
this.zkClient.setData(
this.getZKPath(filterName),
Buffer.from(_newData),
err => {
if (err) {
this.logger.error(`setData ${filterName} error`, { err });
// propagate through the waterfall (previously called cb()
// directly, bypassing the final waterfall handler)
return next(err);
}
this.logger.info(`${filterName} saved: new key ${newData.key} offset ${offset}`);
if (oldData) {
this.spClient.delete(
oldData.key,
this.reqUid,
err => {
if (err) {
this.logger.error(
`sproxyd del ${filterName} old key ${oldData.key} error`,
{ err });
return next(err);
}
return next();
});
} else {
return next();
}
return undefined;
});
}], err => {
if (err) {
this.logger.error(`save ${filterName} error`, { err });
return cb(err);
}
return cb();
});
}
}
module.exports = PersistRingInterface;

View File

@ -3,7 +3,7 @@
"engines": {
"node": ">=16"
},
"version": "7.10.13",
"version": "7.10.14",
"description": "Common utilities for the S3 project components",
"main": "build/index.js",
"repository": {
@ -36,8 +36,10 @@
"ipaddr.js": "1.9.1",
"level": "~5.0.1",
"level-sublevel": "~6.6.5",
"memorystream": "^0.3.1",
"mongodb": "^3.0.1",
"node-forge": "^0.7.1",
"node-zookeeper-client": "^0.2.2",
"prom-client": "10.2.3",
"simple-glob": "^0.2",
"socket.io": "~2.3.0",
@ -57,6 +59,7 @@
"@sinonjs/fake-timers": "^6.0.1",
"@types/jest": "^27.4.1",
"@types/node": "^17.0.21",
"argparse": "^2.0.1",
"eslint": "2.13.1",
"eslint-config-airbnb": "6.2.0",
"eslint-config-scality": "scality/Guidelines#7.10.2",
@ -64,6 +67,7 @@
"jest": "^27.5.1",
"mocha": "8.0.1",
"mongodb-memory-server": "^6.0.2",
"prando": "^6.0.1",
"sinon": "^9.0.2",
"temp": "0.9.1",
"ts-jest": "^27.1.3",

View File

@ -0,0 +1,26 @@
{
"acl": {
"Canned": "private",
"FULL_CONTROL": [],
"WRITE": [],
"WRITE_ACP": [],
"READ": [],
"READ_ACP": []
},
"name": "BucketName",
"owner": "9d8fe19a78974c56dceb2ea4a8f01ed0f5fecb9d29f80e9e3b84104e4a3ea520",
"ownerDisplayName": "anonymousCoward",
"creationDate": "2018-06-04T17:45:42.592Z",
"mdBucketModelVersion": 8,
"transient": false,
"deleted": false,
"serverSideEncryption": null,
"versioningConfiguration": null,
"websiteConfiguration": null,
"locationConstraint": "us-east-1",
"readLocationConstraint": "us-east-1",
"cors": null,
"replicationConfiguration": null,
"lifecycleConfiguration": null,
"uid": "fea97818-6a9a-11e8-9777-e311618cc5d4"
}

View File

@ -0,0 +1,268 @@
const async = require('async');
const assert = require('assert');
const Prando = require('prando');
const werelogs = require('werelogs');
const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper');
const fakeBucketInfo = require('./FakeBucketInfo.json');
werelogs.configure({
level: 'info',
dump: 'error',
});
class Injector {
/**
 * @constructor
 *
 * @param {Object} backend - Metadata backend to use for injection
 * @param {Object} logger - Logger to use
 */
constructor(backend, logger) {
this.backend = backend;
// seeded PRNG: injections are fully deterministic across runs
this.rnd = new Prando(0);
this.logger = logger;
}
// operation code for puts
get opPut() {
return 1;
}
// operation code for deletes
get opDelete() {
return 0;
}
// generate a pseudo-random alphanumeric key of the given length
genKey(len) {
const characters =
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
return this.rnd.nextString(len, characters);
}
// generate a pseudo-random base16 string of the given length
genBase16(len) {
const characters = 'abcdef0123456789';
return this.rnd.nextString(len, characters);
}
// append n to s, zero-padded to the given width
_zeroPad(s, n, width) {
return s + n.toString().padStart(width, '0');
}
/**
 * Deterministic injection of puts or deletes according to parameters
 *
 * @param {String} bucketName - bucket name to inject to
 * @param {Object} params - parameters for injection
 * @param {Array} inputKeys - optional keys to use as input
 * @param {Array} outputKeys - optional generated keys as output
 * @param {Array} outputValues - optional generated values as output
 * @param {function} cb - callback when done
 *
 * @return {undefined}
 */
inject(bucketName, params, inputKeys, outputKeys, outputValues, cb) {
let maxKeyLen = 1024;
if (params.maxKeyLen !== undefined) {
maxKeyLen = params.maxKeyLen;
}
async.timesLimit(
params.numKeys,
10,
(n, next) => {
let key;
if (inputKeys) {
// consume inputKeys in pseudo-random order
const idx = this.rnd.nextInt(0, inputKeys.length - 1);
key = inputKeys[idx];
inputKeys.splice(idx, 1);
} else {
if (params.randomKey) {
const len = this.rnd.nextInt(1, maxKeyLen);
key = this.genKey(len);
} else {
let x;
if (params.randomSeq) {
x = this.rnd.nextInt(0, params.maxSeq - 1);
} else {
x = n;
}
key = this._zeroPad(params.prefix, x, 10) +
params.suffix;
}
}
if (outputKeys) {
outputKeys.push(key);
}
// eslint-disable-next-line
const value = {
versionId: this.genBase16(32),
'content-length': this.rnd.nextInt(0, 5000000),
'content-md5': this.genBase16(32),
};
if (outputValues) {
outputValues.push(value);
}
if (params.op === this.opPut) {
this.backend.putObjectMD(
bucketName,
key,
value,
{},
this.logger,
next);
return undefined;
} else if (params.op === this.opDelete) {
this.backend.deleteObjectMD(
bucketName,
key,
{},
this.logger,
err => {
if (err) {
// deleting a non-existent key is not an error here
if (err.code !== 404) {
return next(err);
}
}
return next();
});
return undefined;
}
return next(new Error('unknown op'));
},
err => {
if (err) {
// propagate the error to the caller instead of killing the
// process (previously console.error + process.exit(1))
this.logger.error('inject error', { err });
if (cb) {
return cb(err);
}
return undefined;
}
if (cb) {
return cb();
}
return undefined;
});
}
}
module.exports = Injector;
// Unit tests for Injector against the in-memory metadata backend.
// Expected keys and values below are fixed because Prando is seeded
// with 0 in the Injector constructor, making every run deterministic.
describe('Injector', () => {
const fakeBucket = 'fake';
const logger = new werelogs.Logger('Injector');
const memBackend = new MetadataWrapper(
'mem', {}, null, logger);
before(done => {
memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, done);
});
after(done => {
memBackend.deleteBucket(fakeBucket, logger, done);
});
it('zeropad', () => {
const injector = new Injector(memBackend, logger);
assert(injector._zeroPad('foo', 42, 10) === 'foo0000000042');
});
// keys are consumed from inputKeys in (seeded) pseudo-random order
it('inject inputKeys', done => {
const injector = new Injector(memBackend, logger);
const inputKeys = ['foo1', 'foo2', 'foo3'];
const outputKeys = [];
injector.inject(
fakeBucket,
{
op: injector.opPut,
numKeys: 3,
},
inputKeys,
outputKeys,
null,
err => {
if (err) {
return done(err);
}
assert.deepEqual(outputKeys, ['foo2', 'foo1', 'foo3']);
return done();
});
});
// keys are prefix + (seeded) pseudo-random sequence number + suffix
it('inject sequence', done => {
const injector = new Injector(memBackend, logger);
const outputKeys = [];
injector.inject(
fakeBucket,
{
prefix: 'foo',
suffix: 'x',
randomSeq: true,
maxSeq: 10,
op: injector.opPut,
numKeys: 3,
},
null,
outputKeys,
null,
err => {
if (err) {
return done(err);
}
assert.deepEqual(
outputKeys,
[
'foo0000000005x',
'foo0000000001x',
'foo0000000000x',
]);
return done();
});
});
// fully random keys (up to maxKeyLen) with deterministic generated values
it('inject random', done => {
const injector = new Injector(memBackend, logger);
const outputKeys = [];
const outputValues = [];
injector.inject(
fakeBucket,
{
prefix: 'foo',
suffix: 'x',
randomKey: true,
maxKeyLen: 10,
op: injector.opPut,
numKeys: 3,
},
null,
outputKeys,
outputValues,
err => {
if (err) {
return done(err);
}
assert.deepEqual(
outputKeys,
[
'f5TJ6X',
'7T',
'LStNJxHS8',
]);
assert.deepEqual(
outputValues,
[
{
'content-length': 3009012,
'content-md5': '60f7abfdc5855e00a6e6dc2918bda7a8',
'versionId': 'f13938435117885f7f88d7636a3b238e',
},
{
'content-length': 3984521,
'content-md5': '7cfc7e8b82826b83e302d18fa0e24b12',
'versionId': 'f7f5c4973bc353ad8a9d5084fc1f0dc3',
},
{
'content-length': 4112702,
'content-md5': '6d4fe78b11cfa4ff7b2efaba5c5965fe',
'versionId': '5540954e05a7910abeb72b8393c93afe',
},
]);
return done();
});
});
});

View File

@ -0,0 +1,216 @@
const async = require('async');
const fakeBucketInfo = require('../FakeBucketInfo.json');
const MetadataWrapper = require('../../../../../lib/storage/metadata/MetadataWrapper');
const BucketdOplogInterface = require('../../../../../lib/storage/metadata/oplog/BucketdOplogInterface');
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
const Injector = require('../Injector');
const http = require('http');
const url = require('url');
const werelogs = require('werelogs');
werelogs.configure({
level: 'info',
dump: 'error',
});
// In-memory persistData stub used by the bucketd oplog test: the whole
// state is a single JSON-serializable object.
class PersistDataInterface {
constructor() {
this.data = null;
}
// Reset the state to an empty object (stands in for an initial scan).
initState(cb) {
this.data = {};
return process.nextTick(cb);
}
// Rebuild this.data from the JSON content of the stream.
loadState(stream, cb) {
const parts = [];
stream.on('data', part => parts.push(part));
stream.on('end', () => {
this.data = JSON.parse(Buffer.concat(parts));
return process.nextTick(cb);
});
}
// Write this.data to the stream as JSON, then close it.
saveState(stream, cb) {
const serialized = JSON.stringify(this.data);
stream.write(serialized);
stream.end();
return process.nextTick(cb);
}
// State updates are a no-op for this stub.
updateState(addQueue, deleteQueue, cb) {
return process.nextTick(cb);
}
}
describe('BucketdOplogInterface', () => {
const logger = new werelogs.Logger('BucketOplogInterface');
const fakePort = 9090;
const fakeBucket = 'fake';
const fakeRaftId = 2;
const numObjs = 20000;
const fakeCseq = 20001;
let oplogInjected = false;
const numOplogSeqs = 100;
const oplogBatchSize = 2;
const endCseq = fakeCseq + numOplogSeqs;
const maxLimit = 2;
const oplogKeys = [];
const oplogValues = [];
let oplogKeysIdx = 0;
const memBackend = new MetadataWrapper(
'mem', {}, null, logger);
const injector = new Injector(memBackend, logger);
const requestListener = (req, res) => {
const _url = url.parse(req.url, true);
if (_url.pathname === `/_/buckets/${fakeBucket}`) {
res.writeHead(200);
res.end(JSON.stringify(
{
raftSessionId: fakeRaftId,
creating: false,
deleting: false,
version: 0,
}));
} else if (_url.pathname === `/_/raft_sessions/${fakeRaftId}/log`) {
const begin = _url.query.begin;
const limit = _url.query.limit;
if (begin === '1' && limit === '1') {
res.writeHead(200);
res.end(JSON.stringify(
{
info: {
start: 1,
cseq: fakeCseq,
prune: 1,
},
}));
} else {
const realLimit = Math.min(limit, maxLimit);
async.until(
() => oplogInjected,
next => {
// inject similar but different random objects
injector.inject(
fakeBucket,
{
numKeys: numOplogSeqs * oplogBatchSize,
maxSeq: numObjs,
op: injector.opPut,
randomSeq: true,
prefix: 'obj_',
suffix: '_bis',
},
null,
oplogKeys,
oplogValues,
err => {
if (err) {
return next(err);
}
oplogInjected = true;
return next();
});
}, err => {
if (err) {
res.writeHead(404);
res.end('error', err);
return undefined;
}
if (begin < endCseq) {
res.writeHead(200);
const resp = {};
resp.info = {
start: begin,
cseq: endCseq,
prune: 1,
};
resp.log = [];
for (let i = 0; i < realLimit; i++) {
resp.log[i] = {};
resp.log[i].db = fakeBucket;
resp.log[i].method = 8;
resp.log[i].entries = [];
for (let j = 0; j < oplogBatchSize; j++) {
resp.log[i].entries[j] = {};
resp.log[i].entries[j].key = oplogKeys[oplogKeysIdx];
resp.log[i].entries[j].value = oplogValues[oplogKeysIdx];
oplogKeysIdx++;
}
}
res.end(JSON.stringify(resp));
}
return undefined;
});
}
} else if (_url.pathname === `/default/bucket/${fakeBucket}`) {
const marker = _url.query.marker === '' ? null : _url.query.marker;
const maxKeys = parseInt(_url.query.maxKeys, 10);
memBackend.listObjects(fakeBucket, {
listingType: 'Delimiter',
marker,
maxKeys,
}, (err, result) => {
if (err) {
res.writeHead(404);
res.end('error', err);
return undefined;
}
res.writeHead(200);
res.end(JSON.stringify(result));
return undefined;
});
}
};
before(done => {
    // Spin up the fake bucketd endpoint, then seed the bucket with the
    // initial batch of sequentially-numbered objects.
    // NOTE(review): the server handle is not kept, so it is never closed
    // in after() — confirm the suite exits cleanly without it.
    const server = http.createServer(requestListener);
    server.listen(fakePort);
    const seedParams = {
        numKeys: numObjs,
        maxSeq: numObjs,
        op: injector.opPut,
        randomSeq: false,
        prefix: 'obj_',
        suffix: '',
    };
    async.waterfall([
        next => memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, next),
        next => injector.inject(fakeBucket, seedParams, null, null, null, next),
    ], done);
});
// Tear down: remove the fake bucket from the in-memory backend.
// NOTE(review): the HTTP server created in before() is never closed here;
// verify the test process terminates without an explicit server.close().
after(done => {
    memBackend.deleteBucket(fakeBucket, logger, done);
});
it('simulation', done => {
    // Drive the bucketd oplog interface against the fake server; the run
    // ends once stopAt sequences have been consumed.
    const oplog = new BucketdOplogInterface({
        bootstrap: [`localhost:${fakePort}`],
        persist: new PersistMemInterface(),
        persistData: new PersistDataInterface(),
        stopAt: numObjs + numOplogSeqs,
    });
    const startFilter = {
        filterName: fakeBucket,
        filterType: 'bucket',
        bucket: { bucketName: fakeBucket },
    };
    oplog.start(startFilter, done);
});
});

View File

@ -0,0 +1,55 @@
const assert = require('assert');
const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
class PersistDataInterface {
    /**
     * Test helper that round-trips an arbitrary object through the
     * persistence stream API (saveState/loadState).
     *
     * @param {object} [obj] - initial state object (undefined until loaded)
     */
    constructor(obj) {
        this.obj = obj;
    }

    /**
     * Read the whole stream and JSON-parse it into this.obj.
     *
     * @param {stream.Readable} stream - source stream
     * @param {function} cb - cb(err)
     * @returns {undefined}
     */
    loadState(stream, cb) {
        const chunks = [];
        stream.on('data', chunk => {
            chunks.push(chunk);
        });
        // BUG FIX: propagate stream failures instead of never invoking the
        // callback (the original registered no 'error' listener).
        stream.on('error', err => cb(err));
        stream.on('end', () => {
            // BUG FIX: guard JSON.parse so malformed input reaches the
            // callback as an error instead of throwing synchronously.
            let parsed;
            try {
                parsed = JSON.parse(Buffer.concat(chunks));
            } catch (err) {
                return cb(err);
            }
            this.obj = parsed;
            return cb();
        });
    }

    /**
     * Serialize this.obj as JSON into the stream.
     *
     * @param {stream.Writable} stream - destination stream
     * @param {function} cb - cb() invoked after end() is issued
     * @returns {undefined}
     */
    saveState(stream, cb) {
        stream.end(JSON.stringify(this.obj));
        return cb();
    }
}
describe('Persist Mem', () => {
    const persist = new PersistMemInterface();
    const filterName = 'foo';

    it('basic operations', done => {
        // Save pd1's state under filterName, reload it into pd2 and
        // verify the round-trip preserved the object.
        const pd1 = new PersistDataInterface({
            foo: 'bar',
            bar: {
                qux: 42,
                quuux: false,
            },
        });
        const pd2 = new PersistDataInterface();
        persist.save(filterName, pd1, 42, err => {
            if (err) {
                return done(err);
            }
            // rename avoids shadowing the outer `err`
            return persist.load(filterName, pd2, err2 => {
                if (err2) {
                    return done(err2);
                }
                // BUG FIX: deepStrictEqual — the legacy deepEqual is
                // deprecated and compares with loose (==) equality.
                assert.deepStrictEqual(pd1.obj, pd2.obj);
                return done();
            });
        });
    });
});

116
tools/oplog-cli.js Normal file
View File

@ -0,0 +1,116 @@
/* eslint-disable no-console */
const fs = require('fs');
const { ArgumentParser } = require('argparse');
const BucketdOplogInterface = require('../lib/storage/metadata/oplog/BucketdOplogInterface');
const MongoOplogInterface = require('../lib/storage/metadata/oplog/MongoOplogInterface');
const PersistMemInterface = require('../lib/storage/metadata/oplog/PersistMemInterface');
const PersistFileInterface = require('../lib/storage/metadata/oplog/PersistFileInterface');
const PersistRingInterface = require('../lib/storage/metadata/oplog/PersistRingInterface');
// CLI argument definitions (argparse port: options use snake_case
// attribute names on the parsed result, e.g. args.oplog_interface).
const parser = new ArgumentParser({
    description: 'Oplog CLI tool',
});
parser.add_argument('-v', '--verbose', { action: 'store_true' });
parser.add_argument('-c', '--config-file', { help: 'config file' });
// which oplog source to read from
parser.add_argument('--oplog-interface', { help: 'bucketd|mongo' });
// which persistence backend stores the scan/oplog state
parser.add_argument('--persist', { help: 'mem|file|ring' });
parser.add_argument('--bucket', { help: 'bucket' });
parser.add_argument('--start', { action: 'store_true' });
const args = parser.parse_args();
class PersistDataInterface {
    /**
     * CLI persistence adapter: state is a plain object serialized as JSON
     * over the persistence streams; oplog updates are printed to stdout.
     */
    constructor() {
        this.data = null;
    }

    /**
     * Reset the state to an empty object.
     *
     * @param {function} cb - cb() on next tick
     * @returns {undefined}
     */
    initState(cb) {
        this.data = {};
        return process.nextTick(cb);
    }

    /**
     * Read the whole stream and JSON-parse it into this.data.
     *
     * @param {stream.Readable} stream - source stream
     * @param {function} cb - cb(err)
     * @returns {undefined}
     */
    loadState(stream, cb) {
        const chunks = [];
        stream.on('data', chunk => {
            chunks.push(chunk);
        });
        // BUG FIX: surface stream failures; the original registered no
        // 'error' handler, which crashes the process on a broken stream.
        stream.on('error', err => cb(err));
        stream.on('end', () => {
            // BUG FIX: guard JSON.parse so malformed persisted state is
            // reported through the callback instead of thrown.
            let parsed;
            try {
                parsed = JSON.parse(Buffer.concat(chunks));
            } catch (err) {
                return cb(err);
            }
            this.data = parsed;
            return process.nextTick(cb);
        });
    }

    /**
     * Serialize this.data as JSON into the stream.
     *
     * @param {stream.Writable} stream - destination stream
     * @param {function} cb - cb() on next tick
     * @returns {undefined}
     */
    saveState(stream, cb) {
        stream.write(JSON.stringify(this.data));
        stream.end();
        return process.nextTick(cb);
    }

    /**
     * Log the batched oplog changes; this CLI adapter keeps no index.
     *
     * @param {Array} addQueue - keys/values added since last update
     * @param {Array} deleteQueue - keys deleted since last update
     * @param {function} cb - cb() on next tick
     * @returns {undefined}
     */
    updateState(addQueue, deleteQueue, cb) {
        console.log('addQueue', addQueue, 'deleteQueue', deleteQueue);
        return process.nextTick(cb);
    }
}
// Load the optional JSON configuration file.
let config = {};
if (args.config_file !== undefined) {
    config = JSON.parse(fs.readFileSync(args.config_file, 'utf8'));
}

// Select the persistence backend from --persist.
let persist;
switch (args.persist) {
case 'mem':
    persist = new PersistMemInterface(config.persistMem);
    break;
case 'file':
    persist = new PersistFileInterface(config.persistFile);
    break;
case 'ring':
    persist = new PersistRingInterface(config.persistRing);
    break;
default:
    console.error(`invalid persist ${args.persist}`);
    process.exit(1);
}

// Select the oplog source and merge its configuration into the params.
let params = {
    persist,
    persistData: new PersistDataInterface(),
};
let oplogInterface;
if (args.oplog_interface === 'bucketd') {
    params = Object.assign(params, config.bucketdOplog);
    oplogInterface = new BucketdOplogInterface(params);
} else if (args.oplog_interface === 'mongo') {
    params = Object.assign(params, config.mongoOplog);
    oplogInterface = new MongoOplogInterface(params);
} else {
    console.error(`invalid oplog-interface ${args.oplog_interface}`);
    process.exit(1);
}

if (args.start) {
    if (args.bucket === undefined) {
        console.error('please provide bucket');
        process.exit(1);
    }
    const startFilter = {
        filterName: args.bucket,
        filterType: 'bucket',
        bucket: {
            bucketName: args.bucket,
        },
    };
    oplogInterface.start(startFilter, err => {
        if (err) {
            console.error(err);
            process.exit(1);
        }
        console.log('exiting...');
    });
} else {
    console.error('please provide an option');
    process.exit(1);
}

View File

@ -1682,6 +1682,11 @@ async@^3.2.0:
resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
async@~0.2.7:
version "0.2.10"
resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"
integrity sha1-trvgsGdLnXGXCMo43owjfLUmw9E=
async@~2.1.5:
version "2.1.5"
resolved "https://registry.yarnpkg.com/async/-/async-2.1.5.tgz#e587c68580994ac67fc56ff86d3ac56bdbe810bc"
@ -4895,6 +4900,11 @@ memory-pager@^1.0.2:
resolved "https://registry.yarnpkg.com/memory-pager/-/memory-pager-1.5.0.tgz#d8751655d22d384682741c972f2c3d6dfa3e66b5"
integrity sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==
memorystream@^0.3.1:
version "0.3.1"
resolved "https://registry.yarnpkg.com/memorystream/-/memorystream-0.3.1.tgz#86d7090b30ce455d63fbae12dda51a47ddcaf9b2"
integrity sha1-htcJCzDORV1j+64S3aUaR93K+bI=
merge-stream@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-2.0.0.tgz#52823629a14dd00c9770fb6ad47dc6310f2c1f60"
@ -5203,6 +5213,14 @@ node-releases@^2.0.2:
resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.2.tgz#7139fe71e2f4f11b47d4d2986aaf8c48699e0c01"
integrity sha512-XxYDdcQ6eKqp/YjI+tb2C5WM2LgjnZrfYg4vgQt49EK268b6gYCHsBLrK2qvJo4FmCtqmKezb0WZFK4fkrZNsg==
node-zookeeper-client@^0.2.2:
version "0.2.3"
resolved "https://registry.yarnpkg.com/node-zookeeper-client/-/node-zookeeper-client-0.2.3.tgz#48c79129c56b8e898df9bd3bdad9e27dcad63851"
integrity sha512-V4gVHxzQ42iwhkANpPryzfjmqi3Ql3xeO9E/px7W5Yi774WplU3YtqUpnvcL/eJit4UqcfuLOgZLkpf0BPhHmg==
dependencies:
async "~0.2.7"
underscore "~1.4.4"
nopt@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/nopt/-/nopt-5.0.0.tgz#530942bb58a512fccafe53fe210f13a25355dc88"
@ -5508,6 +5526,11 @@ pluralize@^1.2.1:
resolved "https://registry.yarnpkg.com/pluralize/-/pluralize-1.2.1.tgz#d1a21483fd22bb41e58a12fa3421823140897c45"
integrity sha1-0aIUg/0iu0HlihL6NCGCMUCJfEU=
prando@^6.0.1:
version "6.0.1"
resolved "https://registry.yarnpkg.com/prando/-/prando-6.0.1.tgz#ffa8de84c2adc4975dd9df37ae4ada0458face53"
integrity sha512-ghUWxQ1T9IJmPu6eshc3VU0OwveUtXQ33ZLXYUcz1Oc5ppKLDXKp0TBDj6b0epwhEctzcQSNGR2iHyvQSn4W5A==
prelude-ls@~1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54"
@ -6726,6 +6749,11 @@ unbzip2-stream@^1.0.9:
buffer "^5.2.1"
through "^2.3.8"
underscore@~1.4.4:
version "1.4.4"
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.4.4.tgz#61a6a32010622afa07963bf325203cf12239d604"
integrity sha1-YaajIBBiKvoHljvzJSA88SI51gQ=
underscore@~1.8.3:
version "1.8.3"
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.8.3.tgz#4f3fb53b106e6097fcf9cb4109f2a5e9bdfa5022"