Use error-first callbacks (NEXT.ef()) instead of manual error checks; do not re-insert existing messages

master
Vitaliy Filippov 2016-07-22 14:46:44 +03:00
parent a66867072d
commit 9bbdc56d1b
1 changed files with 100 additions and 99 deletions

View File

@ -41,34 +41,30 @@ function toPgArray(a)
function* main(NEXT, account)
{
var accountId;
var [ err, rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.cb());
if (err) throw new Error(''+err);
var [ rows ] = yield pg.select('id').from('accounts').where({ email: account.email }).rows(NEXT.ef());
if (rows[0] && rows[0].id)
accountId = rows[0].id;
else
{
var [ err, row ] = yield pg.insert('accounts', {
var [ row ] = pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('id').row(NEXT.cb());
if (err) throw new Error(''+err);
}).returning('id').row(NEXT.ef());
accountId = row.id;
}
var srv = new Imap(account.imap);
srv.once('ready', NEXT.cb());
yield srv.connect();
var [ err, boxes ] = yield srv.getBoxes(NEXT.cb());
var [ boxes ] = yield srv.getBoxes(NEXT.ef());
for (var k in boxes)
{
var [ err, box ] = yield srv.openBox(k, true, NEXT.cb());
if (err) throw new Error(''+err);
var [ box ] = yield srv.openBox(k, true, NEXT.ef());
var boxId;
var [ err, rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
.where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.cb());
if (err) throw new Error(''+err);
var [ rows ] = yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new })
.where({ account_id: accountId, name: box.name }).returning('id').rows(NEXT.ef());
if (rows[0] && rows[0].id)
{
// IMAP sync: http://tools.ietf.org/html/rfc4549
@ -77,14 +73,13 @@ function* main(NEXT, account)
}
else
{
var [ err, row ] = yield pg.insert('folders', {
var [ row ] = yield pg.insert('folders', {
name: box.name,
uidvalidity: box.uidvalidity,
account_id: accountId,
unread_count: box.messages.new,
// total_count: box.messages.count
}).returning('id').row(NEXT.cb());
if (err) throw new Error(''+err);
}).returning('id').row(NEXT.ef());
boxId = row.id;
}
var f = srv.fetch('1:*', {
@ -120,10 +115,6 @@ function* main(NEXT, account)
// TODO: send pull request
if (srv._curReq.fetchCache)
delete srv._curReq.fetchCache[seqnum];
/*var cached = 0;
for (var i in srv._curReq.fetchCache)
cached++;
console.log('msg '+seqnum+': '+cached+' in cache');*/
parsed++;
if (!paused && parsed > 20)
@ -132,100 +123,110 @@ function* main(NEXT, account)
srv._parser._ignoreReadable = true;
paused = true;
}
try {
var [ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; });
//yield NEXT.throttle(20);
var header = Imap.parseHeader(msgrow.headers);
for (var i in header)
for (var k = 0; k < header[i].length; k++)
header[i][k] = header[i][k].replace(/\x00/g, '');
header.from = header.from && splitEmails(header.from[0])[0];
msgrow.from_email = header.from && header.from.email || '';
msgrow.from_name = header.from && header.from.name || '';
header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
msgrow.replyto_email = header.replyto && header.replyto.email || '';
msgrow.replyto_name = header.replyto && header.replyto.name || '';
msgrow.to_list = header.to && header.to[0] || '';
msgrow.cc_list = header.cc && header.cc[0] || '';
msgrow.bcc_list = header.bcc && header.bcc[0] || '';
msgrow.subject = header.subject && header.subject[0] || '';
msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
var re = /(<[^>]*>)/;
var references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
msgrow.refs = toPgArray(references);
if (references.length)
var pgtx, end_transaction;
try
{
if (references.length > 10)
references = [ references[0] ].concat(references.slice(references.count-9));
if (!msgrow.inreplyto)
msgrow.inreplyto = references[references.length-1];
else if (references[references.length-1] != msgrow.inreplyto)
references.push(msgrow.inreplyto);
var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(pg.sql.in('messageid', references)).val(NEXT.cb());
if (err) throw new Error(''+err);
if (!threadId)
[ pgtx, end_transaction ] = yield pg.transaction(NEXT.cb(), function(e) { if (e) throw e; });
msgrow.uid = attrs.uid;
msgrow.folder_id = boxId;
msgrow.flags = 0;
for (var i = 0; i < attrs.flags.length; i++)
msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()];
msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag
var [ exists ] = yield pgtx.select('id').from('messages').where({ folder_id: msgrow.folder_id, uid: msgrow.uid }).rows(NEXT.ef());
if (exists.length)
{
var [ err, threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.cb());
if (err) throw new Error(''+err);
process.stderr.write('\rsynchronizing '+msgrow.uid+'...');
yield pgtx.update('messages', { flags: msgrow.flags }).where({ folder_id: msgrow.folder_id, uid: msgrow.uid }).run(NEXT.ef());
}
if (threadId)
else
{
try
var header = Imap.parseHeader(msgrow.headers);
for (var i in header)
for (var k = 0; k < header[i].length; k++)
header[i][k] = header[i][k].replace(/\x00/g, '');
header.from = header.from && splitEmails(header.from[0])[0];
header.replyto = header['reply-to'] && splitEmails(header['reply-to'][0])[0];
var re = /(<[^>]*>)/;
header.references = (header.references && header.references[0] || '').split(re).filter(a => a.match(re));
if (header.references.length)
{
var [ err ] = yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') })
.where({ id: threadId }).run(NEXT.cb());
if (err) throw new Error(''+err);
if (header.references.length > 10)
header.references = [ header.references[0] ].concat(header.references.slice(header.references.length-9));
if (!header['in-reply-to'] || !header['in-reply-to'][0])
header['in-reply-to'] = [ header.references[header.references.length-1] ];
else if (header.references[header.references.length-1] != header['in-reply-to'][0])
header.references.push(header['in-reply-to'][0]);
}
catch (e)
if (header.date)
{
throw new Error(''+e);
var t = Date.parse(header.date[0]);
if (!isNaN(t))
header.date = new Date(t);
}
if (!header.date)
header.date = new Date(attrs.date);
msgrow.from_email = header.from && header.from.email || '';
msgrow.from_name = header.from && header.from.name || '';
msgrow.replyto_email = header.replyto && header.replyto.email || '';
msgrow.replyto_name = header.replyto && header.replyto.name || '';
msgrow.to_list = header.to && header.to[0] || '';
msgrow.cc_list = header.cc && header.cc[0] || '';
msgrow.bcc_list = header.bcc && header.bcc[0] || '';
msgrow.subject = header.subject && header.subject[0] || '';
msgrow.messageid = header['message-id'] && header['message-id'][0] || '';
msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0] || '';
msgrow.inreplyto = msgrow.inreplyto.replace(/^[\s\S]*(<[^>]*>)[\s\S]*$/, '$1');
msgrow.time = header.date;
msgrow.refs = toPgArray(header.references);
if (header.references.length)
{
var [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(pg.sql.in('messageid', header.references)).val(NEXT.ef());
if (!threadId)
{
[ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(new pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(NEXT.ef());
}
if (threadId)
{
try
{
yield pgtx.update('threads', { msg_count: pg.sql('msg_count+1') })
.where({ id: threadId }).run(NEXT.ef());
}
catch (e)
{
throw new Error(''+e);
}
}
msgrow.thread_id = threadId;
}
console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
[ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(NEXT.ef());
if (!msgrow.thread_id)
{
[ msgrow.thread_id ] = yield pgtx.insert('threads', {
first_msg: msgrow.id,
msg_count: 1
}).returning('id').val(NEXT.ef());
yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.ef());
}
}
msgrow.thread_id = threadId;
}
if (header.date)
{
var t = Date.parse(header.date[0]);
if (!isNaN(t))
msgrow.time = new Date(t);
}
if (!msgrow.time)
msgrow.time = new Date(attrs.date);
msgrow.uid = attrs.uid;
msgrow.folder_id = boxId;
msgrow.flags = 0;
for (var i = 0; i < attrs.flags.length; i++)
msgrow.flags = msgrow.flags || flagNum[attrs.flags[i].toLowerCase()];
msgrow.flags = (msgrow.flags & ~8) | (msgrow.flags & 8 ? 0 : 8); // invert "\seen" (unread) flag
console.log(msgrow.time+' '+msgrow.from_email+' '+msgrow.subject);
var [ err, id ] = yield pgtx.raw(
pgtx.insert('messages', msgrow)+' ON CONFLICT (folder_id, uid) DO UPDATE SET flags=excluded.flags RETURNING id'
).val(NEXT.cb());
if (err) throw new Error(''+err);
msgrow.id = id;
if (!msgrow.thread_id)
{
var [ err, thread_id ] = yield pgtx.insert('threads', {
first_msg: msgrow.id,
msg_count: 1
}).returning('id').val(NEXT.cb());
if (err) throw new Error(''+err);
msgrow.thread_id = thread_id;
var [ err, row ] = yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(NEXT.cb());
if (err) throw new Error(''+err);
}
end_transaction();
} catch (e0) {
end_transaction();
}
catch (e0)
{
if (end_transaction)
end_transaction();
throw e0;
}
parsed--;
if (paused && parsed <= 10)
{