Post

Rabbitmq 적용

Rabbitmq 적용

RabbitMQ 적용

이 전에 어떻게 적용을 해야하는지 파악했으니 이제 본 코드에 적용해보기로 했다.



적용할 부분


  • MessageType에 따른 Queue 설정
    Connect, Send, Disconnect에 대한 별도의 Queue를 설정하여 처리를 분산시킨다.
    사용자마다 Queue를 생성하는 것보다 효율적으로 처리할 수 있을 것 같았다.




Code 수정

application.properties

1
2
3
4
5
6
7
8
9
rabbitmq.connect.queue = connect.queue
rabbitmq.send.queue = send.queue
rabbitmq.disconnect.queue = disconnect.queue

rabbitmq.connect.exchange = connect.exchange
rabbitmq.send.exchange = send.exchange
rabbitmq.disconnect.exchange = disconnect.exchange

rabbitmq.routing.key = room.*




Stomp Config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private final StompHandler stompHandler;
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setPathMatcher(new AntPathMatcher("."));
        config.enableStompBrokerRelay("/exchange"); // custom exchange 사용 
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/coco").setAllowedOriginPatterns("*").withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }
}

custom exchange만을 사용하여 message routing을 처리하기 때문에 “/exchange”만 추가했다.




RabbitMQConfig

MessageType(connect, send, disconnect)에 따라 exchange, queue 분리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.connect.queue}")
    private String connectQueue;

    @Value("${rabbitmq.send.queue}")
    private String sendQueue;

    @Value("${rabbitmq.disconnect.queue}")
    private String disconnectQueue;

    @Value("${rabbitmq.connect.exchange}")
    private String connectExchange;

    @Value("${rabbitmq.send.exchange}")
    private String sendExchange;

    @Value("${rabbitmq.disconnect.exchange}")
    private String disconnectExchange;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    @Bean
    public Queue connectQueue() {
        return new Queue(connectQueue, true);
    }

    @Bean
    public Queue sendQueue() {
        return new Queue(sendQueue, true);
    }

    @Bean
    public Queue disconnectQueue() {
        return new Queue(disconnectQueue, true);
    }

    @Bean
    public TopicExchange connectExchange() {
        return new TopicExchange(connectExchange);
    }

    @Bean
    public TopicExchange sendExchange() {
        return new TopicExchange(sendExchange);
    }

    @Bean
    public TopicExchange disconnectExchange() {
        return new TopicExchange(disconnectExchange);
    }

    @Bean
    public Binding connectBinding() {
        return BindingBuilder
                .bind(connectQueue())
                .to(connectExchange())
                .with(routingKey);
    }

    @Bean
    public Binding sendBinding() {
        return BindingBuilder
                .bind(sendQueue())
                .to(sendExchange())
                .with(routingKey);
    }

    @Bean
    public Binding disconnectBinding() {
        return BindingBuilder
                .bind(disconnectQueue())
                .to(disconnectExchange())
                .with(routingKey);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setRoutingKey(routingKey);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

connectExchange, sendExchange, disconnectExchange: 각각 연결, 전송, 연결 끊기와 관련된 Exchange를 정의했다.

connectQueue, sendQueue, disconnectQueue: 위와 동일하게 연결, 전송, 연결 끊기와 관련된 Queue를 정의했다.

그리고 Exchange와 Queue를 연결하는 Binding을 설정했다:


각각의 RabbitTemplate과 container 빈에 Exchange와 Queue 이름을 설정하지 않은 이유는

Exchange와 Queue가 각각 1개씩 있는 것이 아니라 여러 개 있기 때문이다.

RabbitTemplate과 container는 모든 Exchange와 Queue에 대해

메시지 전송 및 수신을 담당하는 역할을 하기 때문에 공통적인 설정만을 추가했다.




Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {
    private final ChatServiceImpl chatService;
    private final JwtTokenProvider jwtTokenProvider;
    private final RedisService redisService;
    private final RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.connect.exchange}")
    private String connectExchange;

    @Value("${rabbitmq.send.exchange}")
    private String sendExchange;

    @MessageMapping("chat.sendMessage")
    public void sendMessage(@Payload ChatMessage chatMessage) {
        String roomId = chatMessage.getRoomId();
        rabbitTemplate.convertAndSend(sendExchange, "room." + roomId, chatMessage);
    }

    @MessageMapping("chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
        String token = headerAccessor.getFirstNativeHeader("Authorization");
        User user = jwtTokenProvider.getUserFromToken(token);
        String roomId = chatMessage.getRoomId();

        chatMessage.setSender(user.getNickname());
        chatMessage.setType(MessageType.JOIN);
        chatMessage.setAuth(user.getRole().name());

        rabbitTemplate.convertAndSend(connectExchange, "room." + roomId, chatMessage);
    }

    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = (String) headerAccessor.getHeader("simpSessionId");
        String token = redisService.getSession(sessionId);
        User user = jwtProvider.getUserFromToken(token);
        String nickname = user.getNickname();
        String roomId = redisService.getRoomId(nickname);

        ChatMessage chatMessage = new ChatMessage();
        chatMessage.setType(MessageType.LEAVE);
        chatMessage.setSender(nickname);
        chatMessage.setRoomId(roomId);

        rabbitTemplate.convertAndSend(disconnectExchange, "room." + roomId, chatMessage);
    }
}




