diff --git a/.babelrc b/.babelrc deleted file mode 100644 index beec9bd..0000000 --- a/.babelrc +++ /dev/null @@ -1,4 +0,0 @@ -{ - "plugins": [ "transform-es2015-destructuring", "transform-object-rest-spread", "transform-es2015-block-scoping" ], - "retainLines": true -} diff --git a/ImapManager.js b/ImapManager.js index a2c45ad..56491e1 100644 --- a/ImapManager.js +++ b/ImapManager.js @@ -120,58 +120,61 @@ class ImapManager let fetchState = { ...(args||{}), - parsed: 0, paused: false, synced: 0, + parsing: 0, pending: [], results: [], srv: srv, + end: false, }; let wait; await new Promise((resolve, reject) => { - let checkFinish = () => + let end = () => { - if (fetchState.parsed <= 0 && wait) + if (!fetchState.pending.length) { - // Если сообщение окончания придёт до окончания обработки - // последней порции, тогда не резолвим, а ждём окончания обработки resolve(); } - }; - - let saveLast = (results) => - { - if (results) + else { - fetchState.results = fetchState.results.concat(results); + let m = fetchState.pending; + fetchState.pending = []; + processor(m, fetchState) + .then(results => + { + if (results) + { + fetchState.results = fetchState.results.concat(results); + } + resolve(); + }) + .catch(e => reject(e)); } - fetchState.parsed -= fetchState.pending.length; - fetchState.pending = []; - checkFinish(); }; f.on('message', (msg, seqnum) => { this.onMessage(fetchState, msg, seqnum, processor) - .then(checkFinish) + .then(() => + { + if (fetchState.end && !fetchState.parsing) + { + end(); + } + }) .catch(e => reject(e)); }); f.once('end', () => { - wait = true; - if (fetchState.parsed <= 0) + fetchState.end = true; + if (!fetchState.parsing) { - resolve(); - } - else if (fetchState.pending.length > 0) - { - processor(fetchState.pending, fetchState) - .then(saveLast) - .catch(e => reject(e)); + end(); } }); }); @@ -181,17 +184,26 @@ class ImapManager async onMessage(fetchState, msg, seqnum, processor) { - let [ msgrow, attrs ] = await this.parseMessage(msg, seqnum); + let msgrow, attrs; + fetchState.parsing++; + try + { + [ msgrow, attrs ] = await this.parseMessage(msg, seqnum); + } + catch (e) + { + fetchState.parsing--; + throw e; + } + fetchState.parsing--; // Workaround memory leak in node-imap // TODO: send pull request if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) { delete fetchState.srv._curReq.fetchCache[seqnum]; } - fetchState.pending.push([ msgrow, attrs ]); - fetchState.parsed++; - if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause) + if (!fetchState.paused && fetchState.pending.length >= 100 && !fetchState.nopause) { // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! fetchState.srv._parser._ignoreReadable = true; @@ -207,8 +219,7 @@ class ImapManager { fetchState.results = fetchState.results.concat(result); } - fetchState.parsed -= m.length; - if (fetchState.paused && fetchState.parsed < 100) + if (fetchState.paused) { fetchState.paused = false; fetchState.srv._parser._ignoreReadable = false; diff --git a/Syncer.js b/Syncer.js index ed3ef6b..6569ee3 100644 --- a/Syncer.js +++ b/Syncer.js @@ -220,7 +220,7 @@ class Syncer size: true, bodies: 'HEADER', struct: true, - }, async (messages, state) => await this.saveMessages(messages, boxRow.id, state)); + }, (messages, state) => this.saveMessages(messages, boxRow.id, state)); await SQL.update(this.pg, 'folders', { uidvalidity: boxStatus.uidvalidity, @@ -361,18 +361,23 @@ class Syncer async deleteMessages(where) { + let cond = SQL.where_builder(where); let q = SQL.select_builder('messages', 'id', where); - await SQL.update( - this.pg, 'threads', { first_msg: null }, - { ['first_msg IN ('+q.sql+')']: q.bind }, + await this.pg.query( + SQL.quote_positional( + 'WITH deleting_messages AS (SELECT id FROM messages WHERE '+cond.sql+')'+ + ', updated_threads AS ('+ + 'UPDATE threads SET first_msg=('+ + 'SELECT m.id FROM messages m WHERE m.thread_id=threads.id'+ + ' AND m.id NOT IN (SELECT id FROM deleting_messages) ORDER BY time LIMIT 1'+ + ') WHERE first_msg IN (SELECT id FROM deleting_messages)'+ + ' RETURNING id, first_msg'+ + '), deleted_threads AS ('+ + 'DELETE FROM threads WHERE id IN (SELECT id FROM updated_threads WHERE first_msg IS NULL)'+ + ' RETURNING id'+ + ') DELETE FROM messages WHERE id IN (SELECT id FROM deleting_messages)' + ), cond.bind ); - await SQL.delete(this.pg, 'messages', where); - await SQL.update( - this.pg, 'threads', - [ 'first_msg=(select id from messages where thread_id=threads.id order by time limit 1)' ], - { first_msg: null } - ); - await SQL.delete(this.pg, 'threads', { first_msg: null }); } async saveMessages(messages, boxId) @@ -523,7 +528,7 @@ class Syncer { upd.first_msg = msgrow.id; } - await SQL.update(this.pg, 'threads', upd, { id: msgrow.threadId }); + await SQL.update(this.pg, 'threads', upd, { id: msgrow.thread_id }); } } } diff --git a/SyncerWeb.js b/SyncerWeb.js index 515972c..dcdaf2c 100644 --- a/SyncerWeb.js +++ b/SyncerWeb.js @@ -139,7 +139,7 @@ class SyncerWeb } if (typeof query.search == 'string' && query.search.trim()) { - p['messages_fulltext(m) @@ plainto_tsquery($1)'] = [ query.search.trim() ]; + p['messages_fulltext(m) @@ plainto_tsquery(?)'] = [ query.search.trim() ]; } if (query.accountId) { @@ -252,12 +252,12 @@ class SyncerWeb if (!msg.body_html && !msg.body_text) { let srv = await this.syncer.imap.getConnection(msg.account_id, msg.folder_name); - let upd = await this.syncer.imap.runFetch( + let upd = (await this.syncer.imap.runFetch( srv, msg.uid, { bodies: '' }, - (messages, state) => this.getBody(messages, msg.folder_id) - ); + async (messages, state) => await this.getBody(messages, msg.folder_id) + )); this.syncer.imap.releaseConnection(msg.account_id); - return res.send({ msg: { ...msg, ...upd } }); + return res.send({ msg: { ...msg, ...upd[0] } }); } return res.send({ msg: msg }); } diff --git a/package.json b/package.json index e98a2b8..5cb1de2 100644 --- a/package.json +++ b/package.json @@ -24,11 +24,6 @@ }, "peerDependencies": {}, "devDependencies": { - "babel-cli": "latest", - "babel-plugin-transform-es2015-block-scoping": "latest", - "babel-plugin-transform-es2015-destructuring": "latest", - "babel-plugin-transform-object-rest-spread": "latest", - "babel-register": "latest", "bluebird": "^3.5.4", "eslint": "latest", "eslint-plugin-react": "latest" diff --git a/run.sh b/run.sh deleted file mode 100755 index 91c4ee0..0000000 --- a/run.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -for i in ImapManager.js operetta.js Syncer.js SyncerWeb.js; do - node_modules/.bin/babel $i > compiled/$i -done -nodejs --max_old_space_size=100 compiled/operetta.js diff --git a/select-builder-pgsql.js b/select-builder-pgsql.js index 32d033c..a6457f8 100644 --- a/select-builder-pgsql.js +++ b/select-builder-pgsql.js @@ -1,6 +1,6 @@ // Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8 // (c) Виталий Филиппов, 2019 -// Версия 2019-05-09 +// Версия 2019-05-14 // В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи, // благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ] @@ -73,9 +73,9 @@ function select_builder(tables, fields, where, options) } if (options.OFFSET || options.offset) { - sql += ' LIMIT '+((options.OFFSET || options.offset) | 0); + sql += ' OFFSET '+((options.OFFSET || options.offset) | 0); } - return { sql, bind }; + return new Text(sql, bind); } function tables_builder(tables) @@ -302,7 +302,8 @@ function split_using(tables) function _positional(sql) { let i = 0; - sql = sql.replace(/\?/g, () => '$'+(++i)); + sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '$'+(++i) : m)); + console.log('> '+sql); return sql; } @@ -313,7 +314,7 @@ function _inline(sql, bind) pg_escape = require('pg-escape'); } let i = 0; - sql = sql.replace(/\?/g, () => '\''+pg_escape.string(bind[i++])+'\''); + sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '\''+pg_escape.string(bind[i++])+'\'' : m)); return sql; } @@ -321,7 +322,6 @@ function _inline(sql, bind) async function select(dbh, tables, fields, where, options, format) { let { sql, bind } = select_builder(tables, fields, where, options); - //console.log(_inline(sql, bind)); let data = await dbh.query(_positional(sql), bind); if ((format & MS_LIST) || (format & MS_COL)) data = data.rows.map(r => Object.values(r)); @@ -422,14 +422,36 @@ class Pg_Values } } +class Text +{ + constructor(sql, bind) + { + this.sql = sql; + this.bind = bind || []; + } + + toString() + { + return _inline(this.sql, this.bind); + } + + concat(text) + { + return new Text(this.sql+text.sql, [ ...this.bind, ...text.bind ]); + } +} + module.exports = { select_builder, where_builder, + quote_into: _inline, + quote_positional: _positional, select, insert, delete: _delete, update, values, + Text, MS_HASH, MS_LIST, MS_ROW,