
LangGraph Postgres Checkpoints: Resumable AI Agents
Summary
Persist agent state to Postgres so LangGraph agents can resume after a crash or restart.
Why Checkpointing Matters
Without persistence, every crash, restart, or container redeploy wipes your agent's working memory mid-task. LangGraph 1.0 ships with a checkpoint system that snapshots graph state after every step of the graph, and the Postgres backend turns those snapshots into a durable history you can resume, fork, and time-travel through.
In the next 15 minutes you will go from zero to a production-shaped, Postgres-backed LangGraph agent that survives a kill -9 in the middle of a tool call.
What You Will Build
- A two-node ReAct-style graph (model + tool)
- Each step persisted to Postgres via PostgresSaver
- Resumes mid-graph if the process dies
- Supports time-travel: replay from any historical snapshot
Prerequisites
- Python 3.11+
- Postgres 15+ (local Docker is fine)
- An OpenAI or Anthropic API key in your env
- 15 minutes
Step 1 — Install
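pip install "langgraph==1.0.*" langgraph-checkpoint-postgres langchain-openai "psycopg[binary]"
The quotes stop shells such as zsh from expanding the * and [binary] as glob patterns. The checkpoint package follows its own version numbers, so leave it unpinned here and let pip resolve a release compatible with langgraph 1.0.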
Step 2 — Spin Up Postgres
docker run -d --name lg-pg -e POSTGRES_PASSWORD=lgpass -p 5432:5432 postgres:16
export DB_URI="postgresql://postgres:lgpass@localhost:5432/postgres?sslmode=disable"
Any reachable Postgres works — Supabase, Neon, RDS, your laptop. Just keep DB_URI in env.
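Moving the same DB_URI from local Docker to a managed provider usually means changing only the sslmode query parameter. A minimal sketch of that switch, using only the standard library; the helper name with_sslmode is hypothetical and not part of LangGraph or psycopg:

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_sslmode(db_uri: str, sslmode: str) -> str:
    """Return db_uri with its sslmode query parameter set to the given value."""
    parts = urlsplit(db_uri)
    query = dict(parse_qsl(parts.query))
    query["sslmode"] = sslmode  # add the parameter or overwrite an existing one
    return urlunsplit(parts._replace(query=urlencode(query)))


local_uri = "postgresql://postgres:lgpass@localhost:5432/postgres?sslmode=disable"
print(with_sslmode(local_uri, "require"))
```

The rest of the URI (user, password, host, port, database) passes through unchanged, so the function can sit between os.environ and the checkpointer.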
Step 3 — Define Agent State
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    rate: float | None
The Annotated reducer (add_messages) appends new messages to the list instead of overwriting it; a message whose id matches an existing one replaces that entry. Every other field defaults to last-write-wins.
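The difference between a reducer field and a last-write-wins field is easiest to see in a toy merge function. This is a simplified model of the idea, not LangGraph's implementation; apply_update and append are hypothetical names, and plain strings stand in for message objects:

```python
from typing import Any, Callable


def append(existing: list, new: list) -> list:
    """Toy reducer: concatenate instead of replace."""
    return existing + new


def apply_update(
    state: dict[str, Any],
    update: dict[str, Any],
    reducers: dict[str, Callable[[Any, Any], Any]],
) -> dict[str, Any]:
    """Merge a node's partial update into the state."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # reducer field
        else:
            merged[key] = value  # last-write-wins field
    return merged


state = {"messages": ["user: convert 100 EUR"], "rate": None}
update = {"messages": ["tool: EUR=1.08"], "rate": 1.08}
state = apply_update(state, update, {"messages": append})
print(state)
```

After the merge, messages holds both entries while rate has simply been overwritten.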
Step 4 — Build the Graph
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, ToolMessage
llm = ChatOpenAI(model="gpt-5.4-mini")
def lookup_rate(currency: str) -> float:
    # Stubbed rate table; unknown currencies return 0.0
    fake = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
    return fake.get(currency.upper(), 0.0)
def call_model(state: AgentState):
    # The returned message is appended to state by the add_messages reducer
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
def call_tool(state: AgentState):
    # Demo shortcut: the currency and tool_call_id are hardcoded, not read from the model's output
    rate = lookup_rate("EUR")
    return {"rate": rate, "messages": [ToolMessage(f"EUR={rate}", tool_call_id="r1")]}
graph = StateGraph(AgentState)
graph.add_node("model", call_model)
graph.add_node("tool", call_tool)
graph.set_entry_point("model")
graph.add_edge("model", "tool")
graph.add_edge("tool", END)
Step 5 — Wire in the Postgres Checkpointer
import os
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_core.messages import HumanMessage
DB_URI = os.environ["DB_URI"]
# The saver's connection closes when the with block exits,
# so every call on app (including Steps 6 and 7) must run inside it.
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # creates tables on first run
    app = graph.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "session-42"}}
    out = app.invoke(
        {"messages": [HumanMessage("Convert 100 EUR to USD")]},
        config=config,
    )
    print(out["rate"])
Example output:
1.08
Step 6 — Resume After a Crash
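Kill the process anywhere mid-run. When you restart with the same thread_id, LangGraph picks up after the last node that finished successfully. Pass None as the input to mean 'resume'. The node that was interrupted runs again from its start, so any tool with side effects should be safe to repeat: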
out = app.invoke(None, config={"configurable": {"thread_id": "session-42"}})
Behind the scenes the checkpointer reads the latest checkpoint row for that thread, rehydrates state, and re-enters the graph at the correct node.
Step 7 — Time-Travel (Replay or Fork)
history = list(app.get_state_history(config))
print(f"Snapshots: {len(history)}")
# History is ordered newest first, so history[2] is the third most recent snapshot.
# Rewind to it and run a new branch from there
target = history[2].config
forked = app.invoke(
    {"messages": [HumanMessage("Now show GBP")]},
    config=target,
)
Each invocation creates a new checkpoint chain branched off the chosen point — useful for debugging, A/B tool experiments, and human-in-the-loop edits.
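Picking a snapshot by position is brittle once the graph grows, so it can help to select by what the snapshot was about to run. A minimal sketch, assuming each snapshot exposes next (the pending node names) and config, as LangGraph's state snapshots do; the helper name snapshot_before is hypothetical, and SimpleNamespace objects stand in for real snapshots so the example runs without a database:

```python
from types import SimpleNamespace


def snapshot_before(history, node_name):
    """Return the newest snapshot whose pending nodes include node_name."""
    for snap in history:  # history is expected newest first
        if node_name in snap.next:
            return snap
    raise LookupError(f"no snapshot is waiting on node {node_name!r}")


# Stand-in history for a model -> tool run, newest first.
fake_history = [
    SimpleNamespace(next=(), config={"checkpoint_id": "c3"}),
    SimpleNamespace(next=("tool",), config={"checkpoint_id": "c2"}),
    SimpleNamespace(next=("model",), config={"checkpoint_id": "c1"}),
]

target = snapshot_before(fake_history, "tool").config
print(target)
```

With a real app, the list returned by get_state_history would replace fake_history and the resulting config would be passed to invoke.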
Common Pitfalls
- Forgetting checkpointer.setup() on first run — tables will not exist and you will see relation "checkpoints" does not exist
- Running two processes against the same thread_id concurrently: do not count on the database to serialize the runs, because their checkpoints can interleave in one thread's history; use a distinct thread_id per session
- Storing huge blobs in state — checkpoints get heavy fast; keep blobs in S3 and store references
- Skipping sslmode in production — managed Postgres providers usually reject non-TLS connections
- Mixing sync and async savers in the same app — pick PostgresSaver or AsyncPostgresSaver and stick with it
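For the large-blob pitfall, one common pattern is to keep only a small reference in state and put the payload elsewhere. The sketch below is illustrative only: a dict stands in for an object store such as S3, and BLOB_STORE, MAX_INLINE_BYTES, to_state_value, and from_state_value are hypothetical names, not LangGraph APIs.

```python
import hashlib

# Stand-in for an object store; a plain dict keeps the sketch runnable.
BLOB_STORE: dict[str, bytes] = {}

MAX_INLINE_BYTES = 1024  # hypothetical threshold for keeping text in state


def to_state_value(text: str) -> dict:
    """Keep small text inline; replace large text with a content-hash reference."""
    data = text.encode("utf-8")
    if len(data) <= MAX_INLINE_BYTES:
        return {"inline": text}
    key = "sha256/" + hashlib.sha256(data).hexdigest()
    BLOB_STORE[key] = data
    return {"ref": key, "size": len(data)}


def from_state_value(value: dict) -> str:
    """Resolve a state value back to its full text."""
    if "inline" in value:
        return value["inline"]
    return BLOB_STORE[value["ref"]].decode("utf-8")


small = to_state_value("EUR=1.08")
large = to_state_value("x" * 5000)
print(small)
print(large["size"], large["ref"][:14])
```

Only the dict returned by to_state_value would be written into agent state, so each checkpoint stays small regardless of payload size.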
Quick Reference
| Component | Purpose |
|---|---|
| StateGraph | Defines nodes, edges, and state schema |
| PostgresSaver | Persists each step to Postgres |
| thread_id | Identifies a session/conversation |
| checkpoint_id | A specific snapshot inside a thread |
| add_messages | Reducer that appends instead of replacing |
| get_state_history | Lists all snapshots for time-travel |
Next Steps
- Swap PostgresSaver for AsyncPostgresSaver and switch to app.ainvoke for high-throughput services
- Add a human-in-the-loop interrupt before the tool node; the graph pauses with its state checkpointed until you resume
- Use connection pooling (psycopg_pool) instead of from_conn_string for production
- Add monitoring on the checkpoints table size and prune old threads on a cron
Your agent now survives restarts, supports time-travel, and stores every step durably without writing a single SQL statement. Ship it.