WIP: ES6 + async/await rework, pg-bricks replaced with an own select-builder

master
Vitaliy Filippov 2019-05-08 16:39:14 +03:00
parent 5456d34e23
commit de1429e73a
7 changed files with 1229 additions and 873 deletions

View File

@ -1,240 +1,260 @@
const gen = require('gen-thread');
const Imap = require('imap');
module.exports = ImapManager;
function ImapManager()
class ImapManager
{
this.accounts = {};
this.connections = {};
this.busy = {};
this.selected = {};
this.queue = {};
this.onIdle = {};
this.onStopIdle = {};
}
ImapManager.prototype.setServer = function(accountId, settings)
{
this.accounts[accountId] = settings;
}
ImapManager.prototype.getConnection = function*(accountId, boxName, connKey, onIdle, onStopIdle)
{
var self = this;
connKey = accountId+(connKey||'');
if (self.connections[connKey])
constructor()
{
let stoppingIdle = self.queue[connKey].length == 0;
if (self.busy[connKey])
this.accounts = {};
this.connections = {};
this.busy = {};
this.selected = {};
this.queue = {};
this.onIdle = {};
this.onStopIdle = {};
}
setServer(accountId, settings)
{
this.accounts[accountId] = settings;
}
async getConnection(accountId, boxName, connKey, onIdle, onStopIdle)
{
connKey = accountId+(connKey||'');
if (this.connections[connKey])
{
// wait for the queue to finish
yield self.queue[connKey].push(gen.cb());
let stoppingIdle = this.queue[connKey].length == 0;
if (this.busy[connKey])
{
// wait for the queue to finish
await this.queue[connKey]();
}
if (stoppingIdle && this.onStopIdle[connKey])
{
// run "stop idle" callback
this.onStopIdle[connKey](accountId, this.connections[connKey]);
}
if (boxName && this.selected[connKey] != boxName)
{
// select different box
await new Promise((r, e) => this.connections[connKey].openBox(boxName, true, r));
this.selected[connKey] = boxName;
}
this.busy[connKey] = true;
return this.connections[connKey];
}
if (stoppingIdle && self.onStopIdle[connKey])
let srv = new Imap(self.accounts[accountId]);
// FIXME handle connection errors
await new Promise((r, e) =>
{
// run "stop idle" callback
self.onStopIdle[connKey](accountId, self.connections[connKey]);
}
if (boxName && self.selected[connKey] != boxName)
srv.once('ready', r);
srv.connect();
});
await new Promise((r, e) => srv._enqueue('ENABLE QRESYNC', r));
// Monkey-patch node-imap to support VANISHED responses
var oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = () =>
{
// select different box
yield self.connections[connKey].openBox(boxName, true, gen.ef());
self.selected[connKey] = boxName;
}
self.busy[connKey] = true;
return self.connections[connKey];
}
var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
{
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
}
oldUT.apply(this);
};
var srv = new Imap(self.accounts[accountId]);
srv.once('ready', gen.cb());
// FIXME handle connection errors
yield srv.connect();
yield srv._enqueue('ENABLE QRESYNC', gen.cb());
// Monkey-patch node-imap to support VANISHED responses
var oldUT = srv._parser._resUntagged;
srv._parser._resUntagged = function()
{
var m;
if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer))
srv.on('close', () =>
{
srv.emit('vanish', m[2].split(/,/).map(s => s.split(':')));
delete this.connections[connKey];
if (this.srv == srv)
{
this.srv = null;
}
});
if (boxName)
{
await new Promise((r, e) => srv.openBox(boxName, true, r));
this.selected[connKey] = boxName;
}
oldUT.apply(this);
};
srv.on('close', function()
{
delete self.connections[connKey];
if (self.srv == srv)
delete self.srv;
});
if (boxName)
{
yield srv.openBox(boxName, true, gen.ef());
self.selected[connKey] = boxName;
this.connections[connKey] = srv;
this.busy[connKey] = true;
this.queue[connKey] = [];
this.onIdle[connKey] = onIdle;
this.onStopIdle[connKey] = onStopIdle;
return srv;
}
self.connections[connKey] = srv;
self.busy[connKey] = true;
self.queue[connKey] = [];
self.onIdle[connKey] = onIdle;
self.onStopIdle[connKey] = onStopIdle;
return srv;
}
ImapManager.prototype.releaseConnection = function(accountId, connKey, allowClose)
{
var self = this;
connKey = accountId+(connKey||'');
self.busy[connKey] = false;
if (self.queue[connKey].length)
releaseConnection(accountId, connKey, allowClose)
{
(self.queue[connKey].shift())();
connKey = accountId + (connKey||'');
this.busy[connKey] = false;
if (this.queue[connKey].length)
{
(this.queue[connKey].shift())();
}
else if (allowClose)
{
this.connections[connKey].end();
delete this.connections[connKey];
delete this.busy[connKey];
delete this.queue[connKey];
delete this.selected[connKey];
}
else
{
if (this.onIdle[connKey])
this.onIdle[connKey](accountId, this.connections[connKey]);
}
}
else if (allowClose)
{
self.connections[connKey].end();
delete self.connections[connKey];
delete self.busy[connKey];
delete self.queue[connKey];
delete self.selected[connKey];
}
else
{
if (self.onIdle[connKey])
self.onIdle[connKey](accountId, self.connections[connKey]);
}
}
ImapManager.prototype.runFetch = function*(srv, what, params, processor, args)
{
var self = this;
var f = srv.fetch(what, params);
var fetchState = {
...(args||{}),
parsed: 0,
paused: false,
synced: 0,
pending: [],
results: [],
srv: srv
};
var cb, wait;
f.on('message', function(msg, seqnum)
async runFetch(srv, what, params, processor, args)
{
gen.run(self.onMessage(fetchState, msg, seqnum, processor), checkFinish, function(e) { checkFinish(); throw e; });
});
let f = srv.fetch(what, params);
cb = gen.cb();
yield f.once('end', function()
{
wait = true;
if (fetchState.parsed <= 0)
cb();
else if (fetchState.pending.length > 0)
gen.run(processor(fetchState.pending, fetchState), saveLast, function(e) { saveLast(); throw e; });
});
let fetchState = {
...(args||{}),
parsed: 0,
paused: false,
synced: 0,
pending: [],
results: [],
srv: srv,
};
let wait;
await new Promise((resolve, reject) =>
{
let error;
let checkFinish = () =>
{
if (fetchState.parsed <= 0 && wait)
{
// Если сообщение окончания придёт до окончания обработки
// последней порции, тогда ждём окончания обработки
if (error)
reject(error);
else
resolve();
}
};
let saveLast = (results) =>
{
if (results)
{
fetchState.results = fetchState.results.concat(results);
}
fetchState.parsed -= fetchState.pending.length;
fetchState.pending = [];
checkFinish();
};
f.on('message', (msg, seqnum) =>
{
this.onMessage(fetchState, msg, seqnum, processor)
.then(checkFinish)
.catch(e => { error = e; checkFinish(); });
});
f.once('end', () =>
{
wait = true;
if (fetchState.parsed <= 0)
{
resolve();
}
else if (fetchState.pending.length > 0)
{
processor(fetchState.pending, fetchState)
.then(saveLast)
.catch(e => { error = e; saveLast(); });
}
});
});
if (fetchState.results.length > 0)
{
return fetchState.results;
}
function saveLast(r)
async onMessage(fetchState, msg, seqnum, processor)
{
if (r)
fetchState.results = fetchState.results.concat(r);
fetchState.parsed -= fetchState.pending.length;
fetchState.pending = [];
checkFinish();
}
function checkFinish()
{
if (fetchState.parsed <= 0 && wait)
cb();
}
};
ImapManager.prototype.onMessage = function*(fetchState, msg, seqnum, processor)
{
var self = this;
var [ msgrow, attrs ] = yield* self.parseMessage(msg, seqnum);
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
delete fetchState.srv._curReq.fetchCache[seqnum];
fetchState.pending.push([ msgrow, attrs ]);
fetchState.parsed++;
if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true;
fetchState.paused = true;
}
if (fetchState.pending.length >= 100)
{
var m = fetchState.pending;
fetchState.pending = [];
var err;
var result;
try
let [ msgrow, attrs ] = await this.parseMessage(msg, seqnum);
// Workaround memory leak in node-imap
// TODO: send pull request
if (fetchState.srv._curReq && fetchState.srv._curReq.fetchCache)
{
result = yield gen.run(processor(m, fetchState), gen.cb());
delete fetchState.srv._curReq.fetchCache[seqnum];
}
fetchState.pending.push([ msgrow, attrs ]);
fetchState.parsed++;
if (!fetchState.paused && fetchState.parsed >= 100 && !fetchState.nopause)
{
// ГОРШОЧЕК, НЕ ВАРИ!!! И так уже кучу сообщений прочитал из сокета, хорош!
fetchState.srv._parser._ignoreReadable = true;
fetchState.paused = true;
}
if (fetchState.pending.length >= 100)
{
let m = fetchState.pending;
fetchState.pending = [];
let err;
let result = await processor(m, fetchState);
if (result)
{
fetchState.results = fetchState.results.concat(result);
}
fetchState.parsed -= m.length;
if (fetchState.paused && fetchState.parsed < 100)
{
fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable);
}
if (err)
{
throw err;
}
}
catch (e)
}
async parseMessage(msg, seqnum)
{
let msgrow = {};
let attrs;
msg.on('body', function(stream, info)
{
err = e;
}
fetchState.parsed -= m.length;
if (fetchState.paused && fetchState.parsed < 100)
let buffer;
stream.on('data', function(chunk)
{
if (!buffer)
buffer = chunk;
else
buffer = Buffer.concat([ buffer, chunk ]);
});
stream.once('end', function()
{
msgrow.headers = buffer;
});
});
msg.once('attributes', function(a)
{
fetchState.paused = false;
fetchState.srv._parser._ignoreReadable = false;
process.nextTick(fetchState.srv._parser._cbReadable);
}
if (err)
throw err;
attrs = a;
});
await new Promise((r, e) => msg.once('end', r));
msgrow.uid = attrs.uid;
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
let nf = msgrow.flags.filter(f => f != 'seen');
nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
msgrow.flags = nf.sort();
return [ msgrow, attrs ];
}
}
ImapManager.prototype.parseMessage = function*(msg, seqnum)
{
var msgrow = {};
var attrs;
msg.on('body', function(stream, info)
{
var buffer;
stream.on('data', function(chunk)
{
if (!buffer)
buffer = chunk;
else
buffer = Buffer.concat([ buffer, chunk ]);
});
stream.once('end', function()
{
msgrow.headers = buffer;
});
});
msg.once('attributes', function(a) {
attrs = a;
});
yield msg.once('end', gen.cb());
msgrow.uid = attrs.uid;
msgrow.flags = attrs.flags.map(f => f[0] == '\\' ? f.toLowerCase().replace(/^\\/, '') : f.replace(/^\$*/, '$'));
var nf = msgrow.flags.filter(f => f != 'seen');
nf = nf.length == msgrow.flags.length ? nf.concat(['unread']) : nf;
msgrow.flags = nf.sort();
return [ msgrow, attrs ];
}

