const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const Imap = require('imap');
const EventEmitter = require('events').EventEmitter;
const iconv = require('iconv-lite');
const MailParser = require('mailparser').MailParser;
const mimelib = require('mimelib');

const fsp = require('./fsp.js');
const ImapManager = require('./ImapManager.js');
const sanitizeHtml = require('./sanitize.js');
const SQL = require('./select-builder-pgsql.js');

/**
 * Synchronizes IMAP mailboxes into PostgreSQL tables
 * (`accounts`, `folders`, `messages`, `threads`).
 *
 * Progress is reported through `this.events` as 'sync' events whose `state`
 * is one of 'start', 'progress', 'finish-box' or 'complete'.
 */
class Syncer {
  /**
   * @param pg database handle passed as the first argument to every SQL.* helper
   */
  constructor(pg) {
    this.syncInProgress = false;
    this.pg = pg;
    this.imap = new ImapManager();
    this.runIdle = this.runIdle.bind(this);
    this.stopIdle = this.stopIdle.bind(this);
    this.events = new EventEmitter();
  }

  // public
  /**
   * Checks the attachment storage directory, upserts every configured account
   * and loads all accounts from the database.
   * @param cfg object with `files_path` and an `accounts` array
   * @throws Error if `files_path` is not both readable and writable
   */
  async init(cfg) {
    this.files_path = path.resolve(cfg.files_path);
    try {
      // access() mode is a bitmask: R_OK and W_OK must be OR-ed with `|`
      // (`R_OK || W_OK` evaluates to R_OK alone and skipped the write check)
      fs.accessSync(this.files_path, fs.constants.R_OK | fs.constants.W_OK);
    } catch (e) {
      throw new Error(this.files_path+' is not writable');
    }
    for (let i = 0; i < cfg.accounts.length; i++) {
      await this.addAccount(cfg.accounts[i]);
    }
    await this.loadAccounts();
  }

  // public
  /**
   * Synchronizes all loaded accounts sequentially and emits
   * { state: 'complete' } when done. `syncInProgress` is reset even on failure.
   */
  async syncAll() {
    this.syncInProgress = true;
    try {
      for (let id in this.accounts) {
        await this.syncAccount(this.accounts[id]);
      }
    } finally {
      this.syncInProgress = false;
    }
    this.events.emit('sync', { state: 'complete' });
  }

