LangChain Runnable核心类的讲解与使用
2010 年代末以来,人工智能(AI)技术突飞猛进,在 2023-2025 年间更是出现了里程碑式的发展。尤其是通用人工智能(AGI)和AI 自动化方面,全球顶尖研究机构和科技公司投入巨资与精力,推动了一系列前沿突破。本报告将梳理这些年份中 AGI 的最新进展、AI 在自动化领域的演进,以及面临的关键挑战和瓶颈。报告内容基于 OpenAI、DeepMind、Google AI、Anthropic、
01. RunnableParallel 并行运行
RunnableParallel 是 LangChain 中封装的支持运行多个 Runnable 的类,一般用于操作 Runnable 的输出,以匹配序列中下一个 Runnable 的输入,起到并行运行 Runnable 并格式化输出结构的作用。
例如 RunnableParallel 可以让我们同时执行多条 Chain,然后以字典的形式返回各个 Chain 的结果,对比每一条链单独执行,效率会高很多
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排2个提示模板
joke_prompt = ChatPromptTemplate.from_template("请讲一个关于{subject}的冷笑话,尽可能短")
poem_prompt = ChatPromptTemplate.from_template("请写一篇关于{subject}的诗,尽可能短")
# 2.创建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.构建两条链
joke_chain = joke_prompt | llm | parser
poem_chain = poem_prompt | llm | parser
# 5.使用RunnableParallel创建并行可运行
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
# 6.运行并行可运行组件得到响应结果
resp = map_chain.invoke({"subject": "程序员"})
print(resp)
输出内容
{'joke': '为什么程序员总是用尺子测电脑屏幕?因为他们听说了“像素”是屏幕上的一种“尺寸”。', 'poem': '在代码的海洋里徜徉,\n程序员心怀梦想与创意。\n键盘敲击是旋律,\nbug 是诗歌的瑕疵。\n\n算法如诗的韵律,\n逻辑是句子的构思。\n编程者如诗人般,\n创造出数字的奇迹。'}
除了并行执行,RunnableParallel 还可以用于操作 Runnable 的输出,用于产生符合下一个 Runnable 组件的数据。
例如:用户传递数据,并行执行检索策略得到上下文随后传递给 Prompt 组件,如下
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str:
"""模拟一个检索器,传入查询query,输出文本"""
print("执行检索:", query)
return "我是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下文进行回复。
<context>
{context}
<context>
用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链
chain = RunnableParallel(
context=retrieval,
query=RunnablePassthrough(),
) | prompt | llm | parser
# 5.调用链生成结果
content = chain.invoke("你好,我叫什么?")
print(content)
输出内容:
执行检索: 你好,我是什么?
你好,你是AI应用开发工程师。
在创建 RunnableParallel 的时候,支持传递字典、函数、映射、键值对数据等多种方式,RunnableParallel 底层会执行检测并将数据统一转换为 Runnable,核心源码如下
# langchain-core/runnables/base.py
def __init__(
self,
steps__: Optional[
Mapping[
str,
Union[
Runnable[Input, Any],
Callable[[Input], Any],
Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
]
] = None,
**kwargs: Union[
Runnable[Input, Any],
Callable[[Input], Any],
Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
) -> None:
# 1.检测是否传递字典,如果传递,则提取字段内的所有键值对
merged = {**steps__} if steps__ is not None else {}
# 2.传递了键值对,则将键值对更新到merged进行合并
merged.update(kwargs)
super().__init__( # type: ignore[call-arg]
# 3.循环遍历merged的所有键值对,并将每一个元素转换成Runnable
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)
除此之外,在 Chains 中使用时,可以简写成字典的方式,__or__ 和 __ror__ 会自动将字典转换成 RunnableParallel,核心源码
# langchain_core/runnables/base.py
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
"""Coerce a runnable-like object into a Runnable.
Args:
thing: A runnable-like object.
Returns:
A Runnable.
"""
if isinstance(thing, Runnable):
return thing
elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing)
elif callable(thing):
return RunnableLambda(cast(Callable[[Input], Output], thing))
elif isinstance(thing, dict):
# 如果类型为字典,使用字典创建RunnableParallel并转换成Runnable格式
return cast(Runnable[Input, Output], RunnableParallel(thing))
else:
raise TypeError(
f"Expected a Runnable, callable or dict."
f"Instead got an unsupported type: {type(thing)}"
)
02. RunnablePassthrough 传递数据
除了 RunnableParallel,在 LangChain 中,另外一个高频使用的 Runnable 类是 RunnablePassthrough,这个类透传上游参数输入,简单来说,就是可以获取上游的数据,并保持不变或者新增额外的键。
通常与 RunnableParallel 一起使用,将数据分配给映射中的新键。
例如:使用 RunnablePassthrough 来简化 invoke 的调用流程
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排prompt
prompt = ChatPromptTemplate.from_template("{query}")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建链
chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 4.调用链并获取结果
content = chain.invoke("你好,你是")
print(content)
输出内容:
你好!是的,我是ChatGPT,你需要什么帮助吗?
RunnablePassthrough() 获取的是整个输入的内容(字符串或者字典),如果想获取字典内的某个部分,可以使用 itemgetter 函数,并传入对应的字段名即可,如下
from operator import itemgetter
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是?"})
除此之外,如果想在传递的数据中添加数据,还可以使用 RunnablePassthrough.assign() 方法来实现快速添加
例如
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str:
"""模拟一个检索器,传入查询query,输出文本"""
print("执行检索:", query)
return "我是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下文进行回复。
<context>
{context}
<context>
用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链,RunnablePassthrough.assign写法
chain = (
RunnablePassthrough.assign(context=lambda x: retrieval(x["query"])) |
prompt |
llm |
parser
)
# 5.调用链生成结果
content = chain.invoke({"query": "你好,我叫什么?"})
print(content)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)