Mono.delay의 동작 방식 본문
배경
우리 회사는 IoT 기기를 다루고 있다. 보다 더 넓은 서비스를 제공하기 위해, 회사의 제품을 더 큰 국내 IoT 플랫폼에 연동이 가능하게 만드는 게 내 일이다. 외부 플랫폼에서 우리 회사 기기를 제어, 조회할 수 있는 Api를 개발하고, 반대로 외부 플랫폼으로 기기 이벤트를 전달해야 한다.
이번에 신규 기기를 연동시키면서 재밌는 요구 사항이 있었다. 특정 이벤트가 들어오면 외부 플랫폼 측으로 A를 전달하고, 1분 후에 B를 전달해야 했다. 이벤트 파이프라인은 바쁘기에 1분을 블록킹 대기할 수 없다. 옆자리 형이 Mono.delay를 사용하여 논 블록킹으로 1분 후 다음 요청을 전달하는 로직을 구현하였고, 이를 리뷰하다가 Mono.delay의 동작 방식을 파보게 되었다. 이를 정리해보려고 한다.
I/O Multiplexing을 사용한 논 블록킹
대표적인 리액티브 라이브러리로 WebClient, Reactive MongoDB가 생각난다. WebClient는 Netty, Reactive MongoDB는 MongoDB 드라이버 위에서 동작하고, 이들은 커널의 I/O 멀티플렉싱 커멘드를 사용하여 논 블록킹을 구현한다. Api 요청, DB 쿼리 등 응답에 대기가 필요한 작업을 전달하고, 처리 완료 이벤트를 미리 만들어둔 이벤트 루프 스레드를 사용하여 처리한다.
플랫폼 비종속인 Java이기에, 코드 단에서는 어떤 멀티플랙싱 커멘드를 사용할 수 있는지 모른다는 점도 재밌다. Netty의 경우 코드 단에서 운영체제 정보와 사용할 수 있는 멀티플렉싱 커멘드를 찾는 코드가 포함되어 있고, 그 분기 로직은 바이트 코드로 컴파일되고, JVM 어셈블러/JIT에 의해 바이너리 코드로 변환될 때까지 결정되지 못하고 남아있다. 코드가 실행되면서, OS 정보와 사용할 수 있는 멀티플렉싱 커멘드를 확인하고, 이를 포함한 네이티브 함수가 JNI에 의해 수행되며 실제 시스템 콜이 발생하게 된다.
만약 사용할 수 있는 최적 커멘드(ex, Linux의 epoll, Mac의 kqueue)를 찾지 못하는 경우, Fallback으로 Java NIO 라이브러리를 사용한다. NIO는 poll/select처럼 유닉스 계열에선 범용적인, 그렇지만 epoll, kqueue 보다는 성능이 부족한, 멀티플렉싱 커멘드를 사용하는 네이티브 함수를 호출한다.
스케줄이 가능한 스레드 풀
이들과는 달리, Mono.delay는 커널 단의 멀티플렉싱 커멘드를 사용하지 않는다. 그보다 단순히 딜레이 시간을 지정할 수 있는 스레드 풀을 워커 스레드 풀로 사용하는 구조이다. 스레드 풀의 대기열에 작업과 딜레이 시간을 기록하고, 시간이 되면 해당 작업이 수행된다. 앞선 I/O Multiplexing을 활용한 논 블록킹과 동일하게, 워커 스레드 풀이 고갈되지 않도록 처리 로직을 짧게 해야 한다.
딜레이 시간을 지정할 수 있는 스레드 풀은 어떻게 만료 시간을 체크하는지, 그 방법이 궁금했다. 작업을 메모리에 올려두고 별도의 스레드 하나가 계속 시간이 다 되었는지를 폴링 하려나? 코드를 파고 시작했다. 아래는 주요 흐름을 캡처한 내용이다. 필요한 부분만 남겼기에, 실제와 다른 부분도 있다.
Mono.delay는 스케줄러를 따로 지정하지 않으면, Schedulers.parallel()를 기본으로 사용한다. Schedulers.parallel()의 생성 과정을 쭉 타고 들어가다 보면 결국 아래의 ParallerScheduler 임을 찾을 수 있다. 여기서 ScheduledExecutorService의 구현체로 ScheduledThreadPoolExecutor를 사용한다는 점이 포인트다. 이를 state라는 변수로 여러 개 들고 있다.

ParallerScheduler에 작업과 딜레이 시간을 등록하면, pick 메서드에서는 라운드 로빈으로 ScheduledThreadPoolExecutor 중 하나를 선택한다. Schedulers.directSchedule를 까보면, 선택한 ScheduledThreadPoolExecutor에 작업이 배정되는 것을 확인할 수 있다.

ScheduledThreadPoolExecutor의 생성자를 확인하면 대기열로 DelayedWorkQueue를 사용하는 것을 볼 수 있다. DelayedWorkQueue는 아이템을 적재할 때 만료 시간을 함께 등록하고, 아이템을 요청하면 대기 후 만료 시간이 된 아이템을 반환한다. 즉, ScheduledThreadPoolExecutor는 작업을 배정하면 큐에 딜레이 시간과 작업을 적재하고, 큐에서 딜레이 시간이 만료된 작업이 반환되면, 이를 스레드에서 수행하는 것으로, 딜레이 후 작업 수행이 가능해진다.

지금까지의 흐름을 정리하면 다음과 같다.
1. ParallelScheduler는 ScheduledThreadPoolExecutor 배열을 들고 있다. (Size = Cpu 코어 수)
2. 각 ScheduledThreadPoolExecutor는 단일 스레드, DelayedWorkQueue를 대기열로 한다.
3. Mono.delay()를 실행하면, ParallelScheduler에 딜레이 시간과 처리할 작업을 넘긴다.
4. ParallelScheduler는 들고 있는 ScheduledThreadPoolExecutor 배열에서 하나를 라운드로빈으로 선택한다.
5. 선택된 ScheduledThreadPoolExecutor는 대기열에 딜레이 시간과 작업을 등록한다.
6. 대기열은 DelayedWorkQueue으로 하여, 큐에서 아이템을 꺼내는 요청 시 딜레이 시간이 다 된 작업이 반환된다.
7. ScheduledThreadPoolExecutor가 반환된 작업을 스레드에 태우는 것으로, 딜레이 시간을 대기한 작업이 수행되는 것이다.
어떻게 딜레이 시간을 계산하는 걸까
결국 딜레이 시간을 확인하고, 수행해야 하는 작업을 반환하는 것은 DelayedWorkQueue였다. 이제 찾아야 하는 것은 "DelayedWorkQueue는 딜레이 시간이 다 끝난 건지 어떻게 계산할까"이다.
Java concurrent 라이브러리의 DelayQueue는 아이템과 함께 딜레이 시간을 적재하고, 꺼낼 때는 딜레이 시간이 지난 아이템이 꺼내진다. DelayedWorkQueue도 이와 동작과 원리가 동일한데, DelayQueue 코드가 더 명확하여, 그 코드로 설명한다.

큐에서 '꺼내기'를 수행하면 우선순위 큐에서 아이템을 꺼낸다. 보통 남은 딜레이 시간이 적은 아이템을 높은 우선순위로 하기 때문에, 즉 가장 짧게 남은 아이템이 반환될 것이다. 그 아이템의 딜레이 시간이 지났다면 큐에서 꺼내 반환하게 된다.
딜레이 시간이 남은 경우가 중요하다. Condition.awaitNanos로 스레드를 남은 시간 동안 대기 상태로 한다. awaitNanos로 지정한 시간이 끝나면 대기 상태에서 벗어나 반복문을 돌 것이고, 가장 짧게 남은 아이템이 이번에는 딜레이 시간을 지난 경우가 되어, 반환되는 것이다. 이때 leader 변수는 현재 대기 상태 중인 스레드를 표시한다.

이렇게 스레드가 대기 상태가 되는 동안, 다른 아이템이 들어오면 어떨까. 새로 추가된 아이템의 딜레이 시간이, 현재 스레드가 대기 상태로 기다리는 시간보다 더 짧다면 큰일이다. 이를 피하기 위해 추가된 아이템이 현재 큐에서 가장 적은 딜레이 시간의 아이템이라면, Condition.signal 으로 대기 상태였던 스레드를 다시 깨운다. 위 Take에서 대기 상태에 빠졌던 스레드가 다시 깨어나 반복문을 돌 것이고, 이번엔 방금 추가된 아이템이 가장 짧은 딜레이로 확인되어, 스레드는 그 시간만큼 대기 상태가 될 것이다.

Condition.awaitNanos()를 까보면, 결국 스레드를 대기시키는 일은 LockSupport.parkNanos 메서드가 하는데, 그 안에서도 U.park는 네이티브로 구현되어, OS에게 스레드를 지정 시간 동안 대기하는 커널 단의 커멘드를 수행하게 된다. 운영체제에 따라 다르지만, Unix의 futex 나 pthread_cond_timedwait 가 대표적이다. 결국 커널 단의 스레드 블록을 사용하는 것으로 Busy wait를 피할 수 있게 된다.

정리
이슈 : 'N초 딜레이 후 A 실행'에 Mono.delay 사용. 그 동작 원리 파보기
원리 :
- 스케줄이 가능한 스레드 풀, ScheduledThreadPoolExecutor에 작업 등록
- ScheduledThreadPoolExecutor는 작업 대기열로 DelayedWorkQueue를 사용하여, 대기 시간이 지난 작업을 실행
- DelayedWorkQueue는 가장 남은 딜레이가 적은 작업의 딜레이 시간 동안 스레드를 대기 처리
- 큐에 추가되는 작업이 현재 대기 중인 작업보다 남은 딜레이가 적은 경우, 대기 중인 스레드를 깨우고 새로운 대기 처리
- 대기가 끝난 스레드는 딜레이가 만료된 작업 반환
결과 :
- 스레드 자체를 딜레이 시간 동안대기 처리하는 것으로 Busy wait / Spin lock 방법을 피함
'Language > Java, Kotlin' 카테고리의 다른 글
| 두 가지 GC와 처리 영역들 (2) | 2023.12.04 |
|---|---|
| JitPack 으로 자바 라이브러리 배포하기 (2) | 2022.01.24 |
| Local Maven Repository 에 라이브러리 배포하기 (0) | 2022.01.22 |
| Optional 로 Null 을 알리는 습관 (0) | 2021.03.15 |
| HashSet의 원리 (2) | 2021.03.12 |
