1 为什么要使用 ELK
- 随着企业信息化进程加速,日志数据呈现量急剧增加、来源多样、格式复杂的特点,传统日志管理方式已难以满足需求,这是引入ELK的核心背景;
- ELK(ElasticSearch、Logstash、Kibana)的由三个组件构成,各自承担关键功能,形成高效的日志管理方案:
- Elasticsearch:提供强大的分布式搜索能力,支撑日志的快速检索;
- Logstash:具备灵活的数据采集与处理功能,负责日志的收集和预处理;
- Kibana:提供直观的数据可视化界面,将复杂日志数据以图表等形式呈现,便于理解分析;
- 三者结合,为企业和开发人员提供了高效、实时、可扩展且易用的日志管理解决方案,助力提升工作效率和问题解决速度;
- 使用ELK的主要原因
- 集中化管理与高效检索日志
* 在大型分布式系统中,ELK可构建集中式日志系统,实现所有节点日志的统一收集、管理和访问,大幅提高定位问题的效率;
* Elasticsearch的强大检索特性,能快速查询问题日志,显著提升运维人员的工作效率; - 全面的日志分析与系统监控
* ELK可管理和分析多种日志类型,包括系统日志、应用程序日志、安全日志等,帮助系统运维和开发人员了解服务器软硬件信息、检查配置错误及其原因;
* 通过分析和监控日志,能及时了解服务器的负荷、性能和安全性,从而及时采取措施纠正错误; - 直观的数据可视化与理解
* Kibana为Elasticsearch提供Web可视化界面,可生成各种维度的表格、图形,让复杂的日志数据“可视化”;
* 可视化界面帮助用户更直观地理解和分析数据,进一步提升日志分析和系统监控的效果。
- 集中化管理与高效检索日志
2 ELK 的整体架构分析
- ELK架构分为两种类型:
- 经典的ELK架构
- 整合消息队列(Redis/Kafka/RabbitMQ)和Nginx的ELK架构
2.1 经典的ELK架构
- 组成:
- 经典ELK架构主要由 Filebeat + Logstash + Elasticsearch + Kibana 组成;
- 早期架构仅包含
Logstash + Elasticsearch + Kibana,后来因Filebeat轻量级、高效性的特点,逐渐被引入作为日志收集工具;

- 流程与组件分工:
- Beats(以Filebeat为代表):负责 Data Collection(数据收集),作为轻量级日志收集代理,部署在客户端,消耗资源少,高效收集日志数据;
- Logstash:负责 Data Aggregation & Processing(数据聚合与处理),对Filebeat收集的日志数据进行过滤、转换等操作,再发送到Elasticsearch;
- Elasticsearch:负责 Indexing & Storage(索引与存储),基于Lucene的分布式搜索和分析引擎,提供强大的数据存储和搜索能力;
- Kibana:负责 Analysis & Visualization(分析与可视化),为Elasticsearch提供Web可视化界面,通过图表、仪表盘等形式直观查看和分析日志数据;
- 特点:
- 日志收集:Filebeat轻量级,客户端部署,资源消耗少,高效收集日志;
- 数据处理:Logstash作为数据处理管道,完成日志的过滤、转换等操作;
- 存储与检索:Elasticsearch提供分布式存储和强大检索能力;
- 可视化:Kibana实现日志数据的可视化分析;
- 适用场景:主要适用于数据量较小的开发环境。但因缺少消息队列的缓冲机制,若Logstash或Elasticsearch故障,存在数据丢失的风险。
2.2 整合消息队列+Nginx的ELK架构
- 组成:在经典ELK架构基础上,整合 消息队列(Redis、Kafka、RabbitMQ任选) 和 Nginx,形成更复杂的架构;
- 流程与组件分工:
- Beats:依然负责 Data Collection(数据收集);
- 消息队列(Redis/Kafka/RabbitMQ):负责 Buffering(缓冲),作为中间缓冲层;
- Logstash:负责 Data Aggregation & Processing(数据聚合与处理);
- Elasticsearch:负责 Indexing & Storage(索引与存储);
- Kibana:负责 Analysis & Visualization(分析与可视化);
- Nginx:作为高性能Web和反向代理服务器,优化系统性能和可用性;

- 特点:
- 消息队列:引入后作为缓冲机制,即使Logstash或Elasticsearch故障,日志数据也不会丢失;同时能修剪网络传输峰,降低数据丢失可能性;
- Nginx:在负载均衡、缓存等方面发挥作用,提升系统性能和用户访问体验;
- 扩展性:引入消息队列和Nginx后,架构扩展性增强,可根据实际需求动态调整各组件的资源分配和部署规模;
- 适用场景:主要适用于生产环境,尤其是需要处理大数据量的场景。能确保数据的安全性和完整性,同时提供高性能的日志处理和可视化分析服务。
3 数据处理管道Logstash
3.1 概述
- Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据、转换数据,然后将数据发送到目标存储库中;
- 官网:Logstash:收集、解析和转换日志 | Elastic;
- 应用场景:作为ETL工具(Extract-Transform-Load,提取-转换-加载)和数据采集处理引擎,支持日志、指标、Web应用、数据库等多源数据的采集与处理,最终服务于分析(Analysis)、归档(Archiving)、监控(Monitoring)、告警(Alerting)等场景;

