Compare commits
No commits in common. "f7c9758b9c07942ad65322d45a1113e7affe8fc0" and "acbf8880f61fa492782142a539fe1208ab35a2f6" have entirely different histories.
f7c9758b9c
...
acbf8880f6
|
@ -33,10 +33,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
set(key, value, cb) {
|
set(key, value, cb) {
|
||||||
return this._client.call(
|
return this._client.set(key, value, cb);
|
||||||
(backend, done) => backend.set(key, value, done),
|
|
||||||
cb,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,7 +45,7 @@ class Datastore {
|
||||||
*/
|
*/
|
||||||
setExpire(key, value, ttl) {
|
setExpire(key, value, ttl) {
|
||||||
// This method is a Promise because no callback is given.
|
// This method is a Promise because no callback is given.
|
||||||
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
|
return this._client.set(key, value, 'EX', ttl, 'NX');
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -57,8 +54,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
del(key) {
|
del(key) {
|
||||||
// This method is a Promise because no callback is given.
|
return this._client.del(key);
|
||||||
return this._client.call(backend => backend.del(key));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -68,7 +64,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
get(key, cb) {
|
get(key, cb) {
|
||||||
return this._client.call((backend, done) => backend.get(key, done), cb);
|
return this._client.get(key, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -78,7 +74,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
incr(key, cb) {
|
incr(key, cb) {
|
||||||
return this._client.call((backend, done) => backend.incr(key, done), cb);
|
return this._client.incr(key, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -88,7 +84,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
decr(key, cb) {
|
decr(key, cb) {
|
||||||
return this._client.call((backend, done) => backend.decr(key, done), cb);
|
return this._client.decr(key, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,7 +95,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
decrby(key, value, cb) {
|
decrby(key, value, cb) {
|
||||||
return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
|
return this._client.decrby(key, value, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -111,7 +107,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zadd(key, score, value, cb) {
|
zadd(key, score, value, cb) {
|
||||||
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
|
return this._client.zadd(key, score, value, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,7 +120,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zrange(key, min, max, cb) {
|
zrange(key, min, max, cb) {
|
||||||
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
|
return this._client.zrange(key, min, max, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -137,7 +133,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zrangebyscore(key, min, max, cb) {
|
zrangebyscore(key, min, max, cb) {
|
||||||
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
|
return this._client.zrangebyscore(key, min, max, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -150,12 +146,8 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
bZrangebyscore(keys, min, max, cb) {
|
bZrangebyscore(keys, min, max, cb) {
|
||||||
return this._client.call(
|
return this._client.pipeline(keys.map(
|
||||||
(backend, done) => backend
|
item => ['zrangebyscore', item, min, max])).exec(cb);
|
||||||
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
|
|
||||||
.exec(done),
|
|
||||||
cb,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -165,9 +157,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
batch(cmds, cb) {
|
batch(cmds, cb) {
|
||||||
return this._client.call((backend, done) => {
|
return this._client.multi(cmds).exec(cb);
|
||||||
backend.multi(cmds).exec(done);
|
|
||||||
}, cb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -177,7 +167,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
pipeline(cmds, cb) {
|
pipeline(cmds, cb) {
|
||||||
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
|
return this._client.pipeline(cmds).exec(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -187,21 +177,20 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
multi(cmds, cb) {
|
multi(cmds, cb) {
|
||||||
return this._client.call((backend, done) =>
|
return this._client.multi(cmds).exec((err, res) => {
|
||||||
backend.multi(cmds).exec((err, res) => {
|
if (err) {
|
||||||
if (err) {
|
return cb(err);
|
||||||
return done(err);
|
}
|
||||||
}
|
const flattenRes = [];
|
||||||
const flattenRes = [];
|
const resErr = res.filter(item => {
|
||||||
const resErr = res.filter(item => {
|
flattenRes.push(item[1]);
|
||||||
flattenRes.push(item[1]);
|
return item[0] !== null;
|
||||||
return item[0] !== null;
|
});
|
||||||
});
|
if (resErr && resErr.length > 0) {
|
||||||
if (resErr && resErr.length > 0) {
|
return cb(resErr);
|
||||||
return done(resErr);
|
}
|
||||||
}
|
return cb(null, flattenRes);
|
||||||
return done(null, flattenRes);
|
});
|
||||||
}), cb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,7 +203,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zremrangebyscore(key, min, max, cb) {
|
zremrangebyscore(key, min, max, cb) {
|
||||||
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
|
return this._client.zremrangebyscore(key, min, max, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -225,7 +214,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
lpush(key, val, cb) {
|
lpush(key, val, cb) {
|
||||||
return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
|
return this._client.lpush(key, val, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -235,7 +224,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
rpop(key, cb) {
|
rpop(key, cb) {
|
||||||
return this._client.call((backend, done) => backend.rpop(key, done), cb);
|
return this._client.rpop(key, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -247,7 +236,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
lrange(key, start, stop, cb) {
|
lrange(key, start, stop, cb) {
|
||||||
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
|
return this._client.lrange(key, start, stop, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,7 +246,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
llen(key, cb) {
|
llen(key, cb) {
|
||||||
return this._client.call((backend, done) => backend.llen(key, done), cb);
|
return this._client.llen(key, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -268,7 +257,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
publish(channel, message, cb) {
|
publish(channel, message, cb) {
|
||||||
return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
|
return this._client.publish(channel, message, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -279,7 +268,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
scan(cursor, pattern, cb) {
|
scan(cursor, pattern, cb) {
|
||||||
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
|
return this._client.scan(cursor, 'match', pattern, cb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ const werelogs = require('werelogs');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
||||||
const { errors } = require('arsenal');
|
const { errors } = require('arsenal');
|
||||||
const redisClientv2 = require('../utils/redisClientv2');
|
const redisClient = require('../utils/redisClient');
|
||||||
const member = require('../utils/member');
|
const member = require('../utils/member');
|
||||||
|
|
||||||
const methods = {
|
const methods = {
|
||||||
|
@ -104,11 +104,11 @@ class UtapiClient {
|
||||||
}
|
}
|
||||||
if (config.redis) {
|
if (config.redis) {
|
||||||
this.ds = new Datastore()
|
this.ds = new Datastore()
|
||||||
.setClient(redisClientv2(config.redis, this.log));
|
.setClient(redisClient(config.redis, this.log));
|
||||||
}
|
}
|
||||||
if (config.localCache) {
|
if (config.localCache) {
|
||||||
this.localCache = new Datastore()
|
this.localCache = new Datastore()
|
||||||
.setClient(redisClientv2(config.localCache, this.log));
|
.setClient(redisClient(config.localCache, this.log));
|
||||||
}
|
}
|
||||||
if (config.component) {
|
if (config.component) {
|
||||||
// The configuration uses the property `component`, while
|
// The configuration uses the property `component`, while
|
||||||
|
|
|
@ -7,7 +7,7 @@ const { jsutil } = require('arsenal');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const RedisClient = require('../libV2/redis');
|
const redisClient = require('../utils/redisClient');
|
||||||
|
|
||||||
const REINDEX_SCHEDULE = '0 0 * * Sun';
|
const REINDEX_SCHEDULE = '0 0 * * Sun';
|
||||||
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
|
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
|
||||||
|
@ -62,7 +62,7 @@ class UtapiReindex {
|
||||||
}
|
}
|
||||||
|
|
||||||
_getRedisClient() {
|
_getRedisClient() {
|
||||||
const client = new RedisClient({
|
return redisClient({
|
||||||
sentinels: [{
|
sentinels: [{
|
||||||
host: this._sentinel.host,
|
host: this._sentinel.host,
|
||||||
port: this._sentinel.port,
|
port: this._sentinel.port,
|
||||||
|
@ -70,9 +70,7 @@ class UtapiReindex {
|
||||||
name: this._sentinel.name,
|
name: this._sentinel.name,
|
||||||
sentinelPassword: this._sentinel.sentinelPassword,
|
sentinelPassword: this._sentinel.sentinelPassword,
|
||||||
password: this._password,
|
password: this._password,
|
||||||
});
|
}, this._log);
|
||||||
client.connect();
|
|
||||||
return client;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_lock() {
|
_lock() {
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
/* eslint-disable no-underscore-dangle */
|
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { scheduleJob } = require('node-schedule');
|
const { scheduleJob } = require('node-schedule');
|
||||||
const UtapiClient = require('./UtapiClient');
|
const UtapiClient = require('./UtapiClient');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
|
const redisClient = require('../utils/redisClient');
|
||||||
const safeJsonParse = require('../utils/safeJsonParse');
|
const safeJsonParse = require('../utils/safeJsonParse');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
const redisClientv2 = require('../utils/redisClientv2');
|
|
||||||
|
|
||||||
// Every five minutes. Cron-style scheduling used by node-schedule.
|
// Every five minutes. Cron-style scheduling used by node-schedule.
|
||||||
const REPLAY_SCHEDULE = '*/5 * * * *';
|
const REPLAY_SCHEDULE = '*/5 * * * *';
|
||||||
|
@ -34,7 +33,6 @@ class UtapiReplay {
|
||||||
this.replaySchedule = REPLAY_SCHEDULE;
|
this.replaySchedule = REPLAY_SCHEDULE;
|
||||||
this.batchSize = BATCH_SIZE;
|
this.batchSize = BATCH_SIZE;
|
||||||
this.disableReplay = true;
|
this.disableReplay = true;
|
||||||
this._isRunning = false;
|
|
||||||
|
|
||||||
if (config) {
|
if (config) {
|
||||||
const message = 'missing required property in UtapiReplay ' +
|
const message = 'missing required property in UtapiReplay ' +
|
||||||
|
@ -43,7 +41,7 @@ class UtapiReplay {
|
||||||
assert(config.localCache, `${message}: localCache`);
|
assert(config.localCache, `${message}: localCache`);
|
||||||
this.utapiClient = new UtapiClient(config);
|
this.utapiClient = new UtapiClient(config);
|
||||||
this.localCache = new Datastore()
|
this.localCache = new Datastore()
|
||||||
.setClient(redisClientv2(config.localCache));
|
.setClient(redisClient(config.localCache, this.log));
|
||||||
if (config.replaySchedule) {
|
if (config.replaySchedule) {
|
||||||
this.replaySchedule = config.replaySchedule;
|
this.replaySchedule = config.replaySchedule;
|
||||||
}
|
}
|
||||||
|
@ -186,25 +184,17 @@ class UtapiReplay {
|
||||||
this.log.info('disabled utapi replay scheduler');
|
this.log.info('disabled utapi replay scheduler');
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
const replay = scheduleJob(this.replaySchedule, () => {
|
const replay = scheduleJob(this.replaySchedule, () =>
|
||||||
if (!this._isRunning) {
|
this._setLock()
|
||||||
this._isRunning = true;
|
.then(res => {
|
||||||
return this._setLock()
|
// If `res` is not `null`, there is no pre-existing lock.
|
||||||
.then(res => {
|
if (res) {
|
||||||
// If `res` is not `null`, there is no pre-existing lock.
|
return this._checkLocalCache();
|
||||||
if (res) {
|
}
|
||||||
return this._checkLocalCache();
|
return undefined;
|
||||||
}
|
}));
|
||||||
|
replay.on('scheduled', date =>
|
||||||
return undefined;
|
this.log.info(`replay job started: ${date}`));
|
||||||
})
|
|
||||||
.then(() => { this._isRunning = false; })
|
|
||||||
.catch(() => { this._isRunning = false; });
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
});
|
|
||||||
|
|
||||||
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`));
|
|
||||||
this.log.info('enabled utapi replay scheduler', {
|
this.log.info('enabled utapi replay scheduler', {
|
||||||
schedule: this.replaySchedule,
|
schedule: this.replaySchedule,
|
||||||
});
|
});
|
||||||
|
|
|
@ -52,16 +52,6 @@ class Memory {
|
||||||
this.data = {};
|
this.data = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A simple wrapper provided for API compatibility with redis
|
|
||||||
* @param {Function} func - Function to call
|
|
||||||
* @param {callback} cb - callback
|
|
||||||
* @returns {undefined}
|
|
||||||
*/
|
|
||||||
call(func, cb) {
|
|
||||||
return func(this, cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set key to hold a value
|
* Set key to hold a value
|
||||||
* @param {string} key - data key
|
* @param {string} key - data key
|
||||||
|
|
|
@ -12,7 +12,7 @@ const Route = require('../router/Route');
|
||||||
const Router = require('../router/Router');
|
const Router = require('../router/Router');
|
||||||
const UtapiRequest = require('../lib/UtapiRequest');
|
const UtapiRequest = require('../lib/UtapiRequest');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const RedisClient = require('../libV2/redis');
|
const redisClient = require('../utils/redisClient');
|
||||||
|
|
||||||
class UtapiServer {
|
class UtapiServer {
|
||||||
/**
|
/**
|
||||||
|
@ -91,7 +91,7 @@ class UtapiServer {
|
||||||
return this.errorResponse(utapiRequest, errors.AccessDenied);
|
return this.errorResponse(utapiRequest, errors.AccessDenied);
|
||||||
}
|
}
|
||||||
const redisClient = this.datastore.getClient();
|
const redisClient = this.datastore.getClient();
|
||||||
if (!redisClient.isReady) {
|
if (redisClient.status !== 'ready') {
|
||||||
return this.errorResponse(utapiRequest,
|
return this.errorResponse(utapiRequest,
|
||||||
errors.InternalError.customizeDescription(
|
errors.InternalError.customizeDescription(
|
||||||
'Redis server is not ready'));
|
'Redis server is not ready'));
|
||||||
|
@ -220,15 +220,7 @@ function spawn(params) {
|
||||||
dump: log.dumpLevel });
|
dump: log.dumpLevel });
|
||||||
const cluster = new Clustering(workers, logger);
|
const cluster = new Clustering(workers, logger);
|
||||||
cluster.start(worker => {
|
cluster.start(worker => {
|
||||||
const client = new RedisClient({
|
const datastore = new Datastore().setClient(redisClient(redis, logger));
|
||||||
// disable offline queue
|
|
||||||
enableOfflineQueue: false,
|
|
||||||
// keep alive 3 seconds
|
|
||||||
keepAlive: 3000,
|
|
||||||
...redis,
|
|
||||||
});
|
|
||||||
client.connect();
|
|
||||||
const datastore = new Datastore().setClient(client);
|
|
||||||
const server = new UtapiServer(worker, port, datastore, logger, config);
|
const server = new UtapiServer(worker, port, datastore, logger, config);
|
||||||
server.startup();
|
server.startup();
|
||||||
});
|
});
|
||||||
|
|
182
libV2/redis.js
182
libV2/redis.js
|
@ -1,182 +0,0 @@
|
||||||
const EventEmitter = require('events');
|
|
||||||
const { callbackify, promisify } = require('util');
|
|
||||||
const IORedis = require('ioredis');
|
|
||||||
const { jsutil } = require('arsenal');
|
|
||||||
|
|
||||||
const errors = require('./errors');
|
|
||||||
const { LoggerContext, asyncOrCallback } = require('./utils');
|
|
||||||
|
|
||||||
const moduleLogger = new LoggerContext({
|
|
||||||
module: 'redis',
|
|
||||||
});
|
|
||||||
|
|
||||||
const COMMAND_TIMEOUT = 10000;
|
|
||||||
const CONNECTION_TIMEOUT = 30000;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new Redis client instance
|
|
||||||
* @param {object} conf - redis configuration
|
|
||||||
* @param {string} conf.host - redis host
|
|
||||||
* @param {number} conf.port - redis port
|
|
||||||
* @param {string} [conf.password] - redis password (optional)
|
|
||||||
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
|
||||||
* @param {Array<Object>} conf.sentinels - sentinels
|
|
||||||
* @param {Werelogs.Logger} log - Werelogs logger
|
|
||||||
* @return {Redis} - Redis client instance
|
|
||||||
*/
|
|
||||||
class RedisClient extends EventEmitter {
|
|
||||||
constructor(options) {
|
|
||||||
super();
|
|
||||||
this._redisOptions = options;
|
|
||||||
this._redis = null;
|
|
||||||
// Controls the use of additional command timeouts
|
|
||||||
// Only use if connecting to a sentinel cluster
|
|
||||||
this._useTimeouts = options.sentinels !== undefined;
|
|
||||||
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
|
|
||||||
this._runningRedisProbe = null;
|
|
||||||
this._isConnected = false;
|
|
||||||
this._isReady = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
connect(callback) {
|
|
||||||
this._initClient(false);
|
|
||||||
if (callback) {
|
|
||||||
process.nextTick(callback);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
disconnect(callback) {
|
|
||||||
return asyncOrCallback(async () => {
|
|
||||||
if (this._useTimeouts) {
|
|
||||||
Object.values(this._inFlightTimeouts)
|
|
||||||
.forEach(clearTimeout);
|
|
||||||
}
|
|
||||||
await this._redis.quit();
|
|
||||||
}, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
get isReady() {
|
|
||||||
return this._isConnected && this._isReady;
|
|
||||||
}
|
|
||||||
|
|
||||||
_initClient(startProbe = true) {
|
|
||||||
moduleLogger.debug('initializing redis client');
|
|
||||||
if (this._redis !== null) {
|
|
||||||
this._redis.off('connect', this._onConnect);
|
|
||||||
this._redis.off('ready', this._onReady);
|
|
||||||
this._redis.off('error', this._onError);
|
|
||||||
this._redis.quit();
|
|
||||||
}
|
|
||||||
this._isConnected = false;
|
|
||||||
this._isReady = false;
|
|
||||||
this._redis = new IORedis(this._redisOptions);
|
|
||||||
this._redis.on('connect', this._onConnect.bind(this));
|
|
||||||
this._redis.on('ready', this._onReady.bind(this));
|
|
||||||
this._redis.on('close', this._onClose.bind(this));
|
|
||||||
this._redis.on('reconnect', this._onReconnect.bind(this));
|
|
||||||
this._redis.on('end', this._onEnd.bind(this));
|
|
||||||
this._redis.on('error', this._onError.bind(this));
|
|
||||||
if (startProbe && this._runningRedisProbe === null) {
|
|
||||||
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_probeRedis() {
|
|
||||||
if (this.isReady) {
|
|
||||||
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
|
|
||||||
clearInterval(this._runningRedisProbe);
|
|
||||||
this._runningRedisProbe = null;
|
|
||||||
} else {
|
|
||||||
moduleLogger.warn('redis client has failed to become ready, reinitializing');
|
|
||||||
this._initClient();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onConnect() {
|
|
||||||
this._isConnected = true;
|
|
||||||
this.emit('connect');
|
|
||||||
}
|
|
||||||
|
|
||||||
_onReady() {
|
|
||||||
this._isReady = true;
|
|
||||||
this.emit('ready');
|
|
||||||
}
|
|
||||||
|
|
||||||
_onError(error) {
|
|
||||||
moduleLogger.error('error connecting to redis', { error });
|
|
||||||
this._isConnected = false;
|
|
||||||
this._isReady = false;
|
|
||||||
this.emit('error', error);
|
|
||||||
}
|
|
||||||
|
|
||||||
_onReconnect(delay) {
|
|
||||||
moduleLogger.debug(`reconnecting to redis, waiting ${delay} ms`);
|
|
||||||
}
|
|
||||||
|
|
||||||
_onClose() {
|
|
||||||
moduleLogger.debug('connection to redis closed');
|
|
||||||
this._isConnected = false;
|
|
||||||
this._isReady = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
_onEnd() {
|
|
||||||
moduleLogger.debug('connection to redis ended.');
|
|
||||||
this._isConnected = false;
|
|
||||||
this._isReady = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
_createCommandTimeout() {
|
|
||||||
let timer;
|
|
||||||
const cancelTimeout = jsutil.once(() => {
|
|
||||||
clearTimeout(timer);
|
|
||||||
this._inFlightTimeouts.delete(timer);
|
|
||||||
});
|
|
||||||
|
|
||||||
const timeout = new Promise((_, reject) => {
|
|
||||||
timer = setTimeout(
|
|
||||||
() => {
|
|
||||||
this.emit('timeout');
|
|
||||||
this._initClient();
|
|
||||||
},
|
|
||||||
COMMAND_TIMEOUT,
|
|
||||||
);
|
|
||||||
|
|
||||||
this._inFlightTimeouts.add(timer);
|
|
||||||
this.once('timeout', () => {
|
|
||||||
moduleLogger.warn('redis command timed out');
|
|
||||||
cancelTimeout();
|
|
||||||
reject(errors.OperationTimedOut);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
return { timeout, cancelTimeout };
|
|
||||||
}
|
|
||||||
|
|
||||||
async _call(asyncFunc) {
|
|
||||||
const funcPromise = asyncFunc(this._redis);
|
|
||||||
if (!this._useTimeouts) {
|
|
||||||
// If timeouts are disabled simply return the Promise
|
|
||||||
return funcPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
const { timeout, cancelTimeout } = this._createCommandTimeout();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// timeout always rejects so we can just return
|
|
||||||
return await Promise.race([funcPromise, timeout]);
|
|
||||||
} finally {
|
|
||||||
cancelTimeout();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
call(func, callback) {
|
|
||||||
if (callback !== undefined) {
|
|
||||||
// If a callback is provided `func` is assumed to also take a callback
|
|
||||||
// and is converted to a promise using promisify
|
|
||||||
return callbackify(this._call.bind(this))(promisify(func), callback);
|
|
||||||
}
|
|
||||||
return this._call(func);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = RedisClient;
|
|
|
@ -1,20 +0,0 @@
|
||||||
const { callbackify } = require('util');
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convenience function to handle "if no callback then return a promise" pattern
|
|
||||||
*
|
|
||||||
* @param {Function} asyncFunc - asyncFunction to call
|
|
||||||
* @param {Function|undefined} callback - optional callback
|
|
||||||
* @returns {Promise|undefined} - returns a Promise if no callback is passed
|
|
||||||
*/
|
|
||||||
function asyncOrCallback(asyncFunc, callback) {
|
|
||||||
if (typeof callback === 'function') {
|
|
||||||
callbackify(asyncFunc)(callback);
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
return asyncFunc();
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
asyncOrCallback,
|
|
||||||
};
|
|
|
@ -1,11 +0,0 @@
|
||||||
const log = require('./log');
|
|
||||||
const shard = require('./shard');
|
|
||||||
const timestamp = require('./timestamp');
|
|
||||||
const func = require('./func');
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
...log,
|
|
||||||
...shard,
|
|
||||||
...timestamp,
|
|
||||||
...func,
|
|
||||||
};
|
|
|
@ -4,25 +4,14 @@ const UtapiClient = require('../../../lib/UtapiClient');
|
||||||
const Datastore = require('../../../lib/Datastore');
|
const Datastore = require('../../../lib/Datastore');
|
||||||
const redisClient = require('../../../utils/redisClient');
|
const redisClient = require('../../../utils/redisClient');
|
||||||
const { Logger } = require('werelogs');
|
const { Logger } = require('werelogs');
|
||||||
const RedisClientv2 = require('../../../libV2/redis');
|
const { getCounters, getMetricFromKey,
|
||||||
const {
|
getStateKeys, getKeys } = require('../../../lib/schema');
|
||||||
getCounters, getMetricFromKey,
|
|
||||||
getStateKeys, getKeys,
|
|
||||||
} = require('../../../lib/schema');
|
|
||||||
|
|
||||||
const log = new Logger('TestUtapiClient');
|
const log = new Logger('TestUtapiClient');
|
||||||
const redis = redisClient({
|
const redis = redisClient({
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
}, log);
|
}, log);
|
||||||
const redisV2 = new RedisClientv2({
|
const datastore = new Datastore().setClient(redis);
|
||||||
host: '127.0.0.1',
|
|
||||||
port: 6379,
|
|
||||||
});
|
|
||||||
redisV2.connect();
|
|
||||||
const datastore = new Datastore().setClient(
|
|
||||||
redisV2,
|
|
||||||
);
|
|
||||||
const utapiConfig = {
|
const utapiConfig = {
|
||||||
redis: {
|
redis: {
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
|
|
|
@ -5,7 +5,6 @@ const UtapiReplay = require('../../../lib/UtapiReplay');
|
||||||
const UtapiClient = require('../../../lib/UtapiClient');
|
const UtapiClient = require('../../../lib/UtapiClient');
|
||||||
const Datastore = require('../../../lib/Datastore');
|
const Datastore = require('../../../lib/Datastore');
|
||||||
const redisClient = require('../../../utils/redisClient');
|
const redisClient = require('../../../utils/redisClient');
|
||||||
const RedisClientv2 = require('../../../libV2/redis');
|
|
||||||
const { getAllResourceTypeKeys } = require('../../utils/utils');
|
const { getAllResourceTypeKeys } = require('../../utils/utils');
|
||||||
const safeJsonParse = require('../../../utils/safeJsonParse');
|
const safeJsonParse = require('../../../utils/safeJsonParse');
|
||||||
|
|
||||||
|
@ -14,12 +13,7 @@ const localCache = redisClient({
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
}, log);
|
}, log);
|
||||||
const redis = new RedisClientv2({
|
const datastore = new Datastore().setClient(localCache);
|
||||||
host: '127.0.0.1',
|
|
||||||
port: 6379,
|
|
||||||
});
|
|
||||||
redis.connect();
|
|
||||||
const datastore = new Datastore().setClient(redis);
|
|
||||||
const utapiClient = new UtapiClient({
|
const utapiClient = new UtapiClient({
|
||||||
redis: {
|
redis: {
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
const RedisClient = require('../libV2/redis');
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new Redis client instance
|
|
||||||
* @param {object} conf - redis configuration
|
|
||||||
* @param {string} conf.host - redis host
|
|
||||||
* @param {number} conf.port - redis port
|
|
||||||
* @param {string} [conf.password] - redis password (optional)
|
|
||||||
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
|
||||||
* @param {Werelogs.Logger} log - Werelogs logger
|
|
||||||
* @return {Redis} - Redis client instance
|
|
||||||
*/
|
|
||||||
function redisClientv2(conf, log) {
|
|
||||||
const client = new RedisClient({
|
|
||||||
// disable offline queue
|
|
||||||
enableOfflineQueue: false,
|
|
||||||
// keep alive 3 seconds
|
|
||||||
keepAlive: 3000,
|
|
||||||
// Only emit `ready` if the server is able to accept commands
|
|
||||||
enableReadyCheck: true,
|
|
||||||
...conf,
|
|
||||||
});
|
|
||||||
|
|
||||||
client.connect();
|
|
||||||
client.on('error', err => log.trace('error with redis client', {
|
|
||||||
error: err,
|
|
||||||
}));
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
module.exports = redisClientv2;
|
|
Loading…
Reference in New Issue