Compare commits
34 Commits
developmen
...
proto/bitm
Author | SHA1 | Date |
---|---|---|
Dimitrios Vasilas | e27cb1cf6c | |
Dimitrios Vasilas | 9c8c8fd9ea | |
Dimitrios Vasilas | a236fd4457 | |
Dimitrios Vasilas | e76472fa20 | |
Dimitrios Vasilas | e6f82079c6 | |
Dimitrios Vasilas | cfb77cc9fe | |
Dimitrios Vasilas | 17a9cd1071 | |
Dimitrios Vasilas | 8ce70e7337 | |
Dimitrios Vasilas | cdbcae37b2 | |
Dimitrios Vasilas | 0f5cd3e545 | |
Dimitrios Vasilas | f6cddaf6a5 | |
Dimitrios Vasilas | c225e483bc | |
Dimitrios Vasilas | 53cb8a3263 | |
Dimitrios Vasilas | d302070959 | |
Dimitrios Vasilas | 7597c379e7 | |
Dimitrios Vasilas | 541e97f1ea | |
Dimitrios Vasilas | c32fe7df51 | |
Dimitrios Vasilas | 9e301aaf25 | |
Dimitrios Vasilas | eb441dd09c | |
Dimitrios Vasilas | 6fddd95a1d | |
Dimitrios Vasilas | 1303bd8b97 | |
Dimitrios Vasilas | 093c071836 | |
Dimitrios Vasilas | da115e6ce4 | |
Dimitrios Vasilas | b588d5e4d0 | |
Dimitrios Vasilas | 5f1bd15fb0 | |
Dimitrios Vasilas | 22a783f23e | |
Dimitrios Vasilas | 320fc256bf | |
Dimitrios Vasilas | 361585825f | |
Dimitrios Vasilas | e390e355ef | |
Dimitrios Vasilas | f03940e45e | |
Dimitrios Vasilas | e67abad697 | |
Dimitrios Vasilas | 9a42556448 | |
Dimitrios Vasilas | 3056714c02 | |
Dimitrios Vasilas | d871755296 |
|
@ -48,6 +48,13 @@
|
|||
"logLevel": "info",
|
||||
"dumpLevel": "error"
|
||||
},
|
||||
"antidote": {
|
||||
"host": "localhost",
|
||||
"port": 8087
|
||||
},
|
||||
"userMetaIndexing":true,
|
||||
"systemMetaIndexing":true,
|
||||
"indexServerPort":7000,
|
||||
"healthChecks": {
|
||||
"allowFrom": ["127.0.0.1/8", "::1"]
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"S3": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 7000
|
||||
},
|
||||
"antidote": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 8087
|
||||
},
|
||||
"leveldb_path": "indexdb0",
|
||||
"backend": "antidote"
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
'use strict';
|
||||
|
||||
require('./lib/bitmapd-utils.js').default.connectToS3();
|
||||
require('./lib/bitmapd-utils.js').default.connectToDB();
|
|
@ -0,0 +1,148 @@
|
|||
const fs = require('fs');
|
||||
const io = require('socket.io-client');
|
||||
const levelup = require('levelup');
|
||||
const leveldown = require('leveldown');
|
||||
const antidoteClient = require('antidote_ts_client');
|
||||
|
||||
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
|
||||
config = JSON.parse(config);
|
||||
|
||||
let antidote;
|
||||
let client = null;
|
||||
let db = null;
|
||||
|
||||
const utils = {
|
||||
connectToS3() {
|
||||
client = io.connect(`http://${config.S3.host}:${config.S3.port}`, {
|
||||
reconnection: true,
|
||||
});
|
||||
client.on('connect', function() {
|
||||
client.emit('subscribe', 'puts');
|
||||
client.emit('subscribe', 'deletes');
|
||||
client.emit('subscribe', 'queries');
|
||||
});
|
||||
client.on('reconnecting', function(number) {
|
||||
});
|
||||
client.on('error', function(err) {
|
||||
});
|
||||
client.on('put', function(msg) {
|
||||
require('./utils.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
|
||||
});
|
||||
client.on('query', function(msg) {
|
||||
if (!msg.params.prefix)
|
||||
msg.params.prefix = undefined;
|
||||
if (!msg.params.marker)
|
||||
msg.params.marker = undefined;
|
||||
if (!msg.params.delimiter)
|
||||
msg.params.delimiter = undefined;
|
||||
require('./utils.js').default.evaluateQuery(msg.query, msg.params);
|
||||
});
|
||||
client.on('delete', function(msg) {
|
||||
require('./utils.js').default.deleteObject(msg.bucketName, msg.objName);
|
||||
});
|
||||
},
|
||||
|
||||
connectToDB() {
|
||||
antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
|
||||
antidote.defaultBucket = 'index';
|
||||
leveldown.destroy(config.leveldb_path, function (err) {
|
||||
if (!err) {
|
||||
db = levelup(config.leveldb_path);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
updateAntidoteSet(key, elem, cb) {
|
||||
const keyset = antidote.set('keys');
|
||||
antidote.update(
|
||||
keyset.add(key)
|
||||
).then( (resp) => {
|
||||
const set = antidote.set(key);
|
||||
antidote.update(
|
||||
set.add(elem)
|
||||
).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
readAntidoteSet(key, cb) {
|
||||
const set = antidote.set(key);
|
||||
set.read().then(objs => {
|
||||
return cb(null, objs);
|
||||
});
|
||||
},
|
||||
|
||||
removeFromAntidoteSet(key, objName, cb) {
|
||||
const set = antidote.set(key);
|
||||
antidote.update(
|
||||
set.remove(objName)
|
||||
).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
},
|
||||
|
||||
respondQuery(params, queryTerms) {
|
||||
client.emit('query_response', {
|
||||
result: queryTerms,
|
||||
params
|
||||
})
|
||||
},
|
||||
|
||||
put(key, value, cb) {
|
||||
db.put(key, value, function(err) {
|
||||
return cb(err);
|
||||
});
|
||||
},
|
||||
|
||||
get(key, cb) {
|
||||
db.get(key, function(err, data) {
|
||||
return cb(err, data);
|
||||
});
|
||||
},
|
||||
|
||||
getPrefix(prefix, cb) {
|
||||
const list = []
|
||||
db.createReadStream({
|
||||
start: prefix,
|
||||
end: prefix + "\xFF"
|
||||
})
|
||||
.on('data', function(data) {
|
||||
list.push(data);
|
||||
})
|
||||
.on('error', function(err) {
|
||||
return cb(err, null)
|
||||
})
|
||||
.on('close', function() {
|
||||
if (list.length === 0)
|
||||
return cb(null, null)
|
||||
return cb(null, list)
|
||||
});
|
||||
},
|
||||
|
||||
getRange(start, end, cb) {
|
||||
const list = []
|
||||
db.createReadStream({
|
||||
start: start,
|
||||
end: end
|
||||
})
|
||||
.on('data', function(data) {
|
||||
list.push(data);
|
||||
})
|
||||
.on('error', function(err) {
|
||||
return cb(err, null)
|
||||
})
|
||||
.on('close', function() {
|
||||
return cb(null, list)
|
||||
});
|
||||
},
|
||||
|
||||
batchWrite(ops, cb) {
|
||||
db.batch(ops, function(err) {
|
||||
return cb(err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
exports.default = utils;
|
||||
exports.config = config;
|
|
@ -0,0 +1,528 @@
|
|||
const bitmap = require('node-bitmap-ewah');
|
||||
const indexd = require('./bitmapd-utils').default;
|
||||
const config = require('./bitmapd-utils').config;
|
||||
const async = require('async');
|
||||
const _ = require('underscore')
|
||||
|
||||
function intersection(a, b) {
|
||||
if (config.backend === "leveldb") {
|
||||
return a.and(b);
|
||||
} else if (config.backend === "antidote") {
|
||||
return _.intersection(a, b);
|
||||
}
|
||||
}
|
||||
|
||||
function union(a, b) {
|
||||
if (config.backend === "leveldb") {
|
||||
return a.and(b);
|
||||
} else if (config.backend === "antidote") {
|
||||
return _.union(a, b);
|
||||
}
|
||||
}
|
||||
|
||||
function difference(u, a) {
|
||||
if (config.backend === "leveldb") {
|
||||
return a.not()
|
||||
} else if (config.backend === "antidote") {
|
||||
return _.difference(u, a);
|
||||
}
|
||||
}
|
||||
|
||||
function padLeft(nr, n){
|
||||
return Array(n-String(nr).length+1).join('0')+nr;
|
||||
}
|
||||
|
||||
function binaryIndexOf(arr, searchElement, op) {
|
||||
let minIndex = 0;
|
||||
let maxIndex = arr.length - 1;
|
||||
let currentIndex;
|
||||
let currentElement;
|
||||
|
||||
while (minIndex <= maxIndex) {
|
||||
currentIndex = (minIndex + maxIndex) / 2 | 0;
|
||||
currentElement = arr[currentIndex];
|
||||
|
||||
if (currentElement < searchElement) {
|
||||
minIndex = currentIndex + 1;
|
||||
}
|
||||
else if (currentElement > searchElement) {
|
||||
maxIndex = currentIndex - 1;
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (op === '=') {
|
||||
return currentIndex;
|
||||
} else if (op === '>' && arr[currentIndex] <= searchElement) {
|
||||
currentIndex += 1
|
||||
} else if (op === '>=' && arr[currentIndex] < searchElement) {
|
||||
currentIndex += 1
|
||||
} else if (op === '<' && arr[currentIndex] >= searchElement) {
|
||||
currentIndex -= 1
|
||||
} else if (op === '<=') {
|
||||
if (arr[currentIndex] > searchElement) {
|
||||
currentIndex -= 1
|
||||
}
|
||||
}
|
||||
if (currentIndex > arr.length-1 || currentIndex < 0) {
|
||||
return -1;
|
||||
}
|
||||
return currentIndex;
|
||||
}
|
||||
|
||||
function storeToBitmap(stored) {
|
||||
const bm = bitmap.createObject();
|
||||
if (stored) {
|
||||
stored[2] = new Buffer(stored[2], 'binary');
|
||||
bm.read(stored);
|
||||
}
|
||||
return bm;
|
||||
}
|
||||
|
||||
function parseNotOperator(result, not, callback) {
|
||||
if (not) {
|
||||
callback(null, result.not());
|
||||
} else {
|
||||
callback(null, result);
|
||||
}
|
||||
}
|
||||
|
||||
function updateObjectMapping(objMapping, objName) {
|
||||
let rowId;
|
||||
if (Object.keys(objMapping)) {
|
||||
if (typeof objMapping.mapping[objName] === 'number') {
|
||||
rowId = objMapping.mapping[objName];
|
||||
} else if (objMapping.nextAvail.length > 0) {
|
||||
rowId = objMapping.nextAvail[0];
|
||||
objMapping.nextAvail.splice(0, 1);
|
||||
} else {
|
||||
rowId = objMapping.length;
|
||||
objMapping.length += 1;
|
||||
}
|
||||
}
|
||||
objMapping.mapping[rowId] = objName;
|
||||
objMapping.mapping[objName] = rowId;
|
||||
|
||||
return rowId;
|
||||
}
|
||||
|
||||
function getObjectMeta(bucketName, cb) {
|
||||
if (config.backend === "leveldb") {
|
||||
return cb(null);
|
||||
} else if (config.backend === "antidote") {
|
||||
indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
|
||||
return cb(allObjects);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function updateIndexMeta(bucketName, objName, cb) {
|
||||
if (config.backend === "leveldb") {
|
||||
let rowId;
|
||||
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||
if (err) {
|
||||
objMapping = {nextAvail: [], length:1, mapping:{}}
|
||||
}
|
||||
else {
|
||||
objMapping = JSON.parse(objMapping);
|
||||
}
|
||||
rowId = updateObjectMapping(objMapping, objName);
|
||||
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
|
||||
if (err) {
|
||||
return ;
|
||||
}
|
||||
return cb(err, rowId)
|
||||
})
|
||||
})
|
||||
} else if (config.backend === "antidote") {
|
||||
indexd.updateAntidoteSet(`${bucketName}`, objName, () => {
|
||||
return cb(null, -1);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function filterRemoved(results, params, cb) {
|
||||
indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
|
||||
results = results.filter(elem => {
|
||||
return removed.indexOf(elem) === -1;
|
||||
});
|
||||
return cb(results);
|
||||
});
|
||||
}
|
||||
|
||||
function constructRange(value, callback) {
|
||||
indexd.readAntidoteSet(value, (err, result) => {
|
||||
callback(null, result);
|
||||
});
|
||||
}
|
||||
|
||||
function readDB(key, cb) {
|
||||
if (config.backend === "leveldb") {
|
||||
indexd.get(key, (err, data) => {
|
||||
if (!err) {
|
||||
data = JSON.parse(data)
|
||||
}
|
||||
return (err, data)
|
||||
});
|
||||
} else if (config.backend === "antidote") {
|
||||
indexd.readAntidoteSet(key, (err, data) => {
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function writeDB(key, objName, attribute, value, rowId, cb) {
|
||||
if (config.backend === "leveldb") {
|
||||
updateIndexEntry(key, rowId, cb);
|
||||
} else if (config.backend === "antidote") {
|
||||
indexd.updateAntidoteSet(key, objName, () => {
|
||||
indexd.updateAntidoteSet(attribute, value, () => {
|
||||
return cb(null)
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function searchIntRange(bucketName, op, term, not, callback) {
|
||||
term = term.replace('--integer', '');
|
||||
const attr = term.split('/')[0];
|
||||
let value = parseInt(term.split('/')[1], 10);
|
||||
if (op === '=') {
|
||||
readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
|
||||
callback(err, data);
|
||||
});
|
||||
} else {
|
||||
if (config.backend === "antidote") {
|
||||
indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
|
||||
const range = []
|
||||
const index = binaryIndexOf(result, value, op)
|
||||
if (index === -1) {
|
||||
return parseNotOperator([], not, callback);
|
||||
}
|
||||
if (op.indexOf('>') !== -1) {
|
||||
if (op.indexOf('=') === -1) {
|
||||
value += 1
|
||||
}
|
||||
for (let i = index; i < result.length; i+=1) {
|
||||
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||
}
|
||||
} else if (op.indexOf('<') !== -1) {
|
||||
if (op.indexOf('=') === -1) {
|
||||
value -= 1
|
||||
}
|
||||
for (let i = index; i >= 0; i-=1) {
|
||||
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||
}
|
||||
}
|
||||
const objRange = [];
|
||||
async.map(range, constructRange, function(err, res) {
|
||||
res.forEach(arr => {
|
||||
arr.forEach(elem => {
|
||||
objRange.push(elem)
|
||||
})
|
||||
})
|
||||
objRange.sort();
|
||||
parseNotOperator(objRange, not, callback);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function searchRegExp(bucketName, searchTerm, not, callback) {
|
||||
const regexp = new RegExp(searchTerm);
|
||||
let result = bitmap.createObject();
|
||||
indexd.getPrefix(`${bucketName}/x-amz-meta`, (err, list) => {
|
||||
list.forEach(elem => {
|
||||
if (elem.key.indexOf('/') !== -1) {
|
||||
if (regexp.test(elem.key.substring(11))) {
|
||||
result = result.or(storeToBitmap(JSON.parse(elem.value)));
|
||||
}
|
||||
}
|
||||
});
|
||||
parseNotOperator(result, not, callback);
|
||||
})
|
||||
}
|
||||
|
||||
function readFromLevelDB(key, not, callback) {
|
||||
|
||||
}
|
||||
|
||||
function readTagIndex(bucketName, searchTerm, not, callback) {
|
||||
let term = null;
|
||||
let operator = null;
|
||||
if (searchTerm.indexOf('--integer') !== -1) {
|
||||
operator = searchTerm.split('/')[1];
|
||||
term = searchTerm.replace(operator, '');
|
||||
term = term.replace('/', '');
|
||||
searchIntRange(bucketName, operator, term, not, callback);
|
||||
} else if (searchTerm.indexOf('--regexp') !== -1) {
|
||||
searchTerm = searchTerm.replace('--regexp', '');
|
||||
searchTerm = searchTerm.substring(11);
|
||||
searchRegExp(bucketName, searchTerm, not, callback);
|
||||
} else {
|
||||
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||
callback(err, data);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function readFileSizeIndex(bucketName, searchTerm, not, callback) {
|
||||
const operator = searchTerm.split('/')[1];
|
||||
let term = searchTerm.replace(operator, '');
|
||||
term = term.replace('/', '');
|
||||
searchIntRange(bucketName, operator, term, not, callback);
|
||||
}
|
||||
|
||||
function readModDateIndex(bucketName, searchTerm, not, callback) {
|
||||
const operator = searchTerm.split('/')[1];
|
||||
let term = searchTerm.replace(operator, '');
|
||||
term = term.replace('/', '');
|
||||
return searchIntRange(bucketName, operator, term, not, callback);
|
||||
}
|
||||
|
||||
function readACLIndex(bucketName, searchTerm, not, callback) {
|
||||
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||
callback(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
function readContentTypeIndex(bucketName, searchTerm, not, callback) {
|
||||
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||
callback(err, data);
|
||||
});
|
||||
}
|
||||
|
||||
function readIndex(bucketName, searchTerm, callback) {
|
||||
if (searchTerm.indexOf('op/AND') !== -1
|
||||
|| searchTerm.indexOf('op/OR') !== -1
|
||||
|| searchTerm.indexOf('op/NOT') !== -1) {
|
||||
callback(null, searchTerm);
|
||||
}
|
||||
let notOperator = false;
|
||||
let result;
|
||||
if (searchTerm.indexOf('x-amz-meta') !== -1) {
|
||||
return readTagIndex(bucketName, searchTerm, notOperator, callback);
|
||||
} else if (searchTerm.indexOf('filesize') !== -1) {
|
||||
return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
|
||||
} else if (searchTerm.indexOf('modificationdate') !== -1) {
|
||||
return readModDateIndex(bucketName, searchTerm, notOperator, callback);
|
||||
} else if (searchTerm.indexOf('contenttype') !== -1 || searchTerm.indexOf('contentsubtype') !== -1) {
|
||||
return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
|
||||
} else if (searchTerm.indexOf('acl') !== -1) {
|
||||
return readACLIndex(bucketName, searchTerm, notOperator, callback);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function bitmapToStore(bitmap) {
|
||||
const toStore = bitmap.write();
|
||||
toStore[2] = toStore[2].toString('binary');
|
||||
return toStore;
|
||||
}
|
||||
|
||||
function deleteOldEntries(bucketName, rowId, cb) {
|
||||
if (config.backend === "leveldb") {
|
||||
indexd.getPrefix(`${bucketName}/acl/`, (err, list) => {
|
||||
if (!list) {
|
||||
return cb();
|
||||
}
|
||||
const ops = []
|
||||
list.forEach(elem => {
|
||||
const tmp = storeToBitmap(JSON.parse(elem.value));
|
||||
tmp.unset(rowId);
|
||||
ops.push({ 'type':'put', 'key': elem.key, 'value':JSON.stringify(bitmapToStore(tmp))})
|
||||
});
|
||||
indexd.batchWrite(ops, (err) =>{
|
||||
if (err) {
|
||||
return null
|
||||
} else {
|
||||
return cb();
|
||||
}
|
||||
});
|
||||
});
|
||||
} else if (config.backend === "antidote") {
|
||||
return cb();
|
||||
}
|
||||
}
|
||||
|
||||
function updateBitmap(bitmap, rowId) {
|
||||
if (bitmap.length() - 1 <= rowId) {
|
||||
bitmap.push(rowId);
|
||||
} else {
|
||||
bitmap.set(rowId);
|
||||
}
|
||||
return bitmapToStore(bitmap);
|
||||
}
|
||||
|
||||
function updateIndexEntry(key, rowId, cb) {
|
||||
indexd.get(key, (err, data) => {
|
||||
if (err) {
|
||||
return (err);
|
||||
} else {
|
||||
data = JSON.parse(data)
|
||||
}
|
||||
indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
|
||||
return (err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function updateIntIndex(bucketName, objName, attribute, value, rowId) {
|
||||
writeDB(`${bucketName}/${attribute}/${value}`, objName, `${bucketName}/${attribute}`, value, rowId, (err) =>{
|
||||
return ;
|
||||
})
|
||||
}
|
||||
|
||||
function updateACLIndex(bucketName, objName, objVal, rowId) {
|
||||
deleteOldEntries(bucketName, rowId, () => {
|
||||
Object.keys(objVal).forEach(elem => {
|
||||
if (typeof objVal[elem] === 'string') {
|
||||
writeDB(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
|
||||
return ;
|
||||
});
|
||||
} else {
|
||||
objVal[elem].forEach(item => {
|
||||
writeDB(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
|
||||
return ;
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
|
||||
const type = objVal.split('/')[0];
|
||||
const subtype = objVal.split('/')[1];
|
||||
writeDB(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
|
||||
writeDB(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
|
||||
return ;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
|
||||
const date = new Date(objVal);
|
||||
const term = 'modificationdate-';
|
||||
const year = date.getUTCFullYear();
|
||||
const month = date.getUTCMonth() + 1;
|
||||
const day = date.getUTCDate();
|
||||
const hours = date.getUTCHours();
|
||||
const minutes = date.getUTCMinutes();
|
||||
updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
|
||||
updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
|
||||
updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
|
||||
updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
|
||||
updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
|
||||
}
|
||||
|
||||
function updateFileSizeIndex(bucketName, objName, objVal, rowId) {
|
||||
updateIntIndex(bucketName, objName, `filesize`, parseInt(objVal, 10), rowId);
|
||||
}
|
||||
|
||||
function updateTagIndex(bucketName, objName, objVal, rowId) {
|
||||
const tags = [];
|
||||
Object.keys(objVal).forEach(elem => {
|
||||
if (elem.indexOf('x-amz-meta') !== -1 &&
|
||||
elem !== 'x-amz-meta-s3cmd-attrs') {
|
||||
tags.push({key: elem.replace('x-amz-meta-', ''), value: objVal[elem]});
|
||||
}
|
||||
});
|
||||
tags.forEach(tag => {
|
||||
if (tag.key.indexOf('--integer') !== -1) {
|
||||
tag.key = tag.key.replace('--integer', '');
|
||||
updateIntIndex(bucketName, objName, `x-amz-meta/${tag.key}`, parseInt(tag.value, 10), rowId);
|
||||
} else {
|
||||
writeDB(`${bucketName}/x-amz-meta/${tag.key}/${tag.value}`, objName, `${bucketName}/x-amz-meta`, `${tag.key}/${tag.value}`, rowId, (err) =>{
|
||||
return ;
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const index = {
|
||||
evaluateQuery: (queryTerms, params) => {
|
||||
const bucketName = params.bucketName;
|
||||
async.map(queryTerms, readIndex.bind(null, bucketName), function(err, queryTerms) {
|
||||
getObjectMeta(bucketName, (allObjects) => {
|
||||
let i;
|
||||
while (queryTerms.length > 1) {
|
||||
for (i = queryTerms.length - 1; i >= 0; i--) {
|
||||
if (queryTerms[i] === 'op/AND'
|
||||
|| queryTerms[i] === 'op/OR'
|
||||
|| queryTerms[i] === 'op/NOT') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (queryTerms[i] === 'op/NOT') {
|
||||
const op1 = allObjects;
|
||||
const op2 = queryTerms[i + 1];
|
||||
queryTerms.splice(i, 2, difference(op1, op2));
|
||||
} else {
|
||||
const op1 = queryTerms[i + 1];
|
||||
const op2 = queryTerms[i + 2];
|
||||
if (queryTerms[i] === 'op/AND') {
|
||||
queryTerms.splice(i, 3, intersection(op1, op2));
|
||||
} else {
|
||||
queryTerms.splice(i, 3, union(op1, op2));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (config.backend === "leveldb") {
|
||||
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||
if (err) {
|
||||
return ;
|
||||
}
|
||||
objMapping = JSON.parse(objMapping);
|
||||
queryTerms = queryTerms[0].toString(':').split(':');
|
||||
queryTerms = queryTerms.map(function (elem) {
|
||||
return objMapping.mapping[elem];
|
||||
});
|
||||
indexd.respondQuery(params, queryTerms)
|
||||
});
|
||||
} else if (config.backend === "antidote") {
|
||||
filterRemoved(queryTerms[0], params, (results) => {
|
||||
indexd.respondQuery(params, results);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
updateIndex: (bucketName, objName, objVal) => {
|
||||
updateIndexMeta(bucketName, objName, (err, rowId) => {
|
||||
updateFileSizeIndex(bucketName, objName, objVal['content-length'], rowId);
|
||||
updateΜodDateIndex(bucketName, objName, objVal['last-modified'], rowId);
|
||||
updateContentTypeIndex(bucketName, objName, objVal['content-type'], rowId);
|
||||
updateACLIndex(bucketName, objName, objVal['acl'], rowId);
|
||||
updateTagIndex(bucketName, objName, objVal, rowId);
|
||||
});
|
||||
},
|
||||
|
||||
deleteObject: (bucketName, objName) => {
|
||||
if (config.backend === "leveldb") {
|
||||
indexd.get(`${bucketName}`, (err, objMapping) => {
|
||||
if (err) {
|
||||
return ;
|
||||
}
|
||||
objMapping = JSON.parse(objMapping);
|
||||
const rowId = objMapping.mapping[objName];
|
||||
delete objMapping.mapping[objName];
|
||||
delete objMapping.mapping[rowId];
|
||||
objMapping.nextAvail.push(rowId);
|
||||
indexd.put(`${bucketName}`, JSON.stringify(objMapping), err => {
|
||||
if (err) {
|
||||
return ;
|
||||
}
|
||||
return ;
|
||||
});
|
||||
});
|
||||
} else if (config.backend === "antidote") {
|
||||
indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
exports.default = index;
|
|
@ -271,7 +271,7 @@ class Config {
|
|||
let metadata = 'file';
|
||||
let kms = 'file';
|
||||
if (process.env.S3BACKEND) {
|
||||
const validBackends = ['mem', 'file', 'scality'];
|
||||
const validBackends = ['mem', 'file', 'scality', 'antidote'];
|
||||
assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
|
||||
'bad environment variable: S3BACKEND environment variable ' +
|
||||
'should be one of mem/file/scality'
|
||||
|
@ -284,7 +284,11 @@ class Config {
|
|||
if (process.env.S3VAULT) {
|
||||
auth = process.env.S3VAULT;
|
||||
}
|
||||
if (auth === 'file' || auth === 'mem') {
|
||||
if (data === 'antidote') {
|
||||
data = 'mem';
|
||||
kms = 'mem';
|
||||
}
|
||||
if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
|
||||
// Auth only checks for 'mem' since mem === file
|
||||
auth = 'mem';
|
||||
let authfile = `${__dirname}/../conf/authdata.json`;
|
||||
|
@ -326,6 +330,32 @@ class Config {
|
|||
dataPath,
|
||||
metadataPath,
|
||||
};
|
||||
this.antidote = {};
|
||||
if (config.antidote) {
|
||||
if (config.antidote.port !== undefined) {
|
||||
assert(Number.isInteger(config.antidote.port)
|
||||
&& config.antidote.port > 0,
|
||||
'bad config: vaultd port must be a positive integer');
|
||||
this.antidote.port = config.antidote.port;
|
||||
}
|
||||
if (config.antidote.host !== undefined) {
|
||||
assert.strictEqual(typeof config.vaultd.host, 'string',
|
||||
'bad config: vaultd host must be a string');
|
||||
this.antidote.host = config.antidote.host;
|
||||
}
|
||||
}
|
||||
this.userMetaIndexing = false;
|
||||
if (config.userMetaIndexing !== undefined) {
|
||||
this.userMetaIndexing = config.userMetaIndexing;
|
||||
}
|
||||
this.systemMetaIndexing = false;
|
||||
if (config.systemMetaIndexing !== undefined) {
|
||||
this.systemMetaIndexing = config.systemMetaIndexing;
|
||||
}
|
||||
this.indexServerPort = 7000;
|
||||
if (config.indexServerPort !== undefined) {
|
||||
this.indexServerPort = config.indexServerPort;
|
||||
}
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import querystring from 'querystring';
|
||||
import indexClient from '../indexClient/indexClient';
|
||||
import constants from '../../constants';
|
||||
|
||||
import services from '../services';
|
||||
|
@ -70,6 +71,7 @@ export default function bucketGet(authInfo, request, log, callback) {
|
|||
delimiter: params.delimiter,
|
||||
marker: params.marker,
|
||||
prefix: params.prefix,
|
||||
query: indexClient.processQueryHeader(request.headers.query),
|
||||
};
|
||||
|
||||
services.metadataValidateAuthorization(metadataValParams, err => {
|
||||
|
@ -83,6 +85,7 @@ export default function bucketGet(authInfo, request, log, callback) {
|
|||
log.debug('error processing request', { error: err });
|
||||
return callback(err);
|
||||
}
|
||||
listParams.prefix = params.prefix;
|
||||
const xml = [];
|
||||
xml.push(
|
||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
import http from 'http'
|
||||
import io from 'socket.io'
|
||||
import metadata from '../metadata/wrapper.js'
|
||||
|
||||
let iosock;
|
||||
let callback;
|
||||
|
||||
export default {
|
||||
createPublisher(port, log) {
|
||||
const server = http.createServer(function(req, res){
|
||||
});
|
||||
server.listen(port);
|
||||
iosock = io.listen(server);
|
||||
iosock.sockets.on('connection', function(socket) {
|
||||
log.info("indexing server connected")
|
||||
socket.on('disconnect', function() {
|
||||
});
|
||||
socket.on('subscribe', function(room) {
|
||||
socket.join(room);
|
||||
});
|
||||
socket.on('unsubscribe', function(room) {
|
||||
socket.leave(room);
|
||||
});
|
||||
socket.on('query_response', function(msg) {
|
||||
msg.params.cb = callback;
|
||||
metadata.respondQueryGetMD(msg.result, msg.params);
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
putObjectMD(bucketName, objName, objVal) {
|
||||
iosock.sockets.to('puts').emit('put', {
|
||||
bucketName,
|
||||
objName,
|
||||
objVal
|
||||
});
|
||||
},
|
||||
|
||||
listObject(bucketName, query, prefix, marker, delimiter, maxKeys, cb) {
|
||||
callback = cb;
|
||||
iosock.sockets.to('queries').emit('query', {
|
||||
query,
|
||||
params : {
|
||||
bucketName,
|
||||
prefix,
|
||||
marker,
|
||||
maxKeys,
|
||||
delimiter,
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
deleteObjectMD(bucketName, objName) {
|
||||
iosock.sockets.to('deletes').emit('delete', {
|
||||
bucketName,
|
||||
objName
|
||||
});
|
||||
},
|
||||
|
||||
processQueryHeader(header) {
|
||||
if (!header) {
|
||||
return header;
|
||||
}
|
||||
const queryTerms = header.split('&');
|
||||
const query = [];
|
||||
for (let i = 0; i < queryTerms.length; i++) {
|
||||
query.push(queryTerms[i]);
|
||||
}
|
||||
return query;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
|
||||
import ListResult from '../in_memory/ListResult';
|
||||
|
||||
export class ListBucketResult extends ListResult {
|
||||
constructor() {
|
||||
super();
|
||||
this.Contents = [];
|
||||
}
|
||||
|
||||
addContentsKey(key, keyMap) {
|
||||
const objectMD = keyMap;
|
||||
this.Contents.push({
|
||||
key,
|
||||
value: {
|
||||
LastModified: objectMD['last-modified'],
|
||||
ETag: objectMD['content-md5'],
|
||||
StorageClass: objectMD['x-amz-storage-class'],
|
||||
Owner: {
|
||||
DisplayName: objectMD['owner-display-name'],
|
||||
ID: objectMD['owner-id'],
|
||||
},
|
||||
Size: objectMD['content-length'],
|
||||
// Initiated is used for overview of MPU
|
||||
Initiated: objectMD.initiated,
|
||||
// Initiator is used for overview of MPU.
|
||||
// It is an object containing DisplayName
|
||||
// and ID
|
||||
Initiator: objectMD.initiator,
|
||||
// EventualStorageBucket is used for overview of MPU
|
||||
EventualStorageBucket: objectMD.eventualStorageBucket,
|
||||
// Used for parts of MPU
|
||||
partLocations: objectMD.partLocations,
|
||||
// creationDate is just used for serviceGet
|
||||
creationDate: objectMD.creationDate,
|
||||
},
|
||||
});
|
||||
this.MaxKeys += 1;
|
||||
}
|
||||
|
||||
hasDeleteMarker(key, keyMap) {
|
||||
const objectMD = keyMap;
|
||||
if (objectMD['x-amz-delete-marker'] !== undefined) {
|
||||
return (objectMD['x-amz-delete-marker'] === true);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,309 @@
|
|||
import { errors } from 'arsenal';
|
||||
|
||||
import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
|
||||
import { ListBucketResult } from './ListBucketResult';
|
||||
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
|
||||
import config from '../../Config';
|
||||
import async from 'async';
|
||||
import antidoteClient from 'antidote_ts_client';
|
||||
|
||||
const defaultMaxKeys = 1000;
|
||||
|
||||
var replacer = function(key, value) {
|
||||
if (value === undefined){
|
||||
return null;
|
||||
}
|
||||
return value;
|
||||
};
|
||||
|
||||
var reviver = function(key, value) {
|
||||
if (value === null){
|
||||
return undefined;
|
||||
}
|
||||
return value;
|
||||
};
|
||||
|
||||
class AntidoteInterface {
|
||||
constructor() {
|
||||
this.antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
|
||||
}
|
||||
|
||||
createBucket(bucketName, bucketMD, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
// TODO Check whether user already owns the bucket,
|
||||
// if so return "BucketAlreadyOwnedByYou"
|
||||
// If not owned by user, return "BucketAlreadyExists"
|
||||
if (bucket) {
|
||||
return cb(errors.BucketAlreadyExists);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||
this.antidote.update([
|
||||
bucket_MD.register('md').set(bucketMD)
|
||||
]).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||
this.antidote.update([
|
||||
bucket_MD.register('md').set(bucketMD)
|
||||
]).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getBucketAttributes(bucketName, log, cb) {
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||
bucket_MD.read().then(bucketMD => {
|
||||
bucketMD = bucketMD.toJsObject();
|
||||
if (Object.keys(bucketMD).length === 0) {
|
||||
return cb(errors.NoSuchBucket);
|
||||
}
|
||||
return cb(null, bucketMD['md']);
|
||||
});
|
||||
}
|
||||
|
||||
deleteBucket(bucketName, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||
bucket_Objs.read().then(objects => {
|
||||
if (bucket && objects.length > 0) {
|
||||
return cb(errors.BucketNotEmpty);
|
||||
}
|
||||
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||
this.antidote.update([
|
||||
bucket_MD.remove(bucket_MD.register('md')),
|
||||
]).then( (resp) => {
|
||||
return cb(null);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
putObject(bucketName, objName, objVal, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||
let object_MD = this.antidote.map(`${objName}`);
|
||||
this.antidote.update([
|
||||
bucket_Objs.add(objName),
|
||||
object_MD.register('md').set(objVal)
|
||||
]).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getBucketAndObject(bucketName, objName, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
return cb(err, { bucket });
|
||||
}
|
||||
const bucket_MD = {}
|
||||
Object.keys(bucket).map(function(key) {
|
||||
bucket_MD[key.substr(1)] = bucket[key]
|
||||
});
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let object_MD = this.antidote.map(`${objName}`);
|
||||
object_MD.read().then(objectMD => {
|
||||
objectMD = objectMD.toJsObject();
|
||||
|
||||
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||
return cb(null, { bucket: JSON.stringify(bucket_MD) });
|
||||
}
|
||||
return cb(null, {
|
||||
bucket: JSON.stringify(bucket_MD),
|
||||
obj: JSON.stringify(objectMD['md']),
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getObject(bucketName, objName, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let object_MD = this.antidote.map(`${objName}`);
|
||||
object_MD.read().then(objectMD => {
|
||||
objectMD = objectMD.toJsObject();
|
||||
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||
return cb(errors.NoSuchKey);
|
||||
}
|
||||
return cb(null, objectMD['md']);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
deleteObject(bucketName, objName, log, cb) {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let object_MD = this.antidote.map(`${objName}`);
|
||||
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||
object_MD.read().then(objectMD => {
|
||||
objectMD = objectMD.toJsObject();
|
||||
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||
return cb(errors.NoSuchKey);
|
||||
}
|
||||
this.antidote.update([
|
||||
object_MD.remove(object_MD.register('md')),
|
||||
bucket_Objs.remove(objName)
|
||||
]).then( (resp) => {
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getObjectMD(antidote, bucketName, key, callback) {
|
||||
antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let object_MD = antidote.map(`${key}`);
|
||||
object_MD.read().then(objectMD => {
|
||||
objectMD = objectMD.toJsObject();
|
||||
if (Object.keys(objectMD).length === 0) {
|
||||
return callback(error.NoSuchKey, null);
|
||||
}
|
||||
return callback(null, objectMD['md']);
|
||||
});
|
||||
}
|
||||
|
||||
listObject(bucketName, params, log, cb) {
|
||||
const { prefix, marker, delimiter, maxKeys } = params;
|
||||
if (prefix && typeof prefix !== 'string') {
|
||||
return cb(errors.InvalidArgument);
|
||||
}
|
||||
|
||||
if (marker && typeof marker !== 'string') {
|
||||
return cb(errors.InvalidArgument);
|
||||
}
|
||||
|
||||
if (delimiter && typeof delimiter !== 'string') {
|
||||
return cb(errors.InvalidArgument);
|
||||
}
|
||||
|
||||
if (maxKeys && typeof maxKeys !== 'number') {
|
||||
return cb(errors.InvalidArgument);
|
||||
}
|
||||
|
||||
let numKeys = maxKeys;
|
||||
// If paramMaxKeys is undefined, the default parameter will set it.
|
||||
// However, if it is null, the default parameter will not set it.
|
||||
if (numKeys === null) {
|
||||
numKeys = defaultMaxKeys;
|
||||
}
|
||||
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_MD = this.antidote.map(`${bucketName}/md`)
|
||||
bucket_MD.read().then(bucketMD => {
|
||||
bucketMD = bucketMD.toJsObject();
|
||||
if (Object.keys(bucketMD).length === 0) {
|
||||
return cb(errors.NoSuchBucket);
|
||||
}
|
||||
const response = new ListBucketResult();
|
||||
|
||||
this.antidote.defaultBucket = `storage/${bucketName}`;
|
||||
let bucket_Objs = this.antidote.set(`${bucketName}/objs`);
|
||||
bucket_Objs.read().then(keys => {
|
||||
|
||||
async.map(keys, this.getObjectMD.bind(null, this.antidote, bucketName), function(err, objectMeta) {
|
||||
|
||||
// If marker specified, edit the keys array so it
|
||||
// only contains keys that occur alphabetically after the marker
|
||||
if (marker) {
|
||||
keys = markerFilter(marker, keys);
|
||||
response.Marker = marker;
|
||||
}
|
||||
// If prefix specified, edit the keys array so it only
|
||||
// contains keys that contain the prefix
|
||||
if (prefix) {
|
||||
keys = prefixFilter(prefix, keys);
|
||||
response.Prefix = prefix;
|
||||
}
|
||||
// Iterate through keys array and filter keys containing
|
||||
// delimiter into response.CommonPrefixes and filter remaining
|
||||
// keys into response.Contents
|
||||
for (let i = 0; i < keys.length; ++i) {
|
||||
const currentKey = keys[i];
|
||||
// Do not list object with delete markers
|
||||
if (response.hasDeleteMarker(currentKey,
|
||||
objectMeta[i])) {
|
||||
continue;
|
||||
}
|
||||
// If hit numKeys, stop adding keys to response
|
||||
if (response.MaxKeys >= numKeys) {
|
||||
response.IsTruncated = true;
|
||||
response.NextMarker = keys[i - 1];
|
||||
break;
|
||||
}
|
||||
// If a delimiter is specified, find its index in the
|
||||
// current key AFTER THE OCCURRENCE OF THE PREFIX
|
||||
let delimiterIndexAfterPrefix = -1;
|
||||
let prefixLength = 0;
|
||||
if (prefix) {
|
||||
prefixLength = prefix.length;
|
||||
}
|
||||
const currentKeyWithoutPrefix = currentKey
|
||||
.slice(prefixLength);
|
||||
let sliceEnd;
|
||||
if (delimiter) {
|
||||
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
|
||||
.indexOf(delimiter);
|
||||
sliceEnd = delimiterIndexAfterPrefix + prefixLength;
|
||||
response.Delimiter = delimiter;
|
||||
}
|
||||
// If delimiter occurs in current key, add key to
|
||||
// response.CommonPrefixes.
|
||||
// Otherwise add key to response.Contents
|
||||
if (delimiterIndexAfterPrefix > -1) {
|
||||
const keySubstring = currentKey.slice(0, sliceEnd + 1);
|
||||
response.addCommonPrefix(keySubstring);
|
||||
} else {
|
||||
response.addContentsKey(currentKey,
|
||||
objectMeta[i]);
|
||||
}
|
||||
}
|
||||
return cb(null, response);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
listMultipartUploads(bucketName, listingParams, log, cb) {
|
||||
process.nextTick(() => {
|
||||
this.getBucketAttributes(bucketName, log, (err, bucket) => {
|
||||
if (bucket === undefined) {
|
||||
// no on going multipart uploads, return empty listing
|
||||
return cb(null, {
|
||||
IsTruncated: false,
|
||||
NextMarker: undefined,
|
||||
MaxKeys: 0,
|
||||
});
|
||||
}
|
||||
return getMultipartUploadListing(bucket, listingParams, cb);
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
export default AntidoteInterface;
|
|
@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
|
|||
import BucketFileInterface from './bucketfile/backend';
|
||||
import BucketInfo from './BucketInfo';
|
||||
import inMemory from './in_memory/backend';
|
||||
import AntidoteInterface from './antidote/backend';
|
||||
import config from '../Config';
|
||||
import indexClient from '../indexClient/indexClient'
|
||||
import async from 'async'
|
||||
|
||||
let client;
|
||||
let implName;
|
||||
|
@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
|
|||
} else if (config.backends.metadata === 'scality') {
|
||||
client = new BucketClientInterface();
|
||||
implName = 'bucketclient';
|
||||
} else if (config.backends.metadata === 'antidote') {
|
||||
client = new AntidoteInterface();
|
||||
implName = 'antidote';
|
||||
}
|
||||
|
||||
function getQueryResults(params, objName, callback) {
|
||||
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||
client.getObject(bucketName, objName, log, (err, data) => {
|
||||
if (err) {
|
||||
callback(err, null);
|
||||
}
|
||||
callback(null, {key: objName, value: {
|
||||
LastModified: data['last-modified'],
|
||||
ETag: data['content-md5'],
|
||||
StorageClass: data['x-amz-storage-class'],
|
||||
Owner: {
|
||||
ID: data['owner-id'],
|
||||
DisplayName: data['owner-display-name']
|
||||
},
|
||||
Size: data['content-length'],
|
||||
Initiated: undefined,
|
||||
Initiator: undefined,
|
||||
EventualStorageBucket: undefined,
|
||||
partLocations: undefined,
|
||||
creationDate: undefined
|
||||
}});
|
||||
});
|
||||
}
|
||||
|
||||
const metadata = {
|
||||
|
@ -75,6 +105,12 @@ const metadata = {
|
|||
return cb(err);
|
||||
}
|
||||
log.debug('object successfully put in metadata');
|
||||
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||
if (objName.indexOf('..|..') !== -1) {
|
||||
return cb(err);
|
||||
}
|
||||
indexClient.putObjectMD(bucketName, objName, objVal);
|
||||
}
|
||||
return cb(err);
|
||||
}, params);
|
||||
},
|
||||
|
@ -113,11 +149,17 @@ const metadata = {
|
|||
return cb(err);
|
||||
}
|
||||
log.debug('object deleted from metadata');
|
||||
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||
indexClient.deleteObjectMD(bucketName, objName);
|
||||
}
|
||||
return cb(err);
|
||||
}, params);
|
||||
},
|
||||
|
||||
listObject: (bucketName, listingParams, log, cb) => {
|
||||
if (listingParams.query) {
|
||||
indexClient.listObject(bucketName, listingParams.query, listingParams.prefix, listingParams.marker,listingParams.delimiter, listingParams.maxKeys, cb);
|
||||
} else {
|
||||
client
|
||||
.listObject(bucketName, listingParams,
|
||||
log, (err, data) => {
|
||||
|
@ -129,6 +171,34 @@ const metadata = {
|
|||
log.debug('object listing retrieved from metadata');
|
||||
return cb(err, data);
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
respondQueryGetMD: (result, params) => {
|
||||
async.map(result, getQueryResults.bind(null, params), function(err, res) {
|
||||
const response = {
|
||||
IsTruncated: false,
|
||||
NextMarker: params.marker,
|
||||
CommonPrefixes: [],
|
||||
MaxKeys: 10,
|
||||
Contents: res
|
||||
}
|
||||
return params.cb(err, response);
|
||||
});
|
||||
},
|
||||
|
||||
respondQueryFilter: (result, params) => {
|
||||
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||
client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
|
||||
log, (err, data) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
data.Contents = data.Contents.filter(elem => {
|
||||
return result.indexOf(elem.key) !== -1;
|
||||
});
|
||||
return cb(err, data);
|
||||
});
|
||||
},
|
||||
|
||||
listMultipartUploads: (bucketName, listingParams, log, cb) => {
|
||||
|
|
|
@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
|
|||
import { clientCheck } from './utilities/healthcheckHandler';
|
||||
import _config from './Config';
|
||||
import routes from './routes';
|
||||
|
||||
import indexClient from './indexClient/indexClient'
|
||||
|
||||
class S3Server {
|
||||
/**
|
||||
|
@ -38,6 +38,7 @@ class S3Server {
|
|||
* This starts the http server.
|
||||
*/
|
||||
startup() {
|
||||
indexClient.createPublisher(_config.indexServerPort, logger)
|
||||
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
|
||||
if (_config.https) {
|
||||
this.server = https.createServer({
|
||||
|
@ -118,7 +119,7 @@ class S3Server {
|
|||
|
||||
export default function main() {
|
||||
let clusters = _config.clusters || 1;
|
||||
if (process.env.S3BACKEND === 'mem') {
|
||||
if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
|
||||
clusters = 1;
|
||||
}
|
||||
if (cluster.isMaster) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
},
|
||||
"homepage": "https://github.com/scality/S3#readme",
|
||||
"dependencies": {
|
||||
"antidote_ts_client": "^0.1.0",
|
||||
"arsenal": "scality/Arsenal",
|
||||
"async": "~1.4.2",
|
||||
"babel-core": "^6.5.2",
|
||||
|
@ -31,8 +32,10 @@
|
|||
"level": "^1.4.0",
|
||||
"level-sublevel": "^6.5.4",
|
||||
"multilevel": "^7.3.0",
|
||||
"node-bitmap-ewah": "dimitriosvasilas/node-bitmap-ewah",
|
||||
"node-uuid": "^1.4.3",
|
||||
"ready-set-stream": "1.0.7",
|
||||
"socket.io": "^1.7.2",
|
||||
"sproxydclient": "scality/sproxydclient",
|
||||
"utapi": "scality/utapi",
|
||||
"utf8": "~2.1.1",
|
||||
|
@ -71,6 +74,7 @@
|
|||
"lint": "eslint $(git ls-files '*.js')",
|
||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||
"mem_backend": "S3BACKEND=mem node index.js",
|
||||
"antidote_backend": "S3BACKEND=antidote node index.js",
|
||||
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
|
||||
"start": "node init.js && node index.js",
|
||||
"start_utapi": "node utapiServer.js",
|
||||
|
|
Loading…
Reference in New Issue