Compare commits

...

47 Commits

Author SHA1 Message Date
Dimitrios Vasilas bd4487eb07 change file tree 2017-07-06 10:45:28 +02:00
Dimitrios Vasilas 97f4d02a15 up to date with master 2017-05-31 15:45:08 +02:00
Dimitrios Vasilas 6a543567c4 add npm scripts to deploy and stop multiple nodes 2017-05-30 12:20:02 +02:00
Dimitrios Vasilas 2dc0618c80 add antidote js client wrapper 2017-05-22 17:37:25 +02:00
Dimitrios Vasilas 1dde0aceff remove redundant connection 2017-04-28 15:40:40 +02:00
Dimitrios Vasilas 565d027c7d add antidote transaction API source code 2017-04-28 14:34:31 +02:00
Dimitrios Vasilas c3e20cecce update: dynamic antidote transactions 2017-04-28 14:32:21 +02:00
Dimitrios Vasilas 9b4ea5f61c add antidote transaction API 2017-04-03 16:09:14 +02:00
Dimitrios Vasilas 913103cb15 refactor index update 2017-01-26 18:21:10 +01:00
Dimitrios Vasilas 539e025895 improve metadata representation 2017-01-26 16:16:38 +01:00
Dimitrios Vasilas 34f37c15dc move query to list prefix 2017-01-19 11:45:43 +01:00
Dimitrios Vasilas 8eabfb035e implement query distribution to index servers 2017-01-18 17:24:16 +01:00
Dimitrios Vasilas 8f274252ce replex architecture setup 2017-01-12 15:10:50 +01:00
Dimitrios Vasilas e27cb1cf6c improve metadata crdt representation 2017-01-10 18:16:19 +01:00
Dimitrios Vasilas 9c8c8fd9ea merge with updates to master 2016-12-25 12:24:25 +02:00
Dimitrios Vasilas a236fd4457 add wrapper for put to db 2016-12-23 17:13:10 +01:00
Dimitrios Vasilas e76472fa20 fix range search 2016-12-23 15:15:30 +01:00
Dimitrios Vasilas e6f82079c6 fix range search bug 2016-12-22 17:39:40 +01:00
Dimitrios Vasilas cfb77cc9fe different buckets for index and metadata 2016-12-21 18:24:59 +01:00
Dimitrios Vasilas 17a9cd1071 implement logical operators in antidote backend 2016-12-21 16:11:30 +01:00
Dimitrios Vasilas 8ce70e7337 add range search 2016-12-20 18:32:27 +01:00
Dimitrios Vasilas cdbcae37b2 revert levedb integer indexing 2016-12-16 16:22:18 +01:00
Dimitrios Vasilas 0f5cd3e545 improve antitode integration 2016-12-16 15:52:08 +01:00
Dimitrios Vasilas f6cddaf6a5 add antidote metadata backend 2016-12-14 12:00:45 +01:00
Dimitrios Vasilas c225e483bc add Antidotedb indexing backend 2016-12-09 15:28:22 +01:00
Dimitrios Vasilas 53cb8a3263 change communication scheme 2016-12-02 12:54:34 +01:00
Dimitrios Vasilas d302070959 add indexing server configuration 2016-11-29 17:39:40 +01:00
Dimitrios Vasilas 7597c379e7 improve client/server system implementation 2016-11-28 18:48:04 +01:00
Dimitrios Vasilas 541e97f1ea change index representation in database 2016-11-25 18:34:02 +01:00
Dimitrios Vasilas c32fe7df51 bug fixes 2016-11-16 19:26:40 +01:00
Dimitrios Vasilas 9e301aaf25 add regexp search on tags 2016-10-19 13:20:48 +02:00
Dimitrios Vasilas eb441dd09c fix range queries 2016-10-14 15:12:53 +02:00
Dimitrios Vasilas 6fddd95a1d change filesize indexing 2016-10-13 18:33:57 +02:00
Dimitrios Vasilas 1303bd8b97 add delete functionality 2016-10-13 17:13:42 +02:00
Dimitrios Vasilas 093c071836 implement bitmapd functionality 2016-10-05 01:05:27 +02:00
Dimitrios Vasilas da115e6ce4 various fixes 2016-08-30 10:35:37 +02:00
Dimitrios Vasilas b588d5e4d0 fix lint errors 2016-08-25 12:00:51 +02:00
Dimitrios Vasilas 5f1bd15fb0 merge indexes in single object 2016-08-24 15:15:58 +02:00
Dimitrios Vasilas 22a783f23e add acl indexing 2016-08-23 16:28:46 +02:00
Dimitrios Vasilas 320fc256bf implement last content type index 2016-08-22 17:06:05 +02:00
Dimitrios Vasilas 361585825f implement last modification date index 2016-08-22 14:36:05 +02:00
Dimitrios Vasilas e390e355ef add file size indexing 2016-08-19 15:56:42 +02:00
Dimitrios Vasilas f03940e45e add config option to enable indexing of x-amz-meta headers 2016-08-18 14:20:02 +02:00
Dimitrios Vasilas e67abad697 tag indexing (integer optimization) 2016-08-18 13:06:42 +02:00
Dimitrios Vasilas 9a42556448 implement index functionality 2016-08-14 21:38:05 +02:00
Dimitrios Vasilas 3056714c02 implement object to rowid mechanism 2016-08-11 17:39:04 +02:00
Dimitrios Vasilas d871755296 add dimitriosvasilas/node-bitmap-ewah to package.json 2016-08-02 16:44:02 +02:00
18 changed files with 1412 additions and 6 deletions

View File

@ -48,6 +48,13 @@
"logLevel": "info", "logLevel": "info",
"dumpLevel": "error" "dumpLevel": "error"
}, },
"antidote": {
"host": "localhost",
"port": 8087
},
"userMetaIndexing":true,
"systemMetaIndexing":true,
"indexServerPort":7000,
"healthChecks": { "healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"] "allowFrom": ["127.0.0.1/8", "::1"]
}, },

7
deploy_instances.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
for i in $(seq 1 $1);
do
node ./portConfig.js
npm run antidote_backend &
sleep 3
done

View File

@ -276,7 +276,7 @@ class Config {
let metadata = 'file'; let metadata = 'file';
let kms = 'file'; let kms = 'file';
if (process.env.S3BACKEND) { if (process.env.S3BACKEND) {
const validBackends = ['mem', 'file', 'scality']; const validBackends = ['mem', 'file', 'scality', 'antidote'];
assert(validBackends.indexOf(process.env.S3BACKEND) > -1, assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
'bad environment variable: S3BACKEND environment variable ' + 'bad environment variable: S3BACKEND environment variable ' +
'should be one of mem/file/scality' 'should be one of mem/file/scality'
@ -289,7 +289,11 @@ class Config {
if (process.env.S3VAULT) { if (process.env.S3VAULT) {
auth = process.env.S3VAULT; auth = process.env.S3VAULT;
} }
if (auth === 'file' || auth === 'mem') { if (data === 'antidote') {
data = 'mem';
kms = 'mem';
}
if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
// Auth only checks for 'mem' since mem === file // Auth only checks for 'mem' since mem === file
auth = 'mem'; auth = 'mem';
let authfile = `${__dirname}/../conf/authdata.json`; let authfile = `${__dirname}/../conf/authdata.json`;
@ -331,6 +335,32 @@ class Config {
dataPath, dataPath,
metadataPath, metadataPath,
}; };
this.antidote = {};
if (config.antidote) {
if (config.antidote.port !== undefined) {
assert(Number.isInteger(config.antidote.port)
&& config.antidote.port > 0,
'bad config: vaultd port must be a positive integer');
this.antidote.port = config.antidote.port;
}
if (config.antidote.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.antidote.host = config.antidote.host;
}
}
this.userMetaIndexing = false;
if (config.userMetaIndexing !== undefined) {
this.userMetaIndexing = config.userMetaIndexing;
}
this.systemMetaIndexing = false;
if (config.systemMetaIndexing !== undefined) {
this.systemMetaIndexing = config.systemMetaIndexing;
}
this.indexServerPort = 7000;
if (config.indexServerPort !== undefined) {
this.indexServerPort = config.indexServerPort;
}
return config; return config;
} }
} }

View File

@ -0,0 +1,135 @@
'use strict';
let startTx = (() => {
var _ref = _asyncToGenerator(function* (antidote, cb) {
let tx = yield antidote.startTransaction();
return cb(tx);
});
return function startTx(_x, _x2) {
return _ref.apply(this, arguments);
};
})();
let commitTx = (() => {
var _ref2 = _asyncToGenerator(function* (tx, cb) {
yield tx.commit();
return cb();
});
return function commitTx(_x3, _x4) {
return _ref2.apply(this, arguments);
};
})();
let readSetTx = (() => {
var _ref3 = _asyncToGenerator(function* (tx, setKey, cb) {
let set = tx.set(setKey);
let result = yield set.read();
return cb(tx, result);
});
return function readSetTx(_x5, _x6, _x7) {
return _ref3.apply(this, arguments);
};
})();
let updateSetTx = (() => {
var _ref4 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
let set = tx.set(setKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(set.add(keys[i]));
}
yield tx.update(ops);
return cb(tx);
});
return function updateSetTx(_x8, _x9, _x10, _x11) {
return _ref4.apply(this, arguments);
};
})();
let removeSetTx = (() => {
var _ref5 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
let set = tx.set(setKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(set.remove(keys[i]));
}
yield tx.update(ops);
return cb(tx);
});
return function removeSetTx(_x12, _x13, _x14, _x15) {
return _ref5.apply(this, arguments);
};
})();
let readMapTx = (() => {
var _ref6 = _asyncToGenerator(function* (tx, mapKey, cb) {
let map = tx.map(mapKey);
let result = yield map.read();
result = result.toJsObject();
return cb(tx, result);
});
return function readMapTx(_x16, _x17, _x18) {
return _ref6.apply(this, arguments);
};
})();
let updateMapRegisterTx = (() => {
var _ref7 = _asyncToGenerator(function* (tx, mapKey, keys, values, cb) {
let map = tx.map(mapKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(map.register(keys[i]).set(values[i]));
}
yield tx.update(ops);
return cb(tx);
});
return function updateMapRegisterTx(_x19, _x20, _x21, _x22, _x23) {
return _ref7.apply(this, arguments);
};
})();
let removeMapRegisterTx = (() => {
var _ref8 = _asyncToGenerator(function* (tx, mapKey, keys, cb) {
let map = tx.map(mapKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(map.remove(map.register(keys[i])));
}
yield tx.update(ops);
return cb(tx);
});
return function removeMapRegisterTx(_x24, _x25, _x26, _x27) {
return _ref8.apply(this, arguments);
};
})();
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
var antidoteClient = require('antidote_ts_client');
function connect(port, host) {
return antidoteClient.connect(port, host);
}
function setBucket(antidote, bucket) {
return antidote.defaultBucket = bucket;
}
exports.connect = connect;
exports.setBucket = setBucket;
exports.startTx = startTx;
exports.commitTx = commitTx;
exports.readMapTx = readMapTx;
exports.readSetTx = readSetTx;
exports.updateMapRegisterTx = updateMapRegisterTx;
exports.removeMapRegisterTx = removeMapRegisterTx;
exports.removeSetTx = removeSetTx;
exports.updateSetTx = updateSetTx;

View File

