Pydantic AI
Running Agents with Graph
Use with Caution
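The quick starts of many agent frameworks look very impressive: write a few agent roles, write a few tasks, and the agents seem to run by magic. By comparison, Pydantic AI looks less powerful, because you have to control the whole flow yourself. That is what Graph is for.
A while ago I tried a database MCP server through Roo Code. It worked reasonably well, but Roo Code is a general-purpose coding tool and the way it calls MCP felt a little roundabout, so I wondered whether I could write my own ops agent tool with Pydantic AI.
The tool should recognize my intent, call the appropriate MCP tool, and present the result the way I ask. A single agent equipped with MCP tools is enough for this requirement. In a larger context, however, a single agent may be stretched too thin, so this time we will over-engineer things slightly.
The diagram below reflects the Pydantic AI graph model; it was in fact generated from the Pydantic AI code: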
```mermaid
stateDiagram-v2
    [*] --> Intent
    Intent --> SimpleChat
    Intent --> DBQuery
    SimpleChat --> [*]
    DBQuery --> [*]
```

Each node in the diagram is a Pydantic Graph node. Let's look at SimpleChat first:
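```python
@dataclass
class SimpleChat(BaseNode[None, None, str]):
    text: str

    async def run(self, ctx: GraphRunContext) -> End[str]:
        # The system prompt is hard-coded here; load it from configuration if needed.
        agent = Agent(
            model=model,
            system_prompt='Be concise, reply with one sentence.',
        )
        response = await agent.run(self.text)
        # Returning End stops the graph run and carries the final answer.
        return End(response.output)
```

This node is simple: it just replies to the user directly.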
Next is DBQuery:
```python
@dataclass
class DBQuery(BaseNode[None, None, str]):
    text: str

    async def run(self, ctx: GraphRunContext) -> End[str]:
        # Start the toolbox MCP server as a subprocess and talk to it over stdio.
        server = MCPServerStdio(
            'toolbox',
            args=[
                "--tools-file",
                "D:\\learns\\ai\\toolkit-mcp\\mysql.yaml",
                "--stdio",
            ],
        )
        agent = Agent(
            model=model,
            system_prompt='Please answer the question based on the provided database information. If you need to use a tool, do so.',
            mcp_servers=[server],
        )
        # The MCP server is only running inside this context manager.
        async with agent.run_mcp_servers():
            response = await agent.run(self.text)
        return End(response.output)
```

This shows how Pydantic AI uses the tools exposed by an MCP server.
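The tools file path in DBQuery is tied to one machine. One possible way to make the node portable is to read the path from an environment variable and build the argument list from it. This is only a sketch: the variable name TOOLBOX_TOOLS_FILE and the helper toolbox_args are hypothetical names chosen for illustration, not part of the original code or of the toolbox CLI. The two flags are the ones already used above.

```python
import os


def toolbox_args(default_tools_file: str = "mysql.yaml") -> list[str]:
    """Build the command-line arguments for the toolbox MCP server.

    TOOLBOX_TOOLS_FILE is a hypothetical variable name used only in this sketch.
    """
    # Fall back to the default when the variable is not set.
    tools_file = os.getenv("TOOLBOX_TOOLS_FILE", default_tools_file)
    return ["--tools-file", tools_file, "--stdio"]
```

Inside the node, the server could then be created with MCPServerStdio('toolbox', args=toolbox_args()), leaving the rest of DBQuery unchanged.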
Then comes Intent:
```python
# IntentChoice is a Pydantic model, so it must not be wrapped in @dataclass.
class IntentChoice(BaseModel):
    intent: Literal["SimpleChat", "DBQuery"] = Field(
        description=(
            "Determine the user's intent.\n"
            "- Use 'DBQuery' for questions about data, people, databases, or specific entities that might be in a database.\n"
            "- Use 'SimpleChat' for general conversation, greetings, or questions that do not require looking up information."
        )
    )


@dataclass
class Intent(BaseNode):
    text: str

    async def run(self, ctx: GraphRunContext) -> SimpleChat | DBQuery:
        # output_type makes the agent return a validated IntentChoice instance.
        intent_agent = Agent(
            model=model,
            output_type=IntentChoice,
            system_prompt="You are an intent classifier. Analyze the user's text and decide whether it's a simple chat or a database query. The database contains MySQL Database Related Data.",
        )
        print(f"Classifying intent for: '{self.text}'")
        response = await intent_agent.run(self.text)
        # The returned node decides which branch of the graph runs next.
        if response.output.intent == "DBQuery":
            print("Intent classified as: DBQuery")
            return DBQuery(text=self.text)
        else:
            print("Intent classified as: SimpleChat")
            return SimpleChat(text=self.text)


ops_helper_graph = Graph(nodes=[Intent, SimpleChat, DBQuery])
```

The Intent node asks the model to determine the purpose of the user's input. Simple conversation is handed to SimpleChat, and anything that needs a database lookup is handed to DBQuery.
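Because the intent field is a Literal, only the two listed values pass validation, which is why the if/else in Intent does not need a third branch. The sketch below exercises that validation directly with Pydantic, without calling any model. The helper parse_intent and the JSON payloads are made up for illustration and stand in for whatever a model might return; the description text is omitted for brevity.

```python
from typing import Literal, Optional

from pydantic import BaseModel, ValidationError


class IntentChoice(BaseModel):
    intent: Literal["SimpleChat", "DBQuery"]


def parse_intent(raw_json: str) -> Optional[str]:
    """Return the validated intent, or None if the payload is rejected."""
    try:
        return IntentChoice.model_validate_json(raw_json).intent
    except ValidationError:
        # Raised for malformed JSON and for values outside the Literal.
        return None


print(parse_intent('{"intent": "DBQuery"}'))     # an allowed value
print(parse_intent('{"intent": "DropTables"}'))  # not one of the two choices
```

The first call returns the string DBQuery; the second returns None because the value is not one of the two allowed choices.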
The complete code is as follows:
```python
import asyncio
import os
from dataclasses import dataclass
from typing import Literal

import httpx
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_gla import GoogleGLAProvider
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

# Route model traffic through a proxy when HTTP_PROXY is set (None means no proxy).
proxy = os.getenv('HTTP_PROXY')
client = httpx.AsyncClient(proxy=proxy)

# No API key is passed here, so the provider has to find it in the environment.
model = GeminiModel(
    model_name='gemini-2.0-flash',
    provider=GoogleGLAProvider(http_client=client),
)


@dataclass
class SimpleChat(BaseNode[None, None, str]):
    text: str

    async def run(self, ctx: GraphRunContext) -> End[str]:
        # The system prompt is hard-coded here; load it from configuration if needed.
        agent = Agent(
            model=model,
            system_prompt='Be concise, reply with one sentence.',
        )
        response = await agent.run(self.text)
        return End(response.output)


@dataclass
class DBQuery(BaseNode[None, None, str]):
    text: str

    async def run(self, ctx: GraphRunContext) -> End[str]:
        # Start the toolbox MCP server as a subprocess and talk to it over stdio.
        server = MCPServerStdio(
            'toolbox',
            args=[
                "--tools-file",
                "D:\\learns\\ai\\toolkit-mcp\\mysql.yaml",
                "--stdio",
            ],
        )
        agent = Agent(
            model=model,
            system_prompt='Please answer the question based on the provided database information. If you need to use a tool, do so.',
            mcp_servers=[server],
        )
        # The MCP server is only running inside this context manager.
        async with agent.run_mcp_servers():
            response = await agent.run(self.text)
        return End(response.output)


# IntentChoice is a Pydantic model, so it must not be wrapped in @dataclass.
class IntentChoice(BaseModel):
    intent: Literal["SimpleChat", "DBQuery"] = Field(
        description=(
            "Determine the user's intent.\n"
            "- Use 'DBQuery' for questions about data, people, databases, or specific entities that might be in a database.\n"
            "- Use 'SimpleChat' for general conversation, greetings, or questions that do not require looking up information."
        )
    )


@dataclass
class Intent(BaseNode):
    text: str

    async def run(self, ctx: GraphRunContext) -> SimpleChat | DBQuery:
        # output_type makes the agent return a validated IntentChoice instance.
        intent_agent = Agent(
            model=model,
            output_type=IntentChoice,
            system_prompt="You are an intent classifier. Analyze the user's text and decide whether it's a simple chat or a database query. The database contains MySQL Database Related Data.",
        )
        print(f"Classifying intent for: '{self.text}'")
        response = await intent_agent.run(self.text)
        # The returned node decides which branch of the graph runs next.
        if response.output.intent == "DBQuery":
            print("Intent classified as: DBQuery")
            return DBQuery(text=self.text)
        else:
            print("Intent classified as: SimpleChat")
            return SimpleChat(text=self.text)


ops_helper_graph = Graph(nodes=[Intent, SimpleChat, DBQuery])


async def main():
    print("--- Running Test 1: Simple Chat ---")
    chat_question = "What is the capital of France?"
    result_chat = await ops_helper_graph.run(Intent(text=chat_question))
    print(f"Final Answer: {result_chat.output}")
    print("-" * 30)

    print("--- Running Test 2: DB Chat ---")
    chat_question = "列出所有的数据库"  # "List all databases"
    result_chat = await ops_helper_graph.run(Intent(text=chat_question))
    print(f"Final Answer: {result_chat.output}")
    print("-" * 30)


if __name__ == '__main__':
    asyncio.run(main())
```

The output looks similar to this:
```text
--- Running Test 1: Simple Chat ---
Classifying intent for: 'What is the capital of France?'
Intent classified as: SimpleChat
Final Answer: The capital of France is Paris.
------------------------------
--- Running Test 2: DB Chat ---
Classifying intent for: '列出所有的数据库'
Intent classified as: DBQuery
Final Answer: 这里是所有数据库的列表:mydb、mysql、nacos。
------------------------------
```

(The second answer reads: "Here is the list of all databases: mydb, mysql, nacos.")

With Pydantic Graph we can design more complex flows and implement more complex business logic, with every step under our own control.
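To make "controlling the flow yourself" concrete, here is a library-free sketch of the pattern the graph relies on: each node's run method returns either the next node or an end marker, and a small loop keeps going until the end marker appears. This is not how pydantic_graph is implemented. End, Route, Chat, Lookup, and run_graph are stand-ins defined only for this illustration, and a keyword check replaces the LLM-based classifier.

```python
import asyncio
from dataclasses import dataclass
from typing import Union


@dataclass
class End:
    """Marks the end of a run and carries the final value."""
    output: str


@dataclass
class Chat:
    text: str

    async def run(self) -> End:
        return End(f"chat: {self.text}")


@dataclass
class Lookup:
    text: str

    async def run(self) -> End:
        return End(f"lookup: {self.text}")


@dataclass
class Route:
    text: str

    async def run(self) -> Union[Chat, Lookup]:
        # A keyword check stands in for the LLM intent classifier.
        if "database" in self.text.lower():
            return Lookup(self.text)
        return Chat(self.text)


async def run_graph(start) -> str:
    """Run nodes one after another until one of them returns End."""
    node = start
    while not isinstance(node, End):
        node = await node.run()
    return node.output


print(asyncio.run(run_graph(Route("hello"))))                # chat: hello
print(asyncio.run(run_graph(Route("list every database"))))  # lookup: list every database
```

Adding a branch means adding one node class and one more return path in Route, which is the same shape of change the real graph would need.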