Compare commits

...

21 Commits

Author SHA1 Message Date
Lauren Spiegel 20a7044d8c
Merge pull request #393 from scality/rf/livy
Rf/livy
2018-01-09 11:00:56 -08:00
JianqinWang 8d267feae7 rf: livyClient cleanup 2018-01-09 10:59:53 -08:00
Lauren Spiegel 22e9b8fc83 SQUASH -- modify executable script 2017-12-19 15:03:48 -08:00
Lauren Spiegel c807c54406 FT: Pensieve executable 2017-12-18 16:16:36 -08:00
Lauren Spiegel 1367ceea86 FT: Remove unnecessary changes 2017-12-15 14:51:51 -08:00
Lauren Spiegel 77c8e7de4b SQUASH with livyclient
1) fixes bug on backoff for get statement (wasn't backing off)
2) changed backoff params
3) added max attempts concept
4) added tests for max attempts
5) modified batch tests so using files in project
2017-12-14 17:42:13 -08:00
Lauren Spiegel d3592fe841 SQUASH -- further fix bad rebase 2017-11-13 16:53:47 -08:00
Lauren Spiegel e4a1d34675 SQUASH -- fix bad rebase 2017-10-31 17:38:37 -07:00
Lauren Spiegel 8d2cf59f06 DROP ME: bump arsenal 2017-10-31 17:22:26 -07:00
Lauren Spiegel 7f70c397c1 DROP ME: Deal with dependency issues 2017-10-31 17:04:11 -07:00
Lauren Spiegel b78b3eb012 expose object metadata attributes 2017-10-31 17:03:22 -07:00
Lauren Spiegel 19e35b02cd Backoff delay 2017-10-31 17:00:30 -07:00
Lauren Spiegel 54b4a6508f disable ip whitelist check 2017-10-31 17:00:30 -07:00
Lauren Spiegel 5e106f3c9a SQUASH -- refactor get statement so don't keep trying on error 2017-10-31 17:00:30 -07:00
Lauren Spiegel 55d4aa343c FT: Modify auth tool to handle query params 2017-10-31 17:00:30 -07:00
Lauren Spiegel 07615a3f17 Export livyClient 2017-10-31 17:00:30 -07:00
Lauren Spiegel 3ba1462ff4 SQUASH -- getting statement results works!! 2017-10-31 17:00:30 -07:00
Lauren Spiegel 37c910a51e SQUASH -- more (statements) 2017-10-31 17:00:30 -07:00
Lauren Spiegel 3115ce1377 SQUASH -further (batches) 2017-10-31 17:00:30 -07:00
Lauren Spiegel 0144f34e6e Livy Client 2017-10-31 17:00:30 -07:00
Lauren Spiegel 6e2fd0ae62 Temporary way to allow METADATA bucket requests 2017-10-31 17:00:30 -07:00
13 changed files with 913 additions and 1 deletions

8
.gitignore vendored
View File

@ -3,3 +3,11 @@
# Dependency directory # Dependency directory
node_modules/ node_modules/
*/node_modules/
# Build executables
*-win.exe
*-linux
*-macos

View File

