// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8 // (c) Виталий Филиппов, 2019-2022 // Версия 2022-01-19 // В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи, // благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ] // Здесь так нельзя, поэтому этот синтаксис мы заменяем на { 't1.a=t2.a': [], 't2.b': [ 1, 2, 3 ] } // Или на [ 't1.a=t2.a', [ 't2.b', [ 1, 2, 3 ] ] ] - можно писать и так, и так const pg = require('pg'); const pg_escape = require('pg-escape'); // Сраный node-postgres конвертирует даты в Date и портит таймзону const DATATYPE_DATE = 1082; const DATATYPE_TIME = 1083; const DATATYPE_TIMESTAMP = 1114; const DATATYPE_TIMESTAMPTZ = 1184; const DATATYPE_TIMETZ = 1266; pg.types.setTypeParser(DATATYPE_DATE, pgToString); pg.types.setTypeParser(DATATYPE_TIME, pgToString); pg.types.setTypeParser(DATATYPE_TIMESTAMP, pgToString); pg.types.setTypeParser(DATATYPE_TIMESTAMPTZ, pgToString); pg.types.setTypeParser(DATATYPE_TIMETZ, pgToString); function pgToString(val) { return val === null ? null : val; } const MS_HASH = 0; const MS_LIST = 1; const MS_ROW = 2; const MS_COL = 4; const MS_VALUE = 6; function select_builder(tables, fields, where, options) { options = options||{}; let sql = 'SELECT ', bind = []; if (options['DISTINCT ON'] || options.distinct_on) { let group = options['DISTINCT ON'] || options.distinct_on; group = group instanceof Array ? group : [ group ]; sql += 'DISTINCT ON ('+group.join(', ')+') '; } if (fields instanceof Array) { sql += fields.join(', '); } else if (typeof fields == 'string') { sql += fields; } else if (typeof fields == 'object') { sql += Object.keys(fields).map(k => fields[k]+' AS '+k).join(', '); } else { throw new Error('fields = '+fields+' is invalid'); } sql += ' FROM '; const t = tables_builder(tables); sql += t.sql; t.bind.forEach(v => bind.push(v)); where = where_builder(where); sql += ' WHERE '+(where.sql || '1=1'); where.bind.forEach(v => bind.push(v)); if (t.moreWhere) { sql += ' AND '+t.moreWhere.sql; t.moreWhere.bind.forEach(v => bind.push(v)); } if (options['GROUP BY'] || options.group_by) { let group = options['GROUP BY'] || options.group_by; group = group instanceof Array ? group : [ group ]; sql += ' GROUP BY '+group.join(', '); } if (options['ORDER BY'] || options.order_by) { let order = options['ORDER BY'] || options.order_by; order = order instanceof Array ? order : [ order ]; sql += ' ORDER BY '+order.join(', '); } if (options.LIMIT || options.limit) { sql += ' LIMIT '+((options.LIMIT || options.limit) | 0); } if (options.OFFSET || options.offset) { sql += ' OFFSET '+((options.OFFSET || options.offset) | 0); } if (options['FOR UPDATE'] || options.for_update) { sql += ' FOR UPDATE'; } else if (options['LOCK IN SHARE MODE'] || options.lock_share) { sql += ' LOCK IN SHARE MODE'; } return new Text(sql, bind); } function tables_builder(tables) { let sql = '', bind = []; let moreWhere = null; let first = true; if (typeof tables == 'string') { sql = tables; return { sql, bind, moreWhere }; } else if (tables instanceof Text) { return { sql: tables.sql, bind: tables.bind, moreWhere }; } for (const k in tables) { let jointype = 'INNER', table = tables[k], conds = null; if (table instanceof Array) { [ jointype, table, conds ] = table; } if (!first) { sql += ' ' + jointype.toUpperCase() + ' JOIN '; } let more_on; if (table instanceof Pg_Values) { sql += '(VALUES '; let i = 0; for (const row of table.rows) { sql += (i > 0 ? ', (' : '(') + table.keys.map(() => '?').join(', ')+')'; table.keys.forEach(k => bind.push(row[k])); i++; } sql += ') AS '+k+'('+table.keys.join(', ')+')'; } else if (table instanceof Text) { sql += '(' + table.sql + ') ' + k; table.bind.forEach(v => bind.push(v)); } else if (typeof table == 'object') { // Nested join, `k` alias is ignored let subjoin = tables_builder(table); if (subjoin.moreWhere) { more_on = subjoin.moreWhere; } if (Object.keys(table).length > 1) { sql += "("+subjoin.sql+")"; } else { sql += subjoin.sql; } subjoin.bind.forEach(v => bind.push(v)); } else { sql += table + ' ' + k; } conds = where_builder(conds); if (more_on) { if (!conds.sql) conds = more_on; else { conds.sql += ' AND ' + more_on.sql; more_on.bind.forEach(v => conds.bind.push(v)); } } if (!first) { sql += ' ON ' + (conds.sql || '1=1'); conds.bind.forEach(v => bind.push(v)); } else { // Бывает удобно указывать WHERE как условие "JOIN" первой таблицы moreWhere = conds.sql ? conds : null; first = false; } } return { sql, bind, moreWhere }; } function where_in(key, values, result, bind, for_where) { if (values[0] instanceof Array) { // only for WHERE: [ '(a, b)', [ [ 1, 2 ], [ 3, 4 ], ... ] ] if (!for_where) { throw new Error('IN syntax can only be used inside WHERE'); } result.push(key + ' in (' + values.map(vi => '('+vi.map(() => '?').join(', ')+')') + ')'); values.forEach(vi => vi.forEach(v => bind.push(v))); } else if (!for_where) { if (values.length > 1) { throw new Error('IN syntax can only be used inside WHERE'); } if (values[0] instanceof Text) { push_text(result, bind, new Text(key+' = '+values[0].sql, values[0].bind)); } else { result.push(key+' = ?'); bind.push(values[0]); } } else { // [ field, [ values ] ] let non_null = values.filter(vi => vi != null); let has_null = values.length > non_null.length; if (non_null.length > 0) { result.push( key+' in (' + non_null.map(() => '?').join(', ') + ')' + (has_null ? ' or '+key+' is null' : '') ); non_null.forEach(v => bind.push(v)); } else if (has_null) { result.push(key+' is null'); } } } function push_text(where, bind, v) { if (v instanceof Text) { where.push(v.sql); for (let i = 0; i < v.bind.length; i++) bind.push(v.bind[i]); } else where.push(v); } // fields: one of: // - string: 'a=b AND c=d' // - array: [ 'a=b', [ 'a=? or b=?', 1, 2 ], [ 'a', [ 1, 2 ] ] ] // - object: { a: 1, b: [ 1, 2 ], 'a = b': [], '(a, b)': [ [ 1, 2 ], [ 2, 3 ] ], 'c=? or d=?': [ 2, 3 ] } // - key does not contain '?', value is a scalar or non-empty array => (key IN ...) // - key does not contain '?', value is an empty array => just (key) // - key contains '?', value is a scalar or non-empty array => (key) with bind params (...value) // - key is numeric, then value is treated as in array function where_or_set(fields, for_where) { if (typeof fields == 'string') { return { sql: fields, bind: [] }; } const where = [], bind = []; for (let k in fields) { let v = fields[k]; if (/^\d+$/.exec(k)) { if (!(v instanceof Array)) { push_text(where, bind, v); continue; } else if (v.length == 0 || typeof v[0] !== 'string') { // invalid value continue; } else if (v.length == 1) { // [ text ] or just text push_text(where, bind, v[0]); } else if (v[0].indexOf('?') >= 0) { // [ text, bind1, bind2, ... ] // FIXME: check bind variable count where.push(v[0]); for (let i = 1; i < v.length; i++) bind.push(v[i]); } else { // [ field, [ ...values ] ] if (v.length > 2) { throw new Error('Invalid condition: '+JSON.stringify(v)); } v[1] = v[1] instanceof Array ? v[1] : [ v[1] ]; where_in(v[0], v[1], where, bind, for_where); } } else { if (!(v instanceof Array)) { v = [ v ]; } if (k.indexOf('?') >= 0 || v.length == 0) { // { expr: [ bind ] } // FIXME: check bind variable count where.push(k); for (let i = 0; i < v.length; i++) bind.push(v[i]); } else { where_in(k, v, where, bind, for_where); } } } if (!for_where) { // SET return { sql: where.join(', '), bind }; } // WHERE return { sql: where.length ? '('+where.join(') and (')+')' : '', bind }; } function where_builder(where) { return where_or_set(where, true); } /** * Разбивает набор таблиц на основную обновляемую + набор дополнительных * * Идея в том, чтобы обрабатывать хотя бы 2 простые ситуации: * UPDATE table1 INNER JOIN table2 ... * UPDATE table1 LEFT JOIN table2 ... */ function split_using(tables) { if (typeof tables == 'string') { return { what: { sql: tables, bind: [] }, using: null, moreWhere: null }; } let first = null; let is_next_inner = true; let i = 0; for (let k in tables) { let t = tables[k]; if (i == 0) { if (t instanceof Array && typeof(t[1]) != 'string') { throw new Error('Can only update/delete from real tables, not sub-select, sub-join or VALUES'); } first = k; } else if (i == 1) { is_next_inner = !(t instanceof Array) || t[0].toLowerCase() == 'inner'; } else { break; } i++; } let what, moreWhere; if (is_next_inner) { what = tables_builder({ [first]: tables[first] }); delete tables[first]; moreWhere = what.moreWhere; what.moreWhere = null; } else { what = tables_builder({ ["_"+first]: tables[first] }); const cond = '_'+first+'.ctid='+(/^\d+$/.exec(first) ? tables[first] : first)+'.ctid'; moreWhere = what.moreWhere ? { sql: what.moreWhere.sql+' AND '+cond, bind: what.moreWhere.bind } : { sql: cond, bind: [] }; what.moreWhere = null; } return { what, using: Object.keys(tables).length > 0 ? tables : null, moreWhere }; } function quote(v) { if (v == null) return 'null'; else if (v === true || v === false) return ''+v; else if (typeof v == 'object') v = JSON.stringify(v); else if (typeof v == 'number') return v; return pg_escape.literal(v); } // Встроить все bind пераметры, выраженные как ?, в строку. // Почему? А потому, что в node-postgres, похоже, есть лимит числа bind переменных. После ~30000 он сваливается. // FIXME: В postgresql есть оператор ?. Его надо научиться экранировать. Либо сразу строить запрос в строчку // и не заниматься ерундой с подстановками. function _inline(sql, bind) { let i = 0; sql = sql.replace( /'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => { if (!m1) return m; return quote(bind[i++]); } ); return sql; } // dbh: Connection or pg.Client async function select(dbh, tables, fields, where, options, format) { let sql_text, calc_found_rows, found_rows; if (arguments.length == 2 || arguments.length == 3) { sql_text = tables instanceof Text ? tables : new Text(tables, []); format = fields; } else { if (options && options.calc_found_rows) { calc_found_rows = true; fields = fields instanceof Array ? [ ...fields ] : [ fields ]; fields.push('COUNT(*) OVER () \"*\"'); } sql_text = select_builder(tables, fields, where, options); } let data = await dbh.query(sql_text.sql, sql_text.bind); if (calc_found_rows) { if (!data.rows.length) found_rows = 0; else { found_rows = data.rows[0]['*']; data.rows.forEach(r => delete r['*']); } } if (format & MS_LIST) data = data.rows.map(r => Object.values(r)); else if (format & MS_COL) data = data.rows.map(r => Object.values(r)[0]); else data = data.rows; if (format & MS_ROW) data = data[0]; return calc_found_rows ? [ found_rows, data ] : data; } async function insert(dbh, table, rows, options) { if (!(rows instanceof Array)) { rows = [ rows ]; } if (!rows.length) { return null; } const keys = Object.keys(rows[0]); let sql = 'insert into '+table+' ('+keys.join(', ')+') values '; let i = 0; for (const row of rows) { let j = 0; sql += (i > 0 ? ', (' : '('); for (let k of keys) { if (j > 0) sql += ', '; if (row[k] == null) sql += 'default'; else if (row[k] instanceof Text) sql += row[k].toString(); else sql += quote(row[k]); j++; } sql += ')'; i++; } if (options) { if (options.upsert) { sql += ' on conflict '+ (options.upsert instanceof Array ? '('+options.upsert.join(', ')+')' : (typeof options.upsert == 'string' ? options.upsert : '(id)'))+ ' do update set '+ (options.upsert_fields || keys).map(k => `${k} = excluded.${k}`).join(', '); } else if (options.ignore) { sql += ' on conflict '+ (options.ignore instanceof Array ? '('+options.ignore.join(', ')+')' : (typeof options.ignore == 'string' ? options.ignore : '(id)'))+ ' do nothing'; } if (options.returning) { sql += ' returning '+options.returning; return (await dbh.query(sql)).rows; } } return await dbh.query(sql); } async function _delete(dbh, table, where, options) { where = where_builder(where); const split = split_using(table); if (split.using) { split.using = tables_builder(split.using); } let sql = 'delete from '+split.what.sql+ (split.using ? ' using '+split.using.sql : '')+ ' where '+(where.sql || '1=1')+(split.moreWhere ? ' and '+split.moreWhere.sql : ''); let bind = [ ...split.what.bind, ...where.bind, ...(split.moreWhere ? split.moreWhere.bind : []) ]; if (options && options.returning) { sql += ' returning '+options.returning; return (await dbh.query(sql, bind)).rows; } return await dbh.query(sql, bind); } async function update(dbh, table, set, where, options) { set = where_or_set(set, false); where = where_builder(where); const split = split_using(table); if (split.using) { split.using = tables_builder(split.using); } let sql = 'update '+split.what.sql+' set '+set.sql+ (split.using ? ' from '+split.using.sql : '')+ ' where '+(where.sql || '1=1')+(split.moreWhere ? ' and '+split.moreWhere.sql : ''); let bind = [ ...split.what.bind, ...set.bind, ...(split.using ? split.using.bind : []), ...where.bind, ...(split.moreWhere ? split.moreWhere.bind : []) ]; if (options && options.returning) { sql += ' returning '+options.returning; return (await dbh.query(sql, bind)).rows; } return await dbh.query(sql, bind); } function values(rows) { return new Pg_Values(Object.keys(rows[0]), rows); } class Pg_Values { constructor(keys, rows) { this.keys = keys; this.rows = rows; } } class Text { constructor(sql, bind) { this.sql = sql; this.bind = bind || []; } toString() { return _inline(this.sql, this.bind); } concat(text) { return new Text(this.sql+text.sql, [ ...this.bind, ...text.bind ]); } } class ConnectionBase { async select(tables, fields, where, options, format) { return arguments.length <= 2 ? await select(this, tables, fields) : await select(this, tables, fields, where, options, format); } async insert(table, rows, options) { return await insert(this, table, rows, options); } async update(table, set, where, options) { return await update(this, table, set, where, options); } async delete(table, where, options) { return await _delete(this, table, where, options); } } ConnectionBase.prototype.HASH = MS_HASH; ConnectionBase.prototype.LIST = MS_LIST; ConnectionBase.prototype.ROW = MS_ROW; ConnectionBase.prototype.COL = MS_COL; ConnectionBase.prototype.VALUE = MS_VALUE; ConnectionBase.prototype.select_builder = select_builder; // Обёртка PostgreSQL-подключения // Автоматически переподключается при отвале class Connection extends ConnectionBase { constructor(config) { super(); this.config = config; this.init_connect = []; this.in_transaction = null; this.transaction_queue = []; this.quote = quote; } handleError(dbh, e) { // Проверяем, что в нас не кидается ошибкой старое подключение к БД if (dbh == this.dbh) { console.warn(e); console.warn('Database connection dropped. Reconnecting'); this.dbh = null; this.connection_lost = true; this.connect(); } } getConnection() { return this.dbh; } async runWhenConnected(cb) { if (this.dbh) { await cb(this); } this.init_connect.push(cb); } dontRunWhenConnected(cb) { this.init_connect = this.init_connect.filter(c => c != cb); } async connect() { if (this.dbh) { return this.dbh; } let retry = this.config.retry || 30; // eslint-disable-next-line no-constant-condition while (true) { try { let dbh = this.dbh = new pg.Client(this.config); await this.dbh.connect(); for (const cb of this.init_connect) { await cb(this); } dbh.on('error', e => this.handleError(dbh, e)); return this.dbh; } catch (e) { this.dbh = null; console.warn(e); console.warn('Trying to connect again in '+retry+' seconds'); await new Promise((r, j) => setTimeout(r, retry*1000)); if (this.dbh) { return this.dbh; } } } } async begin() { while (this.in_transaction) { // Если уже кто-то активен - ждём его await new Promise((resolve, reject) => this.transaction_queue.push(resolve)); } this.in_transaction = new Transaction(this); await this._query('begin'); return this.in_transaction; } async txn(cb) { let r; const txn = await this.begin(); try { r = await cb(txn); await txn.commit(); } catch (e) { await txn.rollback(); throw e; } return r; } async query(sql, bind) { if (sql.length == 5 && sql.toLowerCase() == 'begin') { throw new Error('Do not use transactions in asynchronous code directly!'); } while (this.in_transaction) { // Если уже кто-то активен - ждём его await new Promise((resolve, reject) => this.transaction_queue.push(resolve)); } if (!this.dbh) await this.connect(); this.in_transaction = true; const r = await this._query(sql, bind); this.in_transaction = null; // Если есть ещё кто-то в очереди - пусть проходит this._next_txn(); return r; } async end() { if (!this.dbh) { return; } while (this.in_transaction) { // Если уже кто-то активен - ждём его await new Promise((resolve, reject) => this.transaction_queue.push(resolve)); } await this.dbh.end(); this.dbh = null; } _next_txn() { this.in_transaction = null; const next = this.transaction_queue.shift(); if (next) setImmediate(next); } async _query(sql, bind) { if (this.in_transaction && this.in_transaction !== true && this.connection_lost) { this.connection_lost = false; this._next_txn(); throw new Error('Connection lost while in transaction'); } this.connection_lost = false; sql = (bind && bind.length ? _inline(sql, bind) : sql); let start_time; try { if (this.config.log_queries) { start_time = Date.now(); } const r = await this.dbh.query(sql); if (this.config.log_queries) { const tm = (Date.now()-start_time)/1000; if (!this.config.slow_query_time || tm > this.config.slow_query_time) { console.log('> pid='+process.pid+' '+tm.toFixed(3)+' '+sql); } } return r; } catch (e) { // в postgresql надо откатывать всю транзакцию при любой ошибке // не падать, если в процессе выполнения запроса отвалилось подключение if (this.dbh) { if (this.in_transaction) { if (this.in_transaction === true) this._next_txn(); else await this.in_transaction.query('rollback'); } else { if (this.config.log_queries) console.log('> rollback'); await this.dbh.query('rollback'); } } e.message = 'Error running query: '+sql+'\n'+e.message; throw e; } } } // Интересная обёртка для транзакций - позволяет использовать транзакции в асинхронном коде в одном подключении БД class Transaction extends ConnectionBase { constructor(dbh) { super(); this.dbh = dbh; this.nested = 0; this.rolled_back = false; } async begin() { // Вложенная транзакция // SAVEPOINT пока не поддерживаем - просто делаем вид, что началась вложенная транзакция this.nested++; return this; } async txn(cb) { return await cb(this); } async commit() { if (this.nested > 0) { this.nested--; } else { await this.query('commit'); } } async rollback() { if (!this.rolled_back) { await this.query('rollback'); this.rolled_back = true; } else if (this.nested > 0) { this.nested--; } } async query(sql, bind) { if (this.rolled_back) { return null; } // Здесь уже ждать никого не надо, т.к. если мы сюда попали - то уже дождались своей очереди априори const r = await this.dbh._query(sql, bind); if (sql.length == 6 && sql.toLowerCase() == 'commit' || sql.length == 8 && sql.toLowerCase() == 'rollback') { this.dbh._next_txn(); } return r; } } module.exports = { select_builder, where_builder, where_or_set, quote, quote_into: _inline, select, insert, delete: _delete, update, values, Text, Connection, MS_HASH, MS_LIST, MS_ROW, MS_COL, MS_VALUE, };