当前位置: 首页 > news >正文

使用AWS中国区Lambda集成Glue Schema Registry消费Kafka消息的实践

本文在 AWS 中国区(cn-north-1)实现 Docker 自建 Kafka 与 AWS Lambda + Glue Schema Registry 的完整集成。Kafka 运行在 EC2 实例上,Lambda 通过 VPC 内网消费消息,使用 Avro 格式进行数据序列化。

整体的数据流图如下

CloudWatch LogsGlue Schema RegistryLambda FunctionDocker KafkaProducer (Avro)CloudWatch LogsGlue Schema RegistryLambda FunctionDocker KafkaProducer (Avro)注册 Schema发送 Avro 消息ESM 轮询推送获取 Schema (by ID)返回 Schema 定义Avro 反序列化记录处理日志

核心概念

SelfManagedKafka 事件源

AWS Lambda 支持多种事件源,其中 SelfManagedKafka 类型允许 Lambda 直接连接自建 Kafka 集群,无需经过 MSK。

  • KafkaBootstrapServers: Kafka 代理地址(数组格式)
  • Topics: 订阅的 Topic 列表
  • StartingPosition: 消费起始位置 (LATEST/TRIM_HORIZON)
  • SourceAccessConfigurations: VPC 访问配置

注意KafkaBootstrapServers必须是数组类型:

KafkaBootstrapServers:-!Sub"${EC2PrivateIp}:9092"

EventSourceMapping 事件格式

Lambda 接收自 Kafka 的事件结构与直接调用不同:

  • records是字典,key 为{topic}-{partition}
  • value是 Base64 编码的消息内容
  • Lambda 需要遍历records字典的值
{"eventSource":"SelfManagedKafka","bootstrapServers":"172.31.14.46:9092","records":{"orders-0":[{"topic":"orders","partition":0,"offset":1,"timestamp":1779613023206,"timestampType":"CREATE_TIME","value":"eyJvcmRlcl9pZCI6...","headers":[]}]}}

Glue Schema Registry 集成

Glue Schema Registry 提供 Schema 定义和版本管理。但是根据 AWS 官方文档中国区的lambda服务目前不可用

Provisioned mode for event source mappings is not available in the China Regions.

Provisioned Mode是 Lambda ESM (Event Source Mapping) 的一种事件轮询模式,用于控制 Lambda 如何从 Kafka/MSK/SQS 拉取消息。

由于SchemaRegistryConfig必须配合ProvisionedPollerConfig(即 Provisioned Mode)使用,因此中国区 Lambda ESM无法使用 Schema Registry 自动验证。解决方案是在 Lambda 代码中手动处理 Avro 反序列化。

根据 AWS 官方文档 Using schema registries with Kafka event sources:

This feature is only available for event source mappings using provisioned mode. Schema registry doesn’t support event source mappings in on-demand mode.

如果尝试在 On-Demand 模式下配置 SchemaRegistryConfig,会收到以下错误:

SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

Schema Registry 集成需要在 ESM poller 层面执行额外工作(查询 schema、解码消息),AWS 将此功能绑定到 Provisioned Mode 实现。

Lambda Function

Lambda ESM Poller

Kafka Cluster

Provisioned Mode Required

Kafka Message
(Avro bytes)

Schema Registry Lookup
自动查询 Glue Schema

Avro Decode
自动反序列化

Handler
收到 JSON 格式事件

由于 Provisioned Mode 不可用,需在 Lambda 代码中手动处理 Avro 序列化:

fromaws_schema_registryimportSchemaRegistryClient,KafkaDeserializerimportboto3 glue_client=boto3.client('glue',region_name='cn-north-1')registry_arn='arn:aws-cn:glue:cn-north-1:xxxxxxxxxx:registry/orders-registry'schema_client=SchemaRegistryClient(glue_client,registry_arn)deserializer=KafkaDeserializer(schema_client)deflambda_handler(event,context):fortopic_partition,recordsinevent.get('records',{}).items():forrecordinrecords:value_bytes=base64.b64decode(record['value'])decoded=deserializer.deserialize(topic,value_bytes)# 处理 decoded.data (Python dict)...

