Compare commits
47 Commits
developmen ... proto/S3+A
Author | SHA1 | Date |
---|---|---|
Dimitrios Vasilas | bd4487eb07 | |
Dimitrios Vasilas | 97f4d02a15 | |
Dimitrios Vasilas | 6a543567c4 | |
Dimitrios Vasilas | 2dc0618c80 | |
Dimitrios Vasilas | 1dde0aceff | |
Dimitrios Vasilas | 565d027c7d | |
Dimitrios Vasilas | c3e20cecce | |
Dimitrios Vasilas | 9b4ea5f61c | |
Dimitrios Vasilas | 913103cb15 | |
Dimitrios Vasilas | 539e025895 | |
Dimitrios Vasilas | 34f37c15dc | |
Dimitrios Vasilas | 8eabfb035e | |
Dimitrios Vasilas | 8f274252ce | |
Dimitrios Vasilas | e27cb1cf6c | |
Dimitrios Vasilas | 9c8c8fd9ea | |
Dimitrios Vasilas | a236fd4457 | |
Dimitrios Vasilas | e76472fa20 | |
Dimitrios Vasilas | e6f82079c6 | |
Dimitrios Vasilas | cfb77cc9fe | |
Dimitrios Vasilas | 17a9cd1071 | |
Dimitrios Vasilas | 8ce70e7337 | |
Dimitrios Vasilas | cdbcae37b2 | |
Dimitrios Vasilas | 0f5cd3e545 | |
Dimitrios Vasilas | f6cddaf6a5 | |
Dimitrios Vasilas | c225e483bc | |
Dimitrios Vasilas | 53cb8a3263 | |
Dimitrios Vasilas | d302070959 | |
Dimitrios Vasilas | 7597c379e7 | |
Dimitrios Vasilas | 541e97f1ea | |
Dimitrios Vasilas | c32fe7df51 | |
Dimitrios Vasilas | 9e301aaf25 | |
Dimitrios Vasilas | eb441dd09c | |
Dimitrios Vasilas | 6fddd95a1d | |
Dimitrios Vasilas | 1303bd8b97 | |
Dimitrios Vasilas | 093c071836 | |
Dimitrios Vasilas | da115e6ce4 | |
Dimitrios Vasilas | b588d5e4d0 | |
Dimitrios Vasilas | 5f1bd15fb0 | |
Dimitrios Vasilas | 22a783f23e | |
Dimitrios Vasilas | 320fc256bf | |
Dimitrios Vasilas | 361585825f | |
Dimitrios Vasilas | e390e355ef | |
Dimitrios Vasilas | f03940e45e | |
Dimitrios Vasilas | e67abad697 | |
Dimitrios Vasilas | 9a42556448 | |
Dimitrios Vasilas | 3056714c02 | |
Dimitrios Vasilas | d871755296 | |
@@ -48,6 +48,13 @@
        "logLevel": "info",
        "dumpLevel": "error"
    },
    "antidote": {
        "host": "localhost",
        "port": 8087
    },
    "userMetaIndexing":true,
    "systemMetaIndexing":true,
    "indexServerPort":7000,
    "healthChecks": {
        "allowFrom": ["127.0.0.1/8", "::1"]
    },
@@ -0,0 +1,7 @@
#!/bin/bash
for i in $(seq 1 $1);
do
    node ./portConfig.js
    npm run antidote_backend &
    sleep 3
done
@@ -276,7 +276,7 @@ class Config {
        let metadata = 'file';
        let kms = 'file';
        if (process.env.S3BACKEND) {
            const validBackends = ['mem', 'file', 'scality'];
            const validBackends = ['mem', 'file', 'scality', 'antidote'];
            assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
                'bad environment variable: S3BACKEND environment variable ' +
                'should be one of mem/file/scality'
@@ -289,7 +289,11 @@ class Config {
        if (process.env.S3VAULT) {
            auth = process.env.S3VAULT;
        }
        if (auth === 'file' || auth === 'mem') {
        if (data === 'antidote') {
            data = 'mem';
            kms = 'mem';
        }
        if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
            // Auth only checks for 'mem' since mem === file
            auth = 'mem';
            let authfile = `${__dirname}/../conf/authdata.json`;
@@ -331,6 +335,32 @@ class Config {
            dataPath,
            metadataPath,
        };
        this.antidote = {};
        if (config.antidote) {
            if (config.antidote.port !== undefined) {
                assert(Number.isInteger(config.antidote.port)
                    && config.antidote.port > 0,
                    'bad config: vaultd port must be a positive integer');
                this.antidote.port = config.antidote.port;
            }
            if (config.antidote.host !== undefined) {
                assert.strictEqual(typeof config.vaultd.host, 'string',
                    'bad config: vaultd host must be a string');
                this.antidote.host = config.antidote.host;
            }
        }
        this.userMetaIndexing = false;
        if (config.userMetaIndexing !== undefined) {
            this.userMetaIndexing = config.userMetaIndexing;
        }
        this.systemMetaIndexing = false;
        if (config.systemMetaIndexing !== undefined) {
            this.systemMetaIndexing = config.systemMetaIndexing;
        }
        this.indexServerPort = 7000;
        if (config.indexServerPort !== undefined) {
            this.indexServerPort = config.indexServerPort;
        }
        return config;
    }
}
@@ -0,0 +1,135 @@
'use strict';

let startTx = (() => {
    var _ref = _asyncToGenerator(function* (antidote, cb) {
        let tx = yield antidote.startTransaction();
        return cb(tx);
    });

    return function startTx(_x, _x2) {
        return _ref.apply(this, arguments);
    };
})();

let commitTx = (() => {
    var _ref2 = _asyncToGenerator(function* (tx, cb) {
        yield tx.commit();
        return cb();
    });

    return function commitTx(_x3, _x4) {
        return _ref2.apply(this, arguments);
    };
})();

let readSetTx = (() => {
    var _ref3 = _asyncToGenerator(function* (tx, setKey, cb) {
        let set = tx.set(setKey);
        let result = yield set.read();
        return cb(tx, result);
    });

    return function readSetTx(_x5, _x6, _x7) {
        return _ref3.apply(this, arguments);
    };
})();

let updateSetTx = (() => {
    var _ref4 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
        let set = tx.set(setKey);
        let ops = [];
        for (let i = 0; i < keys.length; i++) {
            ops.push(set.add(keys[i]));
        }
        yield tx.update(ops);
        return cb(tx);
    });

    return function updateSetTx(_x8, _x9, _x10, _x11) {
        return _ref4.apply(this, arguments);
    };
})();

