Compare commits
16 Commits
developmen
...
ft/clueso
Author | SHA1 | Date |
---|---|---|
Lauren Spiegel | 77c8e7de4b | |
Lauren Spiegel | d3592fe841 | |
Lauren Spiegel | e4a1d34675 | |
Lauren Spiegel | 8d2cf59f06 | |
Lauren Spiegel | 7f70c397c1 | |
Lauren Spiegel | b78b3eb012 | |
Lauren Spiegel | 19e35b02cd | |
Lauren Spiegel | 54b4a6508f | |
Lauren Spiegel | 5e106f3c9a | |
Lauren Spiegel | 55d4aa343c | |
Lauren Spiegel | 07615a3f17 | |
Lauren Spiegel | 3ba1462ff4 | |
Lauren Spiegel | 37c910a51e | |
Lauren Spiegel | 3115ce1377 | |
Lauren Spiegel | 0144f34e6e | |
Lauren Spiegel | 6e2fd0ae62 |
1
index.js
1
index.js
|
@ -7,6 +7,7 @@ module.exports = {
|
||||||
stringHash: require('./lib/stringHash'),
|
stringHash: require('./lib/stringHash'),
|
||||||
ipCheck: require('./lib/ipCheck'),
|
ipCheck: require('./lib/ipCheck'),
|
||||||
jsutil: require('./lib/jsutil'),
|
jsutil: require('./lib/jsutil'),
|
||||||
|
LivyClient: require('./lib/livyClient/lib/client.js'),
|
||||||
https: {
|
https: {
|
||||||
ciphers: require('./lib/https/ciphers.js'),
|
ciphers: require('./lib/https/ciphers.js'),
|
||||||
dhparam: require('./lib/https/dh2048.js'),
|
dhparam: require('./lib/https/dh2048.js'),
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
'use strict'; // eslint-disable-line strict
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
|
const url = require('url');
|
||||||
const errors = require('../errors');
|
const errors = require('../errors');
|
||||||
const queryString = require('querystring');
|
const queryString = require('querystring');
|
||||||
const AuthInfo = require('./AuthInfo');
|
const AuthInfo = require('./AuthInfo');
|
||||||
|
@ -159,6 +160,11 @@ function doAuth(request, log, cb, awsService, requestContexts) {
|
||||||
function generateV4Headers(request, data, accessKey, secretKeyValue,
|
function generateV4Headers(request, data, accessKey, secretKeyValue,
|
||||||
awsService, proxyPath) {
|
awsService, proxyPath) {
|
||||||
Object.assign(request, { headers: {} });
|
Object.assign(request, { headers: {} });
|
||||||
|
// hold the full path to restore the request after creating signature
|
||||||
|
const holdPath = request.path;
|
||||||
|
// pull the path without the query since canonical uri is without query
|
||||||
|
// eslint-disable-next-line no-param-reassign
|
||||||
|
request.path = url.parse(request.path, true).pathname;
|
||||||
const amzDate = convertUTCtoISO8601(Date.now());
|
const amzDate = convertUTCtoISO8601(Date.now());
|
||||||
// get date without time
|
// get date without time
|
||||||
const scopeDate = amzDate.slice(0, amzDate.indexOf('T'));
|
const scopeDate = amzDate.slice(0, amzDate.indexOf('T'));
|
||||||
|
@ -202,6 +208,9 @@ function generateV4Headers(request, data, accessKey, secretKeyValue,
|
||||||
`Signature=${signature}`;
|
`Signature=${signature}`;
|
||||||
request.setHeader('authorization', authorizationHeader);
|
request.setHeader('authorization', authorizationHeader);
|
||||||
Object.assign(request, { headers: {} });
|
Object.assign(request, { headers: {} });
|
||||||
|
// restore path
|
||||||
|
// eslint-disable-next-line no-param-reassign
|
||||||
|
request.path = holdPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
|
|
@ -0,0 +1,386 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
|
const assert = require('assert');
|
||||||
|
const http = require('http');
|
||||||
|
const https = require('https');
|
||||||
|
const querystring = require('querystring');
|
||||||
|
const Backoff = require('backo');
|
||||||
|
// min and max in ms
|
||||||
|
const GET_BACKOFF_PARAMS = { min: 100, max: 200, maxAttempts: 300,
|
||||||
|
factor: 1.01 };
|
||||||
|
const POST_BACKOFF_PARAMS = { min: 1000, max: 3000, maxAttempts: 20,
|
||||||
|
factor: 1.1 };
|
||||||
|
|
||||||
|
|
||||||
|
class LivyClient {
|
||||||
|
/**
|
||||||
|
* Constructor for REST client to apache livy
|
||||||
|
*
|
||||||
|
* @param {string} host - hostname or IP of the livy server
|
||||||
|
* @param {number} [port=8998] - port of the livy server
|
||||||
|
* @param {object} [logger=console] - logger object
|
||||||
|
* @param {boolean} [useHttps] - whether to use https or not
|
||||||
|
* @param {string} [key] - https private key content
|
||||||
|
* @param {string} [cert] - https public certificate content
|
||||||
|
* @param {string} [ca] - https authority certificate content
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
constructor(host, port = 8998, logger = console, useHttps, key, cert, ca) {
|
||||||
|
assert(typeof host === 'string' && host !== '', 'host is required');
|
||||||
|
assert(Number.isInteger(port), 'port must be an integer');
|
||||||
|
assert(typeof logger === 'object', 'logger must be an object');
|
||||||
|
assert(typeof logger.error === 'function', 'logger must have' +
|
||||||
|
'error method');
|
||||||
|
assert(typeof logger.info === 'function', 'logger must have' +
|
||||||
|
'info method');
|
||||||
|
assert(key === undefined || typeof key === 'string',
|
||||||
|
'key must be a string');
|
||||||
|
assert(cert === undefined || typeof cert === 'string',
|
||||||
|
'cert must be a string');
|
||||||
|
assert(ca === undefined || typeof ca === 'string',
|
||||||
|
'ca must be a string');
|
||||||
|
this.serverHost = host;
|
||||||
|
this.serverPort = port;
|
||||||
|
this.logger = logger;
|
||||||
|
this._key = key;
|
||||||
|
this._cert = cert;
|
||||||
|
this._ca = ca;
|
||||||
|
this.useHttps = (useHttps === true);
|
||||||
|
if (this.useHttps) {
|
||||||
|
this.transport = https;
|
||||||
|
this._agent = new https.Agent({
|
||||||
|
ca: ca ? [ca] : undefined,
|
||||||
|
keepAlive: true,
|
||||||
|
requestCert: true,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.transport = http;
|
||||||
|
this._agent = new http.Agent({
|
||||||
|
keepAlive: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns interactive sessions
|
||||||
|
* @param {number} [startIndex] - index to start listing
|
||||||
|
* @param {number} [numOfSessions] - number of sessions to
|
||||||
|
* return
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
getSessions(startIndex, numOfSessions, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
const params = {};
|
||||||
|
if (startIndex) {
|
||||||
|
assert(Number.isInteger(startIndex),
|
||||||
|
'startIndex must be an integer');
|
||||||
|
params.from = startIndex;
|
||||||
|
}
|
||||||
|
if (numOfSessions) {
|
||||||
|
assert(Number.isInteger(numOfSessions),
|
||||||
|
'numOfSessions must be an integer');
|
||||||
|
params.size = numOfSessions;
|
||||||
|
}
|
||||||
|
this._request('GET', '/sessions',
|
||||||
|
params, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns state of batch or session
|
||||||
|
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||||
|
* @param {number} id - batch or session id
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
getSessionOrBatchState(batchOrSession, id, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(batchOrSession === 'batch' || batchOrSession === 'session',
|
||||||
|
'batchOrSession must be string "batch" or "session"');
|
||||||
|
assert(Number.isInteger(id), 'id must be an integer');
|
||||||
|
const pathPrefix = batchOrSession === 'batch' ?
|
||||||
|
'/batches' : '/sessions';
|
||||||
|
this._request('GET', `${pathPrefix}/${id}/state`,
|
||||||
|
null, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates a new interactive Scala, Python, or R shell in the cluster
|
||||||
|
* @param {object} options - options for session
|
||||||
|
* @param {string} options.kind - type of session: spark, pyspark,
|
||||||
|
* pyspark3 or sparkr. If not specified, defaults to spark.
|
||||||
|
* For other options, see: https://github.com/apache/
|
||||||
|
* incubator-livy/blob/master/docs/rest-api.md#post-sessions
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
postSession(options, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(typeof options === 'object', 'options must be an object');
|
||||||
|
let postBody = options;
|
||||||
|
if (!options.kind) {
|
||||||
|
postBody.kind = 'spark';
|
||||||
|
}
|
||||||
|
postBody = JSON.stringify(postBody);
|
||||||
|
this._request('POST', '/sessions',
|
||||||
|
null, postBody, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Deletes a session
|
||||||
|
* @param {number} sessionId - sessionId to delete
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
deleteSession(sessionId, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
|
this._request('DELETE', `/sessions/${sessionId}`,
|
||||||
|
null, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Run a statement in a session
|
||||||
|
* @param {number} sessionId - options for session
|
||||||
|
* @param {string} codeToExecute - actual code to be executed by spark
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @param {object} [backoff] - backoff instance
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
postStatement(sessionId, codeToExecute, callback, backoff) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
|
assert(typeof codeToExecute === 'string', 'codeToExecute must be ' +
|
||||||
|
'a string');
|
||||||
|
const backoffInstance = backoff || new Backoff(POST_BACKOFF_PARAMS);
|
||||||
|
if (backoffInstance.attempts >= POST_BACKOFF_PARAMS.maxAttempts) {
|
||||||
|
const error = new Error('Attempted to post statement to livy ' +
|
||||||
|
'too many times');
|
||||||
|
return process.nextTick(() => callback(error));
|
||||||
|
}
|
||||||
|
// Need to check status of session
|
||||||
|
return this.getSessionOrBatchState('session', sessionId, (err, res) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (res.state === 'starting' || res.state === 'busy') {
|
||||||
|
this.logger.info('session not ready',
|
||||||
|
{ sessionState: res.state });
|
||||||
|
const retryDelayMs = backoffInstance.duration();
|
||||||
|
return setTimeout(this.postStatement.bind(this),
|
||||||
|
retryDelayMs, sessionId, codeToExecute,
|
||||||
|
callback, backoffInstance);
|
||||||
|
}
|
||||||
|
if (res.state !== 'idle') {
|
||||||
|
const error = new Error('Session is in state: ' +
|
||||||
|
`${res.state}. Cannot accept statement`);
|
||||||
|
return callback(error);
|
||||||
|
}
|
||||||
|
const postBody = JSON.stringify({ code: codeToExecute });
|
||||||
|
this._request('POST', `/sessions/${sessionId}/statements`,
|
||||||
|
null, postBody, callback);
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns a specified statement within a session
|
||||||
|
* @param {number} sessionId - session id
|
||||||
|
* @param {number} statementId - statement id
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @param {object} [backoff] - backoff instance
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
getStatement(sessionId, statementId, callback, backoff) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
|
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
||||||
|
const backoffInstance = backoff || new Backoff(GET_BACKOFF_PARAMS);
|
||||||
|
if (backoffInstance.attempts >= GET_BACKOFF_PARAMS.maxAttempts) {
|
||||||
|
const error = new Error('Attempted to get statement from livy ' +
|
||||||
|
'too many times');
|
||||||
|
return process.nextTick(() => callback(error));
|
||||||
|
}
|
||||||
|
this._request('GET', `/sessions/${sessionId}/statements/${statementId}`,
|
||||||
|
null, null, (err, res) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (!res) {
|
||||||
|
const error = new Error('Livy did not send response' +
|
||||||
|
'to get statement request');
|
||||||
|
return callback(error);
|
||||||
|
}
|
||||||
|
if (res.output && res.output.status === 'error') {
|
||||||
|
const error = new Error('Error response to statement:' +
|
||||||
|
` ${res.output.evalue}`);
|
||||||
|
return callback(error);
|
||||||
|
}
|
||||||
|
if (res.output && res.output.data) {
|
||||||
|
return callback(null, res.output);
|
||||||
|
}
|
||||||
|
if (res.state === 'waiting' || res.state === 'running') {
|
||||||
|
this.logger.info('statement result not ready',
|
||||||
|
{ statementStatus: res.state });
|
||||||
|
const retryDelayMs = backoffInstance.duration();
|
||||||
|
return setTimeout(this.getStatement.bind(this),
|
||||||
|
retryDelayMs, sessionId, statementId,
|
||||||
|
callback, backoffInstance);
|
||||||
|
}
|
||||||
|
// otherwise, error out (status could be error,
|
||||||
|
// cancelling or cancelled or possibly other issue)
|
||||||
|
const error = new Error('Statement status is: ' +
|
||||||
|
`${res.status}. Cannot obtain result`);
|
||||||
|
return callback(error);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns all active batch sessions
|
||||||
|
* @param {number} [startIndex] - index to start listing
|
||||||
|
* @param {number} [numOfBatches] - number of batches to
|
||||||
|
* return
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
getBatches(startIndex, numOfBatches, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
const params = {};
|
||||||
|
if (startIndex) {
|
||||||
|
assert(Number.isInteger(startIndex),
|
||||||
|
'startIndex must be an integer');
|
||||||
|
params.from = startIndex;
|
||||||
|
}
|
||||||
|
if (numOfBatches) {
|
||||||
|
assert(Number.isInteger(numOfBatches),
|
||||||
|
'numOfBatches must be an integer');
|
||||||
|
params.size = numOfBatches;
|
||||||
|
}
|
||||||
|
this._request('GET', '/batches',
|
||||||
|
params, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates a new batch job
|
||||||
|
* @param {object} options - options for batch
|
||||||
|
* @param {string} options.file - path of file containing the
|
||||||
|
* application to execute
|
||||||
|
* For other options, see: https://github.com/apache/incubator-livy/
|
||||||
|
* blob/master/docs/rest-api.md#post-batches
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
postBatch(options, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(typeof options === 'object', 'options must be an object');
|
||||||
|
assert(typeof options.file === 'string',
|
||||||
|
'options.file must be a string');
|
||||||
|
this._request('POST', '/batches',
|
||||||
|
null, JSON.stringify(options), callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Deletes a batch job
|
||||||
|
* @param {number} batchId - batchId to delete
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
deleteBatch(batchId, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(Number.isInteger(batchId), 'batchId must be an integer');
|
||||||
|
this._request('DELETE', `/batches/${batchId}`,
|
||||||
|
null, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
_endResponse(res, data, callback) {
|
||||||
|
const code = res.statusCode;
|
||||||
|
const parsedData = data ? JSON.parse(data) : null;
|
||||||
|
if (code <= 201) {
|
||||||
|
this.logger.info(`request to ${this.serverHost} returned success`,
|
||||||
|
{ httpCode: code });
|
||||||
|
return callback(null, parsedData);
|
||||||
|
}
|
||||||
|
const error = new Error(res.statusMessage);
|
||||||
|
this.logger.info(`request to ${this.serverHost} returned error`,
|
||||||
|
{ statusCode: code, statusMessage: res.statusMessage,
|
||||||
|
info: data });
|
||||||
|
return callback(error, parsedData);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {string} method - the HTTP method of the request
|
||||||
|
* @param {string} path - path without query parameters
|
||||||
|
* @param {object} params - query parameters of the request
|
||||||
|
* @param {string} dataToSend - data of the request
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_request(method, path, params, dataToSend, callback) {
|
||||||
|
assert(method === 'GET' || method === 'POST' || method === 'DELETE',
|
||||||
|
'httpMethod must be GET, POST or DELETE');
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(typeof path === 'string', 'path must be a string');
|
||||||
|
assert(typeof params === 'object', 'pararms must be an object');
|
||||||
|
this.logger.info('sending request',
|
||||||
|
{ httpMethod: method, path, params });
|
||||||
|
let fullPath = path;
|
||||||
|
const headers = {
|
||||||
|
'content-length': 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (params) {
|
||||||
|
fullPath += `?${querystring.stringify(params)}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
method,
|
||||||
|
path: fullPath,
|
||||||
|
headers,
|
||||||
|
hostname: this.serverHost,
|
||||||
|
port: this.serverPort,
|
||||||
|
agent: this.agent,
|
||||||
|
};
|
||||||
|
if (this._cert && this._key) {
|
||||||
|
options.key = this._key;
|
||||||
|
options.cert = this._cert;
|
||||||
|
}
|
||||||
|
const dataResponse = [];
|
||||||
|
let dataResponseLength = 0;
|
||||||
|
|
||||||
|
const req = this.transport.request(options);
|
||||||
|
req.setNoDelay();
|
||||||
|
|
||||||
|
if (dataToSend) {
|
||||||
|
/*
|
||||||
|
* Encoding data to binary provides a hot path to write data
|
||||||
|
* directly to the socket, without node.js trying to encode the data
|
||||||
|
* over and over again.
|
||||||
|
*/
|
||||||
|
const binData = Buffer.from(dataToSend, 'utf8');
|
||||||
|
req.setHeader('content-type', 'application/octet-stream');
|
||||||
|
/*
|
||||||
|
* Using Buffer.bytelength is not required here because data is
|
||||||
|
* binary encoded, data.length would give us the exact byte length
|
||||||
|
*/
|
||||||
|
req.setHeader('content-length', binData.length);
|
||||||
|
req.write(binData);
|
||||||
|
}
|
||||||
|
|
||||||
|
req.on('response', res => {
|
||||||
|
res.on('data', data => {
|
||||||
|
dataResponse.push(data);
|
||||||
|
dataResponseLength += data.length;
|
||||||
|
}).on('error', callback).on('end', () => {
|
||||||
|
this._endResponse(res, Buffer.concat(dataResponse,
|
||||||
|
dataResponseLength).toString(), callback);
|
||||||
|
});
|
||||||
|
}).on('error', error => {
|
||||||
|
// covers system errors like ECONNREFUSED, ECONNRESET etc.
|
||||||
|
this.logger.error('error sending request to livy', { error });
|
||||||
|
return callback(error);
|
||||||
|
}).end();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = LivyClient;
|
|
@ -0,0 +1,222 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const LivyClient = require('../../lib/client');
|
||||||
|
|
||||||
|
describe('LivyClient tests', function testClient() {
|
||||||
|
this.timeout(0);
|
||||||
|
let client;
|
||||||
|
// even if a session is deleted, livy keeps incrementing for new
|
||||||
|
// sesssions so keep track through the tests
|
||||||
|
// TODO; refactor this so using this.sessionId in each test
|
||||||
|
// rather than this global counter
|
||||||
|
let sessionId = 0;
|
||||||
|
|
||||||
|
before('Create the client', () => {
|
||||||
|
client = new LivyClient('localhost');
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('POST Sessions', () => {
|
||||||
|
afterEach(done => {
|
||||||
|
client.deleteSession(sessionId - 1, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should post a session', done => {
|
||||||
|
client.postSession({}, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
const expectedResponse = {
|
||||||
|
id: 0, appId: null, owner: null,
|
||||||
|
proxyUser: null,
|
||||||
|
state: 'starting',
|
||||||
|
kind: 'spark',
|
||||||
|
appInfo: { driverLogUrl: null, sparkUiUrl: null },
|
||||||
|
log: [] };
|
||||||
|
assert.deepStrictEqual(res, expectedResponse);
|
||||||
|
sessionId++;
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('GET Sessions', () => {
|
||||||
|
it('should get interactive sessions', done => {
|
||||||
|
client.getSessions(null, null, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { from: 0, total: 0,
|
||||||
|
sessions: [] });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get interactive sessions starting from index 1', done => {
|
||||||
|
client.getSessions(1, null, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { from: 1, total: 0,
|
||||||
|
sessions: [] });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('After sessions have been posted', () => {
|
||||||
|
before(done => {
|
||||||
|
client.postSession({}, err => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
sessionId++;
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
after(done => client.deleteSession(sessionId - 1, done));
|
||||||
|
|
||||||
|
it('should get interactive sessions with total limited to 1',
|
||||||
|
done => {
|
||||||
|
client.getSessions(null, 1, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
const expectedResponse = {
|
||||||
|
from: 0,
|
||||||
|
total: 1,
|
||||||
|
sessions: [{
|
||||||
|
id: 1,
|
||||||
|
appId: null,
|
||||||
|
owner: null,
|
||||||
|
proxyUser: null,
|
||||||
|
state: 'starting',
|
||||||
|
kind: 'spark',
|
||||||
|
appInfo: {
|
||||||
|
driverLogUrl: null,
|
||||||
|
sparkUiUrl: null,
|
||||||
|
},
|
||||||
|
log: [],
|
||||||
|
}] };
|
||||||
|
assert.deepStrictEqual(res, expectedResponse);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('POST and GET Session Statements', function statements() {
|
||||||
|
before(done => {
|
||||||
|
client.postSession({}, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
this.sessionId = res.id;
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
after(done => {
|
||||||
|
client.deleteSession(this.sessionId, done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should post a session statement with a quick program', done => {
|
||||||
|
const codeToExecute = '2 + 2';
|
||||||
|
client.postStatement(this.sessionId, codeToExecute,
|
||||||
|
(err, res) => {
|
||||||
|
this.quickStatementId = res.id;
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { id: this.quickStatementId,
|
||||||
|
state: 'waiting',
|
||||||
|
output: null });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get a session statement with quick execution', done => {
|
||||||
|
client.getStatement(this.sessionId, this.quickStatementId,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res,
|
||||||
|
{ status: 'ok',
|
||||||
|
// eslint-disable-next-line
|
||||||
|
execution_count: 0,
|
||||||
|
data: { 'text/plain': 'res0: Int = 4' } });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should post a session statement with a long program', done => {
|
||||||
|
const codeToExecute = 'for(n <- 1 to 100000) ' +
|
||||||
|
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
||||||
|
'length == 0 }';
|
||||||
|
client.postStatement(this.sessionId, codeToExecute,
|
||||||
|
(err, res) => {
|
||||||
|
this.longStatementId = res.id;
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { id: this.longStatementId,
|
||||||
|
state: 'waiting',
|
||||||
|
output: null });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should max out on retries getting a session statement with long ' +
|
||||||
|
'execution', done => {
|
||||||
|
client.getStatement(this.sessionId, this.longStatementId,
|
||||||
|
err => {
|
||||||
|
assert(err);
|
||||||
|
assert.strictEqual(err.message, 'Attempted to ' +
|
||||||
|
'get statement from livy too many times');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('GET batches', () => {
|
||||||
|
it('should get active batches', done => {
|
||||||
|
client.getBatches(null, null, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { from: 0, total: 0,
|
||||||
|
sessions: [] });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('POST batch', function batches() {
|
||||||
|
it('should post a jar file batch', done => {
|
||||||
|
// Note for any test env: must add path to livy.conf whitelist
|
||||||
|
// livy.file.local-dir-whitelist=<insert path here>
|
||||||
|
client.postBatch({ file: `${__dirname}/../resources/` +
|
||||||
|
'simplespark_2.11-0.1.jar',
|
||||||
|
className: 'SimpleApp' },
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
this.jarBatch = res.id;
|
||||||
|
assert(res.state === 'running' || res.state === 'starting');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should delete a jar batch', done => {
|
||||||
|
client.deleteBatch(this.jarBatch, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should post a python file batch', done => {
|
||||||
|
// Note for any test env: must add path to livy.conf whitelist
|
||||||
|
// livy.file.local-dir-whitelist=<insert path here>
|
||||||
|
client.postBatch({ file: `${__dirname}/../resources/` +
|
||||||
|
'SimpleApp.py' },
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
this.pythonBatch = res.id;
|
||||||
|
assert(res.state === 'running' || res.state === 'starting');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should delete a jar batch', done => {
|
||||||
|
client.deleteBatch(this.pythonBatch, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,3 @@
|
||||||
|
"""SimpleApp.py"""
|
||||||
|
|
||||||
|
print "I was submitted to Livy"
|
Binary file not shown.
|
@ -54,6 +54,21 @@ class ObjectMD {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns metadata attributes for the current model
|
||||||
|
*
|
||||||
|
* @return {object} object with keys of existing attributes
|
||||||
|
* and value set to true
|
||||||
|
*/
|
||||||
|
static getAttributes() {
|
||||||
|
const sample = new ObjectMD();
|
||||||
|
const attributes = {};
|
||||||
|
Object.keys(sample.getValue()).forEach(key => {
|
||||||
|
attributes[key] = true;
|
||||||
|
});
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
|
||||||
getSerialized() {
|
getSerialized() {
|
||||||
return JSON.stringify(this.getValue());
|
return JSON.stringify(this.getValue());
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ const routeOPTIONS = require('./routes/routeOPTIONS');
|
||||||
const routesUtils = require('./routesUtils');
|
const routesUtils = require('./routesUtils');
|
||||||
const routeWebsite = require('./routes/routeWebsite');
|
const routeWebsite = require('./routes/routeWebsite');
|
||||||
|
|
||||||
|
const ipCheck = require('../ipCheck');
|
||||||
|
|
||||||
const routeMap = {
|
const routeMap = {
|
||||||
GET: routeGET,
|
GET: routeGET,
|
||||||
PUT: routePUT,
|
PUT: routePUT,
|
||||||
|
@ -111,6 +113,12 @@ function checkTypes(req, res, params, logger) {
|
||||||
});
|
});
|
||||||
assert.strictEqual(typeof params.dataRetrievalFn, 'function',
|
assert.strictEqual(typeof params.dataRetrievalFn, 'function',
|
||||||
'bad routes param: dataRetrievalFn must be a defined function');
|
'bad routes param: dataRetrievalFn must be a defined function');
|
||||||
|
assert(Array.isArray(params.whiteListedIps),
|
||||||
|
'bad routes param: whiteListedIps must be an array');
|
||||||
|
params.whiteListedIps.forEach(ip => {
|
||||||
|
assert.strictEqual(typeof ip, 'string',
|
||||||
|
'bad routes param: each item in whiteListedIps must be a string');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/** routes - route request to appropriate method
|
/** routes - route request to appropriate method
|
||||||
|
@ -130,6 +138,8 @@ function checkTypes(req, res, params, logger) {
|
||||||
* @param {object} params.unsupportedQueries - object containing true/false
|
* @param {object} params.unsupportedQueries - object containing true/false
|
||||||
* values for whether queries are supported
|
* values for whether queries are supported
|
||||||
* @param {function} params.dataRetrievalFn - function to retrieve data
|
* @param {function} params.dataRetrievalFn - function to retrieve data
|
||||||
|
* @param {string []} params.whiteListedIps - list of ip addresses
|
||||||
|
* that have special rights with respect to the METADATA bucket
|
||||||
* @param {RequestLogger} logger - werelogs logger instance
|
* @param {RequestLogger} logger - werelogs logger instance
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
|
@ -223,6 +233,19 @@ function routes(req, res, params, logger) {
|
||||||
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Need better solution to identify request from backbeat/clueso
|
||||||
|
if (req.bucketName === 'METADATA' &&
|
||||||
|
(req.method === 'PUT' ||
|
||||||
|
req.method === 'POST' ||
|
||||||
|
req.method === 'DELETE')) {
|
||||||
|
// TODO: handle based on admin roles!!!!
|
||||||
|
if (false) {
|
||||||
|
return routesUtils.responseXMLBody(
|
||||||
|
errors.AccessDenied.customizeDescription('The bucket METADATA is ' +
|
||||||
|
'used for internal purposes'), null, res, log);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// bucket website request
|
// bucket website request
|
||||||
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
||||||
return routeWebsite(req, res, api, log, statsClient, dataRetrievalFn);
|
return routeWebsite(req, res, api, log, statsClient, dataRetrievalFn);
|
||||||
|
|
|
@ -805,6 +805,10 @@ const routesUtils = {
|
||||||
* bucket name against validation rules
|
* bucket name against validation rules
|
||||||
*/
|
*/
|
||||||
isValidBucketName(bucketname, prefixBlacklist) {
|
isValidBucketName(bucketname, prefixBlacklist) {
|
||||||
|
// allow this one capitalized bucketname so that clueso can use it
|
||||||
|
if (bucketname === 'METADATA') {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
const ipAddressRegex = new RegExp(/^(\d+\.){3}\d+$/);
|
const ipAddressRegex = new RegExp(/^(\d+\.){3}\d+$/);
|
||||||
// eslint-disable-next-line no-useless-escape
|
// eslint-disable-next-line no-useless-escape
|
||||||
const dnsRegex = new RegExp(/^[a-z0-9]+([\.\-]{1}[a-z0-9]+)*$/);
|
const dnsRegex = new RegExp(/^[a-z0-9]+([\.\-]{1}[a-z0-9]+)*$/);
|
||||||
|
|
|
@ -3,7 +3,13 @@
|
||||||
const fs = require('fs');
|
const fs = require('fs');
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const diskusage = require('diskusage');
|
let diskusage;
|
||||||
|
try {
|
||||||
|
diskusage = require('diskusage');
|
||||||
|
} catch (err) {
|
||||||
|
process.stdout.write('unable to install diskusage module',
|
||||||
|
{ error: err.message, stack: err.stack });
|
||||||
|
}
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const errors = require('../../../errors');
|
const errors = require('../../../errors');
|
||||||
|
@ -289,7 +295,10 @@ class DataFileStore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getDiskUsage(callback) {
|
getDiskUsage(callback) {
|
||||||
diskusage.check(this.dataPath, callback);
|
if (diskusage) {
|
||||||
|
return diskusage.check(this.dataPath, callback);
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,13 @@ const uuid = require('uuid');
|
||||||
const level = require('level');
|
const level = require('level');
|
||||||
const sublevel = require('level-sublevel');
|
const sublevel = require('level-sublevel');
|
||||||
const debug = require('debug')('MetadataFileServer');
|
const debug = require('debug')('MetadataFileServer');
|
||||||
const diskusage = require('diskusage');
|
let diskusage;
|
||||||
|
try {
|
||||||
|
diskusage = require('diskusage');
|
||||||
|
} catch (err) {
|
||||||
|
process.stdout.write('unable to install diskusage module',
|
||||||
|
{ error: err.message, stack: err.stack });
|
||||||
|
}
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
@ -247,7 +253,12 @@ class MetadataFileServer {
|
||||||
vrp.get({ db: dbName, key, options },
|
vrp.get({ db: dbName, key, options },
|
||||||
env.requestLogger, cb);
|
env.requestLogger, cb);
|
||||||
},
|
},
|
||||||
getDiskUsage: (env, cb) => diskusage.check(this.path, cb),
|
getDiskUsage: (env, cb) => {
|
||||||
|
if (diskusage) {
|
||||||
|
return diskusage.check(this.path, cb);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
},
|
||||||
});
|
});
|
||||||
dbService.registerSyncAPI({
|
dbService.registerSyncAPI({
|
||||||
createReadStream:
|
createReadStream:
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
"engines": {
|
"engines": {
|
||||||
"node": "6.9.5"
|
"node": "6.9.5"
|
||||||
},
|
},
|
||||||
"version": "7.2.0",
|
"version": "7.2.1",
|
||||||
"description": "Common utilities for the S3 project components",
|
"description": "Common utilities for the S3 project components",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"repository": {
|
"repository": {
|
||||||
|
@ -19,6 +19,7 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"ajv": "4.10.0",
|
"ajv": "4.10.0",
|
||||||
"async": "~2.1.5",
|
"async": "~2.1.5",
|
||||||
|
"backo": "^1.1.0",
|
||||||
"debug": "~2.3.3",
|
"debug": "~2.3.3",
|
||||||
"diskusage": "^0.2.2",
|
"diskusage": "^0.2.2",
|
||||||
"ioredis": "2.4.0",
|
"ioredis": "2.4.0",
|
||||||
|
|
Loading…
Reference in New Issue