Rock Sun
Pydantic AI

使用 Graph 运行 Agent

谨慎使用

许多 Agent 框架的快速入门非常的酷炫。只要写几个 Agent 的 Role,写几个 Task,然后 Agent 就神奇的跑起来了。与之相比,PydanticAi 好像没这么强大,必须自己去控制整个流程,这就是 Graph。

前一阵子使用 Roo Code 尝试了下数据库的 MCP,效果还可以,但是 Roo Code 是个通用的编程工具,调用 MCP 的过程好像有一点绕,我想是不是可以通过 Pydantic 编写我的运维 Agent 工具。

这个工具能识别我的意图,然后调用合适的 MCP Tool,然后按照我的要求输出结果。其实这个需求,用一个带 MCP Tool 的 Agent 就可以实现。但是,如果是在更大的上下文中,用一个 Agent 可能过于勉强,所以这次我们略微过度工程一下。

下面是根据 PydanticAI 模型画的图,其实是根据 PydanticAI 代码生成的:

stateDiagram-v2
  [*] --> Intent
  Intent --> SimpleChat
  Intent --> DBQuery
  SimpleChat --> [*]
  DBQuery --> [*]

其中的每个节点,都是 Pydantic 的 Node,先看看 SimpleChat:

@dataclass
class SimpleChat(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> End[str]:
        agent = Agent(
            model=model,
            system_prompt='Be concise, reply with one sentence.' # Read system prompt from .env or use default
        )
        
        response = await agent.run(self.text)
        return End(response.output)

这个 Node 很简单,就是简单的回复。

然后是 DBQuery

@dataclass
class DBQuery(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> End[str]:
        
        server = MCPServerStdio(  
            'toolbox',
            args=[
                "--tools-file",
                "D:\\learns\\ai\\toolkit-mcp\\mysql.yaml",
                "--stdio"
            ]
        )
        agent = Agent(
            model=model,
            system_prompt='Please answer the question based on the provided database information. If you need to use a tool, do so.', 
            mcp_servers=[server]
        )

        async with agent.run_mcp_servers():        
            response = await agent.run(self.text)
            return End(response.output)

这里展示了 PydanticAI 如何使用 MCP Server 的 Tool。

下面是 Intent

@dataclass
class IntentChoice(BaseModel):
    intent: Literal["SimpleChat", "DBQuery"] = Field(
        description="""Determine the user's intent.
- Use 'DBQuery' for questions about data, people, databases, or specific entities that might be in a database.
- Use 'SimpleChat' for general conversation, greetings, or questions that do not require looking up information."""
    )

@dataclass
class Intent(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> SimpleChat | DBQuery:
        intent_agent = Agent(
            model=model,
            output_type=IntentChoice,
            system_prompt="You are an intent classifier. Analyze the user's text and decide whether it's a simple chat or a database query. The database contains MySQL Database Related Data."
        )
        
        print(f"Classifying intent for: '{self.text}'")
        response = await intent_agent.run(self.text)

        if response.output.intent == "DBQuery":
            print("Intent classified as: DBQuery")
            return DBQuery(text=self.text)
        else:
            print("Intent classified as: SimpleChat")
            return SimpleChat(text=self.text)

ops_helper_graph = Graph(nodes=[Intent, SimpleChat, DBQuery])

Intent Node 会咨询 AI 确认用户输入的目的,如果是简单的查询,则交给 SimpleChat,如果是需要数据库搜索则交给 DBQuery

完整的代码如下:

from pydantic_ai import Agent
import os
import httpx
from pydantic_ai.models.gemini import GeminiModel  # Update import to use GeminiModel
from pydantic_ai.providers.google_gla import GoogleGLAProvider
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from pydantic_ai.mcp import MCPServerStdio
from pydantic import BaseModel, Field
from typing import Literal

proxy = os.getenv('HTTP_PROXY')

client = httpx.AsyncClient(proxy=proxy)  # Create an HTTP client with proxy
model = GeminiModel(  # Use GeminiModel instead of OpenAIModel
    model_name='gemini-2.0-flash',
    provider=GoogleGLAProvider(
        http_client=client  # Pass the HTTP client with proxy to the provider
    )
)

@dataclass
class SimpleChat(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> End[str]:
        agent = Agent(
            model=model,
            system_prompt='Be concise, reply with one sentence.' # Read system prompt from .env or use default
        )
        
        response = await agent.run(self.text)
        return End(response.output)

@dataclass
class DBQuery(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> End[str]:
        
        server = MCPServerStdio(  
            'toolbox',
            args=[
                "--tools-file",
                "D:\\learns\\ai\\toolkit-mcp\\mysql.yaml",
                "--stdio"
            ]
        )
        agent = Agent(
            model=model,
            system_prompt='Please answer the question based on the provided database information. If you need to use a tool, do so.', 
            mcp_servers=[server]
        )

        async with agent.run_mcp_servers():        
            response = await agent.run(self.text)
            return End(response.output)


@dataclass
class IntentChoice(BaseModel):
    intent: Literal["SimpleChat", "DBQuery"] = Field(
        description="""Determine the user's intent.
- Use 'DBQuery' for questions about data, people, databases, or specific entities that might be in a database.
- Use 'SimpleChat' for general conversation, greetings, or questions that do not require looking up information."""
    )

@dataclass
class Intent(BaseNode):
    text: str

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> SimpleChat | DBQuery:
        intent_agent = Agent(
            model=model,
            output_type=IntentChoice,
            system_prompt="You are an intent classifier. Analyze the user's text and decide whether it's a simple chat or a database query. The database contains MySQL Database Related Data."
        )
        
        print(f"Classifying intent for: '{self.text}'")
        response = await intent_agent.run(self.text)

        if response.output.intent == "DBQuery":
            print("Intent classified as: DBQuery")
            return DBQuery(text=self.text)
        else:
            print("Intent classified as: SimpleChat")
            return SimpleChat(text=self.text)

ops_helper_graph = Graph(nodes=[Intent, SimpleChat, DBQuery])

async def main(): 
    print(f"--- Running Test 1: Simple Chat ---")
    chat_question = "What is the capital of France?"
    result_chat = await ops_helper_graph.run(Intent(text=chat_question))
    print(f"Final Answer: {result_chat.output}")
    print("-" * 30)

    print(f"--- Running Test 2: DB Chat ---")
    chat_question = "列出所有的数据库"
    result_chat = await ops_helper_graph.run(Intent(text=chat_question))
    print(f"Final Answer: {result_chat.output}")
    print("-" * 30)

if __name__ == '__main__':
    import asyncio
    asyncio.run(main())

输出类似:

--- Running Test 1: Simple Chat ---
Classifying intent for: 'What is the capital of France?'
Intent classified as: SimpleChat
Final Answer: The capital of France is Paris.

------------------------------
--- Running Test 2: DB Chat ---
Classifying intent for: '列出所有的数据库'
Intent classified as: DBQuery
Final Answer: 这里是所有数据库的列表:mydb、mysql、nacos。
------------------------------

通过 Pydantic 的 Graph,我们可以设计更复杂的流程,实现更复杂的业务,一切都可以自己把控。