当前位置: 首页 > news >正文

使用Logstash实现PostgreSQL到Elasticsearch的数据摄取

使用Logstash实现PostgreSQL到Elasticsearch的数据摄取

什么是Logstash?

Logstash是Elastic提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括Elasticsearch、Kafka、平面文件等。

Logstash管道包含三个不同的处理过程:

  • 输入:从中收集数据以进行摄取的数据源
  • 过滤器:使用Grok、Mutate、Date等插件转换(清理、聚合等)数据
  • 输出:摄取的目标(Elasticsearch、平面文件、数据库等)

以下是使用Logstash将数据发送到Elasticsearch的先决条件:

  • 系统上安装了Logstash和Postgres的JDBC驱动程序
  • 具有要同步的表或函数的Postgres数据库
  • 正在运行的Elasticsearch实例

Logstash设置(Windows版)

以下是本地安装和运行Logstash的简要步骤。

1. 安装Java

从官方Oracle网站下载JDK包(Java 8或更高版本)。下载完成后,将文件解压缩到首选位置。

解压缩文件后,需要添加环境变量以便系统识别Java命令。

转到环境变量,添加一个名为JAVA_HOME的新变量,并将其指向Java文件所在的目录。将%JAVA_HOME%\bin附加到路径中。

要验证安装是否成功,请转到命令提示符并运行以下命令:

java -version

如果一切设置正确,它将显示Java版本。

2. 安装Logstash

要安装Logstash,请从官方Elastic网站下载包,并将其解压缩到首选位置。

要在本地测试,请打开命令提示符,导航到Logstash文件夹中的bin文件夹,并运行以下命令:

logstash -e "input { stdin {} } output { stdout {} }"

Logstash摄取管道

1. 安装所需的JDBC驱动程序

从官方PostgreSQL网站下载Postgres驱动程序。将jar文件放在可访问的位置。

2. 创建Logstash管道

以下是示例管道:

input {jdbc {jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"jdbc_driver_class => "org.postgresql.Driver"jdbc_connection_string => "${JDBC_HOST}"jdbc_user => "${DB_USER}"jdbc_password => "${DB_PWD}"jdbc_paging_enabled => truejdbc_page_size => 1000schedule => "* * * * *"  # 计划每分钟运行一次statement => "SELECT * FROM employee WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"tracking_column_type => "timestamp"last_run_metadata_path => "c:/logstash/employee.tracker"}
}filter {mutate {remove_field => ["date", "@timestamp", "host"]}# 如果需要解析JSON字段的示例json {source => "first_name"target => "name"}
}output {stdout { codec => json_lines }elasticsearch {hosts => ["http://localhost:9200"]index => "my_table_index"custom_headers => {"Authorization" => "${AUTH_KEY}"}document_id => "%{table_id}" # 表中的唯一标识符timeout => 120}
}

上述管道用于增量摄取。这意味着它会跟踪最后一次运行,并从最后一次运行开始获取记录,按照计划摄取数据。

以下是使用的关键概念:

输入

  • jdbc_driver_library - JDBC驱动程序文件(.jar)的存储位置
  • jdbc_driver_class - 正在使用的驱动程序类
  • jdbc_connection_string - postgres数据库连接字符串
  • jdbc_user - 数据库用户名
  • jdbc_password - 用户的数据库密码
  • paging - 数据将以多页形式发送,每页大小为1000。这将提高管道的性能,并有助于跟踪发送到Elasticsearch的记录数
  • schedule - 上述管道计划每分钟运行一次。以下是计划的格式:
  • statement - 管道将执行的SQL语句。要执行复杂的语句,可以将其保存在单独的.sql文件中,并将文件路径提及到statement_filepath而不是statement。最好使用视图或物化视图而不是具有复杂连接的查询。

最后一部分用于增量摄取:

use_column_value => true
tracking_column => "updated_dt"
tracking_column_type => "timestamp"
last_run_metadata_path => "c:/project/logstash/date.tracker"
  • use_column_value设置为true。它让Logstash知道跟踪在tracking_column中使用的列updated_at的实际值,而不是使用上次运行查询的时间。在这种情况下,:sql_last_value将使用updated_dt值。
  • 如果设置为false,Logstash将使用上次查询执行时间作为:sql_last_value
  • 最后一次运行时间将保存在last_run_metadata_path中提到的文件中。它将用于跟踪管道最后一次运行的时间。

过滤器
这是一个可选部分,用于在将数据发送到目标之前操作数据。

在上述管道中,日期字段正在从摄取中删除。此外,它还将数据中的first_name发送到目标中的name字段。

输出
此部分定义数据的目标。在这种情况下,它是Elasticsearch端点、授权密钥(如果有)、elastic索引、document_id。document_id是索引中elastic文档的唯一标识符。如果未提及此字段,Elasticsearch将自动为文档分配唯一标识符。

在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch将在索引中查找此字段;如果匹配,它将更新同一文档。

如果未定义该字段,它将在索引中创建一个新文档,从而导致重复记录。

运行管道

要运行此管道,请打开命令提示符,转到Logstash文件夹,并运行以下命令:

bin/logstash -f c:/logstash/sample_pipeline.conf

以下是管道的输出。

来自Elasticsearch索引的输出。

{"took": 1,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 3,"relation": "eq"},"max_score": 1.0,"hits": [{"_index": "testing","_id": "1","_score": 1.0,"_source": {"name": "James","id": 1,"last_name": "Smith","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.167442600Z"}},{"_index": "testing","_id": "2","_score": 1.0,"_source": {"name": "John","id": 2,"last_name": "Doe","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.169021400Z"}},{"_index": "testing","_id": "3","_score": 1.0,"_source": {"name": "Kate","id": 3,"last_name": "Williams","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.170098800Z"}}]}
}