  /**
   * Inserts an account row, or updates the settings of the existing row with
   * the same email.
   * @returns the account id
   */
  async addAccount(account) {
    const settings = JSON.stringify({ imap: account.imap, folders: account.folders });
    let row = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_ROW);
    if (row) {
      await SQL.update(this.pg, 'accounts', { settings }, { id: row.id });
    } else {
      // the connection argument was missing here; every other SQL.* call passes it
      row = (await SQL.insert(this.pg, 'accounts', {
        name: account.name,
        email: account.email,
        settings,
      }, { returning: '*' }))[0];
    }
    return row.id;
  }

  /**
   * Loads all account rows into `this.accounts` (keyed by id) and registers
   * each account's IMAP settings with the connection manager.
   */
  async loadAccounts() {
    let rows = await SQL.select(this.pg, 'accounts', '*', []);
    this.accounts = {};
    for (let i = 0; i < rows.length; i++) {
      this.accounts[rows[i].id] = rows[i];
      this.imap.setServer(rows[i].id, rows[i].settings.imap);
    }
  }

  /**
   * Acquires the account's 'S' (sync) connection; runIdle/stopIdle are handed
   * to ImapManager as idle start/stop callbacks. `boxName` is unused.
   */
  async getSyncConnection(accountId, boxName) {
    return await this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle);
  }

  /**
   * Re-synchronizes INBOX on the sync connection, always releasing it.
   * Runs detached from IMAP event handlers, so errors are logged, not thrown.
   */
  _resyncInbox(accountId) {
    (async () => {
      let srv = await this.getSyncConnection(accountId);
      try {
        await this.syncBox(srv, accountId, 'INBOX');
      } finally {
        this.releaseSyncConnection(accountId);
      }
    })().catch(e => console.error(e.stack));
  }

  idleUidvalidity(accountId, uidvalidity) {
    // UIDVALIDITY changed: syncBox detects the mismatch, removes the stale
    // UID-based messages of the folder and fetches everything again
    this._resyncInbox(accountId);
  }

  idleMail(accountId, count) {
    // new messages arrived while idling, fetch them
    this._resyncInbox(accountId);
  }

  idleVanish(accountId, uids) {
    // messages expunged by uids
    (async () => {
      let boxId = await SQL.select(
        this.pg, 'folders', 'id', { name: 'INBOX', account_id: accountId }, null, SQL.MS_VALUE
      );
      if (boxId) {
        await this.deleteVanished(boxId, uids);
      }
    })().catch(e => console.error(e.stack));
  }

  idleExpunge(accountId, seqno) {
    // message expunged by sequence number(s): we can't map seqnos to stored
    // rows reliably, so resync the whole INBOX
    this._resyncInbox(accountId);
  }

  /**
   * Idle start callback: attaches the idle event handlers (created once per
   * connection and cached on it) and opens INBOX read-only.
   */
  runIdle(accountId, srv) {
    if (!srv._idleCallbacks) {
      srv._idleCallbacks = {
        uidvalidity: this.idleUidvalidity.bind(this, accountId),
        mail: this.idleMail.bind(this, accountId),
        vanish: this.idleVanish.bind(this, accountId),
        expunge: this.idleExpunge.bind(this, accountId)
      };
    }
    for (let i in srv._idleCallbacks) {
      srv.on(i, srv._idleCallbacks[i]);
    }
    srv.openBox('INBOX', true, (err) => {
      if (err) {
        console.error(err.stack || err);
      }
    });
  }

  /** Idle stop callback: detaches the handlers attached by runIdle. */
  stopIdle(accountId, srv) {
    for (let i in srv._idleCallbacks) {
      srv.removeListener(i, srv._idleCallbacks[i]);
    }
  }

  releaseSyncConnection(accountId, boxName) {
    this.imap.releaseConnection(accountId, 'S');
  }

  // public
  /**
   * Synchronizes every mailbox returned by getBoxes() for one account.
   * NOTE(review): only top-level entries of getBoxes() are iterated; nested
   * children (if the server reports any) are not visited.
   */
  async syncAccount(account) {
    let accountId = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_VALUE);
    if (!accountId) {
      let row = (await SQL.insert(this.pg, 'accounts', {
        name: account.name,
        email: account.email,
        settings: JSON.stringify({ imap: account.imap })
      }, { returning: 'id' }))[0];
      accountId = row.id;
    }
    let srv = await this.getSyncConnection(accountId);
    try {
      let boxes = await new Promise((res, err) => srv.getBoxes((e, r) => e ? err(e) : res(r)));
      for (let k in boxes) {
        let boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
        await this.syncBox(srv, accountId, k, boxKind, true);
      }
    } finally {
      this.releaseSyncConnection(accountId);
    }
  }

  /**
   * Synchronizes one mailbox (RFC 4549 style):
   * - quick resync (CHANGEDSINCE) when a MODSEQ was stored previously,
   * - otherwise a full flag resync if `doFull` is set,
   * then fetches headers of all UIDs above the stored maximum plus any UIDs
   * found missing, and stores the new UIDVALIDITY/HIGHESTMODSEQ.
   */
  async syncBox(srv, accountId, boxName, boxKind, doFull) {
    let boxStatus = await new Promise((r, e) => srv.openBox(boxName, true, (err, info) => err ? e(err) : r(info)));
    // IMAP sync: http://tools.ietf.org/html/rfc4549
    let boxRow = await SQL.select(this.pg, 'folders', '*', { account_id: accountId, name: boxStatus.name }, null, SQL.MS_ROW);
    if (boxRow) {
      // loose comparison kept on purpose: stored and server values may differ in type
      if (boxRow.uidvalidity != boxStatus.uidvalidity) {
        await this.deleteMessages({ folder_id: boxRow.id, 'uid is not null': [] });
        boxRow.uidvalidity = boxStatus.uidvalidity;
        // the old MODSEQ belongs to the old UID space; forget it so we don't
        // attempt a CHANGEDSINCE resync over messages we just deleted
        boxRow.highestmodseq = 0;
      }
    } else {
      boxRow = (await SQL.insert(this.pg, 'folders', {
        name: boxStatus.name,
        uidvalidity: boxStatus.uidvalidity,
        account_id: accountId,
        highestmodseq: 0,
        kind: boxKind||''
      }, { returning: '*' }))[0];
    }
    // fetch new messages
    let missing = [];
    let maxUid = await SQL.select(this.pg, 'messages', 'MAX(uid)', { folder_id: boxRow.id }, null, SQL.MS_VALUE);
    if (boxRow.highestmodseq && maxUid) {
      this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name });
      process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n');
      await this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing);
      boxRow.highestmodseq = boxStatus.highestmodseq;
    } else if (doFull && maxUid) {
      // list messages, update flags and version tag
      this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name });
      process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n');
      await this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total);
    }
    // Number(): guard against the DB returning the aggregate as a string
    missing.push((maxUid ? Number(maxUid)+1 : 1)+':*');
    await this.imap.runFetch(srv, missing, {
      size: true,
      bodies: 'HEADER',
      struct: true,
    }, (messages, state) => this.saveMessages(messages, boxRow.id, state));
    await SQL.update(this.pg, 'folders', {
      uidvalidity: boxStatus.uidvalidity,
      highestmodseq: boxStatus.highestmodseq||0
    }, { id: boxRow.id });
  }

  /**
   * Fetches flags of UIDs 1..maxUid, queues flag changes, collects UIDs that
   * are on the server but not in the DB into `missing`, and deletes DB
   * messages that the server no longer reports.
   */
  async fullResync(srv, boxId, maxUid, missing, total) {
    let flags = await SQL.select(this.pg, 'messages', 'uid, flags', { folder_id: boxId });
    flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
    let updateFlags = [];

    process.stderr.write('\rsynchronizing 0');
    await this.imap.runFetch(
      srv, '1:'+maxUid, {},
      async (messages, state) => this.queueFlags(messages, boxId, state),
      { flags: flags, updateFlags: updateFlags, missing: missing||[], total: total, nopause: true }
    );
    process.stderr.write('\n');
    this.events.emit('sync', { state: 'finish-box' });

    await this.updateFlags(boxId, updateFlags);

    // delete messages removed from IMAP server (queueFlags removed every UID it saw)
    flags = Object.keys(flags);
    if (flags.length) {
      await this.deleteMessages({ folder_id: boxId, uid: flags });
    }
  }

  /**
   * runFetch callback of fullResync: compares server flags with stored flags,
   * queues differences, records missing UIDs and reports progress roughly
   * every 1% of `total`.
   */
  queueFlags(messages, boxId, fetchState) {
    for (let i = 0; i < messages.length; i++) {
      let m = messages[i][0];
      if (!fetchState.flags[m.uid]) {
        fetchState.missing.push(m.uid);
      } else {
        let flags = this.transformFlags(m.flags);
        if (fetchState.flags[m.uid].join(',') != flags.join(',')) {
          fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(flags) });
        }
        delete fetchState.flags[m.uid];
      }
    }
    // the state object passed by fullResync does not initialize `synced`
    fetchState.synced = (fetchState.synced || 0) + messages.length;
    if (fetchState.synced-(fetchState.prevSynced||0) >= fetchState.total/100) {
      this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total });
      fetchState.prevSynced = fetchState.synced;
    }
    process.stderr.write('\rsynchronizing '+fetchState.synced);
  }

  /**
   * Bulk-updates flags of the given { uid, flags } rows in one folder.
   * @returns with `checkMissing`, the UIDs that matched no stored message;
   *          otherwise an empty array
   */
  async updateFlags(boxId, updateFlags, checkMissing) {
    if (updateFlags.length) {
      let updated = await SQL.update(
        this.pg,
        { m: 'messages', t: SQL.values(updateFlags) },
        [ 'flags = t.flags::text[]' ],
        { 'm.folder_id': boxId, 'm.uid = t.uid::int': [] },
        checkMissing ? { returning: 'm.uid' } : null
      );
      if (checkMissing) {
        let missing = {};
        for (let i = 0; i < updateFlags.length; i++) missing[updateFlags[i].uid] = true;
        for (let i = 0; i < updated.length; i++) delete missing[updated[i].uid];
        return Object.keys(missing);
      }
    }
    return [];
  }

  /**
   * CONDSTORE/QRESYNC resync: fetches messages changed since `changedSince`,
   * updates their flags, appends unknown UIDs to `missing` and deletes the
   * UIDs reported through 'vanish' events during the fetch.
   */
  async quickResync(srv, boxId, maxUid, changedSince, missing) {
    let updateFlags = [];
    let vanished = [];
    // append only the new ranges (previously the accumulated list was re-added each time)
    let onVanish = function(dias) { vanished = vanished.concat(dias); };
    srv.on('vanish', onVanish);
    try {
      await this.imap.runFetch(
        srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
        async (messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags }
      );
    } finally {
      srv.removeListener('vanish', onVanish);
    }
    let checkedMissing = await this.updateFlags(boxId, updateFlags, !!missing);
    if (missing) {
      missing.push.apply(missing, checkedMissing);
    }
    if (vanished.length) {
      await this.deleteVanished(boxId, vanished);
    }
  }

  /**
   * Deletes messages of a folder given VANISHED UID ranges, each `[uid]` or
   * `[from, to]`. Wide ranges become BETWEEN-style conditions, the rest an IN list.
   * UIDs come from the IMAP server and are interpolated into SQL, so they are
   * validated as integers first.
   */
  async deleteVanished(boxId, vanished) {
    const toUid = (v) => {
      const n = Number(v);
      if (!Number.isInteger(n)) {
        throw new Error('deleteVanished: invalid UID '+v);
      }
      return n;
    };
    let lst = [], dia = [];
    for (let i = 0; i < (vanished||[]).length; i++) {
      let from = toUid(vanished[i][0]);
      if (vanished[i][1]) {
        let to = toUid(vanished[i][1]);
        if (to > from + 1)
          dia.push('uid >= '+from+' AND uid <= '+to);
        else
          lst.push(from, to);
      } else {
        lst.push(from);
      }
    }
    if (lst.length) {
      dia.push('uid IN ('+lst.join(',')+')');
    }
    if (!dia.length) {
      // nothing to delete; '()' would be invalid SQL
      return;
    }
    await this.deleteMessages({ folder_id: boxId, ['('+dia.join(' OR ')+')']: [] });
  }

  /**
   * runFetch callback of quickResync: queues every changed message's flags,
   * normalized the same way as in queueFlags/addMessageImpl.
   */
  queueQuickFlags(messages, boxId, fetchState) {
    for (let i = 0; i < messages.length; i++) {
      let m = messages[i][0];
      fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(this.transformFlags(m.flags)) });
    }
  }

  /**
   * Deletes messages matching `where` in one statement: threads whose first
   * message is deleted get the next remaining message (by time) as first_msg,
   * and threads left with no message are deleted.
   */
  async deleteMessages(where) {
    let cond = SQL.where_builder(where);
    await this.pg.query(SQL.quote_positional(
      'WITH deleting_messages AS (SELECT id FROM messages WHERE '+cond.sql+')'+
      ', updated_threads AS ('+
        'UPDATE threads SET first_msg=('+
          'SELECT m.id FROM messages m WHERE m.thread_id=threads.id'+
          ' AND m.id NOT IN (SELECT id FROM deleting_messages) ORDER BY time LIMIT 1'+
        ') WHERE first_msg IN (SELECT id FROM deleting_messages)'+
        ' RETURNING id, first_msg'+
      '), deleted_threads AS ('+
        'DELETE FROM threads WHERE id IN (SELECT id FROM updated_threads WHERE first_msg IS NULL)'+
        ' RETURNING id'+
      ') DELETE FROM messages WHERE id IN (SELECT id FROM deleting_messages)'
    ), cond.bind);
  }

  /**
   * runFetch callback: stores every fetched message whose UID is not yet in
   * the folder. `messages` entries are [msgrow, attrs].
   */
  async saveMessages(messages, boxId) {
    if (!messages.length) {
      return;
    }
    let uids = messages.map(m => m[1].uid);
    let exist = await SQL.select(this.pg, 'messages', 'uid, flags', { folder_id: boxId, uid: uids });
    uids = {};
    for (let i = 0; i < exist.length; i++) {
      uids[exist[i].uid] = true;
    }
    for (let i = 0; i < messages.length; i++) {
      if (!uids[messages[i][1].uid]) {
        await this.addMessage(boxId, messages[i][0], messages[i][1]);
      }
    }
  }

  /**
   * Parses a raw message with MailParser, inlines `cid:` images referenced by
   * <img src> as data: URIs, stores attachment contents under files_path
   * (content-addressed by SHA-1) and replaces `msg.attachments` with metadata.
   */
  async parseMsg(msg_text) {
    let parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' });
    let msg = await new Promise((resolve, reject) => {
      parser.on('error', reject);
      parser.once('end', resolve);
      parser.write(msg_text);
      parser.end();
    });
    let byid = {};
    for (let a of msg.attachments||[]) {
      byid[a.contentId||''] = a;
    }
    msg.html = (msg.html||'').replace(/(<img[^>]*src=["']?)cid:([^'"\s]{1,256})/gi, (m, m1, m2) => {
      if (!byid[m2]) {
        return m;
      }
      // encode the attachment's content buffer, not the attachment object itself
      return m1 + 'data:' + byid[m2].contentType + ';base64,' + byid[m2].content.toString('base64');
    });
    let attachments = [];
    for (let a of msg.attachments||[]) {
      let sha1 = crypto.createHash('sha1').update(a.content).digest('hex');
      let subdir = sha1.substr(0, 2)+'/'+sha1.substr(2, 2);
      let filename = subdir+'/'+sha1+'.bin';
      if (!await fsp.exists(this.files_path+'/'+filename)) {
        if (!await fsp.exists(this.files_path+'/'+sha1.substr(0, 2))) {
          await fsp.mkdir(this.files_path+'/'+sha1.substr(0, 2));
        }
        if (!await fsp.exists(this.files_path+'/'+subdir)) {
          await fsp.mkdir(this.files_path+'/'+subdir);
        }
        await fsp.writeFile(this.files_path+'/'+filename, a.content);
      }
      attachments.push({
        id: a.contentId,
        name: a.fileName,
        mimetype: a.contentType,
        size: a.length,
        sha1,
        filename,
      });
    }
    msg.attachments = attachments;
    return msg;
  }

  /**
   * Normalizes IMAP flags for storage; returns a new sorted array.
   */
  transformFlags(flags) {
    // the absence of 'seen' is transformed into the added 'unread' flag
    // 'unseen' is something that mail.ru adds by itself
    // 'unread' is removed to not mess up with our 'unread'
    flags = flags.filter(f => f != 'unseen' && f != 'recent' && f != 'unread');
    if (!flags.includes('seen'))
      flags = [ ...flags, 'unread' ];
    else
      flags = flags.filter(f => f != 'seen');
    return flags.sort();
  }

  /**
   * Recursively collects parts with `disposition.type == 'attachment'` from
   * a fetched body structure into [{ name, mimetype, size }].
   */
  extractAttachments(struct, attachments) {
    attachments = attachments || [];
    struct = struct || [];
    for (let i = 0; i < struct.length; i++) {
      if (struct[i] instanceof Array) {
        this.extractAttachments(struct[i], attachments);
      } else if (struct[i].disposition && struct[i].disposition.type == 'attachment') {
        attachments.push({
          name: mimelib.parseMimeWords(struct[i].disposition.params &&
            struct[i].disposition.params.filename || struct[i].description || ''),
          mimetype: struct[i].type+'/'+struct[i].subtype,
          size: struct[i].size,
        });
      }
    }
    return attachments;
  }

  /** Stores one message inside a BEGIN/COMMIT transaction (ROLLBACK on error). */
  async addMessage(boxId, msgrow, attrs) {
    await this.pg.query('BEGIN');
    try {
      await this.addMessageImpl(boxId, msgrow, attrs);
      await this.pg.query('COMMIT');
    } catch (e) {
      await this.pg.query('ROLLBACK');
      throw e;
    }
  }

  /**
   * Parses headers, fills the message row (mutates `msgrow`), inserts it and
   * attaches it to a thread: the thread of a referenced message, or the
   * thread of an already-stored reply (this message then becomes first_msg),
   * or a new thread.
   */
  async addMessageImpl(boxId, msgrow, attrs) {
    let header = await this.parseMsg(msgrow.headers);
    header.references = header.references || [];
    if (header.references.length) {
      // In-Reply-To defaults to the last reference, and is always part of the references
      if (!header.inReplyTo || !header.inReplyTo[0]) {
        header.inReplyTo = [ header.references[header.references.length-1] ];
      } else if (header.references[header.references.length-1] != header.inReplyTo[0]) {
        header.references.push(header.inReplyTo[0]);
      }
    }
    if (!header.date) {
      header.date = new Date(attrs.date);
    }
    if (JSON.stringify(header).includes('\uFFFD')) {
      // Charset error! (U+FFFD replacement character in the parsed headers)
      console.log(iconv.decode(msgrow.headers, 'cp1251'));
      console.log(header);
    }
    delete msgrow.headers;
    const addrs = (list) => (list||[]).map((a) => [ a.name, a.address ]);
    msgrow.folder_id = boxId;
    msgrow.subject = header.subject || '';
    msgrow.props = JSON.stringify({
      from: addrs(header.from)[0],
      to: addrs(header.to),
      cc: addrs(header.cc),
      bcc: addrs(header.bcc),
      replyto: addrs(header.replyTo)[0],
      attachments: this.extractAttachments(attrs.struct),
      inout: (header.headers.received||[]).length ? 'in' : 'out',
    });
    msgrow.messageid = header.messageId || '';
    msgrow.inreplyto = header.inReplyTo && header.inReplyTo[0] || '';
    msgrow.time = header.date;
    msgrow.size = attrs.size;
    msgrow.flags = toPgArray(this.transformFlags(msgrow.flags));
    msgrow.refs = toPgArray(header.references);
    // strip NUL characters from all string columns (PostgreSQL text rejects them)
    for (let i in msgrow) {
      if (typeof msgrow[i] == 'string') {
        msgrow[i] = msgrow[i].replace(/\x00/g, '');
      }
    }
    let thisIsFirst = false;
    let threadId = null;
    if (header.references.length) {
      // join the thread of any message this one references
      threadId = await SQL.select(
        this.pg, 'messages', 'MAX(thread_id)', { messageid: header.references }, null, SQL.MS_VALUE
      );
    }
    if (!threadId && msgrow.messageid) {
      // replies may have been stored before this message (even when it has no
      // References itself): join their thread and become its first message
      threadId = await SQL.select(
        this.pg, 'messages', 'MAX(thread_id)', { 'refs @> array[?]': msgrow.messageid }, null, SQL.MS_VALUE
      );
      if (threadId) {
        thisIsFirst = true;
      }
    }
    if (threadId) {
      msgrow.thread_id = threadId;
    }
    console.log(msgrow.time+' '+(header.from && header.from[0] && header.from[0].address || '?')+' '+msgrow.subject);
    msgrow.id = (await SQL.insert(this.pg, 'messages', msgrow, { returning: 'id' }))[0].id;
    if (!msgrow.thread_id) {
      msgrow.thread_id = (await SQL.insert(this.pg, 'threads', {
        first_msg: msgrow.id,
        msg_count: 1
      }, { returning: 'id' }))[0].id;
      await SQL.update(this.pg, 'messages', { thread_id: msgrow.thread_id }, { id: msgrow.id });
    } else {
      let upd = { 'msg_count=msg_count+1': [] };
      if (thisIsFirst) {
        upd.first_msg = msgrow.id;
      }
      await SQL.update(this.pg, 'threads', upd, { id: msgrow.thread_id });
    }
  }

  /**
   * Fetches the full body of one message, stores text/HTML/attachments and
   * returns runFetch's result. The connection is released even on failure.
   */
  async fetchFullMessage(account_id, folder_id, folder_name, msg_uid) {
    let srv = await this.imap.getConnection(account_id, folder_name);
    try {
      return await this.imap.runFetch(
        srv, msg_uid, { bodies: '' },
        (messages, state) => this._parseBody(messages, folder_id)
      );
    } finally {
      this.imap.releaseConnection(account_id);
    }
  }

  /**
   * runFetch callback of fetchFullMessage: parses and sanitizes bodies and
   * stores them; for a single message also returns the stored fields.
   */
  async _parseBody(messages, boxId) {
    for (let i = 0; i < messages.length; i++) {
      let msg = messages[i];
      let obj = await this.parseMsg(msg[0].headers);
      obj.html = sanitizeHtml(obj.html);
      let upd = {
        body_text: obj.text||'',
        body_html: obj.html,
        // plain text of the HTML: drop <style> blocks entirely, then all tags
        body_html_text: obj.html.replace(/<style[^>]*>[\s\S]*?<\/style\s*>|<\/?[^>]*>/gi, ''),
      };
      await SQL.update(
        this.pg, 'messages m',
        { ...upd, 'props = props || ?': [ { attachments: obj.attachments } ] },
        { folder_id: boxId, uid: msg[0].uid }
      );
      if (messages.length == 1) {
        upd.props = { attachments: obj.attachments };
        return [ upd ];
      }
    }
    return null;
  }

  // flags = lowercase and without \
  /**
   * Adds ('add'), removes ('del') or replaces ('set') IMAP flags of the given
   * messages on the server (one STORE per folder), then mirrors the change in
   * the DB, where 'seen' is stored as the absence of 'unread'.
   * @throws Error on unsupported flags or action
   */
  async processFlags(msgIds, action, flags) {
    flags = Array.isArray(flags) ? flags : [ flags ];
    const bad_flags = flags.filter(f => f != 'seen' && f != 'answered' && f != 'flagged' && f != 'deleted' && f != 'draft');
    if (bad_flags.length) {
      throw new Error('bad flags: '+bad_flags.join(', '));
    }
    if (!(action == 'add' || action == 'set' || action == 'del')) {
      throw new Error('processFlags: bad action = '+action);
    }
    let rows = await SQL.select(
      this.pg, { m: 'messages', f: 'folders' },
      'f.account_id, m.folder_id, f.name folder_name, m.uid',
      { 'm.id': msgIds, 'f.id=m.folder_id': [] },
      { order_by: 'm.folder_id, m.uid' }
    );
    if (!rows.length) {
      return;
    }
    const imapFlags = flags.map(f => '\\'+f.substr(0, 1).toUpperCase()+f.substr(1));
    let uids = [];
    for (let i = 0; i < rows.length; i++) {
      uids.push(rows[i].uid);
      // flush when the NEXT row belongs to another folder (rows are ordered by folder),
      // so each STORE only carries UIDs of rows[i]'s folder
      if (i == rows.length-1 || rows[i+1].folder_id != rows[i].folder_id) {
        let srv = await this.imap.getConnection(rows[i].account_id, rows[i].folder_name);
        try {
          await new Promise((ok, no) => srv[action+'Flags'](
            uids, imapFlags, (err, res) => err ? no(err) : ok(res)
          ));
        } finally {
          this.imap.releaseConnection(rows[i].account_id);
        }
        uids = [];
      }
    }
    let upd = 'flags', bind = [];
    if (action == 'add') {
      for (let flag of flags) {
        if (flag == 'seen') {
          // instead of the 'seen' flag we store the absence of 'unread'
          upd = 'array_remove(' + upd + ', ?)';
          bind.push('unread');
        } else {
          upd = upd + ' || (case when flags @> array[?] then \'{}\' else array[?] end)';
          bind.push(flag, flag);
        }
      }
    } else if (action == 'del') {
      for (let flag of flags) {
        if (flag == 'seen') {
          // instead of the absence of 'seen' flag we store 'unread'
          upd = upd + ' || (case when flags @> array[?] then \'{}\' else array[?] end)';
          bind.push('unread', 'unread');
        } else {
          upd = 'array_remove('+upd+', ?)';
          bind.push(flag);
        }
      }
    } else {
      flags = flags.includes('seen') ? flags.filter(f => f != 'seen') : [ ...flags, 'unread' ];
      upd = 'array[' + flags.map(f => '?').join(', ') + ']::text[]';
      bind = [ ...flags ];
    }
    await SQL.update(this.pg, 'messages m', { ['flags = '+upd]: bind }, { 'm.id': msgIds });
  }
}

/** Converts a JS array of strings to a PostgreSQL array literal, e.g. {"a","b"}. */
function toPgArray(a) {
  a = JSON.stringify(a);
  return '{'+a.substring(1, a.length-1)+'}';
}

/** Converts an iterable of [key, value] pairs into a plain object. */
function mapToHash(map) {
  let h = {};
  for (let v of map) h[v[0]] = v[1];
  return h;
}

module.exports = Syncer;