Compare commits

...

5 Commits

Author SHA1 Message Date
Taylor McKinnon f7c9758b9c add default enableReadyCheck flag to redis client 2021-08-02 12:05:49 -07:00
Taylor McKinnon 1542962d72 add handlers for close, end, and reconnect events 2021-08-02 12:05:05 -07:00
Taylor McKinnon 9bbec85160 bf(S3C-1997): Fix ioredis failover for sentinels
(cherry picked from commit 4af996c637)
(cherry picked from commit 1cb7e880df823c512e3cbfe651465ce05b593dbf)
2021-08-02 09:33:17 -07:00
Taylor McKinnon 41e183d434 linting 2021-08-02 09:16:17 -07:00
Taylor McKinnon 2b4bd85e27 bf(S3C-4205): prevent concurrent replays 2021-07-29 11:17:40 -07:00
12 changed files with 364 additions and 61 deletions

View File

@ -33,7 +33,10 @@ class Datastore {
* @return {undefined}
*/
set(key, value, cb) {
return this._client.set(key, value, cb);
return this._client.call(
(backend, done) => backend.set(key, value, done),
cb,
);
}
/**
@ -45,7 +48,7 @@ class Datastore {
*/
setExpire(key, value, ttl) {
// This method is a Promise because no callback is given.
return this._client.set(key, value, 'EX', ttl, 'NX');
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
}
/**
@ -54,7 +57,8 @@ class Datastore {
* @return {undefined}
*/
del(key) {
return this._client.del(key);
// This method is a Promise because no callback is given.
return this._client.call(backend => backend.del(key));
}
/**
@ -64,7 +68,7 @@ class Datastore {
* @return {undefined}
*/
get(key, cb) {
return this._client.get(key, cb);
return this._client.call((backend, done) => backend.get(key, done), cb);
}
/**
@ -74,7 +78,7 @@ class Datastore {
* @return {undefined}
*/
incr(key, cb) {
return this._client.incr(key, cb);
return this._client.call((backend, done) => backend.incr(key, done), cb);
}
/**
@ -84,7 +88,7 @@ class Datastore {
* @return {undefined}
*/
decr(key, cb) {
return this._client.decr(key, cb);
return this._client.call((backend, done) => backend.decr(key, done), cb);
}
/**
@ -95,7 +99,7 @@ class Datastore {
* @return {undefined}
*/
decrby(key, value, cb) {
return this._client.decrby(key, value, cb);
return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
}
/**
@ -107,7 +111,7 @@ class Datastore {
* @return {undefined}
*/
zadd(key, score, value, cb) {
return this._client.zadd(key, score, value, cb);
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
}
/**
@ -120,7 +124,7 @@ class Datastore {
* @return {undefined}
*/
zrange(key, min, max, cb) {
return this._client.zrange(key, min, max, cb);
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
}
/**
@ -133,7 +137,7 @@ class Datastore {
* @return {undefined}
*/
zrangebyscore(key, min, max, cb) {
return this._client.zrangebyscore(key, min, max, cb);
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
}
/**
@ -146,8 +150,12 @@ class Datastore {
* @return {undefined}
*/
bZrangebyscore(keys, min, max, cb) {
return this._client.pipeline(keys.map(
item => ['zrangebyscore', item, min, max])).exec(cb);
return this._client.call(
(backend, done) => backend
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
.exec(done),
cb,
);
}
/**
@ -157,7 +165,9 @@ class Datastore {
* @return {undefined}
*/
batch(cmds, cb) {
return this._client.multi(cmds).exec(cb);
return this._client.call((backend, done) => {
backend.multi(cmds).exec(done);
}, cb);
}
/**
@ -167,7 +177,7 @@ class Datastore {
* @return {undefined}
*/
pipeline(cmds, cb) {
return this._client.pipeline(cmds).exec(cb);
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
}
/**
@ -177,20 +187,21 @@ class Datastore {
* @return {undefined}
*/
multi(cmds, cb) {
return this._client.multi(cmds).exec((err, res) => {
if (err) {
return cb(err);
}
const flattenRes = [];
const resErr = res.filter(item => {
flattenRes.push(item[1]);
return item[0] !== null;
});
if (resErr && resErr.length > 0) {
return cb(resErr);
}
return cb(null, flattenRes);
});
return this._client.call((backend, done) =>
backend.multi(cmds).exec((err, res) => {
if (err) {
return done(err);
}
const flattenRes = [];
const resErr = res.filter(item => {
flattenRes.push(item[1]);
return item[0] !== null;
});
if (resErr && resErr.length > 0) {
return done(resErr);
}
return done(null, flattenRes);
}), cb);
}
/**
@ -203,7 +214,7 @@ class Datastore {
* @return {undefined}
*/
zremrangebyscore(key, min, max, cb) {
return this._client.zremrangebyscore(key, min, max, cb);
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
}
/**
@ -214,7 +225,7 @@ class Datastore {
* @return {undefined}
*/
lpush(key, val, cb) {
return this._client.lpush(key, val, cb);
return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
}
/**
@ -224,7 +235,7 @@ class Datastore {
* @return {undefined}
*/
rpop(key, cb) {
return this._client.rpop(key, cb);
return this._client.call((backend, done) => backend.rpop(key, done), cb);
}
/**
@ -236,7 +247,7 @@ class Datastore {
* @return {undefined}
*/
lrange(key, start, stop, cb) {
return this._client.lrange(key, start, stop, cb);
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
}
/**
@ -246,7 +257,7 @@ class Datastore {
* @return {undefined}
*/
llen(key, cb) {
return this._client.llen(key, cb);
return this._client.call((backend, done) => backend.llen(key, done), cb);
}
/**
@ -257,7 +268,7 @@ class Datastore {
* @return {undefined}
*/
publish(channel, message, cb) {
return this._client.publish(channel, message, cb);
return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
}
/**
@ -268,7 +279,7 @@ class Datastore {
* @return {undefined}
*/
scan(cursor, pattern, cb) {
return this._client.scan(cursor, 'match', pattern, cb);
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
}
}

