Recently, I set up a local AI service called Ollama. After some exploration and testing, I discovered that Ollama doesn't support parallel processing by default. There is an environment variable, OLLAMA_NUM_PARALLEL, which can enable parallel processing, but it wasn't sufficient for my testing needs. Additionally, my GPU has only 6GB of VRAM. To address these limitations, I built a reactive Spring Boot application that leverages WebFlux for asynchronous communication and integrates Resilience4j's rate limiter and circuit breaker for enhanced resilience.
Visit GitHub for the full implementation.
What is Ollama?
Ollama is a Large Language Model (LLM) service that can run as a server on your local machine. It provides developers with powerful natural language processing (NLP) capabilities for tasks such as text generation, summarization, translation, and more. Ollama supports multiple LLMs such as Llama 3, Mistral, Phi, and many others. It exposes an HTTP API (including OpenAI-compatible endpoints) and listens on port 11434 by default for model interaction.
For more information about Ollama, you can visit their website or check out their GitHub page.
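To get a feel for the raw API, here is a minimal request sent directly to a locally running Ollama instance (assuming the llama3 model has already been pulled with `ollama pull llama3`):

```bash
# Call Ollama's native generate endpoint directly on port 11434.
curl -X POST "http://localhost:11434/api/generate" \
  -H "Content-Type: application/json" \
  -d '{ "model": "llama3", "prompt": "Hi!", "stream": false }'
```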
The Challenge with Ollama
While Ollama offers impressive capabilities, it doesn't support parallel processing out of the box. Although there's an experimental feature that can be enabled using the OLLAMA_NUM_PARALLEL environment variable, it didn't meet my performance expectations during testing.
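For reference, the variable is set when starting the Ollama server. A minimal sketch (the value 4 is just an illustration; pick a value that fits your hardware):

```bash
# Allow up to 4 requests to be processed in parallel (experimental).
# With only 6GB of VRAM, higher values may simply cause requests to
# queue or the model to be evicted from memory anyway.
OLLAMA_NUM_PARALLEL=4 ollama serve
```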
Ollama Async Wrapper using Spring Boot WebFlux
To overcome these limitations and make the most out of Ollama, I developed a Spring Boot WebFlux application that acts as an asynchronous wrapper around Ollama. The wrapper adds rate limiting, request delays, and a circuit breaker to ensure stable and reliable interactions with Ollama.
Features
- Asynchronous processing with WebFlux
- Rate limiting to control the number of requests
- Circuit breaker to prevent system overload
- Configurable connection and read timeouts
Requirements
- Java 21 (Spring Boot 3.x requires Java 17 or higher)
- Spring Boot 3.3.1
- Maven or Gradle
- Ollama service running locally on port 11434
Endpoints
Ollama offers an OpenAI-compatible API. For details of all the endpoints Ollama supports, please visit the Ollama API docs.
Currently, the following wrapper endpoints are supported:
- /ollama/generate/
- /ollama/generate/stream
- /ollama/chat/
- /ollama/chat/stream
Let’s start.
Step 1: Project Setup
1. Initialize a Spring Boot Project: Create a new Spring Boot project with the necessary dependencies for WebFlux and Resilience4j using Spring Initializr.
2. Add Dependencies: Ensure you have the following dependencies in the pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-ratelimiter</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-circuitbreaker</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-reactor</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
```
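If you prefer Gradle, the equivalent declarations might look like this in build.gradle.kts (a sketch, not taken from the repository; assumes the Spring Boot Gradle and dependency-management plugins are applied):

```kotlin
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    developmentOnly("org.springframework.boot:spring-boot-devtools")
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")
    implementation("io.github.resilience4j:resilience4j-ratelimiter:2.2.0")
    implementation("io.github.resilience4j:resilience4j-circuitbreaker:2.2.0")
    implementation("io.github.resilience4j:resilience4j-reactor:2.2.0")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
}
```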
Step 2: Configuration
We will create three configuration classes: WebClient, Rate Limiter, and Circuit Breaker.
WebClientCustomConfig
```java
package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

@Configuration
public class WebClientCustomConfig {

    @Bean
    WebClient webClient() {
        HttpClient httpClient = HttpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
                .responseTimeout(Duration.ofSeconds(60))
                .doOnConnected(conn -> conn
                        .addHandlerLast(new ReadTimeoutHandler(60))
                        .addHandlerLast(new WriteTimeoutHandler(60)))
                .wiretap("reactor.netty.http.client.HttpClient",
                        LogLevel.INFO, AdvancedByteBufFormat.TEXTUAL);

        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .baseUrl("http://localhost:11434")
                .build();
    }
}
```
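The base URL is hard-coded above. If you want to point the wrapper at a different host or port without recompiling, one option (my variation, not part of the original code) is to inject it from application.properties:

```java
// Hypothetical variation: inject the base URL from configuration.
// Assumes a property such as `ollama.base-url=http://localhost:11434`
// has been added to application.properties.
// Requires: import org.springframework.beans.factory.annotation.Value;
@Bean
WebClient webClient(@Value("${ollama.base-url:http://localhost:11434}") String baseUrl) {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .build();
}
```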
RateLimiterCustomConfig
```java
package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

@Configuration
public class RateLimiterCustomConfig {

    @Bean
    RateLimiter rateLimiter() {
        RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .limitForPeriod(5)
                .timeoutDuration(Duration.ofMillis(500))
                .build();
        return RateLimiter.of("ollamaRateLimiter", rateLimiterConfig);
    }
}
```
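With this configuration, at most five calls are permitted per one-second refresh window, and a caller waits up to 500 ms for a free permit before being rejected with RequestNotPermitted. A small standalone sketch of that behavior (illustrative only, not part of the wrapper):

```java
// Illustrative only: exercise a limiter with the same settings as above
// using six rapid calls. The sixth exceeds limitForPeriod(5); if no permit
// frees up within the 500 ms timeoutDuration it fails with RequestNotPermitted.
// Requires: java.util.function.Supplier and the resilience4j-ratelimiter imports.
RateLimiter limiter = RateLimiter.of("demo", RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .limitForPeriod(5)
        .timeoutDuration(Duration.ofMillis(500))
        .build());
Supplier<String> guarded = RateLimiter.decorateSupplier(limiter, () -> "ok");
for (int i = 1; i <= 6; i++) {
    try {
        System.out.println(i + " -> " + guarded.get());
    } catch (RequestNotPermitted e) {
        System.out.println(i + " -> rejected");
    }
}
```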
CircuitBreakerCustomConfig
```java
package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

@Configuration
public class CircuitBreakerCustomConfig {

    @Bean
    CircuitBreaker circuitBreaker() {
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofMillis(1000))
                .permittedNumberOfCallsInHalfOpenState(2)
                .slidingWindowSize(5)
                .build();
        return CircuitBreaker.of("ollamaCircuitBreaker", circuitBreakerConfig);
    }
}
```
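These settings mean: over a sliding window of the last five calls, if 50% or more fail, the breaker opens for one second, after which it allows two trial calls in the half-open state. To observe the transitions while testing, you can attach a listener to the breaker's event publisher (an optional addition, not in the original config):

```java
// Optional: log every state transition (e.g. CLOSED -> OPEN -> HALF_OPEN)
// of the breaker defined above, for instance inside the @Bean method
// before returning the CircuitBreaker instance.
circuitBreaker.getEventPublisher()
        .onStateTransition(event ->
                System.out.println("CircuitBreaker '" + event.getCircuitBreakerName()
                        + "': " + event.getStateTransition()));
```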
Step 3: Service Implementation
This is where all the configuration classes come together to interact with Ollama.
1. Generate Service – This interacts with the /api/generate endpoint of Ollama. The method ollamaGenerate() handles stream=false and ollamaGenerateStream() handles stream=true in the request JSON:
```java
package ollama.async.wrapper.service;

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

@RequiredArgsConstructor
@Service
public class OllamaAsyncWrapperGenerateServiceImpl implements OllamaAsyncWrapperGenerateService {

    private final WebClient webClient;
    private final RateLimiter rateLimiter;
    private final CircuitBreaker circuitBreaker;

    public Mono<String> ollamaGenerate(String payload) {
        return Mono.just(payload)
                // Apply rate limiting and circuit breaking to the pipeline
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                // Map resilience failures to user-friendly error messages
                .onErrorResume(RequestNotPermitted.class,
                        e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
                .onErrorResume(Throwable.class,
                        e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
                // Space out requests to ease pressure on the single Ollama instance
                .delayElement(Duration.ofMillis(500))
                // Forward the payload to Ollama's /api/generate endpoint
                .flatMap(request -> webClient.post()
                        .uri("/api/generate")
                        .accept(MediaType.APPLICATION_JSON)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(request)
                        .retrieve()
                        .bodyToMono(String.class)
                        .timeout(Duration.ofSeconds(60))
                        .retry(3)
                        .doOnError(Throwable::printStackTrace))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public Flux<String> ollamaGenerateStream(String payload) {
        return Mono.just(payload)
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                .onErrorResume(RequestNotPermitted.class,
                        e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
                .onErrorResume(Throwable.class,
                        e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
                .delayElement(Duration.ofSeconds(1))
                // Stream the response chunks back as a Flux
                .flatMapMany(request -> webClient.post()
                        .uri("/api/generate")
                        .accept(MediaType.TEXT_PLAIN)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(request)
                        .retrieve()
                        .bodyToFlux(String.class)
                        .timeout(Duration.ofSeconds(60))
                        // Retry with exponential backoff: 1s initial delay, capped at 5s
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
                        .doOnError(Throwable::printStackTrace))
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```
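One thing worth noting: because transformDeferred is applied before the flatMap, the rate limiter and circuit breaker guard the emission of the payload, not the HTTP call itself, so WebClient failures are not recorded by the breaker. If you want the breaker to count HTTP failures as well, a variation (my sketch under that assumption, not the repository's version) is to apply the operators to the call directly:

```java
// Variation (a sketch, not the author's version): the resilience operators
// wrap the WebClient call itself, so the circuit breaker records HTTP errors
// and the rate limiter permit is acquired when the request is subscribed.
public Mono<String> ollamaGenerateGuarded(String payload) {
    return webClient.post()
            .uri("/api/generate")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(payload)
            .retrieve()
            .bodyToMono(String.class)
            .timeout(Duration.ofSeconds(60))
            .transformDeferred(RateLimiterOperator.of(rateLimiter))
            .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
            .subscribeOn(Schedulers.boundedElastic());
}
```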
2. Chat Service – This interacts with the /api/chat endpoint of Ollama. The method ollamaChat() handles stream=false and ollamaChatStream() handles stream=true in the request JSON:
```java
package ollama.async.wrapper.service;

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

@RequiredArgsConstructor
@Service
public class OllamaAsyncWrapperChatServiceImpl implements OllamaAsyncWrapperChatService {

    private final WebClient webClient;
    private final RateLimiter rateLimiter;
    private final CircuitBreaker circuitBreaker;

    public Mono<String> ollamaChat(String payload) {
        return Mono.just(payload)
                // Same resilience pipeline as the generate service
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                .onErrorResume(RequestNotPermitted.class,
                        e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
                .onErrorResume(Throwable.class,
                        e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
                .delayElement(Duration.ofSeconds(1))
                // Forward the payload to Ollama's /api/chat endpoint
                .flatMap(request -> webClient.post()
                        .uri("/api/chat")
                        .accept(MediaType.APPLICATION_JSON)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(request)
                        .retrieve()
                        .bodyToMono(String.class)
                        .timeout(Duration.ofSeconds(60))
                        .retry(3)
                        .doOnError(Throwable::printStackTrace))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public Flux<String> ollamaChatStream(String payload) {
        return Mono.just(payload)
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                .onErrorResume(RequestNotPermitted.class,
                        e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
                .onErrorResume(Throwable.class,
                        e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
                .delayElement(Duration.ofSeconds(1))
                // Stream the chat response chunks back as a Flux
                .flatMapMany(request -> webClient.post()
                        .uri("/api/chat")
                        .accept(MediaType.TEXT_PLAIN)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(request)
                        .retrieve()
                        .bodyToFlux(String.class)
                        .timeout(Duration.ofSeconds(60))
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
                        .doOnError(Throwable::printStackTrace))
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```
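Once the wrapper is running (Step 5 sets the port to 5151), the chat path can be exercised like this; the messages format follows Ollama's chat API, which the wrapper forwards verbatim:

```bash
# Non-streaming chat request through the wrapper's /ollama/chat/ endpoint.
curl -X POST "http://localhost:5151/ollama/chat/" \
  -H "Content-Type: application/json" \
  -d '{ "model": "llama3", "messages": [ { "role": "user", "content": "Hi!" } ], "stream": false }'
```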
Step 4: The Controllers
Now, two controllers to expose the above services.
1. Generate Controller – contains two endpoints, /generate/ and /generate/stream, for stream=false and stream=true in the request JSON respectively:
```java
package ollama.async.wrapper.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.RequiredArgsConstructor;
import ollama.async.wrapper.service.OllamaAsyncWrapperGenerateService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@RestController
@RequestMapping("ollama")
public class OllamaAsyncWrapperGenerateController {

    private final OllamaAsyncWrapperGenerateService generateService;

    @PostMapping(value = "/generate/",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<String> generate(@RequestBody String payload) {
        return generateService.ollamaGenerate(payload);
    }

    @PostMapping(value = "/generate/stream",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> generateStream(@RequestBody String payload) {
        return generateService.ollamaGenerateStream(payload);
    }
}
```
2. Chat Controller – contains two endpoints, /chat/ and /chat/stream, for stream=false and stream=true in the request JSON respectively:
```java
package ollama.async.wrapper.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.RequiredArgsConstructor;
import ollama.async.wrapper.service.OllamaAsyncWrapperChatService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@RestController
@RequestMapping("ollama")
public class OllamaAsyncWrapperChatController {

    private final OllamaAsyncWrapperChatService chatService;

    @PostMapping(value = "/chat/",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<String> chat(@RequestBody String payload) {
        return chatService.ollamaChat(payload);
    }

    @PostMapping(value = "/chat/stream",
            consumes = MediaType.APPLICATION_JSON_VALUE,
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatStream(@RequestBody String payload) {
        return chatService.ollamaChatStream(payload);
    }
}
```
Step 5: Set the port number and logging
In the application.properties file, set the port number and logging format.
```properties
spring.application.name=ollama-async-wrapper
server.http2.enabled=true
server.port=5151

# Disable White Label Page
server.error.whitelabel.enabled=false

# Log settings
logging.file.name=./logs/ollama-async-wrapper.log
logging.pattern.file=%d %p %c{1.} [%t] %m%n
logging.level.web=ERROR
logging.level.org.springframework=ERROR
logging.logback.rollingpolicy.max-history=2
logging.logback.rollingpolicy.clean-history-on-start=true
logging.logback.rollingpolicy.total-size-cap.negative=false
```
Step 6: Send a Test Request
```bash
curl -X POST "http://localhost:5151/ollama/generate/" \
  -H "Content-Type: application/json" \
  -d '{ "model": "llama3", "prompt": "Hi!", "stream": false }'
```
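And the streaming variant, which returns server-sent events as the model generates tokens (curl's -N flag disables output buffering so chunks appear as they arrive):

```bash
# Streaming request: stream=true in the payload, hitting the /stream endpoint.
curl -N -X POST "http://localhost:5151/ollama/generate/stream" \
  -H "Content-Type: application/json" \
  -d '{ "model": "llama3", "prompt": "Hi!", "stream": true }'
```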
Conclusion
By following this approach, we can build a robust and resilient reactive wrapper application that communicates asynchronously with a local LLM service like Ollama. Leveraging WebFlux for non-blocking operations and Resilience4j for fault tolerance ensures that the application can handle high loads and potential failures gracefully. This effectively makes Ollama more responsive and scalable, providing a better user experience.
Feel free to explore the GitHub repository here for the complete implementation and additional details. Fork it and improve it as per your need.
Or you can check out and run the application by following the steps below:
Clone the Repository
```bash
git clone https://github.com/niteshapte/ollama-async-wrapper-spring-boot-webflux.git
cd ollama-async-wrapper-spring-boot-webflux
```
Build the Project
```bash
./mvnw clean install
```
Run the Application
```bash
./mvnw spring-boot:run
```
Send a Test Request
```bash
curl -X POST "http://localhost:5151/ollama/generate/" \
  -H "Content-Type: application/json" \
  -d '{ "model": "llama3", "prompt": "Hi!", "stream": false }'
```
That’s it.
Hope you liked it.
Critiques and feedback are welcome.
Have a great day ahead!