Deploying and Operating RAG Systems
Part 4: From Development to Production - Deploying and Operating RAG Systems
This is Part 4 of a 4-part series on building production-ready, multi-layer RAG systems with Ragforge.
- Part 1: Beyond Vector Search
- Part 2: The Multi-Hop Retrieval Pipeline
- Part 3: Microservices Architecture
- Part 4: From Development to Production (you are here)
From Architecture to Reality
In the previous three parts, we explored:
- Part 1: Why multi-layer RAG is necessary
- Part 2: How the retrieval pipeline works
- Part 3: The microservices architecture
Now comes the practical part: How do you actually run this system?
This final part covers:
- Local development setup
- Production deployment (AWS)
- Testing at every level
- Observability and debugging
- Performance optimization
- Extending to new domains
Let’s get your RAG system running!
Local Development Setup
Prerequisites
# Required
- Docker & Docker Compose (20.10+)
- Git
# Optional (for local development without Docker)
- Python 3.12
- Node.js 20+
- uv (https://github.com/astral-sh/uv)

Quick Start (5 Minutes)
# 1. Clone the repository
git clone https://github.com/your-org/ragforge.git
cd ragforge
# 2. Create environment file
cp .env.example .env
# 3. Add your API keys (required for LLM service)
# Edit .env and add:
# OPENAI_API_KEY=your_key_here
# or
# ANTHROPIC_API_KEY=your_key_here
# 4. Start all services
make docker-up
# This runs: docker compose up --build -d

What Just Happened?
Docker Compose starts 7 services:
✓ vector_db ... Started (PostgreSQL + pgvector)
✓ ontology_service ... Started (Graph service)
✓ embeddings ... Started (Embedding generation)
✓ llm_proxy ... Started (LLM abstraction)
✓ orchestrator ... Started (Retrieval coordinator)
✓ api_gateway ... Started (API entry point)
✓ ui ... Started (Next.js frontend)

Verify Everything Works
# Check service health
curl http://localhost:8001/health  # API Gateway
curl http://localhost:8002/health  # Orchestrator
curl http://localhost:8003/health  # Embeddings
curl http://localhost:8004/health  # Ontology
curl http://localhost:8005/health  # LLM Proxy
# Access the UI
open http://localhost:3000
# Check logs
docker compose logs -f orchestrator

docker-compose.yml Walkthrough
version: '3.8'
services:
  # 1. Vector Database (PostgreSQL + pgvector)
  vector_db:
    image: pgvector/pgvector:pg16  # stock postgres:16 lacks the pgvector extension
    environment:
      POSTGRES_DB: ragdb
      POSTGRES_USER: raguser
      POSTGRES_PASSWORD: ragpass
    volumes:
      - ./vector_db/init.sql:/docker-entrypoint-initdb.d/init.sql
      - pgvector_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U raguser"]
      interval: 10s
      timeout: 5s
      retries: 5
  # 2. Ontology Service
  ontology_service:
    build:
      context: ./ontology_service
      dockerfile: Dockerfile
    ports:
      - "8004:8004"
    environment:
      - GRAPH_TYPE=networkx  # or neo4j for production
    volumes:
      - ./ontology_service/src/graph:/app/src/graph
    depends_on:
      - vector_db
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8004/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  # 3. Embeddings Service
  embeddings:
    build:
      context: ./embeddings
      dockerfile: Dockerfile
    ports:
      - "8003:8003"
    environment:
      - MODEL_NAME=all-MiniLM-L6-v2
      - DEVICE=cpu  # Change to 'cuda' if GPU available
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8003/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  # 4. LLM Proxy
  llm_proxy:
    build:
      context: ./llm_proxy
      dockerfile: Dockerfile
    ports:
      - "8005:8005"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LLM_PROVIDER=openai  # or anthropic, bedrock
      - LLM_MODEL=gpt-4
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8005/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  # 5. Retrieval Orchestrator
  orchestrator:
    build:
      context: ./orchestrator
      dockerfile: Dockerfile
    ports:
      - "8002:8002"
    environment:
      - POSTGRES_HOST=vector_db
      - POSTGRES_PORT=5432
      - POSTGRES_DB=ragdb
      - POSTGRES_USER=raguser
      - POSTGRES_PASSWORD=ragpass
      - EMBEDDING_SERVICE_URL=http://embeddings:8003
      - ONTOLOGY_SERVICE_URL=http://ontology_service:8004
      - LLM_SERVICE_URL=http://llm_proxy:8005
    depends_on:
      vector_db:
        condition: service_healthy
      embeddings:
        condition: service_healthy
      ontology_service:
        condition: service_healthy
      llm_proxy:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  # 6. API Gateway
  api_gateway:
    build:
      context: ./api_gateway
      dockerfile: Dockerfile
    ports:
      - "8001:8001"
    environment:
      - ORCHESTRATOR_URL=http://orchestrator:8002
    depends_on:
      orchestrator:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  # 7. UI Frontend
  ui:
    build:
      context: ./ui
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - NEXT_PUBLIC_API_URL=http://localhost:8001
    depends_on:
      - api_gateway
volumes:
  pgvector_data:

Seed Sample Data
# Load sample Mahabharata data
make seed-data
# This runs:
python scripts/seed_sample_data.py

What it does (see the sketch after this list):
- Downloads sample Wikipedia data
- Chunks into segments
- Extracts entities
- Generates embeddings
- Loads into vector DB
- Builds ontology graph
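In Python terms, the script is roughly the following outline. The helper names here are illustrative assumptions, not the script's actual API:

# Rough outline of scripts/seed_sample_data.py.
# NOTE: helper names below are assumptions for illustration.
def seed_sample_data():
    documents = download_wikipedia_sample()            # 1. Fetch raw articles
    chunks = chunk_documents(documents, size=512)      # 2. Split into segments
    for chunk in chunks:
        chunk["entities"] = extract_entities(chunk["text"])  # 3. NER pass
        chunk["embedding"] = embed(chunk["text"])            # 4. Embedding per chunk
    load_into_vector_db(chunks)                        # 5. Store vectors + metadata
    build_ontology_graph(chunks)                       # 6. Entity/relation edges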
Test Your First Query
# Via API
curl -X POST http://localhost:8001/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Who helped Arjuna after Day 10 of the war?",
    "max_results": 10,
    "debug": true
  }'
# Response (example)
{
  "answer": "After Day 10, Arjuna was helped by Krishna (his charioteer and advisor) and Bhima (his brother who protected his position).",
  "citations": [
    {"doc_id": "doc_156", "title": "Day 11 Battle"},
    {"doc_id": "doc_203", "title": "Bhima's Protection"}
  ],
  "retrieval_trace": {
    "query_parsing": {...},
    "hops": [...],
    "fusion_scores": [...]
  }
}

Testing Strategy
Testing multi-layer RAG systems requires careful thought at each level.
1. Unit Tests
Test individual components in isolation.
import pytest
from orchestrator.services.query_parser import QueryParser
def test_entity_extraction():
    parser = QueryParser()
    result = parser.parse("Who helped Arjuna after Day 10 of the war?")
assert "Arjuna" in result.entities assert result.query_type == "temporal_causal" assert result.temporal_constraint["type"] == "after" assert result.temporal_constraint["reference"] == 10
def test_intent_classification():
    parser = QueryParser()
    # Test who question
    result1 = parser.parse("Who was Arjuna's ally?")
    assert result1.intent == "relational"
    assert result1.query_type == "who_question"
    # Test what question
    result2 = parser.parse("What happened on Day 10?")
    assert result2.intent == "factual"
    assert result2.query_type == "what_question"
    # Test why question
    result3 = parser.parse("Why did Krishna advise Arjuna?")
    assert result3.intent == "causal"
    assert result3.query_type == "why_question"

Run unit tests:
# From service directory
cd orchestrator
pytest tests/ -v
# Or use make targets
make test-orchestrator

2. Integration Tests
Test service-to-service interactions with mocked dependencies.
import pytest
import asyncio
from orchestrator.clients.vector_client import VectorClient
@pytest.mark.integration
@pytest.mark.asyncio
async def test_vector_search_integration():
    """
    Test actual connection to vector DB
    Requires: docker compose up vector_db
    """
    client = VectorClient()
    await client.connect()
    # Create test embedding
    test_embedding = [0.1] * 384  # Match model dimension
    # Search
    results = await client.search(
        embedding=test_embedding,
        limit=10
    )
    assert len(results) > 0
    assert "id" in results[0]
    assert "content" in results[0]
    assert "similarity" in results[0]
@pytest.mark.integration
@pytest.mark.asyncio
async def test_ontology_expansion_integration():
    """Test ontology service integration"""
    from orchestrator.clients.ontology_client import OntologyClient
    client = OntologyClient("http://ontology_service:8004")
    result = await client.expand(
        entities=["Arjuna"],
        max_hops=2
    )
assert "expanded" in result assert "Krishna" in result["expanded"] # Known ally assert "Bhima" in result["expanded"] # Known brotherRun integration tests:
# Start dependencies first
docker compose up -d vector_db ontology_service
# Run tests
pytest tests/ -m integration -v

3. End-to-End Tests
Test the complete flow from API to answer.
import pytest
import httpx
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_complete_query_flow():
    """
    Test complete query from API gateway to answer
    Requires: All services running (docker compose up)
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/query",
            json={
                "query": "Who helped Arjuna after Day 10?",
                "max_results": 5,
                "debug": True
            },
            timeout=30.0
        )
        assert response.status_code == 200
        data = response.json()
        # Validate response structure
        assert "answer" in data
        assert "citations" in data
        assert len(data["citations"]) > 0
        # Validate answer quality
        answer_lower = data["answer"].lower()
        assert any(name in answer_lower for name in ["krishna", "bhima"])
        # Validate debug trace
        if "retrieval_trace" in data:
            trace = data["retrieval_trace"]
            assert "hops" in trace
            assert len(trace["hops"]) >= 2  # At least 2 hops

Run E2E tests:
# Start all services
docker compose up -d
# Wait for health checks (a Python sketch of this script follows below)
./scripts/wait-for-services.sh
# Run E2E tests
pytest tests/ -m e2e -v
# Or use script
./scripts/e2e_test.sh
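The wait script itself isn't shown in this post; a minimal Python equivalent that polls the health endpoints might look like this (the service list is assumed from the compose file):

import time
import httpx

SERVICES = {
    "api_gateway": "http://localhost:8001/health",
    "orchestrator": "http://localhost:8002/health",
    "embeddings": "http://localhost:8003/health",
    "ontology": "http://localhost:8004/health",
    "llm_proxy": "http://localhost:8005/health",
}

def wait_for_services(timeout: float = 120.0, interval: float = 2.0):
    """Block until every service reports healthy or the deadline passes."""
    deadline = time.time() + timeout
    pending = dict(SERVICES)
    while pending and time.time() < deadline:
        for name, url in list(pending.items()):
            try:
                if httpx.get(url, timeout=2.0).status_code == 200:
                    print(f"✓ {name} healthy")
                    del pending[name]
            except httpx.HTTPError:
                pass  # Service not up yet; retry on the next pass
        if pending:
            time.sleep(interval)
    if pending:
        raise TimeoutError(f"Services not healthy: {', '.join(pending)}")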
4. Evaluation & Quality Metrics

Test answer quality systematically.
import pytest
from evaluation.metrics import (
    answer_relevance,
    citation_precision,
    citation_recall
)
@pytest.mark.evaluation
def test_answer_quality():
    """
    Evaluate system on benchmark questions
    """
    test_cases = [
        {
            "query": "Who helped Arjuna after Day 10?",
            "expected_entities": ["Krishna", "Bhima"],
            "expected_citations": ["doc_156", "doc_203"]
        },
        # More test cases...
    ]
    results = []
    for case in test_cases:
        response = query_system(case["query"])  # helper that posts to the API (not shown)
        # Check if expected entities appear in answer
        relevance = answer_relevance(
            response["answer"],
            case["expected_entities"]
        )
        # Check citation quality
        precision = citation_precision(
            response["citations"],
            case["expected_citations"]
        )
        results.append({
            "query": case["query"],
            "relevance": relevance,
            "precision": precision
        })
    # Assert minimum quality
    avg_relevance = sum(r["relevance"] for r in results) / len(results)
    assert avg_relevance > 0.8, f"Low answer relevance: {avg_relevance}"
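The evaluation.metrics module isn't shown in this series; simplified implementations of the three imported metrics could look like this:

# Simplified sketches of the imported metrics — the real
# evaluation.metrics module may differ.
def answer_relevance(answer: str, expected_entities: list[str]) -> float:
    """Fraction of expected entities mentioned in the answer."""
    answer_lower = answer.lower()
    hits = sum(1 for e in expected_entities if e.lower() in answer_lower)
    return hits / len(expected_entities) if expected_entities else 0.0

def citation_precision(citations: list[dict], expected_ids: list[str]) -> float:
    """Fraction of returned citations that were expected."""
    if not citations:
        return 0.0
    returned = {c["doc_id"] for c in citations}
    return len(returned & set(expected_ids)) / len(returned)

def citation_recall(citations: list[dict], expected_ids: list[str]) -> float:
    """Fraction of expected citations that were returned."""
    if not expected_ids:
        return 0.0
    returned = {c["doc_id"] for c in citations}
    return len(returned & set(expected_ids)) / len(set(expected_ids))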
Testing Pyramid

        ┌────────────┐
        │    E2E     │   10% (Slow, expensive, brittle)
        │   Tests    │
        ├────────────┤
        │Integration │   20% (Medium speed, focused)
        │   Tests    │
        ├────────────┤
        │    Unit    │   70% (Fast, reliable, cheap)
        │   Tests    │
        └────────────┘

Philosophy: Most tests should be fast unit tests. Integration tests validate service boundaries. E2E tests verify the happy path.
Production Deployment (AWS)
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│                          AWS Cloud                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌──────────────┐             ┌───────────────┐            │
│   │  CloudFront  │             │      ALB      │            │
│   │    (CDN)     │             │(Load Balancer)│            │
│   └──────┬───────┘             └───────┬───────┘            │
│          │ (Static)                    │ (API)              │
│          ▼                             ▼                    │
│   ┌──────────────┐            ┌─────────────────┐           │
│   │  S3 Bucket   │            │   ECS Fargate   │           │
│   │  (UI Build)  │            │    Services     │           │
│   └──────────────┘            │                 │           │
│                               │  - API Gateway  │           │
│                               │  - Orchestrator │           │
│                               │  - Embeddings   │           │
│                               │  - Ontology     │           │
│                               │  - LLM Proxy    │           │
│                               │  - Pipeline     │           │
│                               └────┬────┬───┬───┘           │
│              ┌─────────────────────┘    │   └────────┐      │
│              ▼                          ▼            ▼      │
│   ┌──────────────┐             ┌──────────┐  ┌──────────┐   │
│   │     RDS      │             │  Neo4j   │  │ Secrets  │   │
│   │  PostgreSQL  │             │   Aura   │  │ Manager  │   │
│   │  + pgvector  │             │          │  │          │   │
│   └──────────────┘             └──────────┘  └──────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Step 1: Build and Push Docker Images
# Install AWS CLI
brew install awscli  # macOS
# or: pip install awscli
# Configure AWS credentials
aws configure
# Login to ECR
aws ecr get-login-password --region us-east-1 | \
  docker login --username AWS --password-stdin \
  YOUR_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
# Build and tag images
export ECR_REGISTRY=YOUR_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
docker build -t $ECR_REGISTRY/ragforge-api-gateway:latest ./api_gateway
docker build -t $ECR_REGISTRY/ragforge-orchestrator:latest ./orchestrator
docker build -t $ECR_REGISTRY/ragforge-embeddings:latest ./embeddings
docker build -t $ECR_REGISTRY/ragforge-ontology:latest ./ontology_service
docker build -t $ECR_REGISTRY/ragforge-llm-proxy:latest ./llm_proxy
# Push to ECR
docker push $ECR_REGISTRY/ragforge-api-gateway:latest
docker push $ECR_REGISTRY/ragforge-orchestrator:latest
docker push $ECR_REGISTRY/ragforge-embeddings:latest
docker push $ECR_REGISTRY/ragforge-ontology:latest
docker push $ECR_REGISTRY/ragforge-llm-proxy:latest

Step 2: Infrastructure as Code (Terraform)
# VPC and Networking
resource "aws_vpc" "ragforge" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
  tags = {
    Name = "ragforge-vpc"
  }
}
# RDS PostgreSQL with pgvector
resource "aws_db_instance" "vector_db" {
  identifier     = "ragforge-vector-db"
  engine         = "postgres"
  engine_version = "16.1"
  instance_class = "db.t3.large"
  allocated_storage = 100
  storage_type      = "gp3"
  db_name  = "ragdb"
  username = "raguser"
  password = data.aws_secretsmanager_secret_version.db_password.secret_string
  vpc_security_group_ids = [aws_security_group.db.id]
  db_subnet_group_name   = aws_db_subnet_group.ragforge.name
  # Custom parameter group; note pgvector itself is enabled by running
  # "CREATE EXTENSION vector" after provisioning (supported natively by RDS)
  parameter_group_name = aws_db_parameter_group.pgvector.name
  backup_retention_period   = 7
  skip_final_snapshot       = false
  final_snapshot_identifier = "ragforge-final-snapshot"
  tags = {
    Name = "ragforge-vector-db"
  }
}
# ECS Cluster
resource "aws_ecs_cluster" "ragforge" {
  name = "ragforge-cluster"
  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}
# ECS Task Definition - Orchestrator
resource "aws_ecs_task_definition" "orchestrator" {
  family                   = "ragforge-orchestrator"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = "1024"  # 1 vCPU
  memory                   = "2048"  # 2 GB
  container_definitions = jsonencode([
    {
      name  = "orchestrator"
      image = "${var.ecr_registry}/ragforge-orchestrator:latest"
      portMappings = [
        {
          containerPort = 8002
          protocol      = "tcp"
        }
      ]
      environment = [
        {
          name  = "POSTGRES_HOST"
          value = aws_db_instance.vector_db.endpoint
        },
        {
          name  = "EMBEDDING_SERVICE_URL"
          value = "http://${aws_service_discovery_service.embeddings.name}.${aws_service_discovery_private_dns_namespace.ragforge.name}:8003"
        }
      ]
      secrets = [
        {
          name      = "POSTGRES_PASSWORD"
          valueFrom = aws_secretsmanager_secret.db_password.arn
        }
      ]
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = "/ecs/ragforge-orchestrator"
          "awslogs-region"        = var.aws_region
          "awslogs-stream-prefix" = "ecs"
        }
      }
    }
  ])
}
# ECS Service - Orchestrator
resource "aws_ecs_service" "orchestrator" {
  name            = "orchestrator"
  cluster         = aws_ecs_cluster.ragforge.id
  task_definition = aws_ecs_task_definition.orchestrator.arn
  desired_count   = 2
  launch_type     = "FARGATE"
  network_configuration {
    subnets         = aws_subnet.private[*].id
    security_groups = [aws_security_group.ecs_tasks.id]
  }
  load_balancer {
    target_group_arn = aws_lb_target_group.orchestrator.arn
    container_name   = "orchestrator"
    container_port   = 8002
  }
  service_registries {
    registry_arn = aws_service_discovery_service.orchestrator.arn
  }
  depends_on = [aws_lb_listener.api]
}
# Similar definitions for other services...

Step 3: Deploy
# Initialize Terraform
cd terraform
terraform init
# Plan deployment
terraform plan -out=tfplan
# Apply
terraform apply tfplan
# Get outputs
terraform output alb_dns_name
# Output: ragforge-alb-123456789.us-east-1.elb.amazonaws.com

Step 4: Configure DNS
# Create CNAME record pointing to ALB
# api.ragforge.com -> ragforge-alb-123456789.us-east-1.elb.amazonaws.com

Observability and Debugging
1. Request Tracing
Every request gets a unique trace ID that flows through all services.
import uuid
from contextvars import ContextVar
# Context variable for trace ID
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')
def get_trace_id() -> str:
    return trace_id_var.get()
def set_trace_id(trace_id: str):
    trace_id_var.set(trace_id)
# In FastAPI middleware
from starlette.middleware.base import BaseHTTPMiddleware
class TraceMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Get or generate trace ID
        trace_id = request.headers.get('X-Trace-ID', str(uuid.uuid4()))
        set_trace_id(trace_id)
        # Add to response headers
        response = await call_next(request)
        response.headers['X-Trace-ID'] = trace_id
        return response
# Add to app
app.add_middleware(TraceMiddleware)
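For the trace ID to actually flow through all services, each outgoing HTTP call must forward the header. A minimal sketch with httpx (the helper name is ours, not a Ragforge API):

import httpx

async def call_downstream(url: str, payload: dict) -> dict:
    # Forward the current trace ID so downstream logs share it
    headers = {"X-Trace-ID": get_trace_id()}
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        return response.json()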
2. Structured Logging

import logging
import json
from .observability import get_trace_id
class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
    def info(self, message: str, **kwargs):
        log_data = {
            "level": "INFO",
            "message": message,
            "trace_id": get_trace_id(),
            "service": "orchestrator",
            **kwargs
        }
        self.logger.info(json.dumps(log_data))
# Usage
logger = StructuredLogger(__name__)
logger.info(
    "Starting multi-hop retrieval",
    query=query,
    entities=entities,
    hops=3
)

3. Metrics Collection
from prometheus_client import Counter, Histogram
import time
# Define metrics
query_counter = Counter(
    'ragforge_queries_total',
    'Total number of queries',
    ['query_type', 'status']
)
query_latency = Histogram(
    'ragforge_query_latency_seconds',
    'Query latency in seconds',
    ['service', 'operation']
)
# Usage
@app.post("/orchestrate")
async def orchestrate(request: QueryRequest):
    start_time = time.time()
    try:
        result = await do_orchestration(request)
        # Record success
        query_counter.labels(
            query_type=result.query_type,
            status="success"
        ).inc()
        return result
    except Exception as e:
        # Record failure
        query_counter.labels(
            query_type="unknown",
            status="error"
        ).inc()
        raise
    finally:
        # Record latency
        latency = time.time() - start_time
        query_latency.labels(
            service="orchestrator",
            operation="full_query"
        ).observe(latency)
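For Prometheus to scrape these counters and histograms, each service also needs to expose them. One option with FastAPI is to mount prometheus_client's ASGI app:

from prometheus_client import make_asgi_app

# Serve all registered metrics at /metrics for the Prometheus scraper
app.mount("/metrics", make_asgi_app())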
@app.post("/debug/orchestrate")async def debug_orchestrate(request: QueryRequest): """ Returns verbose debugging information about retrieval """ trace = { "trace_id": get_trace_id(), "steps": [] }
    # Step 1: Parse query
    start = time.time()
    parsed = await query_parser.parse(request.query)
    trace["steps"].append({
        "step": "parse_query",
        "latency_ms": (time.time() - start) * 1000,
        "result": parsed.dict()
    })
    # Step 2: Semantic expansion
    start = time.time()
    expanded = await semantic_expander.expand(parsed)
    trace["steps"].append({
        "step": "semantic_expansion",
        "latency_ms": (time.time() - start) * 1000,
        "expansions": expanded
    })
    # ... more steps with timing ...
    return {
        "answer": final_answer,
        "debug_trace": trace
    }

5. Grafana Dashboards
Create dashboards showing:
- Query volume over time
- Latency percentiles (p50, p95, p99)
- Error rates by service
- LLM token usage and cost
- Cache hit rates
- Vector search recall
Performance Optimization
1. Caching Strategy
from functools import lru_cache
import redis
import pickle
# Redis for distributed caching
redis_client = redis.Redis(host='redis', port=6379)
import hashlib

def _cache_key(text: str) -> str:
    # Use a stable hash: Python's built-in hash() is randomized per process
    return f"embedding:{hashlib.sha256(text.encode()).hexdigest()}"

def cache_embedding(text: str, embedding: list):
    """Cache query embeddings"""
    redis_client.setex(
        _cache_key(text),
        3600,  # 1 hour TTL
        pickle.dumps(embedding)
    )
def get_cached_embedding(text: str):
    """Retrieve cached embedding"""
    cached = redis_client.get(_cache_key(text))
    if cached:
        return pickle.loads(cached)
    return None
# In-memory cache for ontology expansions
@lru_cache(maxsize=1000)
def expand_entity(entity: str, max_hops: int):
    """Cache common entity expansions (assumes a synchronous client)"""
    return ontology_client.expand([entity], max_hops)
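Putting the cache in front of the embeddings service, a lookup helper might look like this (embeddings_client is assumed from the orchestrator's clients):

async def embed_with_cache(text: str) -> list:
    cached = get_cached_embedding(text)
    if cached is not None:
        return cached  # Cache hit: skip the embeddings service entirely
    embedding = await embeddings_client.embed(text)
    cache_embedding(text, embedding)
    return embedding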
2. Connection Pooling

import os
import asyncpg
class VectorClient:
    def __init__(self):
        self.pool = None
    async def initialize(self):
        """Create connection pool at startup"""
        self.pool = await asyncpg.create_pool(
            host=os.getenv("POSTGRES_HOST"),
            port=5432,
            database="ragdb",
            user="raguser",
            password=os.getenv("POSTGRES_PASSWORD"),
            min_size=5,
            max_size=20
        )
    async def search(self, embedding, limit=20):
        # Example pgvector cosine-distance query (table/column names assumed)
        query = """
            SELECT id, content, 1 - (embedding <=> $1) AS similarity
            FROM documents
            ORDER BY embedding <=> $1
            LIMIT $2
        """
        async with self.pool.acquire() as conn:
            # Use pooled connection
            results = await conn.fetch(query, embedding, limit)
            return results

3. Parallel Execution
import asyncio
async def retrieve_multi_hop(query_info):
    """Execute hops in parallel where possible"""
    # Get embedding and ontology expansion in parallel
    embedding_task = embeddings_client.embed(query_info.query)
    ontology_task = ontology_client.expand(query_info.entities)
    embedding, expanded_entities = await asyncio.gather(
        embedding_task,
        ontology_task
    )
    # Now execute multiple Hop 2 queries in parallel
    hop2_tasks = [
        vector_client.search_by_entity(entity, embedding)
        for entity in expanded_entities
    ]
    hop2_results = await asyncio.gather(*hop2_tasks)
    return combine_results(hop2_results)
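combine_results isn't defined above; a minimal version could dedupe by document ID and keep the best similarity score:

def combine_results(hop2_results: list[list[dict]]) -> list[dict]:
    # Merge per-entity result lists, keeping the best score per document
    best: dict[str, dict] = {}
    for results in hop2_results:
        for doc in results:
            doc_id = doc["id"]
            if doc_id not in best or doc["similarity"] > best[doc_id]["similarity"]:
                best[doc_id] = doc
    return sorted(best.values(), key=lambda d: d["similarity"], reverse=True)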
Extending to New Domains

Want to use Ragforge for medical research, legal documents, or your company’s knowledge base?
Step-by-Step Guide
1. Prepare Your Data
# - pubmed_abstracts.json
# - clinical_trials.json
# - drug_interactions.csv

2. Define Your Ontology
ENTITY_TYPES = [
    "Drug",
    "Disease",
    "Symptom",
    "Treatment",
    "Gene",
    "Protein"
]
RELATIONS = [
    "TREATS",          # Drug -[TREATS]-> Disease
    "CAUSES",          # Gene -[CAUSES]-> Disease
    "HAS_SYMPTOM",     # Disease -[HAS_SYMPTOM]-> Symptom
    "INTERACTS_WITH",  # Drug -[INTERACTS_WITH]-> Drug
    "INHIBITS",        # Drug -[INHIBITS]-> Protein
]

3. Customize Entity Extraction
import spacy
# Load medical NER model
nlp = spacy.load("en_ner_bc5cdr_md")  # Bio-medical NER
def extract_medical_entities(text: str):
    doc = nlp(text)
    entities = []
    for ent in doc.ents:
        entities.append({
            "text": ent.text,
            "type": ent.label_,  # DISEASE, CHEMICAL
            "start": ent.start_char,
            "end": ent.end_char
        })
    return entities

4. Run the Pipeline
# Load medical data
python pipeline/src/main.py \
  --domain medical \
  --data-path data/medical/ \
  --ontology domain_config/medical_ontology.py
# This will:
# 1. Load medical documents
# 2. Extract medical entities
# 3. Build medical ontology graph
# 4. Generate embeddings
# 5. Load into vector DB

5. Query Your Domain
curl -X POST http://localhost:8001/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "What drugs treat hypertension by inhibiting ACE?",
    "domain": "medical"
  }'

The same multi-layer retrieval pipeline now works for your medical domain!
Production Checklist
Before going live:
Security
- API authentication enabled
- HTTPS/TLS configured
- Secrets in AWS Secrets Manager (not env vars)
- VPC security groups configured
- IAM roles with least privilege
- Rate limiting enabled
Reliability
- Health checks on all services
- Auto-scaling configured
- Database backups enabled
- Disaster recovery plan
- Circuit breakers for external APIs
- Retry logic with exponential backoff (see the sketch after this list)
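For the last item, a minimal exponential-backoff helper (names are ours, not a Ragforge API) might look like:

import asyncio
import random

async def with_retries(coro_factory, max_attempts: int = 3, base_delay: float = 0.5):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter: ~0.5s, ~1s, ~2s, ...
            delay = base_delay * (2 ** (attempt - 1)) * (1 + random.random() * 0.1)
            await asyncio.sleep(delay)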
Observability
- Structured logging enabled
- Metrics collection (Prometheus)
- Distributed tracing (Jaeger/X-Ray)
- Alerting configured
- Dashboards created (Grafana)
- On-call rotation defined
Performance
- Load testing completed
- Caching strategy implemented
- Database indexes optimized
- Connection pooling configured
- CDN configured for UI
Quality
- Unit tests > 80% coverage
- Integration tests passing
- E2E smoke tests passing
- Answer quality benchmarks met
- Documentation complete
Future Enhancements
Ideas for extending Ragforge:
1. Advanced Reranking
- Cross-encoder models for final reranking (sketched below)
- Learn-to-rank with user feedback
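As a starting point for the reranking idea, sentence-transformers’ CrossEncoder can rescore query-document pairs after retrieval (the model choice here is just an example):

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, docs: list[dict], top_k: int = 10) -> list[dict]:
    # Score each (query, document) pair with the cross-encoder
    scores = reranker.predict([(query, d["content"]) for d in docs])
    ranked = sorted(zip(docs, scores), key=lambda p: p[1], reverse=True)
    return [doc for doc, _ in ranked[:top_k]]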
2. Conversational Memory
- Redis-based conversation history
- Context carryover between queries
3. Multi-Modal Support
- Image embeddings (CLIP)
- Table extraction and understanding
- PDF visual parsing
4. Evaluation Framework
- Automated RAG evaluation metrics
- A/B testing framework
- Human feedback loop
5. Query Optimization
- Query rewriting with LLMs
- Intent-based retrieval strategies
- Adaptive hop depth
Conclusion: You’re Ready!
You now have a complete understanding of building production-ready, multi-layer RAG systems:
- Part 1: Why vector search alone isn’t enough
- Part 2: How multi-hop retrieval actually works
- Part 3: The microservices architecture
- Part 4: Deploying and operating in production
Get Started Today
# Clone and run locally
git clone https://github.com/iamthatdev/ragforge.git
cd ragforge
make docker-up
# Visit http://localhost:3000 and start asking questions!

Series Complete
Congratulations on completing the series!
You’ve learned:
- The limitations of vector-only RAG
- How multi-layer retrieval combines semantic + symbolic + multi-hop
- A production microservices architecture
- Deployment, testing, and operations
What’s next?
- Build your own RAG system with Ragforge
- Extend it to your domain
- Share your experience with the community