Compare commits
1 Commits
developmen
...
user/jonat
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | cbc28e8bca |
|
@ -16,15 +16,19 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
|
||||||
? process.env.REINDEX_PYTHON_INTERPRETER
|
? process.env.REINDEX_PYTHON_INTERPRETER
|
||||||
: 'python3.7';
|
: 'python3.7';
|
||||||
|
|
||||||
|
const EXIT_CODE_SENTINEL_CONNECTION = 100;
|
||||||
|
|
||||||
class UtapiReindex {
|
class UtapiReindex {
|
||||||
constructor(config) {
|
constructor(config) {
|
||||||
this._enabled = false;
|
this._enabled = false;
|
||||||
this._schedule = REINDEX_SCHEDULE;
|
this._schedule = REINDEX_SCHEDULE;
|
||||||
this._sentinel = {
|
this._redis = {
|
||||||
host: '127.0.0.1',
|
|
||||||
port: 16379,
|
|
||||||
name: 'scality-s3',
|
name: 'scality-s3',
|
||||||
sentinelPassword: '',
|
sentinelPassword: '',
|
||||||
|
sentinels: [{
|
||||||
|
host: '127.0.0.1',
|
||||||
|
port: 16379,
|
||||||
|
}],
|
||||||
};
|
};
|
||||||
this._bucketd = {
|
this._bucketd = {
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
|
@ -42,14 +46,13 @@ class UtapiReindex {
|
||||||
if (config && config.password) {
|
if (config && config.password) {
|
||||||
this._password = config.password;
|
this._password = config.password;
|
||||||
}
|
}
|
||||||
if (config && config.sentinel) {
|
if (config && config.redis) {
|
||||||
const {
|
const {
|
||||||
host, port, name, sentinelPassword,
|
name, sentinelPassword, sentinels,
|
||||||
} = config.sentinel;
|
} = config.redis;
|
||||||
this._sentinel.host = host || this._sentinel.host;
|
this._redis.name = name || this._redis.name;
|
||||||
this._sentinel.port = port || this._sentinel.port;
|
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
|
||||||
this._sentinel.name = name || this._sentinel.name;
|
this._redis.sentinels = sentinels || this._redis.sentinels;
|
||||||
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
|
|
||||||
}
|
}
|
||||||
if (config && config.bucketd) {
|
if (config && config.bucketd) {
|
||||||
const { host, port } = config.bucketd;
|
const { host, port } = config.bucketd;
|
||||||
|
@ -68,12 +71,9 @@ class UtapiReindex {
|
||||||
|
|
||||||
_getRedisClient() {
|
_getRedisClient() {
|
||||||
const client = new RedisClient({
|
const client = new RedisClient({
|
||||||
sentinels: [{
|
sentinels: this._redis.sentinels,
|
||||||
host: this._sentinel.host,
|
name: this._redis.name,
|
||||||
port: this._sentinel.port,
|
sentinelPassword: this._redis.sentinelPassword,
|
||||||
}],
|
|
||||||
name: this._sentinel.name,
|
|
||||||
sentinelPassword: this._sentinel.sentinelPassword,
|
|
||||||
password: this._password,
|
password: this._password,
|
||||||
});
|
});
|
||||||
client.connect();
|
client.connect();
|
||||||
|
@ -88,16 +88,16 @@ class UtapiReindex {
|
||||||
return this.ds.del(REINDEX_LOCK_KEY);
|
return this.ds.del(REINDEX_LOCK_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
_buildFlags() {
|
_buildFlags(sentinel) {
|
||||||
const flags = {
|
const flags = {
|
||||||
/* eslint-disable camelcase */
|
/* eslint-disable camelcase */
|
||||||
sentinel_ip: this._sentinel.host,
|
sentinel_ip: sentinel.host,
|
||||||
sentinel_port: this._sentinel.port,
|
sentinel_port: sentinel.port,
|
||||||
sentinel_cluster_name: this._sentinel.name,
|
sentinel_cluster_name: this._redis.name,
|
||||||
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
|
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
|
||||||
};
|
};
|
||||||
if (this._sentinel.sentinelPassword) {
|
if (this._redis.sentinelPassword) {
|
||||||
flags.redis_password = this._sentinel.sentinelPassword;
|
flags.redis_password = this._redis.sentinelPassword;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* eslint-enable camelcase */
|
/* eslint-enable camelcase */
|
||||||
|
@ -115,8 +115,8 @@ class UtapiReindex {
|
||||||
return opts;
|
return opts;
|
||||||
}
|
}
|
||||||
|
|
||||||
_runScript(path, done) {
|
_runScriptWithSentinels(path, remainingSentinels, done) {
|
||||||
const flags = this._buildFlags();
|
const flags = this._buildFlags(remainingSentinels.shift());
|
||||||
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
|
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
|
||||||
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
|
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
|
||||||
process.stdout.on('data', data => {
|
process.stdout.on('data', data => {
|
||||||
|
@ -143,6 +143,17 @@ class UtapiReindex {
|
||||||
statusCode: code,
|
statusCode: code,
|
||||||
script: path,
|
script: path,
|
||||||
});
|
});
|
||||||
|
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
|
||||||
|
if (remainingSentinels.length > 0) {
|
||||||
|
this._requestLogger.info('retrying with next sentinel host', {
|
||||||
|
script: path,
|
||||||
|
});
|
||||||
|
return this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||||
|
}
|
||||||
|
this._requestLogger.error('no more sentinel host to try', {
|
||||||
|
script: path,
|
||||||
|
});
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this._requestLogger.info('script exited successfully', {
|
this._requestLogger.info('script exited successfully', {
|
||||||
statusCode: code,
|
statusCode: code,
|
||||||
|
@ -153,6 +164,11 @@ class UtapiReindex {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_runScript(path, done) {
|
||||||
|
const remainingSentinels = [...this._redis.sentinels];
|
||||||
|
this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||||
|
}
|
||||||
|
|
||||||
_attemptLock(job) {
|
_attemptLock(job) {
|
||||||
this._requestLogger.info('attempting to acquire the lock to begin job');
|
this._requestLogger.info('attempting to acquire the lock to begin job');
|
||||||
this._lock()
|
this._lock()
|
||||||
|
|
|
@ -1,16 +1,20 @@
|
||||||
import requests
|
import argparse
|
||||||
import redis
|
|
||||||
import json
|
|
||||||
import ast
|
import ast
|
||||||
import sys
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import time
|
import json
|
||||||
import urllib
|
import logging
|
||||||
import re
|
import re
|
||||||
|
import redis
|
||||||
|
import requests
|
||||||
import sys
|
import sys
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
import time
|
||||||
|
import urllib
|
||||||
|
|
||||||
import argparse
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
_log = logging.getLogger('utapi-reindex:reporting')
|
||||||
|
|
||||||
|
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||||
|
|
||||||
def get_options():
|
def get_options():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
@ -29,8 +33,13 @@ class askRedis():
|
||||||
|
|
||||||
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
|
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
|
||||||
self._password = password
|
self._password = password
|
||||||
r = redis.Redis(host=ip, port=port, db=0, password=password)
|
r = redis.Redis(host=ip, port=port, db=0, password=password, socket_connect_timeout=10)
|
||||||
|
try:
|
||||||
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
||||||
|
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||||
|
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
|
||||||
|
# use a specific error code to hint on retrying with another sentinel node
|
||||||
|
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||||
|
|
||||||
def read(self, resource, name):
|
def read(self, resource, name):
|
||||||
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
|
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
|
||||||
|
|
|
@ -25,6 +25,8 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
|
||||||
|
|
||||||
ACCOUNT_UPDATE_CHUNKSIZE = 100
|
ACCOUNT_UPDATE_CHUNKSIZE = 100
|
||||||
|
|
||||||
|
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||||
|
|
||||||
def get_options():
|
def get_options():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
|
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
|
||||||
|
@ -421,9 +423,16 @@ def get_redis_client(options):
|
||||||
host=options.sentinel_ip,
|
host=options.sentinel_ip,
|
||||||
port=options.sentinel_port,
|
port=options.sentinel_port,
|
||||||
db=0,
|
db=0,
|
||||||
password=options.redis_password
|
password=options.redis_password,
|
||||||
|
socket_connect_timeout=10
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
||||||
|
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||||
|
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
|
||||||
|
# use a specific error code to hint on retrying with another sentinel node
|
||||||
|
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||||
|
|
||||||
return redis.Redis(
|
return redis.Redis(
|
||||||
host=ip,
|
host=ip,
|
||||||
port=port,
|
port=port,
|
||||||
|
|
Loading…
Reference in New Issue