Skip to content

Commit 24ea25b

Browse files
committed
feat(commands): add support for command listener
1 parent ba42f71 commit 24ea25b

File tree

13 files changed

+121
-42
lines changed

13 files changed

+121
-42
lines changed

example/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ registry.serveQuery<string, string>('cosa.traeme', request => {
1919

2020
app
2121
.start()
22-
.then(connection => {
23-
connection.onConnect(() => console.log('Connection stablished'))
24-
connection.onDisconnect(err => console.error(err))
22+
.then(context => {
23+
context.connection.onConnect(() => console.log('Connection stablished'))
24+
context.connection.onDisconnect(err => console.error(err))
2525

2626
const appCreated = createEvent<string>('myInstance.appCreated')
27-
connection.eventBus
27+
context.eventBus
2828
.emit(appCreated('133', 'myApp'))
2929
.then(() => console.log('this will succeed'))
3030
.catch(err => console.error(err))

src/adapter/amqp/amqp-connection.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@ import { v4 as uuid } from 'uuid'
44
import { Connection } from '../../domain/model/broker.model'
55
import { AmqpReceiver } from './amqp-receiver'
66
import { AmqpSender } from './amqp-sender'
7-
import { EventBus } from '../../domain/api/emitters/event-bus'
87

98
export class AmqpConnection implements Connection {
109
public id: string
1110

1211
private channel: ChannelWrapper
1312
private _sender: AmqpSender
1413
private _receiver: AmqpReceiver
15-
private _eventBus: EventBus
1614

1715
get sender() {
1816
return this._sender
@@ -22,10 +20,6 @@ export class AmqpConnection implements Connection {
2220
return this._receiver
2321
}
2422

25-
get eventBus() {
26-
return this._eventBus
27-
}
28-
2923
constructor(private connectionManager: AmqpConnectionManager) {
3024
this.id = uuid().replace(/-/g, '')
3125

@@ -35,7 +29,6 @@ export class AmqpConnection implements Connection {
3529

3630
this._sender = new AmqpSender(this.channel)
3731
this._receiver = new AmqpReceiver(this.channel)
38-
this._eventBus = new EventBus(this._sender, 'domainEvents')
3932
}
4033

4134
onConnect(listener: (event: { host: string }) => void) {

src/domain/api/direct-gateway.model.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Command } from '../../..'
2+
import { Query } from '../../model/messages.model'
3+
import { BrokerSender, Message } from '../../model/broker.model'
4+
5+
export class DirectGateway {
6+
constructor(private sender: BrokerSender, private topicName: string) {}
7+
8+
sendCommand<T>(command: Command<T>, targetName: string): Promise<void> {
9+
return this.sender.publish(this.topicName, targetName, new Message(command))
10+
}
11+
12+
requestReply<T, R>(query: Query<T>): Promise<R> {
13+
return Promise.reject(`Not implemented yet`)
14+
}
15+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { Connection } from '../../model/broker.model'
2+
import { MessageListener } from '../../model/message-listener.model'
3+
import { Command } from '../../model/messages.model'
4+
import { HandlerRegistry } from '../handler-registry'
5+
import { ReactiveCommonsConfiguration } from '../reactive-commons'
6+
7+
export class CommandListener implements MessageListener {
8+
constructor(
9+
private config: ReactiveCommonsConfiguration,
10+
private connection: Connection,
11+
private registry: HandlerRegistry
12+
) {}
13+
14+
public setupResources(): Promise<void> {
15+
const queueName = `${this.config.appName}.commands`
16+
const topicName = 'directMessages'
17+
const routingKey = this.config.appName
18+
19+
return Promise.all([
20+
this.declareCommandsQueue(queueName),
21+
this.declareDirectMessagesTopic(topicName),
22+
this.setupBindings(queueName, topicName, routingKey)
23+
]).then()
24+
}
25+
26+
public startListening(): Promise<void> {
27+
return this.connection.receiver.consume<Command<unknown>>(
28+
`${this.config.appName}.commands`,
29+
message => {
30+
const command = message.data
31+
const handler = this.registry.getCommandHandler(command.name)
32+
33+
return handler(command)
34+
}
35+
)
36+
}
37+
38+
private declareCommandsQueue(queueName: string): Promise<void> {
39+
return this.connection.sender.declareQueue({
40+
name: queueName
41+
})
42+
}
43+
44+
private declareDirectMessagesTopic(topicName: string): Promise<void> {
45+
return this.connection.sender.declareTopic({
46+
name: topicName,
47+
type: 'direct'
48+
})
49+
}
50+
51+
private setupBindings(queueName: string, topicName: string, routingKey: string): Promise<void> {
52+
return this.connection.sender.bind({
53+
queue: queueName,
54+
topic: topicName,
55+
routingKey: routingKey
56+
})
57+
}
58+
}

src/domain/api/listeners/event-listener.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { Connection } from '../../model/broker.model'
2+
import { MessageListener } from '../../model/message-listener.model'
3+
import { Event } from '../../model/messages.model'
24
import { HandlerRegistry } from '../handler-registry'
35
import { ReactiveCommonsConfiguration } from '../reactive-commons'
4-
import { Event } from '../../model/messages.model'
56

6-
export class EventListener {
7+
export class EventListener implements MessageListener {
78
constructor(
89
private config: ReactiveCommonsConfiguration,
910
private connection: Connection,

src/domain/api/listeners/query-listener.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { Connection, Headers, Message } from '../../model/broker.model'
2+
import { COMPLETION_ONLY_SIGNAL, CORRELATION_ID, REPLY_ID } from '../../model/headers.model'
3+
import { MessageListener } from '../../model/message-listener.model'
24
import { Query } from '../../model/messages.model'
35
import { HandlerRegistry } from '../handler-registry'
46
import { ReactiveCommonsConfiguration } from '../reactive-commons'
5-
import { REPLY_ID, CORRELATION_ID, COMPLETION_ONLY_SIGNAL } from '../../model/headers.model'
67

7-
export class QueryListener {
8+
export class QueryListener implements MessageListener {
89
constructor(
910
private config: ReactiveCommonsConfiguration,
1011
private connection: Connection,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Connection } from '../model/broker.model'
2+
import { DirectGateway } from './emitters/direct-gateway'
3+
import { EventBus } from './emitters/event-bus'
4+
5+
export class ReactiveCommonsContext {
6+
public id: string
7+
public eventBus: EventBus
8+
public directGateway: DirectGateway
9+
10+
constructor(public connection: Connection) {
11+
this.id = connection.id
12+
this.eventBus = new EventBus(connection.sender, 'domainEvents')
13+
this.directGateway = new DirectGateway(connection.sender, 'directMessages')
14+
}
15+
}

src/domain/api/reactive-commons.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import { Broker } from '../model/broker.model'
2+
import { MessageListener } from '../model/message-listener.model'
13
import { HandlerRegistry } from './handler-registry'
2-
import { Broker, Connection } from '../model/broker.model'
4+
import { CommandListener } from './listeners/command-listener'
35
import { EventListener } from './listeners/event-listener'
46
import { QueryListener } from './listeners/query-listener'
7+
import { ReactiveCommonsContext } from './reactive-commons-context'
58

69
export interface ReactiveCommonsConfiguration {
710
appName: string
@@ -14,17 +17,21 @@ export class ReactiveCommons {
1417
private config: ReactiveCommonsConfiguration
1518
) {}
1619

17-
start(): Promise<Connection> {
20+
start(): Promise<ReactiveCommonsContext> {
1821
const connection = this.broker.connect()
1922

20-
const eventListener = new EventListener(this.config, connection, this.registry)
21-
const queryListener = new QueryListener(this.config, connection, this.registry)
23+
const messageListeners: MessageListener[] = [
24+
new EventListener(this.config, connection, this.registry),
25+
new QueryListener(this.config, connection, this.registry),
26+
new CommandListener(this.config, connection, this.registry)
27+
]
2228

23-
return Promise.all([
24-
eventListener.setupResources(),
25-
queryListener.setupResources(),
26-
eventListener.startListening(),
27-
queryListener.startListening()
28-
]).then(() => connection)
29+
const context = new ReactiveCommonsContext(connection)
30+
31+
return Promise.all(
32+
messageListeners.map(listener =>
33+
Promise.all([listener.setupResources(), listener.startListening()])
34+
)
35+
).then(() => context)
2936
}
3037
}

src/domain/model/broker.model.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ export interface Connection {
55
id: string
66
sender: BrokerSender
77
receiver: BrokerReceiver
8-
eventBus: EventBus
98

109
onConnect(listener: (event: { host: string }) => void): void
1110
onDisconnect(listener: (err: Error) => void): void

0 commit comments

Comments
 (0)