Two more fixes for promise-based code

master
Vitaliy Filippov 2019-05-13 16:23:53 +03:00
parent fcbf95ae6a
commit 6c8edc1667
2 changed files with 14 additions and 19 deletions

View File

@ -132,18 +132,13 @@ class ImapManager
await new Promise((resolve, reject) => await new Promise((resolve, reject) =>
{ {
let error;
let checkFinish = () => let checkFinish = () =>
{ {
if (fetchState.parsed <= 0 && wait) if (fetchState.parsed <= 0 && wait)
{ {
// Если сообщение окончания придёт до окончания обработки // Если сообщение окончания придёт до окончания обработки
// последней порции, тогда ждём окончания обработки // последней порции, тогда не резолвим, а ждём окончания обработки
if (error) resolve();
reject(error);
else
resolve();
} }
}; };
@ -162,7 +157,7 @@ class ImapManager
{ {
this.onMessage(fetchState, msg, seqnum, processor) this.onMessage(fetchState, msg, seqnum, processor)
.then(checkFinish) .then(checkFinish)
.catch(e => { error = e; checkFinish(); }); .catch(e => reject(e));
}); });
f.once('end', () => f.once('end', () =>
@ -176,7 +171,7 @@ class ImapManager
{ {
processor(fetchState.pending, fetchState) processor(fetchState.pending, fetchState)
.then(saveLast) .then(saveLast)
.catch(e => { error = e; saveLast(); }); .catch(e => reject(e));
} }
}); });
}); });
@ -207,7 +202,6 @@ class ImapManager
{ {
let m = fetchState.pending; let m = fetchState.pending;
fetchState.pending = []; fetchState.pending = [];
let err;
let result = await processor(m, fetchState); let result = await processor(m, fetchState);
if (result) if (result)
{ {
@ -220,10 +214,6 @@ class ImapManager
fetchState.srv._parser._ignoreReadable = false; fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable); process.nextTick(fetchState.srv._parser._cbReadable);
} }
if (err)
{
throw err;
}
} }
} }

View File

@ -163,7 +163,7 @@ class Syncer
accountId = row.id; accountId = row.id;
} }
let srv = await this.getSyncConnection(accountId); let srv = await this.getSyncConnection(accountId);
let boxes = await new Promise((r, e) => srv.getBoxes(r)); let boxes = await new Promise((res, err) => srv.getBoxes((e, r) => e ? err(e) : res(r)));
for (let k in boxes) for (let k in boxes)
{ {
let boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase(); let boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
@ -230,7 +230,7 @@ class Syncer
async fullResync(srv, boxId, maxUid, missing, total) async fullResync(srv, boxId, maxUid, missing, total)
{ {
let flags = await SQL.select('messages', 'uid, flags', { folder_id: boxId }); let flags = await SQL.select(this.pg, 'messages', 'uid, flags', { folder_id: boxId });
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
let updateFlags = []; let updateFlags = [];
@ -312,7 +312,8 @@ class Syncer
srv.on('vanish', onVanish); srv.on('vanish', onVanish);
await this.imap.runFetch( await this.imap.runFetch(
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } }, srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
async (messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags } async (messages, state) => this.queueQuickFlags(messages, boxId, state),
{ updateFlags: updateFlags }
); );
srv.removeListener('vanish', onVanish); srv.removeListener('vanish', onVanish);
let checkedMissing = await this.updateFlags(boxId, updateFlags, missing && true); let checkedMissing = await this.updateFlags(boxId, updateFlags, missing && true);
@ -397,8 +398,9 @@ class Syncer
let parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); let parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' });
return await new Promise((r, j) => return await new Promise((r, j) =>
{ {
parse.once('end', r); parser.once('end', r);
parser.write(msg); parser.write(msg);
parser.end();
}); });
} }
@ -408,7 +410,9 @@ class Syncer
for (let i = 0; i < struct.length; i++) for (let i = 0; i < struct.length; i++)
{ {
if (struct[i] instanceof Array) if (struct[i] instanceof Array)
{
this.extractAttachments(struct[i], attachments); this.extractAttachments(struct[i], attachments);
}
else if (struct[i].disposition && struct[i].disposition.type == 'attachment') else if (struct[i].disposition && struct[i].disposition.type == 'attachment')
{ {
attachments.push([ attachments.push([
@ -432,6 +436,7 @@ class Syncer
catch (e) catch (e)
{ {
await this.pg.query('ROLLBACK'); await this.pg.query('ROLLBACK');
throw e;
} }
} }
@ -492,7 +497,7 @@ class Syncer
{ {
threadId = await SQL.select( threadId = await SQL.select(
this.pg, 'messages', 'MAX(thread_id)', this.pg, 'messages', 'MAX(thread_id)',
{ 'refs @> array[?]': msgrow.messageid }, null, SQL.MS_VALUE { 'refs @> array[?]::varchar(1000)[]': msgrow.messageid }, null, SQL.MS_VALUE
); );
if (threadId) if (threadId)
{ {