808
Syncer.js
View File

@ -1,4 +1,3 @@
const gen = require('gen-thread');
const Imap = require('imap');
const ImapManager = require('./ImapManager.js');
const EventEmitter = require('events').EventEmitter;
@ -8,413 +7,432 @@ const mimelib = require('mimelib');
module.exports = Syncer;
function Syncer(pg)
class Syncer
{
this.syncInProgress = false;
this.pg = pg;
this.imap = new ImapManager();
this.runIdle = this.runIdle.bind(this);
this.stopIdle = this.stopIdle.bind(this);
this.events = new EventEmitter();
}
Syncer.prototype.init = function*(cfg)
{
for (var i = 0; i < cfg.accounts.length; i++)
yield* this.addAccount(cfg.accounts[i]);
yield* this.loadAccounts();
}
Syncer.prototype.syncAll = function*()
{
this.syncInProgress = true;
for (var id in this.accounts)
yield* this.syncAccount(this.accounts[id]);
this.syncInProgress = false;
this.events.emit('sync', { state: 'complete' });
}
Syncer.prototype.addAccount = function*(account)
{
var self = this;
var [ row ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
if (row.length)
constructor(pg)
{
row = row[0];
yield this.pg.update('accounts', { settings: { imap: account.imap, folders: account.folders } }).where({ id: row.id }).run(gen.ef());
this.syncInProgress = false;
this.pg = pg;
this.imap = new ImapManager();
this.runIdle = this.runIdle.bind(this);
this.stopIdle = this.stopIdle.bind(this);
this.events = new EventEmitter();
}
else
async init(cfg)
{
[ row ] = yield this.pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap,
folders: account.folders
}
}).returning('*').row(gen.ef());
}
return row.id;
}
Syncer.prototype.loadAccounts = function*()
{
let [ rows ] = yield this.pg.select('*').from('accounts').rows(gen.ef());
this.accounts = {};
for (var i = 0; i < rows.length; i++)
{
this.accounts[rows[i].id] = rows[i];
this.imap.setServer(rows[i].id, rows[i].settings.imap);
}
}
Syncer.prototype.getSyncConnection = function*(accountId, boxName)
{
var srv = yield* this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle);
return srv;
}
Syncer.prototype.idleUidvalidity = function(accountId, uidvalidity)
{
// uidvalidity changes (FUUUU) remove everything and resync
}
Syncer.prototype.idleMail = function(accountId, count)
{
// <count> new messages arrived while idling, fetch them
var self = this;
gen.run(function*()
{
var srv = yield* self.getSyncConnection(accountId);
yield* self.syncBox(srv, accountId, 'INBOX');
self.releaseSyncConnection(accountId);
});
}
Syncer.prototype.idleVanish = function(accountId, uids)
{
// messages expunged by uids
var self = this;
gen.run(function*()
{
let [ boxId ] = yield self.pg.select('id').from('folders')
.where({ name: 'INBOX', account_id: accountId }).val(gen.ef());
yield* self.deleteVanished(boxId, uids);
});
}
Syncer.prototype.idleExpunge = function(accountId, seqno)
{
// message expunged by (FUUUU) sequence number(s?)
var self = this;
gen.run(function*()
{
var srv = yield* self.getSyncConnection(accountId);
yield* self.syncBox(srv, accountId, 'INBOX');
self.releaseSyncConnection(accountId);
});
}
Syncer.prototype.runIdle = function(accountId, srv)
{
var self = this;
if (!srv._idleCallbacks)
{
srv._idleCallbacks = {
uidvalidity: this.idleUidvalidity.bind(this, accountId),
mail: this.idleMail.bind(this, accountId),
vanish: this.idleVanish.bind(this, accountId),
expunge: this.idleExpunge.bind(this, accountId)
}
}
for (var i in srv._idleCallbacks)
{
srv.on(i, srv._idleCallbacks[i]);
}
srv.openBox('INBOX', true, function() {});
}
Syncer.prototype.stopIdle = function(accountId, srv)
{
for (var i in srv._idleCallbacks)
{
srv.removeListener(i, srv._idleCallbacks[i]);
}
}
Syncer.prototype.releaseSyncConnection = function(accountId, boxName)
{
this.imap.releaseConnection(accountId, 'S');
}
Syncer.prototype.syncAccount = function*(account)
{
var self = this;
var accountId;
var [ rows ] = yield this.pg.select('id').from('accounts').where({ email: account.email }).rows(gen.ef());
if (rows[0] && rows[0].id)
accountId = rows[0].id;
else
{
var [ row ] = yield this.pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('id').row(gen.ef());
accountId = row.id;
}
var srv = yield* self.getSyncConnection(accountId);
var [ boxes ] = yield srv.getBoxes(gen.ef());
for (var k in boxes)
{
var boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
yield* self.syncBox(srv, accountId, k, boxKind, true);
}
self.releaseSyncConnection(accountId);
}
Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull)
{
var [ boxStatus ] = yield srv.openBox(boxName, true, gen.ef());
// IMAP sync: http://tools.ietf.org/html/rfc4549
var [ boxRow ] = yield this.pg.select('*').from('folders')
.where({ account_id: accountId, name: boxStatus.name }).rows(gen.ef());
if (boxRow.length)
{
boxRow = boxRow[0];
if (boxRow.uidvalidity != boxStatus.uidvalidity)
for (var i = 0; i < cfg.accounts.length; i++)
{
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxRow.id }, this.pg.sql('uid is not null')));
boxRow.uidvalidity = boxStatus.uidvalidity;
await this.addAccount(cfg.accounts[i]);
}
await this.loadAccounts();
}
async syncAll()
{
this.syncInProgress = true;
for (let id in this.accounts)
{
await this.syncAccount(this.accounts[id]);
}
this.syncInProgress = false;
this.events.emit('sync', { state: 'complete' });
}
async addAccount(account)
{
let row = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_ROW);
if (row)
{
await SQL.update(this.pg, 'accounts', {
settings: { imap: account.imap, folders: account.folders }
}, { id: row.id });
}
else
{
row = (await SQL.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap,
folders: account.folders
}
}, '*'))[0];
}
return row.id;
}
async loadAccounts()
{
let rows = await SQL.select(this.pg, 'accounts', '*', []);
this.accounts = {};
for (let i = 0; i < rows.length; i++)
{
this.accounts[rows[i].id] = rows[i];
this.imap.setServer(rows[i].id, rows[i].settings.imap);
}
}
else
async getSyncConnection(accountId, boxName)
{
[ boxRow ] = yield this.pg.insert('folders', {
name: boxStatus.name,
return await this.imap.getConnection(accountId, null, 'S', this.runIdle, this.stopIdle);
}
idleUidvalidity(accountId, uidvalidity)
{
// FIXME uidvalidity changes (FUUUU) remove everything and resync
}
idleMail(accountId, count)
{
// <count> new messages arrived while idling, fetch them
(async () =>
{
let srv = await this.getSyncConnection(accountId);
await this.syncBox(srv, accountId, 'INBOX');
this.releaseSyncConnection(accountId);
})().catch(console.error);
}
idleVanish(accountId, uids)
{
// messages expunged by uids
(async () =>
{
let boxId = await SQL.select(
this.pg, 'folders', 'id', { name: 'INBOX', account_id: accountId }, null, SQL.MS_VALUE
);
await this.deleteVanished(boxId, uids);
})().catch(console.error);
}
idleExpunge(accountId, seqno)
{
// message expunged by (FUUUU) sequence number(s?)
(async () =>
{
let srv = await this.getSyncConnection(accountId);
await this.syncBox(srv, accountId, 'INBOX');
this.releaseSyncConnection(accountId);
})().catch(console.error);
}
runIdle(accountId, srv)
{
if (!srv._idleCallbacks)
{
srv._idleCallbacks = {
uidvalidity: this.idleUidvalidity.bind(this, accountId),
mail: this.idleMail.bind(this, accountId),
vanish: this.idleVanish.bind(this, accountId),
expunge: this.idleExpunge.bind(this, accountId)
};
}
for (let i in srv._idleCallbacks)
{
srv.on(i, srv._idleCallbacks[i]);
}
srv.openBox('INBOX', true, () => {});
}
stopIdle(accountId, srv)
{
for (let i in srv._idleCallbacks)
{
srv.removeListener(i, srv._idleCallbacks[i]);
}
}
releaseSyncConnection(accountId, boxName)
{
this.imap.releaseConnection(accountId, 'S');
}
async syncAccount(account)
{
let accountId = await SQL.select(this.pg, 'accounts', 'id', { email: account.email }, null, SQL.MS_VALUE);
if (accountId)
{
let row = (await SQL.insert(this.pg, 'accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}, 'id'))[0];
accountId = row.id;
}
let srv = await this.getSyncConnection(accountId);
let boxes = await new Promise((r, e) => srv.getBoxes(r));
for (let k in boxes)
{
let boxKind = (boxes[k].special_use_attrib || '').replace('\\', '').toLowerCase();
await this.syncBox(srv, accountId, k, boxKind, true);
}
this.releaseSyncConnection(accountId);
}
async syncBox(srv, accountId, boxName, boxKind, doFull)
{
let boxStatus = await new Promise((r, e) => srv.openBox(boxName, true, r));
// IMAP sync: http://tools.ietf.org/html/rfc4549
let boxRow = await SQL.select(this.pg, 'folders', '*', { account_id: accountId, name: boxStatus.name }, null, SQL.MS_ROW);
if (boxRow)
{
if (boxRow.uidvalidity != boxStatus.uidvalidity)
{
await this.deleteMessages({ folder_id: boxRow.id, 'uid is not null': [] });
boxRow.uidvalidity = boxStatus.uidvalidity;
}
}
else
{
boxRow = (await SQL.insert(this.pg, 'folders', {
name: boxStatus.name,
uidvalidity: boxStatus.uidvalidity,
account_id: accountId,
highestmodseq: 0,
kind: boxKind||''
}, '*'))[0];
}
// fetch new messages
let missing = [];
let maxUid = await SQL.select(this.pg, 'messages', 'MAX(uid)', { folder_id: boxRow.id }, null, SQL.MS_VALUE);
if (boxRow.highestmodseq)
{
this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name });
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n');
await this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing);
boxRow.highestmodseq = boxStatus.highestmodseq;
}
else if (doFull && maxUid)
{
// list messages, update flags and version tag
this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name });
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n');
await this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total);
}
missing.push((maxUid ? maxUid+1 : 1)+':*');
await this.imap.runFetch(srv, missing, {
size: true,
bodies: 'HEADER',
struct: true,
}, (messages, state) => this.saveMessages(messages, boxRow.id, state));
await SQL.update(this.pg, 'folders', {
uidvalidity: boxStatus.uidvalidity,
account_id: accountId,
highestmodseq: 0,
kind: boxKind||''
}).returning('id').row(gen.ef());
highestmodseq: boxStatus.highestmodseq||0
}, { id: boxRow.id });
}
// fetch new messages
var missing = [];
var [ maxUid ] = yield this.pg.select('MAX(uid)').from('messages')
.where({ folder_id: boxRow.id }).val(gen.ef());
if (boxRow.highestmodseq)
async fullResync(srv, boxId, maxUid, missing, total)
{
this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name });
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n');
yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing);
boxRow.highestmodseq = boxStatus.highestmodseq;
}
else if (doFull && maxUid)
{
// list messages, update flags and version tag
this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name });
process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n');
yield* this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total);
}
let flags = await SQL.select('messages', 'uid, flags', { folder_id: boxId });
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
missing.push((maxUid ? maxUid+1 : 1)+':*');
yield* this.imap.runFetch(srv, missing, {
size: true,
bodies: 'HEADER',
struct: true,
}, (messages, state) => this.saveMessages(messages, boxRow.id, state));
let updateFlags = [];
yield this.pg.update('folders', {
uidvalidity: boxStatus.uidvalidity,
highestmodseq: boxStatus.highestmodseq||0
}).where({ id: boxRow.id }).run(gen.ef());
}
process.stderr.write('\rsynchronizing 0');
await this.imap.runFetch(
srv, '1:'+maxUid, {},
(messages, state) => this.queueFlags(messages, boxId, state),
{ flags: flags, updateFlags: updateFlags, missing: missing||[], total: total, nopause: true }
);
process.stderr.write('\n');
this.events.emit('sync', { state: 'finish-box' });
Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing, total)
{
var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef());
flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {});
await this.updateFlags(boxId, updateFlags);
var updateFlags = [];
process.stderr.write('\rsynchronizing 0');
yield* this.imap.runFetch(
srv, '1:'+maxUid, {},
(messages, state) => this.queueFlags(messages, boxId, state),
{ flags: flags, updateFlags: updateFlags, missing: missing||[], total: total, nopause: true }
);
process.stderr.write('\n');
this.events.emit('sync', { state: 'finish-box' });
yield* this.updateFlags(boxId, updateFlags);
// delete messages removed from IMAP server
flags = Object.keys(flags);
if (flags.length)
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql.in('uid', flags)));
}
Syncer.prototype.queueFlags = function*(messages, boxId, fetchState)
{
for (var i = 0; i < messages.length; i++)
{
var m = messages[i][0];
if (!fetchState.flags[m.uid])
fetchState.missing.push(m.uid);
else
// delete messages removed from IMAP server
flags = Object.keys(flags);
if (flags.length)
{
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
delete fetchState.flags[m.uid];
await this.deleteMessages({ folder_id: boxId, uid: flags });
}
}
fetchState.synced += messages.length;
this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total });
process.stderr.write('\rsynchronizing '+fetchState.synced);
}
Syncer.prototype.updateFlags = function*(boxId, updateFlags, checkMissing)
{
if (updateFlags.length)
queueFlags(messages, boxId, fetchState)
{
var sql = this.pg.update('messages m', { flags: this.pg.sql('t.flags::varchar(255)[]') })
.from('('+this.pg.sql.values(updateFlags)+') AS t (uid, flags)')
.where({ 'm.folder_id': boxId }).where(this.pg.sql('m.uid=t.uid'));
if (checkMissing)
for (let i = 0; i < messages.length; i++)
{
var [ updated ] = yield sql.returning('m.uid').rows(gen.ef());
var missing = {};
for (var i = 0; i < updateFlags.length; i++)
missing[updateFlags[i].uid] = true;
for (var i = 0; i < updated.length; i++)
delete missing[updated[i].uid];
return Object.keys(missing);
let m = messages[i][0];
if (!fetchState.flags[m.uid])
{
fetchState.missing.push(m.uid);
}
else
{
if (fetchState.flags[m.uid].join(',') != m.flags.join(','))
{
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
}
delete fetchState.flags[m.uid];
}
}
else
yield sql.run(gen.ef());
fetchState.synced += messages.length;
//this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total });
process.stderr.write('\rsynchronizing '+fetchState.synced);
}
return [];
}
Syncer.prototype.quickResync = function*(srv, boxId, maxUid, changedSince, missing)
{
var updateFlags = [];
var vanished = [];
var onVanish = function(dias)
async updateFlags(boxId, updateFlags, checkMissing)
{
vanished = vanished.concat(vanished, dias);
};
srv.on('vanish', onVanish);
yield* this.imap.runFetch(
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
(messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags }
);
srv.removeListener('vanish', onVanish);
var checkedMissing = yield* this.updateFlags(boxId, updateFlags, missing && true);
if (missing)
missing.push.apply(missing, checkedMissing);
if (vanished.length)
{
yield* this.deleteVanished(boxId, vanished);
}
}
Syncer.prototype.deleteVanished = function*(boxId, vanished)
{
let lst = [], dia = [];
for (let i = 0; i < vanished.length; i++)
{
if (vanished[i][1])
dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]);
else
lst.push(vanished[i][0]);
}
if (lst.length)
dia.push('uid IN ('+lst.join(',')+')');
yield* this.deleteMessages(this.pg.sql.and({ folder_id: boxId }, this.pg.sql('('+dia.join(' OR ')+')')));
}
Syncer.prototype.queueQuickFlags = function*(messages, boxId, fetchState)
{
for (var i = 0; i < messages.length; i++)
{
var m = messages[i][0];
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
}
}
Syncer.prototype.deleteMessages = function*(where)
{
yield this.pg.update('threads', { first_msg: null })
.where(this.pg.sql('first_msg IN ('+this.pg.select('id').from('messages').where(where)+')'))
.run(gen.ef());
yield this.pg.delete('messages').where(where).run(gen.ef());
yield this.pg.update('threads',
{ first_msg: this.pg.sql('('+
this.pg.select('id').from('messages').where({ thread_id: this.pg.sql('threads.id') }).orderBy('time').limit(1)
+')') }).where(this.pg.sql('first_msg IS NULL')).run(gen.ef());
yield this.pg.delete('threads').where(this.pg.sql('first_msg IS NULL')).run(gen.ef());
}
Syncer.prototype.saveMessages = function*(messages, boxId)
{
var self = this;
yield gen.throttle(2);
var uids = messages.map(m => m[1].uid);
var [ exist ] = yield this.pg.select('uid, flags').from('messages')
.where({ folder_id: boxId }).where(this.pg.sql.in('uid', uids)).rows(gen.ef());
uids = {};
for (var i = 0; i < exist.length; i++)
uids[exist[i].uid] = true;
for (var i = 0; i < messages.length; i++)
if (!uids[messages[i][1].uid])
yield* this.addMessage(boxId, messages[i][0], messages[i][1]);
}
Syncer.prototype.parseMsg = function*(msg)
{
var parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' });
parser.once('end', gen.cb());
parser.write(msg);
var [ obj ] = yield parser.end();
return obj;
}
Syncer.prototype.extractAttachments = function(struct, attachments)
{
attachments = attachments || [];
for (var i = 0; i < struct.length; i++)
{
if (struct[i] instanceof Array)
this.extractAttachments(struct[i], attachments);
else if (struct[i].disposition && struct[i].disposition.type == 'attachment')
if (updateFlags.length)
{
attachments.push([
mimelib.parseMimeWords(struct[i].disposition.params && struct[i].disposition.params.filename || struct[i].description || ''),
struct[i].type+'/'+struct[i].subtype,
struct[i].size
]);
let updated = await SQL.update(
this.pg, { m: 'messages', t: SQL.values(updateFlags) },
{ 'flags = t.flags::varchar(255)[]' },
{ 'm.folder_id': boxId, 'm.uid=t.uid': [] },
checkMissing ? { returning: 'm.uid' } : null
);
if (checkMissing)
{
let missing = {};
for (let i = 0; i < updateFlags.length; i++)
missing[updateFlags[i].uid] = true;
for (let i = 0; i < updated.length; i++)
delete missing[updated[i].uid];
return Object.keys(missing);
}
}
return [];
}
async quickResync(srv, boxId, maxUid, changedSince, missing)
{
let updateFlags = [];
let vanished = [];
let onVanish = function(dias)
{
vanished = vanished.concat(vanished, dias);
};
srv.on('vanish', onVanish);
await this.imap.runFetch(
srv, '1:'+maxUid, { modifiers: { changedsince: changedSince+' VANISHED' } },
(messages, state) => this.queueQuickFlags(messages, boxId, state), { updateFlags: updateFlags }
);
srv.removeListener('vanish', onVanish);
let checkedMissing = await this.updateFlags(boxId, updateFlags, missing && true);
if (missing)
{
missing.push.apply(missing, checkedMissing);
}
if (vanished.length)
{
await this.deleteVanished(boxId, vanished);
}
}
return attachments;
}
Syncer.prototype.addMessage = function*(boxId, msgrow, attrs)
{
var self = this;
var pgtx, end_transaction;
try
async deleteVanished(boxId, vanished)
{
[ pgtx, end_transaction ] = yield this.pg.transaction(gen.cb(), function(e) { if (e) throw e; });
let lst = [], dia = [];
for (let i = 0; i < vanished.length; i++)
{
if (vanished[i][1])
dia.push('uid >= '+vanished[i][0]+' AND uid <= '+vanished[i][1]);
else
lst.push(vanished[i][0]);
}
if (lst.length)
dia.push('uid IN ('+lst.join(',')+')');
await this.deleteMessages({ folder_id: boxId, '('+dia.join(' OR ')+')': [] });
}
let header = yield* this.parseMsg(msgrow.headers);
// FIXME: async
queueQuickFlags(messages, boxId, fetchState)
{
for (let i = 0; i < messages.length; i++)
{
let m = messages[i][0];
fetchState.updateFlags.push({ uid: m.uid, flags: toPgArray(m.flags) });
}
}
async deleteMessages(where)
{
await SQL.update(
this.pg, 'threads', { first_msg: null },
{ 'first_msg IN ('+SQL.select_builder('messages', 'id', where)+')': [] },
);
await SQL.delete(this.pg, 'messages', where);
await SQL.update(
this.pg, 'threads',
{ ['first_msg=('+SQL.select_builder(
'messages', 'id', { 'thread_id=threads.id': [] }, { order_by: 'time', limit: 1 }
)+')']: [] },
{ first_msg: null }
);
await SQL.delete(this.pg, 'threads', { first_msg: null });
}
async saveMessages(messages, boxId)
{
let uids = messages.map(m => m[1].uid);
let exist = await SQL.select(this.pg, 'messages', 'uid, flags', { folder_id: boxId, uid: uids });
uids = {};
for (let i = 0; i < exist.length; i++)
{
uids[exist[i].uid] = true;
}
for (let i = 0; i < messages.length; i++)
{
if (!uids[messages[i][1].uid])
{
await this.addMessage(boxId, messages[i][0], messages[i][1]);
}
}
}
async parseMsg(msg)
{
let parser = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' });
return await new Promise((r, j) =>
{
parse.once('end', r);
parser.write(msg);
});
}
extractAttachments(struct, attachments)
{
attachments = attachments || [];
for (let i = 0; i < struct.length; i++)
{
if (struct[i] instanceof Array)
this.extractAttachments(struct[i], attachments);
else if (struct[i].disposition && struct[i].disposition.type == 'attachment')
{
attachments.push([
mimelib.parseMimeWords(struct[i].disposition.params && struct[i].disposition.params.filename || struct[i].description || ''),
struct[i].type+'/'+struct[i].subtype,
struct[i].size
]);
}
}
return attachments;
}
async addMessage(boxId, msgrow, attrs)
{
await this.pg.query('BEGIN');
try
{
await this.addMessageImpl(boxId, msgrow, attrs);
await this.pg.query('COMMIT');
}
catch (e)
{
await this.pg.query('ROLLBACK');
}
}
async addMessageImpl(boxId, msgrow, attrs)
{
let header = await this.parseMsg(msgrow.headers);
header.references = header.references || [];
if (header.references.length)
{
@ -458,45 +476,45 @@ Syncer.prototype.addMessage = function*(boxId, msgrow, attrs)
if (typeof msgrow[i] == 'string')
msgrow[i] = msgrow[i].replace(/\x00/g, '');
var thisIsFirst = false;
let thisIsFirst = false;
if (header.references.length)
{
let [ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(this.pg.sql.in('messageid', header.references)).val(gen.ef());
let threadId = await SQL.select(
this.pg, 'messages', 'MAX(thread_id)',
{ messageid: header.references }, null, SQL.MS_VALUE
);
if (!threadId)
{
[ threadId ] = yield pgtx.select('MAX(thread_id)').from('messages')
.where(new this.pg.sql.Binary('@>', 'refs', toPgArray([msgrow.messageid]))).val(gen.ef());
threadId = await SQL.select(
this.pg, 'messages', 'MAX(thread_id)',
{ 'refs @> array[?]': msgrow.messageid }, null, SQL.MS_VALUE
);
if (threadId)
{
thisIsFirst = true;
}
}
msgrow.thread_id = threadId;
}
console.log(msgrow.time+' '+(header.from && header.from[0] && header.from[0].address || '?')+' '+msgrow.subject);
[ msgrow.id ] = yield pgtx.insert('messages', msgrow).returning('id').val(gen.ef());
msgrow.id = (await SQL.insert(this.pg, 'messages', msgrow, { returning: 'id' }))[0].id;
if (!msgrow.thread_id)
{
[ msgrow.thread_id ] = yield pgtx.insert('threads', {
msgrow.thread_id = (await SQL.insert(this.pg, 'threads', {
first_msg: msgrow.id,
msg_count: 1
}).returning('id').val(gen.ef());
yield pgtx.update('messages', { thread_id: msgrow.thread_id }).where({ id: msgrow.id }).run(gen.ef());
}, { returning: 'id' }))[0].id;
await SQL.update(this.pg, 'messages', { thread_id: msgrow.thread_id }, { id: msgrow.id });
}
else
{
let upd = pgtx.update('threads', { msg_count: this.pg.sql('msg_count+1') });
let upd = { 'msg_count=msg_count+1': [] };
if (thisIsFirst)
{
upd.first_msg = msgrow.id;
yield upd.where({ id: msgrow.threadId }).run(gen.ef());
}
await SQL.update(this.pg, 'threads', upd, { id: msgrow.threadId });
}
end_transaction();
}
catch (e0)
{
if (end_transaction)
end_transaction();
throw e0;
}
}

View File

@ -1,4 +1,3 @@
const gen = require('gen-thread');
const MailParser = require('mailparser').MailParser;
const htmlawed = require('htmlawed');
@ -11,249 +10,296 @@ const multer = require('multer');
const css = require('css');
const SQL = require('./select-builder-pgsql.js');
const MAX_FETCH = 100;
module.exports = SyncerWeb;
function SyncerWeb(syncer, pg, cfg)
class SyncerWeb
{
this.syncer = syncer;
this.pg = pg;
this.cfg = cfg;
this.app = express();
this.http = http.Server(this.app);
this.io = socket_io(this.http);
this.app.use(bodyparser.urlencoded({ extended: false }));
this.app.use(express_session({
secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k',
resave: false,
saveUninitialized: false
}));
this.app.get('/auth', this.get_auth);
this.app.post('/auth', this.post_auth);
this.app.get('/folders', genRequest(this.get_folders.bind(this)));
this.app.get('/groups', genRequest(this.get_groups.bind(this)));
this.app.get('/messages', genRequest(this.get_messages.bind(this)));
this.app.get('/message', genRequest(this.get_message.bind(this)));
this.app.post('/sync', genRequest(this.post_sync.bind(this)));
this.syncer.events.on('sync', this.syncer_sync.bind(this));
}
constructor(syncer, pg, cfg)
{
this.syncer = syncer;
this.pg = pg;
this.cfg = cfg;
this.app = express();
this.http = http.Server(this.app);
this.io = socket_io(this.http);
this.app.use(bodyparser.urlencoded({ extended: false }));
this.app.use(express_session({
secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k',
resave: false,
saveUninitialized: false
}));
this.app.get('/auth', this.get_auth);
this.app.post('/auth', this.post_auth);
this.app.get('/folders', wrapAsync(this.get_folders));
this.app.get('/groups', wrapAsync(this.get_groups));
this.app.get('/messages', wrapAsync(this.get_messages));
this.app.get('/message', wrapAsync(this.get_message));
this.app.post('/sync', wrapAsync(this.post_sync));
this.syncer.events.on('sync', this.syncer_sync);
}
SyncerWeb.prototype.listen = function(port)
{
this.http.listen(port);
}
listen(port)
{
this.http.listen(port);
}
SyncerWeb.prototype.get_auth = function(req, res)
{
return res.type('html').send(
'<form action="/auth" method="post"><input name="login" />'+
' <input name="password" type="password" /> <input type="submit" /></form>'
);
}
SyncerWeb.prototype.post_auth = function(req, res)
{
if (!req.body)
return res.sendStatus(400);
if (req.body.login == this.cfg.login && req.body.password == this.cfg.password)
get_auth = (req, res) =>
{
req.session.auth = true;
return res.send({ ok: true });
}
return res.send({ ok: false });
}
SyncerWeb.prototype.get_folders = function*(req, res)
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
var [ accounts ] = yield this.pg.select(
'id, name, email, settings->\'folders\' folderMap,'+
' (select count(*) from messages m, folders f where m.folder_id=f.id and f.account_id=a.id and (flags @> array[\'pinned\',\'unread\']::varchar(255)[])) pinned_unread_count'
).from('accounts a').rows(gen.ef());
var [ folders ] = yield this.pg.select(
'id, account_id, name,'+
' (select count(*) from messages m where m.folder_id=f.id) total_count,'+
' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count'
).from('folders f').orderBy('account_id, name').rows(gen.ef());
var fh = {};
for (let i = 0; i < folders.length; i++)
{
fh[folders[i].account_id] = fh[folders[i].account_id] || [];
fh[folders[i].account_id].push(folders[i]);
}
for (let i = 0; i < accounts.length; i++)
{
accounts[i].folders = fh[accounts[i].id] || [];
}
return res.send({ accounts: accounts });
}
function ymd(dt)
{
var m = dt.getMonth()+1;
var d = dt.getDate();
return dt.getFullYear()+'-'+(m < 10 ? '0'+m : m)+'-'+(d < 10 ? '0'+d : d);
}
SyncerWeb.prototype.msgSearchCond = function(query)
{
var p = [];
if (query.folderId)
p.push(this.pg.sql.eq('m.folder_id', query.folderId));
else if (query.folderType == 'unread')
p.push(this.pg.sql('(flags @> array[\'unread\']::varchar(255)[])'));
else if (query.folderType == 'pinned')
p.push(this.pg.sql('(flags @> array[\'flagged\']::varchar(255)[])'));
else if (query.folderType == 'inbox')
{
var folders = [];
for (var id in this.syncer.accounts)
{
n = this.syncer.accounts[id].settings.folders.spam;
if (n)
folders.push(this.pg.sql.and({ 'f.name': n, 'f.account_id': id }));
}
p.push(this.pg.sql.not(this.pg.sql.or.apply(this.pg.sql, folders)));
p.push(this.pg.sql('(flags @> array[\'in\']::varchar(255)[])'));
}
else if (query.folderType == 'out')
p.push(this.pg.sql('(flags @> array[\'out\']::varchar(255)[])'));
else if (query.folderType == 'outbox')
{
// TODO это какая-то хитрая метапапка, которая не живёт на IMAP'е?
}
else if (query.folderType == 'drafts' || query.folderType == 'spam' || query.folderType == 'trash')
{
var folders = [];
var n;
for (var id in this.syncer.accounts)
{
n = this.syncer.accounts[id].settings.folders[query.folderType];
if (n)
folders.push(this.pg.sql.and({ 'f.name': n, 'f.account_id': id }));
}
p.push(this.pg.sql.or.apply(this.pg.sql, folders));
}
if (typeof query.search == 'string' && query.search.trim())
p.push(this.pg.sql('messages_fulltext(m) @@ plainto_tsquery($1)', query.search.trim()));
if (query.accountId)
p.push(this.pg.sql.and(p, this.pg.sql.eq('f.account_id', query.accountId)));
return p.length ? this.pg.sql.and.apply(this.pg.sql, p) : null;
}
SyncerWeb.prototype.get_groups = function*(req, res)
{
if (this.cfg.login && (!req.session || !req.session.auth))
return res.sendStatus(401);
var cond = this.msgSearchCond(req.query);
if (!cond)
return res.status(500).send('Need message query parameters');
var intervals = [];
var today, today_ts;
today = new Date(ymd(new Date()));
today_ts = today.getTime();
var week_start = today_ts - ((today.getDay()+6)%7)*86400000;
var prev_week = ymd(new Date(week_start - 86400000*7));
for (var i = 1; i <= 12; i++)
{
var d = today.getFullYear()+'-'+(i < 10 ? '0' : '')+i+'-01';
if (d >= prev_week)
break;
intervals.push({ date: d, name: 'm'+i });
}
intervals.push({ date: prev_week, name: 'pw' });
for (var i = week_start, d = 1; i < today_ts; i += 86400000, d++)
{
intervals.push({ date: ymd(new Date(i)), name: 'd'+d });
}
for (var i = today.getFullYear()-1; i >= 1970; i--)
{
intervals.unshift({ date: i+'-01-01', name: ''+i });
}
intervals.push({ date: ymd(today), name: 't' });
for (var i = 0; i < intervals.length-1; i++)
{
intervals[i].date_end = intervals[i+1].date;
}
intervals[intervals.length-1].date_end = '100000-12-31'; // it's faster than (is null or <)
var [ groups ] = yield this.pg
.select('d.name, d.date, ('+
this.pg.select('count(*)')
.from('messages m')
.innerJoin('folders f', this.pg.sql('f.id=m.folder_id'))
.where(cond)
.where(this.pg.sql('m.time >= d.date::date and m.time < d.date_end::date'))
+') count')
.from(this.pg.sql.values(intervals).as('d').columns())
.orderBy('date desc').rows(gen.ef());
groups = groups.filter(g => g.count > 0);
return res.send({ groups: groups });
}
SyncerWeb.prototype.get_messages = function*(req, res)
{
if (this.cfg.login && (!req.session || !req.session.auth))
return res.sendStatus(401);
var cond = this.msgSearchCond(req.query);
if (!cond)
return res.status(500).send('Need message query parameters');
var limit = req.query.limit || 50;
if (limit > MAX_FETCH)
limit = MAX_FETCH;
var offset = req.query.offset || 0;
var [ msgs ] = yield this.pg.select('m.*').from('messages m')
.innerJoin('folders f', this.pg.sql('f.id=m.folder_id'))
.where(cond).orderBy('time desc').limit(limit).offset(offset)
.rows(gen.ef());
for (var i = 0; i < msgs.length; i++)
{
delete msgs[i].text_index;
}
return res.send({ messages: msgs });
}
SyncerWeb.prototype.get_message = function*(req, res)
{
if (this.cfg.login && (!req.session || !req.session.auth))
return res.sendStatus(401);
var msgId = req.query.msgId;
console.log('fetch message '+msgId);
var [ msg ] = yield this.pg.select('m.*, f.name folder_name, f.account_id')
.from('messages m').join('folders f', this.pg.sql('f.id=m.folder_id'))
.where({ 'm.id': msgId }).rows(gen.ef());
if (!msg.length)
return res.send({ error: 'not-found' });
msg = msg[0];
if (!msg.body_html && !msg.body_text)
{
var srv = yield* this.syncer.imap.getConnection(msg.account_id, msg.folder_name);
var [ upd ] = yield* this.syncer.imap.runFetch(
srv, msg.uid, { bodies: '' },
(messages, state) => this.getBody(messages, msg.folder_id)
return res.type('html').send(
'<form action="/auth" method="post"><input name="login" />'+
' <input name="password" type="password" /> <input type="submit" /></form>'
);
this.syncer.imap.releaseConnection(msg.account_id);
return res.send({ msg: { ...msg, ...upd } });
}
return res.send({ msg: msg });
}
SyncerWeb.prototype.syncer_sync = function(params)
{
this.io.emit('sync', params);
}
post_auth = (req, res) =>
{
if (!req.body)
return res.sendStatus(400);
if (req.body.login == this.cfg.login && req.body.password == this.cfg.password)
{
req.session.auth = true;
return res.send({ ok: true });
}
return res.send({ ok: false });
}
SyncerWeb.prototype.post_sync = function*(req, res)
{
if (this.cfg.login && (!req.session || !req.session.auth))
return res.sendStatus(401);
if (this.syncer.syncInProgress)
return res.send({ error: 'already-running' });
gen.run(this.syncer.syncAll());
return res.send({ status: 'started' });
get_folders = async (req, res) =>
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
const accounts = (await this.pg.query(
'select id, name, email, settings->\'folders\' folderMap,'+
' (select count(*) from messages m, folders f'+
' where m.folder_id=f.id and f.account_id=a.id'+
' and (flags @> array[\'pinned\',\'unread\']::varchar(255)[])) pinned_unread_count'+
' from accounts a'
)).rows;
const folders = (await this.pg.query(
'select id, account_id, name,'+
' (select count(*) from messages m where m.folder_id=f.id) total_count,'+
' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count'+
' from folders f order by account_id, name'
)).rows;
let fh = {};
for (let i = 0; i < folders.length; i++)
{
fh[folders[i].account_id] = fh[folders[i].account_id] || [];
fh[folders[i].account_id].push(folders[i]);
}
for (let i = 0; i < accounts.length; i++)
{
accounts[i].folders = fh[accounts[i].id] || [];
}
return res.send({ accounts: accounts });
}
msgSearchCond(query)
{
let p = {};
if (query.folderId)
{
p['m.folder_id'] = query.folderId;
}
else if (query.folderType == 'unread')
{
p['(flags @> array[\'unread\']::varchar(255)[])'] = [];
}
else if (query.folderType == 'pinned')
{
p['(flags @> array[\'flagged\']::varchar(255)[])'] = [];
}
else if (query.folderType == 'inbox')
{
let folders = Object.keys(this.syncer.accounts)
.map(id => [ id, this.syncer.accounts[id].settings.folders.spam ])
.filter(f => f[1]);
p['(f.account_id, f.name) NOT IN ('+folders.map(f => '(?, ?)').join(', ')+')'] =
[].concat.apply([], folders);
p['(flags @> array[\'in\']::varchar(255)[])'] = [];
}
else if (query.folderType == 'out')
{
p['(flags @> array[\'out\']::varchar(255)[])'] = [];
}
else if (query.folderType == 'outbox')
{
// TODO это какая-то хитрая метапапка, которая не живёт на IMAP'е?
}
else if (query.folderType == 'drafts' || query.folderType == 'spam' || query.folderType == 'trash')
{
let folders = Object.keys(this.syncer.accounts)
.map(id => [ id, this.syncer.accounts[id].settings.folders[query.folderType] ])
.filter(f => f[1]);
p['(f.account_id, f.name) IN ('+folders.map(f => '(?, ?)').join(', ')+')'] =
[].concat.apply([], folders);
}
if (typeof query.search == 'string' && query.search.trim())
{
p['messages_fulltext(m) @@ plainto_tsquery($1)'] = [ query.search.trim() ];
}
if (query.accountId)
{
p['f.account_id'] = query.accountId;
}
return Object.keys(p).length ? p : null;
}
get_groups = async (req, res) =>
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
const cond = this.msgSearchCond(req.query);
if (!cond)
{
return res.status(500).send('Need message query parameters');
}
let intervals = [];
let today, today_ts;
today = new Date(ymd(new Date()));
today_ts = today.getTime();
let week_start = today_ts - ((today.getDay()+6)%7)*86400000;
let prev_week = ymd(new Date(week_start - 86400000*7));
for (let i = 1; i <= 12; i++)
{
let d = today.getFullYear()+'-'+(i < 10 ? '0' : '')+i+'-01';
if (d >= prev_week)
break;
intervals.push({ date: d, name: 'm'+i });
}
intervals.push({ date: prev_week, name: 'pw' });
for (let i = week_start, d = 1; i < today_ts; i += 86400000, d++)
{
intervals.push({ date: ymd(new Date(i)), name: 'd'+d });
}
for (let i = today.getFullYear()-1; i >= 1970; i--)
{
intervals.unshift({ date: i+'-01-01', name: ''+i });
}
intervals.push({ date: ymd(today), name: 't' });
for (let i = 0; i < intervals.length-1; i++)
{
intervals[i].date_end = intervals[i+1].date;
}
intervals[intervals.length-1].date_end = '100000-12-31'; // it's faster than (is null or <)
let groups = await SQL.select(
this.pg,
{ d: SQL.values(intervals) },
'd.name, d.date, ('+SQL.select_builder(
{ m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] },
'count(*) count',
{ ...cond, 'm.time >= d.date::date and m.time < d.date_end::date': [] },
)+') count',
[],
{ order_by: 'date desc' }
);
groups = groups.filter(g => g.count > 0);
return res.send({ groups: groups });
}
get_messages = async (req, res) =>
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
let cond = this.msgSearchCond(req.query);
if (!cond)
{
return res.status(500).send('Need message query parameters');
}
let limit = req.query.limit || 50;
if (limit > MAX_FETCH)
limit = MAX_FETCH;
let offset = req.query.offset || 0;
let msgs = await SQL.select(
this.pg,
{ m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] },
'm.*', cond,
{ order_by: 'time desc', limit, offset }
);
for (let i = 0; i < msgs.length; i++)
{
delete msgs[i].text_index;
}
return res.send({ messages: msgs });
}
get_message = async (req, res) =>
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
let msgId = req.query.msgId;
console.log('fetch message '+msgId);
let msg = await SQL.select(
this.pg,
{ m: 'messages', f: [ 'INNER', 'folders', [ 'f.id=m.folder_id' ] ] },
'm.*, f.name folder_name, f.account_id',
{ 'm.id': msgId }, null, SQL.MS_ROW
);
delete msg.text_index;
if (!msg)
{
return res.send({ error: 'not-found' });
}
if (!msg.body_html && !msg.body_text)
{
let srv = await this.syncer.imap.getConnection(msg.account_id, msg.folder_name);
let upd = await this.syncer.imap.runFetch(
srv, msg.uid, { bodies: '' },
(messages, state) => this.getBody(messages, msg.folder_id)
);
this.syncer.imap.releaseConnection(msg.account_id);
return res.send({ msg: { ...msg, ...upd } });
}
return res.send({ msg: msg });
}
syncer_sync = (params) =>
{
this.io.emit('sync', params);
}
post_sync = async (req, res) =>
{
if (this.cfg.login && (!req.session || !req.session.auth))
{
return res.sendStatus(401);
}
if (this.syncer.syncInProgress)
{
return res.send({ error: 'already-running' });
}
this.syncer.syncAll().catch(console.error);
return res.send({ status: 'started' });
}
getBody = async (messages, boxId) =>
{
for (let i = 0; i < messages.length; i++)
{
let msg = messages[i];
let obj = await this.syncer.parseMsg(msg[0].headers);
obj.html = sanitizeHtml(obj.html);
let upd = { body_text: obj.text||'', body_html: obj.html };
upd.body_html_text = obj.html.replace(/<style[^>]*>.*<\/style\s*>|<\/?[^>]*>/g, '');
await SQL.update(this.pg, 'messages m', upd, { folder_id: boxId, uid: msg[0].uid });
if (messages.length == 1)
{
return [ upd ];
}
}
return null;
}
}
function rewriteCss(ast)
@ -308,29 +354,21 @@ function sanitizeHtml(html)
html = htmlawed.sanitize(html||'', { safe: 1, elements: '* +style', keep_bad: 0, comment: 1 });
html = html.replace(/<style[^>]*>([\s\S]*)<\/style\s*>/ig, function(m, m1)
{
var ast = css.parse(m1, { silent: true });
let ast = css.parse(m1, { silent: true });
rewriteCss(ast);
return '<style>'+css.stringify(ast)+'</style>';
});
return html;
}
SyncerWeb.prototype.getBody = function*(messages, boxId)
function wrapAsync(fn)
{
for (var i = 0; i < messages.length; i++)
{
let msg = messages[i];
let obj = yield* this.syncer.parseMsg(msg[0].headers);
obj.html = sanitizeHtml(obj.html);
let upd = { body_text: obj.text||'', body_html: obj.html };
upd.body_html_text = obj.html.replace(/<style[^>]*>.*<\/style\s*>|<\/?[^>]*>/g, '');
yield this.pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef());
if (messages.length == 1)
return [ upd ];
}
return (req, res) => fn(req, res).catch(e => res.status(500).send('Internal Error: '+e.stack));
}
function genRequest(fn)
function ymd(dt)
{
return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e.stack));
let m = dt.getMonth()+1;
let d = dt.getDate();
return dt.getFullYear()+'-'+(m < 10 ? '0'+m : m)+'-'+(d < 10 ? '0'+d : d);
}

