Compare commits

...

34 Commits

Author SHA1 Message Date
Dimitrios Vasilas e27cb1cf6c improve metadata crdt representation 2017-01-10 18:16:19 +01:00
Dimitrios Vasilas 9c8c8fd9ea merge with updates to master 2016-12-25 12:24:25 +02:00
Dimitrios Vasilas a236fd4457 add wrapper for put to db 2016-12-23 17:13:10 +01:00
Dimitrios Vasilas e76472fa20 fix range search 2016-12-23 15:15:30 +01:00
Dimitrios Vasilas e6f82079c6 fix range search bug 2016-12-22 17:39:40 +01:00
Dimitrios Vasilas cfb77cc9fe different buckets for index and metadata 2016-12-21 18:24:59 +01:00
Dimitrios Vasilas 17a9cd1071 implement logical operators in antidote backend 2016-12-21 16:11:30 +01:00
Dimitrios Vasilas 8ce70e7337 add range search 2016-12-20 18:32:27 +01:00
Dimitrios Vasilas cdbcae37b2 revert levedb integer indexing 2016-12-16 16:22:18 +01:00
Dimitrios Vasilas 0f5cd3e545 improve antitode integration 2016-12-16 15:52:08 +01:00
Dimitrios Vasilas f6cddaf6a5 add antidote metadata backend 2016-12-14 12:00:45 +01:00
Dimitrios Vasilas c225e483bc add Antidotedb indexing backend 2016-12-09 15:28:22 +01:00
Dimitrios Vasilas 53cb8a3263 change communication scheme 2016-12-02 12:54:34 +01:00
Dimitrios Vasilas d302070959 add indexing server configuration 2016-11-29 17:39:40 +01:00
Dimitrios Vasilas 7597c379e7 improve client/server system implementation 2016-11-28 18:48:04 +01:00
Dimitrios Vasilas 541e97f1ea change index representation in database 2016-11-25 18:34:02 +01:00
Dimitrios Vasilas c32fe7df51 bug fixes 2016-11-16 19:26:40 +01:00
Dimitrios Vasilas 9e301aaf25 add regexp search on tags 2016-10-19 13:20:48 +02:00
Dimitrios Vasilas eb441dd09c fix range queries 2016-10-14 15:12:53 +02:00
Dimitrios Vasilas 6fddd95a1d change filesize indexing 2016-10-13 18:33:57 +02:00
Dimitrios Vasilas 1303bd8b97 add delete functionality 2016-10-13 17:13:42 +02:00
Dimitrios Vasilas 093c071836 implement bitmapd functionality 2016-10-05 01:05:27 +02:00
Dimitrios Vasilas da115e6ce4 various fixes 2016-08-30 10:35:37 +02:00
Dimitrios Vasilas b588d5e4d0 fix lint errors 2016-08-25 12:00:51 +02:00
Dimitrios Vasilas 5f1bd15fb0 merge indexes in single object 2016-08-24 15:15:58 +02:00
Dimitrios Vasilas 22a783f23e add acl indexing 2016-08-23 16:28:46 +02:00
Dimitrios Vasilas 320fc256bf implement last content type index 2016-08-22 17:06:05 +02:00
Dimitrios Vasilas 361585825f implement last modification date index 2016-08-22 14:36:05 +02:00
Dimitrios Vasilas e390e355ef add file size indexing 2016-08-19 15:56:42 +02:00
Dimitrios Vasilas f03940e45e add config option to enable indexing of x-amz-meta headers 2016-08-18 14:20:02 +02:00
Dimitrios Vasilas e67abad697 tag indexing (integer optimization) 2016-08-18 13:06:42 +02:00
Dimitrios Vasilas 9a42556448 implement index functionality 2016-08-14 21:38:05 +02:00
Dimitrios Vasilas 3056714c02 implement object to rowid mechanism 2016-08-11 17:39:04 +02:00
Dimitrios Vasilas d871755296 add dimitriosvasilas/node-bitmap-ewah to package.json 2016-08-02 16:44:02 +02:00
13 changed files with 1239 additions and 5 deletions

View File

@ -48,6 +48,13 @@
"logLevel": "info", "logLevel": "info",
"dumpLevel": "error" "dumpLevel": "error"
}, },
"antidote": {
"host": "localhost",
"port": 8087
},
"userMetaIndexing":true,
"systemMetaIndexing":true,
"indexServerPort":7000,
"healthChecks": { "healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"] "allowFrom": ["127.0.0.1/8", "::1"]
} }

12
indexServer/config.json Normal file
View File

@ -0,0 +1,12 @@
{
"S3": {
"host": "127.0.0.1",
"port": 7000
},
"antidote": {
"host": "127.0.0.1",
"port": 8087
},
"leveldb_path": "indexdb0",
"backend": "antidote"
}

4
indexServer/index.js Normal file
View File

@ -0,0 +1,4 @@
'use strict';
require('./lib/bitmapd-utils.js').default.connectToS3();
require('./lib/bitmapd-utils.js').default.connectToDB();

View File

