01. RunnableParallel 并行运行

RunnableParallel 是 LangChain 中封装的支持运行多个 Runnable 的类,一般用于操作 Runnable 的输出,以匹配序列中下一个 Runnable 的输入,起到并行运行 Runnable 并格式化输出结构的作用

例如 RunnableParallel 可以让我们同时执行多条 Chain,然后以字典的形式返回各个 Chain 的结果,对比每一条链单独执行,效率会高很多

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnableParallel

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

# 1.编排2个提示模板

joke_prompt = ChatPromptTemplate.from_template("请讲一个关于{subject}的冷笑话,尽可能短")

poem_prompt = ChatPromptTemplate.from_template("请写一篇关于{subject}的诗,尽可能短")

# 2.创建大语言模型

llm = ChatOpenAI(model="gpt-3.5-turbo-16k")

# 3.创建输出解析器

parser = StrOutputParser()

# 4.构建两条链

joke_chain = joke_prompt | llm | parser

poem_chain = poem_prompt | llm | parser

# 5.使用RunnableParallel创建并行可运行

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

# 6.运行并行可运行组件得到响应结果

resp = map_chain.invoke({"subject": "程序员"})

print(resp)

输出内容

{'joke': '为什么程序员总是用尺子测电脑屏幕?因为他们听说了“像素”是屏幕上的一种“尺寸”。', 'poem': '在代码的海洋里徜徉,\n程序员心怀梦想与创意。\n键盘敲击是旋律,\nbug 是诗歌的瑕疵。\n\n算法如诗的韵律,\n逻辑是句子的构思。\n编程者如诗人般,\n创造出数字的奇迹。'}

除了并行执行,RunnableParallel 还可以用于操作 Runnable 的输出,用于产生符合下一个 Runnable 组件的数据。

例如:用户传递数据,并行执行检索策略得到上下文随后传递给 Prompt 组件,如下

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

def retrieval(query: str) -> str:

    """模拟一个检索器,传入查询query,输出文本"""

    print("执行检索:", query)

    return "我是一名AI应用开发工程师。"

# 1.编排Prompt

prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下文进行回复。

<context>

{context}

<context>

用户的问题是: {query}""")

# 2.构建大语言模型

llm = ChatOpenAI(model="gpt-3.5-turbo")

# 3.创建输出解析器

parser = StrOutputParser()

# 4.编排链

chain = RunnableParallel(

    context=retrieval,

    query=RunnablePassthrough(),

) | prompt | llm | parser

# 5.调用链生成结果

content = chain.invoke("你好,我叫什么?")

print(content)

输出内容:

执行检索: 你好,我是什么?

你好,你是AI应用开发工程师。

创建 RunnableParallel 的时候,支持传递字典、函数、映射、键值对数据等多种方式,RunnableParallel 底层会执行检测并将数据统一转换为 Runnable,核心源码如下

# langchain-core/runnables/base.py

def __init__(

    self,

    steps__: Optional[

        Mapping[

            str,

            Union[

                Runnable[Input, Any],

                Callable[[Input], Any],

                Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],

            ],

        ]

    ] = None,

    **kwargs: Union[

        Runnable[Input, Any],

        Callable[[Input], Any],

        Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],

    ],

) -> None:

    # 1.检测是否传递字典,如果传递,则提取字段内的所有键值对

    merged = {**steps__} if steps__ is not None else {}

    # 2.传递了键值对,则将键值对更新到merged进行合并

    merged.update(kwargs)

    super().__init__(  # type: ignore[call-arg]

        # 3.循环遍历merged的所有键值对,并将每一个元素转换成Runnable

        steps__={key: coerce_to_runnable(r) for key, r in merged.items()}

)

除此之外,在 Chains 中使用时,可以简写成字典的方式,__or__ 和 __ror__ 会自动将字典转换成 RunnableParallel,核心源码

# langchain_core/runnables/base.py

def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:

    """Coerce a runnable-like object into a Runnable.

    Args:

        thing: A runnable-like object.

    Returns:

        A Runnable.

    """

    if isinstance(thing, Runnable):

        return thing

    elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):

        return RunnableGenerator(thing)

    elif callable(thing):

        return RunnableLambda(cast(Callable[[Input], Output], thing))

    elif isinstance(thing, dict):

        # 如果类型为字典,使用字典创建RunnableParallel并转换成Runnable格式

        return cast(Runnable[Input, Output], RunnableParallel(thing))

    else:

        raise TypeError(

            f"Expected a Runnable, callable or dict."

            f"Instead got an unsupported type: {type(thing)}"

        )

02. RunnablePassthrough 传递数据

除了 RunnableParallel,在 LangChain 中,另外一个高频使用的 Runnable 类是 RunnablePassthrough,这个类透传上游参数输入,简单来说,就是可以获取上游的数据,并保持不变或者新增额外的键。

通常与 RunnableParallel 一起使用,将数据分配给映射中的新键。

例如:使用 RunnablePassthrough 来简化 invoke 的调用流程

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnablePassthrough

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

# 1.编排prompt

prompt = ChatPromptTemplate.from_template("{query}")

# 2.构建大语言模型

llm = ChatOpenAI(model="gpt-3.5-turbo-16k")

# 3.创建链

chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()

# 4.调用链并获取结果

content = chain.invoke("你好,你是")

print(content)

输出内容:

你好!是的,我是ChatGPT,你需要什么帮助吗?

RunnablePassthrough() 获取的是整个输入的内容(字符串或者字典),如果想获取字典内的某个部分,可以使用 itemgetter 函数,并传入对应的字段名即可,如下

from operator import itemgetter

chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()

content = chain.invoke({"query": "你好,你是?"})

除此之外,如果想在传递的数据中添加数据,还可以使用 RunnablePassthrough.assign() 方法来实现快速添加

例如

import dotenv

from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnablePassthrough

from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

def retrieval(query: str) -> str:

    """模拟一个检索器,传入查询query,输出文本"""

    print("执行检索:", query)

    return "我是一名AI应用开发工程师。"

# 1.编排Prompt

prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下文进行回复。

<context>

{context}

<context>

用户的问题是: {query}""")

# 2.构建大语言模型

llm = ChatOpenAI(model="gpt-3.5-turbo")

# 3.创建输出解析器

parser = StrOutputParser()

# 4.编排链,RunnablePassthrough.assign写法

chain = (

        RunnablePassthrough.assign(context=lambda x: retrieval(x["query"])) |

        prompt |

        llm |

        parser

)

# 5.调用链生成结果

content = chain.invoke({"query": "你好,我叫什么?"})

print(content)

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