Compare commits
34 Commits
developmen
...
proto/bitm
Author | SHA1 | Date |
---|---|---|
Dimitrios Vasilas | e27cb1cf6c | |
Dimitrios Vasilas | 9c8c8fd9ea | |
Dimitrios Vasilas | a236fd4457 | |
Dimitrios Vasilas | e76472fa20 | |
Dimitrios Vasilas | e6f82079c6 | |
Dimitrios Vasilas | cfb77cc9fe | |
Dimitrios Vasilas | 17a9cd1071 | |
Dimitrios Vasilas | 8ce70e7337 | |
Dimitrios Vasilas | cdbcae37b2 | |
Dimitrios Vasilas | 0f5cd3e545 | |
Dimitrios Vasilas | f6cddaf6a5 | |
Dimitrios Vasilas | c225e483bc | |
Dimitrios Vasilas | 53cb8a3263 | |
Dimitrios Vasilas | d302070959 | |
Dimitrios Vasilas | 7597c379e7 | |
Dimitrios Vasilas | 541e97f1ea | |
Dimitrios Vasilas | c32fe7df51 | |
Dimitrios Vasilas | 9e301aaf25 | |
Dimitrios Vasilas | eb441dd09c | |
Dimitrios Vasilas | 6fddd95a1d | |
Dimitrios Vasilas | 1303bd8b97 | |
Dimitrios Vasilas | 093c071836 | |
Dimitrios Vasilas | da115e6ce4 | |
Dimitrios Vasilas | b588d5e4d0 | |
Dimitrios Vasilas | 5f1bd15fb0 | |
Dimitrios Vasilas | 22a783f23e | |
Dimitrios Vasilas | 320fc256bf | |
Dimitrios Vasilas | 361585825f | |
Dimitrios Vasilas | e390e355ef | |
Dimitrios Vasilas | f03940e45e | |
Dimitrios Vasilas | e67abad697 | |
Dimitrios Vasilas | 9a42556448 | |
Dimitrios Vasilas | 3056714c02 | |
Dimitrios Vasilas | d871755296 |
|
@ -48,6 +48,13 @@
|
||||||
"logLevel": "info",
|
"logLevel": "info",
|
||||||
"dumpLevel": "error"
|
"dumpLevel": "error"
|
||||||
},
|
},
|
||||||
|
"antidote": {
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 8087
|
||||||
|
},
|
||||||
|
"userMetaIndexing":true,
|
||||||
|
"systemMetaIndexing":true,
|
||||||
|
"indexServerPort":7000,
|
||||||
"healthChecks": {
|
"healthChecks": {
|
||||||
"allowFrom": ["127.0.0.1/8", "::1"]
|
"allowFrom": ["127.0.0.1/8", "::1"]
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
{
|
||||||
|
"S3": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 7000
|
||||||
|
},
|
||||||
|
"antidote": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 8087
|
||||||
|
},
|
||||||
|
"leveldb_path": "indexdb0",
|
||||||
|
"backend": "antidote"
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
require('./lib/bitmapd-utils.js').default.connectToS3();
|
||||||
|
require('./lib/bitmapd-utils.js').default.connectToDB();
|
|
@ -0,0 +1,148 @@
|
||||||
|
const fs = require('fs');
|
||||||
|
const io = require('socket.io-client');
|
||||||
|
const levelup = require('levelup');
|
||||||
|
const leveldown = require('leveldown');
|
||||||
|
const antidoteClient = require('antidote_ts_client');
|
||||||
|
|
||||||
|
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
|
||||||
|
config = JSON.parse(config);
|
||||||
|
|
||||||
|
let antidote;
|
||||||
|
let client = null;
|
||||||
|
let db = null;
|
||||||
|
|
||||||
|
const utils = {
|
||||||
|
connectToS3() {
|
||||||
|
client = io.connect(`http://${config.S3.host}:${config.S3.port}`, {
|
||||||
|
reconnection: true,
|
||||||
|
});
|
||||||
|
client.on('connect', function() {
|
||||||
|
client.emit('subscribe', 'puts');
|
||||||
|
client.emit('subscribe', 'deletes');
|
||||||
|
client.emit('subscribe', 'queries');
|
||||||
|
});
|
||||||
|
client.on('reconnecting', function(number) {
|
||||||
|
});
|
||||||
|
client.on('error', function(err) {
|
||||||
|
});
|
||||||
|
client.on('put', function(msg) {
|
||||||
|
require('./utils.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
|
||||||
|
});
|
||||||
|
client.on('query', function(msg) {
|
||||||
|
if (!msg.params.prefix)
|
||||||
|
msg.params.prefix = undefined;
|
||||||
|
if (!msg.params.marker)
|
||||||
|
msg.params.marker = undefined;
|
||||||
|
if (!msg.params.delimiter)
|
||||||
|
msg.params.delimiter = undefined;
|
||||||
|
require('./utils.js').default.evaluateQuery(msg.query, msg.params);
|
||||||
|
});
|
||||||
|
client.on('delete', function(msg) {
|
||||||
|
require('./utils.js').default.deleteObject(msg.bucketName, msg.objName);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
connectToDB() {
|
||||||
|
antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
|
||||||
|
antidote.defaultBucket = 'index';
|
||||||
|
leveldown.destroy(config.leveldb_path, function (err) {
|
||||||
|
if (!err) {
|
||||||
|
db = levelup(config.leveldb_path);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
updateAntidoteSet(key, elem, cb) {
|
||||||
|
const keyset = antidote.set('keys');
|
||||||
|
antidote.update(
|
||||||
|
keyset.add(key)
|
||||||
|
).then( (resp) => {
|
||||||
|
const set = antidote.set(key);
|
||||||
|
antidote.update(
|
||||||
|
set.add(elem)
|
||||||
|
).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
readAntidoteSet(key, cb) {
|
||||||
|
const set = antidote.set(key);
|
||||||
|
set.read().then(objs => {
|
||||||
|
return cb(null, objs);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
removeFromAntidoteSet(key, objName, cb) {
|
||||||
|
const set = antidote.set(key);
|
||||||
|
antidote.update(
|
||||||
|
set.remove(objName)
|
||||||
|
).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQuery(params, queryTerms) {
|
||||||
|
client.emit('query_response', {
|
||||||
|
result: queryTerms,
|
||||||
|
params
|
||||||
|
})
|
||||||
|
},
|
||||||
|
|
||||||
|
put(key, value, cb) {
|
||||||
|
db.put(key, value, function(err) {
|
||||||
|
return cb(err);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
get(key, cb) {
|
||||||
|
db.get(key, function(err, data) {
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
getPrefix(prefix, cb) {
|
||||||
|
const list = []
|
||||||
|
db.createReadStream({
|
||||||
|
start: prefix,
|
||||||
|
end: prefix + "\xFF"
|
||||||
|
})
|
||||||
|
.on('data', function(data) {
|
||||||
|
list.push(data);
|
||||||
|
})
|
||||||
|
.on('error', function(err) {
|
||||||
|
return cb(err, null)
|
||||||
|
})
|
||||||
|
.on('close', function() {
|
||||||
|
if (list.length === 0)
|
||||||
|
return cb(null, null)
|
||||||
|
return cb(null, list)
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
getRange(start, end, cb) {
|
||||||
|
const list = []
|
||||||
|
db.createReadStream({
|
||||||
|
start: start,
|
||||||
|
end: end
|
||||||
|
})
|
||||||
|
.on('data', function(data) {
|
||||||
|
list.push(data);
|
||||||
|
})
|
||||||
|
.on('error', function(err) {
|
||||||
|
return cb(err, null)
|
||||||
|
})
|
||||||
|
.on('close', function() {
|
||||||
|
return cb(null, list)
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
batchWrite(ops, cb) {
|
||||||
|
db.batch(ops, function(err) {
|
||||||
|
return cb(err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.default = utils;
|
||||||
|
exports.config = config;
|
|
@ -0,0 +1,528 @@
|
||||||
|
const bitmap = require('node-bitmap-ewah');
|
||||||
|
const indexd = require('./bitmapd-utils').default;
|
||||||
|
const config = require('./bitmapd-utils').config;
|
||||||
|
const async = require('async');
|
||||||
|
const _ = require('underscore')
|
||||||
|
|
||||||
|
function intersection(a, b) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
return a.and(b);
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
return _.intersection(a, b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function union(a, b) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
return a.and(b);
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
return _.union(a, b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function difference(u, a) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
return a.not()
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
return _.difference(u, a);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function padLeft(nr, n){
|
||||||
|
return Array(n-String(nr).length+1).join('0')+nr;
|
||||||
|
}
|
||||||
|
|
||||||
|
function binaryIndexOf(arr, searchElement, op) {
|
||||||
|
let minIndex = 0;
|
||||||
|
let maxIndex = arr.length - 1;
|
||||||
|
let currentIndex;
|
||||||
|
let currentElement;
|
||||||
|
|
||||||
|
while (minIndex <= maxIndex) {
|
||||||
|
currentIndex = (minIndex + maxIndex) / 2 | 0;
|
||||||
|
currentElement = arr[currentIndex];
|
||||||
|
|
||||||
|
if (currentElement < searchElement) {
|
||||||
|
minIndex = currentIndex + 1;
|
||||||
|
}
|
||||||
|
else if (currentElement > searchElement) {
|
||||||
|
maxIndex = currentIndex - 1;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (op === '=') {
|
||||||
|
return currentIndex;
|
||||||
|
} else if (op === '>' && arr[currentIndex] <= searchElement) {
|
||||||
|
currentIndex += 1
|
||||||
|
} else if (op === '>=' && arr[currentIndex] < searchElement) {
|
||||||
|
currentIndex += 1
|
||||||
|
} else if (op === '<' && arr[currentIndex] >= searchElement) {
|
||||||
|
currentIndex -= 1
|
||||||
|
} else if (op === '<=') {
|
||||||
|
if (arr[currentIndex] > searchElement) {
|
||||||
|
currentIndex -= 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (currentIndex > arr.length-1 || currentIndex < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return currentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
function storeToBitmap(stored) {
|
||||||
|
const bm = bitmap.createObject();
|
||||||
|
if (stored) {
|
||||||
|
stored[2] = new Buffer(stored[2], 'binary');
|
||||||
|
bm.read(stored);
|
||||||
|
}
|
||||||
|
return bm;
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseNotOperator(result, not, callback) {
|
||||||
|
if (not) {
|
||||||
|
callback(null, result.not());
|
||||||
|
} else {
|
||||||
|
callback(null, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateObjectMapping(objMapping, objName) {
|
||||||
|
let rowId;
|
||||||
|
if (Object.keys(objMapping)) {
|
||||||
|
if (typeof objMapping.mapping[objName] === 'number') {
|
||||||
|
rowId = objMapping.mapping[objName];
|
||||||
|
} else if (objMapping.nextAvail.length > 0) {
|
||||||
|
rowId = objMapping.nextAvail[0];
|
||||||
|
objMapping.nextAvail.splice(0, 1);
|
||||||
|
} else {
|
||||||
|
rowId = objMapping.length;
|
||||||
|
objMapping.length += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
objMapping.mapping[rowId] = objName;
|
||||||
|
objMapping.mapping[objName] = rowId;
|
||||||
|
|
||||||
|
return rowId;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getObjectMeta(bucketName, cb) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
return cb(null);
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
|
||||||
|
return cb(allObjects);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIndexMeta(bucketName, objName, cb) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
let rowId;
|
||||||
|
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||||
|
if (err) {
|
||||||
|
objMapping = {nextAvail: [], length:1, mapping:{}}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
objMapping = JSON.parse(objMapping);
|
||||||
|
}
|
||||||
|
rowId = updateObjectMapping(objMapping, objName);
|
||||||
|
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
|
||||||
|
if (err) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
return cb(err, rowId)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
indexd.updateAntidoteSet(`${bucketName}`, objName, () => {
|
||||||
|
return cb(null, -1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function filterRemoved(results, params, cb) {
|
||||||
|
indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
|
||||||
|
results = results.filter(elem => {
|
||||||
|
return removed.indexOf(elem) === -1;
|
||||||
|
});
|
||||||
|
return cb(results);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function constructRange(value, callback) {
|
||||||
|
indexd.readAntidoteSet(value, (err, result) => {
|
||||||
|
callback(null, result);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readDB(key, cb) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
indexd.get(key, (err, data) => {
|
||||||
|
if (!err) {
|
||||||
|
data = JSON.parse(data)
|
||||||
|
}
|
||||||
|
return (err, data)
|
||||||
|
});
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
indexd.readAntidoteSet(key, (err, data) => {
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function writeDB(key, objName, attribute, value, rowId, cb) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
updateIndexEntry(key, rowId, cb);
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
indexd.updateAntidoteSet(key, objName, () => {
|
||||||
|
indexd.updateAntidoteSet(attribute, value, () => {
|
||||||
|
return cb(null)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function searchIntRange(bucketName, op, term, not, callback) {
|
||||||
|
term = term.replace('--integer', '');
|
||||||
|
const attr = term.split('/')[0];
|
||||||
|
let value = parseInt(term.split('/')[1], 10);
|
||||||
|
if (op === '=') {
|
||||||
|
readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
if (config.backend === "antidote") {
|
||||||
|
indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
|
||||||
|
const range = []
|
||||||
|
const index = binaryIndexOf(result, value, op)
|
||||||
|
if (index === -1) {
|
||||||
|
return parseNotOperator([], not, callback);
|
||||||
|
}
|
||||||
|
if (op.indexOf('>') !== -1) {
|
||||||
|
if (op.indexOf('=') === -1) {
|
||||||
|
value += 1
|
||||||
|
}
|
||||||
|
for (let i = index; i < result.length; i+=1) {
|
||||||
|
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||||
|
}
|
||||||
|
} else if (op.indexOf('<') !== -1) {
|
||||||
|
if (op.indexOf('=') === -1) {
|
||||||
|
value -= 1
|
||||||
|
}
|
||||||
|
for (let i = index; i >= 0; i-=1) {
|
||||||
|
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const objRange = [];
|
||||||
|
async.map(range, constructRange, function(err, res) {
|
||||||
|
res.forEach(arr => {
|
||||||
|
arr.forEach(elem => {
|
||||||
|
objRange.push(elem)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
objRange.sort();
|
||||||
|
parseNotOperator(objRange, not, callback);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function searchRegExp(bucketName, searchTerm, not, callback) {
|
||||||
|
const regexp = new RegExp(searchTerm);
|
||||||
|
let result = bitmap.createObject();
|
||||||
|
indexd.getPrefix(`${bucketName}/x-amz-meta`, (err, list) => {
|
||||||
|
list.forEach(elem => {
|
||||||
|
if (elem.key.indexOf('/') !== -1) {
|
||||||
|
if (regexp.test(elem.key.substring(11))) {
|
||||||
|
result = result.or(storeToBitmap(JSON.parse(elem.value)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
parseNotOperator(result, not, callback);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function readFromLevelDB(key, not, callback) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
function readTagIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
let term = null;
|
||||||
|
let operator = null;
|
||||||
|
if (searchTerm.indexOf('--integer') !== -1) {
|
||||||
|
operator = searchTerm.split('/')[1];
|
||||||
|
term = searchTerm.replace(operator, '');
|
||||||
|
term = term.replace('/', '');
|
||||||
|
searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
} else if (searchTerm.indexOf('--regexp') !== -1) {
|
||||||
|
searchTerm = searchTerm.replace('--regexp', '');
|
||||||
|
searchTerm = searchTerm.substring(11);
|
||||||
|
searchRegExp(bucketName, searchTerm, not, callback);
|
||||||
|
} else {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function readFileSizeIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
const operator = searchTerm.split('/')[1];
|
||||||
|
let term = searchTerm.replace(operator, '');
|
||||||
|
term = term.replace('/', '');
|
||||||
|
searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
function readModDateIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
const operator = searchTerm.split('/')[1];
|
||||||
|
let term = searchTerm.replace(operator, '');
|
||||||
|
term = term.replace('/', '');
|
||||||
|
return searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
function readACLIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readContentTypeIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readIndex(bucketName, searchTerm, callback) {
|
||||||
|
if (searchTerm.indexOf('op/AND') !== -1
|
||||||
|
|| searchTerm.indexOf('op/OR') !== -1
|
||||||
|
|| searchTerm.indexOf('op/NOT') !== -1) {
|
||||||
|
callback(null, searchTerm);
|
||||||
|
}
|
||||||
|
let notOperator = false;
|
||||||
|
let result;
|
||||||
|
if (searchTerm.indexOf('x-amz-meta') !== -1) {
|
||||||
|
return readTagIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf('filesize') !== -1) {
|
||||||
|
return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf('modificationdate') !== -1) {
|
||||||
|
return readModDateIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf('contenttype') !== -1 || searchTerm.indexOf('contentsubtype') !== -1) {
|
||||||
|
return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf('acl') !== -1) {
|
||||||
|
return readACLIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
function bitmapToStore(bitmap) {
|
||||||
|
const toStore = bitmap.write();
|
||||||
|
toStore[2] = toStore[2].toString('binary');
|
||||||
|
return toStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
function deleteOldEntries(bucketName, rowId, cb) {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
indexd.getPrefix(`${bucketName}/acl/`, (err, list) => {
|
||||||
|
if (!list) {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
const ops = []
|
||||||
|
list.forEach(elem => {
|
||||||
|
const tmp = storeToBitmap(JSON.parse(elem.value));
|
||||||
|
tmp.unset(rowId);
|
||||||
|
ops.push({ 'type':'put', 'key': elem.key, 'value':JSON.stringify(bitmapToStore(tmp))})
|
||||||
|
});
|
||||||
|
indexd.batchWrite(ops, (err) =>{
|
||||||
|
if (err) {
|
||||||
|
return null
|
||||||
|
} else {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateBitmap(bitmap, rowId) {
|
||||||
|
if (bitmap.length() - 1 <= rowId) {
|
||||||
|
bitmap.push(rowId);
|
||||||
|
} else {
|
||||||
|
bitmap.set(rowId);
|
||||||
|
}
|
||||||
|
return bitmapToStore(bitmap);
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIndexEntry(key, rowId, cb) {
|
||||||
|
indexd.get(key, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
return (err);
|
||||||
|
} else {
|
||||||
|
data = JSON.parse(data)
|
||||||
|
}
|
||||||
|
indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
|
||||||
|
return (err);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIntIndex(bucketName, objName, attribute, value, rowId) {
|
||||||
|
writeDB(`${bucketName}/${attribute}/${value}`, objName, `${bucketName}/${attribute}`, value, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateACLIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
deleteOldEntries(bucketName, rowId, () => {
|
||||||
|
Object.keys(objVal).forEach(elem => {
|
||||||
|
if (typeof objVal[elem] === 'string') {
|
||||||
|
writeDB(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
objVal[elem].forEach(item => {
|
||||||
|
writeDB(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
const type = objVal.split('/')[0];
|
||||||
|
const subtype = objVal.split('/')[1];
|
||||||
|
writeDB(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
|
||||||
|
writeDB(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
const date = new Date(objVal);
|
||||||
|
const term = 'modificationdate-';
|
||||||
|
const year = date.getUTCFullYear();
|
||||||
|
const month = date.getUTCMonth() + 1;
|
||||||
|
const day = date.getUTCDate();
|
||||||
|
const hours = date.getUTCHours();
|
||||||
|
const minutes = date.getUTCMinutes();
|
||||||
|
updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateFileSizeIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
updateIntIndex(bucketName, objName, `filesize`, parseInt(objVal, 10), rowId);
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateTagIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
const tags = [];
|
||||||
|
Object.keys(objVal).forEach(elem => {
|
||||||
|
if (elem.indexOf('x-amz-meta') !== -1 &&
|
||||||
|
elem !== 'x-amz-meta-s3cmd-attrs') {
|
||||||
|
tags.push({key: elem.replace('x-amz-meta-', ''), value: objVal[elem]});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
tags.forEach(tag => {
|
||||||
|
if (tag.key.indexOf('--integer') !== -1) {
|
||||||
|
tag.key = tag.key.replace('--integer', '');
|
||||||
|
updateIntIndex(bucketName, objName, `x-amz-meta/${tag.key}`, parseInt(tag.value, 10), rowId);
|
||||||
|
} else {
|
||||||
|
writeDB(`${bucketName}/x-amz-meta/${tag.key}/${tag.value}`, objName, `${bucketName}/x-amz-meta`, `${tag.key}/${tag.value}`, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const index = {
|
||||||
|
evaluateQuery: (queryTerms, params) => {
|
||||||
|
const bucketName = params.bucketName;
|
||||||
|
async.map(queryTerms, readIndex.bind(null, bucketName), function(err, queryTerms) {
|
||||||
|
getObjectMeta(bucketName, (allObjects) => {
|
||||||
|
let i;
|
||||||
|
while (queryTerms.length > 1) {
|
||||||
|
for (i = queryTerms.length - 1; i >= 0; i--) {
|
||||||
|
if (queryTerms[i] === 'op/AND'
|
||||||
|
|| queryTerms[i] === 'op/OR'
|
||||||
|
|| queryTerms[i] === 'op/NOT') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (queryTerms[i] === 'op/NOT') {
|
||||||
|
const op1 = allObjects;
|
||||||
|
const op2 = queryTerms[i + 1];
|
||||||
|
queryTerms.splice(i, 2, difference(op1, op2));
|
||||||
|
} else {
|
||||||
|
const op1 = queryTerms[i + 1];
|
||||||
|
const op2 = queryTerms[i + 2];
|
||||||
|
if (queryTerms[i] === 'op/AND') {
|
||||||
|
queryTerms.splice(i, 3, intersection(op1, op2));
|
||||||
|
} else {
|
||||||
|
queryTerms.splice(i, 3, union(op1, op2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||||
|
if (err) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
objMapping = JSON.parse(objMapping);
|
||||||
|
queryTerms = queryTerms[0].toString(':').split(':');
|
||||||
|
queryTerms = queryTerms.map(function (elem) {
|
||||||
|
return objMapping.mapping[elem];
|
||||||
|
});
|
||||||
|
indexd.respondQuery(params, queryTerms)
|
||||||
|
});
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
filterRemoved(queryTerms[0], params, (results) => {
|
||||||
|
indexd.respondQuery(params, results);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
updateIndex: (bucketName, objName, objVal) => {
|
||||||
|
updateIndexMeta(bucketName, objName, (err, rowId) => {
|
||||||
|
updateFileSizeIndex(bucketName, objName, objVal['content-length'], rowId);
|
||||||
|
updateΜodDateIndex(bucketName, objName, objVal['last-modified'], rowId);
|
||||||
|
updateContentTypeIndex(bucketName, objName, objVal['content-type'], rowId);
|
||||||
|
updateACLIndex(bucketName, objName, objVal['acl'], rowId);
|
||||||
|
updateTagIndex(bucketName, objName, objVal, rowId);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
deleteObject: (bucketName, objName) => {
|
||||||
|
if (config.backend === "leveldb") {
|
||||||
|
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||||
|
if (err) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
objMapping = JSON.parse(objMapping);
|
||||||
|
const rowId = objMapping.mapping[objName];
|
||||||
|
delete objMapping.mapping[objName];
|
||||||
|
delete objMapping.mapping[rowId];
|
||||||
|
objMapping.nextAvail.push(rowId);
|
||||||
|
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
|
||||||
|
if (err) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} else if (config.backend === "antidote") {
|
||||||
|
indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
exports.default = index;
|
|
@ -271,7 +271,7 @@ class Config {
|
||||||
let metadata = 'file';
|
let metadata = 'file';
|
||||||
let kms = 'file';
|
let kms = 'file';
|
||||||
if (process.env.S3BACKEND) {
|
if (process.env.S3BACKEND) {
|
||||||
const validBackends = ['mem', 'file', 'scality'];
|
const validBackends = ['mem', 'file', 'scality', 'antidote'];
|
||||||
assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
|
assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
|
||||||
'bad environment variable: S3BACKEND environment variable ' +
|
'bad environment variable: S3BACKEND environment variable ' +
|
||||||
'should be one of mem/file/scality'
|
'should be one of mem/file/scality'
|
||||||
|
@ -284,7 +284,11 @@ class Config {
|
||||||
if (process.env.S3VAULT) {
|
if (process.env.S3VAULT) {
|
||||||
auth = process.env.S3VAULT;
|
auth = process.env.S3VAULT;
|
||||||
}
|
}
|
||||||
if (auth === 'file' || auth === 'mem') {
|
if (data === 'antidote') {
|
||||||
|
data = 'mem';
|
||||||
|
kms = 'mem';
|
||||||
|
}
|
||||||
|
if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
|
||||||
// Auth only checks for 'mem' since mem === file
|
// Auth only checks for 'mem' since mem === file
|
||||||
auth = 'mem';
|
auth = 'mem';
|
||||||
let authfile = `${__dirname}/../conf/authdata.json`;
|
let authfile = `${__dirname}/../conf/authdata.json`;
|
||||||
|
@ -326,6 +330,32 @@ class Config {
|
||||||
dataPath,
|
dataPath,
|
||||||
metadataPath,
|
metadataPath,
|
||||||
};
|
};
|
||||||
|
this.antidote = {};
|
||||||
|
if (config.antidote) {
|
||||||
|
if (config.antidote.port !== undefined) {
|
||||||
|
assert(Number.isInteger(config.antidote.port)
|
||||||
|
&& config.antidote.port > 0,
|
||||||
|
'bad config: vaultd port must be a positive integer');
|
||||||
|
this.antidote.port = config.antidote.port;
|
||||||
|
}
|
||||||
|
if (config.antidote.host !== undefined) {
|
||||||
|
assert.strictEqual(typeof config.vaultd.host, 'string',
|
||||||
|
'bad config: vaultd host must be a string');
|
||||||
|
this.antidote.host = config.antidote.host;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.userMetaIndexing = false;
|
||||||
|
if (config.userMetaIndexing !== undefined) {
|
||||||
|
this.userMetaIndexing = config.userMetaIndexing;
|
||||||
|
}
|
||||||
|
this.systemMetaIndexing = false;
|
||||||
|
if (config.systemMetaIndexing !== undefined) {
|
||||||
|
this.systemMetaIndexing = config.systemMetaIndexing;
|
||||||
|
}
|
||||||
|
this.indexServerPort = 7000;
|
||||||
|
if (config.indexServerPort !== undefined) {
|
||||||
|
this.indexServerPort = config.indexServerPort;
|
||||||
|
}
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import querystring from 'querystring';
|
import querystring from 'querystring';
|
||||||
|
import indexClient from '../indexClient/indexClient';
|
||||||
import constants from '../../constants';
|
import constants from '../../constants';
|
||||||
|
|
||||||
import services from '../services';
|
import services from '../services';
|
||||||
|
@ -70,6 +71,7 @@ export default function bucketGet(authInfo, request, log, callback) {
|
||||||
delimiter: params.delimiter,
|
delimiter: params.delimiter,
|
||||||
marker: params.marker,
|
marker: params.marker,
|
||||||
prefix: params.prefix,
|
prefix: params.prefix,
|
||||||
|
query: indexClient.processQueryHeader(request.headers.query),
|
||||||
};
|
};
|
||||||
|
|
||||||
services.metadataValidateAuthorization(metadataValParams, err => {
|
services.metadataValidateAuthorization(metadataValParams, err => {
|
||||||
|
@ -83,6 +85,7 @@ export default function bucketGet(authInfo, request, log, callback) {
|
||||||
log.debug('error processing request', { error: err });
|
log.debug('error processing request', { error: err });
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
|
listParams.prefix = params.prefix;
|
||||||
const xml = [];
|
const xml = [];
|
||||||
xml.push(
|
xml.push(
|
||||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
import http from 'http'
|
||||||
|
import io from 'socket.io'
|
||||||
|
import metadata from '../metadata/wrapper.js'
|
||||||
|
|
||||||
|
let iosock;
|
||||||
|
let callback;
|
||||||
|
|
||||||
|
export default {
|
||||||
|
createPublisher(port, log) {
|
||||||
|
const server = http.createServer(function(req, res){
|
||||||
|
});
|
||||||
|
server.listen(port);
|
||||||
|
iosock = io.listen(server);
|
||||||
|
iosock.sockets.on('connection', function(socket) {
|
||||||
|
log.info("indexing server connected")
|
||||||
|
socket.on('disconnect', function() {
|
||||||
|
});
|
||||||
|
socket.on('subscribe', function(room) {
|
||||||
|
socket.join(room);
|
||||||
|
});
|
||||||
|
socket.on('unsubscribe', function(room) {
|
||||||
|
socket.leave(room);
|
||||||
|
});
|
||||||
|
socket.on('query_response', function(msg) {
|
||||||
|
msg.params.cb = callback;
|
||||||
|
metadata.respondQueryGetMD(msg.result, msg.params);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
putObjectMD(bucketName, objName, objVal) {
|
||||||
|
iosock.sockets.to('puts').emit('put', {
|
||||||
|
bucketName,
|
||||||
|
objName,
|
||||||
|
objVal
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
listObject(bucketName, query, prefix, marker, delimiter, maxKeys, cb) {
|
||||||
|
callback = cb;
|
||||||
|
iosock.sockets.to('queries').emit('query', {
|
||||||
|
query,
|
||||||
|
params : {
|
||||||
|
bucketName,
|
||||||
|
prefix,
|
||||||
|
marker,
|
||||||
|
maxKeys,
|
||||||
|
delimiter,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
deleteObjectMD(bucketName, objName) {
|
||||||
|
iosock.sockets.to('deletes').emit('delete', {
|
||||||
|
bucketName,
|
||||||
|
objName
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
processQueryHeader(header) {
|
||||||
|
if (!header) {
|
||||||
|
return header;
|
||||||
|
}
|
||||||
|
const queryTerms = header.split('&');
|
||||||
|
const query = [];
|
||||||
|
for (let i = 0; i < queryTerms.length; i++) {
|
||||||
|
query.push(queryTerms[i]);
|
||||||
|
}
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
|
||||||
|
import ListResult from '../in_memory/ListResult';
|
||||||
|
|
||||||
|
export class ListBucketResult extends ListResult {
|
||||||
|
constructor() {
|
||||||
|
super();
|
||||||
|
this.Contents = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
addContentsKey(key, keyMap) {
|
||||||
|
const objectMD = keyMap;
|
||||||
|
this.Contents.push({
|
||||||
|
key,
|
||||||
|
value: {
|
||||||
|
LastModified: objectMD['last-modified'],
|
||||||
|
ETag: objectMD['content-md5'],
|
||||||
|
StorageClass: objectMD['x-amz-storage-class'],
|
||||||
|
Owner: {
|
||||||
|
DisplayName: objectMD['owner-display-name'],
|
||||||
|
ID: objectMD['owner-id'],
|
||||||
|
},
|
||||||
|
Size: objectMD['content-length'],
|
||||||
|
// Initiated is used for overview of MPU
|
||||||
|
Initiated: objectMD.initiated,
|
||||||
|
// Initiator is used for overview of MPU.
|
||||||
|
// It is an object containing DisplayName
|
||||||
|
// and ID
|
||||||
|
Initiator: objectMD.initiator,
|
||||||
|
// EventualStorageBucket is used for overview of MPU
|
||||||
|
EventualStorageBucket: objectMD.eventualStorageBucket,
|
||||||
|
// Used for parts of MPU
|
||||||
|
partLocations: objectMD.partLocations,
|
||||||
|
// creationDate is just used for serviceGet
|
||||||
|
creationDate: objectMD.creationDate,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
this.MaxKeys += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasDeleteMarker(key, keyMap) {
|
||||||
|
const objectMD = keyMap;
|
||||||
|
if (objectMD['x-amz-delete-marker'] !== undefined) {
|
||||||
|
return (objectMD['x-amz-delete-marker'] === true);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,309 @@
|
||||||
|
import { errors } from 'arsenal';
|
||||||
|
|
||||||
|
import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
|
||||||
|
import { ListBucketResult } from './ListBucketResult';
|
||||||
|
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
|
||||||
|
import config from '../../Config';
|
||||||
|
import async from 'async';
|
||||||
|
import antidoteClient from 'antidote_ts_client';
|
||||||
|
|
||||||
|
const defaultMaxKeys = 1000;
|
||||||
|
|
||||||
|
var replacer = function(key, value) {
|
||||||
|
if (value === undefined){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
};
|
||||||
|
|
||||||
|
var reviver = function(key, value) {
|
||||||
|
if (value === null){
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
};
|
||||||
|
|
||||||
|
class AntidoteInterface {
|
||||||
|
constructor() {
|
||||||
|
this.antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
|
||||||
|
}
|
||||||
|
|
||||||
|
createBucket(bucketName, bucketMD, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
// TODO Check whether user already owns the bucket,
|
||||||
|
// if so return "BucketAlreadyOwnedByYou"
|
||||||
|
// If not owned by user, return "BucketAlreadyExists"
|
||||||
|
if (bucket) {
|
||||||
|
return cb(errors.BucketAlreadyExists);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||||
|
this.antidote.update([
|
||||||
|
bucket_MD.register('md').set(bucketMD)
|
||||||
|
]).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||||
|
this.antidote.update([
|
||||||
|
bucket_MD.register('md').set(bucketMD)
|
||||||
|
]).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getBucketAttributes(bucketName, log, cb) {
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||||
|
bucket_MD.read().then(bucketMD => {
|
||||||
|
bucketMD = bucketMD.toJsObject();
|
||||||
|
if (Object.keys(bucketMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchBucket);
|
||||||
|
}
|
||||||
|
return cb(null, bucketMD['md']);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteBucket(bucketName, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||||
|
bucket_Objs.read().then(objects => {
|
||||||
|
if (bucket && objects.length > 0) {
|
||||||
|
return cb(errors.BucketNotEmpty);
|
||||||
|
}
|
||||||
|
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||||
|
this.antidote.update([
|
||||||
|
bucket_MD.remove(bucket_MD.register('md')),
|
||||||
|
]).then( (resp) => {
|
||||||
|
return cb(null);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
putObject(bucketName, objName, objVal, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||||
|
let object_MD = this.antidote.map(`${objName}`);
|
||||||
|
this.antidote.update([
|
||||||
|
bucket_Objs.add(objName),
|
||||||
|
object_MD.register('md').set(objVal)
|
||||||
|
]).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getBucketAndObject(bucketName, objName, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err, { bucket });
|
||||||
|
}
|
||||||
|
const bucket_MD = {}
|
||||||
|
Object.keys(bucket).map(function(key) {
|
||||||
|
bucket_MD[key.substr(1)] = bucket[key]
|
||||||
|
});
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let object_MD = this.antidote.map(`${objName}`);
|
||||||
|
object_MD.read().then(objectMD => {
|
||||||
|
objectMD = objectMD.toJsObject();
|
||||||
|
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
return cb(null, { bucket: JSON.stringify(bucket_MD) });
|
||||||
|
}
|
||||||
|
return cb(null, {
|
||||||
|
bucket: JSON.stringify(bucket_MD),
|
||||||
|
obj: JSON.stringify(objectMD['md']),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getObject(bucketName, objName, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let object_MD = this.antidote.map(`${objName}`);
|
||||||
|
object_MD.read().then(objectMD => {
|
||||||
|
objectMD = objectMD.toJsObject();
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
}
|
||||||
|
return cb(null, objectMD['md']);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteObject(bucketName, objName, log, cb) {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let object_MD = this.antidote.map(`${objName}`);
|
||||||
|
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||||
|
object_MD.read().then(objectMD => {
|
||||||
|
objectMD = objectMD.toJsObject();
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
}
|
||||||
|
this.antidote.update([
|
||||||
|
object_MD.remove(object_MD.register('md')),
|
||||||
|
bucket_Objs.remove(objName)
|
||||||
|
]).then( (resp) => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getObjectMD(antidote, bucketName, key, callback) {
|
||||||
|
antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let object_MD = antidote.map(`${key}`);
|
||||||
|
object_MD.read().then(objectMD => {
|
||||||
|
objectMD = objectMD.toJsObject();
|
||||||
|
if (Object.keys(objectMD).length === 0) {
|
||||||
|
return callback(error.NoSuchKey, null);
|
||||||
|
}
|
||||||
|
return callback(null, objectMD['md']);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
listObject(bucketName, params, log, cb) {
|
||||||
|
const { prefix, marker, delimiter, maxKeys } = params;
|
||||||
|
if (prefix && typeof prefix !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (marker && typeof marker !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delimiter && typeof delimiter !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxKeys && typeof maxKeys !== 'number') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
let numKeys = maxKeys;
|
||||||
|
// If paramMaxKeys is undefined, the default parameter will set it.
|
||||||
|
// However, if it is null, the default parameter will not set it.
|
||||||
|
if (numKeys === null) {
|
||||||
|
numKeys = defaultMaxKeys;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||||
|
bucket_MD.read().then(bucketMD => {
|
||||||
|
bucketMD = bucketMD.toJsObject();
|
||||||
|
if (Object.keys(bucketMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchBucket);
|
||||||
|
}
|
||||||
|
const response = new ListBucketResult();
|
||||||
|
|
||||||
|
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||||
|
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||||
|
bucket_Objs.read().then(keys => {
|
||||||
|
|
||||||
|
async.map(keys, this.getObjectMD.bind(null, this.antidote, bucketName), function(err, objectMeta) {
|
||||||
|
|
||||||
|
// If marker specified, edit the keys array so it
|
||||||
|
// only contains keys that occur alphabetically after the marker
|
||||||
|
if (marker) {
|
||||||
|
keys = markerFilter(marker, keys);
|
||||||
|
response.Marker = marker;
|
||||||
|
}
|
||||||
|
// If prefix specified, edit the keys array so it only
|
||||||
|
// contains keys that contain the prefix
|
||||||
|
if (prefix) {
|
||||||
|
keys = prefixFilter(prefix, keys);
|
||||||
|
response.Prefix = prefix;
|
||||||
|
}
|
||||||
|
// Iterate through keys array and filter keys containing
|
||||||
|
// delimiter into response.CommonPrefixes and filter remaining
|
||||||
|
// keys into response.Contents
|
||||||
|
for (let i = 0; i < keys.length; ++i) {
|
||||||
|
const currentKey = keys[i];
|
||||||
|
// Do not list object with delete markers
|
||||||
|
if (response.hasDeleteMarker(currentKey,
|
||||||
|
objectMeta[i])) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// If hit numKeys, stop adding keys to response
|
||||||
|
if (response.MaxKeys >= numKeys) {
|
||||||
|
response.IsTruncated = true;
|
||||||
|
response.NextMarker = keys[i - 1];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// If a delimiter is specified, find its index in the
|
||||||
|
// current key AFTER THE OCCURRENCE OF THE PREFIX
|
||||||
|
let delimiterIndexAfterPrefix = -1;
|
||||||
|
let prefixLength = 0;
|
||||||
|
if (prefix) {
|
||||||
|
prefixLength = prefix.length;
|
||||||
|
}
|
||||||
|
const currentKeyWithoutPrefix = currentKey
|
||||||
|
.slice(prefixLength);
|
||||||
|
let sliceEnd;
|
||||||
|
if (delimiter) {
|
||||||
|
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
|
||||||
|
.indexOf(delimiter);
|
||||||
|
sliceEnd = delimiterIndexAfterPrefix + prefixLength;
|
||||||
|
response.Delimiter = delimiter;
|
||||||
|
}
|
||||||
|
// If delimiter occurs in current key, add key to
|
||||||
|
// response.CommonPrefixes.
|
||||||
|
// Otherwise add key to response.Contents
|
||||||
|
if (delimiterIndexAfterPrefix > -1) {
|
||||||
|
const keySubstring = currentKey.slice(0, sliceEnd + 1);
|
||||||
|
response.addCommonPrefix(keySubstring);
|
||||||
|
} else {
|
||||||
|
response.addContentsKey(currentKey,
|
||||||
|
objectMeta[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cb(null, response);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
listMultipartUploads(bucketName, listingParams, log, cb) {
|
||||||
|
process.nextTick(() => {
|
||||||
|
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||||
|
if (bucket === undefined) {
|
||||||
|
// no on going multipart uploads, return empty listing
|
||||||
|
return cb(null, {
|
||||||
|
IsTruncated: false,
|
||||||
|
NextMarker: undefined,
|
||||||
|
MaxKeys: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return getMultipartUploadListing(bucket, listingParams, cb);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export default AntidoteInterface;
|
|
@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
|
||||||
import BucketFileInterface from './bucketfile/backend';
|
import BucketFileInterface from './bucketfile/backend';
|
||||||
import BucketInfo from './BucketInfo';
|
import BucketInfo from './BucketInfo';
|
||||||
import inMemory from './in_memory/backend';
|
import inMemory from './in_memory/backend';
|
||||||
|
import AntidoteInterface from './antidote/backend';
|
||||||
import config from '../Config';
|
import config from '../Config';
|
||||||
|
import indexClient from '../indexClient/indexClient'
|
||||||
|
import async from 'async'
|
||||||
|
|
||||||
let client;
|
let client;
|
||||||
let implName;
|
let implName;
|
||||||
|
@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
|
||||||
} else if (config.backends.metadata === 'scality') {
|
} else if (config.backends.metadata === 'scality') {
|
||||||
client = new BucketClientInterface();
|
client = new BucketClientInterface();
|
||||||
implName = 'bucketclient';
|
implName = 'bucketclient';
|
||||||
|
} else if (config.backends.metadata === 'antidote') {
|
||||||
|
client = new AntidoteInterface();
|
||||||
|
implName = 'antidote';
|
||||||
|
}
|
||||||
|
|
||||||
|
function getQueryResults(params, objName, callback) {
|
||||||
|
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||||
|
client.getObject(bucketName, objName, log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
callback(err, null);
|
||||||
|
}
|
||||||
|
callback(null, {key: objName, value: {
|
||||||
|
LastModified: data['last-modified'],
|
||||||
|
ETag: data['content-md5'],
|
||||||
|
StorageClass: data['x-amz-storage-class'],
|
||||||
|
Owner: {
|
||||||
|
ID: data['owner-id'],
|
||||||
|
DisplayName: data['owner-display-name']
|
||||||
|
},
|
||||||
|
Size: data['content-length'],
|
||||||
|
Initiated: undefined,
|
||||||
|
Initiator: undefined,
|
||||||
|
EventualStorageBucket: undefined,
|
||||||
|
partLocations: undefined,
|
||||||
|
creationDate: undefined
|
||||||
|
}});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const metadata = {
|
const metadata = {
|
||||||
|
@ -75,6 +105,12 @@ const metadata = {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
log.debug('object successfully put in metadata');
|
log.debug('object successfully put in metadata');
|
||||||
|
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||||
|
if (objName.indexOf('..|..') !== -1) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
indexClient.putObjectMD(bucketName, objName, objVal);
|
||||||
|
}
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}, params);
|
}, params);
|
||||||
},
|
},
|
||||||
|
@ -113,12 +149,18 @@ const metadata = {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
log.debug('object deleted from metadata');
|
log.debug('object deleted from metadata');
|
||||||
|
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||||
|
indexClient.deleteObjectMD(bucketName, objName);
|
||||||
|
}
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}, params);
|
}, params);
|
||||||
},
|
},
|
||||||
|
|
||||||
listObject: (bucketName, listingParams, log, cb) => {
|
listObject: (bucketName, listingParams, log, cb) => {
|
||||||
client
|
if (listingParams.query) {
|
||||||
|
indexClient.listObject(bucketName, listingParams.query, listingParams.prefix, listingParams.marker,listingParams.delimiter, listingParams.maxKeys, cb);
|
||||||
|
} else {
|
||||||
|
client
|
||||||
.listObject(bucketName, listingParams,
|
.listObject(bucketName, listingParams,
|
||||||
log, (err, data) => {
|
log, (err, data) => {
|
||||||
log.debug('getting object listing from metadata');
|
log.debug('getting object listing from metadata');
|
||||||
|
@ -129,6 +171,34 @@ const metadata = {
|
||||||
log.debug('object listing retrieved from metadata');
|
log.debug('object listing retrieved from metadata');
|
||||||
return cb(err, data);
|
return cb(err, data);
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQueryGetMD: (result, params) => {
|
||||||
|
async.map(result, getQueryResults.bind(null, params), function(err, res) {
|
||||||
|
const response = {
|
||||||
|
IsTruncated: false,
|
||||||
|
NextMarker: params.marker,
|
||||||
|
CommonPrefixes: [],
|
||||||
|
MaxKeys: 10,
|
||||||
|
Contents: res
|
||||||
|
}
|
||||||
|
return params.cb(err, response);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQueryFilter: (result, params) => {
|
||||||
|
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||||
|
client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
|
||||||
|
log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
data.Contents = data.Contents.filter(elem => {
|
||||||
|
return result.indexOf(elem.key) !== -1;
|
||||||
|
});
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
listMultipartUploads: (bucketName, listingParams, log, cb) => {
|
listMultipartUploads: (bucketName, listingParams, log, cb) => {
|
||||||
|
|
|
@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
|
||||||
import { clientCheck } from './utilities/healthcheckHandler';
|
import { clientCheck } from './utilities/healthcheckHandler';
|
||||||
import _config from './Config';
|
import _config from './Config';
|
||||||
import routes from './routes';
|
import routes from './routes';
|
||||||
|
import indexClient from './indexClient/indexClient'
|
||||||
|
|
||||||
class S3Server {
|
class S3Server {
|
||||||
/**
|
/**
|
||||||
|
@ -38,6 +38,7 @@ class S3Server {
|
||||||
* This starts the http server.
|
* This starts the http server.
|
||||||
*/
|
*/
|
||||||
startup() {
|
startup() {
|
||||||
|
indexClient.createPublisher(_config.indexServerPort, logger)
|
||||||
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
|
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
|
||||||
if (_config.https) {
|
if (_config.https) {
|
||||||
this.server = https.createServer({
|
this.server = https.createServer({
|
||||||
|
@ -118,7 +119,7 @@ class S3Server {
|
||||||
|
|
||||||
export default function main() {
|
export default function main() {
|
||||||
let clusters = _config.clusters || 1;
|
let clusters = _config.clusters || 1;
|
||||||
if (process.env.S3BACKEND === 'mem') {
|
if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
|
||||||
clusters = 1;
|
clusters = 1;
|
||||||
}
|
}
|
||||||
if (cluster.isMaster) {
|
if (cluster.isMaster) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
},
|
},
|
||||||
"homepage": "https://github.com/scality/S3#readme",
|
"homepage": "https://github.com/scality/S3#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"antidote_ts_client": "^0.1.0",
|
||||||
"arsenal": "scality/Arsenal",
|
"arsenal": "scality/Arsenal",
|
||||||
"async": "~1.4.2",
|
"async": "~1.4.2",
|
||||||
"babel-core": "^6.5.2",
|
"babel-core": "^6.5.2",
|
||||||
|
@ -31,8 +32,10 @@
|
||||||
"level": "^1.4.0",
|
"level": "^1.4.0",
|
||||||
"level-sublevel": "^6.5.4",
|
"level-sublevel": "^6.5.4",
|
||||||
"multilevel": "^7.3.0",
|
"multilevel": "^7.3.0",
|
||||||
|
"node-bitmap-ewah": "dimitriosvasilas/node-bitmap-ewah",
|
||||||
"node-uuid": "^1.4.3",
|
"node-uuid": "^1.4.3",
|
||||||
"ready-set-stream": "1.0.7",
|
"ready-set-stream": "1.0.7",
|
||||||
|
"socket.io": "^1.7.2",
|
||||||
"sproxydclient": "scality/sproxydclient",
|
"sproxydclient": "scality/sproxydclient",
|
||||||
"utapi": "scality/utapi",
|
"utapi": "scality/utapi",
|
||||||
"utf8": "~2.1.1",
|
"utf8": "~2.1.1",
|
||||||
|
@ -71,6 +74,7 @@
|
||||||
"lint": "eslint $(git ls-files '*.js')",
|
"lint": "eslint $(git ls-files '*.js')",
|
||||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||||
"mem_backend": "S3BACKEND=mem node index.js",
|
"mem_backend": "S3BACKEND=mem node index.js",
|
||||||
|
"antidote_backend": "S3BACKEND=antidote node index.js",
|
||||||
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
|
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
|
||||||
"start": "node init.js && node index.js",
|
"start": "node init.js && node index.js",
|
||||||
"start_utapi": "node utapiServer.js",
|
"start_utapi": "node utapiServer.js",
|
||||||
|
|
Loading…
Reference in New Issue