2026/5/21 17:16:37
网站建设
项目流程
做系统和做网站哪个简单一些,网站关键词多少个,上海网站建设赢昶,福州在线项目建设管理系统LangChain 与 LangGraph 的组件化能力#xff0c;降低了 AI Agent 的原型构建门槛。但在项目迈向生产环境时#xff0c;AI Agent 工程化落地还存在一些问题。今天想在这里分享一下#xff0c;作者本人在实际生产环境中 AI Agent 开发过程中所遇到的问题和几点经验。一切都以…LangChain 与 LangGraph 的组件化能力降低了 AI Agent 的原型构建门槛。但在项目迈向生产环境时AI Agent 工程化落地还存在一些问题。今天想在这里分享一下作者本人在实际生产环境中 AI Agent 开发过程中所遇到的问题和几点经验。一切都以实际需求为准那么就以实际需求为切入点徐徐展开。1. 多租户 API 配置的动态切换需求场景在 SaaS 场景下把 Agent 封装成 API 服务对外提供时会出现不同用户需要使用不同 LLM 配置的情况费用隔离- 企业客户自带 API Key按自己的额度计费数据安全- 部分用户要求接入私有化部署的模型A/B 测试- 同一服务对不同请求使用不同模型对比效果除这三个常见的以外可能还有其他的使用场景。总之每个请求需要能够动态指定model、base_url和api_key。LangChain 原生实现的问题先看看 LangChain 原生是怎么做的from langchain_openai import ChatOpenAI # 初始化时就要绑定配置 llm ChatOpenAI( modelgpt-4, api_keysk-xxx, base_urlhttps://api.openai.com/v1 )这样写在单用户场景没问题但放到多租户 Web 服务里就有几个问题配置在初始化时绑定实例创建后无法切换ChatOpenAI的model、openai_api_base、openai_api_key都是构造参数一旦实例创建完成配置就固定了。如果要切换到另一个服务商或 API Key就必须创建新实例频繁创建实例每来一个用户请求就得new一个新的ChatOpenAI实例HTTP 连接无法复用每个实例会独立创建底层 HTTP 连接高并发时连接数不可控可能导致资源耗尽解决方案核心思路是Agent 复用Client 按需创建HTTP 连接池共享import httpx from openai import AsyncOpenAI # 1. 全局 HTTP 连接池单例 class GlobalHTTPFactory: _client None classmethod async def get_client(cls): if cls._client is None: cls._client httpx.AsyncClient( limitshttpx.Limits(max_connections50), http2True ) return cls._client # 2. Agent 基类 class BaseAgent: def __init__(self, default_configNone): self.default_config default_config or {} async def get_openai_client(self, runtime_configNone): # 合并配置运行时配置覆盖默认配置 config {**self.default_config, **runtime_config} if runtime_config else self.default_config # 复用全局 HTTP 连接池 shared_http await GlobalHTTPFactory.get_client() return AsyncOpenAI( api_keyconfig[api_key], base_urlconfig[base_url], http_clientshared_http # 关键注入共享的 HTTP 客户端 ) async def run(self, text, **runtime_config): client await self.get_openai_client(runtime_config) # 调用 LLM... # 3. Web 服务层 # 服务启动时创建一次 Agent agent BaseAgent() app.post(/chat) async def chat(request): # 每个请求传入自己的配置 return await agent.run( textrequest.text, modelrequest.model, base_urlrequest.base_url, api_keyrequest.api_key )整个设计的要点Agent 只创建一次服务启动时初始化全局复用OpenAI 客户端按需创建每个请求根据传入的配置生成HTTP 连接池全局共享通过http_client参数注入避免连接数爆炸极大降低运行所需内存为什么不用全局连接池内存会一直累加每次new AsyncOpenAI()都会创建一个内部的 HTTP 连接池。如果请求完成后没有显式调用close()底层连接不会立即释放。加上 Python GC 的延迟、HTTP Keep-alive 保持连接、异步对象可能存在的循环引用等因素这些连接池会不断累积最终导致内存持续增长甚至 OOM。使用全局连接池后只维护一个实例连接数有上限问题解决。2. 模型思考过程的流式透传需求场景DeepSeek-R1、GLM-4 等推理模型在生成最终答案之前会先进行一段思考。这个思考过程对用户来说是有价值的——它能让用户看到模型是如何分析问题的增强可信度。把 Agent 封装成 API 服务时需要把这个思考过程实时流式地透传给前端实时展示用户能看到模型正在思考而不是干等区分内容前端需要区分思考内容和最终回答分别展示多模型兼容不同模型厂商的思考参数格式不同DeepSeek、GLM、Qwen 各有各的写法问题在哪OpenAI SDK 返回的流式 chunk 结构是这样的# 普通内容在 delta.content # 思考内容在 delta.reasoning_content这是 DeepSeek 扩展的字段 chunk.choices[0].delta.content # 最终回答 chunk.choices[0].delta.reasoning_content # 思考过程问题是reasoning_content不是标准字段LangChain 原生不认识这个字段会直接丢弃不同模型参数不同DeepSeek 用reasoning_contentGLM 用thinking.typeQwen 用enable_thinking需要手动解析 chunk拿到 chunk 后要自己判断是思考内容还是正式内容解决方案核心问题是LangChain 的AIMessageChunk只保留content会把reasoning_content丢掉。解决思路是自定义消息类型 自定义 ChatModel把 OpenAI 原始响应完整保留下来from langchain_core.messages import BaseMessageChunk from langchain_core.language_models.chat_models import BaseChatModel # 1. 自定义消息类型保留完整的 OpenAI 原始响应 class ChatMessageChunk(BaseMessageChunk): 关键把 OpenAI 的原始 chunk 完整保留下来 chat_completion_chunk: Optional[ChatCompletionChunk] None # 2. 自定义 ChatModel流式时用自定义消息类型包装 class LLMClientChatModel(BaseChatModel): async def _astream(self, messages, **kwargs): async for chunk in self.client.astream(messages): # 用自定义消息包装不让 LangChain 丢掉 reasoning_content message ChatMessageChunk(content, chat_completion_chunkchunk) yield ChatGenerationChunk(messagemessage) # 3. Agent 中解析 chunk区分思考/内容 class MyAgent: async def run_stream(self, text, **kwargs): async for event in self.graph.astream_events(inputs, config): if event[event] on_chat_model_stream: chunk event[data][chunk] # 因为保留了完整 chunk这里能拿到 reasoning_content delta chunk.message.chat_completion_chunk.choices[0].delta if delta.reasoning_content: yield StreamChunk(typethinking, contentdelta.reasoning_content) elif delta.content: yield StreamChunk(typecontent, contentdelta.content) # 4. 不同模型的思考参数统一管理 class ThinkingConfig: def __init__(self): self.model_params { glm: {thinking: {type: enabled}}, deepseek: {}, # DeepSeek 默认支持 qwen: {enable_thinking: True} } def get_params(self, model_name, enable): model_type self._detect_model_type(model_name) return self.model_params.get(model_type, {}) if enable else {}整个设计的要点自定义消息类型ChatMessageChunk保留完整的ChatCompletionChunk不让 LangChain 丢弃扩展字段自定义 ChatModelLLMClientChatModel在流式输出时用自定义消息包装Agent 层解析从保留的完整 chunk 中取出reasoning_content区分类型输出统一配置管理ThinkingConfig封装不同模型的参数差异3. 中间执行路径的可观测性需求场景以一个智能研报生成 Agent为例它的执行流程比较复杂输入解析 → 多源数据采集并行→ 数据交叉验证 → 深度分析 → 观点提炼 → 报告生成 → 合规审查 → 输出 ↓ ↓ [财报API] [验证失败则重试] [新闻API] [行情API]用户调用 API 生成一份研报时可能需要等待 30 秒以上。如果这期间前端只显示一个转圈动画用户体验会很差。我们需要把执行进度实时透传出来实时反馈用户能看到「正在采集财报数据 (2/3)」「正在进行深度分析」而不是干等并行任务状态多个数据源并行采集时分别展示各自的进度条件分支可见数据验证失败触发重试时用户能知道发生了什么调试定位出问题时能快速定位是哪个节点、哪个数据源出错问题在哪LangGraph 的ainvoke()方法只返回最终结果中间过程完全是黑盒# 只能拿到最终结果中间 30 秒发生了什么完全不知道 result await graph.ainvoke(inputs)虽然 LangGraph 提供了astream_events()方法但它的事件类型很多需要自己过滤和解析# astream_events 会抛出各种事件on_chain_start, on_chain_end, on_chat_model_stream... # 需要自己判断哪些是节点事件哪些是 LLM 事件哪些是子图事件 async for event in graph.astream_events(inputs): # 怎么过滤怎么区分主图和子图怎么处理并行节点 pass解决方案核心是监听on_chain_start/on_chain_end事件结合节点元数据输出结构化的进度信息from pydantic import BaseModel, Field from typing import Optional, Literal from enum import Enum # 1. 统一的流式输出格式 class ChunkType(str, Enum): PROCESSING processing # 执行进度 THINKING thinking # 思考过程 CONTENT content # 正式内容 FINAL final # 最终结果 ERROR error # 错误信息 class StreamChunk(BaseModel): 流式输出块前端按 type 分别处理 type: ChunkType content: str metadata: Optional[dict] None # 2. 节点进度配置可配置化 NODE_PROGRESS_MAP { # 节点名 - (进度描述, 预估耗时秒, 所属阶段) parse_input: (解析用户输入, 1, 准备阶段), fetch_financial_data: (采集财报数据, 5, 数据采集), fetch_news_data: (采集新闻资讯, 3, 数据采集), fetch_market_data: (采集行情数据, 2, 数据采集), validate_data: (交叉验证数据, 3, 数据处理), deep_analysis: (深度分析, 10, 智能分析), extract_insights: (提炼核心观点, 5, 智能分析), generate_report: (生成研报内容, 8, 报告生成), compliance_check: (合规性审查, 3, 质量保障), } # 3. Agent 中监听节点事件 class ResearchReportAgent: def __init__(self): self.parallel_tasks {} # 跟踪并行任务状态 self.retry_count {} # 跟踪重试次数 async def run_stream(self, query: str, **kwargs): async for event in self.graph.astream_events(inputs, config): event_type event.get(event, ) node_name event.get(name, ) # 节点开始事件 if event_type on_chain_start: if node_name in NODE_PROGRESS_MAP: desc, est_time, stage NODE_PROGRESS_MAP[node_name] # 处理重试场景 retry self.retry_count.get(node_name, 0) retry_hint f第 {retry 1} 次尝试 if retry 0 else yield StreamChunk( typeChunkType.PROCESSING, contentf[{stage}] {desc}{retry_hint}..., metadata{ node: node_name, stage: stage, estimated_seconds: est_time, retry_count: retry } ) # 处理并行数据采集节点 elif node_name parallel_data_fetch: self.parallel_tasks {financial: pending, news: pending, market: pending} yield StreamChunk( typeChunkType.PROCESSING, content[数据采集] 正在并行采集多源数据..., metadata{parallel_status: self.parallel_tasks} ) # 节点结束事件 elif event_type on_chain_end: # 更新并行任务状态 if node_name fetch_financial_data: self.parallel_tasks[financial] completed yield StreamChunk( typeChunkType.PROCESSING, content[数据采集] 财报数据采集完成 ✓, metadata{parallel_status: self.parallel_tasks.copy()} ) # 处理验证失败触发重试的场景 elif node_name validate_data: output event.get(data, {}).get(output, {}) if not output.get(is_valid, True): failed_sources output.get(failed_sources, []) for src in failed_sources: self.retry_count[ffetch_{src}_data] self.retry_count.get(ffetch_{src}_data, 0) 1 yield StreamChunk( typeChunkType.PROCESSING, contentf[数据处理] 验证未通过{failed_sources} 将重新采集..., metadata{retry_sources: failed_sources} ) # 图执行完毕输出最终结果 elif node_name LangGraph: final_output event[data][output] yield StreamChunk( typeChunkType.FINAL, content, metadata{report: final_output} ) # LLM 流式输出事件 elif event_type on_chat_model_stream: chunk event[data][chunk] delta chunk.message.chat_completion_chunk.choices[0].delta if delta.reasoning_content: yield StreamChunk(typeChunkType.THINKING, contentdelta.reasoning_content) elif delta.content: yield StreamChunk(typeChunkType.CONTENT, contentdelta.content)前端拿到的流式输出会是这样{type: processing, content: [准备阶段] 解析用户输入..., metadata: {node: parse_input, stage: 准备阶段, estimated_seconds: 1}} {type: processing, content: [数据采集] 正在并行采集多源数据..., metadata: {parallel_status: {financial: pending, news: pending, market: pending}}} {type: processing, content: [数据采集] 采集新闻资讯..., metadata: {node: fetch_news_data, stage: 数据采集}} {type: processing, content: [数据采集] 采集财报数据..., metadata: {node: fetch_financial_data, stage: 数据采集}} {type: processing, content: [数据采集] 新闻数据采集完成 ✓, metadata: {parallel_status: {financial: pending, news: completed, market: pending}}} {type: processing, content: [数据采集] 财报数据采集完成 ✓, metadata: {parallel_status: {financial: completed, news: completed, market: pending}}} {type: processing, content: [数据采集] 行情数据采集完成 ✓, metadata: {parallel_status: {financial: completed, news: completed, market: completed}}} {type: processing, content: [数据处理] 交叉验证数据..., metadata: {node: validate_data}} {type: processing, content: [数据处理] 验证未通过[market] 将重新采集..., metadata: {retry_sources: [market]}} {type: processing, content: [数据采集] 采集行情数据第 2 次尝试..., metadata: {node: fetch_market_data, retry_count: 1}} {type: processing, content: [智能分析] 深度分析..., metadata: {node: deep_analysis, estimated_seconds: 10}} {type: thinking, content: 我需要从多个维度分析这家公司...} {type: thinking, content: 首先看财务数据营收同比增长...} {type: content, content: ## 一、公司概况\n\n} {type: content, content: 该公司是国内领先的...} {type: processing, content: [智能分析] 提炼核心观点..., metadata: {node: extract_insights}} {type: processing, content: [报告生成] 生成研报内容..., metadata: {node: generate_report}} {type: processing, content: [质量保障] 合规性审查..., metadata: {node: compliance_check}} {type: final, content: , metadata: {report: {title: XX公司深度研究报告, sections: [...]}}}整个设计的要点统一输出格式StreamChunk定义type字段前端按类型分别处理节点配置可扩展NODE_PROGRESS_MAP集中管理节点描述新增节点只需加一行配置并行任务可追踪通过parallel_status字段前端可以渲染多任务进度条重试过程透明验证失败、重新采集等异常流程对用户可见增强可信度预估时间可用estimated_seconds可用于前端渲染预估进度条总结本文分享了在开发open-pilot-agent过程中遇到的三个生产级挑战以及对应的解决方案多租户 API 配置的动态切换通过 Agent 复用 Client 按需创建 HTTP 连接池共享解决配置动态切换和资源管理问题模型思考过程的流式透传通过自定义消息类型和 ChatModel保留 OpenAI 原始响应中的reasoning_content字段中间执行路径的可观测性通过监听 LangGraph 的astream_events将节点执行进度实时透传给前端学习资源推荐如果你想更深入地学习大模型以下是一些非常有价值的学习资源这些资源将帮助你从不同角度学习大模型提升你的实践能力。一、全套AGI大模型学习路线AI大模型时代的学习之旅从基础到前沿掌握人工智能的核心技能因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取二、640套AI大模型报告合集这套包含640份报告的合集涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师还是对AI大模型感兴趣的爱好者这套报告合集都将为您提供宝贵的信息和启示因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取三、AI大模型经典PDF籍随着人工智能技术的飞速发展AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型如GPT-3、BERT、XLNet等以其强大的语言理解和生成能力正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取四、AI大模型商业化落地方案作为普通人入局大模型时代需要持续学习和实践不断提高自己的技能和认知水平同时也需要有责任感和伦理意识为人工智能的健康发展贡献力量。