let removeSetTx = (() => {
    var _ref5 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
        let set = tx.set(setKey);
        let ops = [];
        for (let i = 0; i < keys.length; i++) {
            ops.push(set.remove(keys[i]));
        }
        yield tx.update(ops);
        return cb(tx);
    });

    return function removeSetTx(_x12, _x13, _x14, _x15) {
        return _ref5.apply(this, arguments);
    };
})();

let readMapTx = (() => {
    var _ref6 = _asyncToGenerator(function* (tx, mapKey, cb) {
        let map = tx.map(mapKey);
        let result = yield map.read();
        result = result.toJsObject();
        return cb(tx, result);
    });

    return function readMapTx(_x16, _x17, _x18) {
        return _ref6.apply(this, arguments);
    };
})();

let updateMapRegisterTx = (() => {
    var _ref7 = _asyncToGenerator(function* (tx, mapKey, keys, values, cb) {
        let map = tx.map(mapKey);
        let ops = [];
        for (let i = 0; i < keys.length; i++) {
            ops.push(map.register(keys[i]).set(values[i]));
        }
        yield tx.update(ops);
        return cb(tx);
    });

    return function updateMapRegisterTx(_x19, _x20, _x21, _x22, _x23) {
        return _ref7.apply(this, arguments);
    };
})();

let removeMapRegisterTx = (() => {
    var _ref8 = _asyncToGenerator(function* (tx, mapKey, keys, cb) {
        let map = tx.map(mapKey);
        let ops = [];
        for (let i = 0; i < keys.length; i++) {
            ops.push(map.remove(map.register(keys[i])));
        }
        yield tx.update(ops);
        return cb(tx);
    });

    return function removeMapRegisterTx(_x24, _x25, _x26, _x27) {
        return _ref8.apply(this, arguments);
    };
})();

function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }

var antidoteClient = require('antidote_ts_client');

function connect(port, host) {
    return antidoteClient.connect(port, host);
}

function setBucket(antidote, bucket) {
    return antidote.defaultBucket = bucket;
}

exports.connect = connect;
exports.setBucket = setBucket;
exports.startTx = startTx;
exports.commitTx = commitTx;
exports.readMapTx = readMapTx;
exports.readSetTx = readSetTx;
exports.updateMapRegisterTx = updateMapRegisterTx;
exports.removeMapRegisterTx = removeMapRegisterTx;
exports.removeSetTx = removeSetTx;
exports.updateSetTx = updateSetTx;
@@ -0,0 +1,83 @@
var antidoteClient = require('antidote_ts_client')

function connect(port, host) {
    return antidoteClient.connect(port, host);
}

function setBucket(antidote, bucket) {
    return antidote.defaultBucket = bucket;
}

async function startTx(antidote, cb) {
    let tx = await antidote.startTransaction()
    return cb(tx)
}

async function commitTx(tx, cb) {
    await tx.commit()
    return cb();
}

async function readSetTx(tx, setKey, cb) {
    let set = tx.set(setKey);
    let result = await set.read();
    return cb(tx, result)
}

async function updateSetTx(tx, setKey, keys, cb) {
    let set = tx.set(setKey);
    let ops = [];
    for (let i = 0; i < keys.length; i++) {
        ops.push(set.add(keys[i]));
    }
    await tx.update(ops);
    return cb(tx);
}

async function removeSetTx(tx, setKey, keys, cb) {
    let set = tx.set(setKey);
    let ops = [];
    for (let i = 0; i < keys.length; i++) {
        ops.push(set.remove(keys[i]));
    }
    await tx.update(ops);
    return cb(tx);
}

async function readMapTx(tx, mapKey, cb) {
    let map = tx.map(mapKey);
    let result = await map.read();
    result = result.toJsObject();
    return cb(tx, result)
}

async function updateMapRegisterTx(tx, mapKey, keys, values, cb) {
    let map = tx.map(mapKey);
    let ops = [];
    for (let i = 0; i < keys.length; i++) {
        ops.push(map.register(keys[i]).set(values[i]))
    }
    await tx.update(ops);
    return cb(tx)
}

async function removeMapRegisterTx(tx, mapKey, keys, cb) {
    let map = tx.map(mapKey);
    let ops = [];
    for (let i = 0; i < keys.length; i++) {
        ops.push(map.remove(map.register(keys[i])))
    }
    await tx.update(ops);
    return cb(tx);
}

exports.connect = connect;
exports.setBucket = setBucket;
exports.startTx = startTx;
exports.commitTx = commitTx;
exports.readMapTx = readMapTx;
exports.readSetTx = readSetTx;
exports.updateMapRegisterTx = updateMapRegisterTx;
exports.removeMapRegisterTx = removeMapRegisterTx;
exports.removeSetTx = removeSetTx;
exports.updateSetTx = updateSetTx;
@@ -1,4 +1,5 @@
import querystring from 'querystring';
import indexClient from '../indexClient/indexClient';
import constants from '../../constants';

