
Part 4: From Development to Production - Deploying and Operating RAG Systems

This is Part 4 of a 4-part series on building production-ready, multi-layer RAG systems with Ragforge.


From Architecture to Reality

In the previous three parts, we explored:

  • Part 1: Why multi-layer RAG is necessary
  • Part 2: How the retrieval pipeline works
  • Part 3: The microservices architecture

Now comes the practical part: How do you actually run this system?

This final part covers:

  • Local development setup
  • Production deployment (AWS)
  • Testing at every level
  • Observability and debugging
  • Performance optimization
  • Extending to new domains

Let’s get your RAG system running!


Local Development Setup

Prerequisites

# Required
- Docker & Docker Compose (20.10+)
- Git
# Optional (for local development without Docker)
- Python 3.12
- Node.js 20+
- uv (https://github.com/astral-sh/uv)

Quick Start (5 Minutes)

# 1. Clone the repository
git clone https://github.com/your-org/ragforge.git
cd ragforge
# 2. Create environment file
cp .env.example .env
# 3. Add your API keys (required for LLM service)
# Edit .env and add:
# OPENAI_API_KEY=your_key_here
# or
# ANTHROPIC_API_KEY=your_key_here
# 4. Start all services
make docker-up
# This runs: docker compose up --build -d

What Just Happened?

Docker Compose starts 7 services:

✓ vector_db ... Started (PostgreSQL + pgvector)
✓ ontology_service ... Started (Graph service)
✓ embeddings ... Started (Embedding generation)
✓ llm_proxy ... Started (LLM abstraction)
✓ orchestrator ... Started (Retrieval coordinator)
✓ api_gateway ... Started (API entry point)
✓ ui ... Started (Next.js frontend)

Verify Everything Works

# Check service health
curl http://localhost:8001/health # API Gateway
curl http://localhost:8002/health # Orchestrator
curl http://localhost:8003/health # Embeddings
curl http://localhost:8004/health # Ontology
curl http://localhost:8005/health # LLM Proxy
# Access the UI
open http://localhost:3000
# Check logs
docker compose logs -f orchestrator

docker-compose.yml Walkthrough

version: '3.8'

services:
  # 1. Vector Database (PostgreSQL + pgvector)
  vector_db:
    image: pgvector/pgvector:pg16  # plain postgres:16 does not ship the pgvector extension
    environment:
      POSTGRES_DB: ragdb
      POSTGRES_USER: raguser
      POSTGRES_PASSWORD: ragpass
    volumes:
      - ./vector_db/init.sql:/docker-entrypoint-initdb.d/init.sql
      - pgvector_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U raguser"]
      interval: 10s
      timeout: 5s
      retries: 5

  # 2. Ontology Service
  ontology_service:
    build:
      context: ./ontology_service
      dockerfile: Dockerfile
    ports:
      - "8004:8004"
    environment:
      - GRAPH_TYPE=networkx  # or neo4j for production
    volumes:
      - ./ontology_service/src/graph:/app/src/graph
    depends_on:
      - vector_db
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # 3. Embeddings Service
  embeddings:
    build:
      context: ./embeddings
      dockerfile: Dockerfile
    ports:
      - "8003:8003"
    environment:
      - MODEL_NAME=all-MiniLM-L6-v2
      - DEVICE=cpu  # change to 'cuda' if a GPU is available
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # 4. LLM Proxy
  llm_proxy:
    build:
      context: ./llm_proxy
      dockerfile: Dockerfile
    ports:
      - "8005:8005"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LLM_PROVIDER=openai  # or anthropic, bedrock
      - LLM_MODEL=gpt-4
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # 5. Retrieval Orchestrator
  orchestrator:
    build:
      context: ./orchestrator
      dockerfile: Dockerfile
    ports:
      - "8002:8002"
    environment:
      - POSTGRES_HOST=vector_db
      - POSTGRES_PORT=5432
      - POSTGRES_DB=ragdb
      - POSTGRES_USER=raguser
      - POSTGRES_PASSWORD=ragpass
      - EMBEDDING_SERVICE_URL=http://embeddings:8003
      - ONTOLOGY_SERVICE_URL=http://ontology_service:8004
      - LLM_SERVICE_URL=http://llm_proxy:8005
    depends_on:
      vector_db:
        condition: service_healthy
      embeddings:
        condition: service_healthy
      ontology_service:
        condition: service_healthy
      llm_proxy:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # 6. API Gateway
  api_gateway:
    build:
      context: ./api_gateway
      dockerfile: Dockerfile
    ports:
      - "8001:8001"
    environment:
      - ORCHESTRATOR_URL=http://orchestrator:8002
    depends_on:
      orchestrator:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # 7. UI Frontend
  ui:
    build:
      context: ./ui
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - NEXT_PUBLIC_API_URL=http://localhost:8001
    depends_on:
      - api_gateway

volumes:
  pgvector_data:

Seed Sample Data

# Load sample Mahabharata data
make seed-data
# This runs:
python scripts/seed_sample_data.py

What it does (sketched in code below):

  1. Downloads sample Mahabharata data from Wikipedia
  2. Chunks it into segments
  3. Extracts entities
  4. Generates embeddings
  5. Loads them into the vector DB
  6. Builds the ontology graph
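
For orientation, here is a condensed sketch of that flow. The chunking parameters, endpoint path, and helper names are illustrative assumptions, not the script's actual API:

import httpx

EMBEDDINGS_URL = "http://localhost:8003"  # embeddings service from docker-compose

def chunk(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    """Naive fixed-size chunking with overlap (illustrative)."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]

def seed(documents: list[str]) -> None:
    for doc in documents:
        for segment in chunk(doc):
            # Assumed embeddings-service endpoint shape
            resp = httpx.post(f"{EMBEDDINGS_URL}/embed", json={"text": segment})
            embedding = resp.json()["embedding"]
            # Entity extraction, vector-DB insertion, and graph building
            # (steps 3-6 above) would follow here
            print(f"chunked {len(segment)} chars -> {len(embedding)}-dim vector")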

Test Your First Query

# Via API
curl -X POST http://localhost:8001/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Who helped Arjuna after Day 10 of the war?",
    "max_results": 10,
    "debug": true
  }'

# Response (example)
{
  "answer": "After Day 10, Arjuna was helped by Krishna (his charioteer and advisor) and Bhima (his brother who protected his position).",
  "citations": [
    {"doc_id": "doc_156", "title": "Day 11 Battle"},
    {"doc_id": "doc_203", "title": "Bhima's Protection"}
  ],
  "retrieval_trace": {
    "query_parsing": {...},
    "hops": [...],
    "fusion_scores": [...]
  }
}

Testing Strategy

Testing multi-layer RAG systems requires careful thought at each level.

1. Unit Tests

Test individual components in isolation.

tests/orchestrator/test_query_parser.py
import pytest
from orchestrator.services.query_parser import QueryParser


def test_entity_extraction():
    parser = QueryParser()
    result = parser.parse("Who helped Arjuna after Day 10 of the war?")
    assert "Arjuna" in result.entities
    assert result.query_type == "temporal_causal"
    assert result.temporal_constraint["type"] == "after"
    assert result.temporal_constraint["reference"] == 10


def test_intent_classification():
    parser = QueryParser()

    # Test who question
    result1 = parser.parse("Who was Arjuna's ally?")
    assert result1.intent == "relational"
    assert result1.query_type == "who_question"

    # Test what question
    result2 = parser.parse("What happened on Day 10?")
    assert result2.intent == "factual"
    assert result2.query_type == "what_question"

    # Test why question
    result3 = parser.parse("Why did Krishna advise Arjuna?")
    assert result3.intent == "causal"
    assert result3.query_type == "why_question"

Run unit tests:

# From service directory
cd orchestrator
pytest tests/ -v
# Or use make targets
make test-orchestrator

2. Integration Tests

Test service-to-service interactions against real, locally running dependencies.

tests/orchestrator/test_integration_vector_db.py
import pytest
from orchestrator.clients.vector_client import VectorClient


@pytest.mark.integration
@pytest.mark.asyncio
async def test_vector_search_integration():
    """
    Test an actual connection to the vector DB.
    Requires: docker compose up vector_db
    """
    client = VectorClient()
    await client.connect()

    # Create test embedding
    test_embedding = [0.1] * 384  # Match the embedding model's dimension

    # Search
    results = await client.search(
        embedding=test_embedding,
        limit=10
    )

    assert len(results) > 0
    assert "id" in results[0]
    assert "content" in results[0]
    assert "similarity" in results[0]


@pytest.mark.integration
@pytest.mark.asyncio
async def test_ontology_expansion_integration():
    """Test ontology service integration"""
    from orchestrator.clients.ontology_client import OntologyClient

    client = OntologyClient("http://ontology_service:8004")
    result = await client.expand(
        entities=["Arjuna"],
        max_hops=2
    )

    assert "expanded" in result
    assert "Krishna" in result["expanded"]  # Known ally
    assert "Bhima" in result["expanded"]    # Known brother

Run integration tests:

# Start dependencies first
docker compose up -d vector_db ontology_service
# Run tests
pytest tests/ -m integration -v

3. End-to-End Tests

Test the complete flow from API to answer.

tests/test_e2e.py
import pytest
import httpx


@pytest.mark.e2e
@pytest.mark.asyncio
async def test_complete_query_flow():
    """
    Test the complete query flow from API gateway to answer.
    Requires: all services running (docker compose up)
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/query",
            json={
                "query": "Who helped Arjuna after Day 10?",
                "max_results": 5,
                "debug": True
            },
            timeout=30.0
        )

    assert response.status_code == 200
    data = response.json()

    # Validate response structure
    assert "answer" in data
    assert "citations" in data
    assert len(data["citations"]) > 0

    # Validate answer quality
    answer_lower = data["answer"].lower()
    assert any(name in answer_lower for name in ["krishna", "bhima"])

    # Validate debug trace
    if "retrieval_trace" in data:
        trace = data["retrieval_trace"]
        assert "hops" in trace
        assert len(trace["hops"]) >= 2  # At least 2 hops

Run E2E tests:

# Start all services
docker compose up -d
# Wait for health checks
./scripts/wait-for-services.sh
# Run E2E tests
pytest tests/ -m e2e -v
# Or use script
./scripts/e2e_test.sh

4. Evaluation & Quality Metrics

Test answer quality systematically.

tests/test_quality.py
import pytest
from evaluation.metrics import (
    answer_relevance,
    citation_precision,
    citation_recall
)
# query_system: project helper that posts the query to the API gateway (not shown here)


@pytest.mark.evaluation
def test_answer_quality():
    """
    Evaluate the system on benchmark questions.
    """
    test_cases = [
        {
            "query": "Who helped Arjuna after Day 10?",
            "expected_entities": ["Krishna", "Bhima"],
            "expected_citations": ["doc_156", "doc_203"]
        },
        # More test cases...
    ]

    results = []
    for case in test_cases:
        response = query_system(case["query"])

        # Check if expected entities appear in the answer
        relevance = answer_relevance(
            response["answer"],
            case["expected_entities"]
        )

        # Check citation quality
        precision = citation_precision(
            response["citations"],
            case["expected_citations"]
        )

        results.append({
            "query": case["query"],
            "relevance": relevance,
            "precision": precision
        })

    # Assert minimum quality
    avg_relevance = sum(r["relevance"] for r in results) / len(results)
    assert avg_relevance > 0.8, f"Low answer relevance: {avg_relevance}"
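
The evaluation.metrics module imported above isn't shown in this series; here is a minimal sketch of plausible implementations (assumptions, not Ragforge's actual code):

evaluation/metrics.py
def answer_relevance(answer: str, expected_entities: list[str]) -> float:
    """Fraction of expected entities that are mentioned in the answer."""
    answer_lower = answer.lower()
    hits = sum(1 for entity in expected_entities if entity.lower() in answer_lower)
    return hits / len(expected_entities) if expected_entities else 0.0

def citation_precision(citations: list[dict], expected_ids: list[str]) -> float:
    """Fraction of returned citations that were expected."""
    returned = {c["doc_id"] for c in citations}
    return len(returned & set(expected_ids)) / len(returned) if returned else 0.0

def citation_recall(citations: list[dict], expected_ids: list[str]) -> float:
    """Fraction of expected citations that were returned."""
    returned = {c["doc_id"] for c in citations}
    return len(returned & set(expected_ids)) / len(expected_ids) if expected_ids else 0.0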

Testing Pyramid

┌────────────┐
│    E2E     │  10%  (Slow, expensive, brittle)
│   Tests    │
├────────────┤
│Integration │  20%  (Medium speed, focused)
│   Tests    │
├────────────┤
│    Unit    │  70%  (Fast, reliable, cheap)
│   Tests    │
└────────────┘

Philosophy: Most tests should be fast unit tests. Integration tests validate service boundaries. E2E tests verify the happy path.
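
The integration, e2e, and evaluation markers used above should also be registered so pytest doesn't warn about unknown marks; a minimal conftest.py sketch:

conftest.py
# Register the custom pytest markers used in this test suite
def pytest_configure(config):
    config.addinivalue_line("markers", "integration: requires docker compose dependencies")
    config.addinivalue_line("markers", "e2e: requires the full stack running")
    config.addinivalue_line("markers", "evaluation: slow answer-quality benchmarks")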


Production Deployment (AWS)

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                          AWS Cloud                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌──────────────┐            ┌───────────────┐             │
│   │  CloudFront  │            │      ALB      │             │
│   │    (CDN)     │            │(Load Balancer)│             │
│   └──────┬───────┘            └───────┬───────┘             │
│          │ (Static)                   │ (API)               │
│          ▼                            ▼                     │
│   ┌──────────────┐            ┌─────────────────┐           │
│   │  S3 Bucket   │            │   ECS Fargate   │           │
│   │  (UI Build)  │            │    Services     │           │
│   └──────────────┘            │                 │           │
│                               │ - API Gateway   │           │
│                               │ - Orchestrator  │           │
│                               │ - Embeddings    │           │
│                               │ - Ontology      │           │
│                               │ - LLM Proxy     │           │
│                               │ - Pipeline      │           │
│                               └────┬───┬───┬────┘           │
│                                    │   │   │                │
│              ┌─────────────────────┘   │   └─────────┐      │
│              ▼                         ▼             ▼      │
│   ┌──────────────┐            ┌──────────┐   ┌──────────┐   │
│   │     RDS      │            │  Neo4j   │   │ Secrets  │   │
│   │  PostgreSQL  │            │   Aura   │   │ Manager  │   │
│   │  + pgvector  │            │          │   │          │   │
│   └──────────────┘            └──────────┘   └──────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Step 1: Build and Push Docker Images

# Install AWS CLI
brew install awscli  # macOS
# or: pip install awscli

# Configure AWS credentials
aws configure

# Login to ECR
aws ecr get-login-password --region us-east-1 | \
  docker login --username AWS --password-stdin \
  YOUR_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com

# Build and tag images
export ECR_REGISTRY=YOUR_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
docker build -t $ECR_REGISTRY/ragforge-api-gateway:latest ./api_gateway
docker build -t $ECR_REGISTRY/ragforge-orchestrator:latest ./orchestrator
docker build -t $ECR_REGISTRY/ragforge-embeddings:latest ./embeddings
docker build -t $ECR_REGISTRY/ragforge-ontology:latest ./ontology_service
docker build -t $ECR_REGISTRY/ragforge-llm-proxy:latest ./llm_proxy

# Push to ECR
docker push $ECR_REGISTRY/ragforge-api-gateway:latest
docker push $ECR_REGISTRY/ragforge-orchestrator:latest
docker push $ECR_REGISTRY/ragforge-embeddings:latest
docker push $ECR_REGISTRY/ragforge-ontology:latest
docker push $ECR_REGISTRY/ragforge-llm-proxy:latest

Step 2: Infrastructure as Code (Terraform)

terraform/main.tf
# VPC and Networking
resource "aws_vpc" "ragforge" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name = "ragforge-vpc"
  }
}

# RDS PostgreSQL with pgvector
resource "aws_db_instance" "vector_db" {
  identifier        = "ragforge-vector-db"
  engine            = "postgres"
  engine_version    = "16.1"
  instance_class    = "db.t3.large"
  allocated_storage = 100
  storage_type      = "gp3"

  db_name  = "ragdb"
  username = "raguser"
  password = data.aws_secretsmanager_secret_version.db_password.secret_string

  vpc_security_group_ids = [aws_security_group.db.id]
  db_subnet_group_name   = aws_db_subnet_group.ragforge.name

  # Custom parameter group; pgvector itself is enabled with CREATE EXTENSION vector
  parameter_group_name = aws_db_parameter_group.pgvector.name

  backup_retention_period   = 7
  skip_final_snapshot       = false
  final_snapshot_identifier = "ragforge-final-snapshot"

  tags = {
    Name = "ragforge-vector-db"
  }
}

# ECS Cluster
resource "aws_ecs_cluster" "ragforge" {
  name = "ragforge-cluster"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

# ECS Task Definition - Orchestrator
resource "aws_ecs_task_definition" "orchestrator" {
  family                   = "ragforge-orchestrator"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = "1024" # 1 vCPU
  memory                   = "2048" # 2 GB

  container_definitions = jsonencode([
    {
      name  = "orchestrator"
      image = "${var.ecr_registry}/ragforge-orchestrator:latest"

      portMappings = [
        {
          containerPort = 8002
          protocol      = "tcp"
        }
      ]

      environment = [
        {
          name  = "POSTGRES_HOST"
          value = aws_db_instance.vector_db.endpoint
        },
        {
          name  = "EMBEDDING_SERVICE_URL"
          value = "http://${aws_service_discovery_service.embeddings.name}.${aws_service_discovery_private_dns_namespace.ragforge.name}:8003"
        }
      ]

      secrets = [
        {
          name      = "POSTGRES_PASSWORD"
          valueFrom = aws_secretsmanager_secret.db_password.arn
        }
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = "/ecs/ragforge-orchestrator"
          "awslogs-region"        = var.aws_region
          "awslogs-stream-prefix" = "ecs"
        }
      }
    }
  ])
}

# ECS Service - Orchestrator
resource "aws_ecs_service" "orchestrator" {
  name            = "orchestrator"
  cluster         = aws_ecs_cluster.ragforge.id
  task_definition = aws_ecs_task_definition.orchestrator.arn
  desired_count   = 2
  launch_type     = "FARGATE"

  network_configuration {
    subnets         = aws_subnet.private[*].id
    security_groups = [aws_security_group.ecs_tasks.id]
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.orchestrator.arn
    container_name   = "orchestrator"
    container_port   = 8002
  }

  service_registries {
    registry_arn = aws_service_discovery_service.orchestrator.arn
  }

  depends_on = [aws_lb_listener.api]
}

# Similar definitions for other services...

Step 3: Deploy

# Initialize Terraform
cd terraform
terraform init
# Plan deployment
terraform plan -out=tfplan
# Apply
terraform apply tfplan
# Get outputs
terraform output alb_dns_name
# Output: ragforge-alb-123456789.us-east-1.elb.amazonaws.com

Step 4: Configure DNS

# Create CNAME record pointing to ALB
# api.ragforge.com -> ragforge-alb-123456789.us-east-1.elb.amazonaws.com
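
If the zone is hosted in Route 53, the record can be scripted instead of created by hand; a sketch using boto3 (the hosted zone ID and domain are placeholders):

import boto3

route53 = boto3.client("route53")
route53.change_resource_record_sets(
    HostedZoneId="ZXXXXXXXXXXXXX",  # placeholder hosted zone ID
    ChangeBatch={
        "Changes": [{
            "Action": "UPSERT",
            "ResourceRecordSet": {
                "Name": "api.ragforge.com",
                "Type": "CNAME",
                "TTL": 300,
                "ResourceRecords": [
                    {"Value": "ragforge-alb-123456789.us-east-1.elb.amazonaws.com"}
                ],
            },
        }]
    },
)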

Observability and Debugging

1. Request Tracing

Every request gets a unique trace ID that flows through all services.

orchestrator/src/utils/observability.py
import uuid
from contextvars import ContextVar

from starlette.middleware.base import BaseHTTPMiddleware

# Context variable holding the trace ID for the current request
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')


def get_trace_id() -> str:
    return trace_id_var.get()


def set_trace_id(trace_id: str):
    trace_id_var.set(trace_id)


# In FastAPI middleware
class TraceMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Get or generate trace ID
        trace_id = request.headers.get('X-Trace-ID', str(uuid.uuid4()))
        set_trace_id(trace_id)

        # Add to response headers
        response = await call_next(request)
        response.headers['X-Trace-ID'] = trace_id
        return response


# Add to app
app.add_middleware(TraceMiddleware)
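
For the trace ID to actually flow through all services, each outbound call must forward the header; a minimal sketch using httpx (the helper name is illustrative):

import httpx

from .observability import get_trace_id

async def traced_post(url: str, payload: dict) -> httpx.Response:
    """Forward the current request's trace ID on outbound service calls."""
    async with httpx.AsyncClient() as client:
        return await client.post(
            url,
            json=payload,
            headers={"X-Trace-ID": get_trace_id()}
        )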

2. Structured Logging

orchestrator/src/utils/logging.py
import json
import logging

from .observability import get_trace_id


class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

    def info(self, message: str, **kwargs):
        log_data = {
            "level": "INFO",
            "message": message,
            "trace_id": get_trace_id(),
            "service": "orchestrator",
            **kwargs
        }
        self.logger.info(json.dumps(log_data))


# Usage
logger = StructuredLogger(__name__)
logger.info(
    "Starting multi-hop retrieval",
    query=query,
    entities=entities,
    hops=3
)

3. Metrics Collection

orchestrator/src/utils/metrics.py
import time

from prometheus_client import Counter, Histogram

# Define metrics
query_counter = Counter(
    'ragforge_queries_total',
    'Total number of queries',
    ['query_type', 'status']
)

query_latency = Histogram(
    'ragforge_query_latency_seconds',
    'Query latency in seconds',
    ['service', 'operation']
)


# Usage
@app.post("/orchestrate")
async def orchestrate(request: QueryRequest):
    start_time = time.time()
    try:
        result = await do_orchestration(request)
        # Record success
        query_counter.labels(
            query_type=result.query_type,
            status="success"
        ).inc()
        return result
    except Exception:
        # Record failure
        query_counter.labels(
            query_type="unknown",
            status="error"
        ).inc()
        raise
    finally:
        # Record latency
        latency = time.time() - start_time
        query_latency.labels(
            service="orchestrator",
            operation="full_query"
        ).observe(latency)
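
These counters and histograms are only useful if Prometheus can scrape them. One way to expose them from FastAPI is prometheus_client's ASGI app (a sketch; the mount path is conventional, not mandated):

from prometheus_client import make_asgi_app

# Expose all registered metrics at /metrics for Prometheus to scrape
app.mount("/metrics", make_asgi_app())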

4. Debug Endpoint

orchestrator/src/main.py
@app.post("/debug/orchestrate")
async def debug_orchestrate(request: QueryRequest):
    """
    Returns verbose debugging information about retrieval.
    """
    trace = {
        "trace_id": get_trace_id(),
        "steps": []
    }

    # Step 1: Parse query
    start = time.time()
    parsed = await query_parser.parse(request.query)
    trace["steps"].append({
        "step": "parse_query",
        "latency_ms": (time.time() - start) * 1000,
        "result": parsed.dict()
    })

    # Step 2: Semantic expansion
    start = time.time()
    expanded = await semantic_expander.expand(parsed)
    trace["steps"].append({
        "step": "semantic_expansion",
        "latency_ms": (time.time() - start) * 1000,
        "expansions": expanded
    })

    # ... more steps with timing ...

    return {
        "answer": final_answer,
        "debug_trace": trace
    }

5. Grafana Dashboards

Create dashboards showing:

  • Query volume over time
  • Latency percentiles (p50, p95, p99)
  • Error rates by service
  • LLM token usage and cost
  • Cache hit rates
  • Vector search recall

Performance Optimization

1. Caching Strategy

orchestrator/src/utils/cache.py
import hashlib
import pickle
from functools import lru_cache

import redis

# Redis for distributed caching
redis_client = redis.Redis(host='redis', port=6379)


def _embedding_key(text: str) -> str:
    # Built-in hash() is salted per process, so it can't be used for shared
    # cache keys; use a stable digest instead
    return f"embedding:{hashlib.sha256(text.encode()).hexdigest()}"


def cache_embedding(text: str, embedding: list):
    """Cache query embeddings"""
    redis_client.setex(
        _embedding_key(text),
        3600,  # 1 hour TTL
        pickle.dumps(embedding)
    )


def get_cached_embedding(text: str):
    """Retrieve cached embedding"""
    cached = redis_client.get(_embedding_key(text))
    if cached:
        return pickle.loads(cached)
    return None


# In-memory cache for common ontology expansions
# (note: lru_cache requires a synchronous client call)
@lru_cache(maxsize=1000)
def expand_entity(entity: str, max_hops: int):
    """Cache common entity expansions"""
    return ontology_client.expand([entity], max_hops)

2. Connection Pooling

orchestrator/src/clients/vector_client.py
import os

import asyncpg


class VectorClient:
    def __init__(self):
        self.pool = None

    async def initialize(self):
        """Create connection pool at startup"""
        self.pool = await asyncpg.create_pool(
            host=os.getenv("POSTGRES_HOST"),
            port=5432,
            database="ragdb",
            user="raguser",
            password=os.getenv("POSTGRES_PASSWORD"),
            min_size=5,
            max_size=20
        )

    async def search(self, embedding, limit=20):
        # Illustrative pgvector query using the cosine-distance operator (<=>);
        # the vector is passed as its text form, e.g. '[0.1, 0.2, ...]'
        sql = """
            SELECT id, content, 1 - (embedding <=> $1::vector) AS similarity
            FROM documents
            ORDER BY embedding <=> $1::vector
            LIMIT $2
        """
        async with self.pool.acquire() as conn:
            # Use a pooled connection
            return await conn.fetch(sql, str(embedding), limit)

3. Parallel Execution

orchestrator/src/services/hop_retriever.py
import asyncio


async def retrieve_multi_hop(query_info):
    """Execute hops in parallel where possible"""
    # Get embedding and ontology expansion in parallel
    embedding_task = embeddings_client.embed(query_info.query)
    ontology_task = ontology_client.expand(query_info.entities)

    embedding, expanded_entities = await asyncio.gather(
        embedding_task,
        ontology_task
    )

    # Now execute multiple Hop 2 queries in parallel
    hop2_tasks = [
        vector_client.search_by_entity(entity, embedding)
        for entity in expanded_entities
    ]
    hop2_results = await asyncio.gather(*hop2_tasks)

    return combine_results(hop2_results)

Extending to New Domains

Want to use Ragforge for medical research, legal documents, or your company’s knowledge base?

Step-by-Step Guide

1. Prepare Your Data

data/medical/
├── pubmed_abstracts.json
├── clinical_trials.json
└── drug_interactions.csv

2. Define Your Ontology

domain_config/medical_ontology.py
ENTITY_TYPES = [
    "Drug",
    "Disease",
    "Symptom",
    "Treatment",
    "Gene",
    "Protein",
]

RELATIONS = [
    "TREATS",          # Drug -[TREATS]-> Disease
    "CAUSES",          # Gene -[CAUSES]-> Disease
    "HAS_SYMPTOM",     # Disease -[HAS_SYMPTOM]-> Symptom
    "INTERACTS_WITH",  # Drug -[INTERACTS_WITH]-> Drug
    "INHIBITS",        # Drug -[INHIBITS]-> Protein
]
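
With GRAPH_TYPE=networkx (from the docker-compose setup), these typed relations map directly onto a directed multigraph; a minimal sketch with made-up triples:

import networkx as nx

# Build a typed, directed ontology graph from (head, relation, tail) triples
G = nx.MultiDiGraph()
triples = [
    ("lisinopril", "INHIBITS", "ACE"),
    ("lisinopril", "TREATS", "hypertension"),
    ("hypertension", "HAS_SYMPTOM", "headache"),
]
for head, relation, tail in triples:
    G.add_edge(head, tail, relation=relation)

# One-hop expansion: every typed edge leaving a seed entity
print([(u, d["relation"], v) for u, v, d in G.out_edges("lisinopril", data=True)])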

3. Customize Entity Extraction

pipeline/src/extractors/medical_extractor.py
import spacy

# Load a biomedical NER model (scispaCy)
nlp = spacy.load("en_ner_bc5cdr_md")


def extract_medical_entities(text: str):
    doc = nlp(text)
    entities = []
    for ent in doc.ents:
        entities.append({
            "text": ent.text,
            "type": ent.label_,  # e.g. DISEASE, CHEMICAL
            "start": ent.start_char,
            "end": ent.end_char
        })
    return entities

4. Run the Pipeline

# Load medical data
python pipeline/src/main.py \
  --domain medical \
  --data-path data/medical/ \
  --ontology domain_config/medical_ontology.py

# This will:
# 1. Load medical documents
# 2. Extract medical entities
# 3. Build medical ontology graph
# 4. Generate embeddings
# 5. Load into vector DB

5. Query Your Domain

Terminal window
curl -X POST http://localhost:8001/query \
-H "Content-Type: application/json" \
-d '{
"query": "What drugs treat hypertension by inhibiting ACE?",
"domain": "medical"
}'

The same multi-layer retrieval pipeline now works for your medical domain!


Production Checklist

Before going live:

Security

  • API authentication enabled
  • HTTPS/TLS configured
  • Secrets in AWS Secrets Manager (not env vars)
  • VPC security groups configured
  • IAM roles with least privilege
  • Rate limiting enabled

Reliability

  • Health checks on all services
  • Auto-scaling configured
  • Database backups enabled
  • Disaster recovery plan
  • Circuit breakers for external APIs
  • Retry logic with exponential backoff (sketch below)
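
As referenced in the checklist, retry logic is easy to get wrong (no jitter, unbounded attempts); a minimal asyncio sketch:

import asyncio
import random

async def with_retries(fn, attempts: int = 4, base_delay: float = 0.5):
    """Call an async function, retrying with exponential backoff plus jitter."""
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries; surface the error
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))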

Observability

  • Structured logging enabled
  • Metrics collection (Prometheus)
  • Distributed tracing (Jaeger/X-Ray)
  • Alerting configured
  • Dashboards created (Grafana)
  • On-call rotation defined

Performance

  • Load testing completed
  • Caching strategy implemented
  • Database indexes optimized
  • Connection pooling configured
  • CDN configured for UI

Quality

  • Unit tests > 80% coverage
  • Integration tests passing
  • E2E smoke tests passing
  • Answer quality benchmarks met
  • Documentation complete

Future Enhancements

Ideas for extending Ragforge:

1. Advanced Reranking

  • Cross-encoder models for final reranking (sketch below)
  • Learn-to-rank with user feedback
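
A sketch of what cross-encoder reranking could look like with the sentence-transformers library (the checkpoint is a common public model, not part of Ragforge):

from sentence_transformers import CrossEncoder

# A cross-encoder scores (query, passage) pairs jointly, which is slower but
# more accurate than bi-encoder cosine similarity
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, passages: list[str], top_k: int = 5) -> list[str]:
    scores = reranker.predict([(query, p) for p in passages])
    ranked = sorted(zip(passages, scores), key=lambda pair: pair[1], reverse=True)
    return [passage for passage, _ in ranked[:top_k]]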

2. Conversational Memory

  • Redis-based conversation history
  • Context carryover between queries

3. Multi-Modal Support

  • Image embeddings (CLIP)
  • Table extraction and understanding
  • PDF visual parsing

4. Evaluation Framework

  • Automated RAG evaluation metrics
  • A/B testing framework
  • Human feedback loop

5. Query Optimization

  • Query rewriting with LLMs
  • Intent-based retrieval strategies
  • Adaptive hop depth

Conclusion: You’re Ready!

You now have a complete understanding of building production-ready, multi-layer RAG systems:

  • Part 1: Why vector search alone isn’t enough
  • Part 2: How multi-hop retrieval actually works
  • Part 3: The microservices architecture
  • Part 4: Deploying and operating in production

Get Started Today

# Clone and run locally
git clone https://github.com/iamthatdev/ragforge.git
cd ragforge
make docker-up
# Visit http://localhost:3000 and start asking questions!

Series Complete

Congratulations on completing the series!

You’ve learned:

  • The limitations of vector-only RAG
  • How multi-layer retrieval combines semantic + symbolic + multi-hop
  • A production microservices architecture
  • Deployment, testing, and operations

What’s next?

  • Build your own RAG system with Ragforge
  • Extend it to your domain
  • Share your experience with the community

Part 3: Microservices Architecture ← Part 4 (you are here)