@ -0,0 +1,83 @@
var antidoteClient = require('antidote_ts_client')
function connect(port, host) {
return antidoteClient.connect(port, host);
}
function setBucket(antidote, bucket) {
return antidote.defaultBucket = bucket;
}
async function startTx(antidote, cb) {
let tx = await antidote.startTransaction()
return cb(tx)
}
async function commitTx(tx, cb) {
await tx.commit()
return cb();
}
async function readSetTx(tx, setKey, cb) {
let set = tx.set(setKey);
let result = await set.read();
return cb(tx, result)
}
async function updateSetTx(tx, setKey, keys, cb) {
let set = tx.set(setKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(set.add(keys[i]));
}
await tx.update(ops);
return cb(tx);
}
async function removeSetTx(tx, setKey, keys, cb) {
let set = tx.set(setKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(set.remove(keys[i]));
}
await tx.update(ops);
return cb(tx);
}
async function readMapTx(tx, mapKey, cb) {
let map = tx.map(mapKey);
let result = await map.read();
result = result.toJsObject();
return cb(tx, result)
}
async function updateMapRegisterTx(tx, mapKey, keys, values, cb) {
let map = tx.map(mapKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(map.register(keys[i]).set(values[i]))
}
await tx.update(ops);
return cb(tx)
}
async function removeMapRegisterTx(tx, mapKey, keys, cb) {
let map = tx.map(mapKey);
let ops = [];
for (let i = 0; i < keys.length; i++) {
ops.push(map.remove(map.register(keys[i])))
}
await tx.update(ops);
return cb(tx);
}
exports.connect = connect;
exports.setBucket = setBucket;
exports.startTx = startTx;
exports.commitTx = commitTx;
exports.readMapTx = readMapTx;
exports.readSetTx = readSetTx;
exports.updateMapRegisterTx = updateMapRegisterTx;
exports.removeMapRegisterTx = removeMapRegisterTx;
exports.removeSetTx = removeSetTx;
exports.updateSetTx = updateSetTx;

View File