import services from '../services';
@@ -65,11 +66,17 @@ export default function bucketGet(authInfo, request, log, callback) {
        requestType: 'bucketGet',
        log,
    };
    let query = null;
    if (params.prefix) {
        query = params.prefix.split("//")[1];
        params.prefix = params.prefix.split("//")[0];
    }
    const listParams = {
        maxKeys: actualMaxKeys,
        delimiter: params.delimiter,
        marker: params.marker,
        prefix: params.prefix,
        query: indexClient.processQueryHeader(query),
    };

    services.metadataValidateAuthorization(metadataValParams, err => {
@@ -83,6 +90,7 @@ export default function bucketGet(authInfo, request, log, callback) {
            log.debug('error processing request', { error: err });
            return callback(err);
        }
        listParams.prefix = params.prefix;
        const xml = [];
        xml.push(
            '<?xml version="1.0" encoding="UTF-8"?>',
@@ -0,0 +1,128 @@
import http from "http"
import io from "socket.io"
import metadata from "../metadata/wrapper.js"
const _ = require("underscore")

let iosock;
let queries = [];

function intersection(a, b) {
    return _.intersection(a, b);
}

function union(a, b) {
    return _.union(a, b);
}

function difference(u, a) {
    return _.difference(u, a);
}

function proccessLogicalOps(terms) {
    let i;
    while (terms.length > 1) {
        for (i = terms.length - 1; i >= 0; i--) {
            if (terms[i] === "op/AND"
                || terms[i] === "op/OR"
                || terms[i] === "op/NOT") {
                break;
            }
        }
        if (terms[i] === "op/AND") {
            const op1 = terms[i + 1];
            const op2 = terms[i + 2];
            terms.splice(i, 3, intersection(op1, op2));
        } else if (terms[i] === "op/OR") {
            const op1 = terms[i + 1];
            const op2 = terms[i + 2];
            terms.splice(i, 3, union(op1, op2));
        }
    }
    return terms[0];
}

export default {
    createPublisher(port, log) {
        const server = http.createServer(function(req, res){
        });
        server.listen(port);
        iosock = io.listen(server);
        iosock.sockets.on("connection", function(socket) {
            log.info("indexing server connected")
            socket.on("disconnect", function() {
            });
            socket.on("subscribe", function(room) {
                socket.join(room);
            });
            socket.on("query_response", function(msg) {
                for (let i = 0; i < queries.length; i++) {
                    if (queries[i].id === msg.id) {
                        queries[i].responses -=1
                        let term_index = queries[i].query.indexOf(msg.term);
                        queries[i].query[term_index] = msg.result;
                        if (queries[i].responses === 0) {
                            let result = proccessLogicalOps(queries[i].query);
                            let listingParams = queries[i].listingParams;
                            queries.splice(i, 1)
                            metadata.respondQueryGetMD(result, listingParams);
                            break;
                        }
                    }
                }
            });
        });
    },

    putObjectMD(bucketName, objName, objVal) {
        let msg = {
            bucketName,
            objName,
            objVal
        };
        if (iosock.sockets.adapter.rooms["put"]) {
            iosock.sockets.to("put").emit("put", msg);
        }
    },

    listObject(bucketName, listingParams, cb) {
        const id = Math.random().toString(36).substring(7);
        let pending_query = {
            id,
            query: listingParams.query,
            responses : 0,
            listingParams
        }
        pending_query.listingParams.bucketName = bucketName;
        pending_query.listingParams.cb = cb;
        listingParams.query.forEach(term => {
            if (term.indexOf("op/") === -1) {
                pending_query.responses += 1
                iosock.sockets.to(term.split("/")[0]).emit("query", {
                    id,
                    term,
                    bucketName
                });
            }
        });
        queries.push(pending_query);
    },

    deleteObjectMD(bucketName, objName) {
        iosock.sockets.to("deletes").emit("delete", {
            bucketName,
            objName
        });
    },

    processQueryHeader(header) {
        if (!header) {
            return header;
        }
        const queryTerms = header.split("&");
        const query = [];
        for (let i = 0; i < queryTerms.length; i++) {
            query.push(queryTerms[i]);
        }
        return query;
    }
}
@@ -0,0 +1,14 @@
{
    "S3":
    [
        {
            "host": "127.0.0.1",
            "port": 7000,
            "index": ["tags", "size"]
        }
    ],
    "antidote": {
        "host": "127.0.0.1",
        "port": 8187
    }
}
@@ -0,0 +1,12 @@
'use strict';

const fs = require('fs');

let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
config = JSON.parse(config);

for (let i=0; i<config.S3.length; i++) {
    require('./lib/indexUtils.js').default.connectToS3(config.S3[i].host, config.S3[i].port, config.S3[i].index);
}

require('./lib/indexUtils.js').default.connectToDB();
@@ -0,0 +1,406 @@
const indexd = require("./indexUtils").default;
const config = require("./indexUtils").config;
const async = require("async");
const _ = require("underscore")
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');

const indexedAttributes = [];

config.S3.forEach(connector => {
    connector.index.forEach(tag => {
        indexedAttributes.push(tag)
    });

})

function padLeft(nr, n){
    return Array(n-String(nr).length+1).join("0")+nr;
}

function binaryIndexOf(arr, searchElement, op) {
    let minIndex = 0;
    let maxIndex = arr.length - 1;
    let currentIndex;
    let currentElement;

    while (minIndex <= maxIndex) {
        currentIndex = (minIndex + maxIndex) / 2 | 0;
        currentElement = arr[currentIndex];

        if (currentElement < searchElement) {
            minIndex = currentIndex + 1;
        }
        else if (currentElement > searchElement) {
            maxIndex = currentIndex - 1;
        }
        else {
            break;
        }
    }
    if (op === "=") {
        return currentIndex;
    } else if (op === ">" && arr[currentIndex] <= searchElement) {
        currentIndex += 1
    } else if (op === ">=" && arr[currentIndex] < searchElement) {
        currentIndex += 1
    } else if (op === "<" && arr[currentIndex] >= searchElement) {
        currentIndex -= 1
    } else if (op === "<=") {
        if (arr[currentIndex] > searchElement) {
            currentIndex -= 1
        }
    }
    if (currentIndex > arr.length-1 || currentIndex < 0) {
        return -1;
    }
    return currentIndex;
}

function storeToBitmap(stored) {
    const bm = bitmap.createObject();
    if (stored) {
        stored[2] = new Buffer(stored[2], "binary");
        bm.read(stored);
    }
    return bm;
}

function parseNotOperator(result, not, callback) {
    if (not) {
        callback(null, result.not());
    } else {
        callback(null, result);
    }
}

function updateObjectMapping(objMapping, objName) {
    let rowId;
    if (Object.keys(objMapping)) {
        if (typeof objMapping.mapping[objName] === "number") {
            rowId = objMapping.mapping[objName];
        } else if (objMapping.nextAvail.length > 0) {
            rowId = objMapping.nextAvail[0];
            objMapping.nextAvail.splice(0, 1);
        } else {
            rowId = objMapping.length;
            objMapping.length += 1;
        }
    }
    objMapping.mapping[rowId] = objName;
    objMapping.mapping[objName] = rowId;

    return rowId;
}

function getObjectMeta(bucketName, cb) {
    indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
        return cb(allObjects);
    });
}

function updateIndexMeta(bucketName, objName, tx, cb) {
    indexd.writeIndex(`${bucketName}`, objName, tx, (tx) => {
        return cb(tx);
    });
}

function filterRemoved(results, params, cb) {
    indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
        results = results.filter(elem => {
            return removed.indexOf(elem) === -1;
        });
        return cb(results);
    });
}