View File

@ -1,6 +1,5 @@
/**
* TODO:
* - перейти на redux
* - исправить параллелизм запросов и sync'а
* - фоновая индексация всех текстов сообщений в ящике
* - скачивание вложений
@ -37,29 +36,31 @@
* В определённом плане получается тупость - получается, что дублируешь
* функционал самого почтового сервера. Но шо ж с ним поделаешь, если он "ни ф силах"...
* Ведь по сути-то, MTA от такой штуки нужен только 1 метод: "добавить сообщение в папку".
*
* Блин, IMAP - кривой протокол
* - sequence number-ы это какая-то жопа
* - обновления идут по sequence number-ам
* - обновления идут только по активному mailbox-у
* - а ещё есть какие-то сраные неймспейсы
*/
require('heapdump');
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
require('babel-register');
const gen = require('gen-thread');
const bricks = require('pg-bricks');
const pg = require('pg');
const Syncer = require('./Syncer.js');
const SyncerWeb = require('./SyncerWeb.js');
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
var cfg = require('./cfg.json');
let syncer = new Syncer(pg);
let syncerweb = new SyncerWeb(syncer, pg, cfg);
var pg = bricks.configure(cfg.pg);
pg._pg.types.setTypeParser(1082, 'text', val => val); // НЕ ПАРСИТЬ ДАТЫ ! ! !
var syncer = new Syncer(pg);
var syncerweb = new SyncerWeb(syncer, pg, cfg);
gen.run(function*()
async function startSync(cfg)
{
yield* syncer.init(cfg);
yield* syncer.syncAll();
});
let connection = new pg.Client(cfg.pg);
await connection.connect();
await syncer.init(cfg, connection);
await syncer.syncAll();
}
let cfg = require('./cfg.json');
startSync(cfg).catch(console.error);
syncerweb.listen(8057);