@ -1,4 +1,5 @@
import querystring from 'querystring'; import querystring from 'querystring';
import indexClient from '../indexClient/indexClient';
import constants from '../../constants'; import constants from '../../constants';
import services from '../services'; import services from '../services';
@ -65,11 +66,17 @@ export default function bucketGet(authInfo, request, log, callback) {
requestType: 'bucketGet', requestType: 'bucketGet',
log, log,
}; };
let query = null;
if (params.prefix) {
query = params.prefix.split("//")[1];
params.prefix = params.prefix.split("//")[0];
}
const listParams = { const listParams = {
maxKeys: actualMaxKeys, maxKeys: actualMaxKeys,
delimiter: params.delimiter, delimiter: params.delimiter,
marker: params.marker, marker: params.marker,
prefix: params.prefix, prefix: params.prefix,
query: indexClient.processQueryHeader(query),
}; };
services.metadataValidateAuthorization(metadataValParams, err => { services.metadataValidateAuthorization(metadataValParams, err => {
@ -83,6 +90,7 @@ export default function bucketGet(authInfo, request, log, callback) {
log.debug('error processing request', { error: err }); log.debug('error processing request', { error: err });
return callback(err); return callback(err);
} }
listParams.prefix = params.prefix;
const xml = []; const xml = [];
xml.push( xml.push(
'<?xml version="1.0" encoding="UTF-8"?>', '<?xml version="1.0" encoding="UTF-8"?>',

View File

@ -0,0 +1,128 @@
import http from "http"
import io from "socket.io"
import metadata from "../metadata/wrapper.js"
const _ = require("underscore")
let iosock;
let queries = [];
function intersection(a, b) {
return _.intersection(a, b);
}
function union(a, b) {
return _.union(a, b);
}
function difference(u, a) {
return _.difference(u, a);
}
function proccessLogicalOps(terms) {
let i;
while (terms.length > 1) {
for (i = terms.length - 1; i >= 0; i--) {
if (terms[i] === "op/AND"
|| terms[i] === "op/OR"
|| terms[i] === "op/NOT") {
break;
}
}
if (terms[i] === "op/AND") {
const op1 = terms[i + 1];
const op2 = terms[i + 2];
terms.splice(i, 3, intersection(op1, op2));
} else if (terms[i] === "op/OR") {
const op1 = terms[i + 1];
const op2 = terms[i + 2];
terms.splice(i, 3, union(op1, op2));
}
}
return terms[0];
}
export default {
createPublisher(port, log) {
const server = http.createServer(function(req, res){
});
server.listen(port);
iosock = io.listen(server);
iosock.sockets.on("connection", function(socket) {
log.info("indexing server connected")
socket.on("disconnect", function() {
});
socket.on("subscribe", function(room) {
socket.join(room);
});
socket.on("query_response", function(msg) {
for (let i = 0; i < queries.length; i++) {
if (queries[i].id === msg.id) {
queries[i].responses -=1
let term_index = queries[i].query.indexOf(msg.term);
queries[i].query[term_index] = msg.result;
if (queries[i].responses === 0) {
let result = proccessLogicalOps(queries[i].query);
let listingParams = queries[i].listingParams;
queries.splice(i, 1)
metadata.respondQueryGetMD(result, listingParams);
break;
}
}
}
});
});
},
putObjectMD(bucketName, objName, objVal) {
let msg = {
bucketName,
objName,
objVal
};
if (iosock.sockets.adapter.rooms["put"]) {
iosock.sockets.to("put").emit("put", msg);
}
},
listObject(bucketName, listingParams, cb) {
const id = Math.random().toString(36).substring(7);
let pending_query = {
id,
query: listingParams.query,
responses : 0,
listingParams
}
pending_query.listingParams.bucketName = bucketName;
pending_query.listingParams.cb = cb;
listingParams.query.forEach(term => {
if (term.indexOf("op/") === -1) {
pending_query.responses += 1
iosock.sockets.to(term.split("/")[0]).emit("query", {
id,
term,
bucketName
});
}
});
queries.push(pending_query);
},
deleteObjectMD(bucketName, objName) {
iosock.sockets.to("deletes").emit("delete", {
bucketName,
objName
});
},
processQueryHeader(header) {
if (!header) {
return header;
}
const queryTerms = header.split("&");
const query = [];
for (let i = 0; i < queryTerms.length; i++) {
query.push(queryTerms[i]);
}
return query;
}
}

View File

@ -0,0 +1,14 @@
{
"S3":
[
{
"host": "127.0.0.1",
"port": 7000,
"index": ["tags", "size"]
}
],
"antidote": {
"host": "127.0.0.1",
"port": 8187
}
}

12
lib/indexServer/index.js Normal file
View File

@ -0,0 +1,12 @@
'use strict';
const fs = require('fs');
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
config = JSON.parse(config);
for (let i=0; i<config.S3.length; i++) {
require('./lib/indexUtils.js').default.connectToS3(config.S3[i].host, config.S3[i].port, config.S3[i].index);
}
require('./lib/indexUtils.js').default.connectToDB();

View File

@ -0,0 +1,406 @@
const indexd = require("./indexUtils").default;
const config = require("./indexUtils").config;
const async = require("async");
const _ = require("underscore")
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');
const indexedAttributes = [];
config.S3.forEach(connector => {
connector.index.forEach(tag => {
indexedAttributes.push(tag)
});
})
function padLeft(nr, n){
return Array(n-String(nr).length+1).join("0")+nr;
}
function binaryIndexOf(arr, searchElement, op) {
let minIndex = 0;
let maxIndex = arr.length - 1;
let currentIndex;
let currentElement;
while (minIndex <= maxIndex) {
currentIndex = (minIndex + maxIndex) / 2 | 0;
currentElement = arr[currentIndex];
if (currentElement < searchElement) {
minIndex = currentIndex + 1;
}
else if (currentElement > searchElement) {
maxIndex = currentIndex - 1;
}
else {
break;
}
}
if (op === "=") {
return currentIndex;
} else if (op === ">" && arr[currentIndex] <= searchElement) {
currentIndex += 1
} else if (op === ">=" && arr[currentIndex] < searchElement) {
currentIndex += 1
} else if (op === "<" && arr[currentIndex] >= searchElement) {
currentIndex -= 1
} else if (op === "<=") {
if (arr[currentIndex] > searchElement) {
currentIndex -= 1
}
}
if (currentIndex > arr.length-1 || currentIndex < 0) {
return -1;
}
return currentIndex;
}
function storeToBitmap(stored) {
const bm = bitmap.createObject();
if (stored) {
stored[2] = new Buffer(stored[2], "binary");
bm.read(stored);
}
return bm;
}
function parseNotOperator(result, not, callback) {
if (not) {
callback(null, result.not());
} else {
callback(null, result);
}
}
function updateObjectMapping(objMapping, objName) {
let rowId;
if (Object.keys(objMapping)) {
if (typeof objMapping.mapping[objName] === "number") {
rowId = objMapping.mapping[objName];
} else if (objMapping.nextAvail.length > 0) {
rowId = objMapping.nextAvail[0];
objMapping.nextAvail.splice(0, 1);
} else {
rowId = objMapping.length;
objMapping.length += 1;
}
}
objMapping.mapping[rowId] = objName;
objMapping.mapping[objName] = rowId;
return rowId;
}
function getObjectMeta(bucketName, cb) {
indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
return cb(allObjects);
});
}
function updateIndexMeta(bucketName, objName, tx, cb) {
indexd.writeIndex(`${bucketName}`, objName, tx, (tx) => {
return cb(tx);
});
}
function filterRemoved(results, params, cb) {
indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
results = results.filter(elem => {
return removed.indexOf(elem) === -1;
});
return cb(results);
});
}
function constructRange(value, callback) {
indexd.readAntidoteSet(value, (err, result) => {
callback(null, result);
});
}
function searchIntRange(bucketName, op, term, not, callback) {
term = term.replace("--integer", "");
const attr = term.split("/")[0];
let value = parseInt(term.split("/")[1], 10);
if (op === "=") {
readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
callback(err, data);
});
} else {
if (config.backend === "antidote") {
indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
const range = []
const index = binaryIndexOf(result, value, op)
if (index === -1) {
return parseNotOperator([], not, callback);
}
if (op.indexOf(">") !== -1) {
if (op.indexOf("=") === -1) {
value += 1
}
for (let i = index; i < result.length; i+=1) {
range.push(`${bucketName}/${attr}/${result[i]}`)
}
} else if (op.indexOf("<") !== -1) {
if (op.indexOf("=") === -1) {
value -= 1
}
for (let i = index; i >= 0; i-=1) {
range.push(`${bucketName}/${attr}/${result[i]}`)
}
}
const objRange = [];
async.map(range, constructRange, function(err, res) {
res.forEach(arr => {
arr.forEach(elem => {
objRange.push(elem)
})
})
objRange.sort();
parseNotOperator(objRange, not, callback);
});
});
}
}
}
function searchRegExp(bucketName, searchTerm, not, callback) {
const regexp = new RegExp(searchTerm);
let result = bitmap.createObject();
indexd.getPrefix(`${bucketName}/tags`, (err, list) => {
list.forEach(elem => {
if (elem.key.indexOf("/") !== -1) {
if (regexp.test(elem.key.substring(11))) {
result = result.or(storeToBitmap(JSON.parse(elem.value)));
}
}
});
parseNotOperator(result, not, callback);
})
}
function readTagIndex(bucketName, searchTerm, not, callback) {
let term = null;
let operator = null;
if (searchTerm.indexOf("--integer") !== -1) {
operator = searchTerm.split("/")[1];
term = searchTerm.replace(operator, "");
term = term.replace("/", "");
searchIntRange(bucketName, operator, term, not, callback);
} else if (searchTerm.indexOf("--regexp") !== -1) {
searchTerm = searchTerm.replace("--regexp", "");
searchTerm = searchTerm.substring(11);
searchRegExp(bucketName, searchTerm, not, callback);
} else {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
}
function readFileSizeIndex(bucketName, searchTerm, not, callback) {
const operator = searchTerm.split("/")[1];
let term = searchTerm.replace(operator, "");
term = term.replace("/", "");
searchIntRange(bucketName, operator, term, not, callback);
}
function readModDateIndex(bucketName, searchTerm, not, callback) {
console.log(searchTerm);
const operator = searchTerm.split("/")[1];
let term = searchTerm.replace(operator, "");
term = term.replace("/", "");
return searchIntRange(bucketName, operator, term, not, callback);
}
function readACLIndex(bucketName, searchTerm, not, callback) {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
function readContentTypeIndex(bucketName, searchTerm, not, callback) {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
function readIndex(bucketName, searchTerm, callback) {
if (searchTerm.indexOf("op/AND") !== -1
|| searchTerm.indexOf("op/OR") !== -1
|| searchTerm.indexOf("op/NOT") !== -1) {
callback(null, searchTerm);
}
let notOperator = false;
let result;
if (searchTerm.indexOf("tags") !== -1) {
return readTagIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf("filesize") !== -1) {
return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf("modificationdate") !== -1) {
return readModDateIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf("contenttype") !== -1 || searchTerm.indexOf("contentsubtype") !== -1) {
return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf("acl") !== -1) {
return readACLIndex(bucketName, searchTerm, notOperator, callback);
}
return result;
}
function bitmapToStore(bitmap) {
const toStore = bitmap.write();
toStore[2] = toStore[2].toString("binary");
return toStore;
}
function deleteOldEntries(bucketName, rowId, cb) {
return cb();
}
function updateBitmap(bitmap, rowId) {
if (bitmap.length() - 1 <= rowId) {
bitmap.push(rowId);
} else {
bitmap.set(rowId);
}
return bitmapToStore(bitmap);
}
function updateIndexEntry(key, rowId, cb) {
indexd.get(key, (err, data) => {
if (err) {
return (err);
} else {
data = JSON.parse(data)
}
indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
return (err);
});
});
}
function updateIntIndex(bucketName, objName, attribute, value, cb) {
indexd.writeIndex(`${bucketName}/${attribute}/${value}`, objName, tx, (tx) => {
return cb(tx);
})
}
function updateACLIndex(bucketName, objName, objVal, rowId) {
deleteOldEntries(bucketName, rowId, () => {
Object.keys(objVal).forEach(elem => {
if (typeof objVal[elem] === "string") {
writeIndex(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
return ;
});
} else {
objVal[elem].forEach(item => {
writeIndex(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
return ;
});
});
}
});
});
}
function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
const type = objVal.split("/")[0];
const subtype = objVal.split("/")[1];
writeIndex(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
writeIndex(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
return ;
});
});
}
function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
const date = new Date(objVal);
const term = "modificationdate-";
const year = date.getUTCFullYear();
const month = date.getUTCMonth() + 1;
const day = date.getUTCDate();
const hours = date.getUTCHours();
const minutes = date.getUTCMinutes();
updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
}
function updateFileSizeIndex(bucketName, objName, objVal, tx, cb) {
console.log("updateFileSizeIndex", indexedAttributes);
if (indexedAttributes.indexOf("size") === -1) {
console.log("return");
return cb();
}
indexd.writeIndex(`${bucketName}/filesize/${parseInt(objVal, 10)}`, objName, tx, (tx) => {
return cb(tx);
});
}
function updateTagIndex(bucketName, objName, objVal, tx, cb) {
console.log("updateTagIndex", indexedAttributes);
if (indexedAttributes.indexOf("tags") === -1) {
console.log("return");
return cb();
}
const tags = [];
Object.keys(objVal).forEach(elem => {
if (elem.indexOf("x-amz-meta") !== -1 &&
elem !== "x-amz-meta-s3cmd-attrs") {
tags.push({key: elem.replace("x-amz-meta-", ""), value: objVal[elem]});
}
});
console.log(tags);
tags.forEach(tag => {
indexd.writeIndex(`${bucketName}/tags/${tag.key}/${tag.value}`, objName, tx, (tx) => {
return cb(tx);
});
});
}
const index = {
evaluateQuery: (params) => {
let queryTerms = params.term;
const bucketName = params.bucketName;
readIndex(bucketName, queryTerms, (err, queryTerms) => {
filterRemoved(queryTerms, params, (results) => {
indexd.respondQuery(params, results);
});
});
},
updateIndex: (bucketName, objName, objVal) => {
indexd.startTx( (tx) => {
updateIndexMeta(bucketName, objName, tx, (tx) => {
updateTagIndex(bucketName, objName, objVal, tx, (tx) => {
updateFileSizeIndex(bucketName, objName, objVal["content-length"], tx, (tx) => {
antidoteCli.commitTx(tx, () => {
});
});
});
/*
if (indexedAttributes.indexOf("filesize") !== -1) {
updateFileSizeIndex(bucketName, objName, objVal["content-length"], rowId);
}
if (indexedAttributes.indexOf("date") !== -1) {
updateΜodDateIndex(bucketName, objName, objVal["last-modified"], rowId);
}
if (indexedAttributes.indexOf("contenttype") !== -1) {
updateContentTypeIndex(bucketName, objName, objVal["content-type"], rowId);
}
if (indexedAttributes.indexOf("acl") !== -1) {
updateACLIndex(bucketName, objName, objVal["acl"], rowId);
}
*/
});
})
},
deleteObject: (bucketName, objName) => {
indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
},
};
exports.default = index;