function constructRange(value, callback) {
    indexd.readAntidoteSet(value, (err, result) => {
        callback(null, result);
    });
}

function searchIntRange(bucketName, op, term, not, callback) {
    term = term.replace("--integer", "");
    const attr = term.split("/")[0];
    let value = parseInt(term.split("/")[1], 10);
    if (op === "=") {
        readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
            callback(err, data);
        });
    } else {
        if (config.backend === "antidote") {
            indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
                const range = []
                const index = binaryIndexOf(result, value, op)
                if (index === -1) {
                    return parseNotOperator([], not, callback);
                }
                if (op.indexOf(">") !== -1) {
                    if (op.indexOf("=") === -1) {
                        value += 1
                    }
                    for (let i = index; i < result.length; i+=1) {
                        range.push(`${bucketName}/${attr}/${result[i]}`)
                    }
                } else if (op.indexOf("<") !== -1) {
                    if (op.indexOf("=") === -1) {
                        value -= 1
                    }
                    for (let i = index; i >= 0; i-=1) {
                        range.push(`${bucketName}/${attr}/${result[i]}`)
                    }
                }
                const objRange = [];
                async.map(range, constructRange, function(err, res) {
                    res.forEach(arr => {
                        arr.forEach(elem => {
                            objRange.push(elem)
                        })
                    })
                    objRange.sort();
                    parseNotOperator(objRange, not, callback);
                });
            });
        }
    }
}

function searchRegExp(bucketName, searchTerm, not, callback) {
    const regexp = new RegExp(searchTerm);
    let result = bitmap.createObject();
    indexd.getPrefix(`${bucketName}/tags`, (err, list) => {
        list.forEach(elem => {
            if (elem.key.indexOf("/") !== -1) {
                if (regexp.test(elem.key.substring(11))) {
                    result = result.or(storeToBitmap(JSON.parse(elem.value)));
                }
            }
        });
        parseNotOperator(result, not, callback);
    })
}

function readTagIndex(bucketName, searchTerm, not, callback) {
    let term = null;
    let operator = null;
    if (searchTerm.indexOf("--integer") !== -1) {
        operator = searchTerm.split("/")[1];
        term = searchTerm.replace(operator, "");
        term = term.replace("/", "");
        searchIntRange(bucketName, operator, term, not, callback);
    } else if (searchTerm.indexOf("--regexp") !== -1) {
        searchTerm = searchTerm.replace("--regexp", "");
        searchTerm = searchTerm.substring(11);
        searchRegExp(bucketName, searchTerm, not, callback);
    } else {
        readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
            callback(err, data);
        });
    }
}

function readFileSizeIndex(bucketName, searchTerm, not, callback) {
    const operator = searchTerm.split("/")[1];
    let term = searchTerm.replace(operator, "");
    term = term.replace("/", "");
    searchIntRange(bucketName, operator, term, not, callback);
}
function readModDateIndex(bucketName, searchTerm, not, callback) {
    console.log(searchTerm);
    const operator = searchTerm.split("/")[1];
    let term = searchTerm.replace(operator, "");
    term = term.replace("/", "");
    return searchIntRange(bucketName, operator, term, not, callback);
}

function readACLIndex(bucketName, searchTerm, not, callback) {
    readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
        callback(err, data);
    });
}

function readContentTypeIndex(bucketName, searchTerm, not, callback) {
    readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
        callback(err, data);
    });
}

function readIndex(bucketName, searchTerm, callback) {
    if (searchTerm.indexOf("op/AND") !== -1
        || searchTerm.indexOf("op/OR") !== -1
        || searchTerm.indexOf("op/NOT") !== -1) {
        callback(null, searchTerm);
    }
    let notOperator = false;
    let result;
    if (searchTerm.indexOf("tags") !== -1) {
        return readTagIndex(bucketName, searchTerm, notOperator, callback);
    } else if (searchTerm.indexOf("filesize") !== -1) {
        return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
    } else if (searchTerm.indexOf("modificationdate") !== -1) {
        return readModDateIndex(bucketName, searchTerm, notOperator, callback);
    } else if (searchTerm.indexOf("contenttype") !== -1 || searchTerm.indexOf("contentsubtype") !== -1) {
        return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
    } else if (searchTerm.indexOf("acl") !== -1) {
        return readACLIndex(bucketName, searchTerm, notOperator, callback);
    }
    return result;
}

function bitmapToStore(bitmap) {
    const toStore = bitmap.write();
    toStore[2] = toStore[2].toString("binary");
    return toStore;
}

function deleteOldEntries(bucketName, rowId, cb) {
    return cb();
}