View File

@ -4,7 +4,7 @@ const werelogs = require('werelogs');
const Datastore = require('./Datastore');
const { generateKey, generateCounter, generateStateKey } = require('./schema');
const { errors } = require('arsenal');
const redisClient = require('../utils/redisClient');
const redisClientv2 = require('../utils/redisClientv2');
const member = require('../utils/member');
const methods = {
@ -104,11 +104,11 @@ class UtapiClient {
}
if (config.redis) {
this.ds = new Datastore()
.setClient(redisClient(config.redis, this.log));
.setClient(redisClientv2(config.redis, this.log));
}
if (config.localCache) {
this.localCache = new Datastore()
.setClient(redisClient(config.localCache, this.log));
.setClient(redisClientv2(config.localCache, this.log));
}
if (config.component) {
// The configuration uses the property `component`, while

View File

@ -7,7 +7,7 @@ const { jsutil } = require('arsenal');
const werelogs = require('werelogs');
const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const RedisClient = require('../libV2/redis');
const REINDEX_SCHEDULE = '0 0 * * Sun';
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
@ -62,7 +62,7 @@ class UtapiReindex {
}
_getRedisClient() {
return redisClient({
const client = new RedisClient({
sentinels: [{
host: this._sentinel.host,
port: this._sentinel.port,
@ -70,7 +70,9 @@ class UtapiReindex {
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password,
}, this._log);
});
client.connect();
return client;
}
_lock() {

View File

@ -1,11 +1,12 @@
/* eslint-disable no-underscore-dangle */
const assert = require('assert');
const async = require('async');
const { scheduleJob } = require('node-schedule');
const UtapiClient = require('./UtapiClient');
const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const safeJsonParse = require('../utils/safeJsonParse');
const werelogs = require('werelogs');
const redisClientv2 = require('../utils/redisClientv2');
// Every five minutes. Cron-style scheduling used by node-schedule.
const REPLAY_SCHEDULE = '*/5 * * * *';
@ -33,6 +34,7 @@ class UtapiReplay {
this.replaySchedule = REPLAY_SCHEDULE;
this.batchSize = BATCH_SIZE;
this.disableReplay = true;
this._isRunning = false;
if (config) {
const message = 'missing required property in UtapiReplay ' +
@ -41,7 +43,7 @@ class UtapiReplay {
assert(config.localCache, `${message}: localCache`);
this.utapiClient = new UtapiClient(config);
this.localCache = new Datastore()
.setClient(redisClient(config.localCache, this.log));
.setClient(redisClientv2(config.localCache));
if (config.replaySchedule) {
this.replaySchedule = config.replaySchedule;
}
@ -184,17 +186,25 @@ class UtapiReplay {
this.log.info('disabled utapi replay scheduler');
return this;
}
const replay = scheduleJob(this.replaySchedule, () =>
this._setLock()
.then(res => {
// If `res` is not `null`, there is no pre-existing lock.
if (res) {
return this._checkLocalCache();
}
return undefined;
}));
replay.on('scheduled', date =>
this.log.info(`replay job started: ${date}`));
const replay = scheduleJob(this.replaySchedule, () => {
if (!this._isRunning) {
this._isRunning = true;
return this._setLock()
.then(res => {
// If `res` is not `null`, there is no pre-existing lock.
if (res) {
return this._checkLocalCache();
}
return undefined;
})
.then(() => { this._isRunning = false; })
.catch(() => { this._isRunning = false; });
}
return;
});
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`));
this.log.info('enabled utapi replay scheduler', {
schedule: this.replaySchedule,
});

View File

@ -52,6 +52,16 @@ class Memory {
this.data = {};
}
/**
* A simple wrapper provided for API compatibility with redis
* @param {Function} func - Function to call
* @param {callback} cb - callback
* @returns {undefined}
*/
call(func, cb) {
return func(this, cb);
}
/**
* Set key to hold a value
* @param {string} key - data key

View File

@ -12,7 +12,7 @@ const Route = require('../router/Route');
const Router = require('../router/Router');
const UtapiRequest = require('../lib/UtapiRequest');
const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const RedisClient = require('../libV2/redis');
class UtapiServer {
/**
@ -91,7 +91,7 @@ class UtapiServer {
return this.errorResponse(utapiRequest, errors.AccessDenied);
}
const redisClient = this.datastore.getClient();
if (redisClient.status !== 'ready') {
if (!redisClient.isReady) {
return this.errorResponse(utapiRequest,
errors.InternalError.customizeDescription(
'Redis server is not ready'));
@ -220,7 +220,15 @@ function spawn(params) {
dump: log.dumpLevel });
const cluster = new Clustering(workers, logger);
cluster.start(worker => {
const datastore = new Datastore().setClient(redisClient(redis, logger));
const client = new RedisClient({
// disable offline queue
enableOfflineQueue: false,
// keep alive 3 seconds
keepAlive: 3000,
...redis,
});
client.connect();
const datastore = new Datastore().setClient(client);
const server = new UtapiServer(worker, port, datastore, logger, config);
server.startup();
});

182
libV2/redis.js Normal file
View File

@ -0,0 +1,182 @@
const EventEmitter = require('events');
const { callbackify, promisify } = require('util');
const IORedis = require('ioredis');
const { jsutil } = require('arsenal');
const errors = require('./errors');
const { LoggerContext, asyncOrCallback } = require('./utils');
const moduleLogger = new LoggerContext({
module: 'redis',
});
const COMMAND_TIMEOUT = 10000;
const CONNECTION_TIMEOUT = 30000;
/**
* Creates a new Redis client instance
* @param {object} conf - redis configuration
* @param {string} conf.host - redis host
* @param {number} conf.port - redis port
* @param {string} [conf.password] - redis password (optional)
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
* @param {Array<Object>} conf.sentinels - sentinels
* @param {Werelogs.Logger} log - Werelogs logger
* @return {Redis} - Redis client instance
*/
class RedisClient extends EventEmitter {
constructor(options) {
super();
this._redisOptions = options;
this._redis = null;
// Controls the use of additional command timeouts
// Only use if connecting to a sentinel cluster
this._useTimeouts = options.sentinels !== undefined;
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
this._runningRedisProbe = null;
this._isConnected = false;
this._isReady = false;
}
connect(callback) {
this._initClient(false);
if (callback) {
process.nextTick(callback);
}
}
disconnect(callback) {
return asyncOrCallback(async () => {
if (this._useTimeouts) {
Object.values(this._inFlightTimeouts)
.forEach(clearTimeout);
}
await this._redis.quit();
}, callback);
}
get isReady() {
return this._isConnected && this._isReady;
}
_initClient(startProbe = true) {
moduleLogger.debug('initializing redis client');
if (this._redis !== null) {
this._redis.off('connect', this._onConnect);
this._redis.off('ready', this._onReady);
this._redis.off('error', this._onError);
this._redis.quit();
}
this._isConnected = false;
this._isReady = false;
this._redis = new IORedis(this._redisOptions);
this._redis.on('connect', this._onConnect.bind(this));
this._redis.on('ready', this._onReady.bind(this));
this._redis.on('close', this._onClose.bind(this));
this._redis.on('reconnect', this._onReconnect.bind(this));
this._redis.on('end', this._onEnd.bind(this));
this._redis.on('error', this._onError.bind(this));
if (startProbe && this._runningRedisProbe === null) {
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
}
}
_probeRedis() {
if (this.isReady) {
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
clearInterval(this._runningRedisProbe);
this._runningRedisProbe = null;
} else {
moduleLogger.warn('redis client has failed to become ready, reinitializing');
this._initClient();
}
}
_onConnect() {
this._isConnected = true;
this.emit('connect');
}
_onReady() {
this._isReady = true;
this.emit('ready');
}
_onError(error) {
moduleLogger.error('error connecting to redis', { error });
this._isConnected = false;
this._isReady = false;
this.emit('error', error);
}
_onReconnect(delay) {
moduleLogger.debug(`reconnecting to redis, waiting ${delay} ms`);
}
_onClose() {
moduleLogger.debug('connection to redis closed');
this._isConnected = false;
this._isReady = false;
}
_onEnd() {
moduleLogger.debug('connection to redis ended.');
this._isConnected = false;
this._isReady = false;
}
_createCommandTimeout() {
let timer;
const cancelTimeout = jsutil.once(() => {
clearTimeout(timer);
this._inFlightTimeouts.delete(timer);
});
const timeout = new Promise((_, reject) => {
timer = setTimeout(
() => {
this.emit('timeout');
this._initClient();
},
COMMAND_TIMEOUT,
);
this._inFlightTimeouts.add(timer);
this.once('timeout', () => {
moduleLogger.warn('redis command timed out');
cancelTimeout();
reject(errors.OperationTimedOut);
});
});
return { timeout, cancelTimeout };
}
async _call(asyncFunc) {
const funcPromise = asyncFunc(this._redis);
if (!this._useTimeouts) {
// If timeouts are disabled simply return the Promise
return funcPromise;
}
const { timeout, cancelTimeout } = this._createCommandTimeout();
try {
// timeout always rejects so we can just return
return await Promise.race([funcPromise, timeout]);
} finally {
cancelTimeout();
}
}
call(func, callback) {
if (callback !== undefined) {
// If a callback is provided `func` is assumed to also take a callback
// and is converted to a promise using promisify
return callbackify(this._call.bind(this))(promisify(func), callback);
}
return this._call(func);
}
}
module.exports = RedisClient;

20
libV2/utils/func.js Normal file
View File

@ -0,0 +1,20 @@
const { callbackify } = require('util');
/**
* Convenience function to handle "if no callback then return a promise" pattern
*
* @param {Function} asyncFunc - asyncFunction to call
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - returns a Promise if no callback is passed
*/
function asyncOrCallback(asyncFunc, callback) {
if (typeof callback === 'function') {
callbackify(asyncFunc)(callback);
return undefined;
}
return asyncFunc();
}
module.exports = {
asyncOrCallback,
};

11
libV2/utils/index.js Normal file
View File

@ -0,0 +1,11 @@
const log = require('./log');
const shard = require('./shard');
const timestamp = require('./timestamp');
const func = require('./func');
module.exports = {
...log,
...shard,
...timestamp,
...func,
};

View File

@ -4,14 +4,25 @@ const UtapiClient = require('../../../lib/UtapiClient');
const Datastore = require('../../../lib/Datastore');
const redisClient = require('../../../utils/redisClient');
const { Logger } = require('werelogs');
const { getCounters, getMetricFromKey,
getStateKeys, getKeys } = require('../../../lib/schema');
const RedisClientv2 = require('../../../libV2/redis');
const {
getCounters, getMetricFromKey,
getStateKeys, getKeys,
} = require('../../../lib/schema');
const log = new Logger('TestUtapiClient');
const redis = redisClient({
host: '127.0.0.1',
port: 6379,
}, log);
const datastore = new Datastore().setClient(redis);
const redisV2 = new RedisClientv2({
host: '127.0.0.1',
port: 6379,
});
redisV2.connect();
const datastore = new Datastore().setClient(
redisV2,
);
const utapiConfig = {
redis: {
host: '127.0.0.1',

View File

@ -5,6 +5,7 @@ const UtapiReplay = require('../../../lib/UtapiReplay');
const UtapiClient = require('../../../lib/UtapiClient');
const Datastore = require('../../../lib/Datastore');
const redisClient = require('../../../utils/redisClient');
const RedisClientv2 = require('../../../libV2/redis');
const { getAllResourceTypeKeys } = require('../../utils/utils');
const safeJsonParse = require('../../../utils/safeJsonParse');
@ -13,7 +14,12 @@ const localCache = redisClient({
host: '127.0.0.1',
port: 6379,
}, log);
const datastore = new Datastore().setClient(localCache);
const redis = new RedisClientv2({
host: '127.0.0.1',
port: 6379,
});
redis.connect();
const datastore = new Datastore().setClient(redis);
const utapiClient = new UtapiClient({
redis: {
host: '127.0.0.1',

32
utils/redisClientv2.js Normal file
View File

@ -0,0 +1,32 @@
const RedisClient = require('../libV2/redis');
/**
* Creates a new Redis client instance
* @param {object} conf - redis configuration
* @param {string} conf.host - redis host
* @param {number} conf.port - redis port
* @param {string} [conf.password] - redis password (optional)
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
* @param {Werelogs.Logger} log - Werelogs logger
* @return {Redis} - Redis client instance
*/
function redisClientv2(conf, log) {
const client = new RedisClient({
// disable offline queue
enableOfflineQueue: false,
// keep alive 3 seconds
keepAlive: 3000,
// Only emit `ready` if the server is able to accept commands
enableReadyCheck: true,
...conf,
});
client.connect();
client.on('error', err => log.trace('error with redis client', {
error: err,
}));
return client;
}
module.exports = redisClientv2;