Hi Everyone,

Recently, I set up a local AI service called Ollama. After some exploration and testing, I discovered that Ollama doesn’t support parallel processing by default. There is an environment variable, OLLAMA_NUM_PARALLEL, that can enable parallel processing, but it wasn’t sufficient for my testing needs. On top of that, my GPU has only 6GB of VRAM, which limits how much Ollama can handle at once. To address these limitations, I built a reactive Spring Boot application that leverages WebFlux for asynchronous communication and Resilience4j’s rate limiter and circuit breaker for resilience.

Visit the GitHub repository for the full implementation.

What is Ollama?

Ollama is a Large Language Model (LLM) service that can run as a server on your local machine. It provides developers with powerful natural language processing (NLP) capabilities for tasks such as text generation, summarization, translation, and more. Ollama supports multiple LLMs like Llama 3, Mistral, Phi, and many others. It also exposes an OpenAI-compatible API and listens on port 11434 by default for model interaction.

For more information about Ollama, you can visit their website or check out their GitHub page.

The Challenge with Ollama

While Ollama offers impressive capabilities, it doesn’t support parallel processing out of the box. Although there’s an experimental feature that can be enabled using the OLLAMA_NUM_PARALLEL environment variable, it didn’t meet my performance expectations during testing.
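
For reference, the variable is set before starting the Ollama server; a minimal example on Linux/macOS (the value 4 is just an illustration):

# Allow up to 4 requests to be processed in parallel
OLLAMA_NUM_PARALLEL=4 ollama serve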

Ollama Async Wrapper using Spring Boot WebFlux

To overcome these limitations and make the most of Ollama, I developed a Spring Boot WebFlux application that acts as an asynchronous wrapper around it. The wrapper adds rate limiting, a request delay, and circuit breaker functionality to ensure stable and reliable interactions with Ollama.

Features

  • Asynchronous processing with WebFlux
  • Rate limiting to control the number of requests
  • Circuit breaker to prevent system overload
  • Configurable connection and read timeouts

Requirements

  • Java 21 (Spring Boot 3.x requires Java 17 or higher)
  • Spring Boot 3.3.1
  • Maven or Gradle
  • Ollama service running locally on port 11434 (see the quick check below)
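
You can verify that Ollama is up before starting the wrapper; this hits Ollama’s model-listing endpoint:

curl http://localhost:11434/api/tags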

Endpoints

Ollama also exposes an OpenAI-compatible API alongside its native endpoints. For details of all the endpoints Ollama supports, please visit the Ollama API doc.
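
For example, Ollama’s native generate endpoint can be called directly like this; the wrapper built below proxies exactly this kind of request:

curl http://localhost:11434/api/generate -d '{
  "model": "llama3",
  "prompt": "Hi!",
  "stream": false
}'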

Currently, the following wrapper endpoints are supported:

  • /ollama/generate
  • /ollama/generate/stream
  • /ollama/chat
  • /ollama/chat/stream

Let’s start.

Step 1: Project Setup

1. Initialize a Spring Boot Project: Create a new Spring Boot project with the necessary dependencies for WebFlux and Resilience4j using Spring Initializr.

2. Add Dependencies: Ensure you have the following dependencies in the pom.xml:

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.github.resilience4j</groupId>
      <artifactId>resilience4j-ratelimiter</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>io.github.resilience4j</groupId>
      <artifactId>resilience4j-circuitbreaker</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>io.github.resilience4j</groupId>
      <artifactId>resilience4j-reactor</artifactId>
      <version>2.2.0</version>
    </dependency>
  </dependencies>
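
If you use Gradle instead, the equivalent dependencies look roughly like this (a sketch; the versions match the Maven setup above):

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'io.github.resilience4j:resilience4j-ratelimiter:2.2.0'
    implementation 'io.github.resilience4j:resilience4j-circuitbreaker:2.2.0'
    implementation 'io.github.resilience4j:resilience4j-reactor:2.2.0'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}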

Step 2: Configuration

We will create three configuration classes – WebClient, Rate Limiter, and Circuit Breaker.

WebClientCustomConfig

package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

@Configuration
public class WebClientCustomConfig {

  @Bean
  WebClient webClient() {
    // Connect, response, read, and write timeouts are all 60 seconds
    // to accommodate slow LLM responses.
    HttpClient httpClient = HttpClient.create()
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
        .responseTimeout(Duration.ofSeconds(60))
        .doOnConnected(conn -> conn
            .addHandlerLast(new ReadTimeoutHandler(60))
            .addHandlerLast(new WriteTimeoutHandler(60)))
        // Log request/response bodies as text for easier debugging
        .wiretap("reactor.netty.http.client.HttpClient", LogLevel.INFO, AdvancedByteBufFormat.TEXTUAL);

    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient))
        .baseUrl("http://localhost:11434")
        .build();
  }
}
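
One optional tweak, not in the original repo: the base URL can come from configuration instead of being hard-coded. The property name ollama.base-url below is my own invention; add it to application.properties if you adopt this.

// Hypothetical variant (also requires: import org.springframework.beans.factory.annotation.Value;)
// In application.properties:  ollama.base-url=http://localhost:11434
@Bean
WebClient webClient(@Value("${ollama.base-url:http://localhost:11434}") String baseUrl) {
  // Timeout and wiretap setup omitted here; identical to the bean above.
  return WebClient.builder()
      .baseUrl(baseUrl)
      .build();
}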

RateLimiterCustomConfig

package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

@Configuration
public class RateLimiterCustomConfig {

  @Bean
  RateLimiter rateLimiter() {
    // At most 5 calls per 1-second window; a caller waits up to 500 ms
    // for a free permit before failing with RequestNotPermitted.
    RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .limitForPeriod(5)
        .timeoutDuration(Duration.ofMillis(500))
        .build();
    return RateLimiter.of("ollamaRateLimiter", rateLimiterConfig);
  }
}
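
To see what this configuration does in isolation, here is a minimal standalone sketch (not part of the wrapper) that exhausts the 5 permits and shows the sixth call being rejected:

RateLimiterConfig config = RateLimiterConfig.custom()
    .limitRefreshPeriod(Duration.ofSeconds(1))
    .limitForPeriod(5)
    .timeoutDuration(Duration.ZERO) // fail immediately instead of waiting
    .build();
RateLimiter limiter = RateLimiter.of("demo", config);

for (int i = 1; i <= 6; i++) {
  // acquirePermission() returns false once all 5 permits in the window are used
  System.out.println("Call " + i + " permitted: " + limiter.acquirePermission());
}
// Expected output: true five times, then false for call 6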

CircuitBreakerCustomConfig

package ollama.async.wrapper.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

@Configuration
public class CircuitBreakerCustomConfig {

  @Bean
  CircuitBreaker circuitBreaker() {
    // Open the circuit when 50% of the last 5 calls fail; stay open for
    // 1 second, then allow 2 trial calls in the half-open state.
    CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
        .failureRateThreshold(50)
        .waitDurationInOpenState(Duration.ofMillis(1000))
        .permittedNumberOfCallsInHalfOpenState(2)
        .slidingWindowSize(5)
        .build();
    return CircuitBreaker.of("ollamaCircuitBreaker", circuitBreakerConfig);
  }
}
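
Optionally, you can log state transitions to watch the breaker open and close under load; a small optional addition inside the bean method above, replacing the return statement:

// Optional: log every state transition (CLOSED -> OPEN -> HALF_OPEN ...)
CircuitBreaker cb = CircuitBreaker.of("ollamaCircuitBreaker", circuitBreakerConfig);
cb.getEventPublisher().onStateTransition(event ->
    System.out.println("Circuit breaker transition: " + event.getStateTransition()));
return cb;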

Step 3: Service Implementation

This is where all the configuration classes come together to interact with Ollama.

1. Generate Service – This interacts with the /api/generate endpoint of Ollama. The method ollamaGenerate() handles stream=false and ollamaGenerateStream() handles stream=true in the request JSON.
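
The class below implements an OllamaAsyncWrapperGenerateService interface that isn’t shown in the post; a minimal version of it looks like this:

package ollama.async.wrapper.service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface OllamaAsyncWrapperGenerateService {
  Mono<String> ollamaGenerate(String payload);
  Flux<String> ollamaGenerateStream(String payload);
}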

package ollama.async.wrapper.service;

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

@RequiredArgsConstructor
@Service
public class OllamaAsyncWrapperGenerateServiceImpl implements OllamaAsyncWrapperGenerateService {

  private final WebClient webClient;
  private final RateLimiter rateLimiter;
  private final CircuitBreaker circuitBreaker;

  public Mono<String> ollamaGenerate(String payload) {
    return Mono.just(payload)
        // Apply rate limiting and circuit breaking before the HTTP call
        .transformDeferred(RateLimiterOperator.of(rateLimiter))
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(RequestNotPermitted.class, e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
        .onErrorResume(Throwable.class, e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
        // Small delay to smooth out bursts hitting the GPU
        .delayElement(Duration.ofMillis(500))
        .flatMap(request -> webClient.post()
            .uri("/api/generate")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(request)
            .retrieve()
            .bodyToMono(String.class)
            .timeout(Duration.ofSeconds(60))
            .retry(3)
            .doOnError(Throwable::printStackTrace))
        .subscribeOn(Schedulers.boundedElastic());
  }
  
  public Flux<String> ollamaGenerateStream(String payload) {
    return Mono.just(payload)
        .transformDeferred(RateLimiterOperator.of(rateLimiter))
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(RequestNotPermitted.class, e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
        .onErrorResume(Throwable.class, e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
        .delayElement(Duration.ofSeconds(1))
        // flatMapMany switches from Mono to Flux for the streamed chunks
        .flatMapMany(request -> webClient.post()
            .uri("/api/generate")
            .accept(MediaType.TEXT_PLAIN)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(request)
            .retrieve()
            .bodyToFlux(String.class)
            .timeout(Duration.ofSeconds(60))
            // Exponential backoff: 3 retries starting at 1 s, capped at 5 s
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
            .doOnError(Throwable::printStackTrace))
        .subscribeOn(Schedulers.boundedElastic());
  }
}

2. Chat Service – This interacts with the /api/chat endpoint of Ollama. The method ollamaChat() handles stream=false and ollamaChatStream() handles stream=true in the request JSON.
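
Likewise, the chat implementation implements an OllamaAsyncWrapperChatService interface; a minimal version:

package ollama.async.wrapper.service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface OllamaAsyncWrapperChatService {
  Mono<String> ollamaChat(String payload);
  Flux<String> ollamaChatStream(String payload);
}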

package ollama.async.wrapper.service;

import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

@RequiredArgsConstructor
@Service
public class OllamaAsyncWrapperChatServiceImpl implements OllamaAsyncWrapperChatService {
  
  private final WebClient webClient;
  private final RateLimiter rateLimiter;
  private final CircuitBreaker circuitBreaker;

  public Mono<String> ollamaChat(String payload) {
    return Mono.just(payload)
        .transformDeferred(RateLimiterOperator.of(rateLimiter))
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(RequestNotPermitted.class, e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
        .onErrorResume(Throwable.class, e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
        .delayElement(Duration.ofSeconds(1))
        .flatMap(request -> webClient.post()
            .uri("/api/chat")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(request)
            .retrieve()
            .bodyToMono(String.class)
            .timeout(Duration.ofSeconds(60))
            .retry(3)
            .doOnError(Throwable::printStackTrace))
        .subscribeOn(Schedulers.boundedElastic());
  }

  public Flux<String> ollamaChatStream(String payload) {
    return Mono.just(payload)
        .transformDeferred(RateLimiterOperator.of(rateLimiter))
        .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
        .onErrorResume(RequestNotPermitted.class, e -> Mono.error(new RuntimeException("Rate limit exceeded, please try again later")))
        .onErrorResume(Throwable.class, e -> Mono.error(new RuntimeException("Service is currently unavailable, please try again later")))
        .delayElement(Duration.ofSeconds(1))
        .flatMapMany(request -> webClient.post()
            .uri("/api/chat")
            .accept(MediaType.TEXT_PLAIN)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(request)
            .retrieve()
            .bodyToFlux(String.class)
            .timeout(Duration.ofSeconds(60))
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
            .doOnError(Throwable::printStackTrace))
        .subscribeOn(Schedulers.boundedElastic());
  }
}

Step 4: The Controllers

Now, two controllers that expose the two services above.

1. Generate Controller – contains two endpoints, /generate and /generate/stream, for stream=false and stream=true respectively in the request JSON.

package ollama.async.wrapper.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import ollama.async.wrapper.service.OllamaAsyncWrapperGenerateService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@RestController
@RequestMapping("ollama")
public class OllamaAsyncWrapperGenerateController {
  
  private final OllamaAsyncWrapperGenerateService generateService;
  
  @PostMapping(value = "/generate/", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
  public Mono<String> generate(@RequestBody String payload) {
    return generateService.ollamaGenerate(payload);
  }
  
  @PostMapping(value = "/generate/stream", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> generateStream(@RequestBody String payload) {
    return generateService.ollamaGenerateStream(payload);
  }
}

2. Chat Controller – contains two endpoints, /chat and /chat/stream, for stream=false and stream=true respectively in the request JSON.

package ollama.async.wrapper.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.RequiredArgsConstructor;
import ollama.async.wrapper.service.OllamaAsyncWrapperChatService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@RestController
@RequestMapping("ollama")
public class OllamaAsyncWrapperChatController {
  
  private final OllamaAsyncWrapperChatService chatService;

  @PostMapping(value = "/chat/", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
  public Mono<String> chat(@RequestBody String payload) {
    return chatService.ollamaChat(payload);
  }
  
  @PostMapping(value = "/chat/stream", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> chatStream(@RequestBody String payload) {
    return chatService.ollamaChatStream(payload);
  }
}

Step 5: Set the port number and logging

In the application.properties file, set the port number and the logging configuration.

spring.application.name=ollama-async-wrapper

server.http2.enabled=true
server.port=5151

# Disable White Label Page
server.error.whitelabel.enabled=false

# Log setting
logging.file.name=./logs/ollama-async-wrapper.log
logging.pattern.file=%d %p %c{1.} [%t] %m%n
logging.level.web=ERROR
logging.level.org.springframework=ERROR
logging.logback.rollingpolicy.max-history=2
logging.logback.rollingpolicy.clean-history-on-start=true

Step 6: Send a Test Request

curl -X POST "http://localhost:5151/ollama/generate/" -H "Content-Type: application/json" -d '{
  "model": "llama3",
  "prompt": "Hi!",
  "stream": false 
}'
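
The streaming endpoint can be tested the same way; the -N flag turns off curl’s output buffering so tokens appear as they arrive:

curl -N -X POST "http://localhost:5151/ollama/generate/stream" -H "Content-Type: application/json" -d '{
  "model": "llama3",
  "prompt": "Hi!",
  "stream": true
}'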

Conclusion

By following this approach, we can build a robust and resilient reactive wrapper application that communicates asynchronously with a local LLM service like Ollama. Leveraging WebFlux for non-blocking operations and Resilience4j for fault tolerance ensures that the application can handle high loads and potential failures gracefully. In effect, this makes Ollama more responsive and scalable, providing a better user experience.

Feel free to explore the GitHub repository here for the complete implementation and additional details. Fork it and improve it to suit your needs.

Or you can check out and run the application by following the steps below:

1. Clone the repository:

git clone https://github.com/niteshapte/ollama-async-wrapper-spring-boot-webflux.git
cd ollama-async-wrapper-spring-boot-webflux

2. Build the project:

./mvnw clean install

3. Run the application:

./mvnw spring-boot:run

4. Send a test request:

curl -X POST "http://localhost:5151/ollama/generate" -H "Content-Type: application/json" -d '{
  "model": "llama3",
  "prompt": "Hi!",
  "stream": false
}'


That’s it.

Hope you liked it.

Critiques and feedback are welcome.

Have a great day ahead!