这种方法有几个优点:

  • Logstash是一个开源工具,易于实现。
  • 有200多个可用于数据转换的插件。使用这些插件,可以使用过滤器解析和转换数据。
  • 它是数据源和Elasticsearch之间的解耦架构。
  • 与Elasticsearch无缝集成。

尽管这是一种开源的简单实现方法,但它也有一些缺点:

  • 延迟问题:对于需要极低延迟或实时数据的应用程序来说,它并不理想。随着管道的增长,加载、转换/过滤和发送数据需要时间。
  • 错误处理:除非明确监控,否则很难跟踪错误,这可能导致数据丢失。
  • 如果管道定义不当,可能会产生重复项。
  • 与其他工具相比,启动时间更长。
  • 它使用YAML风格的配置文件,这使其变得复杂且难以维护。
  • 资源利用:在重负载和复杂管道的情况下,它可能利用更多资源。

如果某人正在寻找更强大和集中化的数据流管道,可以使用上述管道。它不适用于实时数据传送。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)

公众号二维码

公众号二维码

http://www.gsyq.cn/news/60281.html

相关文章:

  • 2025年封闭母线槽优质厂家权威推荐榜单:耐火母线槽/防水母线槽/空气型母线槽源头厂家精选
  • 2025年数据分类分级产品选型排名与深度解析:可视化、自适应、一键部署成关键能力
  • 官网发布|智感未来-聚链共生-2026中国激光雷达大会暨展览会/火热招展中!!!
  • 2025年三集一体除湿热泵机组选购指南及厂家推荐,目前三集一体除湿热泵机组直销厂家联系电话精选实力品牌
  • 2025年哈尔滨自闭症康复机构权威推荐榜单:孤独症/发育迟缓/发育落后源头机构精选
  • HarmonyOS自动化测试与持续集成实战指南 - 教程
  • 2025年11月审计报告事务所推荐:权威榜单与选择指南
  • 腾讯云TBDS与CDH迁移常见问题有哪些?建议由CDH迁移到CMP 7.13 平台(类Cloudera CDP,如华为鲲鹏 ARM 版)
  • 4A平台的新变化与国内典型厂商全景盘点
  • 2025年学生平板电脑制造厂权威推荐榜单:商务平板电脑/护眼大屏学习机/学生学习机源头工厂精选
  • 从被动防御到智能自治:安全运营中心(SOC)的演进之路
  • 2025年型材铝扣板批发厂家权威推荐榜单:吊顶铝扣板/集成铝扣板/墙面铝扣板源头厂家精选
  • 光催化全解水反应器源头厂家TOP5,品牌综合实力榜单发布
  • 有时休息时,身体会动一下
  • 2025年上海品牌营销推荐公司榜单途阔营销
  • 吴恩达深度学习课程三: 结构化机器学习项目 第一周:机器学习策略(一)正交化调优和评估指标
  • Mac与Kali主机间SSH连接故障排除:主机密钥变更的解决便捷的方案
  • 2025年11月高新技术企业认定公司推荐榜单与选择指南:权威评测与高性价比解决方案
  • 为什么软件反应特别慢?一次因版本架构错误导致的性能问题排查记录
  • Enefit - Kaggle项目
  • 2025年11月审计报告事务所推荐:知名机构选择指南及避坑要点详解
  • 2025年11月高新技术企业认定公司推荐:榜单与权威选择指南
  • **`Series` / `DataFrame`** 和 **`ndarray`** 除了“容器”,还有别的叫法吗?
  • XML 序列化工具类
  • 详解C语言操作符 - 详解
  • 2025年11月智能AI客服品牌推荐热度榜:基于性能指标的结果承诺保障方案
  • 2025年11月AI智能客服机器人品牌推荐权威榜单:十大品牌核心价值与解决方案解析
  • 2025年比较好的染色机优质厂家推荐榜单
  • 2025年比较好的工业真空包装袋厂家最新权威实力榜
  • zkw 线段树