View File

@ -7,11 +7,11 @@
"name": "operetta-backend",
"description": "Operetta webmail backend",
"dependencies": {
"gen-thread": "latest",
"htmlawed": "latest",
"body-parser": "latest",
"css": "latest",
"express": "latest",
"express-session": "latest",
"htmlawed": "latest",
"iconv-lite": "latest",
"imap": "latest",
"mailparser": "git+https://github.com/vitalif/mailparser#master",
@ -19,13 +19,9 @@
"multer": "latest",
"nodemailer": "latest",
"pg": "latest",
"pg-bricks": "latest",
"sql-bricks": "latest",
"socket.io": "latest",
"css": "latest"
},
"peerDependencies": {
"sql-bricks": ">=1.4.0"
},
"devDependencies": {
"babel-cli": "latest",

8
run.sh
View File

@ -1,2 +1,6 @@
node_modules/.bin/babel operetta.js > operetta.c.js
nodejs --max_old_space_size=100 operetta.c.js
#!/bin/sh
for i in ImapManager.js operetta.js Syncer.js SyncerWeb.js; do
node_modules/.bin/babel $i > compiled/$i
done
nodejs --max_old_space_size=100 compiled/operetta.js

279
select-builder-pgsql.js Normal file
View File

@ -0,0 +1,279 @@
// Простенький "селект билдер" по мотивам MediaWiki-овского, успешно юзаю подобный в PHP уже лет 8
// (c) Виталий Филиппов, 2019
// Версия 2019-05-08
// В PHP, правда, прикольнее - там в массиве можно смешивать строковые и численные ключи,
// благодаря чему можно писать $where = [ 't1.a=t2.a', 't2.b' => [ 1, 2, 3 ] ]
const pg = require('pg');
// Сраный node-postgres конвертирует даты в Date и портит таймзону
const DATATYPE_DATE = 1082;
pg.types.setTypeParser(DATATYPE_DATE, function(val)
{
return val === null ? null : val;
});
let pg_escape;
const MS_HASH = 0;
const MS_LIST = 1;
const MS_ROW = 2;
const MS_COL = 4;
const MS_VALUE = 6;
function select_builder(tables, fields, where, options)
{
let sql = 'SELECT ', bind = [];
if (fields instanceof Array)
{
sql += fields.join(', ');
}
else if (typeof fields == 'string')
{
sql += fields;
}
else if (typeof fields == 'object')
{
sql += Object.keys(fields).map(k => fields[k]+' AS '+k).join(', ');
}
else
{
throw new Error('fields = '+fields+' is invalid');
}
sql += ' FROM ';
let first = true;
let moreWhere = null;
tables = typeof tables == 'string' ? { t: tables } : tables;
for (const k in tables)
{
if (first)
{
if (typeof tables[k] != 'string')
{
// Бывает удобно указывать WHERE как условие "JOIN" первой таблицы
sql += tables[k][1] + ' ' + k;
moreWhere = tables[k][2];
}
else
{
sql += tables[k] + ' ' + k;
}
first = false;
}
else if (typeof tables[k] == 'string')
{
sql += ' INNER JOIN '+tables[k]+' '+k+' ON 1=1';
}
else
{
sql += ' ' + tables[k][0].toUpperCase() + ' JOIN ';
let t = tables[k][1];
if (t instanceof Pg_Values)
{
sql += '(VALUES ';
let i = 0;
for (const row of t.rows)
{
sql += (i > 0 ? ', (' : '(') + t.keys.map(() => '$'+(++i)).join(', ')+')';
bind.push.apply(bind, t.keys.map(k => row[k]));
}
sql += ') AS '+k+'('+t.keys.join(', ')+')';
}
else
{
sql += t + ' ' + k;
}
const on = whereBuilder(tables[k][2]);
sql += ' ON ' + (on[0] || '1=1');
bind.push.apply(bind, on[1]);
}
}
const w = whereBuilder(where);
sql += ' WHERE '+(w[0] || '1=1');
bind.push.apply(bind, w[1]);
if (moreWhere)
{
moreWhere = whereBuilder(moreWhere);
if (moreWhere[0])
{
sql += ' AND '+moreWhere[0];
bind.push.apply(bind, moreWhere[1]);
}
}
options = options||{};
if (options['GROUP BY'] || options.group_by)
{
let group = options['GROUP BY'] || options.group_by;
group = group instanceof Array ? group : [ group ];
sql += ' GROUP BY '+group.join(', ');
}
if (options['ORDER BY'] || options.order_by)
{
let order = options['ORDER BY'] || options.order_by;
order = order instanceof Array ? order : [ order ];
sql += ' ORDER BY '+order.join(', ');
}
if (options.LIMIT || options.limit)
{
sql += ' LIMIT '+((options.LIMIT || options.limit) | 0);
}
if (options.OFFSET || options.offset)
{
sql += ' LIMIT '+((options.OFFSET || options.offset) | 0);
}
return [ sql, bind ];
}
function whereOrSetBuilder(fields, where)
{
if (typeof fields == 'string')
return [ fields, [] ];
const w = [], bind = [];
for (const k in fields)
{
let v = fields[k];
if (k.indexOf('?') >= 0)
{
if (!(v instanceof Array))
v = [ v ];
w.push(k);
bind.push.apply(bind, v);
}
else if (/^\d+$/.exec(k))
{
if (v instanceof Array)
{
w.push(v[0]);
bind.push.apply(bind, v.slice(1));
}
else
{
w.push(v);
}
}
else if (v != null || v instanceof Array && v.length)
{
v = v instanceof Array ? v : [ v ];
w.push(v.length == 1 ? k + ' = ?' : k + ' in (' + v.map(() => '?').join(', ') + ')');
bind.push.apply(bind, v);
}
}
if (!where)
return [ w.join(', '), bind ];
return [ w.length ? '('+w.join(') and (')+')' : '', bind ];
}
function whereBuilder(where)
{
return whereOrSetBuilder(where, true);
}
function _positional(sql)
{
let i = 0;
sql = sql.replace(/\?/g, () => '$'+(++i));
return sql;
}
function _inline(sql, bind)
{
if (!pg_escape)
{
pg_escape = require('pg-escape');
}
let i = 0;
sql = sql.replace(/\?/g, () => '\''+pg_escape.string(bind[i++])+'\'');
return sql;
}
// dbh = node-postgres.Client
async function select(dbh, tables, fields, where, options, format)
{
let [ sql, bind ] = select_builder(tables, fields, where, options);
//console.log(_inline(sql, bind));
let data = await dbh.query(_positional(sql), bind);
if ((format & MS_LIST) || (format & MS_COL))
data = data.rows.map(r => Object.values(r));
else
data = data.rows;
if (format & MS_ROW)
data = data[0];
if (data && (format & MS_COL))
data = data[0];
return data;
}
async function insert(dbh, table, rows, options)
{
if (!(rows instanceof Array))
{
rows = [ rows ];
}
if (!rows.length)
{
return null;
}
const keys = Object.keys(rows[0]);
let sql = 'insert into '+table+' ('+keys.join(', ')+') values ';
const bind = [];
let i = 0;
for (const row of rows)
{
sql += (i > 0 ? ', (' : '(') + keys.map(() => '$'+(++i)).join(', ')+')';
bind.push.apply(bind, keys.map(k => row[k]));
}
if (options.returning)
{
sql += ' returning '+options.returning;
return (await dbh.query(sql, bind)).rows;
}
else
{
return await dbh.query(sql, bind);
}
}
async function _delete(dbh, table, where)
{
const w = whereBuilder(where);
const sql = 'DELETE FROM '+table+' WHERE '+(w[0] || '1=1');
return await dbh.execute(_positional(sql), w[1]);
}
async function update(dbh, table, set, where)
{
set = whereOrSetBuilder(set, false);
where = whereOrSetBuilder(where, true)
const sql = 'UPDATE '+table+' SET '+set[0]+' WHERE '+(where[0] || '1=1');
const bind = [ ...set[1], ...where[1] ];
return await dbh.execute(_positional(sql), bind);
}
function values(rows)
{
return new Pg_Values(Object.keys(rows[0]), rows);
}
class Pg_Values
{
constructor(keys, rows)
{
this.keys = keys;
this.rows = rows;
}
}
module.exports = {
select_builder,
select,
insert,
delete: _delete,
update,
values,
MS_HASH,
MS_LIST,
MS_ROW,
MS_COL,
MS_VALUE,
};