Skip to content

Commit 59627fa

Browse files
release: v0.2.1 (#10)
* fix: Use an exchange instead of a queue to handle real time updates (#9) * refactor(domain): Update method in the real time queue interface * chore: Update logging messages when connecting to PG database * fix: Use an exchange instead of a queue to send submission status updates
1 parent ed5bd58 commit 59627fa

File tree

8 files changed

+128
-133
lines changed

8 files changed

+128
-133
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
## [0.2.1](https://github.com/upb-code-labs/submissions-status-updater-microservice/compare/v0.2.0...v0.2.1) (2024-01-06)
2+
3+
4+
### Bug Fixes
5+
6+
* Use an exchange instead of a queue to handle real time updates ([#9](https://github.com/upb-code-labs/submissions-status-updater-microservice/issues/9)) ([3334eb4](https://github.com/upb-code-labs/submissions-status-updater-microservice/commit/3334eb4d88d04dfa84315a69ac6681f147000399))
7+
8+
9+
110
# [0.2.0](https://github.com/upb-code-labs/submissions-status-updater-microservice/compare/v0.1.0...v0.2.0) (2024-01-06)
211

312

src/application/submissions-status-updater-use-cases.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (useCases *SubmissionsStatusUpdaterUseCases) UpdateSubmissionStatus(dto *dt
2121
}
2222

2323
// Send submission status update to the submissions real time updates queue
24-
err = useCases.SubmissionsRealTimeUpdatesQueue.EnqueueUpdate(dto)
24+
err = useCases.SubmissionsRealTimeUpdatesQueue.SendUpdate(dto)
2525
if err != nil {
2626
log.Println("[SubmissionsStatusUpdaterUseCases] Error enqueuing submission status update: " + err.Error())
2727
return err

src/config/connections/postgres-connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ func GetPostgresConnection() *sql.DB {
2121

2222
// Check connection
2323
if err = db.Ping(); err != nil {
24-
log.Fatal(err.Error())
24+
log.Fatal("[Postgres]: Error connecting to the database: " + err.Error())
2525
}
2626

2727
// Set connection
28-
log.Println("Connected to Postgres")
28+
log.Println("[Postgres]: Connected")
2929
pgConnection = db
3030
}
3131

src/domain/definitions/submissions-real-time-updates-queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import "github.com/upb-code-labs/submissions-status-updater-microservice/src/dom
44

55
type SubmissionsRealTimeUpdatesQueue interface {
66
// Publishes a message to the queue
7-
EnqueueUpdate(updateDTO *dtos.SubmissionStatusUpdateDTO) error
7+
SendUpdate(updateDTO *dtos.SubmissionStatusUpdateDTO) error
88
}

src/infrastructure/implementations/rubmissions-real-time-updates-queue-implementation.go

Lines changed: 0 additions & 127 deletions
This file was deleted.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package implementations
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"time"
8+
9+
amqp "github.com/rabbitmq/amqp091-go"
10+
"github.com/upb-code-labs/submissions-status-updater-microservice/src/config/connections"
11+
"github.com/upb-code-labs/submissions-status-updater-microservice/src/domain/dtos"
12+
)
13+
14+
var exchangeName = "submissions-real-time-updates"
15+
16+
type SubmissionsRealTimeUpdatesRabbitmqMgr struct{}
17+
18+
// ## Singleton instance ##
19+
20+
// submissionsRealTimeUpdatesRabbitmqMgr Singleton instance
21+
var submissionsRealTimeUpdatesRabbitmqMgr *SubmissionsRealTimeUpdatesRabbitmqMgr
22+
23+
// GetSubmissionsRealTimeUpdatesQueueMgrInstance returns the singleton instance of the submissionsRealTimeUpdatesQueueMgr
24+
func GetSubmissionsRealTimeUpdatesQueueMgrInstance() *SubmissionsRealTimeUpdatesRabbitmqMgr {
25+
if submissionsRealTimeUpdatesRabbitmqMgr == nil {
26+
// Declare exchange
27+
declareRealTimeUpdatesExchange()
28+
29+
// Declare singleton instance
30+
submissionsRealTimeUpdatesRabbitmqMgr = &SubmissionsRealTimeUpdatesRabbitmqMgr{}
31+
}
32+
33+
return submissionsRealTimeUpdatesRabbitmqMgr
34+
}
35+
36+
func declareRealTimeUpdatesExchange() {
37+
ch := connections.GetRabbitMQChannel()
38+
39+
exchangeName := exchangeName
40+
exchangeType := "fanout"
41+
exchangeDurable := true
42+
exchangeAutoDelete := false
43+
exchangeInternal := false
44+
exchangeNoWait := false
45+
exchangeArgs := amqp.Table{}
46+
47+
err := ch.ExchangeDeclare(
48+
exchangeName,
49+
exchangeType,
50+
exchangeDurable,
51+
exchangeAutoDelete,
52+
exchangeInternal,
53+
exchangeNoWait,
54+
exchangeArgs,
55+
)
56+
57+
if err != nil {
58+
log.Fatal(
59+
"[RabbitMQ]: Error declaring real time updates exchange",
60+
err.Error(),
61+
)
62+
}
63+
64+
log.Println("[RabbitMQ]: Real time updates exchange declared")
65+
}
66+
67+
// ## Methods implementation ##
68+
69+
// SendUpdate enqueues a submission status update
70+
func (queueMgr *SubmissionsRealTimeUpdatesRabbitmqMgr) SendUpdate(updateDTO *dtos.SubmissionStatusUpdateDTO) error {
71+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
72+
defer cancel()
73+
74+
ch := connections.GetRabbitMQChannel()
75+
76+
// Parse update to JSON
77+
json, err := json.Marshal(updateDTO)
78+
if err != nil {
79+
log.Println(
80+
"[RabbitMQ Submissions Real Time Updates]: Error parsing submission status update to JSON",
81+
err.Error(),
82+
)
83+
return err
84+
}
85+
86+
// Publish
87+
msgExchange := exchangeName
88+
msgRoutingKey := ""
89+
msgMandatory := false
90+
msgImmediate := false
91+
92+
err = ch.PublishWithContext(
93+
ctx,
94+
msgExchange,
95+
msgRoutingKey,
96+
msgMandatory,
97+
msgImmediate,
98+
amqp.Publishing{
99+
ContentType: "application/json",
100+
Body: json,
101+
},
102+
)
103+
104+
if err != nil {
105+
log.Println(
106+
"[RabbitMQ Submissions Real Time Updates]: Error publishing submission status update",
107+
err.Error(),
108+
)
109+
return err
110+
}
111+
112+
return nil
113+
}

src/infrastructure/submission-status-updates-queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (queueMgr *SubmissionStatusUpdatesQueueMgr) ListenForSubmissionStatusUpdate
124124
// processSubmissionStatusUpdates creates an infinite loop that processes submission status updates
125125
func (queueMgr *SubmissionStatusUpdatesQueueMgr) processSubmissionStatusUpdates() {
126126
log.Println(
127-
"[RabbitMQ Submissions Status Updates Queue]: Listening for submission status updates...",
127+
"[RabbitMQ Submissions Status Updates Queue]: Listening for updates...",
128128
)
129129

130130
// Process each message in a separate goroutine

version.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"version": "0.2.0"
2+
"version": "0.2.1"
33
}

0 commit comments

Comments
 (0)