diff --git a/db.sql b/db.sql index 769790f..b8a1da5 100644 --- a/db.sql +++ b/db.sql @@ -20,6 +20,7 @@ create table folders ( name varchar(255) not null, unread_count int not null, highestmodseq int not null default 0, + kind varchar(255) not null, foreign key (account_id) references accounts (id) on delete cascade on update cascade ); create unique index folders_name on folders (account_id, name); diff --git a/operetta.js b/operetta.js index 9c193b6..afb29f7 100644 --- a/operetta.js +++ b/operetta.js @@ -1,6 +1,6 @@ +// TODO: Упростить лапшу, в частности syncAccount вообще некорректен при первом запуске // TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты // TODO: Группировка писем -// TODO: Висеть в виде демона и сразу получать новые письма (IDLE) // TODO: Сделать подписки на новые сообщения по вебсокетам // TODO: Чего я ещё хотел - интеграцию с maillog'ом и серверным спамфильтром @@ -208,7 +208,9 @@ Syncer.getConnection = function*(accountId, boxName, connKey) { var m; if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) - self.vanished = self.vanished.concat(self.vanished, m[2].split(/,/).map(s => s.split(':'))); + { + srv.emit('vanish', m[2].split(/,/).map(s => s.split(':'))); + } oldUT.apply(this); }; @@ -248,7 +250,7 @@ Syncer.releaseConnection = function(accountId, connKey, allowClose) } } -Syncer.sync = function*(account) +Syncer.syncAccount = function*(account) { var self = this; var accountId; @@ -266,69 +268,107 @@ Syncer.sync = function*(account) }).returning('id').row(gen.ef()); accountId = row.id; } - var srv = yield* self.getConnection(accountId, null, 'S'); var [ boxes ] = yield srv.getBoxes(gen.ef()); for (var k in boxes) { - var [ box ] = yield srv.openBox(k, true, gen.ef()); - var boxId, changedSince = 0; - // IMAP sync: http://tools.ietf.org/html/rfc4549 - var [ row ] = yield pg.select('*').from('folders') - .where({ account_id: accountId, name: box.name }).rows(gen.ef()); - if (row.length) - { - row = row[0]; - changedSince = row.highestmodseq; - boxId = row.id; - if (row.uidvalidity != box.uidvalidity) - { - yield pg.delete('messages').where({ folder_id: row.id }) - .where(pg.sql('uid is not null')).run(gen.ef()); - } - } - else - { - [ row ] = yield pg.insert('folders', { - name: box.name, - uidvalidity: box.uidvalidity, - account_id: accountId, - highestmodseq: 0, - //unread_count: box.messages.new, - //total_count: box.messages.total, - }).returning('id').row(gen.ef()); - boxId = row.id; - } - - self.missing = []; - var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef()); - if (changedSince) - { - yield* self.quickResync(srv, account, box, boxId, maxUid, changedSince); - } - else if (maxUid) - { - // list messages, update flags and version tag - yield* self.fullResync(srv, account, box, boxId, maxUid); - } - - // fetch new messages - self.missing.push((maxUid ? maxUid+1 : 1)+':*'); - yield* self.runFetch(srv, self.missing, { - size: true, - bodies: 'HEADER' - }, boxId, 'saveMessages'); - - yield pg.update('folders', { - uidvalidity: box.uidvalidity, - //unread_count: box.messages.new, - highestmodseq: box.highestmodseq||0 - }).where({ id: row.id }).run(gen.ef()); + var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); + yield* self.syncBox(srv, accountId, k, boxKind, true); } + yield* self.runIdle(accountId, srv); self.releaseConnection(accountId, 'S'); } -Syncer.fullResync = function*(srv, account, box, boxId, maxUid) +Syncer.syncBox = function*(srv, accountId, boxName, boxKind, doFull) +{ + var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef()); + + // IMAP sync: http://tools.ietf.org/html/rfc4549 + var [ boxRow ] = yield pg.select('*').from('folders') + .where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef()); + if (boxRow.length) + { + boxRow = boxRow[0]; + if (boxRow.uidvalidity != boxStatus.uidvalidity) + { + yield* this.deleteMessages(pg.sql.and({ folder_id: boxRow.id }, pg.sql('uid is not null'))); + boxRow.uidvalidity = boxStatus.uidvalidity; + } + } + else + { + [ boxRow ] = yield pg.insert('folders', { + name: boxStatus.name, + uidvalidity: boxStatus.uidvalidity, + account_id: accountId, + highestmodseq: 0, + kind: boxKind||'' + //unread_count: boxStatus.messages.new, + //total_count: boxStatus.messages.total, + }).returning('id').row(gen.ef()); + } + + // fetch new messages + var missing = []; + var [ maxUid ] = yield pg.select('MAX(uid)').from('messages') + .where({ folder_id: boxRow.id }).val(gen.ef()); + if (boxStatus.highestmodseq) + { + yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); + boxRow.highestmodseq = boxStatus.highestmodseq; + } + else if (doFull && maxUid) + { + // list messages, update flags and version tag + yield* this.fullResync(srv, this.accounts[accountId], boxStatus, boxRow.id, maxUid, missing); + } + + missing.push((maxUid ? maxUid+1 : 1)+':*'); + yield* this.runFetch(srv, missing, { + size: true, + bodies: 'HEADER' + }, boxRow.id, 'saveMessages'); + + yield pg.update('folders', { + uidvalidity: boxRow.uidvalidity, + highestmodseq: boxRow.highestmodseq||0 + }).where({ id: boxRow.id }).run(gen.ef()); +} + +Syncer.runIdle = function*(accountId, srv) +{ + var self = this; + yield srv.openBox('INBOX', true, gen.ef()); + srv.on('uidvalidity', function(uidvalidity) + { + // uidvalidity changes (FUUUU) remove everything + + }); + srv.on('mail', function(count) + { + // new messages arrived while idling, fetch them + gen.run(function*() + { + var srv = yield* self.getConnection(accountId, null, 'S'); + yield* self.syncBox(srv, accountId, 'INBOX'); + self.releaseConnection(accountId, 'S'); + }); + }); + srv.on('vanish', function(uids) + { + // messages expunged by uids + console.log([ 'VANISH', uids ]); + + }); + srv.on('expunge', function(seqno) + { + // message expunged by (FUUUU) sequence number + console.log(arguments); + + }); +} + +Syncer.fullResync = function*(srv, account, box, boxId, maxUid, missing) { var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); @@ -337,10 +377,11 @@ Syncer.fullResync = function*(srv, account, box, boxId, maxUid) process.stderr.write(account.email+'/'+box.name+': full resync\n'); process.stderr.write('\rsynchronizing 0'); - yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', { flags: flags, updateFlags: updateFlags }); + yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', + { flags: flags, updateFlags: updateFlags, missing: missing||[] }); process.stderr.write('\n'); - this.updateFlags(boxId, updateFlags); + yield* this.updateFlags(boxId, updateFlags); // delete messages removed from IMAP server flags = Object.keys(flags); @@ -354,7 +395,7 @@ Syncer.queueFlags = function*(messages, boxId, fetchState) { var m = messages[i][0]; if (!fetchState.flags[m.uid]) - this.missing.push(m.uid); + fetchState.missing.push(m.uid); else { if (fetchState.flags[m.uid].join(',') != m.flags.join(',')) @@ -366,43 +407,62 @@ Syncer.queueFlags = function*(messages, boxId, fetchState) process.stderr.write('\rsynchronizing '+fetchState.synced); } -Syncer.updateFlags = function*(boxId, updateFlags) +Syncer.updateFlags = function*(boxId, updateFlags, checkMissing) { if (updateFlags.length) { - yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) + var sql = pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') }) .from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)') - .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef()); + .where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')); + if (checkMissing) + { + var [ updated ] = yield sql.returning('m.uid').rows(gen.ef()); + var missing = {}; + for (var i = 0; i < updateFlags.length; i++) + missing[updateFlags[i].uid] = true; + for (var i = 0; i < updated.length; i++) + delete missing[updated[i].uid]; + return Object.keys(missing); + } + else + yield sql.run(gen.ef()); } + return []; } -Syncer.quickResync = function*(srv, account, box, boxId, maxUid, changedSince) +Syncer.quickResync = function*(srv, boxId, maxUid, changedSince, missing) { var updateFlags = []; - this.vanished = []; + var vanished = []; + var onVanish = function(dias) + { + vanished = vanished.concat(vanished, dias); + }; - process.stderr.write(account.email+'/'+box.name+': quick resync\n'); + srv.on('vanish', onVanish); yield* this.runFetch( srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, boxId, 'queueQuickFlags', { updateFlags: updateFlags } ); - this.updateFlags(boxId, updateFlags); + srv.removeListener('vanish', onVanish); + var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true); + if (missing) + missing.push.apply(missing, checkedMissing); - if (this.vanished.length) + if (vanished.length) { let lst = [], dia = []; - for (let i = 0; i < this.vanished.length; i++) + for (let i = 0; i < vanished.length; i++) { - if (this.vanished[i][1]) - dia.push('uid >= '+this.vanished[i][0]+' AND uid <= '+this.vanished[i][1]); + if (vanished[i][1]) + dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]); else - lst.push(this.vanished[i][0]); + lst.push(vanished[i][0]); } if (lst.length) dia.push('uid IN ('+lst.join(',')+')'); yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')'))); } - delete this.vanished; } Syncer.queueQuickFlags = function*(messages, boxId, fetchState) @@ -681,7 +741,7 @@ Syncer.syncAll = function() { Syncer.syncInProgress = true; for (var i = 0; i < cfg.accounts.length; i++) - yield* Syncer.sync(cfg.accounts[i]); + yield* Syncer.syncAccount(cfg.accounts[i]); Syncer.syncInProgress = false; }); }