Compare commits
5 Commits
acbf8880f6
...
f7c9758b9c
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | f7c9758b9c | |
Taylor McKinnon | 1542962d72 | |
Taylor McKinnon | 9bbec85160 | |
Taylor McKinnon | 41e183d434 | |
Taylor McKinnon | 2b4bd85e27 |
|
@ -33,7 +33,10 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
set(key, value, cb) {
|
set(key, value, cb) {
|
||||||
return this._client.set(key, value, cb);
|
return this._client.call(
|
||||||
|
(backend, done) => backend.set(key, value, done),
|
||||||
|
cb,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -45,7 +48,7 @@ class Datastore {
|
||||||
*/
|
*/
|
||||||
setExpire(key, value, ttl) {
|
setExpire(key, value, ttl) {
|
||||||
// This method is a Promise because no callback is given.
|
// This method is a Promise because no callback is given.
|
||||||
return this._client.set(key, value, 'EX', ttl, 'NX');
|
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,7 +57,8 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
del(key) {
|
del(key) {
|
||||||
return this._client.del(key);
|
// This method is a Promise because no callback is given.
|
||||||
|
return this._client.call(backend => backend.del(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,7 +68,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
get(key, cb) {
|
get(key, cb) {
|
||||||
return this._client.get(key, cb);
|
return this._client.call((backend, done) => backend.get(key, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -74,7 +78,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
incr(key, cb) {
|
incr(key, cb) {
|
||||||
return this._client.incr(key, cb);
|
return this._client.call((backend, done) => backend.incr(key, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -84,7 +88,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
decr(key, cb) {
|
decr(key, cb) {
|
||||||
return this._client.decr(key, cb);
|
return this._client.call((backend, done) => backend.decr(key, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,7 +99,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
decrby(key, value, cb) {
|
decrby(key, value, cb) {
|
||||||
return this._client.decrby(key, value, cb);
|
return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -107,7 +111,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zadd(key, score, value, cb) {
|
zadd(key, score, value, cb) {
|
||||||
return this._client.zadd(key, score, value, cb);
|
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -120,7 +124,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zrange(key, min, max, cb) {
|
zrange(key, min, max, cb) {
|
||||||
return this._client.zrange(key, min, max, cb);
|
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -133,7 +137,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zrangebyscore(key, min, max, cb) {
|
zrangebyscore(key, min, max, cb) {
|
||||||
return this._client.zrangebyscore(key, min, max, cb);
|
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -146,8 +150,12 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
bZrangebyscore(keys, min, max, cb) {
|
bZrangebyscore(keys, min, max, cb) {
|
||||||
return this._client.pipeline(keys.map(
|
return this._client.call(
|
||||||
item => ['zrangebyscore', item, min, max])).exec(cb);
|
(backend, done) => backend
|
||||||
|
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
|
||||||
|
.exec(done),
|
||||||
|
cb,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -157,7 +165,9 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
batch(cmds, cb) {
|
batch(cmds, cb) {
|
||||||
return this._client.multi(cmds).exec(cb);
|
return this._client.call((backend, done) => {
|
||||||
|
backend.multi(cmds).exec(done);
|
||||||
|
}, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -167,7 +177,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
pipeline(cmds, cb) {
|
pipeline(cmds, cb) {
|
||||||
return this._client.pipeline(cmds).exec(cb);
|
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -177,9 +187,10 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
multi(cmds, cb) {
|
multi(cmds, cb) {
|
||||||
return this._client.multi(cmds).exec((err, res) => {
|
return this._client.call((backend, done) =>
|
||||||
|
backend.multi(cmds).exec((err, res) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
return cb(err);
|
return done(err);
|
||||||
}
|
}
|
||||||
const flattenRes = [];
|
const flattenRes = [];
|
||||||
const resErr = res.filter(item => {
|
const resErr = res.filter(item => {
|
||||||
|
@ -187,10 +198,10 @@ class Datastore {
|
||||||
return item[0] !== null;
|
return item[0] !== null;
|
||||||
});
|
});
|
||||||
if (resErr && resErr.length > 0) {
|
if (resErr && resErr.length > 0) {
|
||||||
return cb(resErr);
|
return done(resErr);
|
||||||
}
|
}
|
||||||
return cb(null, flattenRes);
|
return done(null, flattenRes);
|
||||||
});
|
}), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -203,7 +214,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
zremrangebyscore(key, min, max, cb) {
|
zremrangebyscore(key, min, max, cb) {
|
||||||
return this._client.zremrangebyscore(key, min, max, cb);
|
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,7 +225,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
lpush(key, val, cb) {
|
lpush(key, val, cb) {
|
||||||
return this._client.lpush(key, val, cb);
|
return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -224,7 +235,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
rpop(key, cb) {
|
rpop(key, cb) {
|
||||||
return this._client.rpop(key, cb);
|
return this._client.call((backend, done) => backend.rpop(key, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -236,7 +247,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
lrange(key, start, stop, cb) {
|
lrange(key, start, stop, cb) {
|
||||||
return this._client.lrange(key, start, stop, cb);
|
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -246,7 +257,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
llen(key, cb) {
|
llen(key, cb) {
|
||||||
return this._client.llen(key, cb);
|
return this._client.call((backend, done) => backend.llen(key, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,7 +268,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
publish(channel, message, cb) {
|
publish(channel, message, cb) {
|
||||||
return this._client.publish(channel, message, cb);
|
return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -268,7 +279,7 @@ class Datastore {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
scan(cursor, pattern, cb) {
|
scan(cursor, pattern, cb) {
|
||||||
return this._client.scan(cursor, 'match', pattern, cb);
|
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ const werelogs = require('werelogs');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
||||||
const { errors } = require('arsenal');
|
const { errors } = require('arsenal');
|
||||||
const redisClient = require('../utils/redisClient');
|
const redisClientv2 = require('../utils/redisClientv2');
|
||||||
const member = require('../utils/member');
|
const member = require('../utils/member');
|
||||||
|
|
||||||
const methods = {
|
const methods = {
|
||||||
|
@ -104,11 +104,11 @@ class UtapiClient {
|
||||||
}
|
}
|
||||||
if (config.redis) {
|
if (config.redis) {
|
||||||
this.ds = new Datastore()
|
this.ds = new Datastore()
|
||||||
.setClient(redisClient(config.redis, this.log));
|
.setClient(redisClientv2(config.redis, this.log));
|
||||||
}
|
}
|
||||||
if (config.localCache) {
|
if (config.localCache) {
|
||||||
this.localCache = new Datastore()
|
this.localCache = new Datastore()
|
||||||
.setClient(redisClient(config.localCache, this.log));
|
.setClient(redisClientv2(config.localCache, this.log));
|
||||||
}
|
}
|
||||||
if (config.component) {
|
if (config.component) {
|
||||||
// The configuration uses the property `component`, while
|
// The configuration uses the property `component`, while
|
||||||
|
|
|
@ -7,7 +7,7 @@ const { jsutil } = require('arsenal');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const redisClient = require('../utils/redisClient');
|
const RedisClient = require('../libV2/redis');
|
||||||
|
|
||||||
const REINDEX_SCHEDULE = '0 0 * * Sun';
|
const REINDEX_SCHEDULE = '0 0 * * Sun';
|
||||||
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
|
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
|
||||||
|
@ -62,7 +62,7 @@ class UtapiReindex {
|
||||||
}
|
}
|
||||||
|
|
||||||
_getRedisClient() {
|
_getRedisClient() {
|
||||||
return redisClient({
|
const client = new RedisClient({
|
||||||
sentinels: [{
|
sentinels: [{
|
||||||
host: this._sentinel.host,
|
host: this._sentinel.host,
|
||||||
port: this._sentinel.port,
|
port: this._sentinel.port,
|
||||||
|
@ -70,7 +70,9 @@ class UtapiReindex {
|
||||||
name: this._sentinel.name,
|
name: this._sentinel.name,
|
||||||
sentinelPassword: this._sentinel.sentinelPassword,
|
sentinelPassword: this._sentinel.sentinelPassword,
|
||||||
password: this._password,
|
password: this._password,
|
||||||
}, this._log);
|
});
|
||||||
|
client.connect();
|
||||||
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
_lock() {
|
_lock() {
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
|
/* eslint-disable no-underscore-dangle */
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { scheduleJob } = require('node-schedule');
|
const { scheduleJob } = require('node-schedule');
|
||||||
const UtapiClient = require('./UtapiClient');
|
const UtapiClient = require('./UtapiClient');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const redisClient = require('../utils/redisClient');
|
|
||||||
const safeJsonParse = require('../utils/safeJsonParse');
|
const safeJsonParse = require('../utils/safeJsonParse');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
|
const redisClientv2 = require('../utils/redisClientv2');
|
||||||
|
|
||||||
// Every five minutes. Cron-style scheduling used by node-schedule.
|
// Every five minutes. Cron-style scheduling used by node-schedule.
|
||||||
const REPLAY_SCHEDULE = '*/5 * * * *';
|
const REPLAY_SCHEDULE = '*/5 * * * *';
|
||||||
|
@ -33,6 +34,7 @@ class UtapiReplay {
|
||||||
this.replaySchedule = REPLAY_SCHEDULE;
|
this.replaySchedule = REPLAY_SCHEDULE;
|
||||||
this.batchSize = BATCH_SIZE;
|
this.batchSize = BATCH_SIZE;
|
||||||
this.disableReplay = true;
|
this.disableReplay = true;
|
||||||
|
this._isRunning = false;
|
||||||
|
|
||||||
if (config) {
|
if (config) {
|
||||||
const message = 'missing required property in UtapiReplay ' +
|
const message = 'missing required property in UtapiReplay ' +
|
||||||
|
@ -41,7 +43,7 @@ class UtapiReplay {
|
||||||
assert(config.localCache, `${message}: localCache`);
|
assert(config.localCache, `${message}: localCache`);
|
||||||
this.utapiClient = new UtapiClient(config);
|
this.utapiClient = new UtapiClient(config);
|
||||||
this.localCache = new Datastore()
|
this.localCache = new Datastore()
|
||||||
.setClient(redisClient(config.localCache, this.log));
|
.setClient(redisClientv2(config.localCache));
|
||||||
if (config.replaySchedule) {
|
if (config.replaySchedule) {
|
||||||
this.replaySchedule = config.replaySchedule;
|
this.replaySchedule = config.replaySchedule;
|
||||||
}
|
}
|
||||||
|
@ -184,17 +186,25 @@ class UtapiReplay {
|
||||||
this.log.info('disabled utapi replay scheduler');
|
this.log.info('disabled utapi replay scheduler');
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
const replay = scheduleJob(this.replaySchedule, () =>
|
const replay = scheduleJob(this.replaySchedule, () => {
|
||||||
this._setLock()
|
if (!this._isRunning) {
|
||||||
|
this._isRunning = true;
|
||||||
|
return this._setLock()
|
||||||
.then(res => {
|
.then(res => {
|
||||||
// If `res` is not `null`, there is no pre-existing lock.
|
// If `res` is not `null`, there is no pre-existing lock.
|
||||||
if (res) {
|
if (res) {
|
||||||
return this._checkLocalCache();
|
return this._checkLocalCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}));
|
})
|
||||||
replay.on('scheduled', date =>
|
.then(() => { this._isRunning = false; })
|
||||||
this.log.info(`replay job started: ${date}`));
|
.catch(() => { this._isRunning = false; });
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
});
|
||||||
|
|
||||||
|
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`));
|
||||||
this.log.info('enabled utapi replay scheduler', {
|
this.log.info('enabled utapi replay scheduler', {
|
||||||
schedule: this.replaySchedule,
|
schedule: this.replaySchedule,
|
||||||
});
|
});
|
||||||
|
|
|
@ -52,6 +52,16 @@ class Memory {
|
||||||
this.data = {};
|
this.data = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple wrapper provided for API compatibility with redis
|
||||||
|
* @param {Function} func - Function to call
|
||||||
|
* @param {callback} cb - callback
|
||||||
|
* @returns {undefined}
|
||||||
|
*/
|
||||||
|
call(func, cb) {
|
||||||
|
return func(this, cb);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set key to hold a value
|
* Set key to hold a value
|
||||||
* @param {string} key - data key
|
* @param {string} key - data key
|
||||||
|
|
|
@ -12,7 +12,7 @@ const Route = require('../router/Route');
|
||||||
const Router = require('../router/Router');
|
const Router = require('../router/Router');
|
||||||
const UtapiRequest = require('../lib/UtapiRequest');
|
const UtapiRequest = require('../lib/UtapiRequest');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const redisClient = require('../utils/redisClient');
|
const RedisClient = require('../libV2/redis');
|
||||||
|
|
||||||
class UtapiServer {
|
class UtapiServer {
|
||||||
/**
|
/**
|
||||||
|
@ -91,7 +91,7 @@ class UtapiServer {
|
||||||
return this.errorResponse(utapiRequest, errors.AccessDenied);
|
return this.errorResponse(utapiRequest, errors.AccessDenied);
|
||||||
}
|
}
|
||||||
const redisClient = this.datastore.getClient();
|
const redisClient = this.datastore.getClient();
|
||||||
if (redisClient.status !== 'ready') {
|
if (!redisClient.isReady) {
|
||||||
return this.errorResponse(utapiRequest,
|
return this.errorResponse(utapiRequest,
|
||||||
errors.InternalError.customizeDescription(
|
errors.InternalError.customizeDescription(
|
||||||
'Redis server is not ready'));
|
'Redis server is not ready'));
|
||||||
|
@ -220,7 +220,15 @@ function spawn(params) {
|
||||||
dump: log.dumpLevel });
|
dump: log.dumpLevel });
|
||||||
const cluster = new Clustering(workers, logger);
|
const cluster = new Clustering(workers, logger);
|
||||||
cluster.start(worker => {
|
cluster.start(worker => {
|
||||||
const datastore = new Datastore().setClient(redisClient(redis, logger));
|
const client = new RedisClient({
|
||||||
|
// disable offline queue
|
||||||
|
enableOfflineQueue: false,
|
||||||
|
// keep alive 3 seconds
|
||||||
|
keepAlive: 3000,
|
||||||
|
...redis,
|
||||||
|
});
|
||||||
|
client.connect();
|
||||||
|
const datastore = new Datastore().setClient(client);
|
||||||
const server = new UtapiServer(worker, port, datastore, logger, config);
|
const server = new UtapiServer(worker, port, datastore, logger, config);
|
||||||
server.startup();
|
server.startup();
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
const EventEmitter = require('events');
|
||||||
|
const { callbackify, promisify } = require('util');
|
||||||
|
const IORedis = require('ioredis');
|
||||||
|
const { jsutil } = require('arsenal');
|
||||||
|
|
||||||
|
const errors = require('./errors');
|
||||||
|
const { LoggerContext, asyncOrCallback } = require('./utils');
|
||||||
|
|
||||||
|
const moduleLogger = new LoggerContext({
|
||||||
|
module: 'redis',
|
||||||
|
});
|
||||||
|
|
||||||
|
const COMMAND_TIMEOUT = 10000;
|
||||||
|
const CONNECTION_TIMEOUT = 30000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new Redis client instance
|
||||||
|
* @param {object} conf - redis configuration
|
||||||
|
* @param {string} conf.host - redis host
|
||||||
|
* @param {number} conf.port - redis port
|
||||||
|
* @param {string} [conf.password] - redis password (optional)
|
||||||
|
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
||||||
|
* @param {Array<Object>} conf.sentinels - sentinels
|
||||||
|
* @param {Werelogs.Logger} log - Werelogs logger
|
||||||
|
* @return {Redis} - Redis client instance
|
||||||
|
*/
|
||||||
|
class RedisClient extends EventEmitter {
|
||||||
|
constructor(options) {
|
||||||
|
super();
|
||||||
|
this._redisOptions = options;
|
||||||
|
this._redis = null;
|
||||||
|
// Controls the use of additional command timeouts
|
||||||
|
// Only use if connecting to a sentinel cluster
|
||||||
|
this._useTimeouts = options.sentinels !== undefined;
|
||||||
|
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
|
||||||
|
this._runningRedisProbe = null;
|
||||||
|
this._isConnected = false;
|
||||||
|
this._isReady = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
connect(callback) {
|
||||||
|
this._initClient(false);
|
||||||
|
if (callback) {
|
||||||
|
process.nextTick(callback);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect(callback) {
|
||||||
|
return asyncOrCallback(async () => {
|
||||||
|
if (this._useTimeouts) {
|
||||||
|
Object.values(this._inFlightTimeouts)
|
||||||
|
.forEach(clearTimeout);
|
||||||
|
}
|
||||||
|
await this._redis.quit();
|
||||||
|
}, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
get isReady() {
|
||||||
|
return this._isConnected && this._isReady;
|
||||||
|
}
|
||||||
|
|
||||||
|
_initClient(startProbe = true) {
|
||||||
|
moduleLogger.debug('initializing redis client');
|
||||||
|
if (this._redis !== null) {
|
||||||
|
this._redis.off('connect', this._onConnect);
|
||||||
|
this._redis.off('ready', this._onReady);
|
||||||
|
this._redis.off('error', this._onError);
|
||||||
|
this._redis.quit();
|
||||||
|
}
|
||||||
|
this._isConnected = false;
|
||||||
|
this._isReady = false;
|
||||||
|
this._redis = new IORedis(this._redisOptions);
|
||||||
|
this._redis.on('connect', this._onConnect.bind(this));
|
||||||
|
this._redis.on('ready', this._onReady.bind(this));
|
||||||
|
this._redis.on('close', this._onClose.bind(this));
|
||||||
|
this._redis.on('reconnect', this._onReconnect.bind(this));
|
||||||
|
this._redis.on('end', this._onEnd.bind(this));
|
||||||
|
this._redis.on('error', this._onError.bind(this));
|
||||||
|
if (startProbe && this._runningRedisProbe === null) {
|
||||||
|
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_probeRedis() {
|
||||||
|
if (this.isReady) {
|
||||||
|
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
|
||||||
|
clearInterval(this._runningRedisProbe);
|
||||||
|
this._runningRedisProbe = null;
|
||||||
|
} else {
|
||||||
|
moduleLogger.warn('redis client has failed to become ready, reinitializing');
|
||||||
|
this._initClient();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_onConnect() {
|
||||||
|
this._isConnected = true;
|
||||||
|
this.emit('connect');
|
||||||
|
}
|
||||||
|
|
||||||
|
_onReady() {
|
||||||
|
this._isReady = true;
|
||||||
|
this.emit('ready');
|
||||||
|
}
|
||||||
|
|
||||||
|
_onError(error) {
|
||||||
|
moduleLogger.error('error connecting to redis', { error });
|
||||||
|
this._isConnected = false;
|
||||||
|
this._isReady = false;
|
||||||
|
this.emit('error', error);
|
||||||
|
}
|
||||||
|
|
||||||
|
_onReconnect(delay) {
|
||||||
|
moduleLogger.debug(`reconnecting to redis, waiting ${delay} ms`);
|
||||||
|
}
|
||||||
|
|
||||||
|
_onClose() {
|
||||||
|
moduleLogger.debug('connection to redis closed');
|
||||||
|
this._isConnected = false;
|
||||||
|
this._isReady = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_onEnd() {
|
||||||
|
moduleLogger.debug('connection to redis ended.');
|
||||||
|
this._isConnected = false;
|
||||||
|
this._isReady = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_createCommandTimeout() {
|
||||||
|
let timer;
|
||||||
|
const cancelTimeout = jsutil.once(() => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
this._inFlightTimeouts.delete(timer);
|
||||||
|
});
|
||||||
|
|
||||||
|
const timeout = new Promise((_, reject) => {
|
||||||
|
timer = setTimeout(
|
||||||
|
() => {
|
||||||
|
this.emit('timeout');
|
||||||
|
this._initClient();
|
||||||
|
},
|
||||||
|
COMMAND_TIMEOUT,
|
||||||
|
);
|
||||||
|
|
||||||
|
this._inFlightTimeouts.add(timer);
|
||||||
|
this.once('timeout', () => {
|
||||||
|
moduleLogger.warn('redis command timed out');
|
||||||
|
cancelTimeout();
|
||||||
|
reject(errors.OperationTimedOut);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return { timeout, cancelTimeout };
|
||||||
|
}
|
||||||
|
|
||||||
|
async _call(asyncFunc) {
|
||||||
|
const funcPromise = asyncFunc(this._redis);
|
||||||
|
if (!this._useTimeouts) {
|
||||||
|
// If timeouts are disabled simply return the Promise
|
||||||
|
return funcPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { timeout, cancelTimeout } = this._createCommandTimeout();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// timeout always rejects so we can just return
|
||||||
|
return await Promise.race([funcPromise, timeout]);
|
||||||
|
} finally {
|
||||||
|
cancelTimeout();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
call(func, callback) {
|
||||||
|
if (callback !== undefined) {
|
||||||
|
// If a callback is provided `func` is assumed to also take a callback
|
||||||
|
// and is converted to a promise using promisify
|
||||||
|
return callbackify(this._call.bind(this))(promisify(func), callback);
|
||||||
|
}
|
||||||
|
return this._call(func);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = RedisClient;
|
|
@ -0,0 +1,20 @@
|
||||||
|
const { callbackify } = require('util');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convenience function to handle "if no callback then return a promise" pattern
|
||||||
|
*
|
||||||
|
* @param {Function} asyncFunc - asyncFunction to call
|
||||||
|
* @param {Function|undefined} callback - optional callback
|
||||||
|
* @returns {Promise|undefined} - returns a Promise if no callback is passed
|
||||||
|
*/
|
||||||
|
function asyncOrCallback(asyncFunc, callback) {
|
||||||
|
if (typeof callback === 'function') {
|
||||||
|
callbackify(asyncFunc)(callback);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return asyncFunc();
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
asyncOrCallback,
|
||||||
|
};
|
|
@ -0,0 +1,11 @@
|
||||||
|
const log = require('./log');
|
||||||
|
const shard = require('./shard');
|
||||||
|
const timestamp = require('./timestamp');
|
||||||
|
const func = require('./func');
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
...log,
|
||||||
|
...shard,
|
||||||
|
...timestamp,
|
||||||
|
...func,
|
||||||
|
};
|
|
@ -4,14 +4,25 @@ const UtapiClient = require('../../../lib/UtapiClient');
|
||||||
const Datastore = require('../../../lib/Datastore');
|
const Datastore = require('../../../lib/Datastore');
|
||||||
const redisClient = require('../../../utils/redisClient');
|
const redisClient = require('../../../utils/redisClient');
|
||||||
const { Logger } = require('werelogs');
|
const { Logger } = require('werelogs');
|
||||||
const { getCounters, getMetricFromKey,
|
const RedisClientv2 = require('../../../libV2/redis');
|
||||||
getStateKeys, getKeys } = require('../../../lib/schema');
|
const {
|
||||||
|
getCounters, getMetricFromKey,
|
||||||
|
getStateKeys, getKeys,
|
||||||
|
} = require('../../../lib/schema');
|
||||||
|
|
||||||
const log = new Logger('TestUtapiClient');
|
const log = new Logger('TestUtapiClient');
|
||||||
const redis = redisClient({
|
const redis = redisClient({
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
}, log);
|
}, log);
|
||||||
const datastore = new Datastore().setClient(redis);
|
const redisV2 = new RedisClientv2({
|
||||||
|
host: '127.0.0.1',
|
||||||
|
port: 6379,
|
||||||
|
});
|
||||||
|
redisV2.connect();
|
||||||
|
const datastore = new Datastore().setClient(
|
||||||
|
redisV2,
|
||||||
|
);
|
||||||
const utapiConfig = {
|
const utapiConfig = {
|
||||||
redis: {
|
redis: {
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
|
|
|
@ -5,6 +5,7 @@ const UtapiReplay = require('../../../lib/UtapiReplay');
|
||||||
const UtapiClient = require('../../../lib/UtapiClient');
|
const UtapiClient = require('../../../lib/UtapiClient');
|
||||||
const Datastore = require('../../../lib/Datastore');
|
const Datastore = require('../../../lib/Datastore');
|
||||||
const redisClient = require('../../../utils/redisClient');
|
const redisClient = require('../../../utils/redisClient');
|
||||||
|
const RedisClientv2 = require('../../../libV2/redis');
|
||||||
const { getAllResourceTypeKeys } = require('../../utils/utils');
|
const { getAllResourceTypeKeys } = require('../../utils/utils');
|
||||||
const safeJsonParse = require('../../../utils/safeJsonParse');
|
const safeJsonParse = require('../../../utils/safeJsonParse');
|
||||||
|
|
||||||
|
@ -13,7 +14,12 @@ const localCache = redisClient({
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
}, log);
|
}, log);
|
||||||
const datastore = new Datastore().setClient(localCache);
|
const redis = new RedisClientv2({
|
||||||
|
host: '127.0.0.1',
|
||||||
|
port: 6379,
|
||||||
|
});
|
||||||
|
redis.connect();
|
||||||
|
const datastore = new Datastore().setClient(redis);
|
||||||
const utapiClient = new UtapiClient({
|
const utapiClient = new UtapiClient({
|
||||||
redis: {
|
redis: {
|
||||||
host: '127.0.0.1',
|
host: '127.0.0.1',
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
const RedisClient = require('../libV2/redis');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new Redis client instance
|
||||||
|
* @param {object} conf - redis configuration
|
||||||
|
* @param {string} conf.host - redis host
|
||||||
|
* @param {number} conf.port - redis port
|
||||||
|
* @param {string} [conf.password] - redis password (optional)
|
||||||
|
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
||||||
|
* @param {Werelogs.Logger} log - Werelogs logger
|
||||||
|
* @return {Redis} - Redis client instance
|
||||||
|
*/
|
||||||
|
function redisClientv2(conf, log) {
|
||||||
|
const client = new RedisClient({
|
||||||
|
// disable offline queue
|
||||||
|
enableOfflineQueue: false,
|
||||||
|
// keep alive 3 seconds
|
||||||
|
keepAlive: 3000,
|
||||||
|
// Only emit `ready` if the server is able to accept commands
|
||||||
|
enableReadyCheck: true,
|
||||||
|
...conf,
|
||||||
|
});
|
||||||
|
|
||||||
|
client.connect();
|
||||||
|
client.on('error', err => log.trace('error with redis client', {
|
||||||
|
error: err,
|
||||||
|
}));
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
module.exports = redisClientv2;
|
Loading…
Reference in New Issue