Skip to content

Commit 80492bf

Browse files
Step 6 - Add queue worker
1 parent 5850186 commit 80492bf

File tree

3 files changed

+57
-1
lines changed

3 files changed

+57
-1
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
},
1313
"dependencies": {
1414
"ably": ">=0.9.0-beta",
15+
"amqplib": "^0.4",
1516
"express": ">=4.14.0"
1617
}
1718
}

server.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const Ably = require('ably');
22
const Express = require('express');
33
const ServerPort = 3000;
4+
const worker = require('./worker');
45

56
const ApiKey = 'INSERT-YOUR-API-KEY-HERE'; /* Add your API key here */
67
if (ApiKey.indexOf('INSERT') === 0) { throw('Cannot run without an API key. Add your key to server.js'); }
@@ -27,4 +28,6 @@ app.get('/auth', function (req, res) {
2728
app.use(Express.static('public'));
2829
app.listen(3000);
2930

30-
console.log('Web server listening on port', ServerPort);
31+
worker.start(ApiKey, 'wolfram:answers', 'wolfram', 'us-east-1-a-queue.ably.io:5671/shared');
32+
33+
console.log('Open the Wolfram demo in your browser: https://localhost:' + ServerPort + '/');

worker.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const amqp = require('amqplib/callback_api');
4+
const Ably = require('ably');
5+
6+
/* Start the worker that consumes from the AMQP QUEUE */
7+
exports.start = function(apiKey, answersChannelName, queueName, queueEndpoint) {
8+
const appId = apiKey.split('.')[0];
9+
const queue = appId + ":" + queueName;
10+
const endpoint = queueEndpoint;
11+
const url = 'amqps://' + apiKey + '@' + endpoint;
12+
const rest = new Ably.Rest({ key: apiKey });
13+
const answersChannel = rest.channels.get(answersChannelName);
14+
15+
/* Connect to Ably queue */
16+
amqp.connect(url, (err, conn) => {
17+
if (err) {
18+
console.error('worker:', 'Queue error!', err);
19+
return;
20+
}
21+
console.log('worker:', 'Connected to AMQP endpoint', endpoint);
22+
23+
/* Create a communication channel */
24+
conn.createChannel((err, ch) => {
25+
if (err) {
26+
console.error('worker:', 'Queue error!', err);
27+
return;
28+
}
29+
console.log('worker:', 'Waiting for messages');
30+
31+
/* Wait for messages published to the Ably Reactor queue */
32+
ch.consume(queue, (item) => {
33+
const decodedEnvelope = JSON.parse(item.content);
34+
35+
const messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages);
36+
messages.forEach(function(message) {
37+
console.log('worker:', 'Received question', message.data, ' - about to ask Wolfram');
38+
answersChannel.publish('answer', 'Placeholder until Wolfram connected', function(err) {
39+
if (err) {
40+
console.error('worker:', 'Failed to publish question', message.data, ' - err:', JSON.stringify(err));
41+
}
42+
})
43+
});
44+
45+
/* Remove message from queue */
46+
ch.ack(item);
47+
});
48+
});
49+
50+
conn.on('error', function(err) { console.error('worker:', 'Connection error!', err); });
51+
});
52+
};

0 commit comments

Comments
 (0)