Compare commits
21 Commits
developmen
...
ft/orbitpl
Author | SHA1 | Date |
---|---|---|
Lauren Spiegel | 20a7044d8c | |
JianqinWang | 8d267feae7 | |
Lauren Spiegel | 22e9b8fc83 | |
Lauren Spiegel | c807c54406 | |
Lauren Spiegel | 1367ceea86 | |
Lauren Spiegel | 77c8e7de4b | |
Lauren Spiegel | d3592fe841 | |
Lauren Spiegel | e4a1d34675 | |
Lauren Spiegel | 8d2cf59f06 | |
Lauren Spiegel | 7f70c397c1 | |
Lauren Spiegel | b78b3eb012 | |
Lauren Spiegel | 19e35b02cd | |
Lauren Spiegel | 54b4a6508f | |
Lauren Spiegel | 5e106f3c9a | |
Lauren Spiegel | 55d4aa343c | |
Lauren Spiegel | 07615a3f17 | |
Lauren Spiegel | 3ba1462ff4 | |
Lauren Spiegel | 37c910a51e | |
Lauren Spiegel | 3115ce1377 | |
Lauren Spiegel | 0144f34e6e | |
Lauren Spiegel | 6e2fd0ae62 |
|
@ -3,3 +3,11 @@
|
|||
|
||||
# Dependency directory
|
||||
node_modules/
|
||||
*/node_modules/
|
||||
|
||||
# Build executables
|
||||
*-win.exe
|
||||
*-linux
|
||||
*-macos
|
||||
|
||||
|
||||
|
|
1
index.js
1
index.js
|
@ -7,6 +7,7 @@ module.exports = {
|
|||
stringHash: require('./lib/stringHash'),
|
||||
ipCheck: require('./lib/ipCheck'),
|
||||
jsutil: require('./lib/jsutil'),
|
||||
LivyClient: require('./lib/livyClient/lib/client.js'),
|
||||
https: {
|
||||
ciphers: require('./lib/https/ciphers.js'),
|
||||
dhparam: require('./lib/https/dh2048.js'),
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
'use strict'; // eslint-disable-line strict
|
||||
|
||||
const crypto = require('crypto');
|
||||
const url = require('url');
|
||||
const errors = require('../errors');
|
||||
const queryString = require('querystring');
|
||||
const AuthInfo = require('./AuthInfo');
|
||||
|
@ -159,6 +160,11 @@ function doAuth(request, log, cb, awsService, requestContexts) {
|
|||
function generateV4Headers(request, data, accessKey, secretKeyValue,
|
||||
awsService, proxyPath) {
|
||||
Object.assign(request, { headers: {} });
|
||||
// hold the full path to restore the request after creating signature
|
||||
const holdPath = request.path;
|
||||
// pull the path without the query since canonical uri is without query
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
request.path = url.parse(request.path, true).pathname;
|
||||
const amzDate = convertUTCtoISO8601(Date.now());
|
||||
// get date without time
|
||||
const scopeDate = amzDate.slice(0, amzDate.indexOf('T'));
|
||||
|
@ -202,6 +208,9 @@ function generateV4Headers(request, data, accessKey, secretKeyValue,
|
|||
`Signature=${signature}`;
|
||||
request.setHeader('authorization', authorizationHeader);
|
||||
Object.assign(request, { headers: {} });
|
||||
// restore path
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
request.path = holdPath;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
# Get Pensieve Credentials Executable
|
||||
|
||||
## To make executable file from getPensieveCreds.js
|
||||
|
||||
`npm install -g pkg`
|
||||
`pkg getPensieveCreds.js`
|
||||
|
||||
This will build a mac, linux and windows file.
|
||||
If you just want linux, for example:
|
||||
`pkg getPensieveCreds.js --targets node6-linux-x64`
|
||||
|
||||
For further options, see https://github.com/zeit/pkg
|
||||
|
||||
## To run the executable file
|
||||
|
||||
Call the output executable file with an
|
||||
argument that names the service you
|
||||
are trying to get credentials for:
|
||||
|
||||
`./getPensieveCreds-linux serviceName`
|
|
@ -0,0 +1,76 @@
|
|||
const async = require('async');
|
||||
const forge = require('node-forge');
|
||||
const MetadataFileClient =
|
||||
require('../../storage/metadata/file/MetadataFileClient');
|
||||
const mdClient = new MetadataFileClient({
|
||||
host: 's3-metadata',
|
||||
port: '9993',
|
||||
});
|
||||
|
||||
const serviceName = process.argv[2];
|
||||
const tokenKey = 'auth/zenko/remote-management-token';
|
||||
|
||||
// XXX copy-pasted from Backbeat
|
||||
function decryptSecret(instanceCredentials, secret) {
|
||||
// XXX don't forget to use u.encryptionKeyVersion if present
|
||||
const privateKey = forge.pki.privateKeyFromPem(
|
||||
instanceCredentials.privateKey);
|
||||
const encryptedSecretKey = forge.util.decode64(secret);
|
||||
return privateKey.decrypt(encryptedSecretKey, 'RSA-OAEP', {
|
||||
md: forge.md.sha256.create(),
|
||||
});
|
||||
}
|
||||
|
||||
function loadOverlayVersion(db, version, cb) {
|
||||
db.get(`configuration/overlay/${version}`, {}, (err, val) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, JSON.parse(val));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
function parseServiceCredentials(conf, auth) {
|
||||
const instanceAuth = JSON.parse(auth);
|
||||
const serviceAccount = (conf.users || []).find(
|
||||
u => u.accountType === `service-${serviceName}`);
|
||||
if (!serviceAccount) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return {
|
||||
accessKey: serviceAccount.accessKey,
|
||||
secretKey: decryptSecret(instanceAuth, serviceAccount.secretKey),
|
||||
};
|
||||
}
|
||||
|
||||
const mdDb = mdClient.openDB(error => {
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const db = mdDb.openSub('PENSIEVE');
|
||||
return async.waterfall([
|
||||
cb => db.get('configuration/overlay-version', {}, cb),
|
||||
(version, cb) => loadOverlayVersion(db, version, cb),
|
||||
(conf, cb) => db.get(tokenKey, {}, (err, instanceAuth) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const creds = parseServiceCredentials(conf, instanceAuth);
|
||||
return cb(null, creds);
|
||||
}),
|
||||
], (err, creds) => {
|
||||
db.disconnect();
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
if (!creds) {
|
||||
throw new Error('No credentials found');
|
||||
}
|
||||
process.stdout.write(`export AWS_ACCESS_KEY_ID="${creds.accessKey}"\n`);
|
||||
process.stdout
|
||||
.write(`export AWS_SECRET_ACCESS_KEY="${creds.secretKey}"`);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"name": "pensievecreds",
|
||||
"version": "1.0.0",
|
||||
"description": "Executable tool for Pensieve",
|
||||
"main": "getPensieveCreds.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
"dependencies": {
|
||||
"async": "^2.6.0",
|
||||
"node-forge": "^0.7.1"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,432 @@
|
|||
'use strict'; // eslint-disable-line strict
|
||||
|
||||
const assert = require('assert');
|
||||
const http = require('http');
|
||||
const https = require('https');
|
||||
const querystring = require('querystring');
|
||||
const Backoff = require('backo');
|
||||
// min and max in ms
|
||||
const defaultGetBackoffParams = { min: 100, max: 200, maxAttempts: 300,
|
||||
factor: 1.01 };
|
||||
const defaultPostBackoffParams = { min: 1000, max: 3000, maxAttempts: 20,
|
||||
factor: 1.1 };
|
||||
|
||||
class LivyClient {
|
||||
/**
|
||||
* Constructor for REST client to apache livy
|
||||
*
|
||||
* @param {object} params - object for all parameters to initialize Livy
|
||||
* @return {undefined}
|
||||
*/
|
||||
constructor(params) {
|
||||
assert(typeof params.host === 'string' && params.host !== '',
|
||||
'host is required');
|
||||
assert(params.port === undefined || Number.isInteger(params.port),
|
||||
'port must be an integer');
|
||||
assert(params.logger === undefined || typeof params.logger === 'object',
|
||||
'logger must be an object');
|
||||
assert(params.logger === undefined ||
|
||||
typeof params.logger.error === 'function',
|
||||
'logger must have error method');
|
||||
assert(params.logger === undefined ||
|
||||
typeof params.logger.info === 'function',
|
||||
'logger must have info method');
|
||||
assert(params.key === undefined || typeof params.key === 'string',
|
||||
'key must be a string');
|
||||
assert(params.cert === undefined || typeof params.cert === 'string',
|
||||
'cert must be a string');
|
||||
assert(params.ca === undefined || typeof params.ca === 'string',
|
||||
'ca must be a string');
|
||||
assert(params.getBackoffParams === undefined ||
|
||||
typeof params.getBackoffParams === 'object',
|
||||
'getBackoffParams must be an object');
|
||||
assert(params.postBackoffParams === undefined ||
|
||||
typeof params.postBackoffParams === 'object',
|
||||
'postBackoffParams must be an object');
|
||||
assert(params.getBackoffParams === undefined ||
|
||||
(Number.isInteger(params.getBackoffParams.min) &&
|
||||
Number.isInteger(params.getBackoffParams.max) &&
|
||||
Number.isInteger(params.getBackoffParams.maxAttempts) &&
|
||||
!isNaN(params.getBackoffParams.factor)),
|
||||
'getBackoffParams should have valid numerical values for ' +
|
||||
'the min, max, maxAttempts, and factor attributes');
|
||||
assert(params.postBackoffParams === undefined ||
|
||||
(Number.isInteger(params.postBackoffParams.min) &&
|
||||
Number.isInteger(params.postBackoffParams.max) &&
|
||||
Number.isInteger(params.postBackoffParams.maxAttempts) &&
|
||||
!isNaN(params.postBackoffParams.factor)),
|
||||
'postBackoffParams should have valid numerical values for ' +
|
||||
'the min, max, maxAttempts, and factor attributes');
|
||||
this.serverHost = params.host;
|
||||
this.serverPort = params.port !== undefined ? params.port : 8998;
|
||||
this.logger = params.logger !== undefined ? params.logger : console;
|
||||
this._key = params.key;
|
||||
this._cert = params.cert;
|
||||
this._ca = params.ca;
|
||||
this.getBackoffParams = params.getBackoffParams !== undefined ?
|
||||
params.getBackoffParams : defaultGetBackoffParams;
|
||||
this.postBackoffParams = params.postBackoffParams !== undefined ?
|
||||
params.postBackoffParams : defaultPostBackoffParams;
|
||||
this.useHttps = (params.useHttps === true);
|
||||
if (this.useHttps) {
|
||||
this.transport = https;
|
||||
this._agent = new https.Agent({
|
||||
ca: params.ca ? [params.ca] : undefined,
|
||||
keepAlive: true,
|
||||
requestCert: true,
|
||||
});
|
||||
} else {
|
||||
this.transport = http;
|
||||
this._agent = new http.Agent({
|
||||
keepAlive: true,
|
||||
});
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Return session or batch information by id
|
||||
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||
* @param {number} id - id of session to get information on
|
||||
* @param {funcftion} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
getSessionOrBatch(batchOrSession, id, callback) {
|
||||
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||
this._request('GET', `${pathPrefix}/${id}`, null, null, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Returns interactive sessions or batches
|
||||
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||
* @param {number} [startIndex] - index to start listing
|
||||
* @param {number} [numOfSessionOrBatch] - number of sessions to
|
||||
* return
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
getSessionsOrBatches(batchOrSession, startIndex, numOfSessionOrBatch,
|
||||
callback) {
|
||||
const pathPrefix = this._inputCheck(batchOrSession, null, callback);
|
||||
const params = {};
|
||||
if (startIndex) {
|
||||
assert(Number.isInteger(startIndex),
|
||||
'startIndex must be an integer');
|
||||
params.from = startIndex;
|
||||
}
|
||||
if (numOfSessionOrBatch) {
|
||||
assert(Number.isInteger(numOfSessionOrBatch),
|
||||
`numOf${batchOrSession} must be an integer`);
|
||||
params.size = numOfSessionOrBatch;
|
||||
}
|
||||
this._request('GET', `${pathPrefix}`,
|
||||
params, null, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Returns state of batch or session
|
||||
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||
* @param {number} id - batch or session id
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
getSessionOrBatchState(batchOrSession, id, callback) {
|
||||
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||
this._request('GET', `${pathPrefix}/${id}/state`,
|
||||
null, null, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Returns log of batch or session
|
||||
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||
* @param {number} id - batch or session id
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
getSessionOrBatchLog(batchOrSession, id, callback) {
|
||||
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||
this._request('GET', `${pathPrefix}/${id}/log`, null, null, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Returns a specified statement within a session
|
||||
* @param {number} sessionId - session id
|
||||
* @param {number} statementId - statement id
|
||||
* @param {function} callback - callback
|
||||
* @param {object} [backoff] - backoff instance
|
||||
* @return {undefined}
|
||||
*/
|
||||
getStatement(sessionId, statementId, callback, backoff) {
|
||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
||||
const backoffInstance = backoff || new Backoff(this.getBackoffParams);
|
||||
if (backoffInstance.attempts >= this.getBackoffParams.maxAttempts) {
|
||||
const error = new Error('Attempted to get statement from livy ' +
|
||||
'too many times');
|
||||
return process.nextTick(() => callback(error));
|
||||
}
|
||||
this._request('GET', `/sessions/${sessionId}/statements/${statementId}`,
|
||||
null, null, (err, res) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (!res) {
|
||||
const error = new Error('Livy did not send response' +
|
||||
'to get statement request');
|
||||
return callback(error);
|
||||
}
|
||||
if (res.output && res.output.status === 'error') {
|
||||
const error = new Error('Error response to statement:' +
|
||||
` ${res.output.evalue}`);
|
||||
return callback(error);
|
||||
}
|
||||
if (res.output && res.output.data) {
|
||||
return callback(null, res.output);
|
||||
}
|
||||
if (res.state === 'waiting' || res.state === 'running') {
|
||||
this.logger.info('statement result not ready',
|
||||
{ statementStatus: res.state });
|
||||
const retryDelayMs = backoffInstance.duration();
|
||||
return setTimeout(this.getStatement.bind(this),
|
||||
retryDelayMs, sessionId, statementId,
|
||||
callback, backoffInstance);
|
||||
}
|
||||
// otherwise, error out (status could be error,
|
||||
// cancelling or cancelled or possibly other issue)
|
||||
const error = new Error('Statement status is: ' +
|
||||
`${res.status}. Cannot obtain result`);
|
||||
return callback(error);
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Returns all statements within a session
|
||||
* @param {number} sessionId - id of session
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
getStatements(sessionId, callback) {
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||
this._request('GET', `/sessions/${sessionId}/statements`, null, null,
|
||||
callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Creates a new interactive Scala, Python, or R shell in the cluster
|
||||
* @param {object} options - options for session
|
||||
* @param {string} options.kind - type of session: spark, pyspark,
|
||||
* pyspark3 or sparkr. If not specified, defaults to spark.
|
||||
* For other options, see: https://github.com/apache/
|
||||
* incubator-livy/blob/master/docs/rest-api.md#post-sessions
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
postSession(options, callback) {
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
assert(typeof options === 'object', 'options must be an object');
|
||||
let postBody = options;
|
||||
if (!options.kind) {
|
||||
postBody.kind = 'spark';
|
||||
}
|
||||
postBody = JSON.stringify(postBody);
|
||||
this._request('POST', '/sessions',
|
||||
null, postBody, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Run a statement in a session
|
||||
* @param {number} sessionId - options for session
|
||||
* @param {string} codeToExecute - actual code to be executed by spark
|
||||
* @param {function} callback - callback
|
||||
* @param {object} [backoff] - backoff instance
|
||||
* @return {undefined}
|
||||
*/
|
||||
postStatement(sessionId, codeToExecute, callback, backoff) {
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||
assert(typeof codeToExecute === 'string', 'codeToExecute must be ' +
|
||||
'a string');
|
||||
const backoffInstance = backoff || new Backoff(this.postBackoffParams);
|
||||
if (backoffInstance.attempts >= this.postBackoffParams.maxAttempts) {
|
||||
const error = new Error('Attempted to post statement to livy ' +
|
||||
'too many times');
|
||||
return process.nextTick(() => callback(error));
|
||||
}
|
||||
// Need to check status of session
|
||||
return this.getSessionOrBatchState('session', sessionId, (err, res) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (res.state === 'starting' || res.state === 'busy') {
|
||||
this.logger.info('session not ready',
|
||||
{ sessionState: res.state });
|
||||
const retryDelayMs = backoffInstance.duration();
|
||||
return setTimeout(this.postStatement.bind(this),
|
||||
retryDelayMs, sessionId, codeToExecute,
|
||||
callback, backoffInstance);
|
||||
}
|
||||
if (res.state !== 'idle') {
|
||||
const error = new Error('Session is in state: ' +
|
||||
`${res.state}. Cannot accept statement`);
|
||||
return callback(error);
|
||||
}
|
||||
const postBody = JSON.stringify({ code: codeToExecute });
|
||||
this._request('POST', `/sessions/${sessionId}/statements`,
|
||||
null, postBody, callback);
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
|
||||
/** Creates a new batch job
|
||||
* @param {object} options - options for batch
|
||||
* @param {string} options.file - path of file containing the
|
||||
* application to execute
|
||||
* For other options, see: https://github.com/apache/incubator-livy/
|
||||
* blob/master/docs/rest-api.md#post-batches
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
postBatch(options, callback) {
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
assert(typeof options === 'object', 'options must be an object');
|
||||
assert(typeof options.file === 'string',
|
||||
'options.file must be a string');
|
||||
this._request('POST', '/batches',
|
||||
null, JSON.stringify(options), callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** cancels a statement
|
||||
* @param {number} sessionId - session id
|
||||
* @param {number} statementId - id of statement to cancel
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
cancelStatement(sessionId, statementId, callback) {
|
||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
this._request('POST',
|
||||
`/sessions/${sessionId}/statements/${statementId}/cancel`, null, null,
|
||||
callback);
|
||||
}
|
||||
|
||||
/** Deletes a bath or session
|
||||
* @param {number} batchOrSession - either 'batch' or 'session'
|
||||
* @param {number} id - id of batch or session
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
deleteSessionOrBatch(batchOrSession, id, callback) {
|
||||
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||
this._request('DELETE', `${pathPrefix}/${id}`,
|
||||
null, null, callback);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
_inputCheck(batchOrSession, id, callback) {
|
||||
assert(batchOrSession === 'batch' || batchOrSession === 'session',
|
||||
'batchOrSession must be string "batch" or "session"');
|
||||
if (id) {
|
||||
assert(Number.isInteger(id), 'id must be an integer');
|
||||
}
|
||||
if (callback) {
|
||||
assert(typeof callback === 'function',
|
||||
'callback must be a function');
|
||||
}
|
||||
return batchOrSession === 'batch' ?
|
||||
'/batches' : '/sessions';
|
||||
}
|
||||
|
||||
_endResponse(res, data, callback) {
|
||||
const code = res.statusCode;
|
||||
const parsedData = data ? JSON.parse(data) : null;
|
||||
if (code <= 201) {
|
||||
this.logger.info(`request to ${this.serverHost} returned success`,
|
||||
{ httpCode: code });
|
||||
return callback(null, parsedData);
|
||||
}
|
||||
const error = new Error(res.statusMessage);
|
||||
this.logger.info(`request to ${this.serverHost} returned error`,
|
||||
{ statusCode: code, statusMessage: res.statusMessage,
|
||||
info: data });
|
||||
return callback(error, parsedData);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} method - the HTTP method of the request
|
||||
* @param {string} path - path without query parameters
|
||||
* @param {object} params - query parameters of the request
|
||||
* @param {string} dataToSend - data of the request
|
||||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
_request(method, path, params, dataToSend, callback) {
|
||||
assert(method === 'GET' || method === 'POST' || method === 'DELETE',
|
||||
'httpMethod must be GET, POST or DELETE');
|
||||
assert(typeof callback === 'function', 'callback must be a function');
|
||||
assert(typeof path === 'string', 'path must be a string');
|
||||
assert(typeof params === 'object', 'pararms must be an object');
|
||||
this.logger.info('sending request',
|
||||
{ httpMethod: method, path, params });
|
||||
let fullPath = path;
|
||||
const headers = {
|
||||
'content-length': 0,
|
||||
};
|
||||
|
||||
if (params) {
|
||||
fullPath += `?${querystring.stringify(params)}`;
|
||||
}
|
||||
|
||||
const options = {
|
||||
method,
|
||||
path: fullPath,
|
||||
headers,
|
||||
hostname: this.serverHost,
|
||||
port: this.serverPort,
|
||||
agent: this.agent,
|
||||
};
|
||||
if (this._cert && this._key) {
|
||||
options.key = this._key;
|
||||
options.cert = this._cert;
|
||||
}
|
||||
const dataResponse = [];
|
||||
let dataResponseLength = 0;
|
||||
|
||||
const req = this.transport.request(options);
|
||||
req.setNoDelay();
|
||||
|
||||
if (dataToSend) {
|
||||
/*
|
||||
* Encoding data to binary provides a hot path to write data
|
||||
* directly to the socket, without node.js trying to encode the data
|
||||
* over and over again.
|
||||
*/
|
||||
const binData = Buffer.from(dataToSend, 'utf8');
|
||||
req.setHeader('content-type', 'application/octet-stream');
|
||||
/*
|
||||
* Using Buffer.bytelength is not required here because data is
|
||||
* binary encoded, data.length would give us the exact byte length
|
||||
*/
|
||||
req.setHeader('content-length', binData.length);
|
||||
req.write(binData);
|
||||
}
|
||||
|
||||
req.on('response', res => {
|
||||
res.on('data', data => {
|
||||
dataResponse.push(data);
|
||||
dataResponseLength += data.length;
|
||||
}).on('error', callback).on('end', () => {
|
||||
this._endResponse(res, Buffer.concat(dataResponse,
|
||||
dataResponseLength).toString(), callback);
|
||||
});
|
||||
}).on('error', error => {
|
||||
// covers system errors like ECONNREFUSED, ECONNRESET etc.
|
||||
this.logger.error('error sending request to livy', { error });
|
||||
return callback(error);
|
||||
}).end();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = LivyClient;
|
|
@ -0,0 +1,330 @@
|
|||
'use strict'; // eslint-disable-line strict
|
||||
|
||||
const assert = require('assert');
|
||||
|
||||
const LivyClient = require('../../lib/client');
|
||||
|
||||
describe('LivyClient tests', function testClient() {
|
||||
this.timeout(0);
|
||||
let client;
|
||||
|
||||
before('Create the client', () => {
|
||||
client = new LivyClient({ host: 'localhost' });
|
||||
});
|
||||
|
||||
describe('POST Sessions', function postSession() {
|
||||
this.sessionId = undefined;
|
||||
afterEach(done => {
|
||||
client.deleteSessionOrBatch('session', this.sessionId, done);
|
||||
});
|
||||
|
||||
it('should post a session', done => {
|
||||
client.postSession({}, (err, res) => {
|
||||
assert.ifError(err);
|
||||
this.sessionId = res.id;
|
||||
assert(res.id !== undefined && res.id !== null);
|
||||
assert.strictEqual(res.state, 'starting');
|
||||
assert.strictEqual(res.kind, 'spark');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('GET Sessions', () => {
|
||||
it('should get interactive sessions', done => {
|
||||
client.getSessionsOrBatches('session', null, null, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert(res.from !== undefined);
|
||||
assert(res.total !== undefined);
|
||||
assert(res.sessions !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get interactive sessions starting from index 1', done => {
|
||||
client.getSessionsOrBatches('session', 1, null, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.from, 1);
|
||||
assert(res.total !== undefined);
|
||||
assert(res.sessions !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
describe('After sessions have been posted', () => {
|
||||
beforeEach(function beforeE(done) {
|
||||
return client.postSession({}, (err, res) => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
this.sessionId = res.id;
|
||||
return done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get interactive sessions with total limited to 1',
|
||||
done => {
|
||||
client.getSessionsOrBatches('session', null, 1, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert(res.from !== undefined);
|
||||
assert(res.total !== undefined);
|
||||
assert(res.sessions !== undefined);
|
||||
assert.strictEqual(res.sessions.length, 1);
|
||||
assert(res.sessions[0].state === 'idle' ||
|
||||
res.sessions[0].state === 'starting');
|
||||
assert.strictEqual(res.sessions[0].kind, 'spark');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get session by id', function itF(done) {
|
||||
client.getSessionOrBatch('session', this.sessionId,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.sessionId);
|
||||
assert(res.appId !== undefined);
|
||||
assert(res.owner !== undefined);
|
||||
assert(res.proxyUser !== undefined);
|
||||
assert(res.state !== undefined);
|
||||
assert.strictEqual(res.kind, 'spark');
|
||||
assert(res.appInfo !== undefined);
|
||||
assert(res.log !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get state of session', function itF(done) {
|
||||
client.getSessionOrBatchState('session', this.sessionId,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.sessionId);
|
||||
assert(res.state !== undefined &&
|
||||
typeof res.state === 'string');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get log of session', function itF(done) {
|
||||
client.getSessionOrBatchLog('session', this.sessionId,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.sessionId);
|
||||
assert(res.from !== undefined);
|
||||
assert(res.total !== undefined);
|
||||
assert(res.log !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('POST and GET Session Statements', () => {
|
||||
before(function beforeE(done) {
|
||||
client.postSession({}, (err, res) => {
|
||||
assert.ifError(err);
|
||||
this.sessionId = res.id;
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after(function afterE(done) {
|
||||
client.deleteSessionOrBatch('session', this.sessionId, done);
|
||||
});
|
||||
|
||||
it('should post a session statement with a quick program',
|
||||
function itF(done) {
|
||||
const codeToExecute = '2 + 2';
|
||||
client.postStatement(this.sessionId, codeToExecute,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
this.quickStatementId = res.id;
|
||||
assert.strictEqual(res.id, this.quickStatementId);
|
||||
assert.strictEqual(res.state, 'waiting');
|
||||
assert.strictEqual(res.output, null);
|
||||
assert.strictEqual(res.code, codeToExecute);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get a session statement with quick execution',
|
||||
function itF(done) {
|
||||
client.getStatement(this.sessionId, this.quickStatementId,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.status, 'ok');
|
||||
assert.strictEqual(res.execution_count, 0);
|
||||
assert.deepStrictEqual(res.data,
|
||||
{ 'text/plain': 'res0: Int = 4' });
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should post a session statement with a long program',
|
||||
function itF(done) {
|
||||
const codeToExecute = 'for(n <- 1 to 1000000) ' +
|
||||
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
||||
'length == 0 }';
|
||||
client.postStatement(this.sessionId, codeToExecute,
|
||||
(err, res) => {
|
||||
this.longStatementId = res.id;
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.state, 'waiting');
|
||||
assert.strictEqual(res.output, null);
|
||||
assert.strictEqual(res.code, codeToExecute);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should max out on retries getting a session statement with long ' +
|
||||
'execution', function itF(done) {
|
||||
client.getStatement(this.sessionId, this.longStatementId,
|
||||
err => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.message, 'Attempted to ' +
|
||||
'get statement from livy too many times');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should cancel in-progress statement', function itF(done) {
|
||||
const codeToExecute = 'for(n <- 1 to 1000000) ' +
|
||||
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
||||
'length == 0 }';
|
||||
client.postStatement(this.sessionId, codeToExecute,
|
||||
(err, res) => {
|
||||
client.cancelStatement(this.sessionId, res.id, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictequal(res.msg, 'canceled');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('POST, GET, and DELETE batch', function batches() {
|
||||
it('should post a jar file batch', done => {
|
||||
// Note for any test env: must add path to livy.conf whitelist
|
||||
// livy.file.local-dir-whitelist=<insert path here>
|
||||
client.postBatch({ file: `${__dirname}/../resources/` +
|
||||
'simplespark_2.11-0.1.jar',
|
||||
className: 'SimpleApp' },
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
this.jarBatch = res.id;
|
||||
assert(res.state === 'running' || res.state === 'starting');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should post a python file batch', done => {
|
||||
// Note for any test env: must add path to livy.conf whitelist
|
||||
// livy.file.local-dir-whitelist=<insert path here>
|
||||
client.postBatch({ file: `${__dirname}/../resources/` +
|
||||
'SimpleApp.py' },
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
this.pythonBatch = res.id;
|
||||
assert(res.state === 'running' || res.state === 'starting');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get active batches', done => {
|
||||
client.getSessionsOrBatches('batch', null, null, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert(res.from !== undefined);
|
||||
assert(res.total !== undefined);
|
||||
assert(res.sessions !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get a jar batch by id', done => {
|
||||
client.getSessionOrBatch('batch', this.jarBatch, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.jarBatch);
|
||||
assert(res.state !== undefined &&
|
||||
typeof res.state === 'string');
|
||||
assert(res.appId !== undefined);
|
||||
assert(res.appInfo !== undefined);
|
||||
assert(res.log !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get a python file batch by id', done => {
|
||||
client.getSessionOrBatch('batch', this.pythonBatch, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.pythonBatch);
|
||||
assert(res.state !== undefined &&
|
||||
typeof res.state === 'string');
|
||||
assert(res.appId !== undefined);
|
||||
assert(res.appInfo !== undefined);
|
||||
assert(res.log !== undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get a jar batch state', done => {
|
||||
client.getSessionOrBatchState('batch', this.jarBatch,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.jarBatch);
|
||||
assert(res.state !== undefined && typeof
|
||||
res.state === 'string');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get a python file batch state', done => {
|
||||
client.getSessionOrBatchState('batch', this.jarBatch,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.jarBatch);
|
||||
assert(res.state !== undefined && typeof
|
||||
res.state === 'string');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get the log of a jar batch', done => {
|
||||
client.getSessionOrBatchLog('batch', this.jarBatch, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.jarBatch);
|
||||
assert(res.log !== undefined);
|
||||
assert(res.from !== undefined && Number.isInteger(res.total));
|
||||
assert(res.total !== undefined && Number.isInteger(res.total));
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get the log of a python file batch', done => {
|
||||
client.getSessionOrBatchLog('batch', this.pythonBatch,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(res.id, this.pythonBatch);
|
||||
assert(res.log !== undefined);
|
||||
assert(res.from !== undefined && Number.isInteger(res.total));
|
||||
assert(res.total !== undefined && Number.isInteger(res.total));
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should delete a python file batch', done => {
|
||||
client.deleteSessionOrBatch('batch', this.pythonBatch,
|
||||
(err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should delete a jar batch', done => {
|
||||
client.deleteSessionOrBatch('batch', this.jarBatch, (err, res) => {
|
||||
assert.ifError(err);
|
||||
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,3 @@
|
|||
"""SimpleApp.py"""
|
||||
|
||||
print "I was submitted to Livy"
|
Binary file not shown.
|
@ -54,6 +54,21 @@ class ObjectMD {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns metadata attributes for the current model
|
||||
*
|
||||
* @return {object} object with keys of existing attributes
|
||||
* and value set to true
|
||||
*/
|
||||
static getAttributes() {
|
||||
const sample = new ObjectMD();
|
||||
const attributes = {};
|
||||
Object.keys(sample.getValue()).forEach(key => {
|
||||
attributes[key] = true;
|
||||
});
|
||||
return attributes;
|
||||
}
|
||||
|
||||
getSerialized() {
|
||||
return JSON.stringify(this.getValue());
|
||||
}
|
||||
|
|
|
@ -805,6 +805,10 @@ const routesUtils = {
|
|||
* bucket name against validation rules
|
||||
*/
|
||||
isValidBucketName(bucketname, prefixBlacklist) {
|
||||
// allow this one capitalized bucketname so that clueso can use it
|
||||
if (bucketname === 'METADATA') {
|
||||
return true;
|
||||
}
|
||||
const ipAddressRegex = new RegExp(/^(\d+\.){3}\d+$/);
|
||||
// eslint-disable-next-line no-useless-escape
|
||||
const dnsRegex = new RegExp(/^[a-z0-9]+([\.\-]{1}[a-z0-9]+)*$/);
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
"engines": {
|
||||
"node": "6.9.5"
|
||||
},
|
||||
"version": "7.2.0",
|
||||
"version": "7.2.1",
|
||||
"description": "Common utilities for the S3 project components",
|
||||
"main": "index.js",
|
||||
"repository": {
|
||||
|
@ -19,6 +19,7 @@
|
|||
"dependencies": {
|
||||
"ajv": "4.10.0",
|
||||
"async": "~2.1.5",
|
||||
"backo": "^1.1.0",
|
||||
"debug": "~2.3.3",
|
||||
"diskusage": "^0.2.2",
|
||||
"ioredis": "2.4.0",
|
||||
|
|
Loading…
Reference in New Issue