Skip to content

Commit 8bfb9a4

Browse files
committed
feat(rabbitmq): wIP rabbitmq adapter
1 parent 8d0dc6b commit 8bfb9a4

14 files changed

+457
-8
lines changed

package-lock.json

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
"@types/amqplib": "^0.5.13",
9292
"@types/jest": "^24.0.17",
9393
"@types/node": "^12.6.8",
94+
"@types/uuid": "^3.4.5",
9495
"colors": "^1.3.3",
9596
"commitizen": "^4.0.3",
9697
"coveralls": "^3.0.5",

rollup.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ export default {
1818
// Indicate here external modules you don't wanna include in your bundle (i.e.: 'lodash')
1919
external: [
2020
'amqplib',
21-
'amqp-connection-manager'
21+
'amqp-connection-manager',
22+
'uuid'
2223
],
2324
watch: {
2425
include: 'src/**'

src/adapter/amqp/amqp-adapter.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { connect, AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
2+
import { ConnectionOptions } from 'tls'
3+
import { URL } from 'url'
4+
5+
import { Broker, BrokerSender, BrokerReceiver, Connection } from '../../domain/model/broker.model'
6+
import { AmqpSender } from './amqp-sender'
7+
import { AmqpReceiver } from './amqp-receiver'
8+
9+
export class AmqpBroker implements Broker {
10+
connectionManager?: AmqpConnectionManager
11+
channel?: ChannelWrapper
12+
sender?: BrokerSender
13+
receiver?: BrokerReceiver
14+
15+
constructor(private url: string, private options?: ConnectionOptions) {}
16+
17+
connect(): Connection {
18+
if (this.connectionManager) {
19+
throw new Error('AMQP connection already created')
20+
}
21+
22+
const connection = connect(
23+
[this.url],
24+
{ connectionOptions: this.options }
25+
)
26+
27+
const channel = connection.createChannel({
28+
json: true
29+
})
30+
31+
this.connectionManager = connection
32+
this.channel = channel
33+
this.sender = new AmqpSender(channel)
34+
this.receiver = new AmqpReceiver(channel)
35+
36+
return {
37+
onConnect: listener => {
38+
connection.on('connect', connected => {
39+
const url = new URL(connected.url)
40+
listener({ host: url.host })
41+
})
42+
},
43+
onDisconnect: listener => {
44+
connection.on('disconnect', err => listener(err.err))
45+
}
46+
}
47+
}
48+
49+
getSender(): AmqpSender {
50+
if (!this.connectionManager) {
51+
throw new Error('AMQP sender must be accessed after connection creation')
52+
}
53+
54+
return this.sender as AmqpSender
55+
}
56+
57+
getReceiver(): AmqpReceiver {
58+
if (!this.connectionManager) {
59+
throw new Error('AMQP receiver must be accessed after connection creation')
60+
}
61+
62+
return this.receiver as AmqpReceiver
63+
}
64+
}

src/adapter/amqp/amqp-receiver.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { ChannelWrapper } from 'amqp-connection-manager'
2+
import { ConfirmChannel } from 'amqplib'
3+
4+
import { BrokerReceiver, MessageConsumer } from '../../domain/model/broker.model'
5+
6+
export class AmqpReceiver implements BrokerReceiver {
7+
constructor(private channel: ChannelWrapper) {}
8+
9+
async consume(queueName: string, consumer: MessageConsumer): Promise<void> {
10+
await this.channel.addSetup((channel: ConfirmChannel) => {
11+
return channel.consume(queueName, message => {
12+
if (message) {
13+
consumer(JSON.parse(message.content.toString()))
14+
.then(() => channel.ack(message))
15+
.catch(() => {
16+
channel.nack(message)
17+
})
18+
}
19+
})
20+
})
21+
}
22+
}

src/adapter/amqp/amqp-sender.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { ChannelWrapper } from 'amqp-connection-manager'
2+
import { ConfirmChannel } from 'amqplib'
3+
4+
import {
5+
BrokerSender,
6+
QueueSpecitication,
7+
ExchangeSpecification,
8+
BindingSpecification
9+
} from '../../domain/model/broker.model'
10+
11+
export class AmqpSender implements BrokerSender {
12+
constructor(private channel: ChannelWrapper) {}
13+
14+
declareQueue({ name, ...rest }: QueueSpecitication): Promise<void> {
15+
return this.channel.addSetup((channel: ConfirmChannel) => channel.assertQueue(name, rest))
16+
}
17+
18+
declareTopic({ name }: ExchangeSpecification): Promise<void> {
19+
return this.channel.addSetup((channel: ConfirmChannel) => channel.assertExchange(name, 'topic'))
20+
}
21+
22+
bind({ queue, topic, routingKey }: BindingSpecification): Promise<void> {
23+
return this.channel.addSetup((channel: ConfirmChannel) =>
24+
channel.bindQueue(queue, topic, routingKey)
25+
)
26+
}
27+
28+
publish(topic: string, routingKey: string, content: object) {
29+
return this.channel.publish(topic, routingKey, content as Buffer, {
30+
deliveryMode: 2
31+
})
32+
}
33+
}

src/domain/handler-registry.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import {
2+
CommandHandler,
3+
HandlerRegistryRoutes,
4+
QueryHandler,
5+
EventListener
6+
} from './model/message-handlers.model'
7+
8+
export class HandlerRegistry {
9+
commandHandlers: HandlerRegistryRoutes<CommandHandler<any>> = {}
10+
queryHandlers: HandlerRegistryRoutes<QueryHandler<any, any>> = {}
11+
eventListeners: HandlerRegistryRoutes<EventListener<any>> = {}
12+
13+
private constructor() {}
14+
15+
public static register() {
16+
return new HandlerRegistry()
17+
}
18+
19+
public handleCommand<M>(commandName: string, handler: CommandHandler<M>): void {
20+
this.push(this.commandHandlers, commandName, handler)
21+
}
22+
23+
public serverQuery<M, R>(queryName: string, handler: QueryHandler<M, R>): void {
24+
this.push(this.queryHandlers, queryName, handler)
25+
}
26+
27+
public listenEvent<M>(eventName: string, handler: EventListener<M>): void {
28+
this.push(this.eventListeners, eventName, handler)
29+
}
30+
31+
private push<H>(routes: HandlerRegistryRoutes<H>, path: string, handler: H) {
32+
if (!routes[path]) {
33+
routes[path] = []
34+
}
35+
36+
routes[path].push(handler)
37+
}
38+
}

src/domain/model/broker.model.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
export interface QueueSpecitication {
2+
name: string
3+
exclusive: boolean
4+
}
5+
6+
export interface ExchangeSpecification {
7+
name: string
8+
}
9+
10+
export interface BindingSpecification {
11+
queue: string
12+
topic: string
13+
routingKey: string
14+
}
15+
16+
export interface Connection {
17+
onConnect(listener: (event: { host: string }) => void): void
18+
onDisconnect(listener: (err: Error) => void): void
19+
}
20+
21+
export type MessageConsumer = (messageContent: object) => Promise<void>
22+
23+
export interface BrokerSender {
24+
declareQueue(specification: QueueSpecitication): Promise<void>
25+
26+
declareTopic(specification: ExchangeSpecification): Promise<void>
27+
28+
bind(specification: BindingSpecification): Promise<void>
29+
}
30+
31+
export interface BrokerReceiver {
32+
consume(queueName: string, consumer: MessageConsumer): Promise<void>
33+
}
34+
35+
export interface Broker {
36+
connect(): Connection
37+
getSender(): BrokerSender
38+
getReceiver(): BrokerReceiver
39+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export type CommandHandler<M> = (message: M) => Promise<void>
2+
export type QueryHandler<M, R> = (message: M) => Promise<R>
3+
export type EventListener<M> = (message: M) => Promise<void>
4+
5+
export type HandlerRegistryRoutes<H> = {
6+
[path: string]: H[]
7+
}

test/adapter/amqp-adapter.test.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { connect, AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
2+
import { ConnectionOptions } from 'tls'
3+
4+
import { AmqpBroker } from '../../src/adapter/amqp/amqp-adapter'
5+
import { AmqpSender } from '../../src/adapter/amqp/amqp-sender'
6+
import { AmqpReceiver } from '../../src/adapter/amqp/amqp-receiver'
7+
8+
jest.mock('amqp-connection-manager', () => ({
9+
_esModule: true,
10+
connect: jest.fn(() => ({
11+
createChannel: jest.fn(),
12+
on: jest.fn()
13+
}))
14+
}))
15+
16+
const mockedConnect = connect as jest.MockedFunction<typeof connect>
17+
const url = 'amqp://myrabbitserver'
18+
const options: ConnectionOptions = { timeout: 10 }
19+
20+
describe(`AMQP Broker`, () => {
21+
let broker: AmqpBroker
22+
23+
beforeEach(() => {
24+
jest.clearAllMocks()
25+
broker = new AmqpBroker(url, options)
26+
})
27+
28+
it(`connects using connection manager`, () => {
29+
broker.connect()
30+
31+
const connectionManager = mockedConnect.mock.results[0].value as AmqpConnectionManager
32+
33+
expect(mockedConnect).toHaveBeenCalledTimes(1)
34+
expect(mockedConnect.mock.calls[0][0]).toStrictEqual([url])
35+
expect(mockedConnect.mock.calls[0][1]).toStrictEqual({ connectionOptions: options })
36+
expect(broker.connectionManager).toBe(connectionManager)
37+
38+
const mockedChannel = connectionManager.createChannel as jest.MockedFunction<
39+
typeof connectionManager.createChannel
40+
>
41+
42+
expect(mockedChannel).toHaveBeenCalledTimes(1)
43+
expect(mockedChannel.mock.calls[0][0]).toStrictEqual({ json: true })
44+
})
45+
46+
it(`connect can only be called once`, () => {
47+
broker.connect()
48+
expect(() => broker.connect()).toThrow('AMQP connection already created')
49+
})
50+
51+
it(`must create an instance of AmqpSender`, () => {
52+
broker.connect()
53+
54+
expect(broker.getSender()).toBeInstanceOf(AmqpSender)
55+
})
56+
57+
it(`must create an instance of AmqpReceiver`, () => {
58+
broker.connect()
59+
60+
expect(broker.getReceiver()).toBeInstanceOf(AmqpReceiver)
61+
})
62+
63+
it(`AmqpSender can only be accessed after connect`, () => {
64+
expect(() => broker.getSender()).toThrow(
65+
'AMQP sender must be accessed after connection creation'
66+
)
67+
})
68+
69+
it(`AmqpReceiver can only be accessed after connect`, () => {
70+
expect(() => broker.getReceiver()).toThrow(
71+
'AMQP receiver must be accessed after connection creation'
72+
)
73+
})
74+
75+
it(`should call onConnect callback when connectionManager emits it`, () => {
76+
const connection = broker.connect()
77+
const connectionManager = mockedConnect.mock.results[0].value as AmqpConnectionManager
78+
const mockedOn = connectionManager.on as jest.MockedFunction<typeof connectionManager.on>
79+
80+
const onConnect = jest.fn()
81+
connection.onConnect(onConnect)
82+
83+
const listener = mockedOn.mock.calls[0][1] as (arg: object) => void
84+
listener({ url })
85+
86+
expect(mockedOn.mock.calls[0][0]).toBe('connect')
87+
expect(onConnect.mock.calls[0][0]).toStrictEqual({ host: 'myrabbitserver' })
88+
})
89+
90+
it(`should call onDisconnect callback when connectionManager emits it`, () => {
91+
const connection = broker.connect()
92+
const connectionManager = mockedConnect.mock.results[0].value as AmqpConnectionManager
93+
const mockedOn = connectionManager.on as jest.MockedFunction<typeof connectionManager.on>
94+
const error = Error('something went wrong')
95+
96+
const onDisconnect = jest.fn()
97+
connection.onDisconnect(onDisconnect)
98+
99+
const listener = mockedOn.mock.calls[0][1]
100+
listener({ err: error })
101+
102+
expect(mockedOn.mock.calls[0][0]).toBe('disconnect')
103+
expect(onDisconnect.mock.calls[0][0]).toBe(error)
104+
})
105+
})

0 commit comments

Comments
 (0)