3.2 Logstash的核心概念
- Pipeline
- 是Logstash的核心处理流程,包含Input—Filter—Output三个阶段;
- 负责插件生命周期管理和队列管理,是数据从采集到输出的完整流转通道;
- Logstash Event
- 是数据在Logstash内部流转时的具体表现形式:在Input阶段被转换为Event,在Output阶段被转化为目标格式数据;
- 本质是一个Java Object,可在配置文件中对其属性进行增删改查;
- Codec(Code / Decode,编码/解码):
- 将原始数据**decode(解码)**成Event;
- 将Event **encode(编码)**成目标数据,实现数据格式的转换适配。
3.3 Logstash的数据传输原理
- 数据传输遵循**“Input(数据采集)→ Filter(数据解析)→ Output(数据输出)”**的流程,各阶段有典型组件支撑:
| 阶段 | 功能 | 典型组件示例 |
| ------ | --------- | ------------------------------------------ |
| Input | 数据采集与输入 | Stdin(标准输入)、JDBC(数据库连接) |
| Filter | 实时解析和数据转换 | Mutate(数据修改)、Date(日期处理)、User Agent(用户代理解析) |
| Output | 存储与数据导出 | Elasticsearch(主流存储目标) |

- Logstash通过**管道(Pipeline)**完成数据的采集与处理,流程如下:
- 数据采集(Input):从数据源(Data Source,如日志文件、数据库等)采集数据,支持多种输入插件,以连续的流式传输方式获取数据;
- 数据过滤/预处理(Filter,可选):通过Filter插件对数据进行过滤、转换(如字段提取、格式调整等),构建结构化数据;
- 数据输出(Output):将处理后的数据发送到目标存储(如Elasticsearch),支持多种输出插件配置;

3.4 Logstash的安装与配置
3.4.1 Logstash的安装
- Logstash官方安装文档地址:[Installing Logstash | Logstash Reference 8.14] | Elastic;
- 下载并解压Logstash
- Logstash下载地址:Past Releases | Elastic;
- 选择版本 8.14.3;
- 不同系统的下载链接:
* Windows系统:https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-windows-x86_64.zip;
* Linux系统:https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-linux-x86_64.tar.gz;
- 测试:运行最基本的Logstash管道
- 进入Logstash目录:执行命令
cd logstash-8.14.3,切换到Logstash的安装目录; - 不同系统的测试命令:
* Linux系统:
bash bin/logstash -e 'input { stdin {} } output { stdout {} }'
* 其中-e选项表示“直接把配置放在命令中”,用于快速测试;
* Windows系统:
cmd .\bin\logstash.bat -e "input { stdin {} } output { stdout {} }" - 测试逻辑说明:
*input { stdin {} }:配置Logstash从**标准输入(键盘输入)**采集数据;
*output { stdout {} }:配置Logstash将处理后的数据输出到标准输出(终端);
- 进入Logstash目录:执行命令
- 当在终端输入内容(如
hello、hello fox)后,Logstash会将输入内容以结构化的Event形式输出- 包含以下字段:
*message:输入的原始内容(如"hello\r"、"hello fox\r");
*@timestamp:数据处理的时间戳(如2022-06-10T05:36:47.382Z);
*@version:Logstash事件的版本(固定为"1");
*host:运行Logstash的主机名(如"LAPTOP-UTD9471P");

- 这表明Logstash的管道已成功运行,能够完成“输入→输出”的基本数据处理流程。
- 包含以下字段:
3.4.2 Logstash的配置
- Logstash官方配置文档地址:[Creating a Logstash pipeline | Logstash Reference 8.14] | Elastic;
- Logstash的管道配置文件对每种类型的插件(Input、Filter、Output)提供单独配置部分,用于处理管道事件;
- Input部分
1input { 2 stdin {} 3}* 功能:配置数据输入源,此处使用`stdin`插件,从\*\*标准输入(键盘)\*\*采集数据;- Filter部分
1filter { 2 grok { 3 match => { "message" => "%{COMBINEDAPACHELOG}" } 4 } 5 date { 6 match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] 7 } 8}* `grok`插件:通过正则匹配(`%{COMBINEDAPACHELOG}`是预定义的Apache日志格式),尝试将`message`字段解析为结构化数据; * [`date`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.date.md)插件:尝试将名为`timestamp`的字段按格式[`dd/MMM/yyyy:HH:mm:ss Z`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.dd.md)解析为时间戳,用于统一时间格式;- Output部分
1output { 2 elasticsearch { 3 index => "logstash-demo" 4 hosts => ["localhost:9200"] 5 } 6 stdout { codec => rubydebug } 7}* `elasticsearch`插件:将处理后的数据输出到Elasticsearch,指定索引为`logstash-demo`,Elasticsearch服务地址为`localhost:9200`; * `stdout`插件:同时将数据输出到终端,`codec => rubydebug`用于以结构化格式(便于调试)展示输出内容; - 每个配置部分可包含一个或多个插件,Logstash会按照插件在配置文件中出现的顺序依次处理(例如Filter中的
grok先执行,再执行date); - 执行以下命令启动Logstash并加载配置文件(假设配置文件名为
logstash-demo.conf):
1bin/logstash -f logstash-demo.conf

