当前位置: 首页 > news >正文

Kafka 消息推拉

1. 背景与需求

1.1 业务场景

阶段 场景 说明
当前 DLQ 预览 + 选择性重投 从不同业务的死信 Topic 拉取消息,人工预览后,将选中消息重投到原业务 Topic
后续 定时任务批量重投 按已提交 offset 增量拉取,自动重投
后续 多业务多 Topic Topic 名从数据库读取,本轮由调用方传入

1.2 本轮范围

做:

  • 引入 org.apache.kafka:kafka-clients 依赖
  • KafkaMessageUtils 推拉能力(拉取预览、发送重投、offset 提交)
  • 配置类、业务对象、错误码、单元测试

不做:

  • Controller / API 接口
  • 定时任务
  • 从数据库查 Topic 名
  • 消息业务解析与转换

2. 设计思想

2.1 方案选型

方案 描述 优点 缺点 结论
A(采用) KafkaMessageUtils + 短生命周期 Consumer + 单例 Producer 简单、预览不污染 offset、符合项目 XxxUtils 规范 高频拉取有连接开销 ✅ 推荐
B 长连接池化 Consumer/Producer 性能好 生命周期与并发复杂,本轮过度设计
C 引入 spring-kafka Spring 生态完整 与用户要求的原生 kafka-clients 不符,依赖更重

2.2 核心设计原则

(1)预览与消费分离

DLQ 预览的核心诉求是「看得到、不丢、可重复看」:

  • Consumer 设置 enable.auto.commit = false
  • pullMessages 只 poll、不 commit
  • 重投成功后由调用方显式调用 commitOffsets,避免误消费

(2)推拉职责清晰

pullMessages(dlqTopic)  →  业务筛选  →  sendMessages(originalTopic, selected)  →  commitOffsets(dlqTopic, selected)
  • :只读,返回带 partition + offsetKafkaMessageBO,供选择性重投定位
  • :保留原 keyheadersvalue 原样转发,不做反序列化
  • 提交:按分区取最大 offset + 1 提交,支持批量重投后一次性推进

(3)Client 生命周期

组件 策略 原因
KafkaConsumer 每次 pull / commitOffsets 临时创建,try-with-resources 关闭 预览为低频操作,避免长连接状态管理
KafkaProducer 应用内懒加载单例,@PreDestroy 关闭 重投可能批量发生,复用连接更高效

(4)多业务 Topic 隔离

  • Topic 名由调用方传入(后续从 DB 查)
  • Consumer Group ID 规则:{groupIdPrefix}-{topic},不同 DLQ Topic 的 offset 互不影响

(5)增量拉取预留

KafkaPullOptionsBO.fromBeginning

  • true(默认):seekToBeginning,适合人工全量预览
  • false:从当前 group 已提交 offset 继续读,适合后续定时任务增量重投

(6)异常与日志

遵循项目 BizException 规范:

场景 处理方式
参数非法(topic 为空、消息列表为空等) new BizException(PARAM_ERROR, msg),无 cause
Kafka 连接/拉取/发送/提交失败 BizException.wrap(KAFKA_OPERATION_FAILED, 固定文案, e)
中间层 log.warn 一行场景日志,不打堆栈

新增错误码:KAFKA_OPERATION_FAILED(2004, "Kafka操作失败")

(7)命名与分层

类型 类名 包路径
Spring 组件工具类 KafkaMessageUtils com.liang.learn.util
配置 KafkaProperties com.liang.learn.properties
业务中间对象 KafkaMessageBOKafkaPullOptionsBO com.liang.learn.model.bo

3. 架构与数据流

sequenceDiagramparticipant Caller as 调用方 Serviceparticipant Utils as KafkaMessageUtilsparticipant DLQ as DLQ Topicparticipant Biz as 业务 TopicCaller->>Utils: pullMessages(dlqTopic, options)Utils->>DLQ: poll(auto.commit=false)DLQ-->>Utils: ConsumerRecordsUtils-->>Caller: List<KafkaMessageBO>Note over Caller: 人工筛选待重投消息Caller->>Utils: sendMessages(originalTopic, selected)Utils->>Biz: Producer send(保留 key/headers)Caller->>Utils: commitOffsets(dlqTopic, selected)Utils->>DLQ: commitSync(offset+1)

3.1 类职责

