diff --git a/db.sql b/db.sql index 4816391..b86f28d 100644 --- a/db.sql +++ b/db.sql @@ -41,8 +41,12 @@ create table messages ( cc_list text not null, bcc_list text not null, headers text not null, - body text not null, + body_html text not null, + body_text text not null, + body_html_text text not null, + text_index tsvector not null, time timestamptz not null, + size unsigned not null, flags varchar(255)[] not null, vertag int not null default 0, foreign key (folder_id) references folders (id) on delete cascade on update cascade @@ -53,6 +57,29 @@ create index messages_messageid on messages (messageid); create index messages_refs on messages using gin (refs); create index messages_vertag on messages (folder_id, vertag); create index messages_time on messages (folder_id, time); +create index messages_text on messages using gin (text_index); +create or replace function fn_messages_text_index() returns trigger +security definer language plpgsql as $$ +begin + NEW.text_index = ( + setweight(to_tsvector('russian', regexp_replace(NEW.from_name || ' ' || NEW.from_email || ' ' || + NEW.replyto_name || ' ' || NEW.replyto_email || ' ' || + NEW.to_list || ' ' || NEW.cc_list || ' ' || NEW.bcc_list || ' ' || NEW.subject, '\W+', ' ', 'g')), 'A') || + setweight(to_tsvector('russian', NEW.body_html_text || ' ' || NEW.body_text), 'B') + ); + return NEW; +end +$$; +create trigger messages_text_index before insert or update on messages +for each row execute procedure fn_messages_text_index(); + +create table attachments ( + id serial not null primary key, + msg_id int not null, + ctype varchar(255) not null, + size unsigned not null, + foreign key (msg_id) references messages (id) on delete cascade on update cascade +); create table threads ( id serial not null primary key, @@ -60,6 +87,7 @@ create table threads ( msg_count int not null default 1, foreign key (first_msg) references messages (id) on delete restrict on update cascade ); +create index threads_first_msg on threads (first_msg); alter table messages add foreign key (thread_id) references threads (id) on delete restrict on update cascade; diff --git a/operetta.js b/operetta.js index 8caa738..a00072e 100644 --- a/operetta.js +++ b/operetta.js @@ -1,40 +1,217 @@ +// TODO: Получать, парсить и хранить тела писем (и, вероятно, вложения) + индексировать тексты +// TODO: Группировка писем // TODO: Висеть в виде демона и сразу получать новые письма (IDLE) -// TODO: Сделать веб-сервер для обновления view +// TODO: Сделать веб-сервер // TODO: Сделать подписки на новые сообщения по вебсокетам +// TODO: Чего я ещё хотел - интеграцию с maillog'ом и серверным спамфильтром + +/** + * Нужные методы API: + * - список аккаунтов и папок + * - список сообщений в папке + * - содержимое сообщения + * - поиск по тексту + * - список тредов в папке, с сообщениями + * - проверить почту + * - пометить прочтённым + * - переместить + * - удалить + * - подсказка адресов To + * - отправить сообщение + * + * В определённом плане получается тупость - получается, что дублируешь + * функционал самого почтового сервера. Но шо ж с ним поделаешь, если он "ни ф силах"... + * Ведь по сути-то, MTA от такой штуки нужен только 1 метод: "добавить сообщение в папку". + */ + +require('heapdump'); + +const gen = require("gen-thread"); +const Imap = require('imap'); +const iconv = require('iconv-lite'); +const MailParser = require('mailparser').MailParser; +const bricks = require('pg-bricks'); +const htmlawed = require('htmlawed'); + +const express = require('express'); +const express_session = require('express-session'); +const bodyparser = require('body-parser'); +const multer = require('multer'); process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; var cfg = require('./cfg.json'); -require('heapdump'); -var gen = require('gen-thread'); -var Imap = require('imap'); -var inspect = require('util').inspect; -var iconv = require('iconv-lite'); - -var bricks = require('pg-bricks'); -var pg = bricks.configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database); - -function splitEmails(s) -{ - var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' - var m, r = []; - while (m = re.exec(s)) - { - s = s.substr(m[0].length); - r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() }); - } - return r; -} - -function toPgArray(a) -{ - a = JSON.stringify(a); - return '{'+a.substring(1, a.length-1)+'}'; -} +var pg = bricks.configure(cfg.pg); var Syncer = { }; +Syncer.app = express(); + +Syncer.app.use(bodyparser.urlencoded({ extended: false })); + +Syncer.app.use(express_session({ + secret: cfg.sessionSecret || '1083581xm1l3s1l39k', + resave: false, + saveUninitialized: false +})); + +Syncer.app.get('/auth', function(req, res) +{ + return res.type('html').send('
'); +}); + +Syncer.app.post('/auth', function(req, res) +{ + if (!req.body) + return res.sendStatus(400); + if (req.body.login == cfg.login && req.body.password == cfg.password) + { + req.session.auth = true; + return res.send({ ok: true }); + } + return res.send({ ok: false }); +}); + +Syncer.app.get('/folders', genRequest(function*(req, res) +{ + var self = Syncer; + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var [ accounts ] = yield pg.select('id, name, email').from('accounts').rows(gen.ef()); + var [ folders ] = yield pg.select( + 'id, account_id, name,'+ + ' (select count(*) from messages m where m.folder_id=f.id) total_count,'+ + ' (select count(*) from messages m where m.folder_id=f.id and (flags @> array[\'unread\']::varchar(255)[])) unread_count' + ).from('folders f').orderBy('account_id, name').rows(gen.ef()); + var fh = {}; + for (let i = 0; i < folders.length; i++) + { + fh[folders[i].account_id] = fh[folders[i].account_id] || []; + fh[folders[i].account_id].push(folders[i]); + } + for (let i = 0; i < accounts.length; i++) + { + accounts[i].folders = fh[accounts[i].id] || []; + } + return res.send({ accounts: accounts }); +})); + +Syncer.app.get('/messages', genRequest(function*(req, res) +{ + var self = Syncer; + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var folderId = req.query.folderId; + if (!folderId) + return res.status(500).send('Need `folderId` query parameter'); + var limit = req.query.limit || 50; + var offset = req.query.offset || 0; + var [ msgs ] = yield pg.select('*').from('messages').where({ folder_id: folderId }) + .limit(limit).offset(offset).rows(gen.ef()); + return res.send({ messages: msgs }); +})); + +Syncer.app.get('/message', genRequest(function*(req, res) +{ + var self = Syncer; + if (!req.session || !req.session.auth) + return res.sendStatus(401); + var msgId = req.query.msgId; + var [ msg ] = yield pg.select('m.*, f.name folder_name, f.account_id') + .from('messages m').join('folders f', 'f.id=m.folder_id') + .where({ 'm.id': msgId }).row(gen.ef()); + if (!msg) + return res.send({ error: 'not-found' }); + if (!msg.body_html && !msg.body_text) + { + var srv = yield* self.getConnection(msg.account_id, boxName); + var [ upd ] = yield* self.runFetch(msg.uid, { bodies: '' }, msg.folder_id, 'getBody'); + self.releaseConnection(accountId); + return res.send({ msg: { ...msg, ...upd } }); + } +})); + +Syncer.getBody = function*(messages, boxId) +{ + var self = this; + var p = new MailParser({ streamAttachments: false, defaultCharset: 'windows-1251' }); + for (var i = 0; i < messages.length; i++) + { + let msg = messages[i]; + p.on('end', gen.cb()); + p.write(msg[0].headers); + let obj = yield p.end(); + delete msg[0].headers; + obj.html = htmlawed.sanitize(obj.html, { safe: 1, elements: '* +style' }); + let upd = { body_text: obj.text, body_html: obj.html }; + upd.body_html_text = obj.html.replace(/]*>.*<\/style\s*>|<\/?[^>]*>/g, ''); + yield pg.update('messages m', upd).where({ folder_id: boxId, uid: msg[0].uid }).run(gen.ef()); + if (messages.length == 1) + return [ upd ]; + } +}; + +Syncer.getConnection = function*(accountId, boxName) +{ + var self = this; + if (self.connections[accountId]) + { + if (self.busy[accountId]) + yield self.queue[accountId].push(gen.cb()); + if (boxName && self.selected[accountId] != boxName) + { + yield srv.openBox(boxName, true, gen.ef()); + self.selected[accountId] = boxName; + } + self.busy[accountId] = true; + return self.connections[accountId]; + } + + var srv = new Imap(self.accounts[accountId].imap); + + srv.once('ready', gen.cb()); + // FIXME handle connection errors + yield srv.connect(); + yield srv._enqueue('ENABLE QRESYNC', gen.cb()); + + // Monkey-patch node-imap to support VANISHED responses + var oldUT = srv._parser._resUntagged; + srv._parser._resUntagged = function() + { + var m; + if (m = /^\* VANISHED( \(EARLIER\))? ([\d:,]+)/.exec(this._buffer)) + self.vanished.push(m[2].split(/,/).map(s => s.split(':'))); + oldUT.apply(this); + }; + + srv.on('close', function() + { + delete self.connections[accountId]; + if (self.srv == srv) + delete self.srv; + }); + + if (boxName) + { + yield srv.openBox(boxName, true, gen.ef()); + self.selected[accountId] = boxName; + } + + self.connections[accountId] = srv; + self.busy[accountId] = true; + self.queue[accountId] = []; + return srv; +} + +Syncer.releaseConnection = function(accountId) +{ + var self = this; + self.busy[accountId] = false; + if (self.queue[accountId].length) + (self.queue[accountId].shift())(); +} + Syncer.sync = function*(account) { var self = this; @@ -95,8 +272,6 @@ Syncer.sync = function*(account) .where({ folder_id: row.id }).val(gen.ef()); self.versionTag = self.versionTag || 0; } - yield pg.update('folders', { uidvalidity: box.uidvalidity, unread_count: box.messages.new, highestmodseq: box.highestmodseq||0 }) - .where({ id: row.id }).run(gen.ef()); } else { @@ -104,9 +279,9 @@ Syncer.sync = function*(account) name: box.name, uidvalidity: box.uidvalidity, account_id: accountId, - unread_count: box.messages.new, - highestmodseq: box.highestmodseq||0, - //total_count: box.messages.count + highestmodseq: 0, + //unread_count: box.messages.new, + //total_count: box.messages.total, }).returning('id').row(gen.ef()); boxId = row.id; } @@ -131,7 +306,7 @@ Syncer.sync = function*(account) } if (lst.length) dia.push('uid IN ('+lst.join(',')+')'); - yield pg.delete('messages').where({ folder_id: boxId }).where(pg.sql('('+dia.join(' OR ')+')')).run(gen.ef()); + yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('('+dia.join(' OR ')+')'))); } } else if (maxUid) @@ -150,22 +325,7 @@ Syncer.sync = function*(account) process.stderr.write('\n'); // delete messages removed from IMAP server - yield pg.update('threads', { first_msg: null }) - .where(pg.sql('first_msg IN ('+ - pg.select('id').from('messages') - .where({ folder_id: boxId }) - .where(pg.sql('uid is not null')) - .where(pg.sql.lt('vertag', self.versionTag)) - +')')).run(gen.ef()); - yield pg.delete('messages') - .where({ folder_id: boxId }) - .where(pg.sql('uid is not null')) - .where(pg.sql.lt('vertag', self.versionTag)).run(gen.ef()); - yield pg.update('threads', - { first_msg: pg.sql('('+ - pg.select('MIN(id)').from('messages').where({ thread_id: pg.sql('threads.id') })+')' - ) }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); - yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); + yield* self.deleteMessages(pg.sql.and({ folder_id: boxId }, pg.sql('uid is not null'), pg.sql.lt('vertag', self.versionTag))); } // fetch new messages @@ -175,12 +335,32 @@ Syncer.sync = function*(account) bodies: 'HEADER' }, boxId, 'saveMessages'); + yield pg.update('folders', { + uidvalidity: box.uidvalidity, + //unread_count: box.messages.new, + highestmodseq: box.highestmodseq||0 + }).where({ id: row.id }).run(gen.ef()); + yield srv.closeBox(gen.cb()); } srv.end(); self.srv = null; } +Syncer.deleteMessages = function*(where) +{ + console.log(where+''); + yield pg.update('threads', { first_msg: null }) + .where(pg.sql('first_msg IN ('+pg.select('id').from('messages').where(where)+')')) + .run(gen.ef()); + yield pg.delete('messages').where(where).run(gen.ef()); + yield pg.update('threads', + { first_msg: pg.sql('('+ + pg.select('id').from('messages').where({ thread_id: pg.sql('threads.id') }).orderBy('time').limit(1) + +')') }).where(pg.sql('first_msg IS NULL')).run(gen.ef()); + yield pg.delete('threads').where(pg.sql('first_msg IS NULL')).run(gen.ef()); +} + Syncer.runFetch = function*(what, params, boxId, processor) { var self = this; @@ -190,6 +370,7 @@ Syncer.runFetch = function*(what, params, boxId, processor) self.paused = false; self.synced = 0; self.pending = []; + self.results = []; var cb, wait; f.on('message', function(msg, seqnum) @@ -204,9 +385,20 @@ Syncer.runFetch = function*(what, params, boxId, processor) if (self.parsed <= 0) cb(); else if (self.pending.length > 0) - gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; }); + { + var result = gen.run(self[processor](self.pending, boxId), saveLast, function(e) { saveLast(); throw e; }); + if (result) + self.results = self.results.concat(result); + } }); + if (self.results.length > 0) + { + let r = self.results; + delete self.results; + return r; + } + function saveLast() { self.parsed -= self.pending.length; @@ -240,9 +432,12 @@ Syncer.onMessage = function*(msg, seqnum, boxId, processor) var m = self.pending; self.pending = []; var err; + var result; try { - yield gen.run(self[processor](m, boxId), gen.cb()); + result = yield gen.run(self[processor](m, boxId), gen.cb()); + if (result) + self.results = self.results.concat(result); } catch (e) { @@ -276,7 +471,6 @@ Syncer.parseMessage = function*(msg, seqnum, boxId) }); stream.once('end', function() { - msgrow.body = ''; var b = buffer.toString('utf8'); if (b.indexOf('�') >= 0) { @@ -436,9 +630,33 @@ Syncer.addMessage = function*(msgrow, attrs) } } +Syncer.app.listen(8057); + gen.run(function*() { for (var i = 0; i < cfg.accounts.length; i++) yield* Syncer.sync(cfg.accounts[i]); - process.exit(); }); + +function genRequest(fn) +{ + return (req, res) => gen.run(fn(req, res), null, e => res.status(500).send('Internal Error: '+e)); +} + +function splitEmails(s) +{ + var re = /^[\s,]*(?:(?:["'](.*?)["']|([^<]+))\s*<([^>]+)>|]+)>?)/; // ' + var m, r = []; + while (m = re.exec(s)) + { + s = s.substr(m[0].length); + r.push({ name: (m[1]||m[2]||'').trim(), email: (m[3]||m[4]||'').trim() }); + } + return r; +} + +function toPgArray(a) +{ + a = JSON.stringify(a); + return '{'+a.substring(1, a.length-1)+'}'; +} diff --git a/package.json b/package.json index c8781c6..6707f03 100644 --- a/package.json +++ b/package.json @@ -8,12 +8,22 @@ "description": "Operetta webmail backend", "dependencies": { "gen-thread": "latest", + "htmlawed": "latest", + "body-parser": "latest", + "express": "latest", + "express-session": "latest", "iconv-lite": "latest", "imap": "latest", "mailparser": "latest", + "multer": "latest", "nodemailer": "latest", "pg": "latest", - "pg-bricks": "latest" + "pg-bricks": "latest", + "sql-bricks": "latest", + "socket.io": "latest" + }, + "peerDependencies": { + "sql-bricks": ">=1.4.0" }, "devDependencies": { "babel-cli": "latest",