From de1429e73a7b95c2c71c554ad6d6efd266d82268 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 8 May 2019 16:39:14 +0300 Subject: [PATCH] WIP: ES6 + async/await rework, pg-bricks replaced with an own select-builder --- ImapManager.js | 432 +++++++++++---------- Syncer.js | 808 ++++++++++++++++++++-------------------- SyncerWeb.js | 532 ++++++++++++++------------ operetta.js | 35 +- package.json | 8 +- run.sh | 8 +- select-builder-pgsql.js | 279 ++++++++++++++ 7 files changed, 1229 insertions(+), 873 deletions(-) create mode 100644 select-builder-pgsql.js diff --git a/ImapManager.js b/ImapManager.js index 0f2c718..98d78cb 100644 --- a/ImapManager.js +++ b/ImapManager.js @@ -1,240 +1,260 @@ -const gen = require('gen-thread'); const Imap = require('imap'); module.exports = ImapManager; -function ImapManager() +class ImapManager { - this.accounts = {}; - this.connections = {}; - this.busy = {}; - this.selected = {}; - this.queue = {}; - this.onIdle = {}; - this.onStopIdle = {}; -} - -ImapManager.prototype.setServer = function(accountId, settings) -{ - this.accounts[accountId] = settings; -} - -ImapManager.prototype.getConnection = function*(accountId, boxName, connKey, onIdle, onStopIdle) -{ - var self = this; - connKey = accountId+(connKey||''); - if (self.connections[connKey]) + constructor() { - let stoppingIdle = self.queue[connKey].length == 0; - if (self.busy[connKey]) + this.accounts = {}; + this.connections = {}; + this.busy = {}; + this.selected = {}; + this.queue = {}; + this.onIdle = {}; + this.onStopIdle = {}; + } + + setServer(accountId, settings) + { + this.accounts[accountId] = settings; + } + + async getConnection(accountId, boxName, connKey, onIdle, onStopIdle) + { + connKey = accountId+(connKey||''); + if (this.connections[connKey]) { - // wait for the queue to finish - yield self.queue[connKey].push(gen.cb()); + let stoppingIdle = this.queue[connKey].length == 0; + if (this.busy[connKey]) + { + // wait for the queue to finish + await this.queue[connKey](); + } + if (stoppingIdle && this.onStopIdle[connKey]) + { + // run "stop idle" callback + this.onStopIdle[connKey](accountId, this.connections[connKey]); + } + if (boxName && this.selected[connKey] != boxName) + { + // select different box + await new Promise((r, e) => this.connections[connKey].openBox(boxName, true, r)); + this.selected[connKey] = boxName; + } + this.busy[connKey] = true; + return this.connections[connKey]; } - if (stoppingIdle && self.onStopIdle[connKey]) + + let srv = new Imap(self.accounts[accountId]); + // FIXME handle connection errors + await new Promise((r, e) => { - // run "stop idle" callback - self.onStopIdle[connKey](accountId, self.connections[connKey]); - } - if (boxName && self.selected[connKey] != boxName) + srv.once('ready', r); + srv.connect(); + }); + await new Promise((r, e) => srv._enqueue('ENABLE QRESYNC', r)); + + // Monkey-patch node-imap to support VANISHED responses + var oldUT = srv._parser._resUntagged; + srv._parser._resUntagged = () => { - // select different box - yield self.connections[connKey].openBox(boxName, true, gen.ef()); - self.selected[connKey] = boxName; - } - self.busy[connKey] = true; - return self.connections[connKey]; - } + var m; + if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) + { + srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); + } + oldUT.apply(this); + }; - var srv = new Imap(self.accounts[accountId]); - - srv.once('ready', gen.cb()); - // FIXME handle connection errors - yield srv.connect(); - yield srv._enqueue('ENABLE QRESYNC', gen.cb()); - - // Monkey-patch node-imap to support VANISHED responses - var oldUT = srv._parser._resUntagged; - srv._parser._resUntagged = function() - { - var m; - if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) + srv.on('close', () => { - srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); + delete this.connections[connKey]; + if (this.srv == srv) + { + this.srv = null; + } + }); + + if (boxName) + { + await new Promise((r, e) => srv.openBox(boxName, true, r)); + this.selected[connKey] = boxName; } - oldUT.apply(this); - }; - srv.on('close', function() - { - delete self.connections[connKey]; - if (self.srv == srv) - delete self.srv; - }); - - if (boxName) - { - yield srv.openBox(boxName, true, gen.ef()); - self.selected[connKey] = boxName; + this.connections[connKey] = srv; + this.busy[connKey] = true; + this.queue[connKey] = []; + this.onIdle[connKey] = onIdle; + this.onStopIdle[connKey] = onStopIdle; + return srv; } - self.connections[connKey] = srv; - self.busy[connKey] = true; - self.queue[connKey] = []; - self.onIdle[connKey] = onIdle; - self.onStopIdle[connKey] = onStopIdle; - return srv; -} - -ImapManager.prototype.releaseConnection = function(accountId, connKey, allowClose) -{ - var self = this; - connKey = accountId+(connKey||''); - self.busy[connKey] = false; - if (self.queue[connKey].length) + releaseConnection(accountId, connKey, allowClose) { - (self.queue[connKey].shift())(); + connKey = accountId + (connKey||''); + this.busy[connKey] = false; + if (this.queue[connKey].length) + { + (this.queue[connKey].shift())(); + } + else if (allowClose) + { + this.connections[connKey].end(); + delete this.connections[connKey]; + delete this.busy[connKey]; + delete this.queue[connKey]; + delete this.selected[connKey]; + } + else + { + if (this.onIdle[connKey]) + this.onIdle[connKey](accountId, this.connections[connKey]); + } } - else if (allowClose) - { - self.connections[connKey].end(); - delete self.connections[connKey]; - delete self.busy[connKey]; - delete self.queue[connKey]; - delete self.selected[connKey]; - } - else - { - if (self.onIdle[connKey]) - self.onIdle[connKey](accountId, self.connections[connKey]); - } -} -ImapManager.prototype.runFetch = function*(srv, what, params, processor, args) -{ - var self = this; - var f = srv.fetch(what, params); - - var fetchState = { - ...(args||{}), - parsed: 0, - paused: false, - synced: 0, - pending: [], - results: [], - srv: srv - }; - - var cb, wait; - f.on('message', function(msg, seqnum) + async runFetch(srv, what, params, processor, args) { - gen.run(self.onMessage(fetchState, msg, seqnum, processor), checkFinish, function(e) { checkFinish(); throw e; }); - }); + let f = srv.fetch(what, params); - cb = gen.cb(); - yield f.once('end', function() - { - wait = true; - if (fetchState.parsed <= 0) - cb(); - else if (fetchState.pending.length > 0) - gen.run(processor(fetchState.pending, fetchState), saveLast, function(e) { saveLast(); throw e; }); - }); + let fetchState = { + ...(args||{}), + parsed: 0, + paused: false, + synced: 0, + pending: [], + results: [], + srv: srv, + }; + + let wait; + + await new Promise((resolve, reject) => + { + let error; + + let checkFinish = () => + { + if (fetchState.parsed <= 0 && wait) + { + // Если сообщение окончания придёт до окончания обработки + // последней порции, тогда ждём окончания обработки + if (error) + reject(error); + else + resolve(); + } + }; + + let saveLast = (results) => + { + if (results) + { + fetchState.results = fetchState.results.concat(results); + } + fetchState.parsed -= fetchState.pending.length; + fetchState.pending = []; + checkFinish(); + }; + + f.on('message', (msg, seqnum) => + { + this.onMessage(fetchState, msg, seqnum, processor) + .then(checkFinish) + .catch(e => { error = e; checkFinish(); }); + }); + + f.once('end', () => + { + wait = true; + if (fetchState.parsed <= 0) + { + resolve(); + } + else if (fetchState.pending.length > 0) + { + processor(fetchState.pending, fetchState) + .then(saveLast) + .catch(e => { error = e; saveLast(); }); + } + }); + }); - if (fetchState.results.length > 0) - { return fetchState.results; } - function saveLast(r) + async onMessage(fetchState, msg, seqnum, processor) { - if (r) - fetchState.results = fetchState.results.concat(r); - fetchState.parsed -= fetchState.pending.length; - fetchState.pending = []; - checkFinish(); - } - - function checkFinish() - { - if (fetchState.parsed <= 0 && wait) - cb(); - } -}; - -ImapManager.prototype.onMessage = function*(fetchState, msg, seqnum, processor) -{ - var self = this; - var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum); - // Workaround memory leak in node-imap - // TODO: send pull request - if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) - delete fetchState.srv._curReq.fetchCache[seqnum]; - - fetchState.pending.push([ msgrow, attrs ]); - fetchState.parsed++; - if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause) - { - // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! - fetchState.srv._parser._ignoreReadable = true; - fetchState.paused = true; - } - - if (fetchState.pending.length >= 100) - { - var m = fetchState.pending; - fetchState.pending = []; - var err; - var result; - try + let [ msgrow, attrs ] = await this.parseMessage(msg, seqnum); + // Workaround memory leak in node-imap + // TODO: send pull request + if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) { - result = yield gen.run(processor(m, fetchState), gen.cb()); + delete fetchState.srv._curReq.fetchCache[seqnum]; + } + + fetchState.pending.push([ msgrow, attrs ]); + fetchState.parsed++; + if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause) + { + // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! + fetchState.srv._parser._ignoreReadable = true; + fetchState.paused = true; + } + + if (fetchState.pending.length >= 100) + { + let m = fetchState.pending; + fetchState.pending = []; + let err; + let result = await processor(m, fetchState); if (result) + { fetchState.results = fetchState.results.concat(result); + } + fetchState.parsed -= m.length; + if (fetchState.paused && fetchState.parsed < 100) + { + fetchState.paused = false; + fetchState.srv._parser._ignoreReadable = false; + process.nextTick(fetchState.srv._parser._cbReadable); + } + if (err) + { + throw err; + } } - catch (e) + } + + async parseMessage(msg, seqnum) + { + let msgrow = {}; + let attrs; + msg.on('body', function(stream, info) { - err = e; - } - fetchState.parsed -= m.length; - if (fetchState.paused && fetchState.parsed < 100) + let buffer; + stream.on('data', function(chunk) + { + if (!buffer) + buffer = chunk; + else + buffer = Buffer.concat([ buffer, chunk ]); + }); + stream.once('end', function() + { + msgrow.headers = buffer; + }); + }); + msg.once('attributes', function(a) { - fetchState.paused = false; - fetchState.srv._parser._ignoreReadable = false; - process.nextTick(fetchState.srv._parser._cbReadable); - } - if (err) - throw err; + attrs = a; + }); + await new Promise((r, e) => msg.once('end', r)); + msgrow.uid = attrs.uid; + msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); + let nf = msgrow.flags.filter(f => f != 'seen'); + nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + msgrow.flags = nf.sort(); + return [ msgrow, attrs ]; } } - -ImapManager.prototype.parseMessage = function*(msg, seqnum) -{ - var msgrow = {}; - var attrs; - msg.on('body', function(stream, info) - { - var buffer; - stream.on('data', function(chunk) - { - if (!buffer) - buffer = chunk; - else - buffer = Buffer.concat([ buffer, chunk ]); - }); - stream.once('end', function() - { - msgrow.headers = buffer; - }); - }); - msg.once('attributes', function(a) { - attrs = a; - }); - yield msg.once('end', gen.cb()); - msgrow.uid = attrs.uid; - msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); - var nf = msgrow.flags.filter(f => f != 'seen'); - nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; - msgrow.flags = nf.sort(); - return [ msgrow, attrs ]; -} diff --git a/Syncer.js b/Syncer.js index bdc3434..c91b9a9 100644 --- a/Syncer.js +++ b/Syncer.js @@ -1,4 +1,3 @@ -const gen = require('gen-thread'); const Imap = require('imap'); const ImapManager = require('./ImapManager.js'); const EventEmitter = require('events').EventEmitter; @@ -8,413 +7,432 @@ const mimelib = require('mimelib'); module.exports = Syncer; -function Syncer(pg) +class Syncer { - this.syncInProgress = false; - this.pg = pg; - this.imap = new ImapManager(); - this.runIdle = this.runIdle.bind(this); - this.stopIdle = this.stopIdle.bind(this); - this.events = new EventEmitter(); -} - -Syncer.prototype.init = function*(cfg) -{ - for (var i = 0; i < cfg.accounts.length; i++) - yield* this.addAccount(cfg.accounts[i]); - yield* this.loadAccounts(); -} - -Syncer.prototype.syncAll = function*() -{ - this.syncInProgress = true; - for (var id in this.accounts) - yield* this.syncAccount(this.accounts[id]); - this.syncInProgress = false; - this.events.emit('sync', { state: 'complete' }); -} - -Syncer.prototype.addAccount = function*(account) -{ - var self = this; - var [ row ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); - if (row.length) + constructor(pg) { - row = row[0]; - yield this.pg.update('accounts', { settings: { imap: account.imap, folders: account.folders } }).where({ id: row.id }).run(gen.ef()); + this.syncInProgress = false; + this.pg = pg; + this.imap = new ImapManager(); + this.runIdle = this.runIdle.bind(this); + this.stopIdle = this.stopIdle.bind(this); + this.events = new EventEmitter(); } - else + + async init(cfg) { - [ row ] = yield this.pg.insert('accounts', { - name: account.name, - email: account.email, - settings: { - imap: account.imap, - folders: account.folders - } - }).returning('*').row(gen.ef()); - } - return row.id; -} - -Syncer.prototype.loadAccounts = function*() -{ - let [ rows ] = yield this.pg.select('*').from('accounts').rows(gen.ef()); - this.accounts = {}; - for (var i = 0; i < rows.length; i++) - { - this.accounts[rows[i].id] = rows[i]; - this.imap.setServer(rows[i].id, rows[i].settings.imap); - } -} - -Syncer.prototype.getSyncConnection = function*(accountId, boxName) -{ - var srv = yield* this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle); - return srv; -} - -Syncer.prototype.idleUidvalidity = function(accountId, uidvalidity) -{ - // uidvalidity changes (FUUUU) remove everything and resync - -} - -Syncer.prototype.idleMail = function(accountId, count) -{ - // new messages arrived while idling, fetch them - var self = this; - gen.run(function*() - { - var srv = yield* self.getSyncConnection(accountId); - yield* self.syncBox(srv, accountId, 'INBOX'); - self.releaseSyncConnection(accountId); - }); -} - -Syncer.prototype.idleVanish = function(accountId, uids) -{ - // messages expunged by uids - var self = this; - gen.run(function*() - { - let [ boxId ] = yield self.pg.select('id').from('folders') - .where({ name: 'INBOX', account_id: accountId }).val(gen.ef()); - yield* self.deleteVanished(boxId, uids); - }); -} - -Syncer.prototype.idleExpunge = function(accountId, seqno) -{ - // message expunged by (FUUUU) sequence number(s?) - var self = this; - gen.run(function*() - { - var srv = yield* self.getSyncConnection(accountId); - yield* self.syncBox(srv, accountId, 'INBOX'); - self.releaseSyncConnection(accountId); - }); -} - -Syncer.prototype.runIdle = function(accountId, srv) -{ - var self = this; - if (!srv._idleCallbacks) - { - srv._idleCallbacks = { - uidvalidity: this.idleUidvalidity.bind(this, accountId), - mail: this.idleMail.bind(this, accountId), - vanish: this.idleVanish.bind(this, accountId), - expunge: this.idleExpunge.bind(this, accountId) - } - } - for (var i in srv._idleCallbacks) - { - srv.on(i, srv._idleCallbacks[i]); - } - srv.openBox('INBOX', true, function() {}); -} - -Syncer.prototype.stopIdle = function(accountId, srv) -{ - for (var i in srv._idleCallbacks) - { - srv.removeListener(i, srv._idleCallbacks[i]); - } -} - -Syncer.prototype.releaseSyncConnection = function(accountId, boxName) -{ - this.imap.releaseConnection(accountId, 'S'); -} - -Syncer.prototype.syncAccount = function*(account) -{ - var self = this; - var accountId; - var [ rows ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); - if (rows[0] && rows[0].id) - accountId = rows[0].id; - else - { - var [ row ] = yield this.pg.insert('accounts', { - name: account.name, - email: account.email, - settings: { - imap: account.imap - } - }).returning('id').row(gen.ef()); - accountId = row.id; - } - var srv = yield* self.getSyncConnection(accountId); - var [ boxes ] = yield srv.getBoxes(gen.ef()); - for (var k in boxes) - { - var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); - yield* self.syncBox(srv, accountId, k, boxKind, true); - } - self.releaseSyncConnection(accountId); -} - -Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) -{ - var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef()); - - // IMAP sync: http://tools.ietf.org/html/rfc4549 - var [ boxRow ] = yield this.pg.select('*').from('folders') - .where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef()); - if (boxRow.length) - { - boxRow = boxRow[0]; - if (boxRow.uidvalidity != boxStatus.uidvalidity) + for (var i = 0; i < cfg.accounts.length; i++) { - yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxRow.id }, this.pg.sql('uid is not null'))); - boxRow.uidvalidity = boxStatus.uidvalidity; + await this.addAccount(cfg.accounts[i]); + } + await this.loadAccounts(); + } + + async syncAll() + { + this.syncInProgress = true; + for (let id in this.accounts) + { + await this.syncAccount(this.accounts[id]); + } + this.syncInProgress = false; + this.events.emit('sync', { state: 'complete' }); + } + + async addAccount(account) + { + let row = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_ROW); + if (row) + { + await SQL.update(this.pg, 'accounts', { + settings: { imap: account.imap, folders: account.folders } + }, { id: row.id }); + } + else + { + row = (await SQL.insert('accounts', { + name: account.name, + email: account.email, + settings: { + imap: account.imap, + folders: account.folders + } + }, '*'))[0]; + } + return row.id; + } + + async loadAccounts() + { + let rows = await SQL.select(this.pg, 'accounts', '*', []); + this.accounts = {}; + for (let i = 0; i < rows.length; i++) + { + this.accounts[rows[i].id] = rows[i]; + this.imap.setServer(rows[i].id, rows[i].settings.imap); } } - else + + async getSyncConnection(accountId, boxName) { - [ boxRow ] = yield this.pg.insert('folders', { - name: boxStatus.name, + return await this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle); + } + + idleUidvalidity(accountId, uidvalidity) + { + // FIXME uidvalidity changes (FUUUU) remove everything and resync + } + + idleMail(accountId, count) + { + // new messages arrived while idling, fetch them + (async () => + { + let srv = await this.getSyncConnection(accountId); + await this.syncBox(srv, accountId, 'INBOX'); + this.releaseSyncConnection(accountId); + })().catch(console.error); + } + + idleVanish(accountId, uids) + { + // messages expunged by uids + (async () => + { + let boxId = await SQL.select( + this.pg, 'folders', 'id', { name: 'INBOX', account_id: accountId }, null, SQL.MS_VALUE + ); + await this.deleteVanished(boxId, uids); + })().catch(console.error); + } + + idleExpunge(accountId, seqno) + { + // message expunged by (FUUUU) sequence number(s?) + (async () => + { + let srv = await this.getSyncConnection(accountId); + await this.syncBox(srv, accountId, 'INBOX'); + this.releaseSyncConnection(accountId); + })().catch(console.error); + } + + runIdle(accountId, srv) + { + if (!srv._idleCallbacks) + { + srv._idleCallbacks = { + uidvalidity: this.idleUidvalidity.bind(this, accountId), + mail: this.idleMail.bind(this, accountId), + vanish: this.idleVanish.bind(this, accountId), + expunge: this.idleExpunge.bind(this, accountId) + }; + } + for (let i in srv._idleCallbacks) + { + srv.on(i, srv._idleCallbacks[i]); + } + srv.openBox('INBOX', true, () => {}); + } + + stopIdle(accountId, srv) + { + for (let i in srv._idleCallbacks) + { + srv.removeListener(i, srv._idleCallbacks[i]); + } + } + + releaseSyncConnection(accountId, boxName) + { + this.imap.releaseConnection(accountId, 'S'); + } + + async syncAccount(account) + { + let accountId = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_VALUE); + if (accountId) + { + let row = (await SQL.insert(this.pg, 'accounts', { + name: account.name, + email: account.email, + settings: { + imap: account.imap + } + }, 'id'))[0]; + accountId = row.id; + } + let srv = await this.getSyncConnection(accountId); + let boxes = await new Promise((r, e) => srv.getBoxes(r)); + for (let k in boxes) + { + let boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); + await this.syncBox(srv, accountId, k, boxKind, true); + } + this.releaseSyncConnection(accountId); + } + + async syncBox(srv, accountId, boxName, boxKind, doFull) + { + let boxStatus = await new Promise((r, e) => srv.openBox(boxName, true, r)); + + // IMAP sync: http://tools.ietf.org/html/rfc4549 + let boxRow = await SQL.select(this.pg, 'folders', '*', { account_id: accountId, name: boxStatus.name }, null, SQL.MS_ROW); + if (boxRow) + { + if (boxRow.uidvalidity != boxStatus.uidvalidity) + { + await this.deleteMessages({ folder_id: boxRow.id, 'uid is not null': [] }); + boxRow.uidvalidity = boxStatus.uidvalidity; + } + } + else + { + boxRow = (await SQL.insert(this.pg, 'folders', { + name: boxStatus.name, + uidvalidity: boxStatus.uidvalidity, + account_id: accountId, + highestmodseq: 0, + kind: boxKind||'' + }, '*'))[0]; + } + + // fetch new messages + let missing = []; + let maxUid = await SQL.select(this.pg, 'messages', 'MAX(uid)', { folder_id: boxRow.id }, null, SQL.MS_VALUE); + if (boxRow.highestmodseq) + { + this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name }); + process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n'); + await this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); + boxRow.highestmodseq = boxStatus.highestmodseq; + } + else if (doFull && maxUid) + { + // list messages, update flags and version tag + this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name }); + process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n'); + await this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total); + } + + missing.push((maxUid ? maxUid+1 : 1)+':*'); + await this.imap.runFetch(srv, missing, { + size: true, + bodies: 'HEADER', + struct: true, + }, (messages, state) => this.saveMessages(messages, boxRow.id, state)); + + await SQL.update(this.pg, 'folders', { uidvalidity: boxStatus.uidvalidity, - account_id: accountId, - highestmodseq: 0, - kind: boxKind||'' - }).returning('id').row(gen.ef()); + highestmodseq: boxStatus.highestmodseq||0 + }, { id: boxRow.id }); } - // fetch new messages - var missing = []; - var [ maxUid ] = yield this.pg.select('MAX(uid)').from('messages') - .where({ folder_id: boxRow.id }).val(gen.ef()); - if (boxRow.highestmodseq) + async fullResync(srv, boxId, maxUid, missing, total) { - this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name }); - process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n'); - yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); - boxRow.highestmodseq = boxStatus.highestmodseq; - } - else if (doFull && maxUid) - { - // list messages, update flags and version tag - this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name }); - process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n'); - yield* this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total); - } + let flags = await SQL.select('messages', 'uid, flags', { folder_id: boxId }); + flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); - missing.push((maxUid ? maxUid+1 : 1)+':*'); - yield* this.imap.runFetch(srv, missing, { - size: true, - bodies: 'HEADER', - struct: true, - }, (messages, state) => this.saveMessages(messages, boxRow.id, state)); + let updateFlags = []; - yield this.pg.update('folders', { - uidvalidity: boxStatus.uidvalidity, - highestmodseq: boxStatus.highestmodseq||0 - }).where({ id: boxRow.id }).run(gen.ef()); -} + process.stderr.write('\rsynchronizing 0'); + await this.imap.runFetch( + srv, '1:'+maxUid, {}, + (messages, state) => this.queueFlags(messages, boxId, state), + { flags: flags, updateFlags: updateFlags, missing: missing||[], total: total, nopause: true } + ); + process.stderr.write('\n'); + this.events.emit('sync', { state: 'finish-box' }); -Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing, total) -{ - var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); - flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); + await this.updateFlags(boxId, updateFlags); - var updateFlags = []; - - process.stderr.write('\rsynchronizing 0'); - yield* this.imap.runFetch( - srv, '1:'+maxUid, {}, - (messages, state) => this.queueFlags(messages, boxId, state), - { flags: flags, updateFlags: updateFlags, missing: missing||[], total: total, nopause: true } - ); - process.stderr.write('\n'); - this.events.emit('sync', { state: 'finish-box' }); - - yield* this.updateFlags(boxId, updateFlags); - - // delete messages removed from IMAP server - flags = Object.keys(flags); - if (flags.length) - yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql.in('uid', flags))); -} - -Syncer.prototype.queueFlags = function*(messages, boxId, fetchState) -{ - for (var i = 0; i < messages.length; i++) - { - var m = messages[i][0]; - if (!fetchState.flags[m.uid]) - fetchState.missing.push(m.uid); - else + // delete messages removed from IMAP server + flags = Object.keys(flags); + if (flags.length) { - if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) - fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); - delete fetchState.flags[m.uid]; + await this.deleteMessages({ folder_id: boxId, uid: flags }); } } - fetchState.synced += messages.length; - this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total }); - process.stderr.write('\rsynchronizing '+fetchState.synced); -} -Syncer.prototype.updateFlags = function*(boxId, updateFlags, checkMissing) -{ - if (updateFlags.length) + queueFlags(messages, boxId, fetchState) { - var sql = this.pg.update('messages m', { flags: this.pg.sql('t.flags::varchar(255)[]') }) - .from('('+this.pg.sql.values(updateFlags)+') AS t (uid, flags)') - .where({ 'm.folder_id': boxId }).where(this.pg.sql('m.uid=t.uid')); - if (checkMissing) + for (let i = 0; i < messages.length; i++) { - var [ updated ] = yield sql.returning('m.uid').rows(gen.ef()); - var missing = {}; - for (var i = 0; i < updateFlags.length; i++) - missing[updateFlags[i].uid] = true; - for (var i = 0; i < updated.length; i++) - delete missing[updated[i].uid]; - return Object.keys(missing); + let m = messages[i][0]; + if (!fetchState.flags[m.uid]) + { + fetchState.missing.push(m.uid); + } + else + { + if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) + { + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + } + delete fetchState.flags[m.uid]; + } } - else - yield sql.run(gen.ef()); + fetchState.synced += messages.length; + //this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total }); + process.stderr.write('\rsynchronizing '+fetchState.synced); } - return []; -} -Syncer.prototype.quickResync = function*(srv, boxId, maxUid, changedSince, missing) -{ - var updateFlags = []; - var vanished = []; - var onVanish = function(dias) + async updateFlags(boxId, updateFlags, checkMissing) { - vanished = vanished.concat(vanished, dias); - }; - - srv.on('vanish', onVanish); - yield* this.imap.runFetch( - srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, - (messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags } - ); - srv.removeListener('vanish', onVanish); - var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true); - if (missing) - missing.push.apply(missing, checkedMissing); - - if (vanished.length) - { - yield* this.deleteVanished(boxId, vanished); - } -} - -Syncer.prototype.deleteVanished = function*(boxId, vanished) -{ - let lst = [], dia = []; - for (let i = 0; i < vanished.length; i++) - { - if (vanished[i][1]) - dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); - else - lst.push(vanished[i][0]); - } - if (lst.length) - dia.push('uid IN ('+lst.join(',')+')'); - yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql('('+dia.join(' OR ')+')'))); -} - -Syncer.prototype.queueQuickFlags = function*(messages, boxId, fetchState) -{ - for (var i = 0; i < messages.length; i++) - { - var m = messages[i][0]; - fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); - } -} - -Syncer.prototype.deleteMessages = function*(where) -{ - yield this.pg.update('threads', { first_msg: null }) - .where(this.pg.sql('first_msg IN ('+this.pg.select('id').from('messages').where(where)+')')) - .run(gen.ef()); - yield this.pg.delete('messages').where(where).run(gen.ef()); - yield this.pg.update('threads', - { first_msg: this.pg.sql('('+ - this.pg.select('id').from('messages').where({ thread_id: this.pg.sql('threads.id') }).orderBy('time').limit(1) - +')') }).where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); - yield this.pg.delete('threads').where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); -} - -Syncer.prototype.saveMessages = function*(messages, boxId) -{ - var self = this; - yield gen.throttle(2); - var uids = messages.map(m => m[1].uid); - var [ exist ] = yield this.pg.select('uid, flags').from('messages') - .where({ folder_id: boxId }).where(this.pg.sql.in('uid', uids)).rows(gen.ef()); - uids = {}; - for (var i = 0; i < exist.length; i++) - uids[exist[i].uid] = true; - for (var i = 0; i < messages.length; i++) - if (!uids[messages[i][1].uid]) - yield* this.addMessage(boxId, messages[i][0], messages[i][1]); -} - -Syncer.prototype.parseMsg = function*(msg) -{ - var parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); - parser.once('end', gen.cb()); - parser.write(msg); - var [ obj ] = yield parser.end(); - return obj; -} - -Syncer.prototype.extractAttachments = function(struct, attachments) -{ - attachments = attachments || []; - for (var i = 0; i < struct.length; i++) - { - if (struct[i] instanceof Array) - this.extractAttachments(struct[i], attachments); - else if (struct[i].disposition && struct[i].disposition.type == 'attachment') + if (updateFlags.length) { - attachments.push([ - mimelib.parseMimeWords(struct[i].disposition.params && struct[i].disposition.params.filename || struct[i].description || ''), - struct[i].type+'/'+struct[i].subtype, - struct[i].size - ]); + let updated = await SQL.update( + this.pg, { m: 'messages', t: SQL.values(updateFlags) }, + { 'flags = t.flags::varchar(255)[]' }, + { 'm.folder_id': boxId, 'm.uid=t.uid': [] }, + checkMissing ? { returning: 'm.uid' } : null + ); + if (checkMissing) + { + let missing = {}; + for (let i = 0; i < updateFlags.length; i++) + missing[updateFlags[i].uid] = true; + for (let i = 0; i < updated.length; i++) + delete missing[updated[i].uid]; + return Object.keys(missing); + } + } + return []; + } + + async quickResync(srv, boxId, maxUid, changedSince, missing) + { + let updateFlags = []; + let vanished = []; + let onVanish = function(dias) + { + vanished = vanished.concat(vanished, dias); + }; + + srv.on('vanish', onVanish); + await this.imap.runFetch( + srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, + (messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags } + ); + srv.removeListener('vanish', onVanish); + let checkedMissing = await this.updateFlags(boxId, updateFlags, missing && true); + if (missing) + { + missing.push.apply(missing, checkedMissing); + } + + if (vanished.length) + { + await this.deleteVanished(boxId, vanished); } } - return attachments; -} -Syncer.prototype.addMessage = function*(boxId, msgrow, attrs) -{ - var self = this; - var pgtx, end_transaction; - try + async deleteVanished(boxId, vanished) { - [ pgtx, end_transaction ] = yield this.pg.transaction(gen.cb(), function(e) { if (e) throw e; }); + let lst = [], dia = []; + for (let i = 0; i < vanished.length; i++) + { + if (vanished[i][1]) + dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); + else + lst.push(vanished[i][0]); + } + if (lst.length) + dia.push('uid IN ('+lst.join(',')+')'); + await this.deleteMessages({ folder_id: boxId, '('+dia.join(' OR ')+')': [] }); + } - let header = yield* this.parseMsg(msgrow.headers); + // FIXME: async + queueQuickFlags(messages, boxId, fetchState) + { + for (let i = 0; i < messages.length; i++) + { + let m = messages[i][0]; + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + } + } + + async deleteMessages(where) + { + await SQL.update( + this.pg, 'threads', { first_msg: null }, + { 'first_msg IN ('+SQL.select_builder('messages', 'id', where)+')': [] }, + ); + await SQL.delete(this.pg, 'messages', where); + await SQL.update( + this.pg, 'threads', + { ['first_msg=('+SQL.select_builder( + 'messages', 'id', { 'thread_id=threads.id': [] }, { order_by: 'time', limit: 1 } + )+')']: [] }, + { first_msg: null } + ); + await SQL.delete(this.pg, 'threads', { first_msg: null }); + } + + async saveMessages(messages, boxId) + { + let uids = messages.map(m => m[1].uid); + let exist = await SQL.select(this.pg, 'messages', 'uid, flags', { folder_id: boxId, uid: uids }); + uids = {}; + for (let i = 0; i < exist.length; i++) + { + uids[exist[i].uid] = true; + } + for (let i = 0; i < messages.length; i++) + { + if (!uids[messages[i][1].uid]) + { + await this.addMessage(boxId, messages[i][0], messages[i][1]); + } + } + } + + async parseMsg(msg) + { + let parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); + return await new Promise((r, j) => + { + parse.once('end', r); + parser.write(msg); + }); + } + + extractAttachments(struct, attachments) + { + attachments = attachments || []; + for (let i = 0; i < struct.length; i++) + { + if (struct[i] instanceof Array) + this.extractAttachments(struct[i], attachments); + else if (struct[i].disposition && struct[i].disposition.type == 'attachment') + { + attachments.push([ + mimelib.parseMimeWords(struct[i].disposition.params && struct[i].disposition.params.filename || struct[i].description || ''), + struct[i].type+'/'+struct[i].subtype, + struct[i].size + ]); + } + } + return attachments; + } + + async addMessage(boxId, msgrow, attrs) + { + await this.pg.query('BEGIN'); + try + { + await this.addMessageImpl(boxId, msgrow, attrs); + await this.pg.query('COMMIT'); + } + catch (e) + { + await this.pg.query('ROLLBACK'); + } + } + + async addMessageImpl(boxId, msgrow, attrs) + { + let header = await this.parseMsg(msgrow.headers); header.references = header.references || []; if (header.references.length) { @@ -458,45 +476,45 @@ Syncer.prototype.addMessage = function*(boxId, msgrow, attrs) if (typeof msgrow[i] == 'string') msgrow[i] = msgrow[i].replace(/\x00/g, ''); - var thisIsFirst = false; + let thisIsFirst = false; if (header.references.length) { - let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(this.pg.sql.in('messageid', header.references)).val(gen.ef()); + let threadId = await SQL.select( + this.pg, 'messages', 'MAX(thread_id)', + { messageid: header.references }, null, SQL.MS_VALUE + ); if (!threadId) { - [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(new this.pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); + threadId = await SQL.select( + this.pg, 'messages', 'MAX(thread_id)', + { 'refs @> array[?]': msgrow.messageid }, null, SQL.MS_VALUE + ); if (threadId) + { thisIsFirst = true; + } } msgrow.thread_id = threadId; } console.log(msgrow.time+' '+(header.from && header.from[0] && header.from[0].address || '?')+' '+msgrow.subject); - [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); + msgrow.id = (await SQL.insert(this.pg, 'messages', msgrow, { returning: 'id' }))[0].id; if (!msgrow.thread_id) { - [ msgrow.thread_id ] = yield pgtx.insert('threads', { + msgrow.thread_id = (await SQL.insert(this.pg, 'threads', { first_msg: msgrow.id, msg_count: 1 - }).returning('id').val(gen.ef()); - yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); + }, { returning: 'id' }))[0].id; + await SQL.update(this.pg, 'messages', { thread_id: msgrow.thread_id }, { id: msgrow.id }); } else { - let upd = pgtx.update('threads', { msg_count: this.pg.sql('msg_count+1') }); + let upd = { 'msg_count=msg_count+1': [] }; if (thisIsFirst) + { upd.first_msg = msgrow.id; - yield upd.where({ id: msgrow.threadId }).run(gen.ef()); + } + await SQL.update(this.pg, 'threads', upd, { id: msgrow.threadId }); } - - end_transaction(); - } - catch (e0) - { - if (end_transaction) - end_transaction(); - throw e0; } } diff --git a/SyncerWeb.js b/SyncerWeb.js index b071813..b9eb058 100644 --- a/SyncerWeb.js +++ b/SyncerWeb.js @@ -1,4 +1,3 @@ -const gen = require('gen-thread'); const MailParser = require('mailparser').MailParser; const htmlawed = require('htmlawed'); @@ -11,249 +10,296 @@ const multer = require('multer'); const css = require('css'); +const SQL = require('./select-builder-pgsql.js'); + const MAX_FETCH = 100; module.exports = SyncerWeb; -function SyncerWeb(syncer, pg, cfg) +class SyncerWeb { - this.syncer = syncer; - this.pg = pg; - this.cfg = cfg; - this.app = express(); - this.http = http.Server(this.app); - this.io = socket_io(this.http); - this.app.use(bodyparser.urlencoded({ extended: false })); - this.app.use(express_session({ - secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k', - resave: false, - saveUninitialized: false - })); - this.app.get('/auth', this.get_auth); - this.app.post('/auth', this.post_auth); - this.app.get('/folders', genRequest(this.get_folders.bind(this))); - this.app.get('/groups', genRequest(this.get_groups.bind(this))); - this.app.get('/messages', genRequest(this.get_messages.bind(this))); - this.app.get('/message', genRequest(this.get_message.bind(this))); - this.app.post('/sync', genRequest(this.post_sync.bind(this))); - this.syncer.events.on('sync', this.syncer_sync.bind(this)); -} + constructor(syncer, pg, cfg) + { + this.syncer = syncer; + this.pg = pg; + this.cfg = cfg; + this.app = express(); + this.http = http.Server(this.app); + this.io = socket_io(this.http); + this.app.use(bodyparser.urlencoded({ extended: false })); + this.app.use(express_session({ + secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k', + resave: false, + saveUninitialized: false + })); + this.app.get('/auth', this.get_auth); + this.app.post('/auth', this.post_auth); + this.app.get('/folders', wrapAsync(this.get_folders)); + this.app.get('/groups', wrapAsync(this.get_groups)); + this.app.get('/messages', wrapAsync(this.get_messages)); + this.app.get('/message', wrapAsync(this.get_message)); + this.app.post('/sync', wrapAsync(this.post_sync)); + this.syncer.events.on('sync', this.syncer_sync); + } -SyncerWeb.prototype.listen = function(port) -{ - this.http.listen(port); -} + listen(port) + { + this.http.listen(port); + } -SyncerWeb.prototype.get_auth = function(req, res) -{ - return res.type('html').send( - '
'+ - '
' - ); -} - -SyncerWeb.prototype.post_auth = function(req, res) -{ - if (!req.body) - return res.sendStatus(400); - if (req.body.login == this.cfg.login && req.body.password == this.cfg.password) + get_auth = (req, res) => { - req.session.auth = true; - return res.send({ ok: true }); - } - return res.send({ ok: false }); -} - -SyncerWeb.prototype.get_folders = function*(req, res) -{ - if (this.cfg.login && (!req.session || !req.session.auth)) - { - return res.sendStatus(401); - } - var [ accounts ] = yield this.pg.select( - 'id, name, email, settings->\'folders\' folderMap,'+ - ' (select count(*) from messages m, folders f where m.folder_id=f.id and f.account_id=a.id and (flags @> array[\'pinned\',\'unread\']::varchar(255)[])) pinned_unread_count' - ).from('accounts a').rows(gen.ef()); - var [ folders ] = yield this.pg.select( - 'id, account_id, name,'+ - ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+ - ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count' - ).from('folders f').orderBy('account_id, name').rows(gen.ef()); - var fh = {}; - for (let i = 0; i < folders.length; i++) - { - fh[folders[i].account_id] = fh[folders[i].account_id] || []; - fh[folders[i].account_id].push(folders[i]); - } - for (let i = 0; i < accounts.length; i++) - { - accounts[i].folders = fh[accounts[i].id] || []; - } - return res.send({ accounts: accounts }); -} - -function ymd(dt) -{ - var m = dt.getMonth()+1; - var d = dt.getDate(); - return dt.getFullYear()+'-'+(m < 10 ? '0'+m : m)+'-'+(d < 10 ? '0'+d : d); -} - -SyncerWeb.prototype.msgSearchCond = function(query) -{ - var p = []; - if (query.folderId) - p.push(this.pg.sql.eq('m.folder_id', query.folderId)); - else if (query.folderType == 'unread') - p.push(this.pg.sql('(flags @> array[\'unread\']::varchar(255)[])')); - else if (query.folderType == 'pinned') - p.push(this.pg.sql('(flags @> array[\'flagged\']::varchar(255)[])')); - else if (query.folderType == 'inbox') - { - var folders = []; - for (var id in this.syncer.accounts) - { - n = this.syncer.accounts[id].settings.folders.spam; - if (n) - folders.push(this.pg.sql.and({ 'f.name': n, 'f.account_id': id })); - } - p.push(this.pg.sql.not(this.pg.sql.or.apply(this.pg.sql, folders))); - p.push(this.pg.sql('(flags @> array[\'in\']::varchar(255)[])')); - } - else if (query.folderType == 'out') - p.push(this.pg.sql('(flags @> array[\'out\']::varchar(255)[])')); - else if (query.folderType == 'outbox') - { - // TODO это какая-то хитрая метапапка, которая не живёт на IMAP'е? - } - else if (query.folderType == 'drafts' || query.folderType == 'spam' || query.folderType == 'trash') - { - var folders = []; - var n; - for (var id in this.syncer.accounts) - { - n = this.syncer.accounts[id].settings.folders[query.folderType]; - if (n) - folders.push(this.pg.sql.and({ 'f.name': n, 'f.account_id': id })); - } - p.push(this.pg.sql.or.apply(this.pg.sql, folders)); - } - if (typeof query.search == 'string' && query.search.trim()) - p.push(this.pg.sql('messages_fulltext(m) @@ plainto_tsquery($1)', query.search.trim())); - if (query.accountId) - p.push(this.pg.sql.and(p, this.pg.sql.eq('f.account_id', query.accountId))); - return p.length ? this.pg.sql.and.apply(this.pg.sql, p) : null; -} - -SyncerWeb.prototype.get_groups = function*(req, res) -{ - if (this.cfg.login && (!req.session || !req.session.auth)) - return res.sendStatus(401); - var cond = this.msgSearchCond(req.query); - if (!cond) - return res.status(500).send('Need message query parameters'); - var intervals = []; - var today, today_ts; - today = new Date(ymd(new Date())); - today_ts = today.getTime(); - var week_start = today_ts - ((today.getDay()+6)%7)*86400000; - var prev_week = ymd(new Date(week_start - 86400000*7)); - for (var i = 1; i <= 12; i++) - { - var d = today.getFullYear()+'-'+(i < 10 ? '0' : '')+i+'-01'; - if (d >= prev_week) - break; - intervals.push({ date: d, name: 'm'+i }); - } - intervals.push({ date: prev_week, name: 'pw' }); - for (var i = week_start, d = 1; i < today_ts; i += 86400000, d++) - { - intervals.push({ date: ymd(new Date(i)), name: 'd'+d }); - } - for (var i = today.getFullYear()-1; i >= 1970; i--) - { - intervals.unshift({ date: i+'-01-01', name: ''+i }); - } - intervals.push({ date: ymd(today), name: 't' }); - for (var i = 0; i < intervals.length-1; i++) - { - intervals[i].date_end = intervals[i+1].date; - } - intervals[intervals.length-1].date_end = '100000-12-31'; // it's faster than (is null or <) - var [ groups ] = yield this.pg - .select('d.name, d.date, ('+ - this.pg.select('count(*)') - .from('messages m') - .innerJoin('folders f', this.pg.sql('f.id=m.folder_id')) - .where(cond) - .where(this.pg.sql('m.time >= d.date::date and m.time < d.date_end::date')) - +') count') - .from(this.pg.sql.values(intervals).as('d').columns()) - .orderBy('date desc').rows(gen.ef()); - groups = groups.filter(g => g.count > 0); - return res.send({ groups: groups }); -} - -SyncerWeb.prototype.get_messages = function*(req, res) -{ - if (this.cfg.login && (!req.session || !req.session.auth)) - return res.sendStatus(401); - var cond = this.msgSearchCond(req.query); - if (!cond) - return res.status(500).send('Need message query parameters'); - var limit = req.query.limit || 50; - if (limit > MAX_FETCH) - limit = MAX_FETCH; - var offset = req.query.offset || 0; - var [ msgs ] = yield this.pg.select('m.*').from('messages m') - .innerJoin('folders f', this.pg.sql('f.id=m.folder_id')) - .where(cond).orderBy('time desc').limit(limit).offset(offset) - .rows(gen.ef()); - for (var i = 0; i < msgs.length; i++) - { - delete msgs[i].text_index; - } - return res.send({ messages: msgs }); -} - -SyncerWeb.prototype.get_message = function*(req, res) -{ - if (this.cfg.login && (!req.session || !req.session.auth)) - return res.sendStatus(401); - var msgId = req.query.msgId; - console.log('fetch message '+msgId); - var [ msg ] = yield this.pg.select('m.*, f.name folder_name, f.account_id') - .from('messages m').join('folders f', this.pg.sql('f.id=m.folder_id')) - .where({ 'm.id': msgId }).rows(gen.ef()); - if (!msg.length) - return res.send({ error: 'not-found' }); - msg = msg[0]; - if (!msg.body_html && !msg.body_text) - { - var srv = yield* this.syncer.imap.getConnection(msg.account_id, msg.folder_name); - var [ upd ] = yield* this.syncer.imap.runFetch( - srv, msg.uid, { bodies: '' }, - (messages, state) => this.getBody(messages, msg.folder_id) + return res.type('html').send( + '
'+ + '
' ); - this.syncer.imap.releaseConnection(msg.account_id); - return res.send({ msg: { ...msg, ...upd } }); } - return res.send({ msg: msg }); -} -SyncerWeb.prototype.syncer_sync = function(params) -{ - this.io.emit('sync', params); -} + post_auth = (req, res) => + { + if (!req.body) + return res.sendStatus(400); + if (req.body.login == this.cfg.login && req.body.password == this.cfg.password) + { + req.session.auth = true; + return res.send({ ok: true }); + } + return res.send({ ok: false }); + } -SyncerWeb.prototype.post_sync = function*(req, res) -{ - if (this.cfg.login && (!req.session || !req.session.auth)) - return res.sendStatus(401); - if (this.syncer.syncInProgress) - return res.send({ error: 'already-running' }); - gen.run(this.syncer.syncAll()); - return res.send({ status: 'started' }); + get_folders = async (req, res) => + { + if (this.cfg.login && (!req.session || !req.session.auth)) + { + return res.sendStatus(401); + } + const accounts = (await this.pg.query( + 'select id, name, email, settings->\'folders\' folderMap,'+ + ' (select count(*) from messages m, folders f'+ + ' where m.folder_id=f.id and f.account_id=a.id'+ + ' and (flags @> array[\'pinned\',\'unread\']::varchar(255)[])) pinned_unread_count'+ + ' from accounts a' + )).rows; + const folders = (await this.pg.query( + 'select id, account_id, name,'+ + ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+ + ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count'+ + ' from folders f order by account_id, name' + )).rows; + let fh = {}; + for (let i = 0; i < folders.length; i++) + { + fh[folders[i].account_id] = fh[folders[i].account_id] || []; + fh[folders[i].account_id].push(folders[i]); + } + for (let i = 0; i < accounts.length; i++) + { + accounts[i].folders = fh[accounts[i].id] || []; + } + return res.send({ accounts: accounts }); + } + + msgSearchCond(query) + { + let p = {}; + if (query.folderId) + { + p['m.folder_id'] = query.folderId; + } + else if (query.folderType == 'unread') + { + p['(flags @> array[\'unread\']::varchar(255)[])'] = []; + } + else if (query.folderType == 'pinned') + { + p['(flags @> array[\'flagged\']::varchar(255)[])'] = []; + } + else if (query.folderType == 'inbox') + { + let folders = Object.keys(this.syncer.accounts) + .map(id => [ id, this.syncer.accounts[id].settings.folders.spam ]) + .filter(f => f[1]); + p['(f.account_id, f.name) NOT IN ('+folders.map(f => '(?, ?)').join(', ')+')'] = + [].concat.apply([], folders); + p['(flags @> array[\'in\']::varchar(255)[])'] = []; + } + else if (query.folderType == 'out') + { + p['(flags @> array[\'out\']::varchar(255)[])'] = []; + } + else if (query.folderType == 'outbox') + { + // TODO это какая-то хитрая метапапка, которая не живёт на IMAP'е? + } + else if (query.folderType == 'drafts' || query.folderType == 'spam' || query.folderType == 'trash') + { + let folders = Object.keys(this.syncer.accounts) + .map(id => [ id, this.syncer.accounts[id].settings.folders[query.folderType] ]) + .filter(f => f[1]); + p['(f.account_id, f.name) IN ('+folders.map(f => '(?, ?)').join(', ')+')'] = + [].concat.apply([], folders); + } + if (typeof query.search == 'string' && query.search.trim()) + { + p['messages_fulltext(m) @@ plainto_tsquery($1)'] = [ query.search.trim() ]; + } + if (query.accountId) + { + p['f.account_id'] = query.accountId; + } + return Object.keys(p).length ? p : null; + } + + get_groups = async (req, res) => + { + if (this.cfg.login && (!req.session || !req.session.auth)) + { + return res.sendStatus(401); + } + const cond = this.msgSearchCond(req.query); + if (!cond) + { + return res.status(500).send('Need message query parameters'); + } + let intervals = []; + let today, today_ts; + today = new Date(ymd(new Date())); + today_ts = today.getTime(); + let week_start = today_ts - ((today.getDay()+6)%7)*86400000; + let prev_week = ymd(new Date(week_start - 86400000*7)); + for (let i = 1; i <= 12; i++) + { + let d = today.getFullYear()+'-'+(i < 10 ? '0' : '')+i+'-01'; + if (d >= prev_week) + break; + intervals.push({ date: d, name: 'm'+i }); + } + intervals.push({ date: prev_week, name: 'pw' }); + for (let i = week_start, d = 1; i < today_ts; i += 86400000, d++) + { + intervals.push({ date: ymd(new Date(i)), name: 'd'+d }); + } + for (let i = today.getFullYear()-1; i >= 1970; i--) + { + intervals.unshift({ date: i+'-01-01', name: ''+i }); + } + intervals.push({ date: ymd(today), name: 't' }); + for (let i = 0; i < intervals.length-1; i++) + { + intervals[i].date_end = intervals[i+1].date; + } + intervals[intervals.length-1].date_end = '100000-12-31'; // it's faster than (is null or <) + let groups = await SQL.select( + this.pg, + { d: SQL.values(intervals) }, + 'd.name, d.date, ('+SQL.select_builder( + { m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] }, + 'count(*) count', + { ...cond, 'm.time >= d.date::date and m.time < d.date_end::date': [] }, + )+') count', + [], + { order_by: 'date desc' } + ); + groups = groups.filter(g => g.count > 0); + return res.send({ groups: groups }); + } + + get_messages = async (req, res) => + { + if (this.cfg.login && (!req.session || !req.session.auth)) + { + return res.sendStatus(401); + } + let cond = this.msgSearchCond(req.query); + if (!cond) + { + return res.status(500).send('Need message query parameters'); + } + let limit = req.query.limit || 50; + if (limit > MAX_FETCH) + limit = MAX_FETCH; + let offset = req.query.offset || 0; + let msgs = await SQL.select( + this.pg, + { m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] }, + 'm.*', cond, + { order_by: 'time desc', limit, offset } + ); + for (let i = 0; i < msgs.length; i++) + { + delete msgs[i].text_index; + } + return res.send({ messages: msgs }); + } + + get_message = async (req, res) => + { + if (this.cfg.login && (!req.session || !req.session.auth)) + { + return res.sendStatus(401); + } + let msgId = req.query.msgId; + console.log('fetch message '+msgId); + let msg = await SQL.select( + this.pg, + { m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] }, + 'm.*, f.name folder_name, f.account_id', + { 'm.id': msgId }, null, SQL.MS_ROW + ); + delete msg.text_index; + if (!msg) + { + return res.send({ error: 'not-found' }); + } + if (!msg.body_html && !msg.body_text) + { + let srv = await this.syncer.imap.getConnection(msg.account_id, msg.folder_name); + let upd = await this.syncer.imap.runFetch( + srv, msg.uid, { bodies: '' }, + (messages, state) => this.getBody(messages, msg.folder_id) + ); + this.syncer.imap.releaseConnection(msg.account_id); + return res.send({ msg: { ...msg, ...upd } }); + } + return res.send({ msg: msg }); + } + + syncer_sync = (params) => + { + this.io.emit('sync', params); + } + + post_sync = async (req, res) => + { + if (this.cfg.login && (!req.session || !req.session.auth)) + { + return res.sendStatus(401); + } + if (this.syncer.syncInProgress) + { + return res.send({ error: 'already-running' }); + } + this.syncer.syncAll().catch(console.error); + return res.send({ status: 'started' }); + } + + getBody = async (messages, boxId) => + { + for (let i = 0; i < messages.length; i++) + { + let msg = messages[i]; + let obj = await this.syncer.parseMsg(msg[0].headers); + obj.html = sanitizeHtml(obj.html); + let upd = { body_text: obj.text||'', body_html: obj.html }; + upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); + await SQL.update(this.pg, 'messages m', upd, { folder_id: boxId, uid: msg[0].uid }); + if (messages.length == 1) + { + return [ upd ]; + } + } + return null; + } } function rewriteCss(ast) @@ -308,29 +354,21 @@ function sanitizeHtml(html) html = htmlawed.sanitize(html||'', { safe: 1, elements: '* +style', keep_bad: 0, comment: 1 }); html = html.replace(/]*>([\s\S]*)<\/style\s*>/ig, function(m, m1) { - var ast = css.parse(m1, { silent: true }); + let ast = css.parse(m1, { silent: true }); rewriteCss(ast); return ''; }); return html; } -SyncerWeb.prototype.getBody = function*(messages, boxId) +function wrapAsync(fn) { - for (var i = 0; i < messages.length; i++) - { - let msg = messages[i]; - let obj = yield* this.syncer.parseMsg(msg[0].headers); - obj.html = sanitizeHtml(obj.html); - let upd = { body_text: obj.text||'', body_html: obj.html }; - upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); - yield this.pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); - if (messages.length == 1) - return [ upd ]; - } + return (req, res) => fn(req, res).catch(e => res.status(500).send('Internal Error: '+e.stack)); } -function genRequest(fn) +function ymd(dt) { - return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack)); + let m = dt.getMonth()+1; + let d = dt.getDate(); + return dt.getFullYear()+'-'+(m < 10 ? '0'+m : m)+'-'+(d < 10 ? '0'+d : d); } diff --git a/operetta.js b/operetta.js index 73e27ba..2a4d884 100644 --- a/operetta.js +++ b/operetta.js @@ -1,6 +1,5 @@ /** * TODO: - * - перейти на redux * - исправить параллелизм запросов и sync'а * - фоновая индексация всех текстов сообщений в ящике * - скачивание вложений @@ -37,29 +36,31 @@ * В определённом плане получается тупость - получается, что дублируешь * функционал самого почтового сервера. Но шо ж с ним поделаешь, если он "ни ф силах"... * Ведь по сути-то, MTA от такой штуки нужен только 1 метод: "добавить сообщение в папку". + * + * Блин, IMAP - кривой протокол + * - sequence number-ы это какая-то жопа + * - обновления идут по sequence number-ам + * - обновления идут только по активному mailbox-у + * - а ещё есть какие-то сраные неймспейсы */ -require('heapdump'); +process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; -require('babel-register'); -const gen = require('gen-thread'); -const bricks = require('pg-bricks'); +const pg = require('pg'); const Syncer = require('./Syncer.js'); const SyncerWeb = require('./SyncerWeb.js'); -process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; -var cfg = require('./cfg.json'); +let syncer = new Syncer(pg); +let syncerweb = new SyncerWeb(syncer, pg, cfg); -var pg = bricks.configure(cfg.pg); -pg._pg.types.setTypeParser(1082, 'text', val => val); // НЕ ПАРСИТЬ ДАТЫ ! ! ! - -var syncer = new Syncer(pg); -var syncerweb = new SyncerWeb(syncer, pg, cfg); - -gen.run(function*() +async function startSync(cfg) { - yield* syncer.init(cfg); - yield* syncer.syncAll(); -}); + let connection = new pg.Client(cfg.pg); + await connection.connect(); + await syncer.init(cfg, connection); + await syncer.syncAll(); +} +let cfg = require('./cfg.json'); +startSync(cfg).catch(console.error); syncerweb.listen(8057); diff --git a/package.json b/package.json index 3dea864..094fdfa 100644 --- a/package.json +++ b/package.json @@ -7,11 +7,11 @@ "name": "operetta-backend", "description": "Operetta webmail backend", "dependencies": { - "gen-thread": "latest", - "htmlawed": "latest", "body-parser": "latest", + "css": "latest", "express": "latest", "express-session": "latest", + "htmlawed": "latest", "iconv-lite": "latest", "imap": "latest", "mailparser": "git+https://github.com/vitalif/mailparser#master", @@ -19,13 +19,9 @@ "multer": "latest", "nodemailer": "latest", "pg": "latest", - "pg-bricks": "latest", - "sql-bricks": "latest", "socket.io": "latest", - "css": "latest" }, "peerDependencies": { - "sql-bricks": ">=1.4.0" }, "devDependencies": { "babel-cli": "latest", diff --git a/run.sh b/run.sh index 88f216a..91c4ee0 100755 --- a/run.sh +++ b/run.sh @@ -1,2 +1,6 @@ -node_modules/.bin/babel operetta.js > operetta.c.js -nodejs --max_old_space_size=100 operetta.c.js +#!/bin/sh + +for i in ImapManager.js operetta.js Syncer.js SyncerWeb.js; do + node_modules/.bin/babel $i > compiled/$i +done +nodejs --max_old_space_size=100 compiled/operetta.js diff --git a/select-builder-pgsql.js b/select-builder-pgsql.js new file mode 100644 index 0000000..042c819 --- /dev/null +++ b/select-builder-pgsql.js @@ -0,0 +1,279 @@ +// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8 +// (c) Виталий Филиппов, 2019 +// Версия 2019-05-08 + +// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи, +// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ] + +const pg = require('pg'); + +// Сраный node-postgres конвертирует даты в Date и портит таймзону +const DATATYPE_DATE = 1082; +pg.types.setTypeParser(DATATYPE_DATE, function(val) +{ + return val === null ? null : val; +}); + +let pg_escape; + +const MS_HASH = 0; +const MS_LIST = 1; +const MS_ROW = 2; +const MS_COL = 4; +const MS_VALUE = 6; + +function select_builder(tables, fields, where, options) +{ + let sql = 'SELECT ', bind = []; + if (fields instanceof Array) + { + sql += fields.join(', '); + } + else if (typeof fields == 'string') + { + sql += fields; + } + else if (typeof fields == 'object') + { + sql += Object.keys(fields).map(k => fields[k]+' AS '+k).join(', '); + } + else + { + throw new Error('fields = '+fields+' is invalid'); + } + sql += ' FROM '; + let first = true; + let moreWhere = null; + tables = typeof tables == 'string' ? { t: tables } : tables; + for (const k in tables) + { + if (first) + { + if (typeof tables[k] != 'string') + { + // Бывает удобно указывать WHERE как условие "JOIN" первой таблицы + sql += tables[k][1] + ' ' + k; + moreWhere = tables[k][2]; + } + else + { + sql += tables[k] + ' ' + k; + } + first = false; + } + else if (typeof tables[k] == 'string') + { + sql += ' INNER JOIN '+tables[k]+' '+k+' ON 1=1'; + } + else + { + sql += ' ' + tables[k][0].toUpperCase() + ' JOIN '; + let t = tables[k][1]; + if (t instanceof Pg_Values) + { + sql += '(VALUES '; + let i = 0; + for (const row of t.rows) + { + sql += (i > 0 ? ', (' : '(') + t.keys.map(() => '$'+(++i)).join(', ')+')'; + bind.push.apply(bind, t.keys.map(k => row[k])); + } + sql += ') AS '+k+'('+t.keys.join(', ')+')'; + } + else + { + sql += t + ' ' + k; + } + const on = whereBuilder(tables[k][2]); + sql += ' ON ' + (on[0] || '1=1'); + bind.push.apply(bind, on[1]); + } + } + const w = whereBuilder(where); + sql += ' WHERE '+(w[0] || '1=1'); + bind.push.apply(bind, w[1]); + if (moreWhere) + { + moreWhere = whereBuilder(moreWhere); + if (moreWhere[0]) + { + sql += ' AND '+moreWhere[0]; + bind.push.apply(bind, moreWhere[1]); + } + } + options = options||{}; + if (options['GROUP BY'] || options.group_by) + { + let group = options['GROUP BY'] || options.group_by; + group = group instanceof Array ? group : [ group ]; + sql += ' GROUP BY '+group.join(', '); + } + if (options['ORDER BY'] || options.order_by) + { + let order = options['ORDER BY'] || options.order_by; + order = order instanceof Array ? order : [ order ]; + sql += ' ORDER BY '+order.join(', '); + } + if (options.LIMIT || options.limit) + { + sql += ' LIMIT '+((options.LIMIT || options.limit) | 0); + } + if (options.OFFSET || options.offset) + { + sql += ' LIMIT '+((options.OFFSET || options.offset) | 0); + } + return [ sql, bind ]; +} + +function whereOrSetBuilder(fields, where) +{ + if (typeof fields == 'string') + return [ fields, [] ]; + const w = [], bind = []; + for (const k in fields) + { + let v = fields[k]; + if (k.indexOf('?') >= 0) + { + if (!(v instanceof Array)) + v = [ v ]; + w.push(k); + bind.push.apply(bind, v); + } + else if (/^\d+$/.exec(k)) + { + if (v instanceof Array) + { + w.push(v[0]); + bind.push.apply(bind, v.slice(1)); + } + else + { + w.push(v); + } + } + else if (v != null || v instanceof Array && v.length) + { + v = v instanceof Array ? v : [ v ]; + w.push(v.length == 1 ? k + ' = ?' : k + ' in (' + v.map(() => '?').join(', ') + ')'); + bind.push.apply(bind, v); + } + } + if (!where) + return [ w.join(', '), bind ]; + return [ w.length ? '('+w.join(') and (')+')' : '', bind ]; +} + +function whereBuilder(where) +{ + return whereOrSetBuilder(where, true); +} + +function _positional(sql) +{ + let i = 0; + sql = sql.replace(/\?/g, () => '$'+(++i)); + return sql; +} + +function _inline(sql, bind) +{ + if (!pg_escape) + { + pg_escape = require('pg-escape'); + } + let i = 0; + sql = sql.replace(/\?/g, () => '\''+pg_escape.string(bind[i++])+'\''); + return sql; +} + +// dbh = node-postgres.Client +async function select(dbh, tables, fields, where, options, format) +{ + let [ sql, bind ] = select_builder(tables, fields, where, options); + //console.log(_inline(sql, bind)); + let data = await dbh.query(_positional(sql), bind); + if ((format & MS_LIST) || (format & MS_COL)) + data = data.rows.map(r => Object.values(r)); + else + data = data.rows; + if (format & MS_ROW) + data = data[0]; + if (data && (format & MS_COL)) + data = data[0]; + return data; +} + +async function insert(dbh, table, rows, options) +{ + if (!(rows instanceof Array)) + { + rows = [ rows ]; + } + if (!rows.length) + { + return null; + } + const keys = Object.keys(rows[0]); + let sql = 'insert into '+table+' ('+keys.join(', ')+') values '; + const bind = []; + let i = 0; + for (const row of rows) + { + sql += (i > 0 ? ', (' : '(') + keys.map(() => '$'+(++i)).join(', ')+')'; + bind.push.apply(bind, keys.map(k => row[k])); + } + if (options.returning) + { + sql += ' returning '+options.returning; + return (await dbh.query(sql, bind)).rows; + } + else + { + return await dbh.query(sql, bind); + } +} + +async function _delete(dbh, table, where) +{ + const w = whereBuilder(where); + const sql = 'DELETE FROM '+table+' WHERE '+(w[0] || '1=1'); + return await dbh.execute(_positional(sql), w[1]); +} + +async function update(dbh, table, set, where) +{ + set = whereOrSetBuilder(set, false); + where = whereOrSetBuilder(where, true) + const sql = 'UPDATE '+table+' SET '+set[0]+' WHERE '+(where[0] || '1=1'); + const bind = [ ...set[1], ...where[1] ]; + return await dbh.execute(_positional(sql), bind); +} + +function values(rows) +{ + return new Pg_Values(Object.keys(rows[0]), rows); +} + +class Pg_Values +{ + constructor(keys, rows) + { + this.keys = keys; + this.rows = rows; + } +} + +module.exports = { + select_builder, + select, + insert, + delete: _delete, + update, + values, + MS_HASH, + MS_LIST, + MS_ROW, + MS_COL, + MS_VALUE, +};