View File

@ -0,0 +1,95 @@
const fs = require('fs');
const io = require('socket.io-client');
const antidoteClient = require('antidote_ts_client');
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
config = JSON.parse(config);
let antidote;
let antidotedb;
const clients = [];
const utils = {
connectToS3(host, port, attributes) {
let client = io.connect(`http://${host}:${port}`, {
reconnection: true,
});
client.on('connect', function() {
for (let i = 0; i < attributes.length; i++) {
client.emit('subscribe', attributes[i]);
client.emit('subscribe', 'put');
}
});
client.on('reconnecting', function(number) {
});
client.on('error', function(err) {
});
client.on('put', function(msg) {
require('./indexOps.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
});
client.on('query', function(msg) {
msg.client = client;
require('./indexOps.js').default.evaluateQuery(msg);
});
client.on('delete', function(msg) {
require('./indexOps.js').default.deleteObject(msg.bucketName, msg.objName);
});
clients.push(client)
},
connectToDB() {
antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
antidoteCli.setBucket(antidotedb, 'index');
antidote.defaultBucket = 'index';
},
updateAntidoteSet(key, elem, tx, cb) {
console.log("updateAntidoteSet", key, elem);
antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
antidoteCli.updateSetTx(tx, key, [elem], (tx) => {
return cb(tx);
});
});
},
startTx(cb) {
antidoteCli.startTx(antidotedb, (tx) => {
return cb(tx)
});
},
commitTx(cb) {
antidoteCli.commitTx(tx, () => {
return cb();
});
},
writeIndex(key, objName, tx, cb) {
console.log("writeIndex", key, objName);
antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
antidoteCli.updateSetTx(tx, key, [objName], (tx) => {
return cb(tx);
});
});
},
readIndex(key, cb) {
antidoteCli.readSetTx(tx, key, (tx, data) => {
return cb(tx, data);
});
},
respondQuery(params, queryTerms) {
let client = params.client;
client.emit('query_response', {
result: queryTerms,
id: params.id,
term: params.term
})
}
}
exports.default = utils;
exports.config = config;