function updateBitmap(bitmap, rowId) {
    if (bitmap.length() - 1 <= rowId) {
        bitmap.push(rowId);
    } else {
        bitmap.set(rowId);
    }
    return bitmapToStore(bitmap);
}

function updateIndexEntry(key, rowId, cb) {
    indexd.get(key, (err, data) => {
        if (err) {
            return (err);
        } else {
            data = JSON.parse(data)
        }
        indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
            return (err);
        });
    });
}

function updateIntIndex(bucketName, objName, attribute, value, cb) {
    indexd.writeIndex(`${bucketName}/${attribute}/${value}`, objName, tx, (tx) => {
        return cb(tx);
    })
}

function updateACLIndex(bucketName, objName, objVal, rowId) {
    deleteOldEntries(bucketName, rowId, () => {
        Object.keys(objVal).forEach(elem => {
            if (typeof objVal[elem] === "string") {
                writeIndex(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
                    return ;
                });
            } else {
                objVal[elem].forEach(item => {
                    writeIndex(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
                        return ;
                    });
                });
            }
        });
    });
}

function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
    const type = objVal.split("/")[0];
    const subtype = objVal.split("/")[1];
    writeIndex(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
        writeIndex(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
            return ;
        });
    });
}

function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
    const date = new Date(objVal);
    const term = "modificationdate-";
    const year = date.getUTCFullYear();
    const month = date.getUTCMonth() + 1;
    const day = date.getUTCDate();
    const hours = date.getUTCHours();
    const minutes = date.getUTCMinutes();
    updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
    updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
    updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
    updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
    updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
}

function updateFileSizeIndex(bucketName, objName, objVal, tx, cb) {
    console.log("updateFileSizeIndex", indexedAttributes);
    if (indexedAttributes.indexOf("size") === -1) {
        console.log("return");
        return cb();
    }
    indexd.writeIndex(`${bucketName}/filesize/${parseInt(objVal, 10)}`, objName, tx, (tx) => {
        return cb(tx);
    });
}

function updateTagIndex(bucketName, objName, objVal, tx, cb) {
    console.log("updateTagIndex", indexedAttributes);
    if (indexedAttributes.indexOf("tags") === -1) {
        console.log("return");
        return cb();
    }
    const tags = [];
    Object.keys(objVal).forEach(elem => {
        if (elem.indexOf("x-amz-meta") !== -1 &&
            elem !== "x-amz-meta-s3cmd-attrs") {
            tags.push({key: elem.replace("x-amz-meta-", ""), value: objVal[elem]});
        }
    });
    console.log(tags);
    tags.forEach(tag => {
        indexd.writeIndex(`${bucketName}/tags/${tag.key}/${tag.value}`, objName, tx, (tx) => {
            return cb(tx);
        });
    });
}

const index = {
    evaluateQuery: (params) => {
        let queryTerms = params.term;
        const bucketName = params.bucketName;
        readIndex(bucketName, queryTerms, (err, queryTerms) => {
            filterRemoved(queryTerms, params, (results) => {
                indexd.respondQuery(params, results);
            });
        });
    },

    updateIndex: (bucketName, objName, objVal) => {
        indexd.startTx( (tx) => {
            updateIndexMeta(bucketName, objName, tx, (tx) => {
                updateTagIndex(bucketName, objName, objVal, tx, (tx) => {
                    updateFileSizeIndex(bucketName, objName, objVal["content-length"], tx, (tx) => {
                        antidoteCli.commitTx(tx, () => {
                        });
                    });
                });
                /*
                if (indexedAttributes.indexOf("filesize") !== -1) {
                    updateFileSizeIndex(bucketName, objName, objVal["content-length"], rowId);
                }
                if (indexedAttributes.indexOf("date") !== -1) {
                    updateΜodDateIndex(bucketName, objName, objVal["last-modified"], rowId);
                }
                if (indexedAttributes.indexOf("contenttype") !== -1) {
                    updateContentTypeIndex(bucketName, objName, objVal["content-type"], rowId);
                }
                if (indexedAttributes.indexOf("acl") !== -1) {
                    updateACLIndex(bucketName, objName, objVal["acl"], rowId);
                }
                */
            });
        })
    },

    deleteObject: (bucketName, objName) => {
        indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
    },
};

exports.default = index;
@@ -0,0 +1,95 @@
const fs = require('fs');
const io = require('socket.io-client');
const antidoteClient = require('antidote_ts_client');
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');

let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
config = JSON.parse(config);

let antidote;
let antidotedb;
const clients = [];

const utils = {
    connectToS3(host, port, attributes) {
        let client = io.connect(`http://${host}:${port}`, {
            reconnection: true,
        });
        client.on('connect', function() {
            for (let i = 0; i < attributes.length; i++) {
                client.emit('subscribe', attributes[i]);
                client.emit('subscribe', 'put');
            }
        });
        client.on('reconnecting', function(number) {
        });
        client.on('error', function(err) {
        });
        client.on('put', function(msg) {
            require('./indexOps.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
        });
        client.on('query', function(msg) {
            msg.client = client;
            require('./indexOps.js').default.evaluateQuery(msg);
        });
        client.on('delete', function(msg) {
            require('./indexOps.js').default.deleteObject(msg.bucketName, msg.objName);
        });
        clients.push(client)
    },

    connectToDB() {
        antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
        antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
        antidoteCli.setBucket(antidotedb, 'index');
        antidote.defaultBucket = 'index';
    },

    updateAntidoteSet(key, elem, tx, cb) {
        console.log("updateAntidoteSet", key, elem);
        antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
            antidoteCli.updateSetTx(tx, key, [elem], (tx) => {
                return cb(tx);
            });
        });
    },

    startTx(cb) {
        antidoteCli.startTx(antidotedb, (tx) => {
            return cb(tx)
        });
    },

    commitTx(cb) {
        antidoteCli.commitTx(tx, () => {
            return cb();
        });
    },

    writeIndex(key, objName, tx, cb) {
        console.log("writeIndex", key, objName);
        antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
            antidoteCli.updateSetTx(tx, key, [objName], (tx) => {
                return cb(tx);
            });
        });
    },

    readIndex(key, cb) {
        antidoteCli.readSetTx(tx, key, (tx, data) => {
            return cb(tx, data);
        });
    },

    respondQuery(params, queryTerms) {
        let client = params.client;
        client.emit('query_response', {
            result: queryTerms,
            id: params.id,
            term: params.term
        })
    }
}

