Postgresql: Fix reconnect & transactions, add \n in long queries, add support for VALUES (), fix additional where conditions in DELETE .. USING

master
Vitaliy Filippov 2017-12-13 17:01:25 +03:00
parent 5a45252587
commit 33f9eb64f8
1 changed files with 120 additions and 63 deletions

View File

@ -3,8 +3,8 @@
/** /**
* PDO/PostgreSQL wrapper with (mostly) DatabaseMySQL interface :) * PDO/PostgreSQL wrapper with (mostly) DatabaseMySQL interface :)
* Select builder is inspired by MediaWiki's one. * Select builder is inspired by MediaWiki's one.
* Version: 2016-09-02 * Version: 2017-12-13
* (c) Vitaliy Filippov, 2015-2016 * (c) Vitaliy Filippov, 2015-2017
*/ */
if (!defined('MS_HASH')) if (!defined('MS_HASH'))
@ -38,7 +38,7 @@ if (!interface_exists('Database'))
class DatabasePdoPgsql implements Database class DatabasePdoPgsql implements Database
{ {
var $host, $port, $socket, $username, $password, $dbname; var $host, $port, $socket, $username, $password, $dbname, $pgbouncer;
var $tableNames = array(); var $tableNames = array();
var $init = array(); var $init = array();
@ -84,6 +84,7 @@ class DatabasePdoPgsql implements Database
'queryLogFile' => '', 'queryLogFile' => '',
'autoBegin' => false, 'autoBegin' => false,
'ondestroy' => 'commit', 'ondestroy' => 'commit',
'pgbouncer' => false,
'init' => array(), 'init' => array(),
); );
$options += $defOpts; $options += $defOpts;
@ -116,7 +117,7 @@ class DatabasePdoPgsql implements Database
$this->link = new PDO($str, $this->username, $this->password, array( $this->link = new PDO($str, $this->username, $this->password, array(
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_EMULATE_PREPARES => false, PDO::ATTR_EMULATE_PREPARES => !empty($this->pgbouncer),
)); ));
foreach ($this->init as $q) foreach ($this->init as $q)
$this->link->query($q); $this->link->query($q);
@ -167,19 +168,28 @@ class DatabasePdoPgsql implements Database
$this->begin(); $this->begin();
$this->queryCount++; $this->queryCount++;
if ($this->queryLogFile) if ($this->queryLogFile)
file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s] ").$sql."\n", FILE_APPEND); {
if (strlen($sql) == 5 && strtoupper($sql) == 'BEGIN') if (strlen($sql) <= 100)
return $this->link->beginTransaction(); $sql = preg_replace("/\n\s*/", ' ', $sql);
elseif (strlen($sql) == 8 && strtoupper($sql) == 'ROLLBACK') $t = substr(floatval(microtime()), 1, 7);
return $this->link->rollBack(); file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s$t] ").$sql."\n", FILE_APPEND);
elseif (strlen($sql) == 6 && strtoupper($sql) == 'COMMIT') }
return $this->link->commit(); try
return $this->link->query($sql); {
$r = $this->link->query($sql);
}
catch (Exception $e)
{
if ($this->queryLogFile)
file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s] ")."Error: $e\n---- Query: $sql\n", FILE_APPEND);
throw $e;
}
return $r;
} }
/** /**
* Starts a transaction, supports nested calls and savepoints. * Starts a transaction, supports nested calls and savepoints.
* @param boolean $savepoint Creates savepoints only this parameter is true. * @param boolean $savepoint Creates savepoints only if this parameter is true.
*/ */
function begin($savepoint = false) function begin($savepoint = false)
{ {
@ -199,11 +209,11 @@ class DatabasePdoPgsql implements Database
function commit() function commit()
{ {
$r = true; $r = true;
if (count($this->transactions) == 1)
$r = $this->query("COMMIT");
elseif ($this->transactions)
$r = $this->query("RELEASE SAVEPOINT sp".count($this->transactions));
$savepoint = array_pop($this->transactions); $savepoint = array_pop($this->transactions);
if (count($this->transactions) == 0)
$r = $this->query("COMMIT");
elseif ($savepoint)
$r = $this->query("RELEASE SAVEPOINT sp".count($this->transactions));
return $r; return $r;
} }
@ -241,12 +251,12 @@ class DatabasePdoPgsql implements Database
*/ */
function rollback() function rollback()
{ {
$r = false; $r = true;
$savepoint = array_pop($this->transactions);
if (count($this->transactions) == 1) if (count($this->transactions) == 1)
$r = $this->query("ROLLBACK"); $r = $this->query("ROLLBACK");
elseif ($this->transactions) elseif ($savepoint)
$r = $this->query("ROLLBACK TO SAVEPOINT sp".count($this->transactions)); $r = $this->query("ROLLBACK TO SAVEPOINT sp".count($this->transactions));
$savepoint = array_pop($this->transactions);
return $r; return $r;
} }
@ -312,7 +322,7 @@ class DatabasePdoPgsql implements Database
$wh[] = "$k IS NULL"; $wh[] = "$k IS NULL";
} }
if ($where) if ($where)
$where = '(' . join(') AND (', $wh) . ')'; $where = '(' . join(") AND (", $wh) . ')';
else else
$where = ''; $where = '';
return $where; return $where;
@ -350,7 +360,7 @@ class DatabasePdoPgsql implements Database
foreach ($fields as $k => $v) foreach ($fields as $k => $v)
if (!ctype_digit("$k")) if (!ctype_digit("$k"))
$fields[$k] = "$v AS ".$this->quoteId($k); $fields[$k] = "$v AS ".$this->quoteId($k);
$fields = join(',', $fields); $fields = join(",\n ", str_replace("\n", "\n ", $fields));
} }
$more = NULL; $more = NULL;
$tables = $this->tables_builder($tables, $more); $tables = $this->tables_builder($tables, $more);
@ -359,14 +369,14 @@ class DatabasePdoPgsql implements Database
$where = $this->where_builder($where); $where = $this->where_builder($where);
$this->calcFoundRows = isset($options['CALC_FOUND_ROWS']) || isset($options['SQL_CALC_FOUND_ROWS']); $this->calcFoundRows = isset($options['CALC_FOUND_ROWS']) || isset($options['SQL_CALC_FOUND_ROWS']);
if ($this->calcFoundRows) if ($this->calcFoundRows)
$fields .= ', COUNT(*) OVER () "*"'; $fields .= ",\n COUNT(*) OVER () \"*\"";
$sql = "SELECT $fields FROM $tables"; $sql = "SELECT $fields\nFROM $tables";
if ($where) if ($where)
$sql .= " WHERE $where"; $sql .= "\nWHERE $where";
if (!empty($options['GROUP BY']) && $options['GROUP BY'] !== '0') if (!empty($options['GROUP BY']) && $options['GROUP BY'] !== '0')
$sql .= " GROUP BY ".$this->order_option($options['GROUP BY']); $sql .= "\nGROUP BY ".$this->order_option($options['GROUP BY']);
if (!empty($options['ORDER BY']) && $options['ORDER BY'] !== '0') if (!empty($options['ORDER BY']) && $options['ORDER BY'] !== '0')
$sql .= " ORDER BY ".$this->order_option($options['ORDER BY']); $sql .= "\nORDER BY ".$this->order_option($options['ORDER BY']);
$sql .= $this->limit_option($options); $sql .= $this->limit_option($options);
if (isset($options['FOR UPDATE'])) if (isset($options['FOR UPDATE']))
$sql .= ' FOR UPDATE'; $sql .= ' FOR UPDATE';
@ -425,6 +435,7 @@ class DatabasePdoPgsql implements Database
* 'alias' => 'table', * 'alias' => 'table',
* 'alias' => array('INNER', 'table_name', $where_for_on_clause), * 'alias' => array('INNER', 'table_name', $where_for_on_clause),
* 'alias(ignored)' => array('INNER', $nested_tables, $on_for_join_group), * 'alias(ignored)' => array('INNER', $nested_tables, $on_for_join_group),
* 'alias' => array('INNER', $this->values_table($rows), $where_for_on_clause),
* ) * )
* or just a string "table1 INNER JOIN \"table2\" ON ..." * or just a string "table1 INNER JOIN \"table2\" ON ..."
* escaped names ("table2") will be transformed using $this->tableNames * escaped names ("table2") will be transformed using $this->tableNames
@ -447,7 +458,12 @@ class DatabasePdoPgsql implements Database
$join = 'FULL'; $join = 'FULL';
else /* if (!$join || $join == 'inne' || $join == 'join') */ else /* if (!$join || $join == 'inne' || $join == 'join') */
$join = 'INNER'; $join = 'INNER';
if (is_array($v[1])) // nested join (left join (A join B on ...) on ...) if ($v[1] instanceof DatabasePdoPgsql_Values)
{
$v[1]->alias = $k;
$table = ''.$v[1];
}
elseif (is_array($v[1])) // nested join (left join (A join B on ...) on ...)
{ {
$more = NULL; $more = NULL;
$table = $this->tables_builder($v[1], $more); $table = $this->tables_builder($v[1], $more);
@ -463,7 +479,7 @@ class DatabasePdoPgsql implements Database
$table .= ' ' . $k; $table .= ' ' . $k;
} }
if ($t) if ($t)
$t .= " $join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); $t .= "\n$join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1');
else else
{ {
$t = $table; $t = $table;
@ -553,13 +569,17 @@ class DatabasePdoPgsql implements Database
function delete($tables, $where, $options = NULL) function delete($tables, $where, $options = NULL)
{ {
list($what, $using) = $this->split_using($tables, $where); list($what, $using) = $this->split_using($tables, $where);
$more = NULL;
$sql = "DELETE FROM $what".($using ? " USING ".$this->tables_builder($using, $more) : '');
if ($more)
$where = array_merge($where, (array)$more);
$where = $this->where_builder($where) ?: '1=1'; $where = $this->where_builder($where) ?: '1=1';
$sql = "DELETE FROM $what".($using ? " USING ".$this->tables_builder($using) : '')." WHERE $where"; $sql .= " WHERE $where";
$sql .= $this->limit_option($options); $sql .= $this->limit_option($options);
return $this->query($sql); return $this->query($sql);
} }
function values($rows) protected function values($rows, $forInsert)
{ {
$key = reset($rows); $key = reset($rows);
if (!is_array($key)) if (!is_array($key))
@ -572,15 +592,25 @@ class DatabasePdoPgsql implements Database
{ {
$rs = array(); $rs = array();
foreach ($key as &$k) foreach ($key as &$k)
$rs[] = $this->quote(isset($r[$k]) ? $r[$k] : NULL); {
$rs[] = isset($r[$k]) ? $this->quote($r[$k]) : ($forInsert ? 'DEFAULT' : 'NULL');
}
$r = implode(",", $rs); $r = implode(",", $rs);
} }
foreach ($key as &$k) foreach ($key as &$k)
{
if (strpos($k, '"') === false) if (strpos($k, '"') === false)
$k = $this->quoteId($k); $k = $this->quoteId($k);
}
return array($key, $rows); return array($key, $rows);
} }
function values_table($rows, $alias = '')
{
list($keys, $values) = $this->values($rows, false);
return new DatabasePdoPgsql_Values($keys, $values, $alias);
}
/** /**
* Builds an INSERT query. * Builds an INSERT query.
* *
@ -590,6 +620,8 @@ class DatabasePdoPgsql implements Database
* @param array|string $uniqueColumns unique columns for conflict * @param array|string $uniqueColumns unique columns for conflict
* (defaults to {$table}_pkey for "insert or update" queries) * (defaults to {$table}_pkey for "insert or update" queries)
* @param array|NULL $updateCols Columns to update in case of a conflict * @param array|NULL $updateCols Columns to update in case of a conflict
* may be array('field1', 'field2', ...) or array('field1' => 'expression', ...)
* NEW.any_field will be replaced with EXCLUDED.any_field in these expressions
*/ */
function insert_builder($table, $rows, $action = NULL, $uniqueColumns = NULL, $updateCols = NULL) function insert_builder($table, $rows, $action = NULL, $uniqueColumns = NULL, $updateCols = NULL)
{ {
@ -597,7 +629,7 @@ class DatabasePdoPgsql implements Database
{ {
$table = $this->quoteId($this->tableNames[$table]); $table = $this->quoteId($this->tableNames[$table]);
} }
list($keys, $values) = $this->values($rows); list($keys, $values) = $this->values($rows, true);
$sql = "INSERT INTO $table (".implode(',', $keys).") VALUES (".implode('),(', $values).")"; $sql = "INSERT INTO $table (".implode(',', $keys).") VALUES (".implode('),(', $values).")";
if ($action) if ($action)
{ {
@ -615,8 +647,13 @@ class DatabasePdoPgsql implements Database
$key = reset($rows); $key = reset($rows);
$key = is_array($key) ? array_keys($key) : array_keys($rows); $key = is_array($key) ? array_keys($key) : array_keys($rows);
} }
foreach ($key as &$k) foreach ($key as $i => &$k)
$k = "$k = EXCLUDED.$k"; {
if (is_intval($i))
$k = "$k = EXCLUDED.$k";
else
$k = "$i = ".str_replace('NEW.', 'EXCLUDED.', $k);
}
$sql .= " DO UPDATE SET ".implode(", ", $key); $sql .= " DO UPDATE SET ".implode(", ", $key);
} }
else else
@ -658,49 +695,50 @@ class DatabasePdoPgsql implements Database
return $this->link->lastInsertId(); return $this->link->lastInsertId();
} }
/**
* Разбивает набор таблиц на основную обновляемую + набор дополнительных
*
* Идея в том, чтобы обрабатывать хотя бы 2 простые ситуации:
* UPDATE table1 INNER JOIN table2 ...
* UPDATE table1 LEFT JOIN table2 ...
*/
protected function split_using($tables, &$where) protected function split_using($tables, &$where)
{ {
$first = $has_inner = NULL;
$tables = (array)$tables; $tables = (array)$tables;
$first = NULL;
$isNextInner = true;
$i = 0;
foreach ($tables as $k => $t) foreach ($tables as $k => $t)
{ {
if (!is_array($t)) if ($i == 0)
{ {
if ($first === NULL) if (is_array($t) && ($t[1] instanceof DatabasePdoPgsql_Values || is_array($t[1])))
{ {
$first = $k; throw new DatabaseException("Can only update/delete from real tables, not sub-select, sub-join or VALUES");
}
elseif (!$has_inner)
{
$tables = array($k => array('INNER', $t)) + $tables;
$has_inner = true;
break;
} }
$first = $k;
} }
elseif (in_array(strtolower($t[0]), array('inner', 'cross', 'join'))) elseif ($i == 1)
{ {
$tables = array($k => $t) + $tables; $isNextInner = !is_array($t) || strtolower($t[0]) == 'inner';
$has_inner = true;
if ($first !== NULL)
{
break;
}
} }
else
break;
$i++;
} }
if (count($tables) > 1 && !$has_inner) $more = NULL;
if ($isNextInner)
{ {
if ($first === NULL) $what = $this->tables_builder([ $first => $tables[$first] ], $more);
{ unset($tables[$first]);
throw new DatabaseException("No update/delete subject found");
}
$what = $this->tables_builder([ "_$first" => $tables[$first] ]);
$where[] = "_$first.ctid=".(ctype_digit("$first") ? $tables[$first] : $first).".ctid";
} }
else else
{ {
$what = $this->tables_builder([ $first => $tables[$first] ]); $what = $this->tables_builder([ "_$first" => $tables[$first] ], $more);
unset($tables[$first]); $where[] = "_$first.ctid=".(ctype_digit("$first") ? $tables[$first] : $first).".ctid";
} }
if ($more)
$where = array_merge($where, (array)$more);
return array($what, $tables); return array($what, $tables);
} }
@ -725,9 +763,13 @@ class DatabasePdoPgsql implements Database
$sql[] = $v; $sql[] = $v;
} }
list($what, $using) = $this->split_using($table, $where); list($what, $using) = $this->split_using($table, $where);
$where = $this->where_builder($where) ?: '1=1'; $more = NULL;
$sql = 'UPDATE ' . $what . ' SET ' . implode(', ', $sql) . $sql = 'UPDATE ' . $what . ' SET ' . implode(', ', $sql) .
($using ? ' FROM '.$this->tables_builder($using) : '') . ' WHERE ' . $where; ($using ? ' FROM '.$this->tables_builder($using, $more) : '');
if ($more)
$where = array_merge($where, $more);
$where = $this->where_builder($where) ?: '1=1';
$sql .= ' WHERE ' . $where;
$r = $this->query($sql); $r = $this->query($sql);
if ($r) if ($r)
return $r->rowCount(); return $r->rowCount();
@ -780,3 +822,18 @@ class DatabasePdoPgsql implements Database
return $n; return $n;
} }
} }
class DatabasePdoPgsql_Values
{
function __construct($keys, $values, $alias)
{
$this->keys = $keys;
$this->values = $values;
$this->alias = $alias;
}
function __toString()
{
return "(VALUES (".implode("),(", $this->values).")) AS ".$this->alias." (".implode(',', $this->keys).")";
}
}