部署与配置

SAM部署基础设施

SAM 模板如下

AWSTemplateFormatVersion:'2010-09-09'Transform:AWS::Serverless-2016-10-31Resources:# Lambda Layer - 包含 aws-glue-schema-registryGlueSchemaRegistryLayer:Type:AWS::Serverless::LayerVersionProperties:LayerName:glue-schema-registry-layerContentUri:glue-schema-registry-layer.zipCompatibleRuntimes:-python3.12# Glue RegistryGlueRegistry:Type:AWS::Glue::RegistryProperties:Name:orders-registry# Lambda FunctionConsumerFunction:Type:AWS::Serverless::FunctionProperties:FunctionName:kafka-order-consumerRuntime:python3.12Handler:consumer.lambda_handlerCodeUri:.Layers:-!RefGlueSchemaRegistryLayerVpcConfig:SubnetIds:-!RefPrivateSubnet1SecurityGroupIds:-!RefLambdaSecurityGroupEnvironment:Variables:GLUE_REGISTRY_ARN:!RefGlueRegistryEvents:KafkaEvent:Type:SelfManagedKafkaProperties:KafkaBootstrapServers:-!Sub"${EC2PrivateIp}:9092"Topics:-ordersStartingPosition:LATESTSourceAccessConfigurations:-Type:VPC_SUBNETURI:!RefPrivateSubnet1-Type:VPC_SECURITY_GROUPURI:!RefLambdaSecurityGroupPolicies:-Statement:-Sid:GlueAccessEffect:AllowAction:-glue:GetRegistry-glue:GetSchemaVersion-glue:GetSchemaByDefinition-glue:GetSchemaResource:"*"

由于中国区不支持 ESM 级别的 Schema Registry 自动验证,Lambda 需要手动集成 Glue Schema Registry 进行消息反序列化。

Lambda 需要包含aws-glue-schema-registry库。创建 Layer:

# 在本地创建 Layermkdir-player/python pipinstall-tlayer/python aws-glue-schema-registry boto3cdlayer&&zip-r../glue-schema-registry-layer.zip.

在 SAM 模板中引用:

Layers:-!RefGlueSchemaRegistryLayer

部署命令

# 构建sam build# 部署sam deploy --resolve-s3 --no-confirm-changeset

部署资源如下

Lambda代码示例

Handler 代码如下

importjsonimportbase64importosimportloggingimportboto3fromaws_schema_registryimportSchemaRegistryClient,KafkaDeserializer# 初始化(在 handler 外部,避免每次调用重新初始化)logger=logging.getLogger()logger.setLevel(os.getenv('LOG_LEVEL','INFO'))glue_client=boto3.client('glue',region_name='cn-north-1')registry_name='orders-registry'# Schema Registry 客户端(延迟初始化)schema_client=Nonedeserializer=Nonedefget_deserializer():"""延迟初始化 deserializer"""globalschema_client,deserializerifdeserializerisNone:schema_client=SchemaRegistryClient(glue_client,registry_name)deserializer=KafkaDeserializer(schema_client)returndeserializerdeflambda_handler(event,context):""" 处理 Kafka 事件,使用 Glue Schema Registry 反序列化 Avro 消息. 支持两种消息格式: 1. Avro 格式(带 schema ID 前缀)- 使用 Glue Schema Registry 反序列化 2. JSON 格式 - 直接解析 """logger.info(f"Event source:{event.get('eventSource')}")results=[]batch_item_failures=[]records_by_topic=event.get('records',{})fortopic_partition,recordsinrecords_by_topic.items():logger.info(f"Processing{topic_partition}:{len(records)}records")forrecordinrecords:try:topic=record.get('topic','unknown')partition=record.get('partition',-1)offset=record.get('offset',-1)value_b64=record.get('value','')ifnotvalue_b64:value={}else:value_bytes=base64.b64decode(value_b64)# 尝试 Avro 反序列化try:deser=get_deserializer()decoded=deser.deserialize(topic,value_bytes)value=decoded.data logger.info(f"[{topic}] p={partition}o={offset}(Avro) data={value}")exceptExceptionasavro_err:# 回退到 JSON 解析try:value=json.loads(value_bytes.decode('utf-8'))logger.info(f"[{topic}] p={partition}o={offset}(JSON) data={value}")exceptExceptionasjson_err:logger.error(f"Failed to deserialize: avro={avro_err}, json={json_err}")raiseavro_err# 处理业务逻辑process_order(value)results.append({'recordId':record.get('recordId',''),'result':'Ok','data':value_b64})exceptExceptionase:logger.error(f"Failed to process record:{e}")batch_item_failures.append({'itemIdentifier':str(record.get('offset'))})ifbatch_item_failures:return{'batchItemFailures':batch_item_failures}return{'records':results}defprocess_order(order:dict):"""业务处理逻辑"""order_id=order.get('order_id')logger.info(f"Processing order:{order_id}")

