Skip to main content
Open In ColabOpen on GitHub

Upstash 限流回调

在本指南中,我们将介绍如何基于请求次数或令牌数量使用 UpstashRatelimitHandler 添加速率限制。此处理器使用 Upstash 的限速库(ratelimit library),该库利用了 Upstash Redis

Upstash 限流通过在每次调用 limit 方法时向 Upstash Redis 发送一个 HTTP 请求来实现。检查并更新用户的剩余令牌/请求数。根据剩余的令牌数量,我们可以停止执行诸如调用大语言模型(LLM)或查询向量数据库等高成本操作:

response = ratelimit.limit()
if response.allowed:
execute_costly_operation()

UpstashRatelimitHandler 允许您在几分钟内将限流逻辑集成到您的链中。

首先,您需要前往 Upstash 控制台 并创建一个 Redis 数据库(参见我们的文档)。创建数据库后,您需要设置环境变量:

UPSTASH_REDIS_REST_URL="****"
UPSTASH_REDIS_REST_TOKEN="****"

接下来,您需要安装 Upstash Ratelimit 和 Redis 库,命令如下:

pip install upstash-ratelimit upstash-redis

您现在可以为您的链添加速率限制了!

每个请求的速率限制

让我们假设希望允许用户每分钟调用我们的链10次。实现这一点非常简单:

# set env variables
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# create ratelimit
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 10 requests per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=10, window=60),
)

# create handler
user_id = "user_id" # should be a method which gets the user id
handler = UpstashRatelimitHandler(identifier=user_id, request_ratelimit=ratelimit)

# create mock chain
chain = RunnableLambda(str)

# invoke chain with handler:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("Handling ratelimit.", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_chain_start callback: UpstashRatelimitError('Request limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>

请注意,我们将处理器传递给 invoke 方法,而不是在定义链时传递处理器。

对于除 FixedWindow 以外的速率限制算法,请参见 upstash-ratelimit 文档

在执行管道中的任何步骤之前,ratelimit 将检查用户是否已超过请求限制。如果是,则会引发 UpstashRatelimitError

每个令牌的速率限制

另一种选择是根据以下条件对链调用进行速率限制:

  1. 提示中的令牌数量
  2. 提示和LLM生成中的令牌数量

这仅在您的链中有一个LLM时才有效。另一个要求是,您使用的LLM应在它的LLMOutput中返回令牌使用情况。

工作原理

处理程序将在调用大语言模型(LLM)之前获取剩余的令牌数。如果剩余令牌数大于0,则会调用LLM。否则将引发UpstashRatelimitError

调用LLM后,将使用令牌使用信息从用户的剩余令牌中扣除。在此链的阶段不会引发错误。

配置

对于第一个配置,只需像这样初始化处理器:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 1000 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

对于第二种配置,以下是初始化处理器的方法:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 1000 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(
identifier=user_id,
token_ratelimit=ratelimit,
include_output_tokens=True, # set to True
)

您还可以同时根据请求和令牌进行速率限制,只需同时传入 request_ratelimittoken_ratelimit 参数即可。

这是一个使用LLM的链的示例:

# set env variables
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"
os.environ["OPENAI_API_KEY"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# create ratelimit
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 500 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=500, window=60),
)

# create handler
user_id = "user_id" # should be a method which gets the user id
handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

# create mock chain
as_str = RunnableLambda(str)
model = ChatOpenAI()

chain = as_str | model

# invoke chain with handler:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("Handling ratelimit.", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_llm_start callback: UpstashRatelimitError('Token limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>