ecsimsw

Rate Limit 을 직접 구현해보자 본문

Rate Limit 을 직접 구현해보자

JinHwan Kim 2024. 2. 1. 00:37

미리 보기

핸들러의 요청 처리 속도를 제한한다. 아래 핸들러는 0.1초에 한 번으로 처리 속도가 제한되고, 5개까지 보관해 두었다가 속도에 맞춰 처리한다.
 

 
10 개의 요청을 동시에 전송했을 때 5개만 처리되고 나머지는 429 (Too Many Requests) 를 응답받는 것을 확인할 수 있다. 그리고 처리되는 5개의 요청은 속도 제한에 따라 0.1초에 하나씩 처리된다.
 

 

 

Leaky bucket algorithm

처리 속도를 일정하게 정하여 네트워크 트래픽 체증을 제어한다. bucket 크기를 미리 지정하여 순차적으로 처리하고, 버킷 크기를 넘어선 요청은 버려진다.
 

 

즉시 처리

rate는 0.1초, 사이즈는 3 인 버킷에, 5개의 패킷이 동시에 도착하고 0.2초 후에 3개의 패킷이 이어 도착했다고 가정해 보자. 처음 5개 패킷이 도착한 시점을 0.0 이라고 하면 0.0초에 3개가 들어오고 2개가 버려지고, 0.2초엔 2개가 더 들어오고, 1개가 버려진다. 정상적으로 0.1초 간격으로 요청을 처리하도록 제어했지만 3번 요청과 7번 요청은 모두 3 사이클이나 지난 후에나 처리될 것이다.


요청을 들어온 즉시 처리하면 어떨까. 즉시 처리하되 대기열만 제대로 유지한다면 0.0초 ~ 0.5초까지는 크게 볼 때 0.1초에 한건이라는 속도 제한을 만족할 것이다. 동시에 3번 요청과 7번 요청 모두 요청이 전달된 시점에 지연 없이 처리된다

 

 


물론 그렇다고 즉시 처리가 언제나 옳은 것은 아니다. 단순히 네트워크 혼잡 제어 목적이 아닌, 서버가 처리하는 양 / 흐름을 조절하려고 한다면 즉시 처리는 위험할 수 있을 것 같다. 서버가 처리할 수 있는 양보다 동시에 요청되어 즉시 처리된 요청의 수가 더 크다면 문제가 될 테니 말이다.

 

 

즉시 처리를 설정했을 때

 

구현하기

스케줄러와 큐를 사용하여 간단한 Leaky bucket 을 구현한다.

 

- 핸들러의 @RateLimit 어노테이션을 읽어 RateLimitFilter 를 등록한다.

- RateLimitFilter 는 사용자의 요청을 대기열에 저장하고 대기한다.

- 스케줄러가 요청 제한 시간 간격으로 대기열에서 요청을 방출한다.

- 사용자 스레드는 대기열에서 본인 요청이 방출되었는지를 확인한다.

- 대기열에서 방출되었음을 확인하면 Handler 로 넘어가 요청이 처리된다.

 

 


 

1. Leaky bucket 구현

 
- Capacity 만큼 담을 수 있는 큐를 구현하여 leacky bucket 의 대기열로 한다.
- 스레드에 안전한 큐이면서, Capacity를 지정할 수 있는 BlockingQueue 를 사용하였다.
- flowRate 간격으로 스케줄링하면서 대기열에 하나씩 Release 한다.

- 각 요청 스레드는 flowRate 간격으로 대기열을 확인하며 본인이 방출되었는지, 아닌지 검사한다.

- 방출되었음으로 요청 처리가 가능함을 확인하고 대기모드를 마친다.
 

public class LeakyBucket<T> {

    private final BlockingQueue<T> waitings = new ArrayBlockingQueue<>(capacity);

    public void fixdedFlow(int flowRate, int capacity) {       
        var scheduleService = Executors.newScheduledThreadPool(1);
        scheduleService.scheduleAtFixedRate(() -> {
            if (!waitings.isEmpty()) {
                waitings.poll();
            }
        }, 0, flowRate, TimeUnit.MILLISECONDS);
    }

    public void put(T id) {
        try {
            waitings.add(id);
        } catch (IllegalStateException e) {
            throw new BucketFullException("bucket full");
        }
    }

    public void putAndWait(T id) throws TimeoutException {
        put(id);
        while (duration < flowRate * capacity) {
            if (!waitings.contains(id)) {
                return;
            }
            sleep(flowRate);
        }
        throw new TimeoutException("time out");
    }
}

 
 

2. RateLimitFilter 정의

 
- 대기열이 꽉 찬 상태라면 429 (Too many requests) 로 응답한다.
- noDelay 옵션이 적용되어 있다면 bucket 에 대기열을 올리고 즉시 처리한다.
- noDelay 옵션이 적용되지 않았다면 bucket 에 대기열을 올리고 fixedFlow 에 의해 요청이 처리 가능해질 때까지 대기한다.
 

@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
    var id = requestIds.getAndIncrement();
    try {
        if (noDelay) {
            bucket.put(id);
            filterChain.doFilter(request, response);
            return;
        }
        bucket.putAndWait(id);
        filterChain.doFilter(request, response);
    } catch (BucketFullException | TimeoutException e) {
        responseTooManyRequest(response);
    }
}

 
 

3. 핸들러별 Filter 등록

 
- ComponentScan 대상 패키지의 Controller 를 읽고, 그 안에 @RateLimit이 정의된 핸들러를 모두 찾는다.
- @RateLimit 조건에 따라 RateLimitFilter 를 생성하고 Bean 으로 등록한다.

 

@Component
public class FilterRegister implements BeanFactoryPostProcessor {

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        ...
        for (var handler : rateLimitHandlers) {
            registerRateLimitFilter(
                beanFactory,
                handler.getDeclaredAnnotation(RateLimit.class),
                handler.getDeclaredAnnotation(RequestMapping.class).path()
            );
        }
    }

    private static void registerRateLimitFilter(ConfigurableListableBeanFactory beanFactory, RateLimit rateLimitConfig, String[] urlPaths) {
        var registrationBean = new FilterRegistrationBean<>();
        registrationBean.setFilter(new RateLimitFilter(
            rateLimitConfig.rate(),
            rateLimitConfig.burst(),
            rateLimitConfig.noDelay()
        ));
        beanFactory.registerSingleton(filterName, registrationBean);
    }
}

 
 

4. @RateLimit 을 표시에 따라 요청 처리 속도 제한

 

@RestController
public class MainController {

    @RateLimit(rate = 100, burst = 5, noDelay = true)
    @RequestMapping("/foo")
    ResponseEntity<String> handleFoo() {
        return ResponseEntity.ok("hi");
    }
}

 
 

5. 전체 코드

 
전체 코드는 https://github.com/ecsimsw/java-rate-limit 에서 확인 할 수 있다.
 

Comments