Compare commits
5 Commits
developmen
...
bugfix/S3C
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | f7c9758b9c | |
Taylor McKinnon | 1542962d72 | |
Taylor McKinnon | 9bbec85160 | |
Taylor McKinnon | 41e183d434 | |
Taylor McKinnon | 2b4bd85e27 |
|
@ -33,7 +33,10 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
set(key, value, cb) {
|
||||
return this._client.set(key, value, cb);
|
||||
return this._client.call(
|
||||
(backend, done) => backend.set(key, value, done),
|
||||
cb,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -45,7 +48,7 @@ class Datastore {
|
|||
*/
|
||||
setExpire(key, value, ttl) {
|
||||
// This method is a Promise because no callback is given.
|
||||
return this._client.set(key, value, 'EX', ttl, 'NX');
|
||||
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -54,7 +57,8 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
del(key) {
|
||||
return this._client.del(key);
|
||||
// This method is a Promise because no callback is given.
|
||||
return this._client.call(backend => backend.del(key));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -64,7 +68,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
get(key, cb) {
|
||||
return this._client.get(key, cb);
|
||||
return this._client.call((backend, done) => backend.get(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,7 +78,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
incr(key, cb) {
|
||||
return this._client.incr(key, cb);
|
||||
return this._client.call((backend, done) => backend.incr(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -84,7 +88,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
decr(key, cb) {
|
||||
return this._client.decr(key, cb);
|
||||
return this._client.call((backend, done) => backend.decr(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,7 +99,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
decrby(key, value, cb) {
|
||||
return this._client.decrby(key, value, cb);
|
||||
return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -107,7 +111,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
zadd(key, score, value, cb) {
|
||||
return this._client.zadd(key, score, value, cb);
|
||||
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -120,7 +124,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
zrange(key, min, max, cb) {
|
||||
return this._client.zrange(key, min, max, cb);
|
||||
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,7 +137,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
zrangebyscore(key, min, max, cb) {
|
||||
return this._client.zrangebyscore(key, min, max, cb);
|
||||
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -146,8 +150,12 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
bZrangebyscore(keys, min, max, cb) {
|
||||
return this._client.pipeline(keys.map(
|
||||
item => ['zrangebyscore', item, min, max])).exec(cb);
|
||||
return this._client.call(
|
||||
(backend, done) => backend
|
||||
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
|
||||
.exec(done),
|
||||
cb,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,7 +165,9 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
batch(cmds, cb) {
|
||||
return this._client.multi(cmds).exec(cb);
|
||||
return this._client.call((backend, done) => {
|
||||
backend.multi(cmds).exec(done);
|
||||
}, cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,7 +177,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
pipeline(cmds, cb) {
|
||||
return this._client.pipeline(cmds).exec(cb);
|
||||
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -177,9 +187,10 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
multi(cmds, cb) {
|
||||
return this._client.multi(cmds).exec((err, res) => {
|
||||
return this._client.call((backend, done) =>
|
||||
backend.multi(cmds).exec((err, res) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
return done(err);
|
||||
}
|
||||
const flattenRes = [];
|
||||
const resErr = res.filter(item => {
|
||||
|
@ -187,10 +198,10 @@ class Datastore {
|
|||
return item[0] !== null;
|
||||
});
|
||||
if (resErr && resErr.length > 0) {
|
||||
return cb(resErr);
|
||||
return done(resErr);
|
||||
}
|
||||
return cb(null, flattenRes);
|
||||
});
|
||||
return done(null, flattenRes);
|
||||
}), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -203,7 +214,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
zremrangebyscore(key, min, max, cb) {
|
||||
return this._client.zremrangebyscore(key, min, max, cb);
|
||||
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -214,7 +225,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
lpush(key, val, cb) {
|
||||
return this._client.lpush(key, val, cb);
|
||||
return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -224,7 +235,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
rpop(key, cb) {
|
||||
return this._client.rpop(key, cb);
|
||||
return this._client.call((backend, done) => backend.rpop(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -236,7 +247,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
lrange(key, start, stop, cb) {
|
||||
return this._client.lrange(key, start, stop, cb);
|
||||
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -246,7 +257,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
llen(key, cb) {
|
||||
return this._client.llen(key, cb);
|
||||
return this._client.call((backend, done) => backend.llen(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,7 +268,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
publish(channel, message, cb) {
|
||||
return this._client.publish(channel, message, cb);
|
||||
return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -268,7 +279,7 @@ class Datastore {
|
|||
* @return {undefined}
|
||||
*/
|
||||
scan(cursor, pattern, cb) {
|
||||
return this._client.scan(cursor, 'match', pattern, cb);
|
||||
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ const werelogs = require('werelogs');
|
|||
const Datastore = require('./Datastore');
|
||||
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
||||
const { errors } = require('arsenal');
|
||||
const redisClient = require('../utils/redisClient');
|
||||
const redisClientv2 = require('../utils/redisClientv2');
|
||||
const member = require('../utils/member');
|
||||
|
||||
const methods = {
|
||||
|
@ -104,11 +104,11 @@ class UtapiClient {
|
|||
}
|
||||
if (config.redis) {
|
||||
this.ds = new Datastore()
|
||||
.setClient(redisClient(config.redis, this.log));
|
||||
.setClient(redisClientv2(config.redis, this.log));
|
||||
}
|
||||
if (config.localCache) {
|
||||
this.localCache = new Datastore()
|
||||
.setClient(redisClient(config.localCache, this.log));
|
||||
.setClient(redisClientv2(config.localCache, this.log));
|
||||
}
|
||||
if (config.component) {
|
||||
// The configuration uses the property `component`, while
|
||||
|
|
|
@ -7,7 +7,7 @@ const { jsutil } = require('arsenal');
|
|||
const werelogs = require('werelogs');
|
||||
|
||||
const Datastore = require('./Datastore');
|
||||
const redisClient = require('../utils/redisClient');
|
||||
const RedisClient = require('../libV2/redis');
|
||||
|
||||
const REINDEX_SCHEDULE = '0 0 * * Sun';
|
||||
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
|
||||
|
@ -62,7 +62,7 @@ class UtapiReindex {
|
|||
}
|
||||
|
||||
_getRedisClient() {
|
||||
return redisClient({
|
||||
const client = new RedisClient({
|
||||
sentinels: [{
|
||||
host: this._sentinel.host,
|
||||
port: this._sentinel.port,
|
||||
|
@ -70,7 +70,9 @@ class UtapiReindex {
|
|||
name: this._sentinel.name,
|
||||
sentinelPassword: this._sentinel.sentinelPassword,
|
||||
password: this._password,
|
||||
}, this._log);
|
||||
});
|
||||
client.connect();
|
||||
return client;
|
||||
}
|
||||
|
||||
_lock() {
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
/* eslint-disable no-underscore-dangle */
|
||||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const { scheduleJob } = require('node-schedule');
|
||||
const UtapiClient = require('./UtapiClient');
|
||||
const Datastore = require('./Datastore');
|
||||
const redisClient = require('../utils/redisClient');
|
||||
const safeJsonParse = require('../utils/safeJsonParse');
|
||||
const werelogs = require('werelogs');
|
||||
const redisClientv2 = require('../utils/redisClientv2');
|
||||
|
||||
// Every five minutes. Cron-style scheduling used by node-schedule.
|
||||
const REPLAY_SCHEDULE = '*/5 * * * *';
|
||||
|
@ -33,6 +34,7 @@ class UtapiReplay {
|
|||
this.replaySchedule = REPLAY_SCHEDULE;
|
||||
this.batchSize = BATCH_SIZE;
|
||||
this.disableReplay = true;
|
||||
this._isRunning = false;
|
||||
|
||||
if (config) {
|
||||
const message = 'missing required property in UtapiReplay ' +
|
||||
|
@ -41,7 +43,7 @@ class UtapiReplay {
|
|||
assert(config.localCache, `${message}: localCache`);
|
||||
this.utapiClient = new UtapiClient(config);
|
||||
this.localCache = new Datastore()
|
||||
.setClient(redisClient(config.localCache, this.log));
|
||||
.setClient(redisClientv2(config.localCache));
|
||||
if (config.replaySchedule) {
|
||||
this.replaySchedule = config.replaySchedule;
|
||||
}
|
||||
|
@ -184,17 +186,25 @@ class UtapiReplay {
|
|||
this.log.info('disabled utapi replay scheduler');
|
||||
return this;
|
||||
}
|
||||
const replay = scheduleJob(this.replaySchedule, () =>
|
||||
this._setLock()
|
||||
const replay = scheduleJob(this.replaySchedule, () => {
|
||||
if (!this._isRunning) {
|
||||
this._isRunning = true;
|
||||
return this._setLock()
|
||||
.then(res => {
|
||||
// If `res` is not `null`, there is no pre-existing lock.
|
||||
if (res) {
|
||||
return this._checkLocalCache();
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}));
|
||||
replay.on('scheduled', date =>
|
||||
this.log.info(`replay job started: ${date}`));
|
||||
})
|
||||
.then(() => { this._isRunning = false; })
|
||||
.catch(() => { this._isRunning = false; });
|
||||
}
|
||||
return;
|
||||
});
|
||||
|
||||
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`));
|
||||
this.log.info('enabled utapi replay scheduler', {
|
||||
schedule: this.replaySchedule,
|
||||
});
|
||||
|
|
|
@ -52,6 +52,16 @@ class Memory {
|
|||
this.data = {};
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple wrapper provided for API compatibility with redis
|
||||
* @param {Function} func - Function to call
|
||||
* @param {callback} cb - callback
|
||||
* @returns {undefined}
|
||||
*/
|
||||
call(func, cb) {
|
||||
return func(this, cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set key to hold a value
|
||||
* @param {string} key - data key
|
||||
|
|
|
@ -12,7 +12,7 @@ const Route = require('../router/Route');
|
|||
const Router = require('../router/Router');
|
||||
const UtapiRequest = require('../lib/UtapiRequest');
|
||||
const Datastore = require('./Datastore');
|
||||
const redisClient = require('../utils/redisClient');
|
||||
const RedisClient = require('../libV2/redis');
|
||||
|
||||
class UtapiServer {
|
||||
/**
|
||||
|
@ -91,7 +91,7 @@ class UtapiServer {
|
|||
return this.errorResponse(utapiRequest, errors.AccessDenied);
|
||||
}
|
||||
const redisClient = this.datastore.getClient();
|
||||
if (redisClient.status !== 'ready') {
|
||||
if (!redisClient.isReady) {
|
||||
return this.errorResponse(utapiRequest,
|
||||
errors.InternalError.customizeDescription(
|
||||
'Redis server is not ready'));
|
||||
|
@ -220,7 +220,15 @@ function spawn(params) {
|
|||
dump: log.dumpLevel });
|
||||
const cluster = new Clustering(workers, logger);
|
||||
cluster.start(worker => {
|
||||
const datastore = new Datastore().setClient(redisClient(redis, logger));
|
||||
const client = new RedisClient({
|
||||
// disable offline queue
|
||||
enableOfflineQueue: false,
|
||||
// keep alive 3 seconds
|
||||
keepAlive: 3000,
|
||||
...redis,
|
||||
});
|
||||
client.connect();
|
||||
const datastore = new Datastore().setClient(client);
|
||||
const server = new UtapiServer(worker, port, datastore, logger, config);
|
||||
server.startup();
|
||||
});
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
const EventEmitter = require('events');
|
||||
const { callbackify, promisify } = require('util');
|
||||
const IORedis = require('ioredis');
|
||||
const { jsutil } = require('arsenal');
|
||||
|
||||
const errors = require('./errors');
|
||||
const { LoggerContext, asyncOrCallback } = require('./utils');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
module: 'redis',
|
||||
});
|
||||
|
||||
const COMMAND_TIMEOUT = 10000;
|
||||
const CONNECTION_TIMEOUT = 30000;
|
||||
|
||||
/**
|
||||
* Creates a new Redis client instance
|
||||
* @param {object} conf - redis configuration
|
||||
* @param {string} conf.host - redis host
|
||||
* @param {number} conf.port - redis port
|
||||
* @param {string} [conf.password] - redis password (optional)
|
||||
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
||||
* @param {Array<Object>} conf.sentinels - sentinels
|
||||
* @param {Werelogs.Logger} log - Werelogs logger
|
||||
* @return {Redis} - Redis client instance
|
||||
*/
|
||||
class RedisClient extends EventEmitter {
|
||||
constructor(options) {
|
||||
super();
|
||||
this._redisOptions = options;
|
||||
this._redis = null;
|
||||
// Controls the use of additional command timeouts
|
||||
// Only use if connecting to a sentinel cluster
|
||||
this._useTimeouts = options.sentinels !== undefined;
|
||||
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
|
||||
this._runningRedisProbe = null;
|
||||
this._isConnected = false;
|
||||
this._isReady = false;
|
||||
}
|
||||
|
||||
connect(callback) {
|
||||
this._initClient(false);
|
||||
if (callback) {
|
||||
process.nextTick(callback);
|
||||
}
|
||||
}
|
||||
|
||||
disconnect(callback) {
|
||||
return asyncOrCallback(async () => {
|
||||
if (this._useTimeouts) {
|
||||
Object.values(this._inFlightTimeouts)
|
||||
.forEach(clearTimeout);
|
||||
}
|
||||
await this._redis.quit();
|
||||
}, callback);
|
||||
}
|
||||
|
||||
get isReady() {
|
||||
return this._isConnected && this._isReady;
|
||||
}
|
||||
|
||||
_initClient(startProbe = true) {
|
||||
moduleLogger.debug('initializing redis client');
|
||||
if (this._redis !== null) {
|
||||
this._redis.off('connect', this._onConnect);
|
||||
this._redis.off('ready', this._onReady);
|
||||
this._redis.off('error', this._onError);
|
||||
this._redis.quit();
|
||||
}
|
||||
this._isConnected = false;
|
||||
this._isReady = false;
|
||||
this._redis = new IORedis(this._redisOptions);
|
||||
this._redis.on('connect', this._onConnect.bind(this));
|
||||
this._redis.on('ready', this._onReady.bind(this));
|
||||
this._redis.on('close', this._onClose.bind(this));
|
||||
this._redis.on('reconnect', this._onReconnect.bind(this));
|
||||
this._redis.on('end', this._onEnd.bind(this));
|
||||
this._redis.on('error', this._onError.bind(this));
|
||||
if (startProbe && this._runningRedisProbe === null) {
|
||||
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
|
||||
}
|
||||
}
|
||||
|
||||
_probeRedis() {
|
||||
if (this.isReady) {
|
||||
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
|
||||
clearInterval(this._runningRedisProbe);
|
||||
this._runningRedisProbe = null;
|
||||
} else {
|
||||
moduleLogger.warn('redis client has failed to become ready, reinitializing');
|
||||
this._initClient();
|
||||
}
|
||||
}
|
||||
|
||||
_onConnect() {
|
||||
this._isConnected = true;
|
||||
this.emit('connect');
|
||||
}
|
||||
|
||||
_onReady() {
|
||||
this._isReady = true;
|
||||
this.emit('ready');
|
||||
}
|
||||
|
||||
_onError(error) {
|
||||
moduleLogger.error('error connecting to redis', { error });
|
||||
this._isConnected = false;
|
||||
this._isReady = false;
|
||||
this.emit('error', error);
|
||||
}
|
||||
|
||||
_onReconnect(delay) {
|
||||
moduleLogger.debug(`reconnecting to redis, waiting ${delay} ms`);
|
||||
}
|
||||
|
||||
_onClose() {
|
||||
moduleLogger.debug('connection to redis closed');
|
||||
this._isConnected = false;
|
||||
this._isReady = false;
|
||||
}
|
||||
|
||||
_onEnd() {
|
||||
moduleLogger.debug('connection to redis ended.');
|
||||
this._isConnected = false;
|
||||
this._isReady = false;
|
||||
}
|
||||
|
||||
_createCommandTimeout() {
|
||||
let timer;
|
||||
const cancelTimeout = jsutil.once(() => {
|
||||
clearTimeout(timer);
|
||||
this._inFlightTimeouts.delete(timer);
|
||||
});
|
||||
|
||||
const timeout = new Promise((_, reject) => {
|
||||
timer = setTimeout(
|
||||
() => {
|
||||
this.emit('timeout');
|
||||
this._initClient();
|
||||
},
|
||||
COMMAND_TIMEOUT,
|
||||
);
|
||||
|
||||
this._inFlightTimeouts.add(timer);
|
||||
this.once('timeout', () => {
|
||||
moduleLogger.warn('redis command timed out');
|
||||
cancelTimeout();
|
||||
reject(errors.OperationTimedOut);
|
||||
});
|
||||
});
|
||||
|
||||
return { timeout, cancelTimeout };
|
||||
}
|
||||
|
||||
async _call(asyncFunc) {
|
||||
const funcPromise = asyncFunc(this._redis);
|
||||
if (!this._useTimeouts) {
|
||||
// If timeouts are disabled simply return the Promise
|
||||
return funcPromise;
|
||||
}
|
||||
|
||||
const { timeout, cancelTimeout } = this._createCommandTimeout();
|
||||
|
||||
try {
|
||||
// timeout always rejects so we can just return
|
||||
return await Promise.race([funcPromise, timeout]);
|
||||
} finally {
|
||||
cancelTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
call(func, callback) {
|
||||
if (callback !== undefined) {
|
||||
// If a callback is provided `func` is assumed to also take a callback
|
||||
// and is converted to a promise using promisify
|
||||
return callbackify(this._call.bind(this))(promisify(func), callback);
|
||||
}
|
||||
return this._call(func);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = RedisClient;
|
|
@ -0,0 +1,20 @@
|
|||
const { callbackify } = require('util');
|
||||
|
||||
/**
|
||||
* Convenience function to handle "if no callback then return a promise" pattern
|
||||
*
|
||||
* @param {Function} asyncFunc - asyncFunction to call
|
||||
* @param {Function|undefined} callback - optional callback
|
||||
* @returns {Promise|undefined} - returns a Promise if no callback is passed
|
||||
*/
|
||||
function asyncOrCallback(asyncFunc, callback) {
|
||||
if (typeof callback === 'function') {
|
||||
callbackify(asyncFunc)(callback);
|
||||
return undefined;
|
||||
}
|
||||
return asyncFunc();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
asyncOrCallback,
|
||||
};
|
|
@ -0,0 +1,11 @@
|
|||
const log = require('./log');
|
||||
const shard = require('./shard');
|
||||
const timestamp = require('./timestamp');
|
||||
const func = require('./func');
|
||||
|
||||
module.exports = {
|
||||
...log,
|
||||
...shard,
|
||||
...timestamp,
|
||||
...func,
|
||||
};
|
|
@ -4,14 +4,25 @@ const UtapiClient = require('../../../lib/UtapiClient');
|
|||
const Datastore = require('../../../lib/Datastore');
|
||||
const redisClient = require('../../../utils/redisClient');
|
||||
const { Logger } = require('werelogs');
|
||||
const { getCounters, getMetricFromKey,
|
||||
getStateKeys, getKeys } = require('../../../lib/schema');
|
||||
const RedisClientv2 = require('../../../libV2/redis');
|
||||
const {
|
||||
getCounters, getMetricFromKey,
|
||||
getStateKeys, getKeys,
|
||||
} = require('../../../lib/schema');
|
||||
|
||||
const log = new Logger('TestUtapiClient');
|
||||
const redis = redisClient({
|
||||
host: '127.0.0.1',
|
||||
port: 6379,
|
||||
}, log);
|
||||
const datastore = new Datastore().setClient(redis);
|
||||
const redisV2 = new RedisClientv2({
|
||||
host: '127.0.0.1',
|
||||
port: 6379,
|
||||
});
|
||||
redisV2.connect();
|
||||
const datastore = new Datastore().setClient(
|
||||
redisV2,
|
||||
);
|
||||
const utapiConfig = {
|
||||
redis: {
|
||||
host: '127.0.0.1',
|
||||
|
|
|
@ -5,6 +5,7 @@ const UtapiReplay = require('../../../lib/UtapiReplay');
|
|||
const UtapiClient = require('../../../lib/UtapiClient');
|
||||
const Datastore = require('../../../lib/Datastore');
|
||||
const redisClient = require('../../../utils/redisClient');
|
||||
const RedisClientv2 = require('../../../libV2/redis');
|
||||
const { getAllResourceTypeKeys } = require('../../utils/utils');
|
||||
const safeJsonParse = require('../../../utils/safeJsonParse');
|
||||
|
||||
|
@ -13,7 +14,12 @@ const localCache = redisClient({
|
|||
host: '127.0.0.1',
|
||||
port: 6379,
|
||||
}, log);
|
||||
const datastore = new Datastore().setClient(localCache);
|
||||
const redis = new RedisClientv2({
|
||||
host: '127.0.0.1',
|
||||
port: 6379,
|
||||
});
|
||||
redis.connect();
|
||||
const datastore = new Datastore().setClient(redis);
|
||||
const utapiClient = new UtapiClient({
|
||||
redis: {
|
||||
host: '127.0.0.1',
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
const RedisClient = require('../libV2/redis');
|
||||
|
||||
/**
|
||||
* Creates a new Redis client instance
|
||||
* @param {object} conf - redis configuration
|
||||
* @param {string} conf.host - redis host
|
||||
* @param {number} conf.port - redis port
|
||||
* @param {string} [conf.password] - redis password (optional)
|
||||
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
|
||||
* @param {Werelogs.Logger} log - Werelogs logger
|
||||
* @return {Redis} - Redis client instance
|
||||
*/
|
||||
function redisClientv2(conf, log) {
|
||||
const client = new RedisClient({
|
||||
// disable offline queue
|
||||
enableOfflineQueue: false,
|
||||
// keep alive 3 seconds
|
||||
keepAlive: 3000,
|
||||
// Only emit `ready` if the server is able to accept commands
|
||||
enableReadyCheck: true,
|
||||
...conf,
|
||||
});
|
||||
|
||||
client.connect();
|
||||
client.on('error', err => log.trace('error with redis client', {
|
||||
error: err,
|
||||
}));
|
||||
return client;
|
||||
}
|
||||
|
||||
|
||||
module.exports = redisClientv2;
|
Loading…
Reference in New Issue