/*
 * One-shot IMAP -> PostgreSQL synchronizer.
 *
 * Connects to the first account listed in ./cfg.json, walks its mailboxes,
 * fetches the headers of every message and stores them (grouped into threads)
 * in PostgreSQL through pg-bricks. Control flow is written with generators
 * driven by gen-thread.
 */

// SECURITY NOTE(review): this turns off TLS certificate verification for the
// whole process, so the IMAP connection can be intercepted. Left in place
// because it looks deliberate; prefer trusting the server's CA explicitly.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";

var cfg = require('./cfg.json');
require('heapdump');
var gen = require('gen-thread');
var Imap = require('imap');
var inspect = require('util').inspect;

//var pg;
//try { require('pg-native'); pg = require('pg').native; }
//catch(e) { pg = require('pg'); }
//var pg_pool = new pg.Pool(cfg.pg);
var bricks = require('pg-bricks');
var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database);

// Bit values used for the `flags` column, keyed by lower-cased IMAP flag name.
var flagNum = {
    '\\recent': 1,
    '\\flagged': 2,
    '\\answered': 4,
    '\\seen': 8,
};

/**
 * Splits an address-list header value into its entries.
 *
 * Each entry is either `Name <email>` (the name may be quoted) or a bare
 * address, optionally wrapped in angle brackets.
 *
 * @param {string} s - raw header value, e.g. `"John" <j@x.org>, a@b.org`
 * @returns {Array<{name: string, email: string}>} parsed entries, in order
 */
function splitEmails(s) {
    // Groups: 1 = quoted name, 2 = unquoted name, 3 = address in <...>,
    // 4 = bare address. The bare-address branch was missing from the pattern
    // (which made it a syntax error) and is restored here to match the m[4]
    // use below.
    var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|<?([^<>]+)>?)/;
    var m, r = [];
    while ((m = re.exec(s))) {
        s = s.substr(m[0].length);
        r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() });
    }
    return r;
}

/**
 * Serializes a JS array as a PostgreSQL array literal by swapping the JSON
 * brackets for braces: ["a","b"] -> {"a","b"}.
 *
 * @param {Array} a - values to serialize
 * @returns {string} array literal text
 */
function toPgArray(a) {
    a = JSON.stringify(a);
    return '{'+a.substring(1, a.length-1)+'}';
}

/**
 * Synchronizes every mailbox returned by getBoxes() for one account.
 *
 * @param {{name: string, email: string, imap: Object}} account - account
 *        entry from cfg.json; `imap` is passed to the node-imap constructor
 */
function* main(account) {
    var accountId;
    var [ accountRows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
    if (accountRows[0] && accountRows[0].id)
        accountId = accountRows[0].id;
    else {
        // Fix: this insert was not yielded, so `id` was never read from the
        // query result.
        var [ accountRow ] = yield pg.insert('accounts', {
            name: account.name,
            email: account.email,
            settings: { imap: account.imap }
        }).returning('id').row(gen.ef());
        accountId = accountRow.id;
    }

    var srv = new Imap(account.imap);
    srv.once('ready', gen.cb());
    yield srv.connect();

    var [ boxes ] = yield srv.getBoxes(gen.ef());
    for (var k in boxes) {
        var [ box ] = yield srv.openBox(k, true, gen.ef());

        // Find the folder row (refreshing its counters) or create it.
        let boxId;
        var [ folderRows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
            .where({ account_id: accountId, name: box.name }).returning('id').rows(gen.ef());
        if (folderRows[0] && folderRows[0].id) {
            // IMAP sync: http://tools.ietf.org/html/rfc4549
            // TODO: check old uidvalidity
            boxId = folderRows[0].id;
        } else {
            var [ folderRow ] = yield pg.insert('folders', {
                name: box.name,
                uidvalidity: box.uidvalidity,
                account_id: accountId,
                unread_count: box.messages.new,
                // total_count: box.messages.count
            }).returning('id').row(gen.ef());
            boxId = folderRow.id;
        }

        var f = srv.fetch('1:*', { size: true, bodies: 'HEADER' });

        // Per-mailbox state shared by the nested generators below:
        //   parsed   - messages parsed but not yet handled by saveMessages
        //   paused   - whether the IMAP parser is currently held back
        //   synced   - count of already-stored messages seen so far
        //   messages - current batch of [msgrow, attrs] pairs awaiting a save
        let parsed = 0, paused = false, synced = 0;
        let messages = [];

        /**
         * Collects the header block and attributes of one fetched message.
         *
         * @returns {Array} `[msgrow, attrs]`: the partially filled DB row and
         *          the attributes object emitted by node-imap
         */
        function* parseMessage(msg, seqnum, boxId) {
            var msgrow = {};
            var attrs;
            msg.on('body', function(stream, info) {
                // Decode once at the end so that a multi-byte UTF-8 sequence
                // split across two chunks is not corrupted.
                var chunks = [];
                stream.on('data', function(chunk) {
                    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk), 'utf8'));
                });
                stream.once('end', function() {
                    msgrow.body = '';
                    msgrow.headers = Buffer.concat(chunks).toString('utf8');
                });
            });
            msg.once('attributes', function(a) {
                attrs = a;
            });
            yield msg.once('end', gen.cb());

            msgrow.uid = attrs.uid;
            msgrow.folder_id = boxId;
            msgrow.flags = 0;
            // Fix: combine flag bits with bitwise OR. The logical `||` used
            // before kept only the first recognised flag.
            for (var i = 0; i < attrs.flags.length; i++)
                msgrow.flags |= flagNum[attrs.flags[i].toLowerCase()] || 0;
            msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag
            return [ msgrow, attrs ];
        }

        /**
         * Stores one batch: updates flags of messages already in the DB and
         * inserts the rest in parallel. Always releases the batch from the
         * `parsed` counter and resumes the IMAP parser when enough has drained.
         *
         * @param {Array} messages - batch of `[msgrow, attrs]` pairs
         * @param {*} boxId - id of the folder row the batch belongs to
         */
        function* saveMessages(messages, boxId) {
            yield gen.throttle(3);
            try {
                var uids = [];
                var byUid = {};
                for (var i = 0; i < messages.length; i++) {
                    uids.push(messages[i][1].uid);
                    byUid[messages[i][1].uid] = messages[i];
                }

                var [ exist ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).where(pg.sql.in('uid', uids)).rows(gen.ef());
                for (var j = 0; j < exist.length; j++) {
                    var fl = byUid[exist[j].uid][0].flags;
                    // Loose comparison kept on purpose: the column's JS type
                    // (number vs string) depends on the driver.
                    if (fl != exist[j].flags)
                        yield pg.update('messages', { flags: fl }).where({ folder_id: boxId, uid: exist[j].uid }).run(gen.ef());
                    delete byUid[exist[j].uid];
                }
                synced += exist.length;
                process.stderr.write('\rsynchronizing '+synced);

                // Whatever is left in byUid is not stored yet.
                var run = Object.keys(byUid).map(function(uid) {
                    return addMessage(byUid[uid][0], byUid[uid][1]);
                });
                if (run.length) {
                    process.stderr.write('\n');
                    yield gen.runParallel(run, gen.cb());
                }
            } finally {
                // Runs on success and on error, so a failed batch cannot leave
                // the parser paused forever.
                parsed -= messages.length;
                if (paused && parsed < 20) {
                    paused = false;
                    srv._parser._ignoreReadable = false;
                    process.nextTick(srv._parser._cbReadable);
                }
            }
        }

        /**
         * Inserts one new message inside a transaction, attaching it to an
         * existing thread (matched through References / In-Reply-To) or
         * creating a new thread for it.
         *
         * @param {Object} msgrow - row produced by parseMessage (mutated)
         * @param {Object} attrs - node-imap attributes of the message
         */
        function* addMessage(msgrow, attrs) {
            var pgtx, end_transaction;
            try {
                [ pgtx, end_transaction ] = yield pg.transaction(gen.cb(), function(e) { if (e) throw e; });

                var header = Imap.parseHeader(msgrow.headers);
                // Strip NUL characters from every header value.
                Object.keys(header).forEach(function(name) {
                    for (var n = 0; n < header[name].length; n++)
                        header[name][n] = header[name][n].replace(/\x00/g, '');
                });

                header.from = header.from && splitEmails(header.from[0])[0];
                header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];

                var re = /(<[^>]*>)/;
                header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
                if (header.references.length) {
                    // Keep at most 10 references: the first one plus the last nine.
                    if (header.references.length > 10)
                        header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
                    if (!header['in-reply-to'] || !header['in-reply-to'][0])
                        header['in-reply-to'] = [ header.references[header.references.length-1] ];
                    else if (header.references[header.references.length-1] != header['in-reply-to'][0])
                        header.references.push(header['in-reply-to'][0]);
                }

                // Fix: an unparsable Date header used to stay in header.date as
                // the raw array; now it falls back to attrs.date like a
                // missing header does.
                var sentAt = header.date ? Date.parse(header.date[0]) : NaN;
                header.date = isNaN(sentAt) ? new Date(attrs.date) : new Date(sentAt);

                msgrow.from_email = header.from && header.from.email || '';
                msgrow.from_name = header.from && header.from.name || '';
                msgrow.replyto_email = header.replyto && header.replyto.email || '';
                msgrow.replyto_name = header.replyto && header.replyto.name || '';
                msgrow.to_list = header.to && header.to[0] || '';
                msgrow.cc_list = header.cc && header.cc[0] || '';
                msgrow.bcc_list = header.bcc && header.bcc[0] || '';
                msgrow.subject = header.subject && header.subject[0] || '';
                msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
                msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
                msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
                msgrow.time = header.date;
                msgrow.refs = toPgArray(header.references);

                if (header.references.length) {
                    // First look for a stored message this one refers to...
                    var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                        .where(pg.sql.in('messageid', header.references)).val(gen.ef());
                    if (!threadId) {
                        // ...then for a stored message that refers to this one.
                        [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
                            .where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
                    }
                    if (threadId) {
                        try {
                            yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') })
                                .where({ id: threadId }).run(gen.ef());
                        } catch (e) {
                            throw new Error(''+e);
                        }
                    }
                    msgrow.thread_id = threadId;
                }

                console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
                [ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
                if (!msgrow.thread_id) {
                    // No thread found: start a new one rooted at this message.
                    [ msgrow.thread_id ] = yield pgtx.insert('threads', {
                        first_msg: msgrow.id,
                        msg_count: 1
                    }).returning('id').val(gen.ef());
                    yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
                }
                end_transaction();
            } catch (e0) {
                // NOTE(review): the transaction is ended WITHOUT the error
                // here, which presumably commits the partial work instead of
                // rolling it back - confirm against pg-bricks' transaction()
                // semantics before changing (passing e0 would also reach the
                // throwing completion callback above).
                if (end_transaction)
                    end_transaction();
                throw e0;
            }
        }

        f.on('message', function(msg, seqnum) {
            gen.run(function*() {
                var [ msgrow, attrs ] = yield* parseMessage(msg, seqnum, boxId);
                // Workaround memory leak in node-imap
                // TODO: send pull request
                if (srv._curReq && srv._curReq.fetchCache)
                    delete srv._curReq.fetchCache[seqnum];
                messages.push([ msgrow, attrs ]);
                parsed++;
                if (!paused && parsed >= 20) {
                    // Back-pressure: plenty of messages have already been read
                    // from the socket, so stop the parser until
                    // saveMessages() catches up.
                    srv._parser._ignoreReadable = true;
                    paused = true;
                }
                if (messages.length >= 20) {
                    var batch = messages;
                    messages = [];
                    yield gen.run(saveMessages(batch, boxId), gen.cb());
                }
            });
        });

        yield f.once('end', gen.cb());
        process.stderr.write('\n');
        // Fix: wait for the last (partial) batch. It used to be started
        // fire-and-forget, so the box could be closed - and the process could
        // exit - while the save was still in flight.
        if (messages.length > 0) {
            var rest = messages;
            messages = [];
            yield gen.run(saveMessages(rest, boxId), gen.cb());
        }
        // NOTE(review): batches started from the 'message' handler are still
        // not awaited here - confirm they cannot be outstanding at this point.

        yield srv.closeBox(gen.cb());
    }

    srv.end();
}

gen.run(main(cfg.accounts[0]), function() { process.exit() });