职责
KafkaProperties Broker 地址、Consumer/Producer 默认参数
KafkaMessageBO 单条消息载体(含 partition/offset 供定位与提交)
KafkaPullOptionsBO 拉取条数、超时、是否从头预览
KafkaMessageUtils 推拉与 offset 提交的具体实现

4. 配置说明

4.1 Maven 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

版本由 Spring Boot BOM 管理,无需显式指定。

4.2 application.yaml

app:kafka:bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:127.0.0.1:9092}consumer:group-id-prefix: learn-dlq-previewauto-offset-reset: earliestmax-poll-records: 100poll-timeout-ms: 3000producer:acks: allretries: 3
配置项 默认值 说明
bootstrap-servers 127.0.0.1:9092 Broker 地址,支持环境变量覆盖
consumer.group-id-prefix learn-dlq-preview 预览 Consumer Group 前缀
consumer.auto-offset-reset earliest 无已提交 offset 时从最早可读位置开始
consumer.max-poll-records 100 单次拉取上限
consumer.poll-timeout-ms 3000 poll 超时(毫秒)
producer.acks all 重投可靠性
producer.retries 3 发送重试次数

5. API 说明

5.1 KafkaMessageUtils

// 预览 DLQ,不提交 offset
List<KafkaMessageBO> pullMessages(String topic, KafkaPullOptionsBO options);// 重投单条 / 批量(保留 key、headers)
void sendMessage(String targetTopic, KafkaMessageBO message);
void sendMessages(String targetTopic, List<KafkaMessageBO> messages);// 重投成功后提交 DLQ offset
void commitOffsets(String dlqTopic, List<KafkaMessageBO> messages);

5.2 典型用法

@Autowired
private KafkaMessageUtils kafkaMessageUtils;public void replaySelectedDlq(String dlqTopic, String originalTopic) {// 1. 预览List<KafkaMessageBO> all = kafkaMessageUtils.pullMessages(dlqTopic, null);// 2. 业务筛选(示例:按 key 过滤)List<KafkaMessageBO> selected = all.stream().filter(m -> "retry-me".equals(m.getKey())).toList();if (selected.isEmpty()) {return;}// 3. 重投kafkaMessageUtils.sendMessages(originalTopic, selected);// 4. 提交 offsetkafkaMessageUtils.commitOffsets(dlqTopic, selected);
}

5.3 增量拉取(定时任务预留)

KafkaPullOptionsBO options = new KafkaPullOptionsBO();
options.setFromBeginning(false);
List<KafkaMessageBO> incremental = kafkaMessageUtils.pullMessages(dlqTopic, options);

6. 文件清单

文件 说明
pom.xml 新增 kafka-clients 依赖
application.yaml 新增 app.kafka 配置
ErrorCode.java 新增 KAFKA_OPERATION_FAILED
KafkaProperties.java Kafka 配置属性
KafkaMessageBO.java 消息业务对象
KafkaPullOptionsBO.java 拉取参数
KafkaMessageUtils.java 推拉工具类
KafkaMessageUtilsTest.java 单元测试

7. 源码

7.1 KafkaProperties

package com.liang.learn.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** Kafka 连接与推拉行为配置** @author liang* @since 2026-06-10 15:02:18*/
@Data
@Component
@ConfigurationProperties(prefix = "app.kafka")
public class KafkaProperties {/*** Kafka Broker 地址,多个以逗号分隔*/private String bootstrapServers;/*** Consumer 相关配置*/private Consumer consumer = new Consumer();/*** Producer 相关配置*/private Producer producer = new Producer();/*** Kafka Consumer 配置项*/@Datapublic static class Consumer {/*** DLQ 预览 Consumer Group 前缀,实际 groupId 为 prefix-topic*/private String groupIdPrefix = "learn-dlq-preview";/*** 无已提交 offset 时的起始策略,预览场景默认 earliest*/private String autoOffsetReset = "earliest";/*** 单次 poll 最大记录数*/private int maxPollRecords = 100;/*** poll 等待超时(毫秒)*/private long pollTimeoutMs = 3000L;}/*** Kafka Producer 配置项*/@Datapublic static class Producer {/*** 发送确认级别,重投场景默认 all*/private String acks = "all";/*** 发送失败重试次数*/private int retries = 3;}
}

7.2 KafkaMessageBO