Producer 使用aws-glue-schema-registry库序列化 Avro 消息:

#!/usr/bin/env python3importuuidfromdatetimeimportdatetime,timezoneimportboto3fromaws_schema_registryimportSchemaRegistryClient,KafkaSerializer,DataAndSchemafromaws_schema_registry.avroimportAvroSchemafromconfluent_kafkaimportProducer REGISTRY_NAME="orders-registry"BOOTSTRAP_SERVERS="172.31.1.2:9092"TOPIC="orders"AVRO_SCHEMA=""" { "type": "record", "name": "Order", "namespace": "com.example.orders", "fields": [ {"name": "order_id", "type": "string"}, {"name": "customer_id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "status", "type": "string"}, {"name": "created_at", "type": "string"} ] } """defmain():glue=boto3.client("glue",region_name="cn-north-1")schema_client=SchemaRegistryClient(glue,registry_name=REGISTRY_NAME)serializer=KafkaSerializer(schema_client)producer=Producer({"bootstrap.servers":BOOTSTRAP_SERVERS})foriinrange(3):order={"order_id":f"avro-{uuid.uuid4().hex[:8]}","customer_id":f"cust-{(i%5)+1:03d}","amount":round(100.0+i*10.5,2),"status":"pending","created_at":datetime.now(timezone.utc).isoformat(),}print(f"Sending Avro message{i+1}:{order['order_id']}")schema=AvroSchema(AVRO_SCHEMA.strip())serialized=serializer.serialize(TOPIC,DataAndSchema(data=order,schema=schema),)producer.produce(topic=TOPIC,value=serialized,callback=lambdaerr,msg:print(f"Delivered to{msg.topic()}[{msg.partition()}]"ifnoterrelsef"Failed:{err}"),)producer.poll(0)producer.flush()print(f"\nSent 3 Avro messages to{TOPIC}")if__name__=="__main__":main()

注意aws-glue-schema-registry会自动在 Glue 中创建{topic}-value命名的 schema(如orders-value),而非使用 SAM 创建的order-schema

kafka部署

使用 KRaft 模式:

services:kafka:image:confluentinc/cp-kafka:7.5.0container_name:kafkaports:-"9092:9092"environment:KAFKA_PROCESS_ROLES:"broker,controller"KAFKA_NODE_ID:"1"KAFKA_CONTROLLER_QUORUM_VOTERS:"1@kafka:9093"KAFKA_CONTROLLER_LISTENER_NAMES:"CONTROLLER"KAFKA_LISTENERS:"INTERNAL://:9092,CONTROLLER://:9093"KAFKA_ADVERTISED_LISTENERS:"INTERNAL://${PRIVATE_IP}:9092"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:"INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"KAFKA_INTER_BROKER_LISTENER_NAME:"INTERNAL"KAFKA_AUTO_CREATE_TOPICS_ENABLE:"false"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:"1"CLUSTER_ID:"MkU3OEVBNTcwNTJENDM2Qk"volumes:-kafka-data:/var/lib/kafka/datavolumes:kafka-data:

