Compare commits
47 Commits
developmen
...
proto/S3+A
Author | SHA1 | Date |
---|---|---|
Dimitrios Vasilas | bd4487eb07 | |
Dimitrios Vasilas | 97f4d02a15 | |
Dimitrios Vasilas | 6a543567c4 | |
Dimitrios Vasilas | 2dc0618c80 | |
Dimitrios Vasilas | 1dde0aceff | |
Dimitrios Vasilas | 565d027c7d | |
Dimitrios Vasilas | c3e20cecce | |
Dimitrios Vasilas | 9b4ea5f61c | |
Dimitrios Vasilas | 913103cb15 | |
Dimitrios Vasilas | 539e025895 | |
Dimitrios Vasilas | 34f37c15dc | |
Dimitrios Vasilas | 8eabfb035e | |
Dimitrios Vasilas | 8f274252ce | |
Dimitrios Vasilas | e27cb1cf6c | |
Dimitrios Vasilas | 9c8c8fd9ea | |
Dimitrios Vasilas | a236fd4457 | |
Dimitrios Vasilas | e76472fa20 | |
Dimitrios Vasilas | e6f82079c6 | |
Dimitrios Vasilas | cfb77cc9fe | |
Dimitrios Vasilas | 17a9cd1071 | |
Dimitrios Vasilas | 8ce70e7337 | |
Dimitrios Vasilas | cdbcae37b2 | |
Dimitrios Vasilas | 0f5cd3e545 | |
Dimitrios Vasilas | f6cddaf6a5 | |
Dimitrios Vasilas | c225e483bc | |
Dimitrios Vasilas | 53cb8a3263 | |
Dimitrios Vasilas | d302070959 | |
Dimitrios Vasilas | 7597c379e7 | |
Dimitrios Vasilas | 541e97f1ea | |
Dimitrios Vasilas | c32fe7df51 | |
Dimitrios Vasilas | 9e301aaf25 | |
Dimitrios Vasilas | eb441dd09c | |
Dimitrios Vasilas | 6fddd95a1d | |
Dimitrios Vasilas | 1303bd8b97 | |
Dimitrios Vasilas | 093c071836 | |
Dimitrios Vasilas | da115e6ce4 | |
Dimitrios Vasilas | b588d5e4d0 | |
Dimitrios Vasilas | 5f1bd15fb0 | |
Dimitrios Vasilas | 22a783f23e | |
Dimitrios Vasilas | 320fc256bf | |
Dimitrios Vasilas | 361585825f | |
Dimitrios Vasilas | e390e355ef | |
Dimitrios Vasilas | f03940e45e | |
Dimitrios Vasilas | e67abad697 | |
Dimitrios Vasilas | 9a42556448 | |
Dimitrios Vasilas | 3056714c02 | |
Dimitrios Vasilas | d871755296 |
|
@ -48,6 +48,13 @@
|
||||||
"logLevel": "info",
|
"logLevel": "info",
|
||||||
"dumpLevel": "error"
|
"dumpLevel": "error"
|
||||||
},
|
},
|
||||||
|
"antidote": {
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 8087
|
||||||
|
},
|
||||||
|
"userMetaIndexing":true,
|
||||||
|
"systemMetaIndexing":true,
|
||||||
|
"indexServerPort":7000,
|
||||||
"healthChecks": {
|
"healthChecks": {
|
||||||
"allowFrom": ["127.0.0.1/8", "::1"]
|
"allowFrom": ["127.0.0.1/8", "::1"]
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
#!/bin/bash
|
||||||
|
for i in $(seq 1 $1);
|
||||||
|
do
|
||||||
|
node ./portConfig.js
|
||||||
|
npm run antidote_backend &
|
||||||
|
sleep 3
|
||||||
|
done
|
|
@ -276,7 +276,7 @@ class Config {
|
||||||
let metadata = 'file';
|
let metadata = 'file';
|
||||||
let kms = 'file';
|
let kms = 'file';
|
||||||
if (process.env.S3BACKEND) {
|
if (process.env.S3BACKEND) {
|
||||||
const validBackends = ['mem', 'file', 'scality'];
|
const validBackends = ['mem', 'file', 'scality', 'antidote'];
|
||||||
assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
|
assert(validBackends.indexOf(process.env.S3BACKEND) > -1,
|
||||||
'bad environment variable: S3BACKEND environment variable ' +
|
'bad environment variable: S3BACKEND environment variable ' +
|
||||||
'should be one of mem/file/scality'
|
'should be one of mem/file/scality'
|
||||||
|
@ -289,7 +289,11 @@ class Config {
|
||||||
if (process.env.S3VAULT) {
|
if (process.env.S3VAULT) {
|
||||||
auth = process.env.S3VAULT;
|
auth = process.env.S3VAULT;
|
||||||
}
|
}
|
||||||
if (auth === 'file' || auth === 'mem') {
|
if (data === 'antidote') {
|
||||||
|
data = 'mem';
|
||||||
|
kms = 'mem';
|
||||||
|
}
|
||||||
|
if (auth === 'file' || auth === 'mem' || auth === 'antidote') {
|
||||||
// Auth only checks for 'mem' since mem === file
|
// Auth only checks for 'mem' since mem === file
|
||||||
auth = 'mem';
|
auth = 'mem';
|
||||||
let authfile = `${__dirname}/../conf/authdata.json`;
|
let authfile = `${__dirname}/../conf/authdata.json`;
|
||||||
|
@ -331,6 +335,32 @@ class Config {
|
||||||
dataPath,
|
dataPath,
|
||||||
metadataPath,
|
metadataPath,
|
||||||
};
|
};
|
||||||
|
this.antidote = {};
|
||||||
|
if (config.antidote) {
|
||||||
|
if (config.antidote.port !== undefined) {
|
||||||
|
assert(Number.isInteger(config.antidote.port)
|
||||||
|
&& config.antidote.port > 0,
|
||||||
|
'bad config: vaultd port must be a positive integer');
|
||||||
|
this.antidote.port = config.antidote.port;
|
||||||
|
}
|
||||||
|
if (config.antidote.host !== undefined) {
|
||||||
|
assert.strictEqual(typeof config.vaultd.host, 'string',
|
||||||
|
'bad config: vaultd host must be a string');
|
||||||
|
this.antidote.host = config.antidote.host;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.userMetaIndexing = false;
|
||||||
|
if (config.userMetaIndexing !== undefined) {
|
||||||
|
this.userMetaIndexing = config.userMetaIndexing;
|
||||||
|
}
|
||||||
|
this.systemMetaIndexing = false;
|
||||||
|
if (config.systemMetaIndexing !== undefined) {
|
||||||
|
this.systemMetaIndexing = config.systemMetaIndexing;
|
||||||
|
}
|
||||||
|
this.indexServerPort = 7000;
|
||||||
|
if (config.indexServerPort !== undefined) {
|
||||||
|
this.indexServerPort = config.indexServerPort;
|
||||||
|
}
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
let startTx = (() => {
|
||||||
|
var _ref = _asyncToGenerator(function* (antidote, cb) {
|
||||||
|
let tx = yield antidote.startTransaction();
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function startTx(_x, _x2) {
|
||||||
|
return _ref.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let commitTx = (() => {
|
||||||
|
var _ref2 = _asyncToGenerator(function* (tx, cb) {
|
||||||
|
yield tx.commit();
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
|
||||||
|
return function commitTx(_x3, _x4) {
|
||||||
|
return _ref2.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let readSetTx = (() => {
|
||||||
|
var _ref3 = _asyncToGenerator(function* (tx, setKey, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let result = yield set.read();
|
||||||
|
return cb(tx, result);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function readSetTx(_x5, _x6, _x7) {
|
||||||
|
return _ref3.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let updateSetTx = (() => {
|
||||||
|
var _ref4 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(set.add(keys[i]));
|
||||||
|
}
|
||||||
|
yield tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function updateSetTx(_x8, _x9, _x10, _x11) {
|
||||||
|
return _ref4.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let removeSetTx = (() => {
|
||||||
|
var _ref5 = _asyncToGenerator(function* (tx, setKey, keys, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(set.remove(keys[i]));
|
||||||
|
}
|
||||||
|
yield tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function removeSetTx(_x12, _x13, _x14, _x15) {
|
||||||
|
return _ref5.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let readMapTx = (() => {
|
||||||
|
var _ref6 = _asyncToGenerator(function* (tx, mapKey, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let result = yield map.read();
|
||||||
|
result = result.toJsObject();
|
||||||
|
return cb(tx, result);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function readMapTx(_x16, _x17, _x18) {
|
||||||
|
return _ref6.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let updateMapRegisterTx = (() => {
|
||||||
|
var _ref7 = _asyncToGenerator(function* (tx, mapKey, keys, values, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(map.register(keys[i]).set(values[i]));
|
||||||
|
}
|
||||||
|
yield tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function updateMapRegisterTx(_x19, _x20, _x21, _x22, _x23) {
|
||||||
|
return _ref7.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
let removeMapRegisterTx = (() => {
|
||||||
|
var _ref8 = _asyncToGenerator(function* (tx, mapKey, keys, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(map.remove(map.register(keys[i])));
|
||||||
|
}
|
||||||
|
yield tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
return function removeMapRegisterTx(_x24, _x25, _x26, _x27) {
|
||||||
|
return _ref8.apply(this, arguments);
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
|
||||||
|
|
||||||
|
var antidoteClient = require('antidote_ts_client');
|
||||||
|
|
||||||
|
function connect(port, host) {
|
||||||
|
return antidoteClient.connect(port, host);
|
||||||
|
}
|
||||||
|
|
||||||
|
function setBucket(antidote, bucket) {
|
||||||
|
return antidote.defaultBucket = bucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.connect = connect;
|
||||||
|
exports.setBucket = setBucket;
|
||||||
|
exports.startTx = startTx;
|
||||||
|
exports.commitTx = commitTx;
|
||||||
|
exports.readMapTx = readMapTx;
|
||||||
|
exports.readSetTx = readSetTx;
|
||||||
|
exports.updateMapRegisterTx = updateMapRegisterTx;
|
||||||
|
exports.removeMapRegisterTx = removeMapRegisterTx;
|
||||||
|
exports.removeSetTx = removeSetTx;
|
||||||
|
exports.updateSetTx = updateSetTx;
|
|
@ -0,0 +1,83 @@
|
||||||
|
var antidoteClient = require('antidote_ts_client')
|
||||||
|
|
||||||
|
function connect(port, host) {
|
||||||
|
return antidoteClient.connect(port, host);
|
||||||
|
}
|
||||||
|
|
||||||
|
function setBucket(antidote, bucket) {
|
||||||
|
return antidote.defaultBucket = bucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function startTx(antidote, cb) {
|
||||||
|
let tx = await antidote.startTransaction()
|
||||||
|
return cb(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function commitTx(tx, cb) {
|
||||||
|
await tx.commit()
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
|
||||||
|
async function readSetTx(tx, setKey, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let result = await set.read();
|
||||||
|
return cb(tx, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function updateSetTx(tx, setKey, keys, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(set.add(keys[i]));
|
||||||
|
}
|
||||||
|
await tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function removeSetTx(tx, setKey, keys, cb) {
|
||||||
|
let set = tx.set(setKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(set.remove(keys[i]));
|
||||||
|
}
|
||||||
|
await tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function readMapTx(tx, mapKey, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let result = await map.read();
|
||||||
|
result = result.toJsObject();
|
||||||
|
return cb(tx, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function updateMapRegisterTx(tx, mapKey, keys, values, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(map.register(keys[i]).set(values[i]))
|
||||||
|
}
|
||||||
|
await tx.update(ops);
|
||||||
|
return cb(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function removeMapRegisterTx(tx, mapKey, keys, cb) {
|
||||||
|
let map = tx.map(mapKey);
|
||||||
|
let ops = [];
|
||||||
|
for (let i = 0; i < keys.length; i++) {
|
||||||
|
ops.push(map.remove(map.register(keys[i])))
|
||||||
|
}
|
||||||
|
await tx.update(ops);
|
||||||
|
return cb(tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.connect = connect;
|
||||||
|
exports.setBucket = setBucket;
|
||||||
|
exports.startTx = startTx;
|
||||||
|
exports.commitTx = commitTx;
|
||||||
|
exports.readMapTx = readMapTx;
|
||||||
|
exports.readSetTx = readSetTx;
|
||||||
|
exports.updateMapRegisterTx = updateMapRegisterTx;
|
||||||
|
exports.removeMapRegisterTx = removeMapRegisterTx;
|
||||||
|
exports.removeSetTx = removeSetTx;
|
||||||
|
exports.updateSetTx = updateSetTx;
|
|
@ -1,4 +1,5 @@
|
||||||
import querystring from 'querystring';
|
import querystring from 'querystring';
|
||||||
|
import indexClient from '../indexClient/indexClient';
|
||||||
import constants from '../../constants';
|
import constants from '../../constants';
|
||||||
|
|
||||||
import services from '../services';
|
import services from '../services';
|
||||||
|
@ -65,11 +66,17 @@ export default function bucketGet(authInfo, request, log, callback) {
|
||||||
requestType: 'bucketGet',
|
requestType: 'bucketGet',
|
||||||
log,
|
log,
|
||||||
};
|
};
|
||||||
|
let query = null;
|
||||||
|
if (params.prefix) {
|
||||||
|
query = params.prefix.split("//")[1];
|
||||||
|
params.prefix = params.prefix.split("//")[0];
|
||||||
|
}
|
||||||
const listParams = {
|
const listParams = {
|
||||||
maxKeys: actualMaxKeys,
|
maxKeys: actualMaxKeys,
|
||||||
delimiter: params.delimiter,
|
delimiter: params.delimiter,
|
||||||
marker: params.marker,
|
marker: params.marker,
|
||||||
prefix: params.prefix,
|
prefix: params.prefix,
|
||||||
|
query: indexClient.processQueryHeader(query),
|
||||||
};
|
};
|
||||||
|
|
||||||
services.metadataValidateAuthorization(metadataValParams, err => {
|
services.metadataValidateAuthorization(metadataValParams, err => {
|
||||||
|
@ -83,6 +90,7 @@ export default function bucketGet(authInfo, request, log, callback) {
|
||||||
log.debug('error processing request', { error: err });
|
log.debug('error processing request', { error: err });
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
|
listParams.prefix = params.prefix;
|
||||||
const xml = [];
|
const xml = [];
|
||||||
xml.push(
|
xml.push(
|
||||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
import http from "http"
|
||||||
|
import io from "socket.io"
|
||||||
|
import metadata from "../metadata/wrapper.js"
|
||||||
|
const _ = require("underscore")
|
||||||
|
|
||||||
|
let iosock;
|
||||||
|
let queries = [];
|
||||||
|
|
||||||
|
function intersection(a, b) {
|
||||||
|
return _.intersection(a, b);
|
||||||
|
}
|
||||||
|
|
||||||
|
function union(a, b) {
|
||||||
|
return _.union(a, b);
|
||||||
|
}
|
||||||
|
|
||||||
|
function difference(u, a) {
|
||||||
|
return _.difference(u, a);
|
||||||
|
}
|
||||||
|
|
||||||
|
function proccessLogicalOps(terms) {
|
||||||
|
let i;
|
||||||
|
while (terms.length > 1) {
|
||||||
|
for (i = terms.length - 1; i >= 0; i--) {
|
||||||
|
if (terms[i] === "op/AND"
|
||||||
|
|| terms[i] === "op/OR"
|
||||||
|
|| terms[i] === "op/NOT") {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (terms[i] === "op/AND") {
|
||||||
|
const op1 = terms[i + 1];
|
||||||
|
const op2 = terms[i + 2];
|
||||||
|
terms.splice(i, 3, intersection(op1, op2));
|
||||||
|
} else if (terms[i] === "op/OR") {
|
||||||
|
const op1 = terms[i + 1];
|
||||||
|
const op2 = terms[i + 2];
|
||||||
|
terms.splice(i, 3, union(op1, op2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return terms[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
export default {
|
||||||
|
createPublisher(port, log) {
|
||||||
|
const server = http.createServer(function(req, res){
|
||||||
|
});
|
||||||
|
server.listen(port);
|
||||||
|
iosock = io.listen(server);
|
||||||
|
iosock.sockets.on("connection", function(socket) {
|
||||||
|
log.info("indexing server connected")
|
||||||
|
socket.on("disconnect", function() {
|
||||||
|
});
|
||||||
|
socket.on("subscribe", function(room) {
|
||||||
|
socket.join(room);
|
||||||
|
});
|
||||||
|
socket.on("query_response", function(msg) {
|
||||||
|
for (let i = 0; i < queries.length; i++) {
|
||||||
|
if (queries[i].id === msg.id) {
|
||||||
|
queries[i].responses -=1
|
||||||
|
let term_index = queries[i].query.indexOf(msg.term);
|
||||||
|
queries[i].query[term_index] = msg.result;
|
||||||
|
if (queries[i].responses === 0) {
|
||||||
|
let result = proccessLogicalOps(queries[i].query);
|
||||||
|
let listingParams = queries[i].listingParams;
|
||||||
|
queries.splice(i, 1)
|
||||||
|
metadata.respondQueryGetMD(result, listingParams);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
putObjectMD(bucketName, objName, objVal) {
|
||||||
|
let msg = {
|
||||||
|
bucketName,
|
||||||
|
objName,
|
||||||
|
objVal
|
||||||
|
};
|
||||||
|
if (iosock.sockets.adapter.rooms["put"]) {
|
||||||
|
iosock.sockets.to("put").emit("put", msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
listObject(bucketName, listingParams, cb) {
|
||||||
|
const id = Math.random().toString(36).substring(7);
|
||||||
|
let pending_query = {
|
||||||
|
id,
|
||||||
|
query: listingParams.query,
|
||||||
|
responses : 0,
|
||||||
|
listingParams
|
||||||
|
}
|
||||||
|
pending_query.listingParams.bucketName = bucketName;
|
||||||
|
pending_query.listingParams.cb = cb;
|
||||||
|
listingParams.query.forEach(term => {
|
||||||
|
if (term.indexOf("op/") === -1) {
|
||||||
|
pending_query.responses += 1
|
||||||
|
iosock.sockets.to(term.split("/")[0]).emit("query", {
|
||||||
|
id,
|
||||||
|
term,
|
||||||
|
bucketName
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
queries.push(pending_query);
|
||||||
|
},
|
||||||
|
|
||||||
|
deleteObjectMD(bucketName, objName) {
|
||||||
|
iosock.sockets.to("deletes").emit("delete", {
|
||||||
|
bucketName,
|
||||||
|
objName
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
processQueryHeader(header) {
|
||||||
|
if (!header) {
|
||||||
|
return header;
|
||||||
|
}
|
||||||
|
const queryTerms = header.split("&");
|
||||||
|
const query = [];
|
||||||
|
for (let i = 0; i < queryTerms.length; i++) {
|
||||||
|
query.push(queryTerms[i]);
|
||||||
|
}
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
{
|
||||||
|
"S3":
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 7000,
|
||||||
|
"index": ["tags", "size"]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"antidote": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 8187
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
|
||||||
|
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
|
||||||
|
config = JSON.parse(config);
|
||||||
|
|
||||||
|
for (let i=0; i<config.S3.length; i++) {
|
||||||
|
require('./lib/indexUtils.js').default.connectToS3(config.S3[i].host, config.S3[i].port, config.S3[i].index);
|
||||||
|
}
|
||||||
|
|
||||||
|
require('./lib/indexUtils.js').default.connectToDB();
|
|
@ -0,0 +1,406 @@
|
||||||
|
const indexd = require("./indexUtils").default;
|
||||||
|
const config = require("./indexUtils").config;
|
||||||
|
const async = require("async");
|
||||||
|
const _ = require("underscore")
|
||||||
|
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');
|
||||||
|
|
||||||
|
const indexedAttributes = [];
|
||||||
|
|
||||||
|
config.S3.forEach(connector => {
|
||||||
|
connector.index.forEach(tag => {
|
||||||
|
indexedAttributes.push(tag)
|
||||||
|
});
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
function padLeft(nr, n){
|
||||||
|
return Array(n-String(nr).length+1).join("0")+nr;
|
||||||
|
}
|
||||||
|
|
||||||
|
function binaryIndexOf(arr, searchElement, op) {
|
||||||
|
let minIndex = 0;
|
||||||
|
let maxIndex = arr.length - 1;
|
||||||
|
let currentIndex;
|
||||||
|
let currentElement;
|
||||||
|
|
||||||
|
while (minIndex <= maxIndex) {
|
||||||
|
currentIndex = (minIndex + maxIndex) / 2 | 0;
|
||||||
|
currentElement = arr[currentIndex];
|
||||||
|
|
||||||
|
if (currentElement < searchElement) {
|
||||||
|
minIndex = currentIndex + 1;
|
||||||
|
}
|
||||||
|
else if (currentElement > searchElement) {
|
||||||
|
maxIndex = currentIndex - 1;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (op === "=") {
|
||||||
|
return currentIndex;
|
||||||
|
} else if (op === ">" && arr[currentIndex] <= searchElement) {
|
||||||
|
currentIndex += 1
|
||||||
|
} else if (op === ">=" && arr[currentIndex] < searchElement) {
|
||||||
|
currentIndex += 1
|
||||||
|
} else if (op === "<" && arr[currentIndex] >= searchElement) {
|
||||||
|
currentIndex -= 1
|
||||||
|
} else if (op === "<=") {
|
||||||
|
if (arr[currentIndex] > searchElement) {
|
||||||
|
currentIndex -= 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (currentIndex > arr.length-1 || currentIndex < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return currentIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
function storeToBitmap(stored) {
|
||||||
|
const bm = bitmap.createObject();
|
||||||
|
if (stored) {
|
||||||
|
stored[2] = new Buffer(stored[2], "binary");
|
||||||
|
bm.read(stored);
|
||||||
|
}
|
||||||
|
return bm;
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseNotOperator(result, not, callback) {
|
||||||
|
if (not) {
|
||||||
|
callback(null, result.not());
|
||||||
|
} else {
|
||||||
|
callback(null, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateObjectMapping(objMapping, objName) {
|
||||||
|
let rowId;
|
||||||
|
if (Object.keys(objMapping)) {
|
||||||
|
if (typeof objMapping.mapping[objName] === "number") {
|
||||||
|
rowId = objMapping.mapping[objName];
|
||||||
|
} else if (objMapping.nextAvail.length > 0) {
|
||||||
|
rowId = objMapping.nextAvail[0];
|
||||||
|
objMapping.nextAvail.splice(0, 1);
|
||||||
|
} else {
|
||||||
|
rowId = objMapping.length;
|
||||||
|
objMapping.length += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
objMapping.mapping[rowId] = objName;
|
||||||
|
objMapping.mapping[objName] = rowId;
|
||||||
|
|
||||||
|
return rowId;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getObjectMeta(bucketName, cb) {
|
||||||
|
indexd.readAntidoteSet(`${bucketName}`, (err, allObjects) => {
|
||||||
|
return cb(allObjects);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIndexMeta(bucketName, objName, tx, cb) {
|
||||||
|
indexd.writeIndex(`${bucketName}`, objName, tx, (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function filterRemoved(results, params, cb) {
|
||||||
|
indexd.readAntidoteSet(`${params.bucketName}/removed`, (err, removed) => {
|
||||||
|
results = results.filter(elem => {
|
||||||
|
return removed.indexOf(elem) === -1;
|
||||||
|
});
|
||||||
|
return cb(results);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function constructRange(value, callback) {
|
||||||
|
indexd.readAntidoteSet(value, (err, result) => {
|
||||||
|
callback(null, result);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function searchIntRange(bucketName, op, term, not, callback) {
|
||||||
|
term = term.replace("--integer", "");
|
||||||
|
const attr = term.split("/")[0];
|
||||||
|
let value = parseInt(term.split("/")[1], 10);
|
||||||
|
if (op === "=") {
|
||||||
|
readDB(`${bucketName}/${attr}/${value}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
if (config.backend === "antidote") {
|
||||||
|
indexd.readAntidoteSet(`${bucketName}/${attr}`, (err, result) => {
|
||||||
|
const range = []
|
||||||
|
const index = binaryIndexOf(result, value, op)
|
||||||
|
if (index === -1) {
|
||||||
|
return parseNotOperator([], not, callback);
|
||||||
|
}
|
||||||
|
if (op.indexOf(">") !== -1) {
|
||||||
|
if (op.indexOf("=") === -1) {
|
||||||
|
value += 1
|
||||||
|
}
|
||||||
|
for (let i = index; i < result.length; i+=1) {
|
||||||
|
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||||
|
}
|
||||||
|
} else if (op.indexOf("<") !== -1) {
|
||||||
|
if (op.indexOf("=") === -1) {
|
||||||
|
value -= 1
|
||||||
|
}
|
||||||
|
for (let i = index; i >= 0; i-=1) {
|
||||||
|
range.push(`${bucketName}/${attr}/${result[i]}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const objRange = [];
|
||||||
|
async.map(range, constructRange, function(err, res) {
|
||||||
|
res.forEach(arr => {
|
||||||
|
arr.forEach(elem => {
|
||||||
|
objRange.push(elem)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
objRange.sort();
|
||||||
|
parseNotOperator(objRange, not, callback);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function searchRegExp(bucketName, searchTerm, not, callback) {
|
||||||
|
const regexp = new RegExp(searchTerm);
|
||||||
|
let result = bitmap.createObject();
|
||||||
|
indexd.getPrefix(`${bucketName}/tags`, (err, list) => {
|
||||||
|
list.forEach(elem => {
|
||||||
|
if (elem.key.indexOf("/") !== -1) {
|
||||||
|
if (regexp.test(elem.key.substring(11))) {
|
||||||
|
result = result.or(storeToBitmap(JSON.parse(elem.value)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
parseNotOperator(result, not, callback);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function readTagIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
let term = null;
|
||||||
|
let operator = null;
|
||||||
|
if (searchTerm.indexOf("--integer") !== -1) {
|
||||||
|
operator = searchTerm.split("/")[1];
|
||||||
|
term = searchTerm.replace(operator, "");
|
||||||
|
term = term.replace("/", "");
|
||||||
|
searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
} else if (searchTerm.indexOf("--regexp") !== -1) {
|
||||||
|
searchTerm = searchTerm.replace("--regexp", "");
|
||||||
|
searchTerm = searchTerm.substring(11);
|
||||||
|
searchRegExp(bucketName, searchTerm, not, callback);
|
||||||
|
} else {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function readFileSizeIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
const operator = searchTerm.split("/")[1];
|
||||||
|
let term = searchTerm.replace(operator, "");
|
||||||
|
term = term.replace("/", "");
|
||||||
|
searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
}
|
||||||
|
function readModDateIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
console.log(searchTerm);
|
||||||
|
const operator = searchTerm.split("/")[1];
|
||||||
|
let term = searchTerm.replace(operator, "");
|
||||||
|
term = term.replace("/", "");
|
||||||
|
return searchIntRange(bucketName, operator, term, not, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
function readACLIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readContentTypeIndex(bucketName, searchTerm, not, callback) {
|
||||||
|
readDB(`${bucketName}/${searchTerm}`, (err, data) =>{
|
||||||
|
callback(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readIndex(bucketName, searchTerm, callback) {
|
||||||
|
if (searchTerm.indexOf("op/AND") !== -1
|
||||||
|
|| searchTerm.indexOf("op/OR") !== -1
|
||||||
|
|| searchTerm.indexOf("op/NOT") !== -1) {
|
||||||
|
callback(null, searchTerm);
|
||||||
|
}
|
||||||
|
let notOperator = false;
|
||||||
|
let result;
|
||||||
|
if (searchTerm.indexOf("tags") !== -1) {
|
||||||
|
return readTagIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf("filesize") !== -1) {
|
||||||
|
return readFileSizeIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf("modificationdate") !== -1) {
|
||||||
|
return readModDateIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf("contenttype") !== -1 || searchTerm.indexOf("contentsubtype") !== -1) {
|
||||||
|
return readContentTypeIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
} else if (searchTerm.indexOf("acl") !== -1) {
|
||||||
|
return readACLIndex(bucketName, searchTerm, notOperator, callback);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
function bitmapToStore(bitmap) {
|
||||||
|
const toStore = bitmap.write();
|
||||||
|
toStore[2] = toStore[2].toString("binary");
|
||||||
|
return toStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
function deleteOldEntries(bucketName, rowId, cb) {
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateBitmap(bitmap, rowId) {
|
||||||
|
if (bitmap.length() - 1 <= rowId) {
|
||||||
|
bitmap.push(rowId);
|
||||||
|
} else {
|
||||||
|
bitmap.set(rowId);
|
||||||
|
}
|
||||||
|
return bitmapToStore(bitmap);
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIndexEntry(key, rowId, cb) {
|
||||||
|
indexd.get(key, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
return (err);
|
||||||
|
} else {
|
||||||
|
data = JSON.parse(data)
|
||||||
|
}
|
||||||
|
indexd.put(key, JSON.stringify(updateBitmap(storeToBitmap(data), rowId)), err =>{
|
||||||
|
return (err);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateIntIndex(bucketName, objName, attribute, value, cb) {
|
||||||
|
indexd.writeIndex(`${bucketName}/${attribute}/${value}`, objName, tx, (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateACLIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
deleteOldEntries(bucketName, rowId, () => {
|
||||||
|
Object.keys(objVal).forEach(elem => {
|
||||||
|
if (typeof objVal[elem] === "string") {
|
||||||
|
writeIndex(`${bucketName}/acl/${elem}/${objVal[elem]}`, objName, `${bucketName}/acl`, `${elem}/${objVal[elem]}`, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
objVal[elem].forEach(item => {
|
||||||
|
writeIndex(`${bucketName}/acl/${elem}/${item}`, objName, `${bucketName}/acl`, `${elem}/${item}`, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateContentTypeIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
const type = objVal.split("/")[0];
|
||||||
|
const subtype = objVal.split("/")[1];
|
||||||
|
writeIndex(`${bucketName}/contenttype/${type}`, objName, `${bucketName}/contenttype`, type, rowId, (err) =>{
|
||||||
|
writeIndex(`${bucketName}/contentsubtype/${subtype}`, objName, `${bucketName}/contentsubtype`, subtype, rowId, (err) =>{
|
||||||
|
return ;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateΜodDateIndex(bucketName, objName, objVal, rowId) {
|
||||||
|
const date = new Date(objVal);
|
||||||
|
const term = "modificationdate-";
|
||||||
|
const year = date.getUTCFullYear();
|
||||||
|
const month = date.getUTCMonth() + 1;
|
||||||
|
const day = date.getUTCDate();
|
||||||
|
const hours = date.getUTCHours();
|
||||||
|
const minutes = date.getUTCMinutes();
|
||||||
|
updateIntIndex(bucketName, objName, `${term}year`, year, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}month`, month, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}day`, day, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}hours`, hours, rowId);
|
||||||
|
updateIntIndex(bucketName, objName, `${term}minutes`, minutes, rowId);
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateFileSizeIndex(bucketName, objName, objVal, tx, cb) {
|
||||||
|
console.log("updateFileSizeIndex", indexedAttributes);
|
||||||
|
if (indexedAttributes.indexOf("size") === -1) {
|
||||||
|
console.log("return");
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
indexd.writeIndex(`${bucketName}/filesize/${parseInt(objVal, 10)}`, objName, tx, (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateTagIndex(bucketName, objName, objVal, tx, cb) {
|
||||||
|
console.log("updateTagIndex", indexedAttributes);
|
||||||
|
if (indexedAttributes.indexOf("tags") === -1) {
|
||||||
|
console.log("return");
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
const tags = [];
|
||||||
|
Object.keys(objVal).forEach(elem => {
|
||||||
|
if (elem.indexOf("x-amz-meta") !== -1 &&
|
||||||
|
elem !== "x-amz-meta-s3cmd-attrs") {
|
||||||
|
tags.push({key: elem.replace("x-amz-meta-", ""), value: objVal[elem]});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
console.log(tags);
|
||||||
|
tags.forEach(tag => {
|
||||||
|
indexd.writeIndex(`${bucketName}/tags/${tag.key}/${tag.value}`, objName, tx, (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const index = {
|
||||||
|
evaluateQuery: (params) => {
|
||||||
|
let queryTerms = params.term;
|
||||||
|
const bucketName = params.bucketName;
|
||||||
|
readIndex(bucketName, queryTerms, (err, queryTerms) => {
|
||||||
|
filterRemoved(queryTerms, params, (results) => {
|
||||||
|
indexd.respondQuery(params, results);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
updateIndex: (bucketName, objName, objVal) => {
|
||||||
|
indexd.startTx( (tx) => {
|
||||||
|
updateIndexMeta(bucketName, objName, tx, (tx) => {
|
||||||
|
updateTagIndex(bucketName, objName, objVal, tx, (tx) => {
|
||||||
|
updateFileSizeIndex(bucketName, objName, objVal["content-length"], tx, (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
/*
|
||||||
|
if (indexedAttributes.indexOf("filesize") !== -1) {
|
||||||
|
updateFileSizeIndex(bucketName, objName, objVal["content-length"], rowId);
|
||||||
|
}
|
||||||
|
if (indexedAttributes.indexOf("date") !== -1) {
|
||||||
|
updateΜodDateIndex(bucketName, objName, objVal["last-modified"], rowId);
|
||||||
|
}
|
||||||
|
if (indexedAttributes.indexOf("contenttype") !== -1) {
|
||||||
|
updateContentTypeIndex(bucketName, objName, objVal["content-type"], rowId);
|
||||||
|
}
|
||||||
|
if (indexedAttributes.indexOf("acl") !== -1) {
|
||||||
|
updateACLIndex(bucketName, objName, objVal["acl"], rowId);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
});
|
||||||
|
})
|
||||||
|
},
|
||||||
|
|
||||||
|
deleteObject: (bucketName, objName) => {
|
||||||
|
indexd.updateAntidoteSet(`${bucketName}/removed`, objName, () => {});
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
exports.default = index;
|
|
@ -0,0 +1,95 @@
|
||||||
|
const fs = require('fs');
|
||||||
|
const io = require('socket.io-client');
|
||||||
|
const antidoteClient = require('antidote_ts_client');
|
||||||
|
const antidoteCli = require('../../antidoteNodeCli/lib/api.js');
|
||||||
|
|
||||||
|
let config = fs.readFileSync('./config.json', { encoding: 'utf-8' });
|
||||||
|
config = JSON.parse(config);
|
||||||
|
|
||||||
|
let antidote;
|
||||||
|
let antidotedb;
|
||||||
|
const clients = [];
|
||||||
|
|
||||||
|
const utils = {
|
||||||
|
connectToS3(host, port, attributes) {
|
||||||
|
let client = io.connect(`http://${host}:${port}`, {
|
||||||
|
reconnection: true,
|
||||||
|
});
|
||||||
|
client.on('connect', function() {
|
||||||
|
for (let i = 0; i < attributes.length; i++) {
|
||||||
|
client.emit('subscribe', attributes[i]);
|
||||||
|
client.emit('subscribe', 'put');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
client.on('reconnecting', function(number) {
|
||||||
|
});
|
||||||
|
client.on('error', function(err) {
|
||||||
|
});
|
||||||
|
client.on('put', function(msg) {
|
||||||
|
require('./indexOps.js').default.updateIndex(msg.bucketName, msg.objName, msg.objVal);
|
||||||
|
});
|
||||||
|
client.on('query', function(msg) {
|
||||||
|
msg.client = client;
|
||||||
|
require('./indexOps.js').default.evaluateQuery(msg);
|
||||||
|
});
|
||||||
|
client.on('delete', function(msg) {
|
||||||
|
require('./indexOps.js').default.deleteObject(msg.bucketName, msg.objName);
|
||||||
|
});
|
||||||
|
clients.push(client)
|
||||||
|
},
|
||||||
|
|
||||||
|
connectToDB() {
|
||||||
|
antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
|
||||||
|
antidote = antidoteClient.connect(config.antidote.port, config.antidote.host);
|
||||||
|
antidoteCli.setBucket(antidotedb, 'index');
|
||||||
|
antidote.defaultBucket = 'index';
|
||||||
|
},
|
||||||
|
|
||||||
|
updateAntidoteSet(key, elem, tx, cb) {
|
||||||
|
console.log("updateAntidoteSet", key, elem);
|
||||||
|
antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
|
||||||
|
antidoteCli.updateSetTx(tx, key, [elem], (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
startTx(cb) {
|
||||||
|
antidoteCli.startTx(antidotedb, (tx) => {
|
||||||
|
return cb(tx)
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
commitTx(cb) {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
writeIndex(key, objName, tx, cb) {
|
||||||
|
console.log("writeIndex", key, objName);
|
||||||
|
antidoteCli.updateSetTx(tx, 'keys', [key], (tx) => {
|
||||||
|
antidoteCli.updateSetTx(tx, key, [objName], (tx) => {
|
||||||
|
return cb(tx);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
readIndex(key, cb) {
|
||||||
|
antidoteCli.readSetTx(tx, key, (tx, data) => {
|
||||||
|
return cb(tx, data);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQuery(params, queryTerms) {
|
||||||
|
let client = params.client;
|
||||||
|
client.emit('query_response', {
|
||||||
|
result: queryTerms,
|
||||||
|
id: params.id,
|
||||||
|
term: params.term
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.default = utils;
|
||||||
|
exports.config = config;
|
|
@ -0,0 +1,47 @@
|
||||||
|
|
||||||
|
import ListResult from '../in_memory/ListResult';
|
||||||
|
|
||||||
|
export class ListBucketResult extends ListResult {
|
||||||
|
constructor() {
|
||||||
|
super();
|
||||||
|
this.Contents = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
addContentsKey(key, keyMap) {
|
||||||
|
const objectMD = keyMap;
|
||||||
|
this.Contents.push({
|
||||||
|
key,
|
||||||
|
value: {
|
||||||
|
LastModified: objectMD['last-modified'],
|
||||||
|
ETag: objectMD['content-md5'],
|
||||||
|
StorageClass: objectMD['x-amz-storage-class'],
|
||||||
|
Owner: {
|
||||||
|
DisplayName: objectMD['owner-display-name'],
|
||||||
|
ID: objectMD['owner-id'],
|
||||||
|
},
|
||||||
|
Size: objectMD['content-length'],
|
||||||
|
// Initiated is used for overview of MPU
|
||||||
|
Initiated: objectMD.initiated,
|
||||||
|
// Initiator is used for overview of MPU.
|
||||||
|
// It is an object containing DisplayName
|
||||||
|
// and ID
|
||||||
|
Initiator: objectMD.initiator,
|
||||||
|
// EventualStorageBucket is used for overview of MPU
|
||||||
|
EventualStorageBucket: objectMD.eventualStorageBucket,
|
||||||
|
// Used for parts of MPU
|
||||||
|
partLocations: objectMD.partLocations,
|
||||||
|
// creationDate is just used for serviceGet
|
||||||
|
creationDate: objectMD.creationDate,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
this.MaxKeys += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasDeleteMarker(key, keyMap) {
|
||||||
|
const objectMD = keyMap;
|
||||||
|
if (objectMD['x-amz-delete-marker'] !== undefined) {
|
||||||
|
return (objectMD['x-amz-delete-marker'] === true);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,339 @@
|
||||||
|
import { errors } from 'arsenal';
|
||||||
|
|
||||||
|
import { markerFilter, prefixFilter } from '../in_memory/bucket_utilities';
|
||||||
|
import { ListBucketResult } from './ListBucketResult';
|
||||||
|
import getMultipartUploadListing from '../in_memory/getMultipartUploadListing';
|
||||||
|
import config from '../../Config';
|
||||||
|
import async from 'async';
|
||||||
|
import antidoteCli from '../../antidoteNodeCli/lib/api.js';
|
||||||
|
|
||||||
|
const defaultMaxKeys = 1000;
|
||||||
|
|
||||||
|
class AntidoteInterface {
|
||||||
|
constructor() {
|
||||||
|
this.antidotedb = antidoteCli.connect(config.antidote.port, config.antidote.host);
|
||||||
|
}
|
||||||
|
|
||||||
|
createBucket(bucketName, bucketMD, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
|
||||||
|
// TODO Check whether user already owns the bucket,
|
||||||
|
// if so return "BucketAlreadyOwnedByYou"
|
||||||
|
// If not owned by user, return "BucketAlreadyExists"
|
||||||
|
if (bucket) {
|
||||||
|
return cb(errors.BucketAlreadyExists);
|
||||||
|
}
|
||||||
|
const mapKeys = []
|
||||||
|
const mapValues = []
|
||||||
|
Object.keys(bucketMD).forEach(key => {
|
||||||
|
mapKeys.push(key)
|
||||||
|
mapValues.push(bucketMD[key])
|
||||||
|
});
|
||||||
|
antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
putBucketAttributes(bucketName, bucketMD, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const mapKeys = []
|
||||||
|
const mapValues = []
|
||||||
|
Object.keys(bucketMD).forEach(key => {
|
||||||
|
mapKeys.push(key)
|
||||||
|
mapValues.push(bucketMD[key])
|
||||||
|
});
|
||||||
|
antidoteCli.updateMapRegisterTx(tx, `${bucketName}/md`, mapKeys, mapValues, (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getBucketAttributes(tx, bucketName, log, cb) {
|
||||||
|
if (tx === null) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
if (Object.keys(bucketMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchBucket);
|
||||||
|
}
|
||||||
|
return cb(null, bucketMD);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
|
||||||
|
if (Object.keys(bucketMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchBucket);
|
||||||
|
}
|
||||||
|
return cb(null, bucketMD);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteBucket(bucketName, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, objects) => {
|
||||||
|
if (bucket && objects.length > 0) {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb(errors.BucketNotEmpty);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
|
||||||
|
const mapKeys = []
|
||||||
|
Object.keys(bucketMD).forEach(key => {
|
||||||
|
mapKeys.push(key)
|
||||||
|
});
|
||||||
|
antidoteCli.removeMapRegisterTx(tx, `${bucketName}/md`, mapKeys, (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb(null);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
putObject(bucketName, objName, objVal, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const mapKeys = []
|
||||||
|
const mapValues = []
|
||||||
|
Object.keys(objVal).forEach(key => {
|
||||||
|
mapKeys.push(key)
|
||||||
|
mapValues.push(objVal[key])
|
||||||
|
});
|
||||||
|
antidoteCli.updateSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
|
||||||
|
antidoteCli.updateMapRegisterTx(tx, `${objName}`, mapKeys, mapValues, (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getBucketAndObject(bucketName, objName, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err, { bucket });
|
||||||
|
}
|
||||||
|
const bucket_MD = {}
|
||||||
|
Object.keys(bucket).map(function(key) {
|
||||||
|
bucket_MD[key.substr(1)] = bucket[key]
|
||||||
|
});
|
||||||
|
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
return cb(null, { bucket: JSON.stringify(bucket_MD) });
|
||||||
|
}
|
||||||
|
return cb(null, {
|
||||||
|
bucket: JSON.stringify(bucket_MD),
|
||||||
|
obj: JSON.stringify(objectMD),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getObject(bucketName, objName, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
}
|
||||||
|
return cb(null, objectMD);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteObject(bucketName, objName, log, cb) {
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
this.getBucketAttributes(tx, bucketName, log, (err, bucket) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
antidoteCli.readMapTx(tx, `${objName}`, (tx, objectMD) => {
|
||||||
|
if (!bucket || Object.keys(objectMD).length === 0) {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const mapKeys = []
|
||||||
|
Object.keys(objectMD).forEach(key => {
|
||||||
|
mapKeys.push(key)
|
||||||
|
});
|
||||||
|
antidoteCli.removeMapRegisterTx(tx, `${objName}`, mapKeys, (tx) => {
|
||||||
|
antidoteCli.removeSetTx(tx, `${bucketName}/objs`, [objName], (tx) => {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getObjectMD(tx, bucketName, key, callback) {
|
||||||
|
antidoteCli.readMapTx(tx, `${key}`, (tx, objectMD) => {
|
||||||
|
if (Object.keys(objectMD).length === 0) {
|
||||||
|
return callback(error.NoSuchKey, null);
|
||||||
|
}
|
||||||
|
return callback(null, objectMD);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
listObject(bucketName, params, log, cb) {
|
||||||
|
const { prefix, marker, delimiter, maxKeys } = params;
|
||||||
|
if (prefix && typeof prefix !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (marker && typeof marker !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delimiter && typeof delimiter !== 'string') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxKeys && typeof maxKeys !== 'number') {
|
||||||
|
return cb(errors.InvalidArgument);
|
||||||
|
}
|
||||||
|
|
||||||
|
let numKeys = maxKeys;
|
||||||
|
// If paramMaxKeys is undefined, the default parameter will set it.
|
||||||
|
// However, if it is null, the default parameter will not set it.
|
||||||
|
if (numKeys === null) {
|
||||||
|
numKeys = defaultMaxKeys;
|
||||||
|
}
|
||||||
|
|
||||||
|
antidoteCli.setBucket(this.antidotedb, `storage/${bucketName}`);
|
||||||
|
antidoteCli.startTx(this.antidotedb, (tx) => {
|
||||||
|
antidoteCli.readMapTx(tx, `${bucketName}/md`, (tx, bucketMD) => {
|
||||||
|
if (Object.keys(bucketMD).length === 0) {
|
||||||
|
return cb(errors.NoSuchBucket);
|
||||||
|
}
|
||||||
|
const response = new ListBucketResult();
|
||||||
|
antidoteCli.readSetTx(tx, `${bucketName}/objs`, (tx, keys) => {
|
||||||
|
async.map(keys, this.getObjectMD.bind(null, tx, bucketName), function(err, objectMeta) {
|
||||||
|
antidoteCli.commitTx(tx, () => {
|
||||||
|
// If marker specified, edit the keys array so it
|
||||||
|
// only contains keys that occur alphabetically after the marker
|
||||||
|
if (marker) {
|
||||||
|
keys = markerFilter(marker, keys);
|
||||||
|
response.Marker = marker;
|
||||||
|
}
|
||||||
|
// If prefix specified, edit the keys array so it only
|
||||||
|
// contains keys that contain the prefix
|
||||||
|
if (prefix) {
|
||||||
|
keys = prefixFilter(prefix, keys);
|
||||||
|
response.Prefix = prefix;
|
||||||
|
}
|
||||||
|
// Iterate through keys array and filter keys containing
|
||||||
|
// delimiter into response.CommonPrefixes and filter remaining
|
||||||
|
// keys into response.Contents
|
||||||
|
for (let i = 0; i < keys.length; ++i) {
|
||||||
|
const currentKey = keys[i];
|
||||||
|
// Do not list object with delete markers
|
||||||
|
if (response.hasDeleteMarker(currentKey,
|
||||||
|
objectMeta[i])) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// If hit numKeys, stop adding keys to response
|
||||||
|
if (response.MaxKeys >= numKeys) {
|
||||||
|
response.IsTruncated = true;
|
||||||
|
response.NextMarker = keys[i - 1];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// If a delimiter is specified, find its index in the
|
||||||
|
// current key AFTER THE OCCURRENCE OF THE PREFIX
|
||||||
|
let delimiterIndexAfterPrefix = -1;
|
||||||
|
let prefixLength = 0;
|
||||||
|
if (prefix) {
|
||||||
|
prefixLength = prefix.length;
|
||||||
|
}
|
||||||
|
const currentKeyWithoutPrefix = currentKey
|
||||||
|
.slice(prefixLength);
|
||||||
|
let sliceEnd;
|
||||||
|
if (delimiter) {
|
||||||
|
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
|
||||||
|
.indexOf(delimiter);
|
||||||
|
sliceEnd = delimiterIndexAfterPrefix + prefixLength;
|
||||||
|
response.Delimiter = delimiter;
|
||||||
|
}
|
||||||
|
// If delimiter occurs in current key, add key to
|
||||||
|
// response.CommonPrefixes.
|
||||||
|
// Otherwise add key to response.Contents
|
||||||
|
if (delimiterIndexAfterPrefix > -1) {
|
||||||
|
const keySubstring = currentKey.slice(0, sliceEnd + 1);
|
||||||
|
response.addCommonPrefix(keySubstring);
|
||||||
|
} else {
|
||||||
|
response.addContentsKey(currentKey,
|
||||||
|
objectMeta[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cb(null, response);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
listMultipartUploads(bucketName, listingParams, log, cb) {
|
||||||
|
process.nextTick(() => {
|
||||||
|
this.getBucketAttributes(null, bucketName, log, (err, bucket) => {
|
||||||
|
if (bucket === undefined) {
|
||||||
|
// no on going multipart uploads, return empty listing
|
||||||
|
return cb(null, {
|
||||||
|
IsTruncated: false,
|
||||||
|
NextMarker: undefined,
|
||||||
|
MaxKeys: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return getMultipartUploadListing(bucket, listingParams, cb);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export default AntidoteInterface;
|
|
@ -2,7 +2,10 @@ import BucketClientInterface from './bucketclient/backend';
|
||||||
import BucketFileInterface from './bucketfile/backend';
|
import BucketFileInterface from './bucketfile/backend';
|
||||||
import BucketInfo from './BucketInfo';
|
import BucketInfo from './BucketInfo';
|
||||||
import inMemory from './in_memory/backend';
|
import inMemory from './in_memory/backend';
|
||||||
|
import AntidoteInterface from './antidote/backend';
|
||||||
import config from '../Config';
|
import config from '../Config';
|
||||||
|
import indexClient from '../indexClient/indexClient'
|
||||||
|
import async from 'async'
|
||||||
|
|
||||||
let client;
|
let client;
|
||||||
let implName;
|
let implName;
|
||||||
|
@ -16,6 +19,33 @@ if (config.backends.metadata === 'mem') {
|
||||||
} else if (config.backends.metadata === 'scality') {
|
} else if (config.backends.metadata === 'scality') {
|
||||||
client = new BucketClientInterface();
|
client = new BucketClientInterface();
|
||||||
implName = 'bucketclient';
|
implName = 'bucketclient';
|
||||||
|
} else if (config.backends.metadata === 'antidote') {
|
||||||
|
client = new AntidoteInterface();
|
||||||
|
implName = 'antidote';
|
||||||
|
}
|
||||||
|
|
||||||
|
function getQueryResults(params, objName, callback) {
|
||||||
|
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||||
|
client.getObject(bucketName, objName, log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
callback(err, null);
|
||||||
|
}
|
||||||
|
callback(null, {key: objName, value: {
|
||||||
|
LastModified: data['last-modified'],
|
||||||
|
ETag: data['content-md5'],
|
||||||
|
StorageClass: data['x-amz-storage-class'],
|
||||||
|
Owner: {
|
||||||
|
ID: data['owner-id'],
|
||||||
|
DisplayName: data['owner-display-name']
|
||||||
|
},
|
||||||
|
Size: data['content-length'],
|
||||||
|
Initiated: undefined,
|
||||||
|
Initiator: undefined,
|
||||||
|
EventualStorageBucket: undefined,
|
||||||
|
partLocations: undefined,
|
||||||
|
creationDate: undefined
|
||||||
|
}});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const metadata = {
|
const metadata = {
|
||||||
|
@ -45,7 +75,7 @@ const metadata = {
|
||||||
|
|
||||||
getBucket: (bucketName, log, cb) => {
|
getBucket: (bucketName, log, cb) => {
|
||||||
log.debug('getting bucket from metadata');
|
log.debug('getting bucket from metadata');
|
||||||
client.getBucketAttributes(bucketName, log, (err, data) => {
|
client.getBucketAttributes(null, bucketName, log, (err, data) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.debug('error from metadata', { implName, error: err });
|
log.debug('error from metadata', { implName, error: err });
|
||||||
return cb(err);
|
return cb(err);
|
||||||
|
@ -75,6 +105,12 @@ const metadata = {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
log.debug('object successfully put in metadata');
|
log.debug('object successfully put in metadata');
|
||||||
|
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||||
|
if (objName.indexOf('..|..') !== -1) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
indexClient.putObjectMD(bucketName, objName, objVal);
|
||||||
|
}
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}, params);
|
}, params);
|
||||||
},
|
},
|
||||||
|
@ -113,11 +149,17 @@ const metadata = {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
log.debug('object deleted from metadata');
|
log.debug('object deleted from metadata');
|
||||||
|
if (config.userMetaIndexing || config.systemMetaIndexing) {
|
||||||
|
indexClient.deleteObjectMD(bucketName, objName);
|
||||||
|
}
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}, params);
|
}, params);
|
||||||
},
|
},
|
||||||
|
|
||||||
listObject: (bucketName, listingParams, log, cb) => {
|
listObject: (bucketName, listingParams, log, cb) => {
|
||||||
|
if (listingParams.query) {
|
||||||
|
indexClient.listObject(bucketName, listingParams, cb);
|
||||||
|
} else {
|
||||||
client
|
client
|
||||||
.listObject(bucketName, listingParams,
|
.listObject(bucketName, listingParams,
|
||||||
log, (err, data) => {
|
log, (err, data) => {
|
||||||
|
@ -129,6 +171,34 @@ const metadata = {
|
||||||
log.debug('object listing retrieved from metadata');
|
log.debug('object listing retrieved from metadata');
|
||||||
return cb(err, data);
|
return cb(err, data);
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQueryGetMD: (result, params) => {
|
||||||
|
async.map(result, getQueryResults.bind(null, params), function(err, res) {
|
||||||
|
const response = {
|
||||||
|
IsTruncated: false,
|
||||||
|
NextMarker: params.marker,
|
||||||
|
CommonPrefixes: [],
|
||||||
|
MaxKeys: 10,
|
||||||
|
Contents: res
|
||||||
|
}
|
||||||
|
return params.cb(err, response);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
respondQueryFilter: (result, params) => {
|
||||||
|
let { bucketName, prefix, marker, maxKeys, delimiter, log, cb} = params;
|
||||||
|
client.listObject(bucketName, { prefix, marker, maxKeys, delimiter },
|
||||||
|
log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
data.Contents = data.Contents.filter(elem => {
|
||||||
|
return result.indexOf(elem.key) !== -1;
|
||||||
|
});
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
listMultipartUploads: (bucketName, listingParams, log, cb) => {
|
listMultipartUploads: (bucketName, listingParams, log, cb) => {
|
||||||
|
|
|
@ -7,7 +7,7 @@ import { logger } from './utilities/logger';
|
||||||
import { clientCheck } from './utilities/healthcheckHandler';
|
import { clientCheck } from './utilities/healthcheckHandler';
|
||||||
import _config from './Config';
|
import _config from './Config';
|
||||||
import routes from './routes';
|
import routes from './routes';
|
||||||
|
import indexClient from './indexClient/indexClient'
|
||||||
|
|
||||||
class S3Server {
|
class S3Server {
|
||||||
/**
|
/**
|
||||||
|
@ -38,6 +38,7 @@ class S3Server {
|
||||||
* This starts the http server.
|
* This starts the http server.
|
||||||
*/
|
*/
|
||||||
startup(port) {
|
startup(port) {
|
||||||
|
indexClient.createPublisher(_config.indexServerPort, logger);
|
||||||
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
|
// Todo: http.globalAgent.maxSockets, http.globalAgent.maxFreeSockets
|
||||||
if (_config.https) {
|
if (_config.https) {
|
||||||
this.server = https.createServer({
|
this.server = https.createServer({
|
||||||
|
@ -118,7 +119,7 @@ class S3Server {
|
||||||
|
|
||||||
export default function main() {
|
export default function main() {
|
||||||
let clusters = _config.clusters || 1;
|
let clusters = _config.clusters || 1;
|
||||||
if (process.env.S3BACKEND === 'mem') {
|
if (process.env.S3BACKEND === 'mem' || process.env.S3BACKEND === 'antidote') {
|
||||||
clusters = 1;
|
clusters = 1;
|
||||||
}
|
}
|
||||||
if (cluster.isMaster) {
|
if (cluster.isMaster) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
},
|
},
|
||||||
"homepage": "https://github.com/scality/S3#readme",
|
"homepage": "https://github.com/scality/S3#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"antidote_ts_client": "^0.1.0",
|
||||||
"arsenal": "scality/Arsenal",
|
"arsenal": "scality/Arsenal",
|
||||||
"async": "~1.4.2",
|
"async": "~1.4.2",
|
||||||
"babel-core": "^6.5.2",
|
"babel-core": "^6.5.2",
|
||||||
|
@ -33,6 +34,7 @@
|
||||||
"multilevel": "^7.3.0",
|
"multilevel": "^7.3.0",
|
||||||
"node-uuid": "^1.4.3",
|
"node-uuid": "^1.4.3",
|
||||||
"ready-set-stream": "1.0.7",
|
"ready-set-stream": "1.0.7",
|
||||||
|
"socket.io": "^1.7.2",
|
||||||
"sproxydclient": "scality/sproxydclient",
|
"sproxydclient": "scality/sproxydclient",
|
||||||
"utapi": "scality/utapi",
|
"utapi": "scality/utapi",
|
||||||
"utf8": "~2.1.1",
|
"utf8": "~2.1.1",
|
||||||
|
@ -47,6 +49,7 @@
|
||||||
"aws-sdk": "^2.2.11",
|
"aws-sdk": "^2.2.11",
|
||||||
"babel-cli": "^6.2.0",
|
"babel-cli": "^6.2.0",
|
||||||
"babel-eslint": "^6.0.0",
|
"babel-eslint": "^6.0.0",
|
||||||
|
"babel-plugin-transform-async-to-generator": "^6.24.1",
|
||||||
"bluebird": "^3.3.1",
|
"bluebird": "^3.3.1",
|
||||||
"eslint": "^2.4.0",
|
"eslint": "^2.4.0",
|
||||||
"eslint-config-airbnb": "^6.0.0",
|
"eslint-config-airbnb": "^6.0.0",
|
||||||
|
@ -68,9 +71,13 @@
|
||||||
"ft_test": "mocha tester.js --compilers js:babel-core/register",
|
"ft_test": "mocha tester.js --compilers js:babel-core/register",
|
||||||
"init": "node init.js",
|
"init": "node init.js",
|
||||||
"install_ft_deps": "npm install aws-sdk@2.2.11 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7 zombie@5.0.5",
|
"install_ft_deps": "npm install aws-sdk@2.2.11 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7 zombie@5.0.5",
|
||||||
|
"install_antidote_client": "babel --plugins transform-async-to-generator lib/antidoteNodeCli/src/ -d lib/antidoteNodeCli/lib/",
|
||||||
"lint": "eslint $(git ls-files '*.js')",
|
"lint": "eslint $(git ls-files '*.js')",
|
||||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||||
"mem_backend": "S3BACKEND=mem node index.js",
|
"mem_backend": "S3BACKEND=mem node index.js",
|
||||||
|
"antidote_backend": "S3BACKEND=antidote node index.js",
|
||||||
|
"antidote_deploy": "S3BACKEND=antidote ./deploy_instances.sh 3",
|
||||||
|
"antidote_stop": "S3BACKEND=antidote ./stop_instances.sh",
|
||||||
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
|
"perf": "mocha --compilers js:babel-core/register tests/performance/s3standard.js",
|
||||||
"start": "node init.js && node index.js",
|
"start": "node init.js && node index.js",
|
||||||
"start_utapi": "node utapiServer.js",
|
"start_utapi": "node utapiServer.js",
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
|
||||||
|
const configpath = path.join(__dirname, '/config.json');
|
||||||
|
var config = fs.readFileSync(configpath, { encoding: 'utf-8' });
|
||||||
|
config = JSON.parse(config);
|
||||||
|
|
||||||
|
config.port = 8000 + ((config.port - 8000 + 1) % 3);
|
||||||
|
config.antidote.port = 8187 + ((config.antidote.port - 8187 + 100) % 400);
|
||||||
|
config.indexServerPort = 7000 + ((config.indexServerPort - 7000 + 1) % 3);
|
||||||
|
|
||||||
|
config = JSON.stringify(config);
|
||||||
|
fs.writeFileSync(configpath, config, { encoding: 'utf-8' });
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/bash
|
||||||
|
fuser -k -n tcp 8000
|
||||||
|
fuser -k -n tcp 8001
|
||||||
|
fuser -k -n tcp 8002
|
Loading…
Reference in New Issue