View File

@ -0,0 +1,47 @@
import ListResult from '../in_memory/ListResult';
export class ListBucketResult extends ListResult {
constructor() {
super();
this.Contents = [];
}
addContentsKey(key, keyMap) {
const objectMD = keyMap;
this.Contents.push({
key,
value: {
LastModified: objectMD['last-modified'],
ETag: objectMD['content-md5'],
StorageClass: objectMD['x-amz-storage-class'],
Owner: {
DisplayName: objectMD['owner-display-name'],
ID: objectMD['owner-id'],
},
Size: objectMD['content-length'],
// Initiated is used for overview of MPU
Initiated: objectMD.initiated,
// Initiator is used for overview of MPU.
// It is an object containing DisplayName
// and ID
Initiator: objectMD.initiator,
// EventualStorageBucket is used for overview of MPU
EventualStorageBucket: objectMD.eventualStorageBucket,
// Used for parts of MPU
partLocations: objectMD.partLocations,
// creationDate is just used for serviceGet
creationDate: objectMD.creationDate,
},
});
this.MaxKeys += 1;
}
hasDeleteMarker(key, keyMap) {
const objectMD = keyMap;
if (objectMD['x-amz-delete-marker'] !== undefined) {
return (objectMD['x-amz-delete-marker'] === true);
}
return false;
}
}

View File

@ -0,0 +1,339 @@
import { errors } from 'arsenal';
import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
import { ListBucketResult } from './ListBucketResult';
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
import config from '../../Config';
import async from 'async';
import antidoteCli from '../../antidoteNodeCli/lib/api.js';
const defaultMaxKeys = 1000;
class AntidoteInterface {
constructor() {
this.antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
}
createBucket(bucketName, bucketMD, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
// TODO Check whether user already owns the bucket,
// if so return "BucketAlreadyOwnedByYou"
// If not owned by user, return "BucketAlreadyExists"
if (bucket) {
return cb(errors.BucketAlreadyExists);
}
const mapKeys = []
const mapValues = []
Object.keys(bucketMD).forEach(key => {
mapKeys.push(key)
mapValues.push(bucketMD[key])
});
antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
antidoteCli.commitTx(tx, () => {
return cb();
});
});
});
});
}
putBucketAttributes(bucketName, bucketMD, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, err => {
if (err) {
return cb(err);
}
const mapKeys = []
const mapValues = []
Object.keys(bucketMD).forEach(key => {
mapKeys.push(key)
mapValues.push(bucketMD[key])
});
antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
antidoteCli.commitTx(tx, () => {
return cb();
});
});
});
});
}
getBucketAttributes(tx, bucketName, log, cb) {
if (tx === null) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
antidoteCli.commitTx(tx, () => {
if (Object.keys(bucketMD).length === 0) {
return cb(errors.NoSuchBucket);
}
return cb(null, bucketMD);
});
});
});
}
else {
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
if (Object.keys(bucketMD).length === 0) {
return cb(errors.NoSuchBucket);
}
return cb(null, bucketMD);
});
}
}
deleteBucket(bucketName, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, objects) => {
if (bucket && objects.length > 0) {
antidoteCli.commitTx(tx, () => {
return cb(errors.BucketNotEmpty);
});
}
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
const mapKeys = []
Object.keys(bucketMD).forEach(key => {
mapKeys.push(key)
});
antidoteCli.removeMapRegisterTx(tx, `${bucketName}/md`, mapKeys, (tx) => {
antidoteCli.commitTx(tx, () => {
return cb(null);
});
});
});
});
});
});
}
putObject(bucketName, objName, objVal, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, err => {
if (err) {
return cb(err);
}
const mapKeys = []
const mapValues = []
Object.keys(objVal).forEach(key => {
mapKeys.push(key)
mapValues.push(objVal[key])
});
antidoteCli.updateSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
antidoteCli.updateMapRegisterTx(tx, `${objName}`, mapKeys, mapValues, (tx) => {
antidoteCli.commitTx(tx, () => {
return cb();
});
});
});
});
});
}
getBucketAndObject(bucketName, objName, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
if (err) {
return cb(err, { bucket });
}
const bucket_MD = {}
Object.keys(bucket).map(function(key) {
bucket_MD[key.substr(1)] = bucket[key]
});
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
antidoteCli.commitTx(tx, () => {
if (!bucket || Object.keys(objectMD).length === 0) {
return cb(null, { bucket: JSON.stringify(bucket_MD) });
}
return cb(null, {
bucket: JSON.stringify(bucket_MD),
obj: JSON.stringify(objectMD),
});
});
});
});
});
}
getObject(bucketName, objName, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
antidoteCli.commitTx(tx, () => {
if (!bucket || Object.keys(objectMD).length === 0) {
return cb(errors.NoSuchKey);
}
return cb(null, objectMD);
});
});
});
});
}
deleteObject(bucketName, objName, log, cb) {
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
if (!bucket || Object.keys(objectMD).length === 0) {
antidoteCli.commitTx(tx, () => {
return cb(errors.NoSuchKey);
});
}
const mapKeys = []
Object.keys(objectMD).forEach(key => {
mapKeys.push(key)
});
antidoteCli.removeMapRegisterTx(tx, `${objName}`, mapKeys, (tx) => {
antidoteCli.removeSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
antidoteCli.commitTx(tx, () => {
return cb();
});
});
});
});
});
});
}
getObjectMD(tx, bucketName, key, callback) {
antidoteCli.readMapTx(tx, `${key}`, (tx, objectMD) => {
if (Object.keys(objectMD).length === 0) {
return callback(error.NoSuchKey, null);
}
return callback(null, objectMD);
});
}
listObject(bucketName, params, log, cb) {
const { prefix, marker, delimiter, maxKeys } = params;
if (prefix && typeof prefix !== 'string') {
return cb(errors.InvalidArgument);
}
if (marker && typeof marker !== 'string') {
return cb(errors.InvalidArgument);
}
if (delimiter && typeof delimiter !== 'string') {
return cb(errors.InvalidArgument);
}
if (maxKeys && typeof maxKeys !== 'number') {
return cb(errors.InvalidArgument);
}
let numKeys = maxKeys;
// If paramMaxKeys is undefined, the default parameter will set it.
// However, if it is null, the default parameter will not set it.
if (numKeys === null) {
numKeys = defaultMaxKeys;
}
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
antidoteCli.startTx(this.antidotedb, (tx) => {
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
if (Object.keys(bucketMD).length === 0) {
return cb(errors.NoSuchBucket);
}
const response = new ListBucketResult();
antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, keys) => {
async.map(keys, this.getObjectMD.bind(null, tx, bucketName), function(err, objectMeta) {
antidoteCli.commitTx(tx, () => {
// If marker specified, edit the keys array so it
// only contains keys that occur alphabetically after the marker
if (marker) {
keys = markerFilter(marker, keys);
response.Marker = marker;
}
// If prefix specified, edit the keys array so it only
// contains keys that contain the prefix
if (prefix) {
keys = prefixFilter(prefix, keys);
response.Prefix = prefix;
}
// Iterate through keys array and filter keys containing
// delimiter into response.CommonPrefixes and filter remaining
// keys into response.Contents
for (let i = 0; i < keys.length; ++i) {
const currentKey = keys[i];
// Do not list object with delete markers
if (response.hasDeleteMarker(currentKey,
objectMeta[i])) {
continue;
}
// If hit numKeys, stop adding keys to response
if (response.MaxKeys >= numKeys) {
response.IsTruncated = true;
response.NextMarker = keys[i - 1];
break;
}
// If a delimiter is specified, find its index in the
// current key AFTER THE OCCURRENCE OF THE PREFIX
let delimiterIndexAfterPrefix = -1;
let prefixLength = 0;
if (prefix) {
prefixLength = prefix.length;
}
const currentKeyWithoutPrefix = currentKey
.slice(prefixLength);
let sliceEnd;
if (delimiter) {
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
.indexOf(delimiter);
sliceEnd = delimiterIndexAfterPrefix + prefixLength;
response.Delimiter = delimiter;
}
// If delimiter occurs in current key, add key to
// response.CommonPrefixes.
// Otherwise add key to response.Contents
if (delimiterIndexAfterPrefix > -1) {
const keySubstring = currentKey.slice(0, sliceEnd + 1);
response.addCommonPrefix(keySubstring);
} else {
response.addContentsKey(currentKey,
objectMeta[i]);
}
}
return cb(null, response);
});
});
});
});
});
}
listMultipartUploads(bucketName, listingParams, log, cb) {
process.nextTick(() => {
this.getBucketAttributes(null, bucketName, log, (err, bucket) => {
if (bucket === undefined) {
// no on going multipart uploads, return empty listing
return cb(null, {
IsTruncated: false,
NextMarker: undefined,
MaxKeys: 0,
});
}
return getMultipartUploadListing(bucket, listingParams, cb);
});
});
}
};
export default AntidoteInterface;

