Add idle listeners
parent
b662c71319
commit
60974be418
1
db.sql
1
db.sql
|
@ -20,6 +20,7 @@ create table folders (
|
||||||
name varchar(255) not null,
|
name varchar(255) not null,
|
||||||
unread_count int not null,
|
unread_count int not null,
|
||||||
highestmodseq int not null default 0,
|
highestmodseq int not null default 0,
|
||||||
|
kind varchar(255) not null,
|
||||||
foreign key (account_id) references accounts (id) on delete cascade on update cascade
|
foreign key (account_id) references accounts (id) on delete cascade on update cascade
|
||||||
);
|
);
|
||||||
create unique index folders_name on folders (account_id, name);
|
create unique index folders_name on folders (account_id, name);
|
||||||
|
|
210
operetta.js
210
operetta.js
|
@ -1,6 +1,6 @@
|
||||||
|
// TODO: Упростить лапшу, в частности syncAccount вообще некорректен при первом запуске
|
||||||
// TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты
|
// TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты
|
||||||
// TODO: Группировка писем
|
// TODO: Группировка писем
|
||||||
// TODO: Висеть в виде демона и сразу получать новые письма (IDLE)
|
|
||||||
// TODO: Сделать подписки на новые сообщения по вебсокетам
|
// TODO: Сделать подписки на новые сообщения по вебсокетам
|
||||||
// TODO: Чего я ещё хотел - интеграцию с maillog'ом и серверным спамфильтром
|
// TODO: Чего я ещё хотел - интеграцию с maillog'ом и серверным спамфильтром
|
||||||
|
|
||||||
|
@ -208,7 +208,9 @@ Syncer.getConnection = function*(accountId, boxName, connKey)
|
||||||
{
|
{
|
||||||
var m;
|
var m;
|
||||||
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
|
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
|
||||||
self.vanished = self.vanished.concat(self.vanished, m[2].split(/,/).map(s => s.split(':')));
|
{
|
||||||
|
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
|
||||||
|
}
|
||||||
oldUT.apply(this);
|
oldUT.apply(this);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -248,7 +250,7 @@ Syncer.releaseConnection = function(accountId, connKey, allowClose)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer.sync = function*(account)
|
Syncer.syncAccount = function*(account)
|
||||||
{
|
{
|
||||||
var self = this;
|
var self = this;
|
||||||
var accountId;
|
var accountId;
|
||||||
|
@ -266,69 +268,107 @@ Syncer.sync = function*(account)
|
||||||
}).returning('id').row(gen.ef());
|
}).returning('id').row(gen.ef());
|
||||||
accountId = row.id;
|
accountId = row.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
var srv = yield* self.getConnection(accountId, null, 'S');
|
var srv = yield* self.getConnection(accountId, null, 'S');
|
||||||
var [ boxes ] = yield srv.getBoxes(gen.ef());
|
var [ boxes ] = yield srv.getBoxes(gen.ef());
|
||||||
for (var k in boxes)
|
for (var k in boxes)
|
||||||
{
|
{
|
||||||
var [ box ] = yield srv.openBox(k, true, gen.ef());
|
var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
|
||||||
var boxId, changedSince = 0;
|
yield* self.syncBox(srv, accountId, k, boxKind, true);
|
||||||
// IMAP sync: http://tools.ietf.org/html/rfc4549
|
|
||||||
var [ row ] = yield pg.select('*').from('folders')
|
|
||||||
.where({ account_id: accountId, name: box.name }).rows(gen.ef());
|
|
||||||
if (row.length)
|
|
||||||
{
|
|
||||||
row = row[0];
|
|
||||||
changedSince = row.highestmodseq;
|
|
||||||
boxId = row.id;
|
|
||||||
if (row.uidvalidity != box.uidvalidity)
|
|
||||||
{
|
|
||||||
yield pg.delete('messages').where({ folder_id: row.id })
|
|
||||||
.where(pg.sql('uid is not null')).run(gen.ef());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
[ row ] = yield pg.insert('folders', {
|
|
||||||
name: box.name,
|
|
||||||
uidvalidity: box.uidvalidity,
|
|
||||||
account_id: accountId,
|
|
||||||
highestmodseq: 0,
|
|
||||||
//unread_count: box.messages.new,
|
|
||||||
//total_count: box.messages.total,
|
|
||||||
}).returning('id').row(gen.ef());
|
|
||||||
boxId = row.id;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.missing = [];
|
|
||||||
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages').where({ folder_id: boxId }).val(gen.ef());
|
|
||||||
if (changedSince)
|
|
||||||
{
|
|
||||||
yield* self.quickResync(srv, account, box, boxId, maxUid, changedSince);
|
|
||||||
}
|
|
||||||
else if (maxUid)
|
|
||||||
{
|
|
||||||
// list messages, update flags and version tag
|
|
||||||
yield* self.fullResync(srv, account, box, boxId, maxUid);
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch new messages
|
|
||||||
self.missing.push((maxUid ? maxUid+1 : 1)+':*');
|
|
||||||
yield* self.runFetch(srv, self.missing, {
|
|
||||||
size: true,
|
|
||||||
bodies: 'HEADER'
|
|
||||||
}, boxId, 'saveMessages');
|
|
||||||
|
|
||||||
yield pg.update('folders', {
|
|
||||||
uidvalidity: box.uidvalidity,
|
|
||||||
//unread_count: box.messages.new,
|
|
||||||
highestmodseq: box.highestmodseq||0
|
|
||||||
}).where({ id: row.id }).run(gen.ef());
|
|
||||||
}
|
}
|
||||||
|
yield* self.runIdle(accountId, srv);
|
||||||
self.releaseConnection(accountId, 'S');
|
self.releaseConnection(accountId, 'S');
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer.fullResync = function*(srv, account, box, boxId, maxUid)
|
Syncer.syncBox = function*(srv, accountId, boxName, boxKind, doFull)
|
||||||
|
{
|
||||||
|
var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef());
|
||||||
|
|
||||||
|
// IMAP sync: http://tools.ietf.org/html/rfc4549
|
||||||
|
var [ boxRow ] = yield pg.select('*').from('folders')
|
||||||
|
.where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef());
|
||||||
|
if (boxRow.length)
|
||||||
|
{
|
||||||
|
boxRow = boxRow[0];
|
||||||
|
if (boxRow.uidvalidity != boxStatus.uidvalidity)
|
||||||
|
{
|
||||||
|
yield* this.deleteMessages(pg.sql.and({ folder_id: boxRow.id }, pg.sql('uid is not null')));
|
||||||
|
boxRow.uidvalidity = boxStatus.uidvalidity;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
[ boxRow ] = yield pg.insert('folders', {
|
||||||
|
name: boxStatus.name,
|
||||||
|
uidvalidity: boxStatus.uidvalidity,
|
||||||
|
account_id: accountId,
|
||||||
|
highestmodseq: 0,
|
||||||
|
kind: boxKind||''
|
||||||
|
//unread_count: boxStatus.messages.new,
|
||||||
|
//total_count: boxStatus.messages.total,
|
||||||
|
}).returning('id').row(gen.ef());
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch new messages
|
||||||
|
var missing = [];
|
||||||
|
var [ maxUid ] = yield pg.select('MAX(uid)').from('messages')
|
||||||
|
.where({ folder_id: boxRow.id }).val(gen.ef());
|
||||||
|
if (boxStatus.highestmodseq)
|
||||||
|
{
|
||||||
|
yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing);
|
||||||
|
boxRow.highestmodseq = boxStatus.highestmodseq;
|
||||||
|
}
|
||||||
|
else if (doFull && maxUid)
|
||||||
|
{
|
||||||
|
// list messages, update flags and version tag
|
||||||
|
yield* this.fullResync(srv, this.accounts[accountId], boxStatus, boxRow.id, maxUid, missing);
|
||||||
|
}
|
||||||
|
|
||||||
|
missing.push((maxUid ? maxUid+1 : 1)+':*');
|
||||||
|
yield* this.runFetch(srv, missing, {
|
||||||
|
size: true,
|
||||||
|
bodies: 'HEADER'
|
||||||
|
}, boxRow.id, 'saveMessages');
|
||||||
|
|
||||||
|
yield pg.update('folders', {
|
||||||
|
uidvalidity: boxRow.uidvalidity,
|
||||||
|
highestmodseq: boxRow.highestmodseq||0
|
||||||
|
}).where({ id: boxRow.id }).run(gen.ef());
|
||||||
|
}
|
||||||
|
|
||||||
|
Syncer.runIdle = function*(accountId, srv)
|
||||||
|
{
|
||||||
|
var self = this;
|
||||||
|
yield srv.openBox('INBOX', true, gen.ef());
|
||||||
|
srv.on('uidvalidity', function(uidvalidity)
|
||||||
|
{
|
||||||
|
// uidvalidity changes (FUUUU) remove everything
|
||||||
|
|
||||||
|
});
|
||||||
|
srv.on('mail', function(count)
|
||||||
|
{
|
||||||
|
// <count> new messages arrived while idling, fetch them
|
||||||
|
gen.run(function*()
|
||||||
|
{
|
||||||
|
var srv = yield* self.getConnection(accountId, null, 'S');
|
||||||
|
yield* self.syncBox(srv, accountId, 'INBOX');
|
||||||
|
self.releaseConnection(accountId, 'S');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
srv.on('vanish', function(uids)
|
||||||
|
{
|
||||||
|
// messages expunged by uids
|
||||||
|
console.log([ 'VANISH', uids ]);
|
||||||
|
|
||||||
|
});
|
||||||
|
srv.on('expunge', function(seqno)
|
||||||
|
{
|
||||||
|
// message expunged by (FUUUU) sequence number
|
||||||
|
console.log(arguments);
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Syncer.fullResync = function*(srv, account, box, boxId, maxUid, missing)
|
||||||
{
|
{
|
||||||
var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef());
|
var [ flags ] = yield pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef());
|
||||||
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
|
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
|
||||||
|
@ -337,10 +377,11 @@ Syncer.fullResync = function*(srv, account, box, boxId, maxUid)
|
||||||
|
|
||||||
process.stderr.write(account.email+'/'+box.name+': full resync\n');
|
process.stderr.write(account.email+'/'+box.name+': full resync\n');
|
||||||
process.stderr.write('\rsynchronizing 0');
|
process.stderr.write('\rsynchronizing 0');
|
||||||
yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags', { flags: flags, updateFlags: updateFlags });
|
yield* this.runFetch(srv, '1:'+maxUid, {}, boxId, 'queueFlags',
|
||||||
|
{ flags: flags, updateFlags: updateFlags, missing: missing||[] });
|
||||||
process.stderr.write('\n');
|
process.stderr.write('\n');
|
||||||
|
|
||||||
this.updateFlags(boxId, updateFlags);
|
yield* this.updateFlags(boxId, updateFlags);
|
||||||
|
|
||||||
// delete messages removed from IMAP server
|
// delete messages removed from IMAP server
|
||||||
flags = Object.keys(flags);
|
flags = Object.keys(flags);
|
||||||
|
@ -354,7 +395,7 @@ Syncer.queueFlags = function*(messages, boxId, fetchState)
|
||||||
{
|
{
|
||||||
var m = messages[i][0];
|
var m = messages[i][0];
|
||||||
if (!fetchState.flags[m.uid])
|
if (!fetchState.flags[m.uid])
|
||||||
this.missing.push(m.uid);
|
fetchState.missing.push(m.uid);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
|
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
|
||||||
|
@ -366,43 +407,62 @@ Syncer.queueFlags = function*(messages, boxId, fetchState)
|
||||||
process.stderr.write('\rsynchronizing '+fetchState.synced);
|
process.stderr.write('\rsynchronizing '+fetchState.synced);
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer.updateFlags = function*(boxId, updateFlags)
|
Syncer.updateFlags = function*(boxId, updateFlags, checkMissing)
|
||||||
{
|
{
|
||||||
if (updateFlags.length)
|
if (updateFlags.length)
|
||||||
{
|
{
|
||||||
yield pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') })
|
var sql = pg.update('messages m', { flags: pg.sql('t.flags::varchar(255)[]') })
|
||||||
.from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)')
|
.from('('+pg.sql.values(updateFlags)+') AS t (uid, flags)')
|
||||||
.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid')).run(gen.ef());
|
.where({ 'm.folder_id': boxId }).where(pg.sql('m.uid=t.uid'));
|
||||||
|
if (checkMissing)
|
||||||
|
{
|
||||||
|
var [ updated ] = yield sql.returning('m.uid').rows(gen.ef());
|
||||||
|
var missing = {};
|
||||||
|
for (var i = 0; i < updateFlags.length; i++)
|
||||||
|
missing[updateFlags[i].uid] = true;
|
||||||
|
for (var i = 0; i < updated.length; i++)
|
||||||
|
delete missing[updated[i].uid];
|
||||||
|
return Object.keys(missing);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
yield sql.run(gen.ef());
|
||||||
}
|
}
|
||||||
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer.quickResync = function*(srv, account, box, boxId, maxUid, changedSince)
|
Syncer.quickResync = function*(srv, boxId, maxUid, changedSince, missing)
|
||||||
{
|
{
|
||||||
var updateFlags = [];
|
var updateFlags = [];
|
||||||
this.vanished = [];
|
var vanished = [];
|
||||||
|
var onVanish = function(dias)
|
||||||
|
{
|
||||||
|
vanished = vanished.concat(vanished, dias);
|
||||||
|
};
|
||||||
|
|
||||||
process.stderr.write(account.email+'/'+box.name+': quick resync\n');
|
srv.on('vanish', onVanish);
|
||||||
yield* this.runFetch(
|
yield* this.runFetch(
|
||||||
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
|
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
|
||||||
boxId, 'queueQuickFlags', { updateFlags: updateFlags }
|
boxId, 'queueQuickFlags', { updateFlags: updateFlags }
|
||||||
);
|
);
|
||||||
this.updateFlags(boxId, updateFlags);
|
srv.removeListener('vanish', onVanish);
|
||||||
|
var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true);
|
||||||
|
if (missing)
|
||||||
|
missing.push.apply(missing, checkedMissing);
|
||||||
|
|
||||||
if (this.vanished.length)
|
if (vanished.length)
|
||||||
{
|
{
|
||||||
let lst = [], dia = [];
|
let lst = [], dia = [];
|
||||||
for (let i = 0; i < this.vanished.length; i++)
|
for (let i = 0; i < vanished.length; i++)
|
||||||
{
|
{
|
||||||
if (this.vanished[i][1])
|
if (vanished[i][1])
|
||||||
dia.push('uid >= '+this.vanished[i][0]+' AND uid <= '+this.vanished[i][1]);
|
dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]);
|
||||||
else
|
else
|
||||||
lst.push(this.vanished[i][0]);
|
lst.push(vanished[i][0]);
|
||||||
}
|
}
|
||||||
if (lst.length)
|
if (lst.length)
|
||||||
dia.push('uid IN ('+lst.join(',')+')');
|
dia.push('uid IN ('+lst.join(',')+')');
|
||||||
yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')')));
|
yield* this.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')')));
|
||||||
}
|
}
|
||||||
delete this.vanished;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer.queueQuickFlags = function*(messages, boxId, fetchState)
|
Syncer.queueQuickFlags = function*(messages, boxId, fetchState)
|
||||||
|
@ -681,7 +741,7 @@ Syncer.syncAll = function()
|
||||||
{
|
{
|
||||||
Syncer.syncInProgress = true;
|
Syncer.syncInProgress = true;
|
||||||
for (var i = 0; i < cfg.accounts.length; i++)
|
for (var i = 0; i < cfg.accounts.length; i++)
|
||||||
yield* Syncer.sync(cfg.accounts[i]);
|
yield* Syncer.syncAccount(cfg.accounts[i]);
|
||||||
Syncer.syncInProgress = false;
|
Syncer.syncInProgress = false;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue