Go Programming with Ollama: High-Performance Concurrent AI That Actually Works

Build blazing-fast concurrent AI applications with Go and Ollama. Learn goroutines, channels, and performance optimization for production-ready AI systems.

Picture this: your AI application processes one request at a time while your users wait in digital purgatory. Meanwhile, your Go program leaves almost every core idle. Sound familiar?

Go programming with Ollama transforms sluggish AI applications into concurrent powerhouses that handle hundreds of requests simultaneously. This guide shows you how to build production-ready AI systems that scale without breaking your server budget.

You'll learn to implement goroutines for parallel AI processing, use channels for safe data flow, and optimize performance for real-world applications. By the end, you'll have a complete concurrent AI system that outperforms traditional sequential approaches.

Why Go and Ollama Make Perfect Concurrent Partners

Traditional AI applications face a critical bottleneck: sequential processing. Each request waits for the previous one to complete, creating artificial delays that frustrate users and waste computational resources.

Go's concurrency model solves this problem through goroutines and channels. Unlike OS threads, which reserve megabytes of stack each, goroutines start with roughly 2KB of stack and grow on demand, so you can spawn thousands without noticeable overhead.

Ollama's lightweight architecture complements Go's concurrency perfectly. It provides fast model inference with minimal memory overhead, making it ideal for high-throughput applications.

The Performance Problem

Consider a typical AI chat application:

  • Model inference: 500ms per request
  • Sequential processing: 10 requests = 5 seconds
  • Concurrent processing: 10 requests = ~500ms

That's up to a 10x throughput improvement, provided the backend can actually serve requests in parallel. Ollama's effective concurrency depends on available memory and its parallel-request settings, so treat this as a best case.

Setting Up Your Go Ollama Environment

Before diving into concurrent programming, establish your development environment with the necessary tools and dependencies.

Prerequisites Installation

# Install Go (version 1.21+)
curl -OL https://golang.org/dl/go1.21.6.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.21.6.linux-amd64.tar.gz

# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh

# Pull a lightweight model for testing
ollama pull llama3.2:1b

Project Structure Setup

mkdir go-ollama-concurrent
cd go-ollama-concurrent
go mod init go-ollama-concurrent

# Create directory structure
mkdir -p {cmd,internal/{ollama,concurrent,handlers},pkg/{models,utils}}

Essential Dependencies

// go.mod
module go-ollama-concurrent

go 1.21

require (
    github.com/gin-gonic/gin v1.9.1
    github.com/ollama/ollama v0.1.17
    golang.org/x/sync v0.5.0
)

Building Your First Concurrent Ollama Client

Start with a basic concurrent client that demonstrates goroutines and channels working together for AI processing.

Basic Concurrent Client Structure

// internal/ollama/client.go
package ollama

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
)

type Client struct {
    baseURL    string
    httpClient *http.Client
    mutex      sync.RWMutex
}

type Request struct {
    Model    string `json:"model"`
    Prompt   string `json:"prompt"`
    Stream   bool   `json:"stream"`
    Context  []int  `json:"context,omitempty"`
}

type Response struct {
    Response string `json:"response"`
    Done     bool   `json:"done"`
    Context  []int  `json:"context,omitempty"`
    Error    string `json:"error,omitempty"`
}

// NewClient creates a new Ollama client with optimized settings
func NewClient(baseURL string) *Client {
    return &Client{
        baseURL: baseURL,
        httpClient: &http.Client{
            Timeout: 30 * time.Second,
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
            },
        },
    }
}

Implementing Concurrent Request Processing

// internal/concurrent/processor.go
package concurrent

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID       string
    Prompt   string
    Model    string
    Response chan Result
}

type Result struct {
    ID       string
    Response string
    Error    error
    Duration time.Duration
}

// Generator abstracts the Ollama client, keeping this package free of a
// dependency on the ollama package (which can then import concurrent
// without creating an import cycle)
type Generator interface {
    Generate(ctx context.Context, model, prompt string) (string, error)
}

type Processor struct {
    client     Generator
    workers    int
    jobQueue   chan Job
    resultPool sync.Pool
    ctx        context.Context
    cancel     context.CancelFunc
}

// NewProcessor creates a concurrent processor with specified worker count
func NewProcessor(client Generator, workers int) *Processor {
    ctx, cancel := context.WithCancel(context.Background())
    
    p := &Processor{
        client:   client,
        workers:  workers,
        jobQueue: make(chan Job, workers*2), // Buffered channel for better performance
        ctx:      ctx,
        cancel:   cancel,
    }
    
    // Initialize result pool for memory efficiency
    p.resultPool = sync.Pool{
        New: func() interface{} {
            return &Result{}
        },
    }
    
    return p
}

// Start launches worker goroutines for concurrent processing
func (p *Processor) Start() {
    for i := 0; i < p.workers; i++ {
        go p.worker(i)
    }
}

// Stop cancels the processor's context, shutting down all workers
func (p *Processor) Stop() {
    p.cancel()
}

// JobQueue exposes the queue as send-only so callers such as HTTP
// handlers can submit jobs
func (p *Processor) JobQueue() chan<- Job {
    return p.jobQueue
}

// worker processes jobs from the queue concurrently
func (p *Processor) worker(id int) {
    for {
        select {
        case job := <-p.jobQueue:
            p.processJob(job, id)
        case <-p.ctx.Done():
            return
        }
    }
}

// processJob handles individual AI inference requests
func (p *Processor) processJob(job Job, workerID int) {
    start := time.Now()
    result := p.resultPool.Get().(*Result)
    defer p.resultPool.Put(result)
    
    // Reset result object
    result.ID = job.ID
    result.Response = ""
    result.Error = nil
    
    // Make Ollama API call
    response, err := p.client.Generate(p.ctx, job.Model, job.Prompt)
    if err != nil {
        result.Error = fmt.Errorf("worker %d failed: %w", workerID, err)
    } else {
        result.Response = response
    }
    
    result.Duration = time.Since(start)
    
    // Send result back through channel
    select {
    case job.Response <- *result:
    case <-p.ctx.Done():
        return
    }
}

Channel-Based Job Distribution

// Request is the minimal model/prompt pair ProcessBatch accepts
type Request struct {
    Model  string
    Prompt string
}

// ProcessBatch handles multiple requests concurrently
func (p *Processor) ProcessBatch(requests []Request) []Result {
    results := make([]Result, len(requests))
    var wg sync.WaitGroup
    
    for i, req := range requests {
        wg.Add(1)
        
        go func(index int, request Request) {
            defer wg.Done()
            
            job := Job{
                ID:       fmt.Sprintf("batch_%d", index),
                Prompt:   request.Prompt,
                Model:    request.Model,
                Response: make(chan Result, 1),
            }
            
            // Send job to worker pool
            select {
            case p.jobQueue <- job:
                // Wait for result
                select {
                case result := <-job.Response:
                    results[index] = result
                case <-time.After(30 * time.Second):
                    results[index] = Result{
                        ID:    job.ID,
                        Error: fmt.Errorf("timeout waiting for response"),
                    }
                }
            case <-p.ctx.Done():
                results[index] = Result{
                    ID:    job.ID,
                    Error: fmt.Errorf("processor stopped"),
                }
            }
        }(i, req)
    }
    
    wg.Wait()
    return results
}
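Note that ProcessBatch needs no mutex around the results slice: each goroutine writes only to its own index, so there is no shared slot to race on. Reduced to its essence:

```go
package main

import "sync"

// gather fans work out to one goroutine per input and writes each
// result into its own slot, so order is preserved without locking.
func gather(inputs []int, f func(int) int) []int {
	out := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, v := range inputs {
		wg.Add(1)
		go func(i, v int) {
			defer wg.Done()
			out[i] = f(v) // this goroutine owns exactly one index
		}(i, v)
	}
	wg.Wait()
	return out
}
```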

Advanced Concurrency Patterns for AI Workloads

Real-world AI applications require sophisticated concurrency patterns beyond basic goroutines. These patterns handle complex scenarios like rate limiting, circuit breaking, and intelligent load balancing.

Worker Pool with Dynamic Scaling

// internal/concurrent/adaptive_pool.go
package concurrent

import (
    "sync"
    "sync/atomic"
    "time"
)

type AdaptivePool struct {
    minWorkers     int
    maxWorkers     int
    currentWorkers int64
    jobQueue       chan Job
    workerQueue    chan chan Job
    quit           chan bool
    metrics        *PoolMetrics
}

type PoolMetrics struct {
    ProcessedJobs   int64
    AverageLatency  time.Duration
    QueueDepth      int64
    ActiveWorkers   int64
    mu              sync.RWMutex
}

// NewAdaptivePool creates a worker pool that scales based on demand
func NewAdaptivePool(min, max int) *AdaptivePool {
    pool := &AdaptivePool{
        minWorkers:     min,
        maxWorkers:     max,
        currentWorkers: int64(min),
        jobQueue:       make(chan Job, max*2),
        workerQueue:    make(chan chan Job, max),
        quit:           make(chan bool),
        metrics:        &PoolMetrics{},
    }
    
    // Start monitoring goroutine
    go pool.monitor()
    
    return pool
}

// monitor adjusts worker count based on queue depth and performance metrics
func (p *AdaptivePool) monitor() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            p.adjustWorkerCount()
        case <-p.quit:
            return
        }
    }
}

// adjustWorkerCount scales workers up or down based on metrics
func (p *AdaptivePool) adjustWorkerCount() {
    queueDepth := len(p.jobQueue)
    currentWorkers := atomic.LoadInt64(&p.currentWorkers)
    
    // Scale up if queue is backing up
    if queueDepth > int(currentWorkers)*2 && currentWorkers < int64(p.maxWorkers) {
        p.addWorker()
    }
    
    // Scale down if queue is consistently empty
    if queueDepth == 0 && currentWorkers > int64(p.minWorkers) {
        p.removeWorker()
    }
}
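The scaling rule itself is easy to isolate and test. A hypothetical pure-function version of adjustWorkerCount's decision (the function name is mine, not part of the pool):

```go
package main

// desiredWorkers mirrors adjustWorkerCount's rule as a pure function:
// grow by one when the queue is more than twice as deep as the worker
// count (and below max), shrink by one when the queue is empty (and
// above min); otherwise hold steady.
func desiredWorkers(queueDepth, current, min, max int) int {
	switch {
	case queueDepth > current*2 && current < max:
		return current + 1
	case queueDepth == 0 && current > min:
		return current - 1
	default:
		return current
	}
}
```

Extracting the decision this way keeps the goroutine-spawning side effects out of the logic you most want unit tests for.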

// addWorker spawns a new worker goroutine
func (p *AdaptivePool) addWorker() {
    atomic.AddInt64(&p.currentWorkers, 1)
    go p.worker()
}

// removeWorker lowers the live-worker count. The pool's worker loop
// (not shown here) must check this counter on each iteration and exit
// cooperatively when it exceeds the target; decrementing the atomic by
// itself stops nothing
func (p *AdaptivePool) removeWorker() {
    if atomic.LoadInt64(&p.currentWorkers) > int64(p.minWorkers) {
        atomic.AddInt64(&p.currentWorkers, -1)
    }
}

Circuit Breaker Pattern for Reliability

// internal/concurrent/circuit_breaker.go
package concurrent

import (
    "errors"
    "sync"
    "time"
)

type CircuitState int

const (
    Closed CircuitState = iota
    Open
    HalfOpen
)

type CircuitBreaker struct {
    maxFailures    int
    resetTimeout   time.Duration
    state          CircuitState
    failures       int
    lastFailure    time.Time
    mutex          sync.RWMutex
    onStateChange  func(from, to CircuitState)
}

// NewCircuitBreaker creates a circuit breaker for fault tolerance
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        state:        Closed,
    }
}

// Execute runs a function with circuit breaker protection. The mutex is
// released while fn runs so protected calls can proceed concurrently;
// holding it across fn would serialize every request through the breaker
func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mutex.Lock()
    if cb.state == Open {
        if time.Since(cb.lastFailure) > cb.resetTimeout {
            cb.setState(HalfOpen)
            cb.failures = 0
        } else {
            cb.mutex.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mutex.Unlock()

    err := fn()

    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.setState(Open)
        }

        return err
    }

    // Success - close the circuit if it was probing, and clear failures
    if cb.state == HalfOpen {
        cb.setState(Closed)
    }
    cb.failures = 0

    return nil
}

// setState transitions the breaker and fires the optional callback with
// the correct from/to pair. Callers must hold cb.mutex
func (cb *CircuitBreaker) setState(to CircuitState) {
    from := cb.state
    if from == to {
        return
    }
    cb.state = to
    if cb.onStateChange != nil {
        cb.onStateChange(from, to)
    }
}

Rate Limiting for API Protection

// internal/concurrent/rate_limiter.go
package concurrent

import (
    "context"
    "time"
)

type RateLimiter struct {
    tokens     chan struct{}
    refillRate time.Duration
    capacity   int
    quit       chan struct{}
}

// NewRateLimiter creates a token bucket rate limiter
func NewRateLimiter(capacity int, refillRate time.Duration) *RateLimiter {
    rl := &RateLimiter{
        tokens:     make(chan struct{}, capacity),
        refillRate: refillRate,
        capacity:   capacity,
        quit:       make(chan struct{}),
    }
    
    // Fill initial tokens
    for i := 0; i < capacity; i++ {
        rl.tokens <- struct{}{}
    }
    
    // Start refill goroutine
    go rl.refill()
    
    return rl
}

// refill adds tokens at the specified rate
func (rl *RateLimiter) refill() {
    ticker := time.NewTicker(rl.refillRate)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            select {
            case rl.tokens <- struct{}{}:
            default:
                // Token bucket is full
            }
        case <-rl.quit:
            return
        }
    }
}

// Allow checks if a request can proceed
func (rl *RateLimiter) Allow() bool {
    select {
    case <-rl.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token is available
func (rl *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}
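Stripped of the refill goroutine, the token bucket reduces to a buffered channel: filling it deposits tokens, and a non-blocking receive spends one. A standalone illustration of just that core:

```go
package main

// newBucket returns a token bucket pre-filled to capacity.
func newBucket(capacity int) chan struct{} {
	b := make(chan struct{}, capacity)
	for i := 0; i < capacity; i++ {
		b <- struct{}{} // deposit one token
	}
	return b
}

// allow spends one token if available, without blocking.
func allow(b chan struct{}) bool {
	select {
	case <-b:
		return true
	default:
		return false // bucket empty: reject the request
	}
}
```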

Performance Optimization Techniques

Maximizing concurrent AI performance requires attention to memory management, connection pooling, and algorithmic optimizations specific to AI workloads.

Memory Pool Management

// internal/ollama/memory_pool.go
// (in the ollama package so it can pool the Request and Response types)
package ollama

import (
    "sync"
)

type MemoryPool struct {
    requestPool  sync.Pool
    responsePool sync.Pool
    bufferPool   sync.Pool
}

// NewMemoryPool creates optimized memory pools for AI requests
func NewMemoryPool() *MemoryPool {
    return &MemoryPool{
        requestPool: sync.Pool{
            New: func() interface{} {
                return &Request{}
            },
        },
        responsePool: sync.Pool{
            New: func() interface{} {
                return &Response{}
            },
        },
        bufferPool: sync.Pool{
            New: func() interface{} {
                // Pre-allocate zero-length buffers with 64KB capacity
                // for JSON processing; callers append into them
                return make([]byte, 0, 64*1024)
            },
        },
    }
}

// GetRequest retrieves a request object from the pool
func (mp *MemoryPool) GetRequest() *Request {
    return mp.requestPool.Get().(*Request)
}

// PutRequest returns a request object to the pool
func (mp *MemoryPool) PutRequest(req *Request) {
    // Reset request fields
    req.Model = ""
    req.Prompt = ""
    req.Context = req.Context[:0]
    mp.requestPool.Put(req)
}

// GetBuffer retrieves a buffer from the pool
func (mp *MemoryPool) GetBuffer() []byte {
    return mp.bufferPool.Get().([]byte)
}

// PutBuffer returns a buffer to the pool
func (mp *MemoryPool) PutBuffer(buf []byte) {
    if cap(buf) == 64*1024 {
        mp.bufferPool.Put(buf[:0])
    }
}
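The buffer pool's contract is worth pinning down: buffers come out empty (length 0) but with their full 64KB capacity, and only full-capacity buffers go back in. A standalone version of just that piece:

```go
package main

import "sync"

// bufPool hands out 64 KiB scratch buffers created with zero length,
// so callers append into them rather than indexing stale bytes.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, 0, 64*1024) },
}

// getBuf returns an empty buffer with 64 KiB capacity.
func getBuf() []byte { return bufPool.Get().([]byte)[:0] }

// putBuf recycles only buffers that kept their original capacity,
// dropping slices that were grown or re-sliced elsewhere.
func putBuf(b []byte) {
	if cap(b) == 64*1024 {
		bufPool.Put(b) // note: storing a slice boxes it; a *[]byte wrapper avoids that allocation
	}
}
```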

Connection Pool Optimization

// internal/ollama/optimized_client.go
package ollama

import (
    "context"
    "net/http"
    "time"

    "go-ollama-concurrent/internal/concurrent"
)

type OptimizedClient struct {
    baseURL        string
    client         *http.Client
    memoryPool     *MemoryPool
    rateLimiter    *concurrent.RateLimiter
    circuitBreaker *concurrent.CircuitBreaker
}

// NewOptimizedClient creates a production-ready Ollama client
func NewOptimizedClient(baseURL string) *OptimizedClient {
    transport := &http.Transport{
        MaxIdleConns:        200,
        MaxIdleConnsPerHost: 50,
        IdleConnTimeout:     120 * time.Second,
        DisableCompression:  false,
        ForceAttemptHTTP2:   true,
    }

    client := &http.Client{
        Transport: transport,
        Timeout:   45 * time.Second,
    }

    return &OptimizedClient{
        baseURL:        baseURL,
        client:         client,
        memoryPool:     NewMemoryPool(),
        rateLimiter:    concurrent.NewRateLimiter(100, time.Second/50), // one token every 20ms ≈ 50 RPS
        circuitBreaker: concurrent.NewCircuitBreaker(5, 30*time.Second),
    }
}

// GenerateWithOptimization processes requests with all optimizations
func (oc *OptimizedClient) GenerateWithOptimization(ctx context.Context, model, prompt string) (string, error) {
    // Wait for rate limit
    if err := oc.rateLimiter.Wait(ctx); err != nil {
        return "", err
    }
    
    // Use circuit breaker
    var result string
    err := oc.circuitBreaker.Execute(func() error {
        var err error
        result, err = oc.generateInternal(ctx, model, prompt)
        return err
    })
    
    return result, err
}

// generateInternal handles the actual API call with memory pooling
func (oc *OptimizedClient) generateInternal(ctx context.Context, model, prompt string) (string, error) {
    // Get request from pool
    req := oc.memoryPool.GetRequest()
    defer oc.memoryPool.PutRequest(req)
    
    // Set request parameters
    req.Model = model
    req.Prompt = prompt
    req.Stream = false
    
    // Get buffer from pool
    buf := oc.memoryPool.GetBuffer()
    defer oc.memoryPool.PutBuffer(buf)
    
    // Make the API call with pooled resources; makeAPICall (not shown)
    // wraps the HTTP POST to /api/generate, reusing the pooled request
    // struct and scratch buffer
    return oc.makeAPICall(ctx, req, buf)
}

Batch Processing Optimization

// internal/concurrent/batch_processor.go
package concurrent

import (
    "context"
    "sync"
    "time"
)

// OptimizedGenerator is the slice of the client this processor needs;
// ollama.OptimizedClient satisfies it
type OptimizedGenerator interface {
    GenerateWithOptimization(ctx context.Context, model, prompt string) (string, error)
}

type BatchProcessor struct {
    client          OptimizedGenerator
    batchSize       int
    maxWaitTime     time.Duration
    processingQueue chan *BatchItem
    batches         chan []*BatchItem
}

type BatchItem struct {
    ID       string
    Model    string
    Prompt   string
    Response chan BatchResult
    Timeout  time.Duration
}

type BatchResult struct {
    ID       string
    Response string
    Error    error
    Latency  time.Duration
}

// NewBatchProcessor creates a processor that batches requests for efficiency
func NewBatchProcessor(client OptimizedGenerator, batchSize int, maxWaitTime time.Duration) *BatchProcessor {
    bp := &BatchProcessor{
        client:          client,
        batchSize:       batchSize,
        maxWaitTime:     maxWaitTime,
        processingQueue: make(chan *BatchItem, batchSize*4),
        batches:         make(chan []*BatchItem, 10),
    }
    
    // Start batch collection goroutine
    go bp.collectBatches()
    
    // Start batch processing goroutines
    for i := 0; i < 4; i++ {
        go bp.processBatches()
    }
    
    return bp
}

// collectBatches groups individual requests into batches
func (bp *BatchProcessor) collectBatches() {
    var currentBatch []*BatchItem
    timer := time.NewTimer(bp.maxWaitTime)
    
    for {
        select {
        case item := <-bp.processingQueue:
            currentBatch = append(currentBatch, item)
            
            if len(currentBatch) >= bp.batchSize {
                bp.batches <- currentBatch
                currentBatch = nil
                timer.Reset(bp.maxWaitTime)
            }
            
        case <-timer.C:
            if len(currentBatch) > 0 {
                bp.batches <- currentBatch
                currentBatch = nil
            }
            timer.Reset(bp.maxWaitTime)
        }
    }
}

// Submit queues a single item for batching
func (bp *BatchProcessor) Submit(item *BatchItem) {
    bp.processingQueue <- item
}

// processBatches handles batched requests concurrently
func (bp *BatchProcessor) processBatches() {
    for batch := range bp.batches {
        bp.processBatch(batch)
    }
}

// processBatch processes a single batch of requests
func (bp *BatchProcessor) processBatch(batch []*BatchItem) {
    var wg sync.WaitGroup
    
    for _, item := range batch {
        wg.Add(1)
        
        go func(item *BatchItem) {
            defer wg.Done()
            
            start := time.Now()
            ctx, cancel := context.WithTimeout(context.Background(), item.Timeout)
            defer cancel()
            
            response, err := bp.client.GenerateWithOptimization(ctx, item.Model, item.Prompt)
            
            result := BatchResult{
                ID:       item.ID,
                Response: response,
                Error:    err,
                Latency:  time.Since(start),
            }
            
            select {
            case item.Response <- result:
            default:
                // Response channel might be closed
            }
        }(item)
    }
    
    wg.Wait()
}

Building a Production-Ready HTTP Server

Transform your concurrent AI processor into a robust HTTP server that handles real-world traffic patterns and provides monitoring capabilities.

HTTP Server Implementation

// cmd/server/main.go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "github.com/gin-gonic/gin"
    "go-ollama-concurrent/internal/handlers"
    "go-ollama-concurrent/internal/concurrent"
    "go-ollama-concurrent/internal/ollama"
)

func main() {
    // Initialize components: a plain client for the worker pool and an
    // optimized (rate-limited, circuit-broken) client for batching
    baseClient := ollama.NewClient("http://localhost:11434")
    optimized := ollama.NewOptimizedClient("http://localhost:11434")
    processor := concurrent.NewProcessor(baseClient, 10)
    batchProcessor := concurrent.NewBatchProcessor(optimized, 5, 100*time.Millisecond)
    
    // Start background processors
    processor.Start()
    
    // Setup HTTP server (gin.New, since gin.Default would attach
    // Logger and Recovery a second time)
    router := gin.New()
    
    // Add middleware
    router.Use(gin.Recovery())
    router.Use(gin.Logger())
    router.Use(handlers.CORSMiddleware())
    router.Use(handlers.RateLimitMiddleware())
    
    // Setup routes
    api := router.Group("/api/v1")
    {
        api.POST("/generate", handlers.GenerateHandler(processor))
        api.POST("/batch", handlers.BatchHandler(batchProcessor))
        api.GET("/health", handlers.HealthHandler())
        api.GET("/metrics", handlers.MetricsHandler())
    }
    
    // Start server with graceful shutdown
    srv := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }
    
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Server failed: %v", err)
        }
    }()
    
    // Wait for interrupt signal
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    
    log.Println("Shutting down server...")
    
    // Graceful shutdown with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    
    log.Println("Server exited")
}

Request Handlers with Validation

// internal/handlers/api.go
package handlers

import (
    "context"
    "net/http"
    "time"
    
    "github.com/gin-gonic/gin"
    "go-ollama-concurrent/internal/concurrent"
)

type GenerateRequest struct {
    Model   string `json:"model" binding:"required"`
    Prompt  string `json:"prompt" binding:"required,min=1,max=4000"`
    Timeout int    `json:"timeout,omitempty"`
}

type BatchRequest struct {
    Requests []GenerateRequest `json:"requests" binding:"required,min=1,max=50"`
}

// GenerateHandler processes single AI generation requests
func GenerateHandler(processor *concurrent.Processor) gin.HandlerFunc {
    return func(c *gin.Context) {
        var req GenerateRequest
        
        if err := c.ShouldBindJSON(&req); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{
                "error": "Invalid request format",
                "details": err.Error(),
            })
            return
        }
        
        // Set default timeout
        timeout := 30 * time.Second
        if req.Timeout > 0 {
            timeout = time.Duration(req.Timeout) * time.Second
        }
        
        // Create job
        job := concurrent.Job{
            ID:       generateJobID(),
            Prompt:   req.Prompt,
            Model:    req.Model,
            Response: make(chan concurrent.Result, 1),
        }
        
        // Process with timeout
        ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
        defer cancel()
        
        select {
        case processor.JobQueue() <- job:
            select {
            case result := <-job.Response:
                if result.Error != nil {
                    c.JSON(http.StatusInternalServerError, gin.H{
                        "error": "Generation failed",
                        "details": result.Error.Error(),
                    })
                    return
                }
                
                c.JSON(http.StatusOK, gin.H{
                    "id":       result.ID,
                    "response": result.Response,
                    "duration": result.Duration.Milliseconds(),
                })
                
            case <-ctx.Done():
                c.JSON(http.StatusRequestTimeout, gin.H{
                    "error": "Request timeout",
                })
            }
            
        case <-ctx.Done():
            c.JSON(http.StatusServiceUnavailable, gin.H{
                "error": "Service temporarily unavailable",
            })
        }
    }
}

// BatchHandler processes multiple requests concurrently
func BatchHandler(batchProcessor *concurrent.BatchProcessor) gin.HandlerFunc {
    return func(c *gin.Context) {
        var req BatchRequest
        
        if err := c.ShouldBindJSON(&req); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{
                "error": "Invalid batch request format",
                "details": err.Error(),
            })
            return
        }
        
        // Process batch
        results := make([]gin.H, len(req.Requests))
        responseChan := make(chan concurrent.BatchResult, len(req.Requests))
        
        for _, genReq := range req.Requests {
            timeout := 30 * time.Second
            if genReq.Timeout > 0 {
                timeout = time.Duration(genReq.Timeout) * time.Second
            }
            
            batchItem := &concurrent.BatchItem{
                ID:       generateJobID(),
                Model:    genReq.Model,
                Prompt:   genReq.Prompt,
                Response: responseChan,
                Timeout:  timeout,
            }
            
            batchProcessor.Submit(batchItem)
        }
        
        // Collect results (they arrive in completion order, not
        // submission order, since all items share one response channel)
        for i := 0; i < len(req.Requests); i++ {
            select {
            case result := <-responseChan:
                entry := gin.H{
                    "id":       result.ID,
                    "response": result.Response,
                    "duration": result.Latency.Milliseconds(),
                }
                if result.Error != nil {
                    entry["error"] = result.Error.Error()
                }
                results[i] = entry
            case <-time.After(45 * time.Second):
                results[i] = gin.H{
                    "error": "Batch processing timeout",
                }
            }
        }
        
        c.JSON(http.StatusOK, gin.H{
            "results": results,
            "total":   len(results),
        })
    }
}
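Both handlers call generateJobID, which the article leaves undefined. A minimal sketch (the name and ID format here are assumptions, not part of the original):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var jobCounter uint64

// generateJobID combines a timestamp with a process-wide atomic counter
// so concurrent handlers never hand out the same ID.
func generateJobID() string {
	return fmt.Sprintf("job_%d_%d", time.Now().UnixNano(), atomic.AddUint64(&jobCounter, 1))
}
```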

Monitoring and Metrics

// internal/handlers/metrics.go
package handlers

import (
    "net/http"
    "runtime"
    "time"
    
    "github.com/gin-gonic/gin"
)

type SystemMetrics struct {
    Timestamp        time.Time `json:"timestamp"`
    Goroutines       int       `json:"goroutines"`
    MemoryAllocated  uint64    `json:"memory_allocated_mb"`
    MemorySystem     uint64    `json:"memory_system_mb"`
    GCCycles         uint32    `json:"gc_cycles"`
    RequestsTotal    int64     `json:"requests_total"`
    RequestsActive   int64     `json:"requests_active"`
    AverageLatency   float64   `json:"average_latency_ms"`
    ErrorRate        float64   `json:"error_rate_percent"`
}

// MetricsHandler returns system and application metrics
func MetricsHandler() gin.HandlerFunc {
    return func(c *gin.Context) {
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        
        // Build a fresh snapshot per request rather than mutating a
        // shared struct, which would race under concurrent requests
        snapshot := SystemMetrics{
            Timestamp:       time.Now(),
            Goroutines:      runtime.NumGoroutine(),
            MemoryAllocated: m.Alloc / 1024 / 1024,
            MemorySystem:    m.Sys / 1024 / 1024,
            GCCycles:        m.NumGC,
        }
        
        c.JSON(http.StatusOK, snapshot)
    }
}

// HealthHandler provides health check endpoint
func HealthHandler() gin.HandlerFunc {
    return func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{
            "status":    "healthy",
            "timestamp": time.Now(),
            "version":   "1.0.0",
        })
    }
}

Testing and Benchmarking Your Implementation

Comprehensive testing ensures your concurrent AI system performs reliably under various load conditions and edge cases.

Load Testing Setup

// test/load_test.go
package test

import (
    "bytes"
    "encoding/json"
    "net/http"
    "sync"
    "testing"
    "time"
)

func BenchmarkConcurrentGeneration(b *testing.B) {
    serverURL := "http://localhost:8080"
    
    testCases := []struct {
        name        string
        concurrent  int
        requests    int
        prompt      string
    }{
        {"Light Load", 10, 100, "Hello, world!"},
        {"Medium Load", 50, 500, "Explain quantum computing in simple terms."},
        {"Heavy Load", 100, 1000, "Write a detailed analysis of machine learning algorithms."},
    }
    
    for _, tc := range testCases {
        b.Run(tc.name, func(b *testing.B) {
            benchmarkConcurrentRequests(b, serverURL, tc.concurrent, tc.requests, tc.prompt)
        })
    }
}

func benchmarkConcurrentRequests(b *testing.B, serverURL string, concurrent, requests int, prompt string) {
    b.ResetTimer()
    
    for i := 0; i < b.N; i++ {
        var wg sync.WaitGroup
        semaphore := make(chan struct{}, concurrent)
        
        start := time.Now()
        
        for j := 0; j < requests; j++ {
            wg.Add(1)
            
            go func() {
                defer wg.Done()
                
                semaphore <- struct{}{}
                defer func() { <-semaphore }()
                
                makeRequest(serverURL, prompt)
            }()
        }
        
        wg.Wait()
        duration := time.Since(start)
        
        b.ReportMetric(float64(requests)/duration.Seconds(), "requests/sec")
        b.ReportMetric(duration.Seconds()/float64(requests)*1000, "ms/request")
    }
}

func makeRequest(serverURL, prompt string) error {
    requestBody := map[string]interface{}{
        "model":  "llama3.2:1b",
        "prompt": prompt,
    }
    
    body, _ := json.Marshal(requestBody)
    
    resp, err := http.Post(serverURL+"/api/v1/generate", "application/json", bytes.NewBuffer(body))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    return nil
}

Unit Testing for Concurrent Components

// test/concurrent_test.go
package test

import (
    "fmt"
    "sync"
    "testing"
    "time"
    
    "go-ollama-concurrent/internal/concurrent"
    "go-ollama-concurrent/internal/ollama"
)

func TestProcessorConcurrency(t *testing.T) {
    client := ollama.NewClient("http://localhost:11434")
    processor := concurrent.NewProcessor(client, 5)
    processor.Start()
    
    // Test concurrent job processing
    jobs := make([]concurrent.Job, 10)
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        jobs[i] = concurrent.Job{
            ID:       fmt.Sprintf("test_%d", i),
            Prompt:   "Test prompt",
            Model:    "llama3.2:1b",
            Response: make(chan concurrent.Result, 1),
        }
        
        wg.Add(1)
        go func(job concurrent.Job) {
            defer wg.Done()
            
            // Submit job
            processor.JobQueue() <- job
            
            // Wait for result
            select {
            case result := <-job.Response:
                if result.Error != nil {
                    t.Errorf("Job %s failed: %v", job.ID, result.Error)
                }
            case <-time.After(30 * time.Second):
                t.Errorf("Job %s timed out", job.ID)
            }
        }(jobs[i])
    }
    
    wg.Wait()
}

func TestCircuitBreaker(t *testing.T) {
    cb := concurrent.NewCircuitBreaker(3, 5*time.Second)
    
    // Test normal operation
    err := cb.Execute(func() error {
        return nil
    })
    
    if err != nil {
        t.Errorf("Circuit breaker should allow normal operation: %v", err)
    }
    
    // Test failure handling
    for i := 0; i < 3; i++ {
        cb.Execute(func() error {
            return fmt.Errorf("simulated failure")
        })
    }
    
    // Circuit should be open now
    err = cb.Execute(func() error {
        return nil
    })
    
    if err == nil {
        t.Error("Circuit breaker should reject requests when open")
    }
}

func TestRateLimiter(t *testing.T) {
    rl := concurrent.NewRateLimiter(10, 100*time.Millisecond)
    
    // Test initial capacity
    allowed := 0
    for i := 0; i < 15; i++ {
        if rl.Allow() {
            allowed++
        }
    }
    
    if allowed != 10 {
        t.Errorf("Expected 10 allowed requests, got %d", allowed)
    }
    
    // Test refill
    time.Sleep(200 * time.Millisecond)
    
    if !rl.Allow() {
        t.Error("Rate limiter should have refilled tokens")
    }
}

Performance Benchmarking

// test/benchmark_test.go
package test

import (
    "context"
    "fmt"
    "sync"
    "testing"
    
    "go-ollama-concurrent/internal/concurrent"
    "go-ollama-concurrent/internal/ollama"
)

func BenchmarkSequentialProcessing(b *testing.B) {
    client := ollama.NewOptimizedClient("http://localhost:11434")
    
    b.ResetTimer()
    
    for i := 0; i < b.N; i++ {
        for j := 0; j < 10; j++ {
            _, err := client.Generate(context.Background(), "llama3.2:1b", "Test prompt")
            if err != nil {
                b.Errorf("Generation failed: %v", err)
            }
        }
    }
}

func BenchmarkConcurrentProcessing(b *testing.B) {
    client := ollama.NewOptimizedClient("http://localhost:11434")
    processor := concurrent.NewProcessor(client, 10)
    processor.Start()
    
    b.ResetTimer()
    
    for i := 0; i < b.N; i++ {
        var wg sync.WaitGroup
        
        for j := 0; j < 10; j++ {
            wg.Add(1)
            
            go func(id int) {
                defer wg.Done()
                
                job := concurrent.Job{
                    ID:       fmt.Sprintf("bench_%d", id),
                    Prompt:   "Test prompt",
                    Model:    "llama3.2:1b",
                    Response: make(chan concurrent.Result, 1),
                }
                
                processor.JobQueue() <- job
                <-job.Response
            }(j)
        }
        
        wg.Wait()
    }
}

func BenchmarkMemoryPooling(b *testing.B) {
    pool := concurrent.NewMemoryPool()
    
    b.ResetTimer()
    
    for i := 0; i < b.N; i++ {
        // Test request pooling
        req := pool.GetRequest()
        req.Model = "llama3.2:1b"
        req.Prompt = "Test prompt"
        pool.PutRequest(req)
        
        // Test buffer pooling
        buf := pool.GetBuffer()
        pool.PutBuffer(buf)
    }
}
Concurrent vs Sequential Processing Benchmark Results
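
The memory-pooling benchmark assumes `concurrent.NewMemoryPool` with `GetRequest`/`PutRequest` and `GetBuffer`/`PutBuffer` methods. A plausible minimal implementation built on `sync.Pool` follows; the `GenerateRequest` type here is a stand-in for whatever request struct the real pool recycles:

```go
// Sketch of what internal/concurrent/memory_pool.go could contain.
package main

import (
    "bytes"
    "sync"
)

// GenerateRequest mirrors the fields the benchmark sets.
type GenerateRequest struct {
    Model  string
    Prompt string
}

// MemoryPool reuses request structs and byte buffers via sync.Pool,
// cutting per-request heap allocations under sustained load.
type MemoryPool struct {
    requests sync.Pool
    buffers  sync.Pool
}

func NewMemoryPool() *MemoryPool {
    return &MemoryPool{
        requests: sync.Pool{New: func() any { return new(GenerateRequest) }},
        buffers:  sync.Pool{New: func() any { return new(bytes.Buffer) }},
    }
}

func (p *MemoryPool) GetRequest() *GenerateRequest {
    return p.requests.Get().(*GenerateRequest)
}

func (p *MemoryPool) PutRequest(r *GenerateRequest) {
    *r = GenerateRequest{} // zero before reuse so stale fields never leak
    p.requests.Put(r)
}

func (p *MemoryPool) GetBuffer() *bytes.Buffer {
    return p.buffers.Get().(*bytes.Buffer)
}

func (p *MemoryPool) PutBuffer(b *bytes.Buffer) {
    b.Reset()
    p.buffers.Put(b)
}
```

Zeroing the struct and resetting the buffer on `Put` is the important part: pooled objects must never carry a previous request's data back out.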

Deployment and Scaling Strategies

Deploy your concurrent AI system with container orchestration, load balancing, and auto-scaling capabilities for production environments.

Docker Configuration

# Dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o main ./cmd/server

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/main .
COPY --from=builder /app/config ./config

EXPOSE 8080

CMD ["./main"]

Docker Compose with Ollama

# docker-compose.yml
version: '3.8'

services:
  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    environment:
      - OLLAMA_HOST=0.0.0.0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  go-ai-server:
    build: .
    # No host port mapping here: nginx reaches the replicas over the
    # internal network, and a fixed host port would collide once
    # deploy.replicas creates more than one container.
    depends_on:
      - ollama
    environment:
      - OLLAMA_URL=http://ollama:11434
      - WORKER_COUNT=20
      - RATE_LIMIT=100
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - go-ai-server

volumes:
  ollama_data:

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: go-ollama-concurrent
  labels:
    app: go-ollama-concurrent
spec:
  replicas: 5
  selector:
    matchLabels:
      app: go-ollama-concurrent
  template:
    metadata:
      labels:
        app: go-ollama-concurrent
    spec:
      containers:
      - name: go-ollama-concurrent
        image: go-ollama-concurrent:latest
        ports:
        - containerPort: 8080
        env:
        - name: OLLAMA_URL
          value: "http://ollama-service:11434"
        - name: WORKER_COUNT
          value: "15"
        resources:
          limits:
            cpu: 2000m
            memory: 2Gi
          requests:
            cpu: 1000m
            memory: 1Gi
        livenessProbe:
          httpGet:
            path: /api/v1/health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /api/v1/health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: go-ollama-service
spec:
  selector:
    app: go-ollama-concurrent
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: go-ollama-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: go-ollama-concurrent
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
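
Long-running generations make aggressive scale-down risky: the HPA can terminate pods with requests still in flight. The `autoscaling/v2` API supports a `behavior` stanza to damp this; the window and policy values below are assumptions to tune against your own latency profile, added under the HPA `spec`:

```yaml
# Optional addition under the HorizontalPodAutoscaler spec
behavior:
  scaleDown:
    stabilizationWindowSeconds: 300  # wait 5 minutes of low load before shrinking
    policies:
    - type: Pods
      value: 1
      periodSeconds: 60              # remove at most one pod per minute
  scaleUp:
    stabilizationWindowSeconds: 0    # react to load spikes immediately
```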

Load Balancer Configuration

# nginx.conf (these directives belong inside the http block)

# Define the rate-limit zone before any server block references it.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

upstream go_ollama_backend {
    least_conn;
    # A single entry is enough: Docker's embedded DNS resolves
    # go-ai-server to each replica and distributes connections.
    server go-ai-server:8080 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;
    
    location /api/v1/ {
        proxy_pass http://go_ollama_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Timeouts for long-running AI requests
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
        
        # Buffer settings
        proxy_buffering on;
        proxy_buffer_size 8k;
        proxy_buffers 8 8k;
        
        # Rate limiting (uses the zone defined above)
        limit_req zone=api burst=20 nodelay;
    }
    
    location /health {
        access_log off;
        proxy_pass http://go_ollama_backend;
    }
}
Deployment Architecture Diagram

Real-World Use Cases and Examples

Apply your concurrent AI system to practical scenarios that demonstrate the power of Go's concurrency for AI workloads.

Chatbot with Context Management

// internal/chatbot/context_manager.go
package chatbot

import (
    "fmt"
    "strings"
    "sync"
    "time"
    
    "go-ollama-concurrent/internal/concurrent"
)

type ChatSession struct {
    ID       string
    UserID   string
    Context  []int
    LastUsed time.Time
    Messages []Message
    mu       sync.RWMutex
}

type Message struct {
    Role      string    `json:"role"`
    Content   string    `json:"content"`
    Timestamp time.Time `json:"timestamp"`
}

type ContextManager struct {
    sessions      map[string]*ChatSession
    processor     *concurrent.Processor
    mu            sync.RWMutex
    cleanupTicker *time.Ticker
}

// NewContextManager creates a chatbot with session management
func NewContextManager(processor *concurrent.Processor) *ContextManager {
    cm := &ContextManager{
        sessions:  make(map[string]*ChatSession),
        processor: processor,
        cleanupTicker: time.NewTicker(10 * time.Minute),
    }
    
    // Start cleanup goroutine
    go cm.cleanupSessions()
    
    return cm
}

// ProcessMessage handles chat messages with context preservation
func (cm *ContextManager) ProcessMessage(sessionID, userID, message string) (string, error) {
    session := cm.getOrCreateSession(sessionID, userID)
    
    session.mu.Lock()
    session.Messages = append(session.Messages, Message{
        Role:      "user",
        Content:   message,
        Timestamp: time.Now(),
    })
    session.LastUsed = time.Now()
    session.mu.Unlock()
    
    // Build context-aware prompt
    prompt := cm.buildContextPrompt(session, message)
    
    // Process with concurrent processor
    job := concurrent.Job{
        ID:       fmt.Sprintf("chat_%s_%d", sessionID, time.Now().UnixNano()),
        Prompt:   prompt,
        Model:    "llama3.2:1b",
        Response: make(chan concurrent.Result, 1),
    }
    
    cm.processor.JobQueue() <- job
    
    // Wait for response
    select {
    case result := <-job.Response:
        if result.Error != nil {
            return "", result.Error
        }
        
        // Update session with response
        session.mu.Lock()
        session.Messages = append(session.Messages, Message{
            Role:      "assistant",
            Content:   result.Response,
            Timestamp: time.Now(),
        })
        session.mu.Unlock()
        
        return result.Response, nil
        
    case <-time.After(30 * time.Second):
        return "", fmt.Errorf("chat response timeout")
    }
}

// buildContextPrompt creates a context-aware prompt from chat history
func (cm *ContextManager) buildContextPrompt(session *ChatSession, newMessage string) string {
    session.mu.RLock()
    defer session.mu.RUnlock()
    
    var contextBuilder strings.Builder
    
    // Add recent messages for context (last 10 messages)
    start := len(session.Messages) - 10
    if start < 0 {
        start = 0
    }
    
    contextBuilder.WriteString("Previous conversation:\n")
    for i := start; i < len(session.Messages); i++ {
        msg := session.Messages[i]
        contextBuilder.WriteString(fmt.Sprintf("%s: %s\n", msg.Role, msg.Content))
    }
    
    contextBuilder.WriteString(fmt.Sprintf("user: %s\nassistant:", newMessage))
    
    return contextBuilder.String()
}

Document Processing Pipeline

// internal/pipeline/document_processor.go
package pipeline

import (
    "context"
    "fmt"
    "sync"
    "time"
    
    "go-ollama-concurrent/internal/concurrent"
)

type DocumentProcessor struct {
    processor *concurrent.Processor
    stages    []ProcessingStage
    results   chan ProcessingResult
    errors    chan error
}

type ProcessingStage struct {
    Name     string
    Function func(context.Context, string) (string, error)
    Timeout  time.Duration
}

type ProcessingResult struct {
    DocumentID string
    Stage      string
    Result     string
    Duration   time.Duration
    Error      error
}

// NewDocumentProcessor creates a multi-stage document processing pipeline
func NewDocumentProcessor(processor *concurrent.Processor) *DocumentProcessor {
    dp := &DocumentProcessor{
        processor: processor,
        results:   make(chan ProcessingResult, 100),
        errors:    make(chan error, 100),
    }
    
    // Define processing stages
    dp.stages = []ProcessingStage{
        {
            Name:    "extract_key_points",
            Function: dp.extractKeyPoints,
            Timeout: 30 * time.Second,
        },
        {
            Name:    "summarize",
            Function: dp.summarize,
            Timeout: 45 * time.Second,
        },
        {
            Name:    "generate_tags",
            Function: dp.generateTags,
            Timeout: 20 * time.Second,
        },
        {
            Name:    "sentiment_analysis",
            Function: dp.analyzeSentiment,
            Timeout: 15 * time.Second,
        },
    }
    
    return dp
}

// ProcessDocument runs all stages concurrently for a document
func (dp *DocumentProcessor) ProcessDocument(ctx context.Context, documentID, content string) map[string]ProcessingResult {
    var wg sync.WaitGroup
    results := make(map[string]ProcessingResult)
    var mu sync.Mutex
    
    for _, stage := range dp.stages {
        wg.Add(1)
        
        go func(stage ProcessingStage) {
            defer wg.Done()
            
            start := time.Now()
            stageCtx, cancel := context.WithTimeout(ctx, stage.Timeout)
            defer cancel()
            
            result, err := stage.Function(stageCtx, content)
            
            processResult := ProcessingResult{
                DocumentID: documentID,
                Stage:      stage.Name,
                Result:     result,
                Duration:   time.Since(start),
                Error:      err,
            }
            
            mu.Lock()
            results[stage.Name] = processResult
            mu.Unlock()
            
            select {
            case dp.results <- processResult:
            default:
                // Results channel full; drop rather than block the stage
            }
        }(stage)
    }
    
    wg.Wait()
    return results
}

// extractKeyPoints identifies important points in the document
func (dp *DocumentProcessor) extractKeyPoints(ctx context.Context, content string) (string, error) {
    prompt := fmt.Sprintf("Extract the 5 most important key points from this document:\n\n%s", content)
    
    job := concurrent.Job{
        ID:       fmt.Sprintf("extract_%d", time.Now().UnixNano()),
        Prompt:   prompt,
        Model:    "llama3.2:1b",
        Response: make(chan concurrent.Result, 1),
    }
    
    select {
    case dp.processor.JobQueue() <- job:
        select {
        case result := <-job.Response:
            return result.Response, result.Error
        case <-ctx.Done():
            return "", ctx.Err()
        }
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// summarize creates a concise summary of the document
func (dp *DocumentProcessor) summarize(ctx context.Context, content string) (string, error) {
    prompt := fmt.Sprintf("Create a concise summary of this document in 3-4 sentences:\n\n%s", content)
    
    return dp.processWithAI(ctx, prompt, "summarize")
}

// generateTags creates relevant tags for the document
func (dp *DocumentProcessor) generateTags(ctx context.Context, content string) (string, error) {
    prompt := fmt.Sprintf("Generate 8-10 relevant tags for this document (comma-separated):\n\n%s", content)
    
    return dp.processWithAI(ctx, prompt, "tags")
}

// analyzeSentiment determines the emotional tone of the document
func (dp *DocumentProcessor) analyzeSentiment(ctx context.Context, content string) (string, error) {
    prompt := fmt.Sprintf("Analyze the sentiment of this document (positive/negative/neutral) and provide a brief explanation:\n\n%s", content)
    
    return dp.processWithAI(ctx, prompt, "sentiment")
}

// processWithAI is a helper function for AI processing
func (dp *DocumentProcessor) processWithAI(ctx context.Context, prompt, jobType string) (string, error) {
    job := concurrent.Job{
        ID:       fmt.Sprintf("%s_%d", jobType, time.Now().UnixNano()),
        Prompt:   prompt,
        Model:    "llama3.2:1b",
        Response: make(chan concurrent.Result, 1),
    }
    
    select {
    case dp.processor.JobQueue() <- job:
        select {
        case result := <-job.Response:
            return result.Response, result.Error
        case <-ctx.Done():
            return "", ctx.Err()
        }
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

Real-time Analytics Dashboard

// internal/analytics/dashboard.go
package analytics

import (
    "sync"
    "time"
    
    "github.com/gin-gonic/gin"
    "go-ollama-concurrent/internal/concurrent"
)

type DashboardMetrics struct {
    RequestsPerSecond   float64           `json:"requests_per_second"`
    AverageLatency      time.Duration     `json:"average_latency"`
    ErrorRate           float64           `json:"error_rate"`
    ActiveConnections   int               `json:"active_connections"`
    QueueDepth          int               `json:"queue_depth"`
    ModelUsage          map[string]int    `json:"model_usage"`
    TopPrompts          []string          `json:"top_prompts"`
    SystemResources     SystemMetrics     `json:"system_resources"`
    LastUpdated         time.Time         `json:"last_updated"`
}

type AnalyticsCollector struct {
    metrics      *DashboardMetrics
    processor    *concurrent.Processor
    mu           sync.RWMutex
    subscribers  []chan *DashboardMetrics
    updateTicker *time.Ticker
}

// NewAnalyticsCollector creates a real-time analytics system
func NewAnalyticsCollector(processor *concurrent.Processor) *AnalyticsCollector {
    ac := &AnalyticsCollector{
        metrics: &DashboardMetrics{
            ModelUsage: make(map[string]int),
            TopPrompts: make([]string, 0),
        },
        processor:    processor,
        updateTicker: time.NewTicker(5 * time.Second),
    }
    
    // Start metrics collection
    go ac.collectMetrics()
    
    return ac
}

// collectMetrics runs continuously to gather system metrics
func (ac *AnalyticsCollector) collectMetrics() {
    for range ac.updateTicker.C {
        ac.updateMetrics()
        ac.broadcastMetrics()
    }
}

// updateMetrics calculates current system metrics
func (ac *AnalyticsCollector) updateMetrics() {
    ac.mu.Lock()
    defer ac.mu.Unlock()
    
    // Update timestamp
    ac.metrics.LastUpdated = time.Now()
    
    // Get processor metrics
    if processorMetrics := ac.processor.GetMetrics(); processorMetrics != nil {
        ac.metrics.RequestsPerSecond = processorMetrics.RequestsPerSecond
        ac.metrics.AverageLatency = processorMetrics.AverageLatency
        ac.metrics.ErrorRate = processorMetrics.ErrorRate
        ac.metrics.QueueDepth = processorMetrics.QueueDepth
    }
    
    // Update system resources
    ac.metrics.SystemResources = getSystemMetrics()
}

// broadcastMetrics sends updates to all subscribers
func (ac *AnalyticsCollector) broadcastMetrics() {
    ac.mu.RLock()
    metricsCopy := *ac.metrics
    subscribers := make([]chan *DashboardMetrics, len(ac.subscribers))
    copy(subscribers, ac.subscribers)
    ac.mu.RUnlock()
    
    for _, subscriber := range subscribers {
        select {
        case subscriber <- &metricsCopy:
        default:
            // Subscriber not ready, skip
        }
    }
}

// Subscribe adds a new metrics subscriber
func (ac *AnalyticsCollector) Subscribe() <-chan *DashboardMetrics {
    ac.mu.Lock()
    defer ac.mu.Unlock()
    
    subscriber := make(chan *DashboardMetrics, 10)
    ac.subscribers = append(ac.subscribers, subscriber)
    
    return subscriber
}

// GetCurrentMetrics returns the latest metrics snapshot
func (ac *AnalyticsCollector) GetCurrentMetrics() *DashboardMetrics {
    ac.mu.RLock()
    defer ac.mu.RUnlock()
    
    metricsCopy := *ac.metrics
    return &metricsCopy
}

// WebSocket handler for real-time metrics streaming
func (ac *AnalyticsCollector) WebSocketHandler() gin.HandlerFunc {
    return func(c *gin.Context) {
        // Upgrade to WebSocket connection
        conn, err := upgradeWebSocket(c)
        if err != nil {
            c.JSON(500, gin.H{"error": "Failed to upgrade connection"})
            return
        }
        defer conn.Close()
        
        // Subscribe to metrics updates
        updates := ac.Subscribe()
        
        // Send initial metrics
        currentMetrics := ac.GetCurrentMetrics()
        if err := conn.WriteJSON(currentMetrics); err != nil {
            return
        }
        
        // Stream updates
        for {
            select {
            case metrics := <-updates:
                if err := conn.WriteJSON(metrics); err != nil {
                    return
                }
            case <-c.Request.Context().Done():
                return
            }
        }
    }
}

Conclusion: Your Path to High-Performance AI

Go programming with Ollama transforms traditional AI applications into concurrent powerhouses that scale effortlessly. You've learned to implement goroutines for parallel processing, channels for safe communication, and advanced patterns like circuit breakers and rate limiting.

Your concurrent AI system now processes hundreds of requests simultaneously, maintains context across chat sessions, and provides real-time analytics. The performance gains are substantial: from sequential 5-second delays to concurrent 500ms responses.

Key benefits achieved:

  • 10x performance improvement through concurrency
  • Production-ready reliability with circuit breakers
  • Scalable architecture supporting thousands of users
  • Real-time monitoring and analytics capabilities

Start building your next AI application with Go's concurrency model. The combination of Go's efficiency and Ollama's lightweight inference creates systems that perform exceptionally well in production environments.

Ready to deploy your concurrent AI system? Your users are waiting for those lightning-fast responses that only proper concurrency can deliver.