@ -0,0 +1,148 @@
const fs = require('fs');
const io = require('socket.io-client');
const levelup = require('levelup');
const leveldown = require('leveldown');
const antidoteClient = require('antidote_ts_client');
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
config = JSON.parse(config);
let antidote;
let client = null;
let db = null;
const utils = {
connectToS3() {
client = io.connect(`http://${config.S3.host}:${config.S3.port}`, {
reconnection: true,
});
client.on('connect', function() {
client.emit('subscribe', 'puts');
client.emit('subscribe', 'deletes');
client.emit('subscribe', 'queries');
});
client.on('reconnecting', function(number) {
});
client.on('error', function(err) {
});
client.on('put', function(msg) {
require('./utils.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
});
client.on('query', function(msg) {
if (!msg.params.prefix)
msg.params.prefix = undefined;
if (!msg.params.marker)
msg.params.marker = undefined;
if (!msg.params.delimiter)
msg.params.delimiter = undefined;
require('./utils.js').default.evaluateQuery(msg.query, msg.params);
});
client.on('delete', function(msg) {
require('./utils.js').default.deleteObject(msg.bucketName, msg.objName);
});
},
connectToDB() {
antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
antidote.defaultBucket = 'index';
leveldown.destroy(config.leveldb_path, function (err) {
if (!err) {
db = levelup(config.leveldb_path);
}
});
},
updateAntidoteSet(key, elem, cb) {
const keyset = antidote.set('keys');
antidote.update(
keyset.add(key)
).then( (resp) => {
const set = antidote.set(key);
antidote.update(
set.add(elem)
).then( (resp) => {
return cb();
});
});
},
readAntidoteSet(key, cb) {
const set = antidote.set(key);
set.read().then(objs => {
return cb(null, objs);
});
},
removeFromAntidoteSet(key, objName, cb) {
const set = antidote.set(key);
antidote.update(
set.remove(objName)
).then( (resp) => {
return cb();
});
},
respondQuery(params, queryTerms) {
client.emit('query_response', {
result: queryTerms,
params
})
},
put(key, value, cb) {
db.put(key, value, function(err) {
return cb(err);
});
},
get(key, cb) {
db.get(key, function(err, data) {
return cb(err, data);
});
},
getPrefix(prefix, cb) {
const list = []
db.createReadStream({
start: prefix,
end: prefix + "\xFF"
})
.on('data', function(data) {
list.push(data);
})
.on('error', function(err) {
return cb(err, null)
})
.on('close', function() {
if (list.length === 0)
return cb(null, null)
return cb(null, list)
});
},
getRange(start, end, cb) {
const list = []
db.createReadStream({
start: start,
end: end
})
.on('data', function(data) {
list.push(data);
})
.on('error', function(err) {
return cb(err, null)
})
.on('close', function() {
return cb(null, list)
});
},
batchWrite(ops, cb) {
db.batch(ops, function(err) {
return cb(err)
})
}
}
exports.default = utils;
exports.config = config;

528
indexServer/lib/utils.js Normal file
View File

@ -0,0 +1,528 @@
const bitmap = require('node-bitmap-ewah');
const indexd = require('./bitmapd-utils').default;
const config = require('./bitmapd-utils').config;
const async = require('async');
const _ = require('underscore')
function intersection(a, b) {
if (config.backend === "leveldb") {
return a.and(b);
} else if (config.backend === "antidote") {
return _.intersection(a, b);
}
}
function union(a, b) {
if (config.backend === "leveldb") {
return a.and(b);
} else if (config.backend === "antidote") {
return _.union(a, b);
}
}
function difference(u, a) {
if (config.backend === "leveldb") {
return a.not()
} else if (config.backend === "antidote") {
return _.difference(u, a);
}
}
function padLeft(nr, n){
return Array(n-String(nr).length+1).join('0')+nr;
}
function binaryIndexOf(arr, searchElement, op) {
let minIndex = 0;
let maxIndex = arr.length - 1;
let currentIndex;
let currentElement;
while (minIndex <= maxIndex) {
currentIndex = (minIndex + maxIndex) / 2 | 0;
currentElement = arr[currentIndex];
if (currentElement < searchElement) {
minIndex = currentIndex + 1;
}
else if (currentElement > searchElement) {
maxIndex = currentIndex - 1;
}
else {
break;
}
}
if (op === '=') {
return currentIndex;
} else if (op === '>' && arr[currentIndex] <= searchElement) {
currentIndex += 1
} else if (op === '>=' && arr[currentIndex] < searchElement) {
currentIndex += 1
} else if (op === '<' && arr[currentIndex] >= searchElement) {
currentIndex -= 1
} else if (op === '<=') {
if (arr[currentIndex] > searchElement) {
currentIndex -= 1
}
}
if (currentIndex > arr.length-1 || currentIndex < 0) {
return -1;
}
return currentIndex;
}
function storeToBitmap(stored) {
const bm = bitmap.createObject();
if (stored) {
stored[2] = new Buffer(stored[2], 'binary');
bm.read(stored);
}
return bm;
}
function parseNotOperator(result, not, callback) {
if (not) {
callback(null, result.not());
} else {
callback(null, result);
}
}
function updateObjectMapping(objMapping, objName) {
let rowId;
if (Object.keys(objMapping)) {
if (typeof objMapping.mapping[objName] === 'number') {
rowId = objMapping.mapping[objName];
} else if (objMapping.nextAvail.length > 0) {
rowId = objMapping.nextAvail[0];
objMapping.nextAvail.splice(0, 1);
} else {
rowId = objMapping.length;
objMapping.length += 1;
}
}
objMapping.mapping[rowId] = objName;
objMapping.mapping[objName] = rowId;
return rowId;
}
function getObjectMeta(bucketName, cb) {
if (config.backend === "leveldb") {
return cb(null);
} else if (config.backend === "antidote") {
indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
return cb(allObjects);
});
}
}
function updateIndexMeta(bucketName, objName, cb) {
if (config.backend === "leveldb") {
let rowId;
indexd.get(`${bucketName}`, (err, objMapping) => {
if (err) {
objMapping = {nextAvail: [], length:1, mapping:{}}
}
else {
objMapping = JSON.parse(objMapping);
}
rowId = updateObjectMapping(objMapping, objName);
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
if (err) {
return ;
}
return cb(err, rowId)
})
})
} else if (config.backend === "antidote") {
indexd.updateAntidoteSet(`${bucketName}`, objName, () => {
return cb(null, -1);
});
}
}
function filterRemoved(results, params, cb) {
indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
results = results.filter(elem => {
return removed.indexOf(elem) === -1;
});
return cb(results);
});
}
function constructRange(value, callback) {
indexd.readAntidoteSet(value, (err, result) => {
callback(null, result);
});
}
function readDB(key, cb) {
if (config.backend === "leveldb") {
indexd.get(key, (err, data) => {
if (!err) {
data = JSON.parse(data)
}
return (err, data)
});
} else if (config.backend === "antidote") {
indexd.readAntidoteSet(key, (err, data) => {
return cb(err, data);
});
}
}
function writeDB(key, objName, attribute, value, rowId, cb) {
if (config.backend === "leveldb") {
updateIndexEntry(key, rowId, cb);
} else if (config.backend === "antidote") {
indexd.updateAntidoteSet(key, objName, () => {
indexd.updateAntidoteSet(attribute, value, () => {
return cb(null)
});
});
}
}
function searchIntRange(bucketName, op, term, not, callback) {
term = term.replace('--integer', '');
const attr = term.split('/')[0];
let value = parseInt(term.split('/')[1], 10);
if (op === '=') {
readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
callback(err, data);
});
} else {
if (config.backend === "antidote") {
indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
const range = []
const index = binaryIndexOf(result, value, op)
if (index === -1) {
return parseNotOperator([], not, callback);
}
if (op.indexOf('>') !== -1) {
if (op.indexOf('=') === -1) {
value += 1
}
for (let i = index; i < result.length; i+=1) {
range.push(`${bucketName}/${attr}/${result[i]}`)
}
} else if (op.indexOf('<') !== -1) {
if (op.indexOf('=') === -1) {
value -= 1
}
for (let i = index; i >= 0; i-=1) {
range.push(`${bucketName}/${attr}/${result[i]}`)
}
}
const objRange = [];
async.map(range, constructRange, function(err, res) {
res.forEach(arr => {
arr.forEach(elem => {
objRange.push(elem)
})
})
objRange.sort();
parseNotOperator(objRange, not, callback);
});
});
}
}
}
function searchRegExp(bucketName, searchTerm, not, callback) {
const regexp = new RegExp(searchTerm);
let result = bitmap.createObject();
indexd.getPrefix(`${bucketName}/x-amz-meta`, (err, list) => {
list.forEach(elem => {
if (elem.key.indexOf('/') !== -1) {
if (regexp.test(elem.key.substring(11))) {
result = result.or(storeToBitmap(JSON.parse(elem.value)));
}
}
});
parseNotOperator(result, not, callback);
})
}
function readFromLevelDB(key, not, callback) {
}
function readTagIndex(bucketName, searchTerm, not, callback) {
let term = null;
let operator = null;
if (searchTerm.indexOf('--integer') !== -1) {
operator = searchTerm.split('/')[1];
term = searchTerm.replace(operator, '');
term = term.replace('/', '');
searchIntRange(bucketName, operator, term, not, callback);
} else if (searchTerm.indexOf('--regexp') !== -1) {
searchTerm = searchTerm.replace('--regexp', '');
searchTerm = searchTerm.substring(11);
searchRegExp(bucketName, searchTerm, not, callback);
} else {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
}
function readFileSizeIndex(bucketName, searchTerm, not, callback) {
const operator = searchTerm.split('/')[1];
let term = searchTerm.replace(operator, '');
term = term.replace('/', '');
searchIntRange(bucketName, operator, term, not, callback);
}
function readModDateIndex(bucketName, searchTerm, not, callback) {
const operator = searchTerm.split('/')[1];
let term = searchTerm.replace(operator, '');
term = term.replace('/', '');
return searchIntRange(bucketName, operator, term, not, callback);
}
function readACLIndex(bucketName, searchTerm, not, callback) {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
function readContentTypeIndex(bucketName, searchTerm, not, callback) {
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
callback(err, data);
});
}
function readIndex(bucketName, searchTerm, callback) {
if (searchTerm.indexOf('op/AND') !== -1
|| searchTerm.indexOf('op/OR') !== -1
|| searchTerm.indexOf('op/NOT') !== -1) {
callback(null, searchTerm);
}
let notOperator = false;
let result;
if (searchTerm.indexOf('x-amz-meta') !== -1) {
return readTagIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf('filesize') !== -1) {
return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf('modificationdate') !== -1) {
return readModDateIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf('contenttype') !== -1 || searchTerm.indexOf('contentsubtype') !== -1) {
return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
} else if (searchTerm.indexOf('acl') !== -1) {
return readACLIndex(bucketName, searchTerm, notOperator, callback);
}
return result;
}
function bitmapToStore(bitmap) {
const toStore = bitmap.write();
toStore[2] = toStore[2].toString('binary');
return toStore;
}
function deleteOldEntries(bucketName, rowId, cb) {
if (config.backend === "leveldb") {
indexd.getPrefix(`${bucketName}/acl/`, (err, list) => {
if (!list) {
return cb();
}
const ops = []
list.forEach(elem => {
const tmp = storeToBitmap(JSON.parse(elem.value));
tmp.unset(rowId);
ops.push({ 'type':'put', 'key': elem.key, 'value':JSON.stringify(bitmapToStore(tmp))})
});
indexd.batchWrite(ops, (err) =>{
if (err) {
return null
} else {
return cb();
}
});
});
} else if (config.backend === "antidote") {
return cb();
}
}
function updateBitmap(bitmap, rowId) {
if (bitmap.length() - 1 <= rowId) {
bitmap.push(rowId);
} else {
bitmap.set(rowId);
}
return bitmapToStore(bitmap);
}
function updateIndexEntry(key, rowId, cb) {
indexd.get(key, (err, data) => {
if (err) {
return (err);
} else {
data = JSON.parse(data)
}
indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
return (err);
});
});
}
function updateIntIndex(bucketName, objName, attribute, value, rowId) {
writeDB(`${bucketName}/${attribute}/${value}`, objName, `${bucketName}/${attribute}`, value, rowId, (err) =>{
return ;
})
}
function updateACLIndex(bucketName, objName, objVal, rowId) {
deleteOldEntries(bucketName, rowId, () => {
Object.keys(objVal).forEach(elem => {
if (typeof objVal[elem] === 'string') {
writeDB(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
return ;
});
} else {
objVal[elem].forEach(item => {
writeDB(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
return ;
});
});
}
});
});
}
function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
const type = objVal.split('/')[0];
const subtype = objVal.split('/')[1];
writeDB(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
writeDB(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
return ;
});
});
}
function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
const date = new Date(objVal);
const term = 'modificationdate-';
const year = date.getUTCFullYear();
const month = date.getUTCMonth() + 1;
const day = date.getUTCDate();
const hours = date.getUTCHours();
const minutes = date.getUTCMinutes();
updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
}
function updateFileSizeIndex(bucketName, objName, objVal, rowId) {
updateIntIndex(bucketName, objName, `filesize`, parseInt(objVal, 10), rowId);
}
function updateTagIndex(bucketName, objName, objVal, rowId) {
const tags = [];
Object.keys(objVal).forEach(elem => {
if (elem.indexOf('x-amz-meta') !== -1 &&
elem !== 'x-amz-meta-s3cmd-attrs') {
tags.push({key: elem.replace('x-amz-meta-', ''), value: objVal[elem]});
}
});
tags.forEach(tag => {
if (tag.key.indexOf('--integer') !== -1) {
tag.key = tag.key.replace('--integer', '');
updateIntIndex(bucketName, objName, `x-amz-meta/${tag.key}`, parseInt(tag.value, 10), rowId);
} else {
writeDB(`${bucketName}/x-amz-meta/${tag.key}/${tag.value}`, objName, `${bucketName}/x-amz-meta`, `${tag.key}/${tag.value}`, rowId, (err) =>{
return ;
});
}
});
}
const index = {
evaluateQuery: (queryTerms, params) => {
const bucketName = params.bucketName;
async.map(queryTerms, readIndex.bind(null, bucketName), function(err, queryTerms) {
getObjectMeta(bucketName, (allObjects) => {
let i;
while (queryTerms.length > 1) {
for (i = queryTerms.length - 1; i >= 0; i--) {
if (queryTerms[i] === 'op/AND'
|| queryTerms[i] === 'op/OR'
|| queryTerms[i] === 'op/NOT') {
break;
}
}
if (queryTerms[i] === 'op/NOT') {
const op1 = allObjects;
const op2 = queryTerms[i + 1];
queryTerms.splice(i, 2, difference(op1, op2));
} else {
const op1 = queryTerms[i + 1];
const op2 = queryTerms[i + 2];
if (queryTerms[i] === 'op/AND') {
queryTerms.splice(i, 3, intersection(op1, op2));
} else {
queryTerms.splice(i, 3, union(op1, op2));
}
}
}
if (config.backend === "leveldb") {
indexd.get(`${bucketName}`, (err, objMapping) => {
if (err) {
return ;
}
objMapping = JSON.parse(objMapping);
queryTerms = queryTerms[0].toString(':').split(':');
queryTerms = queryTerms.map(function (elem) {
return objMapping.mapping[elem];
});
indexd.respondQuery(params, queryTerms)
});
} else if (config.backend === "antidote") {
filterRemoved(queryTerms[0], params, (results) => {
indexd.respondQuery(params, results);
});
}
});
});
},
updateIndex: (bucketName, objName, objVal) => {
updateIndexMeta(bucketName, objName, (err, rowId) => {
updateFileSizeIndex(bucketName, objName, objVal['content-length'], rowId);
updateΜodDateIndex(bucketName, objName, objVal['last-modified'], rowId);
updateContentTypeIndex(bucketName, objName, objVal['content-type'], rowId);
updateACLIndex(bucketName, objName, objVal['acl'], rowId);
updateTagIndex(bucketName, objName, objVal, rowId);
});
},
deleteObject: (bucketName, objName) => {
if (config.backend === "leveldb") {
indexd.get(`${bucketName}`, (err, objMapping) => {
if (err) {
return ;
}
objMapping = JSON.parse(objMapping);
const rowId = objMapping.mapping[objName];
delete objMapping.mapping[objName];
delete objMapping.mapping[rowId];
objMapping.nextAvail.push(rowId);
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
if (err) {
return ;
}
return ;
});
});
} else if (config.backend === "antidote") {
indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
}
},
};
exports.default = index;

