From 3ca830b5e7cbc25c437e5d81506f985287a8f7d5 Mon Sep 17 00:00:00 2001 From: PC Date: Sat, 17 Sep 2022 09:54:26 +0200 Subject: [PATCH 1/4] #3d899gf - connecting publisher with some issues connecting consumer --- Gemfile | 2 ++ Gemfile.lock | 19 ++++++++++++++++ Rakefile | 1 + .../api/v1/applications_controller.rb | 1 + app/services/rabbit_publisher.rb | 22 +++++++++++++++++++ config/initializers/sidekiq.rb | 1 - config/initializers/sneakers.rb | 17 ++++++++++++++ docker-compose.yml | 16 ++++++++++++-- lib/tasks/rabbitmq.rake | 19 ++++++++++++++++ 9 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 app/services/rabbit_publisher.rb create mode 100644 config/initializers/sneakers.rb create mode 100644 lib/tasks/rabbitmq.rake diff --git a/Gemfile b/Gemfile index 19d9a01..a10c94c 100644 --- a/Gemfile +++ b/Gemfile @@ -48,6 +48,8 @@ gem 'rswag-ui' gem 'sidekiq' gem 'sidekiq-cron' +gem 'sneakers' + group :development, :test do # See https://guides.rubyonrails.org/debugging_rails_applications.html#debugging-with-the-debug-gem gem "debug", platforms: %i[ mri mingw x64_mingw ] diff --git a/Gemfile.lock b/Gemfile.lock index a7dd100..a324678 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -68,9 +68,13 @@ GEM tzinfo (~> 2.0) addressable (2.8.1) public_suffix (>= 2.0.2, < 6.0) + amq-protocol (2.3.2) bootsnap (1.13.0) msgpack (~> 1.2) builder (3.2.4) + bunny (2.19.0) + amq-protocol (~> 2.3, >= 2.3.1) + sorted_set (~> 1, >= 1.0.2) concurrent-ruby (1.1.10) connection_pool (2.2.5) crass (1.0.6) @@ -215,6 +219,7 @@ GEM thor (~> 1.0) zeitwerk (~> 2.5) rake (13.0.6) + rbtree (0.4.5) redis (4.8.0) reline (0.3.1) io-console (~> 0.5) @@ -245,6 +250,9 @@ GEM actionpack (>= 3.1, < 7.1) railties (>= 3.1, < 7.1) ruby2_keywords (0.0.5) + serverengine (2.0.7) + sigdump (~> 0.2.2) + set (1.0.3) shoulda-matchers (5.1.0) activesupport (>= 5.2.0) sidekiq (6.5.6) @@ -254,6 +262,16 @@ GEM sidekiq-cron (1.7.0) fugit (~> 1) sidekiq (>= 4.2.1) + sigdump (0.2.4) + sneakers (2.11.0) + bunny (~> 2.12) + concurrent-ruby (~> 1.0) + rake + serverengine (~> 2.0.5) + thor + sorted_set (1.0.3) + rbtree + set (~> 1.0) strscan (3.0.4) thor (1.2.1) timeout (0.3.0) @@ -286,6 +304,7 @@ DEPENDENCIES shoulda-matchers sidekiq sidekiq-cron + sneakers tzinfo-data RUBY VERSION diff --git a/Rakefile b/Rakefile index 9a5ea73..744e5cd 100644 --- a/Rakefile +++ b/Rakefile @@ -2,5 +2,6 @@ # for example lib/tasks/capistrano.rake, and they will automatically be available to Rake. require_relative "config/application" +require 'sneakers/tasks' Rails.application.load_tasks diff --git a/app/controllers/api/v1/applications_controller.rb b/app/controllers/api/v1/applications_controller.rb index 5b188fc..af1a860 100644 --- a/app/controllers/api/v1/applications_controller.rb +++ b/app/controllers/api/v1/applications_controller.rb @@ -5,6 +5,7 @@ class Api::V1::ApplicationsController < ApplicationController # reads all the applications def index @applications = Application.all + RabbitPublisher.publish('', 'Hello World !!!!') json_response(@applications, :ok, except = [:id, :token]) end diff --git a/app/services/rabbit_publisher.rb b/app/services/rabbit_publisher.rb new file mode 100644 index 0000000..5a6613d --- /dev/null +++ b/app/services/rabbit_publisher.rb @@ -0,0 +1,22 @@ +class RabbitPublisher + + def self.connection + @connection ||= Bunny.new ({ host: 'rabbitmq' }) + @connection.start + end + + def self.channel + @channel ||= ConnectionPool.new do + connection.create_channel + end + end + + def self.publish(queue_name, message_obj) + channel.with do |channel| + exchange = channel.fanout('test_exchange') + exchange.publish(message_obj) + # queue_chats = channel.queue('test_queue', durable: true) + # channel.default_exchange.publish(message_obj, routing_key: queue_chats.name) + end + end +end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index fada4b3..15597c2 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -1,6 +1,5 @@ Sidekiq.configure_server do |config| - print("----------------------- CONFIGURING SERVER") config.redis = { url: "redis://redis:6379/0" } config.on(:startup) do diff --git a/config/initializers/sneakers.rb b/config/initializers/sneakers.rb new file mode 100644 index 0000000..bc15ff8 --- /dev/null +++ b/config/initializers/sneakers.rb @@ -0,0 +1,17 @@ +require 'sneakers' + +Sneakers.configure log: STDOUT, amqp: 'amqp://guest:guest@rabbitmq', deamonize: true +Sneakers.logger.level = Logger::INFO + +# require 'bunny' +# +# connection = Bunny.new ({ host: 'rabbitmq' , user: "guest", password: "guest" }) +# connection.start +# +# channel = connection.create_channel +# exchange = channel.fanout('test_exchange') +# +# queue_chats = channel.queue('test_queue', durable: true) +# queue_chats.bind(exchange.name) +# +# connection.close diff --git a/docker-compose.yml b/docker-compose.yml index d7816c5..e4d797f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,5 @@ version: "3.9" + services: mysql: image: 'mysql:5.7' @@ -55,11 +56,22 @@ services: - xpack.security.enabled=false rabbitmq: - image: rabbitmq:3-management + image: rabbitmq:3.9.22-management volumes: - rabbitmq:/var/lib/rabbitmq ports: - - "15672:15672" + - "5672:15672" + + sneakers: + build: . + command: bundle exec rake sneakers:run + depends_on: + - app + - rabbitmq + volumes: + - .:/app + environment: + WORKERS: RabbitConsumer volumes: db_data: diff --git a/lib/tasks/rabbitmq.rake b/lib/tasks/rabbitmq.rake new file mode 100644 index 0000000..21a27f3 --- /dev/null +++ b/lib/tasks/rabbitmq.rake @@ -0,0 +1,19 @@ + +namespace :rabbitmq do + desc "Connecting RabbitConsumer to RabbitPublisher" + task :setup_bindings do + print('-------------------------- HELLO SETUP') + require 'bunny' + + connection = Bunny.new ({ host: 'rabbitmq', user: 'guest', password: 'guest' }) + connection.start + + channel = connection.create_channel + exchange = channel.fanout('test_exchange') + + queue_chats = channel.queue('test_queue', durable: true) + queue_chats.bind(exchange.name) + + connection.close + end +end From 16b03f0f7e61798ba8ecdeb6edd9bd6f59976e55 Mon Sep 17 00:00:00 2001 From: PC Date: Sat, 17 Sep 2022 10:10:33 +0200 Subject: [PATCH 2/4] #3d899gf - creating 2 separte queues at the beginning --- app/services/rabbit_publisher.rb | 2 +- app/workers/rabbit_consumer.rb | 11 +++++++++++ config/initializers/sneakers.rb | 28 ++++++++++++++++------------ 3 files changed, 28 insertions(+), 13 deletions(-) create mode 100644 app/workers/rabbit_consumer.rb diff --git a/app/services/rabbit_publisher.rb b/app/services/rabbit_publisher.rb index 5a6613d..9f7ced9 100644 --- a/app/services/rabbit_publisher.rb +++ b/app/services/rabbit_publisher.rb @@ -13,7 +13,7 @@ def self.channel def self.publish(queue_name, message_obj) channel.with do |channel| - exchange = channel.fanout('test_exchange') + exchange = channel.direct('chats_exchange') exchange.publish(message_obj) # queue_chats = channel.queue('test_queue', durable: true) # channel.default_exchange.publish(message_obj, routing_key: queue_chats.name) diff --git a/app/workers/rabbit_consumer.rb b/app/workers/rabbit_consumer.rb new file mode 100644 index 0000000..edba42d --- /dev/null +++ b/app/workers/rabbit_consumer.rb @@ -0,0 +1,11 @@ + +class RabbitConsumer + include Sneakers::Worker + + from_queue "test_queue", env: nil + + def work(message) + logger.info("--------- RabbitConsumer: " + message) + ack! + end +end diff --git a/config/initializers/sneakers.rb b/config/initializers/sneakers.rb index bc15ff8..eed3914 100644 --- a/config/initializers/sneakers.rb +++ b/config/initializers/sneakers.rb @@ -3,15 +3,19 @@ Sneakers.configure log: STDOUT, amqp: 'amqp://guest:guest@rabbitmq', deamonize: true Sneakers.logger.level = Logger::INFO -# require 'bunny' -# -# connection = Bunny.new ({ host: 'rabbitmq' , user: "guest", password: "guest" }) -# connection.start -# -# channel = connection.create_channel -# exchange = channel.fanout('test_exchange') -# -# queue_chats = channel.queue('test_queue', durable: true) -# queue_chats.bind(exchange.name) -# -# connection.close +require 'bunny' + +connection = Bunny.new ({ host: 'rabbitmq' , user: "guest", password: "guest" }) +connection.start + +channel = connection.create_channel + +chatsExchange = channel.direct('chats_exchange') +chatsQueue = channel.queue('chats_queue', durable: true) +chatsQueue.bind(chatsExchange.name) + +messagesExchange = channel.direct('messages_exchange') +messagesQueue = channel.queue('messages_queue', durable: true) +messagesQueue.bind(messagesExchange.name) + +connection.close From 6c3b95d8fce3802563dc46ef7cd4016f175a39cf Mon Sep 17 00:00:00 2001 From: PC Date: Sat, 17 Sep 2022 16:38:17 +0200 Subject: [PATCH 3/4] #3d899gf - implement write from RabbitMQ messages --- Gemfile | 1 + Gemfile.lock | 2 ++ .../api/v1/chat_messages_controller.rb | 8 +++++--- app/controllers/api/v1/chats_controller.rb | 8 +++++--- app/services/rabbit_publisher.rb | 14 +++++++++++--- app/workers/chats_rabbit_consumer.rb | 19 +++++++++++++++++++ app/workers/messages_rabbit_consumer.rb | 19 +++++++++++++++++++ docker-compose.yml | 2 +- 8 files changed, 63 insertions(+), 10 deletions(-) create mode 100644 app/workers/chats_rabbit_consumer.rb create mode 100644 app/workers/messages_rabbit_consumer.rb diff --git a/Gemfile b/Gemfile index a10c94c..73d1d8b 100644 --- a/Gemfile +++ b/Gemfile @@ -49,6 +49,7 @@ gem 'sidekiq' gem 'sidekiq-cron' gem 'sneakers' +gem 'json' group :development, :test do # See https://guides.rubyonrails.org/debugging_rails_applications.html#debugging-with-the-debug-gem diff --git a/Gemfile.lock b/Gemfile.lock index a324678..291d4a5 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -152,6 +152,7 @@ GEM io-console (0.5.11) irb (1.4.1) reline (>= 0.3.0) + json (2.5.1) json-schema (2.8.1) addressable (>= 2.4) loofah (2.18.0) @@ -294,6 +295,7 @@ DEPENDENCIES elasticsearch-rails factory_bot_rails faker + json mysql2 puma (~> 5.0) rails (~> 7.0.3, >= 7.0.3.1) diff --git a/app/controllers/api/v1/chat_messages_controller.rb b/app/controllers/api/v1/chat_messages_controller.rb index b452a18..1da1d29 100644 --- a/app/controllers/api/v1/chat_messages_controller.rb +++ b/app/controllers/api/v1/chat_messages_controller.rb @@ -22,7 +22,9 @@ def show # POST /api/v1/applications/:application_token/chats/:chat_number/messages def create - @chatMessage = @chat.messages.create!(creation_params) + @chatMessage = ChatMessage.new(creation_params) + RabbitPublisher.publish('message', @chatMessage) + json_response( { :message => "ChatMessage has be created successfully", @@ -33,8 +35,8 @@ def create # PUT - PATCH /api/v1/applications/:application_token/chats/:chat_number/messages/:number def update - @chatMessage.update!(chat_message_whitelist_params) - + @chatMessage.assign_attributes(chat_message_whitelist_params) + RabbitPublisher.publish('message', @chatMessage) json_response( { :message => "ChatMessage has be updated successfully", diff --git a/app/controllers/api/v1/chats_controller.rb b/app/controllers/api/v1/chats_controller.rb index c1d6549..a660368 100644 --- a/app/controllers/api/v1/chats_controller.rb +++ b/app/controllers/api/v1/chats_controller.rb @@ -16,7 +16,9 @@ def show # POST /api/v1/applications/:application_token/chats def create - @chat = @application.chats.create!(creation_params) + @chat = Chat.new(creation_params) + RabbitPublisher.publish('chat', @chat) + json_response( { :message => "Chat has be created successfully", @@ -27,8 +29,8 @@ def create # PUT - PATCH /api/v1/applications/:application_token/chats/:number def update - @chat.update!(chat_whitelist_params) - + @chat.assign_attributes(chat_whitelist_params) + RabbitPublisher.publish('chat', @chat) json_response( { :message => "Chat has be updated successfully", diff --git a/app/services/rabbit_publisher.rb b/app/services/rabbit_publisher.rb index 9f7ced9..5f81fc2 100644 --- a/app/services/rabbit_publisher.rb +++ b/app/services/rabbit_publisher.rb @@ -11,12 +11,20 @@ def self.channel end end - def self.publish(queue_name, message_obj) + def self.publish(type, message_obj) channel.with do |channel| - exchange = channel.direct('chats_exchange') - exchange.publish(message_obj) + exchange_name = get_exchange_name(type) + exchange = channel.direct(exchange_name) + exchange.publish(message_obj.to_json) # queue_chats = channel.queue('test_queue', durable: true) # channel.default_exchange.publish(message_obj, routing_key: queue_chats.name) end end + + private + + # TODO: this is SOLID, better use factory, with DI it would be better + def self.get_exchange_name(type) + return (type == 'chat') ? 'chats_exchange' : 'messages_exchange' + end end diff --git a/app/workers/chats_rabbit_consumer.rb b/app/workers/chats_rabbit_consumer.rb new file mode 100644 index 0000000..2220922 --- /dev/null +++ b/app/workers/chats_rabbit_consumer.rb @@ -0,0 +1,19 @@ +require 'sneakers' +require 'json' + +class ChatsRabbitConsumer + include Sneakers::Worker + + from_queue "chats_queue", env: nil, + exchange: 'chats_exchange', exchange_type: :direct + + def work(queue_message) + logger.info("--------- ChatsRabbitConsumer: " + queue_message) + chat_params = JSON.parse(queue_message) + + chat = Chat.new(chat_params) + chat.save! + ack! + logger.info("--------- ChatsRabbitConsumer: Saved successfully") + end +end diff --git a/app/workers/messages_rabbit_consumer.rb b/app/workers/messages_rabbit_consumer.rb new file mode 100644 index 0000000..5fb7a4d --- /dev/null +++ b/app/workers/messages_rabbit_consumer.rb @@ -0,0 +1,19 @@ +require 'sneakers' +require 'json' + +class MessagesRabbitConsumer + include Sneakers::Worker + + from_queue "messages_queue", env: nil, + exchange: 'messages_exchange', exchange_type: :direct + + def work(queue_message) + logger.info("--------- MessagesRabbitConsumer: " + queue_message) + message_params = JSON.parse(queue_message) + + chat_message = ChatMessage.new(message_params) + chat_message.save! + ack! + logger.info("--------- MessagesRabbitConsumer: Saved successfully") + end +end diff --git a/docker-compose.yml b/docker-compose.yml index e4d797f..6854e47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,7 +71,7 @@ services: volumes: - .:/app environment: - WORKERS: RabbitConsumer + - WORKERS=ChatsRabbitConsumer,MessagesRabbitConsumer volumes: db_data: From 4d74ca3b08e5f6b79bc31c35911a159b411e697c Mon Sep 17 00:00:00 2001 From: PC Date: Sat, 17 Sep 2022 18:17:49 +0200 Subject: [PATCH 4/4] #3d899gf - fixing TCP connection bug --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index 6854e47..223665c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,7 @@ services: - .:/app environment: - WORKERS=ChatsRabbitConsumer,MessagesRabbitConsumer + - RABBITMQ_HOST=rabbitmq volumes: db_data: