Rabbitmq 적용
RabbitMQ 적용
이 전에 어떻게 적용을 해야하는지 파악했으니 이제 본 코드에 적용해보기로 했다.
적용할 부분
- 1:1 채팅
현재 나는 1:n 채팅은 존재하지 않는다.
따라서 “topic”은 지운다.
topic과 queue에 대한 stackoverflow 글
- MessageType에 따른 Queue 설정
Connect, Send, Disconnect에 대한 별도의 Queue를 설정하여 처리를 분산시킨다.
사용자마다 Queue를 생성하는 것보다 효율적으로 처리할 수 있을 것 같았다.
Code 수정
application.properties
1
2
3
4
5
6
7
8
9
rabbitmq.connect.queue = connect.queue
rabbitmq.send.queue = send.queue
rabbitmq.disconnect.queue = disconnect.queue
rabbitmq.connect.exchange = connect.exchange
rabbitmq.send.exchange = send.exchange
rabbitmq.disconnect.exchange = disconnect.exchange
rabbitmq.routing.key = room.*
Stomp Config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.setPathMatcher(new AntPathMatcher("."));
config.enableStompBrokerRelay("/exchange"); // custom exchange 사용
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/coco").setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
custom exchange만을 사용하여 message routing을 처리하기 때문에 “/exchange”만 추가했다.
RabbitMQConfig
MessageType(connect, send, disconnect)에 따라 exchange, queue 분리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${rabbitmq.connect.queue}")
private String connectQueue;
@Value("${rabbitmq.send.queue}")
private String sendQueue;
@Value("${rabbitmq.disconnect.queue}")
private String disconnectQueue;
@Value("${rabbitmq.connect.exchange}")
private String connectExchange;
@Value("${rabbitmq.send.exchange}")
private String sendExchange;
@Value("${rabbitmq.disconnect.exchange}")
private String disconnectExchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
@Bean
public Queue connectQueue() {
return new Queue(connectQueue, true);
}
@Bean
public Queue sendQueue() {
return new Queue(sendQueue, true);
}
@Bean
public Queue disconnectQueue() {
return new Queue(disconnectQueue, true);
}
@Bean
public TopicExchange connectExchange() {
return new TopicExchange(connectExchange);
}
@Bean
public TopicExchange sendExchange() {
return new TopicExchange(sendExchange);
}
@Bean
public TopicExchange disconnectExchange() {
return new TopicExchange(disconnectExchange);
}
@Bean
public Binding connectBinding() {
return BindingBuilder
.bind(connectQueue())
.to(connectExchange())
.with(routingKey);
}
@Bean
public Binding sendBinding() {
return BindingBuilder
.bind(sendQueue())
.to(sendExchange())
.with(routingKey);
}
@Bean
public Binding disconnectBinding() {
return BindingBuilder
.bind(disconnectQueue())
.to(disconnectExchange())
.with(routingKey);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setRoutingKey(routingKey);
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
connectExchange, sendExchange, disconnectExchange: 각각 연결, 전송, 연결 끊기와 관련된 Exchange를 정의했다.
connectQueue, sendQueue, disconnectQueue: 위와 동일하게 연결, 전송, 연결 끊기와 관련된 Queue를 정의했다.
그리고 Exchange와 Queue를 연결하는 Binding을 설정했다:
각각의 RabbitTemplate과 container 빈에 Exchange와 Queue 이름을 설정하지 않은 이유는
Exchange와 Queue가 각각 1개씩 있는 것이 아니라 여러 개 있기 때문이다.
RabbitTemplate과 container는 모든 Exchange와 Queue에 대해
메시지 전송 및 수신을 담당하는 역할을 하기 때문에 공통적인 설정만을 추가했다.
Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final JwtTokenProvider jwtTokenProvider;
private final RedisService redisService;
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.connect.exchange}")
private String connectExchange;
@Value("${rabbitmq.send.exchange}")
private String sendExchange;
@MessageMapping("chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
String roomId = chatMessage.getRoomId();
rabbitTemplate.convertAndSend(sendExchange, "room." + roomId, chatMessage);
}
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
String token = headerAccessor.getFirstNativeHeader("Authorization");
User user = jwtTokenProvider.getUserFromToken(token);
String roomId = chatMessage.getRoomId();
chatMessage.setSender(user.getNickname());
chatMessage.setType(MessageType.JOIN);
chatMessage.setAuth(user.getRole().name());
rabbitTemplate.convertAndSend(connectExchange, "room." + roomId, chatMessage);
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = (String) headerAccessor.getHeader("simpSessionId");
String token = redisService.getSession(sessionId);
User user = jwtProvider.getUserFromToken(token);
String nickname = user.getNickname();
String roomId = redisService.getRoomId(nickname);
ChatMessage chatMessage = new ChatMessage();
chatMessage.setType(MessageType.LEAVE);
chatMessage.setSender(nickname);
chatMessage.setRoomId(roomId);
rabbitTemplate.convertAndSend(disconnectExchange, "room." + roomId, chatMessage);
}
}
client
1
2
3
stompClient.subscribe('/exchange/' + connectEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + sendEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + disconEx +'/room.' + roomId, message);
전체적인 흐름을 그림으로 보면 아래와 같다.
RabbitMQ 페이지
설정한 connect.exchange, send.exchange, disconnect.exchange가 포함되어있는걸 볼 수 있다.
Exchange
이 중에 connect.exchange를 클릭해보면 아래와 같이 띄워진다.
Publish (in): 해당 exchange로 메시지가 도착하는 속도
Publish (out): 해당 exchange에서 메시지가 발송되는 속도
connect.exchange에 바인딩된 queue는 connect.queue다.
Queue
각 exchange에 바인딩 된 queue는 아래와 같다.
connect Queue
채팅방에 입장을 하면 서버에서 message를 보낸다.
해당 message를 Get Message 버튼을 클릭하면 볼 수 있다.
send Queue
채팅방에서 메세지를 보내고 관리자 페이지를 확인했다.
| Total | Ready | Unacked | In memory | Persistent | Transient, Paged Out | |
|---|---|---|---|---|---|---|
| Messages | 1 | 1 | 0 | 1 | 1 | 0 |
| Message body bytes | 139 B | 139 B | 0 B | 139 B | 139 B | 0 B |
| Process memory | 13 KiB |
Total Messages: 현재 시스템에 총 메시지 수
Ready Messages: 대기열에 있는 메시지 수. consumer에게 전달 될 message
- Unacked Messages: 소비자가 메시지를 처리하려고 했지만 처리가 완료되지 않은 상태
- 소비자가 처리를 완료하고 메시지를 확인(acknowledge)하면
해당 메시지는 “Unacked” 상태에서 제거되고, “Ready” 상태로 전환된다.
ex) 서버 부하가 증가하거나 사용자가 많아져서 큐에 메시지가 쌓이는 경우 발생 할 수 있다.
In Memory Messages: 메시지 큐에 현재 메모리에 저장된 메시지 수
- Persistent Messages: 디스크에 저장된 메시지 수
- 메모리에 저장되는 대신 디스크에 지속적으로 저장되는 메시지는 지속적(persistent) 메시지다.
*durable queue를 사용하는 경우 메시지가 디스크에 저장된다.
Transient Messages, Paged Out: 메시지 큐에서 메모리에서 페이지 아웃(메시지가 메모리에서 디스크로 이동)된 메시지 수
Message Body Bytes: 메시지들의 총 바이트 크기
Process Memory: RabbitMQ 프로세스가 사용하는 메모리 양
메시지를 한 번 처리할 때 사용 가능한 메모리 공간의 한계
여기서 한 번에 100개의 메시지를 보내면 어떻게 될까?
메시지를 보내는 과정중에 Process memory가 일시적으로 증가한다.
메시지를 더 이상 보내지 않았더니 메모리 양이 기존으로 다시 돌아왔다.
이를 통해 메시지의 전달과 소비에 따라 메모리의 사용량이 동적으로 변동하면서
메모리가 적절하게 관리되어 시스템의 안정성을 유지하고 성능을 최적화하는 것을 파악할 수 있다.









