Compare commits

..

No commits in common. "f7c9758b9c07942ad65322d45a1113e7affe8fc0" and "acbf8880f61fa492782142a539fe1208ab35a2f6" have entirely different histories.

12 changed files with 61 additions and 364 deletions

View File

@ -33,10 +33,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
set(key, value, cb) { set(key, value, cb) {
return this._client.call( return this._client.set(key, value, cb);
(backend, done) => backend.set(key, value, done),
cb,
);
} }
/** /**
@ -48,7 +45,7 @@ class Datastore {
*/ */
setExpire(key, value, ttl) { setExpire(key, value, ttl) {
// This method is a Promise because no callback is given. // This method is a Promise because no callback is given.
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX')); return this._client.set(key, value, 'EX', ttl, 'NX');
} }
/** /**
@ -57,8 +54,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
del(key) { del(key) {
// This method is a Promise because no callback is given. return this._client.del(key);
return this._client.call(backend => backend.del(key));
} }
/** /**
@ -68,7 +64,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
get(key, cb) { get(key, cb) {
return this._client.call((backend, done) => backend.get(key, done), cb); return this._client.get(key, cb);
} }
/** /**
@ -78,7 +74,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
incr(key, cb) { incr(key, cb) {
return this._client.call((backend, done) => backend.incr(key, done), cb); return this._client.incr(key, cb);
} }
/** /**
@ -88,7 +84,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
decr(key, cb) { decr(key, cb) {
return this._client.call((backend, done) => backend.decr(key, done), cb); return this._client.decr(key, cb);
} }
/** /**
@ -99,7 +95,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
decrby(key, value, cb) { decrby(key, value, cb) {
return this._client.call((backend, done) => backend.decrby(key, value, done), cb); return this._client.decrby(key, value, cb);
} }
/** /**
@ -111,7 +107,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
zadd(key, score, value, cb) { zadd(key, score, value, cb) {
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb); return this._client.zadd(key, score, value, cb);
} }
/** /**
@ -124,7 +120,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
zrange(key, min, max, cb) { zrange(key, min, max, cb) {
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb); return this._client.zrange(key, min, max, cb);
} }
/** /**
@ -137,7 +133,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
zrangebyscore(key, min, max, cb) { zrangebyscore(key, min, max, cb) {
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb); return this._client.zrangebyscore(key, min, max, cb);
} }
/** /**
@ -150,12 +146,8 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
bZrangebyscore(keys, min, max, cb) { bZrangebyscore(keys, min, max, cb) {
return this._client.call( return this._client.pipeline(keys.map(
(backend, done) => backend item => ['zrangebyscore', item, min, max])).exec(cb);
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
.exec(done),
cb,
);
} }
/** /**
@ -165,9 +157,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
batch(cmds, cb) { batch(cmds, cb) {
return this._client.call((backend, done) => { return this._client.multi(cmds).exec(cb);
backend.multi(cmds).exec(done);
}, cb);
} }
/** /**
@ -177,7 +167,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
pipeline(cmds, cb) { pipeline(cmds, cb) {
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb); return this._client.pipeline(cmds).exec(cb);
} }
/** /**
@ -187,10 +177,9 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
multi(cmds, cb) { multi(cmds, cb) {
return this._client.call((backend, done) => return this._client.multi(cmds).exec((err, res) => {
backend.multi(cmds).exec((err, res) => {
if (err) { if (err) {
return done(err); return cb(err);
} }
const flattenRes = []; const flattenRes = [];
const resErr = res.filter(item => { const resErr = res.filter(item => {
@ -198,10 +187,10 @@ class Datastore {
return item[0] !== null; return item[0] !== null;
}); });
if (resErr && resErr.length > 0) { if (resErr && resErr.length > 0) {
return done(resErr); return cb(resErr);
} }
return done(null, flattenRes); return cb(null, flattenRes);
}), cb); });
} }
/** /**
@ -214,7 +203,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
zremrangebyscore(key, min, max, cb) { zremrangebyscore(key, min, max, cb) {
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb); return this._client.zremrangebyscore(key, min, max, cb);
} }
/** /**
@ -225,7 +214,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
lpush(key, val, cb) { lpush(key, val, cb) {
return this._client.call((backend, done) => backend.lpush(key, val, done), cb); return this._client.lpush(key, val, cb);
} }
/** /**
@ -235,7 +224,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
rpop(key, cb) { rpop(key, cb) {
return this._client.call((backend, done) => backend.rpop(key, done), cb); return this._client.rpop(key, cb);
} }
/** /**
@ -247,7 +236,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
lrange(key, start, stop, cb) { lrange(key, start, stop, cb) {
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb); return this._client.lrange(key, start, stop, cb);
} }
/** /**
@ -257,7 +246,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
llen(key, cb) { llen(key, cb) {
return this._client.call((backend, done) => backend.llen(key, done), cb); return this._client.llen(key, cb);
} }
/** /**
@ -268,7 +257,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
publish(channel, message, cb) { publish(channel, message, cb) {
return this._client.call((backend, done) => backend.publish(channel, message, done), cb); return this._client.publish(channel, message, cb);
} }
/** /**
@ -279,7 +268,7 @@ class Datastore {
* @return {undefined} * @return {undefined}
*/ */
scan(cursor, pattern, cb) { scan(cursor, pattern, cb) {
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb); return this._client.scan(cursor, 'match', pattern, cb);
} }
} }

