Compare commits
3 Commits
developmen
...
dev/downsa
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 1c1a4a945d | |
Taylor McKinnon | 0c512c9e70 | |
Taylor McKinnon | f6575f32e6 |
|
@ -0,0 +1,14 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Downsample',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.DownsampleTask();
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting Downsample daemon'))
|
||||
.then(() => task.start())
|
||||
.then(() => logger.info('Downsample started'));
|
|
@ -12,6 +12,8 @@ ENV warpscript.repository.directory /usr/local/share/warpscript
|
|||
ENV warp.token.file /static.tokens
|
||||
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
||||
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
||||
ENV warpscript.extension.concurrent 'io.warp10.script.ext.concurrent.ConcurrentWarpScriptExtension'
|
||||
|
||||
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
|
||||
|
||||
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { LoggerContext } = require('../utils');
|
||||
const { downsampleLagSecs, indexedEventFields } = require('../constants');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'Repair',
|
||||
});
|
||||
|
||||
class DownsampleTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super({
|
||||
warp10: {
|
||||
requestTimeout: 30000,
|
||||
connectTimeout: 30000,
|
||||
},
|
||||
...options,
|
||||
});
|
||||
this._defaultSchedule = config.downsampleSchedule;
|
||||
this._defaultLag = downsampleLagSecs;
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('Downsampling records', { timestamp, nodeId: this.nodeId });
|
||||
|
||||
const params = {
|
||||
params: {
|
||||
nodeId: this.nodeId,
|
||||
end: timestamp.toString(),
|
||||
fields: indexedEventFields,
|
||||
},
|
||||
macro: 'utapi/repairRecords',
|
||||
};
|
||||
const status = await this._warp10.exec(params);
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0]} corrections`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = DownsampleTask;
|
|
@ -0,0 +1,105 @@
|
|||
{
|
||||
'name' 'downsampleRecords'
|
||||
'desc'
|
||||
<'
|
||||
|
||||
'>
|
||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||
'params' {
|
||||
// Signature params description
|
||||
'a' 'Map containing read/write tokens'
|
||||
'o' 'Map containing operation info'
|
||||
'c' 'List of created checkpoints'
|
||||
}
|
||||
'examples' [
|
||||
<'
|
||||
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
'Downsampling records' LOGMSG
|
||||
JSON-> 'operation_info' STORE
|
||||
JSON-> 'auth_info' STORE
|
||||
|
||||
$auth_info 'write' GET 'write_token' STORE
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
$operation_info 'fields' GET 'fieldsToIndex' STORE
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'size' GET 'sampleSize' STORE
|
||||
|
||||
'utapi.downsample.master' 'master_downsample_class' STORE
|
||||
'utapi.snapshot' 'snapshot_class' STORE
|
||||
'utapi.downsample.stage1' 'sample_class' STORE
|
||||
|
||||
$read_token $master_downsample_class {} $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master downsample
|
||||
FIRSTTICK 'startTimestamp' STORE // Grab our starting timestamp from the last downsample (0 if no downsamples)
|
||||
|
||||
<% $startTimestamp 0 == %>
|
||||
<%
|
||||
'No previous timestamp found, calculating starting point' LOGMSG
|
||||
$auth_info ->JSON
|
||||
{
|
||||
'class' 'utapi.event'
|
||||
'end' $operation_info 'end' GET
|
||||
'labels' {}
|
||||
} ->JSON @utapi/findOldestRecord
|
||||
'startTimestamp' STORE
|
||||
%> IFT
|
||||
|
||||
'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
||||
[] 'created' STORE
|
||||
$fieldsToIndex
|
||||
<%
|
||||
'field' STORE
|
||||
[
|
||||
$read_token
|
||||
$snapshot_class
|
||||
{ $field '~.*' }
|
||||
] FINDSETS
|
||||
DROP SWAP DROP $field GET
|
||||
DUP ->JSON ' ' + $field + LOGMSG
|
||||
<% DUP ISNULL %>
|
||||
<% DROP CONTINUE %> IFT
|
||||
'resources' STORE
|
||||
[] 'macros' STORE
|
||||
$resources
|
||||
<%
|
||||
DROP
|
||||
<%
|
||||
1 - 'idx' STORE
|
||||
$idx TOSTRING LOGMSG
|
||||
$auth_info ->JSON
|
||||
{
|
||||
'start' $startTimestamp
|
||||
'end' $operation_info 'end' GET
|
||||
'labels' { $field $resources $idx GET }
|
||||
'size' $sampleSize
|
||||
'sampleClass' $sample_class
|
||||
} ->JSON @utapi/sampleMetrics
|
||||
%> 'macro' STORE
|
||||
$macros $macro +! DROP
|
||||
%> FOREACH
|
||||
$macros 10 CEVAL
|
||||
$created SWAP +! DROP
|
||||
'test' LOGMSG
|
||||
%> FOREACH
|
||||
$created
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
// Unit tests
|
||||
|
||||
$macro
|
|
@ -0,0 +1,130 @@
|
|||
{
|
||||
'name' 'findOldestRecord'
|
||||
'desc'
|
||||
<'
|
||||
|
||||
'>
|
||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||
'params' {
|
||||
// Signature params description
|
||||
'a' 'Map containing read/write tokens'
|
||||
'o' 'Map containing operation info'
|
||||
'r' 'Timestamp of oldest record found or NULL if no records exist'
|
||||
}
|
||||
'examples' [
|
||||
<'
|
||||
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
JSON-> 'operation_info' STORE
|
||||
JSON-> 'auth_info' STORE
|
||||
|
||||
$auth_info 'write' GET 'write_token' STORE
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
$operation_info 'class' GET 'className' STORE
|
||||
// $operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'labels' GET 'filterLabels' STORE
|
||||
|
||||
'Finding oldest record for ' $className + $filterLabels ->JSON + LOGMSG
|
||||
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $className
|
||||
'labels' $filterLabels
|
||||
'end' MINLONG
|
||||
'count' 0
|
||||
'boundary.pre' 1
|
||||
} FETCH
|
||||
|
||||
DUP ->JSON LOGMSG
|
||||
|
||||
//ISO-8601 Duration Format PwWdDThHmMsS
|
||||
// w weeks
|
||||
// d days
|
||||
// h hours
|
||||
// m minutes
|
||||
// s or s.ssssss... seconds
|
||||
// [ 'P-1D' 'PT-12H' 'PT-6H' 'PT-1H' 'PT-15M' 'PT-5M' 'PT-1M' 'PT-30S' ] 'steps' STORE
|
||||
// 0 'currentStep' STORE
|
||||
// false 'foundRecords' STORE
|
||||
// $endTimestamp 'currentTimestamp' STORE
|
||||
// $currentTimestamp TOSTRING LOGMSG
|
||||
|
||||
// $currentTimestamp
|
||||
// <%
|
||||
// $steps $currentStep GET ADDDURATION 'checkTimestamp' STORE
|
||||
// $checkTimestamp TOSTRING LOGMSG
|
||||
// %>
|
||||
// <%
|
||||
// <% DUP SIZE 0 == %>
|
||||
// <%
|
||||
// DROP // Drop empty GTS
|
||||
// 'Found 0 records for timestamp' $currentTimestamp TOSTRING + LOGMSG
|
||||
// <% $currentStep $steps SIZE 1 - >= %>
|
||||
// <%
|
||||
// 'At final step, exiting loop' LOGMSG
|
||||
// $currentTimestamp
|
||||
// true
|
||||
// %>
|
||||
// <%
|
||||
// 'Moving to smaller step and retrying timestamp' LOGMSG
|
||||
// $currentStep 1 + 'currentStep' STORE
|
||||
// $currentTimestamp
|
||||
// false
|
||||
// %> IFTE
|
||||
// %>
|
||||
// <%
|
||||
// 'Found records before ' $currentTimestamp TOSTRING + ' Advancing currentTimestamp' + LOGMSG
|
||||
// FIRSTTICK 'currentTimestamp' STORE
|
||||
// true 'foundRecords' STORE
|
||||
// $currentTimestamp
|
||||
// false
|
||||
// %> IFTE
|
||||
// %> UNTIL
|
||||
|
||||
// 'currentTimestamp' STORE
|
||||
// 'currentTimestamp ' $currentTimestamp TOSTRING + LOGMSG
|
||||
|
||||
// <% $foundRecords ! %>
|
||||
// <% NULL %>
|
||||
// <%
|
||||
// <%
|
||||
// $currentTimestamp TOSTRING LOGMSG
|
||||
// {
|
||||
// 'token' $read_token
|
||||
// 'class' $className
|
||||
// 'labels' $filterLabels
|
||||
// 'end' $currentTimestamp 1 -
|
||||
// 'count' 1000
|
||||
// } FETCH
|
||||
// %>
|
||||
// <%
|
||||
// <% DUP SIZE 0 == %>
|
||||
// <% DROP true %>
|
||||
// <%
|
||||
// FIRSTTICK 'currentTimestamp' STORE
|
||||
// false
|
||||
// %> IFTE
|
||||
// %> UNTIL
|
||||
// $currentTimestamp
|
||||
// %> IFTE
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
// Unit tests
|
||||
|
||||
$macro
|
|
@ -34,7 +34,9 @@
|
|||
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'labels' GET 'labels' STORE
|
||||
$operation_info 'node' GET 'nodeID' STORE
|
||||
|
||||
<% $operation_info 'node' CONTAINSKEY SWAP DROP %>
|
||||
<% $operation_info 'node' GET 'nodeID' STORE %> IFT
|
||||
|
||||
// Raise the max operations for a executing script
|
||||
// $read_token AUTHENTICATE
|
||||
|
@ -259,6 +261,8 @@
|
|||
%> FOREACH
|
||||
|
||||
'load_reindex' SECTION
|
||||
<% 'nodeID' DEFINED %>
|
||||
<%
|
||||
// Only load the latest reindex for the current node
|
||||
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
||||
{
|
||||
|
@ -276,6 +280,7 @@
|
|||
$results @util/sumRecord 'results' STORE
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
%> IFT
|
||||
|
||||
$results // Leave results on the stack
|
||||
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
{
|
||||
'name' 'sampleMetrics'
|
||||
'desc'
|
||||
<'
|
||||
Sample metrics at the given interval using the provided labels
|
||||
'>
|
||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||
'params' {
|
||||
// Signature params description
|
||||
'a' 'Map containing read/write tokens'
|
||||
'o' 'Map containing operation info'
|
||||
'r' 'Generated metrics GTS'
|
||||
}
|
||||
'examples' [
|
||||
<'
|
||||
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
JSON-> 'operation_info' STORE
|
||||
'auth_info_json' STORE
|
||||
$auth_info_json JSON-> 'auth_info' STORE
|
||||
|
||||
$auth_info 'write' GET 'write_token' STORE
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'labels' GET 'filterLabels' STORE
|
||||
$operation_info 'sampleClass' GET 'sampleClass' STORE
|
||||
$operation_info 'size' GET 'sampleSize' STORE
|
||||
|
||||
'Sampling records from ' $startTimestamp TOSTRING + ' - ' + $endTimestamp TOSTRING + ' ' + $filterLabels ->JSON + LOGMSG
|
||||
|
||||
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'metric' STORE
|
||||
$startTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
||||
NEWGTS $sampleClass RENAME
|
||||
$filterLabels RELABEL
|
||||
<% $currentTimestamp $sampleSize ADDDURATION $endTimestamp <= %>
|
||||
<%
|
||||
'Sampling metrics for ' $filterLabels ->JSON + ' at ' + $currentTimestamp TOSTRING + LOGMSG
|
||||
$auth_info_json { 'labels' $filterLabels 'end' $startTimestamp 'prev' $metric } ->JSON @utapi/getMetricsAt 'metric' STORE
|
||||
$currentTimestamp NaN NaN NaN $metric @utapi/encodeRecord ADDVALUE
|
||||
$currentTimestamp $sampleSize ADDDURATION 'currentTimestamp' STORE
|
||||
%>
|
||||
WHILE
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
// Unit tests
|
||||
|
||||
$macro
|
Loading…
Reference in New Issue