Compare commits

..

No commits in common. "e69db46e3203e117f48de6681baa1072b9057f29" and "c5b209904e1c8806648e08c1aa6be5aac82bde1b" have entirely different histories.

20 changed files with 87 additions and 239 deletions

View File

@ -3,13 +3,10 @@ FROM warp10io/warp10:2.6.0
ENV S6_VERSION 2.0.0.1
ENV WARP10_CONF_TEMPLATES ${WARP10_HOME}/conf.templates/standalone
ENV SENSISION_DATA_DIR /data/sensision
# Modify Warp 10 default config
ENV standalone.host 0.0.0.0
ENV standalone.port 4802
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
@ -23,5 +20,5 @@ ADD https://dl.bintray.com/senx/maven/io/warp10/warp10-ext-protobuf/1.1.0-uberja
ADD ./images/warp10/s6 /etc
ADD ./warpscript /usr/local/share/warpscript
ADD ./images/warp10/static.tokens /
CMD /init

View File

@ -15,8 +15,3 @@ ensureDir "$WARP10_DATA_DIR/conf"
ensureDir "$WARP10_DATA_DIR/data/leveldb"
ensureDir "$WARP10_DATA_DIR/data/datalog"
ensureDir "$WARP10_DATA_DIR/data/datalog_done"
ensureDir "$SENSISION_DATA_DIR"
ensureDir "$SENSISION_DATA_DIR/logs"
ensureDir "$SENSISION_DATA_DIR/conf"
ensureDir "/var/run/sensision"

View File

