|
1 | 1 | const Hapi = require('hapi'); |
2 | 2 | const Boom = require('boom'); |
3 | 3 | const Http2 = require('http2'); |
| 4 | +const Susie = require('susie'); |
4 | 5 |
|
| 6 | +require('events').EventEmitter.defaultMaxListeners = 50 //Set warning higher then normal to handle many clients |
5 | 7 |
|
6 | 8 | class OrbitdbAPI { |
7 | 9 | constructor (dbm, server_opts) { |
8 | 10 | let comparisons, rawiterator, getraw, unpack_contents, listener; |
9 | | - let dbMiddleware; |
| 11 | + let dbMiddleware, addEventListener; |
10 | 12 |
|
11 | 13 | listener = Http2.createSecureServer(server_opts.http2_opts); |
12 | 14 | this.server = new Hapi.Server({ |
@@ -54,6 +56,41 @@ class OrbitdbAPI { |
54 | 56 | return contents |
55 | 57 | }; |
56 | 58 |
|
| 59 | + addEventListener = (db, event_name, request, h) => { |
| 60 | + let event_map = new Map(Object.entries({ |
| 61 | + 'replicated': (address) => |
| 62 | + h.event({event:'replicated', data: {address:address}}), |
| 63 | + 'replicate': (address) => |
| 64 | + h.event({event:'replicate', data: {address:address}}), |
| 65 | + 'replicate.progress': (address, hash, entry, progress, have) => |
| 66 | + h.event({event:'replicate.progress', data: {address:address, hash:hash, entry:entry, progress:progress, have:have}}), |
| 67 | + 'load': (dbname) => |
| 68 | + h.event({event:'load', data: {dbname:dbname}}), |
| 69 | + 'load.progress': (address, hash, entry, progress, total) => |
| 70 | + h.event({event:'load.progress', data: {address:address, hash:hash, entry:entry, progress:progress, total:total}}), |
| 71 | + 'ready': (dbname, heads) => |
| 72 | + h.event({event:'ready', data: {dbname:dbname, heads:heads}}), |
| 73 | + 'write': (dbname, hash, entry) => |
| 74 | + h.event({event:'write', data: {dbname:dbname, hash:hash, entry:entry}}), |
| 75 | + 'closed': (dbname) => |
| 76 | + h.event({event:'closed', data: {dbname:dbname}}) |
| 77 | + })); |
| 78 | + |
| 79 | + let event_callback = event_map.get(event_name) |
| 80 | + if(event_callback){ |
| 81 | + db.events.on(event_name, event_callback) |
| 82 | + let keepalive = setInterval(() => h.event({event:'keep-alive'}), 10000) |
| 83 | + request.events.on('disconnect', () => { |
| 84 | + db.events.removeListener(event_name, event_callback) |
| 85 | + clearInterval(keepalive) |
| 86 | + }) |
| 87 | + } else { |
| 88 | + if(this.debug) throw Boom.badRequest(`Unrecognized event name: $(event_name)`) |
| 89 | + throw Boom.badRequest('Unrecognized event name') |
| 90 | + } |
| 91 | + } |
| 92 | + |
| 93 | + Promise.resolve(this.server.register(Susie)).catch((err) => {throw err}); |
57 | 94 | this.server.route([ |
58 | 95 | { |
59 | 96 | method: 'GET', |
@@ -229,7 +266,17 @@ class OrbitdbAPI { |
229 | 266 | await db.access.grant('write', request.payload.publicKey) |
230 | 267 | return {}; |
231 | 268 | }) |
232 | | - } |
| 269 | + }, |
| 270 | + { |
| 271 | + method: 'GET', |
| 272 | + path: '/db/{dbname}/events/{eventname}', |
| 273 | + handler: dbMiddleware( async (db, request, h) => { |
| 274 | + let events = request.params.eventname.split(',') |
| 275 | + events.forEach((event_name) => addEventListener(db,event_name, request, h)); |
| 276 | + return h.event({event:'registered', data: {eventnames:events}}) |
| 277 | + }) |
| 278 | + }, |
| 279 | + |
233 | 280 | ]); |
234 | 281 | } |
235 | 282 | } |
|
0 commit comments