View File

@ -271,7 +271,7 @@ class Config {
let metadata = 'file'; let metadata = 'file';
let kms = 'file'; let kms = 'file';
if (process.env.S3BACKEND) { if (process.env.S3BACKEND) {
const validBackends = ['mem', 'file', 'scality']; const validBackends = ['mem', 'file', 'scality', 'antidote'];
assert(validBackends.indexOf(process.env.S3BACKEND) > -1, assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
'bad environment variable: S3BACKEND environment variable ' + 'bad environment variable: S3BACKEND environment variable ' +
'should be one of mem/file/scality' 'should be one of mem/file/scality'
@ -284,7 +284,11 @@ class Config {
if (process.env.S3VAULT) { if (process.env.S3VAULT) {
auth = process.env.S3VAULT; auth = process.env.S3VAULT;
} }
if (auth === 'file' || auth === 'mem') { if (data === 'antidote') {
data = 'mem';
kms = 'mem';
}
if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
// Auth only checks for 'mem' since mem === file // Auth only checks for 'mem' since mem === file
auth = 'mem'; auth = 'mem';
let authfile = `${__dirname}/../conf/authdata.json`; let authfile = `${__dirname}/../conf/authdata.json`;
@ -326,6 +330,32 @@ class Config {
dataPath, dataPath,
metadataPath, metadataPath,
}; };
this.antidote = {};
if (config.antidote) {
if (config.antidote.port !== undefined) {
assert(Number.isInteger(config.antidote.port)
&& config.antidote.port > 0,
'bad config: vaultd port must be a positive integer');
this.antidote.port = config.antidote.port;
}
if (config.antidote.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.antidote.host = config.antidote.host;
}
}
this.userMetaIndexing = false;
if (config.userMetaIndexing !== undefined) {
this.userMetaIndexing = config.userMetaIndexing;
}
this.systemMetaIndexing = false;
if (config.systemMetaIndexing !== undefined) {
this.systemMetaIndexing = config.systemMetaIndexing;
}
this.indexServerPort = 7000;
if (config.indexServerPort !== undefined) {
this.indexServerPort = config.indexServerPort;
}
return config; return config;
} }
} }

View File

@ -1,4 +1,5 @@
import querystring from 'querystring'; import querystring from 'querystring';
import indexClient from '../indexClient/indexClient';
import constants from '../../constants'; import constants from '../../constants';
import services from '../services'; import services from '../services';
@ -70,6 +71,7 @@ export default function bucketGet(authInfo, request, log, callback) {
delimiter: params.delimiter, delimiter: params.delimiter,
marker: params.marker, marker: params.marker,
prefix: params.prefix, prefix: params.prefix,
query: indexClient.processQueryHeader(request.headers.query),
}; };
services.metadataValidateAuthorization(metadataValParams, err => { services.metadataValidateAuthorization(metadataValParams, err => {
@ -83,6 +85,7 @@ export default function bucketGet(authInfo, request, log, callback) {
log.debug('error processing request', { error: err }); log.debug('error processing request', { error: err });
return callback(err); return callback(err);
} }
listParams.prefix = params.prefix;
const xml = []; const xml = [];
xml.push( xml.push(
'<?xml version="1.0" encoding="UTF-8"?>', '<?xml version="1.0" encoding="UTF-8"?>',

View File

@ -0,0 +1,71 @@
import http from 'http'
import io from 'socket.io'
import metadata from '../metadata/wrapper.js'
let iosock;
let callback;
export default {
createPublisher(port, log) {
const server = http.createServer(function(req, res){
});
server.listen(port);
iosock = io.listen(server);
iosock.sockets.on('connection', function(socket) {
log.info("indexing server connected")
socket.on('disconnect', function() {
});
socket.on('subscribe', function(room) {
socket.join(room);
});
socket.on('unsubscribe', function(room) {
socket.leave(room);
});
socket.on('query_response', function(msg) {
msg.params.cb = callback;
metadata.respondQueryGetMD(msg.result, msg.params);
});
});
},
putObjectMD(bucketName, objName, objVal) {
iosock.sockets.to('puts').emit('put', {
bucketName,
objName,
objVal
});
},
listObject(bucketName, query, prefix, marker, delimiter, maxKeys, cb) {
callback = cb;
iosock.sockets.to('queries').emit('query', {
query,
params : {
bucketName,
prefix,
marker,
maxKeys,
delimiter,
}
});
},
deleteObjectMD(bucketName, objName) {
iosock.sockets.to('deletes').emit('delete', {
bucketName,
objName
});
},
processQueryHeader(header) {
if (!header) {
return header;
}
const queryTerms = header.split('&');
const query = [];
for (let i = 0; i < queryTerms.length; i++) {
query.push(queryTerms[i]);
}
return query;
}
}

View File

@ -0,0 +1,47 @@
import ListResult from '../in_memory/ListResult';
export class ListBucketResult extends ListResult {
constructor() {
super();
this.Contents = [];
}
addContentsKey(key, keyMap) {
const objectMD = keyMap;
this.Contents.push({
key,
value: {
LastModified: objectMD['last-modified'],
ETag: objectMD['content-md5'],
StorageClass: objectMD['x-amz-storage-class'],
Owner: {
DisplayName: objectMD['owner-display-name'],
ID: objectMD['owner-id'],
},
Size: objectMD['content-length'],
// Initiated is used for overview of MPU
Initiated: objectMD.initiated,
// Initiator is used for overview of MPU.
// It is an object containing DisplayName
// and ID
Initiator: objectMD.initiator,
// EventualStorageBucket is used for overview of MPU
EventualStorageBucket: objectMD.eventualStorageBucket,
// Used for parts of MPU
partLocations: objectMD.partLocations,
// creationDate is just used for serviceGet
creationDate: objectMD.creationDate,
},
});
this.MaxKeys += 1;
}
hasDeleteMarker(key, keyMap) {
const objectMD = keyMap;
if (objectMD['x-amz-delete-marker'] !== undefined) {
return (objectMD['x-amz-delete-marker'] === true);
}
return false;
}
}

View File

@ -0,0 +1,309 @@
import { errors } from 'arsenal';
import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
import { ListBucketResult } from './ListBucketResult';
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
import config from '../../Config';
import async from 'async';
import antidoteClient from 'antidote_ts_client';
const defaultMaxKeys = 1000;
var replacer = function(key, value) {
if (value === undefined){
return null;
}
return value;
};
var reviver = function(key, value) {
if (value === null){
return undefined;
}
return value;
};
class AntidoteInterface {
constructor() {
this.antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
}
createBucket(bucketName, bucketMD, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
// TODO Check whether user already owns the bucket,
// if so return "BucketAlreadyOwnedByYou"
// If not owned by user, return "BucketAlreadyExists"
if (bucket) {
return cb(errors.BucketAlreadyExists);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_MD = this.antidote.map(`${bucketName}/md`)
this.antidote.update([
bucket_MD.register('md').set(bucketMD)
]).then( (resp) => {
return cb();
});
});
}
putBucketAttributes(bucketName, bucketMD, log, cb) {
this.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_MD = this.antidote.map(`${bucketName}/md`)
this.antidote.update([
bucket_MD.register('md').set(bucketMD)
]).then( (resp) => {
return cb();
});
});
}
getBucketAttributes(bucketName, log, cb) {
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_MD = this.antidote.map(`${bucketName}/md`)
bucket_MD.read().then(bucketMD => {
bucketMD = bucketMD.toJsObject();
if (Object.keys(bucketMD).length === 0) {
return cb(errors.NoSuchBucket);
}
return cb(null, bucketMD['md']);
});
}
deleteBucket(bucketName, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
bucket_Objs.read().then(objects => {
if (bucket && objects.length > 0) {
return cb(errors.BucketNotEmpty);
}
let bucket_MD = this.antidote.map(`${bucketName}/md`)
this.antidote.update([
bucket_MD.remove(bucket_MD.register('md')),
]).then( (resp) => {
return cb(null);
});
});
});
}
putObject(bucketName, objName, objVal, log, cb) {
this.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
let object_MD = this.antidote.map(`${objName}`);
this.antidote.update([
bucket_Objs.add(objName),
object_MD.register('md').set(objVal)
]).then( (resp) => {
return cb();
});
});
}
getBucketAndObject(bucketName, objName, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
return cb(err, { bucket });
}
const bucket_MD = {}
Object.keys(bucket).map(function(key) {
bucket_MD[key.substr(1)] = bucket[key]
});
this.antidote.defaultBucket = `storage/${bucketName}`;
let object_MD = this.antidote.map(`${objName}`);
object_MD.read().then(objectMD => {
objectMD = objectMD.toJsObject();
if (!bucket || Object.keys(objectMD).length === 0) {
return cb(null, { bucket: JSON.stringify(bucket_MD) });
}
return cb(null, {
bucket: JSON.stringify(bucket_MD),
obj: JSON.stringify(objectMD['md']),
});
});
});
}
getObject(bucketName, objName, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let object_MD = this.antidote.map(`${objName}`);
object_MD.read().then(objectMD => {
objectMD = objectMD.toJsObject();
if (!bucket || Object.keys(objectMD).length === 0) {
return cb(errors.NoSuchKey);
}
return cb(null, objectMD['md']);
});
});
}
deleteObject(bucketName, objName, log, cb) {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
return cb(err);
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let object_MD = this.antidote.map(`${objName}`);
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
object_MD.read().then(objectMD => {
objectMD = objectMD.toJsObject();
if (!bucket || Object.keys(objectMD).length === 0) {
return cb(errors.NoSuchKey);
}
this.antidote.update([
object_MD.remove(object_MD.register('md')),
bucket_Objs.remove(objName)
]).then( (resp) => {
return cb();
});
});
});
}
getObjectMD(antidote, bucketName, key, callback) {
antidote.defaultBucket = `storage/${bucketName}`;
let object_MD = antidote.map(`${key}`);
object_MD.read().then(objectMD => {
objectMD = objectMD.toJsObject();
if (Object.keys(objectMD).length === 0) {
return callback(error.NoSuchKey, null);
}
return callback(null, objectMD['md']);
});
}
listObject(bucketName, params, log, cb) {
const { prefix, marker, delimiter, maxKeys } = params;
if (prefix && typeof prefix !== 'string') {
return cb(errors.InvalidArgument);
}
if (marker && typeof marker !== 'string') {
return cb(errors.InvalidArgument);
}
if (delimiter && typeof delimiter !== 'string') {
return cb(errors.InvalidArgument);
}
if (maxKeys && typeof maxKeys !== 'number') {
return cb(errors.InvalidArgument);
}
let numKeys = maxKeys;
// If paramMaxKeys is undefined, the default parameter will set it.
// However, if it is null, the default parameter will not set it.
if (numKeys === null) {
numKeys = defaultMaxKeys;
}
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_MD = this.antidote.map(`${bucketName}/md`)
bucket_MD.read().then(bucketMD => {
bucketMD = bucketMD.toJsObject();
if (Object.keys(bucketMD).length === 0) {
return cb(errors.NoSuchBucket);
}
const response = new ListBucketResult();
this.antidote.defaultBucket = `storage/${bucketName}`;
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
bucket_Objs.read().then(keys => {
async.map(keys, this.getObjectMD.bind(null, this.antidote, bucketName), function(err, objectMeta) {
// If marker specified, edit the keys array so it
// only contains keys that occur alphabetically after the marker
if (marker) {
keys = markerFilter(marker, keys);
response.Marker = marker;
}
// If prefix specified, edit the keys array so it only
// contains keys that contain the prefix
if (prefix) {
keys = prefixFilter(prefix, keys);
response.Prefix = prefix;
}
// Iterate through keys array and filter keys containing
// delimiter into response.CommonPrefixes and filter remaining
// keys into response.Contents
for (let i = 0; i < keys.length; ++i) {
const currentKey = keys[i];
// Do not list object with delete markers
if (response.hasDeleteMarker(currentKey,
objectMeta[i])) {
continue;
}
// If hit numKeys, stop adding keys to response
if (response.MaxKeys >= numKeys) {
response.IsTruncated = true;
response.NextMarker = keys[i - 1];
break;
}
// If a delimiter is specified, find its index in the
// current key AFTER THE OCCURRENCE OF THE PREFIX
let delimiterIndexAfterPrefix = -1;
let prefixLength = 0;
if (prefix) {
prefixLength = prefix.length;
}
const currentKeyWithoutPrefix = currentKey
.slice(prefixLength);
let sliceEnd;
if (delimiter) {
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
.indexOf(delimiter);
sliceEnd = delimiterIndexAfterPrefix + prefixLength;
response.Delimiter = delimiter;
}
// If delimiter occurs in current key, add key to
// response.CommonPrefixes.
// Otherwise add key to response.Contents
if (delimiterIndexAfterPrefix > -1) {
const keySubstring = currentKey.slice(0, sliceEnd + 1);
response.addCommonPrefix(keySubstring);
} else {
response.addContentsKey(currentKey,
objectMeta[i]);
}
}
return cb(null, response);
});
});
});
}
listMultipartUploads(bucketName, listingParams, log, cb) {
process.nextTick(() => {
this.getBucketAttributes(bucketName, log, (err, bucket) => {
if (bucket === undefined) {
// no on going multipart uploads, return empty listing
return cb(null, {
IsTruncated: false,
NextMarker: undefined,
MaxKeys: 0,
});
}
return getMultipartUploadListing(bucket, listingParams, cb);
});
});
}
};
export default AntidoteInterface;

