diff --git a/DatabaseMysql.php b/DatabaseMysql.php index 8ae7364..3705fd3 100644 --- a/DatabaseMysql.php +++ b/DatabaseMysql.php @@ -4,8 +4,8 @@ * Very stable interface for MySQL - object-oriented at last :) * Select builder is inspired by MediaWiki's one. * Also usable for querying SphinxQL. - * Version: 2015-05-13 - * (c) Vitaliy Filippov, 2012-2015 + * Version: 2016-05-28 + * (c) Vitaliy Filippov, 2012-2016 */ if (!defined('MS_HASH')) @@ -19,7 +19,12 @@ if (!defined('MS_HASH')) define('MS_RESULT', 8); } -class DatabaseException extends Exception +if (!class_exists('DatabaseException')) +{ + class DatabaseException extends Exception {} +} + +class DatabaseMysqlException extends DatabaseException { function isDuplicateKey() { @@ -37,6 +42,7 @@ class DatabaseMysql implements Database var $host, $port, $socket, $username, $password, $dbname; var $tableNames = array(); + var $init = array(); var $queryLogFile, $loggedQueries = ''; var $reconnect = true; var $autoBegin; @@ -59,8 +65,9 @@ class DatabaseMysql implements Database * tableNames Table name mappings (virtual => real) * queryLogFile Path to query log file * reconnect Whether to reconnect on idle timeout [true] - * autoBegin Whether to automatically begin a transaction of first query [false] + * autoBegin Whether to automatically begin a transaction on first query [false] * ondestroy commit/rollback/none during destruction [commit] + * init Initialisation queries (array) */ function __construct($options) { @@ -76,6 +83,7 @@ class DatabaseMysql implements Database 'queryLogFile' => '', 'autoBegin' => false, 'ondestroy' => 'commit', + 'init' => array(), ); $options += $defOpts; if ($options['socket']) @@ -86,6 +94,11 @@ class DatabaseMysql implements Database { $this->$k = $options[$k]; } + if ($this->username || $this->dbname) // skip for Sphinx + { + // READ COMMITTED is more consistent for average usage + $this->init[] = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED'; + } } function __destruct() @@ -105,37 +118,24 @@ class DatabaseMysql implements Database function connect() { if ($this->socket !== NULL) - { $this->link = new mysqli($this->host, $this->username, $this->password, $this->dbname, $this->port, $this->socket); - } elseif ($this->port !== NULL) - { $this->link = new mysqli($this->host, $this->username, $this->password, $this->dbname, $this->port); - } else - { $this->link = new mysqli($this->host, $this->username, $this->password, $this->dbname); - } $errno = $this->link->connect_errno; $error = $this->link->connect_error; if ($errno) { $this->link = NULL; - throw new DatabaseException($error, $errno); + throw new DatabaseMysqlException($error, $errno); } else { $this->transactions = array(); $this->link->set_charset('utf8'); - if ($this->username || $this->dbname) // skip for Sphinx - { - // READ COMMITTED is more consistent for average usage - $this->link->query('SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED'); - } - if ($this->autoBegin) - { - $this->begin(); - } + foreach ($this->init as $q) + $this->link->query($q); } } @@ -164,60 +164,65 @@ class DatabaseMysql implements Database function quote($value) { if ($value === NULL) - { return "NULL"; - } if (!$this->link) - { $this->connect(); - } return "'" . $this->link->real_escape_string($value) . "'"; } function query($sql, $fetchMode = MYSQLI_STORE_RESULT) { if (!$this->link) - { $this->connect(); - } + if ($this->autoBegin && !$this->transactions) + $this->begin(); $this->queryCount++; if ($this->queryLogFile) { $begin = explode(' ', microtime(), 2); } $r = $this->link->query($sql, $fetchMode); + if (!$r) + $r = $this->check_reconnect('query', [ $sql, $fetchMode ]); if ($this->queryLogFile) { $end = explode(' ', microtime(), 2); $this->loggedQueries .= date("[Y-m-d H:i:s.").substr($end[0], 2, 6)."] [". sprintf("%.05fs", $end[1]-$begin[1]+$end[0]-$begin[0])."] $sql\n"; } - if (!$r) + return $r; + } + + protected function check_reconnect($f, $args) + { + $r = false; + if ($this->link->errno == 2006 && $this->reconnect && (!$this->transactions || $args[0] == "BEGIN")) { - if ($this->link->errno == 2006 && $this->reconnect && !$this->transactions) - { - // "MySQL server has gone away" - $this->connect(); - $r = $this->link->query($sql, $fetchMode); - if (!$r && $this->link->errno == 2006) - { - $this->link = false; - } - } + // "MySQL server has gone away" + $this->connect(); + $r = call_user_func_array([ $this->link, $f ], $args); + if (!$r && $this->link->errno == 2006) + $this->link = false; if (!$r) - { - throw new DatabaseException($this->link->error . "\nQuery: $sql", $this->link->errno); - } + $st = " (reconnect failed)"; } + elseif ($this->link->errno != 2006) + $st = ""; + elseif (!$this->reconnect) + $st = " (reconnect disabled"; + elseif ($this->transactions) + $st = " (not reconnecting because of active transactions)"; + if (!$r) + throw new DatabaseMysqlException('#'.$this->link->errno.': '.$this->link->error . $st . "\nQuery: ".$args[0], $this->link->errno); return $r; } function multi_select(array $queries, $format = 0) { if (!$this->link) - { $this->connect(); - } + if ($this->autoBegin && !$this->transactions) + $this->begin(); $this->queryCount += count($queries); $log = ''; foreach ($queries as &$sql) @@ -236,22 +241,7 @@ class DatabaseMysql implements Database $sql = implode('; ', $queries); $r = $this->link->multi_query($sql); if (!$r) - { - if ($this->link->errno == 2006 && $this->reconnect && !$this->transactions) - { - // "MySQL server has gone away" - $this->connect(); - $r = $this->link->multi_query($sql); - if (!$r && $this->link->errno == 2006) - { - $this->link = false; - } - } - if (!$r) - { - throw new DatabaseException($this->link->error, $this->link->errno); - } - } + $this->check_reconnect('multi_query', [ $sql ]); $results = array(); $i = 0; foreach ($queries as $k => $q) @@ -272,16 +262,12 @@ class DatabaseMysql implements Database */ function begin($savepoint = false) { - $this->transactions[] = $savepoint; - $n = count($this->transactions); + $n = count($this->transactions)+1; + $this->transactions[] = $savepoint ? "sp$n" : false; if ($n == 1) - { return $this->query("BEGIN"); - } elseif ($savepoint) - { return $this->query("SAVEPOINT sp$n"); - } return true; } @@ -291,16 +277,41 @@ class DatabaseMysql implements Database */ function commit() { + $r = true; $savepoint = array_pop($this->transactions); if (!$this->transactions) - { - return $this->query("COMMIT"); - } + $r = $this->query("COMMIT"); elseif ($savepoint) + $r = $this->query("RELEASE SAVEPOINT $savepoint"); + return $r; + } + + /** + * Commits transaction + */ + function commitAll() + { + $r = true; + if ($this->transactions) { - return $this->query("RELEASE SAVEPOINT sp".(1+count($this->transactions))); + $r = $this->query("COMMIT"); + $this->transactions = []; } - return true; + return $r; + } + + /** + * Rollbacks transaction + */ + function rollbackAll() + { + $r = true; + if ($this->transactions) + { + $r = $this->query("ROLLBACK"); + $this->transactions = []; + } + return $r; } /** @@ -309,16 +320,13 @@ class DatabaseMysql implements Database */ function rollback() { + $r = false; $savepoint = array_pop($this->transactions); if (!$this->transactions) - { - return $this->query("ROLLBACK"); - } + $r = $this->query("ROLLBACK"); elseif ($savepoint) - { - return $this->query("ROLLBACK TO SAVEPOINT sp".(1+count($this->transactions))); - } - return false; + $r = $this->query("ROLLBACK TO SAVEPOINT $savepoint"); + return $r; } function errno() @@ -373,7 +381,7 @@ class DatabaseMysql implements Database if (!$v) { // FIXME: It seems we should return empty result in that case - throw new DatabaseException("Error: empty array for '$k IN (...)', don't know what to do"); + throw new DatabaseMysqlException("Error: empty array for '$k IN (...)', don't know what to do"); } else { @@ -469,8 +477,11 @@ class DatabaseMysql implements Database $fields[$k] = "$v AS ".$this->quoteId($k); $fields = join(',', $fields); } + $more = NULL; + $tables = $this->tables_builder($tables, $more); + if ($more) + $where = array_merge($where, $more); $where = $this->where_builder($where); - $tables = $this->tables_builder($tables); $sql = 'SELECT '; if (isset($options['CALC_FOUND_ROWS']) || isset($options['SQL_CALC_FOUND_ROWS'])) $sql .= 'SQL_CALC_FOUND_ROWS '; @@ -573,43 +584,37 @@ class DatabaseMysql implements Database * or just a string "`table1` INNER JOIN `table2` ON ..." * names taken into `backticks` will be transformed using $this->tableNames */ - function tables_builder($tables) + function tables_builder($tables, &$where = NULL) { if (is_array($tables)) { $t = ''; foreach ($tables as $k => $v) { - if (!ctype_digit("$k")) + if (!is_array($v)) + $v = [ 'INNER', $v, NULL ]; + $join = strtolower(substr($v[0], 0, 4)); + if ($join == 'righ') + $join = 'RIGHT'; + elseif ($join == 'left') + $join = 'LEFT'; + else /* if (!$join || $join == 'inne' || $join == 'join') */ + $join = 'INNER'; + if (is_array($v[1])) // nested join (left join (A join B on ...) on ...) + $table = '('.$this->tables_builder($v[1]).')'; + else { - if (is_array($v)) - { - $join = strtolower(substr($v[0], 0, 4)); - if ($join == 'righ') - $join = 'RIGHT'; - elseif ($join == 'left') - $join = 'LEFT'; - else /* if (!$join || $join == 'inne' || $join == 'join') */ - $join = 'INNER'; - if (is_array($v[1])) // nested join (left join (A join B on ...) on ...) - $table = '('.$this->tables_builder($v[1]).')'; - else - $table = (isset($this->tableNames[$v[1]]) ? $this->quoteId($this->tableNames[$v[1]]) : $v[1]) . ' ' . $k; - if ($t) - $t .= " $join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); - else - $t = $table; - continue; - } - else - $v = (isset($this->tableNames[$v]) ? $this->quoteId($this->tableNames[$v]) : $v) . ' ' . $k; + $table = (isset($this->tableNames[$v[1]]) ? $this->quoteId($this->tableNames[$v[1]]) : $v[1]); + if (!ctype_digit("$k")) + $table .= ' ' . $k; } - else - $v = (isset($this->tableNames[$v]) ? $this->quoteId($this->tableNames[$v]).' '.$v : $v); if ($t) - $t .= " INNER JOIN $v"; + $t .= " $join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); else - $t = $v; + { + $t = $table; + $where = $v[2]; // extract ON to WHERE if only a single join is specified + } } $tables = $t; } @@ -721,11 +726,14 @@ class DatabaseMysql implements Database * Builds an INSERT query. * @param string $table Table name to insert rows to. * @param array $rows Array of table rows to be inserted. - * @param boolean $onduplicatekey If true, create MySQL-specific "UPSERT" query using - * INSERT .. ON DUPLICATE KEY UPDATE column=VALUES(column) for all columns. - * @param boolean $replace If true, use REPLACE instead of INSERT + * @param string $action Conflict action: NULL, 'REPLACE', 'IGNORE' or 'UPDATE' + * REPLACE: delete matching rows, then insert all rows (MySQL REPLACE) + * IGNORE: ignore matching rows, insert missing rows (MySQL INSERT IGNORE) + * UPDATE: update matching rows, insert missing rows (MySQL INSERT ... ON DUPLICATE KEY UPDATE) + * @param array|string $uniqueKey Single unique key for conflict checking + * @param array|NULL $updateCols Columns to update in case of a conflict */ - function insert_builder($table, $rows, $onduplicatekey = false, $replace = false) + function insert_builder($table, $rows, $action = NULL, $uniqueKey = NULL, $updateCols = NULL) { if (isset($this->tableNames[$table])) { @@ -743,12 +751,28 @@ class DatabaseMysql implements Database foreach ($key as &$k) if (strpos($k, '`') === false && (!$sphinx || $k !== 'id')) $k = $this->quoteId($k); - $sql = ($replace ? "REPLACE" : "INSERT"). + $sql = ($action == "REPLACE" ? "REPLACE" : "INSERT" . ($action == "IGNORE" ? " IGNORE" : "")). " INTO $table (".implode(",",$key).") VALUES ".implode(",",$rows); - if ($onduplicatekey) + if ($action == "UPDATE") { + if ($uniqueKey) + { + $uniqueKey = array_flip(is_array($uniqueKey) ? $uniqueKey : array_map('trim', explode(",", $uniqueKey))); + $cond = $uniqueKey; + foreach ($cond as $k => $v) + $v = "$k!=VALUES($k)"; + // Trigger ERROR 1242 (21000): Subquery returns more than 1 row if trying to update based on different key conflict + $cond = "CASE WHEN ".implode(" OR ", $cond)." THEN (SELECT 1 UNION SELECT 2) ELSE "; + } + if ($updateCols) + $key = (array)$updateCols; foreach ($key as &$k) - $k = "$k=VALUES($k)"; + { + if ($uniqueKey && isset($uniqueKey[$k])) + $k = "$k=($cond $k END)"; + else + $k = "$k=VALUES($k)"; + } $sql .= " ON DUPLICATE KEY UPDATE ".implode(",",$key); } return $sql; @@ -774,54 +798,67 @@ class DatabaseMysql implements Database } /** - * Update or insert rows into $table. - * Update query: $this->update($table, $set, $where, $options); - * Insert-or-update query: $this->update($table, $rows); + * Update row(s) in $table. + * $this->update($table, $set, $where, $options); * * @param string $table Table name to update. - * @param array $rows One row or array of rows for insert-or-update query. * @param array $set Assoc array with values for update query. * @param array $where Conditions for update query, see $this->where_builder(). * @param array $options Options for update query: * 'LIMIT' => array($limit, $offset) or array($limit) or just $limit * 'OFFSET' => $offset, for the case when 'LIMIT' is just $limit - * 'REPLACE' => true to use REPLACE instead of INSERT */ function update($table, $rows, $where = NULL, $options = NULL) { if (!$rows) return false; - if (is_null($where)) + if (count(func_get_args()) == 2) + throw new Exception(__CLASS__."::update(table, rows) is the old syntax, use upsert()"); + $sql = array(); + foreach ((array)$rows as $k => $v) { - if (!is_array($rows)) - return false; - if (!is_array(@$rows[0])) - $rows = array($rows); - $sql = $this->insert_builder($table, $rows, empty($options['REPLACE']), !empty($options['REPLACE'])); - } - else - { - $sql = array(); - foreach ((array)$rows as $k => $v) - { - if (!ctype_digit("$k")) - $sql[] = $k.'='.$this->quote($v); - else - $sql[] = $v; - } - $where = $this->where_builder($where) ?: '1=1'; - $sql = 'UPDATE ' . $this->tables_builder($table) . ' SET ' . implode(', ', $sql) . ' WHERE ' . $where; - $sql .= $this->limit_option($options); + if (!ctype_digit("$k")) + $sql[] = $k.'='.$this->quote($v); + else + $sql[] = $v; } + $where = $this->where_builder($where) ?: '1=1'; + $sql = 'UPDATE ' . $this->tables_builder($table) . ' SET ' . implode(', ', $sql) . ' WHERE ' . $where; + $sql .= $this->limit_option($options); if ($this->query($sql)) return $this->link->affected_rows; return false; } - function replace($table, $rows, $where = NULL, $options = array()) + /** + * INSERT / REPLACE / INSERT IGNORE / INSERT OR UPDATE + */ + function insert($table, $rows, $onConflict = NULL, $uniqueKey = NULL) { - $options['REPLACE'] = true; - return $this->update($table, $rows, $where, $options); + if (!$rows || !is_array($rows)) + return false; + $first = reset($rows); + if (!is_array($first)) + $rows = array($rows); + $sql = $this->insert_builder($table, $rows, $onConflict, $uniqueKey); + if ($this->query($sql)) + return $this->link->affected_rows; + return false; + } + + function insert_ignore($table, $rows, $uniqueKey = NULL) + { + return $this->insert($table, $rows, 'IGNORE', $uniqueKey); + } + + function upsert($table, $rows, $uniqueKey = NULL, $updateCols = NULL) + { + return $this->insert($table, $rows, 'UPDATE', $uniqueKey, $updateCols); + } + + function replace($table, $rows, $uniqueKey = NULL) + { + return $this->insert($table, $rows, 'REPLACE', $uniqueKey); } protected function get_rows($res) diff --git a/DatabasePdoPgsql.php b/DatabasePdoPgsql.php new file mode 100644 index 0000000..68fe897 --- /dev/null +++ b/DatabasePdoPgsql.php @@ -0,0 +1,769 @@ +getCode() == 23505; + } +} + +if (!interface_exists('Database')) +{ + interface Database {} +} + +class DatabasePdoPgsql implements Database +{ + var $host, $port, $socket, $username, $password, $dbname; + + var $tableNames = array(); + var $init = array(); + var $queryLogFile; + var $reconnect = true; + var $autoBegin; + var $ondestroy = 'commit'; + + var $queryCount = 0; + var $link; + var $transactions = array(); + + protected $calcFoundRows, $foundRows; + + /** + * Creates a PostgreSQL connection object. + * + * @param array $options Possible options: + * host Host name or IP address to connect to [localhost] + * socket Directory of UNIX socket [/var/run/postgresql] + * port TCP port to connect to [5432] + * dbname DB name to use + * username Username + * password Password + * tableNames Table name mappings (virtual => real) + * queryLogFile Path to query log file + * reconnect Whether to reconnect on idle timeout [true] + * autoBegin Whether to automatically begin a transaction of first query [false] + * ondestroy commit/rollback/none during destruction [commit] + * init Initialisation queries (array) + */ + function __construct($options) + { + $defOpts = array( + 'host' => 'localhost', + 'port' => 5432, + 'socket' => '/var/run/postgresql', + 'dbname' => '', + 'username' => '', + 'password' => '', + 'reconnect' => true, + 'tableNames' => array(), + 'queryLogFile' => '', + 'autoBegin' => false, + 'ondestroy' => 'commit', + 'init' => array(), + ); + $options += $defOpts; + if ($options['socket']) + { + $options['host'] = 'localhost'; + } + foreach ($defOpts as $k => $v) + { + $this->$k = $options[$k]; + } + } + + function __destruct() + { + $o = $this->ondestroy; + if (($o === 'commit' || $o === 'rollback') && $this->transactions) + { + $this->transactions = array(false); + $this->$o(); + } + } + + function connect() + { + $str = "pgsql:port=".intval($this->port); + $str .= ";host='".($this->socket !== NULL ? $this->socket : $this->host)."';dbname='".$this->dbname."'"; + try + { + $this->link = new PDO($str, $this->username, $this->password, array( + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, + PDO::ATTR_EMULATE_PREPARES => false, + )); + foreach ($this->init as $q) + $this->link->query($q); + } + catch (PDOException $e) + { + throw new DatabaseException($e->getMessage(), $e->getCode(), $e); + } + $this->transactions = array(); + } + + function getDBName() + { + return $this->dbname; + } + + function quoteId($name) + { + return '"'.str_replace('"', '""', $name).'"'; + } + + function quoteInto($str, $params) + { + $i = 0; + $r = ''; + while (($p = strpos($str, '?')) !== false) + { + $r .= substr($str, 0, $p) . $this->quote($params[$i++]); + $str = substr($str, $p+1); + } + return $r.$str; + } + + function quote($value) + { + if ($value === NULL) + return "NULL"; + if (!$this->link) + $this->connect(); + return $this->link->quote($value); + } + + function query($sql) + { + if (!$this->link) + $this->connect(); + if ($this->autoBegin && !$this->transactions) + $this->begin(); + $this->queryCount++; + if ($this->queryLogFile) + file_put_contents($this->queryLogFile, date("[Y-m-d H:i:s] ").$sql."\n", FILE_APPEND); + return $this->link->query($sql); + } + + /** + * Starts a transaction, supports nested calls and savepoints. + * @param boolean $savepoint Creates savepoints only this parameter is true. + */ + function begin($savepoint = false) + { + $this->transactions[] = $savepoint; + $n = count($this->transactions); + if ($n == 1) + return $this->query("BEGIN"); + elseif ($savepoint) + return $this->query("SAVEPOINT sp$n"); + return true; + } + + /** + * Commits transaction or releases last savepoint. + * If there is no last savepoint, just returns true. + */ + function commit() + { + $r = true; + if (count($this->transactions) == 1) + $r = $this->query("COMMIT"); + elseif ($this->transactions) + $r = $this->query("RELEASE SAVEPOINT sp".count($this->transactions)); + $savepoint = array_pop($this->transactions); + return $r; + } + + /** + * Commits transaction + */ + function commitAll() + { + $r = true; + if ($this->transactions) + { + $r = $this->query("COMMIT"); + $this->transactions = []; + } + return $r; + } + + /** + * Rollbacks transaction + */ + function rollbackAll() + { + $r = true; + if ($this->transactions) + { + $r = $this->query("ROLLBACK"); + $this->transactions = []; + } + return $r; + } + + /** + * Rollbacks transaction or last savepoint. + * If there is no savepoint, returns false. + */ + function rollback() + { + $r = false; + if (count($this->transactions) == 1) + $r = $this->query("ROLLBACK"); + elseif ($this->transactions) + $r = $this->query("ROLLBACK TO SAVEPOINT sp".count($this->transactions)); + $savepoint = array_pop($this->transactions); + return $r; + } + + /** + * Builds WHERE-part of an SQL query. + * $where can also be a string - then it's passed as-is. + * + * @param array $where Query conditions: + * array( + * 'conditional expression', + * 'field_name' => 'value', + * 'field_name' => array('one', 'of', 'values'), + * 'field_name < ?' => 'value', + * 'field_name < DATE_SUB(?, ?)' => array('arg1', 'arg2'), + * 'field1,field2' => array(array(1, 2), array(3, 4)), + * ) + */ + function where_builder($where) + { + if (!is_array($where)) + return $where; + $wh = array(); + foreach ($where as $k => $v) + { + if (ctype_digit("$k")) + { + if (is_array($v)) + { + $str = array_shift($v); + $wh[] = $this->quoteInto($str, $v); + } + else + { + $wh[] = $v; + } + } + elseif (($p = strrpos($k, '?')) !== false) + { + $wh[] = $this->quoteInto($k, (array)$v); + } + elseif (is_array($v)) + { + if (!$v) + { + // FIXME: It seems we should return empty result in that case + throw new DatabaseException("Error: empty array for '$k IN (...)', don't know what to do"); + } + else + { + if (is_array(reset($v))) + foreach ($v as &$l) + $l = "(" . implode(",", array_map(array($this, 'quote'), $l)) . ")"; + else + $v = array_map(array($this, 'quote'), $v); + $wh[] = "$k IN (" . implode(",", $v) . ")"; + } + } + elseif (preg_match('/^-?\d+(\.\d+)?$/s', $v)) // int/float + $wh[] = "$k=$v"; + elseif ($v !== NULL) + $wh[] = "$k=".$this->quote($v); + else + $wh[] = "$k IS NULL"; + } + if ($where) + $where = '(' . join(') AND (', $wh) . ')'; + else + $where = ''; + return $where; + } + + /** + * Builds SQL query text. + * + * @param mixed $tables see $this->tablesBuilder() + * @param mixed $fields Field definitions - either a string or an array. + * Strings are passed to resulting query text as-is. + * Arrays have the following format: + * array('field1', 'alias2' => 'expression2', ...) + * @param mixed $where see $this->whereBuilder() + * @param array $options query options - array of: + * 'CALC_FOUND_ROWS' (emulated with window function) + * 'FOR UPDATE' or 'FOR SHARE' = 'LOCK IN SHARE MODE' + * 'GROUP BY' => array($groupby_field1 => 'ASC', $groupby_field2 => 'DESC') + * 'ORDER BY' => array($orderby_field1 => 'ASC', $orderby_field2 => 'DESC') + * 'LIMIT' => array($offset, $limit) or array($limit) or just $limit + * 'OFFSET' => $offset, for the case when 'LIMIT' is just $limit + */ + function select_builder($tables, $fields, $where, $options = NULL) + { + if (!$options) + $options = array(); + else + { + foreach ($options as $k => $v) + if (ctype_digit("$k")) + $options[$v] = true; + } + if (is_array($fields)) + { + foreach ($fields as $k => $v) + if (!ctype_digit("$k")) + $fields[$k] = "$v AS ".$this->quoteId($k); + $fields = join(',', $fields); + } + $more = NULL; + $tables = $this->tables_builder($tables, $more); + if ($more) + $where = array_merge($where, $more); + $where = $this->where_builder($where); + $this->calcFoundRows = isset($options['CALC_FOUND_ROWS']) || isset($options['SQL_CALC_FOUND_ROWS']); + if ($this->calcFoundRows) + $fields .= ', COUNT(*) OVER () "*"'; + $sql = "SELECT $fields FROM $tables"; + if ($where) + $sql .= " WHERE $where"; + if (!empty($options['GROUP BY']) && $options['GROUP BY'] !== '0') + $sql .= " GROUP BY ".$this->order_option($options['GROUP BY']); + if (!empty($options['ORDER BY']) && $options['ORDER BY'] !== '0') + $sql .= " ORDER BY ".$this->order_option($options['ORDER BY']); + $sql .= $this->limit_option($options); + if (isset($options['FOR UPDATE'])) + $sql .= ' FOR UPDATE'; + elseif (isset($options['LOCK IN SHARE MODE']) || isset($options['FOR SHARE'])) + $sql .= ' FOR SHARE'; + return $sql; + } + + /** + * Handles ORDER BY or GROUP BY options + */ + protected function order_option($g) + { + if (is_array($g)) + { + $g1 = array(); + foreach ($g as $k => $v) + { + if (ctype_digit("$k")) + $g1[] = $v; + else + $g1[] = "$k $v"; + } + $g = join(',', $g1); + } + return $g; + } + + /** + * Handles a single LIMIT or LIMIT and OFFSET options. + */ + protected function limit_option($options) + { + $g = ''; + if (!empty($options['LIMIT'])) + { + $g = $options['LIMIT']; + if (is_array($g) && count($g) != 2) + $g = $g[0]; + if (is_array($g)) + return " LIMIT ".$g[1]." OFFSET ".$g[0]; + $g = " LIMIT $g"; + } + if (!empty($options['OFFSET'])) + { + $g .= " OFFSET ".$options['OFFSET']; + } + return $g; + } + + /** + * Builds FROM-part of an SQL query. + * + * $tables = array( + * 'table', + * 'alias' => 'table', + * 'alias' => array('INNER', 'table_name', $where_for_on_clause), + * 'alias' => array('INNER', $nested_tables), + * ) + * or just a string "table1 INNER JOIN \"table2\" ON ..." + * escaped names ("table2") will be transformed using $this->tableNames + */ + function tables_builder($tables, &$where = NULL) + { + if (is_array($tables)) + { + $t = ''; + foreach ($tables as $k => $v) + { + if (!is_array($v)) + $v = [ 'INNER', $v, NULL ]; + $join = strtolower(substr($v[0], 0, 4)); + if ($join == 'righ') + $join = 'RIGHT'; + elseif ($join == 'left') + $join = 'LEFT'; + elseif ($join == 'full') + $join = 'FULL'; + else /* if (!$join || $join == 'inne' || $join == 'join') */ + $join = 'INNER'; + if (is_array($v[1])) // nested join (left join (A join B on ...) on ...) + $table = '('.$this->tables_builder($v[1]).')'; + else + { + $table = (isset($this->tableNames[$v[1]]) ? $this->quoteId($this->tableNames[$v[1]]) : $v[1]); + if (!ctype_digit("$k")) + $table .= ' ' . $k; + } + if ($t) + $t .= " $join JOIN $table ON ".($this->where_builder($v[2]) ?: '1=1'); + else + { + $t = $table; + $where = $v[2]; // extract ON to WHERE if only a single join is specified + } + } + $tables = $t; + } + else + $tables = preg_replace_callback('/((?:,|JOIN)\s*")([^"]+)/s', array($this, 'tables_builder_pregcb1'), $tables); + return $tables; + } + + function tables_builder_pregcb1($m) + { + if (isset($this->tableNames[$m[2]])) + return $m[1].$this->tableNames[$m[2]]; + return $m[1].$m[2]; + } + + /** + * Run a SELECT query and return results. + * + * Usage: either + * $this->select($tables, $fields, $where, $options, $format) + * using $this->select_builder() or + * $this->select($sql_text, $format) + * using query text. + * + * @param int $format Return format, bitmask of MS_XX constants: + * MS_RESULT = return PDO result object to manually fetch from + * MS_LIST = return rows as indexed arrays + * MS_HASH = return rows as associative arrays (default) + * MS_ROW = only return the first row + * MS_COL = only return the first column + * MS_VALUE = only return the first cell (just 1 value) + */ + function select($tables, $fields = '*', $where = 1, $options = NULL, $format = 0) + { + if (is_int($fields)) + { + $this->calcFoundRows = false; + $sql = $tables; + $format = $fields; + } + else + $sql = $this->select_builder($tables, $fields, $where, $options); + $res = $this->query($sql); + if ($format & MS_RESULT) + return $res; + if (!$res) + return $format == MS_VALUE ? NULL : []; + if ($this->calcFoundRows) + { + // Extract and remove last column from query result + $fs = $format & MS_LIST ? PDO::FETCH_NUM : PDO::FETCH_ASSOC; + $rows = $format & MS_ROW ? [ $res->fetch($fs) ] : $res->fetchAll($fs); + $this->foundRows = reset($rows); + $this->foundRows = end($this->foundRows); + if ($format & MS_COL) + foreach ($rows as &$row) + $row = reset($row); + else + foreach ($rows as &$row) + array_pop($row); + if ($format & MS_ROW) + $rows = $rows[0]; + return $rows; + } + $fs = $format & MS_COL ? PDO::FETCH_COLUMN : ($format & MS_LIST ? PDO::FETCH_NUM : PDO::FETCH_ASSOC); + return $format & MS_ROW ? $res->fetch($fs) : $res->fetchAll($fs); + } + + function found_rows() + { + return $this->foundRows; + } + + /** + * Delete a set of rows. + * @param mixed $tables see $this->tables_builder() (deletes rows only from the first table) + * @param mixed $where see $this->where_builder() + * @param array $options Options for query: + * 'LIMIT' => array($limit, $offset) or array($limit) or just $limit + * 'OFFSET' => $offset, for the case when 'LIMIT' is just $limit + */ + function delete($tables, $where, $options = NULL) + { + list($what, $using) = $this->split_using($tables, $where); + $where = $this->where_builder($where) ?: '1=1'; + $sql = "DELETE FROM $what".($using ? " USING ".$this->tables_builder($using) : '')." WHERE $where"; + $sql .= $this->limit_option($options); + return $this->query($sql); + } + + function values($rows) + { + $key = reset($rows); + if (!is_array($key)) + { + $key = $rows; + $rows = [ $rows ]; + } + $key = array_keys($key); + foreach ($rows as &$r) + { + $rs = array(); + foreach ($key as &$k) + $rs[] = $this->quote(isset($r[$k]) ? $r[$k] : NULL); + $r = implode(",", $rs); + } + foreach ($key as &$k) + if (strpos($k, '"') === false) + $k = $this->quoteId($k); + return array($key, $rows); + } + + /** + * Builds an INSERT query. + * + * @param string $table Table name to insert rows to. + * @param array $rows Array of table rows to be inserted. + * @param string $action NULL, "UPDATE" or "IGNORE", for PgSQL 9.5 "on conflict do ..." + * @param array|string $uniqueColumns unique columns for conflict + * (defaults to {$table}_pkey for "insert or update" queries) + * @param array|NULL $updateCols Columns to update in case of a conflict + */ + function insert_builder($table, $rows, $action = NULL, $uniqueColumns = NULL, $updateCols = NULL) + { + if (isset($this->tableNames[$table])) + { + $table = $this->quoteId($this->tableNames[$table]); + } + list($keys, $values) = $this->values($rows); + $sql = "INSERT INTO $table (".implode(',', $keys).") VALUES (".implode('),(', $values).")"; + if ($action) + { + $sql .= " ON CONFLICT"; + if ($uniqueColumns) + $sql .= " (".implode(", ", (array)$uniqueColumns).")"; + elseif ($action == "UPDATE") + $sql .= " ON CONSTRAINT ${table}_pkey"; + if ($action == "UPDATE") + { + if ($updateCols) + $key = (array)$updateCols; + else + { + $key = reset($rows); + $key = is_array($key) ? array_keys($key) : array_keys($rows); + } + foreach ($key as &$k) + $k = "$k = EXCLUDED.$k"; + $sql .= " DO UPDATE SET ".implode(", ", $key); + } + else + $sql .= " DO NOTHING"; + } + return $sql; + } + + /** + * Insert a single row into $table and return inserted ID. + * + * @param string $table Table name to insert row to. + * @param array $row Row to be inserted. + * @return int $insert_id Autoincrement ID of inserted row (if appropriate). + */ + function insert_row($table, $row) + { + $sql = $this->insert_builder($table, array($row)) . ' RETURNING LASTVAL()'; + return $this->select($sql, MS_VALUE); + } + + /** + * Insert multiple rows and return them all with their new values (including IDs) + * + * @param string $table Table name to insert row to. + * @param array $rows Rows to be inserted. + * @return array $rows Inserted rows with all fields (array of assoc arrays). + */ + function insert_returning($table, $rows) + { + if (!$rows) + return array(); + $sql = $this->insert_builder($table, $rows) . ' RETURNING *'; + return $this->select($sql, MS_HASH); + } + + function insert_id() + { + return $this->link->lastInsertId(); + } + + protected function split_using($tables, &$where) + { + $first = $has_inner = NULL; + $tables = (array)$tables; + foreach ($tables as $k => $t) + { + if (!is_array($t)) + { + if ($first === NULL) + { + $first = $k; + } + elseif (!$has_inner) + { + $tables = array($k => array('INNER', $t)) + $tables; + $has_inner = true; + break; + } + } + elseif (in_array(strtolower($t[0]), array('inner', 'cross', 'join'))) + { + $tables = array($k => $t) + $tables; + $has_inner = true; + if ($first !== NULL) + { + break; + } + } + } + if (count($tables) > 1 && !$has_inner) + { + if ($first === NULL) + { + throw new DatabaseException("No update/delete subject found"); + } + $what = $this->tables_builder([ "_$first" => $tables[$first] ]); + $where[] = "_$first.ctid=".(ctype_digit("$first") ? $tables[$first] : $first).".ctid"; + } + else + { + $what = $this->tables_builder([ $first => $tables[$first] ]); + unset($tables[$first]); + } + return array($what, $tables); + } + + /** + * Update row(s) in $table. + * $this->update($table, $set, $where); + * + * @param string $table Table(s) to update, see $this->tables_builder() + * @param array $set Assoc array with values for update query. + * @param array $where Conditions for update query, see $this->where_builder(). + */ + function update($table, $set, $where = NULL, $options = NULL) + { + if (!$set) + return false; + $sql = array(); + foreach ((array)$set as $k => $v) + { + if (!ctype_digit("$k")) + $sql[] = $k.'='.$this->quote($v); + else + $sql[] = $v; + } + list($what, $using) = $this->split_using($table, $where); + $where = $this->where_builder($where) ?: '1=1'; + $sql = 'UPDATE ' . $what . ' SET ' . implode(', ', $sql) . + ($using ? ' FROM '.$this->tables_builder($using) : '') . ' WHERE ' . $where; + $r = $this->query($sql); + if ($r) + return $r->rowCount(); + return false; + } + + /** + * Insert or update rows + */ + function insert($table, $rows, $onConflict = NULL, $uniqueKey = NULL) + { + if (!$rows) + return false; + $sql = $this->insert_builder($table, $rows, $onConflict, $uniqueKey); + $r = $this->query($sql); + if ($r) + return $r->rowCount(); + return false; + } + + function insert_ignore($table, $rows, $uniqueKey = NULL) + { + return $this->insert($table, $rows, 'IGNORE', $uniqueKey); + } + + function upsert($table, $rows, $uniqueKey = NULL, $updateCols = NULL) + { + return $this->insert($table, $rows, 'UPDATE', $uniqueKey, $updateCols); + } + + function replace($table, $rows, $uniqueKey = NULL) + { + $first = reset($rows); + if (!is_array($first)) + { + $first = $rows; + $rows = [ $first ]; + } + $key = $uniqueKey ? (is_array($uniqueKey) ? $uniqueKey : array_map('trim', explode(',', $uniqueKey))) : array_keys($first); + $where = []; + foreach ($rows as $row) + { + $r = []; + foreach ($key as $k) + $r[] = $row[$k]; + $where[] = $r; + } + $n = $this->delete($table, array("(".implode(", ", $key).")" => $where)); + $n += $this->insert($table, $rows); + return $n; + } +}