Compare commits

...

1 Commits

Author SHA1 Message Date
Jonathan Gramain cbc28e8bca bf: UTAPI-105 UtapiReindex: use list of redis sentinels
Use a list of Redis sentinels that are running on stateful nodes only,
instead of localhost.

Previously, a stateless-only node wouldn't have a local sentinel node
running, causing UtapiReindex to fail.

Added a failover mechanism in case of connection error on the current
sentinel, to try each other one in turn.
2024-06-25 14:49:53 -07:00
3 changed files with 71 additions and 37 deletions

View File

@ -16,15 +16,19 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex {
constructor(config) {
this._enabled = false;
this._schedule = REINDEX_SCHEDULE;
this._sentinel = {
host: '127.0.0.1',
port: 16379,
this._redis = {
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
};
this._bucketd = {
host: '127.0.0.1',
@ -42,14 +46,13 @@ class UtapiReindex {
if (config && config.password) {
this._password = config.password;
}
if (config && config.sentinel) {
if (config && config.redis) {
const {
host, port, name, sentinelPassword,
} = config.sentinel;
this._sentinel.host = host || this._sentinel.host;
this._sentinel.port = port || this._sentinel.port;
this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
name, sentinelPassword, sentinels,
} = config.redis;
this._redis.name = name || this._redis.name;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._redis.sentinels = sentinels || this._redis.sentinels;
}
if (config && config.bucketd) {
const { host, port } = config.bucketd;
@ -68,12 +71,9 @@ class UtapiReindex {
_getRedisClient() {
const client = new RedisClient({
sentinels: [{
host: this._sentinel.host,
port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
sentinels: this._redis.sentinels,
name: this._redis.name,
sentinelPassword: this._redis.sentinelPassword,
password: this._password,
});
client.connect();
@ -88,16 +88,16 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY);
}
_buildFlags() {
_buildFlags(sentinel) {
const flags = {
/* eslint-disable camelcase */
sentinel_ip: this._sentinel.host,
sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._sentinel.name,
sentinel_ip: sentinel.host,
sentinel_port: sentinel.port,
sentinel_cluster_name: this._redis.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
};
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
if (this._redis.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword;
}
/* eslint-enable camelcase */
@ -115,8 +115,8 @@ class UtapiReindex {
return opts;
}
_runScript(path, done) {
const flags = this._buildFlags();
_runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(remainingSentinels.shift());
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => {
@ -143,6 +143,17 @@ class UtapiReindex {
statusCode: code,
script: path,
});
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else {
this._requestLogger.info('script exited successfully', {
statusCode: code,
@ -153,6 +164,11 @@ class UtapiReindex {
});
}
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock()

View File

@ -1,16 +1,20 @@
import requests
import redis
import json
import argparse
import ast
import sys
import time
import urllib
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import time
import urllib
import argparse
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
@ -29,8 +33,13 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password
r = redis.Redis(host=ip, port=port, db=0, password=password)
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
r = redis.Redis(host=ip, port=port, db=0, password=password, socket_connect_timeout=10)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)

View File

@ -25,6 +25,8 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -421,9 +423,16 @@ def get_redis_client(options):
host=options.sentinel_ip,
port=options.sentinel_port,
db=0,
password=options.redis_password
password=options.redis_password,
socket_connect_timeout=10
)
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis(
host=ip,
port=port,