Compare commits

...

3 Commits

Author SHA1 Message Date
Taylor McKinnon 1c1a4a945d stash 2020-11-09 12:42:56 -08:00
Taylor McKinnon 0c512c9e70 stash 2020-11-04 17:01:57 -08:00
Taylor McKinnon f6575f32e6 stash 2020-11-03 21:52:55 -08:00
7 changed files with 378 additions and 17 deletions

14
bin/downsample.js Normal file
View File

@ -0,0 +1,14 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({
task: 'Downsample',
});
const task = new tasks.DownsampleTask();
task.setup()
.then(() => logger.info('Starting Downsample daemon'))
.then(() => task.start())
.then(() => logger.info('Downsample started'));

View File

@ -12,6 +12,8 @@ ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
ENV warpscript.extension.concurrent 'io.warp10.script.ext.concurrent.ConcurrentWarpScriptExtension'
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \

41
libV2/tasks/Downsample.js Normal file
View File

@ -0,0 +1,41 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
const { downsampleLagSecs, indexedEventFields } = require('../constants');
const logger = new LoggerContext({
module: 'Repair',
});
class DownsampleTask extends BaseTask {
constructor(options) {
super({
warp10: {
requestTimeout: 30000,
connectTimeout: 30000,
},
...options,
});
this._defaultSchedule = config.downsampleSchedule;
this._defaultLag = downsampleLagSecs;
}
async _execute(timestamp) {
logger.debug('Downsampling records', { timestamp, nodeId: this.nodeId });
const params = {
params: {
nodeId: this.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/repairRecords',
};
const status = await this._warp10.exec(params);
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
}
}
}
module.exports = DownsampleTask;

View File

