Compare commits
5 Commits
77c8e7de4b
...
20a7044d8c
Author | SHA1 | Date |
---|---|---|
Lauren Spiegel | 20a7044d8c | |
JianqinWang | 8d267feae7 | |
Lauren Spiegel | 22e9b8fc83 | |
Lauren Spiegel | c807c54406 | |
Lauren Spiegel | 1367ceea86 |
|
@ -3,3 +3,11 @@
|
||||||
|
|
||||||
# Dependency directory
|
# Dependency directory
|
||||||
node_modules/
|
node_modules/
|
||||||
|
*/node_modules/
|
||||||
|
|
||||||
|
# Build executables
|
||||||
|
*-win.exe
|
||||||
|
*-linux
|
||||||
|
*-macos
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
# Get Pensieve Credentials Executable
|
||||||
|
|
||||||
|
## To make executable file from getPensieveCreds.js
|
||||||
|
|
||||||
|
`npm install -g pkg`
|
||||||
|
`pkg getPensieveCreds.js`
|
||||||
|
|
||||||
|
This will build a mac, linux and windows file.
|
||||||
|
If you just want linux, for example:
|
||||||
|
`pkg getPensieveCreds.js --targets node6-linux-x64`
|
||||||
|
|
||||||
|
For further options, see https://github.com/zeit/pkg
|
||||||
|
|
||||||
|
## To run the executable file
|
||||||
|
|
||||||
|
Call the output executable file with an
|
||||||
|
argument that names the service you
|
||||||
|
are trying to get credentials for:
|
||||||
|
|
||||||
|
`./getPensieveCreds-linux serviceName`
|
|
@ -0,0 +1,76 @@
|
||||||
|
const async = require('async');
|
||||||
|
const forge = require('node-forge');
|
||||||
|
const MetadataFileClient =
|
||||||
|
require('../../storage/metadata/file/MetadataFileClient');
|
||||||
|
const mdClient = new MetadataFileClient({
|
||||||
|
host: 's3-metadata',
|
||||||
|
port: '9993',
|
||||||
|
});
|
||||||
|
|
||||||
|
const serviceName = process.argv[2];
|
||||||
|
const tokenKey = 'auth/zenko/remote-management-token';
|
||||||
|
|
||||||
|
// XXX copy-pasted from Backbeat
|
||||||
|
function decryptSecret(instanceCredentials, secret) {
|
||||||
|
// XXX don't forget to use u.encryptionKeyVersion if present
|
||||||
|
const privateKey = forge.pki.privateKeyFromPem(
|
||||||
|
instanceCredentials.privateKey);
|
||||||
|
const encryptedSecretKey = forge.util.decode64(secret);
|
||||||
|
return privateKey.decrypt(encryptedSecretKey, 'RSA-OAEP', {
|
||||||
|
md: forge.md.sha256.create(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function loadOverlayVersion(db, version, cb) {
|
||||||
|
db.get(`configuration/overlay/${version}`, {}, (err, val) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return cb(null, JSON.parse(val));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function parseServiceCredentials(conf, auth) {
|
||||||
|
const instanceAuth = JSON.parse(auth);
|
||||||
|
const serviceAccount = (conf.users || []).find(
|
||||||
|
u => u.accountType === `service-${serviceName}`);
|
||||||
|
if (!serviceAccount) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
accessKey: serviceAccount.accessKey,
|
||||||
|
secretKey: decryptSecret(instanceAuth, serviceAccount.secretKey),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const mdDb = mdClient.openDB(error => {
|
||||||
|
if (error) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
const db = mdDb.openSub('PENSIEVE');
|
||||||
|
return async.waterfall([
|
||||||
|
cb => db.get('configuration/overlay-version', {}, cb),
|
||||||
|
(version, cb) => loadOverlayVersion(db, version, cb),
|
||||||
|
(conf, cb) => db.get(tokenKey, {}, (err, instanceAuth) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const creds = parseServiceCredentials(conf, instanceAuth);
|
||||||
|
return cb(null, creds);
|
||||||
|
}),
|
||||||
|
], (err, creds) => {
|
||||||
|
db.disconnect();
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
if (!creds) {
|
||||||
|
throw new Error('No credentials found');
|
||||||
|
}
|
||||||
|
process.stdout.write(`export AWS_ACCESS_KEY_ID="${creds.accessKey}"\n`);
|
||||||
|
process.stdout
|
||||||
|
.write(`export AWS_SECRET_ACCESS_KEY="${creds.secretKey}"`);
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,13 @@
|
||||||
|
{
|
||||||
|
"name": "pensievecreds",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"description": "Executable tool for Pensieve",
|
||||||
|
"main": "getPensieveCreds.js",
|
||||||
|
"scripts": {
|
||||||
|
"test": "echo \"Error: no test specified\" && exit 1"
|
||||||
|
},
|
||||||
|
"dependencies": {
|
||||||
|
"async": "^2.6.0",
|
||||||
|
"node-forge": "^0.7.1"
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,50 +6,72 @@ const https = require('https');
|
||||||
const querystring = require('querystring');
|
const querystring = require('querystring');
|
||||||
const Backoff = require('backo');
|
const Backoff = require('backo');
|
||||||
// min and max in ms
|
// min and max in ms
|
||||||
const GET_BACKOFF_PARAMS = { min: 100, max: 200, maxAttempts: 300,
|
const defaultGetBackoffParams = { min: 100, max: 200, maxAttempts: 300,
|
||||||
factor: 1.01 };
|
factor: 1.01 };
|
||||||
const POST_BACKOFF_PARAMS = { min: 1000, max: 3000, maxAttempts: 20,
|
const defaultPostBackoffParams = { min: 1000, max: 3000, maxAttempts: 20,
|
||||||
factor: 1.1 };
|
factor: 1.1 };
|
||||||
|
|
||||||
|
|
||||||
class LivyClient {
|
class LivyClient {
|
||||||
/**
|
/**
|
||||||
* Constructor for REST client to apache livy
|
* Constructor for REST client to apache livy
|
||||||
*
|
*
|
||||||
* @param {string} host - hostname or IP of the livy server
|
* @param {object} params - object for all parameters to initialize Livy
|
||||||
* @param {number} [port=8998] - port of the livy server
|
|
||||||
* @param {object} [logger=console] - logger object
|
|
||||||
* @param {boolean} [useHttps] - whether to use https or not
|
|
||||||
* @param {string} [key] - https private key content
|
|
||||||
* @param {string} [cert] - https public certificate content
|
|
||||||
* @param {string} [ca] - https authority certificate content
|
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
constructor(host, port = 8998, logger = console, useHttps, key, cert, ca) {
|
constructor(params) {
|
||||||
assert(typeof host === 'string' && host !== '', 'host is required');
|
assert(typeof params.host === 'string' && params.host !== '',
|
||||||
assert(Number.isInteger(port), 'port must be an integer');
|
'host is required');
|
||||||
assert(typeof logger === 'object', 'logger must be an object');
|
assert(params.port === undefined || Number.isInteger(params.port),
|
||||||
assert(typeof logger.error === 'function', 'logger must have' +
|
'port must be an integer');
|
||||||
'error method');
|
assert(params.logger === undefined || typeof params.logger === 'object',
|
||||||
assert(typeof logger.info === 'function', 'logger must have' +
|
'logger must be an object');
|
||||||
'info method');
|
assert(params.logger === undefined ||
|
||||||
assert(key === undefined || typeof key === 'string',
|
typeof params.logger.error === 'function',
|
||||||
'key must be a string');
|
'logger must have error method');
|
||||||
assert(cert === undefined || typeof cert === 'string',
|
assert(params.logger === undefined ||
|
||||||
'cert must be a string');
|
typeof params.logger.info === 'function',
|
||||||
assert(ca === undefined || typeof ca === 'string',
|
'logger must have info method');
|
||||||
'ca must be a string');
|
assert(params.key === undefined || typeof params.key === 'string',
|
||||||
this.serverHost = host;
|
'key must be a string');
|
||||||
this.serverPort = port;
|
assert(params.cert === undefined || typeof params.cert === 'string',
|
||||||
this.logger = logger;
|
'cert must be a string');
|
||||||
this._key = key;
|
assert(params.ca === undefined || typeof params.ca === 'string',
|
||||||
this._cert = cert;
|
'ca must be a string');
|
||||||
this._ca = ca;
|
assert(params.getBackoffParams === undefined ||
|
||||||
this.useHttps = (useHttps === true);
|
typeof params.getBackoffParams === 'object',
|
||||||
|
'getBackoffParams must be an object');
|
||||||
|
assert(params.postBackoffParams === undefined ||
|
||||||
|
typeof params.postBackoffParams === 'object',
|
||||||
|
'postBackoffParams must be an object');
|
||||||
|
assert(params.getBackoffParams === undefined ||
|
||||||
|
(Number.isInteger(params.getBackoffParams.min) &&
|
||||||
|
Number.isInteger(params.getBackoffParams.max) &&
|
||||||
|
Number.isInteger(params.getBackoffParams.maxAttempts) &&
|
||||||
|
!isNaN(params.getBackoffParams.factor)),
|
||||||
|
'getBackoffParams should have valid numerical values for ' +
|
||||||
|
'the min, max, maxAttempts, and factor attributes');
|
||||||
|
assert(params.postBackoffParams === undefined ||
|
||||||
|
(Number.isInteger(params.postBackoffParams.min) &&
|
||||||
|
Number.isInteger(params.postBackoffParams.max) &&
|
||||||
|
Number.isInteger(params.postBackoffParams.maxAttempts) &&
|
||||||
|
!isNaN(params.postBackoffParams.factor)),
|
||||||
|
'postBackoffParams should have valid numerical values for ' +
|
||||||
|
'the min, max, maxAttempts, and factor attributes');
|
||||||
|
this.serverHost = params.host;
|
||||||
|
this.serverPort = params.port !== undefined ? params.port : 8998;
|
||||||
|
this.logger = params.logger !== undefined ? params.logger : console;
|
||||||
|
this._key = params.key;
|
||||||
|
this._cert = params.cert;
|
||||||
|
this._ca = params.ca;
|
||||||
|
this.getBackoffParams = params.getBackoffParams !== undefined ?
|
||||||
|
params.getBackoffParams : defaultGetBackoffParams;
|
||||||
|
this.postBackoffParams = params.postBackoffParams !== undefined ?
|
||||||
|
params.postBackoffParams : defaultPostBackoffParams;
|
||||||
|
this.useHttps = (params.useHttps === true);
|
||||||
if (this.useHttps) {
|
if (this.useHttps) {
|
||||||
this.transport = https;
|
this.transport = https;
|
||||||
this._agent = new https.Agent({
|
this._agent = new https.Agent({
|
||||||
ca: ca ? [ca] : undefined,
|
ca: params.ca ? [params.ca] : undefined,
|
||||||
keepAlive: true,
|
keepAlive: true,
|
||||||
requestCert: true,
|
requestCert: true,
|
||||||
});
|
});
|
||||||
|
@ -62,27 +84,41 @@ class LivyClient {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns interactive sessions
|
/** Return session or batch information by id
|
||||||
|
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||||
|
* @param {number} id - id of session to get information on
|
||||||
|
* @param {funcftion} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
getSessionOrBatch(batchOrSession, id, callback) {
|
||||||
|
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||||
|
this._request('GET', `${pathPrefix}/${id}`, null, null, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns interactive sessions or batches
|
||||||
|
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||||
* @param {number} [startIndex] - index to start listing
|
* @param {number} [startIndex] - index to start listing
|
||||||
* @param {number} [numOfSessions] - number of sessions to
|
* @param {number} [numOfSessionOrBatch] - number of sessions to
|
||||||
* return
|
* return
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getSessions(startIndex, numOfSessions, callback) {
|
getSessionsOrBatches(batchOrSession, startIndex, numOfSessionOrBatch,
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
callback) {
|
||||||
|
const pathPrefix = this._inputCheck(batchOrSession, null, callback);
|
||||||
const params = {};
|
const params = {};
|
||||||
if (startIndex) {
|
if (startIndex) {
|
||||||
assert(Number.isInteger(startIndex),
|
assert(Number.isInteger(startIndex),
|
||||||
'startIndex must be an integer');
|
'startIndex must be an integer');
|
||||||
params.from = startIndex;
|
params.from = startIndex;
|
||||||
}
|
}
|
||||||
if (numOfSessions) {
|
if (numOfSessionOrBatch) {
|
||||||
assert(Number.isInteger(numOfSessions),
|
assert(Number.isInteger(numOfSessionOrBatch),
|
||||||
'numOfSessions must be an integer');
|
`numOf${batchOrSession} must be an integer`);
|
||||||
params.size = numOfSessions;
|
params.size = numOfSessionOrBatch;
|
||||||
}
|
}
|
||||||
this._request('GET', '/sessions',
|
this._request('GET', `${pathPrefix}`,
|
||||||
params, null, callback);
|
params, null, callback);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
@ -94,95 +130,24 @@ class LivyClient {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getSessionOrBatchState(batchOrSession, id, callback) {
|
getSessionOrBatchState(batchOrSession, id, callback) {
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||||
assert(batchOrSession === 'batch' || batchOrSession === 'session',
|
|
||||||
'batchOrSession must be string "batch" or "session"');
|
|
||||||
assert(Number.isInteger(id), 'id must be an integer');
|
|
||||||
const pathPrefix = batchOrSession === 'batch' ?
|
|
||||||
'/batches' : '/sessions';
|
|
||||||
this._request('GET', `${pathPrefix}/${id}/state`,
|
this._request('GET', `${pathPrefix}/${id}/state`,
|
||||||
null, null, callback);
|
null, null, callback);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a new interactive Scala, Python, or R shell in the cluster
|
/** Returns log of batch or session
|
||||||
* @param {object} options - options for session
|
* @param {string} batchOrSession - either 'batch' or 'session'
|
||||||
* @param {string} options.kind - type of session: spark, pyspark,
|
* @param {number} id - batch or session id
|
||||||
* pyspark3 or sparkr. If not specified, defaults to spark.
|
|
||||||
* For other options, see: https://github.com/apache/
|
|
||||||
* incubator-livy/blob/master/docs/rest-api.md#post-sessions
|
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
postSession(options, callback) {
|
getSessionOrBatchLog(batchOrSession, id, callback) {
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||||
assert(typeof options === 'object', 'options must be an object');
|
this._request('GET', `${pathPrefix}/${id}/log`, null, null, callback);
|
||||||
let postBody = options;
|
|
||||||
if (!options.kind) {
|
|
||||||
postBody.kind = 'spark';
|
|
||||||
}
|
|
||||||
postBody = JSON.stringify(postBody);
|
|
||||||
this._request('POST', '/sessions',
|
|
||||||
null, postBody, callback);
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Deletes a session
|
|
||||||
* @param {number} sessionId - sessionId to delete
|
|
||||||
* @param {function} callback - callback
|
|
||||||
* @return {undefined}
|
|
||||||
*/
|
|
||||||
deleteSession(sessionId, callback) {
|
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
|
||||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
|
||||||
this._request('DELETE', `/sessions/${sessionId}`,
|
|
||||||
null, null, callback);
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Run a statement in a session
|
|
||||||
* @param {number} sessionId - options for session
|
|
||||||
* @param {string} codeToExecute - actual code to be executed by spark
|
|
||||||
* @param {function} callback - callback
|
|
||||||
* @param {object} [backoff] - backoff instance
|
|
||||||
* @return {undefined}
|
|
||||||
*/
|
|
||||||
postStatement(sessionId, codeToExecute, callback, backoff) {
|
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
|
||||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
|
||||||
assert(typeof codeToExecute === 'string', 'codeToExecute must be ' +
|
|
||||||
'a string');
|
|
||||||
const backoffInstance = backoff || new Backoff(POST_BACKOFF_PARAMS);
|
|
||||||
if (backoffInstance.attempts >= POST_BACKOFF_PARAMS.maxAttempts) {
|
|
||||||
const error = new Error('Attempted to post statement to livy ' +
|
|
||||||
'too many times');
|
|
||||||
return process.nextTick(() => callback(error));
|
|
||||||
}
|
|
||||||
// Need to check status of session
|
|
||||||
return this.getSessionOrBatchState('session', sessionId, (err, res) => {
|
|
||||||
if (err) {
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
if (res.state === 'starting' || res.state === 'busy') {
|
|
||||||
this.logger.info('session not ready',
|
|
||||||
{ sessionState: res.state });
|
|
||||||
const retryDelayMs = backoffInstance.duration();
|
|
||||||
return setTimeout(this.postStatement.bind(this),
|
|
||||||
retryDelayMs, sessionId, codeToExecute,
|
|
||||||
callback, backoffInstance);
|
|
||||||
}
|
|
||||||
if (res.state !== 'idle') {
|
|
||||||
const error = new Error('Session is in state: ' +
|
|
||||||
`${res.state}. Cannot accept statement`);
|
|
||||||
return callback(error);
|
|
||||||
}
|
|
||||||
const postBody = JSON.stringify({ code: codeToExecute });
|
|
||||||
this._request('POST', `/sessions/${sessionId}/statements`,
|
|
||||||
null, postBody, callback);
|
|
||||||
return undefined;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns a specified statement within a session
|
/** Returns a specified statement within a session
|
||||||
* @param {number} sessionId - session id
|
* @param {number} sessionId - session id
|
||||||
* @param {number} statementId - statement id
|
* @param {number} statementId - statement id
|
||||||
|
@ -191,11 +156,10 @@ class LivyClient {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getStatement(sessionId, statementId, callback, backoff) {
|
getStatement(sessionId, statementId, callback, backoff) {
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
|
||||||
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
||||||
const backoffInstance = backoff || new Backoff(GET_BACKOFF_PARAMS);
|
const backoffInstance = backoff || new Backoff(this.getBackoffParams);
|
||||||
if (backoffInstance.attempts >= GET_BACKOFF_PARAMS.maxAttempts) {
|
if (backoffInstance.attempts >= this.getBackoffParams.maxAttempts) {
|
||||||
const error = new Error('Attempted to get statement from livy ' +
|
const error = new Error('Attempted to get statement from livy ' +
|
||||||
'too many times');
|
'too many times');
|
||||||
return process.nextTick(() => callback(error));
|
return process.nextTick(() => callback(error));
|
||||||
|
@ -235,31 +199,84 @@ class LivyClient {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns all active batch sessions
|
/** Returns all statements within a session
|
||||||
* @param {number} [startIndex] - index to start listing
|
* @param {number} sessionId - id of session
|
||||||
* @param {number} [numOfBatches] - number of batches to
|
|
||||||
* return
|
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getBatches(startIndex, numOfBatches, callback) {
|
getStatements(sessionId, callback) {
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
const params = {};
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
if (startIndex) {
|
this._request('GET', `/sessions/${sessionId}/statements`, null, null,
|
||||||
assert(Number.isInteger(startIndex),
|
callback);
|
||||||
'startIndex must be an integer');
|
|
||||||
params.from = startIndex;
|
|
||||||
}
|
|
||||||
if (numOfBatches) {
|
|
||||||
assert(Number.isInteger(numOfBatches),
|
|
||||||
'numOfBatches must be an integer');
|
|
||||||
params.size = numOfBatches;
|
|
||||||
}
|
|
||||||
this._request('GET', '/batches',
|
|
||||||
params, null, callback);
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Creates a new interactive Scala, Python, or R shell in the cluster
|
||||||
|
* @param {object} options - options for session
|
||||||
|
* @param {string} options.kind - type of session: spark, pyspark,
|
||||||
|
* pyspark3 or sparkr. If not specified, defaults to spark.
|
||||||
|
* For other options, see: https://github.com/apache/
|
||||||
|
* incubator-livy/blob/master/docs/rest-api.md#post-sessions
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
postSession(options, callback) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(typeof options === 'object', 'options must be an object');
|
||||||
|
let postBody = options;
|
||||||
|
if (!options.kind) {
|
||||||
|
postBody.kind = 'spark';
|
||||||
|
}
|
||||||
|
postBody = JSON.stringify(postBody);
|
||||||
|
this._request('POST', '/sessions',
|
||||||
|
null, postBody, callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Run a statement in a session
|
||||||
|
* @param {number} sessionId - options for session
|
||||||
|
* @param {string} codeToExecute - actual code to be executed by spark
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @param {object} [backoff] - backoff instance
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
postStatement(sessionId, codeToExecute, callback, backoff) {
|
||||||
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
|
assert(typeof codeToExecute === 'string', 'codeToExecute must be ' +
|
||||||
|
'a string');
|
||||||
|
const backoffInstance = backoff || new Backoff(this.postBackoffParams);
|
||||||
|
if (backoffInstance.attempts >= this.postBackoffParams.maxAttempts) {
|
||||||
|
const error = new Error('Attempted to post statement to livy ' +
|
||||||
|
'too many times');
|
||||||
|
return process.nextTick(() => callback(error));
|
||||||
|
}
|
||||||
|
// Need to check status of session
|
||||||
|
return this.getSessionOrBatchState('session', sessionId, (err, res) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (res.state === 'starting' || res.state === 'busy') {
|
||||||
|
this.logger.info('session not ready',
|
||||||
|
{ sessionState: res.state });
|
||||||
|
const retryDelayMs = backoffInstance.duration();
|
||||||
|
return setTimeout(this.postStatement.bind(this),
|
||||||
|
retryDelayMs, sessionId, codeToExecute,
|
||||||
|
callback, backoffInstance);
|
||||||
|
}
|
||||||
|
if (res.state !== 'idle') {
|
||||||
|
const error = new Error('Session is in state: ' +
|
||||||
|
`${res.state}. Cannot accept statement`);
|
||||||
|
return callback(error);
|
||||||
|
}
|
||||||
|
const postBody = JSON.stringify({ code: codeToExecute });
|
||||||
|
this._request('POST', `/sessions/${sessionId}/statements`,
|
||||||
|
null, postBody, callback);
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/** Creates a new batch job
|
/** Creates a new batch job
|
||||||
* @param {object} options - options for batch
|
* @param {object} options - options for batch
|
||||||
* @param {string} options.file - path of file containing the
|
* @param {string} options.file - path of file containing the
|
||||||
|
@ -279,19 +296,48 @@ class LivyClient {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Deletes a batch job
|
/** cancels a statement
|
||||||
* @param {number} batchId - batchId to delete
|
* @param {number} sessionId - session id
|
||||||
|
* @param {number} statementId - id of statement to cancel
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
deleteBatch(batchId, callback) {
|
cancelStatement(sessionId, statementId, callback) {
|
||||||
|
assert(Number.isInteger(sessionId), 'sessionId must be an integer');
|
||||||
|
assert(Number.isInteger(statementId), 'statementId must be an integer');
|
||||||
assert(typeof callback === 'function', 'callback must be a function');
|
assert(typeof callback === 'function', 'callback must be a function');
|
||||||
assert(Number.isInteger(batchId), 'batchId must be an integer');
|
this._request('POST',
|
||||||
this._request('DELETE', `/batches/${batchId}`,
|
`/sessions/${sessionId}/statements/${statementId}/cancel`, null, null,
|
||||||
|
callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Deletes a bath or session
|
||||||
|
* @param {number} batchOrSession - either 'batch' or 'session'
|
||||||
|
* @param {number} id - id of batch or session
|
||||||
|
* @param {function} callback - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
deleteSessionOrBatch(batchOrSession, id, callback) {
|
||||||
|
const pathPrefix = this._inputCheck(batchOrSession, id, callback);
|
||||||
|
this._request('DELETE', `${pathPrefix}/${id}`,
|
||||||
null, null, callback);
|
null, null, callback);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_inputCheck(batchOrSession, id, callback) {
|
||||||
|
assert(batchOrSession === 'batch' || batchOrSession === 'session',
|
||||||
|
'batchOrSession must be string "batch" or "session"');
|
||||||
|
if (id) {
|
||||||
|
assert(Number.isInteger(id), 'id must be an integer');
|
||||||
|
}
|
||||||
|
if (callback) {
|
||||||
|
assert(typeof callback === 'function',
|
||||||
|
'callback must be a function');
|
||||||
|
}
|
||||||
|
return batchOrSession === 'batch' ?
|
||||||
|
'/batches' : '/sessions';
|
||||||
|
}
|
||||||
|
|
||||||
_endResponse(res, data, callback) {
|
_endResponse(res, data, callback) {
|
||||||
const code = res.statusCode;
|
const code = res.statusCode;
|
||||||
const parsedData = data ? JSON.parse(data) : null;
|
const parsedData = data ? JSON.parse(data) : null;
|
||||||
|
|
|
@ -7,33 +7,24 @@ const LivyClient = require('../../lib/client');
|
||||||
describe('LivyClient tests', function testClient() {
|
describe('LivyClient tests', function testClient() {
|
||||||
this.timeout(0);
|
this.timeout(0);
|
||||||
let client;
|
let client;
|
||||||
// even if a session is deleted, livy keeps incrementing for new
|
|
||||||
// sesssions so keep track through the tests
|
|
||||||
// TODO; refactor this so using this.sessionId in each test
|
|
||||||
// rather than this global counter
|
|
||||||
let sessionId = 0;
|
|
||||||
|
|
||||||
before('Create the client', () => {
|
before('Create the client', () => {
|
||||||
client = new LivyClient('localhost');
|
client = new LivyClient({ host: 'localhost' });
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('POST Sessions', () => {
|
describe('POST Sessions', function postSession() {
|
||||||
|
this.sessionId = undefined;
|
||||||
afterEach(done => {
|
afterEach(done => {
|
||||||
client.deleteSession(sessionId - 1, done);
|
client.deleteSessionOrBatch('session', this.sessionId, done);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should post a session', done => {
|
it('should post a session', done => {
|
||||||
client.postSession({}, (err, res) => {
|
client.postSession({}, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
const expectedResponse = {
|
this.sessionId = res.id;
|
||||||
id: 0, appId: null, owner: null,
|
assert(res.id !== undefined && res.id !== null);
|
||||||
proxyUser: null,
|
assert.strictEqual(res.state, 'starting');
|
||||||
state: 'starting',
|
assert.strictEqual(res.kind, 'spark');
|
||||||
kind: 'spark',
|
|
||||||
appInfo: { driverLogUrl: null, sparkUiUrl: null },
|
|
||||||
log: [] };
|
|
||||||
assert.deepStrictEqual(res, expectedResponse);
|
|
||||||
sessionId++;
|
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -41,65 +32,94 @@ describe('LivyClient tests', function testClient() {
|
||||||
|
|
||||||
describe('GET Sessions', () => {
|
describe('GET Sessions', () => {
|
||||||
it('should get interactive sessions', done => {
|
it('should get interactive sessions', done => {
|
||||||
client.getSessions(null, null, (err, res) => {
|
client.getSessionsOrBatches('session', null, null, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res, { from: 0, total: 0,
|
assert(res.from !== undefined);
|
||||||
sessions: [] });
|
assert(res.total !== undefined);
|
||||||
|
assert(res.sessions !== undefined);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should get interactive sessions starting from index 1', done => {
|
it('should get interactive sessions starting from index 1', done => {
|
||||||
client.getSessions(1, null, (err, res) => {
|
client.getSessionsOrBatches('session', 1, null, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res, { from: 1, total: 0,
|
assert.strictEqual(res.from, 1);
|
||||||
sessions: [] });
|
assert(res.total !== undefined);
|
||||||
|
assert(res.sessions !== undefined);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('After sessions have been posted', () => {
|
describe('After sessions have been posted', () => {
|
||||||
before(done => {
|
beforeEach(function beforeE(done) {
|
||||||
client.postSession({}, err => {
|
return client.postSession({}, (err, res) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
return done(err);
|
return done(err);
|
||||||
}
|
}
|
||||||
sessionId++;
|
this.sessionId = res.id;
|
||||||
return done();
|
return done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
after(done => client.deleteSession(sessionId - 1, done));
|
|
||||||
|
|
||||||
it('should get interactive sessions with total limited to 1',
|
it('should get interactive sessions with total limited to 1',
|
||||||
done => {
|
done => {
|
||||||
client.getSessions(null, 1, (err, res) => {
|
client.getSessionsOrBatches('session', null, 1, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
const expectedResponse = {
|
assert(res.from !== undefined);
|
||||||
from: 0,
|
assert(res.total !== undefined);
|
||||||
total: 1,
|
assert(res.sessions !== undefined);
|
||||||
sessions: [{
|
assert.strictEqual(res.sessions.length, 1);
|
||||||
id: 1,
|
assert(res.sessions[0].state === 'idle' ||
|
||||||
appId: null,
|
res.sessions[0].state === 'starting');
|
||||||
owner: null,
|
assert.strictEqual(res.sessions[0].kind, 'spark');
|
||||||
proxyUser: null,
|
done();
|
||||||
state: 'starting',
|
});
|
||||||
kind: 'spark',
|
});
|
||||||
appInfo: {
|
|
||||||
driverLogUrl: null,
|
it('should get session by id', function itF(done) {
|
||||||
sparkUiUrl: null,
|
client.getSessionOrBatch('session', this.sessionId,
|
||||||
},
|
(err, res) => {
|
||||||
log: [],
|
assert.ifError(err);
|
||||||
}] };
|
assert.strictEqual(res.id, this.sessionId);
|
||||||
assert.deepStrictEqual(res, expectedResponse);
|
assert(res.appId !== undefined);
|
||||||
|
assert(res.owner !== undefined);
|
||||||
|
assert(res.proxyUser !== undefined);
|
||||||
|
assert(res.state !== undefined);
|
||||||
|
assert.strictEqual(res.kind, 'spark');
|
||||||
|
assert(res.appInfo !== undefined);
|
||||||
|
assert(res.log !== undefined);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get state of session', function itF(done) {
|
||||||
|
client.getSessionOrBatchState('session', this.sessionId,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.sessionId);
|
||||||
|
assert(res.state !== undefined &&
|
||||||
|
typeof res.state === 'string');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get log of session', function itF(done) {
|
||||||
|
client.getSessionOrBatchLog('session', this.sessionId,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.sessionId);
|
||||||
|
assert(res.from !== undefined);
|
||||||
|
assert(res.total !== undefined);
|
||||||
|
assert(res.log !== undefined);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('POST and GET Session Statements', function statements() {
|
describe('POST and GET Session Statements', () => {
|
||||||
before(done => {
|
before(function beforeE(done) {
|
||||||
client.postSession({}, (err, res) => {
|
client.postSession({}, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
this.sessionId = res.id;
|
this.sessionId = res.id;
|
||||||
|
@ -107,53 +127,56 @@ describe('LivyClient tests', function testClient() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
after(done => {
|
after(function afterE(done) {
|
||||||
client.deleteSession(this.sessionId, done);
|
client.deleteSessionOrBatch('session', this.sessionId, done);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should post a session statement with a quick program', done => {
|
it('should post a session statement with a quick program',
|
||||||
|
function itF(done) {
|
||||||
const codeToExecute = '2 + 2';
|
const codeToExecute = '2 + 2';
|
||||||
client.postStatement(this.sessionId, codeToExecute,
|
client.postStatement(this.sessionId, codeToExecute,
|
||||||
(err, res) => {
|
(err, res) => {
|
||||||
this.quickStatementId = res.id;
|
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res, { id: this.quickStatementId,
|
this.quickStatementId = res.id;
|
||||||
state: 'waiting',
|
assert.strictEqual(res.id, this.quickStatementId);
|
||||||
output: null });
|
assert.strictEqual(res.state, 'waiting');
|
||||||
|
assert.strictEqual(res.output, null);
|
||||||
|
assert.strictEqual(res.code, codeToExecute);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should get a session statement with quick execution', done => {
|
it('should get a session statement with quick execution',
|
||||||
|
function itF(done) {
|
||||||
client.getStatement(this.sessionId, this.quickStatementId,
|
client.getStatement(this.sessionId, this.quickStatementId,
|
||||||
(err, res) => {
|
(err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res,
|
assert.strictEqual(res.status, 'ok');
|
||||||
{ status: 'ok',
|
assert.strictEqual(res.execution_count, 0);
|
||||||
// eslint-disable-next-line
|
assert.deepStrictEqual(res.data,
|
||||||
execution_count: 0,
|
{ 'text/plain': 'res0: Int = 4' });
|
||||||
data: { 'text/plain': 'res0: Int = 4' } });
|
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should post a session statement with a long program', done => {
|
it('should post a session statement with a long program',
|
||||||
const codeToExecute = 'for(n <- 1 to 100000) ' +
|
function itF(done) {
|
||||||
|
const codeToExecute = 'for(n <- 1 to 1000000) ' +
|
||||||
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
||||||
'length == 0 }';
|
'length == 0 }';
|
||||||
client.postStatement(this.sessionId, codeToExecute,
|
client.postStatement(this.sessionId, codeToExecute,
|
||||||
(err, res) => {
|
(err, res) => {
|
||||||
this.longStatementId = res.id;
|
this.longStatementId = res.id;
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res, { id: this.longStatementId,
|
assert.strictEqual(res.state, 'waiting');
|
||||||
state: 'waiting',
|
assert.strictEqual(res.output, null);
|
||||||
output: null });
|
assert.strictEqual(res.code, codeToExecute);
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should max out on retries getting a session statement with long ' +
|
it('should max out on retries getting a session statement with long ' +
|
||||||
'execution', done => {
|
'execution', function itF(done) {
|
||||||
client.getStatement(this.sessionId, this.longStatementId,
|
client.getStatement(this.sessionId, this.longStatementId,
|
||||||
err => {
|
err => {
|
||||||
assert(err);
|
assert(err);
|
||||||
|
@ -162,20 +185,23 @@ describe('LivyClient tests', function testClient() {
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
|
||||||
|
|
||||||
describe('GET batches', () => {
|
it('should cancel in-progress statement', function itF(done) {
|
||||||
it('should get active batches', done => {
|
const codeToExecute = 'for(n <- 1 to 1000000) ' +
|
||||||
client.getBatches(null, null, (err, res) => {
|
'{ Range(2, n-1).filter(primeTester => n % primeTester == 0).' +
|
||||||
assert.ifError(err);
|
'length == 0 }';
|
||||||
assert.deepStrictEqual(res, { from: 0, total: 0,
|
client.postStatement(this.sessionId, codeToExecute,
|
||||||
sessions: [] });
|
(err, res) => {
|
||||||
done();
|
client.cancelStatement(this.sessionId, res.id, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictequal(res.msg, 'canceled');
|
||||||
|
done();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('POST batch', function batches() {
|
describe('POST, GET, and DELETE batch', function batches() {
|
||||||
it('should post a jar file batch', done => {
|
it('should post a jar file batch', done => {
|
||||||
// Note for any test env: must add path to livy.conf whitelist
|
// Note for any test env: must add path to livy.conf whitelist
|
||||||
// livy.file.local-dir-whitelist=<insert path here>
|
// livy.file.local-dir-whitelist=<insert path here>
|
||||||
|
@ -190,14 +216,6 @@ describe('LivyClient tests', function testClient() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should delete a jar batch', done => {
|
|
||||||
client.deleteBatch(this.jarBatch, (err, res) => {
|
|
||||||
assert.ifError(err);
|
|
||||||
assert.deepStrictEqual(res, { msg: 'deleted' });
|
|
||||||
done();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should post a python file batch', done => {
|
it('should post a python file batch', done => {
|
||||||
// Note for any test env: must add path to livy.conf whitelist
|
// Note for any test env: must add path to livy.conf whitelist
|
||||||
// livy.file.local-dir-whitelist=<insert path here>
|
// livy.file.local-dir-whitelist=<insert path here>
|
||||||
|
@ -211,8 +229,98 @@ describe('LivyClient tests', function testClient() {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should get active batches', done => {
|
||||||
|
client.getSessionsOrBatches('batch', null, null, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert(res.from !== undefined);
|
||||||
|
assert(res.total !== undefined);
|
||||||
|
assert(res.sessions !== undefined);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get a jar batch by id', done => {
|
||||||
|
client.getSessionOrBatch('batch', this.jarBatch, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.jarBatch);
|
||||||
|
assert(res.state !== undefined &&
|
||||||
|
typeof res.state === 'string');
|
||||||
|
assert(res.appId !== undefined);
|
||||||
|
assert(res.appInfo !== undefined);
|
||||||
|
assert(res.log !== undefined);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get a python file batch by id', done => {
|
||||||
|
client.getSessionOrBatch('batch', this.pythonBatch, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.pythonBatch);
|
||||||
|
assert(res.state !== undefined &&
|
||||||
|
typeof res.state === 'string');
|
||||||
|
assert(res.appId !== undefined);
|
||||||
|
assert(res.appInfo !== undefined);
|
||||||
|
assert(res.log !== undefined);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get a jar batch state', done => {
|
||||||
|
client.getSessionOrBatchState('batch', this.jarBatch,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.jarBatch);
|
||||||
|
assert(res.state !== undefined && typeof
|
||||||
|
res.state === 'string');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get a python file batch state', done => {
|
||||||
|
client.getSessionOrBatchState('batch', this.jarBatch,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.jarBatch);
|
||||||
|
assert(res.state !== undefined && typeof
|
||||||
|
res.state === 'string');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get the log of a jar batch', done => {
|
||||||
|
client.getSessionOrBatchLog('batch', this.jarBatch, (err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.jarBatch);
|
||||||
|
assert(res.log !== undefined);
|
||||||
|
assert(res.from !== undefined && Number.isInteger(res.total));
|
||||||
|
assert(res.total !== undefined && Number.isInteger(res.total));
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get the log of a python file batch', done => {
|
||||||
|
client.getSessionOrBatchLog('batch', this.pythonBatch,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.strictEqual(res.id, this.pythonBatch);
|
||||||
|
assert(res.log !== undefined);
|
||||||
|
assert(res.from !== undefined && Number.isInteger(res.total));
|
||||||
|
assert(res.total !== undefined && Number.isInteger(res.total));
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should delete a python file batch', done => {
|
||||||
|
client.deleteSessionOrBatch('batch', this.pythonBatch,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should delete a jar batch', done => {
|
it('should delete a jar batch', done => {
|
||||||
client.deleteBatch(this.pythonBatch, (err, res) => {
|
client.deleteSessionOrBatch('batch', this.jarBatch, (err, res) => {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
assert.deepStrictEqual(res, { msg: 'deleted' });
|
assert.deepStrictEqual(res, { msg: 'deleted' });
|
||||||
done();
|
done();
|
||||||
|
|
|
@ -10,8 +10,6 @@ const routeOPTIONS = require('./routes/routeOPTIONS');
|
||||||
const routesUtils = require('./routesUtils');
|
const routesUtils = require('./routesUtils');
|
||||||
const routeWebsite = require('./routes/routeWebsite');
|
const routeWebsite = require('./routes/routeWebsite');
|
||||||
|
|
||||||
const ipCheck = require('../ipCheck');
|
|
||||||
|
|
||||||
const routeMap = {
|
const routeMap = {
|
||||||
GET: routeGET,
|
GET: routeGET,
|
||||||
PUT: routePUT,
|
PUT: routePUT,
|
||||||
|
@ -113,12 +111,6 @@ function checkTypes(req, res, params, logger) {
|
||||||
});
|
});
|
||||||
assert.strictEqual(typeof params.dataRetrievalFn, 'function',
|
assert.strictEqual(typeof params.dataRetrievalFn, 'function',
|
||||||
'bad routes param: dataRetrievalFn must be a defined function');
|
'bad routes param: dataRetrievalFn must be a defined function');
|
||||||
assert(Array.isArray(params.whiteListedIps),
|
|
||||||
'bad routes param: whiteListedIps must be an array');
|
|
||||||
params.whiteListedIps.forEach(ip => {
|
|
||||||
assert.strictEqual(typeof ip, 'string',
|
|
||||||
'bad routes param: each item in whiteListedIps must be a string');
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** routes - route request to appropriate method
|
/** routes - route request to appropriate method
|
||||||
|
@ -138,8 +130,6 @@ function checkTypes(req, res, params, logger) {
|
||||||
* @param {object} params.unsupportedQueries - object containing true/false
|
* @param {object} params.unsupportedQueries - object containing true/false
|
||||||
* values for whether queries are supported
|
* values for whether queries are supported
|
||||||
* @param {function} params.dataRetrievalFn - function to retrieve data
|
* @param {function} params.dataRetrievalFn - function to retrieve data
|
||||||
* @param {string []} params.whiteListedIps - list of ip addresses
|
|
||||||
* that have special rights with respect to the METADATA bucket
|
|
||||||
* @param {RequestLogger} logger - werelogs logger instance
|
* @param {RequestLogger} logger - werelogs logger instance
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
|
@ -233,19 +223,6 @@ function routes(req, res, params, logger) {
|
||||||
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Need better solution to identify request from backbeat/clueso
|
|
||||||
if (req.bucketName === 'METADATA' &&
|
|
||||||
(req.method === 'PUT' ||
|
|
||||||
req.method === 'POST' ||
|
|
||||||
req.method === 'DELETE')) {
|
|
||||||
// TODO: handle based on admin roles!!!!
|
|
||||||
if (false) {
|
|
||||||
return routesUtils.responseXMLBody(
|
|
||||||
errors.AccessDenied.customizeDescription('The bucket METADATA is ' +
|
|
||||||
'used for internal purposes'), null, res, log);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// bucket website request
|
// bucket website request
|
||||||
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
||||||
return routeWebsite(req, res, api, log, statsClient, dataRetrievalFn);
|
return routeWebsite(req, res, api, log, statsClient, dataRetrievalFn);
|
||||||
|
|
|
@ -3,13 +3,7 @@
|
||||||
const fs = require('fs');
|
const fs = require('fs');
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
let diskusage;
|
const diskusage = require('diskusage');
|
||||||
try {
|
|
||||||
diskusage = require('diskusage');
|
|
||||||
} catch (err) {
|
|
||||||
process.stdout.write('unable to install diskusage module',
|
|
||||||
{ error: err.message, stack: err.stack });
|
|
||||||
}
|
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const errors = require('../../../errors');
|
const errors = require('../../../errors');
|
||||||
|
@ -295,10 +289,7 @@ class DataFileStore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getDiskUsage(callback) {
|
getDiskUsage(callback) {
|
||||||
if (diskusage) {
|
diskusage.check(this.dataPath, callback);
|
||||||
return diskusage.check(this.dataPath, callback);
|
|
||||||
}
|
|
||||||
return callback();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,13 +6,7 @@ const uuid = require('uuid');
|
||||||
const level = require('level');
|
const level = require('level');
|
||||||
const sublevel = require('level-sublevel');
|
const sublevel = require('level-sublevel');
|
||||||
const debug = require('debug')('MetadataFileServer');
|
const debug = require('debug')('MetadataFileServer');
|
||||||
let diskusage;
|
const diskusage = require('diskusage');
|
||||||
try {
|
|
||||||
diskusage = require('diskusage');
|
|
||||||
} catch (err) {
|
|
||||||
process.stdout.write('unable to install diskusage module',
|
|
||||||
{ error: err.message, stack: err.stack });
|
|
||||||
}
|
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
@ -253,12 +247,7 @@ class MetadataFileServer {
|
||||||
vrp.get({ db: dbName, key, options },
|
vrp.get({ db: dbName, key, options },
|
||||||
env.requestLogger, cb);
|
env.requestLogger, cb);
|
||||||
},
|
},
|
||||||
getDiskUsage: (env, cb) => {
|
getDiskUsage: (env, cb) => diskusage.check(this.path, cb),
|
||||||
if (diskusage) {
|
|
||||||
return diskusage.check(this.path, cb);
|
|
||||||
}
|
|
||||||
return cb();
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
dbService.registerSyncAPI({
|
dbService.registerSyncAPI({
|
||||||
createReadStream:
|
createReadStream:
|
||||||
|
|
Loading…
Reference in New Issue