当前位置: 首页 > news >正文

Kafka客户端基础使用

依赖

引入以下依赖

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.1.0</version></dependency>

生产者

  • 文档: https://kafka.apache.org/documentation/#producerapi
  1. 设置服务配置,例如序列化,IP端口等
        final Properties props = new Properties() {{// 服务IP端口配置put(BOOTSTRAP_SERVERS_CONFIG, "study.fedora01.com:9092,study.fedora02.com:9092,study.fedora03.com:9092");// 序列化put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());}};
  1. 发送消息
    2.1 单向发送,不关心是否发送成功
        try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 单向发送, 不关心是否发送成功producer.send(new ProducerRecord<>(topic, user, item));System.out.printf("%s events were produced to topic %s%n", numMessages, topic);}

2.2 同步发送, 发送完成后获取服务端发送情况,如果一直未发送成功那么就会进行阻塞

     try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 同步发送, 发送完成后获取服务端发送情况,如果一直未发送成功那么就会进行阻塞RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, user, item),(event, ex) -> {if (ex != null)ex.printStackTrace();elseSystem.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);}).get();}

2.3 异步发送, 回调函数处理异步响应信息

      try (final Producer<String, String> producer = new KafkaProducer<>(props)) {final Random rnd = new Random();final int numMessages = 10;for (int i = 0; i < numMessages; i++) {String user = users[rnd.nextInt(users.length)];String item = items[rnd.nextInt(items.length)];// 异步发送, 回调函数处理异步响应信息producer.send(new ProducerRecord<>(topic, user, item),(event, ex) -> {if (ex != null)ex.printStackTrace();elseSystem.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);});}System.out.printf("%s events were produced to topic %s%n", numMessages, topic);}

消费者

  • 文档: https://kafka.apache.org/documentation/#consumerapi
  1. 设置配置信息
        String topic = "producer-topic";Properties props = new Properties();// 设置服务地址props.put("bootstrap.servers", "study.fedora01.com:9092,study.fedora02.com:9092,study.fedora03.com:9092");// 消费者端需要设置一个group id来消费信息props.put("group.id", "test-group-2"); // 序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 监听topic
        consumer.subscribe(Collections.singletonList(topic));System.out.println("开始消费topic: " + topic);
  1. 处理消息,每隔1000毫秒拉取一次
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if (records.isEmpty()) {System.out.println("还没有读取到消息...");}for (ConsumerRecord<String, String> record : records) {System.out.printf("Key=%s, Value=%s, Partition=%d, Offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}}
http://www.gsyq.cn/news/47463.html

相关文章:

  • 【ArcMap】查看、反转线的方向
  • systemd-timedated.service Dbus参考
  • 2025年比较好的防火岩棉板厂家实力及用户口碑排行榜
  • 2025年口碑好的链条输送机实力厂家TOP推荐榜
  • 传统企业能源管理痛点破解:MyEMS 如何解决 “数据散、分析难、优化慢” 三大核心问题?
  • Magisk体系:Android Root权限的工程化部署方案
  • 2025年口碑好的门式起重机最新TOP厂家排名
  • 2025年11月酶制剂品牌对比榜:五家代表企业深度解析
  • 2025年11月蛋白胨厂家对比榜:五家代表企业综合评测报告
  • 2025年质量好的上海裸眼3DLED显示屏厂家推荐及选购参考榜
  • 2025年专业的nfc标签厂家最新推荐排行榜
  • 2025年11月酵母抽提物品牌推荐:年度榜对比与鲜味性能评价
  • 2025年有实力自建房家用电梯厂家最新TOP排行榜
  • 2025年专业的亚克力制品行业内知名厂家排行榜
  • 2025年11月北京昌平回龙观酒店推荐榜:会议婚宴与运动配套对比榜
  • 2025年评价高的短视频运营最新TOP厂家排名
  • 2025年专业的钢板预处理线优质厂家推荐榜单
  • 2025年专业的肌理板压花机厂家推荐及选购参考榜
  • 2025年专业的负氧离子床垫厂家最新TOP排行榜
  • 2025年知名的F30喷涂四氟TOP品牌厂家排行榜
  • 2025年0.75mm 0.8mm土工布土工膜厂家最新TOP实力排行
  • 2025年质量好的玻璃钢储罐厂家最新TOP实力排行
  • 2025年11月酵母蛋白品牌评测榜:从资质到应用全维度解析
  • 2025年有实力石塑地板生产线厂家推荐及选择指南
  • 2025年纽特舒玛蛋白粉权威盘点:深度解析医用级乳清配方优势
  • 2025年有实力的气压组合农用榨油机厂家推荐及采购指南
  • 2025年有实力的北京母线槽厂家最新TOP排行榜
  • 2025年1.2mm填埋场防渗土工膜高端项目推荐排行榜
  • 2025年有实力的公寓床行业内口碑厂家排行榜
  • 2025年如何选择半自动环形绕线机厂家最新TOP实力排行