@ -1,6 +1,5 @@
#!/usr/bin/with-contenv sh
echo "Installing warp 10 config"
for path in $WARP10_CONF_TEMPLATES/*; do
name="$(basename $path .template)"
if [ ! -f "$WARP10_DATA_DIR/conf/$name" ]; then
@ -8,7 +7,3 @@ for path in $WARP10_CONF_TEMPLATES/*; do
echo "Copied $name to $WARP10_DATA_DIR/conf/$name"
fi
done
echo "Installing sensision config"
cp ${SENSISION_HOME}/templates/sensision.template ${SENSISION_DATA_DIR}/conf/sensision.conf
cp ${SENSISION_HOME}/templates/log4j.properties.template ${SENSISION_DATA_DIR}/conf/log4j.properties

View File

@ -15,9 +15,3 @@ ensure_link "$WARP10_HOME/etc/conf.d" "$WARP10_DATA_DIR/conf"
ensure_link "$WARP10_HOME/leveldb" "$WARP10_DATA_DIR/data/leveldb"
ensure_link "$WARP10_HOME/datalog" "$WARP10_DATA_DIR/data/datalog"
ensure_link "$WARP10_HOME/datalog_done" "$WARP10_DATA_DIR/data/datalog_done"
ensure_link "$SENSISION_HOME/etc" "${SENSISION_DATA_DIR}/conf"
ensure_link "$SENSISION_HOME/logs" "${SENSISION_DATA_DIR}/logs"
ensure_link /var/run/sensision/metrics ${SENSISION_HOME}/metrics
ensure_link /var/run/sensision/targets ${SENSISION_HOME}/targets
ensure_link /var/run/sensision/queued ${SENSISION_HOME}/queued

View File

@ -1,9 +0,0 @@
#!/usr/bin/with-contenv sh
chmod 1733 "$SENSISION_HOME/metrics"
chmod 1733 "$SENSISION_HOME/targets"
chmod 700 "$SENSISION_HOME/queued"
sed -i 's/@warp:WriteToken@/'"writeTokenStatic"'/' $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e "s_^sensision\.home.*_sensision\.home = ${SENSISION_HOME}_" $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e 's_^sensision\.qf\.url\.default.*_sensision\.qf\.url\.default=http://127.0.0.1:4802/api/v0/update_' $SENSISION_DATA_DIR/conf/sensision.conf

View File

@ -1,26 +0,0 @@
#!/usr/bin/with-contenv sh
JAVA="/usr/bin/java"
JAVA_OPTS=""
VERSION=1.0.21
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${VERSION}.jar
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
SENSISION_CLASS=io.warp10.sensision.Main
export MALLOC_ARENA_MAX=1
if [ -z "$SENSISION_HEAP" ]; then
SENSISION_HEAP=64m
fi
SENSISION_CMD="${JAVA} ${JAVA_OPTS} -Xmx${SENSISION_HEAP} -Dsensision.server.port=0 ${SENSISION_OPTS} -Dsensision.config=${SENSISION_CONFIG} -cp ${SENSISION_CP} ${SENSISION_CLASS}"
if [ -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision with $SENSISION_CMD ..."
exec $SENSISION_CMD | tee -a ${SENSISION_HOME}/logs/sensision.log
else
echo "Sensision is disabled"
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -1,37 +1,13 @@
#!/usr/bin/with-contenv sh
export SENSISIONID=warp10
export MALLOC_ARENA_MAX=1
JAVA="/usr/bin/java"
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CLASS=io.warp10.standalone.Warp
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path "*/\.*" -name "*.conf" | sort | tr '\n' ' ' 2> /dev/null)"
LOG4J_CONF=${WARP10_HOME}/etc/log4j.properties
if [ -z "$WARP10_HEAP" ]; then
WARP10_HEAP=1g
fi
if [ -z "$WARP10_HEAP_MAX" ]; then
WARP10_HEAP_MAX=1g
fi
JAVA_OPTS="-Djava.awt.headless=true -Xms${WARP10_HEAP} -Xmx${WARP10_HEAP_MAX} -XX:+UseG1GC ${JAVA_OPTS}"
SENSISION_OPTS=
if [ -n "$ENABLE_SENSISION" ]; then
_SENSISION_LABELS=
# Expects a comma seperated list of key=value ex key=value,foo=bar
if [ -n "$SENSISION_LABELS" ]; then
_SENSISION_LABELS="-Dsensision.default.labels=$SENSISION_LABELS"
fi
SENSISION_OPTS="-Dsensision.server.port=0 ${_SENSISION_LABELS} -Dsensision.events.dir=/var/run/sensision/metrics -Dfile.encoding=UTF-8"
fi
WARP10_CMD="${JAVA} -Dlog4j.configuration=file:${LOG4J_CONF} ${JAVA_OPTS} ${SENSISION_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
WARP10_CMD="${JAVA} ${JAVA_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
echo "Starting Warp 10 with $WARP10_CMD ..."
exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log

View File

@ -1,9 +0,0 @@
token.write.0.name=writeTokenStatic
token.write.0.producer=42424242-4242-4242-4242-424242424242
token.write.0.owner=42424242-4242-4242-4242-424242424242
token.write.0.app=utapi
token.read.0.name=readTokenStatic
token.read.0.owner=42424242-4242-4242-4242-424242424242
token.read.0.app=utapi

View File