@ -7,6 +7,7 @@ module.exports = {
stringHash: require('./lib/stringHash'), stringHash: require('./lib/stringHash'),
ipCheck: require('./lib/ipCheck'), ipCheck: require('./lib/ipCheck'),
jsutil: require('./lib/jsutil'), jsutil: require('./lib/jsutil'),
LivyClient: require('./lib/livyClient/lib/client.js'),
https: { https: {
ciphers: require('./lib/https/ciphers.js'), ciphers: require('./lib/https/ciphers.js'),
dhparam: require('./lib/https/dh2048.js'), dhparam: require('./lib/https/dh2048.js'),

View File

@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line strict 'use strict'; // eslint-disable-line strict
const crypto = require('crypto'); const crypto = require('crypto');
const url = require('url');
const errors = require('../errors'); const errors = require('../errors');
const queryString = require('querystring'); const queryString = require('querystring');
const AuthInfo = require('./AuthInfo'); const AuthInfo = require('./AuthInfo');
@ -159,6 +160,11 @@ function doAuth(request, log, cb, awsService, requestContexts) {
function generateV4Headers(request, data, accessKey, secretKeyValue, function generateV4Headers(request, data, accessKey, secretKeyValue,
awsService, proxyPath) { awsService, proxyPath) {
Object.assign(request, { headers: {} }); Object.assign(request, { headers: {} });
// hold the full path to restore the request after creating signature
const holdPath = request.path;
// pull the path without the query since canonical uri is without query
// eslint-disable-next-line no-param-reassign
request.path = url.parse(request.path, true).pathname;
const amzDate = convertUTCtoISO8601(Date.now()); const amzDate = convertUTCtoISO8601(Date.now());
// get date without time // get date without time
const scopeDate = amzDate.slice(0, amzDate.indexOf('T')); const scopeDate = amzDate.slice(0, amzDate.indexOf('T'));
@ -202,6 +208,9 @@ function generateV4Headers(request, data, accessKey, secretKeyValue,
`Signature=${signature}`; `Signature=${signature}`;
request.setHeader('authorization', authorizationHeader); request.setHeader('authorization', authorizationHeader);
Object.assign(request, { headers: {} }); Object.assign(request, { headers: {} });
// restore path
// eslint-disable-next-line no-param-reassign
request.path = holdPath;
} }
module.exports = { module.exports = {

View File

@ -0,0 +1,20 @@
# Get Pensieve Credentials Executable
## To make executable file from getPensieveCreds.js
`npm install -g pkg`
`pkg getPensieveCreds.js`
This will build a mac, linux and windows file.
If you just want linux, for example:
`pkg getPensieveCreds.js --targets node6-linux-x64`
For further options, see https://github.com/zeit/pkg
## To run the executable file
Call the output executable file with an
argument that names the service you
are trying to get credentials for:
`./getPensieveCreds-linux serviceName`

View File

@ -0,0 +1,76 @@
const async = require('async');
const forge = require('node-forge');
const MetadataFileClient =
require('../../storage/metadata/file/MetadataFileClient');
const mdClient = new MetadataFileClient({
host: 's3-metadata',
port: '9993',
});
const serviceName = process.argv[2];
const tokenKey = 'auth/zenko/remote-management-token';
// XXX copy-pasted from Backbeat
function decryptSecret(instanceCredentials, secret) {
// XXX don't forget to use u.encryptionKeyVersion if present
const privateKey = forge.pki.privateKeyFromPem(
instanceCredentials.privateKey);
const encryptedSecretKey = forge.util.decode64(secret);
return privateKey.decrypt(encryptedSecretKey, 'RSA-OAEP', {
md: forge.md.sha256.create(),
});
}
function loadOverlayVersion(db, version, cb) {
db.get(`configuration/overlay/${version}`, {}, (err, val) => {
if (err) {
return cb(err);
}
return cb(null, JSON.parse(val));
});
}
function parseServiceCredentials(conf, auth) {
const instanceAuth = JSON.parse(auth);
const serviceAccount = (conf.users || []).find(
u => u.accountType === `service-${serviceName}`);
if (!serviceAccount) {
return undefined;
}
return {
accessKey: serviceAccount.accessKey,
secretKey: decryptSecret(instanceAuth, serviceAccount.secretKey),
};
}
const mdDb = mdClient.openDB(error => {
if (error) {
throw error;
}
const db = mdDb.openSub('PENSIEVE');
return async.waterfall([
cb => db.get('configuration/overlay-version', {}, cb),
(version, cb) => loadOverlayVersion(db, version, cb),
(conf, cb) => db.get(tokenKey, {}, (err, instanceAuth) => {
if (err) {
return cb(err);
}
const creds = parseServiceCredentials(conf, instanceAuth);
return cb(null, creds);
}),
], (err, creds) => {
db.disconnect();
if (err) {
throw err;
}
if (!creds) {
throw new Error('No credentials found');
}
process.stdout.write(`export AWS_ACCESS_KEY_ID="${creds.accessKey}"\n`);
process.stdout
.write(`export AWS_SECRET_ACCESS_KEY="${creds.secretKey}"`);
});
});

View File

@ -0,0 +1,13 @@
{
"name": "pensievecreds",
"version": "1.0.0",
"description": "Executable tool for Pensieve",
"main": "getPensieveCreds.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"async": "^2.6.0",
"node-forge": "^0.7.1"
}
}

View File

@ -0,0 +1,432 @@
'use strict'; // eslint-disable-line strict
const assert = require('assert');
const http = require('http');
const https = require('https');
const querystring = require('querystring');
const Backoff = require('backo');
// min and max in ms
const defaultGetBackoffParams = { min: 100, max: 200, maxAttempts: 300,
factor: 1.01 };
const defaultPostBackoffParams = { min: 1000, max: 3000, maxAttempts: 20,
factor: 1.1 };
class LivyClient {
/**
* Constructor for REST client to apache livy
*
* @param {object} params - object for all parameters to initialize Livy
* @return {undefined}
*/
constructor(params) {
assert(typeof params.host === 'string' && params.host !== '',
'host is required');
assert(params.port === undefined || Number.isInteger(params.port),
'port must be an integer');
assert(params.logger === undefined || typeof params.logger === 'object',
'logger must be an object');
assert(params.logger === undefined ||
typeof params.logger.error === 'function',
'logger must have error method');
assert(params.logger === undefined ||
typeof params.logger.info === 'function',
'logger must have info method');
assert(params.key === undefined || typeof params.key === 'string',
'key must be a string');
assert(params.cert === undefined || typeof params.cert === 'string',
'cert must be a string');
assert(params.ca === undefined || typeof params.ca === 'string',
'ca must be a string');
assert(params.getBackoffParams === undefined ||
typeof params.getBackoffParams === 'object',
'getBackoffParams must be an object');
assert(params.postBackoffParams === undefined ||
typeof params.postBackoffParams === 'object',
'postBackoffParams must be an object');
assert(params.getBackoffParams === undefined ||
(Number.isInteger(params.getBackoffParams.min) &&
Number.isInteger(params.getBackoffParams.max) &&
Number.isInteger(params.getBackoffParams.maxAttempts) &&
!isNaN(params.getBackoffParams.factor)),
'getBackoffParams should have valid numerical values for ' +
'the min, max, maxAttempts, and factor attributes');
assert(params.postBackoffParams === undefined ||
(Number.isInteger(params.postBackoffParams.min) &&
Number.isInteger(params.postBackoffParams.max) &&
Number.isInteger(params.postBackoffParams.maxAttempts) &&
!isNaN(params.postBackoffParams.factor)),
'postBackoffParams should have valid numerical values for ' +
'the min, max, maxAttempts, and factor attributes');
this.serverHost = params.host;
this.serverPort = params.port !== undefined ? params.port : 8998;
this.logger = params.logger !== undefined ? params.logger : console;
this._key = params.key;
this._cert = params.cert;
this._ca = params.ca;
this.getBackoffParams = params.getBackoffParams !== undefined ?
params.getBackoffParams : defaultGetBackoffParams;
this.postBackoffParams = params.postBackoffParams !== undefined ?
params.postBackoffParams : defaultPostBackoffParams;
this.useHttps = (params.useHttps === true);
if (this.useHttps) {
this.transport = https;
this._agent = new https.Agent({
ca: params.ca ? [params.ca] : undefined,
keepAlive: true,
requestCert: true,
});
} else {
this.transport = http;
this._agent = new http.Agent({
keepAlive: true,
});
}
return undefined;
}
/** Return session or batch information by id
* @param {string} batchOrSession - either 'batch' or 'session'
* @param {number} id - id of session to get information on
* @param {funcftion} callback - callback
* @return {undefined}
*/
getSessionOrBatch(batchOrSession, id, callback) {
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
this._request('GET', `${pathPrefix}/${id}`, null, null, callback);
return undefined;
}
/** Returns interactive sessions or batches
* @param {string} batchOrSession - either 'batch' or 'session'
* @param {number} [startIndex] - index to start listing
* @param {number} [numOfSessionOrBatch] - number of sessions to
* return
* @param {function} callback - callback
* @return {undefined}
*/
getSessionsOrBatches(batchOrSession, startIndex, numOfSessionOrBatch,
callback) {
const pathPrefix = this._inputCheck(batchOrSession, null, callback);
const params = {};
if (startIndex) {
assert(Number.isInteger(startIndex),
'startIndex must be an integer');
params.from = startIndex;
}
if (numOfSessionOrBatch) {
assert(Number.isInteger(numOfSessionOrBatch),
`numOf${batchOrSession} must be an integer`);
params.size = numOfSessionOrBatch;
}
this._request('GET', `${pathPrefix}`,
params, null, callback);
return undefined;
}
/** Returns state of batch or session
* @param {string} batchOrSession - either 'batch' or 'session'
* @param {number} id - batch or session id
* @param {function} callback - callback
* @return {undefined}
*/
getSessionOrBatchState(batchOrSession, id, callback) {
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
this._request('GET', `${pathPrefix}/${id}/state`,
null, null, callback);
return undefined;
}
/** Returns log of batch or session
* @param {string} batchOrSession - either 'batch' or 'session'
* @param {number} id - batch or session id
* @param {function} callback - callback
* @return {undefined}
*/
getSessionOrBatchLog(batchOrSession, id, callback) {
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
this._request('GET', `${pathPrefix}/${id}/log`, null, null, callback);
return undefined;
}
/** Returns a specified statement within a session
* @param {number} sessionId - session id
* @param {number} statementId - statement id
* @param {function} callback - callback
* @param {object} [backoff] - backoff instance
* @return {undefined}
*/
getStatement(sessionId, statementId, callback, backoff) {
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
assert(Number.isInteger(statementId), 'statementId must be an integer');
const backoffInstance = backoff || new Backoff(this.getBackoffParams);
if (backoffInstance.attempts >= this.getBackoffParams.maxAttempts) {
const error = new Error('Attempted to get statement from livy ' +
'too many times');
return process.nextTick(() => callback(error));
}
this._request('GET', `/sessions/${sessionId}/statements/${statementId}`,
null, null, (err, res) => {
if (err) {
return callback(err);
}
if (!res) {
const error = new Error('Livy did not send response' +
'to get statement request');
return callback(error);
}
if (res.output && res.output.status === 'error') {
const error = new Error('Error response to statement:' +
` ${res.output.evalue}`);
return callback(error);
}
if (res.output && res.output.data) {
return callback(null, res.output);
}
if (res.state === 'waiting' || res.state === 'running') {
this.logger.info('statement result not ready',
{ statementStatus: res.state });
const retryDelayMs = backoffInstance.duration();
return setTimeout(this.getStatement.bind(this),
retryDelayMs, sessionId, statementId,
callback, backoffInstance);
}
// otherwise, error out (status could be error,
// cancelling or cancelled or possibly other issue)
const error = new Error('Statement status is: ' +
`${res.status}. Cannot obtain result`);
return callback(error);
});
return undefined;
}
/** Returns all statements within a session
* @param {number} sessionId - id of session
* @param {function} callback - callback
* @return {undefined}
*/
getStatements(sessionId, callback) {
assert(typeof callback === 'function', 'callback must be a function');
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
this._request('GET', `/sessions/${sessionId}/statements`, null, null,
callback);
return undefined;
}
/** Creates a new interactive Scala, Python, or R shell in the cluster
* @param {object} options - options for session
* @param {string} options.kind - type of session: spark, pyspark,
* pyspark3 or sparkr. If not specified, defaults to spark.
* For other options, see: https://github.com/apache/
* incubator-livy/blob/master/docs/rest-api.md#post-sessions
* @param {function} callback - callback
* @return {undefined}
*/
postSession(options, callback) {
assert(typeof callback === 'function', 'callback must be a function');
assert(typeof options === 'object', 'options must be an object');
let postBody = options;
if (!options.kind) {
postBody.kind = 'spark';
}
postBody = JSON.stringify(postBody);
this._request('POST', '/sessions',
null, postBody, callback);
return undefined;
}
/** Run a statement in a session
* @param {number} sessionId - options for session
* @param {string} codeToExecute - actual code to be executed by spark
* @param {function} callback - callback
* @param {object} [backoff] - backoff instance
* @return {undefined}
*/
postStatement(sessionId, codeToExecute, callback, backoff) {
assert(typeof callback === 'function', 'callback must be a function');
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
assert(typeof codeToExecute === 'string', 'codeToExecute must be ' +
'a string');
const backoffInstance = backoff || new Backoff(this.postBackoffParams);
if (backoffInstance.attempts >= this.postBackoffParams.maxAttempts) {
const error = new Error('Attempted to post statement to livy ' +
'too many times');
return process.nextTick(() => callback(error));
}
// Need to check status of session
return this.getSessionOrBatchState('session', sessionId, (err, res) => {
if (err) {
return callback(err);
}
if (res.state === 'starting' || res.state === 'busy') {
this.logger.info('session not ready',
{ sessionState: res.state });
const retryDelayMs = backoffInstance.duration();
return setTimeout(this.postStatement.bind(this),
retryDelayMs, sessionId, codeToExecute,
callback, backoffInstance);
}
if (res.state !== 'idle') {
const error = new Error('Session is in state: ' +
`${res.state}. Cannot accept statement`);
return callback(error);
}
const postBody = JSON.stringify({ code: codeToExecute });
this._request('POST', `/sessions/${sessionId}/statements`,
null, postBody, callback);
return undefined;
});
}
/** Creates a new batch job
* @param {object} options - options for batch
* @param {string} options.file - path of file containing the
* application to execute
* For other options, see: https://github.com/apache/incubator-livy/
* blob/master/docs/rest-api.md#post-batches
* @param {function} callback - callback
* @return {undefined}
*/
postBatch(options, callback) {
assert(typeof callback === 'function', 'callback must be a function');
assert(typeof options === 'object', 'options must be an object');
assert(typeof options.file === 'string',
'options.file must be a string');
this._request('POST', '/batches',
null, JSON.stringify(options), callback);
return undefined;
}
/** cancels a statement
* @param {number} sessionId - session id
* @param {number} statementId - id of statement to cancel
* @param {function} callback - callback
* @return {undefined}
*/
cancelStatement(sessionId, statementId, callback) {
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
assert(Number.isInteger(statementId), 'statementId must be an integer');
assert(typeof callback === 'function', 'callback must be a function');
this._request('POST',
`/sessions/${sessionId}/statements/${statementId}/cancel`, null, null,
callback);
}
/** Deletes a bath or session
* @param {number} batchOrSession - either 'batch' or 'session'
* @param {number} id - id of batch or session
* @param {function} callback - callback
* @return {undefined}
*/
deleteSessionOrBatch(batchOrSession, id, callback) {
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
this._request('DELETE', `${pathPrefix}/${id}`,
null, null, callback);
return undefined;
}
_inputCheck(batchOrSession, id, callback) {
assert(batchOrSession === 'batch' || batchOrSession === 'session',
'batchOrSession must be string "batch" or "session"');
if (id) {
assert(Number.isInteger(id), 'id must be an integer');
}
if (callback) {
assert(typeof callback === 'function',
'callback must be a function');
}
return batchOrSession === 'batch' ?
'/batches' : '/sessions';
}
_endResponse(res, data, callback) {
const code = res.statusCode;
const parsedData = data ? JSON.parse(data) : null;
if (code <= 201) {
this.logger.info(`request to ${this.serverHost} returned success`,
{ httpCode: code });
return callback(null, parsedData);
}
const error = new Error(res.statusMessage);
this.logger.info(`request to ${this.serverHost} returned error`,
{ statusCode: code, statusMessage: res.statusMessage,
info: data });
return callback(error, parsedData);
}
/**
* @param {string} method - the HTTP method of the request
* @param {string} path - path without query parameters
* @param {object} params - query parameters of the request
* @param {string} dataToSend - data of the request
* @param {function} callback - callback
* @return {undefined}
*/
_request(method, path, params, dataToSend, callback) {
assert(method === 'GET' || method === 'POST' || method === 'DELETE',
'httpMethod must be GET, POST or DELETE');
assert(typeof callback === 'function', 'callback must be a function');
assert(typeof path === 'string', 'path must be a string');
assert(typeof params === 'object', 'pararms must be an object');
this.logger.info('sending request',
{ httpMethod: method, path, params });
let fullPath = path;
const headers = {
'content-length': 0,
};
if (params) {
fullPath += `?${querystring.stringify(params)}`;
}
const options = {
method,
path: fullPath,
headers,
hostname: this.serverHost,
port: this.serverPort,
agent: this.agent,
};
if (this._cert && this._key) {
options.key = this._key;
options.cert = this._cert;
}
const dataResponse = [];
let dataResponseLength = 0;
const req = this.transport.request(options);
req.setNoDelay();
if (dataToSend) {
/*
* Encoding data to binary provides a hot path to write data
* directly to the socket, without node.js trying to encode the data
* over and over again.
*/
const binData = Buffer.from(dataToSend, 'utf8');
req.setHeader('content-type', 'application/octet-stream');
/*
* Using Buffer.bytelength is not required here because data is
* binary encoded, data.length would give us the exact byte length
*/
req.setHeader('content-length', binData.length);
req.write(binData);
}
req.on('response', res => {
res.on('data', data => {
dataResponse.push(data);
dataResponseLength += data.length;
}).on('error', callback).on('end', () => {
this._endResponse(res, Buffer.concat(dataResponse,
dataResponseLength).toString(), callback);
});
}).on('error', error => {
// covers system errors like ECONNREFUSED, ECONNRESET etc.
this.logger.error('error sending request to livy', { error });
return callback(error);
}).end();
}
}
module.exports = LivyClient;

View File

@ -0,0 +1,330 @@
'use strict'; // eslint-disable-line strict
const assert = require('assert');
const LivyClient = require('../../lib/client');
describe('LivyClient tests', function testClient() {
this.timeout(0);
let client;
before('Create the client', () => {
client = new LivyClient({ host: 'localhost' });
});
describe('POST Sessions', function postSession() {
this.sessionId = undefined;
afterEach(done => {
client.deleteSessionOrBatch('session', this.sessionId, done);
});
it('should post a session', done => {
client.postSession({}, (err, res) => {
assert.ifError(err);
this.sessionId = res.id;
assert(res.id !== undefined && res.id !== null);
assert.strictEqual(res.state, 'starting');
assert.strictEqual(res.kind, 'spark');
done();
});
});
});
describe('GET Sessions', () => {
it('should get interactive sessions', done => {
client.getSessionsOrBatches('session', null, null, (err, res) => {
assert.ifError(err);
assert(res.from !== undefined);
assert(res.total !== undefined);
assert(res.sessions !== undefined);
done();
});
});
it('should get interactive sessions starting from index 1', done => {
client.getSessionsOrBatches('session', 1, null, (err, res) => {
assert.ifError(err);
assert.strictEqual(res.from, 1);
assert(res.total !== undefined);
assert(res.sessions !== undefined);
done();
});
});
describe('After sessions have been posted', () => {
beforeEach(function beforeE(done) {
return client.postSession({}, (err, res) => {
if (err) {
return done(err);
}
this.sessionId = res.id;
return done();
});
});
it('should get interactive sessions with total limited to 1',
done => {
client.getSessionsOrBatches('session', null, 1, (err, res) => {
assert.ifError(err);
assert(res.from !== undefined);
assert(res.total !== undefined);
assert(res.sessions !== undefined);
assert.strictEqual(res.sessions.length, 1);
assert(res.sessions[0].state === 'idle' ||
res.sessions[0].state === 'starting');
assert.strictEqual(res.sessions[0].kind, 'spark');
done();
});
});
it('should get session by id', function itF(done) {
client.getSessionOrBatch('session', this.sessionId,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.sessionId);
assert(res.appId !== undefined);
assert(res.owner !== undefined);
assert(res.proxyUser !== undefined);
assert(res.state !== undefined);
assert.strictEqual(res.kind, 'spark');
assert(res.appInfo !== undefined);
assert(res.log !== undefined);
done();
});
});
it('should get state of session', function itF(done) {
client.getSessionOrBatchState('session', this.sessionId,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.sessionId);
assert(res.state !== undefined &&
typeof res.state === 'string');
done();
});
});
it('should get log of session', function itF(done) {
client.getSessionOrBatchLog('session', this.sessionId,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.sessionId);
assert(res.from !== undefined);
assert(res.total !== undefined);
assert(res.log !== undefined);
done();
});
});
});
});
describe('POST and GET Session Statements', () => {
before(function beforeE(done) {
client.postSession({}, (err, res) => {
assert.ifError(err);
this.sessionId = res.id;
done();
});
});
after(function afterE(done) {
client.deleteSessionOrBatch('session', this.sessionId, done);
});
it('should post a session statement with a quick program',
function itF(done) {
const codeToExecute = '2 + 2';
client.postStatement(this.sessionId, codeToExecute,
(err, res) => {
assert.ifError(err);
this.quickStatementId = res.id;
assert.strictEqual(res.id, this.quickStatementId);
assert.strictEqual(res.state, 'waiting');
assert.strictEqual(res.output, null);
assert.strictEqual(res.code, codeToExecute);
done();
});
});
it('should get a session statement with quick execution',
function itF(done) {
client.getStatement(this.sessionId, this.quickStatementId,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.status, 'ok');
assert.strictEqual(res.execution_count, 0);
assert.deepStrictEqual(res.data,
{ 'text/plain': 'res0: Int = 4' });
done();
});
});
it('should post a session statement with a long program',
function itF(done) {
const codeToExecute = 'for(n <- 1 to 1000000) ' +
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
'length == 0 }';
client.postStatement(this.sessionId, codeToExecute,
(err, res) => {
this.longStatementId = res.id;
assert.ifError(err);
assert.strictEqual(res.state, 'waiting');
assert.strictEqual(res.output, null);
assert.strictEqual(res.code, codeToExecute);
done();
});
});
it('should max out on retries getting a session statement with long ' +
'execution', function itF(done) {
client.getStatement(this.sessionId, this.longStatementId,
err => {
assert(err);
assert.strictEqual(err.message, 'Attempted to ' +
'get statement from livy too many times');
done();
});
});
it('should cancel in-progress statement', function itF(done) {
const codeToExecute = 'for(n <- 1 to 1000000) ' +
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
'length == 0 }';
client.postStatement(this.sessionId, codeToExecute,
(err, res) => {
client.cancelStatement(this.sessionId, res.id, (err, res) => {
assert.ifError(err);
assert.strictequal(res.msg, 'canceled');
done();
});
});
});
});
describe('POST, GET, and DELETE batch', function batches() {
it('should post a jar file batch', done => {
// Note for any test env: must add path to livy.conf whitelist
// livy.file.local-dir-whitelist=<insert path here>
client.postBatch({ file: `${__dirname}/../resources/` +
'simplespark_2.11-0.1.jar',
className: 'SimpleApp' },
(err, res) => {
assert.ifError(err);
this.jarBatch = res.id;
assert(res.state === 'running' || res.state === 'starting');
done();
});
});
it('should post a python file batch', done => {
// Note for any test env: must add path to livy.conf whitelist
// livy.file.local-dir-whitelist=<insert path here>
client.postBatch({ file: `${__dirname}/../resources/` +
'SimpleApp.py' },
(err, res) => {
assert.ifError(err);
this.pythonBatch = res.id;
assert(res.state === 'running' || res.state === 'starting');
done();
});
});
it('should get active batches', done => {
client.getSessionsOrBatches('batch', null, null, (err, res) => {
assert.ifError(err);
assert(res.from !== undefined);
assert(res.total !== undefined);
assert(res.sessions !== undefined);
done();
});
});
it('should get a jar batch by id', done => {
client.getSessionOrBatch('batch', this.jarBatch, (err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.jarBatch);
assert(res.state !== undefined &&
typeof res.state === 'string');
assert(res.appId !== undefined);
assert(res.appInfo !== undefined);
assert(res.log !== undefined);
done();
});
});
it('should get a python file batch by id', done => {
client.getSessionOrBatch('batch', this.pythonBatch, (err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.pythonBatch);
assert(res.state !== undefined &&
typeof res.state === 'string');
assert(res.appId !== undefined);
assert(res.appInfo !== undefined);
assert(res.log !== undefined);
done();
});
});
it('should get a jar batch state', done => {
client.getSessionOrBatchState('batch', this.jarBatch,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.jarBatch);
assert(res.state !== undefined && typeof
res.state === 'string');
done();
});
});
it('should get a python file batch state', done => {
client.getSessionOrBatchState('batch', this.jarBatch,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.jarBatch);
assert(res.state !== undefined && typeof
res.state === 'string');
done();
});
});
it('should get the log of a jar batch', done => {
client.getSessionOrBatchLog('batch', this.jarBatch, (err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.jarBatch);
assert(res.log !== undefined);
assert(res.from !== undefined && Number.isInteger(res.total));
assert(res.total !== undefined && Number.isInteger(res.total));
done();
});
});
it('should get the log of a python file batch', done => {
client.getSessionOrBatchLog('batch', this.pythonBatch,
(err, res) => {
assert.ifError(err);
assert.strictEqual(res.id, this.pythonBatch);
assert(res.log !== undefined);
assert(res.from !== undefined && Number.isInteger(res.total));
assert(res.total !== undefined && Number.isInteger(res.total));
done();
});
});
it('should delete a python file batch', done => {
client.deleteSessionOrBatch('batch', this.pythonBatch,
(err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res, { msg: 'deleted' });
done();
});
});
it('should delete a jar batch', done => {
client.deleteSessionOrBatch('batch', this.jarBatch, (err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res, { msg: 'deleted' });
done();
});
});
});
});

