Hi Guys,

In today’s hyper-connected world, applications face an ever-increasing threat of Distributed Denial of Service (DDoS) attacks. These attacks overwhelm a server by bombarding it with massive volumes of traffic, rendering it inaccessible to legitimate users. Even smaller-scale traffic spikes, whether malicious or accidental, can exhaust your system’s resources and degrade performance.

To mitigate these threats, rate limiting serves as an essential defensive strategy. By controlling the number of requests that can be processed within a specific time frame, rate limiting protects APIs and services from overuse, safeguards resources, and ensures fair access for all users.

But traditional rate-limiting approaches often fall short when handling asynchronous, high-concurrency systems. This is where reactive programming shines.

Why Reactive?

Traditional rate limiters often rely on blocking mechanisms: every request waiting for a free slot holds on to its thread, which wastes resources under heavy load. A reactive rate limiter offers:

  1. Asynchronous Control: Non-blocking operations ensure threads are not idly waiting.
  2. Scalability: Handles a large number of concurrent requests efficiently.
  3. Better System Utilization: Works seamlessly in reactive ecosystems.
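To make the contrast concrete, here is a minimal sketch of the blocking style. It assumes the limiter exposes a non-blocking `tryAcquire()` check, passed in here as a `BooleanSupplier`; the class `BlockingAcquire` and the 10 ms retry interval are hypothetical. The caller's thread sleeps between attempts, so every waiting request pins a whole thread.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical blocking helper, shown only for contrast with the reactive version.
class BlockingAcquire {
    // Retries tryAcquire every 10 ms until it succeeds or the timeout elapses.
    // The calling thread is parked in Thread.sleep() for the whole wait and can do no other work.
    static boolean acquireBlocking(BooleanSupplier tryAcquire, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (tryAcquire.getAsBoolean()) {
                return true; // slot acquired
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed without a free slot
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

With thousands of concurrent callers, this design needs thousands of mostly sleeping threads, which is the cost the reactive approach avoids.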

With Java 21 and Project Reactor’s declarative style, building such a rate limiter becomes clean and intuitive.

Overview of the Solution

  • Limit the number of requests allowed in a defined time window.
  • Support asynchronous retries until a timeout expires.
  • Use thread-safe and efficient data structures for managing state.
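The core idea is a sliding-window log: keep the timestamps of admitted requests and allow a new one only if fewer than the limit fall inside the window. To trace it by hand, the sketch below uses a simplified, hypothetical `SlidingWindowLog` (not the article's class) that takes the current time as an argument instead of reading the clock. It drops a timestamp once it is more than one window older than the current time, the same rule as the cleanup in the implementation below.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window log with an explicit clock, for step-by-step tracing.
class SlidingWindowLog {
    private final int maxRequests;
    private final long windowMillis;
    private final Deque<Long> timestamps = new ArrayDeque<>(); // admitted requests, oldest first

    SlidingWindowLog(int maxRequests, long windowMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
    }

    // Admits a request at time nowMillis if fewer than maxRequests are still inside the window.
    synchronized boolean tryAcquire(long nowMillis) {
        // Evict timestamps that are more than one window older than now.
        while (!timestamps.isEmpty() && nowMillis - timestamps.peekFirst() > windowMillis) {
            timestamps.pollFirst();
        }
        if (timestamps.size() < maxRequests) {
            timestamps.addLast(nowMillis);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SlidingWindowLog log = new SlidingWindowLog(3, 1000); // 3 requests per 1000 ms
        for (long t : new long[] {0, 100, 200, 300, 1000, 1001}) {
            System.out.println("t=" + t + "ms allowed=" + log.tryAcquire(t));
        }
    }
}
```

With 3 requests per 1000 ms, the calls at 0, 100 and 200 ms are admitted, the calls at 300 and 1000 ms are rejected, and the call at 1001 ms is admitted again because the request from 0 ms has just left the window.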

Implementation

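import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ReactiveRateLimiter {
    // Delay between re-checks while acquire() waits for a free slot
    private static final Duration POLL_INTERVAL = Duration.ofMillis(10);

    private final int maxRequests;
    private final Duration timeWindow;
    // System.nanoTime() stamps of the requests admitted in the current window, oldest first
    private final ConcurrentLinkedQueue<Long> requestTimestamps;

    // Constructor name must match the class name
    public ReactiveRateLimiter(int maxRequests, Duration timeWindow) {
        this.maxRequests = maxRequests;
        this.timeWindow = timeWindow;
        this.requestTimestamps = new ConcurrentLinkedQueue<>();
    }

    // Returns immediately: true if fewer than maxRequests were admitted in the last timeWindow.
    // synchronized makes cleanup + size check + add one atomic step; without it, two threads
    // could both see size() < maxRequests and together exceed the limit.
    public synchronized boolean tryAcquire() {
        long now = System.nanoTime(); // monotonic, unlike System.currentTimeMillis()
        cleanupOldRequests(now);

        if (requestTimestamps.size() < maxRequests) {
            requestTimestamps.add(now);
            return true;
        }
        return false;
    }

    // Emits true as soon as a slot is free, or false once the timeout elapses.
    // Waiting is done with Reactor timers, so no thread is blocked in the meantime.
    // (The earlier Sinks + repeatWhenEmpty(10, ...) version gave up after about 10 retries,
    // so the returned Mono could never complete for longer timeouts.)
    public Mono<Boolean> acquire(Duration timeout) {
        return Mono.fromCallable(this::tryAcquire)
                .filter(Boolean::booleanValue)                                        // no slot: complete empty...
                .repeatWhenEmpty(attempts -> attempts.delayElements(POLL_INTERVAL)) // ...and try again later
                .timeout(timeout, Mono.just(false));                                 // deadline reached: emit false
    }

    // Removes timestamps that have slid out of the time window.
    private void cleanupOldRequests(long now) {
        long windowNanos = timeWindow.toNanos();
        Long oldest;
        // peek() once per iteration, so a null head can never be unboxed
        while ((oldest = requestTimestamps.peek()) != null && now - oldest > windowNanos) {
            requestTimestamps.poll();
        }
    }
}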

How It Works

  1. Rate Limiting Logic
    • tryAcquire(): Checks whether a request is allowed. Timestamps older than the time window are removed first, so only requests inside the current window count toward the limit.
  2. Asynchronous Wait
    • acquire(): Uses Reactor’s Mono to asynchronously wait for an available request slot or a timeout.
  3. Thread-Safe Queue
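    • A ConcurrentLinkedQueue maintains the timestamps. Its individual operations are thread-safe, but a size check followed by an add is not atomic, so that pair of steps still needs synchronization to keep concurrent callers from exceeding the limit.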
  4. Retry with Delay
    • repeatWhenEmpty() re-runs the acquisition attempt after a short delay whenever no slot is free, until a request can be processed or the timeout elapses.
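The retry-with-delay pattern itself does not depend on Reactor. As a rough JDK-only analogue, assuming a `tryAcquire()` check passed in as a `BooleanSupplier`, a `CompletableFuture` can be completed from short tasks scheduled on a single timer thread. The class `AsyncAcquire` and its parameters are hypothetical; in the article's version, Reactor's scheduler-based delays play the same role.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only analogue of acquire(): no thread sleeps while waiting;
// each retry is a short task scheduled on a shared timer thread.
class AsyncAcquire {
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "rate-limiter-timer");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    static CompletableFuture<Boolean> acquire(BooleanSupplier tryAcquire, Duration timeout, Duration pollInterval) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        attempt(tryAcquire, deadline, pollInterval, result);
        return result; // the caller gets the future back immediately
    }

    private static void attempt(BooleanSupplier tryAcquire, long deadline, Duration pollInterval,
                                CompletableFuture<Boolean> result) {
        try {
            if (tryAcquire.getAsBoolean()) {
                result.complete(true);  // slot acquired
            } else if (System.nanoTime() - deadline >= 0) {
                result.complete(false); // timed out
            } else {
                // Re-check after pollInterval; the current thread returns right away.
                TIMER.schedule(() -> attempt(tryAcquire, deadline, pollInterval, result),
                        pollInterval.toNanos(), TimeUnit.NANOSECONDS);
            }
        } catch (RuntimeException e) {
            result.completeExceptionally(e); // surface failures instead of hanging
        }
    }
}
```

Compared with the blocking loop, the waiting state lives in the future and the timer queue rather than in a parked thread, which is the property the reactive limiter relies on.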

Using the Rate Limiter

import java.time.Duration;

public class Main {
    public static void main(String[] args) {
        ReactiveRateLimiter rateLimiter = new ReactiveRateLimiter(3, Duration.ofSeconds(1));

        // Synchronous example: the first 3 calls print true, the last 2 print false
        for (int i = 0; i < 5; i++) {
            System.out.println("Request allowed: " + rateLimiter.tryAcquire());
        }

        // Asynchronous example
        rateLimiter.acquire(Duration.ofSeconds(3))
            .doOnNext(allowed -> {
                if (allowed) {
                    System.out.println("Request processed after waiting.");
                } else {
                    System.out.println("Request timed out.");
                }
            })
            .block();
    }
}

Testing the Rate Limiter

import org.junit.jupiter.api.Test;
import java.time.Duration;

import static org.junit.jupiter.api.Assertions.*;

public class ReactiveRateLimiterTest {
    @Test
    void testTryAcquire() {
        ReactiveRateLimiter limiter = new ReactiveRateLimiter(2, Duration.ofSeconds(1));

        assertTrue(limiter.tryAcquire());
        assertTrue(limiter.tryAcquire());
        assertFalse(limiter.tryAcquire());
    }

    @Test
    void testAcquire() {
        ReactiveRateLimiter limiter = new ReactiveRateLimiter(1, Duration.ofMillis(500));
        boolean result = limiter.acquire(Duration.ofSeconds(1)).block();
        assertTrue(result);
    }
}
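The tests above call the limiter from a single thread, so they would not notice two threads passing the size check at the same moment. One way to probe that, sketched here with plain JDK concurrency utilities, is to release many threads at once against one limiter and count how many get through. The `Limiter` stand-in and the `admittedUnderContention` helper are hypothetical; the stand-in copies the sliding-window logic with a `synchronized` `tryAcquire()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ContentionCheck {
    // Hypothetical stand-in limiter with the same sliding-window logic; tryAcquire() is synchronized.
    static final class Limiter {
        private final int maxRequests;
        private final long windowNanos;
        private final Deque<Long> timestamps = new ArrayDeque<>();

        Limiter(int maxRequests, long windowNanos) {
            this.maxRequests = maxRequests;
            this.windowNanos = windowNanos;
        }

        synchronized boolean tryAcquire() {
            long now = System.nanoTime();
            while (!timestamps.isEmpty() && now - timestamps.peekFirst() > windowNanos) {
                timestamps.pollFirst();
            }
            if (timestamps.size() < maxRequests) {
                timestamps.addLast(now);
                return true;
            }
            return false;
        }
    }

    // Releases `callers` threads at once against the same limiter and counts admitted requests.
    static int admittedUnderContention(Limiter limiter, int callers) {
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger admitted = new AtomicInteger();
        for (int i = 0; i < callers; i++) {
            pool.submit(() -> {
                start.await(); // every thread waits here so the calls overlap
                if (limiter.tryAcquire()) {
                    admitted.incrementAndGet();
                }
                return null; // a Callable, so await() may throw InterruptedException
            });
        }
        start.countDown(); // let all callers race
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return admitted.get();
    }
}
```

With a limit of 5 and 50 racing callers, a correct limiter admits exactly 5. Without `synchronized`, the count may exceed the limit (or the deque may be corrupted), though a single run is not guaranteed to expose the race.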

Advantages of This Approach

  • Efficiency: Non-blocking retries mean no thread sits idle while a request waits for a slot.
  • Flexibility: Configurable limits and timeout periods.
  • Reactive Ecosystem: Seamlessly integrates with systems built on Reactor.

 

You can download the code from GitHub – https://github.com/niteshapte/reactive-rate-limiter-java

 

That’s it.

Hope you liked it.

Criticism and feedback are welcome.

Have a great day ahead!