client

1
2
3
stompClient.subscribe('/exchange/' + connectEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + sendEx +'/room.' + roomId, message);
stompClient.subscribe('/exchange/' + disconEx +'/room.' + roomId, message);



전체적인 흐름을 그림으로 보면 아래와 같다.





RabbitMQ 페이지

설정한 connect.exchange, send.exchange, disconnect.exchange가 포함되어있는걸 볼 수 있다.

Exchange



이 중에 connect.exchange를 클릭해보면 아래와 같이 띄워진다.

Publish (in): 해당 exchange로 메시지가 도착하는 속도

Publish (out): 해당 exchange에서 메시지가 발송되는 속도



connect.exchange에 바인딩된 queue는 connect.queue다.





Queue

각 exchange에 바인딩 된 queue는 아래와 같다.




connect Queue

채팅방에 입장을 하면 서버에서 message를 보낸다.



해당 message를 Get Message 버튼을 클릭하면 볼 수 있다.




send Queue

채팅방에서 메세지를 보내고 관리자 페이지를 확인했다.



 TotalReadyUnackedIn memoryPersistentTransient, Paged Out
Messages110110
Message body bytes139 B139 B0 B139 B139 B0 B
Process memory13 KiB     
  • Total Messages: 현재 시스템에 총 메시지 수

  • Ready Messages: 대기열에 있는 메시지 수. consumer에게 전달 될 message

  • Unacked Messages: 소비자가 메시지를 처리하려고 했지만 처리가 완료되지 않은 상태
  • 소비자가 처리를 완료하고 메시지를 확인(acknowledge)하면

해당 메시지는 “Unacked” 상태에서 제거되고, “Ready” 상태로 전환된다.

ex) 서버 부하가 증가하거나 사용자가 많아져서 큐에 메시지가 쌓이는 경우 발생 할 수 있다.

  • In Memory Messages: 메시지 큐에 현재 메모리에 저장된 메시지 수

  • Persistent Messages: 디스크에 저장된 메시지 수
  • 메모리에 저장되는 대신 디스크에 지속적으로 저장되는 메시지는 지속적(persistent) 메시지다.

*durable queue를 사용하는 경우 메시지가 디스크에 저장된다.

  • Transient Messages, Paged Out: 메시지 큐에서 메모리에서 페이지 아웃(메시지가 메모리에서 디스크로 이동)된 메시지 수

  • Message Body Bytes: 메시지들의 총 바이트 크기

  • Process Memory: RabbitMQ 프로세스가 사용하는 메모리 양

    메시지를 한 번 처리할 때 사용 가능한 메모리 공간의 한계




여기서 한 번에 100개의 메시지를 보내면 어떻게 될까?

메시지를 보내는 과정중에 Process memory가 일시적으로 증가한다.




메시지를 더 이상 보내지 않았더니 메모리 양이 기존으로 다시 돌아왔다.



이를 통해 메시지의 전달과 소비에 따라 메모리의 사용량이 동적으로 변동하면서

메모리가 적절하게 관리되어 시스템의 안정성을 유지하고 성능을 최적화하는 것을 파악할 수 있다.

This post is licensed under CC BY 4.0 by the author.