Compare commits
41 Commits
developmen
...
doNotUse/f
Author | SHA1 | Date |
---|---|---|
Lauren Spiegel | 91caf0cfc2 | |
Lauren Spiegel | 36abb26df9 | |
Lauren Spiegel | 6396df3f28 | |
Lauren Spiegel | 95b7fa6adb | |
Lauren Spiegel | b99fe466a3 | |
Lauren Spiegel | 3e35be09c9 | |
Lauren Spiegel | fc46c41452 | |
Lauren Spiegel | 6b2acb4022 | |
Lauren Spiegel | 807cdaef1f | |
Lauren Spiegel | fa2c35390b | |
Lauren Spiegel | c677c7e478 | |
Cloud User | 969af0b641 | |
Lauren Spiegel | 72652939f0 | |
Lauren Spiegel | 23403de630 | |
Lauren Spiegel | 63ba9ddee6 | |
Lauren Spiegel | 0dab2acd52 | |
Lauren Spiegel | 5d19abed9f | |
Lauren Spiegel | 8fdd1a3aa1 | |
Lauren Spiegel | 95a9619923 | |
Lauren Spiegel | e91726e58d | |
Lauren Spiegel | 81db1bc187 | |
Lauren Spiegel | 3ebbaf7056 | |
Lauren Spiegel | b408918ff6 | |
Lauren Spiegel | a117ad1e6b | |
Lauren Spiegel | 9e2726ee73 | |
Lauren Spiegel | b552883a94 | |
Lauren Spiegel | 02598f7d73 | |
Lauren Spiegel | 16c198b4b6 | |
Lauren Spiegel | f73d1f0575 | |
Lauren Spiegel | 287c10b8da | |
Lauren Spiegel | 28a9427b0a | |
Lauren Spiegel | 1099af426f | |
Lauren Spiegel | 816d118a12 | |
Lauren Spiegel | 8dbcb6da10 | |
Lauren Spiegel | edd48a9557 | |
Lauren Spiegel | 2faa8e811c | |
Lauren Spiegel | 0fbff464fe | |
Lauren Spiegel | 39aff4231d | |
Lauren Spiegel | 06ffdd31f2 | |
Lauren Spiegel | 6b6c3e1ad9 | |
Cloud User | 596c79f376 |
|
@ -0,0 +1,98 @@
|
||||||
|
#!/bin/sh
|
||||||
|
// 2>/dev/null ; exec "$(which nodejs || which node)" "$0" "$@"
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
|
const { auth } = require('arsenal');
|
||||||
|
const commander = require('commander');
|
||||||
|
|
||||||
|
const http = require('http');
|
||||||
|
const https = require('https');
|
||||||
|
const logger = require('../lib/utilities/logger');
|
||||||
|
|
||||||
|
/**
 * Send a GET `/?search=` request to an S3 endpoint and stream the
 * response body to stdout. Exits the process: 0 on a 2xx response,
 * 1 otherwise.
 *
 * @param {string} host - host of the S3 server
 * @param {number|string} port - port of the S3 server
 * @param {string} bucketName - bucket to search in
 * @param {string} query - raw search (sql where clause) to send
 * @param {string} accessKey - access key id used to sign the request
 * @param {string} secretKey - secret key used to sign the request
 * @param {boolean} verbose - log request/response details when true
 * @param {boolean} ssl - use https instead of http when true
 * @return {undefined}
 */
function _performSearch(host,
    port,
    bucketName,
    query,
    accessKey,
    secretKey,
    verbose, ssl) {
    const escapedSearch = encodeURIComponent(query);
    const options = {
        host,
        port,
        method: 'GET',
        path: `/${bucketName}/?search=${escapedSearch}`,
        headers: {
            'Content-Length': 0,
        },
        // allow self-signed certs when testing against local https
        rejectUnauthorized: false,
    };
    const transport = ssl ? https : http;
    const request = transport.request(options, response => {
        if (verbose) {
            logger.info('response status code', {
                statusCode: response.statusCode,
            });
            logger.info('response headers', { headers: response.headers });
        }
        const body = [];
        response.setEncoding('utf8');
        response.on('data', chunk => body.push(chunk));
        response.on('end', () => {
            if (response.statusCode >= 200 && response.statusCode < 300) {
                logger.info('Success');
                process.stdout.write(body.join(''));
                process.exit(0);
            } else {
                logger.error('request failed with HTTP Status ', {
                    statusCode: response.statusCode,
                    body: body.join(''),
                });
                process.exit(1);
            }
        });
    });
    // sign the request with AWS v4 auth before sending it
    auth.client.generateV4Headers(request, { search: query },
        accessKey, secretKey, 's3');
    if (verbose) {
        logger.info('request headers', { headers: request._headers });
    }
    request.end();
}
|
||||||
|
|
||||||
|
/**
 * This function is used as a binary to send a request to S3 to perform a
 * search on the objects in a bucket
 *
 * @return {undefined}
 */
