RabbitMQ 적용
이 전에 어떻게 적용을 해야하는지 파악했으니 이제 본 코드에 적용해보기로 했다.
적용할 부분
- 1:1 채팅
현재 나는 1:n 채팅은 존재하지 않는다.
따라서 “topic”은 지운다.
topic과 queue에 대한 stackoverflow 글
- MessageType에 따른 Queue 설정
Connect, Send, Disconnect에 대한 별도의 Queue를 설정하여 처리를 분산시킨다.
사용자마다 Queue를 생성하는 것보다 효율적으로 처리할 수 있을 것 같았다.
Code 수정
application.properties
1
2
3
4
5
6
7
8
9
rabbitmq.connect.queue = connect.queue
rabbitmq.send.queue = send.queue
rabbitmq.disconnect.queue = disconnect.queue
rabbitmq.connect.exchange = connect.exchange
rabbitmq.send.exchange = send.exchange
rabbitmq.disconnect.exchange = disconnect.exchange
rabbitmq.routing.key = room.*
Stomp Config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.setPathMatcher(new AntPathMatcher("."));
config.enableStompBrokerRelay("/exchange"); // custom exchange 사용
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/coco").setAllowedOriginPatterns("*").withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
custom exchange만을 사용하여 message routing을 처리하기 때문에 “/exchange”만 추가했다.
RabbitMQConfig
MessageType(connect, send, disconnect)에 따라 exchange, queue 분리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${rabbitmq.connect.queue}")
private String connectQueue;
@Value("${rabbitmq.send.queue}")
private String sendQueue;
@Value("${rabbitmq.disconnect.queue}")
private String disconnectQueue;
@Value("${rabbitmq.connect.exchange}")
private String connectExchange;
@Value("${rabbitmq.send.exchange}")
private String sendExchange;
@Value("${rabbitmq.disconnect.exchange}")
private String disconnectExchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
@Bean
public Queue connectQueue() {
return new Queue(connectQueue, true);
}
@Bean
public Queue sendQueue() {
return new Queue(sendQueue, true);
}
@Bean
public Queue disconnectQueue() {
return new Queue(disconnectQueue, true);
}
@Bean
public TopicExchange connectExchange() {
return new TopicExchange(connectExchange);
}
@Bean
public TopicExchange sendExchange() {
return new TopicExchange(sendExchange);
}
@Bean
public TopicExchange disconnectExchange() {
return new TopicExchange(disconnectExchange);
}
@Bean
public Binding connectBinding() {
return BindingBuilder
.bind(connectQueue())
.to(connectExchange())
.with(routingKey);
}
@Bean
public Binding sendBinding() {
return BindingBuilder
.bind(sendQueue())
.to(sendExchange())
.with(routingKey);
}
@Bean
public Binding disconnectBinding() {
return BindingBuilder
.bind(disconnectQueue())
.to(disconnectExchange())
.with(routingKey);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
rabbitTemplate.setRoutingKey(routingKey);
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
connectExchange, sendExchange, disconnectExchange: 각각 연결, 전송, 연결 끊기와 관련된 Exchange를 정의했다.
connectQueue, sendQueue, disconnectQueue: 위와 동일하게 연결, 전송, 연결 끊기와 관련된 Queue를 정의했다.
그리고 Exchange와 Queue를 연결하는 Binding을 설정했다:
각각의 RabbitTemplate과 container 빈에 Exchange와 Queue 이름을 설정하지 않은 이유는
Exchange와 Queue가 각각 1개씩 있는 것이 아니라 여러 개 있기 때문이다.
RabbitTemplate과 container는 모든 Exchange와 Queue에 대해
메시지 전송 및 수신을 담당하는 역할을 하기 때문에 공통적인 설정만을 추가했다.
Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final JwtTokenProvider jwtTokenProvider;
private final RedisService redisService;
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.connect.exchange}")
private String connectExchange;
@Value("${rabbitmq.send.exchange}")
private String sendExchange;
@MessageMapping("chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
String roomId = chatMessage.getRoomId();
rabbitTemplate.convertAndSend(sendExchange, "room." + roomId, chatMessage);
}
@MessageMapping("chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
String token = headerAccessor.getFirstNativeHeader("Authorization");
User user = jwtTokenProvider.getUserFromToken(token);
String roomId = chatMessage.getRoomId();
chatMessage.setSender(user.getNickname());
chatMessage.setType(MessageType.JOIN);
chatMessage.setAuth(user.getRole().name());
rabbitTemplate.convertAndSend(connectExchange, "room." + roomId, chatMessage);
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = (String) headerAccessor.getHeader("simpSessionId");
String token = redisService.getSession(sessionId);
User user = jwtProvider.getUserFromToken(token);
String nickname = user.getNickname();
String roomId = redisService.getRoomId(nickname);
ChatMessage chatMessage = new ChatMessage();
chatMessage.setType(MessageType.LEAVE);
chatMessage.setSender(nickname);
chatMessage.setRoomId(roomId);
rabbitTemplate.convertAndSend(disconnectExchange, "room." + roomId, chatMessage);
}
}
client
1
2
3
stompClient.subscribe('/exchange/' + connectEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + sendEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + disconEx +'/room.' + roomId, message);
전체적인 흐름을 그림으로 보면 아래와 같다.
RabbitMQ 페이지
설정한 connect.exchange
, send.exchange
, disconnect.exchange
가 포함되어있는걸 볼 수 있다.
Exchange
이 중에 connect.exchange
를 클릭해보면 아래와 같이 띄워진다.
Publish (in): 해당 exchange로 메시지가 도착하는 속도
Publish (out): 해당 exchange에서 메시지가 발송되는 속도
connect.exchange
에 바인딩된 queue는 connect.queue
다.
Queue
각 exchange에 바인딩 된 queue는 아래와 같다.
connect Queue
채팅방에 입장을 하면 서버에서 message를 보낸다.
해당 message를 Get Message 버튼을 클릭하면 볼 수 있다.
send Queue
채팅방에서 메세지를 보내고 관리자 페이지를 확인했다.
Total | Ready | Unacked | In memory | Persistent | Transient, Paged Out | |
---|---|---|---|---|---|---|
Messages | 1 | 1 | 0 | 1 | 1 | 0 |
Message body bytes | 139 B | 139 B | 0 B | 139 B | 139 B | 0 B |
Process memory | 13 KiB |
Total Messages: 현재 시스템에 총 메시지 수
Ready Messages: 대기열에 있는 메시지 수. consumer에게 전달 될 message
Unacked Messages: 소비자가 메시지를 처리하려고 했지만 처리가 완료되지 않은 상태
소비자가 처리를 완료하고 메시지를 확인(acknowledge)하면
해당 메시지는 “Unacked” 상태에서 제거되고, “Ready” 상태로 전환된다.
ex) 서버 부하가 증가하거나 사용자가 많아져서 큐에 메시지가 쌓이는 경우 발생 할 수 있다.In Memory Messages: 메시지 큐에 현재 메모리에 저장된 메시지 수
Persistent Messages: 디스크에 저장된 메시지 수
메모리에 저장되는 대신 디스크에 지속적으로 저장되는 메시지는 지속적(persistent) 메시지다.
*durable queue를 사용하는 경우 메시지가 디스크에 저장된다.Transient Messages, Paged Out: 메시지 큐에서 메모리에서 페이지 아웃(메시지가 메모리에서 디스크로 이동)된 메시지 수
Message Body Bytes: 메시지들의 총 바이트 크기
Process Memory: RabbitMQ 프로세스가 사용하는 메모리 양
메시지를 한 번 처리할 때 사용 가능한 메모리 공간의 한계
여기서 한 번에 100개의 메시지를 보내면 어떻게 될까?
메시지를 보내는 과정중에 Process memory가 일시적으로 증가한다.
메시지를 더 이상 보내지 않았더니 메모리 양이 기존으로 다시 돌아왔다.
이를 통해 메시지의 전달과 소비에 따라 메모리의 사용량이 동적으로 변동하면서
메모리가 적절하게 관리되어 시스템의 안정성을 유지하고 성능을 최적화하는 것을 파악할 수 있다.