Compare commits
3 Commits
developmen
...
dev/downsa
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 1c1a4a945d | |
Taylor McKinnon | 0c512c9e70 | |
Taylor McKinnon | f6575f32e6 |
|
@ -0,0 +1,14 @@
|
||||||
|
const { tasks } = require('..');
|
||||||
|
const { LoggerContext } = require('../libV2/utils');
|
||||||
|
|
||||||
|
const logger = new LoggerContext({
|
||||||
|
task: 'Downsample',
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
const task = new tasks.DownsampleTask();
|
||||||
|
|
||||||
|
task.setup()
|
||||||
|
.then(() => logger.info('Starting Downsample daemon'))
|
||||||
|
.then(() => task.start())
|
||||||
|
.then(() => logger.info('Downsample started'));
|
|
@ -12,6 +12,8 @@ ENV warpscript.repository.directory /usr/local/share/warpscript
|
||||||
ENV warp.token.file /static.tokens
|
ENV warp.token.file /static.tokens
|
||||||
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
||||||
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
||||||
|
ENV warpscript.extension.concurrent 'io.warp10.script.ext.concurrent.ConcurrentWarpScriptExtension'
|
||||||
|
|
||||||
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
|
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
|
||||||
|
|
||||||
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
|
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
const BaseTask = require('./BaseTask');
|
||||||
|
const config = require('../config');
|
||||||
|
const { LoggerContext } = require('../utils');
|
||||||
|
const { downsampleLagSecs, indexedEventFields } = require('../constants');
|
||||||
|
|
||||||
|
const logger = new LoggerContext({
|
||||||
|
module: 'Repair',
|
||||||
|
});
|
||||||
|
|
||||||
|
class DownsampleTask extends BaseTask {
|
||||||
|
constructor(options) {
|
||||||
|
super({
|
||||||
|
warp10: {
|
||||||
|
requestTimeout: 30000,
|
||||||
|
connectTimeout: 30000,
|
||||||
|
},
|
||||||
|
...options,
|
||||||
|
});
|
||||||
|
this._defaultSchedule = config.downsampleSchedule;
|
||||||
|
this._defaultLag = downsampleLagSecs;
|
||||||
|
}
|
||||||
|
|
||||||
|
async _execute(timestamp) {
|
||||||
|
logger.debug('Downsampling records', { timestamp, nodeId: this.nodeId });
|
||||||
|
|
||||||
|
const params = {
|
||||||
|
params: {
|
||||||
|
nodeId: this.nodeId,
|
||||||
|
end: timestamp.toString(),
|
||||||
|
fields: indexedEventFields,
|
||||||
|
},
|
||||||
|
macro: 'utapi/repairRecords',
|
||||||
|
};
|
||||||
|
const status = await this._warp10.exec(params);
|
||||||
|
if (status.result[0]) {
|
||||||
|
logger.info(`created ${status.result[0]} corrections`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = DownsampleTask;
|
|
@ -0,0 +1,105 @@
|
||||||
|
{
|
||||||
|
'name' 'downsampleRecords'
|
||||||
|
'desc'
|
||||||
|
<'
|
||||||
|
|
||||||
|
'>
|
||||||
|
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||||
|
'params' {
|
||||||
|
// Signature params description
|
||||||
|
'a' 'Map containing read/write tokens'
|
||||||
|
'o' 'Map containing operation info'
|
||||||
|
'c' 'List of created checkpoints'
|
||||||
|
}
|
||||||
|
'examples' [
|
||||||
|
<'
|
||||||
|
|
||||||
|
'>
|
||||||
|
]
|
||||||
|
} 'info' STORE
|
||||||
|
|
||||||
|
<%
|
||||||
|
!$info INFO
|
||||||
|
SAVE 'context' STORE
|
||||||
|
<%
|
||||||
|
'Downsampling records' LOGMSG
|
||||||
|
JSON-> 'operation_info' STORE
|
||||||
|
JSON-> 'auth_info' STORE
|
||||||
|
|
||||||
|
$auth_info 'write' GET 'write_token' STORE
|
||||||
|
$auth_info 'read' GET 'read_token' STORE
|
||||||
|
|
||||||
|
$operation_info 'fields' GET 'fieldsToIndex' STORE
|
||||||
|
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
|
$operation_info 'size' GET 'sampleSize' STORE
|
||||||
|
|
||||||
|
'utapi.downsample.master' 'master_downsample_class' STORE
|
||||||
|
'utapi.snapshot' 'snapshot_class' STORE
|
||||||
|
'utapi.downsample.stage1' 'sample_class' STORE
|
||||||
|
|
||||||
|
$read_token $master_downsample_class {} $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master downsample
|
||||||
|
FIRSTTICK 'startTimestamp' STORE // Grab our starting timestamp from the last downsample (0 if no downsamples)
|
||||||
|
|
||||||
|
<% $startTimestamp 0 == %>
|
||||||
|
<%
|
||||||
|
'No previous timestamp found, calculating starting point' LOGMSG
|
||||||
|
$auth_info ->JSON
|
||||||
|
{
|
||||||
|
'class' 'utapi.event'
|
||||||
|
'end' $operation_info 'end' GET
|
||||||
|
'labels' {}
|
||||||
|
} ->JSON @utapi/findOldestRecord
|
||||||
|
'startTimestamp' STORE
|
||||||
|
%> IFT
|
||||||
|
|
||||||
|
'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
||||||
|
[] 'created' STORE
|
||||||
|
$fieldsToIndex
|
||||||
|
<%
|
||||||
|
'field' STORE
|
||||||
|
[
|
||||||
|
$read_token
|
||||||
|
$snapshot_class
|
||||||
|
{ $field '~.*' }
|
||||||
|
] FINDSETS
|
||||||
|
DROP SWAP DROP $field GET
|
||||||
|
DUP ->JSON ' ' + $field + LOGMSG
|
||||||
|
<% DUP ISNULL %>
|
||||||
|
<% DROP CONTINUE %> IFT
|
||||||
|
'resources' STORE
|
||||||
|
[] 'macros' STORE
|
||||||
|
$resources
|
||||||
|
<%
|
||||||
|
DROP
|
||||||
|
<%
|
||||||
|
1 - 'idx' STORE
|
||||||
|
$idx TOSTRING LOGMSG
|
||||||
|
$auth_info ->JSON
|
||||||
|
{
|
||||||
|
'start' $startTimestamp
|
||||||
|
'end' $operation_info 'end' GET
|
||||||
|
'labels' { $field $resources $idx GET }
|
||||||
|
'size' $sampleSize
|
||||||
|
'sampleClass' $sample_class
|
||||||
|
} ->JSON @utapi/sampleMetrics
|
||||||
|
%> 'macro' STORE
|
||||||
|
$macros $macro +! DROP
|
||||||
|
%> FOREACH
|
||||||
|
$macros 10 CEVAL
|
||||||
|
$created SWAP +! DROP
|
||||||
|
'test' LOGMSG
|
||||||
|
%> FOREACH
|
||||||
|
$created
|
||||||
|
%>
|
||||||
|
<% // catch any exception
|
||||||
|
RETHROW
|
||||||
|
%>
|
||||||
|
<% // finally, restore the context
|
||||||
|
$context RESTORE
|
||||||
|
%> TRY
|
||||||
|
%>
|
||||||
|
'macro' STORE
|
||||||
|
|
||||||
|
// Unit tests
|
||||||
|
|
||||||
|
$macro
|
|
@ -0,0 +1,130 @@
|
||||||
|
{
|
||||||
|
'name' 'findOldestRecord'
|
||||||
|
'desc'
|
||||||
|
<'
|
||||||
|
|
||||||
|
'>
|
||||||
|
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||||
|
'params' {
|
||||||
|
// Signature params description
|
||||||
|
'a' 'Map containing read/write tokens'
|
||||||
|
'o' 'Map containing operation info'
|
||||||
|
'r' 'Timestamp of oldest record found or NULL if no records exist'
|
||||||
|
}
|
||||||
|
'examples' [
|
||||||
|
<'
|
||||||
|
|
||||||
|
'>
|
||||||
|
]
|
||||||
|
} 'info' STORE
|
||||||
|
|
||||||
|
<%
|
||||||
|
!$info INFO
|
||||||
|
SAVE 'context' STORE
|
||||||
|
<%
|
||||||
|
JSON-> 'operation_info' STORE
|
||||||
|
JSON-> 'auth_info' STORE
|
||||||
|
|
||||||
|
$auth_info 'write' GET 'write_token' STORE
|
||||||
|
$auth_info 'read' GET 'read_token' STORE
|
||||||
|
|
||||||
|
$operation_info 'class' GET 'className' STORE
|
||||||
|
// $operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
|
$operation_info 'labels' GET 'filterLabels' STORE
|
||||||
|
|
||||||
|
'Finding oldest record for ' $className + $filterLabels ->JSON + LOGMSG
|
||||||
|
|
||||||
|
{
|
||||||
|
'token' $read_token
|
||||||
|
'class' $className
|
||||||
|
'labels' $filterLabels
|
||||||
|
'end' MINLONG
|
||||||
|
'count' 0
|
||||||
|
'boundary.pre' 1
|
||||||
|
} FETCH
|
||||||
|
|
||||||
|
DUP ->JSON LOGMSG
|
||||||
|
|
||||||
|
//ISO-8601 Duration Format PwWdDThHmMsS
|
||||||
|
// w weeks
|
||||||
|
// d days
|
||||||
|
// h hours
|
||||||
|
// m minutes
|
||||||
|
// s or s.ssssss... seconds
|
||||||
|
// [ 'P-1D' 'PT-12H' 'PT-6H' 'PT-1H' 'PT-15M' 'PT-5M' 'PT-1M' 'PT-30S' ] 'steps' STORE
|
||||||
|
// 0 'currentStep' STORE
|
||||||
|
// false 'foundRecords' STORE
|
||||||
|
// $endTimestamp 'currentTimestamp' STORE
|
||||||
|
// $currentTimestamp TOSTRING LOGMSG
|
||||||
|
|
||||||
|
// $currentTimestamp
|
||||||
|
// <%
|
||||||
|
// $steps $currentStep GET ADDDURATION 'checkTimestamp' STORE
|
||||||
|
// $checkTimestamp TOSTRING LOGMSG
|
||||||
|
// %>
|
||||||
|
// <%
|
||||||
|
// <% DUP SIZE 0 == %>
|
||||||
|
// <%
|
||||||
|
// DROP // Drop empty GTS
|
||||||
|
// 'Found 0 records for timestamp' $currentTimestamp TOSTRING + LOGMSG
|
||||||
|
// <% $currentStep $steps SIZE 1 - >= %>
|
||||||
|
// <%
|
||||||
|
// 'At final step, exiting loop' LOGMSG
|
||||||
|
// $currentTimestamp
|
||||||
|
// true
|
||||||
|
// %>
|
||||||
|
// <%
|
||||||
|
// 'Moving to smaller step and retrying timestamp' LOGMSG
|
||||||
|
// $currentStep 1 + 'currentStep' STORE
|
||||||
|
// $currentTimestamp
|
||||||
|
// false
|
||||||
|
// %> IFTE
|
||||||
|
// %>
|
||||||
|
// <%
|
||||||
|
// 'Found records before ' $currentTimestamp TOSTRING + ' Advancing currentTimestamp' + LOGMSG
|
||||||
|
// FIRSTTICK 'currentTimestamp' STORE
|
||||||
|
// true 'foundRecords' STORE
|
||||||
|
// $currentTimestamp
|
||||||
|
// false
|
||||||
|
// %> IFTE
|
||||||
|
// %> UNTIL
|
||||||
|
|
||||||
|
// 'currentTimestamp' STORE
|
||||||
|
// 'currentTimestamp ' $currentTimestamp TOSTRING + LOGMSG
|
||||||
|
|
||||||
|
// <% $foundRecords ! %>
|
||||||
|
// <% NULL %>
|
||||||
|
// <%
|
||||||
|
// <%
|
||||||
|
// $currentTimestamp TOSTRING LOGMSG
|
||||||
|
// {
|
||||||
|
// 'token' $read_token
|
||||||
|
// 'class' $className
|
||||||
|
// 'labels' $filterLabels
|
||||||
|
// 'end' $currentTimestamp 1 -
|
||||||
|
// 'count' 1000
|
||||||
|
// } FETCH
|
||||||
|
// %>
|
||||||
|
// <%
|
||||||
|
// <% DUP SIZE 0 == %>
|
||||||
|
// <% DROP true %>
|
||||||
|
// <%
|
||||||
|
// FIRSTTICK 'currentTimestamp' STORE
|
||||||
|
// false
|
||||||
|
// %> IFTE
|
||||||
|
// %> UNTIL
|
||||||
|
// $currentTimestamp
|
||||||
|
// %> IFTE
|
||||||
|
%>
|
||||||
|
<% // catch any exception
|
||||||
|
RETHROW
|
||||||
|
%>
|
||||||
|
<% // finally, restore the context
|
||||||
|
$context RESTORE
|
||||||
|
%> TRY
|
||||||
|
%>
|
||||||
|
'macro' STORE
|
||||||
|
|
||||||
|
// Unit tests
|
||||||
|
|
||||||
|
$macro
|
|
@ -34,7 +34,9 @@
|
||||||
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
$operation_info 'labels' GET 'labels' STORE
|
$operation_info 'labels' GET 'labels' STORE
|
||||||
$operation_info 'node' GET 'nodeID' STORE
|
|
||||||
|
<% $operation_info 'node' CONTAINSKEY SWAP DROP %>
|
||||||
|
<% $operation_info 'node' GET 'nodeID' STORE %> IFT
|
||||||
|
|
||||||
// Raise the max operations for a executing script
|
// Raise the max operations for a executing script
|
||||||
// $read_token AUTHENTICATE
|
// $read_token AUTHENTICATE
|
||||||
|
@ -259,23 +261,26 @@
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
|
|
||||||
'load_reindex' SECTION
|
'load_reindex' SECTION
|
||||||
// Only load the latest reindex for the current node
|
<% 'nodeID' DEFINED %>
|
||||||
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
<%
|
||||||
{
|
// Only load the latest reindex for the current node
|
||||||
'token' $read_token
|
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
||||||
'class' $reindex_class
|
{
|
||||||
'labels' $filterLabels
|
'token' $read_token
|
||||||
'end' $endTimestamp
|
'class' $reindex_class
|
||||||
'count' 1
|
'labels' $filterLabels
|
||||||
} FETCH
|
'end' $endTimestamp
|
||||||
<% // Handle multiple GTS
|
'count' 1
|
||||||
VALUES
|
} FETCH
|
||||||
<% // For each reindex correction
|
<% // Handle multiple GTS
|
||||||
@utapi/decodeRecord
|
VALUES
|
||||||
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
|
<% // For each reindex correction
|
||||||
$results @util/sumRecord 'results' STORE
|
@utapi/decodeRecord
|
||||||
|
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
|
||||||
|
$results @util/sumRecord 'results' STORE
|
||||||
|
%> FOREACH
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
%> FOREACH
|
%> IFT
|
||||||
|
|
||||||
$results // Leave results on the stack
|
$results // Leave results on the stack
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
{
|
||||||
|
'name' 'sampleMetrics'
|
||||||
|
'desc'
|
||||||
|
<'
|
||||||
|
Sample metrics at the given interval using the provided labels
|
||||||
|
'>
|
||||||
|
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||||
|
'params' {
|
||||||
|
// Signature params description
|
||||||
|
'a' 'Map containing read/write tokens'
|
||||||
|
'o' 'Map containing operation info'
|
||||||
|
'r' 'Generated metrics GTS'
|
||||||
|
}
|
||||||
|
'examples' [
|
||||||
|
<'
|
||||||
|
|
||||||
|
'>
|
||||||
|
]
|
||||||
|
} 'info' STORE
|
||||||
|
|
||||||
|
<%
|
||||||
|
!$info INFO
|
||||||
|
SAVE 'context' STORE
|
||||||
|
<%
|
||||||
|
JSON-> 'operation_info' STORE
|
||||||
|
'auth_info_json' STORE
|
||||||
|
$auth_info_json JSON-> 'auth_info' STORE
|
||||||
|
|
||||||
|
$auth_info 'write' GET 'write_token' STORE
|
||||||
|
$auth_info 'read' GET 'read_token' STORE
|
||||||
|
|
||||||
|
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
||||||
|
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
|
$operation_info 'labels' GET 'filterLabels' STORE
|
||||||
|
$operation_info 'sampleClass' GET 'sampleClass' STORE
|
||||||
|
$operation_info 'size' GET 'sampleSize' STORE
|
||||||
|
|
||||||
|
'Sampling records from ' $startTimestamp TOSTRING + ' - ' + $endTimestamp TOSTRING + ' ' + $filterLabels ->JSON + LOGMSG
|
||||||
|
|
||||||
|
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'metric' STORE
|
||||||
|
$startTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
||||||
|
NEWGTS $sampleClass RENAME
|
||||||
|
$filterLabels RELABEL
|
||||||
|
<% $currentTimestamp $sampleSize ADDDURATION $endTimestamp <= %>
|
||||||
|
<%
|
||||||
|
'Sampling metrics for ' $filterLabels ->JSON + ' at ' + $currentTimestamp TOSTRING + LOGMSG
|
||||||
|
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp 'prev' $metric } ->JSON @utapi/getMetricsAt 'metric' STORE
|
||||||
|
$currentTimestamp NaN NaN NaN $metric @utapi/encodeRecord ADDVALUE
|
||||||
|
$currentTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
||||||
|
%>
|
||||||
|
WHILE
|
||||||
|
%>
|
||||||
|
<% // catch any exception
|
||||||
|
RETHROW
|
||||||
|
%>
|
||||||
|
<% // finally, restore the context
|
||||||
|
$context RESTORE
|
||||||
|
%> TRY
|
||||||
|
%>
|
||||||
|
'macro' STORE
|
||||||
|
|
||||||
|
// Unit tests
|
||||||
|
|
||||||
|
$macro
|
Loading…
Reference in New Issue