- 当输入
hello后,终端输出结构化数据,包含以下关键信息:
*@version:事件版本(固定为"1")
*host:运行Logstash的主机信息(如"hostname" => "Java")
*tags:包含"_grokparsefailure",说明grok插件解析"hello"时失败(因hello不符合Apache日志格式)
*message:原始输入内容("hello\r")
*@timestamp:事件处理的时间戳(如2024-08-28T14:21:13.961676600Z)
*event.original:原始事件内容("hello\r") - 通过Elasticsearch的检索API可查看到,数据已成功写入
logstash-demo索引,字段与终端输出一致,验证了数据已同步到Elasticsearch中;

3.4.3 Logstash的插件
3.4.3.1 插件分类
- Logstash插件分为Input、Filter、Output、Codec四大类,各自承担不同功能;
- Input Plugins(输入插件)
- 官方文档:[Input plugins | Logstash Reference 8.14] | Elastic;
- 核心特点:一个Pipeline可以包含多个Input插件,支持从多种数据源采集数据;
- 常见插件示例:
* 基础输入:Stdin(标准输入)、File(文件);
* 中间件/数据库:Beats、Log4J、Elasticsearch、JDBC、Kafka、Rabbitmq、Redis;
* 协议/服务:JMX、HTTP、Websocket、UDP、TCP;
* 云存储/平台:Google Cloud Storage、S3、Github、Twitter;
- Filter Plugins(过滤插件)
- 官方文档:[Filter plugins | Logstash Reference 8.14] | Elastic;
- 核心功能:对Logstash Event进行解析、字段删除、类型转换等处理;
- 常见插件示例:
*Date:日期解析;
*Dissect:分割符解析;
*Grok:正则匹配解析;
*Mutate:对字段做多种操作(Convert类型转换、Gsub字符串替换、Split/Join/Merge字符串/数组操作、Rename字段重命名、Update/Replace字段内容更新、Remove_field字段删除);
*Ruby:利用Ruby代码动态修改Event;
- Output Plugins(输出插件)
- 官方文档:[Output plugins | Logstash Reference 8.14] | Elastic;
- 核心特点:是Pipeline的最后一个阶段,负责将Event发送到特定目的地;
- 常见插件示例:
* 存储引擎:Elasticsearch;
* 通知/监控:Email、Pageduty;
* 数据库/中间件:Influxdb、Kafka、Mongodb、Opensdb、Zabbix;
* 协议/服务:Http、TCP、Websocket;
- Codec Plugins(编解码插件)
- 官方文档:[Codec plugins | Logstash Reference 8.14] | Elastic;
- 核心功能:将原始数据decode成Event,再将Event encode成目标数据,实现数据格式的转换适配;
- 内置插件示例:
Line/Multiline、JSON/Avro/Cef(ArcSight Common Event Format)、Dots/Rubydebug。
3.4.3.2 Codec插件之Multiline(多行编解码)
- 用于处理多行数据(如异常堆栈信息),核心是通过配置参数将多行合并为一个Event;
- 配置参数
- 例:处理Java异常堆栈
- 以包含多行的
NullPointerException堆栈为例,配置文件multiline-exception.conf如下:
1input { 2 stdin { 3 codec => multiline { 4 pattern => "^\s" # 匹配以空白开头的行 5 what => "previous" # 匹配行属于上一个事件 6 } 7 } 8} 9filter {} 10output { 11 stdout { codec => rubydebug } 12}- 执行命令:
bin/logstash -f multiline-exception.conf,即可将多行异常堆栈合并为一个Event输出。
- 以包含多行的
3.4.3.3 Logstash Queue(队列)
- 用于在Pipeline中缓冲数据,分为两种类型:
- In Memory Queue(内存队列):数据存储在内存中,若进程Crash或机器宕机,会导致数据丢失;
- Persistent Queue(持久化队列)
- 特点:数据持久化到磁盘,即使机器宕机,数据也不会丢失;能保证数据被消费,可替代Kafka等消息队列的缓冲区作用;
- 配置方式(在
pipelines.yml中设置):
1queue.type: persisted # 默认是memory 2queue.max_bytes: 4gb # 队列最大容量 - 队列在Pipeline中的流程:
Input → Codec → Queue → Batcher → Filter → Output,多个Input可共享Queue进行数据缓冲与分发;

3.4.4 练习:同步MySQL数据到ElasticSearch
3.4.4.1 需求分析
- 将MySQL数据库中的用户数据同步到ElasticSearch,借助ES的全文搜索能力提升搜索速度,具体需求包括:
- 新增用户信息需同步到ES
- 用户信息更新后,ES需同步更新
- 支持增量更新(只同步变化的数据)
- 用户注销后,不能被ES搜索到
3.4.4.2 实现思路
- 借助Logstash的JDBC Input Plugin将数据从MySQL读取到Logstash,再通过Elasticsearch Output Plugin同步到ES。JDBC Input Plugin的关键能力:
- 需自行提供JDBC Driver(如MySQL的JDBC驱动包)
- 支持定时任务调度(语法基于Rufus-scheduler,扩展了Cron语法)
- 支持通过
Tracking_column/sql_last_value记录状态,实现增量更新
- 官方文档:[Jdbc input plugin | Logstash Reference 8.14] | Elastic。
3.4.4.3 JDBC Input Plugin实现步骤
- 拷贝JDBC依赖:将MySQL的JDBC驱动包(如
mysql-connector-java-5.1.49.jar)拷贝到Logstash的自定义目录(如logstash-8.14.3/drivers); - 准备配置文件
mysql-demo.conf。配置文件分为input、output两个部分,实现MySQL数据采集与ES同步:
1input { 2 jdbc { 3 # JDBC驱动路径与类名 4 jdbc_driver_library => "/home/fox/logstash-8.14.3/driver/mysql-connector-java-5.1.49.jar" 5 jdbc_driver_class => "com.mysql.jdbc.Driver" 6 # MySQL连接信息 7 jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false" 8 jdbc_user => "root" 9 jdbc_password => "123456" 10 # 增量更新配置 11 use_column_value => true # 启用字段值追踪 12 tracking_column => "last_updated" # 追踪的字段(需是数值或时间类型) 13 tracking_column_type => "numeric" # 追踪字段类型(numeric/ timestamp) 14 record_last_run => true # 记录最后一次运行状态 15 last_run_metadata_path => "jdbc-position.txt" # 状态保存路径 16 statement => "SELECT * FROM user where last_updated > :sql_last_value;" # 增量查询SQL 17 schedule => "* * * * * *" # Cron表达式,设置任务触发频率 18 } 19} 20output { 21 # 同步到Elasticsearch 22 elasticsearch { 23 document_id => "%{id}" # 文档ID关联MySQL的id字段 24 document_type => "_doc" 25 index => "users" # ES索引名 26 hosts => ["http://localhost:9200"] 27 username => "elastic" # ES认证信息 28 password => "123456" 29 } 30 # 终端输出调试(可选) 31 stdout { 32 codec => rubydebug 33 } 34}
- 运行Logstash。执行命令启动Logsta sh并加载配置文件:
1bin/logstash -f mysql-demo.conf
3.4.4.4 测试
1CREATE TABLE `user` ( 2 [`id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md) int NOT NULL AUTO_INCREMENT, 3 `name` varchar(50) DEFAULT NULL, 4 [`address`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.address.md) varchar(50) DEFAULT NULL, 5 `last_updated` bigint DEFAULT NULL, 6 `is_deleted` int DEFAULT NULL, 7 PRIMARY KEY ([`id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md)) 8) ENGINE=InnoDB;
- 插入数据:新增用户“张三”,并设置
last_updated为当前时间戳;
1INSERT INTO user(name,address,last_updated,is_deleted) 2VALUES("张三","广州天河",unix_timestamp(NOW()),0);
- 更新数据:更新“张三”的地址,并更新
last_updated;
1UPDATE user SET address="广州白云山",last_updated=unix_timestamp(NOW()) WHERE name="张三";
- 注销用户:标记“张三”为注销(
is_deleted=1),并更新last_updated;
1UPDATE user SET is_deleted=1,last_updated=unix_timestamp(NOW()) WHERE name="张三";
- 数据同步效果:
- 插入、更新操作后,Logstash会通过增量查询将变化的数据同步到ES,终端可通过
stdout { codec => rubydebug }看到结构化的同步数据; - 注销操作后,数据仍会同步到ES,但可通过ES的Alias机制过滤掉注销用户;
- 插入、更新操作后,Logstash会通过增量查询将变化的数据同步到ES,终端可通过
3.4.4.5 ES中查询优化(Alias机制)
- 通过创建**别名(Alias)**并添加过滤条件,实现“只显示未被注销的用户”:
- 创建Alias:
1POST /_aliases 2{ 3 "actions": [ 4 { 5 "add": { 6 "index": "users", 7 "alias": "view_users", 8 "filter": { "term": { "is_deleted": 0 } } 9 } 10 } 11 ] 12}
- 通过Alias查询:只会返回
is_deleted=0的用户,注销用户(is_deleted=1)不会被检索到
1POST view_users/_search 2{ 3 "query": { 4 "term": { 5 "name.keyword": { 6 "value": "张三" 7 } 8 } 9 } 10}
4 轻量级采集器FileBeat
4.1 概述
- Beats是一个免费且开放的平台,集合了多种单一用途的数据采集器,可从成百上千甚至上万台机器和系统向Logstash或Elasticsearch发送数据;
- Beats包含多个针对不同数据类型的采集器:
- Filebeat:采集日志文件;
- Metricbeat:采集指标数据;
- Packetbeat:采集网络数据;
- Winlogbeat:采集Windows事件日志;
- Auditbeat:采集审计数据;
- Heartbeat:用于运行时间监控;
- Functionbeat:无服务器的采集器;

- FileBeat是专门用于转发和收集日志数据的轻量级采集工具,可作为代理安装在服务器上,监控指定路径的日志文件,收集日志数据后发送到Elasticsearch或Logstash。
4.2 工作原理
- FileBeat的工作流程由Prospector(勘探器)、Harvester(收割机)、Libbeat三个核心组件协同完成:
- Prospector(勘探器):启动FileBeat时会启动一个或多个Input,用于监控指定的日志数据位置(如
/var/log/*.log、/var/log/nginx/*等路径); - Harvester(收割机):针对每一个被监控的文件启动一个Harvester,负责读取文件的日志内容,并将新的日志发送到Libbeat;
- Libbeat:负责收集多个Harvester的数据,并将数据发送到输出端(Output),支持直接发送到Elasticsearch,或先发送到Logstash再转发到Elasticsearch;
- Prospector(勘探器):启动FileBeat时会启动一个或多个Input,用于监控指定的日志数据位置(如
- 以监控
/var/log/*.log(Prospector 1)和/var/log/nginx/*(Prospector 2)为例:- Prospector 1监控
/var/log/*.log下的log1文件,为其启动Harvester读取日志,将新日志发送到Libbeat; - Prospector 2监控
/var/log/nginx/*下的error.log文件,为其启动Harvester读取日志,将新日志发送到Libbeat; - Libbeat收集所有Harvester的数据后,可选择直接发送到Elasticsearch,或先发送到Logstash再由Logstash转发到Elasticsearch;

- Prospector 1监控
4.3 Logstash VS FileBeat
- 资源消耗对比
- Logstash:运行在JVM(Java虚拟机)上,资源消耗较大;
- FileBeat:基于Golang编写,功能相对少但资源消耗小,更轻量级,占用资源更少;
- 功能定位对比
- 两者都具备日志收集功能,但FileBeat更轻量;
- Logstash独有Filter功能:可对日志进行过滤、分析(如字段提取、格式转换等);
- 典型配合流程:
- FileBeat采集日志,发送到**消息队列(Redis、MQ等)**做缓冲;
- Logstash从消息队列获取日志,利用Filter功能过滤分析;
- 最终将处理后的数据存储到Elasticsearch;
- FileBeat支持背压压制:
- 当向Logstash或Elasticsearch发送数据时,使用背压敏感协议;
- 若Logstash忙于处理数据,会通知FileBeat减慢读取速度;
- 拥塞解决后,FileBeat恢复原速度继续传输数据,以此适配数据量波动,保证系统稳定。
4.4 FileBeat的安装与配置
- 下载并解压FileBeat
- FileBeat官方安装配置文档:[Filebeat quick start: installation and configuration | Filebeat Reference 8.14] | Elastic;
- 下载地址:Past Releases | Elastic;
- 选择版本:8.14.3
- Windows系统下载链接:
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-windows-x86_64.zip; - Linux系统:执行命令下载并解压
1curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-linux-x86_64.tar.gz 2tar xzvf filebeat-8.14.3-linux-x86_64.tar.gz - 编辑配置。修改
filebeat.yml以配置Elasticsearch和Kibana的连接信息:
1output.elasticsearch: 2 hosts: ["192.168.65.174:9200", "192.168.65.192:9200", "192.168.65.204:9200"] # Elasticsearch集群地址 3 username: "elastic" # Elasticsearch认证用户名 4 password: "123456" # Elasticsearch认证密码 5setup.kibana: 6 host: "192.168.65.174:5601" # Kibana地址
- 启用和配置数据收集模块
- 从FileBeat安装目录中执行以下操作,启用特定服务的日志收集模块;
- 查看可启用的模块列表
1./filebeat modules list- 启用Nginx模块
* 启用命令:
bash ./filebeat modules enable nginx
* 若需修改Nginx日志路径,编辑modules.d/nginx.yml:
yaml - module: nginx access: enabled: true var.paths: ["/var/log/nginx/access.log*"] # Nginx访问日志路径 - 启用Logstash模块
* 启用命令:
bash ./filebeat modules enable logstash
* 若需修改Logstash日志路径,编辑modules.d/logstash.yml:
yaml - module: logstash log: enabled: true var.paths: ["/home/fox/logstash-8.14.3/logs/*.log"] # Logstash日志路径
- 启动FileBeat
- 加载Kibana仪表板(可选,若未配置过则执行)
1./filebeat setup- 启动FileBeat
1./filebeat -e- 验证启动效果。启动成功后,可在Kibana的Observability → Logs → Stream中查看收集到的日志(如Logstash的日志),确认数据已成功传输和可视化;

4.5 练习1:FileBeat采集Tomcat服务器日志并发送到Logstash
- 需求背景:Tomcat服务器运行时产生大量日志(如访问日志),需通过FileBeat采集这些日志,并发送到Logstash进行后续处理;
- 配置FileBeat采集Tomcat日志并发送到Logstash
- 创建配置文件
filebeat-tomcat.yml。配置FileBeat的日志采集路径、多行合并规则,以及Logstash的连接信息:
1filebeat.inputs: 2- type: log 3 enabled: true 4 paths: 5 - /home/fox/apache-tomcat-9.0.93/logs/*access*.* # Tomcat访问日志路径 6 # 多行合并规则(因Tomcat日志以IP开头,需合并非IP开头的行到上一行) 7 multiline.pattern: '^\d+\.\d+\.\d+\.\d+ ' # 匹配IP开头的正则 8 multiline.negate: true # 对pattern取反(即非IP开头的行) 9 multiline.match: after # 合并到上一行的末尾 10output.logstash: 11 enabled: true 12 hosts: ["localhost:5044"] # Logstash监听地址与端口* 其中,`multiline`参数用于处理多行日志(如异常堆栈): * `pattern`:正则表达式,匹配行的特征; * `negate`:[`true`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.true.md)表示对`pattern`结果取反; * `match`:`after`表示将匹配行合并到上一行末尾;- 启动FileBeat并指定配置文件。执行命令启动FileBeat:
1./filebeat -e -c filebeat-tomcat.yml- 可能出现的异常及解决
* 异常1:配置文件权限过高(安全限制)
* 报错:Exiting: error loading config file: config file "...yml" can only be writable by the owner...
* 解决:修改文件权限为644(仅 owner 可写):
bash chmod 644 filebeat-tomcat.yml
* 异常2:无法连接Logstash
* 报错:Failed to connect to backoff(async(tcp://...)): dial tcp ...: connect: connection refused
* 原因:未启动或未正确配置Logstash,导致FileBeat无法连接;
- 创建配置文件
- 配置Logstash接收FileBeat数据并打印
- 创建Logstash配置文件
logstsh-tomcat.conf。配置Logstash的Beats输入插件(监听FileBeat的连接)和终端输出(用于调试):
1input { 2 beats { 3 port => 5044 # 与FileBeat配置的端口一致 4 } 5} 6output { 7 stdout { 8 codec => rubydebug # 以结构化格式输出到终端 9 } 10}- 测试Logstash配置是否正确**。执行命令验证配置语法:
1bin/logstash -f config/logstsh-tomcat.conf --config.test_and_exit* 若输出`Config Validation Result: OK`,则配置正确;- 启动Logstash。执行命令启动Logstash,并启用配置自动重载:
1bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic - 创建Logstash配置文件
- 测试:访问Tomcat,验证日志传输。访问Tomcat服务后,查看Logstash的终端输出,若能看到类似以下的结构化日志数据,说明传输成功:
1{ 2 "tags" => ["beats_input_codec_plain_applied"], 3 "input" => { "type" => "log" }, 4 "event" => { "original" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904" }, 5 "@version" => "1", 6 "log" => { "offset" => 1559, "file" => { "path" => "/home/fox/apache-tomcat-9.0.93/logs/localhost_access_log.2024-08-30.txt" } }, 7 "message" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904", 8 "agent" => { "id" => "ed5031e7-fc13-441c-b87a-b554cb02df3c", "type" => "filebeat", "ephemeral_id" => "3f03ad41-d6f2-4483-9b78-9c7c456d031c", "version" => "8.14.3", "name" => "192-168-65-211" } 9}
4.6 练习2:整合ELK采集与分析Tomcat日志
4.6.1 Logstash输出数据到Elasticsearch
- 若需将Tomcat日志从Logstash同步到Elasticsearch,需修改Logstash的
output配置; - 配置文件
logstsh-tomcat.conf
1input { 2 beats { 3 port => 5044 # 接收FileBeat数据的端口 4 } 5} 6output { 7 elasticsearch { 8 hosts => ["http://localhost:9200"] # ES地址 9 index => "tomcat-logs" # ES索引名 10 user => "elastic" # ES认证用户名 11 password => "123456" # ES认证密码 12 } 13 stdout { 14 codec => rubydebug # 终端调试输出(可选) 15 } 16}
- 启动Logstash
1bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic
- 测试ES数据存储。通过ES检索API查询
tomcat-logs索引,若能看到结构化的Tomcat日志数据,说明同步成功;

4.6.2 利用Logstash过滤器解析日志
- 为了将Tomcat日志从“原始字符串”解析为结构化字段(如IP、时间、请求方式等),需借助Logstash的
Grok、Mutate、Date插件; - 查看Logstash已经安装的插件:
1bin/logstash-plugin list

4.6.2.1 Grok插件:日志结构化解析
- 官方文档:[Grok filter plugin | Logstash Reference 8.14] | Elastic;
- 作用:将非结构化日志通过正则匹配解析为结构化字段,可理解为“增强版正则表达式”。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是其他任意的日志格式;
- 语法:
%{SYNTAX:SEMANTIC},其中SYNTAX是Grok模式名称(如IP、HTTPDATE),SEMANTIC是自定义字段名(如ip、date);- 例:
1%{NUMBER:duration} %{IP:client} 2// duration表示:匹配一个数字,client表示匹配一个IP地址- 默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:
%{NUMBER:duration:int} %{IP:client};
- 常用的Grok模式:GROK模式语法参考-日志服务-阿里云;
- 用法:
1filter { 2 grok { 3 match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } 4 } 5}
- 比如针对下面的Tomcat访问日志格式:
1192.168.65.103 - - [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780
- 解析后的字段:
| 字段名 | 说明 |
| --------- | -------------- |
| client IP | 浏览器端IP |
| timestamp | 请求的时间戳 |
| method | 请求方式(GET/POST) |
| uri | 请求的链接地址 |
| status | 服务器端响应状态 |
| length | 响应的数据长度 | - Grok模式为:
1%{IP:ip} - - \[%{HTTPDATE:date}\] "%{WORD:method} %{PATH:uri} %{DATA:protocol}" %{INT:status:int} %{INT:length:int}
- 为了方便测试,可以使用Kibana进行Grok开发:

- 修改Logstash配置文件:
1vim config/logstash-console.conf 2input { 3 beats { 4 port => 5044 5 } 6} 7filter { 8 grok { 9 match => { 10 "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}" 11 } 12} 13} 14output { 15 stdout { 16 codec => rubydebug 17 } 18}
- 启动Logstash测试:
1bin/logstash -f config/logstash-console.conf --config.reload.automatic

4.6.2.2 Mutate插件:字段过滤
- 用于删除不需要的字段,精简数据结构:
1mutate { 2 enable_metric => "false" 3 remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] 4}
4.6.2.3 Date插件:日期格式转换
- 官方文档:[Date filter plugin | Logstash Reference 8.14] | Elastic;
- 用法:

- 将
date字段转换为「年月日 时分秒」格式。默认字段经过Date插件处理后,会输出到@timestamp字段,所以可以通过修改target属性来重新定义输出字段:
1date { 2 match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] 3 target => "date" 4}
4.6.2.4 filter完整的配置
- filter完整的配置:

- 测试效果:

4.6.2.5 输出到ElasticSearch指定索引
- 通过
index参数指定ES索引名称,支持时间格式化语法(如%{+YYYY-MM}按年月拆分索引)。注意:- 索引名不能包含大写字符;
- 若使用时间格式化,Filter需确保输出包含
@timestamp字段,否则日期解析会失败;
1output { 2 elasticsearch { 3 index => "tomcat_web_log_%{+YYYY-MM}" 4 hosts => ["http://localhost:9200"] 5 user => "elastic" 6 password => "123456" 7 } 8 stdout{ 9 codec => rubydebug 10 } 11}
- 完整的Logstash配置
logstash-tomcat-es.conf
1input { 2 beats { 3 port => 5044 4 } 5} 6filter { 7 grok { 8 match => { 9 "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}" 10 } 11 } 12 mutate { 13 enable_metric => "false" 14 remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] 15 } 16 date { 17 match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] 18 target => "date" 19 } 20} 21output { 22 stdout { 23 codec => rubydebug 24 } 25 elasticsearch { 26 index => "tomcat_web_log_%{+YYYY-MM}" # 按年月生成索引 27 hosts => ["http://localhost:9200"] 28 user => "elastic" 29 password => "123456" 30 } 31}
- 启动Logstash:
1bin/logstash -f config/logstash-tomcat-es.conf --config.reload.automatic
- 查询ES中是否有数据:

- 通过Kibana分析服务日志:
- 在Kibana的Stack Management → 数据视图中,创建针对
tomcat_web_log_*索引的视图,指定时间字段(如@timestamp);

- 进入Discover模块,选择刚创建的数据视图,即可:

* 查看日志的结构化字段(如ip、method、status等);
* 筛选特定条件的日志(如status: 403的错误请求);

- 在Kibana的Stack Management → 数据视图中,创建针对
5 微服务整合ELK实现日志采集与分析实战
5.1 实现思路分析
- Spring Boot微服务日志流向ELK的流程如下:
- Spring Boot应用生成日志数据,通过Logback日志框架记录日志;
- Logstash作为日志收集器,接收Spring Boot发送的日志数据;
- Logstash解析和过滤日志数据(可选格式化、处理);
- 处理后的日志发送到Elasticsearch,存储在分布式索引中;
- Kibana连接Elasticsearch,实现日志数据的可视化分析;

5.2 微服务整合Logstash实现日志采集
- 使用Logstash日志插件(引入依赖)。在Spring Boot项目的
pom.xml中引入Logstash-Logback依赖,实现日志向Logstash的传输:
1<dependency> 2 <groupId>net.logstash.logback</groupId> 3 <artifactId>logstash-logback-encoder</artifactId> 4 <version>6.3</version> 5</dependency>
- 配置Logback(
logback-spring.xml)。通过Logback配置,将日志发送到Logstash的TCP端口:
1<?xml version="1.0" encoding="UTF-8"?> 2<configuration debug="false"> 3 <property name="LOG_HOME" value="logs/elk-demo.log" /> 4 <!-- 控制台输出Appender --> 5 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> 6 <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> 7 <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> 8 </encoder> 9 </appender> 10 <!-- Logstash TCP输出Appender --> 11 <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> 12 <destination>192.168.65.211:4560</destination> <!-- Logstash监听的IP和端口 --> 13 <encoder class="net.logstash.logback.encoder.LogstashEncoder"> 14 <customFields>{"appname": "elk-demo"}</customFields> <!-- 自定义字段(应用标识) --> 15 </encoder> 16 </appender> 17 <!-- 日志输出级别 --> 18 <root level="INFO"> 19 <appender-ref ref="STDOUT" /> 20 <appender-ref ref="logstash" /> <!-- 启用Logstash输出 --> 21 </root> 22</configuration>
- 配置Logstash并启动(
elk-demo.conf)。创建Logstash配置文件,定义输入(TCP接收日志)、输出(Elasticsearch存储):
1input { 2 tcp { 3 host => "0.0.0.0" # 监听所有IP 4 port => 4560 # 与Logback配置的端口一致 5 mode => "server" 6 codec => json_lines # 日志格式为JSON行 7 } 8 stdin {} # 标准输入(可选,用于调试) 9} 10filter { 11 # 可选:日志过滤、解析逻辑 12} 13output { 14 stdout { 15 codec => rubydebug # 终端调试输出(可选) 16 } 17 elasticsearch { 18 hosts => ["127.0.0.1:9200"] # Elasticsearch地址 19 index => "%{[appname]}-%{+YYYY.MM.dd}" # 按应用名和日期生成索引 20 } 21}
- 启动Logstash:
1bin/logstash -f config/elk-demo.conf # 后台启动可加 &
5.3 测试验证
- Logstash控制台验证。调用Spring Boot应用的接口,触发日志输出,查看Logstash控制台,若能看到类似以下的结构化日志,说明接收成功:
1{ 2 "level" => "ERROR", 3 "@timestamp" => 2024-08-30T08:24:03.429Z, 4 "thread_name" => "http-nio-8080-exec-1", 5 "logger_name" => "org.tuling.elkdemo.controller.HelloController", 6 "@version" => "1", 7 "message" => "HelloController执行-----log.error", 8 "level_value" => 40000, 9 "appname" => "elk-demo" 10}
- Elasticsearch索引验证。在Kibana的索引管理中,查看是否存在以
elk-demo-开头的索引(如elk-demo-2024.08.30),验证日志已存储到ES;

5.4 通过Kibana分析微服务日志
- 创建数据视图。在Kibana的Stack Management → 数据视图中,创建针对
elk-demo*索引的视图,指定时间字段(如@timestamp);

- Discover模块分析日志。进入Discover模块,选择刚创建的数据视图,即可:
- 查看微服务日志的结构化字段(如
level、message、thread_name等); - 筛选特定级别、特定服务的日志(如
level: ERROR的错误日志);

- 查看微服务日志的结构化字段(如
《分布式专题——56 微服务日志采集与分析系统实战》 是转载文章,点击查看原文。