exports.default = utils;
exports.config = config;
@@ -0,0 +1,47 @@
import ListResult from '../in_memory/ListResult';

export class ListBucketResult extends ListResult {
    constructor() {
        super();
        this.Contents = [];
    }

    addContentsKey(key, keyMap) {
        const objectMD = keyMap;
        this.Contents.push({
            key,
            value: {
                LastModified: objectMD['last-modified'],
                ETag: objectMD['content-md5'],
                StorageClass: objectMD['x-amz-storage-class'],
                Owner: {
                    DisplayName: objectMD['owner-display-name'],
                    ID: objectMD['owner-id'],
                },
                Size: objectMD['content-length'],
                // Initiated is used for overview of MPU
                Initiated: objectMD.initiated,
                // Initiator is used for overview of MPU.
                // It is an object containing DisplayName
                // and ID
                Initiator: objectMD.initiator,
                // EventualStorageBucket is used for overview of MPU
                EventualStorageBucket: objectMD.eventualStorageBucket,
                // Used for parts of MPU
                partLocations: objectMD.partLocations,
                // creationDate is just used for serviceGet
                creationDate: objectMD.creationDate,
            },
        });
        this.MaxKeys += 1;
    }

    hasDeleteMarker(key, keyMap) {
        const objectMD = keyMap;
        if (objectMD['x-amz-delete-marker'] !== undefined) {
            return (objectMD['x-amz-delete-marker'] === true);
        }
        return false;
    }
}
@@ -0,0 +1,339 @@
import { errors } from 'arsenal';

import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
import { ListBucketResult } from './ListBucketResult';
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
import config from '../../Config';
import async from 'async';
import antidoteCli from '../../antidoteNodeCli/lib/api.js';

const defaultMaxKeys = 1000;

class AntidoteInterface {
    constructor() {
        this.antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
    }