生产和消费

发送测试消息

# JSON 格式(用于基础测试)sudodockerexeckafkabash-c'echo "{\"order_id\":\"test-001\",\"customer_id\":\"cust-001\",\"amount\":99.99,\"status\":\"test\"}" | kafka-console-producer --bootstrap-server localhost:9092 --topic orders'# Avro 格式~/.local/bin/uv run python producer_avro.py

查看 Lambda 日志

aws--regioncn-north-1 logstail/aws/lambda/kafka-order-consumer--since2m--formatshort

成功日志如下

Event source: SelfManagedKafka Processing orders-1: 2 records Fetching schema version 498aaebe-e863-48c3-b330-fcc3940ea57d... [orders] p=1 o=6 (Avro) data={'order_id': 'avro-9591de65', 'customer_id': 'cust-002', 'amount': 110.5, 'status': 'pending', 'created_at': '2026-05-24T10:54:40.799489+00:00'} Processing order: avro-9591de65

日志截图

http://www.gsyq.cn/news/1372492.html

相关文章:

  • 2026 四川 H 型钢优质供应商推荐|盛世钢联全品类现货批发,生产厂家与采购指南 - 四川盛世钢联营销中心
  • 上海嘉定区宸智雅筑装饰官方联系方式 合作电话 官方网站官网 - 元点智创
  • CoolProp热物理计算终极指南:从入门到精通的热力学工具
  • AutoCut视频剪辑神器:用文本编辑快速剪切视频的完整指南
  • 静电筛选与机器学习势函数加速:高通量预测材料分裂空位缺陷
  • 不变性学习自适应算法:从VC维到样本效率的理论与实践
  • 机器学习优化3D打印热电材料:从墨水配方到性能闭环
  • 品牌生死局——2026GEO优化公司全景测评必选指南 - GEO优化
  • equalsIgnoreCase忽略大小写直接对比
  • 2026年4月比较好的测漏公司推荐,地暖管道清洗/墙面测漏/墙面漏水维修/水管测漏/厨房漏水维修,测漏企业推荐 - 品牌推荐师
  • 通过Hermes Agent对接Taotoken自定义模型提供方
  • 2026 四川螺纹钢优质供应商推荐|盛世钢联全品类现货批发,价格行情与采购指南 - 四川盛世钢联营销中心
  • 【稻米计数】基于matlab形态学稻米计数【含Matlab源码 15562期】
  • LinkSwift网盘直链下载助手:一站式解决9大网盘下载难题
  • C#学习(26_05_24)
  • 环境变量助手
  • 2026论文写作工具红黑榜:AI论文工具怎么选?别再瞎找了!
  • 2026年亲测一键生成论文工具指南(高效定稿版)
  • 云计算成本优化与管理
  • C++ - 面向对象 - virtual、虚函数与纯虚函数
  • 如何快速实现网盘下载加速:终极网盘直链下载助手指南
  • 上海篇:2026上海企业GEO优化实力榜单与全意图方法论解码 - GEO优化
  • 【教育科技爆款内容生产核心】:用ChatGPT批量生成带答案解析+难度分级+认知维度标签的脑筋急转弯(附可商用JSON Schema)
  • 利用Taotoken实现多模型备选方案以提升业务连续性
  • 开源AI工具真能替代商业方案?2024最新Benchmark数据揭示92%团队忽略的关键短板
  • 别错过机会!2026亲测靠谱的AI论文写作工具|避坑版
  • 从零开发游戏需要学习的c#模块,第二十三章(存档与高分系统)
  • 2026年一键生成论文工具实测精选:5款神器从构思到提交全流程护航
  • 【图像压缩】基于ADMM的卷积稀疏编码高效算法Matlab实现
  • 2026杭州GEO优化公司深度评测:从“流量收割”到“全意图增长”的战略选型指南 - GEO优化