diff --git a/Syncer.js b/Syncer.js index 5a9e8fa..393df2e 100644 --- a/Syncer.js +++ b/Syncer.js @@ -1,6 +1,7 @@ const gen = require('gen-thread'); const Imap = require('imap'); const ImapManager = require('./ImapManager.js'); +const EventEmitter = require('events').EventEmitter; module.exports = Syncer; @@ -11,6 +12,7 @@ function Syncer(pg) this.imap = new ImapManager(); this.runIdle = this.runIdle.bind(this); this.stopIdle = this.stopIdle.bind(this); + this.events = new EventEmitter(); } Syncer.prototype.init = function*(cfg) @@ -26,6 +28,7 @@ Syncer.prototype.syncAll = function*() for (var id in this.accounts) yield* this.syncAccount(this.accounts[id]); this.syncInProgress = false; + this.events.emit('sync', { state: 'complete' }); } Syncer.prototype.addAccount = function*(account) @@ -204,6 +207,7 @@ Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) .where({ folder_id: boxRow.id }).val(gen.ef()); if (boxStatus.highestmodseq) { + this.events.emit('sync', { state: 'start', quick: true, email: this.accounts[accountId].email, folder: boxRow.name }); process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': quick resync\n'); yield* this.quickResync(srv, boxRow.id, maxUid, boxRow.highestmodseq, missing); boxRow.highestmodseq = boxStatus.highestmodseq; @@ -211,8 +215,9 @@ Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) else if (doFull && maxUid) { // list messages, update flags and version tag + this.events.emit('sync', { state: 'start', email: this.accounts[accountId].email, folder: boxRow.name }); process.stderr.write(this.accounts[accountId].email+'/'+boxRow.name+': full resync\n'); - yield* this.fullResync(srv, boxRow.id, maxUid, missing); + yield* this.fullResync(srv, boxRow.id, maxUid, missing, boxStatus.messages.total); } missing.push((maxUid ? maxUid+1 : 1)+':*'); @@ -227,7 +232,7 @@ Syncer.prototype.syncBox = function*(srv, accountId, boxName, boxKind, doFull) }).where({ id: boxRow.id }).run(gen.ef()); } -Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing) +Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing, total) { var [ flags ] = yield this.pg.select('uid, flags').from('messages').where({ folder_id: boxId }).rows(gen.ef()); flags = flags.reduce((o, row) => { o[row.uid] = row.flags; return o; }, {}); @@ -238,9 +243,10 @@ Syncer.prototype.fullResync = function*(srv, boxId, maxUid, missing) yield* this.imap.runFetch( srv, '1:'+maxUid, {}, (messages, state) => this.queueFlags(messages, boxId, state), - { flags: flags, updateFlags: updateFlags, missing: missing||[] } + { flags: flags, updateFlags: updateFlags, missing: missing||[], total: total } ); process.stderr.write('\n'); + this.events.emit('sync', { state: 'finish-box' }); yield* this.updateFlags(boxId, updateFlags); @@ -265,6 +271,7 @@ Syncer.prototype.queueFlags = function*(messages, boxId, fetchState) } } fetchState.synced += messages.length; + this.events.emit('sync', { state: 'progress', done: fetchState.synced, total: fetchState.total }); process.stderr.write('\rsynchronizing '+fetchState.synced); } diff --git a/SyncerWeb.js b/SyncerWeb.js index 4dc98e6..6c261f8 100644 --- a/SyncerWeb.js +++ b/SyncerWeb.js @@ -2,6 +2,8 @@ const gen = require('gen-thread'); const MailParser = require('mailparser').MailParser; const htmlawed = require('htmlawed'); +const http = require('http'); +const socket_io = require('socket.io'); const express = require('express'); const express_session = require('express-session'); const bodyparser = require('body-parser'); @@ -17,6 +19,8 @@ function SyncerWeb(syncer, pg, cfg) this.pg = pg; this.cfg = cfg; this.app = express(); + this.http = http.Server(this.app); + this.io = socket_io(this.http); this.app.use(bodyparser.urlencoded({ extended: false })); this.app.use(express_session({ secret: this.cfg.sessionSecret || '1083581xm1l3s1l39k', @@ -29,6 +33,12 @@ function SyncerWeb(syncer, pg, cfg) this.app.get('/messages', genRequest(this.get_messages.bind(this))); this.app.get('/message', genRequest(this.get_message.bind(this))); this.app.post('/sync', genRequest(this.post_sync.bind(this))); + this.syncer.events.on('sync', this.syncer_sync.bind(this)); +} + +SyncerWeb.prototype.listen = function(port) +{ + this.http.listen(port); } SyncerWeb.prototype.get_auth = function(req, res) @@ -122,13 +132,18 @@ SyncerWeb.prototype.get_message = function*(req, res) return res.send({ msg: msg }); } +SyncerWeb.prototype.syncer_sync = function(params) +{ + this.io.emit('sync', params); +} + SyncerWeb.prototype.post_sync = function*(req, res) { if (this.cfg.login && (!req.session || !req.session.auth)) return res.sendStatus(401); - if (self.syncer.syncInProgress) + if (this.syncer.syncInProgress) return res.send({ error: 'already-running' }); - gen.run(self.syncer.syncAll()); + gen.run(this.syncer.syncAll()); return res.send({ status: 'started' }); } diff --git a/operetta.js b/operetta.js index 607aa2c..5a946e8 100644 --- a/operetta.js +++ b/operetta.js @@ -42,7 +42,7 @@ var syncerweb = new SyncerWeb(syncer, pg, cfg); gen.run(function*() { yield* syncer.init(cfg); - yield* syncer.syncAll(); + //yield* syncer.syncAll(); }); -syncerweb.app.listen(8057); +syncerweb.listen(8057);