Rabbitmq
RabbitMQ
WebSocket과 STOMP의 차이
WebSocket
WebSocket은 클라이언트와 서버 간 양방향 실시간 통신을 가능하게 하는 프로토콜이다.
별도의 메시지 broker 없이도 서버와 직접 연결하여 데이터를 송수신할 수 있다.
STOMP
STOMP(Simple Text Oriented Messaging Protocol)은 메시지 broker와 client 간 상호작용을 위한 프로토콜이다.
메시지 큐 또는 broker와의 통신을 추상화하여 클라이언트가 일관된 형식으로 메시지를 송수신할 수 있도록 한다.
header와 body 구조를 가지므로 JWT 토큰과 같은 인증 정보를 함께 전달할 수 있으며
메시지 타입에 따라 컨트롤러를 분리하기에도 적합하다.
STOMP를 선택한 이유
보안 토큰을 헤더에 포함하여 인증을 강화하고 메시지 타입에 따라 컨트롤러를 명확하게 분리하기 위해 STOMP 프로토콜을 선택했다.
In-Memory Broker의 문제점
Spring에서 STOMP를 사용할 경우 기본적으로 In-Memory Message Broker를 사용하게 된다.
그러나 다음과 같은 한계가 존재한다.
1. 용량 제한
메모리 기반이기 때문에 동시에 수용할 수 있는 세션 수가 제한된다.
사용자 수가 증가할수록 서버 부하가 급격히 증가한다.
2. 장애 발생 시 메시지 유실
메시지가 메모리에만 저장되므로 서버 장애 발생 시 메시지가 유실될 가능성이 높아진다.
3. 모니터링의 어려움
외부 관리 도구가 부족하여 현재 시스템 상태를 파악하기 어렵다.
이러한 문제를 해결하기 위해 외부 메시지 broker인
RabbitMQ, Apache Kafka, Apache ActiveMQ 등을 사용할 수 있다.
Redis 적용과 한계
메시지 처리 속도를 개선하기 위해 Redis를 broker로 적용했었다.
그러나 Redis는 STOMP 프로토콜을 지원하지 않기 때문에 front에서 사용 중인 STOMP과 호환되지 않는 문제가 발생했다.
front를 수정하지 않고 유지하기 위해서는 STOMP을 지원하는 외부 broker가 필요했고
이에 따라 RabbitMQ 또는 ActiveMQ를 선택지로 두게 되었다.
*Redis broker → Websocket 적용, Stomp → RabbitMQ, ActiveMQ 적용
RabbitMQ와 ActiveMQ
두 broker 모두 Consumer가 메시지를 명시적으로 확인하기 전까지 메시지를 유지하는 Acknowledgement 방식을 지원한다.
이 방식은 메시지 손실을 방지하는 데 중요한 요소이다.
비교적 RabbitMQ는 더 높은 성능과 안정성을 제공하며 더 많은 정보를 얻을 수 있어 선택했다.
RabbitMQ와 Kafka
RabbitMQ 특징
- AMQP 프로토콜 기반
- 메시지 지속성 보장
- 유연한 라우팅 기능
- 재시도 메커니즘 내장
Kafka 특징
- 로그 기반 아키텍처
- 대용량 데이터 처리에 특화
- 파티션 단위로 순서 보장
- 재시도 로직을 애플리케이션에서 직접 구현해야 함
재시도 메커니즘
RabbitMQ는 메시지 처리 실패 시 다른 Consumer가 재처리할 수 있는 구조를 제공한다.
반면 Kafka는 파티션 단위 순서를 보장하기 때문에, 특정 메시지 처리 실패 시 후속 메시지 처리가 지연될 수 있다.
실시간 채팅 서비스에서는 안정적인 전달과 유연한 재처리가 중요하다고 판단하여 RabbitMQ를 선택했다.
RabbitMQ 메시지 flow
단순 내장 메시지 브로커가 활성화될 때 사용되는 구성 요소
외부 브로커(RabbitMQ)를 구성할 때 사용되는 구성 요소
두 다이어그램 사이의 차이점은 메시지를 TCP를 통해 외부 STOMP broker까지 전달하고
broker에서 가입된 client로 메시지를 전달하기 위해 “broker relay”를 사용한다는 것이다.
외부 broker를 사용할 경우 Broker Relay를 통해 STOMP 메시지가 TCP로 RabbitMQ에 전달된다.
메시지 흐름은 다음과 같다.
1
Producer → Exchange → Binding → Queue → Consumer
Producer
메시지를 생성하여 RabbitMQ로 전송한다.
Exchange
Producer로부터 받은 메시지를 라우팅 규칙에 따라 Queue로 전달한다.
Binding
Exchange와 Queue를 연결하는 규칙이다.
Routing Key를 기준으로 메시지 전달 여부가 결정된다.
Queue
Exchange로부터 전달받은 메시지를 저장한다.
Consumer
Queue에 저장된 메시지를 구독하고 처리한다.
code
의존성 설정
1
2
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
Reactor Netty는 STOMP Broker Relay와의 통신을 위해 필요하다.
AMQP 스타터는 RabbitMQ 연동을 위한 핵심 의존성이다.
application 설정
1
2
3
4
5
6
7
8
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
rabbitmq.queue.name=adme.queue
rabbitmq.exchange.name=adme.exchange
rabbitmq.routing.key=room.*
room.* 패턴은 room 다음에 한 단어가 오는 경우만 매칭된다.
room.# 패턴은 하나 이상의 단어를 허용한다.
routing key는 사용자 별로 채팅방 id가 다르기 때문에 와일드 카드를 사용했다.
ex) room.*일 때, “room.hello”와 “room.hi”는 인식하지만 “room.hello.hi”는 인식하지 못한다.
이럴 때 room.#을 사용해야한다.
room.*은 “room.” 다음에 딱 하나의 단어,
room.#은 “room.” 다음에 하나 이상의 단어가 나온다고 보면 된다.
Stomp
Spring은 기본적으로 /를 기준으로 경로를 구분한다.
하지만 RabbitMQ는 routing key를 . 기준으로 구분한다.
“/room”과 같은 주제를 구독할 때, RabbitMQ에서는 “/room” 대신에 “.room”으로 사용된다.
따라서 Spring의 configureMessageBroker()에서 config.setPathMatcher(new AntPathMatcher("."))를 사용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app"); // pub
config.setPathMatcher(new AntPathMatcher(".")); // URL을 / → .으로
config.enableStompBrokerRelay( "/queue", "/topic", "/exchange", "/amq/queue");
// stompBrokerRelayMessageHandler는 외부 메시지 브로커와 통신하기 위한 설정이다.
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//소켓에 연결하기 위한 엔드 포인트를 지정
registry.addEndpoint("/coco").setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// jwt 토큰 검증을 위해 생성한 stompHandler를 인터셉터로 지정해준다.
registration.interceptors(stompHandler);
}
}
RabbitMQ Config
Exchage 유형은 4가지가 있다. (Direct Exchange, Fanout Exchange, Topic Exchange, Headers Exchange)
고객센터의 채팅을 구현했기 때문에 1:1 통신에 적합한 방식인 Direct Exchange를 사용하려고 했으나
routing Key와 큐의 binding key가 정확히 일치하는 경우에만 메시지를 전달하기 때문에
와일드 카드(*, #)를 사용할 수가 없다.
예를 들어 다음과 같은 routing key가 존재한다.
room.1, room.2, room.15, room.101
이 경우 와일드카드가 필요하다.
TopicExchange는 다음 와일드카드를 지원한다.
*한 단어 대체#한 단어 이상 대체
사용자 별로 채팅방 id를 구분하므로 유연하게 처리하기 위해 TopicExchange을 사용했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
/**
* 지정된 큐 이름으로 Queue 빈을 생성
* @return Queue 빈 객체
*/
@Bean
public Queue queue() {
return new Queue(queueName, true);
}
/**
* routing Key와 일치하는 Queue로 메시지를 전송하기 위한 TopicExchange 빈 생성
* @return TopicExchange 빈 객체
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchange);
}
/**
* Exchange와 Queue를 바인딩하기 위한 Binding 빈 생성
* @param queue Queue 빈 객체
* @param exchange TopicExchange 빈 객체
* @return Binding 빈 객체
*/
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
// 라우팅 키를 사용하여 Exchange와 Queue를 바인딩
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey);
}
/**
* RabbitMQ와의 연결을 관리하는 클래스
* @return ConnectionFactory 빈 객체
*/
@Bean
public ConnectionFactory connectionFactory() {
// RabbitMQ와의 연결을 설정
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* RabbitMQ와의 메시지 통신을 담당하는 클래스
* @param connectionFactory ConnectionFactory 빈 객체
* @return RabbitTemplate 빈 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// RabbitMQ와의 메시지 통신을 설정
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
// RabbitMQ 메시지 리스너 컨테이너 설정
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
return container;
}
/**
* Jackson library를 사용해서 메시지를 JSON 형식으로 변환하는 BEAN 생성
* @return MessageConverter 빈 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
durable을 true로 설정하면 서버 재시작 후에도 큐가 유지된다.
Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@RequiredArgsConstructor
@Slf4j
public class ChatMessageProducer {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
public void sendMessage(ChatMessage message, String roomId) {
log.info("message send : {}", message);
rabbitTemplate.convertAndSend(exchange, roomId, message);
}
}
Exchange와 Routing Key를 기반으로 메시지를 발행한다.
Consumer
Redis의 경우 MessageListener를 구현하는 방식으로 메시지를 구독했었다.
RabbitMQ에서는 메시지를 소비하고 처리하기 위해 @RabbitListener를 사용하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class ChatMessageConsumer{
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void onMessage(Message message) { // Queue에서 message를 구독
try {
log.info("Received message: " + new String(message.getBody()));
} catch (Exception e) {
log.error("Error processing message: " + e.getMessage());
}
}
}
Queue에 적재된 메시지를 구독하여 처리한다.
Controller
Redis에서는 메시지를 수신하기 위해 Channel을 구독하고 이를 수신하는 MessageListener를 등록했었다.
하지만 RabbitMQ에서는 메시지를 수신하기 위해 명시적인 구독이 필요하지 않았다.
Producer가 메시지를 보내면 해당 Exchange에 메시지가 전달되고
이를 구독하는 Consumer는 큐로부터 메시지를 받아 처리한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final JwtTokenProvider jwtTokenProvider;
private final RedisService redisService;
private final ChatMessageProducer producer;
@MessageMapping("chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
producer.sendMessage(chatMessage, "room." + chatMessage.getRoomId());
}
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
chatMessage.setType(MessageType.JOIN);
redisService.addRedis(chatMessage);
chatService.countUser("Connect", roomId, chatMessage);
producer.sendMessage(chatMessage, "room." + chatMessage.getRoomId());
}
}
STOMP 메시지를 수신한 후 RabbitMQ로 전달하는 역할을 한다.
Docker 실행
1
2
3
4
5
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-p 61613:61613 \
rabbitmq:management
- 5672: AMQP 포트
- 15672: 관리 UI 포트
- 61613: STOMP 포트
STOMP 플러그인을 활성화해야 한다.
1
2
rabbitmq-plugins enable rabbitmq_web_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples
관리자 페이지 확인
application을 실행하면 아래 로그가 뜬다.
RabbitMQ 서버와 Connection이 정상적으로 동작하는 것을 확인할 수 있다.
http://localhost:15672로 들어가서 로그인을 한다.
Connections, Exchanges, Queues 탭에서 정상 연결 여부를 확인할 수 있다.
Connections
Connections 탭에서 연결이 되었는지 확인한다.
Exchange 확인
adme.exchange를 클릭해서 binding 탭을 보면
application.properties에 설정한 값 그대로 들어가 있는 것을 볼 수 있다.
Listener test
Consumer 쪽으로 메세지가 잘 들어가는지 테스트 해본다.
RabbitMQ management 에서 자체적으로 테스트를 할 수 있다.
Queues > adme.queue(각자 설정한 queueName)의 상세 화면을 보면 Publish message 토글이 있다.
Payload에 아무 글이나 작성하고 Pulbish message를 클릭한다.
메세지가 발행이 되었다고 alert창이 뜬다. 서버 log를 확인해본다.
code test
destination을 /queue/<name>으로 설정하면 메세지는 default exchange로 발행된다.
destination을 /topic/<routing_key>로 설정하면 메세지는 amq.topic 이라는 exchange로 발행된다.
나머지 destination에 대해 /exchange/<exchange_name>/[routing_key] 의 exchange로 메세지가 발행된다.
exchanges 탭을 보면 아래와 같이 보여지는데 adme.exchange는 직접 만든 것이고
나머지는 RabbitMQ에서 기본으로 만들어 준 것이다.
custom exchange
1
stompClient.subscribe('/exchange/adme.exchange/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "room." + chatMessage.getRoomId(), chatMessage);
}
queue(default exchange)
1
stompClient.subscribe('/queue/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend(C"room." + chatMessage.getRoomId(), chatMessage);
}
topic(amq.topic)
1
2
let exchan
stompClient.subscribe('/topic/room.' + roomId, onMessageReceived);
1
2
3
4
5
6
private final static String CHAT_EXCHANGE_NAME = "adme.exchange";
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
rabbitTemplate.convertAndSend("amq.topic", "room." + chatMessage.getRoomId(), chatMessage);
}
*RabbitMQ 관리 페이지와 관련된 글은 Rabbitmq 적용에서 더 볼 수 있다.
ERROR 정리
Connection refused
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information:
Netty에서 Connection refused 오류가 발생하는 경우 STOMP 포트 61613이 매핑되지 않았을 가능성이 높다.
Docker 실행 시 반드시 61613 포트를 매핑해야 한다.
1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 61613:61613 rabbitmq:management
REFERENCE