如何在 Apache APISIX 上实现幂等密钥规范经过@nfrankel
如何在 Apache APISIX 上实现幂等密钥规范

经过 Nicolas Fränkel14m2024/04/11
在这篇文章中,我通过插件展示了 Apache APISIX 上 Idempotency-Key 标头规范的简单实现。在这个阶段,它还有改进的空间:自动化测试、基于每个路由配置 Redis 的能力、将域/路径配置为请求的一部分、配置 Redis 集群而不是单个实例、使用另一个 K/V 存储等。
上周,我写了一篇关于IETF 幂等密钥规范分析。该规范旨在避免重复请求。简而言之,其理念是让客户端随请求一起发送一个唯一的密钥:

  • 如果服务器不知道密钥,它会照常进行,然后存储响应。

  • 如果服务器知道密钥,它会阻止任何进一步的处理并立即返回存储的响应。

这篇文章展示了如何使用Apache APISIX实现它。


在开始编码之前,我们需要定义一些事情。Apache APISIX 提供基于插件的架构。因此,我们将在插件中编写上述逻辑。

Apache APISIX 建立在 OpenResty 之上,而 OpenResty 又建立在 nginx 之上。每个组件都定义了阶段,这些阶段或多或少地映射到各个组件上。有关阶段的更多信息,请参阅此上一篇文章

最后,我们要确定优先级。优先级定义了 APISIX在阶段内运行插件的顺序。我决定使用1500 ,因为所有身份验证插件的优先级都在2000及以上范围内,但我想尽快返回缓存的响应。

规范要求我们存储数据。APISIX 提供了许多抽象,但存储不是其中之一。我们需要通过幂等键进行访问,因此它看起来像一个键值存储。

我随意选择了 Redis,因为它非常普及,而且客户端已经是 APISIX 发行版的一部分。请注意,简单的 Redis 不提供 JSON 存储;因此,我使用redis-stack Docker 映像。


 services: apisix: image: apache/apisix:3.9.0-debian volumes: - ./apisix/config.yml:/usr/local/apisix/conf/config.yaml:ro - ./apisix/apisix.yml:/usr/local/apisix/conf/apisix.yaml:ro #1 - ./plugin/src:/opt/apisix/plugins:ro #2 ports: - "9080:9080" redis: image: redis/redis-stack:7.2.0-v9 ports: - "8001:8001" #3
  1. 静态路由配置
  2. 通往我们未来插件的道路
  3. Redis Insights 端口(GUI)。本身不是必需的,但在开发调试过程中非常有用。

