diff --git a/ImapManager.js b/ImapManager.js new file mode 100644 index 0000000..b565e36 --- /dev/null +++ b/ImapManager.js @@ -0,0 +1,230 @@ +const gen = require('gen-thread'); +const Imap = require('imap'); +const iconv = require('iconv-lite'); + +module.exports = ImapManager; + +function ImapManager() +{ + this.accounts = {}; + this.connections = {}; + this.busy = {}; + this.selected = {}; + this.queue = {}; +} + +ImapManager.prototype.setServer = function(accountId, settings) +{ + this.accounts[accountId] = settings; +} + +ImapManager.prototype.getConnection = function*(accountId, boxName, connKey) +{ + var self = this; + connKey = accountId+(connKey||''); + if (self.connections[connKey]) + { + if (self.busy[connKey]) + yield self.queue[connKey].push(gen.cb()); + if (boxName && self.selected[connKey] != boxName) + { + yield srv.openBox(boxName, true, gen.ef()); + self.selected[connKey] = boxName; + } + self.busy[connKey] = true; + return self.connections[connKey]; + } + + var srv = new Imap(self.accounts[accountId]); + + srv.once('ready', gen.cb()); + // FIXME handle connection errors + yield srv.connect(); + yield srv._enqueue('ENABLE QRESYNC', gen.cb()); + + // Monkey-patch node-imap to support VANISHED responses + var oldUT = srv._parser._resUntagged; + srv._parser._resUntagged = function() + { + var m; + if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) + { + srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); + } + oldUT.apply(this); + }; + + srv.on('close', function() + { + delete self.connections[connKey]; + if (self.srv == srv) + delete self.srv; + }); + + if (boxName) + { + yield srv.openBox(boxName, true, gen.ef()); + self.selected[connKey] = boxName; + } + + self.connections[connKey] = srv; + self.busy[connKey] = true; + self.queue[connKey] = []; + return srv; +} + +ImapManager.prototype.releaseConnection = function(accountId, connKey, allowClose) +{ + var self = this; + connKey = accountId+(connKey||''); + self.busy[connKey] = false; + if (self.queue[connKey].length) + (self.queue[connKey].shift())(); + else if (allowClose) + { + self.connections[connKey].end(); + delete self.connections[connKey]; + delete self.busy[connKey]; + delete self.queue[connKey]; + delete self.selected[connKey]; + } +} + +ImapManager.prototype.runFetch = function*(srv, what, params, processor, args) +{ + var self = this; + var f = srv.fetch(what, params); + + var fetchState = { + ...(args||{}), + parsed: 0, + paused: false, + synced: 0, + pending: [], + results: [], + srv: srv + }; + + var cb, wait; + f.on('message', function(msg, seqnum) + { + gen.run(self.onMessage(fetchState, msg, seqnum, processor), checkFinish, function(e) { checkFinish(); throw e; }); + }); + + cb = gen.cb(); + yield f.once('end', function() + { + wait = true; + if (fetchState.parsed <= 0) + cb(); + else if (fetchState.pending.length > 0) + gen.run(processor(fetchState.pending, fetchState), saveLast, function(e) { saveLast(); throw e; }); + }); + + if (fetchState.results.length > 0) + { + return fetchState.results; + } + + function saveLast(r) + { + if (r) + fetchState.results = fetchState.results.concat(r); + fetchState.parsed -= fetchState.pending.length; + fetchState.pending = []; + checkFinish(); + } + + function checkFinish() + { + if (fetchState.parsed <= 0 && wait) + cb(); + } +}; + +ImapManager.prototype.onMessage = function*(fetchState, msg, seqnum, processor) +{ + var self = this; + var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum); + // Workaround memory leak in node-imap + // TODO: send pull request + if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) + delete fetchState.srv._curReq.fetchCache[seqnum]; + + fetchState.pending.push([ msgrow, attrs ]); + fetchState.parsed++; + if (!fetchState.paused && fetchState.parsed >= 100) + { + // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! + fetchState.srv._parser._ignoreReadable = true; + fetchState.paused = true; + } + + if (fetchState.pending.length >= 100) + { + var m = fetchState.pending; + fetchState.pending = []; + var err; + var result; + try + { + result = yield gen.run(processor(m, fetchState), gen.cb()); + if (result) + fetchState.results = fetchState.results.concat(result); + } + catch (e) + { + err = e; + } + fetchState.parsed -= m.length; + if (fetchState.paused && fetchState.parsed < 100) + { + fetchState.paused = false; + fetchState.srv._parser._ignoreReadable = false; + process.nextTick(fetchState.srv._parser._cbReadable); + } + if (err) + throw err; + } +} + +ImapManager.prototype.parseMessage = function*(msg, seqnum) +{ + var msgrow = {}; + var attrs; + msg.on('body', function(stream, info) + { + var buffer; + stream.on('data', function(chunk) + { + if (!buffer) + buffer = chunk; + else + buffer = Buffer.concat([ buffer, chunk ]); + }); + stream.once('end', function() + { + var b = buffer.toString('utf8'); + if (b.indexOf('�') >= 0) + { + let enc = /Content-type:\s*[^;\n]*;\s*charset=(\S+)/i.exec(b); + enc = enc ? enc[1] : 'windows-1251'; + try { b = iconv.decode(buffer, enc); } + catch (e) {} + } + if (b.indexOf('\0') >= 0) + b = b.substr(0, b.indexOf('\0')); + msgrow.headers = b; + }); + }); + msg.once('attributes', function(a) { + attrs = a; + }); + yield msg.once('end', gen.cb()); + msgrow.uid = attrs.uid; + msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); + var nf = msgrow.flags.filter(f => f != 'seen'); + nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; + msgrow.flags = nf.sort(); + return [ msgrow, attrs ]; +} diff --git a/Syncer.js b/Syncer.js new file mode 100644 index 0000000..4e0760e --- /dev/null +++ b/Syncer.js @@ -0,0 +1,427 @@ +const gen = require('gen-thread'); +const Imap = require('imap'); +const ImapManager = require('./ImapManager.js'); + +module.exports = Syncer; + +function Syncer(pg) +{ + this.syncInProgress = false; + this.pg = pg; + this.imap = new ImapManager(); +} + +Syncer.prototype.init = function*(cfg) +{ + for (var i = 0; i < cfg.accounts.length; i++) + yield* this.addAccount(cfg.accounts[i]); + yield* this.loadAccounts(); +} + +Syncer.prototype.syncAll = function*() +{ + this.syncInProgress = true; + for (var id in this.accounts) + yield* this.syncAccount(this.accounts[id]); + this.syncInProgress = false; +} + +Syncer.prototype.addAccount = function*(account) +{ + var self = this; + var [ row ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); + if (row.length) + { + row = row[0]; + } + else + { + [ row ] = yield this.pg.insert('accounts', { + name: account.name, + email: account.email, + settings: { + imap: account.imap + } + }).returning('*').row(gen.ef()); + } + return row.id; +} + +Syncer.prototype.loadAccounts = function*() +{ + let [ rows ] = yield this.pg.select('*').from('accounts').rows(gen.ef()); + this.accounts = {}; + for (var i = 0; i < rows.length; i++) + { + this.accounts[rows[i].id] = rows[i]; + this.imap.setServer(rows[i].id, rows[i].settings.imap); + } +} + +Syncer.prototype.syncAccount = function*(account) +{ + var self = this; + var accountId; + var [ rows ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); + if (rows[0] && rows[0].id) + accountId = rows[0].id; + else + { + var [ row ] = yield this.pg.insert('accounts', { + name: account.name, + email: account.email, + settings: { + imap: account.imap + } + }).returning('id').row(gen.ef()); + accountId = row.id; + } + var srv = yield* self.imap.getConnection(accountId, null, 'S'); + var [ boxes ] = yield srv.getBoxes(gen.ef()); + for (var k in boxes) + { + var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); + yield* self.syncBox(srv, accountId, k, boxKind, true); + } + yield* self.runIdle(accountId, srv); + self.imap.releaseConnection(accountId, 'S'); +} + +Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) +{ + var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef()); + + // IMAP sync: http://tools.ietf.org/html/rfc4549 + var [ boxRow ] = yield this.pg.select('*').from('folders') + .where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef()); + if (boxRow.length) + { + boxRow = boxRow[0]; + if (boxRow.uidvalidity != boxStatus.uidvalidity) + { + yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxRow.id }, this.pg.sql('uid is not null'))); + boxRow.uidvalidity = boxStatus.uidvalidity; + } + } + else + { + [ boxRow ] = yield this.pg.insert('folders', { + name: boxStatus.name, + uidvalidity: boxStatus.uidvalidity, + account_id: accountId, + highestmodseq: 0, + kind: boxKind||'' + //unread_count: boxStatus.messages.new, + //total_count: boxStatus.messages.total, + }).returning('id').row(gen.ef()); + } + + // fetch new messages + var missing = []; + var [ maxUid ] = yield this.pg.select('MAX(uid)').from('messages') + .where({ folder_id: boxRow.id }).val(gen.ef()); + if (boxStatus.highestmodseq) + { + process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n'); + yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); + boxRow.highestmodseq = boxStatus.highestmodseq; + } + else if (doFull && maxUid) + { + // list messages, update flags and version tag + process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n'); + yield* this.fullResync(srv, boxRow.id, maxUid, missing); + } + + missing.push((maxUid ? maxUid+1 : 1)+':*'); + yield* this.imap.runFetch(srv, missing, { + size: true, + bodies: 'HEADER' + }, (messages, state) => this.saveMessages(messages, boxRow.id, state)); + + yield this.pg.update('folders', { + uidvalidity: boxRow.uidvalidity, + highestmodseq: boxRow.highestmodseq||0 + }).where({ id: boxRow.id }).run(gen.ef()); +} + +Syncer.prototype.runIdle = function*(accountId, srv) +{ + var self = this; + yield srv.openBox('INBOX', true, gen.ef()); + srv.on('uidvalidity', function(uidvalidity) + { + // uidvalidity changes (FUUUU) remove everything + + }); + srv.on('mail', function(count) + { + // new messages arrived while idling, fetch them + gen.run(function*() + { + var srv = yield* self.imap.getConnection(accountId, null, 'S'); + yield* self.syncBox(srv, accountId, 'INBOX'); + self.imap.releaseConnection(accountId, 'S'); + }); + }); + srv.on('vanish', function(uids) + { + // messages expunged by uids + console.log([ 'VANISH', uids ]); + + }); + srv.on('expunge', function(seqno) + { + // message expunged by (FUUUU) sequence number + console.log(arguments); + + }); +} + +Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing) +{ + var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); + flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); + + var updateFlags = []; + + process.stderr.write('\rsynchronizing 0'); + yield* this.imap.runFetch( + srv, '1:'+maxUid, {}, + (messages, state) => this.queueFlags(messages, boxId, state), + { flags: flags, updateFlags: updateFlags, missing: missing||[] } + ); + process.stderr.write('\n'); + + yield* this.updateFlags(boxId, updateFlags); + + // delete messages removed from IMAP server + flags = Object.keys(flags); + if (flags.length) + yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql.in('uid', flags))); +} + +Syncer.prototype.queueFlags = function*(messages, boxId, fetchState) +{ + for (var i = 0; i < messages.length; i++) + { + var m = messages[i][0]; + if (!fetchState.flags[m.uid]) + fetchState.missing.push(m.uid); + else + { + if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + delete fetchState.flags[m.uid]; + } + } + fetchState.synced += messages.length; + process.stderr.write('\rsynchronizing '+fetchState.synced); +} + +Syncer.prototype.updateFlags = function*(boxId, updateFlags, checkMissing) +{ + if (updateFlags.length) + { + var sql = this.pg.update('messages m', { flags: this.pg.sql('t.flags::varchar(255)[]') }) + .from('('+this.pg.sql.values(updateFlags)+') AS t (uid, flags)') + .where({ 'm.folder_id': boxId }).where(this.pg.sql('m.uid=t.uid')); + if (checkMissing) + { + var [ updated ] = yield sql.returning('m.uid').rows(gen.ef()); + var missing = {}; + for (var i = 0; i < updateFlags.length; i++) + missing[updateFlags[i].uid] = true; + for (var i = 0; i < updated.length; i++) + delete missing[updated[i].uid]; + return Object.keys(missing); + } + else + yield sql.run(gen.ef()); + } + return []; +} + +Syncer.prototype.quickResync = function*(srv, boxId, maxUid, changedSince, missing) +{ + var updateFlags = []; + var vanished = []; + var onVanish = function(dias) + { + vanished = vanished.concat(vanished, dias); + }; + + srv.on('vanish', onVanish); + yield* this.imap.runFetch( + srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, + (messages, state) => queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags } + ); + srv.removeListener('vanish', onVanish); + var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true); + if (missing) + missing.push.apply(missing, checkedMissing); + + if (vanished.length) + { + let lst = [], dia = []; + for (let i = 0; i < vanished.length; i++) + { + if (vanished[i][1]) + dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); + else + lst.push(vanished[i][0]); + } + if (lst.length) + dia.push('uid IN ('+lst.join(',')+')'); + yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql('('+dia.join(' OR ')+')'))); + } +} + +Syncer.prototype.queueQuickFlags = function*(messages, boxId, fetchState) +{ + for (var i = 0; i < messages.length; i++) + { + var m = messages[i][0]; + fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); + } +} + +Syncer.prototype.deleteMessages = function*(where) +{ + yield this.pg.update('threads', { first_msg: null }) + .where(this.pg.sql('first_msg IN ('+this.pg.select('id').from('messages').where(where)+')')) + .run(gen.ef()); + yield this.pg.delete('messages').where(where).run(gen.ef()); + yield this.pg.update('threads', + { first_msg: this.pg.sql('('+ + this.pg.select('id').from('messages').where({ thread_id: this.pg.sql('threads.id') }).orderBy('time').limit(1) + +')') }).where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); + yield this.pg.delete('threads').where(this.pg.sql('first_msg IS NULL')).run(gen.ef()); +} + +Syncer.prototype.saveMessages = function*(messages, boxId) +{ + var self = this; + yield gen.throttle(2); + var uids = messages.map(m => m[1].uid); + var [ exist ] = yield this.pg.select('uid, flags').from('messages') + .where({ folder_id: boxId }).where(this.pg.sql.in('uid', uids)).rows(gen.ef()); + uids = {}; + for (var i = 0; i < exist.length; i++) + uids[exist[i].uid] = true; + for (var i = 0; i < messages.length; i++) + if (!uids[messages[i][1].uid]) + yield* this.addMessage(boxId, messages[i][0], messages[i][1]); +} + +Syncer.prototype.addMessage = function*(boxId, msgrow, attrs) +{ + var self = this; + var pgtx, end_transaction; + try + { + [ pgtx, end_transaction ] = yield this.pg.transaction(gen.cb(), function(e) { if (e) throw e; }); + + var header = Imap.parseHeader(msgrow.headers); + for (var i in header) + for (var k = 0; k < header[i].length; k++) + header[i][k] = header[i][k].replace(/\x00/g, ''); + header.from = header.from && splitEmails(header.from[0])[0]; + header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; + var re = /(<[^>]*>)/; + header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); + if (header.references.length) + { + if (header.references.length > 10) + header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); + if (!header['in-reply-to'] || !header['in-reply-to'][0]) + header['in-reply-to'] = [ header.references[header.references.length-1] ]; + else if (header.references[header.references.length-1] != header['in-reply-to'][0]) + header.references.push(header['in-reply-to'][0]); + } + if (header.date) + { + var t = Date.parse(header.date[0]); + if (!isNaN(t)) + header.date = new Date(t); + else + header.date = null; + } + if (!header.date) + header.date = new Date(attrs.date); + + msgrow.folder_id = boxId; + msgrow.from_email = header.from && header.from.email || ''; + msgrow.from_name = header.from && header.from.name || ''; + msgrow.replyto_email = header.replyto && header.replyto.email || ''; + msgrow.replyto_name = header.replyto && header.replyto.name || ''; + msgrow.to_list = header.to && header.to[0] || ''; + msgrow.cc_list = header.cc && header.cc[0] || ''; + msgrow.bcc_list = header.bcc && header.bcc[0] || ''; + msgrow.subject = header.subject && header.subject[0] || ''; + msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; + msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; + msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); + msgrow.time = header.date; + msgrow.flags = toPgArray(msgrow.flags); + msgrow.refs = toPgArray(header.references); + + var thisIsFirst = false; + if (header.references.length) + { + let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(this.pg.sql.in('messageid', header.references)).val(gen.ef()); + if (!threadId) + { + [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') + .where(new this.pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); + if (threadId) + thisIsFirst = true; + } + msgrow.thread_id = threadId; + } + console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); + [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); + if (!msgrow.thread_id) + { + [ msgrow.thread_id ] = yield pgtx.insert('threads', { + first_msg: msgrow.id, + msg_count: 1 + }).returning('id').val(gen.ef()); + yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); + } + else + { + let upd = pgtx.update('threads', { msg_count: this.pg.sql('msg_count+1') }); + if (thisIsFirst) + upd.first_msg = msgrow.id; + yield upd.where({ id: msgrow.threadId }).run(gen.ef()); + } + + end_transaction(); + } + catch (e0) + { + if (end_transaction) + end_transaction(); + throw e0; + } +} + +function splitEmails(s) +{ + var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' + var m, r = []; + while (m = re.exec(s)) + { + s = s.substr(m[0].length); + r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() }); + } + return r; +} + +function toPgArray(a) +{ + a = JSON.stringify(a); + return '{'+a.substring(1, a.length-1)+'}'; +} diff --git a/SyncerWeb.js b/SyncerWeb.js new file mode 100644 index 0000000..4efdef2 --- /dev/null +++ b/SyncerWeb.js @@ -0,0 +1,141 @@ +const gen = require('gen-thread'); +const MailParser = require('mailparser').MailParser; +const htmlawed = require('htmlawed'); + +const express = require('express'); +const express_session = require('express-session'); +const bodyparser = require('body-parser'); +const multer = require('multer'); + +module.exports = SyncerWeb; + +function SyncerWeb(syncer, pg, cfg) +{ + this.syncer = syncer; + this.pg = pg; + this.cfg = cfg; + this.app = express(); + this.app.use(bodyparser.urlencoded({ extended: false })); + this.app.use(express_session({ + secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k', + resave: false, + saveUninitialized: false + })); + this.app.get('/auth', this.get_auth); + this.app.post('/auth', this.post_auth); + this.app.get('/folders', genRequest(this.get_folders.bind(this))); + this.app.get('/messages', genRequest(this.get_messages.bind(this))); + this.app.get('/message', genRequest(this.get_message.bind(this))); + this.app.post('/sync', genRequest(this.post_sync.bind(this))); +} + +SyncerWeb.prototype.get_auth = function(req, res) +{ + return res.type('html').send('
'); +} + +SyncerWeb.prototype.post_auth = function(req, res) +{ + if (!req.body) + return res.sendStatus(400); + if (req.body.login == this.cfg.login && req.body.password == this.cfg.password) + { + req.session.auth = true; + return res.send({ ok: true }); + } + return res.send({ ok: false }); +} + +SyncerWeb.prototype.get_folders = function*(req, res) +{ + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var [ accounts ] = yield this.pg.select('id, name, email').from('accounts').rows(gen.ef()); + var [ folders ] = yield this.pg.select( + 'id, account_id, name,'+ + ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+ + ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count' + ).from('folders f').orderBy('account_id, name').rows(gen.ef()); + var fh = {}; + for (let i = 0; i < folders.length; i++) + { + fh[folders[i].account_id] = fh[folders[i].account_id] || []; + fh[folders[i].account_id].push(folders[i]); + } + for (let i = 0; i < accounts.length; i++) + { + accounts[i].folders = fh[accounts[i].id] || []; + } + return res.send({ accounts: accounts }); +} + +SyncerWeb.prototype.get_messages = function*(req, res) +{ + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var folderId = req.query.folderId; + if (!folderId) + return res.status(500).send('Need `folderId` query parameter'); + var limit = req.query.limit || 50; + var offset = req.query.offset || 0; + var [ msgs ] = yield this.pg.select('*').from('messages').where({ folder_id: folderId }) + .orderBy('time desc').limit(limit).offset(offset).rows(gen.ef()); + return res.send({ messages: msgs }); +} + +SyncerWeb.prototype.get_message = function*(req, res) +{ + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var msgId = req.query.msgId; + var [ msg ] = yield this.pg.select('m.*, f.name folder_name, f.account_id') + .from('messages m').join('folders f', this.pg.sql('f.id=m.folder_id')) + .where({ 'm.id': msgId }).row(gen.ef()); + if (!msg) + return res.send({ error: 'not-found' }); + delete msg.text_index; + if (!msg.body_html && !msg.body_text) + { + var srv = yield* this.syncer.imap.getConnection(msg.account_id, msg.folder_name); + var [ upd ] = yield* this.syncer.imap.runFetch( + srv, msg.uid, { bodies: '' }, + (messages, state) => getBody(this.pg, messages, msg.folder_id) + ); + this.syncer.imap.releaseConnection(msg.account_id); + return res.send({ msg: { ...msg, ...upd } }); + } + return res.send({ msg: msg }); +} + +SyncerWeb.prototype.post_sync = function*(req, res) +{ + if (!req.session || !req.session.auth) + return res.sendStatus(401); + if (self.syncer.syncInProgress) + return res.send({ error: 'already-running' }); + gen.run(self.syncer.syncAll()); + return res.send({ status: 'started' }); +} + +function* getBody(pg, messages, boxId) +{ + var p = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); + for (var i = 0; i < messages.length; i++) + { + let msg = messages[i]; + p.on('end', gen.cb()); + p.write(msg[0].headers); + let [ obj ] = yield p.end(); + obj.html = htmlawed.sanitize(obj.html||'', { safe: 1, elements: '* +style' }); + let upd = { body_text: obj.text||'', body_html: obj.html }; + upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); + yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); + if (messages.length == 1) + return [ upd ]; + } +} + +function genRequest(fn) +{ + return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack)); +} diff --git a/cfg.json.example b/cfg.json.example new file mode 100644 index 0000000..4191837 --- /dev/null +++ b/cfg.json.example @@ -0,0 +1,23 @@ +{ + "login": "", + "password": "", + "pg": { + "user": "", + "database": "", + "password": "", + "port": 5432, + "max": 10, + "idleTimeoutMillis": 30000 + }, + "accounts": [ { + "email": "", + "name": "", + "imap": { + "user": "", + "password": "", + "host": "", + "port": 143, + "tls": false + } + } ] +} diff --git a/operetta.js b/operetta.js index afb29f7..607aa2c 100644 --- a/operetta.js +++ b/operetta.js @@ -1,4 +1,3 @@ -// TODO: Упростить лапшу, в частности syncAccount вообще некорректен при первом запуске // TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты // TODO: Группировка писем // TODO: Сделать подписки на новые сообщения по вебсокетам @@ -25,17 +24,11 @@ require('heapdump'); -const gen = require("gen-thread"); -const Imap = require('imap'); -const iconv = require('iconv-lite'); -const MailParser = require('mailparser').MailParser; +require('babel-register'); +const gen = require('gen-thread'); const bricks = require('pg-bricks'); -const htmlawed = require('htmlawed'); - -const express = require('express'); -const express_session = require('express-session'); -const bodyparser = require('body-parser'); -const multer = require('multer'); +const Syncer = require('./Syncer.js'); +const SyncerWeb = require('./SyncerWeb.js'); process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; var cfg = require('./cfg.json'); @@ -43,732 +36,13 @@ var cfg = require('./cfg.json'); var pg = bricks.configure(cfg.pg); pg._pg.types.setTypeParser(1082, 'text', val => val); // НЕ ПАРСИТЬ ДАТЫ ! ! ! -var Syncer = { - selected: {}, - connections: {}, - busy: {}, - queue: {}, - syncInProgress: false -}; +var syncer = new Syncer(pg); +var syncerweb = new SyncerWeb(syncer, pg, cfg); -Syncer.app = express(); - -Syncer.app.use(bodyparser.urlencoded({ extended: false })); - -Syncer.app.use(express_session({ - secret: cfg.sessionSecret || '1083581xm1l3s1l39k', - resave: false, - saveUninitialized: false -})); - -Syncer.app.get('/auth', function(req, res) +gen.run(function*() { - return res.type('html').send('
'); + yield* syncer.init(cfg); + yield* syncer.syncAll(); }); -Syncer.app.post('/auth', function(req, res) -{ - if (!req.body) - return res.sendStatus(400); - if (req.body.login == cfg.login && req.body.password == cfg.password) - { - req.session.auth = true; - return res.send({ ok: true }); - } - return res.send({ ok: false }); -}); - -Syncer.app.get('/folders', genRequest(function*(req, res) -{ - var self = Syncer; - if (!req.session || !req.session.auth) - return res.sendStatus(401); - var [ accounts ] = yield pg.select('id, name, email').from('accounts').rows(gen.ef()); - var [ folders ] = yield pg.select( - 'id, account_id, name,'+ - ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+ - ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count' - ).from('folders f').orderBy('account_id, name').rows(gen.ef()); - var fh = {}; - for (let i = 0; i < folders.length; i++) - { - fh[folders[i].account_id] = fh[folders[i].account_id] || []; - fh[folders[i].account_id].push(folders[i]); - } - for (let i = 0; i < accounts.length; i++) - { - accounts[i].folders = fh[accounts[i].id] || []; - } - return res.send({ accounts: accounts }); -})); - -Syncer.app.get('/messages', genRequest(function*(req, res) -{ - var self = Syncer; - if (!req.session || !req.session.auth) - return res.sendStatus(401); - var folderId = req.query.folderId; - if (!folderId) - return res.status(500).send('Need `folderId` query parameter'); - var limit = req.query.limit || 50; - var offset = req.query.offset || 0; - var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId }) - .orderBy('time desc').limit(limit).offset(offset).rows(gen.ef()); - return res.send({ messages: msgs }); -})); - -Syncer.app.get('/message', genRequest(function*(req, res) -{ - var self = Syncer; - if (!req.session || !req.session.auth) - return res.sendStatus(401); - var msgId = req.query.msgId; - var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id') - .from('messages m').join('folders f', pg.sql('f.id=m.folder_id')) - .where({ 'm.id': msgId }).row(gen.ef()); - if (!msg) - return res.send({ error: 'not-found' }); - delete msg.text_index; - if (!msg.body_html && !msg.body_text) - { - var srv = yield* self.getConnection(msg.account_id, msg.folder_name); - var [ upd ] = yield* self.runFetch(srv, msg.uid, { bodies: '' }, msg.folder_id, 'getBody'); - self.releaseConnection(msg.account_id); - return res.send({ msg: { ...msg, ...upd } }); - } - return res.send({ msg: msg }); -})); - -Syncer.app.get('/sync', genRequest(function*(req, res) -{ - var self = Syncer; - if (!req.session || !req.session.auth) - return res.sendStatus(401); - if (self.syncInProgress) - return res.send({ error: 'already-running' }); - Syncer.syncAll(); - return res.send({ status: 'started' }); -})); - -Syncer.getBody = function*(messages, boxId) -{ - var self = this; - var p = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); - for (var i = 0; i < messages.length; i++) - { - let msg = messages[i]; - p.on('end', gen.cb()); - p.write(msg[0].headers); - let [ obj ] = yield p.end(); - obj.html = htmlawed.sanitize(obj.html||'', { safe: 1, elements: '* +style' }); - let upd = { body_text: obj.text||'', body_html: obj.html }; - upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); - yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); - if (messages.length == 1) - return [ upd ]; - } -}; - -Syncer.getConnection = function*(accountId, boxName, connKey) -{ - var self = this; - connKey = accountId+(connKey||''); - if (self.connections[connKey]) - { - if (self.busy[connKey]) - yield self.queue[connKey].push(gen.cb()); - if (boxName && self.selected[connKey] != boxName) - { - yield srv.openBox(boxName, true, gen.ef()); - self.selected[connKey] = boxName; - } - self.busy[connKey] = true; - return self.connections[connKey]; - } - - if (!self.accounts) - { - self.accounts = {}; - let [ rows ] = yield pg.select('*').from('accounts').rows(gen.ef()); - for (var i = 0; i < rows.length; i++) - { - self.accounts[rows[i].id] = rows[i]; - } - } - var srv = new Imap(self.accounts[accountId].settings.imap); - - srv.once('ready', gen.cb()); - // FIXME handle connection errors - yield srv.connect(); - yield srv._enqueue('ENABLE QRESYNC', gen.cb()); - - // Monkey-patch node-imap to support VANISHED responses - var oldUT = srv._parser._resUntagged; - srv._parser._resUntagged = function() - { - var m; - if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) - { - srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); - } - oldUT.apply(this); - }; - - srv.on('close', function() - { - delete self.connections[connKey]; - if (self.srv == srv) - delete self.srv; - }); - - if (boxName) - { - yield srv.openBox(boxName, true, gen.ef()); - self.selected[connKey] = boxName; - } - - self.connections[connKey] = srv; - self.busy[connKey] = true; - self.queue[connKey] = []; - return srv; -} - -Syncer.releaseConnection = function(accountId, connKey, allowClose) -{ - var self = this; - connKey = accountId+(connKey||''); - self.busy[connKey] = false; - if (self.queue[connKey].length) - (self.queue[connKey].shift())(); - else if (allowClose) - { - self.connections[connKey].end(); - delete self.connections[connKey]; - delete self.busy[connKey]; - delete self.queue[connKey]; - delete self.selected[connKey]; - } -} - -Syncer.syncAccount = function*(account) -{ - var self = this; - var accountId; - var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef()); - if (rows[0] && rows[0].id) - accountId = rows[0].id; - else - { - var [ row ] = yield pg.insert('accounts', { - name: account.name, - email: account.email, - settings: { - imap: account.imap - } - }).returning('id').row(gen.ef()); - accountId = row.id; - } - var srv = yield* self.getConnection(accountId, null, 'S'); - var [ boxes ] = yield srv.getBoxes(gen.ef()); - for (var k in boxes) - { - var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); - yield* self.syncBox(srv, accountId, k, boxKind, true); - } - yield* self.runIdle(accountId, srv); - self.releaseConnection(accountId, 'S'); -} - -Syncer.syncBox = function*(srv, accountId, boxName, boxKind, doFull) -{ - var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef()); - - // IMAP sync: http://tools.ietf.org/html/rfc4549 - var [ boxRow ] = yield pg.select('*').from('folders') - .where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef()); - if (boxRow.length) - { - boxRow = boxRow[0]; - if (boxRow.uidvalidity != boxStatus.uidvalidity) - { - yield* this.deleteMessages(pg.sql.and({ folder_id: boxRow.id }, pg.sql('uid is not null'))); - boxRow.uidvalidity = boxStatus.uidvalidity; - } - } - else - { - [ boxRow ] = yield pg.insert('folders', { - name: boxStatus.name, - uidvalidity: boxStatus.uidvalidity, - account_id: accountId, - highestmodseq: 0, - kind: boxKind||'' - //unread_count: boxStatus.messages.new, - //total_count: boxStatus.messages.total, - }).returning('id').row(gen.ef()); - } - - // fetch new messages - var missing = []; - var [ maxUid ] = yield pg.select('MAX(uid)').from('messages') - .where({ folder_id: boxRow.id }).val(gen.ef()); - if (boxStatus.highestmodseq) - { - yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); - boxRow.highestmodseq = boxStatus.highestmodseq; - } - else if (doFull && maxUid) - { - // list messages, update flags and version tag - yield* this.fullResync(srv, this.accounts[accountId], boxStatus, boxRow.id, maxUid, missing); - } - - missing.push((maxUid ? maxUid+1 : 1)+':*'); - yield* this.runFetch(srv, missing, { - size: true, - bodies: 'HEADER' - }, boxRow.id, 'saveMessages'); - - yield pg.update('folders', { - uidvalidity: boxRow.uidvalidity, - highestmodseq: boxRow.highestmodseq||0 - }).where({ id: boxRow.id }).run(gen.ef()); -} - -Syncer.runIdle = function*(accountId, srv) -{ - var self = this; - yield srv.openBox('INBOX', true, gen.ef()); - srv.on('uidvalidity', function(uidvalidity) - { - // uidvalidity changes (FUUUU) remove everything - - }); - srv.on('mail', function(count) - { - // new messages arrived while idling, fetch them - gen.run(function*() - { - var srv = yield* self.getConnection(accountId, null, 'S'); - yield* self.syncBox(srv, accountId, 'INBOX'); - self.releaseConnection(accountId, 'S'); - }); - }); - srv.on('vanish', function(uids) - { - // messages expunged by uids - console.log([ 'VANISH', uids ]); - - }); - srv.on('expunge', function(seqno) - { - // message expunged by (FUUUU) sequence number - console.log(arguments); - - }); -} - -Syncer.fullResync = function*(srv, account, box, boxId, maxUid, missing) -{ - var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); - flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); - - var updateFlags = []; - - process.stderr.write(account.email+'/'+box.name+': full resync\n'); - process.stderr.write('\rsynchronizing 0'); - yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', - { flags: flags, updateFlags: updateFlags, missing: missing||[] }); - process.stderr.write('\n'); - - yield* this.updateFlags(boxId, updateFlags); - - // delete messages removed from IMAP server - flags = Object.keys(flags); - if (flags.length) - yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql.in('uid', flags))); -} - -Syncer.queueFlags = function*(messages, boxId, fetchState) -{ - for (var i = 0; i < messages.length; i++) - { - var m = messages[i][0]; - if (!fetchState.flags[m.uid]) - fetchState.missing.push(m.uid); - else - { - if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) - fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); - delete fetchState.flags[m.uid]; - } - } - fetchState.synced += messages.length; - process.stderr.write('\rsynchronizing '+fetchState.synced); -} - -Syncer.updateFlags = function*(boxId, updateFlags, checkMissing) -{ - if (updateFlags.length) - { - var sql = pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) - .from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)') - .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')); - if (checkMissing) - { - var [ updated ] = yield sql.returning('m.uid').rows(gen.ef()); - var missing = {}; - for (var i = 0; i < updateFlags.length; i++) - missing[updateFlags[i].uid] = true; - for (var i = 0; i < updated.length; i++) - delete missing[updated[i].uid]; - return Object.keys(missing); - } - else - yield sql.run(gen.ef()); - } - return []; -} - -Syncer.quickResync = function*(srv, boxId, maxUid, changedSince, missing) -{ - var updateFlags = []; - var vanished = []; - var onVanish = function(dias) - { - vanished = vanished.concat(vanished, dias); - }; - - srv.on('vanish', onVanish); - yield* this.runFetch( - srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, - boxId, 'queueQuickFlags', { updateFlags: updateFlags } - ); - srv.removeListener('vanish', onVanish); - var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true); - if (missing) - missing.push.apply(missing, checkedMissing); - - if (vanished.length) - { - let lst = [], dia = []; - for (let i = 0; i < vanished.length; i++) - { - if (vanished[i][1]) - dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); - else - lst.push(vanished[i][0]); - } - if (lst.length) - dia.push('uid IN ('+lst.join(',')+')'); - yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')'))); - } -} - -Syncer.queueQuickFlags = function*(messages, boxId, fetchState) -{ - for (var i = 0; i < messages.length; i++) - { - var m = messages[i][0]; - fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) }); - } -} - -Syncer.deleteMessages = function*(where) -{ - yield pg.update('threads', { first_msg: null }) - .where(pg.sql('first_msg IN ('+pg.select('id').from('messages').where(where)+')')) - .run(gen.ef()); - yield pg.delete('messages').where(where).run(gen.ef()); - yield pg.update('threads', - { first_msg: pg.sql('('+ - pg.select('id').from('messages').where({ thread_id: pg.sql('threads.id') }).orderBy('time').limit(1) - +')') }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); - yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); -} - -Syncer.runFetch = function*(srv, what, params, boxId, processor, args) -{ - var self = this; - var f = srv.fetch(what, params); - - var fetchState = { - ...(args||{}), - parsed: 0, - paused: false, - synced: 0, - pending: [], - results: [], - srv: srv - }; - - var cb, wait; - f.on('message', function(msg, seqnum) - { - gen.run(self.onMessage(fetchState, msg, seqnum, boxId, processor), checkFinish, function(e) { checkFinish(); throw e; }); - }); - - cb = gen.cb(); - yield f.once('end', function() - { - wait = true; - if (fetchState.parsed <= 0) - cb(); - else if (fetchState.pending.length > 0) - gen.run(self[processor](fetchState.pending, boxId, fetchState), saveLast, function(e) { saveLast(); throw e; }); - }); - - if (fetchState.results.length > 0) - { - return fetchState.results; - } - - function saveLast(r) - { - if (r) - fetchState.results = fetchState.results.concat(r); - fetchState.parsed -= fetchState.pending.length; - fetchState.pending = []; - checkFinish(); - } - - function checkFinish() - { - if (fetchState.parsed <= 0 && wait) - cb(); - } -}; - -Syncer.onMessage = function*(fetchState, msg, seqnum, boxId, processor) -{ - var self = this; - var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum, boxId); - // Workaround memory leak in node-imap - // TODO: send pull request - if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache) - delete fetchState.srv._curReq.fetchCache[seqnum]; - - fetchState.pending.push([ msgrow, attrs ]); - fetchState.parsed++; - if (!fetchState.paused && fetchState.parsed >= 100) - { - // ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош! - fetchState.srv._parser._ignoreReadable = true; - fetchState.paused = true; - } - - if (fetchState.pending.length >= 100) - { - var m = fetchState.pending; - fetchState.pending = []; - var err; - var result; - try - { - result = yield gen.run(self[processor](m, boxId, fetchState), gen.cb()); - if (result) - fetchState.results = fetchState.results.concat(result); - } - catch (e) - { - err = e; - } - fetchState.parsed -= m.length; - if (fetchState.paused && fetchState.parsed < 100) - { - fetchState.paused = false; - fetchState.srv._parser._ignoreReadable = false; - process.nextTick(fetchState.srv._parser._cbReadable); - } - if (err) - throw err; - } -} - -Syncer.parseMessage = function*(msg, seqnum, boxId) -{ - var msgrow = {}; - var attrs; - msg.on('body', function(stream, info) - { - var buffer; - stream.on('data', function(chunk) - { - if (!buffer) - buffer = chunk; - else - buffer = Buffer.concat([ buffer, chunk ]); - }); - stream.once('end', function() - { - var b = buffer.toString('utf8'); - if (b.indexOf('�') >= 0) - { - let enc = /Content-type:\s*[^;\n]*;\s*charset=(\S+)/i.exec(b); - enc = enc ? enc[1] : 'windows-1251'; - try { b = iconv.decode(buffer, enc); } - catch (e) {} - } - if (b.indexOf('\0') >= 0) - b = b.substr(0, b.indexOf('\0')); - msgrow.headers = b; - }); - }); - msg.once('attributes', function(a) { - attrs = a; - }); - yield msg.once('end', gen.cb()); - msgrow.uid = attrs.uid; - msgrow.folder_id = boxId; - msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$')); - var nf = msgrow.flags.filter(f => f != 'seen'); - nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf; - msgrow.flags = nf.sort(); - return [ msgrow, attrs ]; -} - -Syncer.saveMessages = function*(messages, boxId) -{ - var self = this; - yield gen.throttle(2); - var uids = messages.map(m => m[1].uid); - var [ exist ] = yield pg.select('uid, flags').from('messages') - .where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef()); - uids = {}; - for (var i = 0; i < exist.length; i++) - uids[exist[i].uid] = true; - for (var i = 0; i < messages.length; i++) - if (!uids[messages[i][1].uid]) - yield* this.addMessage(messages[i][0], messages[i][1]); -} - -Syncer.addMessage = function*(msgrow, attrs) -{ - var self = this; - var pgtx, end_transaction; - try - { - [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; }); - - var header = Imap.parseHeader(msgrow.headers); - for (var i in header) - for (var k = 0; k < header[i].length; k++) - header[i][k] = header[i][k].replace(/\x00/g, ''); - header.from = header.from && splitEmails(header.from[0])[0]; - header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0]; - var re = /(<[^>]*>)/; - header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re)); - if (header.references.length) - { - if (header.references.length > 10) - header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9)); - if (!header['in-reply-to'] || !header['in-reply-to'][0]) - header['in-reply-to'] = [ header.references[header.references.length-1] ]; - else if (header.references[header.references.length-1] != header['in-reply-to'][0]) - header.references.push(header['in-reply-to'][0]); - } - if (header.date) - { - var t = Date.parse(header.date[0]); - if (!isNaN(t)) - header.date = new Date(t); - else - header.date = null; - } - if (!header.date) - header.date = new Date(attrs.date); - - msgrow.from_email = header.from && header.from.email || ''; - msgrow.from_name = header.from && header.from.name || ''; - msgrow.replyto_email = header.replyto && header.replyto.email || ''; - msgrow.replyto_name = header.replyto && header.replyto.name || ''; - msgrow.to_list = header.to && header.to[0] || ''; - msgrow.cc_list = header.cc && header.cc[0] || ''; - msgrow.bcc_list = header.bcc && header.bcc[0] || ''; - msgrow.subject = header.subject && header.subject[0] || ''; - msgrow.messageid = header['message-id'] && header['message-id'][0] || ''; - msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || ''; - msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1'); - msgrow.time = header.date; - msgrow.flags = toPgArray(msgrow.flags); - msgrow.refs = toPgArray(header.references); - - var thisIsFirst = false; - if (header.references.length) - { - let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(pg.sql.in('messageid', header.references)).val(gen.ef()); - if (!threadId) - { - [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages') - .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef()); - if (threadId) - thisIsFirst = true; - } - msgrow.thread_id = threadId; - } - console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject); - [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef()); - if (!msgrow.thread_id) - { - [ msgrow.thread_id ] = yield pgtx.insert('threads', { - first_msg: msgrow.id, - msg_count: 1 - }).returning('id').val(gen.ef()); - yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef()); - } - else - { - let upd = pgtx.update('threads', { msg_count: pg.sql('msg_count+1') }); - if (thisIsFirst) - upd.first_msg = msgrow.id; - yield upd.where({ id: msgrow.threadId }).run(gen.ef()); - } - - end_transaction(); - } - catch (e0) - { - if (end_transaction) - end_transaction(); - throw e0; - } -} - -Syncer.syncAll = function() -{ - gen.run(function*() - { - Syncer.syncInProgress = true; - for (var i = 0; i < cfg.accounts.length; i++) - yield* Syncer.syncAccount(cfg.accounts[i]); - Syncer.syncInProgress = false; - }); -} - -Syncer.app.listen(8057); - -Syncer.syncAll(); - -function genRequest(fn) -{ - return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack)); -} - -function splitEmails(s) -{ - var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' - var m, r = []; - while (m = re.exec(s)) - { - s = s.substr(m[0].length); - r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() }); - } - return r; -} - -function toPgArray(a) -{ - a = JSON.stringify(a); - return '{'+a.substring(1, a.length-1)+'}'; -} +syncerweb.app.listen(8057); diff --git a/package.json b/package.json index 6707f03..0887e1e 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ }, "devDependencies": { "babel-cli": "latest", + "babel-register": "latest", "babel-plugin-transform-es2015-block-scoping": "latest", "babel-plugin-transform-es2015-destructuring": "latest", "babel-plugin-transform-object-rest-spread": "latest", @@ -34,6 +35,6 @@ "eslint-plugin-react": "latest" }, "scripts": { - "run": "babel operetta.js | nodejs" + "run": "nodejs operetta.js" } }