RabbitMQ
목차
- RabbitMQ 개념
- RabbitMQ 사용 이유
- RabbitMQ 적용 예제
- Kafka vs RabbitMQ 차이
개요
메시징 큐 시스템을 kafka는 사용해보았지만,
이직 한 회사에서 RabbitMQ를 이용해 비동기 처리를 하고 있는 것을 보고, RabbitMQ에 대해서 공부해보려고 한다.
상세
1. RabbitMQ 개념과 원리
RabbitMQ는 오픈 소스 메시지 브로커 소프트웨어로서,
AMQP(Advanced Message Queueing Protocol) 프로토콜을 구현한 메시지 브로커입니다.
- AMQP란?
- Advanced Message Queueing Protocol의 약자로 메세지 큐 오픈소스에 기반한 표준 프로토콜입니다. Producer가 Middleware Broker에 메세지를 발행하고 Consumer가 Broker의 Queue를 통해 메세지를 구독합니다.
AMQP는 Producer, Consumer, Broker, Exchange, Binding, Queue로 구성되어 있습니다.
- Producer : Producer는 Exchange에 메세지를 발행합니다.(Client)
- Consumer : Consumer는 Broker 안 Queue를 통해 메세지를 구독합니다.
- Broker : Exchange에서 받은 메세지를 Binding규칙에 의해 Queue로 메세지 전달합니다.
- Binding : Exchange와 Queue 간 라우팅 규칙과 관계를 정의합니다.
- Queue : 메세지를 저장합니다.
- Exchange : Exchange는 Producer로부터 수신한 메세지를 Bingding 규칙에 따라 적절한 Queue로 메세지를 라우팅합니다.
- Direct Exchange : 메세지는 Binding Key가 Routing Key와 정확히 일치하는 Queue로 전달
- Fanout Exchange : 모든 Queue에게 메세지 전달
- Topic Exchange : 정해진 Binding 패턴이 일치하는 Queue로 전달
- Headers Exchange : 헤더의 key-value로 정의된 일치조건에 따라 Queue로 전달
2. RabbitMq 사용 이유
- 비동기 처리 가능
- 빠른 개발 가능
- 사용하기 쉽다
3. RabbitMQ 적용 예제
일단, RabbitMQ를 설치하고 RabbitMQ 서버에 접속합니다.
그 이후 RabbitMQ 서버에서 exchange, queue, binding을 설정합니다.
- Durability : 브로커가 재시작 될 때 남아 있는지 여부(durable : 재시작 시 유지 , transient : 재시작 시 삭제)
- Auto delete : 마지막 Queue 연결이 해제되면 삭제
RabbitMQ exchange 세팅
RabbitMQ queue 세팅
RabbitMQ binding 세팅
rabbitmq를 사용하기 위해서 아래의 dependency 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
#Consumer 예제
application.yml에 파일에는 rabbitmq host, port, username, password를 작성한다.
server:
port: 9999
spring:
rabbitmq:
host: localhost
port: 5672
username: test
password: test
Consumer 부분에는 __@RabbitListener__를 이용해 Queue명을 설정하면 간단하게 메세지를 수신할수 있다.
package com.server.people92.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitmqListener {
private static final String RABBITMQ_TEST_QUEUE = "rabbitmq.test.queue";
@RabbitListener(queues = RABBITMQ_TEST_QUEUE)
public void receiveRabbitmqMessage(String message){
log.info("==== rabbitmq result => {}", message);
}
}
#Producer 예제
application.yml에는 Consumer와 일치하지만 서버를 동시에 올리기 위해 서버 포트만 consumer와 다르게 설정한다.
server:
port: 9710
spring:
rabbitmq:
host: localhost
port: 5672
username: test
password: test
RabbitMQ 서버에서 생성한 exchange, queue, biding 명을 빈으로 등록한다.
package com.server.people92.rabbitmq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
public class RabbitmqConfig {
private static final String TEST_EXCHANGE_NAME = "rabbitmq.test.direct";
private static final String TEST_QUEUE_NAME = "rabbitmq.test.queue";
private static final String TEST_BIDING_KEY = "rabbitmq.test.key";
@Bean
TopicExchange exchange() {
return new TopicExchange(TEST_EXCHANGE_NAME);
}
@Bean
Queue queue() {
return new Queue(TEST_QUEUE_NAME);
}
@Bean
Binding binding (Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(TEST_BIDING_KEY);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
테스트를 위해 Controller를 호출하여 RabbitMQ로 메세지를 발행하도록 구현하였다.
RabbitTemplate을 이용해 메세지를 RabbitMQ로 발행한다.
package com.server.people92.rabbitmq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitmqProducer {
private static final String TEST_EXCHANGE_NAME = "rabbitmq.test.direct";
private static final String TEST_BIDING_KEY = "rabbitmq.test.key";
private RabbitTemplate rabbitTemplate;
public RabbitmqProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/producer/{message}")
public String RabbitmqProducer(@PathVariable String message){
rabbitTemplate.convertAndSend(TEST_EXCHANGE_NAME,TEST_BIDING_KEY, message);
return "SUCCESS";
}
}
Consumer에선 boot 실행시 RabbitMQ에 연결된 로그가 나오지만, producer에선 나오지 않았다.
처음엔 연결이 잘못된건가 해서 찾아보다가, 실제 RabbitMQ에 메세지를 발행하니 연결된 로그가 발생하였다.
Producer는 실제 RabbitMQ에 부트 실행시 연결되는게 아니라 첫 메세지 발행 후 연결되는 것 같다.
producer spring boot 실행 로그
2021-05-02 15:44:01.327 INFO 70592 --- [ main] c.s.p.r.producer.ProducerApplication : Starting ProducerApplication using Java 1.8.0_281 on MinJungui-MacBookAir.local with PID 70592 (/Users/minjungyun/Desktop/Coding/producer/target/classes started by minjungyun in /Users/minjungyun/Desktop/Coding/producer)
2021-05-02 15:44:01.329 INFO 70592 --- [ main] c.s.p.r.producer.ProducerApplication : No active profile set, falling back to default profiles: default
2021-05-02 15:44:02.370 INFO 70592 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9710 (http)
2021-05-02 15:44:02.377 INFO 70592 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-05-02 15:44:02.378 INFO 70592 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.45]
2021-05-02 15:44:02.444 INFO 70592 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-05-02 15:44:02.445 INFO 70592 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1075 ms
2021-05-02 15:44:03.178 INFO 70592 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-05-02 15:44:03.410 INFO 70592 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9710 (http) with context path ''
2021-05-02 15:44:03.430 INFO 70592 --- [ main] c.s.p.r.producer.ProducerApplication : Started ProducerApplication in 2.498 seconds (JVM running for 3.116)
consumer spring boot 실행 로그
2021-05-02 15:46:45.687 INFO 71348 --- [ main] c.s.p.r.c.RabbitmqConsumerApplication : Starting RabbitmqConsumerApplication using Java 1.8.0_281 on MinJungui-MacBookAir.local with PID 71348 (/Users/minjungyun/Desktop/Coding/consumer/target/classes started by minjungyun in /Users/minjungyun/Desktop/Coding/consumer)
2021-05-02 15:46:45.689 INFO 71348 --- [ main] c.s.p.r.c.RabbitmqConsumerApplication : No active profile set, falling back to default profiles: default
2021-05-02 15:46:46.622 INFO 71348 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9999 (http)
2021-05-02 15:46:46.627 INFO 71348 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-05-02 15:46:46.627 INFO 71348 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.45]
2021-05-02 15:46:46.676 INFO 71348 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-05-02 15:46:46.676 INFO 71348 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 944 ms
2021-05-02 15:46:46.943 INFO 71348 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-05-02 15:46:47.440 INFO 71348 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9999 (http) with context path ''
2021-05-02 15:46:47.442 INFO 71348 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-05-02 15:46:47.474 INFO 71348 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4992613f:0/SimpleConnection@45e22def [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56055]
2021-05-02 15:46:47.586 INFO 71348 --- [ main] c.s.p.r.c.RabbitmqConsumerApplication : Started RabbitmqConsumerApplication in 2.336 seconds (JVM running for 2.869)
producer 메세지 발행 후 로그
2021-05-02 15:48:39.696 INFO 70592 --- [nio-9710-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-05-02 15:48:39.698 INFO 70592 --- [nio-9710-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-05-02 15:48:39.723 INFO 70592 --- [nio-9710-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 25 ms
2021-05-02 15:48:39.800 INFO 70592 --- [nio-9710-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-05-02 15:48:39.839 INFO 70592 --- [nio-9710-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2b97cc1f:0/SimpleConnection@40a851c8 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56081]
참고
http://alexvolov.com/2016/06/amqp/
https://oingdaddy.tistory.com/166
https://spring.io/guides/gs/messaging-rabbitmq/