View File

@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
import BucketFileInterface from './bucketfile/backend'; import BucketFileInterface from './bucketfile/backend';
import BucketInfo from './BucketInfo'; import BucketInfo from './BucketInfo';
import inMemory from './in_memory/backend'; import inMemory from './in_memory/backend';
import AntidoteInterface from './antidote/backend';
import config from '../Config'; import config from '../Config';
import indexClient from '../indexClient/indexClient'
import async from 'async'
let client; let client;
let implName; let implName;
@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
} else if (config.backends.metadata === 'scality') { } else if (config.backends.metadata === 'scality') {
client = new BucketClientInterface(); client = new BucketClientInterface();
implName = 'bucketclient'; implName = 'bucketclient';
} else if (config.backends.metadata === 'antidote') {
client = new AntidoteInterface();
implName = 'antidote';
}
function getQueryResults(params, objName, callback) {
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
client.getObject(bucketName, objName, log, (err, data) => {
if (err) {
callback(err, null);
}
callback(null, {key: objName, value: {
LastModified: data['last-modified'],
ETag: data['content-md5'],
StorageClass: data['x-amz-storage-class'],
Owner: {
ID: data['owner-id'],
DisplayName: data['owner-display-name']
},
Size: data['content-length'],
Initiated: undefined,
Initiator: undefined,
EventualStorageBucket: undefined,
partLocations: undefined,
creationDate: undefined
}});
});
} }
const metadata = { const metadata = {
@ -45,7 +75,7 @@ const metadata = {
getBucket: (bucketName, log, cb) => { getBucket: (bucketName, log, cb) => {
log.debug('getting bucket from metadata'); log.debug('getting bucket from metadata');
client.getBucketAttributes(bucketName, log, (err, data) => { client.getBucketAttributes(null, bucketName, log, (err, data) => {
if (err) { if (err) {
log.debug('error from metadata', { implName, error: err }); log.debug('error from metadata', { implName, error: err });
return cb(err); return cb(err);
@ -75,6 +105,12 @@ const metadata = {
return cb(err); return cb(err);
} }
log.debug('object successfully put in metadata'); log.debug('object successfully put in metadata');
if (config.userMetaIndexing || config.systemMetaIndexing) {
if (objName.indexOf('..|..') !== -1) {
return cb(err);
}
indexClient.putObjectMD(bucketName, objName, objVal);
}
return cb(err); return cb(err);
}, params); }, params);
}, },
@ -113,11 +149,17 @@ const metadata = {
return cb(err); return cb(err);
} }
log.debug('object deleted from metadata'); log.debug('object deleted from metadata');
if (config.userMetaIndexing || config.systemMetaIndexing) {
indexClient.deleteObjectMD(bucketName, objName);
}
return cb(err); return cb(err);
}, params); }, params);
}, },
listObject: (bucketName, listingParams, log, cb) => { listObject: (bucketName, listingParams, log, cb) => {
if (listingParams.query) {
indexClient.listObject(bucketName, listingParams, cb);
} else {
client client
.listObject(bucketName, listingParams, .listObject(bucketName, listingParams,
log, (err, data) => { log, (err, data) => {
@ -129,6 +171,34 @@ const metadata = {
log.debug('object listing retrieved from metadata'); log.debug('object listing retrieved from metadata');
return cb(err, data); return cb(err, data);
}); });
}
},
respondQueryGetMD: (result, params) => {
async.map(result, getQueryResults.bind(null, params), function(err, res) {
const response = {
IsTruncated: false,
NextMarker: params.marker,
CommonPrefixes: [],
MaxKeys: 10,
Contents: res
}
return params.cb(err, response);
});
},
respondQueryFilter: (result, params) => {
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
log, (err, data) => {
if (err) {
return cb(err);
}
data.Contents = data.Contents.filter(elem => {
return result.indexOf(elem.key) !== -1;
});
return cb(err, data);
});
}, },
listMultipartUploads: (bucketName, listingParams, log, cb) => { listMultipartUploads: (bucketName, listingParams, log, cb) => {

View File

@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
import { clientCheck } from './utilities/healthcheckHandler'; import { clientCheck } from './utilities/healthcheckHandler';
import _config from './Config'; import _config from './Config';
import routes from './routes'; import routes from './routes';
import indexClient from './indexClient/indexClient'
class S3Server { class S3Server {
/** /**
@ -38,6 +38,7 @@ class S3Server {
* This starts the http server. * This starts the http server.
*/ */
startup(port) { startup(port) {
indexClient.createPublisher(_config.indexServerPort, logger);
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets // Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
if (_config.https) { if (_config.https) {
this.server = https.createServer({ this.server = https.createServer({
@ -118,7 +119,7 @@ class S3Server {
export default function main() { export default function main() {
let clusters = _config.clusters || 1; let clusters = _config.clusters || 1;
if (process.env.S3BACKEND === 'mem') { if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
clusters = 1; clusters = 1;
} }
if (cluster.isMaster) { if (cluster.isMaster) {

View File

@ -19,6 +19,7 @@
}, },
"homepage": "https://github.com/scality/S3#readme", "homepage": "https://github.com/scality/S3#readme",
"dependencies": { "dependencies": {
"antidote_ts_client": "^0.1.0",
"arsenal": "scality/Arsenal", "arsenal": "scality/Arsenal",
"async": "~1.4.2", "async": "~1.4.2",
"babel-core": "^6.5.2", "babel-core": "^6.5.2",
@ -33,6 +34,7 @@
"multilevel": "^7.3.0", "multilevel": "^7.3.0",
"node-uuid": "^1.4.3", "node-uuid": "^1.4.3",
"ready-set-stream": "1.0.7", "ready-set-stream": "1.0.7",
"socket.io": "^1.7.2",
"sproxydclient": "scality/sproxydclient", "sproxydclient": "scality/sproxydclient",
"utapi": "scality/utapi", "utapi": "scality/utapi",
"utf8": "~2.1.1", "utf8": "~2.1.1",
@ -47,6 +49,7 @@
"aws-sdk": "^2.2.11", "aws-sdk": "^2.2.11",
"babel-cli": "^6.2.0", "babel-cli": "^6.2.0",
"babel-eslint": "^6.0.0", "babel-eslint": "^6.0.0",
"babel-plugin-transform-async-to-generator": "^6.24.1",
"bluebird": "^3.3.1", "bluebird": "^3.3.1",
"eslint": "^2.4.0", "eslint": "^2.4.0",
"eslint-config-airbnb": "^6.0.0", "eslint-config-airbnb": "^6.0.0",
@ -68,9 +71,13 @@
"ft_test": "mocha tester.js --compilers js:babel-core/register", "ft_test": "mocha tester.js --compilers js:babel-core/register",
"init": "node init.js", "init": "node init.js",
"install_ft_deps": "npm install aws-sdk@2.2.11 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7 zombie@5.0.5", "install_ft_deps": "npm install aws-sdk@2.2.11 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7 zombie@5.0.5",
"install_antidote_client": "babel --plugins transform-async-to-generator lib/antidoteNodeCli/src/ -d lib/antidoteNodeCli/lib/",
"lint": "eslint $(git ls-files '*.js')", "lint": "eslint $(git ls-files '*.js')",
"lint_md": "mdlint $(git ls-files '*.md')", "lint_md": "mdlint $(git ls-files '*.md')",
"mem_backend": "S3BACKEND=mem node index.js", "mem_backend": "S3BACKEND=mem node index.js",
"antidote_backend": "S3BACKEND=antidote node index.js",
"antidote_deploy": "S3BACKEND=antidote ./deploy_instances.sh 3",
"antidote_stop": "S3BACKEND=antidote ./stop_instances.sh",
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js", "perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
"start": "node init.js && node index.js", "start": "node init.js && node index.js",
"start_utapi": "node utapiServer.js", "start_utapi": "node utapiServer.js",

13
portConfig.js Normal file
View File

@ -0,0 +1,13 @@
const fs = require('fs');
const path = require('path');
const configpath = path.join(__dirname, '/config.json');
var config = fs.readFileSync(configpath, { encoding: 'utf-8' });
config = JSON.parse(config);
config.port = 8000 + ((config.port - 8000 + 1) % 3);
config.antidote.port = 8187 + ((config.antidote.port - 8187 + 100) % 400);
config.indexServerPort = 7000 + ((config.indexServerPort - 7000 + 1) % 3);
config = JSON.stringify(config);
fs.writeFileSync(configpath, config, { encoding: 'utf-8' });

4
stop_instances.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
fuser -k -n tcp 8000
fuser -k -n tcp 8001
fuser -k -n tcp 8002