View File

@ -4,7 +4,7 @@ const werelogs = require('werelogs');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const { generateKey, generateCounter, generateStateKey } = require('./schema'); const { generateKey, generateCounter, generateStateKey } = require('./schema');
const { errors } = require('arsenal'); const { errors } = require('arsenal');
const redisClientv2 = require('../utils/redisClientv2'); const redisClient = require('../utils/redisClient');
const member = require('../utils/member'); const member = require('../utils/member');
const methods = { const methods = {
@ -104,11 +104,11 @@ class UtapiClient {
} }
if (config.redis) { if (config.redis) {
this.ds = new Datastore() this.ds = new Datastore()
.setClient(redisClientv2(config.redis, this.log)); .setClient(redisClient(config.redis, this.log));
} }
if (config.localCache) { if (config.localCache) {
this.localCache = new Datastore() this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache, this.log)); .setClient(redisClient(config.localCache, this.log));
} }
if (config.component) { if (config.component) {
// The configuration uses the property `component`, while // The configuration uses the property `component`, while

View File

@ -7,7 +7,7 @@ const { jsutil } = require('arsenal');
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const RedisClient = require('../libV2/redis'); const redisClient = require('../utils/redisClient');
const REINDEX_SCHEDULE = '0 0 * * Sun'; const REINDEX_SCHEDULE = '0 0 * * Sun';
const REINDEX_LOCK_KEY = 's3:utapireindex:lock'; const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
@ -62,7 +62,7 @@ class UtapiReindex {
} }
_getRedisClient() { _getRedisClient() {
const client = new RedisClient({ return redisClient({
sentinels: [{ sentinels: [{
host: this._sentinel.host, host: this._sentinel.host,
port: this._sentinel.port, port: this._sentinel.port,
@ -70,9 +70,7 @@ class UtapiReindex {
name: this._sentinel.name, name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword, sentinelPassword: this._sentinel.sentinelPassword,
password: this._password, password: this._password,
}); }, this._log);
client.connect();
return client;
} }
_lock() { _lock() {

View File

@ -1,12 +1,11 @@
/* eslint-disable no-underscore-dangle */
const assert = require('assert'); const assert = require('assert');
const async = require('async'); const async = require('async');
const { scheduleJob } = require('node-schedule'); const { scheduleJob } = require('node-schedule');
const UtapiClient = require('./UtapiClient'); const UtapiClient = require('./UtapiClient');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const safeJsonParse = require('../utils/safeJsonParse'); const safeJsonParse = require('../utils/safeJsonParse');
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const redisClientv2 = require('../utils/redisClientv2');
// Every five minutes. Cron-style scheduling used by node-schedule. // Every five minutes. Cron-style scheduling used by node-schedule.
const REPLAY_SCHEDULE = '*/5 * * * *'; const REPLAY_SCHEDULE = '*/5 * * * *';
@ -34,7 +33,6 @@ class UtapiReplay {
this.replaySchedule = REPLAY_SCHEDULE; this.replaySchedule = REPLAY_SCHEDULE;
this.batchSize = BATCH_SIZE; this.batchSize = BATCH_SIZE;
this.disableReplay = true; this.disableReplay = true;
this._isRunning = false;
if (config) { if (config) {
const message = 'missing required property in UtapiReplay ' + const message = 'missing required property in UtapiReplay ' +
@ -43,7 +41,7 @@ class UtapiReplay {
assert(config.localCache, `${message}: localCache`); assert(config.localCache, `${message}: localCache`);
this.utapiClient = new UtapiClient(config); this.utapiClient = new UtapiClient(config);
this.localCache = new Datastore() this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache)); .setClient(redisClient(config.localCache, this.log));
if (config.replaySchedule) { if (config.replaySchedule) {
this.replaySchedule = config.replaySchedule; this.replaySchedule = config.replaySchedule;
} }
@ -186,25 +184,17 @@ class UtapiReplay {
this.log.info('disabled utapi replay scheduler'); this.log.info('disabled utapi replay scheduler');
return this; return this;
} }
const replay = scheduleJob(this.replaySchedule, () => { const replay = scheduleJob(this.replaySchedule, () =>
if (!this._isRunning) { this._setLock()
this._isRunning = true;
return this._setLock()
.then(res => { .then(res => {
// If `res` is not `null`, there is no pre-existing lock. // If `res` is not `null`, there is no pre-existing lock.
if (res) { if (res) {
return this._checkLocalCache(); return this._checkLocalCache();
} }
return undefined; return undefined;
}) }));
.then(() => { this._isRunning = false; }) replay.on('scheduled', date =>
.catch(() => { this._isRunning = false; }); this.log.info(`replay job started: ${date}`));
}
return;
});
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`));
this.log.info('enabled utapi replay scheduler', { this.log.info('enabled utapi replay scheduler', {
schedule: this.replaySchedule, schedule: this.replaySchedule,
}); });

View File

@ -52,16 +52,6 @@ class Memory {
this.data = {}; this.data = {};
} }
/**
* A simple wrapper provided for API compatibility with redis
* @param {Function} func - Function to call
* @param {callback} cb - callback
* @returns {undefined}
*/
call(func, cb) {
return func(this, cb);
}
/** /**
* Set key to hold a value * Set key to hold a value
* @param {string} key - data key * @param {string} key - data key

View File

@ -12,7 +12,7 @@ const Route = require('../router/Route');
const Router = require('../router/Router'); const Router = require('../router/Router');
const UtapiRequest = require('../lib/UtapiRequest'); const UtapiRequest = require('../lib/UtapiRequest');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const RedisClient = require('../libV2/redis'); const redisClient = require('../utils/redisClient');
class UtapiServer { class UtapiServer {
/** /**
@ -91,7 +91,7 @@ class UtapiServer {
return this.errorResponse(utapiRequest, errors.AccessDenied); return this.errorResponse(utapiRequest, errors.AccessDenied);
} }
const redisClient = this.datastore.getClient(); const redisClient = this.datastore.getClient();
if (!redisClient.isReady) { if (redisClient.status !== 'ready') {
return this.errorResponse(utapiRequest, return this.errorResponse(utapiRequest,
errors.InternalError.customizeDescription( errors.InternalError.customizeDescription(
'Redis server is not ready')); 'Redis server is not ready'));
@ -220,15 +220,7 @@ function spawn(params) {
dump: log.dumpLevel }); dump: log.dumpLevel });
const cluster = new Clustering(workers, logger); const cluster = new Clustering(workers, logger);
cluster.start(worker => { cluster.start(worker => {
const client = new RedisClient({ const datastore = new Datastore().setClient(redisClient(redis, logger));
// disable offline queue
enableOfflineQueue: false,
// keep alive 3 seconds
keepAlive: 3000,
...redis,
});
client.connect();
const datastore = new Datastore().setClient(client);
const server = new UtapiServer(worker, port, datastore, logger, config); const server = new UtapiServer(worker, port, datastore, logger, config);
server.startup(); server.startup();
}); });

View File

@ -1,182 +0,0 @@
const EventEmitter = require('events');
const { callbackify, promisify } = require('util');
const IORedis = require('ioredis');
const { jsutil } = require('arsenal');
const errors = require('./errors');
const { LoggerContext, asyncOrCallback } = require('./utils');
const moduleLogger = new LoggerContext({
module: 'redis',
});
const COMMAND_TIMEOUT = 10000;
const CONNECTION_TIMEOUT = 30000;
/**
* Creates a new Redis client instance
* @param {object} conf - redis configuration
* @param {string} conf.host - redis host
* @param {number} conf.port - redis port
* @param {string} [conf.password] - redis password (optional)
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
* @param {Array<Object>} conf.sentinels - sentinels
* @param {Werelogs.Logger} log - Werelogs logger
* @return {Redis} - Redis client instance
*/
class RedisClient extends EventEmitter {
constructor(options) {
super();
this._redisOptions = options;
this._redis = null;
// Controls the use of additional command timeouts
// Only use if connecting to a sentinel cluster
this._useTimeouts = options.sentinels !== undefined;
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
this._runningRedisProbe = null;
this._isConnected = false;
this._isReady = false;
}
connect(callback) {
this._initClient(false);
if (callback) {
process.nextTick(callback);
}
}
disconnect(callback) {
return asyncOrCallback(async () => {
if (this._useTimeouts) {
Object.values(this._inFlightTimeouts)
.forEach(clearTimeout);
}
await this._redis.quit();
}, callback);
}
get isReady() {
return this._isConnected && this._isReady;
}
_initClient(startProbe = true) {
moduleLogger.debug('initializing redis client');
if (this._redis !== null) {
this._redis.off('connect', this._onConnect);
this._redis.off('ready', this._onReady);
this._redis.off('error', this._onError);
this._redis.quit();
}
this._isConnected = false;
this._isReady = false;
this._redis = new IORedis(this._redisOptions);
this._redis.on('connect', this._onConnect.bind(this));
this._redis.on('ready', this._onReady.bind(this));
this._redis.on('close', this._onClose.bind(this));
this._redis.on('reconnect', this._onReconnect.bind(this));
this._redis.on('end', this._onEnd.bind(this));
this._redis.on('error', this._onError.bind(this));
if (startProbe && this._runningRedisProbe === null) {
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
}
}
_probeRedis() {
if (this.isReady) {
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
clearInterval(this._runningRedisProbe);
this._runningRedisProbe = null;
} else {
moduleLogger.warn('redis client has failed to become ready, reinitializing');
this._initClient();
}
}
_onConnect() {
this._isConnected = true;
this.emit('connect');
}
_onReady() {
this._isReady = true;
this.emit('ready');
}
_onError(error) {
moduleLogger.error('error connecting to redis', { error });
this._isConnected = false;
this._isReady = false;
this.emit('error', error);
}
_onReconnect(delay) {
moduleLogger.debug(`reconnecting to redis, waiting ${delay} ms`);
}
_onClose() {
moduleLogger.debug('connection to redis closed');
this._isConnected = false;
this._isReady = false;
}
_onEnd() {
moduleLogger.debug('connection to redis ended.');
this._isConnected = false;
this._isReady = false;
}
_createCommandTimeout() {
let timer;
const cancelTimeout = jsutil.once(() => {
clearTimeout(timer);
this._inFlightTimeouts.delete(timer);
});
const timeout = new Promise((_, reject) => {
timer = setTimeout(
() => {
this.emit('timeout');
this._initClient();
},
COMMAND_TIMEOUT,
);
this._inFlightTimeouts.add(timer);
this.once('timeout', () => {
moduleLogger.warn('redis command timed out');
cancelTimeout();
reject(errors.OperationTimedOut);
});
});
return { timeout, cancelTimeout };
}
async _call(asyncFunc) {
const funcPromise = asyncFunc(this._redis);
if (!this._useTimeouts) {
// If timeouts are disabled simply return the Promise
return funcPromise;
}
const { timeout, cancelTimeout } = this._createCommandTimeout();
try {
// timeout always rejects so we can just return
return await Promise.race([funcPromise, timeout]);
} finally {
cancelTimeout();
}
}
call(func, callback) {
if (callback !== undefined) {
// If a callback is provided `func` is assumed to also take a callback
// and is converted to a promise using promisify
return callbackify(this._call.bind(this))(promisify(func), callback);
}
return this._call(func);
}
}
module.exports = RedisClient;

View File

@ -1,20 +0,0 @@
const { callbackify } = require('util');
/**
* Convenience function to handle "if no callback then return a promise" pattern
*
* @param {Function} asyncFunc - asyncFunction to call
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - returns a Promise if no callback is passed
*/
function asyncOrCallback(asyncFunc, callback) {
if (typeof callback === 'function') {
callbackify(asyncFunc)(callback);
return undefined;
}
return asyncFunc();
}
module.exports = {
asyncOrCallback,
};

View File

@ -1,11 +0,0 @@
const log = require('./log');
const shard = require('./shard');
const timestamp = require('./timestamp');
const func = require('./func');
module.exports = {
...log,
...shard,
...timestamp,
...func,
};

View File

@ -4,25 +4,14 @@ const UtapiClient = require('../../../lib/UtapiClient');
const Datastore = require('../../../lib/Datastore'); const Datastore = require('../../../lib/Datastore');
const redisClient = require('../../../utils/redisClient'); const redisClient = require('../../../utils/redisClient');
const { Logger } = require('werelogs'); const { Logger } = require('werelogs');
const RedisClientv2 = require('../../../libV2/redis'); const { getCounters, getMetricFromKey,
const { getStateKeys, getKeys } = require('../../../lib/schema');
getCounters, getMetricFromKey,
getStateKeys, getKeys,
} = require('../../../lib/schema');
const log = new Logger('TestUtapiClient'); const log = new Logger('TestUtapiClient');
const redis = redisClient({ const redis = redisClient({
host: '127.0.0.1', host: '127.0.0.1',
port: 6379, port: 6379,
}, log); }, log);
const redisV2 = new RedisClientv2({ const datastore = new Datastore().setClient(redis);
host: '127.0.0.1',
port: 6379,
});
redisV2.connect();
const datastore = new Datastore().setClient(
redisV2,
);
const utapiConfig = { const utapiConfig = {
redis: { redis: {
host: '127.0.0.1', host: '127.0.0.1',

View File

@ -5,7 +5,6 @@ const UtapiReplay = require('../../../lib/UtapiReplay');
const UtapiClient = require('../../../lib/UtapiClient'); const UtapiClient = require('../../../lib/UtapiClient');
const Datastore = require('../../../lib/Datastore'); const Datastore = require('../../../lib/Datastore');
const redisClient = require('../../../utils/redisClient'); const redisClient = require('../../../utils/redisClient');
const RedisClientv2 = require('../../../libV2/redis');
const { getAllResourceTypeKeys } = require('../../utils/utils'); const { getAllResourceTypeKeys } = require('../../utils/utils');
const safeJsonParse = require('../../../utils/safeJsonParse'); const safeJsonParse = require('../../../utils/safeJsonParse');
@ -14,12 +13,7 @@ const localCache = redisClient({
host: '127.0.0.1', host: '127.0.0.1',
port: 6379, port: 6379,
}, log); }, log);
const redis = new RedisClientv2({ const datastore = new Datastore().setClient(localCache);
host: '127.0.0.1',
port: 6379,
});
redis.connect();
const datastore = new Datastore().setClient(redis);
const utapiClient = new UtapiClient({ const utapiClient = new UtapiClient({
redis: { redis: {
host: '127.0.0.1', host: '127.0.0.1',

View File

@ -1,32 +0,0 @@
const RedisClient = require('../libV2/redis');
/**
* Creates a new Redis client instance
* @param {object} conf - redis configuration
* @param {string} conf.host - redis host
* @param {number} conf.port - redis port
* @param {string} [conf.password] - redis password (optional)
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
* @param {Werelogs.Logger} log - Werelogs logger
* @return {Redis} - Redis client instance
*/
function redisClientv2(conf, log) {
const client = new RedisClient({
// disable offline queue
enableOfflineQueue: false,
// keep alive 3 seconds
keepAlive: 3000,
// Only emit `ready` if the server is able to accept commands
enableReadyCheck: true,
...conf,
});
client.connect();
client.on('error', err => log.trace('error with redis client', {
error: err,
}));
return client;
}
module.exports = redisClientv2;