Optimize deletion, fix message fetching, remove babel

master
Vitaliy Filippov 2019-05-14 13:54:56 +03:00
parent 6c8edc1667
commit 3a09cdc7ef
7 changed files with 91 additions and 68 deletions

View File

@ -1,4 +0,0 @@
{
"plugins": [ "transform-es2015-destructuring", "transform-object-rest-spread", "transform-es2015-block-scoping" ],
"retainLines": true
}

View File

@ -120,58 +120,61 @@ class ImapManager
let fetchState = { let fetchState = {
...(args||{}), ...(args||{}),
parsed: 0,
paused: false, paused: false,
synced: 0, synced: 0,
parsing: 0,
pending: [], pending: [],
results: [], results: [],
srv: srv, srv: srv,
end: false,
}; };
let wait; let wait;
await new Promise((resolve, reject) => await new Promise((resolve, reject) =>
{ {
let checkFinish = () => let end = () =>
{ {
if (fetchState.parsed <= 0 && wait) if (!fetchState.pending.length)
{ {
// Если сообщение окончания придёт до окончания обработки
// последней порции, тогда не резолвим, а ждём окончания обработки
resolve(); resolve();
} }
}; else
let saveLast = (results) =>
{
if (results)
{ {
fetchState.results = fetchState.results.concat(results); let m = fetchState.pending;
fetchState.pending = [];
processor(m, fetchState)
.then(results =>
{
if (results)
{
fetchState.results = fetchState.results.concat(results);
}
resolve();
})
.catch(e => reject(e));
} }
fetchState.parsed -= fetchState.pending.length;
fetchState.pending = [];
checkFinish();
}; };
f.on('message', (msg, seqnum) => f.on('message', (msg, seqnum) =>
{ {
this.onMessage(fetchState, msg, seqnum, processor) this.onMessage(fetchState, msg, seqnum, processor)
.then(checkFinish) .then(() =>
{
if (fetchState.end && !fetchState.parsing)
{
end();
}
})
.catch(e => reject(e)); .catch(e => reject(e));
}); });
f.once('end', () => f.once('end', () =>
{ {
wait = true; fetchState.end = true;
if (fetchState.parsed <= 0) if (!fetchState.parsing)
{ {
resolve(); end();
}
else if (fetchState.pending.length > 0)
{
processor(fetchState.pending, fetchState)
.then(saveLast)
.catch(e => reject(e));
} }
}); });
}); });
@ -181,17 +184,26 @@ class ImapManager
async onMessage(fetchState, msg, seqnum, processor) async onMessage(fetchState, msg, seqnum, processor)
{ {
let [ msgrow, attrs ] = await this.parseMessage(msg, seqnum); let msgrow, attrs;
fetchState.parsing++;
try
{
[ msgrow, attrs ] = await this.parseMessage(msg, seqnum);
}
catch (e)
{
fetchState.parsing--;
throw e;
}
fetchState.parsing--;
// Workaround memory leak in node-imap // Workaround memory leak in node-imap
// TODO: send pull request // TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
{ {
delete fetchState.srv._curReq.fetchCache[seqnum]; delete fetchState.srv._curReq.fetchCache[seqnum];
} }
fetchState.pending.push([ msgrow, attrs ]); fetchState.pending.push([ msgrow, attrs ]);
fetchState.parsed++; if (!fetchState.paused && fetchState.pending.length >= 100 && !fetchState.nopause)
if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause)
{ {
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true; fetchState.srv._parser._ignoreReadable = true;
@ -207,8 +219,7 @@ class ImapManager
{ {
fetchState.results = fetchState.results.concat(result); fetchState.results = fetchState.results.concat(result);
} }
fetchState.parsed -= m.length; if (fetchState.paused)
if (fetchState.paused && fetchState.parsed < 100)
{ {
fetchState.paused = false; fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false; fetchState.srv._parser._ignoreReadable = false;

View File

@ -220,7 +220,7 @@ class Syncer
size: true, size: true,
bodies: 'HEADER', bodies: 'HEADER',
struct: true, struct: true,
}, async (messages, state) => await this.saveMessages(messages, boxRow.id, state)); }, (messages, state) => this.saveMessages(messages, boxRow.id, state));
await SQL.update(this.pg, 'folders', { await SQL.update(this.pg, 'folders', {
uidvalidity: boxStatus.uidvalidity, uidvalidity: boxStatus.uidvalidity,
@ -361,18 +361,23 @@ class Syncer
async deleteMessages(where) async deleteMessages(where)
{ {
let cond = SQL.where_builder(where);
let q = SQL.select_builder('messages', 'id', where); let q = SQL.select_builder('messages', 'id', where);
await SQL.update( await this.pg.query(
this.pg, 'threads', { first_msg: null }, SQL.quote_positional(
{ ['first_msg IN ('+q.sql+')']: q.bind }, 'WITH deleting_messages AS (SELECT id FROM messages WHERE '+cond.sql+')'+
', updated_threads AS ('+
'UPDATE threads SET first_msg=('+
'SELECT m.id FROM messages m WHERE m.thread_id=threads.id'+
' AND m.id NOT IN (SELECT id FROM deleting_messages) ORDER BY time LIMIT 1'+
') WHERE first_msg IN (SELECT id FROM deleting_messages)'+
' RETURNING id, first_msg'+
'), deleted_threads AS ('+
'DELETE FROM threads WHERE id IN (SELECT id FROM updated_threads WHERE first_msg IS NULL)'+
' RETURNING id'+
') DELETE FROM messages WHERE id IN (SELECT id FROM deleting_messages)'
), cond.bind
); );
await SQL.delete(this.pg, 'messages', where);
await SQL.update(
this.pg, 'threads',
[ 'first_msg=(select id from messages where thread_id=threads.id order by time limit 1)' ],
{ first_msg: null }
);
await SQL.delete(this.pg, 'threads', { first_msg: null });
} }
async saveMessages(messages, boxId) async saveMessages(messages, boxId)
@ -523,7 +528,7 @@ class Syncer
{ {
upd.first_msg = msgrow.id; upd.first_msg = msgrow.id;
} }
await SQL.update(this.pg, 'threads', upd, { id: msgrow.threadId }); await SQL.update(this.pg, 'threads', upd, { id: msgrow.thread_id });
} }
} }
} }

