Various important fixes
- Don't decode other date types - Add LOCK options - Add CALC_FOUND_ROWS emulation - Allow Text object as tables - Export constants - Some occult fixes for error handling in long-running daemons - Add commit() and rollback() transaction wrapper methods - Don't use postgresql bind variables because it slows down big queries - Fix escaping! (shame on me)master
parent
70dcc2c70b
commit
e4a271d6bd
|
@ -1,21 +1,33 @@
|
|||
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
|
||||
// (c) Виталий Филиппов, 2019
|
||||
// Версия 2019-07-04
|
||||
// Версия 2020-01-06
|
||||
|
||||
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
|
||||
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
|
||||
// Здесь так нельзя, поэтому этот синтаксис мы заменяем на { 't1.a=t2.a': [], 't2.b': [ 1, 2, 3 ] }
|
||||
// Или на [ 't1.a=t2.a', [ 't2.b', 1, 2, 3 ] ] - можно писать и так, и так
|
||||
|
||||
const pg = require('pg');
|
||||
const pg_escape = require('pg-escape');
|
||||
|
||||
// Сраный node-postgres конвертирует даты в Date и портит таймзону
|
||||
|
||||
const DATATYPE_DATE = 1082;
|
||||
pg.types.setTypeParser(DATATYPE_DATE, function(val)
|
||||
const DATATYPE_TIME = 1083;
|
||||
const DATATYPE_TIMESTAMP = 1114;
|
||||
const DATATYPE_TIMESTAMPTZ = 1184;
|
||||
const DATATYPE_TIMETZ = 1266;
|
||||
|
||||
pg.types.setTypeParser(DATATYPE_DATE, pgToString);
|
||||
pg.types.setTypeParser(DATATYPE_TIME, pgToString);
|
||||
pg.types.setTypeParser(DATATYPE_TIMESTAMP, pgToString);
|
||||
pg.types.setTypeParser(DATATYPE_TIMESTAMPTZ, pgToString);
|
||||
pg.types.setTypeParser(DATATYPE_TIMETZ, pgToString);
|
||||
|
||||
function pgToString(val)
|
||||
{
|
||||
return val === null ? null : val;
|
||||
});
|
||||
|
||||
let pg_escape;
|
||||
}
|
||||
|
||||
const MS_HASH = 0;
|
||||
const MS_LIST = 1;
|
||||
|
@ -81,6 +93,14 @@ function select_builder(tables, fields, where, options)
|
|||
{
|
||||
sql += ' OFFSET '+((options.OFFSET || options.offset) | 0);
|
||||
}
|
||||
if (options['FOR UPDATE'] || options.for_update)
|
||||
{
|
||||
sql += ' FOR UPDATE';
|
||||
}
|
||||
else if (options['LOCK IN SHARE MODE'] || options.lock_share)
|
||||
{
|
||||
sql += ' LOCK IN SHARE MODE';
|
||||
}
|
||||
return new Text(sql, bind);
|
||||
}
|
||||
|
||||
|
@ -94,6 +114,10 @@ function tables_builder(tables)
|
|||
sql = tables;
|
||||
return { sql, bind, moreWhere };
|
||||
}
|
||||
else if (tables instanceof Text)
|
||||
{
|
||||
return { sql: tables.sql, bind: tables.bind, moreWhere };
|
||||
}
|
||||
for (const k in tables)
|
||||
{
|
||||
let jointype = 'INNER', table = tables[k], conds = null;
|
||||
|
@ -118,6 +142,11 @@ function tables_builder(tables)
|
|||
}
|
||||
sql += ') AS '+k+'('+table.keys.join(', ')+')';
|
||||
}
|
||||
else if (table instanceof Text)
|
||||
{
|
||||
sql += '(' + table.sql + ') ' + k;
|
||||
bind.push.apply(bind, table.bind);
|
||||
}
|
||||
else if (typeof table == 'object')
|
||||
{
|
||||
// Nested join, `k` alias is ignored
|
||||
|
@ -320,32 +349,23 @@ function split_using(tables)
|
|||
|
||||
function quote(v)
|
||||
{
|
||||
if (!pg_escape)
|
||||
pg_escape = require('pg-escape');
|
||||
if (v == null)
|
||||
return 'null';
|
||||
else if (v === true || v === false)
|
||||
return ''+v;
|
||||
else if (typeof v == 'object')
|
||||
v = JSON.stringify(v);
|
||||
else if (typeof v == 'number')
|
||||
return v;
|
||||
return '\''+pg_escape.string(v)+'\'';
|
||||
return pg_escape.literal(v);
|
||||
}
|
||||
|
||||
// Превратить bind пераметры, выраженные как ?, в вид $n (как в node-postgres)
|
||||
function _positional(sql)
|
||||
{
|
||||
let i = 0;
|
||||
sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '$'+(++i) : m));
|
||||
return sql;
|
||||
}
|
||||
|
||||
// Встроить все bind пераметры, выраженные как ?, в строку
|
||||
// Встроить все bind пераметры, выраженные как ?, в строку.
|
||||
// Почему? А потому, что в node-postgres, похоже, есть лимит числа bind переменных. После ~30000 он сваливается.
|
||||
// FIXME: В postgresql есть оператор ?. Его надо научиться экранировать. Либо сразу строить запрос в строчку
|
||||
// и не заниматься ерундой с подстановками.
|
||||
function _inline(sql, bind)
|
||||
{
|
||||
if (!pg_escape)
|
||||
{
|
||||
pg_escape = require('pg-escape');
|
||||
}
|
||||
let i = 0;
|
||||
sql = sql.replace(
|
||||
/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g,
|
||||
|
@ -353,23 +373,16 @@ function _inline(sql, bind)
|
|||
{
|
||||
if (!m1)
|
||||
return m;
|
||||
let v = bind[i++];
|
||||
if (v == null)
|
||||
return 'null';
|
||||
else if (typeof v == 'object')
|
||||
v = JSON.stringify(v);
|
||||
else if (typeof v == 'number')
|
||||
return v;
|
||||
return '\''+pg_escape.string(v)+'\'';
|
||||
return quote(bind[i++]);
|
||||
}
|
||||
);
|
||||
return sql;
|
||||
}
|
||||
|
||||
// dbh: Connection
|
||||
// dbh: Connection or pg.Client
|
||||
async function select(dbh, tables, fields, where, options, format)
|
||||
{
|
||||
let sql_text;
|
||||
let sql_text, calc_found_rows, found_rows;
|
||||
if (arguments.length == 2 || arguments.length == 3)
|
||||
{
|
||||
sql_text = tables instanceof Text ? tables : new Text(tables, []);
|
||||
|
@ -377,9 +390,25 @@ async function select(dbh, tables, fields, where, options, format)
|
|||
}
|
||||
else
|
||||
{
|
||||
if (options && options.calc_found_rows)
|
||||
{
|
||||
calc_found_rows = true;
|
||||
fields = fields instanceof Array ? [ ...fields ] : [ fields ];
|
||||
fields.push('COUNT(*) OVER () \"*\"');
|
||||
}
|
||||
sql_text = select_builder(tables, fields, where, options);
|
||||
}
|
||||
let data = await dbh.query(sql_text.sql, sql_text.bind);
|
||||
if (calc_found_rows)
|
||||
{
|
||||
if (!data.rows.length)
|
||||
found_rows = 0;
|
||||
else
|
||||
{
|
||||
found_rows = data.rows[0]['*'];
|
||||
data.rows.forEach(r => delete r['*']);
|
||||
}
|
||||
}
|
||||
if ((format & MS_LIST) || (format & MS_COL))
|
||||
data = data.rows.map(r => Object.values(r));
|
||||
else
|
||||
|
@ -388,7 +417,7 @@ async function select(dbh, tables, fields, where, options, format)
|
|||
data = data[0];
|
||||
if (data && (format & MS_COL))
|
||||
data = data[0];
|
||||
return data;
|
||||
return calc_found_rows ? [ found_rows, data ] : data;
|
||||
}
|
||||
|
||||
async function insert(dbh, table, rows, options)
|
||||
|
@ -401,10 +430,6 @@ async function insert(dbh, table, rows, options)
|
|||
{
|
||||
return null;
|
||||
}
|
||||
if (!pg_escape)
|
||||
{
|
||||
pg_escape = require('pg-escape');
|
||||
}
|
||||
const keys = Object.keys(rows[0]);
|
||||
let sql = 'insert into '+table+' ('+keys.join(', ')+') values ';
|
||||
let i = 0;
|
||||
|
@ -418,10 +443,8 @@ async function insert(dbh, table, rows, options)
|
|||
sql += ', ';
|
||||
if (row[k] == null)
|
||||
sql += 'default';
|
||||
else if (typeof row[k] == 'object')
|
||||
sql += '\''+pg_escape(JSON.stringify(row[k]))+'\'';
|
||||
else
|
||||
sql += '\''+pg_escape(''+row[k])+'\'';
|
||||
sql += quote(row[k]);
|
||||
j++;
|
||||
}
|
||||
sql += ')';
|
||||
|
@ -560,6 +583,13 @@ class ConnectionBase
|
|||
}
|
||||
}
|
||||
|
||||
ConnectionBase.prototype.HASH = MS_HASH;
|
||||
ConnectionBase.prototype.LIST = MS_LIST;
|
||||
ConnectionBase.prototype.ROW = MS_ROW;
|
||||
ConnectionBase.prototype.COL = MS_COL;
|
||||
ConnectionBase.prototype.VALUE = MS_VALUE;
|
||||
ConnectionBase.prototype.select_builder = select_builder;
|
||||
|
||||
// Обёртка PostgreSQL-подключения
|
||||
// Автоматически переподключается при отвале
|
||||
class Connection extends ConnectionBase
|
||||
|
@ -571,15 +601,20 @@ class Connection extends ConnectionBase
|
|||
this.init_connect = [];
|
||||
this.in_transaction = null;
|
||||
this.transaction_queue = [];
|
||||
this.onerror = (e) =>
|
||||
this.quote = quote;
|
||||
}
|
||||
|
||||
handleError(dbh, e)
|
||||
{
|
||||
// Проверяем, что в нас не кидается ошибкой старое подключение к БД
|
||||
if (dbh == this.dbh)
|
||||
{
|
||||
console.warn(e);
|
||||
console.warn('Database connection dropped. Reconnecting');
|
||||
this.dbh = null;
|
||||
this.connection_lost = true;
|
||||
this.connect();
|
||||
};
|
||||
this.quote = quote;
|
||||
}
|
||||
}
|
||||
|
||||
getConnection()
|
||||
|
@ -613,20 +648,25 @@ class Connection extends ConnectionBase
|
|||
{
|
||||
try
|
||||
{
|
||||
this.dbh = new pg.Client(this.config);
|
||||
let dbh = this.dbh = new pg.Client(this.config);
|
||||
await this.dbh.connect();
|
||||
for (const cb of this.init_connect)
|
||||
{
|
||||
await cb(this);
|
||||
}
|
||||
this.dbh.on('error', this.onerror);
|
||||
dbh.on('error', e => this.handleError(dbh, e));
|
||||
return this.dbh;
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
this.dbh = null;
|
||||
console.warn(e);
|
||||
console.warn('Trying to connect again in '+retry+' seconds');
|
||||
await new Promise((r, j) => setTimeout(r, retry*1000));
|
||||
if (this.dbh)
|
||||
{
|
||||
return this.dbh;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -660,6 +700,21 @@ class Connection extends ConnectionBase
|
|||
return r;
|
||||
}
|
||||
|
||||
async end()
|
||||
{
|
||||
if (!this.dbh)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (this.in_transaction)
|
||||
{
|
||||
// Если уже кто-то активен - ждём его
|
||||
await new Promise((resolve, reject) => this.transaction_queue.push(resolve));
|
||||
}
|
||||
await this.dbh.end();
|
||||
this.dbh = null;
|
||||
}
|
||||
|
||||
_next_txn()
|
||||
{
|
||||
this.in_transaction = null;
|
||||
|
@ -678,15 +733,16 @@ class Connection extends ConnectionBase
|
|||
throw new Error('Connection lost while in transaction');
|
||||
}
|
||||
this.connection_lost = false;
|
||||
sql = (bind && bind.length ? _inline(sql, bind) : sql);
|
||||
if (this.config.log_queries)
|
||||
console.log('> '+(bind && bind.length ? _inline(sql, bind) : sql));
|
||||
console.log('> '+sql);
|
||||
try
|
||||
{
|
||||
if (!this.in_transaction)
|
||||
{
|
||||
this.in_transaction = true;
|
||||
}
|
||||
const r = await this.dbh.query(bind && bind.length ? _positional(sql) : sql, bind);
|
||||
const r = await this.dbh.query(sql);
|
||||
if (this.in_transaction === true)
|
||||
{
|
||||
this.in_transaction = false;
|
||||
|
@ -700,6 +756,9 @@ class Connection extends ConnectionBase
|
|||
{
|
||||
this.in_transaction = false;
|
||||
}
|
||||
// не падать, если в процессе выполнения запроса отвалилось подключение
|
||||
if (this.dbh)
|
||||
{
|
||||
if (this.in_transaction)
|
||||
{
|
||||
await this.in_transaction.query('rollback');
|
||||
|
@ -710,11 +769,14 @@ class Connection extends ConnectionBase
|
|||
console.log('> rollback');
|
||||
await this.dbh.query('rollback');
|
||||
}
|
||||
}
|
||||
e.message = 'Error running query: '+sql+'\n'+e.message;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Интересная обёртка для транзакций - позволяет использовать транзакции в асинхронном коде в одном подключении БД
|
||||
class Transaction extends ConnectionBase
|
||||
{
|
||||
constructor(dbh)
|
||||
|
@ -723,6 +785,16 @@ class Transaction extends ConnectionBase
|
|||
this.dbh = dbh;
|
||||
}
|
||||
|
||||
async commit()
|
||||
{
|
||||
await this.query('commit');
|
||||
}
|
||||
|
||||
async rollback()
|
||||
{
|
||||
await this.query('rollback');
|
||||
}
|
||||
|
||||
async query(sql, bind)
|
||||
{
|
||||
// Здесь уже ждать никого не надо, т.к. если мы сюда попали - то уже дождались своей очереди априори
|
||||
|
@ -741,7 +813,6 @@ module.exports = {
|
|||
where_builder,
|
||||
quote,
|
||||
quote_into: _inline,
|
||||
quote_positional: _positional,
|
||||
select,
|
||||
insert,
|
||||
delete: _delete,
|
||||
|
|
Loading…
Reference in New Issue