    createBucket(bucketName, bucketMD, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
                // TODO Check whether user already owns the bucket,
                // if so return "BucketAlreadyOwnedByYou"
                // If not owned by user, return "BucketAlreadyExists"
                if (bucket) {
                    return cb(errors.BucketAlreadyExists);
                }
                const mapKeys = []
                const mapValues = []
                Object.keys(bucketMD).forEach(key => {
                    mapKeys.push(key)
                    mapValues.push(bucketMD[key])
                });
                antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
                    antidoteCli.commitTx(tx, () => {
                        return cb();
                    });
                });
            });
        });
    }

    putBucketAttributes(bucketName, bucketMD, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, err => {
                if (err) {
                    return cb(err);
                }
                const mapKeys = []
                const mapValues = []
                Object.keys(bucketMD).forEach(key => {
                    mapKeys.push(key)
                    mapValues.push(bucketMD[key])
                });
                antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
                    antidoteCli.commitTx(tx, () => {
                        return cb();
                    });
                });
            });
        });
    }

    getBucketAttributes(tx, bucketName, log, cb) {
        if (tx === null) {
            antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
            antidoteCli.startTx(this.antidotedb, (tx) => {
                antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
                    antidoteCli.commitTx(tx, () => {
                        if (Object.keys(bucketMD).length === 0) {
                            return cb(errors.NoSuchBucket);
                        }
                        return cb(null, bucketMD);
                    });
                });
            });
        }
        else {
            antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
                if (Object.keys(bucketMD).length === 0) {
                    return cb(errors.NoSuchBucket);
                }
                return cb(null, bucketMD);
            });
        }
    }

    deleteBucket(bucketName, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
                if (err) {
                    return cb(err);
                }
                antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, objects) => {
                    if (bucket && objects.length > 0) {
                        antidoteCli.commitTx(tx, () => {
                            return cb(errors.BucketNotEmpty);
                        });
                    }
                    antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
                        const mapKeys = []
                        Object.keys(bucketMD).forEach(key => {
                            mapKeys.push(key)
                        });
                        antidoteCli.removeMapRegisterTx(tx, `${bucketName}/md`, mapKeys, (tx) => {
                            antidoteCli.commitTx(tx, () => {
                                return cb(null);
                            });
                        });
                    });
                });
            });
        });
    }

    putObject(bucketName, objName, objVal, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, err => {
                if (err) {
                    return cb(err);
                }
                const mapKeys = []
                const mapValues = []
                Object.keys(objVal).forEach(key => {
                    mapKeys.push(key)
                    mapValues.push(objVal[key])
                });
                antidoteCli.updateSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
                    antidoteCli.updateMapRegisterTx(tx, `${objName}`, mapKeys, mapValues, (tx) => {
                        antidoteCli.commitTx(tx, () => {
                            return cb();
                        });
                    });
                });
            });
        });
    }

    getBucketAndObject(bucketName, objName, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
                if (err) {
                    return cb(err, { bucket });
                }
                const bucket_MD = {}
                Object.keys(bucket).map(function(key) {
                    bucket_MD[key.substr(1)] = bucket[key]
                });
                antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
                    antidoteCli.commitTx(tx, () => {
                        if (!bucket || Object.keys(objectMD).length === 0) {
                            return cb(null, { bucket: JSON.stringify(bucket_MD) });
                        }
                        return cb(null, {
                            bucket: JSON.stringify(bucket_MD),
                            obj: JSON.stringify(objectMD),
                        });
                    });
                });
            });
        });
    }

    getObject(bucketName, objName, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
                if (err) {
                    return cb(err);
                }
                antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
                    antidoteCli.commitTx(tx, () => {
                        if (!bucket || Object.keys(objectMD).length === 0) {
                            return cb(errors.NoSuchKey);
                        }
                        return cb(null, objectMD);
                    });
                });
            });
        });
    }

    deleteObject(bucketName, objName, log, cb) {
        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
                if (err) {
                    return cb(err);
                }
                antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
                    if (!bucket || Object.keys(objectMD).length === 0) {
                        antidoteCli.commitTx(tx, () => {
                            return cb(errors.NoSuchKey);
                        });
                    }
                    const mapKeys = []
                    Object.keys(objectMD).forEach(key => {
                        mapKeys.push(key)
                    });
                    antidoteCli.removeMapRegisterTx(tx, `${objName}`, mapKeys, (tx) => {
                        antidoteCli.removeSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
                            antidoteCli.commitTx(tx, () => {
                                return cb();
                            });
                        });
                    });
                });
            });
        });
    }

    getObjectMD(tx, bucketName, key, callback) {
        antidoteCli.readMapTx(tx, `${key}`, (tx, objectMD) => {
            if (Object.keys(objectMD).length === 0) {
                return callback(error.NoSuchKey, null);
            }
            return callback(null, objectMD);
        });
    }

    listObject(bucketName, params, log, cb) {
        const { prefix, marker, delimiter, maxKeys } = params;
        if (prefix && typeof prefix !== 'string') {
            return cb(errors.InvalidArgument);
        }

        if (marker && typeof marker !== 'string') {
            return cb(errors.InvalidArgument);
        }

        if (delimiter && typeof delimiter !== 'string') {
            return cb(errors.InvalidArgument);
        }

        if (maxKeys && typeof maxKeys !== 'number') {
            return cb(errors.InvalidArgument);
        }

        let numKeys = maxKeys;
        // If paramMaxKeys is undefined, the default parameter will set it.
        // However, if it is null, the default parameter will not set it.
        if (numKeys === null) {
            numKeys = defaultMaxKeys;
        }

        antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
        antidoteCli.startTx(this.antidotedb, (tx) => {
            antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
                if (Object.keys(bucketMD).length === 0) {
                    return cb(errors.NoSuchBucket);
                }
                const response = new ListBucketResult();
                antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, keys) => {
                    async.map(keys, this.getObjectMD.bind(null, tx, bucketName), function(err, objectMeta) {
                        antidoteCli.commitTx(tx, () => {
                            // If marker specified, edit the keys array so it
                            // only contains keys that occur alphabetically after the marker
                            if (marker) {
                                keys = markerFilter(marker, keys);
                                response.Marker = marker;
                            }
                            // If prefix specified, edit the keys array so it only
                            // contains keys that contain the prefix
                            if (prefix) {
                                keys = prefixFilter(prefix, keys);
                                response.Prefix = prefix;
                            }
                            // Iterate through keys array and filter keys containing
                            // delimiter into response.CommonPrefixes and filter remaining
                            // keys into response.Contents
                            for (let i = 0; i < keys.length; ++i) {
                                const currentKey = keys[i];
                                // Do not list object with delete markers
                                if (response.hasDeleteMarker(currentKey,
                                    objectMeta[i])) {
                                    continue;
                                }
                                // If hit numKeys, stop adding keys to response
                                if (response.MaxKeys >= numKeys) {
                                    response.IsTruncated = true;
                                    response.NextMarker = keys[i - 1];
                                    break;
                                }
                                // If a delimiter is specified, find its index in the
                                // current key AFTER THE OCCURRENCE OF THE PREFIX
                                let delimiterIndexAfterPrefix = -1;
                                let prefixLength = 0;
                                if (prefix) {
                                    prefixLength = prefix.length;
                                }
                                const currentKeyWithoutPrefix = currentKey
                                    .slice(prefixLength);
                                let sliceEnd;
                                if (delimiter) {
                                    delimiterIndexAfterPrefix = currentKeyWithoutPrefix
                                        .indexOf(delimiter);
                                    sliceEnd = delimiterIndexAfterPrefix + prefixLength;
                                    response.Delimiter = delimiter;
                                }
                                // If delimiter occurs in current key, add key to
                                // response.CommonPrefixes.
                                // Otherwise add key to response.Contents
                                if (delimiterIndexAfterPrefix > -1) {
                                    const keySubstring = currentKey.slice(0, sliceEnd + 1);
                                    response.addCommonPrefix(keySubstring);
                                } else {
                                    response.addContentsKey(currentKey,
                                        objectMeta[i]);
                                }
                            }
                            return cb(null, response);
                        });
                    });
                });
            });
        });
    }

    listMultipartUploads(bucketName, listingParams, log, cb) {
        process.nextTick(() => {
            this.getBucketAttributes(null, bucketName, log, (err, bucket) => {
                if (bucket === undefined) {
                    // no on going multipart uploads, return empty listing
                    return cb(null, {
                        IsTruncated: false,
                        NextMarker: undefined,
                        MaxKeys: 0,
                    });
                }
                return getMultipartUploadListing(bucket, listingParams, cb);
            });
        });
    }
};

export default AntidoteInterface;
@@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
import BucketFileInterface from './bucketfile/backend';
import BucketInfo from './BucketInfo';
import inMemory from './in_memory/backend';
import AntidoteInterface from './antidote/backend';
import config from '../Config';
import indexClient from '../indexClient/indexClient'
import async from 'async'

let client;
let implName;
@@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
} else if (config.backends.metadata === 'scality') {
    client = new BucketClientInterface();
    implName = 'bucketclient';
} else if (config.backends.metadata === 'antidote') {
    client = new AntidoteInterface();
    implName = 'antidote';
}