package com.liang.learn.model.bo;import lombok.Data;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** Kafka 单条消息业务对象,用于 DLQ 预览与选择性重投** @author liang* @since 2026-06-10 15:02:18*/
@Data
public class KafkaMessageBO {/*** 消息所在 Topic(预览时为 DLQ Topic)*/private String topic;/*** 分区号*/private int partition;/*** 分区内 offset*/private long offset;/*** 消息 Key,可为 null*/private String key;/*** 消息体*/private String value;/*** 消息头,重投时原样保留*/private Map<String, String> headers = new HashMap<>();/*** Broker 记录时间戳(毫秒)*/private long timestamp;/*** 返回不可变 headers 视图,避免外部修改内部状态** @return headers 只读副本*/public Map<String, String> getHeaders() {if (headers == null || headers.isEmpty()) {return Collections.emptyMap();}return Collections.unmodifiableMap(headers);}/*** 设置消息头,null 时置为空 Map** @param headers 消息头*/public void setHeaders(Map<String, String> headers) {if (headers == null || headers.isEmpty()) {this.headers = new HashMap<>();return;}this.headers = new HashMap<>(headers);}
}

7.3 KafkaPullOptionsBO

package com.liang.learn.model.bo;import lombok.Data;/*** Kafka 拉取(预览)参数** @author liang* @since 2026-06-10 15:02:18*/
@Data
public class KafkaPullOptionsBO {/*** 本次最多拉取条数,null 时使用配置默认值*/private Integer maxRecords;/*** poll 超时(毫秒),null 时使用配置默认值*/private Long pollTimeoutMs;/*** true 从各分区最早可读 offset 预览;false 从当前 group 已提交 offset 继续读*/private Boolean fromBeginning = Boolean.TRUE;
}

7.4 KafkaMessageUtils

package com.liang.learn.util;import com.liang.learn.exception.BizException;
import com.liang.learn.model.bo.KafkaMessageBO;
import com.liang.learn.model.bo.KafkaPullOptionsBO;
import com.liang.learn.model.enums.ErrorCode;
import com.liang.learn.properties.KafkaProperties;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Kafka 消息推拉工具类,面向 DLQ 预览与选择性重投场景** @author liang* @since 2026-06-10 15:02:18*/
@Slf4j
@Component
public class KafkaMessageUtils {private static final String STRING_SERIALIZER = StringSerializer.class.getName();private static final String STRING_DESERIALIZER = StringDeserializer.class.getName();private static final String GROUP_TOPIC_SEPARATOR = "-";private final KafkaProperties kafkaProperties;private volatile KafkaProducer<String, String> producer;public KafkaMessageUtils(KafkaProperties kafkaProperties) {this.kafkaProperties = kafkaProperties;}public List<KafkaMessageBO> pullMessages(String topic, KafkaPullOptionsBO options) {checkTopic(topic);checkBootstrapServers();KafkaPullOptionsBO resolvedOptions = resolvePullOptions(options);Properties consumerProps = buildConsumerProperties(topic);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {List<TopicPartition> partitions = listTopicPartitions(consumer, topic);if (partitions.isEmpty()) {return Collections.emptyList();}return pollMessages(consumer, partitions, topic, resolvedOptions);} catch (BizException e) {throw e;} catch (Exception e) {log.warn("Kafka 消息拉取失败, topic={}", topic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息拉取失败", e);}}public void sendMessage(String targetTopic, KafkaMessageBO message) {checkTopic(targetTopic);checkMessage(message);sendMessages(targetTopic, List.of(message));}public void sendMessages(String targetTopic, List<KafkaMessageBO> messages) {checkTopic(targetTopic);checkMessagesNotEmpty(messages);checkBootstrapServers();KafkaProducer<String, String> activeProducer = getOrCreateProducer();try {for (KafkaMessageBO message : messages) {checkMessage(message);ProducerRecord<String, String> record = buildProducerRecord(targetTopic, message);activeProducer.send(record).get();}activeProducer.flush();} catch (InterruptedException e) {Thread.currentThread().interrupt();log.warn("Kafka 消息发送被中断, targetTopic={}", targetTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息发送失败", e);} catch (ExecutionException e) {log.warn("Kafka 消息发送失败, targetTopic={}", targetTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息发送失败", e.getCause());}}public void commitOffsets(String dlqTopic, List<KafkaMessageBO> messages) {checkTopic(dlqTopic);checkMessagesNotEmpty(messages);checkBootstrapServers();Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildOffsetsToCommit(dlqTopic, messages);Properties consumerProps = buildConsumerProperties(dlqTopic);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {consumer.assign(new ArrayList<>(offsetsToCommit.keySet()));consumer.commitSync(offsetsToCommit);} catch (BizException e) {throw e;} catch (Exception e) {log.warn("Kafka offset 提交失败, dlqTopic={}", dlqTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka offset 提交失败", e);}}@PreDestroypublic void destroy() {if (producer != null) {producer.close();producer = null;}}private KafkaPullOptionsBO resolvePullOptions(KafkaPullOptionsBO options) {KafkaPullOptionsBO resolved = options == null ? new KafkaPullOptionsBO() : options;if (resolved.getMaxRecords() == null) {resolved.setMaxRecords(kafkaProperties.getConsumer().getMaxPollRecords());}if (resolved.getPollTimeoutMs() == null) {resolved.setPollTimeoutMs(kafkaProperties.getConsumer().getPollTimeoutMs());}if (resolved.getFromBeginning() == null) {resolved.setFromBeginning(Boolean.TRUE);}if (resolved.getMaxRecords() <= 0) {throw new BizException(ErrorCode.PARAM_ERROR, "maxRecords 必须大于 0");}if (resolved.getPollTimeoutMs() <= 0) {throw new BizException(ErrorCode.PARAM_ERROR, "pollTimeoutMs 必须大于 0");}return resolved;}private List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topic) {List<org.apache.kafka.common.PartitionInfo> partitionInfos = consumer.partitionsFor(topic);if (partitionInfos == null || partitionInfos.isEmpty()) {return Collections.emptyList();}List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) {partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));}return partitions;}private List<KafkaMessageBO> pollMessages(KafkaConsumer<String, String> consumer,List<TopicPartition> partitions,String topic,KafkaPullOptionsBO options) {consumer.assign(partitions);if (Boolean.TRUE.equals(options.getFromBeginning())) {consumer.seekToBeginning(partitions);}List<KafkaMessageBO> result = new ArrayList<>();long deadline = System.currentTimeMillis() + options.getPollTimeoutMs();while (result.size() < options.getMaxRecords() && System.currentTimeMillis() < deadline) {long remaining = deadline - System.currentTimeMillis();if (remaining <= 0) {break;}ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(remaining));if (records.isEmpty()) {break;}for (ConsumerRecord<String, String> record : records) {result.add(toMessageBO(record));if (result.size() >= options.getMaxRecords()) {return result;}}}return result;}private KafkaMessageBO toMessageBO(ConsumerRecord<String, String> record) {KafkaMessageBO message = new KafkaMessageBO();message.setTopic(record.topic());message.setPartition(record.partition());message.setOffset(record.offset());message.setKey(record.key());message.setValue(record.value());message.setHeaders(extractHeaders(record.headers()));message.setTimestamp(record.timestamp());return message;}private ProducerRecord<String, String> buildProducerRecord(String targetTopic, KafkaMessageBO message) {ProducerRecord<String, String> record =new ProducerRecord<>(targetTopic, null, message.getKey(), message.getValue());applyHeaders(record, message.getHeaders());return record;}private Map<TopicPartition, OffsetAndMetadata> buildOffsetsToCommit(String dlqTopic,List<KafkaMessageBO> messages) {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (KafkaMessageBO message : messages) {if (message.getPartition() < 0) {throw new BizException(ErrorCode.PARAM_ERROR, "消息 partition 非法");}if (message.getOffset() < 0) {throw new BizException(ErrorCode.PARAM_ERROR, "消息 offset 非法");}TopicPartition topicPartition = new TopicPartition(dlqTopic, message.getPartition());OffsetAndMetadata nextOffset = new OffsetAndMetadata(message.getOffset() + 1);offsets.merge(topicPartition, nextOffset, (existing, incoming) ->existing.offset() >= incoming.offset() ? existing : incoming);}return offsets;}private Map<String, String> extractHeaders(Headers headers) {if (headers == null) {return Collections.emptyMap();}Map<String, String> headerMap = new LinkedHashMap<>();for (Header header : headers) {byte[] value = header.value();String text = value == null ? null : new String(value, StandardCharsets.UTF_8);headerMap.put(header.key(), text);}return headerMap;}private void applyHeaders(ProducerRecord<String, String> record, Map<String, String> headers) {if (headers == null || headers.isEmpty()) {return;}for (Map.Entry<String, String> entry : headers.entrySet()) {String value = entry.getValue();byte[] bytes = value == null ? null : value.getBytes(StandardCharsets.UTF_8);record.headers().add(entry.getKey(), bytes);}}private KafkaProducer<String, String> getOrCreateProducer() {if (producer == null) {synchronized (this) {if (producer == null) {producer = new KafkaProducer<>(buildProducerProperties());}}}return producer;}private Properties buildConsumerProperties(String topic) {KafkaProperties.Consumer consumerConfig = kafkaProperties.getConsumer();Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, buildConsumerGroupId(topic));props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.getAutoOffsetReset());props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerConfig.getMaxPollRecords());return props;}private Properties buildProducerProperties() {KafkaProperties.Producer producerConfig = kafkaProperties.getProducer();Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);props.put(ProducerConfig.ACKS_CONFIG, producerConfig.getAcks());props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.getRetries());return props;}private String buildConsumerGroupId(String topic) {return kafkaProperties.getConsumer().getGroupIdPrefix() + GROUP_TOPIC_SEPARATOR + topic;}private void checkTopic(String topic) {if (!StringUtils.hasText(topic)) {throw new BizException(ErrorCode.PARAM_ERROR, "Topic 不能为空");}}private void checkBootstrapServers() {if (!StringUtils.hasText(kafkaProperties.getBootstrapServers())) {throw new BizException(ErrorCode.PARAM_ERROR, "Kafka bootstrap-servers 未配置");}}private void checkMessage(KafkaMessageBO message) {if (message == null) {throw new BizException(ErrorCode.PARAM_ERROR, "消息不能为空");}}private void checkMessagesNotEmpty(List<KafkaMessageBO> messages) {if (messages == null || messages.isEmpty()) {throw new BizException(ErrorCode.PARAM_ERROR, "消息列表不能为空");}}
}