View File

@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
import BucketFileInterface from './bucketfile/backend'; import BucketFileInterface from './bucketfile/backend';
import BucketInfo from './BucketInfo'; import BucketInfo from './BucketInfo';
import inMemory from './in_memory/backend'; import inMemory from './in_memory/backend';
import AntidoteInterface from './antidote/backend';
import config from '../Config'; import config from '../Config';
import indexClient from '../indexClient/indexClient'
import async from 'async'
let client; let client;
let implName; let implName;
@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
} else if (config.backends.metadata === 'scality') { } else if (config.backends.metadata === 'scality') {
client = new BucketClientInterface(); client = new BucketClientInterface();
implName = 'bucketclient'; implName = 'bucketclient';
} else if (config.backends.metadata === 'antidote') {
client = new AntidoteInterface();
implName = 'antidote';
}
function getQueryResults(params, objName, callback) {
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
client.getObject(bucketName, objName, log, (err, data) => {
if (err) {
callback(err, null);
}
callback(null, {key: objName, value: {
LastModified: data['last-modified'],
ETag: data['content-md5'],
StorageClass: data['x-amz-storage-class'],
Owner: {
ID: data['owner-id'],
DisplayName: data['owner-display-name']
},
Size: data['content-length'],
Initiated: undefined,
Initiator: undefined,
EventualStorageBucket: undefined,
partLocations: undefined,
creationDate: undefined
}});
});
} }
const metadata = { const metadata = {
@ -75,6 +105,12 @@ const metadata = {
return cb(err); return cb(err);
} }
log.debug('object successfully put in metadata'); log.debug('object successfully put in metadata');
if (config.userMetaIndexing || config.systemMetaIndexing) {
if (objName.indexOf('..|..') !== -1) {
return cb(err);
}
indexClient.putObjectMD(bucketName, objName, objVal);
}
return cb(err); return cb(err);
}, params); }, params);
}, },
@ -113,11 +149,17 @@ const metadata = {
return cb(err); return cb(err);
} }
log.debug('object deleted from metadata'); log.debug('object deleted from metadata');
if (config.userMetaIndexing || config.systemMetaIndexing) {
indexClient.deleteObjectMD(bucketName, objName);
}
return cb(err); return cb(err);
}, params); }, params);
}, },
listObject: (bucketName, listingParams, log, cb) => { listObject: (bucketName, listingParams, log, cb) => {
if (listingParams.query) {
indexClient.listObject(bucketName, listingParams.query, listingParams.prefix, listingParams.marker,listingParams.delimiter, listingParams.maxKeys, cb);
} else {
client client
.listObject(bucketName, listingParams, .listObject(bucketName, listingParams,
log, (err, data) => { log, (err, data) => {
@ -129,6 +171,34 @@ const metadata = {
log.debug('object listing retrieved from metadata'); log.debug('object listing retrieved from metadata');
return cb(err, data); return cb(err, data);
}); });
}
},
respondQueryGetMD: (result, params) => {
async.map(result, getQueryResults.bind(null, params), function(err, res) {
const response = {
IsTruncated: false,
NextMarker: params.marker,
CommonPrefixes: [],
MaxKeys: 10,
Contents: res
}
return params.cb(err, response);
});
},
respondQueryFilter: (result, params) => {
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
log, (err, data) => {
if (err) {
return cb(err);
}
data.Contents = data.Contents.filter(elem => {
return result.indexOf(elem.key) !== -1;
});
return cb(err, data);
});
}, },
listMultipartUploads: (bucketName, listingParams, log, cb) => { listMultipartUploads: (bucketName, listingParams, log, cb) => {

View File

@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
import { clientCheck } from './utilities/healthcheckHandler'; import { clientCheck } from './utilities/healthcheckHandler';
import _config from './Config'; import _config from './Config';
import routes from './routes'; import routes from './routes';
import indexClient from './indexClient/indexClient'
class S3Server { class S3Server {
/** /**
@ -38,6 +38,7 @@ class S3Server {
* This starts the http server. * This starts the http server.
*/ */
startup() { startup() {
indexClient.createPublisher(_config.indexServerPort, logger)
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets // Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
if (_config.https) { if (_config.https) {
this.server = https.createServer({ this.server = https.createServer({
@ -118,7 +119,7 @@ class S3Server {
export default function main() { export default function main() {
let clusters = _config.clusters || 1; let clusters = _config.clusters || 1;
if (process.env.S3BACKEND === 'mem') { if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
clusters = 1; clusters = 1;
} }
if (cluster.isMaster) { if (cluster.isMaster) {

View File

@ -19,6 +19,7 @@
}, },
"homepage": "https://github.com/scality/S3#readme", "homepage": "https://github.com/scality/S3#readme",
"dependencies": { "dependencies": {
"antidote_ts_client": "^0.1.0",
"arsenal": "scality/Arsenal", "arsenal": "scality/Arsenal",
"async": "~1.4.2", "async": "~1.4.2",
"babel-core": "^6.5.2", "babel-core": "^6.5.2",
@ -31,8 +32,10 @@
"level": "^1.4.0", "level": "^1.4.0",
"level-sublevel": "^6.5.4", "level-sublevel": "^6.5.4",
"multilevel": "^7.3.0", "multilevel": "^7.3.0",
"node-bitmap-ewah": "dimitriosvasilas/node-bitmap-ewah",
"node-uuid": "^1.4.3", "node-uuid": "^1.4.3",
"ready-set-stream": "1.0.7", "ready-set-stream": "1.0.7",
"socket.io": "^1.7.2",
"sproxydclient": "scality/sproxydclient", "sproxydclient": "scality/sproxydclient",
"utapi": "scality/utapi", "utapi": "scality/utapi",
"utf8": "~2.1.1", "utf8": "~2.1.1",
@ -71,6 +74,7 @@
"lint": "eslint $(git ls-files '*.js')", "lint": "eslint $(git ls-files '*.js')",
"lint_md": "mdlint $(git ls-files '*.md')", "lint_md": "mdlint $(git ls-files '*.md')",
"mem_backend": "S3BACKEND=mem node index.js", "mem_backend": "S3BACKEND=mem node index.js",
"antidote_backend": "S3BACKEND=antidote node index.js",
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js", "perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
"start": "node init.js && node index.js", "start": "node init.js && node index.js",
"start_utapi": "node utapiServer.js", "start_utapi": "node utapiServer.js",