Microservices Architecture - Building Scalable RAG Systems
This is Part 3 of a 4-part series on building production-ready, multi-layer RAG systems with Ragforge.
- Part 1: Beyond Vector Search
- Part 2: The Multi-Hop Retrieval Pipeline
- Part 3: Microservices Architecture (you are here)
- Part 4: From Development to Production
From Algorithm to Architecture
In Parts 1 and 2, we explored:
- Why multi-layer RAG is necessary (Part 1)
- How the retrieval pipeline works (Part 2)
Now comes the crucial question: How do we build this as a production system?
A multi-layer RAG system has many responsibilities:
- Parse queries and extract entities
- Generate embeddings
- Search vector databases
- Traverse knowledge graphs
- Fuse and rerank results
- Call LLM APIs
- Ingest and process data
- Serve user requests
You could build this as a monolith. But there’s a better way.
Why Microservices for RAG?
The Monolith Approach
```
┌─────────────────────────────────────────┐
│         Single RAG Application          │
│                                         │
│  - Query parsing                        │
│  - Embedding generation                 │
│  - Vector search                        │
│  - Graph traversal                      │
│  - Fusion engine                        │
│  - LLM calls                            │
│  - Data pipeline                        │
│  - API serving                          │
│                                         │
│  All in one codebase, one deployment    │
└─────────────────────────────────────────┘
```
Problems:
- Can’t scale components independently (embeddings ≠ LLM calls)
- Tight coupling makes testing harder
- One bug can crash the entire system
- Hard to optimize different parts differently
- Technology lock-in (all Python, or all Node, etc.)
The Microservices Approach
```
┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
│   API    │  │Orchestr- │  │Embedding │  │ Ontology │
│ Gateway  │─→│   ator   │─→│ Service  │  │ Service  │
└──────────┘  └────┬─────┘  └──────────┘  └──────────┘
                   │
                   ├─────────────┬─────────────┐
                   ▼             ▼             ▼
              ┌──────────┐  ┌──────────┐  ┌──────────┐
              │  Vector  │  │   LLM    │  │ Pipeline │
              │    DB    │  │  Proxy   │  │ Service  │
              └──────────┘  └──────────┘  └──────────┘
```
Benefits:
- ✅ Independent scaling: Scale embeddings 3x, orchestrator 2x, LLM proxy 1x
- ✅ Technology flexibility: Python for ML, Node for API gateway (if needed)
- ✅ Fault isolation: Embedding service crash doesn’t affect LLM calls
- ✅ Independent deployment: Update graph traversal without touching vector search
- ✅ Clear testing boundaries: Unit test each service, integration test interactions
- ✅ Team autonomy: Different teams own different services
Tradeoffs:
- ❌ Network overhead (mitigated by keeping services in the same VPC)
- ❌ More complex deployment (addressed with Docker Compose / Kubernetes)
- ❌ Distributed debugging (addressed with good observability)
Decision: For a system with 7+ distinct responsibilities, microservices provide better long-term velocity and reliability.
The Ragforge Service Architecture
Ragforge implements 7 core services + 1 database as independent Docker containers:
```
┌───────────────────────────────────────────────────────────────┐
│                         USER / BROWSER                         │
└────────────────────────────┬──────────────────────────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │   UI Frontend   │  :3000
                    │    (Next.js)    │
                    └────────┬────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │   API Gateway   │  :8001
                    │    (FastAPI)    │
                    └────────┬────────┘
                             │
                             ▼
         ┌──────────────────────────────────────┐
         │        Retrieval Orchestrator        │  :8002
         │    (FastAPI + Coordination Logic)    │
         └──┬──────────┬──────────┬──────────┬──┘
            │          │          │          │
   ┌────────┘          │          │          └─────────┐
   │                   │          │                    │
   ▼                   ▼          ▼                    ▼
┌──────────┐     ┌──────────┐ ┌──────────┐      ┌──────────┐
│Embedding │     │  Vector  │ │ Ontology │      │   LLM    │
│ Service  │     │    DB    │ │ Service  │      │  Proxy   │
│  :8003   │     │(Postgres │ │  :8004   │      │  :8005   │
│          │     │ pgvector)│ │(Neo4j or │      │          │
│          │     │  :5432   │ │ NetworkX)│      │          │
└──────────┘     └──────────┘ └──────────┘      └──────────┘
                      ▲
                      │
                 ┌──────────┐
                 │ Pipeline │  :8006
                 │ Service  │
                 │(Ingestion│
                 └──────────┘
```
Service 1: API Gateway
Purpose
Single entry point for all client requests. Handles routing, basic validation, and optionally authentication.
Responsibilities
- Route requests to orchestrator
- Request/response validation
- Rate limiting (optional)
- Authentication/authorization (optional)
- Request logging and metrics
Tech Stack
- Framework: FastAPI
- Language: Python 3.12
- Port: 8001
API Endpoints
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    max_results: int = 10
    debug: bool = False

class QueryResponse(BaseModel):
    answer: str
    citations: list
    retrieval_trace: dict | None = None

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """
    Main query endpoint - forwards to orchestrator
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://orchestrator:8002/orchestrate",
            json=request.model_dump(),
            timeout=30.0
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail="Orchestrator error"
            )
        return response.json()

@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy", "service": "api_gateway"}
```
Dockerfile
```dockerfile
# api_gateway/Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install uv for fast dependency management
RUN pip install uv

# Copy dependencies
COPY pyproject.toml .
RUN uv pip install --system -r pyproject.toml

# Copy source code
COPY src/ ./src/

# Expose port
EXPOSE 8001

# Run application
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8001"]
```
Why This Service?
- Single entry point: Easier to secure, monitor, and manage
- Abstraction: Internal service changes don’t affect external API
- Centralized concerns: Rate limiting, auth, logging in one place
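To make the "centralized concerns" point concrete, here is a minimal sketch of request logging and rate limiting as FastAPI middleware in the gateway. The in-memory counter and the 60-requests-per-minute budget are illustrative assumptions; a real deployment would back this with Redis or an API-gateway product.
```python
import time
from collections import defaultdict

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()  # or reuse the gateway's existing app instance

# Hypothetical in-memory limiter: requests per client IP per rolling minute
RATE_LIMIT = 60
_request_times: dict[str, list[float]] = defaultdict(list)

@app.middleware("http")
async def log_and_rate_limit(request: Request, call_next):
    client_ip = request.client.host if request.client else "unknown"

    # Keep only timestamps from the last 60 seconds, then check the budget
    now = time.time()
    window = [t for t in _request_times[client_ip] if now - t < 60]
    if len(window) >= RATE_LIMIT:
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
    window.append(now)
    _request_times[client_ip] = window

    # Centralized request logging with latency
    start = time.perf_counter()
    response = await call_next(request)
    duration_ms = (time.perf_counter() - start) * 1000
    print(f"{request.method} {request.url.path} -> {response.status_code} ({duration_ms:.1f} ms)")

    return response
```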
Service 2: Retrieval Orchestrator (The Brain)
Purpose
Coordinates the entire multi-hop retrieval pipeline. This is the most complex service.
Responsibilities
- Query parsing and understanding
- Entity extraction
- Semantic expansion
- Ontology expansion (via graph service)
- Multi-hop retrieval coordination
- Fusion and reranking
- Prompt construction
- LLM invocation (via proxy)
Tech Stack
- Framework: FastAPI
- Language: Python 3.12
- Port: 8002
- Key Libraries: pydantic, httpx, numpy
Internal Architecture
```
orchestrator/
├── Dockerfile
├── pyproject.toml
└── src/
    ├── main.py                  # FastAPI app
    ├── models/
    │   ├── query.py             # Pydantic models
    │   └── retrieval.py
    ├── services/
    │   ├── query_parser.py      # Parse queries
    │   ├── semantic_expander.py
    │   ├── ontology_expander.py
    │   ├── hop_retriever.py     # Multi-hop logic
    │   ├── fusion_engine.py     # Merge & rerank
    │   └── prompt_builder.py
    └── clients/
        ├── vector_client.py     # Talk to pgvector
        ├── ontology_client.py   # Talk to graph
        └── llm_client.py        # Talk to LLM proxy
```
Example: Hop Retriever
```python
from typing import List

from ..clients.vector_client import VectorClient
from ..clients.ontology_client import OntologyClient

class HopRetriever:
    """
    Executes multi-hop retrieval strategy
    """
    def __init__(self):
        self.vector_client = VectorClient()
        self.ontology_client = OntologyClient()

    async def retrieve(
        self,
        query: str,
        query_embedding: List[float],
        entities: List[str],
        temporal_constraint: dict | None = None
    ) -> dict:
        """
        Execute 3-hop retrieval
        """
        results = {"hops": []}

        # Hop 1: Dense vector search
        hop1_results = await self.vector_client.search(
            embedding=query_embedding,
            limit=20,
            metadata_filter=temporal_constraint
        )
        results["hops"].append({
            "hop": 1,
            "type": "vector_search",
            "count": len(hop1_results),
            "results": hop1_results
        })

        # Hop 2: Graph-guided retrieval
        # Get related entities from graph
        expanded_entities = await self.ontology_client.expand(entities)

        hop2_results = []
        for entity in expanded_entities:
            entity_docs = await self.vector_client.search_by_entity(
                entity=entity,
                embedding=query_embedding,
                limit=10
            )
            hop2_results.extend(entity_docs)

        results["hops"].append({
            "hop": 2,
            "type": "graph_guided",
            "expanded_entities": expanded_entities,
            "count": len(hop2_results),
            "results": hop2_results
        })

        # Hop 3: Exact match on metadata/keywords
        hop3_results = await self.vector_client.exact_match(
            entities=entities,
            temporal_constraint=temporal_constraint,
            limit=10
        )
        results["hops"].append({
            "hop": 3,
            "type": "exact_match",
            "count": len(hop3_results),
            "results": hop3_results
        })

        return results
```
Example: Fusion Engine
```python
from typing import List, Dict

import numpy as np

class FusionEngine:
    """
    Merges and reranks results from multiple hops
    """

    def fuse(
        self,
        hop_results: dict,
        query_info: dict,
        weights: dict | None = None
    ) -> List[dict]:
        """
        Combine results from all hops with multi-signal scoring
        """
        if weights is None:
            weights = {
                "semantic": 0.25,
                "graph": 0.30,
                "temporal": 0.25,
                "exact": 0.15,
                "hop": 0.05
            }

        # Collect all documents
        all_docs = []
        for hop in hop_results["hops"]:
            for doc in hop["results"]:
                doc["hop"] = hop["hop"]
                doc["hop_type"] = hop["type"]
                all_docs.append(doc)

        # Deduplicate by document ID
        unique_docs = self._deduplicate(all_docs)

        # Score each document
        scored_docs = []
        for doc in unique_docs:
            score = self._compute_fusion_score(doc, query_info, weights)
            doc["fusion_score"] = score
            scored_docs.append(doc)

        # Sort by score (descending)
        scored_docs.sort(key=lambda x: x["fusion_score"], reverse=True)

        return scored_docs

    def _compute_fusion_score(
        self,
        doc: dict,
        query_info: dict,
        weights: dict
    ) -> float:
        """
        Multi-signal scoring
        """
        scores = {
            "semantic": doc.get("similarity_score", 0.0),
            "graph": self._graph_score(doc, query_info),
            "temporal": self._temporal_score(doc, query_info),
            "exact": 1.0 if doc.get("exact_match") else 0.0,
            "hop": 1.0 if doc["hop"] == 1 else 0.9 if doc["hop"] == 2 else 0.8
        }

        return sum(scores[key] * weights[key] for key in scores)

    def _graph_score(self, doc: dict, query_info: dict) -> float:
        """
        Score based on entity overlap with graph expansion
        """
        doc_entities = set(doc.get("entities", []))
        query_entities = set(query_info.get("expanded_entities", []))

        if not query_entities:
            return 0.5  # Neutral if no graph info

        overlap = len(doc_entities & query_entities)
        return min(overlap / len(query_entities), 1.0)

    def _temporal_score(self, doc: dict, query_info: dict) -> float:
        """
        Score based on temporal alignment
        """
        constraint = query_info.get("temporal_constraint")
        if not constraint:
            return 1.0  # No constraint = perfect score

        doc_day = doc.get("metadata", {}).get("day")
        if doc_day is None:
            return 0.5  # Unknown = neutral

        ref_day = constraint.get("reference")
        if ref_day is None:
            return 0.5  # Constraint without a reference day = neutral

        if constraint["type"] == "after":
            return 1.0 if doc_day > ref_day else 0.0
        elif constraint["type"] == "before":
            return 1.0 if doc_day < ref_day else 0.0
        elif constraint["type"] == "exact":
            return 1.0 if doc_day == ref_day else 0.0

        return 0.5

    def _deduplicate(self, docs: List[dict]) -> List[dict]:
        """Remove duplicate documents by ID"""
        seen = set()
        unique = []
        for doc in docs:
            if doc["id"] not in seen:
                seen.add(doc["id"])
                unique.append(doc)
        return unique
```
Why This Is Separate
- Complexity: 6+ distinct responsibilities
- Stateless: Can scale horizontally
- Core logic: Changes frequently as retrieval improves
- Testing: Need to mock vector DB, graph, LLM
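To show how these pieces fit together, here is a sketch of what the orchestrator's /orchestrate endpoint could look like. HopRetriever and FusionEngine are the classes shown above; the QueryParser, PromptBuilder, EmbeddingClient, and LLMClient interfaces (and the exact shape of query_info) are assumptions based on the directory layout, not Ragforge's actual code.
```python
from fastapi import FastAPI

from .models.query import QueryRequest, QueryResponse   # Pydantic models (assumed location)
from .services.query_parser import QueryParser          # interfaces below are illustrative
from .services.hop_retriever import HopRetriever
from .services.fusion_engine import FusionEngine
from .services.prompt_builder import PromptBuilder
from .clients.embedding_client import EmbeddingClient   # hypothetical client module
from .clients.llm_client import LLMClient

app = FastAPI()

parser = QueryParser()
embedder = EmbeddingClient()
retriever = HopRetriever()
fusion = FusionEngine()
prompts = PromptBuilder()
llm = LLMClient()

@app.post("/orchestrate", response_model=QueryResponse)
async def orchestrate(request: QueryRequest):
    # 1. Parse the query: entities, temporal constraints, intent
    query_info = parser.parse(request.query)

    # 2. Embed the query via the embeddings service
    query_embedding = await embedder.embed(request.query)

    # 3. Multi-hop retrieval (vector, graph-guided, exact match)
    hop_results = await retriever.retrieve(
        query=request.query,
        query_embedding=query_embedding,
        entities=query_info["entities"],
        temporal_constraint=query_info.get("temporal_constraint"),
    )

    # 4. Fuse and rerank results from all hops
    ranked_docs = fusion.fuse(hop_results, query_info)

    # 5. Build the prompt and call the LLM proxy
    prompt = prompts.build(request.query, ranked_docs[:request.max_results])
    answer = await llm.generate(prompt)

    return QueryResponse(
        answer=answer,
        citations=[doc["id"] for doc in ranked_docs[:request.max_results]],
        retrieval_trace=hop_results if request.debug else None,
    )
```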
Service 3: Embeddings Service
Purpose
Generate vector embeddings for documents and queries.
Responsibilities
- Embed query text
- Embed document chunks
- Batch embedding jobs
- Model management
Tech Stack
- Framework: FastAPI
- Language: Python 3.12
- Port: 8003
- Model: Sentence Transformers or OpenAI API
API Implementation
```python
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

app = FastAPI()

# Load model at startup
MODEL_NAME = "all-MiniLM-L6-v2"
model = SentenceTransformer(MODEL_NAME)

class EmbedRequest(BaseModel):
    text: str | List[str]

class EmbedResponse(BaseModel):
    embeddings: List[List[float]]
    model: str
    dimension: int

@app.post("/embed/query", response_model=EmbedResponse)
async def embed_query(request: EmbedRequest):
    """
    Embed a single query or multiple queries
    """
    texts = [request.text] if isinstance(request.text, str) else request.text

    embeddings = model.encode(
        texts,
        convert_to_numpy=True,
        show_progress_bar=False
    )

    return EmbedResponse(
        embeddings=embeddings.tolist(),
        model=MODEL_NAME,
        dimension=embeddings.shape[1]
    )

@app.post("/embed/document", response_model=EmbedResponse)
async def embed_document(request: EmbedRequest):
    """
    Embed document chunks (same implementation, different endpoint)
    """
    return await embed_query(request)

@app.post("/embed/batch")
async def embed_batch(texts: List[str]):
    """
    Batch embedding for pipeline processing
    """
    embeddings = model.encode(
        texts,
        batch_size=32,
        convert_to_numpy=True,
        show_progress_bar=True
    )
    return {
        "count": len(texts),
        "embeddings": embeddings.tolist()
    }

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "service": "embeddings",
        "model": MODEL_NAME
    }
```
Why Separate?
- Resource isolation: Embedding models use GPU/CPU differently
- Independent scaling: Query volume ≠ document ingestion volume
- Model flexibility: Swap models without touching orchestrator
- Batch optimization: Batch and single-request paths can be tuned differently
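On the orchestrator side, calling this service is a thin HTTP client. A minimal sketch follows; the EmbeddingClient name and error handling are assumptions, but the request/response shape matches the /embed/query endpoint above.
```python
import httpx

class EmbeddingClient:
    """Thin async client for the embeddings service (illustrative sketch)."""

    def __init__(self, base_url: str = "http://embeddings:8003"):
        self.base_url = base_url

    async def embed(self, text: str) -> list[float]:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.base_url}/embed/query",
                json={"text": text},
            )
            response.raise_for_status()
            # The endpoint always returns a list of embeddings;
            # for a single query we take the first one
            return response.json()["embeddings"][0]
```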
Service 4: Vector Database (pgvector)
Purpose
Store and search vector embeddings with metadata.
Tech Stack
- Database: PostgreSQL 16
- Extension: pgvector
- Port: 5432
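A convenient way to run this locally is the prebuilt pgvector image. The compose excerpt below is a sketch: the image tag, credentials, and volume layout are assumptions (the credentials simply match the client code later in this section), and it mounts the init.sql shown next so the schema is created on first start.
```yaml
# docker-compose.yml excerpt (illustrative)
services:
  vector_db:
    image: pgvector/pgvector:pg16   # Postgres 16 with the pgvector extension preinstalled
    environment:
      POSTGRES_DB: ragdb
      POSTGRES_USER: raguser
      POSTGRES_PASSWORD: ragpass
    ports:
      - "5432:5432"
    volumes:
      - ./vector_db/init.sql:/docker-entrypoint-initdb.d/init.sql   # runs the schema on first start
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:
```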
Schema
```sql
-- vector_db/init.sql

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,

    -- Entity and metadata (JSONB for flexibility)
    entities JSONB DEFAULT '[]'::jsonb,
    metadata JSONB DEFAULT '{}'::jsonb,

    -- Vector embedding
    embedding VECTOR(384),  -- Dimension depends on model

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX idx_documents_entities ON documents USING GIN(entities);
CREATE INDEX idx_documents_metadata ON documents USING GIN(metadata);

-- Vector similarity index (HNSW is faster than IVFFlat for most use cases)
CREATE INDEX idx_documents_embedding ON documents
USING hnsw (embedding vector_cosine_ops);

-- Metadata filters (common queries)
CREATE INDEX idx_documents_day ON documents ((metadata->>'day'));
CREATE INDEX idx_documents_entity_type ON documents ((metadata->>'entity_type'));
```
Example Queries
```python
from typing import List

import asyncpg
from pgvector.asyncpg import register_vector  # maps Python lists/arrays to the VECTOR type

class VectorClient:
    def __init__(self):
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            host="vector_db",
            port=5432,
            database="ragdb",
            user="raguser",
            password="ragpass",
            init=register_vector  # register the pgvector codec on each connection
        )

    async def search(
        self,
        embedding: List[float],
        limit: int = 20,
        metadata_filter: dict | None = None
    ) -> List[dict]:
        """
        Vector similarity search with optional metadata filtering
        """
        query = """
            SELECT
                id, title, content, entities, metadata,
                1 - (embedding <=> $1) AS similarity
            FROM documents
        """

        # Add metadata filter if provided
        if metadata_filter and "day" in metadata_filter:
            query += " WHERE (metadata->>'day')::int > $3"

        query += " ORDER BY embedding <=> $1 LIMIT $2"

        async with self.pool.acquire() as conn:
            if metadata_filter and "day" in metadata_filter:
                rows = await conn.fetch(
                    query, embedding, limit, metadata_filter["day"]
                )
            else:
                rows = await conn.fetch(query, embedding, limit)

        return [dict(row) for row in rows]
```
Why pgvector?
- ✅ PostgreSQL integration: Leverage existing expertise
- ✅ ACID compliance: Data integrity guarantees
- ✅ Hybrid search: Vector + SQL in one query
- ✅ Mature tooling: Backups, replication, monitoring
- ✅ Cost-effective: No specialized vector DB pricing
Alternative specialized vector DBs (Pinecone, Weaviate, Qdrant) are great but add operational complexity. Start with pgvector.
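The "hybrid search" point deserves a concrete example: because chunks live in a regular Postgres table, one query can combine vector similarity with ordinary SQL filters. A sketch against the schema above, where $1 is the query embedding and "Acme Corp" and the day threshold are made-up filter values:
```sql
-- Top 10 chunks most similar to the query embedding,
-- restricted to documents mentioning a given entity after day 30
SELECT
    id,
    title,
    1 - (embedding <=> $1) AS similarity
FROM documents
WHERE entities @> '["Acme Corp"]'::jsonb      -- JSONB containment, served by the GIN index
  AND (metadata->>'day')::int > 30            -- plain metadata filter
ORDER BY embedding <=> $1                     -- pgvector cosine distance operator
LIMIT 10;
```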
Service 5: Ontology Service
Purpose
Manage knowledge graph and perform graph traversals.
Responsibilities
- Load ontology from triples (a sample of the triple format follows this list)
- Entity expansion (find neighbors)
- Multi-hop graph traversal
- Relationship queries
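As a concrete (entirely made-up) example of the triple format the service loads, each row is subject, relation, object:
```
supplier_acme,supplies,component_x
component_x,used_in,product_alpha
product_alpha,launched_on,day_12
region_emea,contains,warehouse_berlin
```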
Tech Stack
- Framework: FastAPI
- Language: Python 3.12
- Port: 8004
- Graph: NetworkX (demo) or Neo4j (production)
API Implementation (NetworkX)
```python
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
import networkx as nx

app = FastAPI()

# Load graph at startup
G = nx.DiGraph()

@app.on_event("startup")
async def load_graph():
    """Load graph from CSV triples"""
    # Format: subject,relation,object
    with open("src/graph/triples.csv") as f:
        for line in f:
            subj, rel, obj = line.strip().split(",")
            G.add_edge(subj, obj, relation=rel)

class ExpandRequest(BaseModel):
    entities: List[str]
    max_hops: int = 2
    relations: List[str] | None = None

@app.post("/ontology/expand")
async def expand(request: ExpandRequest):
    """
    Expand entities to their graph neighbors
    """
    expanded = set(request.entities)

    for entity in request.entities:
        if entity not in G:
            continue

        # Breadth-first expansion out to max_hops
        frontier = {entity}
        for _ in range(request.max_hops):
            next_frontier = set()
            for node in frontier:
                for neighbor in G.neighbors(node):
                    # Filter by relation type if specified
                    if request.relations and \
                            G[node][neighbor].get("relation") not in request.relations:
                        continue
                    next_frontier.add(neighbor)
            expanded.update(next_frontier)
            frontier = next_frontier

    return {
        "original": request.entities,
        "expanded": list(expanded),
        "count": len(expanded)
    }

@app.post("/ontology/multihop")
async def multihop(start: str, end: str, max_depth: int = 3):
    """
    Find paths between two entities
    """
    try:
        paths = list(nx.all_simple_paths(
            G, start, end, cutoff=max_depth
        ))
        return {"paths": paths, "count": len(paths)}
    except nx.NodeNotFound:
        return {"paths": [], "count": 0}

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "service": "ontology",
        "nodes": G.number_of_nodes(),
        "edges": G.number_of_edges()
    }
```
Production: Neo4j Implementation
```python
# ontology_service/src/main.py (Neo4j version)
from typing import List

from neo4j import AsyncGraphDatabase

class OntologyService:
    def __init__(self):
        self.driver = AsyncGraphDatabase.driver(
            "bolt://neo4j:7687",
            auth=("neo4j", "password")
        )

    async def expand(self, entities: List[str], max_hops: int = 2):
        query = """
            MATCH (start)
            WHERE start.name IN $entities
            CALL apoc.path.subgraphNodes(start, {
                maxLevel: $max_hops
            })
            YIELD node
            RETURN DISTINCT node.name AS name
        """

        async with self.driver.session() as session:
            result = await session.run(
                query, entities=entities, max_hops=max_hops
            )
            records = await result.data()
            return [r["name"] for r in records]
```
Service 6: LLM Proxy
Purpose
Abstract the LLM provider behind a unified interface.
Responsibilities
- Call external LLM APIs (OpenAI, Anthropic, Bedrock)
- Retry logic and error handling
- Token usage tracking
- Response caching (optional)
Tech Stack
- Framework: FastAPI
- Language: Python 3.12
- Port: 8005
Implementation
```python
import os

from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()

client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: int = 1000
    temperature: float = 0.7

class GenerateResponse(BaseModel):
    answer: str
    model: str
    tokens_used: int
    finish_reason: str

@app.post("/llm/generate", response_model=GenerateResponse)
async def generate(request: GenerateRequest):
    """
    Generate completion from LLM
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": request.prompt}
            ],
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

        return GenerateResponse(
            answer=response.choices[0].message.content,
            model=response.model,
            tokens_used=response.usage.total_tokens,
            finish_reason=response.choices[0].finish_reason
        )

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"LLM generation failed: {str(e)}"
        )

@app.get("/health")
async def health():
    return {"status": "healthy", "service": "llm_proxy"}
```
Why a Proxy?
- Abstraction: Swap OpenAI → Anthropic → Bedrock without changing orchestrator
- Security: API keys managed in one place
- Observability: Track all LLM calls, costs, latency
- Resilience: Retry logic, fallbacks, rate limiting
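The retry logic mentioned above isn't shown in the handler. Here is a minimal sketch of exponential backoff around the LLM call, reusing the client defined earlier; the attempt count, delays, and which exceptions count as retryable are assumptions.
```python
import asyncio

async def generate_with_retries(prompt: str, max_attempts: int = 3) -> str:
    """Call the LLM with exponential backoff on transient failures (sketch)."""
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            response = await client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
        except Exception:  # in practice, retry only rate-limit / timeout errors
            if attempt == max_attempts:
                raise
            await asyncio.sleep(delay)
            delay *= 2  # 1s, 2s, 4s, ...
```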
Service 7: Pipeline Service
Purpose
Ingest raw data and transform it into vector/graph representations.
Responsibilities
- Ingest documents
- Clean and normalize text
- Chunk into segments (a minimal chunking sketch follows this list)
- Extract entities and relationships
- Generate embeddings (via embedding service)
- Load into vector DB and graph DB
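A minimal sketch of the chunking step referenced above, splitting a document into overlapping word-window segments. The 200-word window and 40-word overlap are arbitrary illustrative choices, not Ragforge defaults.
```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 40) -> list[dict]:
    """Split a document into overlapping word-window chunks (illustrative sketch)."""
    words = text.split()
    chunks = []
    start = 0
    index = 0
    while start < len(words):
        window = words[start:start + chunk_size]
        chunks.append({
            "chunk_index": index,
            "content": " ".join(window),
        })
        index += 1
        start += chunk_size - overlap  # step forward, keeping `overlap` words of context
    return chunks
```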
Tech Stack
- Framework: FastAPI + CLI (Typer)
- Language: Python 3.12
- Port: 8006
Example Pipeline
```python
import asyncio

from fastapi import FastAPI

app = FastAPI()

@app.post("/pipeline/run")
async def run_pipeline(source: str, domain: str = "default"):
    """
    Run full ingestion pipeline
    """
    steps = [
        ingest_documents(source),
        clean_and_chunk(),
        extract_entities(),
        build_graph_triples(),
        generate_embeddings(),
        load_to_databases()
    ]

    results = {"steps": []}
    for step in steps:
        result = await step
        results["steps"].append(result)

    return results

async def ingest_documents(source: str):
    """Load raw documents"""
    # Implementation here
    return {"step": "ingest", "status": "success"}

async def extract_entities():
    """Extract entities using NER"""
    # Use spaCy or similar
    return {"step": "entities", "status": "success"}
```
Service-to-Service Communication
Communication Pattern
```python
# All services communicate via HTTP REST
# Using asyncio for non-blocking calls

# Example: Orchestrator calling multiple services
async def orchestrate(query: str):
    async with httpx.AsyncClient() as client:
        # Parallel calls where possible
        embed_task = client.post(
            "http://embeddings:8003/embed/query",
            json={"text": query}
        )

        ontology_task = client.post(
            "http://ontology_service:8004/ontology/expand",
            json={"entities": extracted_entities}  # from the query parser
        )

        # Await both
        embed_response, ontology_response = await asyncio.gather(
            embed_task, ontology_task
        )

        query_embedding = embed_response.json()["embeddings"][0]
        expanded_entities = ontology_response.json()["expanded"]

        # Sequential step (depends on the results above).
        # The vector DB is Postgres + pgvector, so this goes through the
        # asyncpg-based VectorClient rather than HTTP.
        vector_results = await vector_client.search(
            embedding=query_embedding,
            limit=20
        )
```
Docker Networking
```yaml
services:
  orchestrator:
    # Can reach other services by name:
    #   http://embeddings:8003
    #   http://ontology_service:8004
    depends_on:
      - embeddings
      - ontology_service
      - vector_db
```
Key Design Decisions & Tradeoffs
1. REST vs gRPC?
Choice: REST
Why:
- Simpler debugging (curl, browser)
- Universal support
- Great tooling (OpenAPI, Swagger)
- Good enough performance for our use case
When to use gRPC:
- If latency becomes critical (<10ms service calls needed)
- If type safety at network boundary is required
- If streaming is needed
2. Sync vs Async?
Choice: Async (asyncio + FastAPI)
Why:
- Orchestrator makes many I/O calls (DB, HTTP)
- Don’t waste threads waiting for responses
- FastAPI makes async easy
3. Message Queue (Kafka, RabbitMQ)?
Choice: Not yet (direct HTTP)
Why:
- Adds complexity
- Not needed for request/response pattern
- Would use for: async pipelines, event-driven updates
When to add:
- Pipeline processing becomes very large
- Need pub/sub for cache invalidation
- Event sourcing becomes important
What’s Coming in Part 4
We’ve designed a clean microservices architecture. But how do you actually run this in production?
In Part 4, we’ll cover:
- Local development setup (Docker Compose)
- Production deployment on AWS (ECS and EKS)
- Testing strategy (unit, integration, E2E)
- Observability and debugging
- Performance optimization
- Extending to new domains
Key Takeaways
- Microservices enable independent scaling and development velocity
- 7 core services each with clear responsibilities
- pgvector + Neo4j + FastAPI provide a solid tech foundation
- REST is sufficient for most RAG systems; optimize later if needed
- Design for testability: Clear service boundaries = easy mocking
Continue the Series
Part 2: Multi-Hop Retrieval ← Part 3 (you are here) → Part 4: From Development to Production
Coming Up Next: Deploy Ragforge locally and to production, implement comprehensive testing, and add observability.