Picture this: Your AI application processes one request at a time while your users wait in digital purgatory. Meanwhile, your Go program sits there with 99% of its cores twiddling their thumbs. Sound familiar?
Go programming with Ollama transforms sluggish AI applications into concurrent powerhouses that handle hundreds of requests simultaneously. This guide shows you how to build production-ready AI systems that scale without breaking your server budget.
You'll learn to implement goroutines for parallel AI processing, use channels for safe data flow, and optimize performance for real-world applications. By the end, you'll have a complete concurrent AI system that outperforms traditional sequential approaches.
Why Go and Ollama Make Perfect Concurrent Partners
Traditional AI applications face a critical bottleneck: sequential processing. Each request waits for the previous one to complete, creating artificial delays that frustrate users and waste computational resources.
Go's concurrency model solves this problem with goroutines and channels. Unlike OS threads, which typically reserve megabytes of stack, a goroutine starts with a roughly 2KB stack that grows on demand, so you can spawn thousands without significant overhead.
Ollama's lightweight architecture complements Go's concurrency perfectly. It provides fast model inference with minimal memory overhead, making it ideal for high-throughput applications.
The Performance Problem
Consider a typical AI chat application:
- Model inference: ~500ms per request
- Sequential processing: 10 requests = 5 seconds
- Concurrent processing: 10 requests = ~500ms

That's up to a 10x improvement, but only if the inference backend can actually serve requests in parallel. By default Ollama handles requests for a given model one at a time, so raise `OLLAMA_NUM_PARALLEL` (and, if you serve several models, `OLLAMA_MAX_LOADED_MODELS`) to match your Go-side concurrency.
Setting Up Your Go Ollama Environment
Before diving into concurrent programming, establish your development environment with the necessary tools and dependencies.
Prerequisites Installation
# Install Go (version 1.21+)
curl -OL https://go.dev/dl/go1.21.6.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.21.6.linux-amd64.tar.gz
export PATH=$PATH:/usr/local/go/bin
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Pull a lightweight model for testing
ollama pull llama3.2:1b
Project Structure Setup
mkdir go-ollama-concurrent
cd go-ollama-concurrent
go mod init go-ollama-concurrent
# Create directory structure
mkdir -p {cmd,internal/{ollama,concurrent,handlers},pkg/{models,utils}}
Essential Dependencies
// go.mod
module go-ollama-concurrent
go 1.21
require (
github.com/gin-gonic/gin v1.9.1
github.com/ollama/ollama v0.1.17
golang.org/x/sync v0.5.0
)
Building Your First Concurrent Ollama Client
Start with a basic concurrent client that demonstrates goroutines and channels working together for AI processing.
Basic Concurrent Client Structure
// internal/ollama/client.go
package ollama
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// http.Client is safe for concurrent use, so the struct needs no extra locking
type Client struct {
	baseURL    string
	httpClient *http.Client
}
type Request struct {
Model string `json:"model"`
Prompt string `json:"prompt"`
Stream bool `json:"stream"`
Context []int `json:"context,omitempty"`
}
type Response struct {
Response string `json:"response"`
Done bool `json:"done"`
Context []int `json:"context,omitempty"`
Error string `json:"error,omitempty"`
}
// NewClient creates a new Ollama client with optimized settings
func NewClient(baseURL string) *Client {
return &Client{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
},
}
}
Implementing Concurrent Request Processing
// internal/concurrent/processor.go
package concurrent

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go-ollama-concurrent/internal/ollama"
)
type Job struct {
ID string
Prompt string
Model string
Response chan Result
}
type Result struct {
ID string
Response string
Error error
Duration time.Duration
}
type Processor struct {
client *ollama.Client
workers int
jobQueue chan Job
resultPool sync.Pool
ctx context.Context
cancel context.CancelFunc
}
// NewProcessor creates a concurrent processor with specified worker count
func NewProcessor(client *ollama.Client, workers int) *Processor {
ctx, cancel := context.WithCancel(context.Background())
p := &Processor{
client: client,
workers: workers,
jobQueue: make(chan Job, workers*2), // Buffered channel for better performance
ctx: ctx,
cancel: cancel,
}
// Initialize result pool for memory efficiency
p.resultPool = sync.Pool{
New: func() interface{} {
return &Result{}
},
}
return p
}
// Start launches worker goroutines for concurrent processing
func (p *Processor) Start() {
for i := 0; i < p.workers; i++ {
go p.worker(i)
}
}
// worker processes jobs from the queue concurrently
func (p *Processor) worker(id int) {
for {
select {
case job := <-p.jobQueue:
p.processJob(job, id)
case <-p.ctx.Done():
return
}
}
}
// processJob handles individual AI inference requests
func (p *Processor) processJob(job Job, workerID int) {
start := time.Now()
result := p.resultPool.Get().(*Result)
defer p.resultPool.Put(result)
// Reset result object
result.ID = job.ID
result.Response = ""
result.Error = nil
// Make Ollama API call
response, err := p.client.Generate(p.ctx, job.Model, job.Prompt)
if err != nil {
result.Error = fmt.Errorf("worker %d failed: %w", workerID, err)
} else {
result.Response = response
}
result.Duration = time.Since(start)
// Send result back through channel
select {
case job.Response <- *result:
case <-p.ctx.Done():
return
}
}
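The HTTP handlers later in this guide submit work through `processor.JobQueue()`, an accessor the struct above never exposes. Returning the channel as send-only keeps callers from draining jobs meant for the workers. A stripped-down, self-contained illustration of the pattern:

```go
package main

import "fmt"

type Job struct {
	ID       string
	Response chan string
}

type Processor struct {
	jobQueue chan Job
}

// JobQueue exposes the internal channel as send-only: callers can submit
// jobs but cannot consume work meant for the workers.
func (p *Processor) JobQueue() chan<- Job {
	return p.jobQueue
}

func main() {
	p := &Processor{jobQueue: make(chan Job, 1)}
	// a single inline worker that echoes the job ID
	go func() {
		job := <-p.jobQueue
		job.Response <- "done:" + job.ID
	}()

	job := Job{ID: "42", Response: make(chan string, 1)}
	p.JobQueue() <- job
	fmt.Println(<-job.Response) // done:42
}
```

In the real `Processor`, the same one-line accessor (`return p.jobQueue`) is all the handlers need.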
Channel-Based Job Distribution
// ProcessBatch handles multiple requests concurrently
func (p *Processor) ProcessBatch(requests []ollama.Request) []Result {
results := make([]Result, len(requests))
var wg sync.WaitGroup
for i, req := range requests {
wg.Add(1)
go func(index int, request Request) {
defer wg.Done()
job := Job{
ID: fmt.Sprintf("batch_%d", index),
Prompt: request.Prompt,
Model: request.Model,
Response: make(chan Result, 1),
}
// Send job to worker pool
select {
case p.jobQueue <- job:
// Wait for result
select {
case result := <-job.Response:
results[index] = result
case <-time.After(30 * time.Second):
results[index] = Result{
ID: job.ID,
Error: fmt.Errorf("timeout waiting for response"),
}
}
case <-p.ctx.Done():
results[index] = Result{
ID: job.ID,
Error: fmt.Errorf("processor stopped"),
}
}
}(i, req)
}
wg.Wait()
return results
}
Advanced Concurrency Patterns for AI Workloads
Real-world AI applications require sophisticated concurrency patterns beyond basic goroutines. These patterns handle complex scenarios like rate limiting, circuit breaking, and intelligent load balancing.
Worker Pool with Dynamic Scaling
// internal/concurrent/adaptive_pool.go
package concurrent
import (
"context"
"sync"
"sync/atomic"
"time"
)
type AdaptivePool struct {
minWorkers int
maxWorkers int
currentWorkers int64
jobQueue chan Job
workerQueue chan chan Job
quit chan bool
metrics *PoolMetrics
}
type PoolMetrics struct {
ProcessedJobs int64
AverageLatency time.Duration
QueueDepth int64
ActiveWorkers int64
mu sync.RWMutex
}
// NewAdaptivePool creates a worker pool that scales based on demand
func NewAdaptivePool(min, max int) *AdaptivePool {
pool := &AdaptivePool{
minWorkers: min,
maxWorkers: max,
currentWorkers: int64(min),
jobQueue: make(chan Job, max*2),
workerQueue: make(chan chan Job, max),
quit: make(chan bool),
metrics: &PoolMetrics{},
}
// Start monitoring goroutine
go pool.monitor()
return pool
}
// monitor adjusts worker count based on queue depth and performance metrics
func (p *AdaptivePool) monitor() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.adjustWorkerCount()
case <-p.quit:
return
}
}
}
// adjustWorkerCount scales workers up or down based on metrics
func (p *AdaptivePool) adjustWorkerCount() {
queueDepth := len(p.jobQueue)
currentWorkers := atomic.LoadInt64(&p.currentWorkers)
// Scale up if queue is backing up
if queueDepth > int(currentWorkers)*2 && currentWorkers < int64(p.maxWorkers) {
p.addWorker()
}
// Scale down if queue is consistently empty
if queueDepth == 0 && currentWorkers > int64(p.minWorkers) {
p.removeWorker()
}
}
// addWorker spawns a new worker goroutine
func (p *AdaptivePool) addWorker() {
atomic.AddInt64(&p.currentWorkers, 1)
go p.worker()
}
// removeWorker signals a worker to stop
func (p *AdaptivePool) removeWorker() {
if atomic.LoadInt64(&p.currentWorkers) > int64(p.minWorkers) {
atomic.AddInt64(&p.currentWorkers, -1)
// Worker will stop when it sees the decreased count
}
}
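Note that `addWorker` calls `p.worker()`, which this article never defines for `AdaptivePool`, and `removeWorker` assumes workers somehow notice the decreased count. One concrete mechanism is a stop channel that each worker selects on alongside the job queue, so scaling down retires exactly one worker. A minimal self-contained sketch (the `pool` type and the doubling "work" are placeholders for this demo):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type pool struct {
	jobs    chan int
	stop    chan struct{} // removeWorker sends here; exactly one worker exits
	running int64
}

func (p *pool) addWorker(done chan<- int) {
	atomic.AddInt64(&p.running, 1)
	go func() {
		for {
			select {
			case j := <-p.jobs:
				done <- j * 2 // stand-in for real inference work
			case <-p.stop:
				atomic.AddInt64(&p.running, -1)
				return
			}
		}
	}()
}

func (p *pool) removeWorker() { p.stop <- struct{}{} }

func main() {
	p := &pool{jobs: make(chan int), stop: make(chan struct{})}
	done := make(chan int, 1)
	p.addWorker(done)

	p.jobs <- 21
	fmt.Println(<-done) // 42
	p.removeWorker()    // blocks until one worker acknowledges and exits
}
```

Because the send on `stop` is unbuffered, `removeWorker` doubles as a synchronization point: it returns only once a worker has actually committed to exiting.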
Circuit Breaker Pattern for Reliability
// internal/concurrent/circuit_breaker.go
package concurrent
import (
"errors"
"sync"
"time"
)
type CircuitState int
const (
Closed CircuitState = iota
Open
HalfOpen
)
type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration
state CircuitState
failures int
lastFailure time.Time
mutex sync.RWMutex
onStateChange func(from, to CircuitState)
}
// NewCircuitBreaker creates a circuit breaker for fault tolerance
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
state: Closed,
}
}
// Execute runs a function with circuit breaker protection. The mutex guards
// only the state bookkeeping; fn itself runs outside the lock so that one
// slow call cannot serialize every other caller.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mutex.Lock()
	if cb.state == Open {
		if time.Since(cb.lastFailure) > cb.resetTimeout {
			cb.state = HalfOpen
			cb.failures = 0
		} else {
			cb.mutex.Unlock()
			return errors.New("circuit breaker is open")
		}
	}
	cb.mutex.Unlock()

	err := fn()

	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	if err != nil {
		cb.failures++
		cb.lastFailure = time.Now()
		if cb.failures >= cb.maxFailures && cb.state != Open {
			from := cb.state
			cb.state = Open
			if cb.onStateChange != nil {
				cb.onStateChange(from, Open)
			}
		}
		return err
	}
	// Success: close the circuit and reset the failure count
	if cb.state == HalfOpen {
		cb.state = Closed
		if cb.onStateChange != nil {
			cb.onStateChange(HalfOpen, Closed)
		}
	}
	cb.failures = 0
	return nil
}
Rate Limiting for API Protection
// internal/concurrent/rate_limiter.go
package concurrent

import (
	"context"
	"time"
)

// The token channel itself provides synchronization, so no mutex is needed
type RateLimiter struct {
	tokens     chan struct{}
	refillRate time.Duration
	capacity   int
	quit       chan struct{}
}
// NewRateLimiter creates a token bucket rate limiter
func NewRateLimiter(capacity int, refillRate time.Duration) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, capacity),
refillRate: refillRate,
capacity: capacity,
quit: make(chan struct{}),
}
// Fill initial tokens
for i := 0; i < capacity; i++ {
rl.tokens <- struct{}{}
}
// Start refill goroutine
go rl.refill()
return rl
}
// refill adds tokens at the specified rate
func (rl *RateLimiter) refill() {
ticker := time.NewTicker(rl.refillRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case rl.tokens <- struct{}{}:
default:
// Token bucket is full
}
case <-rl.quit:
return
}
}
}
// Allow checks if a request can proceed
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.tokens:
return true
default:
return false
}
}
// Wait blocks until a token is available
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
Performance Optimization Techniques
Maximizing concurrent AI performance requires attention to memory management, connection pooling, and algorithmic optimizations specific to AI workloads.
Memory Pool Management
// internal/concurrent/memory_pool.go
package concurrent

import (
	"sync"

	"go-ollama-concurrent/internal/ollama"
)

type MemoryPool struct {
	requestPool  sync.Pool
	responsePool sync.Pool
	bufferPool   sync.Pool
}
// NewMemoryPool creates optimized memory pools for AI requests
func NewMemoryPool() *MemoryPool {
	return &MemoryPool{
		requestPool: sync.Pool{
			New: func() interface{} {
				return &ollama.Request{}
			},
		},
		responsePool: sync.Pool{
			New: func() interface{} {
				return &ollama.Response{}
			},
		},
		bufferPool: sync.Pool{
			New: func() interface{} {
				// Zero-length slices with 64KB capacity for JSON processing;
				// callers append into them, and PutBuffer re-truncates on return
				return make([]byte, 0, 64*1024)
			},
		},
	}
}
// GetRequest retrieves a request object from the pool
func (mp *MemoryPool) GetRequest() *ollama.Request {
	return mp.requestPool.Get().(*ollama.Request)
}

// PutRequest returns a request object to the pool after clearing its fields
func (mp *MemoryPool) PutRequest(req *ollama.Request) {
	req.Model = ""
	req.Prompt = ""
	req.Stream = false
	req.Context = req.Context[:0]
	mp.requestPool.Put(req)
}

// GetBuffer retrieves a zero-length, 64KB-capacity buffer from the pool
func (mp *MemoryPool) GetBuffer() []byte {
	return mp.bufferPool.Get().([]byte)
}

// PutBuffer returns a buffer to the pool, keeping only full-size buffers
func (mp *MemoryPool) PutBuffer(buf []byte) {
	if cap(buf) == 64*1024 {
		mp.bufferPool.Put(buf[:0])
	}
}
Connection Pool Optimization
// internal/concurrent/optimized_client.go
// Lives in package concurrent (not ollama) so it can reuse MemoryPool,
// RateLimiter, and CircuitBreaker without creating an import cycle.
package concurrent

import (
	"context"
	"net/http"
	"time"
)

type OptimizedClient struct {
	client         *http.Client
	memoryPool     *MemoryPool
	rateLimiter    *RateLimiter
	circuitBreaker *CircuitBreaker
}
// NewOptimizedClient creates a production-ready Ollama client
func NewOptimizedClient(baseURL string) *OptimizedClient {
transport := &http.Transport{
MaxIdleConns: 200,
MaxIdleConnsPerHost: 50,
IdleConnTimeout: 120 * time.Second,
DisableCompression: false,
ForceAttemptHTTP2: true,
}
client := &http.Client{
Transport: transport,
Timeout: 45 * time.Second,
}
return &OptimizedClient{
client: client,
memoryPool: NewMemoryPool(),
rateLimiter: NewRateLimiter(100, time.Second/50), // 50 RPS
circuitBreaker: NewCircuitBreaker(5, 30*time.Second),
}
}
// GenerateWithOptimization processes requests with all optimizations
func (oc *OptimizedClient) GenerateWithOptimization(ctx context.Context, model, prompt string) (string, error) {
// Wait for rate limit
if err := oc.rateLimiter.Wait(ctx); err != nil {
return "", err
}
// Use circuit breaker
var result string
err := oc.circuitBreaker.Execute(func() error {
var err error
result, err = oc.generateInternal(ctx, model, prompt)
return err
})
return result, err
}
// generateInternal handles the actual API call with memory pooling
func (oc *OptimizedClient) generateInternal(ctx context.Context, model, prompt string) (string, error) {
// Get request from pool
req := oc.memoryPool.GetRequest()
defer oc.memoryPool.PutRequest(req)
// Set request parameters
req.Model = model
req.Prompt = prompt
req.Stream = false
// Get buffer from pool
buf := oc.memoryPool.GetBuffer()
defer oc.memoryPool.PutBuffer(buf)
// Make API call with pooled resources
return oc.makeAPICall(ctx, req, buf)
}
Batch Processing Optimization
// internal/concurrent/batch_processor.go
package concurrent

import (
	"context"
	"sync"
	"time"
)

type BatchProcessor struct {
	client          *OptimizedClient
	batchSize       int
	maxWaitTime     time.Duration
	processingQueue chan *BatchItem
	batches         chan []*BatchItem
}
type BatchItem struct {
ID string
Model string
Prompt string
Response chan BatchResult
Timeout time.Duration
}
type BatchResult struct {
ID string
Response string
Error error
Latency time.Duration
}
// NewBatchProcessor creates a processor that batches requests for efficiency
func NewBatchProcessor(client *OptimizedClient, batchSize int, maxWaitTime time.Duration) *BatchProcessor {
bp := &BatchProcessor{
client: client,
batchSize: batchSize,
maxWaitTime: maxWaitTime,
processingQueue: make(chan *BatchItem, batchSize*4),
batches: make(chan []*BatchItem, 10),
}
// Start batch collection goroutine
go bp.collectBatches()
// Start batch processing goroutines
for i := 0; i < 4; i++ {
go bp.processBatches()
}
return bp
}
// collectBatches groups individual requests into batches
func (bp *BatchProcessor) collectBatches() {
	var currentBatch []*BatchItem
	timer := time.NewTimer(bp.maxWaitTime)
	for {
		select {
		case item := <-bp.processingQueue:
			currentBatch = append(currentBatch, item)
			if len(currentBatch) >= bp.batchSize {
				bp.batches <- currentBatch
				currentBatch = nil
				// Stop and drain the timer before reuse, per time.Timer.Reset docs
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(bp.maxWaitTime)
			}
		case <-timer.C:
			if len(currentBatch) > 0 {
				bp.batches <- currentBatch
				currentBatch = nil
			}
			timer.Reset(bp.maxWaitTime)
		}
	}
}
// processBatches handles batched requests concurrently
func (bp *BatchProcessor) processBatches() {
for batch := range bp.batches {
bp.processBatch(batch)
}
}
// processBatch processes a single batch of requests
func (bp *BatchProcessor) processBatch(batch []*BatchItem) {
var wg sync.WaitGroup
for _, item := range batch {
wg.Add(1)
go func(item *BatchItem) {
defer wg.Done()
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), item.Timeout)
defer cancel()
response, err := bp.client.GenerateWithOptimization(ctx, item.Model, item.Prompt)
result := BatchResult{
ID: item.ID,
Response: response,
Error: err,
Latency: time.Since(start),
}
select {
case item.Response <- result:
default:
// no reader is waiting (e.g. the caller already timed out), so drop the result
}
}(item)
}
wg.Wait()
}
Building a Production-Ready HTTP Server
Transform your concurrent AI processor into a robust HTTP server that handles real-world traffic patterns and provides monitoring capabilities.
HTTP Server Implementation
// cmd/server/main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
"go-ollama-concurrent/internal/handlers"
"go-ollama-concurrent/internal/concurrent"
"go-ollama-concurrent/internal/ollama"
)
func main() {
	// Initialize components: the basic client feeds the worker pool, while
	// the optimized client backs the batch processor
	client := ollama.NewClient("http://localhost:11434")
	optimized := concurrent.NewOptimizedClient("http://localhost:11434")
	processor := concurrent.NewProcessor(client, 10)
	batchProcessor := concurrent.NewBatchProcessor(optimized, 5, 100*time.Millisecond)
// Start background processors
processor.Start()
// Setup HTTP server
router := gin.Default()
// Add middleware
router.Use(gin.Recovery())
router.Use(gin.Logger())
router.Use(handlers.CORSMiddleware())
router.Use(handlers.RateLimitMiddleware())
// Setup routes
api := router.Group("/api/v1")
{
api.POST("/generate", handlers.GenerateHandler(processor))
api.POST("/batch", handlers.BatchHandler(batchProcessor))
api.GET("/health", handlers.HealthHandler())
api.GET("/metrics", handlers.MetricsHandler())
}
// Start server with graceful shutdown
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server failed: %v", err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown:", err)
}
log.Println("Server exited")
}
Request Handlers with Validation
// internal/handlers/api.go
package handlers
import (
	"context"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"

	"go-ollama-concurrent/internal/concurrent"
)
type GenerateRequest struct {
Model string `json:"model" binding:"required"`
Prompt string `json:"prompt" binding:"required,min=1,max=4000"`
Timeout int `json:"timeout,omitempty"`
}
type BatchRequest struct {
Requests []GenerateRequest `json:"requests" binding:"required,min=1,max=50"`
}
// GenerateHandler processes single AI generation requests
func GenerateHandler(processor *concurrent.Processor) gin.HandlerFunc {
return func(c *gin.Context) {
var req GenerateRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid request format",
"details": err.Error(),
})
return
}
// Set default timeout
timeout := 30 * time.Second
if req.Timeout > 0 {
timeout = time.Duration(req.Timeout) * time.Second
}
// Create job
job := concurrent.Job{
ID: generateJobID(),
Prompt: req.Prompt,
Model: req.Model,
Response: make(chan concurrent.Result, 1),
}
// Process with timeout
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
defer cancel()
select {
case processor.JobQueue() <- job:
select {
case result := <-job.Response:
if result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": "Generation failed",
"details": result.Error.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"id": result.ID,
"response": result.Response,
"duration": result.Duration.Milliseconds(),
})
case <-ctx.Done():
c.JSON(http.StatusRequestTimeout, gin.H{
"error": "Request timeout",
})
}
case <-ctx.Done():
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "Service temporarily unavailable",
})
}
}
}
// BatchHandler processes multiple requests concurrently
func BatchHandler(batchProcessor *concurrent.BatchProcessor) gin.HandlerFunc {
return func(c *gin.Context) {
var req BatchRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "Invalid batch request format",
"details": err.Error(),
})
return
}
// Process batch
results := make([]gin.H, len(req.Requests))
responseChan := make(chan concurrent.BatchResult, len(req.Requests))
for _, genReq := range req.Requests {
timeout := 30 * time.Second
if genReq.Timeout > 0 {
timeout = time.Duration(genReq.Timeout) * time.Second
}
batchItem := &concurrent.BatchItem{
ID: generateJobID(),
Model: genReq.Model,
Prompt: genReq.Prompt,
Response: responseChan,
Timeout: timeout,
}
batchProcessor.Submit(batchItem)
}
// Collect results; they complete in arbitrary order, so clients should match
// responses to requests via the returned "id" field
for i := 0; i < len(req.Requests); i++ {
select {
case result := <-responseChan:
results[i] = gin.H{
"id": result.ID,
"response": result.Response,
"error": result.Error,
"duration": result.Latency.Milliseconds(),
}
case <-time.After(45 * time.Second):
results[i] = gin.H{
"error": "Batch processing timeout",
}
}
}
c.JSON(http.StatusOK, gin.H{
"results": results,
"total": len(results),
})
}
}
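Both handlers call `generateJobID()`, which is never shown. A sketch, assuming a timestamp-plus-counter scheme is acceptable (a UUID library would work just as well):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var jobCounter int64

// generateJobID produces IDs that stay unique even when many goroutines
// request one in the same nanosecond, by pairing a timestamp with an
// atomically incremented counter.
func generateJobID() string {
	return fmt.Sprintf("job_%d_%d", time.Now().UnixNano(), atomic.AddInt64(&jobCounter, 1))
}

func main() {
	fmt.Println(generateJobID())
	fmt.Println(generateJobID())
}
```

The atomic counter is the important part: the timestamp alone can collide under concurrent calls on coarse-clock platforms.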
Monitoring and Metrics
// internal/handlers/metrics.go
package handlers
import (
"net/http"
"runtime"
"time"
"github.com/gin-gonic/gin"
)
type SystemMetrics struct {
Timestamp time.Time `json:"timestamp"`
Goroutines int `json:"goroutines"`
MemoryAllocated uint64 `json:"memory_allocated_mb"`
MemorySystem uint64 `json:"memory_system_mb"`
GCCycles uint32 `json:"gc_cycles"`
RequestsTotal int64 `json:"requests_total"`
RequestsActive int64 `json:"requests_active"`
AverageLatency float64 `json:"average_latency_ms"`
ErrorRate float64 `json:"error_rate_percent"`
}
// Note: this global is updated without synchronization; production code
// should guard it with a mutex or use atomic counters
var metrics = &SystemMetrics{}
// MetricsHandler returns system and application metrics
func MetricsHandler() gin.HandlerFunc {
return func(c *gin.Context) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
metrics.Timestamp = time.Now()
metrics.Goroutines = runtime.NumGoroutine()
metrics.MemoryAllocated = m.Alloc / 1024 / 1024
metrics.MemorySystem = m.Sys / 1024 / 1024
metrics.GCCycles = m.NumGC
c.JSON(http.StatusOK, metrics)
}
}
// HealthHandler provides health check endpoint
func HealthHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "healthy",
"timestamp": time.Now(),
"version": "1.0.0",
})
}
}
Testing and Benchmarking Your Implementation
Comprehensive testing ensures your concurrent AI system performs reliably under various load conditions and edge cases.
Load Testing Setup
// test/load_test.go
package test
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"testing"
"time"
)
func BenchmarkConcurrentGeneration(b *testing.B) {
serverURL := "http://localhost:8080"
testCases := []struct {
name string
concurrent int
requests int
prompt string
}{
{"Light Load", 10, 100, "Hello, world!"},
{"Medium Load", 50, 500, "Explain quantum computing in simple terms."},
{"Heavy Load", 100, 1000, "Write a detailed analysis of machine learning algorithms."},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
benchmarkConcurrentRequests(b, serverURL, tc.concurrent, tc.requests, tc.prompt)
})
}
}
func benchmarkConcurrentRequests(b *testing.B, serverURL string, concurrent, requests int, prompt string) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
semaphore := make(chan struct{}, concurrent)
start := time.Now()
for j := 0; j < requests; j++ {
wg.Add(1)
go func() {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
makeRequest(serverURL, prompt)
}()
}
wg.Wait()
duration := time.Since(start)
b.ReportMetric(float64(requests)/duration.Seconds(), "requests/sec")
b.ReportMetric(duration.Seconds()/float64(requests)*1000, "ms/request")
}
}
func makeRequest(serverURL, prompt string) error {
	requestBody := map[string]interface{}{
		"model":  "llama3.2:1b",
		"prompt": prompt,
	}
	body, err := json.Marshal(requestBody)
	if err != nil {
		return err
	}
	resp, err := http.Post(serverURL+"/api/v1/generate", "application/json", bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}
Unit Testing for Concurrent Components
// test/concurrent_test.go
package test
import (
	"fmt"
	"sync"
	"testing"
	"time"

	"go-ollama-concurrent/internal/concurrent"
	"go-ollama-concurrent/internal/ollama"
)
func TestProcessorConcurrency(t *testing.T) {
client := ollama.NewClient("http://localhost:11434")
processor := concurrent.NewProcessor(client, 5)
processor.Start()
// Test concurrent job processing
jobs := make([]concurrent.Job, 10)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
jobs[i] = concurrent.Job{
ID: fmt.Sprintf("test_%d", i),
Prompt: "Test prompt",
Model: "llama3.2:1b",
Response: make(chan concurrent.Result, 1),
}
wg.Add(1)
go func(job concurrent.Job) {
defer wg.Done()
// Submit job
processor.JobQueue() <- job
// Wait for result
select {
case result := <-job.Response:
if result.Error != nil {
t.Errorf("Job %s failed: %v", job.ID, result.Error)
}
case <-time.After(30 * time.Second):
t.Errorf("Job %s timed out", job.ID)
}
}(jobs[i])
}
wg.Wait()
}
func TestCircuitBreaker(t *testing.T) {
cb := concurrent.NewCircuitBreaker(3, 5*time.Second)
// Test normal operation
err := cb.Execute(func() error {
return nil
})
if err != nil {
t.Errorf("Circuit breaker should allow normal operation: %v", err)
}
// Test failure handling
for i := 0; i < 3; i++ {
cb.Execute(func() error {
return fmt.Errorf("simulated failure")
})
}
// Circuit should be open now
err = cb.Execute(func() error {
return nil
})
if err == nil {
t.Error("Circuit breaker should reject requests when open")
}
}
func TestRateLimiter(t *testing.T) {
rl := concurrent.NewRateLimiter(10, 100*time.Millisecond)
// Test initial capacity
allowed := 0
for i := 0; i < 15; i++ {
if rl.Allow() {
allowed++
}
}
if allowed != 10 {
t.Errorf("Expected 10 allowed requests, got %d", allowed)
}
// Test refill
time.Sleep(200 * time.Millisecond)
if !rl.Allow() {
t.Error("Rate limiter should have refilled tokens")
}
}
Performance Benchmarking
// test/benchmark_test.go
package test
import (
"context"
"fmt"
"sync"
"testing"
"time"
"go-ollama-concurrent/internal/concurrent"
"go-ollama-concurrent/internal/ollama"
)
func BenchmarkSequentialProcessing(b *testing.B) {
client := ollama.NewClient("http://localhost:11434")
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10; j++ {
_, err := client.Generate(context.Background(), "llama3.2:1b", "Test prompt")
if err != nil {
b.Errorf("Generation failed: %v", err)
}
}
}
}
func BenchmarkConcurrentProcessing(b *testing.B) {
client := ollama.NewClient("http://localhost:11434")
processor := concurrent.NewProcessor(client, 10)
processor.Start()
b.ResetTimer()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
for j := 0; j < 10; j++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
job := concurrent.Job{
ID: fmt.Sprintf("bench_%d", id),
Prompt: "Test prompt",
Model: "llama3.2:1b",
Response: make(chan concurrent.Result, 1),
}
processor.JobQueue() <- job
<-job.Response
}(j)
}
wg.Wait()
}
}
func BenchmarkMemoryPooling(b *testing.B) {
pool := concurrent.NewMemoryPool()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Test request pooling
req := pool.GetRequest()
req.Model = "llama3.2:1b"
req.Prompt = "Test prompt"
pool.PutRequest(req)
// Test buffer pooling
buf := pool.GetBuffer()
pool.PutBuffer(buf)
}
}
Deployment and Scaling Strategies
Deploy your concurrent AI system with container orchestration, load balancing, and auto-scaling capabilities for production environments.
Docker Configuration
# Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main cmd/server/main.go
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/main .
# Copy a config directory only if your build actually creates one:
# COPY --from=builder /app/config ./config
EXPOSE 8080
CMD ["./main"]
Docker Compose with Ollama
# docker-compose.yml
version: '3.8'
services:
ollama:
image: ollama/ollama:latest
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
environment:
- OLLAMA_HOST=0.0.0.0
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
go-ai-server:
build: .
ports:
- "8080:8080"
depends_on:
- ollama
environment:
- OLLAMA_URL=http://ollama:11434
- WORKER_COUNT=20
- RATE_LIMIT=100
deploy:
replicas: 3
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '1'
memory: 1G
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- go-ai-server
volumes:
ollama_data:
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: go-ollama-concurrent
labels:
app: go-ollama-concurrent
spec:
replicas: 5
selector:
matchLabels:
app: go-ollama-concurrent
template:
metadata:
labels:
app: go-ollama-concurrent
spec:
containers:
- name: go-ollama-concurrent
image: go-ollama-concurrent:latest
ports:
- containerPort: 8080
env:
- name: OLLAMA_URL
value: "http://ollama-service:11434"
- name: WORKER_COUNT
value: "15"
resources:
limits:
cpu: 2000m
memory: 2Gi
requests:
cpu: 1000m
memory: 1Gi
livenessProbe:
httpGet:
path: /api/v1/health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /api/v1/health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: go-ollama-service
spec:
selector:
app: go-ollama-concurrent
ports:
- port: 80
targetPort: 8080
type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: go-ollama-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: go-ollama-concurrent
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Load Balancer Configuration
# nginx.conf — the directives below all belong inside the http {} block
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

upstream go_ollama_backend {
    least_conn;
    # With docker-compose replicas, the single service name resolves to all
    # replicas, so duplicate server lines are unnecessary
    server go-ai-server:8080 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;

    location /api/v1/ {
        proxy_pass http://go_ollama_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts for long-running AI requests
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffer settings
        proxy_buffering on;
        proxy_buffer_size 8k;
        proxy_buffers 8 8k;

        # Rate limiting (zone defined above)
        limit_req zone=api burst=20 nodelay;
    }

    location /health {
        access_log off;
        proxy_pass http://go_ollama_backend;
    }
}
Real-World Use Cases and Examples
Apply your concurrent AI system to practical scenarios that demonstrate the power of Go's concurrency for AI workloads.
Chatbot with Context Management
// internal/chatbot/context_manager.go
package chatbot
import (
	"fmt"
	"strings"
	"sync"
	"time"

	"go-ollama-concurrent/internal/concurrent"
)
type ChatSession struct {
ID string
UserID string
Context []int
LastUsed time.Time
Messages []Message
mu sync.RWMutex
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
}
type ContextManager struct {
sessions map[string]*ChatSession
processor *concurrent.Processor
mu sync.RWMutex
cleanupTicker *time.Ticker
}
// NewContextManager creates a chatbot with session management
func NewContextManager(processor *concurrent.Processor) *ContextManager {
cm := &ContextManager{
sessions: make(map[string]*ChatSession),
processor: processor,
cleanupTicker: time.NewTicker(10 * time.Minute),
}
// Start cleanup goroutine
go cm.cleanupSessions()
return cm
}
// ProcessMessage handles chat messages with context preservation
func (cm *ContextManager) ProcessMessage(sessionID, userID, message string) (string, error) {
session := cm.getOrCreateSession(sessionID, userID)
session.mu.Lock()
session.Messages = append(session.Messages, Message{
Role: "user",
Content: message,
Timestamp: time.Now(),
})
session.LastUsed = time.Now()
session.mu.Unlock()
// Build context-aware prompt
prompt := cm.buildContextPrompt(session, message)
// Process with concurrent processor
job := concurrent.Job{
ID: fmt.Sprintf("chat_%s_%d", sessionID, time.Now().UnixNano()),
Prompt: prompt,
Model: "llama3.2:1b",
Response: make(chan concurrent.Result, 1),
}
cm.processor.JobQueue() <- job
// Wait for response
select {
case result := <-job.Response:
if result.Error != nil {
return "", result.Error
}
// Update session with response
session.mu.Lock()
session.Messages = append(session.Messages, Message{
Role: "assistant",
Content: result.Response,
Timestamp: time.Now(),
})
session.mu.Unlock()
return result.Response, nil
case <-time.After(30 * time.Second):
return "", fmt.Errorf("chat response timeout")
}
}
// buildContextPrompt creates a context-aware prompt from chat history
func (cm *ContextManager) buildContextPrompt(session *ChatSession, newMessage string) string {
session.mu.RLock()
defer session.mu.RUnlock()
var contextBuilder strings.Builder
// Add recent messages for context (last 10 messages)
start := len(session.Messages) - 10
if start < 0 {
start = 0
}
contextBuilder.WriteString("Previous conversation:\n")
for i := start; i < len(session.Messages); i++ {
msg := session.Messages[i]
contextBuilder.WriteString(fmt.Sprintf("%s: %s\n", msg.Role, msg.Content))
}
contextBuilder.WriteString(fmt.Sprintf("user: %s\nassistant:", newMessage))
return contextBuilder.String()
}
Document Processing Pipeline
// internal/pipeline/document_processor.go
package pipeline
import (
"context"
"fmt"
"sync"
"time"
"go-ollama-concurrent/internal/concurrent"
)
type DocumentProcessor struct {
processor *concurrent.Processor
stages []ProcessingStage
results chan ProcessingResult
errors chan error
}
type ProcessingStage struct {
Name string
Function func(context.Context, string) (string, error)
Timeout time.Duration
}
type ProcessingResult struct {
DocumentID string
Stage string
Result string
Duration time.Duration
Error error
}
// NewDocumentProcessor creates a multi-stage document processing pipeline
func NewDocumentProcessor(processor *concurrent.Processor) *DocumentProcessor {
dp := &DocumentProcessor{
processor: processor,
results: make(chan ProcessingResult, 100),
errors: make(chan error, 100),
}
// Define processing stages
dp.stages = []ProcessingStage{
{
Name: "extract_key_points",
Function: dp.extractKeyPoints,
Timeout: 30 * time.Second,
},
{
Name: "summarize",
Function: dp.summarize,
Timeout: 45 * time.Second,
},
{
Name: "generate_tags",
Function: dp.generateTags,
Timeout: 20 * time.Second,
},
{
Name: "sentiment_analysis",
Function: dp.analyzeSentiment,
Timeout: 15 * time.Second,
},
}
return dp
}
// ProcessDocument runs all stages concurrently for a document
func (dp *DocumentProcessor) ProcessDocument(ctx context.Context, documentID, content string) map[string]ProcessingResult {
var wg sync.WaitGroup
results := make(map[string]ProcessingResult)
var mu sync.Mutex
for _, stage := range dp.stages {
wg.Add(1)
go func(stage ProcessingStage) {
defer wg.Done()
start := time.Now()
stageCtx, cancel := context.WithTimeout(ctx, stage.Timeout)
defer cancel()
result, err := stage.Function(stageCtx, content)
processResult := ProcessingResult{
DocumentID: documentID,
Stage: stage.Name,
Result: result,
Duration: time.Since(start),
Error: err,
}
mu.Lock()
results[stage.Name] = processResult
mu.Unlock()
select {
case dp.results <- processResult:
default:
	// Results channel full; drop the update rather than block the stage
}
}(stage)
}
wg.Wait()
return results
}
// extractKeyPoints identifies important points in the document
func (dp *DocumentProcessor) extractKeyPoints(ctx context.Context, content string) (string, error) {
prompt := fmt.Sprintf("Extract the 5 most important key points from this document:\n\n%s", content)
job := concurrent.Job{
ID: fmt.Sprintf("extract_%d", time.Now().UnixNano()),
Prompt: prompt,
Model: "llama3.2:1b",
Response: make(chan concurrent.Result, 1),
}
select {
case dp.processor.JobQueue() <- job:
select {
case result := <-job.Response:
return result.Response, result.Error
case <-ctx.Done():
return "", ctx.Err()
}
case <-ctx.Done():
return "", ctx.Err()
}
}
// summarize creates a concise summary of the document
func (dp *DocumentProcessor) summarize(ctx context.Context, content string) (string, error) {
prompt := fmt.Sprintf("Create a concise summary of this document in 3-4 sentences:\n\n%s", content)
return dp.processWithAI(ctx, prompt, "summarize")
}
// generateTags creates relevant tags for the document
func (dp *DocumentProcessor) generateTags(ctx context.Context, content string) (string, error) {
prompt := fmt.Sprintf("Generate 8-10 relevant tags for this document (comma-separated):\n\n%s", content)
return dp.processWithAI(ctx, prompt, "tags")
}
// analyzeSentiment determines the emotional tone of the document
func (dp *DocumentProcessor) analyzeSentiment(ctx context.Context, content string) (string, error) {
prompt := fmt.Sprintf("Analyze the sentiment of this document (positive/negative/neutral) and provide a brief explanation:\n\n%s", content)
return dp.processWithAI(ctx, prompt, "sentiment")
}
// processWithAI is a helper function for AI processing
func (dp *DocumentProcessor) processWithAI(ctx context.Context, prompt, jobType string) (string, error) {
job := concurrent.Job{
ID: fmt.Sprintf("%s_%d", jobType, time.Now().UnixNano()),
Prompt: prompt,
Model: "llama3.2:1b",
Response: make(chan concurrent.Result, 1),
}
select {
case dp.processor.JobQueue() <- job:
select {
case result := <-job.Response:
return result.Response, result.Error
case <-ctx.Done():
return "", ctx.Err()
}
case <-ctx.Done():
return "", ctx.Err()
}
}
Real-time Analytics Dashboard
// internal/analytics/dashboard.go
package analytics
import (
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"go-ollama-concurrent/internal/concurrent"
)
type DashboardMetrics struct {
RequestsPerSecond float64 `json:"requests_per_second"`
AverageLatency time.Duration `json:"average_latency"`
ErrorRate float64 `json:"error_rate"`
ActiveConnections int `json:"active_connections"`
QueueDepth int `json:"queue_depth"`
ModelUsage map[string]int `json:"model_usage"`
TopPrompts []string `json:"top_prompts"`
SystemResources SystemMetrics `json:"system_resources"`
LastUpdated time.Time `json:"last_updated"`
}
type AnalyticsCollector struct {
metrics *DashboardMetrics
processor *concurrent.Processor
mu sync.RWMutex
subscribers []chan *DashboardMetrics
updateTicker *time.Ticker
}
// NewAnalyticsCollector creates a real-time analytics system
func NewAnalyticsCollector(processor *concurrent.Processor) *AnalyticsCollector {
ac := &AnalyticsCollector{
metrics: &DashboardMetrics{
ModelUsage: make(map[string]int),
TopPrompts: make([]string, 0),
},
processor: processor,
updateTicker: time.NewTicker(5 * time.Second),
}
// Start metrics collection
go ac.collectMetrics()
return ac
}
// collectMetrics runs continuously to gather system metrics
func (ac *AnalyticsCollector) collectMetrics() {
for range ac.updateTicker.C {
	ac.updateMetrics()
	ac.broadcastMetrics()
}
}
// updateMetrics calculates current system metrics
func (ac *AnalyticsCollector) updateMetrics() {
ac.mu.Lock()
defer ac.mu.Unlock()
// Update timestamp
ac.metrics.LastUpdated = time.Now()
// Get processor metrics
if processorMetrics := ac.processor.GetMetrics(); processorMetrics != nil {
ac.metrics.RequestsPerSecond = processorMetrics.RequestsPerSecond
ac.metrics.AverageLatency = processorMetrics.AverageLatency
ac.metrics.ErrorRate = processorMetrics.ErrorRate
ac.metrics.QueueDepth = processorMetrics.QueueDepth
}
// Update system resources
ac.metrics.SystemResources = getSystemMetrics()
}
// broadcastMetrics sends updates to all subscribers
func (ac *AnalyticsCollector) broadcastMetrics() {
ac.mu.RLock()
metricsCopy := *ac.metrics
// Deep-copy the usage map: a shallow struct copy would still share it
// with updateMetrics, racing with concurrent writes
metricsCopy.ModelUsage = make(map[string]int, len(ac.metrics.ModelUsage))
for k, v := range ac.metrics.ModelUsage {
	metricsCopy.ModelUsage[k] = v
}
subscribers := make([]chan *DashboardMetrics, len(ac.subscribers))
copy(subscribers, ac.subscribers)
ac.mu.RUnlock()
for _, subscriber := range subscribers {
select {
case subscriber <- &metricsCopy:
default:
// Subscriber not ready, skip
}
}
}
// Subscribe adds a new metrics subscriber
func (ac *AnalyticsCollector) Subscribe() <-chan *DashboardMetrics {
ac.mu.Lock()
defer ac.mu.Unlock()
subscriber := make(chan *DashboardMetrics, 10)
ac.subscribers = append(ac.subscribers, subscriber)
return subscriber
}
// GetCurrentMetrics returns the latest metrics snapshot
func (ac *AnalyticsCollector) GetCurrentMetrics() *DashboardMetrics {
ac.mu.RLock()
defer ac.mu.RUnlock()
metricsCopy := *ac.metrics
// Deep-copy the usage map so callers never share mutable state
metricsCopy.ModelUsage = make(map[string]int, len(ac.metrics.ModelUsage))
for k, v := range ac.metrics.ModelUsage {
	metricsCopy.ModelUsage[k] = v
}
return &metricsCopy
}
// WebSocket handler for real-time metrics streaming
func (ac *AnalyticsCollector) WebSocketHandler() gin.HandlerFunc {
return func(c *gin.Context) {
// Upgrade to WebSocket connection
conn, err := upgradeWebSocket(c)
if err != nil {
c.JSON(500, gin.H{"error": "Failed to upgrade connection"})
return
}
defer conn.Close()
// Subscribe to metrics updates
updates := ac.Subscribe()
// Send initial metrics
currentMetrics := ac.GetCurrentMetrics()
if err := conn.WriteJSON(currentMetrics); err != nil {
return
}
// Stream updates
for {
select {
case metrics := <-updates:
if err := conn.WriteJSON(metrics); err != nil {
return
}
case <-c.Request.Context().Done():
return
}
}
}
}
Conclusion: Your Path to High-Performance AI
Go programming with Ollama transforms traditional AI applications into concurrent powerhouses that scale effortlessly. You've learned to implement goroutines for parallel processing, channels for safe communication, and advanced patterns like circuit breakers and rate limiting.
Your concurrent AI system now processes hundreds of requests simultaneously, maintains context across chat sessions, and provides real-time analytics. The performance gains are substantial: from sequential 5-second delays to concurrent 500ms responses.
Key benefits achieved:
- 10x performance improvement through concurrency
- Production-ready reliability with circuit breakers
- Scalable architecture supporting thousands of users
- Real-time monitoring and analytics capabilities
Start building your next AI application with Go's concurrency model. The combination of Go's efficiency and Ollama's lightweight inference creates systems that perform exceptionally well in production environments.
Ready to deploy your concurrent AI system? Your users are waiting for those lightning-fast responses that only proper concurrency can deliver.