@ -32,7 +32,7 @@ def get_options():
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-b", "--bucket", default=False, help="Bucket to be processed")
return parser.parse_args()
def chunks(iterable, size):
@ -119,7 +119,7 @@ class BucketDClient:
else:
is_truncated = len(payload) > 0
def list_buckets(self, name = None):
def list_buckets(self):
def get_next_marker(p):
if p is None:
@ -135,14 +135,8 @@ class BucketDClient:
buckets = []
for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups())
if name is None or bucket.name == name:
buckets.append(bucket)
if buckets:
buckets.append(Bucket(*match.groups()))
yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
def list_mpus(self, bucket):
@ -334,15 +328,12 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__':
options = get_options()
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in bucket_client.list_buckets(options.bucket):
for batch in bucket_client.list_buckets():
bucket_reports = {}
jobs = [executor.submit(index_bucket, bucket_client, b) for b in batch]
for job in futures.as_completed(jobs):
@ -358,25 +349,6 @@ if __name__ == '__main__':
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket is None:
stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
else:
stale_buckets = set()
_log.info('Found %s stale buckets' % len(stale_buckets))
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket is None:
# Update total account reports in chunks
for chunk in chunks(account_reports.items(), ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
@ -387,6 +359,7 @@ if __name__ == '__main__':
observed_accounts = set(account_reports.keys())
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
@ -397,3 +370,12 @@ if __name__ == '__main__':
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()
stale_buckets = recorded_buckets.difference(observed_buckets)
_log.info('Found %s stale buckets' % len(stale_buckets))
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()

View File

@ -103,26 +103,29 @@ class RedisClient extends EventEmitter {
this.emit('error', error);
}
_createCommandTimeout() {
let timer;
let onTimeout;
const cancelTimeout = jsutil.once(() => {
clearTimeout(timer);
this.off('timeout', onTimeout);
this._inFlightTimeouts.delete(timer);
});
const timeout = new Promise((_, reject) => {
timer = setTimeout(this.emit.bind(this, 'timeout'), COMMAND_TIMEOUT);
timer = setTimeout(
() => {
this.emit('timeout');
this._initClient();
},
COMMAND_TIMEOUT,
);
this._inFlightTimeouts.add(timer);
onTimeout = () => {
this.once('timeout', () => {
moduleLogger.warn('redis command timed out');
cancelTimeout();
this._initClient();
reject(errors.OperationTimedOut);
};
this.once('timeout', onTimeout);
});
});
return { timeout, cancelTimeout };

View File

@ -26,8 +26,8 @@ async function listMetric(ctx, params) {
// A separate request will be made to warp 10 per requested resource
const results = await Promise.all(
resources.map(async ({ resource, id }) => {
const labels = { [labelName]: id };
resources.map(async resource => {
const labels = { [labelName]: resource };
const options = {
params: {
start,

View File

@ -5,7 +5,7 @@ const { ipCheck } = require('arsenal');
const config = require('../config');
const { logger, buildRequestLogger } = require('../utils');
const errors = require('../errors');
const { translateAndAuthorize } = require('../vault');
const { authenticateRequest, vault } = require('../vault');
const oasOptions = {
controllers: path.join(__dirname, './API/'),
@ -44,17 +44,6 @@ function loggerMiddleware(req, res, next) {
return next();
}
function responseLoggerMiddleware(req, res, next) {
const info = {
httpCode: res.statusCode,
httpMessage: res.statusMessage,
};
req.logger.end('finished handling request', info);
if (next !== undefined) {
next();
}
}
// next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) {
@ -81,7 +70,15 @@ function errorMiddleware(err, req, res, next) {
message,
},
});
responseLoggerMiddleware(req, res);
}
function responseLoggerMiddleware(req, res, next) {
const info = {
httpCode: res.statusCode,
httpMessage: res.statusMessage,
};
req.logger.end('finished handling request', info);
return next();
}
// eslint-disable-next-line no-unused-vars
@ -114,7 +111,7 @@ async function authV4Middleware(request, response, params) {
let authorizedResources;
try {
[passed, authorizedResources] = await translateAndAuthorize(request, action, params.level, requestedResources);
[passed, authorizedResources] = await authenticateRequest(request, action, params.level, requestedResources);
} catch (error) {
request.logger.error('error during authentication', { error });
throw errors.InternalError;
@ -125,14 +122,17 @@ async function authV4Middleware(request, response, params) {
throw errors.AccessDenied;
}
switch (request.ctx.operationId) {
case 'listMetrics':
params.body[params.level] = authorizedResources;
break;
if (params.level === 'accounts') {
request.logger.debug('converting account ids to canonical ids');
authorizedResources = await vault.getCanonicalIds(
authorizedResources,
request.logger.logger,
);
}
default:
[params.resource] = authorizedResources;
break;
// authorizedResources is only defined on non-account credentials
if (request.ctx.operationId === 'listMetrics' && authorizedResources !== undefined) {
params.body[params.level] = authorizedResources;
}
}

View File

@ -2,7 +2,6 @@ const assert = require('assert');
const { auth, policies } = require('arsenal');
const vaultclient = require('vaultclient');
const config = require('./config');
const errors = require('./errors');
/**
@class Vault
@ -84,14 +83,7 @@ class Vault {
reject(err);
return;
}
if (!res.message || !res.message.body) {
reject(errors.InternalError);
return;
}
resolve(res.message.body.map(acc => ({
resource: acc.accountId,
id: acc.canonicalId,
})));
resolve(res);
}));
}
}
@ -99,14 +91,6 @@ class Vault {
const vault = new Vault(config);
auth.setHandler(vault);
async function translateResourceIds(level, resources, log) {
if (level === 'accounts') {
return vault.getCanonicalIds(resources, log);
}
return resources.map(resource => ({ resource, id: resource }));
}
async function authenticateRequest(request, action, level, resources) {
const policyContext = new policies.RequestContext(
request.headers,
@ -130,11 +114,10 @@ async function authenticateRequest(request, action, level, resources) {
return;
}
// Will only have res if request is from a user rather than an account
let authorizedResources = resources;
if (res) {
try {
authorizedResources = res.reduce(
(authed, result) => {
const authorizedResources = (res || [])
.reduce((authed, result) => {
if (result.isAllowed) {
// result.arn should be of format:
// arn:scality:utapi:::resourcetype/resource
@ -145,32 +128,24 @@ async function authenticateRequest(request, action, level, resources) {
request.logger.trace('access granted for resource', { resource });
}
return authed;
}, [],
);
}, []);
resolve([
authorizedResources.length !== 0,
authorizedResources,
]);
} catch (err) {
reject(err);
}
} else {
request.logger.trace('granted access to all resources');
resolve([true]);
}
resolve([
authorizedResources.length !== 0,
authorizedResources,
]);
}, 's3', [policyContext]);
});
}
async function translateAndAuthorize(request, action, level, resources) {
const [authed, authorizedResources] = await authenticateRequest(request, action, level, resources);
const translated = await translateResourceIds(level, authorizedResources, request.logger.logger);
return [authed, translated];
}
module.exports = {
authenticateRequest,
translateAndAuthorize,
Vault,
vault,
};

View File

@ -38,16 +38,14 @@ describe('Test middleware', () => {
});
describe('test errorMiddleware', () => {
let req;
let resp;
beforeEach(() => {
req = templateRequest();
resp = new ExpressResponseStub();
});
it('should set a default code and message', () => {
middleware.errorMiddleware({}, req, resp);
middleware.errorMiddleware({}, null, resp);
assert.strictEqual(resp._status, 500);
assert.deepStrictEqual(resp._body, {
error: {
@ -58,7 +56,7 @@ describe('Test middleware', () => {
});
it('should set the correct info from an error', () => {
middleware.errorMiddleware({ code: 123, message: 'Hello World!', utapiError: true }, req, resp);
middleware.errorMiddleware({ code: 123, message: 'Hello World!', utapiError: true }, null, resp);
assert.deepStrictEqual(resp._body, {
error: {
code: '123',
@ -68,7 +66,7 @@ describe('Test middleware', () => {
});
it("should replace an error's message if it's internal and not in development mode", () => {
middleware.errorMiddleware({ code: 123, message: 'Hello World!' }, req, resp);
middleware.errorMiddleware({ code: 123, message: 'Hello World!' }, null, resp);
assert.deepStrictEqual(resp._body, {
error: {
code: '123',
@ -76,16 +74,5 @@ describe('Test middleware', () => {
},
});
});
it('should call responseLoggerMiddleware after response', () => {
const spy = sinon.spy();
req.logger.end = spy;
resp.statusMessage = 'Hello World!';
middleware.errorMiddleware({ code: 123 }, req, resp);
assert(spy.calledOnceWith('finished handling request', {
httpCode: 123,
httpMessage: 'Hello World!',
}));
});
});
});

View File

@ -8,7 +8,7 @@ function decode(type, data, includeDefaults = true) {
if (!Type) {
throw new Error(`Unknown type ${type}`);
}
const msg = Type.decode(Buffer.from(data));
const msg = Type.decode(Buffer.from(data, 'hex'));
return Type.toObject(msg, {
longs: Number,
defaults: includeDefaults,

View File

@ -7,10 +7,6 @@ class ExpressResponseStub {
this._redirect = null;
}
get statusCode() {
return this._status;
}
status(code) {
this._status = code;
return this;

View File

@ -17,22 +17,18 @@ message Event {
'>
PROTOC 'proto' STORE
<%
'iso8859-1' ->BYTES
!$proto 'Event' PB->
HEX-> !$proto 'Event' PB->
%>
'macro' STORE
'0a0b6d794f7065726174696f6e32096d796163636f756e74480150ffffffffffffffffff01'
HEX-> 'iso8859-1' BYTES-> @macro
'0a0b6d794f7065726174696f6e32096d796163636f756e74480150ffffffffffffffffff01' @macro
DUP 'op' GET 'myOperation' == ASSERT
DUP 'acc' GET 'myaccount' == ASSERT
DUP 'objD' GET 1 == ASSERT
'sizeD' GET -1 == ASSERT
'0a0568656c6c6f120568656c6c6f1a0568656c6c6f220568656c6c6f2a0568656c6c6f320568656c6c6f3a0568656c6c6f420568656c6c6f4801500158016000'
HEX-> 'iso8859-1' BYTES-> @macro
'0a0568656c6c6f120568656c6c6f1a0568656c6c6f220568656c6c6f2a0568656c6c6f320568656c6c6f3a0568656c6c6f420568656c6c6f4801500158016000' @macro
DUP "op" GET "hello" == ASSERT
DUP "id" GET "hello" == ASSERT
DUP "bck" GET "hello" == ASSERT

View File

@ -33,8 +33,7 @@ PROTOC 'proto' STORE
!$info INFO
SAVE 'context' STORE
<%
'iso8859-1' ->BYTES
!$proto 'Record' PB->
HEX-> !$proto 'Record' PB->
%>
<% // catch any exception
RETHROW
@ -46,7 +45,7 @@ PROTOC 'proto' STORE
'macro' STORE
// Unit tests
'081e101e181e20002a0d0a097075744f626a656374101e' HEX-> 'iso8859-1' BYTES->
'081e101e181e20002a0d0a097075744f626a656374101e'
@macro
{ 'outB' 0 'ops' { 'putObject' 30 } 'sizeD' 30 'inB' 30 'objD' 30 } == ASSERT

View File

@ -17,7 +17,7 @@ message Event {
'>
PROTOC 'proto' STORE
<%
!$proto 'Event' ->PB
!$proto 'Event' ->PB ->HEX
%>
'macro' STORE
@ -28,7 +28,6 @@ PROTOC 'proto' STORE
'sizeD' -1
} @macro
->HEX
'0a0b6d794f7065726174696f6e32096d796163636f756e74480150ffffffffffffffffff01' == ASSERT
{
@ -46,7 +45,6 @@ PROTOC 'proto' STORE
"outB" 0
} @macro
->HEX
'0a0568656c6c6f120568656c6c6f1a0568656c6c6f220568656c6c6f2a0568656c6c6f320568656c6c6f3a0568656c6c6f420568656c6c6f4801500158016000' == ASSERT
$macro

View File

@ -34,7 +34,7 @@ PROTOC 'proto' STORE
!$info INFO
SAVE 'context' STORE
<%
!$proto 'Record' ->PB
!$proto 'Record' ->PB ->HEX
%>
<% // catch any exception
RETHROW
@ -49,7 +49,6 @@ PROTOC 'proto' STORE
{ 'outB' 0 'ops' { 'putObject' 30 } 'sizeD' 30 'inB' 30 'objD' 30 }
@macro
->HEX
'081e101e181e20002a0d0a097075744f626a656374101e' == ASSERT
$macro