From 33f9eb64f897cca8187e6bde6a73d51a8502289b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 13 Dec 2017 17:01:25 +0300 Subject: [PATCH] Postgresql: Fix reconnect & transactions, add \n in long queries, add support for VALUES (), fix additional where conditions in DELETE .. USING --- DatabasePdoPgsql.php | 183 ++++++++++++++++++++++++++++--------------- 1 file changed, 120 insertions(+), 63 deletions(-) diff --git a/DatabasePdoPgsql.php b/DatabasePdoPgsql.php index 7a138f9..6d2a658 100644 --- a/DatabasePdoPgsql.php +++ b/DatabasePdoPgsql.php @@ -3,8 +3,8 @@ /** * PDO/PostgreSQL wrapper with (mostly) DatabaseMySQL interface :) * Select builder is inspired by MediaWiki's one. - * Version: 2016-09-02 - * (c) Vitaliy Filippov, 2015-2016 + * Version: 2017-12-13 + * (c) Vitaliy Filippov, 2015-2017 */ if (!defined('MS_HASH')) @@ -38,7 +38,7 @@ if (!interface_exists('Database')) class DatabasePdoPgsql implements Database { - var $host, $port, $socket, $username, $password, $dbname; + var $host, $port, $socket, $username, $password, $dbname, $pgbouncer; var $tableNames = array(); var $init = array(); @@ -84,6 +84,7 @@ class DatabasePdoPgsql implements Database 'queryLogFile' => '', 'autoBegin' => false, 'ondestroy' => 'commit', + 'pgbouncer' => false, 'init' => array(), ); $options += $defOpts; @@ -116,7 +117,7 @@ class DatabasePdoPgsql implements Database $this->link = new PDO($str, $this->username, $this->password, array( PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, - PDO::ATTR_EMULATE_PREPARES => false, + PDO::ATTR_EMULATE_PREPARES => !empty($this->pgbouncer), )); foreach ($this->init as $q) $this->link->query($q); @@ -167,19 +168,28 @@ class DatabasePdoPgsql implements Database $this->begin(); $this->queryCount++; if ($this->queryLogFile) - file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s] ").$sql."\n", FILE_APPEND); - if (strlen($sql) == 5 && strtoupper($sql) == 'BEGIN') - return $this->link->beginTransaction(); - elseif (strlen($sql) == 8 && strtoupper($sql) == 'ROLLBACK') - return $this->link->rollBack(); - elseif (strlen($sql) == 6 && strtoupper($sql) == 'COMMIT') - return $this->link->commit(); - return $this->link->query($sql); + { + if (strlen($sql) <= 100) + $sql = preg_replace("/\n\s*/", ' ', $sql); + $t = substr(floatval(microtime()), 1, 7); + file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s$t] ").$sql."\n", FILE_APPEND); + } + try + { + $r = $this->link->query($sql); + } + catch (Exception $e) + { + if ($this->queryLogFile) + file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s] ")."Error: $e\n---- Query: $sql\n", FILE_APPEND); + throw $e; + } + return $r; } /** * Starts a transaction, supports nested calls and savepoints. - * @param boolean $savepoint Creates savepoints only this parameter is true. + * @param boolean $savepoint Creates savepoints only if this parameter is true. */ function begin($savepoint = false) { @@ -199,11 +209,11 @@ class DatabasePdoPgsql implements Database function commit() { $r = true; - if (count($this->transactions) == 1) - $r = $this->query("COMMIT"); - elseif ($this->transactions) - $r = $this->query("RELEASE SAVEPOINT sp".count($this->transactions)); $savepoint = array_pop($this->transactions); + if (count($this->transactions) == 0) + $r = $this->query("COMMIT"); + elseif ($savepoint) + $r = $this->query("RELEASE SAVEPOINT sp".count($this->transactions)); return $r; } @@ -241,12 +251,12 @@ class DatabasePdoPgsql implements Database */ function rollback() { - $r = false; + $r = true; + $savepoint = array_pop($this->transactions); if (count($this->transactions) == 1) $r = $this->query("ROLLBACK"); - elseif ($this->transactions) + elseif ($savepoint) $r = $this->query("ROLLBACK TO SAVEPOINT sp".count($this->transactions)); - $savepoint = array_pop($this->transactions); return $r; } @@ -312,7 +322,7 @@ class DatabasePdoPgsql implements Database $wh[] = "$k IS NULL"; } if ($where) - $where = '(' . join(') AND (', $wh) . ')'; + $where = '(' . join(") AND (", $wh) . ')'; else $where = ''; return $where; @@ -350,7 +360,7 @@ class DatabasePdoPgsql implements Database foreach ($fields as $k => $v) if (!ctype_digit("$k")) $fields[$k] = "$v AS ".$this->quoteId($k); - $fields = join(',', $fields); + $fields = join(",\n ", str_replace("\n", "\n ", $fields)); } $more = NULL; $tables = $this->tables_builder($tables, $more); @@ -359,14 +369,14 @@ class DatabasePdoPgsql implements Database $where = $this->where_builder($where); $this->calcFoundRows = isset($options['CALC_FOUND_ROWS']) || isset($options['SQL_CALC_FOUND_ROWS']); if ($this->calcFoundRows) - $fields .= ', COUNT(*) OVER () "*"'; - $sql = "SELECT $fields FROM $tables"; + $fields .= ",\n COUNT(*) OVER () \"*\""; + $sql = "SELECT $fields\nFROM $tables"; if ($where) - $sql .= " WHERE $where"; + $sql .= "\nWHERE $where"; if (!empty($options['GROUP BY']) && $options['GROUP BY'] !== '0') - $sql .= " GROUP BY ".$this->order_option($options['GROUP BY']); + $sql .= "\nGROUP BY ".$this->order_option($options['GROUP BY']); if (!empty($options['ORDER BY']) && $options['ORDER BY'] !== '0') - $sql .= " ORDER BY ".$this->order_option($options['ORDER BY']); + $sql .= "\nORDER BY ".$this->order_option($options['ORDER BY']); $sql .= $this->limit_option($options); if (isset($options['FOR UPDATE'])) $sql .= ' FOR UPDATE'; @@ -425,6 +435,7 @@ class DatabasePdoPgsql implements Database * 'alias' => 'table', * 'alias' => array('INNER', 'table_name', $where_for_on_clause), * 'alias(ignored)' => array('INNER', $nested_tables, $on_for_join_group), + * 'alias' => array('INNER', $this->values_table($rows), $where_for_on_clause), * ) * or just a string "table1 INNER JOIN \"table2\" ON ..." * escaped names ("table2") will be transformed using $this->tableNames @@ -447,7 +458,12 @@ class DatabasePdoPgsql implements Database $join = 'FULL'; else /* if (!$join || $join == 'inne' || $join == 'join') */ $join = 'INNER'; - if (is_array($v[1])) // nested join (left join (A join B on ...) on ...) + if ($v[1] instanceof DatabasePdoPgsql_Values) + { + $v[1]->alias = $k; + $table = ''.$v[1]; + } + elseif (is_array($v[1])) // nested join (left join (A join B on ...) on ...) { $more = NULL; $table = $this->tables_builder($v[1], $more); @@ -463,7 +479,7 @@ class DatabasePdoPgsql implements Database $table .= ' ' . $k; } if ($t) - $t .= " $join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); + $t .= "\n$join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); else { $t = $table; @@ -553,13 +569,17 @@ class DatabasePdoPgsql implements Database function delete($tables, $where, $options = NULL) { list($what, $using) = $this->split_using($tables, $where); + $more = NULL; + $sql = "DELETE FROM $what".($using ? " USING ".$this->tables_builder($using, $more) : ''); + if ($more) + $where = array_merge($where, (array)$more); $where = $this->where_builder($where) ?: '1=1'; - $sql = "DELETE FROM $what".($using ? " USING ".$this->tables_builder($using) : '')." WHERE $where"; + $sql .= " WHERE $where"; $sql .= $this->limit_option($options); return $this->query($sql); } - function values($rows) + protected function values($rows, $forInsert) { $key = reset($rows); if (!is_array($key)) @@ -572,15 +592,25 @@ class DatabasePdoPgsql implements Database { $rs = array(); foreach ($key as &$k) - $rs[] = $this->quote(isset($r[$k]) ? $r[$k] : NULL); + { + $rs[] = isset($r[$k]) ? $this->quote($r[$k]) : ($forInsert ? 'DEFAULT' : 'NULL'); + } $r = implode(",", $rs); } foreach ($key as &$k) + { if (strpos($k, '"') === false) $k = $this->quoteId($k); + } return array($key, $rows); } + function values_table($rows, $alias = '') + { + list($keys, $values) = $this->values($rows, false); + return new DatabasePdoPgsql_Values($keys, $values, $alias); + } + /** * Builds an INSERT query. * @@ -590,6 +620,8 @@ class DatabasePdoPgsql implements Database * @param array|string $uniqueColumns unique columns for conflict * (defaults to {$table}_pkey for "insert or update" queries) * @param array|NULL $updateCols Columns to update in case of a conflict + * may be array('field1', 'field2', ...) or array('field1' => 'expression', ...) + * NEW.any_field will be replaced with EXCLUDED.any_field in these expressions */ function insert_builder($table, $rows, $action = NULL, $uniqueColumns = NULL, $updateCols = NULL) { @@ -597,7 +629,7 @@ class DatabasePdoPgsql implements Database { $table = $this->quoteId($this->tableNames[$table]); } - list($keys, $values) = $this->values($rows); + list($keys, $values) = $this->values($rows, true); $sql = "INSERT INTO $table (".implode(',', $keys).") VALUES (".implode('),(', $values).")"; if ($action) { @@ -615,8 +647,13 @@ class DatabasePdoPgsql implements Database $key = reset($rows); $key = is_array($key) ? array_keys($key) : array_keys($rows); } - foreach ($key as &$k) - $k = "$k = EXCLUDED.$k"; + foreach ($key as $i => &$k) + { + if (is_intval($i)) + $k = "$k = EXCLUDED.$k"; + else + $k = "$i = ".str_replace('NEW.', 'EXCLUDED.', $k); + } $sql .= " DO UPDATE SET ".implode(", ", $key); } else @@ -658,49 +695,50 @@ class DatabasePdoPgsql implements Database return $this->link->lastInsertId(); } + /** + * Разбивает набор таблиц на основную обновляемую + набор дополнительных + * + * Идея в том, чтобы обрабатывать хотя бы 2 простые ситуации: + * UPDATE table1 INNER JOIN table2 ... + * UPDATE table1 LEFT JOIN table2 ... + */ protected function split_using($tables, &$where) { - $first = $has_inner = NULL; $tables = (array)$tables; + $first = NULL; + $isNextInner = true; + $i = 0; foreach ($tables as $k => $t) { - if (!is_array($t)) + if ($i == 0) { - if ($first === NULL) + if (is_array($t) && ($t[1] instanceof DatabasePdoPgsql_Values || is_array($t[1]))) { - $first = $k; - } - elseif (!$has_inner) - { - $tables = array($k => array('INNER', $t)) + $tables; - $has_inner = true; - break; + throw new DatabaseException("Can only update/delete from real tables, not sub-select, sub-join or VALUES"); } + $first = $k; } - elseif (in_array(strtolower($t[0]), array('inner', 'cross', 'join'))) + elseif ($i == 1) { - $tables = array($k => $t) + $tables; - $has_inner = true; - if ($first !== NULL) - { - break; - } + $isNextInner = !is_array($t) || strtolower($t[0]) == 'inner'; } + else + break; + $i++; } - if (count($tables) > 1 && !$has_inner) + $more = NULL; + if ($isNextInner) { - if ($first === NULL) - { - throw new DatabaseException("No update/delete subject found"); - } - $what = $this->tables_builder([ "_$first" => $tables[$first] ]); - $where[] = "_$first.ctid=".(ctype_digit("$first") ? $tables[$first] : $first).".ctid"; + $what = $this->tables_builder([ $first => $tables[$first] ], $more); + unset($tables[$first]); } else { - $what = $this->tables_builder([ $first => $tables[$first] ]); - unset($tables[$first]); + $what = $this->tables_builder([ "_$first" => $tables[$first] ], $more); + $where[] = "_$first.ctid=".(ctype_digit("$first") ? $tables[$first] : $first).".ctid"; } + if ($more) + $where = array_merge($where, (array)$more); return array($what, $tables); } @@ -725,9 +763,13 @@ class DatabasePdoPgsql implements Database $sql[] = $v; } list($what, $using) = $this->split_using($table, $where); - $where = $this->where_builder($where) ?: '1=1'; + $more = NULL; $sql = 'UPDATE ' . $what . ' SET ' . implode(', ', $sql) . - ($using ? ' FROM '.$this->tables_builder($using) : '') . ' WHERE ' . $where; + ($using ? ' FROM '.$this->tables_builder($using, $more) : ''); + if ($more) + $where = array_merge($where, $more); + $where = $this->where_builder($where) ?: '1=1'; + $sql .= ' WHERE ' . $where; $r = $this->query($sql); if ($r) return $r->rowCount(); @@ -780,3 +822,18 @@ class DatabasePdoPgsql implements Database return $n; } } + +class DatabasePdoPgsql_Values +{ + function __construct($keys, $values, $alias) + { + $this->keys = $keys; + $this->values = $values; + $this->alias = $alias; + } + + function __toString() + { + return "(VALUES (".implode("),(", $this->values).")) AS ".$this->alias." (".implode(',', $this->keys).")"; + } +}