Home 비동기
Post
Cancel

비동기

비동기 처리

비동기에 대해서 스터디에서 잠깐 주제로 나와서 얘기를 해봤었다.

포인트는 응답과 동시성이었다.

내가 알고있는 비동기는 응답을 기다리지 않고 다음 로직을 실행하는 것이다.

그런데 동시에 일어난다가 동기이고 그렇지 않은 경우가 비동기로 알고있는 팀원도 있었다.
(→ 요청과 그 결과가 동시에 일어난다)

결론은 실행시켰을 때 반환값이 기대되는 경우는 반환값을 받기 전까지 실행하지 않는다는 얘기이기 때문에

같은 말이라고 보면 될 것 같다.


동기 ex) 콜센터
비동기 ex) 이메일





JAVA

JAVA에서 비동기 처리를 위해 Thread를 사용할 수 있다.

Thread를 구현하는 방법에는 두 가지가 존재한다.

하나는 Thread 클래스를 상속하여 구현하는 방법과 다른 하나는 Runnable 인터페이스를 구현하는 방법이 있다.

자바에서는 다중 상속을 하지 못하기 때문에 대부분 확장성을 고려하여 Runnable 인터페이스를 구현하는 편이다.


추가로, Executor는 기능적으로 보면 Thread와 유사하여 Thread의 대체제로 생각할 수 있지만,

정확하게는 Runnable의 작업을 실행시키는 함수를 담은 인터페이스라고 한다.


1
2
3
4
5
6
7
8
9
public void method() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // do something
        }            
    });
    thread.start(); // start()로 Thread를 실행한다.
}

동시에 1000개의 호출이 이뤄진다면 동시에 1000개의 thread가 생성되는 방식이다.

thread를 관리할 수 없기 때문에 위험한 방법이다.



병렬작업 처리량이 많아지면 성능이 저하되는데, 이를 막기 위해서 Thread Pool을 사용해야 한다.

Thread Pool은 Thread 개수를 미리 정해 놓고, 작업 큐에 들어오는 요청을 미리 생성해 놓은 Thread들에게 할당하는 방식이다.

JDK 1.5부터는 java.util.concurrent Package에 ExecutorService 인터페이스와 Executors 클래스를 제공하고 있다.

주요 인터페이스로는, 1) Executor, 2) ExecutorService, 3) ScheduledExecutorService 가 있다.



Executor

동시에 여러 요청을 처리해야 하는 경우에 매번 새로운 쓰레드를 만드는 것은 비효율적이다.

그래서 쓰레드를 미리 만들어두고 재사용하기 위한 쓰레드 풀(Thread Pool)이 등장하게 되었는데,

Executor 인터페이스는 쓰레드 풀의 구현을 위한 인터페이스이다.

1
2
Executor executor = Executors.newSingleThreadExecutor(); // single thread
executor.execute(() -> System.out.println("Thread: " + Thread.currentThread().getName()));

Executor는 쓰레드를 생성하고 처리하는 인터페이스이다.

execute()라는 메서드만을 가지고 있는데, 쓰레드를 처리할 수는 있지만 종료할 수는 없다.

ide에서 실행시켜보면 강제종료하기 전까지 계속 실행되는 것을 확인할 수 있다.



ExecutorService

Thread를 관리하기 위해서는 JDK 1.5부터 제공하는 java.util.concurrent.ExecutorService를 사용하면 된다.

ExecutorService에 Task(작업)를 지정해주면 가진 ThreadPool을 이용하여 Task를 실행한다.

Task는 큐(Queue)로 관리되기 때문에 ThreadPool의 Thread 갯수보다 실행할 Task가 많은경우

미실행된 Task는 큐에 저장되어 실행을 마친 Thread가 생길 때까지 기다린다.

image

ExecutorService는 Executor를 상속 받은 인터페이스이다. Thread를 생성하고 처리하고 종료하는 등의 작업을 할 수 있다.

ExecutorService는 Executor를 상속 받았기 때문에 execute()submit() 모두 호출이 가능하다.

execute()는 Runnable 인터페이스만 인자로 받을 수 있지만, submit()은 Runnable과 Callable 인터페이스 모두 인자로 받을 수 있다.