View File

@ -0,0 +1,3 @@
"""SimpleApp.py"""
print "I was submitted to Livy"

View File

@ -54,6 +54,21 @@ class ObjectMD {
} }
} }
/**
* Returns metadata attributes for the current model
*
* @return {object} object with keys of existing attributes
* and value set to true
*/
static getAttributes() {
const sample = new ObjectMD();
const attributes = {};
Object.keys(sample.getValue()).forEach(key => {
attributes[key] = true;
});
return attributes;
}
getSerialized() { getSerialized() {
return JSON.stringify(this.getValue()); return JSON.stringify(this.getValue());
} }

View File

@ -805,6 +805,10 @@ const routesUtils = {
* bucket name against validation rules * bucket name against validation rules
*/ */
isValidBucketName(bucketname, prefixBlacklist) { isValidBucketName(bucketname, prefixBlacklist) {
// allow this one capitalized bucketname so that clueso can use it
if (bucketname === 'METADATA') {
return true;
}
const ipAddressRegex = new RegExp(/^(\d+\.){3}\d+$/); const ipAddressRegex = new RegExp(/^(\d+\.){3}\d+$/);
// eslint-disable-next-line no-useless-escape // eslint-disable-next-line no-useless-escape
const dnsRegex = new RegExp(/^[a-z0-9]+([\.\-]{1}[a-z0-9]+)*$/); const dnsRegex = new RegExp(/^[a-z0-9]+([\.\-]{1}[a-z0-9]+)*$/);

View File

@ -3,7 +3,7 @@
"engines": { "engines": {
"node": "6.9.5" "node": "6.9.5"
}, },
"version": "7.2.0", "version": "7.2.1",
"description": "Common utilities for the S3 project components", "description": "Common utilities for the S3 project components",
"main": "index.js", "main": "index.js",
"repository": { "repository": {
@ -19,6 +19,7 @@
"dependencies": { "dependencies": {
"ajv": "4.10.0", "ajv": "4.10.0",
"async": "~2.1.5", "async": "~2.1.5",
"backo": "^1.1.0",
"debug": "~2.3.3", "debug": "~2.3.3",
"diskusage": "^0.2.2", "diskusage": "^0.2.2",
"ioredis": "2.4.0", "ioredis": "2.4.0",