Compare commits
1 Commits
developmen
...
user/jonat
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | cbc28e8bca |
|
@ -16,15 +16,19 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
|
|||
? process.env.REINDEX_PYTHON_INTERPRETER
|
||||
: 'python3.7';
|
||||
|
||||
const EXIT_CODE_SENTINEL_CONNECTION = 100;
|
||||
|
||||
class UtapiReindex {
|
||||
constructor(config) {
|
||||
this._enabled = false;
|
||||
this._schedule = REINDEX_SCHEDULE;
|
||||
this._sentinel = {
|
||||
host: '127.0.0.1',
|
||||
port: 16379,
|
||||
this._redis = {
|
||||
name: 'scality-s3',
|
||||
sentinelPassword: '',
|
||||
sentinels: [{
|
||||
host: '127.0.0.1',
|
||||
port: 16379,
|
||||
}],
|
||||
};
|
||||
this._bucketd = {
|
||||
host: '127.0.0.1',
|
||||
|
@ -42,14 +46,13 @@ class UtapiReindex {
|
|||
if (config && config.password) {
|
||||
this._password = config.password;
|
||||
}
|
||||
if (config && config.sentinel) {
|
||||
if (config && config.redis) {
|
||||
const {
|
||||
host, port, name, sentinelPassword,
|
||||
} = config.sentinel;
|
||||
this._sentinel.host = host || this._sentinel.host;
|
||||
this._sentinel.port = port || this._sentinel.port;
|
||||
this._sentinel.name = name || this._sentinel.name;
|
||||
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
|
||||
name, sentinelPassword, sentinels,
|
||||
} = config.redis;
|
||||
this._redis.name = name || this._redis.name;
|
||||
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
|
||||
this._redis.sentinels = sentinels || this._redis.sentinels;
|
||||
}
|
||||
if (config && config.bucketd) {
|
||||
const { host, port } = config.bucketd;
|
||||
|
@ -68,12 +71,9 @@ class UtapiReindex {
|
|||
|
||||
_getRedisClient() {
|
||||
const client = new RedisClient({
|
||||
sentinels: [{
|
||||
host: this._sentinel.host,
|
||||
port: this._sentinel.port,
|
||||
}],
|
||||
name: this._sentinel.name,
|
||||
sentinelPassword: this._sentinel.sentinelPassword,
|
||||
sentinels: this._redis.sentinels,
|
||||
name: this._redis.name,
|
||||
sentinelPassword: this._redis.sentinelPassword,
|
||||
password: this._password,
|
||||
});
|
||||
client.connect();
|
||||
|
@ -88,16 +88,16 @@ class UtapiReindex {
|
|||
return this.ds.del(REINDEX_LOCK_KEY);
|
||||
}
|
||||
|
||||
_buildFlags() {
|
||||
_buildFlags(sentinel) {
|
||||
const flags = {
|
||||
/* eslint-disable camelcase */
|
||||
sentinel_ip: this._sentinel.host,
|
||||
sentinel_port: this._sentinel.port,
|
||||
sentinel_cluster_name: this._sentinel.name,
|
||||
sentinel_ip: sentinel.host,
|
||||
sentinel_port: sentinel.port,
|
||||
sentinel_cluster_name: this._redis.name,
|
||||
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
|
||||
};
|
||||
if (this._sentinel.sentinelPassword) {
|
||||
flags.redis_password = this._sentinel.sentinelPassword;
|
||||
if (this._redis.sentinelPassword) {
|
||||
flags.redis_password = this._redis.sentinelPassword;
|
||||
}
|
||||
|
||||
/* eslint-enable camelcase */
|
||||
|
@ -115,8 +115,8 @@ class UtapiReindex {
|
|||
return opts;
|
||||
}
|
||||
|
||||
_runScript(path, done) {
|
||||
const flags = this._buildFlags();
|
||||
_runScriptWithSentinels(path, remainingSentinels, done) {
|
||||
const flags = this._buildFlags(remainingSentinels.shift());
|
||||
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
|
||||
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
|
||||
process.stdout.on('data', data => {
|
||||
|
@ -143,6 +143,17 @@ class UtapiReindex {
|
|||
statusCode: code,
|
||||
script: path,
|
||||
});
|
||||
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
|
||||
if (remainingSentinels.length > 0) {
|
||||
this._requestLogger.info('retrying with next sentinel host', {
|
||||
script: path,
|
||||
});
|
||||
return this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||
}
|
||||
this._requestLogger.error('no more sentinel host to try', {
|
||||
script: path,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
this._requestLogger.info('script exited successfully', {
|
||||
statusCode: code,
|
||||
|
@ -153,6 +164,11 @@ class UtapiReindex {
|
|||
});
|
||||
}
|
||||
|
||||
_runScript(path, done) {
|
||||
const remainingSentinels = [...this._redis.sentinels];
|
||||
this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||
}
|
||||
|
||||
_attemptLock(job) {
|
||||
this._requestLogger.info('attempting to acquire the lock to begin job');
|
||||
this._lock()
|
||||
|
|
|
@ -1,16 +1,20 @@
|
|||
import requests
|
||||
import redis
|
||||
import json
|
||||
import argparse
|
||||
import ast
|
||||
import sys
|
||||
import time
|
||||
import urllib
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import redis
|
||||
import requests
|
||||
import sys
|
||||
from threading import Thread
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import time
|
||||
import urllib
|
||||
|
||||
import argparse
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
_log = logging.getLogger('utapi-reindex:reporting')
|
||||
|
||||
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||
|
||||
def get_options():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
@ -29,8 +33,13 @@ class askRedis():
|
|||
|
||||
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
|
||||
self._password = password
|
||||
r = redis.Redis(host=ip, port=port, db=0, password=password)
|
||||
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
||||
r = redis.Redis(host=ip, port=port, db=0, password=password, socket_connect_timeout=10)
|
||||
try:
|
||||
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
||||
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
|
||||
# use a specific error code to hint on retrying with another sentinel node
|
||||
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||
|
||||
def read(self, resource, name):
|
||||
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
|
||||
|
@ -98,4 +107,4 @@ if __name__ == '__main__':
|
|||
data = U.read('accounts', userid)
|
||||
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
|
||||
userid, data["files"], data["total_size"])
|
||||
executor.submit(safe_print, content)
|
||||
executor.submit(safe_print, content)
|
||||
|
|
|
@ -25,6 +25,8 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
|
|||
|
||||
ACCOUNT_UPDATE_CHUNKSIZE = 100
|
||||
|
||||
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||
|
||||
def get_options():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
|
||||
|
@ -421,9 +423,16 @@ def get_redis_client(options):
|
|||
host=options.sentinel_ip,
|
||||
port=options.sentinel_port,
|
||||
db=0,
|
||||
password=options.redis_password
|
||||
password=options.redis_password,
|
||||
socket_connect_timeout=10
|
||||
)
|
||||
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
||||
try:
|
||||
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
||||
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
|
||||
# use a specific error code to hint on retrying with another sentinel node
|
||||
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||
|
||||
return redis.Redis(
|
||||
host=ip,
|
||||
port=port,
|
||||
|
|
Loading…
Reference in New Issue