image

execute()는 return값이 없는 Runnable 객체를 작업 큐에 저장한다.

작업 처리 도중에 예외가 발생하면 Thread가 종료되고 해당 Thread를 Thread Pool에서 제거한뒤

다른 작업 처리를 위해서 새로운 Thread를 생성한다.


submit()는 작업 처리 결과를 받을 수 있도록 Future를 리턴한다.
(비동기적 연산의 처리 결과를 표현하기 위해 사용)

submit()는 작업 처리 도중에 예외가 발생하더라도 Thread는 종료되지 않고 다음 작업을 위해 재사용된다.

그러므로 Thread 생성 오버헤드를 줄이기 위해 submit()을 사용하는 것이 좋다.

*오버헤드
어떤 처리를 하기 위해 들어가는 간접적인 처리 시간 · 메모리 등을 말한다.
예를 들어 A라는 처리를 단순하게 실행한다면 10초 걸리는데,
안전성을 고려하고 부가적인 B라는 처리를 추가한 결과 처리시간이 15초 걸렸다면, 오버헤드는 5초가 된다.


submit()을 사용하면 작업을 ExecutorService가 만든 쓰레드풀에서 처리하고, shutdown()으로 쓰레드 풀을 종료할 수 있다.

shutdown()을 사용하면 작업이 다 종료될 때까지 기다리고 종료하지만,

작업이 끝나든 말든 지금 당장 종료하고 싶다면 shutdownNow()를 사용하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    public static void main(String args[]) throws InterruptedException {
       // 4개의 고정된 쓰레드풀을 갖고 있는 ExecutorService를 생성하는 코드
        ExecutorService executor = Executors.newFixedThreadPool(4);

        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });

        // 더이상 ExecutorService에 Task를 추가할 수 없다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킨다.
        executor.shutdown();

        // shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있다.
        // Timeout을 20초 설정하고 완료되기를 기다린다.
        // 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴한다.
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");

            // 모든 Task를 강제 종료
            executor.shutdownNow();
        }
    }

ExecutorService를 만들어 작업을 실행하면, shutdown이 호출되기 전까지 계속해서 다음 작업을 대기하게 된다.

그러므로 작업이 완료되었다면 반드시 shutdown을 명시적으로 호출해주어야 한다.


위에서 얘기한 문제를 해결하기 위해 Thread Pool을 적용했지만 Thread Pool도 단점이 있다.

4개의 Thread를 만들어 놓았지만 실제로 1개의 요청만 들어온다면

나머지 3개의 Thread는 메모리만 차지하게 되고 메모리 낭비가 발생한다.


또한 작업 완료 소요 시간이 다를 경우 유휴 시간이 발생하게 된다.

예를 들어 A, B, C, D Thread가 있을 경우 A, B는 이미 처리가 끝났는데

C, D는 완료되지 않았을 경우 A, B Thread는 C, D의 작업이 끝날 때까지 가다리게게 된다.

이를 방지 하기 위해 JDK 1.7부터는 forJoinPool를 제공하며,

해당 블로그에서 자세한 설명을 볼 수 있다. 쓰레드풀 과 ForkJoinPool



💬
자바에서는 ExecutorService를 통해서 비동기를 처리할 수 있다.

요청마다 Thread를 찍어내는 방법도 있지만 매 요청 마다 Thread가 생성되면 Thread 관리가 되지 않아서 위험하다.

ExecutorService를 사용하면 원하는 크기만큼의 Thread Pool을 생성하고 풀에서 Thread를 꺼내서 사용하고 다시 반납하는 방식으로 처리한다.

application에서 비동기 메서드가 많이 필요한 경우 method를 비동기에 맞게 수정해야하는 번거로움이 있다.


스프링에서는 개발자들의 번거러움을 해결해주기 위해서 @EnablyAsync, @Async 어노테이션을 제공해준다.

@EnableAsync, @Async 두개로 비동기 메서드를 구현할 수 있다.

@EnableAsync를 사용하면 SimpleAsyncTaskExecutor를 사용하도록 설정되어 있다.

SimpleAsyncTaskExecutor는 매번 Thread로 생성하는 방식이기 때문에 설정을 오버라이딩해서 사용하는게 좋다.





Spring

기본적으로 Spring Event 는 동기적이다. 하지만 @Async를 통해 비동기로 동작할 수 있다.


@Async (SimpleAsyncTaskExecutor)

Spring의 @Async를 사용하면 간단하게 처리가 가능하다.

@EnableAsync를 Application 클래스에 붙이고, 비동기를 사용하려는 메소드위에 @Async를 붙이면 적용된다.


1. @EnableAsync@Async를 쓰겠다고 스프링에게 알린다.

2. 비동기로 수행되었으면 하는 메서드위에 @Async를 적용한다.

1
2
3
4
5
6
7
@SpringBootApplication
@EnableAsync
public class AdmeApplication {
    public static void main(String[] args) {
        SpringApplication.run(AdmeApplication.class, args);
    }
}
1
2
3
4
5
public class HelloService{
    @Async
    public void helloWorld() throws Exception{
    }
}

하지만 해당방법은 default값으로 적용되는데,

SimpleAsyncTaskExecutor를 사용하게되고 Thread Pool에 의한게 아닌 Thread를 만들어내는 역할만 한다.

Thread를 제대로 관리해주지 못한다.

@Async의 기본설정은 SimpleAsyncTaskExecutor를 사용하도록 되어있기 때문이다.



주의 사항 private 메서드에는 적용이 안된다. public만 된다.

self-invocation(자가 호출)해서는 안된다. → 같은 클래스 내부의 메서드를 호출하는 것은 안된다.





@Async (ThreadPoolTaskExecutor)

스터디에서 다른 팀원들은 설정 없이 @Async만 적용했다.(위 방법)

따로 설정없이 @Async만 하면 필요할때만 꺼내주기 때문에 설정안하는게 효과적이기 때문에 적용했다고 했다.

하지만 나는 채팅으로 실시간성이 중요하기 때문에 미리 해두는게 더 효율적이라고 느껴서 직접 설정했다.



직접 설정하는 방법은 Thread Pool을 이용해서 thread를 관리가능한 방식이다.

위에서 적용했던 Applcation에 @EnableAsync를 제거 해준 뒤, AsyncConfig 생성한다.
(SpringAsyncConfig 클래스에 설정해뒀기 때문에 중복 된다.)

Application 클래스에 @EnableAutoConfiguration(혹은 @SpringBootApplication) 설정이 되어있다면

런타임시 @Configuration가 설정된 SpringAsyncConfig 클래스의 threadPoolTaskExecutor bean 정보를 읽어들인다.





ThreadPoolTaskExecutor

Thread Pool을 적용해 일정 수의 사용자 동시에 처리가 가능하도록 한다.

Thread Pool이란 Thread를 미리 만들어놓은 집단? 이라고 보면된다.

Thread Pool을 사용하는 Executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration // Bean 등록
@EnableAsync // Async 설정
public class AsyncConfiguration {
    @Bean(name="executor")
    public Executor asyncThreadPool() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(3); // 생성할 개수(thread pool에 항상 존재하는 최소 개수)
        taskExecutor.setMaxPoolSize(10); // 동시 동작하는 최대 Thread의 수
        taskExecutor.setQueueCapacity(15); // 큐의 사이즈
        taskExecutor.setThreadNamePrefix("Async-Executor-");
        taskExecutor.setDaemon(true);
        taskExecutor.initialize();

        return taskExecutor;
    }
}
  • CorePoolSize : 최초 동작 시에 corePoolSize만큼 Thread가 생성하여 사용된다.(Default 1)

  • MaxPoolSize : Queue 사이즈 이상의 요청이 들어오게 될 경우, Thread의 개수를 MaxPoolSize만큼 늘린다.
    (Default : Integer.MAX_VAULE)

  • QueueCapacity : CorePoolSize 이상의 요청이 들어올 경우, LinkedBlockingQueue에서 대기하게 되는데
    그 Queue의 사이즈를 지정해주는 것이다.(Default : Integer.MAX_VAULE)

  • SetThreadNamePrefix : Thread명 설정



최초 3개의 Thread에서 처리하다가 처리속도가 밀릴경우 15개 사이즈 queue에서 대기하고

그보다 많은 요청이 발생할 경우 최대 10개 Thread까지 생성해서 처리하게 된다.


현재 점유하고 있는 Thread의 개수가 corePoolSize만큼 있을 때 요청이 오면

queueCapacity의 개수만큼 있을 때 요청이 오면 maxPoolSize만큼 Thread Pool을 생성한다.

core 사이즈만큼의 Thread에서 task를 처리할 수 없을 경우 queue에서 대기하게 된다.

queue가 꽉 차게 되면 그때 max 사이즈만큼 Thread를 생성해서 처리하게 된다.

만약 현재 점유하고 있는 Thread의 개수가 maxPoolSize만큼 있고

큐에 담긴 요청이 queueCapactiry의 개수만큼 있을 때 요청이 오면 RejectedExecutionException


ex) Request 500, ThreadPool 300, Queue 100일 때,

  • Request 500개 요청
  • ThreadPool 300개 먼저 사용
  • Queue 100개 사용

나머지 100개의 요청에 대해서 어떻게 처리할 지 RejectedExcutionHandler에서 지정된 정책으로 처리

처리되지 못한 100개의 요청에 대해서 org.springframework.core.task.TaskRejectedException 발생되며 요청을 무시



클래스에 설정한 @Async annotation에 bean의 이름을 제공하면

SimpleAsyncTaskExecutor가 아닌 설정한 TaskExecutor로 thread를 관리하게 된다.

1
2
3
@Async("executor") // 비동기
public void publish(String sender, String roomId) {
}





RejectedExecutionHandler

MaxPoolSize thread까지 생성하고 queue까지 꽉 찬 상태에서 추가 요청이 오면

RejectedExecutionException 예외가 발생한다.


1
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

예외와 누락 없이 최대한 처리하려면 CallerRunsPolicy로 설정하는 것이 좋다고 한다.





Shutdown

별도로 정의한 Thread Pool에서 열심히 작업이 이루어지고 있을 때 application 종료를 요청하면

아직 처리되지 못한 task는 유실되게 된다.

유실 없이 마지막까지 다 처리하고 종료되길 원한다면 설정을 추가해야한다.

1
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);





Timeout

만약 모든 작업이 처리되길 기다리기 힘든 경우라면 최대 종료 대기 시간을 설정할 수 있다.

1
taskExecutor.setAwaitTerminationSeconds(60);   // shutdown 최대 60초 대기




비동기 처리를 위해서 ThreadPoolTaskExecutor 를 사용했는데 주 설정인

setCorePoolSize, setMaxPoolSize, setQueueCapacity 이 3가지만 알았고

나머지는 알지를 못했는데 비동기에 대해 공부를 하면서 이 부분을 체크하지 못하고 넘어갔다가

내 예상과 다른 결과가 나올 때 해당 원인을 모르고 넘어갈 수 있다는 생각이 들었다.





reference
ThreadPoolTaskExecutor 이용하여 성능 개선하기
ThreadPoolTaskExecutor Queue가 full의 처리 정책
Spring ThreadPoolTaskExecutor 설정
Spring @Async 비동기처리
[Spring] @Async 비동기 멀티스레드 사용법
스프링 비동기 (Asynchronous)
[JAVA] 비동기 처리 방법 - Thread
Asynchronous Request?
동기, 비동기 / 직렬, 동시
[Java] Callable, Future 및 Executors, Executor, ExecutorService, ScheduledExecutorService에 대한 이해 및 사용법
Java - ExecutorService를 사용하는 방법
ExecutorService 사용법
스레드풀(ThreadPool)에서 execute()와 submit()의 차이
[운영체제] 면접대비 Overview
Difference between ExecutorService execute() and submit() method in Java
How does @Async work? @Async를 지금까지 잘 못 쓰고 있었습니다(@Async 사용할 때 주의해야 할 것, 사용법)

This post is licensed under CC BY 4.0 by the author.