当前位置: 首页 > news >正文

Kafka 和springboot 整合Logback日志

Spring Boot 整合 Kafka 进行日志处理是一个常见的任务,可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南,帮助你完成这个整合。

本文介绍了如何在SpringBoot应用中整合Kafka,利用Logback收集日志并发送到Kafka。详细步骤包括添加Kafka相关依赖,配置logback.xml,自定义KafkaAppender以及提供测试。

首先安装kafka.。之前已经安装过了,我们这里不再讲。,可以参考前面讲的类容:

https://blog.csdn.net/lchmyhua88/article/details/155640953?spm=1001.2014.3001.5502

接着我们按照下面的步骤开始:

  • 1.pom依赖
  • 2.logback.xml配置
  • 3.配置 kafka消费者
  • 4.监听消费消息&测试代码

1. 添加依赖

<!-- Spring Boot Starter Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Logback Kafka Appender --> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.2.0-RC2</version> </dependency> <!-- Kafka Clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>

2.配置 Logback

<configuration> <!-- 控制台输出appender --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- Kafka appender --> <appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> <!-- Kafka topic名称 --> <topic>log-topic</topic> <!-- Kafka bootstrap servers --> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <!-- Kafka producer配置 --> <producerConfig>bootstrap.servers=192.168.33.10:9092</producerConfig> <producerConfig>acks=0</producerConfig> <producerConfig>linger.ms=1000</producerConfig> <producerConfig>max.block.ms=0</producerConfig> <!-- 过滤器设置(可选) --> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <!-- 根logger配置 --> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="KAFKA"/> </root> </configuration>

3.配置 kafka消费者,为了方便测试我们把生产者消费者都配置上

spring.kafka.bootstrap-servers=192.168.33.10:9092 #spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonDeserializer

4.注入kafka bean ,方便后面调试生产者。

@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

5.监听消费消息

@Component public class LogConsumer { @KafkaListener(topics = "log-topic",groupId = "my-group") public void listen(String message) { System.out.println("收到日志消息: " + message); } }

6.添加日志打印

@RestController @RequestMapping("/kafka") public class LogController { @Autowired private KafkaProducerServiceImpl producerService; private static final Logger logger = (Logger) LoggerFactory.getLogger(LogController.class); @GetMapping("/log") public String log() { //producerService.sendLogMessage("This is a test log message"); logger.info("这是一条测试日志消息"); logger.warn("这是一条警告日志消息"); logger.error("这是一条错误日志消息"); return "Log message sent"; } }

下面我们开始测试,通过调用接口:

控制台可以看到我写3条日志,kafka消费3条消息

这样我们就通过logback把日志收集到kafka里面。方便数据分析。当然你也可以写入es进行分析画像。

我们可以通过kafka manger控制台来监控消息。

下载kafka-manager,下载地址: https://github.com/yahoo/kafka-manager

解压后配置application.conf kafka地址:

保存后,指定8888端口启动:

bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8888

启动后打开控制面板,可以看到kafka集群节点信息和分区,消费偏移等信息:

http://192.168.33.10:8888/

详情可以去官网看看介绍:

https://github.com/yahoo/CMAK

http://www.gsyq.cn/news/98305.html

相关文章:

  • 黑马点评前125节课遇到的问题及解决方案(在看网课过程中会有很多老师运行成功但我们失败并且老师还不没有讲到的情况,本文致力于解决这个问题,记录了本人在做这个项目的时候遇到的所有问题)
  • 探索多虚拟电厂联合调度优化模型:集中式算法的实践
  • C++ 相对 C 的语法补充:解决痛点,让代码更简洁安全
  • (19)Bean的循环依赖问题
  • 内存条电压
  • Vue的Class绑定对象语法如何让动态类名切换变得直观高效?
  • 25、文本处理工具全解析
  • 中国以食物命名的城市:地域文化与自然馈赠的诗意联结——全国排名第一起名大师颜廷利教授的深度解读
  • 32、深入掌握 Bash 条件测试与流程控制
  • 【保姆级教程】几分钟从零部署 RedInk:一句话生成小红书图文的开源项目完整指南
  • 33、提升程序交互性:键盘输入读取与循环控制
  • python_字幕、音频、媒体文件(图片或视频)一键组合
  • AI+个人品牌:IT人从“技术骨干”到“行业IP”的跃迁密码
  • 基于SpringBoot爬山登山陪爬平台的设计与实现毕业设计项目源码
  • 2025年国内优质的化粪池清掏公司推荐榜,有实力的化粪池清掏厂家永邦环卫专注行业多年经验,口碑良好 - 品牌推荐师
  • 深入解析:【git】多人协作
  • Dots.OCR:多语言文档布局解析的终极解决方案
  • Vue3
  • 20、Swerve详细设计解析
  • 手把手教你学Simulink--机器人基础关节控制场景实例:基于Simulink的BLDC关节方波控制与正弦波控制对比仿真
  • 2025最新深度解析:吉林长春出租车顶灯广告市场主流服务商概览 - 2025年11月品牌推荐榜
  • 深入解析:运筹说145期:从快递到自动驾驶:启发式算法的智慧幕后
  • NetSonar终极指南:如何快速诊断网络问题
  • Springboot美食分享网站a73c9(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • AutoGPT在服装搭配建议系统中的风格迁移应用
  • vue基于Spring Boot框架的光辉家政服务评价系统 保洁员预约系统的设计与实现_s3d3g194
  • 近视
  • 39、高级Shell技巧与特性解析
  • 3D部件处理实战指南:4种核心文件格式的深度应用
  • 10、Ubuntu系统使用指南:从基础设置到多媒体体验