function searchBucket() {
    // TODO: Include other bucket listing possible query params?
    commander
        .version('0.0.1')
        .option('-a, --access-key <accessKey>', 'Access key id')
        .option('-k, --secret-key <secretKey>', 'Secret access key')
        .option('-b, --bucket <bucket>', 'Name of the bucket')
        .option('-q, --query <query>', 'Search query')
        .option('-h, --host <host>', 'Host of the server')
        .option('-p, --port <port>', 'Port of the server')
        // flags and description were previously split across two
        // arguments ('-s', '--ssl', 'Enable ssl'), which registered only
        // '-s' as the flag and left commander.ssl permanently undefined
        .option('-s, --ssl', 'Enable ssl')
        .option('-v, --verbose')
        .parse(process.argv);

    const { host, port, accessKey, secretKey, bucket, query, verbose, ssl } =
        commander;

    if (!host || !port || !accessKey || !secretKey || !bucket || !query) {
        logger.error('missing parameter');
        commander.outputHelp();
        process.exit(1);
    }

    _performSearch(host, port, bucket, query, accessKey, secretKey, verbose,
        ssl);
}

searchBucket();
|
14
config.json
14
config.json
|
@ -7,7 +7,9 @@
|
||||||
"127.0.0.1": "us-east-1",
|
"127.0.0.1": "us-east-1",
|
||||||
"s3.docker.test": "us-east-1",
|
"s3.docker.test": "us-east-1",
|
||||||
"127.0.0.2": "us-east-1",
|
"127.0.0.2": "us-east-1",
|
||||||
"s3.amazonaws.com": "us-east-1"
|
"s3.amazonaws.com": "us-east-1",
|
||||||
|
"s3-front-lb": "us-east-1",
|
||||||
|
"lb": "us-east-1"
|
||||||
},
|
},
|
||||||
"websiteEndpoints": ["s3-website-us-east-1.amazonaws.com",
|
"websiteEndpoints": ["s3-website-us-east-1.amazonaws.com",
|
||||||
"s3-website.us-east-2.amazonaws.com",
|
"s3-website.us-east-2.amazonaws.com",
|
||||||
|
@ -43,6 +45,11 @@
|
||||||
"healthChecks": {
|
"healthChecks": {
|
||||||
"allowFrom": ["127.0.0.1/8", "::1"]
|
"allowFrom": ["127.0.0.1/8", "::1"]
|
||||||
},
|
},
|
||||||
|
"livy": {
|
||||||
|
"host": "livy",
|
||||||
|
"port": 8998,
|
||||||
|
"transport": "http"
|
||||||
|
},
|
||||||
"metadataClient": {
|
"metadataClient": {
|
||||||
"host": "localhost",
|
"host": "localhost",
|
||||||
"port": 9990
|
"port": 9990
|
||||||
|
@ -60,7 +67,8 @@
|
||||||
"port": 9991
|
"port": 9991
|
||||||
},
|
},
|
||||||
"recordLog": {
|
"recordLog": {
|
||||||
"enabled": false,
|
"enabled": true,
|
||||||
"recordLogName": "s3-recordlog"
|
"recordLogName": "s3-recordlog"
|
||||||
}
|
},
|
||||||
|
"whiteListedIps": ["127.0.0.1/8", "::1"]
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
version: '3'
|
||||||
|
|
||||||
|
services:
|
||||||
|
s3-front:
|
||||||
|
image: 127.0.0.1:5000/s3server
|
||||||
|
build: .
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
|
@ -354,6 +354,24 @@ class Config {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate and copy the optional livy (Spark REST server) settings.
if (config.livy) {
    this.livy = {};
    assert.strictEqual(typeof config.livy.host, 'string',
        'bad config: livy host must be ' +
        'a string');
    this.livy.host = config.livy.host;

    assert(Number.isInteger(config.livy.port)
        && config.livy.port > 0,
        'bad config: livy port must be a positive ' +
        'integer');
    this.livy.port = config.livy.port;
    // validate the incoming config value: the previous check read
    // this.livy.transport (not assigned yet, always undefined) and
    // used `&&` of two `!==` tests, so it could never fail — and would
    // have rejected valid values if it had
    assert(config.livy.transport === 'http' ||
        config.livy.transport === 'https', 'bad config: livy ' +
        'transport must be either "http" or "https"');
    this.livy.transport = config.livy.transport;
}
|
||||||
|
|
||||||
if (config.dataClient) {
|
if (config.dataClient) {
|
||||||
this.dataClient = {};
|
this.dataClient = {};
|
||||||
assert.strictEqual(typeof config.dataClient.host, 'string',
|
assert.strictEqual(typeof config.dataClient.host, 'string',
|
||||||
|
@ -589,6 +607,17 @@ class Config {
|
||||||
.concat(config.healthChecks.allowFrom);
|
.concat(config.healthChecks.allowFrom);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.whiteListedIps = [];
|
||||||
|
if (config.whiteListedIps) {
|
||||||
|
assert(Array.isArray(config.whiteListedIps), 'config: invalid ' +
|
||||||
|
'whiteListedIps. whiteListedIps must be array');
|
||||||
|
config.whiteListedIps.forEach(ip => {
|
||||||
|
assert.strictEqual(typeof ip, 'string', 'config: invalid ' +
|
||||||
|
'whiteListedIps. each item in whiteListedIps must be a string');
|
||||||
|
});
|
||||||
|
this.whiteListedIps = config.whiteListedIps;
|
||||||
|
}
|
||||||
|
|
||||||
if (config.certFilePaths) {
|
if (config.certFilePaths) {
|
||||||
assert(typeof config.certFilePaths === 'object' &&
|
assert(typeof config.certFilePaths === 'object' &&
|
||||||
typeof config.certFilePaths.key === 'string' &&
|
typeof config.certFilePaths.key === 'string' &&
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
const Parser = require('node-sql-parser').Parser;
|
||||||
|
const parser = new Parser();
|
||||||
|
const { errors } = require('arsenal');
|
||||||
|
const objModel = require('arsenal').models.ObjectMD;
|
||||||
|
|
||||||
|
/**
 * Walk a parsed SQL where-clause AST depth-first and return the first
 * attribute name that is not present in the allowed attribute map.
 *
 * @param {object} whereClause - AST node of the where clause
 * @param {object} possibleAttributes - map of allowed attribute names
 * @return {string|undefined} first unknown attribute, or undefined
 */
function _validateTree(whereClause, possibleAttributes) {
    let invalidAttribute;

    function _searchTree(node) {
        if (invalidAttribute) {
            // an unknown attribute was already found; stop descending
            return;
        }
        // a dotted attribute (e.g. userMd.`x-amz-meta-whatever`) parses
        // with the prefix in node.table and the rest in node.column
        if (node.table && !possibleAttributes[node.table]) {
            invalidAttribute = node.table;
        }
        // a bare attribute (e.g. content-length) has only node.column
        if (!node.table && node.column &&
            !possibleAttributes[node.column]) {
            invalidAttribute = node.column;
        }
        // the attributes we care about are always on the left because
        // the value being searched for is on the right
        if (node.left) {
            _searchTree(node.left);
        }
        if (node.right && node.right.left) {
            _searchTree(node.right.left);
        }
    }

    _searchTree(whereClause);
    return invalidAttribute;
}
|
||||||
|
|
||||||
|
/**
 * validateSearchParams - validate value of ?search= in request
 * @param {string} searchParams - value of search params in request
 * which should be just a sql where clause
 * For metadata: userMd.`x-amz-meta-color`=\"blue\"
 * For tags: tags.`x-amz-meta-color`=\"blue\"
 * For any other attribute: `content-length`=5
 * @return {undefined | error} undefined if validates or arsenal error if not
 */
function validateSearchParams(searchParams) {
    let ast;
    try {
        // wrap the user-supplied clause in a full statement so the sql
        // parser can build an AST for the where clause alone
        ast = parser.parse(`SELECT * FROM t WHERE ${searchParams}`);
    } catch (e) {
        // any parse failure means the clause is not valid sql; the
        // previous `if (e)` guard here could silently swallow a falsy
        // throw and crash later on `ast.where`
        return errors.InvalidArgument
            .customizeDescription('Invalid sql where clause ' +
                'sent as search query');
    }

    const possibleAttributes = objModel.getAttributes();
    // For search we aggregate the user metadata (x-amz-meta)
    // under the userMd attribute so add to possibilities
    possibleAttributes.userMd = true;
    const invalidAttribute = _validateTree(ast.where, possibleAttributes);
    if (invalidAttribute) {
        return errors.InvalidArgument.customizeDescription('Search param ' +
            `contains unknown attribute: ${invalidAttribute}`);
    }
    return undefined;
}

module.exports = validateSearchParams;
|
|
@ -1,5 +1,7 @@
|
||||||
const querystring = require('querystring');
|
const querystring = require('querystring');
|
||||||
const { errors, versioning, s3middleware } = require('arsenal');
|
const async = require('async');
|
||||||
|
const Backoff = require('backo');
|
||||||
|
const { errors, versioning, s3middleware, LivyClient } = require('arsenal');
|
||||||
|
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
const services = require('../services');
|
const services = require('../services');
|
||||||
|
@ -7,9 +9,96 @@ const { metadataValidateBucket } = require('../metadata/metadataUtils');
|
||||||
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
||||||
const escapeForXml = s3middleware.escapeForXml;
|
const escapeForXml = s3middleware.escapeForXml;
|
||||||
const { pushMetric } = require('../utapi/utilities');
|
const { pushMetric } = require('../utapi/utilities');
|
||||||
|
const validateSearchParams = require('../api/apiUtils/bucket/validateSearch');
|
||||||
const versionIdUtils = versioning.VersionID;
|
const versionIdUtils = versioning.VersionID;
|
||||||
|
|
||||||
|
// Backoff bounds (ms) used while polling livy for printed results.
const PRINT_BACKOFF_PARAMS = { min: 150, max: 1000 };

const config = require('../Config.js').config;
const werelogs = require('werelogs');
werelogs.configure({ level: config.log.logLevel,
    dump: config.log.dumpLevel });
const log = new werelogs.Logger('LivyClient');
// config.livy.transport is the string 'http' or 'https' (see Config
// validation); the previous `!!config.livy.transport.https` read a
// property off that string, which is always undefined, so https was
// never enabled
const useHttps = config.livy.transport === 'https';
const livyClient = new LivyClient(config.livy.host,
    config.livy.port, log, useHttps);
// Scala snippet run once per new livy session to set up the clueso
// query executor.
const setUpSessionCode = 'import com.scality.clueso._\n' +
    'import com.scality.clueso.query._\n' +
    'val config = com.scality.clueso.SparkUtils.' +
    'loadCluesoConfig("/apps/spark-modules/application.conf"); \n' +
    'SparkUtils.confSparkSession(spark,config); \n' +
    'val queryExecutor = MetadataQueryExecutor(spark, config); \n';
// Session settings sent to livy when creating a new Spark session.
const sessionConfig = {
    kind: 'spark',
    driverMemory: '3g',
    numExecutors: 4,
    executorMemory: '8g',
    jars: ['/apps/spark-modules/clueso-1.0-SNAPSHOT-all.jar'],
    conf: { 'spark.hadoop.fs.s3a.impl':
        'org.apache.hadoop.fs.s3a.S3AFileSystem',
        'spark.hadoop.fs.s3a.connection.ssl.enabled': 'false',
        // TODO: We need to figure out how to configure this directly in spark
        // or could use the restEndpoints from the config
        'spark.hadoop.fs.s3a.endpoint': 'lb',
        // TODO: For Zenko, we can send admin keys but for enterprise version,
        // s3 will not have access to keys. So, this should be set
        // in spark config directly on deployment.
        'spark.hadoop.fs.s3a.access.key': 'accessKey1',
        'spark.hadoop.fs.s3a.secret.key': 'verySecretKey1',
        'spark.hadoop.fs.s3a.path.style.access': 'true',
        'spark.cores.max': '8',
        'spark.metrics.namespace': 'clueso_searcher',
        'spark.driver.port': '38600',
        'spark.metrics.conf': '/apps/spark-modules/metrics.properties',
    },
};
|
||||||
|
|
||||||
|
|
||||||
|
// Parse a JSON string; on malformed input, return the parse error
// instead of throwing so callers can branch on the result.
function _safeJSONParse(s) {
    let parsed;
    try {
        parsed = JSON.parse(s);
    } catch (err) {
        return err;
    }
    return parsed;
}
|
||||||
|
|
||||||
|
/**
 * @typedef {Object} availableSession
 * @property {number} [sessionId] sessionId to use
 * @property {boolean} [SlowDown] whether to return SlowDown error
 */

/**
 * findAvailableSession - find an idle session
 * @param {availableSession[]} sessions - array of session objects
 * @return {object} availableSession - empty object when no idle session
 * exists and a new one may be created
 */
function findAvailableSession(sessions) {
    // cap on concurrently active (busy/starting) sessions before the
    // client is told to back off
    const MAX_ACTIVE_SESSIONS = 4;
    const availableSession = {};
    const idleSessions = [];
    const activeSessions = [];
    sessions.forEach(session => {
        if (session.state === 'idle') {
            idleSessions.push(session);
        }
        if (session.state === 'busy' || session.state === 'starting') {
            activeSessions.push(session);
        }
    });
    if (idleSessions.length > 0) {
        // pick a random idle session to spread load between sessions
        const sessionIndex = Math.floor(Math.random() * idleSessions.length);
        availableSession.sessionId = idleSessions[sessionIndex].id;
        return availableSession;
    }
    if (activeSessions.length >= MAX_ACTIVE_SESSIONS) {
        availableSession.SlowDown = true;
        return availableSession;
    }
    return availableSession;
}
|
||||||
|
|
||||||
// Sample XML response for GET bucket objects:
|
// Sample XML response for GET bucket objects:
|
||||||
/* <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
/* <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||||
<Name>example-bucket</Name>
|
<Name>example-bucket</Name>
|
||||||
|
@ -201,6 +290,153 @@ function processMasterVersions(bucketName, listParams, list) {
|
||||||
return xml.join('');
|
return xml.join('');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Execute a prepared search statement in an existing livy session, poll
 * (with backoff) for the printed results, parse the JSON listing livy
 * returns, shape it as a bucket listing and hand the XML to the callback.
 *
 * @param {number} sessionId - livy session to run the statement in
 * @param {string} codeToExecute - scala code performing the search
 * @param {string} jobName - scala variable holding the async query result
 * @param {object} corsHeaders - cors headers to pass back to the caller
 * @param {string} bucketName - bucket being searched
 * @param {object} listParams - bucket listing params (maxKeys, marker...)
 * @param {object} log - request logger
 * @param {function} callback - called with (err, xml, corsHeaders)
 * @return {undefined}
 */
function handleStatement(sessionId, codeToExecute, jobName, corsHeaders,
    bucketName, listParams, log, callback) {
    log.info('about to postStatement with codeToExecute', { codeToExecute });
    return livyClient.postStatement(sessionId,
        codeToExecute, (err, res) => {
            if (err) {
                log.info('error from livy posting code ' +
                    'statement', { error: err.message });
                return callback(errors.InternalError
                    .customizeDescription('Error ' +
                        'performing search'),
                    null, corsHeaders);
            }
            if (!res || !res.state ||
                res.state === 'error' ||
                res.state === 'cancelling' ||
                res.state === 'cancelled') {
                log.info('bad response from livy posting code ' +
                    'statement',
                    { responseState: res ? res.state : undefined });
                return callback(errors.InternalError
                    .customizeDescription('Error ' +
                        'performing search'),
                    null, corsHeaders);
            }
            const resultsCode =
                `queryExecutor.printResults(${jobName}, ` +
                'scala.concurrent.duration.Duration.apply(200, ' +
                'java.util.concurrent.TimeUnit.MILLISECONDS))';
            let keepChecking = true;
            const backoffInstance = new Backoff(PRINT_BACKOFF_PARAMS);
            return async.whilst(
                // test function: keep polling until results are printed
                () => keepChecking,
                // repeating function: post a print statement, then read
                // its output back
                cb => {
                    livyClient.postStatement(sessionId, resultsCode,
                        (postErr, postRes) => {
                            if (postErr) {
                                log.info('error from livy posting print ' +
                                    'statement',
                                    { error: postErr.message });
                                return cb(errors.InternalError
                                    .customizeDescription('Error ' +
                                        'performing search'));
                            }
                            if (!postRes || !Number.isInteger(postRes.id)) {
                                log.error('posting statement did not ' +
                                    'result in valid statement id',
                                    { resFromLivy: postRes });
                                return cb(errors.InternalError
                                    .customizeDescription('Error ' +
                                        'performing search'));
                            }
                            // get statement output
                            return livyClient.getStatement(sessionId,
                                postRes.id, (getErr, getRes) => {
                                    if (getErr) {
                                        log.info('error from livy getting ' +
                                            'statement',
                                            { error: getErr.message });
                                        return cb(errors.InternalError
                                            .customizeDescription('Error ' +
                                                'performing search'));
                                    }
                                    // NOTE(review): the original condition
                                    // also tested `!res.status === 'ok'`,
                                    // which is always false (precedence) and
                                    // therefore a no-op; it was likely meant
                                    // to inspect the statement output status
                                    // -- confirm against the livy API before
                                    // adding a real check here
                                    if (!getRes || !getRes.data ||
                                        !getRes.data['text/plain'] ||
                                        getRes.data['text/plain']
                                            .includes('ERROR')) {
                                        log.debug('getting statement of ' +
                                            'print is not valid or reveals ' +
                                            'error', { resFromLivy: getRes });
                                        return cb(errors.InternalError
                                            .customizeDescription('Error ' +
                                                'performing search'));
                                    }
                                    if (getRes.data['text/plain']
                                        .includes('RESULT')) {
                                        keepChecking = false;
                                        return cb(null, getRes);
                                    }
                                    // not ready, keep trying after a backoff
                                    // delay; the original passed the Backoff
                                    // object itself to setTimeout (NaN -> 0ms
                                    // delay, i.e. a hot polling loop)
                                    return setTimeout(cb,
                                        backoffInstance.duration());
                                });
                        });
                },
                // final function: called when polling ends or errors out
                (whilstErr, statementRes) => {
                    if (whilstErr) {
                        return callback(whilstErr, null, corsHeaders);
                    }
                    // remove RESULT string and extra livy info at end,
                    // keeping only the JSON array of listing entries
                    let parsedRes = statementRes.data['text/plain'];
                    const start = parsedRes.indexOf('RESULT:') + 7;
                    const end = parsedRes.lastIndexOf(']\n') + 1;
                    parsedRes = parsedRes.substring(start, end);
                    parsedRes = _safeJSONParse(parsedRes);
                    if (parsedRes instanceof Error) {
                        log.error('livy returned invalid json',
                            { resFromLivy: statementRes });
                        return callback(errors.InternalError
                            .customizeDescription('Error ' +
                                'performing search'),
                            null, corsHeaders);
                    }
                    // Not grouping searched keys by common prefix so just
                    // set CommonPrefixes to an empty array
                    const list = { CommonPrefixes: [] };
                    list.Contents = parsedRes.map(entry => ({
                        key: entry.key,
                        value: {
                            LastModified: entry['last-modified'],
                            ETag: entry['content-md5'],
                            Size: entry['content-length'],
                            StorageClass: entry['x-amz-storage-class'],
                            Owner: {
                                ID: entry['owner-id'],
                                DisplayName:
                                    entry['owner-display-name'],
                            },
                        },
                    }));
                    if (listParams.maxKeys < list.Contents.length) {
                        // If received one more key than the max, the
                        // last item is to send back a next marker
                        // so remove from contents and send as NextMarker
                        list.NextMarker = list.Contents.pop().key;
                        list.isTruncated = 'true';
                    }
                    // TODO: (1) handle versioning,
                    // (2) TEST nextMarkers -- nextMarker should be
                    // the last key returned if the number of keys is
                    // more than the max keys (since we request max plus 1)
                    // (3) TEST sending nextMarker and max keys
                    const xml = processMasterVersions(bucketName,
                        listParams, list);
                    return callback(null, xml, corsHeaders);
                });
        });
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* bucketGet - Return list of objects in bucket
|
* bucketGet - Return list of objects in bucket
|
||||||
* @param {AuthInfo} authInfo - Instance of AuthInfo class with
|
* @param {AuthInfo} authInfo - Instance of AuthInfo class with
|
||||||
|
@ -225,6 +461,12 @@ function bucketGet(authInfo, request, log, callback) {
|
||||||
if (Number.isNaN(requestMaxKeys) || requestMaxKeys < 0) {
|
if (Number.isNaN(requestMaxKeys) || requestMaxKeys < 0) {
|
||||||
return callback(errors.InvalidArgument);
|
return callback(errors.InvalidArgument);
|
||||||
}
|
}
|
||||||
|
if (params.search !== undefined) {
|
||||||
|
const validation = validateSearchParams(params.search);
|
||||||
|
if (validation instanceof Error) {
|
||||||
|
return callback(validation);
|
||||||
|
}
|
||||||
|
}
|
||||||
// AWS only returns 1000 keys even if max keys are greater.
|
// AWS only returns 1000 keys even if max keys are greater.
|
||||||
// Max keys stated in response xml can be greater than actual
|
// Max keys stated in response xml can be greater than actual
|
||||||
// keys returned.
|
// keys returned.
|
||||||
|
@ -257,6 +499,77 @@ function bucketGet(authInfo, request, log, callback) {
|
||||||
listParams.versionIdMarker = params['version-id-marker'] ?
|
listParams.versionIdMarker = params['version-id-marker'] ?
|
||||||
versionIdUtils.decode(params['version-id-marker']) : undefined;
|
versionIdUtils.decode(params['version-id-marker']) : undefined;
|
||||||
}
|
}
|
||||||
|
if (params.search !== undefined) {
|
||||||
|
log.info('performaing search listing', { search: params.search });
|
||||||
|
// Add escape character to quotes since enclosing where clause
|
||||||
|
// in quotes when sending to livy
|
||||||
|
const whereClause = params.search.replace(/"/g, '\\"');
|
||||||
|
console.log("whereClause!!", whereClause);
|
||||||
|
// spark should return keys starting AFTER marker alphabetically
|
||||||
|
// spark should return up to maxKeys
|
||||||
|
const start = listParams.marker ? `Some("${listParams.marker}")` :
|
||||||
|
'None';
|
||||||
|
// need to start with character rather than number
|
||||||
|
const jobName = `job${Date.now()}`;
|
||||||
|
const searchCodeToExecute =
|
||||||
|
`var ${jobName} = queryExecutor.execAsync(MetadataQuery` +
|
||||||
|
`("${bucketName}", "${whereClause}", ${start}, ` +
|
||||||
|
// Add one to the keys requested so we can use the last key
|
||||||
|
// as a next marker if needed
|
||||||
|
// Might just need the last one???!!!
|
||||||
|
`${listParams.maxKeys + 1}));\n`;
|
||||||
|
|
||||||
|
// List sessions to find available.
|
||||||
|
// If at least 4 active and busy/starting, return SlowDown error
|
||||||
|
// (don't want to create too many since holding dataframes
|
||||||
|
// in mem within a session)
|
||||||
|
// If idle sessions, use random available one
|
||||||
|
return livyClient.getSessions(null, null,
|
||||||
|
(err, res) => {
|
||||||
|
if (err || !res) {
|
||||||
|
log.info('err from livy listing sessions',
|
||||||
|
{ error: err });
|
||||||
|
return callback(errors.InternalError
|
||||||
|
.customizeDescription('Error contacting spark ' +
|
||||||
|
'for search'), null, corsHeaders);
|
||||||
|
}
|
||||||
|
const availableSession = findAvailableSession(res.sessions);
|
||||||
|
if (availableSession.SlowDown) {
|
||||||
|
return callback(errors.SlowDown, null, corsHeaders);
|
||||||
|
}
|
||||||
|
if (availableSession.sessionId === undefined) {
|
||||||
|
return livyClient.postSession(sessionConfig,
|
||||||
|
(err, res) => {
|
||||||
|
if (err) {
|
||||||
|
log.info('error from livy creating session',
|
||||||
|
{ error: err.message });
|
||||||
|
return callback(errors.InternalError
|
||||||
|
.customizeDescription('Error ' +
|
||||||
|
'performing search'),
|
||||||
|
null, corsHeaders);
|
||||||
|
}
|
||||||
|
if (!res || !Number.isInteger(res.id)) {
|
||||||
|
log.error('posting session did not ' +
|
||||||
|
'result in valid session id',
|
||||||
|
{ resFromLivy: res });
|
||||||
|
return callback(errors.InternalError
|
||||||
|
.customizeDescription('Error ' +
|
||||||
|
'performing search'),
|
||||||
|
null, corsHeaders);
|
||||||
|
}
|
||||||
|
const codeToExecute = `${setUpSessionCode} ` +
|
||||||
|
`${searchCodeToExecute};`;
|
||||||
|
return handleStatement(res.id, codeToExecute,
|
||||||
|
jobName, corsHeaders, bucketName,
|
||||||
|
listParams, log, callback);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// no need to create session
|
||||||
|
return handleStatement(availableSession.sessionId,
|
||||||
|
searchCodeToExecute, jobName, corsHeaders, bucketName,
|
||||||
|
listParams, log, callback);
|
||||||
|
});
|
||||||
|
}
|
||||||
return services.getObjectListing(bucketName, listParams, log,
|
return services.getObjectListing(bucketName, listParams, log,
|
||||||
(err, list) => {
|
(err, list) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
|
|
@ -16,6 +16,7 @@ const StatsClient = require('./StatsClient');
|
||||||
const routes = arsenal.s3routes.routes;
|
const routes = arsenal.s3routes.routes;
|
||||||
const allEndpoints = Object.keys(_config.restEndpoints);
|
const allEndpoints = Object.keys(_config.restEndpoints);
|
||||||
const websiteEndpoints = _config.websiteEndpoints;
|
const websiteEndpoints = _config.websiteEndpoints;
|
||||||
|
const whiteListedIps = _config.whiteListedIps;
|
||||||
|
|
||||||
// redis client
|
// redis client
|
||||||
let localCacheClient;
|
let localCacheClient;
|
||||||
|
@ -69,6 +70,7 @@ class S3Server {
|
||||||
websiteEndpoints,
|
websiteEndpoints,
|
||||||
blacklistedPrefixes,
|
blacklistedPrefixes,
|
||||||
dataRetrievalFn: data.get,
|
dataRetrievalFn: data.get,
|
||||||
|
whiteListedIps,
|
||||||
};
|
};
|
||||||
routes(req, res, params, logger);
|
routes(req, res, params, logger);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,15 @@
|
||||||
},
|
},
|
||||||
"homepage": "https://github.com/scality/S3#readme",
|
"homepage": "https://github.com/scality/S3#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"arsenal": "scality/Arsenal#ft/clueso",
|
||||||
|
"async": "~2.5.0",
|
||||||
"aws-sdk": "2.28.0",
|
"aws-sdk": "2.28.0",
|
||||||
"arsenal": "scality/Arsenal",
|
|
||||||
"async": "~1.4.2",
|
|
||||||
"azure-storage": "^2.1.0",
|
"azure-storage": "^2.1.0",
|
||||||
|
"backo": "^1.1.0",
|
||||||
"bucketclient": "scality/bucketclient",
|
"bucketclient": "scality/bucketclient",
|
||||||
"commander": "^2.9.0",
|
"commander": "^2.9.0",
|
||||||
"ioredis": "2.4.0",
|
"ioredis": "2.4.0",
|
||||||
|
"node-sql-parser": "0.0.1",
|
||||||
"node-uuid": "^1.4.3",
|
"node-uuid": "^1.4.3",
|
||||||
"npm-run-all": "~4.0.2",
|
"npm-run-all": "~4.0.2",
|
||||||
"sproxydclient": "scality/sproxydclient",
|
"sproxydclient": "scality/sproxydclient",
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const { errors } = require('arsenal');
|
||||||
|
const validateSearch =
|
||||||
|
require('../../../lib/api/apiUtils/bucket/validateSearch');
|
||||||
|
|
||||||
|
|
||||||
|
// Table-driven tests for validateSearch: each entry supplies a raw
// ?search= where clause and the expected return value (undefined for a
// valid clause, or the arsenal InvalidArgument error naming the unknown
// attribute).
describe('validate search where clause', () => {
    const tests = [
        {
            it: 'should allow a valid simple search with table attribute',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador"',
            result: undefined,
        },
        {
            it: 'should allow a simple search with known ' +
            'column attribute',
            searchParams: '`content-length`="10"',
            result: undefined,
        },
        {
            it: 'should allow valid search with AND',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'AND userMd.`x-amz-meta-age`="5"',
            result: undefined,
        },
        {
            it: 'should allow valid search with OR',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'OR userMd.`x-amz-meta-age`="5"',
            result: undefined,
        },
        {
            it: 'should allow valid search with double AND',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'AND userMd.`x-amz-meta-age`="5" ' +
            'AND userMd.`x-amz-meta-whatever`="ok"',
            result: undefined,
        },
        {
            it: 'should allow valid chained search with tables and columns',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'AND userMd.`x-amz-meta-age`="5" ' +
            'AND `content-length`="10"' +
            'OR isDeleteMarker="true"' +
            'AND userMd.`x-amz-meta-whatever`="ok"',
            result: undefined,
        },
        {
            it: 'should allow valid LIKE search',
            searchParams: 'userMd.`x-amz-meta-dog` LIKE "lab%" ' +
            'AND userMd.`x-amz-meta-age` LIKE "5%" ' +
            'AND `content-length`="10"',
            result: undefined,
        },
        {
            it: 'should disallow a LIKE search with invalid attribute',
            searchParams: 'userNotMd.`x-amz-meta-dog` LIKE "labrador"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: userNotMd'),
        },
        {
            it: 'should disallow a simple search with unknown attribute',
            searchParams: 'userNotMd.`x-amz-meta-dog`="labrador"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: userNotMd'),
        },
        {
            it: 'should disallow a compound search with unknown ' +
            'attribute on right',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" AND ' +
            'userNotMd.`x-amz-meta-dog`="labrador"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: userNotMd'),
        },
        {
            it: 'should disallow a compound search with unknown ' +
            'attribute on left',
            searchParams: 'userNotMd.`x-amz-meta-dog`="labrador" AND ' +
            'userMd.`x-amz-meta-dog`="labrador"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: userNotMd'),
        },
        {
            it: 'should disallow a chained search with one invalid ' +
            'table attribute',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'AND userMd.`x-amz-meta-age`="5" ' +
            'OR userNotMd.`x-amz-meta-whatever`="ok"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: userNotMd'),
        },
        {
            it: 'should disallow a simple search with unknown ' +
            'column attribute',
            searchParams: 'whatever="labrador"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: whatever'),
        },
        {
            it: 'should disallow a chained search with one invalid ' +
            'column attribute',
            searchParams: 'userMd.`x-amz-meta-dog`="labrador" ' +
            'AND userMd.`x-amz-meta-age`="5" ' +
            'OR madeUp="something"' +
            'OR userMd.`x-amz-meta-whatever`="ok"',
            result: errors.InvalidArgument.customizeDescription('Search ' +
            'param contains unknown attribute: madeUp'),
        },
    ];

    // run every table entry through validateSearch and compare deeply
    // against the expected result
    tests.forEach(test => {
        it(test.it, () => {
            const actualResult =
                validateSearch(test.searchParams);
            assert.deepStrictEqual(actualResult, test.result);
        });
    });
});
|
Loading…
Reference in New Issue