
Part 3: Microservices Architecture - Building Scalable RAG Systems

This is Part 3 of a 4-part series on building production-ready, multi-layer RAG systems with Ragforge.


From Algorithm to Architecture

In Parts 1 and 2, we explored:

  • Why multi-layer RAG is necessary (Part 1)
  • How the retrieval pipeline works (Part 2)

Now comes the crucial question: How do we build this as a production system?

A multi-layer RAG system has many responsibilities:

  • Parse queries and extract entities
  • Generate embeddings
  • Search vector databases
  • Traverse knowledge graphs
  • Fuse and rerank results
  • Call LLM APIs
  • Ingest and process data
  • Serve user requests

You could build this as a monolith. But there’s a better way.


Why Microservices for RAG?

The Monolith Approach

┌─────────────────────────────────────────┐
│         Single RAG Application          │
│                                         │
│  - Query parsing                        │
│  - Embedding generation                 │
│  - Vector search                        │
│  - Graph traversal                      │
│  - Fusion engine                        │
│  - LLM calls                            │
│  - Data pipeline                        │
│  - API serving                          │
│                                         │
│  All in one codebase, one deployment    │
└─────────────────────────────────────────┘

Problems:

  • Can’t scale components independently (embeddings ≠ LLM calls)
  • Tight coupling makes testing harder
  • One bug can crash the entire system
  • Hard to optimize different parts differently
  • Technology lock-in (all Python, or all Node, etc.)

The Microservices Approach

┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   API    │   │ Orchestr-│   │Embedding │   │ Ontology │
│ Gateway  │──→│   ator   │──→│ Service  │   │ Service  │
└──────────┘   └────┬─────┘   └──────────┘   └──────────┘
                    ├─────────────┬─────────────┐
                    ▼             ▼             ▼
              ┌──────────┐  ┌──────────┐  ┌──────────┐
              │  Vector  │  │   LLM    │  │ Pipeline │
              │    DB    │  │  Proxy   │  │ Service  │
              └──────────┘  └──────────┘  └──────────┘

Benefits:

  • Independent scaling: Scale embeddings 3x, orchestrator 2x, LLM proxy 1x
  • Technology flexibility: Python for ML, Node for API gateway (if needed)
  • Fault isolation: Embedding service crash doesn’t affect LLM calls
  • Independent deployment: Update graph traversal without touching vector search
  • Clear testing boundaries: Unit test each service, integration test interactions
  • Team autonomy: Different teams own different services

Tradeoffs:

  • ❌ Network overhead (mitigated by keeping services in same VPC)
  • ❌ More complex deployment (solved by Docker Compose / Kubernetes)
  • ❌ Distributed debugging (solved by good observability)

Decision: For a system with 7+ distinct responsibilities, microservices provide better long-term velocity and reliability.


The Ragforge Service Architecture

Ragforge implements 7 core services + 1 database as independent Docker containers:

┌────────────────────────────────────────────┐
│               USER / BROWSER               │
└───────────────────────────┬────────────────┘
                            │
                   ┌────────▼────────┐
                   │   UI Frontend   │ :3000
                   │    (Next.js)    │
                   └────────┬────────┘
                            │
                   ┌────────▼────────┐
                   │   API Gateway   │ :8001
                   │    (FastAPI)    │
                   └────────┬────────┘
                            │
         ┌──────────────────▼───────────────────┐
         │        Retrieval Orchestrator        │ :8002
         │    (FastAPI + Coordination Logic)    │
         └──┬──────────┬──────────┬──────────┬──┘
            │          │          │          │
    ┌───────┘          │          │          └───────┐
    │                  │          │                  │
    ▼                  ▼          ▼                  ▼
┌──────────┐     ┌──────────┐ ┌──────────┐     ┌──────────┐
│Embedding │     │  Vector  │ │ Ontology │     │   LLM    │
│ Service  │     │    DB    │ │ Service  │     │  Proxy   │
│  :8003   │     │(Postgres │ │  :8004   │     │  :8005   │
│          │     │ pgvector)│ │(Neo4j or │     │          │
│          │     │  :5432   │ │NetworkX) │     │          │
└──────────┘     └──────────┘ └──────────┘     └──────────┘

                   ┌────────────┐
                   │  Pipeline  │ :8006
                   │  Service   │
                   │ (Ingestion)│
                   └────────────┘

Service 1: API Gateway

Purpose

Single entry point for all client requests. Handles routing, basic validation, and optionally authentication.

Responsibilities

  • Route requests to orchestrator
  • Request/response validation
  • Rate limiting (optional)
  • Authentication/authorization (optional)
  • Request logging and metrics

Tech Stack

  • Framework: FastAPI
  • Language: Python 3.12
  • Port: 8001

API Endpoints

api_gateway/src/main.py

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx

app = FastAPI()


class QueryRequest(BaseModel):
    query: str
    max_results: int = 10
    debug: bool = False


class QueryResponse(BaseModel):
    answer: str
    citations: list
    retrieval_trace: dict | None = None


@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """
    Main query endpoint - forwards to orchestrator
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://orchestrator:8002/orchestrate",
            json=request.dict(),
            timeout=30.0
        )
    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail="Orchestrator error"
        )
    return response.json()


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy", "service": "api_gateway"}

Dockerfile

# api_gateway/Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Install uv for fast dependency management
RUN pip install uv
# Copy dependencies
COPY pyproject.toml .
RUN uv pip install --system -r pyproject.toml
# Copy source code
COPY src/ ./src/
# Expose port
EXPOSE 8001
# Run application
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8001"]

Why This Service?

  • Single entry point: Easier to secure, monitor, and manage
  • Abstraction: Internal service changes don’t affect external API
  • Centralized concerns: Rate limiting, auth, logging in one place
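To make the "centralized concerns" point concrete, request logging and timing can live in a single gateway middleware. This is a standalone sketch (the file name, logger name, and response header are illustrative assumptions; in practice the decorator would be applied to the app defined in main.py):

# api_gateway/src/middleware.py (illustrative sketch)
import logging
import time

from fastapi import FastAPI, Request

logger = logging.getLogger("api_gateway")
app = FastAPI()


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code, and latency for every request."""
    start = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - start) * 1000
    logger.info("%s %s -> %s (%.1f ms)", request.method, request.url.path,
                response.status_code, elapsed_ms)
    response.headers["X-Process-Time-Ms"] = f"{elapsed_ms:.1f}"
    return response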

Service 2: Retrieval Orchestrator (The Brain)

Purpose

Coordinates the entire multi-hop retrieval pipeline. This is the most complex service.

Responsibilities

  • Query parsing and understanding
  • Entity extraction
  • Semantic expansion
  • Ontology expansion (via graph service)
  • Multi-hop retrieval coordination
  • Fusion and reranking
  • Prompt construction
  • LLM invocation (via proxy)

Tech Stack

  • Framework: FastAPI
  • Language: Python 3.12
  • Port: 8002
  • Key Libraries: pydantic, httpx, numpy

Internal Architecture

orchestrator/
├── Dockerfile
├── pyproject.toml
└── src/
    ├── main.py                   # FastAPI app
    ├── models/
    │   ├── query.py              # Pydantic models
    │   └── retrieval.py
    ├── services/
    │   ├── query_parser.py       # Parse queries
    │   ├── semantic_expander.py
    │   ├── ontology_expander.py
    │   ├── hop_retriever.py      # Multi-hop logic
    │   ├── fusion_engine.py      # Merge & rerank
    │   └── prompt_builder.py
    └── clients/
        ├── vector_client.py      # Talk to pgvector
        ├── ontology_client.py    # Talk to graph
        └── llm_client.py         # Talk to LLM proxy
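The query parser itself isn't shown in this post. A minimal sketch of what query_parser.py could look like, producing the entity list and temporal constraint consumed further down the pipeline (the entity set and regex are illustrative assumptions, not Ragforge's actual rules):

# orchestrator/src/services/query_parser.py (illustrative sketch)
import re
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ParsedQuery:
    text: str
    entities: List[str] = field(default_factory=list)
    temporal_constraint: Optional[dict] = None


class QueryParser:
    """Extracts entities and simple temporal constraints from a raw query."""

    # In a real system this would come from the ontology or an NER model.
    KNOWN_ENTITIES = {"entity_a", "entity_b"}

    def parse(self, query: str) -> ParsedQuery:
        entities = [e for e in self.KNOWN_ENTITIES if e.lower() in query.lower()]
        temporal = None
        # Matches constraints like "after day 3" / "before day 7" / "on day 5"
        match = re.search(r"\b(after|before|on) day (\d+)\b", query, re.IGNORECASE)
        if match:
            kind = match.group(1).lower()
            temporal = {
                "type": "exact" if kind == "on" else kind,
                "reference": int(match.group(2)),
            }
        return ParsedQuery(text=query, entities=entities, temporal_constraint=temporal)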

Example: Hop Retriever

orchestrator/src/services/hop_retriever.py

from typing import List

from ..clients.vector_client import VectorClient
from ..clients.ontology_client import OntologyClient


class HopRetriever:
    """
    Executes multi-hop retrieval strategy
    """

    def __init__(self):
        self.vector_client = VectorClient()
        self.ontology_client = OntologyClient()

    async def retrieve(
        self,
        query: str,
        query_embedding: List[float],
        entities: List[str],
        temporal_constraint: dict | None = None
    ) -> dict:
        """
        Execute 3-hop retrieval
        """
        results = {"hops": []}

        # Hop 1: Dense vector search
        hop1_results = await self.vector_client.search(
            embedding=query_embedding,
            limit=20,
            metadata_filter=temporal_constraint
        )
        results["hops"].append({
            "hop": 1,
            "type": "vector_search",
            "count": len(hop1_results),
            "results": hop1_results
        })

        # Hop 2: Graph-guided retrieval
        # Get related entities from graph
        expanded_entities = await self.ontology_client.expand(entities)
        hop2_results = []
        for entity in expanded_entities:
            entity_docs = await self.vector_client.search_by_entity(
                entity=entity,
                embedding=query_embedding,
                limit=10
            )
            hop2_results.extend(entity_docs)
        results["hops"].append({
            "hop": 2,
            "type": "graph_guided",
            "expanded_entities": expanded_entities,
            "count": len(hop2_results),
            "results": hop2_results
        })

        # Hop 3: Exact match on metadata/keywords
        hop3_results = await self.vector_client.exact_match(
            entities=entities,
            temporal_constraint=temporal_constraint,
            limit=10
        )
        results["hops"].append({
            "hop": 3,
            "type": "exact_match",
            "count": len(hop3_results),
            "results": hop3_results
        })

        return results

Example: Fusion Engine

orchestrator/src/services/fusion_engine.py

from typing import List, Dict
import numpy as np


class FusionEngine:
    """
    Merges and reranks results from multiple hops
    """

    def fuse(
        self,
        hop_results: dict,
        query_info: dict,
        weights: dict = None
    ) -> List[dict]:
        """
        Combine results from all hops with multi-signal scoring
        """
        if weights is None:
            weights = {
                "semantic": 0.25,
                "graph": 0.30,
                "temporal": 0.25,
                "exact": 0.15,
                "hop": 0.05
            }

        # Collect all documents
        all_docs = []
        for hop in hop_results["hops"]:
            for doc in hop["results"]:
                doc["hop"] = hop["hop"]
                doc["hop_type"] = hop["type"]
                all_docs.append(doc)

        # Deduplicate by document ID
        unique_docs = self._deduplicate(all_docs)

        # Score each document
        scored_docs = []
        for doc in unique_docs:
            score = self._compute_fusion_score(doc, query_info, weights)
            doc["fusion_score"] = score
            scored_docs.append(doc)

        # Sort by score (descending)
        scored_docs.sort(key=lambda x: x["fusion_score"], reverse=True)
        return scored_docs

    def _compute_fusion_score(
        self,
        doc: dict,
        query_info: dict,
        weights: dict
    ) -> float:
        """
        Multi-signal scoring
        """
        scores = {
            "semantic": doc.get("similarity_score", 0.0),
            "graph": self._graph_score(doc, query_info),
            "temporal": self._temporal_score(doc, query_info),
            "exact": 1.0 if doc.get("exact_match") else 0.0,
            "hop": 1.0 if doc["hop"] == 1 else 0.9 if doc["hop"] == 2 else 0.8
        }
        final_score = sum(
            scores[key] * weights[key]
            for key in scores
        )
        return final_score

    def _graph_score(self, doc: dict, query_info: dict) -> float:
        """
        Score based on entity overlap with graph expansion
        """
        doc_entities = set(doc.get("entities", []))
        query_entities = set(query_info.get("expanded_entities", []))
        if not query_entities:
            return 0.5  # Neutral if no graph info
        overlap = len(doc_entities & query_entities)
        return min(overlap / len(query_entities), 1.0)

    def _temporal_score(self, doc: dict, query_info: dict) -> float:
        """
        Score based on temporal alignment
        """
        constraint = query_info.get("temporal_constraint")
        if not constraint:
            return 1.0  # No constraint = perfect score
        doc_day = doc.get("metadata", {}).get("day")
        if doc_day is None:
            return 0.5  # Unknown = neutral
        ref_day = constraint.get("reference")
        if constraint["type"] == "after":
            return 1.0 if doc_day > ref_day else 0.0
        elif constraint["type"] == "before":
            return 1.0 if doc_day < ref_day else 0.0
        elif constraint["type"] == "exact":
            return 1.0 if doc_day == ref_day else 0.0
        return 0.5

    def _deduplicate(self, docs: List[dict]) -> List[dict]:
        """Remove duplicate documents by ID"""
        seen = set()
        unique = []
        for doc in docs:
            if doc["id"] not in seen:
                seen.add(doc["id"])
                unique.append(doc)
        return unique
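The last step before the LLM call is prompt construction. A minimal sketch of what prompt_builder.py might look like, feeding the fused, ranked documents into a grounded prompt (the template wording and the max_docs cutoff are assumptions, not Ragforge's actual prompt):

# orchestrator/src/services/prompt_builder.py (illustrative sketch)
from typing import List


class PromptBuilder:
    """Turns fused documents into a grounded prompt with numbered citations."""

    def build(self, query: str, docs: List[dict], max_docs: int = 8) -> str:
        # Keep only the top-ranked documents to stay within the context window
        context_blocks = []
        for i, doc in enumerate(docs[:max_docs], start=1):
            context_blocks.append(f"[{i}] {doc['title']}\n{doc['content']}")
        context = "\n\n".join(context_blocks)
        return (
            "Answer the question using only the context below. "
            "Cite sources as [n].\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {query}\nAnswer:"
        )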

Why This Is Separate

  • Complexity: 6+ distinct responsibilities
  • Stateless: Can scale horizontally
  • Core logic: Changes frequently as retrieval improves
  • Testing: Need to mock vector DB, graph, LLM

Service 3: Embeddings Service

Purpose

Generate vector embeddings for documents and queries.

Responsibilities

  • Embed query text
  • Embed document chunks
  • Batch embedding jobs
  • Model management

Tech Stack

  • Framework: FastAPI
  • Language: Python 3.12
  • Port: 8003
  • Model: Sentence Transformers or OpenAI API

API Implementation

embeddings/src/main.py

from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

app = FastAPI()

# Load model at startup
MODEL_NAME = "all-MiniLM-L6-v2"
model = SentenceTransformer(MODEL_NAME)


class EmbedRequest(BaseModel):
    text: str | List[str]


class EmbedResponse(BaseModel):
    embeddings: List[List[float]]
    model: str
    dimension: int


@app.post("/embed/query", response_model=EmbedResponse)
async def embed_query(request: EmbedRequest):
    """
    Embed a single query or multiple queries
    """
    texts = [request.text] if isinstance(request.text, str) else request.text
    # Note: model.encode() is CPU/GPU-bound and blocks the event loop;
    # under heavy load, offload it to a thread pool or worker process.
    embeddings = model.encode(
        texts,
        convert_to_numpy=True,
        show_progress_bar=False
    )
    return EmbedResponse(
        embeddings=embeddings.tolist(),
        model=MODEL_NAME,
        dimension=embeddings.shape[1]
    )


@app.post("/embed/document", response_model=EmbedResponse)
async def embed_document(request: EmbedRequest):
    """
    Embed document chunks (same implementation, different endpoint)
    """
    return await embed_query(request)


@app.post("/embed/batch")
async def embed_batch(texts: List[str]):
    """
    Batch embedding for pipeline processing
    """
    embeddings = model.encode(
        texts,
        batch_size=32,
        convert_to_numpy=True,
        show_progress_bar=True
    )
    return {
        "count": len(texts),
        "embeddings": embeddings.tolist()
    }


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "service": "embeddings",
        "model": MODEL_NAME
    }
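A quick way to exercise the service once it's running (the localhost port assumes the Docker setup described later; adjust as needed):

# smoke_test_embeddings.py (illustrative)
import httpx

resp = httpx.post(
    "http://localhost:8003/embed/query",
    json={"text": "What happened after day 3?"},
    timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
print(data["model"], data["dimension"], len(data["embeddings"]))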

Why Separate?

  • Resource isolation: Embedding models use GPU/CPU differently
  • Independent scaling: Query volume ≠ document ingestion volume
  • Model flexibility: Swap models without touching orchestrator
  • Batch optimization: Different optimizations for batch vs. single

Service 4: Vector Database (pgvector)

Purpose

Store and search vector embeddings with metadata.

Tech Stack

  • Database: PostgreSQL 16
  • Extension: pgvector
  • Port: 5432

Schema

-- vector_db/init.sql

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,

    -- Entity and metadata (JSONB for flexibility)
    entities JSONB DEFAULT '[]'::jsonb,
    metadata JSONB DEFAULT '{}'::jsonb,

    -- Vector embedding
    embedding VECTOR(384),  -- Dimension depends on model

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX idx_documents_entities ON documents USING GIN(entities);
CREATE INDEX idx_documents_metadata ON documents USING GIN(metadata);

-- Vector similarity index (HNSW is faster than IVFFlat for most use cases)
CREATE INDEX idx_documents_embedding ON documents
    USING hnsw (embedding vector_cosine_ops);

-- Metadata filters (common queries)
CREATE INDEX idx_documents_day ON documents ((metadata->>'day'));
CREATE INDEX idx_documents_entity_type ON documents ((metadata->>'entity_type'));

Example Queries

orchestrator/src/clients/vector_client.py

from typing import List

import asyncpg
from pgvector.asyncpg import register_vector  # pgvector Python package: maps VECTOR columns for asyncpg


class VectorClient:
    def __init__(self):
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            host="vector_db",
            port=5432,
            database="ragdb",
            user="raguser",
            password="ragpass",
            init=register_vector  # register the vector codec on every connection
        )

    async def search(
        self,
        embedding: List[float],
        limit: int = 20,
        metadata_filter: dict | None = None
    ) -> List[dict]:
        """
        Vector similarity search with optional metadata filtering
        """
        query = """
            SELECT
                id,
                title,
                content,
                entities,
                metadata,
                1 - (embedding <=> $1) AS similarity
            FROM documents
        """
        # Add metadata filter if provided (simplified: only an "after day N" filter is shown)
        if metadata_filter and "day" in metadata_filter:
            query += " WHERE (metadata->>'day')::int > $3"
        query += " ORDER BY embedding <=> $1 LIMIT $2"

        async with self.pool.acquire() as conn:
            if metadata_filter and "day" in metadata_filter:
                rows = await conn.fetch(
                    query,
                    embedding,
                    limit,
                    metadata_filter["day"]
                )
            else:
                rows = await conn.fetch(query, embedding, limit)
        return [dict(row) for row in rows]
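The HopRetriever above also calls search_by_entity and exact_match, which aren't shown in this post. Here is a sketch of how they might look against the same schema (the JSONB predicates are one reasonable choice, not necessarily Ragforge's; the methods are shown in a stub class so the snippet stands alone, but they would live on VectorClient):

# orchestrator/src/clients/vector_client.py (continued; illustrative sketch)
import json
from typing import List

import asyncpg


class VectorClientMethods:
    pool: asyncpg.Pool  # shared connection pool, as in VectorClient above

    async def search_by_entity(
        self, entity: str, embedding: List[float], limit: int = 10
    ) -> List[dict]:
        """Vector search restricted to documents tagged with a given entity."""
        query = """
            SELECT id, title, content, entities, metadata,
                   1 - (embedding <=> $1) AS similarity
            FROM documents
            WHERE entities @> $3::jsonb
            ORDER BY embedding <=> $1
            LIMIT $2
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, embedding, limit, json.dumps([entity]))
        return [dict(r) for r in rows]

    async def exact_match(
        self, entities: List[str], temporal_constraint: dict | None, limit: int = 10
    ) -> List[dict]:
        """Keyword/metadata lookup without vector ranking."""
        query = """
            SELECT id, title, content, entities, metadata, TRUE AS exact_match
            FROM documents
            WHERE entities ?| $1::text[]
            LIMIT $2
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, entities, limit)
        return [dict(r) for r in rows]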

Why pgvector?

  • PostgreSQL integration: Leverage existing expertise
  • ACID compliance: Data integrity guarantees
  • Hybrid search: Vector + SQL in one query
  • Mature tooling: Backups, replication, monitoring
  • Cost-effective: No specialized vector DB pricing

Alternative specialized vector DBs (Pinecone, Weaviate, Qdrant) are great but add operational complexity. Start with pgvector.
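To make the "hybrid search" point concrete, a single statement can combine vector ordering with ordinary SQL predicates against the schema above. A sketch run from Python with asyncpg (the filter values and entity literal are just examples):

# hybrid_search_example.py (illustrative)
HYBRID_QUERY = """
    SELECT id, title,
           1 - (embedding <=> $1) AS similarity
    FROM documents
    WHERE (metadata->>'day')::int BETWEEN $2 AND $3   -- ordinary SQL filter
      AND entities @> '["some_entity"]'::jsonb        -- JSONB containment filter
    ORDER BY embedding <=> $1                         -- vector similarity ordering
    LIMIT 10
"""


async def hybrid_search(pool, query_embedding):
    """One round trip: metadata filters + entity filter + vector ranking."""
    async with pool.acquire() as conn:
        return await conn.fetch(HYBRID_QUERY, query_embedding, 1, 5)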


Service 5: Ontology Service

Purpose

Manage knowledge graph and perform graph traversals.

Responsibilities

  • Load ontology from triples
  • Entity expansion (find neighbors)
  • Multi-hop graph traversal
  • Relationship queries

Tech Stack

  • Framework: FastAPI
  • Language: Python 3.12
  • Port: 8004
  • Graph: NetworkX (demo) or Neo4j (production)

API Implementation (NetworkX)

ontology_service/src/main.py

from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
import networkx as nx

app = FastAPI()

# Load graph at startup
G = nx.DiGraph()


@app.on_event("startup")
async def load_graph():
    """Load graph from CSV triples"""
    # Format: subject,relation,object
    with open("src/graph/triples.csv") as f:
        for line in f:
            subj, rel, obj = line.strip().split(",")
            G.add_edge(subj, obj, relation=rel)


class ExpandRequest(BaseModel):
    entities: List[str]
    max_hops: int = 2
    relations: List[str] | None = None


@app.post("/ontology/expand")
async def expand(request: ExpandRequest):
    """
    Expand entities to their graph neighbors, breadth-first, up to max_hops
    """
    expanded = set(request.entities)
    frontier = [e for e in request.entities if e in G]
    for _ in range(request.max_hops):
        next_frontier = []
        for entity in frontier:
            neighbors = list(G.neighbors(entity))
            # Filter by relation type if specified
            if request.relations:
                neighbors = [
                    n for n in neighbors
                    if G[entity][n].get("relation") in request.relations
                ]
            for neighbor in neighbors:
                if neighbor not in expanded:
                    expanded.add(neighbor)
                    next_frontier.append(neighbor)
        frontier = next_frontier
    return {
        "original": request.entities,
        "expanded": list(expanded),
        "count": len(expanded)
    }


@app.post("/ontology/multihop")
async def multihop(start: str, end: str, max_depth: int = 3):
    """
    Find paths between two entities
    """
    try:
        paths = list(nx.all_simple_paths(
            G, start, end, cutoff=max_depth
        ))
        return {"paths": paths, "count": len(paths)}
    except nx.NodeNotFound:
        return {"paths": [], "count": 0}


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "service": "ontology",
        "nodes": G.number_of_nodes(),
        "edges": G.number_of_edges()
    }

Production: Neo4j Implementation

# ontology_service/src/main.py (Neo4j version)
from typing import List

from neo4j import AsyncGraphDatabase


class OntologyService:
    def __init__(self):
        self.driver = AsyncGraphDatabase.driver(
            "bolt://neo4j:7687",
            auth=("neo4j", "password")
        )

    async def expand(self, entities: List[str], max_hops: int = 2):
        # Note: apoc.path.subgraphNodes requires the APOC plugin to be installed on Neo4j.
        query = """
        MATCH (start)
        WHERE start.name IN $entities
        CALL apoc.path.subgraphNodes(start, {
            maxLevel: $max_hops
        })
        YIELD node
        RETURN DISTINCT node.name AS name
        """
        async with self.driver.session() as session:
            result = await session.run(
                query,
                entities=entities,
                max_hops=max_hops
            )
            records = await result.data()
        return [r["name"] for r in records]
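On the orchestrator side, the ontology_client.py listed in the directory tree is a thin HTTP wrapper around this service. A minimal sketch (the base URL matches the Docker service name used elsewhere in this post; the timeout is an assumption):

# orchestrator/src/clients/ontology_client.py (illustrative sketch)
from typing import List

import httpx


class OntologyClient:
    BASE_URL = "http://ontology_service:8004"

    async def expand(self, entities: List[str], max_hops: int = 2) -> List[str]:
        """Ask the ontology service for graph neighbors of the given entities."""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.BASE_URL}/ontology/expand",
                json={"entities": entities, "max_hops": max_hops},
                timeout=10.0,
            )
        resp.raise_for_status()
        return resp.json()["expanded"]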

Service 6: LLM Proxy

Purpose

Abstract the LLM provider behind a unified interface.

Responsibilities

  • Call external LLM APIs (OpenAI, Anthropic, Bedrock)
  • Retry logic and error handling
  • Token usage tracking
  • Response caching (optional)

Tech Stack

  • Framework: FastAPI
  • Language: Python 3.12
  • Port: 8005

Implementation

llm_proxy/src/main.py

import os

from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI  # openai>=1.0 client
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))


class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: int = 1000
    temperature: float = 0.7


class GenerateResponse(BaseModel):
    answer: str
    model: str
    tokens_used: int
    finish_reason: str


@app.post("/llm/generate", response_model=GenerateResponse)
async def generate(request: GenerateRequest):
    """
    Generate completion from LLM
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": request.prompt}
            ],
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        return GenerateResponse(
            answer=response.choices[0].message.content,
            model=response.model,
            tokens_used=response.usage.total_tokens,
            finish_reason=response.choices[0].finish_reason
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"LLM generation failed: {str(e)}"
        )


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "llm_proxy"}

Why a Proxy?

  • Abstraction: Swap OpenAI → Anthropic → Bedrock without changing orchestrator
  • Security: API keys managed in one place
  • Observability: Track all LLM calls, costs, latency
  • Resilience: Retry logic, fallbacks, rate limiting
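The "Resilience" bullet deserves a concrete shape. Here is a simple retry helper with exponential backoff that the proxy could wrap around its LLM call (the attempt counts, delays, and file name are arbitrary choices for illustration):

# llm_proxy/src/retry.py (illustrative sketch)
import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("llm_proxy")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Retry a transient-failure-prone async call with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("LLM call failed (%s), retrying in %.1fs", exc, delay)
            await asyncio.sleep(delay)


# Usage inside the /llm/generate handler:
#   response = await with_retries(lambda: client.chat.completions.create(...))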

Service 7: Pipeline Service

Purpose

Ingest raw data and transform it into vector/graph representations.

Responsibilities

  • Ingest documents
  • Clean and normalize text
  • Chunk into segments
  • Extract entities and relationships
  • Generate embeddings (via embedding service)
  • Load into vector DB and graph DB

Tech Stack

  • Framework: FastAPI + CLI (Typer)
  • Language: Python 3.12
  • Port: 8006

Example Pipeline

pipeline/src/main.py

from fastapi import FastAPI
import asyncio

app = FastAPI()


@app.post("/pipeline/run")
async def run_pipeline(source: str, domain: str = "default"):
    """
    Run full ingestion pipeline
    """
    steps = [
        ingest_documents(source),
        clean_and_chunk(),
        extract_entities(),
        build_graph_triples(),
        generate_embeddings(),
        load_to_databases()
    ]
    results = {"steps": []}
    for step in steps:
        result = await step
        results["steps"].append(result)
    return results


async def ingest_documents(source: str):
    """Load raw documents"""
    # Implementation here
    return {"step": "ingest", "status": "success"}

async def clean_and_chunk():
    """Normalize text and split it into chunks"""
    # Implementation here
    return {"step": "chunk", "status": "success"}

async def extract_entities():
    """Extract entities using NER"""
    # Use spaCy or similar
    return {"step": "entities", "status": "success"}

async def build_graph_triples():
    """Derive subject-relation-object triples for the ontology"""
    # Implementation here
    return {"step": "triples", "status": "success"}

async def generate_embeddings():
    """Call the embeddings service for each chunk"""
    # Implementation here
    return {"step": "embeddings", "status": "success"}

async def load_to_databases():
    """Write chunks to pgvector and triples to the graph store"""
    # Implementation here
    return {"step": "load", "status": "success"}
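Chunking is the step that most directly shapes retrieval quality. As a reference point, a minimal sliding-window chunker might look like this (the sizes are illustrative; real pipelines usually split on sentence or section boundaries instead of raw character counts):

# pipeline/src/chunking.py (illustrative sketch)
from typing import List


def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into overlapping character windows."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be larger than overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return chunks


# Example: a 1,200-character document with chunk_size=500 and overlap=50
# yields three chunks starting at offsets 0, 450, and 900.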

Service-to-Service Communication

Communication Pattern

# All services communicate via HTTP REST
# Using asyncio for non-blocking calls

# Example: Orchestrator calling multiple services
import asyncio
import httpx


async def orchestrate(query: str):
    async with httpx.AsyncClient() as client:
        # Parallel calls where possible
        embed_task = client.post(
            "http://embeddings:8003/embed/query",
            json={"text": query}
        )
        ontology_task = client.post(
            "http://ontology_service:8004/ontology/expand",
            json={"entities": extracted_entities}  # from the query parser (not shown)
        )
        # Await both
        embed_response, ontology_response = await asyncio.gather(
            embed_task,
            ontology_task
        )

        # Sequential call (depends on above)
        vector_response = await client.post(
            "http://vector_db:5432/search",  # in practice: via the asyncpg client library
            json={
                "embedding": embed_response.json()["embeddings"][0],
                "expanded_entities": ontology_response.json()["expanded"]
            }
        )

Docker Networking

docker-compose.yml

services:
  orchestrator:
    # Can reach other services by name:
    #   http://embeddings:8003
    #   http://ontology_service:8004
    depends_on:
      - embeddings
      - ontology_service
      - vector_db

Key Design Decisions & Tradeoffs

1. REST vs gRPC?

Choice: REST

Why:

  • Simpler debugging (curl, browser)
  • Universal support
  • Great tooling (OpenAPI, Swagger)
  • Good enough performance for our use case

When to use gRPC:

  • If latency becomes critical (<10ms service calls needed)
  • If type safety at network boundary is required
  • If streaming is needed

2. Sync vs Async?

Choice: Async (asyncio + FastAPI)

Why:

  • Orchestrator makes many I/O calls (DB, HTTP)
  • Don’t waste threads waiting for responses
  • FastAPI makes async easy

3. Message Queue (Kafka, RabbitMQ)?

Choice: Not yet (direct HTTP)

Why:

  • Adds complexity
  • Not needed for request/response pattern
  • Would use for: async pipelines, event-driven updates

When to add:

  • Pipeline processing becomes very large
  • Need pub/sub for cache invalidation
  • Event sourcing becomes important

What’s Coming in Part 4

We’ve designed a clean microservices architecture. But how do you actually run this in production?

In Part 4, we’ll cover:

  • Local development setup (Docker Compose)
  • Production deployment on AWS (ECS and EKS)
  • Testing strategy (unit, integration, E2E)
  • Observability and debugging
  • Performance optimization
  • Extending to new domains

Key Takeaways

  1. Microservices enable independent scaling and development velocity
  2. 7 core services, each with clear responsibilities
  3. pgvector + Neo4j + FastAPI provide a solid tech foundation
  4. REST is sufficient for most RAG systems; optimize later if needed
  5. Design for testability: Clear service boundaries = easy mocking


Continue the Series

Part 2: Multi-Hop Retrieval ← Part 3 (you are here) → Part 4: From Development to Production

Coming Up Next: Deploy Ragforge locally and to production, implement comprehensive testing, and add observability.