Use pg-bricks, babel with destructuring

master
Vitaliy Filippov 2016-06-27 16:03:06 +03:00
parent b0f9635380
commit 84e801d31e
5 changed files with 51 additions and 387 deletions

4
.babelrc Normal file
View File

@ -0,0 +1,4 @@
{
"plugins": [ "transform-es2015-destructuring" ],
"retainLines": true
}

3
db.sql
View File

@ -9,6 +9,7 @@ create table accounts (
-- out_server varchar(255) not null,
-- reply_to
);
create unique index accounts_email on accounts (email);
create table folders (
id serial not null primary key,
@ -18,7 +19,7 @@ create table folders (
unread_count int not null,
foreign key (account_id) references accounts (id) on delete cascade on update cascade
);
create unique key folders_name on folders (name);
create unique index folders_name on folders (name);
create table messages (
id serial not null primary key,

View File

@ -1,91 +1,15 @@
var cfg = require('./cfg.json');
console.log(cfg);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
var Imap = require('imap');//, ImapPromise = require('imap-promise');
var cfg = require('./cfg.json');
var gen = require('gen-thread');
var Imap = require('imap');
var inspect = require('util').inspect;
var pg;
try { require('pg-native'); pg = require('pg').native; }
catch(e) { pg = require('pg'); }
//var pg;
//try { require('pg-native'); pg = require('pg').native; }
//catch(e) { pg = require('pg'); }
//var pg_pool = new pg.Pool(cfg.pg);
var pg_pool = new pg.Pool(cfg.pg);
//var Promise = require('bluebird');
/*function onready(srv)
{
return new Promise(function(resolve)
{
srv.openBox(box, readOnly, resolve);
});
}*/
function runThread(main, done)
{
var thread = function()
{
thread.args = Array.prototype.slice.call(arguments, 0);
thread.checkDone();
};
thread.checkDone = function()
{
var v = thread.gen.next();
if (v.done && done)
done(v.value);
};
thread.gen = main(thread);
thread.checkDone();
}
function runParallel(threads, done)
{
var results = [];
var resultCount = 0;
var allDone = function(i, result)
{
if (!results[i])
{
results[i] = result;
resultCount++;
if (resultCount == threads.length)
done(results);
}
};
threads.map((t, i) => runThread(t, function(result) { allDone(i, result); }));
}
function threadPhase(thread, phase)
{
return function()
{
thread.phase = phase;
return thread.apply(this, arguments);
};
}
function* test(thread)
{
console.log('start');
yield setTimeout(function() { thread('zhopa', 123); }, 500);
console.log([ 'next', thread.args ]);
yield runParallel([
function*(thread)
{
yield setTimeout(function() { thread('callback 1'); }, 500);
return 'result 1';
},
function*(thread)
{
yield setTimeout(function() { thread('callback 2'); }, 500);
return 'result 2';
}
], thread);
console.log('abc');
console.log(thread.args);
return 'result';
}
//runThread(test, function(result) { console.log(result); });
var pg = require('pg-bricks').configure('postgresql://'+cfg.pg.user+':'+cfg.pg.password+'@'+(cfg.pg.host||'')+':'+cfg.pg.port+'/'+cfg.pg.database);
function splitEmails(s)
{
@ -96,19 +20,32 @@ function splitEmails(s)
return r;
}
function* main(thread)
function* main(thread, account)
{
yield pg_pool.connect(thread); // => (err, client, done);
var srv = new Imap(cfg.imap);
var accountId;
var [ err, row ] = yield pg.select('id').from('accounts').where({ email: account.email }).row(thread);
if (err) throw err;
if (row && row.id)
accountId = row.id;
else
{
var [ err, row ] = yield pg.insert('accounts', {
name: account.name,
email: account.email,
settings: {
imap: account.imap
}
}).returning('id').row(thread);
if (err) throw err;
accountId = row.id;
}
var srv = new Imap(account.imap);
srv.once('ready', thread);
yield srv.connect();
yield srv.getBoxes(thread);
var boxes = thread.args[1];
var [ err, boxes ] = yield srv.getBoxes(thread);
for (var k in boxes)
{
console.log('load '+k);
yield srv.openBox(k, true, thread); // => (err, box)
var box = thread.args[1];
var [ err, box ] = yield srv.openBox(k, true, thread);
var boxrow = {
name: box.name,
uidvalidity: box.uidvalidity,
@ -121,11 +58,10 @@ function* main(thread)
});
f.on('message', function(msg)
{
runThread(function*(thread)
gen.run(function*(thread)
{
var msgrow = {};
yield msg.on('body', thread);
var stream = thread.args[0];
var [ stream, info ] = yield msg.on('body', thread);
var buffer = '';
stream.on('data', function(chunk)
{
@ -141,8 +77,7 @@ function* main(thread)
msgrow.date = header.date && header.date[0];
msgrow.messageid = header['message-id'] && header['message-id'][0];
msgrow.inreplyto = header['in-reply-to'] && header['in-reply-to'][0];
yield msg.once('attributes', thread);
var attrs = thread.args[0] || {};
var [ attrs ] = yield msg.once('attributes', thread);
msgrow.date = msgrow.date || attrs.date;
msgrow.uid = attrs.uid;
yield msg.once('end', thread);
@ -155,290 +90,4 @@ function* main(thread)
srv.end();
}
runThread(main);
return;
var srv = new Imap(cfg.imap);
/*
srv.connectAsync()
.then(function() { console.log('connected'); })
.then(function() { return srv.getBoxesAsync(); })
.then()
.then(function() { return srv.openBoxAsync('INBOX', true); })*/
srv.once('ready', function()
{
console.log('connected');
srv.getBoxes(function(err, boxes)
{
for (var k in { INBOX: 1 })
{
console.log(k);
srv.openBox(k, true, function(err, box)
{
var f = srv.fetch('1:*', {
size: true,
bodies: 'HEADER.FIELDS (FROM TO SUBJECT DATE)'
});
f.on('message', function(msg, seqno)
{
console.log('message '+seqno);
var msgrow = {};
msg.on('body', function(stream, info)
{
console.log('message '+seqno+' body');
var buffer = '';
stream.on('data', function(chunk)
{
buffer += chunk.toString('utf8');
});
stream.once('end', function()
{
console.log('message '+seqno+' body end');
var header = Imap.parseHeader(buffer);
msgrow.from = header.from;
msgrow.to = header.to;
msgrow.subject = header.subject;
});
});
msg.once('attributes', function(attrs)
{
console.log('message '+seqno+' attributes');
msgrow.date = new Date(attrs.date);
msgrow.uid = attrs.uid;
});
msg.once('end', function()
{
console.log('message '+seqno+' end');
// console.log(msgrow);
});
});
f.on('end', function()
{
console.log('fetch done');
// 'done fetching';
});
});
}
});
});
srv.once('error', function(err) {
console.log(err);
});
srv.once('end', function() {
console.log('Connection ended');
});
srv.connect();
/*var connect = require('connect');
var url = require('url');
var sse = require('connect-sse')();
var clientId = 1;
var clients = {};
var subs = {};
var app = connect();
app.use('/sse', sse);
app.use('/sse', subscribe);
app.use('/notify', notify);
app.use('/test.htm', function(req, res)
{
// Test page
res.writeHead(200, {'Content-Type': 'text/html'});
res.end('<html>\n\
<script>\n\
setTimeout(function() {\n\
var es = new EventSource("/sse");\n\
es.addEventListener("change", function (event) {\n\
console.log(event);\n\
}); }, 100);\n\
</script>\n\
<body></body>\n\
</html>');
});
module.exports = app;
function publishEvent(toNotify, subs, event)
{
for (var key in subs)
{
if (!key)
{
for (var clid in subs[key])
{
toNotify[clid] = toNotify[clid] || [];
toNotify[clid].push(event);
}
}
else
{
for (var value in subs[key])
if ((''+event[key]) == value || !event[key])
publishEvent(toNotify, subs[key][value], event);
}
}
}
function notify(req, res)
{
if (req.method == 'POST')
{
var body = '';
req.on('data', function(data)
{
body += data;
if (body.length > 10000)
{
req.connection.destroy();
}
});
req.on('end', function()
{
var data;
try
{
data = JSON.parse(body);
}
catch (e)
{
}
if (data)
{
var toNotify = {};
for (var i = 0; i < data.length; i++)
{
publishEvent(toNotify, subs, data[i]);
}
for (var cl in toNotify)
{
clients[cl][1].json(toNotify[cl], 'change');
}
res.writeHead(200, {'Content-Type': 'text/plain'});
}
else
{
res.writeHead(400, {'Content-Type': 'text/plain'});
}
res.end();
});
}
}
function isEmpty(obj)
{
for (var k in obj)
return false;
return true;
}
function saveSubscriptions(clientId, params, ifSubscribe)
{
var loops = [];
for (var k in params)
{
loops.push([ k, params[k] instanceof Array ? params[k] : [ params[k] ], 0 ]);
}
loops = loops.sort(function(a, b)
{
if (a[0] < b[0])
return -1;
else if (a[0] > b[0])
return 1;
return 0;
});
loops[0][3] = subs;
var k, cur;
var i = 0;
while (true)
{
if (i < loops.length)
{
if (loops[i][2] >= loops[i][1].length)
{
if (!ifSubscribe)
{
cur = loops[i][3];
if (isEmpty(cur[loops[i][0]]))
{
delete cur[loops[i][0]];
if (isEmpty(cur) && i > 0)
delete loops[i-1][3][loops[i-1][0]][loops[i-1][1][loops[i-1][2]-1]];
}
}
if (!i)
break;
loops[i][2] = 0;
i--;
}
else
{
cur = loops[i][3];
k = loops[i][0];
cur[k] = cur[k] || {};
cur = cur[k];
k = loops[i][1][loops[i][2]];
cur[k] = cur[k] || {};
cur = cur[k];
loops[i][2]++;
i++;
if (i < loops.length)
loops[i][3] = cur;
}
}
else
{
cur[''] = cur[''] || {};
if (ifSubscribe)
cur[''][clientId] = 1;
else
{
delete cur[''][clientId];
if (isEmpty(cur['']))
delete cur[''];
if (i > 0 && isEmpty(cur))
delete loops[i-1][3][loops[i-1][0]][loops[i-1][1][loops[i-1][2]-1]];
}
if (i > 0)
i--;
else
break;
}
}
}
function subscribe(req, res)
{
req._id = clientId++;
var s = {};
var params = url.parse(req.url, true).query;
var sp = { type: [ 'commitment', 'override' ] };
var p = params && params.plans ? params.plans.split(',') : [ 0 ];
for (var i = 0; i < p.length; i++)
{
if (!p[i])
{
p = null;
break;
}
}
if (p)
sp[plan_id] = p;
sp.instance = params && params.instance || 0;
saveSubscriptions(req._id, sp, true);
req._subscribeParams = sp;
clients[req._id] = [ req, res ];
req.on('close', unsubscribe);
}
function unsubscribe()
{
saveSubscriptions(this._id, this._subscribeParams, false);
delete clients[this._id];
}
*/
gen.run(main, cfg.accounts[0]);

View File

@ -7,10 +7,19 @@
"name": "operetta-backend",
"description": "Operetta webmail backend",
"dependencies": {
"gen-thread": "latest",
"imap": "^0.8.17",
"imap-promise": "^1.0.2",
"mailparser": "^0.6.0",
"nodemailer": "^2.4.2",
"pg": "^6.0.1"
"pg": "^6.0.1",
"pg-bricks": "^0.4.0"
},
"devDependencies": {
"babel-cli": "^6.10.1",
"babel-plugin-transform-es2015-destructuring": "^6.9.0"
},
"scripts": {
"run": "babel operetta.js | nodejs"
}
}

1
run.sh Executable file
View File

@ -0,0 +1 @@
node_modules/.bin/babel operetta.js | nodejs