7.5 ErrorCode 新增项

/*** Kafka 操作失败(拉取、发送、offset 提交等)*/
KAFKA_OPERATION_FAILED(2004, "Kafka操作失败"),

7.6 KafkaMessageUtilsTest

package com.liang.learn.util;import com.liang.learn.exception.BizException;
import com.liang.learn.model.bo.KafkaMessageBO;
import com.liang.learn.model.bo.KafkaPullOptionsBO;
import com.liang.learn.model.enums.ErrorCode;
import com.liang.learn.properties.KafkaProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;import java.util.Collections;
import java.util.List;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;class KafkaMessageUtilsTest {private KafkaMessageUtils kafkaMessageUtils;@BeforeEachvoid setUp() {KafkaProperties kafkaProperties = new KafkaProperties();kafkaProperties.setBootstrapServers("127.0.0.1:9092");kafkaMessageUtils = new KafkaMessageUtils(kafkaProperties);}@Testvoid pullMessages_blankTopic_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.pullMessages(" ", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("Topic"));}@Testvoid pullMessages_missingBootstrapServers_shouldThrowParamError() {KafkaProperties kafkaProperties = new KafkaProperties();KafkaMessageUtils utils = new KafkaMessageUtils(kafkaProperties);BizException exception = assertThrows(BizException.class,() -> utils.pullMessages("dlq-topic", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("bootstrap-servers"));}@Testvoid pullOptions_invalidMaxRecords_shouldThrowParamError() {KafkaPullOptionsBO options = new KafkaPullOptionsBO();options.setMaxRecords(0);BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.pullMessages("dlq-topic", options));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("maxRecords"));}@Testvoid sendMessage_nullMessage_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.sendMessage("target-topic", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息"));}@Testvoid sendMessages_emptyList_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.sendMessages("target-topic", Collections.emptyList()));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息列表"));}@Testvoid commitOffsets_emptyMessages_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.commitOffsets("dlq-topic", List.of()));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息列表"));}@Testvoid commitOffsets_invalidOffset_shouldThrowParamError() {KafkaMessageBO message = new KafkaMessageBO();message.setPartition(0);message.setOffset(-1L);BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.commitOffsets("dlq-topic", List.of(message)));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("offset"));}@Testvoid kafkaMessageBO_headers_shouldBeImmutableView() {KafkaMessageBO message = new KafkaMessageBO();message.setHeaders(Map.of("original-topic", "biz-topic"));Map<String, String> headers = message.getHeaders();assertEquals("biz-topic", headers.get("original-topic"));assertThrows(UnsupportedOperationException.class, () -> headers.put("k", "v"));}
}