APISIX 配置如下:

 deployment: role: data_plane role_data_plane: config_provider: yaml #1 apisix: extra_lua_path: /opt/?.lua #2 plugins: - idempotency # priority: 1500 #3 plugin_attr: #4 idempotency: host: redis #5
  1. 配置 APISIX 以进行静态路由配置
  2. 配置插件的位置
  3. 自定义插件需要明确声明。优先级注释不是必需的,但是一种很好的做法,可以提高可维护性
  4. 所有路线的通用插件配置
  5. 见下文


 routes: - uri: /* plugins: idempotency: ~ #1 upstream: nodes: "": 1 #2 #END #3
  1. 声明我们要创建的插件。
  2. httpbin 是一个有用的上游,因为我们可以尝试不同的 URI 和方法。
  3. 静态路由配置必填!



Apache APISIX 插件的基础非常简单:

 local plugin_name = "idempotency" local _M = { version = 1.0, priority = 1500, schema = {}, name = plugin_name, } return _M

下一步是配置,例如Redis 主机和端口。首先,我们将为所有路由提供单一的 Redis 配置。这就是config.yaml文件中plugin_attr部分背后的想法:通用配置。让我们充实我们的插件:

 local core = require("apisix.core") local plugin = require("apisix.plugin") local attr_schema = { --1 type = "object", properties = { host = { type = "string", description = "Redis host", default = "localhost", }, port = { type = "integer", description = "Redis port", default = 6379, }, }, } function _M.init() local attr = plugin.plugin_attr(plugin_name) or {} local ok, err = core.schema.check(attr_schema, attr) --2 if not ok then core.log.error("Failed to check the plugin_attr[", plugin_name, "]", ": ", err) return false, err end end
  1. 定义配置的形状

  2. 检查配置是否有效

因为我在插件中定义了默认值,所以我只能将host覆盖为redis ,以在我的 Docker Compose 基础架构内运行并使用默认端口。

接下来,我需要创建 Redis 客户端。请注意,平台禁止我在重写/访问部分之后的任何阶段进行连接。因此,我将在init()方法中创建它并将其保留到最后。

 local redis_new = require("resty.redis").new --1 function _M.init() -- ... redis = redis_new() --2 redis:set_timeout(1000) local ok, err = redis:connect(, attr.port) if not ok then core.log.error("Failed to connect to Redis: ", err) return false, err end end
  1. 引用OpenResty Redis模块的new功能。

  2. 调用它来获取一个实例。

Redis 客户端现在可在插件执行周期的其余时间内在redis变量中使用。


在我以前的软件工程师生涯中,我通常首先实施名义路径。然后,我通过单独管理错误情况使代码更加健壮。这样,如果我必须在任何时候发布,我仍然会提供业务价值 - 带有警告。我将以同样的方式处理这个小项目。


 DO extract idempotency key from request DO look up value from Redis IF value doesn't exist DO set key in Redis with empty value ELSE RETURN cached response DO forward to upstream DO store response in Redis RETURN response

我们需要将逻辑映射到我上面提到的阶段。上游之前有两个阶段可用,即重写访问;之后有三个阶段,即header_filterbody_filterlog访问阶段之前的工作似乎很明显,但我需要在其他三个阶段之间做出选择。我随机选择了body_filter ,但我非常愿意听取其他阶段的合理论点。


 function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key --2 local resp, err = redis:hgetall(redis_key) --3 if not resp then return end if next(resp) == nil then --4 local resp, err = redis:hset(redis_key, "request", true ) --4 if not resp then return end else local data = normalize_hgetall_result(resp) --5 local response = core.json.decode(data["response"]) --6 local body = response["body"] --7 local status_code = response["status"] --7 local headers = response["headers"] for k, v in pairs(headers) do --7 core.response.set_header(k, v) end return core.response.exit(status_code, body) --8 end end
  1. 从请求中提取幂等性密钥。
  2. 给密钥添加前缀,这样我们就可以避免潜在的冲突。
  3. 获取幂等性键下存储在Redis中的数据集。
  4. 如果找不到该键,则用布尔标记存储它。
  5. 通过自定义实用函数转换 Lua 表中的数据。
  6. 响应以 JSON 格式存储,以解释标题。
  7. 重建响应。
  8. 将重建的响应返回给客户端。请注意return语句:APISIX 跳过了后续的生命周期阶段。

 function _M.body_filter(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key if core.response then local response = { --2 status = ngx.status, body = core.response.hold_body_chunk(ctx, true), headers = ngx.resp.get_headers() } local redis_key = "idempotency#" .. redis_key local resp, err = red:set(redis_key, "response", core.json.encode(response)) --3 if not resp then return end end end
  1. 从请求中提取幂等性密钥。

  2. 在 Lua 表中排列响应的不同元素。

  3. 将 JSON 编码的响应存储在 Redis 集中



 curl -i -X POST -H 'Idempotency-Key: A' localhost:9080/response-headers\?freeform=hello curl -i -H 'Idempotency-Key: B' localhost:9080/status/250 curl -i -H 'Idempotency-Key: C' -H 'foo: bar' localhost:9080/status/250




  • 缺少幂等性密钥。

  • 幂等性密钥已被使用。

  • 此幂等性密钥的请求尚未完成


 function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") if not idempotency_key then return core.response.exit(400, "This operation is idempotent and it requires correct usage of Idempotency Key") end -- ...

如果钥匙丢失,只需返回相应的 400。这个很简单。


有几个问题需要解决。首先,我没有找到现有的 API 来对core.request对象进行哈希处理,就像我更熟悉的其他语言中那样,例如Java 的Object.hash() 。我决定用 JSON 编码对象并哈希字符串。但是,现有的core.request有无法转换为 JSON 的子元素。我不得不提取上面提到的部分并转换表格。

 local function hash_request(request, ctx) local request = { --1 method = core.request.get_method(), uri = ctx.var.request_uri, headers = core.request.headers(), body = core.request.get_body() } local json = core.json.stably_encode(request) --2 return ngx.encode_base64(json) --3 end
  1. 创建一个仅包含相关部分的表格。

  2. cjson库生成的 JSON 的成员在多次调用中可能排序不同。因此,它会产生不同的哈希值。core.json.stably_encode 修复core.json.stably_encode这个问题。

  3. 将其哈希化。


 local hash = hash_request(core.request, ctx) if next(resp) == nil then core.log.warn("No key found in Redis for Idempotency-Key, set it: ", redis_key) local resp, err = redis:hset(redis_key, "request", hash) if not resp then core.log.error("Failed to set data in Redis: ", err) return end then -- ...


 local data = normalize_hgetall_result(resp) local stored_hash = data["request"] if hash ~= stored_hash then return core.response.exit(422, "This operation is idempotent and it requires correct usage of Idempotency Key. Idempotency Key MUST not be reused across different payloads of this operation.") end


  1. 请求附带幂等密钥X。

  2. 插件对指纹进行识别并将哈希值存储在 Redis 中。

  3. APISIX 将请求转发给上游。

  4. 重复请求具有相同的幂等键 X。

  5. 插件从 Redis 读取数据,发现没有缓存的响应。



 if not data["response"] then return core.response.exit(409, " request with the same Idempotency-Key for the same operation is being processed or is outstanding.") end



在这篇文章中,我通过插件展示了 Apache APISIX 上Idempotency-Key标头规范的简单实现。在这个阶段,它还有改进的空间:自动化测试、基于每个路由配置 Redis 的能力、将域/路径配置为请求的一部分、配置 Redis 集群而不是单个实例、使用另一个 K/V 存储等。




