Building a Confluence RAG Knowledge Base: Auto-Sync, Freshness Checks & AI-Powered Q&A
Ever asked your team a question about some internal process only to get "yeah it's somewhere in Confluence" as the answer? You know it's documented, but finding the right page, reading through three pages of context, and piecing together the full picture from scattered spaces is a full-time job on its own.
Today we're solving that problem completely. We'll build a RAG (Retrieval-Augmented Generation) system that:
- ✅ Auto-syncs all your Confluence spaces every 24 hours
- ✅ Detects changed pages using MD5 checksums (only re-embeds what actually changed)
- ✅ Stores vector embeddings in PostgreSQL with pgvector
- ✅ Answers questions using OpenAI + LangChain with source citations
- ✅ Ships as a complete Docker stack you can run in minutes
I've used this exact architecture to build a knowledge base for a large engineering team's QA documentation — it went from "30 minutes searching Confluence" to "15-second AI answer with sources".
TL;DR: What We're Building
- Confluence client to fetch pages via the REST API
- Document processor to strip HTML, chunk text, and compute checksums
- Embeddings service using OpenAI text-embedding-3-small
- PostgreSQL + pgvector for similarity search
- FastAPI RAG API to accept questions and return answers with sources
- n8n workflow for scheduled 24-hour sync
- Docker Compose to tie it all together
Let's build! 🚀
Architecture Overview
Before we write code, here's how all the pieces connect:
┌─────────────────────────────────────────────────────────┐
│ n8n Scheduler │
│ (triggers every 24 hours) │
└──────────────────────────┬──────────────────────────────┘
│ POST /api/sync
▼
┌─────────────────────────────────────────────────────────┐
│ FastAPI Sync Endpoint │
│ 1. Fetch pages from Confluence REST API │
│ 2. Compute MD5 checksum of each page │
│ 3. Skip pages where checksum hasn't changed │
│ 4. Chunk updated pages into ~500-token segments │
│ 5. Generate OpenAI embeddings for each chunk │
│ 6. Upsert into PostgreSQL (pgvector) │
└──────────────────────────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ PostgreSQL + pgvector │
│ pages: id, title, checksum, last_synced_at │
│ chunks: id, page_id, content, embedding vector(1536) │
└──────────────────────────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ FastAPI Query Endpoint │
│ POST /api/query │
│ 1. Embed user question with OpenAI │
│ 2. Cosine similarity search (top-k chunks) │
│ 3. Build prompt with retrieved context │
│ 4. Call GPT-4o-mini to generate answer │
│ 5. Return answer + source page references │
└─────────────────────────────────────────────────────────┘
Clean separation of concerns — sync is decoupled from query, and the vector store is just a PostgreSQL table you already know how to operate.
Tech Stack & Project Structure
confluence-rag/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app entry point
│ ├── confluence_client.py # Confluence REST API wrapper
│ ├── document_processor.py # HTML stripping & text chunking
│ ├── embeddings_service.py # OpenAI embedding + pgvector ops
│ └── rag_service.py # LangChain RAG query logic
├── sql/
│ └── schema.sql # PostgreSQL + pgvector schema
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── .env.template
requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.6
psycopg2-binary==2.9.9
pgvector==0.3.2
openai==1.45.0
langchain==0.3.0
langchain-openai==0.2.0
langchain-community==0.3.0
beautifulsoup4==4.12.3
requests==2.32.3
python-dotenv==1.0.1
pydantic-settings==2.5.2
tiktoken==0.7.0
tenacity==9.0.0
Install everything:
pip install -r requirements.txt
Database Schema
We use PostgreSQL with the pgvector extension for storing and querying vector embeddings. OpenAI's text-embedding-3-small model produces 1536-dimension vectors.
sql/schema.sql
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Tracks each synced Confluence page
CREATE TABLE IF NOT EXISTS confluence_pages (
id TEXT PRIMARY KEY, -- Confluence page ID
space_key TEXT NOT NULL,
title TEXT NOT NULL,
url TEXT NOT NULL,
checksum TEXT NOT NULL, -- MD5 of raw content
last_synced_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_pages_space ON confluence_pages(space_key);
CREATE INDEX IF NOT EXISTS idx_pages_checksum ON confluence_pages(checksum);
-- Stores text chunks with their vector embeddings
CREATE TABLE IF NOT EXISTS document_chunks (
id BIGSERIAL PRIMARY KEY,
page_id TEXT NOT NULL REFERENCES confluence_pages(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
token_count INTEGER NOT NULL,
embedding vector(1536), -- OpenAI text-embedding-3-small
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (page_id, chunk_index)
);
-- IVFFlat index for fast approximate nearest-neighbour search
-- pgvector suggests lists ≈ rows / 1000 up to ~1M rows (sqrt(rows) beyond that)
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- Analytics: track what people ask
CREATE TABLE IF NOT EXISTS query_log (
id BIGSERIAL PRIMARY KEY,
question TEXT NOT NULL,
answer TEXT,
sources JSONB,
latency_ms INTEGER,
created_at TIMESTAMPTZ DEFAULT NOW()
);
Confluence Client
The ConfluenceClient handles all communication with the Confluence REST API, including automatic pagination and a helper to extract the clean HTML body.
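Before the full client, the start/limit pagination pattern it relies on is worth isolating. A minimal sketch (`paginate` and `fetch_page` are illustrative names, not part of the app): keep requesting until a batch comes back smaller than the page size.

```python
def paginate(fetch_page, limit: int = 50) -> list:
    """Generic start/limit pagination: request batches of `limit` items
    until the API returns a batch smaller than `limit`."""
    results, start = [], 0
    while True:
        batch = fetch_page(start=start, limit=limit)
        results.extend(batch)
        if len(batch) < limit:
            break  # last page reached
        start += limit
    return results

# Simulate an API holding 120 items served in pages of 50:
items = list(range(120))
fake_fetch = lambda start, limit: items[start:start + limit]
print(len(paginate(fake_fetch)))  # 120
```

The real client below follows the same loop, plus a polite `time.sleep(0.2)` between requests.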
app/confluence_client.py
import hashlib
import os
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class ConfluenceClient:
"""Thin wrapper around the Confluence Cloud REST API (v1 /rest/api content endpoints)."""
def __init__(self) -> None:
self.base_url = os.environ["CONFLUENCE_BASE_URL"].rstrip("/")
self.email = os.environ["CONFLUENCE_EMAIL"]
self.api_token = os.environ["CONFLUENCE_API_TOKEN"]
self._session = self._build_session()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _build_session(self) -> requests.Session:
session = requests.Session()
session.auth = (self.email, self.api_token)
session.headers.update({"Accept": "application/json"})
retry = Retry(
total=4,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _get(self, path: str, **params) -> dict:
url = f"{self.base_url}/wiki/rest/api{path}"
resp = self._session.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def get_space_pages(self, space_key: str) -> list[dict]:
"""Return every page in a Confluence space (handles pagination)."""
pages: list[dict] = []
start = 0
limit = 50
while True:
data = self._get(
"/content",
spaceKey=space_key,
type="page",
status="current",
expand="body.storage,version",
start=start,
limit=limit,
)
pages.extend(data["results"])
if len(data["results"]) < limit:
break
start += limit
time.sleep(0.2) # be kind to the API
return pages
def get_page(self, page_id: str) -> dict:
"""Fetch a single page with full body and metadata."""
return self._get(
f"/content/{page_id}",
expand="body.storage,version,space,ancestors",
)
def page_url(self, page: dict) -> str:
"""Build the browser URL for a page dict returned by the API."""
base = self.base_url
space = page.get("space", {}).get("key", "")
title_encoded = page["title"].replace(" ", "+")
return f"{base}/wiki/spaces/{space}/pages/{page['id']}/{title_encoded}"
@staticmethod
def content_checksum(html_body: str) -> str:
"""MD5 of raw HTML — used to detect whether a page has changed."""
return hashlib.md5(html_body.encode("utf-8")).hexdigest()
Document Processor
Raw Confluence content is HTML — we strip tags with BeautifulSoup, then split into overlapping chunks so no context is lost at chunk boundaries.
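The chunking arithmetic is worth seeing in isolation first. A minimal sketch (`chunk_windows` is an illustrative helper, not part of the app) showing how a 400-token window with an 80-token overlap advances by a 320-token stride:

```python
CHUNK_SIZE_TOKENS = 400
CHUNK_OVERLAP_TOKENS = 80

def chunk_windows(n_tokens: int) -> list[tuple[int, int]]:
    """Token (start, end) spans produced by the sliding window."""
    windows, start = [], 0
    while start < n_tokens:
        end = min(start + CHUNK_SIZE_TOKENS, n_tokens)
        windows.append((start, end))
        if end == n_tokens:
            break
        start += CHUNK_SIZE_TOKENS - CHUNK_OVERLAP_TOKENS  # stride of 320 tokens
    return windows

# A 1000-token page yields three chunks, each sharing 80 tokens with its neighbour:
print(chunk_windows(1000))  # [(0, 400), (320, 720), (640, 1000)]
```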
app/document_processor.py
import re
from dataclasses import dataclass
import tiktoken
from bs4 import BeautifulSoup
CHUNK_SIZE_TOKENS = 400 # target tokens per chunk
CHUNK_OVERLAP_TOKENS = 80 # overlap to preserve sentence context
@dataclass
class DocumentChunk:
index: int
content: str
token_count: int
class DocumentProcessor:
def __init__(self) -> None:
# cl100k_base is the tokenizer for text-embedding-3-* models
self._encoder = tiktoken.get_encoding("cl100k_base")
# ------------------------------------------------------------------
# HTML → plain text
# ------------------------------------------------------------------
def html_to_text(self, html: str) -> str:
"""Strip all HTML tags and normalise whitespace."""
soup = BeautifulSoup(html, "html.parser")
# Remove script / style noise
for tag in soup(["script", "style", "head"]):
tag.decompose()
text = soup.get_text(separator="\n")
# Collapse blank lines
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
# ------------------------------------------------------------------
# Chunking
# ------------------------------------------------------------------
def _token_count(self, text: str) -> int:
return len(self._encoder.encode(text))
def chunk(self, text: str) -> list[DocumentChunk]:
"""Split text into overlapping fixed-token chunks."""
tokens = self._encoder.encode(text)
chunks: list[DocumentChunk] = []
start = 0
idx = 0
while start < len(tokens):
end = min(start + CHUNK_SIZE_TOKENS, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = self._encoder.decode(chunk_tokens)
chunks.append(
DocumentChunk(
index=idx,
content=chunk_text.strip(),
token_count=len(chunk_tokens),
)
)
if end == len(tokens):
break
start += CHUNK_SIZE_TOKENS - CHUNK_OVERLAP_TOKENS
idx += 1
return chunks
Embeddings Service
The EmbeddingsService wraps all vector database operations — storing chunks, upserting pages, and running similarity searches — behind a clean interface.
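One mental model before the code: pgvector's `<=>` operator returns cosine *distance*, and the service reports `1 - distance` as similarity. In pure Python (illustrative only; the real computation happens inside Postgres):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """What `1 - (embedding <=> query)` evaluates to in the SQL below."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing the same way score 1.0; orthogonal vectors score 0.0:
assert abs(cosine_similarity([1.0, 2.0], [2.0, 4.0]) - 1.0) < 1e-9
assert cosine_similarity([1.0, 0.0], [0.0, 1.0]) == 0.0
```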
app/embeddings_service.py
import os
from typing import Optional
import psycopg2
import psycopg2.extras
from openai import OpenAI
from pgvector.psycopg2 import register_vector
from .document_processor import DocumentChunk
EMBED_MODEL = "text-embedding-3-small"
TOP_K = 6 # chunks to retrieve per query
class EmbeddingsService:
def __init__(self) -> None:
self._client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
self._conn = self._connect()
# ------------------------------------------------------------------
# Database helpers
# ------------------------------------------------------------------
def _connect(self) -> psycopg2.extensions.connection:
conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False
register_vector(conn)
return conn
def _cursor(self):
return self._conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# ------------------------------------------------------------------
# Embedding generation
# ------------------------------------------------------------------
def embed(self, text: str) -> list[float]:
"""Return an embedding vector for a piece of text."""
response = self._client.embeddings.create(
model=EMBED_MODEL,
input=text,
)
return response.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Batch embed up to 2048 texts in a single API call."""
response = self._client.embeddings.create(
model=EMBED_MODEL,
input=texts,
)
# API returns items sorted by index
return [item.embedding for item in sorted(response.data, key=lambda x: x.index)]
# ------------------------------------------------------------------
# Page & chunk persistence
# ------------------------------------------------------------------
def upsert_page(
self,
page_id: str,
space_key: str,
title: str,
url: str,
checksum: str,
) -> None:
with self._cursor() as cur:
cur.execute(
"""
INSERT INTO confluence_pages (id, space_key, title, url, checksum, last_synced_at)
VALUES (%s, %s, %s, %s, %s, NOW())
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
url = EXCLUDED.url,
checksum = EXCLUDED.checksum,
last_synced_at = NOW()
""",
(page_id, space_key, title, url, checksum),
)
self._conn.commit()
def delete_chunks(self, page_id: str) -> None:
with self._cursor() as cur:
cur.execute("DELETE FROM document_chunks WHERE page_id = %s", (page_id,))
self._conn.commit()
def store_chunks(self, page_id: str, chunks: list[DocumentChunk], embeddings: list[list[float]]) -> None:
rows = [
(page_id, chunk.index, chunk.content, chunk.token_count, embedding)
for chunk, embedding in zip(chunks, embeddings)
]
with self._cursor() as cur:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO document_chunks (page_id, chunk_index, content, token_count, embedding)
VALUES %s
ON CONFLICT (page_id, chunk_index) DO UPDATE SET
content = EXCLUDED.content,
token_count = EXCLUDED.token_count,
embedding = EXCLUDED.embedding
""",
rows,
template="(%s, %s, %s, %s, %s::vector)",
)
self._conn.commit()
# ------------------------------------------------------------------
# Similarity search
# ------------------------------------------------------------------
def get_page_checksum(self, page_id: str) -> Optional[str]:
with self._cursor() as cur:
cur.execute("SELECT checksum FROM confluence_pages WHERE id = %s", (page_id,))
row = cur.fetchone()
return row["checksum"] if row else None
def similarity_search(self, query_embedding: list[float]) -> list[dict]:
"""Return the top-k most similar chunks with page metadata."""
with self._cursor() as cur:
cur.execute(
"""
SELECT
dc.content,
dc.chunk_index,
cp.title,
cp.url,
cp.space_key,
1 - (dc.embedding <=> %s::vector) AS similarity
FROM document_chunks dc
JOIN confluence_pages cp ON cp.id = dc.page_id
ORDER BY dc.embedding <=> %s::vector
LIMIT %s
""",
(query_embedding, query_embedding, TOP_K),
)
return [dict(row) for row in cur.fetchall()]
RAG Service
The RagService wires together LangChain, the vector store, and GPT-4o-mini to produce answers with source citations.
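The heart of the prompt construction is simple enough to sketch standalone (`build_context` is an illustrative name for logic that is inlined in `query()` below): each retrieved chunk is labelled with its source page, then the chunks are joined with separators.

```python
def build_context(results: list[dict]) -> str:
    """Label each retrieved chunk with its source page title,
    then join chunks with a visible separator for the LLM prompt."""
    parts = [f"[Source: {r['title']}]\n{r['content']}" for r in results]
    return "\n\n---\n\n".join(parts)

demo = build_context([
    {"title": "Deploy Guide", "content": "Run make deploy."},
    {"title": "Rollback Runbook", "content": "Use make rollback."},
])
print(demo)
```

Labelling chunks this way is what lets the system prompt's "always cite the page title" rule actually work.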
app/rag_service.py
import os
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from .embeddings_service import EmbeddingsService
SYSTEM_PROMPT = """You are a knowledgeable assistant for an engineering team.
You answer questions strictly based on the Confluence documentation provided as context.
Rules:
- If the context contains the answer, provide a clear, structured response.
- If the context does not contain enough information, say so honestly.
- Always cite the Confluence page title(s) you referenced.
- Keep answers concise and actionable.
- Use bullet points or numbered lists where appropriate."""
class RagService:
def __init__(self) -> None:
self._embeddings = EmbeddingsService()
self._llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
openai_api_key=os.environ["OPENAI_API_KEY"],
)
def query(self, question: str) -> dict:
"""
1. Embed the question.
2. Retrieve the top-k similar chunks.
3. Build a context prompt.
4. Ask GPT-4o-mini and return the answer + sources.
"""
# Step 1 — embed question
query_embedding = self._embeddings.embed(question)
# Step 2 — retrieve context
results = self._embeddings.similarity_search(query_embedding)
if not results:
return {
"answer": "I couldn't find any relevant information in the knowledge base.",
"sources": [],
}
# Step 3 — build context block
context_parts = []
seen_pages: dict[str, str] = {} # title → url
for r in results:
context_parts.append(
f"[Source: {r['title']}]\n{r['content']}"
)
seen_pages[r["title"]] = r["url"]
context = "\n\n---\n\n".join(context_parts)
# Step 4 — call LLM
messages = [
SystemMessage(content=SYSTEM_PROMPT),
HumanMessage(
content=f"Context from Confluence:\n\n{context}\n\n---\n\nQuestion: {question}"
),
]
response = self._llm.invoke(messages)
sources = [{"title": title, "url": url} for title, url in seen_pages.items()]
return {
"answer": response.content,
"sources": sources,
"chunks_used": len(results),
}
FastAPI Application
The main app exposes three endpoints: /api/sync to trigger a knowledge base refresh, /api/query to ask questions, and /api/health for liveness checks.
app/main.py
import os
import time
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .confluence_client import ConfluenceClient
from .document_processor import DocumentProcessor
from .embeddings_service import EmbeddingsService
from .rag_service import RagService
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Initialise shared services once at startup
# ---------------------------------------------------------------------------
_embeddings: EmbeddingsService
_rag: RagService
@asynccontextmanager
async def lifespan(app: FastAPI):
global _embeddings, _rag
_embeddings = EmbeddingsService()
_rag = RagService()
yield
app = FastAPI(title="Confluence RAG API", version="1.0.0", lifespan=lifespan)
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class SyncRequest(BaseModel):
space_keys: list[str]
force: bool = False # if True, re-embed even unchanged pages
class QueryRequest(BaseModel):
question: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/api/health")
def health():
return {"status": "ok"}
@app.post("/api/sync")
def sync(req: SyncRequest):
"""
Pull pages from one or more Confluence spaces, check freshness,
and update the vector store for changed pages only.
"""
confluence = ConfluenceClient()
processor = DocumentProcessor()
stats = {"synced": 0, "skipped": 0, "failed": 0}
for space_key in req.space_keys:
logger.info("Syncing space: %s", space_key)
try:
pages = confluence.get_space_pages(space_key)
except Exception as exc:
logger.error("Failed to fetch space %s: %s", space_key, exc)
stats["failed"] += 1
continue
for page in pages:
page_id = page["id"]
html_body = page.get("body", {}).get("storage", {}).get("value", "")
checksum = confluence.content_checksum(html_body)
# Skip if the page hasn't changed (unless force=True)
if not req.force:
existing_checksum = _embeddings.get_page_checksum(page_id)
if existing_checksum == checksum:
stats["skipped"] += 1
continue
try:
plain_text = processor.html_to_text(html_body)
if not plain_text.strip():
continue
chunks = processor.chunk(plain_text)
if not chunks:
continue
embeddings = _embeddings.embed_batch([c.content for c in chunks])
_embeddings.upsert_page(
page_id=page_id,
space_key=space_key,
title=page["title"],
url=confluence.page_url(page),
checksum=checksum,
)
_embeddings.delete_chunks(page_id)
_embeddings.store_chunks(page_id, chunks, embeddings)
logger.info("Indexed page: %s (%d chunks)", page["title"], len(chunks))
stats["synced"] += 1
except Exception as exc:
logger.error("Failed to index page %s: %s", page_id, exc)
stats["failed"] += 1
return {"status": "complete", **stats}
@app.post("/api/query")
def query(req: QueryRequest):
"""Answer a question using the Confluence knowledge base."""
if not req.question.strip():
raise HTTPException(status_code=400, detail="Question must not be empty")
start = time.perf_counter()
result = _rag.query(req.question)
latency_ms = int((time.perf_counter() - start) * 1000)
return {**result, "latency_ms": latency_ms}
Environment Configuration
.env.template
# Confluence
CONFLUENCE_BASE_URL=https://yourorg.atlassian.net
CONFLUENCE_EMAIL=you@yourcompany.com
CONFLUENCE_API_TOKEN=your_confluence_api_token
# OpenAI
OPENAI_API_KEY=sk-...
# PostgreSQL (matches docker-compose service)
DATABASE_URL=postgresql://raguser:ragpassword@db:5432/ragdb
Getting a Confluence API token: log into id.atlassian.com, click Create API token, give it a label, and copy it. That's your CONFLUENCE_API_TOKEN.
Docker Compose Stack
One command to run the full stack: PostgreSQL with pgvector, the RAG API, and n8n for scheduling.
docker-compose.yml
version: "3.9"
services:
db:
image: pgvector/pgvector:pg16
restart: unless-stopped
environment:
POSTGRES_USER: raguser
POSTGRES_PASSWORD: ragpassword
POSTGRES_DB: ragdb
volumes:
- pgdata:/var/lib/postgresql/data
- ./sql/schema.sql:/docker-entrypoint-initdb.d/01-schema.sql
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U raguser -d ragdb"]
interval: 10s
timeout: 5s
retries: 5
api:
build: .
restart: unless-stopped
env_file: .env
environment:
DATABASE_URL: postgresql://raguser:ragpassword@db:5432/ragdb
ports:
- "8080:8080"
depends_on:
db:
condition: service_healthy
command: uvicorn app.main:app --host 0.0.0.0 --port 8080
n8n:
image: n8nio/n8n:latest
restart: unless-stopped
environment:
N8N_BASIC_AUTH_ACTIVE: "true"
N8N_BASIC_AUTH_USER: admin
N8N_BASIC_AUTH_PASSWORD: changeme
WEBHOOK_URL: http://localhost:5678
ports:
- "5678:5678"
volumes:
- n8ndata:/home/node/.n8n
depends_on:
- api
volumes:
pgdata:
n8ndata:
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# System deps for psycopg2
RUN apt-get update && apt-get install -y libpq-dev gcc && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
n8n Sync Workflow
Import this JSON into n8n (Workflows → Import from JSON) to get a fully configured daily sync trigger.
{
"name": "Confluence RAG Sync",
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"field": "hours",
"hoursInterval": 24
}
]
}
},
"name": "Daily Schedule",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"url": "http://api:8080/api/sync",
"options": {},
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "space_keys",
"value": "={{ [\"ENG\", \"QA\", \"OPS\"] }}"
},
{
"name": "force",
"value": false
}
]
}
},
"name": "Sync Knowledge Base",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 3,
"position": [500, 300]
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ $json.failed }}",
"operation": "larger",
"value2": 0
}
]
}
},
"name": "Any failures?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [750, 300]
},
{
"parameters": {
"channel": "#ops-alerts",
"text": "⚠️ Confluence RAG sync finished with {{ $json.failed }} failed page(s). Synced: {{ $json.synced }}, Skipped: {{ $json.skipped }}",
"authentication": "oAuth2"
},
"name": "Slack Alert",
"type": "n8n-nodes-base.slack",
"typeVersion": 1,
"position": [1000, 200]
}
],
"connections": {
"Daily Schedule": {
"main": [[{ "node": "Sync Knowledge Base", "type": "main", "index": 0 }]]
},
"Sync Knowledge Base": {
"main": [[{ "node": "Any failures?", "type": "main", "index": 0 }]]
},
"Any failures?": {
"main": [[{ "node": "Slack Alert", "type": "main", "index": 0 }], []]
}
}
}
Replace "ENG", "QA", "OPS" with your actual Confluence space keys, and configure your Slack credentials in n8n.
Quick Start: Running It All
# 1. Clone and configure
cp .env.template .env
# Fill in CONFLUENCE_BASE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN, OPENAI_API_KEY
# 2. Start the full stack
docker compose up -d
# 3. Trigger an initial sync (replace space keys with yours)
curl -s -X POST http://localhost:8080/api/sync \
-H "Content-Type: application/json" \
-d '{"space_keys": ["ENG"], "force": true}' | python -m json.tool
# Expected output:
# {
# "status": "complete",
# "synced": 47,
# "skipped": 0,
# "failed": 0
# }
# 4. Ask a question!
curl -s -X POST http://localhost:8080/api/query \
-H "Content-Type: application/json" \
-d '{"question": "How do I set up a new test environment?"}' | python -m json.tool
Testing & Sample Responses
Health Check
curl http://localhost:8080/api/health
# {"status": "ok"}
Sync Response
{
"status": "complete",
"synced": 12,
"skipped": 35,
"failed": 0
}
skipped pages are ones whose MD5 checksum matched what was already in the database — we didn't waste API calls or embedding credits on them. ✅
Query Response
{
"answer": "To set up a new test environment, follow these steps:\n\n1. **Provision the CIF** — use the MCP automation script at `/tools/provision-cif.sh` with the target environment flag.\n2. **Assign PID** — run the PID assignment workflow documented on the 'PID Management' page.\n3. **Verify MCP connectivity** — check the digital MCP setup checklist before running regression tests.\n\nSources: 'Test Environment Setup Guide', 'PID Management', 'MCP Automation Runbook'",
"sources": [
{
"title": "Test Environment Setup Guide",
"url": "https://yourorg.atlassian.net/wiki/spaces/QA/pages/123456/Test+Environment+Setup+Guide"
},
{
"title": "PID Management",
"url": "https://yourorg.atlassian.net/wiki/spaces/ENG/pages/789012/PID+Management"
},
{
"title": "MCP Automation Runbook",
"url": "https://yourorg.atlassian.net/wiki/spaces/OPS/pages/345678/MCP+Automation+Runbook"
}
],
"chunks_used": 6,
"latency_ms": 1842
}
Every answer includes clickable source links — your team can verify the AI's response against the original page in seconds.
Freshness: How the Change Detection Works
The sync endpoint computes an MD5 checksum of each page's raw HTML body and compares it to the stored value in confluence_pages.checksum:
# New checksum from API response
checksum = confluence.content_checksum(html_body)
# Stored checksum from our DB
existing_checksum = _embeddings.get_page_checksum(page_id)
if existing_checksum == checksum:
# Page hasn't changed — skip embedding entirely
stats["skipped"] += 1
continue
This means:
- Zero unnecessary embedding API calls on unchanged pages
- Automatic re-indexing whenever a Confluence author edits a page
- Cost stays low — a 100-page space with 5 changed pages only bills for 5 pages of embeddings
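One sharp edge worth knowing: the checksum is computed over the raw HTML bytes, so even a purely cosmetic edit (extra whitespace, reordered attributes) changes it and triggers a re-embed of that page. A quick standalone sketch reusing the same hashing as `content_checksum`:

```python
import hashlib

def content_checksum(html_body: str) -> str:
    # Same hashing as ConfluenceClient.content_checksum: MD5 over raw HTML bytes
    return hashlib.md5(html_body.encode("utf-8")).hexdigest()

v1 = content_checksum("<p>Deploy with <code>make deploy</code></p>")
v2 = content_checksum("<p>Deploy with <code>make deploy</code> </p>")  # one extra space

assert v1 == content_checksum("<p>Deploy with <code>make deploy</code></p>")  # stable
assert v1 != v2  # cosmetic whitespace still changes the checksum, so the page re-embeds
```

In practice this over-triggering is harmless (a few extra embeddings), and far cheaper than missing a real content change.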
Production Considerations
1. Index Tuning
The ivfflat index works great up to ~1M vectors. For larger knowledge bases, switch to hnsw:
CREATE INDEX idx_chunks_embedding_hnsw
ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
2. Rate Limiting
The OpenAI Embeddings API has a 1M tokens/min limit on tier 1. For large initial syncs, batch pages and add a small sleep between batches:
import time
BATCH_SIZE = 20
for i in range(0, len(chunks), BATCH_SIZE):
batch = chunks[i : i + BATCH_SIZE]
embeddings = _embeddings.embed_batch([c.content for c in batch])
# ... store embeddings ...
time.sleep(0.5) # ~500ms between batches
3. Query Logging
Log every query to the query_log table for analytics:
import json
cur.execute(
"INSERT INTO query_log (question, answer, sources, latency_ms) VALUES (%s, %s, %s, %s)",
(question, answer, json.dumps(sources), latency_ms),
)
This tells you what your team asks most — great input for improving documentation coverage!
4. Authentication
For production deployments, add API key auth to FastAPI:
import os
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(key: str = Security(api_key_header)):
if key != os.environ["API_KEY"]:
raise HTTPException(status_code=403, detail="Invalid API key")
# Then protect an endpoint with dependencies=[Depends(verify_api_key)]
Key Takeaways
- Freshness via checksums — MD5 comparison before embedding is cheap and effective; don't re-index what hasn't changed.
- pgvector is production-ready — it's a PostgreSQL extension, not a separate service, so you operate it with the same tools you already know.
- Chunk overlap matters — 80-token overlap between 400-token chunks prevents context loss at boundaries without dramatically increasing storage.
- Cite your sources — an AI answer without source links is untrustworthy; always return the Confluence URLs alongside the answer.
- n8n makes scheduling painless — trigger sync on a 24-hour schedule with Slack alerting on failures, no custom cron infra needed.
Wrapping Up
We now have a complete, production-ready RAG system that turns your Confluence spaces into an AI-queryable knowledge base. The most satisfying part? The freshness detection — the system is smart enough to only do work when pages actually change, keeping costs low even for large spaces.
From "I know it's somewhere in Confluence" to a 15-second AI answer with clickable source links — give it a try with your own team's documentation!
Drop a comment below if you run into anything, or if you extend this with streaming responses or a chat UI. Happy building! 🚀
Full source code is available in the linked GitHub repository. The Docker Compose setup means you're one docker compose up -d away from a running system.