Compare commits
No commits in common. "1c1a4a945dc8022db5bd10437b6dd2e7f439670c" and "9b800a062f732a29e9434a5134eaa0db4b043d1c" have entirely different histories.
1c1a4a945d
...
9b800a062f
|
@ -1,14 +0,0 @@
|
||||||
const { tasks } = require('..');
|
|
||||||
const { LoggerContext } = require('../libV2/utils');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
task: 'Downsample',
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
const task = new tasks.DownsampleTask();
|
|
||||||
|
|
||||||
task.setup()
|
|
||||||
.then(() => logger.info('Starting Downsample daemon'))
|
|
||||||
.then(() => task.start())
|
|
||||||
.then(() => logger.info('Downsample started'));
|
|
|
@ -12,8 +12,6 @@ ENV warpscript.repository.directory /usr/local/share/warpscript
|
||||||
ENV warp.token.file /static.tokens
|
ENV warp.token.file /static.tokens
|
||||||
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
||||||
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
||||||
ENV warpscript.extension.concurrent 'io.warp10.script.ext.concurrent.ConcurrentWarpScriptExtension'
|
|
||||||
|
|
||||||
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
|
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
|
||||||
|
|
||||||
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
|
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
|
||||||
|
|
|
@ -1,41 +0,0 @@
|
||||||
const BaseTask = require('./BaseTask');
|
|
||||||
const config = require('../config');
|
|
||||||
const { LoggerContext } = require('../utils');
|
|
||||||
const { downsampleLagSecs, indexedEventFields } = require('../constants');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
module: 'Repair',
|
|
||||||
});
|
|
||||||
|
|
||||||
class DownsampleTask extends BaseTask {
|
|
||||||
constructor(options) {
|
|
||||||
super({
|
|
||||||
warp10: {
|
|
||||||
requestTimeout: 30000,
|
|
||||||
connectTimeout: 30000,
|
|
||||||
},
|
|
||||||
...options,
|
|
||||||
});
|
|
||||||
this._defaultSchedule = config.downsampleSchedule;
|
|
||||||
this._defaultLag = downsampleLagSecs;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
|
||||||
logger.debug('Downsampling records', { timestamp, nodeId: this.nodeId });
|
|
||||||
|
|
||||||
const params = {
|
|
||||||
params: {
|
|
||||||
nodeId: this.nodeId,
|
|
||||||
end: timestamp.toString(),
|
|
||||||
fields: indexedEventFields,
|
|
||||||
},
|
|
||||||
macro: 'utapi/repairRecords',
|
|
||||||
};
|
|
||||||
const status = await this._warp10.exec(params);
|
|
||||||
if (status.result[0]) {
|
|
||||||
logger.info(`created ${status.result[0]} corrections`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = DownsampleTask;
|
|
|
@ -1,105 +0,0 @@
|
||||||
{
|
|
||||||
'name' 'downsampleRecords'
|
|
||||||
'desc'
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
|
||||||
'params' {
|
|
||||||
// Signature params description
|
|
||||||
'a' 'Map containing read/write tokens'
|
|
||||||
'o' 'Map containing operation info'
|
|
||||||
'c' 'List of created checkpoints'
|
|
||||||
}
|
|
||||||
'examples' [
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
]
|
|
||||||
} 'info' STORE
|
|
||||||
|
|
||||||
<%
|
|
||||||
!$info INFO
|
|
||||||
SAVE 'context' STORE
|
|
||||||
<%
|
|
||||||
'Downsampling records' LOGMSG
|
|
||||||
JSON-> 'operation_info' STORE
|
|
||||||
JSON-> 'auth_info' STORE
|
|
||||||
|
|
||||||
$auth_info 'write' GET 'write_token' STORE
|
|
||||||
$auth_info 'read' GET 'read_token' STORE
|
|
||||||
|
|
||||||
$operation_info 'fields' GET 'fieldsToIndex' STORE
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
|
||||||
$operation_info 'size' GET 'sampleSize' STORE
|
|
||||||
|
|
||||||
'utapi.downsample.master' 'master_downsample_class' STORE
|
|
||||||
'utapi.snapshot' 'snapshot_class' STORE
|
|
||||||
'utapi.downsample.stage1' 'sample_class' STORE
|
|
||||||
|
|
||||||
$read_token $master_downsample_class {} $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master downsample
|
|
||||||
FIRSTTICK 'startTimestamp' STORE // Grab our starting timestamp from the last downsample (0 if no downsamples)
|
|
||||||
|
|
||||||
<% $startTimestamp 0 == %>
|
|
||||||
<%
|
|
||||||
'No previous timestamp found, calculating starting point' LOGMSG
|
|
||||||
$auth_info ->JSON
|
|
||||||
{
|
|
||||||
'class' 'utapi.event'
|
|
||||||
'end' $operation_info 'end' GET
|
|
||||||
'labels' {}
|
|
||||||
} ->JSON @utapi/findOldestRecord
|
|
||||||
'startTimestamp' STORE
|
|
||||||
%> IFT
|
|
||||||
|
|
||||||
'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
|
||||||
[] 'created' STORE
|
|
||||||
$fieldsToIndex
|
|
||||||
<%
|
|
||||||
'field' STORE
|
|
||||||
[
|
|
||||||
$read_token
|
|
||||||
$snapshot_class
|
|
||||||
{ $field '~.*' }
|
|
||||||
] FINDSETS
|
|
||||||
DROP SWAP DROP $field GET
|
|
||||||
DUP ->JSON ' ' + $field + LOGMSG
|
|
||||||
<% DUP ISNULL %>
|
|
||||||
<% DROP CONTINUE %> IFT
|
|
||||||
'resources' STORE
|
|
||||||
[] 'macros' STORE
|
|
||||||
$resources
|
|
||||||
<%
|
|
||||||
DROP
|
|
||||||
<%
|
|
||||||
1 - 'idx' STORE
|
|
||||||
$idx TOSTRING LOGMSG
|
|
||||||
$auth_info ->JSON
|
|
||||||
{
|
|
||||||
'start' $startTimestamp
|
|
||||||
'end' $operation_info 'end' GET
|
|
||||||
'labels' { $field $resources $idx GET }
|
|
||||||
'size' $sampleSize
|
|
||||||
'sampleClass' $sample_class
|
|
||||||
} ->JSON @utapi/sampleMetrics
|
|
||||||
%> 'macro' STORE
|
|
||||||
$macros $macro +! DROP
|
|
||||||
%> FOREACH
|
|
||||||
$macros 10 CEVAL
|
|
||||||
$created SWAP +! DROP
|
|
||||||
'test' LOGMSG
|
|
||||||
%> FOREACH
|
|
||||||
$created
|
|
||||||
%>
|
|
||||||
<% // catch any exception
|
|
||||||
RETHROW
|
|
||||||
%>
|
|
||||||
<% // finally, restore the context
|
|
||||||
$context RESTORE
|
|
||||||
%> TRY
|
|
||||||
%>
|
|
||||||
'macro' STORE
|
|
||||||
|
|
||||||
// Unit tests
|
|
||||||
|
|
||||||
$macro
|
|
|
@ -1,130 +0,0 @@
|
||||||
{
|
|
||||||
'name' 'findOldestRecord'
|
|
||||||
'desc'
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
|
||||||
'params' {
|
|
||||||
// Signature params description
|
|
||||||
'a' 'Map containing read/write tokens'
|
|
||||||
'o' 'Map containing operation info'
|
|
||||||
'r' 'Timestamp of oldest record found or NULL if no records exist'
|
|
||||||
}
|
|
||||||
'examples' [
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
]
|
|
||||||
} 'info' STORE
|
|
||||||
|
|
||||||
<%
|
|
||||||
!$info INFO
|
|
||||||
SAVE 'context' STORE
|
|
||||||
<%
|
|
||||||
JSON-> 'operation_info' STORE
|
|
||||||
JSON-> 'auth_info' STORE
|
|
||||||
|
|
||||||
$auth_info 'write' GET 'write_token' STORE
|
|
||||||
$auth_info 'read' GET 'read_token' STORE
|
|
||||||
|
|
||||||
$operation_info 'class' GET 'className' STORE
|
|
||||||
// $operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
|
||||||
$operation_info 'labels' GET 'filterLabels' STORE
|
|
||||||
|
|
||||||
'Finding oldest record for ' $className + $filterLabels ->JSON + LOGMSG
|
|
||||||
|
|
||||||
{
|
|
||||||
'token' $read_token
|
|
||||||
'class' $className
|
|
||||||
'labels' $filterLabels
|
|
||||||
'end' MINLONG
|
|
||||||
'count' 0
|
|
||||||
'boundary.pre' 1
|
|
||||||
} FETCH
|
|
||||||
|
|
||||||
DUP ->JSON LOGMSG
|
|
||||||
|
|
||||||
//ISO-8601 Duration Format PwWdDThHmMsS
|
|
||||||
// w weeks
|
|
||||||
// d days
|
|
||||||
// h hours
|
|
||||||
// m minutes
|
|
||||||
// s or s.ssssss... seconds
|
|
||||||
// [ 'P-1D' 'PT-12H' 'PT-6H' 'PT-1H' 'PT-15M' 'PT-5M' 'PT-1M' 'PT-30S' ] 'steps' STORE
|
|
||||||
// 0 'currentStep' STORE
|
|
||||||
// false 'foundRecords' STORE
|
|
||||||
// $endTimestamp 'currentTimestamp' STORE
|
|
||||||
// $currentTimestamp TOSTRING LOGMSG
|
|
||||||
|
|
||||||
// $currentTimestamp
|
|
||||||
// <%
|
|
||||||
// $steps $currentStep GET ADDDURATION 'checkTimestamp' STORE
|
|
||||||
// $checkTimestamp TOSTRING LOGMSG
|
|
||||||
// %>
|
|
||||||
// <%
|
|
||||||
// <% DUP SIZE 0 == %>
|
|
||||||
// <%
|
|
||||||
// DROP // Drop empty GTS
|
|
||||||
// 'Found 0 records for timestamp' $currentTimestamp TOSTRING + LOGMSG
|
|
||||||
// <% $currentStep $steps SIZE 1 - >= %>
|
|
||||||
// <%
|
|
||||||
// 'At final step, exiting loop' LOGMSG
|
|
||||||
// $currentTimestamp
|
|
||||||
// true
|
|
||||||
// %>
|
|
||||||
// <%
|
|
||||||
// 'Moving to smaller step and retrying timestamp' LOGMSG
|
|
||||||
// $currentStep 1 + 'currentStep' STORE
|
|
||||||
// $currentTimestamp
|
|
||||||
// false
|
|
||||||
// %> IFTE
|
|
||||||
// %>
|
|
||||||
// <%
|
|
||||||
// 'Found records before ' $currentTimestamp TOSTRING + ' Advancing currentTimestamp' + LOGMSG
|
|
||||||
// FIRSTTICK 'currentTimestamp' STORE
|
|
||||||
// true 'foundRecords' STORE
|
|
||||||
// $currentTimestamp
|
|
||||||
// false
|
|
||||||
// %> IFTE
|
|
||||||
// %> UNTIL
|
|
||||||
|
|
||||||
// 'currentTimestamp' STORE
|
|
||||||
// 'currentTimestamp ' $currentTimestamp TOSTRING + LOGMSG
|
|
||||||
|
|
||||||
// <% $foundRecords ! %>
|
|
||||||
// <% NULL %>
|
|
||||||
// <%
|
|
||||||
// <%
|
|
||||||
// $currentTimestamp TOSTRING LOGMSG
|
|
||||||
// {
|
|
||||||
// 'token' $read_token
|
|
||||||
// 'class' $className
|
|
||||||
// 'labels' $filterLabels
|
|
||||||
// 'end' $currentTimestamp 1 -
|
|
||||||
// 'count' 1000
|
|
||||||
// } FETCH
|
|
||||||
// %>
|
|
||||||
// <%
|
|
||||||
// <% DUP SIZE 0 == %>
|
|
||||||
// <% DROP true %>
|
|
||||||
// <%
|
|
||||||
// FIRSTTICK 'currentTimestamp' STORE
|
|
||||||
// false
|
|
||||||
// %> IFTE
|
|
||||||
// %> UNTIL
|
|
||||||
// $currentTimestamp
|
|
||||||
// %> IFTE
|
|
||||||
%>
|
|
||||||
<% // catch any exception
|
|
||||||
RETHROW
|
|
||||||
%>
|
|
||||||
<% // finally, restore the context
|
|
||||||
$context RESTORE
|
|
||||||
%> TRY
|
|
||||||
%>
|
|
||||||
'macro' STORE
|
|
||||||
|
|
||||||
// Unit tests
|
|
||||||
|
|
||||||
$macro
|
|
|
@ -34,9 +34,7 @@
|
||||||
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
$operation_info 'labels' GET 'labels' STORE
|
$operation_info 'labels' GET 'labels' STORE
|
||||||
|
$operation_info 'node' GET 'nodeID' STORE
|
||||||
<% $operation_info 'node' CONTAINSKEY SWAP DROP %>
|
|
||||||
<% $operation_info 'node' GET 'nodeID' STORE %> IFT
|
|
||||||
|
|
||||||
// Raise the max operations for a executing script
|
// Raise the max operations for a executing script
|
||||||
// $read_token AUTHENTICATE
|
// $read_token AUTHENTICATE
|
||||||
|
@ -261,8 +259,6 @@
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
|
|
||||||
'load_reindex' SECTION
|
'load_reindex' SECTION
|
||||||
<% 'nodeID' DEFINED %>
|
|
||||||
<%
|
|
||||||
// Only load the latest reindex for the current node
|
// Only load the latest reindex for the current node
|
||||||
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
||||||
{
|
{
|
||||||
|
@ -280,7 +276,6 @@
|
||||||
$results @util/sumRecord 'results' STORE
|
$results @util/sumRecord 'results' STORE
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
%> IFT
|
|
||||||
|
|
||||||
$results // Leave results on the stack
|
$results // Leave results on the stack
|
||||||
|
|
||||||
|
|
|
@ -1,64 +0,0 @@
|
||||||
{
|
|
||||||
'name' 'sampleMetrics'
|
|
||||||
'desc'
|
|
||||||
<'
|
|
||||||
Sample metrics at the given interval using the provided labels
|
|
||||||
'>
|
|
||||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
|
||||||
'params' {
|
|
||||||
// Signature params description
|
|
||||||
'a' 'Map containing read/write tokens'
|
|
||||||
'o' 'Map containing operation info'
|
|
||||||
'r' 'Generated metrics GTS'
|
|
||||||
}
|
|
||||||
'examples' [
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
]
|
|
||||||
} 'info' STORE
|
|
||||||
|
|
||||||
<%
|
|
||||||
!$info INFO
|
|
||||||
SAVE 'context' STORE
|
|
||||||
<%
|
|
||||||
JSON-> 'operation_info' STORE
|
|
||||||
'auth_info_json' STORE
|
|
||||||
$auth_info_json JSON-> 'auth_info' STORE
|
|
||||||
|
|
||||||
$auth_info 'write' GET 'write_token' STORE
|
|
||||||
$auth_info 'read' GET 'read_token' STORE
|
|
||||||
|
|
||||||
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
|
||||||
$operation_info 'labels' GET 'filterLabels' STORE
|
|
||||||
$operation_info 'sampleClass' GET 'sampleClass' STORE
|
|
||||||
$operation_info 'size' GET 'sampleSize' STORE
|
|
||||||
|
|
||||||
'Sampling records from ' $startTimestamp TOSTRING + ' - ' + $endTimestamp TOSTRING + ' ' + $filterLabels ->JSON + LOGMSG
|
|
||||||
|
|
||||||
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'metric' STORE
|
|
||||||
$startTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
|
||||||
NEWGTS $sampleClass RENAME
|
|
||||||
$filterLabels RELABEL
|
|
||||||
<% $currentTimestamp $sampleSize ADDDURATION $endTimestamp <= %>
|
|
||||||
<%
|
|
||||||
'Sampling metrics for ' $filterLabels ->JSON + ' at ' + $currentTimestamp TOSTRING + LOGMSG
|
|
||||||
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp 'prev' $metric } ->JSON @utapi/getMetricsAt 'metric' STORE
|
|
||||||
$currentTimestamp NaN NaN NaN $metric @utapi/encodeRecord ADDVALUE
|
|
||||||
$currentTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
|
||||||
%>
|
|
||||||
WHILE
|
|
||||||
%>
|
|
||||||
<% // catch any exception
|
|
||||||
RETHROW
|
|
||||||
%>
|
|
||||||
<% // finally, restore the context
|
|
||||||
$context RESTORE
|
|
||||||
%> TRY
|
|
||||||
%>
|
|
||||||
'macro' STORE
|
|
||||||
|
|
||||||
// Unit tests
|
|
||||||
|
|
||||||
$macro
|
|
Loading…
Reference in New Issue