8. 测试与验证

mvn test -Dtest=KafkaMessageUtilsTest
mvn test

当前测试覆盖:

  • Topic / bootstrap-servers 参数校验
  • maxRecords 非法值校验
  • 发送 / 提交时空消息列表校验
  • offset 非法值校验
  • KafkaMessageBO.headers 不可变视图

集成测试(连接真实 Kafka)未纳入本轮,可在后续接 Testcontainers 或内嵌 Kafka 补充。


9. 后续扩展建议

方向 说明
DLQ 预览 API 增加 Controller:/api/kafka/dlq/preview/api/kafka/dlq/replay
DB 查 Topic Service 层按业务 ID 查 DLQ / 原 Topic 映射,再调 KafkaMessageUtils
定时重投 @Scheduled + fromBeginning=false 增量拉取 + 全量重投 + commit
消息转换 按业务需要增加 JSON 反序列化、过滤、脱敏等中间层,不侵入 Utils
安全认证 KafkaProperties 扩展 SASL/SSL 配置项

10. 讨论记录摘要

问题 结论
主要场景 DLQ 预览 → 选择性重投;后续定时重投
重投方式 重投原 Topic,保留 key/headers
DLQ Topic 多业务多 Topic,Topic 名由调用方传入(后续 DB)
本轮范围 仅基础设施,不含 API / 定时任务 / DB
依赖选型 原生 kafka-clients,不用 spring-kafka
工具类命名 KafkaMessageUtils(Spring 组件,后缀 Utils
http://www.gsyq.cn/news/1499783.html

相关文章:

  • 2026年 医药品牌升级推荐榜:聚焦战略、视觉与信任重塑的全案解析及优质服务商盘点 - 品牌发掘
  • GEE 时间序列合成、时序线性插值与SG滤波
  • VSCode配置STM32开发环境避坑指南:从编译报错到调试成功,我踩过的那些坑
  • WAN2.2 All In One终极指南:8GB显存快速生成AI视频的完整教程
  • 远郊覆盖榜:北京远郊收酒不额外收费六家 - 光耀华夏品牌榜
  • 2026年6月最新版毫州第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026年6月最新版广安第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • CH341A/B USB转USART/I2C/SPI介绍
  • 2026年喜铺推荐排行榜:广东喜铺/爱哆哆喜铺/红娘喜铺/婚庆策划喜铺/婚庆服务喜铺/婚礼喜铺精选 - 品牌发掘
  • AnimeGAN2-Pytorch图像动漫化指南:三步实现照片转动漫风格
  • Unity毛发系统常见问题解决:10个常见错误与修复方法
  • 项目三简易计算器 任务3-6六位密码锁2
  • 2026年6月最新版湖州第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026年6月最新版黑河第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • CodeX Docs配置指南:轻松自定义你的免费文档系统
  • LLM Cursor Rules开源项目贡献指南:如何参与这个AI编程规则库
  • 如何快速掌握QMK固件:机械键盘爱好者的完整入门指南
  • Pixelle-Video终极指南:5个简单步骤让AI自动生成专业短视频
  • Lune测试与调试:单元测试、集成测试与性能分析完全指南
  • LLPlayer语言学习播放器终极指南:从零到精通掌握AI学习工具
  • 2026年6月最新版德州第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • Android11下APK调用USB serialn每次开机弹窗问题
  • JBrowserDriver vs 传统浏览器驱动:为什么纯Java无头方案更适合自动化测试?
  • TanStack Ranger:打造现代化滑块组件的终极无头UI解决方案
  • sublime-phpcs与版本控制集成:提交代码前自动检查的实现方法
  • 2026年6月最新版固原第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026深圳拆装搬家服务专业服务商推荐:家具/空调/热水器专业拆移搬迁一站式服务 - 从来都是英雄出少年
  • 扎根青岛24年!本土老牌防水楼长修楼真实测评 - 青岛防水品牌推荐
  • 邮件配置与测试:awesome-checker-services邮件相关检查工具完全手册
  • Baserow企业级无代码平台:生产环境架构解析与高性能部署指南