function getQueryResults(params, objName, callback) {
    let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
    client.getObject(bucketName, objName, log, (err, data) => {
        if (err) {
            callback(err, null);
        }
        callback(null, {key: objName, value: {
            LastModified: data['last-modified'],
            ETag: data['content-md5'],
            StorageClass: data['x-amz-storage-class'],
            Owner: {
                ID: data['owner-id'],
                DisplayName: data['owner-display-name']
            },
            Size: data['content-length'],
            Initiated: undefined,
            Initiator: undefined,
            EventualStorageBucket: undefined,
            partLocations: undefined,
            creationDate: undefined
        }});
    });
}

const metadata = {
@@ -45,7 +75,7 @@ const metadata = {

    getBucket: (bucketName, log, cb) => {
        log.debug('getting bucket from metadata');
        client.getBucketAttributes(bucketName, log, (err, data) => {
        client.getBucketAttributes(null, bucketName, log, (err, data) => {
            if (err) {
                log.debug('error from metadata', { implName, error: err });
                return cb(err);
@@ -75,6 +105,12 @@ const metadata = {
                return cb(err);
            }
            log.debug('object successfully put in metadata');
            if (config.userMetaIndexing || config.systemMetaIndexing) {
                if (objName.indexOf('..|..') !== -1) {
                    return cb(err);
                }
                indexClient.putObjectMD(bucketName, objName, objVal);
            }
            return cb(err);
        }, params);
    },
@@ -113,11 +149,17 @@ const metadata = {
                return cb(err);
            }
            log.debug('object deleted from metadata');
            if (config.userMetaIndexing || config.systemMetaIndexing) {
                indexClient.deleteObjectMD(bucketName, objName);
            }
            return cb(err);
        }, params);
    },

    listObject: (bucketName, listingParams, log, cb) => {
        if (listingParams.query) {
            indexClient.listObject(bucketName, listingParams, cb);
        } else {
            client
                .listObject(bucketName, listingParams,
                    log, (err, data) => {
@@ -129,6 +171,34 @@ const metadata = {
                        log.debug('object listing retrieved from metadata');
                        return cb(err, data);
                    });
        }
    },

    respondQueryGetMD: (result, params) => {
        async.map(result, getQueryResults.bind(null, params), function(err, res) {
            const response = {
                IsTruncated: false,
                NextMarker: params.marker,
                CommonPrefixes: [],
                MaxKeys: 10,
                Contents: res
            }
            return params.cb(err, response);
        });
    },

    respondQueryFilter: (result, params) => {
        let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
        client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
            log, (err, data) => {
                if (err) {
                    return cb(err);
                }
                data.Contents = data.Contents.filter(elem => {
                    return result.indexOf(elem.key) !== -1;
                });
                return cb(err, data);
            });
    },

    listMultipartUploads: (bucketName, listingParams, log, cb) => {
@@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
import { clientCheck } from './utilities/healthcheckHandler';
import _config from './Config';
import routes from './routes';

import indexClient from './indexClient/indexClient'

class S3Server {
    /**
@@ -38,6 +38,7 @@ class S3Server {
     * This starts the http server.
     */
    startup(port) {
        indexClient.createPublisher(_config.indexServerPort, logger);
        // Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
        if (_config.https) {
            this.server = https.createServer({
@@ -118,7 +119,7 @@ class S3Server {

export default function main() {
    let clusters = _config.clusters || 1;
    if (process.env.S3BACKEND === 'mem') {
    if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
        clusters = 1;
    }
    if (cluster.isMaster) {
@@ -19,6 +19,7 @@
    },
    "homepage": "https://github.com/scality/S3#readme",
    "dependencies": {
        "antidote_ts_client": "^0.1.0",
        "arsenal": "scality/Arsenal",
        "async": "~1.4.2",
        "babel-core": "^6.5.2",
@@ -33,6 +34,7 @@
        "multilevel": "^7.3.0",
        "node-uuid": "^1.4.3",
        "ready-set-stream": "1.0.7",
        "socket.io": "^1.7.2",
        "sproxydclient": "scality/sproxydclient",
        "utapi": "scality/utapi",
        "utf8": "~2.1.1",
@@ -47,6 +49,7 @@
        "aws-sdk": "^2.2.11",
        "babel-cli": "^6.2.0",
        "babel-eslint": "^6.0.0",
        "babel-plugin-transform-async-to-generator": "^6.24.1",
        "bluebird": "^3.3.1",
        "eslint": "^2.4.0",
        "eslint-config-airbnb": "^6.0.0",
@@ -68,9 +71,13 @@
        "ft_test": "mocha tester.js --compilers js:babel-core/register",
        "init": "node init.js",
        "install_ft_deps": "npm install aws-sdk@2.2.11 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7 zombie@5.0.5",
        "install_antidote_client": "babel --plugins transform-async-to-generator lib/antidoteNodeCli/src/ -d lib/antidoteNodeCli/lib/",
        "lint": "eslint $(git ls-files '*.js')",
        "lint_md": "mdlint $(git ls-files '*.md')",
        "mem_backend": "S3BACKEND=mem node index.js",
        "antidote_backend": "S3BACKEND=antidote node index.js",
        "antidote_deploy": "S3BACKEND=antidote ./deploy_instances.sh 3",
        "antidote_stop": "S3BACKEND=antidote ./stop_instances.sh",
        "perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
        "start": "node init.js && node index.js",
        "start_utapi": "node utapiServer.js",
@@ -0,0 +1,13 @@
const fs = require('fs');
const path = require('path');

const configpath = path.join(__dirname, '/config.json');
var config = fs.readFileSync(configpath, { encoding: 'utf-8' });
config = JSON.parse(config);

config.port = 8000 + ((config.port - 8000 + 1) % 3);
config.antidote.port = 8187 + ((config.antidote.port - 8187 + 100) % 400);
config.indexServerPort = 7000 + ((config.indexServerPort - 7000 + 1) % 3);

config = JSON.stringify(config);
fs.writeFileSync(configpath, config, { encoding: 'utf-8' });
@@ -0,0 +1,4 @@
#!/bin/bash
fuser -k -n tcp 8000
fuser -k -n tcp 8001
fuser -k -n tcp 8002