Your data sits scattered across dozens of devices. Traditional machine learning demands that you collect everything in one place. Federated learning changes this completely.

This distributed approach trains models without centralizing sensitive data. Ollama makes local model training accessible, and pairing it with a federated setup lets developers and organizations keep data private while still building capable AI models.

This guide covers a complete federated learning setup with Ollama. You'll learn distributed training, model aggregation, and privacy-preserving techniques. By the end, you'll be able to run your own federated learning network.
## What Is Federated Learning?
Federated learning trains machine learning models across multiple devices or servers. Each participant keeps their data local. Only model updates travel between nodes.
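The aggregation step at the heart of this process, Federated Averaging (FedAvg), is just a sample-size-weighted mean of each participant's parameters. A minimal stdlib sketch with two hypothetical clients:

```python
# Federated Averaging: weighted mean of client parameters,
# weighted by each client's local sample count.
def fedavg(client_params, client_samples):
    total = sum(client_samples)
    n_params = len(client_params[0])
    return [
        sum(p[i] * n / total for p, n in zip(client_params, client_samples))
        for i in range(n_params)
    ]

# Two clients sharing a 2-parameter model
params_a = [1.0, 3.0]   # client A, trained on 100 samples
params_b = [2.0, 5.0]   # client B, trained on 300 samples

global_params = fedavg([params_a, params_b], [100, 300])
print(global_params)  # → [1.75, 4.5]
```

Client B contributes three times the weight of client A because it trained on three times the data; the raw samples themselves never appear in the exchange.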
### Key Benefits of Distributed Ollama Training
- **Privacy Protection:** Raw data never leaves source devices. Only model parameters (optionally encrypted) travel across the network.
- **Reduced Bandwidth:** Small model updates replace massive datasets, which can cut network traffic dramatically compared to centralized training.
- **Regulatory Compliance:** GDPR and HIPAA requirements become more manageable because data stays within geographical or organizational boundaries.
- **Edge Computing Integration:** Models train directly on IoT devices, smartphones, and edge servers.
## Prerequisites for Federated Learning Implementation
### Hardware Requirements
- **Coordinator Node:** 8GB RAM, 4 CPU cores minimum
- **Worker Nodes:** 4GB RAM, 2 CPU cores each
- **Network:** Stable internet connection, 10 Mbps minimum
- **Storage:** 50GB free space per node
### Software Dependencies

```bash
# Install Ollama on each node
curl -fsSL https://ollama.ai/install.sh | sh

# Verify the installation
ollama --version

# Install the Python federated learning helpers
# (federated-ollama is the example framework package used throughout this guide)
pip install federated-ollama torch numpy
```
### Network Configuration

```yaml
# config/network.yaml
federation:
  coordinator_host: "192.168.1.100"
  coordinator_port: 8080
  encryption: true
  ssl_cert: "/path/to/certificate.pem"
  workers:
    - host: "192.168.1.101"
      port: 8081
    - host: "192.168.1.102"
      port: 8082
```
## Setting Up the Coordinator Node
The coordinator orchestrates training rounds and aggregates model updates. This central component manages the federated learning process.
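The broadcast → local-train → aggregate cycle the coordinator repeats can be sketched end to end before touching any real infrastructure. This stdlib-only simulation uses scalar "models" and hypothetical worker targets purely to illustrate the round structure:

```python
# Simulated federated rounds: each worker nudges the global value
# toward its own local target; the coordinator averages the results.
def local_train(global_value, local_target, lr=0.5):
    # One step of local training toward this worker's data
    return global_value + lr * (local_target - global_value)

def run_rounds(global_value, worker_targets, rounds):
    for _ in range(rounds):
        # Broadcast the global model, collect one update per worker
        updates = [local_train(global_value, t) for t in worker_targets]
        # FedAvg with equal weights
        global_value = sum(updates) / len(updates)
    return global_value

final = run_rounds(0.0, worker_targets=[2.0, 4.0], rounds=20)
print(round(final, 3))  # converges toward the mean of the targets, 3.0
```

The same loop shape carries over to real models: only `local_train` (gradient descent on local data) and the averaging (per-layer, sample-weighted) get more elaborate.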
### Install Coordinator Dependencies

```bash
# Create the project directory
mkdir federated-ollama-coordinator
cd federated-ollama-coordinator

# Install the coordinator framework (example packages used in this guide)
pip install federated-learning-coordinator flask torch

# Download the base model
ollama pull llama2:7b
```
### Configure Coordinator Settings

```python
# coordinator/config.py
class CoordinatorConfig:
    # Model configuration
    MODEL_NAME = "llama2:7b"
    ROUNDS = 10
    MIN_PARTICIPANTS = 2
    MAX_PARTICIPANTS = 50

    # Aggregation settings
    AGGREGATION_METHOD = "fedavg"  # Federated Averaging
    LEARNING_RATE = 0.001
    BATCH_SIZE = 32

    # Security settings
    ENCRYPTION_ENABLED = True
    SSL_REQUIRED = True
    TOKEN_EXPIRY = 3600  # 1 hour
```
### Launch Coordinator Service

```python
# coordinator/server.py
from flask import Flask, request, jsonify
from federated_ollama import ModelAggregator  # example framework package

app = Flask(__name__)
aggregator = ModelAggregator()

@app.route('/register', methods=['POST'])
def register_worker():
    """Register a new worker node."""
    worker_data = request.json
    worker_id = aggregator.register_worker(
        host=worker_data['host'],
        port=worker_data['port'],
        capabilities=worker_data['capabilities']
    )
    return jsonify({'worker_id': worker_id, 'status': 'registered'})

@app.route('/start_round', methods=['POST'])
def start_training_round():
    """Initialize a new training round."""
    round_id = aggregator.start_new_round()
    global_model = aggregator.get_global_model()
    return jsonify({
        'round_id': round_id,
        'model_weights': global_model.serialize(),
        'training_config': aggregator.get_training_config()
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, ssl_context='adhoc')
```
## Configuring Worker Nodes
Worker nodes perform local training and share updates with the coordinator. Each worker maintains data privacy while contributing to model improvement.
### Install Worker Dependencies

```bash
# On each worker node
mkdir federated-ollama-worker
cd federated-ollama-worker

# Install the worker framework (example packages used in this guide)
pip install federated-learning-worker requests torch

# Pull the same base model
ollama pull llama2:7b
```
### Worker Node Configuration

```python
# worker/config.py
class WorkerConfig:
    # Coordinator connection
    COORDINATOR_URL = "https://192.168.1.100:8080"
    WORKER_ID = None  # Assigned during registration

    # Local training settings
    LOCAL_EPOCHS = 5
    BATCH_SIZE = 16
    LEARNING_RATE = 0.001

    # Data settings
    DATA_PATH = "/path/to/local/dataset"
    VALIDATION_SPLIT = 0.2

    # Privacy settings
    DIFFERENTIAL_PRIVACY = True
    NOISE_MULTIPLIER = 0.1
    MAX_GRAD_NORM = 1.0
```
### Worker Registration Script

```python
# worker/register.py
import requests
import torch

from config import WorkerConfig

def register_with_coordinator():
    """Register this worker with the coordinator."""
    registration_data = {
        'host': '192.168.1.101',  # This worker's IP
        'port': 8081,
        'capabilities': {
            'gpu_available': torch.cuda.is_available(),
            'memory_gb': 8,
            'cpu_cores': 4
        }
    }

    response = requests.post(
        f"{WorkerConfig.COORDINATOR_URL}/register",
        json=registration_data,
        verify=False  # Only for self-signed certificates in development
    )

    if response.status_code == 200:
        worker_info = response.json()
        WorkerConfig.WORKER_ID = worker_info['worker_id']
        print(f"Registered as worker {WorkerConfig.WORKER_ID}")
        return True
    else:
        print(f"Registration failed: {response.text}")
        return False

if __name__ == '__main__':
    register_with_coordinator()
```
## Implementing Federated Training Logic

### Local Training Function
```python
# worker/local_trainer.py
import torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

import ollama  # weight loading below assumes the example framework's wrapper
from config import WorkerConfig

class LocalTrainer:
    def __init__(self, model_name="llama2:7b"):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def load_local_data(self, data_path):
        """Load and preprocess local training data."""
        # Implementation depends on your data format;
        # this example assumes one training text per line.
        with open(data_path, 'r') as f:
            texts = f.readlines()

        encoded = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors="pt"
        )

        return DataLoader(
            encoded['input_ids'],
            batch_size=WorkerConfig.BATCH_SIZE,
            shuffle=True
        )

    def train_local_model(self, global_weights, local_data):
        """Perform local training with privacy preservation."""
        # Load the model and apply the current global weights
        # (ollama.load_model is provided by the example framework wrapper)
        model = ollama.load_model(self.model_name)
        model.load_state_dict(global_weights)
        model.to(self.device)

        optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=WorkerConfig.LEARNING_RATE
        )

        model.train()
        total_loss = 0

        for epoch in range(WorkerConfig.LOCAL_EPOCHS):
            for batch in local_data:
                batch = batch.to(self.device)
                optimizer.zero_grad()

                outputs = model(batch, labels=batch)
                loss = outputs.loss
                loss.backward()

                # Clip gradients first to bound their sensitivity...
                torch.nn.utils.clip_grad_norm_(
                    model.parameters(),
                    WorkerConfig.MAX_GRAD_NORM
                )

                # ...then add differential privacy noise to the clipped gradients
                if WorkerConfig.DIFFERENTIAL_PRIVACY:
                    self.add_privacy_noise(model, WorkerConfig.NOISE_MULTIPLIER)

                optimizer.step()
                total_loss += loss.item()

        return model.state_dict(), total_loss / len(local_data)

    def add_privacy_noise(self, model, noise_multiplier):
        """Add Gaussian noise, calibrated to the clipping bound, to gradients."""
        for param in model.parameters():
            if param.grad is not None:
                noise = torch.normal(
                    mean=0.0,
                    std=noise_multiplier * WorkerConfig.MAX_GRAD_NORM,
                    size=param.grad.shape
                ).to(param.device)
                param.grad += noise
```
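The clip-then-noise ordering matters: clipping bounds each update's influence (the sensitivity), and the noise is calibrated to that bound. A stdlib-only illustration on a single scalar gradient, with hypothetical numbers:

```python
import random

def privatize_gradient(grad, max_norm=1.0, noise_multiplier=0.1, rng=None):
    # 1. Clip: scale the gradient down if it exceeds max_norm
    norm = abs(grad)
    if norm > max_norm:
        grad = grad * (max_norm / norm)
    # 2. Noise: Gaussian noise calibrated to the clipping bound
    rng = rng or random.Random(0)
    return grad + rng.gauss(0.0, noise_multiplier * max_norm)

# A large raw gradient is clipped to the bound before noising
# (zero noise here so the check is deterministic)
g = privatize_gradient(5.0, max_norm=1.0, noise_multiplier=0.0)
print(g)  # → 1.0
```

Doing these steps in the reverse order (noise, then clip) would let the clipping partially remove the noise and weaken the privacy guarantee.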
### Model Aggregation at Coordinator

```python
# coordinator/aggregator.py
import torch
from typing import Dict

class FederatedAggregator:
    def __init__(self):
        self.global_model = None
        self.round_number = 0
        self.worker_updates = {}

    def aggregate_models(self, worker_updates: Dict[str, Dict]) -> Dict[str, torch.Tensor]:
        """Aggregate worker model updates using the FedAvg algorithm."""
        # Total number of data samples across workers
        total_samples = sum(update['num_samples'] for update in worker_updates.values())

        reference_weights = next(iter(worker_updates.values()))['weights']
        aggregated_weights = {}

        # Weighted average based on each worker's local dataset size
        for layer_name in reference_weights:
            aggregated_weights[layer_name] = torch.zeros_like(reference_weights[layer_name])
            for worker_id, update in worker_updates.items():
                weight = update['num_samples'] / total_samples
                aggregated_weights[layer_name] += weight * update['weights'][layer_name]

        self.round_number += 1
        print(f"Completed aggregation for round {self.round_number}")
        return aggregated_weights

    def validate_updates(self, worker_updates: Dict) -> bool:
        """Validate worker updates for security and consistency."""
        # Check that all updates share the same model structure
        reference_keys = set(next(iter(worker_updates.values()))['weights'].keys())
        for worker_id, update in worker_updates.items():
            if set(update['weights'].keys()) != reference_keys:
                print(f"Worker {worker_id} has an inconsistent model structure")
                return False

        # Check for potential Byzantine behavior (simple outlier detection)
        for layer_name in reference_keys:
            layer_updates = [update['weights'][layer_name] for update in worker_updates.values()]
            stacked = torch.stack(layer_updates)
            mean_update = stacked.mean(dim=0)
            for i, update in enumerate(layer_updates):
                distance = torch.norm(update - mean_update)
                if distance > 3 * stacked.std():
                    print(f"Potential Byzantine update detected from worker {i}")
                    # More sophisticated detection could be implemented here

        return True
```
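The outlier rule in `validate_updates` can be checked on plain numbers: an update far from the mean of its peers stands out. Note that with only a handful of workers, a single extreme outlier inflates the standard deviation enough that a strict 3-sigma cutoff can never fire, so this stdlib sketch uses a lower threshold (2.5) for a ten-worker group:

```python
import math

def flag_outliers(updates, threshold=2.5):
    # Flag indices whose deviation from the mean exceeds threshold * std
    mean = sum(updates) / len(updates)
    std = math.sqrt(sum((u - mean) ** 2 for u in updates) / len(updates))
    return [i for i, u in enumerate(updates) if abs(u - mean) > threshold * std]

# Nine honest updates near 1.0 and one poisoned update
updates = [1.0, 1.1, 0.9, 1.0, 1.05, 0.95, 1.0, 1.1, 0.9, 50.0]
print(flag_outliers(updates))  # → [9]
```

Threshold-based detection is only a first line of defense; robust aggregation rules (median, trimmed mean) handle adversaries more gracefully.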
## Privacy and Security Implementation

### Differential Privacy Integration
```python
# privacy/differential_privacy.py
import math
import torch

class DifferentialPrivacyManager:
    def __init__(self, epsilon=1.0, delta=1e-5):
        self.epsilon = epsilon  # Privacy budget
        self.delta = delta      # Failure probability
        self.noise_scale = self.calculate_noise_scale()

    def calculate_noise_scale(self):
        """Calculate the Gaussian-mechanism noise scale for the given privacy parameters."""
        sensitivity = 2.0  # L2 sensitivity after gradient clipping
        return sensitivity * math.sqrt(2 * math.log(1.25 / self.delta)) / self.epsilon

    def add_noise_to_gradients(self, model):
        """Add calibrated Gaussian noise to model gradients."""
        for param in model.parameters():
            if param.grad is not None:
                noise = torch.normal(
                    mean=0.0,
                    std=self.noise_scale,
                    size=param.grad.shape
                ).to(param.device)
                param.grad += noise

    def clip_gradients(self, model, max_norm=1.0):
        """Clip gradients to bound sensitivity."""
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)

    def get_privacy_spent(self, steps):
        """Calculate the cumulative privacy cost.

        Uses RDP accounting for tighter bounds; compute_rdp and
        get_privacy_spent_from_rdp are left to an accounting library
        such as Opacus.
        """
        orders = [1 + x / 10.0 for x in range(1, 100)]
        rdp = self.compute_rdp(steps, orders)
        eps, delta = self.get_privacy_spent_from_rdp(rdp, orders)
        return eps, delta
```
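Plugging the class defaults into `calculate_noise_scale` makes the privacy–noise trade-off concrete: for sensitivity 2, ε = 1.0, and δ = 1e-5, the Gaussian mechanism needs a noise scale of roughly 9.7, and halving ε doubles the noise:

```python
import math

def gaussian_noise_scale(sensitivity, epsilon, delta):
    # sigma = sensitivity * sqrt(2 * ln(1.25 / delta)) / epsilon
    return sensitivity * math.sqrt(2 * math.log(1.25 / delta)) / epsilon

sigma = gaussian_noise_scale(2.0, 1.0, 1e-5)
print(round(sigma, 2))  # → 9.69

# Stronger privacy (smaller epsilon) requires proportionally more noise
print(round(gaussian_noise_scale(2.0, 0.5, 1e-5), 2))  # → 19.38
```

This inverse relationship is why the privacy budget has to be chosen per deployment: too small an ε drowns the gradients in noise and stalls convergence.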
### Secure Communication Protocol

```python
# security/secure_communication.py
import hashlib
import hmac
import io
import ssl
import time

import jwt
import torch
from cryptography.fernet import Fernet

class SecureChannel:
    def __init__(self, coordinator_key):
        self.coordinator_key = coordinator_key
        self.cipher_suite = Fernet(coordinator_key)

    def encrypt_model_update(self, model_weights):
        """Encrypt model weights before transmission."""
        # Serialize weights to bytes
        buffer = io.BytesIO()
        torch.save(model_weights, buffer)
        weight_bytes = buffer.getvalue()

        # Encrypt using Fernet symmetric encryption
        encrypted_weights = self.cipher_suite.encrypt(weight_bytes)

        # Generate a message authentication code (HMAC, not a bare hash)
        mac = hmac.new(self.coordinator_key, encrypted_weights, hashlib.sha256).hexdigest()

        return {
            'encrypted_weights': encrypted_weights,
            'mac': mac,
            'timestamp': time.time()
        }

    def verify_coordinator_identity(self, token):
        """Verify the coordinator's JWT token."""
        try:
            payload = jwt.decode(token, self.coordinator_key, algorithms=['HS256'])
            return payload['coordinator_id'] == 'authorized_coordinator'
        except jwt.InvalidTokenError:
            return False

    def establish_ssl_context(self):
        """Create an SSL context for HTTPS communication."""
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE  # Development only; verify certificates in production
        return context
```
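The MAC only helps if the receiver checks it. A plain `sha256(data + key)` construction is vulnerable to length-extension attacks, so the standard approach is HMAC with a constant-time comparison. A stdlib-only sketch of both sides, with hypothetical key and payload bytes:

```python
import hashlib
import hmac

def make_mac(key: bytes, payload: bytes) -> str:
    # HMAC-SHA256 over the (already encrypted) payload
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_mac(key: bytes, payload: bytes, mac: str) -> bool:
    # compare_digest avoids timing side channels
    return hmac.compare_digest(make_mac(key, payload), mac)

key = b"shared-coordinator-key"
payload = b"encrypted-model-update-bytes"

mac = make_mac(key, payload)
print(verify_mac(key, payload, mac))                 # → True
print(verify_mac(key, payload + b"tampered", mac))   # → False
```

Any bit flipped in transit changes the digest, so tampered updates are rejected before they ever reach the aggregator.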
## Deployment and Orchestration

### Docker Configuration for Easy Deployment
```dockerfile
# Dockerfile.coordinator
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Copy application code
COPY coordinator/ ./coordinator/
COPY requirements.txt .

# Install Python dependencies
RUN pip install -r requirements.txt

# Pull the base model (the Ollama server must be running during the pull)
RUN ollama serve & sleep 5 && ollama pull llama2:7b

EXPOSE 8080

CMD ["python", "coordinator/server.py"]
```
```dockerfile
# Dockerfile.worker
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies and Ollama
RUN apt-get update && apt-get install -y curl \
    && curl -fsSL https://ollama.ai/install.sh | sh \
    && rm -rf /var/lib/apt/lists/*

# Copy application code
COPY worker/ ./worker/
COPY requirements.txt .

# Install Python dependencies
RUN pip install -r requirements.txt

# Pull the base model (the Ollama server must be running during the pull)
RUN ollama serve & sleep 5 && ollama pull llama2:7b

EXPOSE 8081

CMD ["python", "worker/main.py"]
```
### Docker Compose Orchestration

```yaml
# docker-compose.yml
version: '3.8'

services:
  coordinator:
    build:
      context: .
      dockerfile: Dockerfile.coordinator
    ports:
      - "8080:8080"
    environment:
      - COORDINATOR_HOST=0.0.0.0
      - COORDINATOR_PORT=8080
    volumes:
      - coordinator_data:/app/data
    networks:
      - federated_network

  worker1:
    build:
      context: .
      dockerfile: Dockerfile.worker
    ports:
      - "8081:8081"
    environment:
      - WORKER_ID=worker1
      - COORDINATOR_URL=http://coordinator:8080
    depends_on:
      - coordinator
    volumes:
      - worker1_data:/app/data
    networks:
      - federated_network

  worker2:
    build:
      context: .
      dockerfile: Dockerfile.worker
    ports:
      - "8082:8081"
    environment:
      - WORKER_ID=worker2
      - COORDINATOR_URL=http://coordinator:8080
    depends_on:
      - coordinator
    volumes:
      - worker2_data:/app/data
    networks:
      - federated_network

volumes:
  coordinator_data:
  worker1_data:
  worker2_data:

networks:
  federated_network:
    driver: bridge
```
### Kubernetes Deployment for Production

```yaml
# k8s/coordinator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: federated-coordinator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: federated-coordinator
  template:
    metadata:
      labels:
        app: federated-coordinator
    spec:
      containers:
        - name: coordinator
          image: federated-ollama/coordinator:latest
          ports:
            - containerPort: 8080
          env:
            - name: COORDINATOR_HOST
              value: "0.0.0.0"
            - name: COORDINATOR_PORT
              value: "8080"
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
---
apiVersion: v1
kind: Service
metadata:
  name: coordinator-service
spec:
  selector:
    app: federated-coordinator
  ports:
    - port: 8080
      targetPort: 8080
  type: LoadBalancer
```
## Monitoring and Performance Optimization

### Training Metrics Dashboard

```python
# monitoring/metrics_collector.py
from datetime import datetime

class FederatedMetrics:
    def __init__(self):
        self.round_metrics = []
        self.worker_metrics = {}

    def log_round_completion(self, round_id, participants, aggregation_time, model_accuracy):
        """Log metrics for a completed training round."""
        metric = {
            'round_id': round_id,
            'timestamp': datetime.now().isoformat(),
            'participants': participants,
            'aggregation_time_seconds': aggregation_time,
            'global_model_accuracy': model_accuracy,
            'convergence_rate': self.calculate_convergence_rate()
        }
        self.round_metrics.append(metric)

    def log_worker_performance(self, worker_id, training_time, local_loss, data_samples):
        """Log individual worker performance."""
        if worker_id not in self.worker_metrics:
            self.worker_metrics[worker_id] = []

        metric = {
            'timestamp': datetime.now().isoformat(),
            'training_time_seconds': training_time,
            'local_loss': local_loss,
            'data_samples': data_samples,
            'throughput': data_samples / training_time
        }
        self.worker_metrics[worker_id].append(metric)

    def generate_performance_report(self):
        """Generate a comprehensive performance report.

        calculate_convergence_rate, analyze_worker_performance, and
        analyze_convergence are left to implement for your metrics needs.
        """
        total_rounds = len(self.round_metrics)
        avg_participants = sum(r['participants'] for r in self.round_metrics) / total_rounds
        avg_accuracy = sum(r['global_model_accuracy'] for r in self.round_metrics) / total_rounds

        return {
            'summary': {
                'total_training_rounds': total_rounds,
                'average_participants': avg_participants,
                'final_model_accuracy': self.round_metrics[-1]['global_model_accuracy'],
                'average_accuracy': avg_accuracy
            },
            'worker_performance': self.analyze_worker_performance(),
            'convergence_analysis': self.analyze_convergence()
        }
```
### Performance Optimization Strategies

```python
# optimization/performance_tuner.py
class PerformanceTuner:
    def __init__(self):
        self.optimization_history = []

    def optimize_batch_size(self, worker_capabilities):
        """Dynamically adjust batch sizes based on worker hardware."""
        optimized_configs = {}

        for worker_id, capabilities in worker_capabilities.items():
            memory_gb = capabilities['memory_gb']
            gpu_available = capabilities['gpu_available']

            if gpu_available and memory_gb >= 16:
                batch_size = 64
            elif gpu_available and memory_gb >= 8:
                batch_size = 32
            elif memory_gb >= 8:
                batch_size = 16
            else:
                batch_size = 8

            optimized_configs[worker_id] = {
                'batch_size': batch_size,
                'learning_rate': 0.001 * batch_size / 32,  # Linear learning-rate scaling
                'local_epochs': min(5, max(1, memory_gb // 2))
            }

        return optimized_configs

    def adaptive_aggregation_frequency(self, network_conditions):
        """Adjust aggregation frequency based on network latency."""
        avg_latency = sum(network_conditions.values()) / len(network_conditions)

        if avg_latency < 100:  # milliseconds
            return 1  # Aggregate every round
        elif avg_latency < 500:
            return 2  # Aggregate every 2 rounds
        else:
            return 5  # Aggregate every 5 rounds
```
## Troubleshooting Common Issues

### Connection Problems

**Issue:** Workers cannot connect to the coordinator.

```bash
# Check network connectivity
ping coordinator_ip
telnet coordinator_ip 8080

# Verify firewall settings
sudo ufw status
sudo ufw allow 8080
```

**Solution:** Configure proper network routes and firewall rules.
### Model Synchronization Issues

**Issue:** Model weights become inconsistent across workers.

```python
import hashlib
import io

import requests
import torch

# Add checksum verification (serialize deterministically before hashing)
def verify_model_integrity(model_weights):
    buffer = io.BytesIO()
    torch.save(model_weights, buffer)
    return hashlib.sha256(buffer.getvalue()).hexdigest()

# Implement automatic resync
def resync_worker_model(worker_url, global_model):
    response = requests.post(f"{worker_url}/resync", json={
        'global_weights': global_model.serialize(),
        'checksum': verify_model_integrity(global_model)
    })
    return response.status_code == 200
```
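Checksums only catch drift if both sides serialize weights identically; hashing `str(weights)` is fragile because string formatting can differ between environments. The idea can be demonstrated with the standard library alone, packing float parameters into a fixed byte layout before hashing:

```python
import hashlib
import struct

def weights_checksum(weights):
    # Pack floats in a fixed name order and byte layout so the hash is reproducible
    digest = hashlib.sha256()
    for name in sorted(weights):
        digest.update(name.encode())
        digest.update(struct.pack(f"<{len(weights[name])}d", *weights[name]))
    return digest.hexdigest()

w1 = {"layer1": [0.1, 0.2], "layer2": [0.3]}
w2 = {"layer2": [0.3], "layer1": [0.1, 0.2]}  # same weights, different dict order

print(weights_checksum(w1) == weights_checksum(w2))  # → True
```

Sorting the layer names makes the checksum independent of dictionary insertion order, so coordinator and worker always agree on matching weights.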
### Memory Optimization

**Issue:** Out-of-memory errors during training.

```python
# Implement gradient accumulation
def train_with_gradient_accumulation(model, dataloader, optimizer, accumulation_steps=4):
    optimizer.zero_grad()

    for i, batch in enumerate(dataloader):
        # Scale the loss so the accumulated gradients average correctly
        loss = model(batch).loss / accumulation_steps
        loss.backward()

        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
```
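Dividing the loss by `accumulation_steps` is what makes the accumulated gradient match a single large-batch step. A stdlib check on a toy linear model with squared-error loss and hypothetical data:

```python
# Gradient of (w*x - y)^2 with respect to w is 2*x*(w*x - y)
def grad(w, x, y):
    return 2 * x * (w * x - y)

w = 0.5
data = [(1.0, 2.0), (2.0, 3.0), (3.0, 5.0), (4.0, 9.0)]

# Large batch: average gradient over all four examples at once
big_batch_grad = sum(grad(w, x, y) for x, y in data) / len(data)

# Accumulation: sum of per-example gradients, each pre-scaled by 1/steps
steps = len(data)
accumulated = sum(grad(w, x, y) / steps for x, y in data)

print(abs(big_batch_grad - accumulated) < 1e-9)  # → True
```

Memory usage stays at one batch's worth of activations while the effective batch size is `accumulation_steps` times larger.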
## Testing Your Federated Learning Setup

### Unit Tests for Core Components
```python
# tests/test_federated_training.py
import unittest

import torch
from cryptography.fernet import Fernet

# Adjust these imports to match your project layout
from coordinator.aggregator import FederatedAggregator
from privacy.differential_privacy import DifferentialPrivacyManager
from security.secure_communication import SecureChannel

class TestFederatedTraining(unittest.TestCase):
    def setUp(self):
        self.coordinator = FederatedAggregator()

    def test_model_aggregation(self):
        """Test that model aggregation produces valid weights."""
        # Create mock worker updates
        worker_updates = {
            'worker1': {
                'weights': {'layer1': torch.randn(10, 10)},
                'num_samples': 100
            },
            'worker2': {
                'weights': {'layer1': torch.randn(10, 10)},
                'num_samples': 150
            }
        }

        aggregated = self.coordinator.aggregate_models(worker_updates)

        # Verify the aggregated weights have the correct shape
        self.assertEqual(aggregated['layer1'].shape, (10, 10))

        # Verify the weights are properly weighted by sample size
        expected = (100 * worker_updates['worker1']['weights']['layer1'] +
                    150 * worker_updates['worker2']['weights']['layer1']) / 250
        torch.testing.assert_close(aggregated['layer1'], expected)

    def test_privacy_preservation(self):
        """Test that differential privacy noise is applied to gradients."""
        model = torch.nn.Linear(10, 1)
        # Run a backward pass so gradients exist to be noised
        model(torch.randn(4, 10)).sum().backward()
        original_grad = model.weight.grad.clone()

        privacy_manager = DifferentialPrivacyManager(epsilon=1.0)
        privacy_manager.add_noise_to_gradients(model)

        # Gradients should differ after adding noise
        self.assertFalse(torch.equal(original_grad, model.weight.grad))

    def test_secure_communication(self):
        """Test encrypted communication between nodes."""
        secure_channel = SecureChannel(Fernet.generate_key())
        test_weights = {'layer1': torch.randn(5, 5)}

        encrypted = secure_channel.encrypt_model_update(test_weights)

        # Verify the encrypted payload is well-formed
        self.assertIn('encrypted_weights', encrypted)
        self.assertIn('mac', encrypted)

if __name__ == '__main__':
    unittest.main()
```
### Integration Testing

```bash
#!/bin/bash
# tests/integration_test.sh

echo "Starting federated learning integration test..."

# Start coordinator
docker-compose up -d coordinator
sleep 10

# Start workers
docker-compose up -d worker1 worker2
sleep 15

# Test worker registration
curl -X POST http://localhost:8080/register \
  -H "Content-Type: application/json" \
  -d '{"host":"worker1","port":8081,"capabilities":{"gpu":false,"memory_gb":4}}'

# Start a training round
curl -X POST http://localhost:8080/start_round

# Wait for training to complete
sleep 60

# Check final model accuracy
accuracy=$(curl -s http://localhost:8080/metrics | jq '.final_accuracy')

if (( $(echo "$accuracy > 0.8" | bc -l) )); then
    echo "Integration test PASSED: Final accuracy $accuracy"
    exit 0
else
    echo "Integration test FAILED: Final accuracy $accuracy"
    exit 1
fi
```
## Production Deployment Checklist

### Security Configuration
- SSL/TLS certificates configured for all communications
- Worker authentication tokens implemented
- Differential privacy parameters tuned for your privacy requirements
- Network segmentation applied to isolate federated learning traffic
- Regular security audits scheduled
### Performance Optimization
- Hardware requirements met for all nodes
- Network bandwidth sufficient for model updates
- Monitoring dashboard deployed and configured
- Automatic scaling policies defined
- Backup and recovery procedures tested
### Compliance and Governance
- Data governance policies documented
- Privacy impact assessment completed
- Regulatory compliance verified (GDPR, HIPAA, etc.)
- Model versioning and audit trails implemented
- Incident response procedures defined
## Conclusion

Federated learning with Ollama enables privacy-preserving distributed training across multiple devices. The approach keeps sensitive data local while building capable AI models collaboratively.

You've learned to configure coordinator and worker nodes, implement privacy protections, and deploy production-ready systems. Distributed training reduces privacy risk while maintaining model performance.

Ready to implement federated learning? Begin with a small test network using the code examples in this guide, then expand your setup as you gain experience with distributed training and as your privacy and performance requirements grow.