View File

@ -139,7 +139,7 @@ class SyncerWeb
} }
if (typeof query.search == 'string' && query.search.trim()) if (typeof query.search == 'string' && query.search.trim())
{ {
p['messages_fulltext(m) @@ plainto_tsquery($1)'] = [ query.search.trim() ]; p['messages_fulltext(m) @@ plainto_tsquery(?)'] = [ query.search.trim() ];
} }
if (query.accountId) if (query.accountId)
{ {
@ -252,12 +252,12 @@ class SyncerWeb
if (!msg.body_html && !msg.body_text) if (!msg.body_html && !msg.body_text)
{ {
let srv = await this.syncer.imap.getConnection(msg.account_id, msg.folder_name); let srv = await this.syncer.imap.getConnection(msg.account_id, msg.folder_name);
let upd = await this.syncer.imap.runFetch( let upd = (await this.syncer.imap.runFetch(
srv, msg.uid, { bodies: '' }, srv, msg.uid, { bodies: '' },
(messages, state) => this.getBody(messages, msg.folder_id) async (messages, state) => await this.getBody(messages, msg.folder_id)
); ));
this.syncer.imap.releaseConnection(msg.account_id); this.syncer.imap.releaseConnection(msg.account_id);
return res.send({ msg: { ...msg, ...upd } }); return res.send({ msg: { ...msg, ...upd[0] } });
} }
return res.send({ msg: msg }); return res.send({ msg: msg });
} }

View File

@ -24,11 +24,6 @@
}, },
"peerDependencies": {}, "peerDependencies": {},
"devDependencies": { "devDependencies": {
"babel-cli": "latest",
"babel-plugin-transform-es2015-block-scoping": "latest",
"babel-plugin-transform-es2015-destructuring": "latest",
"babel-plugin-transform-object-rest-spread": "latest",
"babel-register": "latest",
"bluebird": "^3.5.4", "bluebird": "^3.5.4",
"eslint": "latest", "eslint": "latest",
"eslint-plugin-react": "latest" "eslint-plugin-react": "latest"

6
run.sh
View File

@ -1,6 +0,0 @@
#!/bin/sh
for i in ImapManager.js operetta.js Syncer.js SyncerWeb.js; do
node_modules/.bin/babel $i > compiled/$i
done
nodejs --max_old_space_size=100 compiled/operetta.js

View File

@ -1,6 +1,6 @@
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8 // Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
// (c) Виталий Филиппов, 2019 // (c) Виталий Филиппов, 2019
// Версия 2019-05-09 // Версия 2019-05-14
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи, // В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ] // благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
@ -73,9 +73,9 @@ function select_builder(tables, fields, where, options)
} }
if (options.OFFSET || options.offset) if (options.OFFSET || options.offset)
{ {
sql += ' LIMIT '+((options.OFFSET || options.offset) | 0); sql += ' OFFSET '+((options.OFFSET || options.offset) | 0);
} }
return { sql, bind }; return new Text(sql, bind);
} }
function tables_builder(tables) function tables_builder(tables)
@ -302,7 +302,8 @@ function split_using(tables)
function _positional(sql) function _positional(sql)
{ {
let i = 0; let i = 0;
sql = sql.replace(/\?/g, () => '$'+(++i)); sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '$'+(++i) : m));
console.log('> '+sql);
return sql; return sql;
} }
@ -313,7 +314,7 @@ function _inline(sql, bind)
pg_escape = require('pg-escape'); pg_escape = require('pg-escape');
} }
let i = 0; let i = 0;
sql = sql.replace(/\?/g, () => '\''+pg_escape.string(bind[i++])+'\''); sql = sql.replace(/'(?:[^\']*|\'\')*'|"(?:[^\"]*|\"\")*"|(\?)/g, (m, m1) => (m1 ? '\''+pg_escape.string(bind[i++])+'\'' : m));
return sql; return sql;
} }
@ -321,7 +322,6 @@ function _inline(sql, bind)
async function select(dbh, tables, fields, where, options, format) async function select(dbh, tables, fields, where, options, format)
{ {
let { sql, bind } = select_builder(tables, fields, where, options); let { sql, bind } = select_builder(tables, fields, where, options);
//console.log(_inline(sql, bind));
let data = await dbh.query(_positional(sql), bind); let data = await dbh.query(_positional(sql), bind);
if ((format & MS_LIST) || (format & MS_COL)) if ((format & MS_LIST) || (format & MS_COL))
data = data.rows.map(r => Object.values(r)); data = data.rows.map(r => Object.values(r));
@ -422,14 +422,36 @@ class Pg_Values
} }
} }
class Text
{
constructor(sql, bind)
{
this.sql = sql;
this.bind = bind || [];
}
toString()
{
return _inline(this.sql, this.bind);
}
concat(text)
{
return new Text(this.sql+text.sql, [ ...this.bind, ...text.bind ]);
}
}
module.exports = { module.exports = {
select_builder, select_builder,
where_builder, where_builder,
quote_into: _inline,
quote_positional: _positional,
select, select,
insert, insert,
delete: _delete, delete: _delete,
update, update,
values, values,
Text,
MS_HASH, MS_HASH,
MS_LIST, MS_LIST,
MS_ROW, MS_ROW,