@ -0,0 +1,105 @@
{
'name' 'downsampleRecords'
'desc'
<'
'>
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
'params' {
// Signature params description
'a' 'Map containing read/write tokens'
'o' 'Map containing operation info'
'c' 'List of created checkpoints'
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
'Downsampling records' LOGMSG
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
$operation_info 'fields' GET 'fieldsToIndex' STORE
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'size' GET 'sampleSize' STORE
'utapi.downsample.master' 'master_downsample_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.downsample.stage1' 'sample_class' STORE
$read_token $master_downsample_class {} $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master downsample
FIRSTTICK 'startTimestamp' STORE // Grab our starting timestamp from the last downsample (0 if no downsamples)
<% $startTimestamp 0 == %>
<%
'No previous timestamp found, calculating starting point' LOGMSG
$auth_info ->JSON
{
'class' 'utapi.event'
'end' $operation_info 'end' GET
'labels' {}
} ->JSON @utapi/findOldestRecord
'startTimestamp' STORE
%> IFT
'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
[] 'created' STORE
$fieldsToIndex
<%
'field' STORE
[
$read_token
$snapshot_class
{ $field '~.*' }
] FINDSETS
DROP SWAP DROP $field GET
DUP ->JSON ' ' + $field + LOGMSG
<% DUP ISNULL %>
<% DROP CONTINUE %> IFT
'resources' STORE
[] 'macros' STORE
$resources
<%
DROP
<%
1 - 'idx' STORE
$idx TOSTRING LOGMSG
$auth_info ->JSON
{
'start' $startTimestamp
'end' $operation_info 'end' GET
'labels' { $field $resources $idx GET }
'size' $sampleSize
'sampleClass' $sample_class
} ->JSON @utapi/sampleMetrics
%> 'macro' STORE
$macros $macro +! DROP
%> FOREACH
$macros 10 CEVAL
$created SWAP +! DROP
'test' LOGMSG
%> FOREACH
$created
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
// Unit tests
$macro

View File

@ -0,0 +1,130 @@
{
'name' 'findOldestRecord'
'desc'
<'
'>
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
'params' {
// Signature params description
'a' 'Map containing read/write tokens'
'o' 'Map containing operation info'
'r' 'Timestamp of oldest record found or NULL if no records exist'
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
$operation_info 'class' GET 'className' STORE
// $operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'labels' GET 'filterLabels' STORE
'Finding oldest record for ' $className + $filterLabels ->JSON + LOGMSG
{
'token' $read_token
'class' $className
'labels' $filterLabels
'end' MINLONG
'count' 0
'boundary.pre' 1
} FETCH
DUP ->JSON LOGMSG
//ISO-8601 Duration Format PwWdDThHmMsS
// w weeks
// d days
// h hours
// m minutes
// s or s.ssssss... seconds
// [ 'P-1D' 'PT-12H' 'PT-6H' 'PT-1H' 'PT-15M' 'PT-5M' 'PT-1M' 'PT-30S' ] 'steps' STORE
// 0 'currentStep' STORE
// false 'foundRecords' STORE
// $endTimestamp 'currentTimestamp' STORE
// $currentTimestamp TOSTRING LOGMSG
// $currentTimestamp
// <%
// $steps $currentStep GET ADDDURATION 'checkTimestamp' STORE
// $checkTimestamp TOSTRING LOGMSG
// %>
// <%
// <% DUP SIZE 0 == %>
// <%
// DROP // Drop empty GTS
// 'Found 0 records for timestamp' $currentTimestamp TOSTRING + LOGMSG
// <% $currentStep $steps SIZE 1 - >= %>
// <%
// 'At final step, exiting loop' LOGMSG
// $currentTimestamp
// true
// %>
// <%
// 'Moving to smaller step and retrying timestamp' LOGMSG
// $currentStep 1 + 'currentStep' STORE
// $currentTimestamp
// false
// %> IFTE
// %>
// <%
// 'Found records before ' $currentTimestamp TOSTRING + ' Advancing currentTimestamp' + LOGMSG
// FIRSTTICK 'currentTimestamp' STORE
// true 'foundRecords' STORE
// $currentTimestamp
// false
// %> IFTE
// %> UNTIL
// 'currentTimestamp' STORE
// 'currentTimestamp ' $currentTimestamp TOSTRING + LOGMSG
// <% $foundRecords ! %>
// <% NULL %>
// <%
// <%
// $currentTimestamp TOSTRING LOGMSG
// {
// 'token' $read_token
// 'class' $className
// 'labels' $filterLabels
// 'end' $currentTimestamp 1 -
// 'count' 1000
// } FETCH
// %>
// <%
// <% DUP SIZE 0 == %>
// <% DROP true %>
// <%
// FIRSTTICK 'currentTimestamp' STORE
// false
// %> IFTE
// %> UNTIL
// $currentTimestamp
// %> IFTE
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
// Unit tests
$macro

View File

@ -34,7 +34,9 @@
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'labels' GET 'labels' STORE
$operation_info 'node' GET 'nodeID' STORE
<% $operation_info 'node' CONTAINSKEY SWAP DROP %>
<% $operation_info 'node' GET 'nodeID' STORE %> IFT
// Raise the max operations for a executing script
// $read_token AUTHENTICATE
@ -259,23 +261,26 @@
%> FOREACH
'load_reindex' SECTION
// Only load the latest reindex for the current node
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
{
'token' $read_token
'class' $reindex_class
'labels' $filterLabels
'end' $endTimestamp
'count' 1
} FETCH
<% // Handle multiple GTS
VALUES
<% // For each reindex correction
@utapi/decodeRecord
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
$results @util/sumRecord 'results' STORE
<% 'nodeID' DEFINED %>
<%
// Only load the latest reindex for the current node
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
{
'token' $read_token
'class' $reindex_class
'labels' $filterLabels
'end' $endTimestamp
'count' 1
} FETCH
<% // Handle multiple GTS
VALUES
<% // For each reindex correction
@utapi/decodeRecord
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
$results @util/sumRecord 'results' STORE
%> FOREACH
%> FOREACH
%> FOREACH
%> IFT
$results // Leave results on the stack

View File

@ -0,0 +1,64 @@
{
'name' 'sampleMetrics'
'desc'
<'
Sample metrics at the given interval using the provided labels
'>
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
'params' {
// Signature params description
'a' 'Map containing read/write tokens'
'o' 'Map containing operation info'
'r' 'Generated metrics GTS'
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
JSON-> 'operation_info' STORE
'auth_info_json' STORE
$auth_info_json JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'labels' GET 'filterLabels' STORE
$operation_info 'sampleClass' GET 'sampleClass' STORE
$operation_info 'size' GET 'sampleSize' STORE
'Sampling records from ' $startTimestamp TOSTRING + ' - ' + $endTimestamp TOSTRING + ' ' + $filterLabels ->JSON + LOGMSG
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'metric' STORE
$startTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
NEWGTS $sampleClass RENAME
$filterLabels RELABEL
<% $currentTimestamp $sampleSize ADDDURATION $endTimestamp <= %>
<%
'Sampling metrics for ' $filterLabels ->JSON + ' at ' + $currentTimestamp TOSTRING + LOGMSG
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp 'prev' $metric } ->JSON @utapi/getMetricsAt 'metric' STORE
$currentTimestamp NaN NaN NaN $metric @utapi/encodeRecord ADDVALUE
$currentTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
%>
WHILE
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
// Unit tests
$macro