A Coding Implementation to Build Bulletproof Agentic Workflows with PydanticAI Using Strict Schemas, Tool Injection, and Model-Agnostic Execution


In this tutorial, we build a production-ready agentic workflow that prioritizes reliability over best-effort generation by enforcing strict, typed outputs at every step. We use PydanticAI to define clear response schemas, wire in tools via dependency injection, and ensure the agent can safely interact with external systems, such as a database, without breaking execution. By running everything in a notebook-friendly, async-first setup, we demonstrate how to move beyond fragile chatbot patterns toward robust agentic systems suitable for real enterprise workflows.

!pip -q install "pydantic-ai-slim[openai]" pydantic

import os, json, sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional, List

from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent, RunContext, ModelRetry

if not os.environ.get("OPENAI_API_KEY"):
    try:
        from google.colab import userdata
        os.environ["OPENAI_API_KEY"] = (userdata.get("OPENAI_API_KEY") or "").strip()
    except Exception:
        pass

if not os.environ.get("OPENAI_API_KEY"):
    import getpass
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Paste your OPENAI_API_KEY: ").strip()

assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is required."

We set up the execution environment and ensure all required libraries are available for the agent to run correctly. We securely load the OpenAI API key in a Colab-friendly way so the tutorial works without manual configuration changes. We also import all core dependencies that will be shared across schemas, tools, and agent logic.

Priority = Literal["low", "medium", "high", "critical"]
ActionType = Literal["create_ticket", "update_ticket", "query_ticket", "list_open_tickets", "no_action"]
Confidence = Literal["low", "medium", "high"]

class TicketDraft(BaseModel):
    title: str = Field(..., min_length=8, max_length=120)
    customer: str = Field(..., min_length=2, max_length=60)
    priority: Priority
    category: Literal["billing", "bug", "feature_request", "security", "account", "other"]
    description: str = Field(..., min_length=20, max_length=1000)
    expected_outcome: str = Field(..., min_length=10, max_length=250)

class AgentDecision(BaseModel):
    action: ActionType
    reason: str = Field(..., min_length=20, max_length=400)
    confidence: Confidence
    ticket: Optional[TicketDraft] = None
    ticket_id: Optional[int] = None
    follow_up_questions: List[str] = Field(default_factory=list, max_length=5)

    @field_validator("follow_up_questions")
    @classmethod
    def short_questions(cls, v):
        for q in v:
            if len(q) > 140:
                raise ValueError("Each follow-up question must be <= 140 characters.")
        return v

We define the strict data models that act as the contract between the agent and the rest of the system. We use typed fields and validation rules to guarantee that every agent response follows a predictable structure. By enforcing these schemas, we prevent malformed outputs from silently propagating through the workflow.
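
To see what this contract buys us before any model is involved, the short check below (illustrative only, not part of the workflow) feeds TicketDraft a description that violates min_length=20; Pydantic raises a ValidationError immediately instead of letting malformed data flow downstream:

from pydantic import ValidationError

try:
    TicketDraft(
        title="Refund request for order 4521",
        customer="Ada",
        priority="medium",
        category="billing",
        description="Too short.",  # violates min_length=20
        expected_outcome="Refund issued and confirmed by email.",
    )
except ValidationError as e:
    print(e)  # the error names the offending field instead of silently passing bad data along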

@dataclass
class SupportDeps:
    db: sqlite3.Connection
    tenant: str
    policy: dict

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("""
        CREATE TABLE tickets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant TEXT NOT NULL,
            title TEXT NOT NULL,
            customer TEXT NOT NULL,
            priority TEXT NOT NULL,
            category TEXT NOT NULL,
            description TEXT NOT NULL,
            expected_outcome TEXT NOT NULL,
            status TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
    """)
    conn.commit()
    return conn

def seed_ticket(db: sqlite3.Connection, tenant: str, ticket: TicketDraft, status: str = "open") -> int:
    now = utc_now_iso()
    cur = db.execute(
        """
        INSERT INTO tickets
        (tenant, title, customer, priority, category, description, expected_outcome, status, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            tenant,
            ticket.title,
            ticket.customer,
            ticket.priority,
            ticket.category,
            ticket.description,
            ticket.expected_outcome,
            status,
            now,
            now,
        ),
    )
    db.commit()
    return int(cur.lastrowid)

We construct the dependency layer and initialize a lightweight SQLite database for persistence. We model real-world runtime dependencies, such as database connections and tenant policies, and make them injectable into the agent. We also define helper functions that safely insert and manage ticket data during execution.
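
Before involving any model, we can smoke-test these helpers on their own. The throwaway snippet below (the underscore-prefixed names are introduced here purely for illustration) seeds one draft and reads it straight back from SQLite:

_db = init_db()
_tid = seed_ticket(
    _db,
    "demo_tenant",
    TicketDraft(
        title="Example ticket for a smoke test",
        customer="Test User",
        priority="low",
        category="other",
        description="A throwaway record used only to confirm the schema and insert path work.",
        expected_outcome="Row is written and can be read back by id.",
    ),
)
print(_db.execute("SELECT id, title, status FROM tickets WHERE id=?", (_tid,)).fetchone())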

def build_agent(model_name: str) -> Agent[SupportDeps, AgentDecision]:
    agent = Agent(
        f"openai:{model_name}",
        output_type=AgentDecision,
        output_retries=2,
        instructions=(
            "You are a production support triage agent.\n"
            "Return an output that matches the AgentDecision schema.\n"
            "Use tools when you need DB state.\n"
            "Never invent ticket IDs.\n"
            "If the user intent is unclear, ask concise follow-up questions.\n"
        ),
    )

    @agent.tool
    def create_ticket(ctx: RunContext[SupportDeps], ticket: TicketDraft) -> int:
        deps = ctx.deps
        if ticket.priority in ("critical", "high") and deps.policy.get("require_security_phrase_for_critical", False):
            if ticket.category == "security" and "incident" not in ticket.description.lower():
                raise ModelRetry("For security high/critical, include the word 'incident' in description and retry.")
        return seed_ticket(deps.db, deps.tenant, ticket, status="open")

    @agent.tool
    def update_ticket_status(
        ctx: RunContext[SupportDeps],
        ticket_id: int,
        status: Literal["open", "in_progress", "resolved", "closed"],
    ) -> dict:
        deps = ctx.deps
        now = utc_now_iso()
        cur = deps.db.execute("SELECT id FROM tickets WHERE tenant=? AND id=?", (deps.tenant, ticket_id))
        if not cur.fetchone():
            raise ModelRetry(f"Ticket {ticket_id} not found for this tenant. Ask for the correct ticket_id.")
        deps.db.execute(
            "UPDATE tickets SET status=?, updated_at=? WHERE tenant=? AND id=?",
            (status, now, deps.tenant, ticket_id),
        )
        deps.db.commit()
        return {"ticket_id": ticket_id, "status": status, "updated_at": now}

    @agent.tool
    def query_ticket(ctx: RunContext[SupportDeps], ticket_id: int) -> dict:
        deps = ctx.deps
        cur = deps.db.execute(
            """
            SELECT id, title, customer, priority, category, status, created_at, updated_at
            FROM tickets WHERE tenant=? AND id=?
            """,
            (deps.tenant, ticket_id),
        )
        row = cur.fetchone()
        if not row:
            raise ModelRetry(f"Ticket {ticket_id} not found. Ask the user for a valid ticket_id.")
        keys = ["id", "title", "customer", "priority", "category", "status", "created_at", "updated_at"]
        return dict(zip(keys, row))

    @agent.tool
    def list_open_tickets(ctx: RunContext[SupportDeps], limit: int = 5) -> list:
        deps = ctx.deps
        limit = max(1, min(int(limit), 20))
        cur = deps.db.execute(
            """
            SELECT id, title, priority, category, status, updated_at
            FROM tickets
            WHERE tenant=? AND status IN ('open', 'in_progress')
            ORDER BY updated_at DESC
            LIMIT ?
            """,
            (deps.tenant, limit),
        )
        rows = cur.fetchall()
        return [
            {"id": r[0], "title": r[1], "priority": r[2], "category": r[3], "status": r[4], "updated_at": r[5]}
            for r in rows
        ]

    @agent.output_validator
    def validate_decision(ctx: RunContext[SupportDeps], out: AgentDecision) -> AgentDecision:
        deps = ctx.deps
        if out.action == "create_ticket" and out.ticket is None:
            raise ModelRetry("You chose create_ticket but did not provide ticket. Provide ticket fields and retry.")
        if out.action in ("update_ticket", "query_ticket") and out.ticket_id is None:
            raise ModelRetry("You chose update/query but did not provide ticket_id. Ask for ticket_id and retry.")
        if out.ticket and out.ticket.priority == "critical" and not deps.policy.get("allow_critical", True):
            raise ModelRetry("This tenant does not allow 'critical'. Downgrade to 'high' and retry.")
        return out

    return agent

The build_agent function contains the core logic for assembling a model-agnostic PydanticAI agent: the model is selected by a plain string, while the schemas, tools, and validators stay the same. We register typed tools for creating, querying, updating, and listing tickets, allowing the agent to interact with external state in a controlled way. We also enforce output validation so the agent can self-correct whenever its decisions violate business rules.
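
To exercise that self-correction path, the sketch below (illustrative only; strict_agent, strict_deps, and strict_tenant are names introduced here, not part of the tutorial) builds a second agent against a tenant whose policy forbids critical tickets, so any critical draft triggers the ModelRetry raised in validate_decision:

# Illustrative sketch: a tenant whose policy disallows "critical" priority.
strict_agent = build_agent("gpt-4o-mini")
strict_deps = SupportDeps(
    db=init_db(),
    tenant="strict_tenant",
    policy={"allow_critical": False, "require_security_phrase_for_critical": True},
)
# Running a prompt that asks for a critical ticket would make validate_decision raise
# ModelRetry, giving the model up to output_retries=2 attempts to downgrade to "high":
# res = await strict_agent.run("Open a critical security incident ticket for customer Mira ...", deps=strict_deps)
# assert res.output.ticket is None or res.output.ticket.priority != "critical"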

db = init_db()
deps = SupportDeps(
    db=db,
    tenant="acme_corp",
    policy={"allow_critical": True, "require_security_phrase_for_critical": True},
)

seed_ticket(
    db,
    deps.tenant,
    TicketDraft(
        title="Double-charged on invoice 8831",
        customer="Riya",
        priority="high",
        category="billing",
        description="Customer reports they were billed twice for invoice 8831 and wants a refund and confirmation email.",
        expected_outcome="Issue a refund and confirm resolution to customer.",
    ),
)
seed_ticket(
    db,
    deps.tenant,
    TicketDraft(
        title="App crashes on login after update",
        customer="Sam",
        priority="high",
        category="bug",
        description="After latest update, the app crashes immediately on login. Reproducible on two devices; needs investigation.",
        expected_outcome="Provide a fix or workaround and restore successful logins.",
    ),
)

agent = build_agent("gpt-4o-mini")

async def run_case(prompt: str):
    res = await agent.run(prompt, deps=deps)
    out = res.output
    print(json.dumps(out.model_dump(), indent=2))
    return out

case_a = await run_case(
    "We suspect account takeover: multiple password reset emails and unauthorized logins. "
    "Customer=Leila. Priority=critical. Open a security ticket."
)

case_b = await run_case("List our open tickets and summarize what to tackle first.")

case_c = await run_case("What is the status of ticket 1? If it's open, move it to in_progress.")

agent_alt = build_agent("gpt-4o")
alt_res = await agent_alt.run(
    "Create a feature request ticket: customer=Noah wants 'export to CSV' in analytics dashboard; priority=medium.",
    deps=deps,
)

print(json.dumps(alt_res.output.model_dump(), indent=2))

We wire everything together by seeding initial data and running the agent asynchronously in a notebook-safe manner. We execute multiple real-world scenarios to show how the agent reasons, calls tools, and returns schema-valid outputs. We also demonstrate how easily we can swap the underlying model while keeping the same workflows and guarantees intact.
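
If you later move these cases out of a notebook into a plain script, the same agent and dependencies work without top-level await. A minimal sketch using Agent.run_sync (run_case_sync is a name introduced here for illustration):

def run_case_sync(prompt: str) -> AgentDecision:
    # Synchronous entry point for scripts/CLIs; inside notebooks keep the async run_case above,
    # since run_sync cannot be called while an event loop is already running.
    res = agent.run_sync(prompt, deps=deps)
    print(json.dumps(res.output.model_dump(), indent=2))
    return res.output

# run_case_sync("List our open tickets and summarize what to tackle first.")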

In conclusion, we showed how a type-safe agent can reason, call tools, validate its own outputs, and recover from errors without manual intervention. We kept the logic model-agnostic, allowing us to swap underlying LLMs while preserving the same schemas and tools, which is critical for long-term maintainability. Overall, we demonstrated how combining strict schema enforcement, dependency injection, and async execution closes the reliability gap in agentic AI and provides a solid foundation for building dependable production systems.
