RabbitMQ
Message Broker는 실시간 채팅 서비스를 구현하는 데 핵심적인 부분이다.
Message Broker를 적용하는 과정에 대해서 정리했다.
WebSocket과 Stomp의 차이점에 대해서 살펴보면
WebSocket은 클라이언트와 서버 간 양방향 실시간 통신을 가능하게 하는 프로토콜이다.
별도의 메시지 브로커 없이도 클라이언트와 서버 간의 통신을 구현할 수 있다.
Stomp는 메시지 브로커와 클라이언트 간 상호작용을 위한 프로토콜이다.
메시지 큐나 브로커와의 통신을 추상화하여 클라이언트가 메시지를 보내고 받을 때의 프로토콜을 통일한다. (일관된 방식)
따라서 보안 토큰과 같은 정보를 헤더와 바디를 가진 메시지로 전달할 수 있다.
나는 Stomp 프로토콜을 선택하여 보안을 강화하고 메시지의 타입에 따라 컨트롤러를 분리하기로 했다.
따라서 나는 header의 token을 담아 보안을 높이고 메세지 타입에 따른 controller를 분리 시키기 위해 Stomp을 사용했었다.
In Memory Broker의 문제점
Stomp 프로토콜을 사용할 때는 기본적으로 In-Memory Message Broker를 사용하게된다.
In Memory Broker를 사용하는 경우 몇 가지 문제점이 있다.
용량 제한:
세션을 수용할 수 있는 크기가 제한되어 있어서,
사용자 수가 증가할수록 서버 부하와 처리량이 감소한다.
장애 발생 시 메시지 유실 가능성:
장애가 발생하면 메시지가 메모리에만 저장되어 있기 때문에 메시지 유실 가능성이 높아진다.
모니터링 어려움:
메모리 기반 브로커의 경우 모니터링 도구나 기능이 제한되어 있어 시스템 상태를 파악하기 어렵다.
이러한 문제점을 해결하기 위해 외부 브로커인 RabbitMQ, Kafka, Active MQ 등을 사용할 수 있다.
Redis의 적용과 문제점
문제를 해결하기 위해 Redis를 사용하여 메시지를 빠르게 처리할 수 있도록 적용했었다.
그러나 Redis는 STOMP 프로토콜을 지원하지 않았다.
따라서 front에서 사용 중인 STOMP 프로토콜과의 호환성 문제가 발생했다.
현재 채팅은 front에서 STOMP 프로토콜을 사용하고 있기 때문에
Redis broker를 사용해서 채팅을 진행하려면 websocket을 사용해야하고
front를 수정하지 않고 동작시키기 위해서는
STOMP 프로토콜의 Message Broker 기능을 제공하는 RabbitMQ, ActiveMQ 등을 사용해야했다.
RabbitMQ와 ActiveMQ 비교
RabbitMQ와 ActiveMQ는 메시지를 수신한 구독자가 해당 메시지를 명시적으로 확인하기 전까지는 메시지를 보관하고 유지한다.
이러한 방식을 메시지 액키스(Acknowledgement) 또는 컨슈머의 확인(Consumer Acknowledgement)이라고 한다.
따라서 메시지를 받기 전까지 삭제하지 않으며, 이는 메시지 손실을 방지하는 데 도움이 된다.
비교적으로 RabbitMQ는 더 높은 성능과 안정성을 제공하며, 더 많은 정보를 얻을 수 있다.
또한, 단순하고 쉽게 설정하고 관리할 수 있는 특성을 가지고 있다.
위와 같은 이유로, RabbitMQ와 ActiveMQ 중에서는 RabbitMQ를 선택했다.
남은 RabbitMQ와 Kafka를 후보로 두고 내가 정한 기준에 따라 결정했다.
1. 문제가 발생했을 시 재시도 가능
2. Stomp 프로토콜을 지원
RabbitMQ와 Kafka의 특징
RabbitMQ는 AMQP 프로토콜을 지원하여 다양한 클라이언트 언어와 플랫폼에서 사용할 수 있다.
또한 메시지 지속성을 통해 메시지 손실을 방지하고 유연한 메시지 라우팅 기능을 제공한다.
이는 비동기 및 분산 시스템에 적합하다.
반면 Kafka는 로그 기반 아키텍처를 통해 대량의 데이터를 신속하게 처리할 수 있으며,
분산 및 확장성을 갖춘 클러스터 구성으로 대규모 데이터 처리가 가능하다.
또한 파티션 단위로 데이터를 저장하여 순서를 보장한다.
RabbitMQ와 Kafka의 재시도 메커니즘
RabbitMQ는 메시지 처리 실패 시 재시도가 가능한 내장된 메커니즘을 제공한다.
이를 통해 다른 소비자가 해당 메시지를 처리할 수 있으며, 재시도 동안 다른 메시지의 처리가 중단되지 않는다.
반면 Kafka는 내장된 재시도 메커니즘이 없기 때문에 소비자 애플리케이션에서 메시지 재시도 로직을 직접 구현해야 한다.
또한 파티션 내에서 순서가 보장되므로 메시지 처리 중에 다른 메시지 처리가 대기해야 한다.
이러한 특징을 고려하여 RabbitMQ가 메시지의 안정적인 전달을 보장하고 실시간 채팅 서비스에 적합하다고 판단했다.
RabbitMQ flow
단순 내장 메시지 브로커가 활성화될 때 사용되는 구성 요소
외부 브로커(RabbitMQ)를 구성할 때 사용되는 구성 요소
두 다이어그램 사이의 차이점은 메시지를 TCP를 통해 외부 STOMP broker까지 전달하고
broker에서 가입된 client로 메시지를 전달하기 위해 “broker relay”를 사용한다는 것이다.
message는 Producer에서 시작하여 Exchange를 거쳐 Binding된 Queue로 이동하고, 최종적으로 Consumer에게 전달된다.
1
2
3
4
5
6
+----------+ +------------+ +----------+ +------+
| Producer | --> | Exchange | --> | Binding | --> | Queue| --> Consumer
+----------+ +------------+ +----------+ +------+
| Routing |
| Rules |
+--------------+
Producer(프로듀서):
Producer는 메시지를 생성하고 Message Broker(RabbitMQ)에 전송한다.
Exchange(익스체인지):
Exchange는 Producer로부터 받은 메시지를 수신하고 라우팅한다.
Exchange는 설정된 라우팅 규칙(route key)에 따라 메시지를 하나 이상의 Queue로 라우팅한다.
Binding(바인딩):
Binding은 Exchange와 Queue 간의 연결을 말한다.
Binding을 통해 Exchange는 어떤 Queue로 메시지를 전달할지 결정한다.
Queue(큐):
Queue는 Exchange로부터 메시지를 수신하여 저장한다.
Binding된 Queue 중 하나에 메시지가 전달된다.
Consumer(컨슈머):
Consumer는 Queue에 쌓인 메시지를 소비하고 처리한다.
Consumer는 Queue에 새로운 메시지가 도착할 때마다 해당 메시지를 받아 처리한다.
Code 작성
의존성 주입
1
2
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
stompBrokerRelayMessageHandler는 Spring Framework의 WebSocket 모듈에서
STOMP 메시징 프로토콜을 사용하여 메시지 브로커와 통신하기 위한 핸들러다.
이 핸들러를 사용하려면 STOMP 메시지 브로커와의 통신을 지원하는 라이브러리가 필요하다.
Reactor Netty를 백엔드로 사용하기 때문에 관련 설정을 추가해준다.
1
2
3
4
5
6
7
8
9
10
# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# RabbitMQ queue, exchange, routing-key info
rabbitmq.queue.name = adme.queue
rabbitmq.exchange.name = adme.exchange
rabbitmq.routing.key = room.*
routing key는 사용자 별로 채팅방 id가 다르기 때문에 와일드 카드를 사용했다..
*
: 한 단어만을 대신#
: 한 단어 이상을 대신
ex) room.*
일 때, “room.hello”와 “room.hi”는 인식하지만 “room.hello.hi”는 인식하지 못한다.
이럴 때 room.#
을 사용해야한다.
room.* 패턴은 “room.” 다음에 딱 하나의 단어가, room.# 패턴은 “room.” 다음에 하나 이상의 단어가 나온다고 보면 된다.
stomp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app"); // pub
config.setPathMatcher(new AntPathMatcher(".")); // URL을 / → .으로
config.enableStompBrokerRelay( "/queue", "/topic", "/exchange", "/amq/queue");
// stompBrokerRelayMessageHandler는 외부 메시지 브로커와 통신하기 위한 설정이다.
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//소켓에 연결하기 위한 엔드 포인트를 지정
registry.addEndpoint("/coco").setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// jwt 토큰 검증을 위해 생성한 stompHandler를 인터셉터로 지정해준다.
registration.interceptors(stompHandler);
}
}
RabbitMQ에서는 점(.)을 사용하여 경로를 구분한다.
예를 들어, “/room”과 같은 주제를 구독할 때, RabbitMQ에서는 “/room” 대신에 “.room”으로 사용된다.
따라서 Spring의 configureMessageBroker()
에서 config.setPathMatcher(new AntPathMatcher("."))
를 사용한다.
Config
Exchage 유형은 4가지가 있다. (Direct Exchange, Fanout Exchange, Topic Exchange, Headers Exchange)
고객센터의 채팅을 구현했기 때문에 1:1 통신에 적합한 방식인 direct를 사용하려고 했으나
routing Key와 큐의 binding key가 정확히 일치하는 경우에만 메시지를 전달하기 때문에
와일드 카드(*
, #
)를 사용할 수가 없다.
사용자 별로 채팅방 id를 구분하기 때문에 와일드 카드를 사용해야해서 TopicExchange을 사용했다.
TopicExchange는 pub/sub에 적합하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
/**
* 지정된 큐 이름으로 Queue 빈을 생성
* @return Queue 빈 객체
*/
@Bean
public Queue queue() {
return new Queue(queueName, true);
}
/**
* routing Key와 일치하는 Queue로 메시지를 전송하기 위한 TopicExchange 빈 생성
* @return TopicExchange 빈 객체
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchange);
}
/**
* Exchange와 Queue를 바인딩하기 위한 Binding 빈 생성
* @param queue Queue 빈 객체
* @param exchange TopicExchange 빈 객체
* @return Binding 빈 객체
*/
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
// 라우팅 키를 사용하여 Exchange와 Queue를 바인딩
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey);
}
/**
* RabbitMQ와의 연결을 관리하는 클래스
* @return ConnectionFactory 빈 객체
*/
@Bean
public ConnectionFactory connectionFactory() {
// RabbitMQ와의 연결을 설정
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* RabbitMQ와의 메시지 통신을 담당하는 클래스
* @param connectionFactory ConnectionFactory 빈 객체
* @return RabbitTemplate 빈 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// RabbitMQ와의 메시지 통신을 설정
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
// RabbitMQ 메시지 리스너 컨테이너 설정
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
return container;
}
/**
* Jackson library를 사용해서 메시지를 JSON 형식으로 변환하는 BEAN 생성
* @return MessageConverter 빈 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
new Queue(queueName, true);
boolean durable 매개변수는 큐의 지속성(durability)을 나타낸다.
true로 설정하면, 큐가 지속되어 RabbitMQ 서버가 다시 시작되어도 유지된다.
이는 큐에 저장된 메시지가 손실되지 않고 유지되는 것을 의미한다.
false로 설정하면, 큐가 비지속적(non-durable)이며, RabbitMQ 서버가 다시 시작될 때 해당 큐가 삭제된다.
이 경우 큐에 저장된 메시지는 유실될 수 있다.
Producer
Redis의 Publisher-Subscriber 모델은 RabbitMQ의 Producer-Consumer 모델과 유사한 메시지 전달 패턴을 가지고 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@RequiredArgsConstructor
@Slf4j
public class ChatMessageProducer {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
public void sendMessage(ChatMessage message, String roomId) {
log.info("message send : {}", message);
rabbitTemplate.convertAndSend(exchange, roomId, message);
}
}
Consumer
Redis의 경우 MessageListener를 구현하는 방식으로 메시지를 구독했었다.
RabbitMQ에서는 메시지를 소비하고 처리하기 위해 @RabbitListener
을 사용하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class ChatMessageConsumer{
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void onMessage(Message message) { // Queue에서 message를 구독
try {
log.info("Received message: " + new String(message.getBody()));
} catch (Exception e) {
log.error("Error processing message: " + e.getMessage());
}
}
}
Controller
Redis에서는 메시지를 수신하기 위해 Channel을 구독하고, 이를 수신하는 MessageListener를 등록했었다.
하지만 RabbitMQ에서는 메시지를 수신하기 위해 명시적인 구독이 필요하지 않았다.
Producer가 메시지를 보내면 해당 Exchange에 메시지가 전달되고,
이를 구독하는 Consumer는 큐로부터 메시지를 받아 처리한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final JwtTokenProvider jwtTokenProvider;
private final RedisService redisService;
private final ChatMessageProducer producer;
@MessageMapping("chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
producer.sendMessage(chatMessage, "room." + chatMessage.getRoomId());
}
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
chatMessage.setType(MessageType.JOIN);
redisService.addRedis(chatMessage);
chatService.countUser("Connect", roomId, chatMessage);
producer.sendMessage(chatMessage, "room." + chatMessage.getRoomId());
}
}
Docker
1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 61613:61613 rabbitmq:management
RabbitMQ의 AMQP port : 5672
관리자 웹 인터페이스 port : 15672
RabbitMQ의 STOMP port : 61613
RabbitMQ STOMP plug-in 활성화
1
2
$ rabbitmq-plugins enable rabbitmq_web_stomp
$ rabbitmq-plugins enable rabbitmq_web_stomp_examples
RabbitMQ 관리자 페이지
application을 실행하면 아래 로그가 뜬다.
RabbitMQ 서버와 Connection이 정상적으로 동작하는 것을 확인할 수 있다.
http://localhost:15672
로 들어가서 로그인을 한다.
Connections
Connections 탭에서 연결이 되었는지 확인한다.
Exchange 확인
adme.exchange를 클릭해서 binding 탭을 보면
application.properties
에 설정한 값 그대로 들어가 있는 것을 볼 수 있다.
Listener test
Consumer 쪽으로 메세지가 잘 들어가는지 테스트 해본다.
RabbitMQ management 에서 자체적으로 테스트를 할 수 있다.
Queues > adme.queue(각자 설정한 queueName)의 상세 화면을 보면 Publish message 토글이 있다.
Payload에 아무 글이나 작성하고 Pulbish message를 클릭한다.
메세지가 발행이 되었다고 alert창이 뜬다. 서버 log를 확인해본다.
code test
destination을 /queue/<name>
으로 설정하면 메세지는 default exchange로 발행된다.
destination을 /topic/<routing_key>
로 설정하면 메세지는 amq.topic 이라는 exchange로 발행된다.
나머지 destination에 대해 /exchange/<exchange_name>/[routing_key]
의 exchange로 메세지가 발행된다.
exchanges 탭을 보면 아래와 같이 보여지는데 adme.exchange는 직접 만든 것이고
나머지는 RabbitMQ에서 기본으로 만들어 준 것이다.
custom exchange
1
stompClient.subscribe('/exchange/adme.exchange/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatMessage.getRoomId(), chatMessage);
}
queue(default exchange)
1
stompClient.subscribe('/queue/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend(C"room." + chatMessage.getRoomId(), chatMessage);
}
topic(amq.topic)
1
2
let exchan
stompClient.subscribe('/topic/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend("amq.topic", "room." + chatMessage.getRoomId(), chatMessage);
}
*RabbitMQ 관리 페이지와 관련된 글은 Rabbitmq 적용에서 더 볼 수 있다.
ERROR 모음집
Connection refused
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information:
해결방법
RabbitMQ의 STOMP 포트인 61613 포트를 매핑시킨다.
1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 61613:61613 rabbitmq:management
REFERENCE
- Stomp + Kafka를 이용한 채팅 기능 개발하기 - (with Spring Boot) #1 (Kafka와 Stomp는 무엇일까?)
- [36] WebSocket - In Memory 대신 외부 브로커 사용하는 이유
= = = = = = = = = = = = = = = = = = = = = = = = = = = - WebSocket - RabbitMQ
- Springboot + RabbitMQ 연동 및 초간단 샘플 프로젝트 만들기
- STOMP에 RabbitMQ를 추가해보았다.