Compare commits

...

1 Commits

Author SHA1 Message Date
Jonathan Gramain cbc28e8bca bf: UTAPI-105 UtapiReindex: use list of redis sentinels
Use a list of Redis sentinels that are running on stateful nodes only,
instead of localhost.

Previously, a stateless-only node wouldn't have a local sentinel node
running, causing UtapiReindex to fail.

Added a failover mechanism in case of connection error on the current
sentinel, to try each other one in turn.
2024-06-25 14:49:53 -07:00
3 changed files with 71 additions and 37 deletions

View File

@ -16,15 +16,19 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
? process.env.REINDEX_PYTHON_INTERPRETER ? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7'; : 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex { class UtapiReindex {
constructor(config) { constructor(config) {
this._enabled = false; this._enabled = false;
this._schedule = REINDEX_SCHEDULE; this._schedule = REINDEX_SCHEDULE;
this._sentinel = { this._redis = {
host: '127.0.0.1',
port: 16379,
name: 'scality-s3', name: 'scality-s3',
sentinelPassword: '', sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
}; };
this._bucketd = { this._bucketd = {
host: '127.0.0.1', host: '127.0.0.1',
@ -42,14 +46,13 @@ class UtapiReindex {
if (config && config.password) { if (config && config.password) {
this._password = config.password; this._password = config.password;
} }
if (config && config.sentinel) { if (config && config.redis) {
const { const {
host, port, name, sentinelPassword, name, sentinelPassword, sentinels,
} = config.sentinel; } = config.redis;
this._sentinel.host = host || this._sentinel.host; this._redis.name = name || this._redis.name;
this._sentinel.port = port || this._sentinel.port; this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._sentinel.name = name || this._sentinel.name; this._redis.sentinels = sentinels || this._redis.sentinels;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
} }
if (config && config.bucketd) { if (config && config.bucketd) {
const { host, port } = config.bucketd; const { host, port } = config.bucketd;
@ -68,12 +71,9 @@ class UtapiReindex {
_getRedisClient() { _getRedisClient() {
const client = new RedisClient({ const client = new RedisClient({
sentinels: [{ sentinels: this._redis.sentinels,
host: this._sentinel.host, name: this._redis.name,
port: this._sentinel.port, sentinelPassword: this._redis.sentinelPassword,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password, password: this._password,
}); });
client.connect(); client.connect();
@ -88,16 +88,16 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY); return this.ds.del(REINDEX_LOCK_KEY);
} }
_buildFlags() { _buildFlags(sentinel) {
const flags = { const flags = {
/* eslint-disable camelcase */ /* eslint-disable camelcase */
sentinel_ip: this._sentinel.host, sentinel_ip: sentinel.host,
sentinel_port: this._sentinel.port, sentinel_port: sentinel.port,
sentinel_cluster_name: this._sentinel.name, sentinel_cluster_name: this._redis.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`, bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
}; };
if (this._sentinel.sentinelPassword) { if (this._redis.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword; flags.redis_password = this._redis.sentinelPassword;
} }
/* eslint-enable camelcase */ /* eslint-enable camelcase */
@ -115,8 +115,8 @@ class UtapiReindex {
return opts; return opts;
} }
_runScript(path, done) { _runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(); const flags = this._buildFlags(remainingSentinels.shift());
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`); this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]); const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => { process.stdout.on('data', data => {
@ -143,6 +143,17 @@ class UtapiReindex {
statusCode: code, statusCode: code,
script: path, script: path,
}); });
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else { } else {
this._requestLogger.info('script exited successfully', { this._requestLogger.info('script exited successfully', {
statusCode: code, statusCode: code,
@ -153,6 +164,11 @@ class UtapiReindex {
}); });
} }
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) { _attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job'); this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock() this._lock()

View File

@ -1,16 +1,20 @@
import requests import argparse
import redis
import json
import ast import ast
import sys from concurrent.futures import ThreadPoolExecutor
import time import json
import urllib import logging
import re import re
import redis
import requests
import sys import sys
from threading import Thread from threading import Thread
from concurrent.futures import ThreadPoolExecutor import time
import urllib
import argparse logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -29,8 +33,13 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None): def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password self._password = password
r = redis.Redis(host=ip, port=port, db=0, password=password) r = redis.Redis(host=ip, port=port, db=0, password=password, socket_connect_timeout=10)
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name) try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name): def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password) r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
@ -98,4 +107,4 @@ if __name__ == '__main__':
data = U.read('accounts', userid) data = U.read('accounts', userid)
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % ( content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, data["files"], data["total_size"]) userid, data["files"], data["total_size"])
executor.submit(safe_print, content) executor.submit(safe_print, content)

View File

@ -25,6 +25,8 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100 ACCOUNT_UPDATE_CHUNKSIZE = 100
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP") parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -421,9 +423,16 @@ def get_redis_client(options):
host=options.sentinel_ip, host=options.sentinel_ip,
port=options.sentinel_port, port=options.sentinel_port,
db=0, db=0,
password=options.redis_password password=options.redis_password,
socket_connect_timeout=10
) )
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name) try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis( return redis.Redis(
host=ip, host=ip,
port=port, port=port,