当前位置: 首页 > news >正文

Flink数据流写入Elasticsearch实战

目录

1. 代码结构

2. 代码解析

(1) 主程序入口

(2) 配置 Elasticsearch 集群

(3) 定义 Elasticsearch Sink Function

(4) 添加 Elasticsearch Sink

(5) 执行任务

3. 验证命令


这段代码是一个使用 Apache Flink 将数据流(Event对象)写入 Elasticsearch 的示例。以下是对代码的详细解析和说明:

1. 代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Java 和 Flink 相关类库,包括:

    • java.util:用于使用ArrayListHashMap
    • org.apache.flink:Flink 的核心类库。
    • org.apache.flink.streaming.connectors.elasticsearch:Flink 的 Elasticsearch Sink 相关类。
    • org.elasticsearch.client.Requests:Elasticsearch 的请求工具类。
  • Event
    定义了一个简单的Event类,包含三个字段:

    • user:用户名称。
    • url:访问的 URL。
    • timestamp:时间戳。
  • sinkToEs对象
    主程序入口,包含 Flink 流处理逻辑和 Elasticsearch Sink 的配置。

package sink import java.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests import source.ClickSource case class Event(user:String,url:String,timestamp:Long) /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-20 15:04 * @DESCRIPTION * */ object sinkToEs { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) ) //定义es集群主机列表 val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master",9200)) //定义一个esSinkFunction val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) //包装要发送的http请求 val request = Requests.indexRequest() .index("clicks") //表名 .source(data) //数据 .`type`("event") //类型 //发送请求 requestIndexer.add(request) } } data.addSink(new ElasticsearchSink.Builder[Event](hosts,esFun).build()) //验证命令 //curl 'localhost:9200/_cat/indices?v' //curl 'localhost:9200/clicks/_search?pretty' env.execute("sinkRedis") } }

基于scala使用flink将读取到的数据写入到ES

发送完毕后可以使用以下命令进行验证:

curl 'localhost:9200/_cat/indices?v' curl 'localhost:9200/clicks/_search?pretty'

2. 代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) )
  • 创建 Flink 流处理环境StreamExecutionEnvironment
  • 使用fromElements方法生成一个包含 4 个Event对象的流。
(2) 配置 Elasticsearch 集群
val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master", 9200))
  • 创建一个ArrayList,用于存储 Elasticsearch 集群的主机信息。
  • 添加一个 Elasticsearch 节点(master,端口9200)。
(3) 定义 Elasticsearch Sink Function
val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) val request = Requests.indexRequest() .index("clicks") // 索引名称 .source(data) // 数据 .`type`("event") // 类型 requestIndexer.add(request) } }
  • 实现ElasticsearchSinkFunction接口,定义如何将Event对象写入 Elasticsearch。
  • process方法中:
    • Event对象的userurl字段存入HashMap
    • 使用Requests.indexRequest()创建一个索引请求。
    • 指定索引名称(clicks)、数据源(data)和类型(event)。
    • 通过requestIndexer.add(request)发送请求。
(4) 添加 Elasticsearch Sink
data.addSink(new ElasticsearchSink.Builder[Event](hosts, esFun).build())
  • 使用ElasticsearchSink.Builder构建 Elasticsearch Sink。
  • 将 Sink 添加到数据流中。
(5) 执行任务
env.execute("sinkRedis")
  • 启动 Flink 流处理任务,任务名称为sinkRedis

3. 验证命令

  • 查看 Elasticsearch 索引

    bash

    curl 'localhost:9200/_cat/indices?v'

    检查clicks索引是否创建成功。

  • 查询索引数据

    bash

    curl 'localhost:9200/clicks/_search?pretty'

    查看clicks索引中的数据。

http://www.gsyq.cn/news/1387382.html

相关文章:

  • 实测对比:MPU6050在STM32上的Sleep与Cycle模式,哪个更省电?(附电流数据)
  • 构建非侵入式智能帮助系统:三层感知架构与无感集成实践
  • PostgreSQL CASE语句深度解析:性能、类型与NULL安全实战指南
  • 【ChatGPT】美国泛林集团Sabre® 系列水平镀铜设备深度拆解、爆炸图10张、信息图10张、C++代码框架
  • 从一次生产事故复盘:我们如何优雅地处理用户上传的‘异常’Excel文件(附Apache POI配置详解)
  • 避坑指南:树莓派4B编译FFmpeg支持H.264硬编时,我遇到的‘OMX_Core.h not found’等错误全解决
  • Topit:macOS窗口置顶神器,让多任务处理效率翻倍
  • 从零到一:用PySide6和Qt Creator 4.14打造你的第一个Python GUI应用
  • RCNet:基于RNN的Delta-Sigma ADC自动化设计新方法
  • Archon Specs:用约束性规范与实时验证消除AI代码生成中的幻觉问题
  • 全国职业院校技能大赛-心得+环境代码全资源
  • 量子程序调试新方法:Bloch向量断言技术解析
  • 3分钟快速上手:用BetterNCM安装器彻底改造你的网易云音乐
  • AX-MES生产制造管理系统-总览
  • 抖音数字资产管理方法论:构建个人内容沉淀系统的技术实践
  • 3步搞定洛雪音乐播放:六音音源修复版完整配置指南
  • nginx配置 请求静态文件时带上额外的响应头信息(可用作获取客户端IP)
  • 接口测试用例设计实战:从契约验证到状态跃迁
  • 重新定义数据科学范式:SISSO如何颠覆黑盒机器学习的认知框架
  • LLM在HPC代码翻译中的实践与评估
  • SideX安全最佳实践:保护你的代码编辑环境
  • 实战教程:如何使用GLM-4.1V-9B-Thinking-gs-A8W8进行图像理解和视频分析的完整指南
  • 从13个虚假集成到真实数据流:AI审计揭示前后端割裂与架构重构
  • Geolib地理计算库:零依赖的经纬度处理终极指南
  • ComfyUI-Manager终极指南:3个核心功能彻底解决AI工作流管理难题
  • Ventoy终极指南:一个U盘启动所有系统,告别重复格式化烦恼 [特殊字符]
  • CentOS 7上VSFTPD报错‘user unknown’?别慌,可能是PAM配置和nologin用户惹的祸
  • ComfyUI深度估计神器:5分钟搞定Marigold完整部署指南
  • NativeScript Firebase安全指南:保护用户数据的7个关键措施
  • WordPress Widget Boilerplate与Gutenberg编辑器集成:现代WordPress开发终极指南 [特殊字符]