ecsimsw
리액티브 프로그래밍으로 스레드 사용 효율을 높여볼까 본문
MongoDB
우리 팀에선 MongoDB를 함께 사용하고 있다. 기기 별로 속성 값이 다르기에 Map으로 동적인 {Key : Value} 값을 갖고 있고, DB에는 Json으로 저장한다. 물론 Mysql에서도 Json 타입이 있고 인덱싱을 지원한다고 공부했지만, 아래 이유로 기기 상태 내역 데이터의 경우, 당장은 기존대로 Mongo에 머물기로 했다.
1. 성능 최적화를 위한 가상 컬럼 생성, Json_extract 등이 복잡하다고 느껴졌고, 제대로 모르고 사용하면 성능 저하가 될 수 있는 부분들이 보였다.
2. 애플리케이션 내 쿼리를 위한 코드가 복잡했으며, 이미 몽고로 짜여 있는 부분이 많아, 통일을 위해선 변경 사항이 많았다.
무엇보다 상태 이력은 데이터가 매우 많아서 비즈니스에 사용되는 구체적인 기간 동안만 저장하는데, 몽고의 TTL 인덱스를 사용하면 기간이 지난 데이터를 직접 배치/스케줄링으로 제거하지 않아도 된다는 점이 컸다.
Reactive MongoDB
최근 기기 이벤트가 늘어, 처리량 개선에 많은 고민을 하고 있다.
그렇게 도입한 것들 중 하나가 MongoDB Reactive 였고, 큰 효과를 봤다.
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
특히 스레드 풀 사용 효율이 늘었는데, 전에 스레드 풀에서 스레드를 할당받지 못한 작업이 대기열에 계속 쌓여 OOM으로 이어진 이슈를 경험했기에 더 크게 와닿았던 것 같다.
이후부턴 Reactive를 도입해서 효율을 얻었던 부분과 유의했던 포인트를 소개하려고 한다.
스레드 사용 효율
한 흐름 안에서 쿼리 여러 개가 조합되어 응답되어야 하는 경우가 많다. 그리고 각 쿼리는 서로 영향이 없어, 순차적으로 처리하는 것이 아닌, 병렬로 처리하여 응답 속도를 높이는 꼴이 많았다. 이때 기존 멀티 스레드를 사용한 병렬 처리는 각 작업별 스레드가 필요하다.
예를 들어 3개의 쿼리를 병렬 처리하면, 각 쿼리마다 별도의 스레드가 필요하고, DB 응답을 기다리는 동안 스레드가 계속 점유되어야 한다. 한 요청에 여러 추가 스레드가 필요하니, 스레드 풀에 스레드가 부족해지고 응답 속도가 느려지거나, OOM으로 이어질 수 있다.
메인 스레드 : 호출된 함수 수행
- 스레드 1 : 쿼리 A 수행, DB 결과 대기
- 스레드 2 : 쿼리 B 수행, DB 결과 대기
- 스레드 3 : 쿼리 C 수행, DB 결과 대기
반면 리액티브를 사용하는 경우, DB 결과 대기를 위한 스레드 점유가 불필요해진다. DB에 세 개 쿼리를 요청하고, DB 처리를 마치면 I/O 이벤트를 읽어 그제야 후처리를 위한 스레드가 사용된다
메인 스레드 : DB에 쿼리 A, B, C 전달, DB 결과를 대기하지 않음
- 워커 스레드 : I/O 이벤트가 발생되었을 때, 후처리
즉 DB 결과 대기가 각 스레드에서 불필요하다. 또 앞선 멀티 스레딩 방식은 세 개의 추가 스레드가 동시에 DB 결과 대기에 사용되나, 아래 Reactive에선 잠깐의 후처리에 사용되는 타이밍이 서로 다르기에, 동시에 필요한 최대 스레드 수가 적다.
블록킹 피하기
스레드 사용 효율을 높이기 위해 가급적 블록킹 꼴은 피해야 한다.
아래 두 api은 코드 한 줄 차이지만, 처리 흐름은 크게 다르다.
@GetMapping("/blocking")
public List<DeviceHistory> blocking() {
log.info("1");
var result = deviceHistoryRepository.findAll()
.doOnComplete(() -> log.info("2"))
.collectList()
.block();
log.info("3");
return result;
}
@GetMapping("/non-blocking")
public Mono<List<DeviceHistory>> nonBlocking() {
log.info("1");
var result = deviceHistoryRepository.findAll()
.doOnComplete(() -> log.info("2"))
.collectList();
log.info("3");
return result;
}
블록킹 핸들러의 경우 로그가 1 -> 2 -> 3 으로 출력되고, 논-블록킹 핸들러의 경우, 1 -> 3 -> 2로 출력된다. 이 순서 차이가 매우 중요하다.
블록킹 핸들러의 경우 스레드 점유 -> Log 1 출력 -> Reactive 쿼리 -> Log 2 출력 -> Log 3 출력 -> 스레드 반환의 흐름을 갖는다.
논-블록킹 핸들러의 경우, 스레드 점유 -> Log 1 출력 -> Reactive 쿼리 -> Log 3 출력 -> 스레드 반환 -> Log2 출력의 흐름을 갖게 된다.
즉 논 블록킹으로 요청을 처리할 때, 톰캣 스레드의 점유 시간을 크게 줄어든다. 동시간 요청이 몰렸을 때, 스레드가 부족하여 처리가 늦어지는 경우도 크게 줄일 수 있다.
참고로 Spring boot의 내장 톰캣 기본 스레드 풀 크기는 200이고, 기본 큐 크기(대기열) 사이즈는 100이다.
워커 스레드 수 주의
그렇다고 리액티브가 '스레드를 전혀 사용하지 않는다'는 것은 아니다. DB 쿼리 요청 후 완료까지 스레드 대기가 불필요하다는 것이지, 완료 후 I/O 이벤트가 발생했을 때부터의 후처리, 예외처리는 마찬가지로 스레드가 필요하다. Netty 기반의 리액티브 서버에서는 내부적으로 Netty의 EventLoopGroup 스레드 풀이 사용된다.
그렇기에 이 스레드 풀을 사용하는 후처리가 길어지면, 스레드 풀이 마르는 현상을 똑같이 발생할 수 있다. 특히 Netty의 기본 스레드 풀 사이즈는 코어 수 * 2로, 톰캣의 기본 수나 보통 애플리케이션에서 설정하는 스레드 풀 수보다 작기 때문에, 이를 고려하지 않은 리액티브 프로그래밍은 오히려 병목을 만들 수 있다. (Netty 기본 스레드 풀, 스레드 수 결정 방법)
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
}
비동기 순서 보장 주의
리액티브로 요청한 작업은 비동기로 처리되고, 그에 따른 순서 처리에 주의해야 한다. 이를 테면 내 경우에는 기기 상태 이벤트를 Kafka를 통해 전달받고 이를 Reactive Mongo로 저장하는 로직이 있다.
논 블록킹으로 처리되기 때문에, Kakfa에서 메시지를 수신하고 Reactive Mongo로 데이터 저장을 요청하는 흐름이 매우 짧아져, 메시지 처리량이 크게 늘어난다. 카프카 수신, 처리, ACK는 빠르지만, DB 저장 순서는 보장되지 않는다. 네트워크 상태, 워커 스레드 풀 상태에 따라 먼저 수신한 메시지가 그 후에 처리한 메시지보다 늦게 처리될 수도 있게 된다.
- 기존 : 1. 기기 상태 수신, 2. DB 저장 요청, 3. DB 결과 대기, 4. Kafka Ack
- Reactive : 1. 기기 상태 수신, 2. DB 저장 요청, 3. Kafka Ack
내 경우에는 메시지 안에 타임스탬프 정보가 들어있어, 데이터가 DB에 쌓이는 순서는 중요하지 않았다. 내역의 경우 이벤트 발생 순서에 상관없이 DB에 적재하고, 조회하는 시점에서 특정 기간 동안의 데이터를 조회, 시간 순으로 정렬하면 되는 꼴이었기 때문이다. 오히려 Kafka에서 빠르게 메시지를 빼거나, Api 요청 처리에서 톰캣 스레드 점유 시간을 줄여 처리량을 높이는 것이 중요했다.
어떻게 확인했는가?
1. 톰캣 스레드 풀 : Api 요청 처리 쪽에선 톰캣, 애플리케이션 스레드 풀 추이를 확인했다. 부하를 만들어 그라파나 대시보드를 확인했고, 특히 Future로 동시에 여러 쿼리를 요청하고 조합하는 Api에서 스레드 사용량이 눈에 띄게 줄은 것을 볼 수 있었다.
2. 초당 처리량 : Kafka에서 수신하여 MongoDB에 적재하는 로직에서는 분명한 차이가 있었다. 지정한 기간동안의 처리량을 확인할 수 있는 유틸을 만들어 사용하고 있다.
public class ThroughputCounter {
private final AtomicInteger counter = new AtomicInteger(0);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public void start(long period, TimeUnit unit) {
scheduler.scheduleAtFixedRate(() -> {
int count = counter.getAndSet(0);
log.info("Event count : {}, per {} {}", count, period, unit.toChronoUnit().name());
}, 1, period, unit);
}
public void up() {
counter.incrementAndGet();
}
}
Reactive 적용 전후의 1초당 처리량을 비교했다. 이벤트가 몰리는 피크 타임을 가정하기 위해 Kafka의 파티션은 단일로 구성했다.
우선 Reactive를 적용하기 전의 처리량이다. 위는 Kafka로 Publish 하는 초당 이벤트 개수이고, 아래는 Subscribe하여 메시지를 처리한 개수이다. publish 속도를 따라가지 못하고, 대략 80%의 처리량을 갖는 것을 볼 수 있다.
다음은 Reactive를 적용한 후의 처리량이다. 마찬가지로 위는 Publish, 아래는 Subscribe 하여 초당 메시지를 처리한 개수를 표현한다. 당연하게도 같은 시간대 Publish와 Subscribe가 동일한 값을 갖진 못하지만, 처리량 자체는 100%로 모든 메시지를 커버하는 것을 볼 수 있다.
해당 시점의 Kafka Lag도 특이사항 없다. Production, Consumption 속도도 일치한다. Reactive로 전환 전에는 여러 파티션으로 처리하던 메시지량을, 전환 후에는 단일 파티션만으로 커버할 수 있었다.
정리
초당 2,500개의 이벤트라는 팀의 요구사항에 맞춰, 초당 처리량이 1,700건에서 2,500건으로 약 47% 향상된 것을 확인할 수 있었다. 요구 사항이 초당 2,500건보다 더 높아질수록 개선 효과는 더욱 커질 것으로 예상된다.
또, 병렬 처리에 사용되는 스레드, 톰캣의 요청 스레드 사용 효율까지 향상되기에, Reactive 도입은 팀에서 예상했던 것보다 훨씬 큰 개선이 될 것 같다.
최근 급격히 이벤트 증가하면서, 처리량과 안정성 개선에 많은 고민과 토론이 오가는 와중이라, 더 재밌었던 공부와 실습이었다.
'Architecture > Application' 카테고리의 다른 글
이벤트 발행과 DB 트랜잭션 원자성 유지, Transaction outbox pattern (0) | 2024.06.01 |
---|---|
레디스로 분산 환경에서 스케줄러 단일 실행 보장 (0) | 2024.02.23 |
두 번의 갱신 분실 문제와 락 (12) | 2024.01.23 |
RepeatableRead 에서 발생할 수 있는 동시성 문제와 락 (8) | 2024.01.10 |
Future 를 활용한 비동기 이미지 비동기 업로드 흐름과 시연 (0) | 2023.11.28 |