分布式专题——56 微服务日志采集与分析系统实战

作者:失散13日期:2025/11/19

1 为什么要使用 ELK

  • 随着企业信息化进程加速,日志数据呈现量急剧增加、来源多样、格式复杂的特点,传统日志管理方式已难以满足需求,这是引入ELK的核心背景;
  • ELK(ElasticSearch、Logstash、Kibana)的由三个组件构成,各自承担关键功能,形成高效的日志管理方案:
    • Elasticsearch:提供强大的分布式搜索能力,支撑日志的快速检索;
    • Logstash:具备灵活的数据采集与处理功能,负责日志的收集和预处理;
    • Kibana:提供直观的数据可视化界面,将复杂日志数据以图表等形式呈现,便于理解分析;
    • 三者结合,为企业和开发人员提供了高效、实时、可扩展且易用的日志管理解决方案,助力提升工作效率和问题解决速度;
  • 使用ELK的主要原因
    • 集中化管理与高效检索日志
      * 在大型分布式系统中,ELK可构建集中式日志系统,实现所有节点日志的统一收集、管理和访问,大幅提高定位问题的效率;
      * Elasticsearch的强大检索特性,能快速查询问题日志,显著提升运维人员的工作效率;
    • 全面的日志分析与系统监控
      * ELK可管理和分析多种日志类型,包括系统日志、应用程序日志、安全日志等,帮助系统运维和开发人员了解服务器软硬件信息、检查配置错误及其原因
      * 通过分析和监控日志,能及时了解服务器的负荷、性能和安全性,从而及时采取措施纠正错误;
    • 直观的数据可视化与理解
      * Kibana为Elasticsearch提供Web可视化界面,可生成各种维度的表格、图形,让复杂的日志数据“可视化”;
      * 可视化界面帮助用户更直观地理解和分析数据,进一步提升日志分析和系统监控的效果。

2 ELK 的整体架构分析

  • ELK架构分为两种类型:
    • 经典的ELK架构
    • 整合消息队列(Redis/Kafka/RabbitMQ)和Nginx的ELK架构

2.1 经典的ELK架构

  • 组成
    • 经典ELK架构主要由 Filebeat + Logstash + Elasticsearch + Kibana 组成;
    • 早期架构仅包含Logstash + Elasticsearch + Kibana,后来因Filebeat轻量级、高效性的特点,逐渐被引入作为日志收集工具;
  • 流程与组件分工
    • Beats(以Filebeat为代表):负责 Data Collection(数据收集),作为轻量级日志收集代理,部署在客户端,消耗资源少,高效收集日志数据;
    • Logstash:负责 Data Aggregation & Processing(数据聚合与处理),对Filebeat收集的日志数据进行过滤、转换等操作,再发送到Elasticsearch;
    • Elasticsearch:负责 Indexing & Storage(索引与存储),基于Lucene的分布式搜索和分析引擎,提供强大的数据存储和搜索能力;
    • Kibana:负责 Analysis & Visualization(分析与可视化),为Elasticsearch提供Web可视化界面,通过图表、仪表盘等形式直观查看和分析日志数据;
  • 特点
    • 日志收集:Filebeat轻量级,客户端部署,资源消耗少,高效收集日志;
    • 数据处理:Logstash作为数据处理管道,完成日志的过滤、转换等操作;
    • 存储与检索:Elasticsearch提供分布式存储和强大检索能力;
    • 可视化:Kibana实现日志数据的可视化分析;
  • 适用场景:主要适用于数据量较小的开发环境。但因缺少消息队列的缓冲机制,若Logstash或Elasticsearch故障,存在数据丢失的风险

2.2 整合消息队列+Nginx的ELK架构

  • 组成:在经典ELK架构基础上,整合 消息队列(Redis、Kafka、RabbitMQ任选)Nginx,形成更复杂的架构;
  • 流程与组件分工
    • Beats:依然负责 Data Collection(数据收集)
    • 消息队列(Redis/Kafka/RabbitMQ):负责 Buffering(缓冲),作为中间缓冲层;
    • Logstash:负责 Data Aggregation & Processing(数据聚合与处理)
    • Elasticsearch:负责 Indexing & Storage(索引与存储)
    • Kibana:负责 Analysis & Visualization(分析与可视化)
    • Nginx:作为高性能Web和反向代理服务器,优化系统性能和可用性;
  • 特点
    • 消息队列:引入后作为缓冲机制,即使Logstash或Elasticsearch故障,日志数据也不会丢失;同时能修剪网络传输峰,降低数据丢失可能性;
    • Nginx:在负载均衡、缓存等方面发挥作用,提升系统性能和用户访问体验;
    • 扩展性:引入消息队列和Nginx后,架构扩展性增强,可根据实际需求动态调整各组件的资源分配和部署规模;
  • 适用场景:主要适用于生产环境,尤其是需要处理大数据量的场景。能确保数据的安全性和完整性,同时提供高性能的日志处理和可视化分析服务。

3 数据处理管道Logstash

3.1 概述

  • Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据、转换数据,然后将数据发送到目标存储库中;
  • 官网:Logstash:收集、解析和转换日志 | Elastic
  • 应用场景:作为ETL工具(Extract-Transform-Load,提取-转换-加载)和数据采集处理引擎,支持日志、指标、Web应用、数据库等多源数据的采集与处理,最终服务于分析(Analysis)、归档(Archiving)、监控(Monitoring)、告警(Alerting)等场景;

3.2 Logstash的核心概念

  • Pipeline
    • 是Logstash的核心处理流程,包含Input—Filter—Output三个阶段
    • 负责插件生命周期管理队列管理,是数据从采集到输出的完整流转通道;
  • Logstash Event
    • 是数据在Logstash内部流转时的具体表现形式:在Input阶段被转换为Event,在Output阶段被转化为目标格式数据;
    • 本质是一个Java Object,可在配置文件中对其属性进行增删改查;
  • Codec(Code / Decode,编码/解码)
    • 将原始数据**decode(解码)**成Event;
    • 将Event **encode(编码)**成目标数据,实现数据格式的转换适配。

3.3 Logstash的数据传输原理

  • 数据传输遵循**“Input(数据采集)→ Filter(数据解析)→ Output(数据输出)”**的流程,各阶段有典型组件支撑:
    | 阶段 | 功能 | 典型组件示例 |
    | ------ | --------- | ------------------------------------------ |
    | Input | 数据采集与输入 | Stdin(标准输入)、JDBC(数据库连接) |
    | Filter | 实时解析和数据转换 | Mutate(数据修改)、Date(日期处理)、User Agent(用户代理解析) |
    | Output | 存储与数据导出 | Elasticsearch(主流存储目标) |
  • Logstash通过**管道(Pipeline)**完成数据的采集与处理,流程如下:
    • 数据采集(Input):从数据源(Data Source,如日志文件、数据库等)采集数据,支持多种输入插件,以连续的流式传输方式获取数据;
    • 数据过滤/预处理(Filter,可选):通过Filter插件对数据进行过滤、转换(如字段提取、格式调整等),构建结构化数据;
    • 数据输出(Output):将处理后的数据发送到目标存储(如Elasticsearch),支持多种输出插件配置;

3.4 Logstash的安装与配置

3.4.1 Logstash的安装

  • Logstash官方安装文档地址:[Installing Logstash | Logstash Reference 8.14] | Elastic
  • 下载并解压Logstash
  • 测试:运行最基本的Logstash管道
    • 进入Logstash目录:执行命令 cd logstash-8.14.3,切换到Logstash的安装目录;
    • 不同系统的测试命令:
      * Linux系统
      bash bin/logstash -e 'input { stdin {} } output { stdout {} }'
      * 其中-e选项表示“直接把配置放在命令中”,用于快速测试;
      * Windows系统
      cmd .\bin\logstash.bat -e "input { stdin {} } output { stdout {} }"
    • 测试逻辑说明:
      * input { stdin {} }:配置Logstash从**标准输入(键盘输入)**采集数据;
      * output { stdout {} }:配置Logstash将处理后的数据输出到标准输出(终端)
  • 当在终端输入内容(如hellohello fox)后,Logstash会将输入内容以结构化的Event形式输出
    • 包含以下字段:
      * message:输入的原始内容(如"hello\r""hello fox\r");
      * @timestamp:数据处理的时间戳(如2022-06-10T05:36:47.382Z);
      * @version:Logstash事件的版本(固定为"1");
      * host:运行Logstash的主机名(如"LAPTOP-UTD9471P");
    • 这表明Logstash的管道已成功运行,能够完成“输入→输出”的基本数据处理流程。

3.4.2 Logstash的配置

  • Logstash官方配置文档地址:[Creating a Logstash pipeline | Logstash Reference 8.14] | Elastic
  • Logstash的管道配置文件对每种类型的插件(Input、Filter、Output)提供单独配置部分,用于处理管道事件;
    • Input部分
    1input {  
    2  stdin {}  
    3}  
       * 功能:配置数据输入源,此处使用`stdin`插件,从\*\*标准输入(键盘)\*\*采集数据;  
    
    • Filter部分
    1filter {  
    2  grok {  
    3    match => { "message" => "%{COMBINEDAPACHELOG}" }  
    4  }  
    5  date {  
    6    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]  
    7  }  
    8}  
       * `grok`插件:通过正则匹配(`%{COMBINEDAPACHELOG}`是预定义的Apache日志格式),尝试将`message`字段解析为结构化数据;  
       * [`date`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.date.md)插件:尝试将名为`timestamp`的字段按格式[`dd/MMM/yyyy:HH:mm:ss Z`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.dd.md)解析为时间戳,用于统一时间格式;  
    
    • Output部分
    1output {  
    2  elasticsearch {  
    3    index => "logstash-demo"  
    4    hosts => ["localhost:9200"]  
    5  }  
    6  stdout { codec => rubydebug }  
    7}  
       * `elasticsearch`插件:将处理后的数据输出到Elasticsearch,指定索引为`logstash-demo`,Elasticsearch服务地址为`localhost:9200`;  
       * `stdout`插件:同时将数据输出到终端,`codec => rubydebug`用于以结构化格式(便于调试)展示输出内容;
    
  • 每个配置部分可包含一个或多个插件,Logstash会按照插件在配置文件中出现的顺序依次处理(例如Filter中的grok先执行,再执行date);
  • 执行以下命令启动Logstash并加载配置文件(假设配置文件名为logstash-demo.conf):
1bin/logstash -f logstash-demo.conf  

  • 当输入hello后,终端输出结构化数据,包含以下关键信息:
    * @version:事件版本(固定为"1"
    * host:运行Logstash的主机信息(如"hostname" => "Java"
    * tags:包含"_grokparsefailure",说明grok插件解析"hello"时失败(因hello不符合Apache日志格式)
    * message:原始输入内容("hello\r"
    * @timestamp:事件处理的时间戳(如2024-08-28T14:21:13.961676600Z
    * event.original:原始事件内容("hello\r"
  • 通过Elasticsearch的检索API可查看到,数据已成功写入logstash-demo索引,字段与终端输出一致,验证了数据已同步到Elasticsearch中;

3.4.3 Logstash的插件

3.4.3.1 插件分类
  • Logstash插件分为Input、Filter、Output、Codec四大类,各自承担不同功能;
  • Input Plugins(输入插件)
    • 官方文档:[Input plugins | Logstash Reference 8.14] | Elastic
    • 核心特点:一个Pipeline可以包含多个Input插件,支持从多种数据源采集数据;
    • 常见插件示例:
      * 基础输入:Stdin(标准输入)、File(文件);
      * 中间件/数据库:BeatsLog4JElasticsearchJDBCKafkaRabbitmqRedis
      * 协议/服务:JMXHTTPWebsocketUDPTCP
      * 云存储/平台:Google Cloud StorageS3GithubTwitter
  • Filter Plugins(过滤插件)
    • 官方文档:[Filter plugins | Logstash Reference 8.14] | Elastic
    • 核心功能:对Logstash Event进行解析、字段删除、类型转换等处理;
    • 常见插件示例:
      * Date:日期解析;
      * Dissect:分割符解析;
      * Grok:正则匹配解析;
      * Mutate:对字段做多种操作(Convert类型转换、Gsub字符串替换、Split/Join/Merge字符串/数组操作、Rename字段重命名、Update/Replace字段内容更新、Remove_field字段删除);
      * Ruby:利用Ruby代码动态修改Event;
  • Output Plugins(输出插件)
    • 官方文档:[Output plugins | Logstash Reference 8.14] | Elastic
    • 核心特点:是Pipeline的最后一个阶段,负责将Event发送到特定目的地;
    • 常见插件示例:
      * 存储引擎:Elasticsearch
      * 通知/监控:EmailPageduty
      * 数据库/中间件:InfluxdbKafkaMongodbOpensdbZabbix
      * 协议/服务:HttpTCPWebsocket
  • Codec Plugins(编解码插件)
    • 官方文档:[Codec plugins | Logstash Reference 8.14] | Elastic
    • 核心功能:将原始数据decode成Event,再将Event encode成目标数据,实现数据格式的转换适配;
    • 内置插件示例:Line/MultilineJSON/Avro/Cef(ArcSight Common Event Format)、Dots/Rubydebug
3.4.3.2 Codec插件之Multiline(多行编解码)
  • 用于处理多行数据(如异常堆栈信息),核心是通过配置参数将多行合并为一个Event;
  • 配置参数
    • pattern:设置行匹配的正则表达式;
    • what:匹配成功时,该行属于上一个事件(previous)还是下一个事件(next)
    • negate:是否对pattern的结果取反(true/false);
  • 例:处理Java异常堆栈
    • 以包含多行的NullPointerException堆栈为例,配置文件multiline-exception.conf如下:
    1input {  
    2  stdin {  
    3    codec => multiline {  
    4      pattern => "^\s"  # 匹配以空白开头的行  
    5      what => "previous"  # 匹配行属于上一个事件  
    6    }  
    7  }  
    8}  
    9filter {}  
    10output {  
    11  stdout { codec => rubydebug }  
    12}  
    • 执行命令:bin/logstash -f multiline-exception.conf,即可将多行异常堆栈合并为一个Event输出。
3.4.3.3 Logstash Queue(队列)
  • 用于在Pipeline中缓冲数据,分为两种类型:
  • In Memory Queue(内存队列):数据存储在内存中,若进程Crash或机器宕机,会导致数据丢失
  • Persistent Queue(持久化队列)
    • 特点:数据持久化到磁盘,即使机器宕机,数据也不会丢失;能保证数据被消费,可替代Kafka等消息队列的缓冲区作用;
    • 配置方式(在pipelines.yml中设置):
    1queue.type: persisted  # 默认是memory  
    2queue.max_bytes: 4gb   # 队列最大容量  
  • 队列在Pipeline中的流程:Input → Codec → Queue → Batcher → Filter → Output,多个Input可共享Queue进行数据缓冲与分发;

3.4.4 练习:同步MySQL数据到ElasticSearch

3.4.4.1 需求分析
  • 将MySQL数据库中的用户数据同步到ElasticSearch,借助ES的全文搜索能力提升搜索速度,具体需求包括:
    • 新增用户信息需同步到ES
    • 用户信息更新后,ES需同步更新
    • 支持增量更新(只同步变化的数据)
    • 用户注销后,不能被ES搜索到
3.4.4.2 实现思路
  • 借助Logstash的JDBC Input Plugin将数据从MySQL读取到Logstash,再通过Elasticsearch Output Plugin同步到ES。JDBC Input Plugin的关键能力:
    • 需自行提供JDBC Driver(如MySQL的JDBC驱动包)
    • 支持定时任务调度(语法基于Rufus-scheduler,扩展了Cron语法)
    • 支持通过Tracking_column/sql_last_value记录状态,实现增量更新
  • 官方文档:[Jdbc input plugin | Logstash Reference 8.14] | Elastic
3.4.4.3 JDBC Input Plugin实现步骤
  • 拷贝JDBC依赖:将MySQL的JDBC驱动包(如mysql-connector-java-5.1.49.jar)拷贝到Logstash的自定义目录(如logstash-8.14.3/drivers);
  • 准备配置文件mysql-demo.conf。配置文件分为inputoutput两个部分,实现MySQL数据采集与ES同步:
1input {  
2  jdbc {  
3    # JDBC驱动路径与类名  
4    jdbc_driver_library => "/home/fox/logstash-8.14.3/driver/mysql-connector-java-5.1.49.jar"  
5    jdbc_driver_class => "com.mysql.jdbc.Driver"  
6    # MySQL连接信息  
7    jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false"  
8    jdbc_user => "root"  
9    jdbc_password => "123456"  
10    # 增量更新配置  
11    use_column_value => true  # 启用字段值追踪  
12    tracking_column => "last_updated"  # 追踪的字段(需是数值或时间类型)  
13    tracking_column_type => "numeric"  # 追踪字段类型(numeric/ timestamp)  
14    record_last_run => true  # 记录最后一次运行状态  
15    last_run_metadata_path => "jdbc-position.txt"  # 状态保存路径  
16    statement => "SELECT * FROM user where last_updated > :sql_last_value;"  # 增量查询SQL  
17    schedule => "* * * * * *"  # Cron表达式,设置任务触发频率  
18  }  
19}  
20output {  
21  # 同步到Elasticsearch  
22  elasticsearch {  
23    document_id => "%{id}"  # 文档ID关联MySQL的id字段  
24    document_type => "_doc"  
25    index => "users"  # ES索引名  
26    hosts => ["http://localhost:9200"]  
27    username => "elastic"  # ES认证信息  
28    password => "123456"  
29  }  
30  # 终端输出调试(可选)  
31  stdout {  
32    codec => rubydebug  
33  }  
34}  
  • 运行Logstash。执行命令启动Logsta sh并加载配置文件:
1bin/logstash -f mysql-demo.conf  
3.4.4.4 测试
  • 表结构:创建user表,包含id(主键)、nameaddresslast_updated(增量追踪字段)、is_deleted(注销标记字段);
1CREATE TABLE `user` (  
2  [`id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md) int NOT NULL AUTO_INCREMENT,  
3  `name` varchar(50) DEFAULT NULL,  
4  [`address`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.address.md) varchar(50) DEFAULT NULL,  
5  `last_updated` bigint DEFAULT NULL,  
6  `is_deleted` int DEFAULT NULL,  
7  PRIMARY KEY ([`id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md))  
8) ENGINE=InnoDB;  
  • 插入数据:新增用户“张三”,并设置last_updated为当前时间戳;
1INSERT INTO user(name,address,last_updated,is_deleted)  
2VALUES("张三","广州天河",unix_timestamp(NOW()),0);  
  • 更新数据:更新“张三”的地址,并更新last_updated
1UPDATE user SET address="广州白云山",last_updated=unix_timestamp(NOW()) WHERE name="张三";  
  • 注销用户:标记“张三”为注销(is_deleted=1),并更新last_updated
1UPDATE user SET is_deleted=1,last_updated=unix_timestamp(NOW()) WHERE name="张三";  
  • 数据同步效果
    • 插入、更新操作后,Logstash会通过增量查询将变化的数据同步到ES,终端可通过stdout { codec => rubydebug }看到结构化的同步数据;
    • 注销操作后,数据仍会同步到ES,但可通过ES的Alias机制过滤掉注销用户;
3.4.4.5 ES中查询优化(Alias机制)
  • 通过创建**别名(Alias)**并添加过滤条件,实现“只显示未被注销的用户”:
  • 创建Alias
1POST /_aliases  
2{  
3  "actions": [  
4    {  
5      "add": {  
6        "index": "users",  
7        "alias": "view_users",  
8        "filter": { "term": { "is_deleted": 0 } }  
9      }  
10    }  
11  ]  
12}  
  • 通过Alias查询:只会返回is_deleted=0的用户,注销用户(is_deleted=1)不会被检索到
1POST view_users/_search  
2{  
3  "query": {  
4    "term": {  
5      "name.keyword": {  
6        "value": "张三"  
7      }  
8    }  
9  }  
10}  

4 轻量级采集器FileBeat

4.1 概述

  • Beats是一个免费且开放的平台,集合了多种单一用途的数据采集器,可从成百上千甚至上万台机器和系统向Logstash或Elasticsearch发送数据;
  • Beats包含多个针对不同数据类型的采集器:
    • Filebeat:采集日志文件
    • Metricbeat:采集指标数据
    • Packetbeat:采集网络数据
    • Winlogbeat:采集Windows事件日志
    • Auditbeat:采集审计数据
    • Heartbeat:用于运行时间监控
    • Functionbeat:无服务器的采集器;
  • FileBeat是专门用于转发和收集日志数据的轻量级采集工具,可作为代理安装在服务器上,监控指定路径的日志文件,收集日志数据后发送到Elasticsearch或Logstash。

4.2 工作原理

  • FileBeat的工作流程由Prospector(勘探器)、Harvester(收割机)、Libbeat三个核心组件协同完成:
    • Prospector(勘探器):启动FileBeat时会启动一个或多个Input,用于监控指定的日志数据位置(如/var/log/*.log/var/log/nginx/*等路径);
    • Harvester(收割机):针对每一个被监控的文件启动一个Harvester,负责读取文件的日志内容,并将新的日志发送到Libbeat;
    • Libbeat:负责收集多个Harvester的数据,并将数据发送到输出端(Output),支持直接发送到Elasticsearch,或先发送到Logstash再转发到Elasticsearch;
  • 以监控/var/log/*.log(Prospector 1)和/var/log/nginx/*(Prospector 2)为例:
    • Prospector 1监控/var/log/*.log下的log1文件,为其启动Harvester读取日志,将新日志发送到Libbeat;
    • Prospector 2监控/var/log/nginx/*下的error.log文件,为其启动Harvester读取日志,将新日志发送到Libbeat;
    • Libbeat收集所有Harvester的数据后,可选择直接发送到Elasticsearch,或先发送到Logstash再由Logstash转发到Elasticsearch;

4.3 Logstash VS FileBeat

  • 资源消耗对比
    • Logstash:运行在JVM(Java虚拟机)上,资源消耗较大
    • FileBeat:基于Golang编写,功能相对少但资源消耗小,更轻量级,占用资源更少;
  • 功能定位对比
    • 两者都具备日志收集功能,但FileBeat更轻量;
    • Logstash独有Filter功能:可对日志进行过滤、分析(如字段提取、格式转换等);
  • 典型配合流程:
    • FileBeat采集日志,发送到**消息队列(Redis、MQ等)**做缓冲;
    • Logstash从消息队列获取日志,利用Filter功能过滤分析;
    • 最终将处理后的数据存储到Elasticsearch;
  • FileBeat支持背压压制
    • 当向Logstash或Elasticsearch发送数据时,使用背压敏感协议
    • 若Logstash忙于处理数据,会通知FileBeat减慢读取速度
    • 拥塞解决后,FileBeat恢复原速度继续传输数据,以此适配数据量波动,保证系统稳定。

4.4 FileBeat的安装与配置

1output.elasticsearch:  
2  hosts: ["192.168.65.174:9200", "192.168.65.192:9200", "192.168.65.204:9200"]  # Elasticsearch集群地址  
3  username: "elastic"  # Elasticsearch认证用户名  
4  password: "123456"   # Elasticsearch认证密码  
5setup.kibana:  
6  host: "192.168.65.174:5601"  # Kibana地址  
  • 启用和配置数据收集模块
    • 从FileBeat安装目录中执行以下操作,启用特定服务的日志收集模块;
    • 查看可启用的模块列表
    1./filebeat modules list  
    • 启用Nginx模块
      * 启用命令:
      bash ./filebeat modules enable nginx
      * 若需修改Nginx日志路径,编辑modules.d/nginx.yml
      yaml - module: nginx access: enabled: true var.paths: ["/var/log/nginx/access.log*"] # Nginx访问日志路径
    • 启用Logstash模块
      * 启用命令:
      bash ./filebeat modules enable logstash
      * 若需修改Logstash日志路径,编辑modules.d/logstash.yml
      yaml - module: logstash log: enabled: true var.paths: ["/home/fox/logstash-8.14.3/logs/*.log"] # Logstash日志路径
  • 启动FileBeat
    • 加载Kibana仪表板(可选,若未配置过则执行)
    1./filebeat setup  
    • 启动FileBeat
    1./filebeat -e  
    • 验证启动效果。启动成功后,可在Kibana的Observability → Logs → Stream中查看收集到的日志(如Logstash的日志),确认数据已成功传输和可视化;

4.5 练习1:FileBeat采集Tomcat服务器日志并发送到Logstash

  • 需求背景:Tomcat服务器运行时产生大量日志(如访问日志),需通过FileBeat采集这些日志,并发送到Logstash进行后续处理;
  • 配置FileBeat采集Tomcat日志并发送到Logstash
    • 创建配置文件filebeat-tomcat.yml。配置FileBeat的日志采集路径、多行合并规则,以及Logstash的连接信息:
    1filebeat.inputs:  
    2- type: log  
    3  enabled: true  
    4  paths:  
    5    - /home/fox/apache-tomcat-9.0.93/logs/*access*.*  # Tomcat访问日志路径  
    6  # 多行合并规则(因Tomcat日志以IP开头,需合并非IP开头的行到上一行)  
    7  multiline.pattern: '^\d+\.\d+\.\d+\.\d+ '  # 匹配IP开头的正则  
    8  multiline.negate: true  # 对pattern取反(即非IP开头的行)  
    9  multiline.match: after  # 合并到上一行的末尾  
    10output.logstash:  
    11  enabled: true  
    12  hosts: ["localhost:5044"]  # Logstash监听地址与端口  
       * 其中,`multiline`参数用于处理多行日志(如异常堆栈):  
                * `pattern`:正则表达式,匹配行的特征;  
                * `negate`:[`true`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.true.md)表示对`pattern`结果取反;  
                * `match`:`after`表示将匹配行合并到上一行末尾;  
    
    • 启动FileBeat并指定配置文件。执行命令启动FileBeat:
    1./filebeat -e -c filebeat-tomcat.yml  
    • 可能出现的异常及解决
      * 异常1:配置文件权限过高(安全限制)
      * 报错:Exiting: error loading config file: config file "...yml" can only be writable by the owner...
      * 解决:修改文件权限为644(仅 owner 可写):
      bash chmod 644 filebeat-tomcat.yml
      * 异常2:无法连接Logstash
      * 报错:Failed to connect to backoff(async(tcp://...)): dial tcp ...: connect: connection refused
      * 原因:未启动或未正确配置Logstash,导致FileBeat无法连接;
  • 配置Logstash接收FileBeat数据并打印
    • 创建Logstash配置文件logstsh-tomcat.conf。配置Logstash的Beats输入插件(监听FileBeat的连接)和终端输出(用于调试):
    1input {  
    2  beats {  
    3    port => 5044  # 与FileBeat配置的端口一致  
    4  }  
    5}  
    6output {  
    7  stdout {  
    8    codec => rubydebug  # 以结构化格式输出到终端  
    9  }  
    10}  
    • 测试Logstash配置是否正确**。执行命令验证配置语法:
    1bin/logstash -f config/logstsh-tomcat.conf --config.test_and_exit  
       * 若输出`Config Validation Result: OK`,则配置正确;  
    
    • 启动Logstash。执行命令启动Logstash,并启用配置自动重载:
    1bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic  
  • 测试:访问Tomcat,验证日志传输。访问Tomcat服务后,查看Logstash的终端输出,若能看到类似以下的结构化日志数据,说明传输成功:
1{  
2  "tags" => ["beats_input_codec_plain_applied"],  
3  "input" => { "type" => "log" },  
4  "event" => { "original" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904" },  
5  "@version" => "1",  
6  "log" => { "offset" => 1559, "file" => { "path" => "/home/fox/apache-tomcat-9.0.93/logs/localhost_access_log.2024-08-30.txt" } },  
7  "message" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904",  
8  "agent" => { "id" => "ed5031e7-fc13-441c-b87a-b554cb02df3c", "type" => "filebeat", "ephemeral_id" => "3f03ad41-d6f2-4483-9b78-9c7c456d031c", "version" => "8.14.3", "name" => "192-168-65-211" }  
9}  

4.6 练习2:整合ELK采集与分析Tomcat日志

4.6.1 Logstash输出数据到Elasticsearch

  • 若需将Tomcat日志从Logstash同步到Elasticsearch,需修改Logstash的output配置;
  • 配置文件logstsh-tomcat.conf
1input {  
2  beats {  
3    port => 5044  # 接收FileBeat数据的端口  
4  }  
5}  
6output {  
7  elasticsearch {  
8    hosts => ["http://localhost:9200"]  # ES地址  
9    index => "tomcat-logs"  # ES索引名  
10    user => "elastic"  # ES认证用户名  
11    password => "123456"  # ES认证密码  
12  }  
13  stdout {  
14    codec => rubydebug  # 终端调试输出(可选)  
15  }  
16}  
  • 启动Logstash
1bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic  
  • 测试ES数据存储。通过ES检索API查询tomcat-logs索引,若能看到结构化的Tomcat日志数据,说明同步成功;

4.6.2 利用Logstash过滤器解析日志

  • 为了将Tomcat日志从“原始字符串”解析为结构化字段(如IP、时间、请求方式等),需借助Logstash的GrokMutateDate插件;
  • 查看Logstash已经安装的插件:
1bin/logstash-plugin list  

4.6.2.1 Grok插件:日志结构化解析
  • 官方文档:[Grok filter plugin | Logstash Reference 8.14] | Elastic
  • 作用:将非结构化日志通过正则匹配解析为结构化字段,可理解为“增强版正则表达式”。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是其他任意的日志格式;
  • 语法%{SYNTAX:SEMANTIC},其中SYNTAX是Grok模式名称(如IPHTTPDATE),SEMANTIC是自定义字段名(如ipdate);
    • 例:
    1%{NUMBER:duration} %{IP:client}  
    2// duration表示:匹配一个数字,client表示匹配一个IP地址  
    • 默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}
  • 常用的Grok模式GROK模式语法参考-日志服务-阿里云
  • 用法
1filter {  
2  grok {  
3    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }  
4  }  
5}  
  • 比如针对下面的Tomcat访问日志格式:
1192.168.65.103 - - [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780  
  • 解析后的字段:
    | 字段名 | 说明 |
    | --------- | -------------- |
    | client IP | 浏览器端IP |
    | timestamp | 请求的时间戳 |
    | method | 请求方式(GET/POST) |
    | uri | 请求的链接地址 |
    | status | 服务器端响应状态 |
    | length | 响应的数据长度 |
  • Grok模式为:
1%{IP:ip} - - \[%{HTTPDATE:date}\] "%{WORD:method} %{PATH:uri} %{DATA:protocol}" %{INT:status:int} %{INT:length:int}  
  • 为了方便测试,可以使用Kibana进行Grok开发:
  • 修改Logstash配置文件:
1vim config/logstash-console.conf  
2input {  
3    beats {  
4      port => 5044  
5    }  
6}  
7filter {  
8  grok {  
9    match => {  
10    "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"  
11    }  
12}  
13}  
14output {  
15    stdout {  
16      codec => rubydebug  
17    }  
18}  
  • 启动Logstash测试:
1bin/logstash -f config/logstash-console.conf --config.reload.automatic  

4.6.2.2 Mutate插件:字段过滤
  • 用于删除不需要的字段,精简数据结构:
1mutate {  
2  enable_metric => "false"  
3  remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]  
4}  
4.6.2.3 Date插件:日期格式转换
  • 官方文档:[Date filter plugin | Logstash Reference 8.14] | Elastic
  • 用法:
  • date字段转换为「年月日 时分秒」格式。默认字段经过Date插件处理后,会输出到@timestamp字段,所以可以通过修改target属性来重新定义输出字段:
1date {  
2  match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]  
3  target => "date"  
4}  
4.6.2.4 filter完整的配置
  • filter完整的配置:
  • 测试效果:
4.6.2.5 输出到ElasticSearch指定索引
  • 通过index参数指定ES索引名称,支持时间格式化语法(如%{+YYYY-MM}按年月拆分索引)。注意:
    • 索引名不能包含大写字符;
    • 若使用时间格式化,Filter需确保输出包含@timestamp字段,否则日期解析会失败;
1output {  
2  elasticsearch {  
3    index => "tomcat_web_log_%{+YYYY-MM}"  
4    hosts => ["http://localhost:9200"]  
5    user => "elastic"  
6    password => "123456"  
7  }  
8  stdout{  
9    codec => rubydebug  
10  }  
11}  
  • 完整的Logstash配置logstash-tomcat-es.conf
1input {  
2  beats {  
3    port => 5044  
4  }  
5}  
6filter {  
7  grok {  
8    match => {  
9      "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"  
10    }  
11  }  
12  mutate {  
13    enable_metric => "false"  
14    remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]  
15  }  
16  date {  
17    match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]  
18    target => "date"  
19  }  
20}  
21output {  
22  stdout {  
23    codec => rubydebug  
24  }  
25  elasticsearch {  
26    index => "tomcat_web_log_%{+YYYY-MM}"  # 按年月生成索引  
27    hosts => ["http://localhost:9200"]  
28    user => "elastic"  
29    password => "123456"  
30  }  
31}  
  • 启动Logstash:
1bin/logstash -f config/logstash-tomcat-es.conf --config.reload.automatic  
  • 查询ES中是否有数据:
  • 通过Kibana分析服务日志:
    • 在Kibana的Stack Management → 数据视图中,创建针对tomcat_web_log_*索引的视图,指定时间字段(如@timestamp);
    • 进入Discover模块,选择刚创建的数据视图,即可:

      * 查看日志的结构化字段(如ipmethodstatus等);
      * 筛选特定条件的日志(如status: 403的错误请求);

5 微服务整合ELK实现日志采集与分析实战

5.1 实现思路分析

  • Spring Boot微服务日志流向ELK的流程如下:
    • Spring Boot应用生成日志数据,通过Logback日志框架记录日志;
    • Logstash作为日志收集器,接收Spring Boot发送的日志数据;
    • Logstash解析和过滤日志数据(可选格式化、处理);
    • 处理后的日志发送到Elasticsearch,存储在分布式索引中;
    • Kibana连接Elasticsearch,实现日志数据的可视化分析;

5.2 微服务整合Logstash实现日志采集

  • 使用Logstash日志插件(引入依赖)。在Spring Boot项目的pom.xml中引入Logstash-Logback依赖,实现日志向Logstash的传输:
1<dependency>  
2  <groupId>net.logstash.logback</groupId>  
3  <artifactId>logstash-logback-encoder</artifactId>  
4  <version>6.3</version>  
5</dependency>  
  • 配置Logback(logback-spring.xml。通过Logback配置,将日志发送到Logstash的TCP端口:
1<?xml version="1.0" encoding="UTF-8"?>  
2<configuration debug="false">  
3  <property name="LOG_HOME" value="logs/elk-demo.log" />  
4  <!-- 控制台输出Appender -->  
5  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">  
6    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">  
7      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>  
8    </encoder>  
9  </appender>  
10  <!-- Logstash TCP输出Appender -->  
11  <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">  
12    <destination>192.168.65.211:4560</destination> <!-- Logstash监听的IP和端口 -->  
13    <encoder class="net.logstash.logback.encoder.LogstashEncoder">  
14      <customFields>{"appname": "elk-demo"}</customFields> <!-- 自定义字段(应用标识) -->  
15    </encoder>  
16  </appender>  
17  <!-- 日志输出级别 -->  
18  <root level="INFO">  
19    <appender-ref ref="STDOUT" />  
20    <appender-ref ref="logstash" /> <!-- 启用Logstash输出 -->  
21  </root>  
22</configuration>  
  • 配置Logstash并启动(elk-demo.conf。创建Logstash配置文件,定义输入(TCP接收日志)、输出(Elasticsearch存储):
1input {  
2  tcp {  
3    host => "0.0.0.0"  # 监听所有IP  
4    port => 4560       # 与Logback配置的端口一致  
5    mode => "server"  
6    codec => json_lines  # 日志格式为JSON行  
7  }  
8  stdin {}  # 标准输入(可选,用于调试)  
9}  
10filter {  
11  # 可选:日志过滤、解析逻辑  
12}  
13output {  
14  stdout {  
15    codec => rubydebug  # 终端调试输出(可选)  
16  }  
17  elasticsearch {  
18    hosts => ["127.0.0.1:9200"]  # Elasticsearch地址  
19    index => "%{[appname]}-%{+YYYY.MM.dd}"  # 按应用名和日期生成索引  
20  }  
21}  
  • 启动Logstash:
1bin/logstash -f config/elk-demo.conf  # 后台启动可加 &  

5.3 测试验证

  • Logstash控制台验证。调用Spring Boot应用的接口,触发日志输出,查看Logstash控制台,若能看到类似以下的结构化日志,说明接收成功:
1{  
2  "level" => "ERROR",  
3  "@timestamp" => 2024-08-30T08:24:03.429Z,  
4  "thread_name" => "http-nio-8080-exec-1",  
5  "logger_name" => "org.tuling.elkdemo.controller.HelloController",  
6  "@version" => "1",  
7  "message" => "HelloController执行-----log.error",  
8  "level_value" => 40000,  
9  "appname" => "elk-demo"  
10}  
  • Elasticsearch索引验证。在Kibana的索引管理中,查看是否存在以elk-demo-开头的索引(如elk-demo-2024.08.30),验证日志已存储到ES;

5.4 通过Kibana分析微服务日志

  • 创建数据视图。在Kibana的Stack Management → 数据视图中,创建针对elk-demo*索引的视图,指定时间字段(如@timestamp);
  • Discover模块分析日志。进入Discover模块,选择刚创建的数据视图,即可:
    • 查看微服务日志的结构化字段(如levelmessagethread_name等);
    • 筛选特定级别、特定服务的日志(如level: ERROR的错误日志);

分布式专题——56 微服务日志采集与分析系统实战》 是转载文章,点击查看原文


相关推荐


如果让我从头再来学习并发编程
桦说编程2025/11/18

大学时,我学习了一本国外的教科书,书名叫做《计算机网络——自顶向下方法》,这本书改变了我看待学习的角度。学习的顺序不是一成不变的,常规的路线通常从底层学习,这本书从应用层面入手,逐步讲解到底层,以一种对常规学习路线相反的方向学习,在我看来恰恰学习计算机网络最轻松上手的路径。很多时候我们追求一步到位,鞭辟入里的理解,反而忽略对于初学者,最佳的学习路线往往是兴趣与试错、感性和求索交织的,认识是逐渐深刻的。 知之者往往陷入知识的诅咒,知道的内容很难遗忘,甚至忘记了自己也是从不理解到懂的。有些书籍和文章


Python 的内置函数 setattr
IMPYLH2025/11/17

Python 内建函数列表 > Python 的内置函数 setattr Python 的内置函数 setattr() 用于动态设置对象的属性值。该函数接受三个参数:对象、属性名称字符串和属性值。当我们需要在运行时为对象添加或修改属性时,setattr() 提供了灵活的操作方式。 基本语法: setattr(object, attribute_name, value) 详细说明: 参数解析: object:需要设置属性的目标对象attribute_name:字符串形式的属性名


Bash 的 chown 命令
hubenchang05152025/11/16

#Bash 的 chown 命令 chown [OPTION]... [OWNER][:[GROUP]] FILE... 功能 修改文件的所有者和所属组。 类型 可执行文件(/usr/bin/chown),属于 coreutils。 参数 OPTION 选项: -c, --changes - 仅对发生变化的文件打印详细信息 -f, --silent, --quiet - 忽略大部分错误信息 -v, --verbose - 打印详细信息 --dereference - 影响符号链接引用的源文


【软件测试】《集成测试全攻略:Mock/Stub 原理 + Postman/JUnit/TestNG 实战》
云知谷2025/11/15

集成测试:一场“团队协作”的精彩大戏! 想象一下,你正在筹备一场超级英雄电影的首映礼!每个超级英雄(比如钢铁侠、美国队长、雷神)都是独立的组件,他们各自的能力(功能)都经过了严格测试(单元测试),证明他们“单兵作战”很强。 但是!电影上映时,他们必须一起合作——钢铁侠开战甲,美国队长指挥战术,雷神召唤闪电,才能打败灭霸(系统级问题)。如果他们配合不好(比如钢铁侠的战甲和美国队长的盾牌不兼容,或者雷神的闪电把战甲炸了),那电影就砸了! 这时候,集成测试(Integration Testing


DeepSeek-OCR实战(01):基础运行环境搭建-Ubuntu
paopao_wu2025/11/13

DeepSeek-OCR实战是一个系列文章,包含了从基础运行环境搭建到应用接入全过程。本章为:基础运行环境搭建,操作系统采用 Ubuntu Server 24 环境版本ubuntu-24.04.3 Serverrelease 10.0Cuda11.8显卡 RTX 2080 Ti 22G驱动 NVIDIA-Linux-x86_64-580.105.08conda25.9.1git2.47.3 1.操作系统基础安装 安装 Ubuntu 24 Server 版本后(全部默认安装),查看一下磁盘


圆桌论坛精华实录 | AI是重构运维逻辑的颠覆性革命?博睿数据与行业大咖亲授“AI+可观测性”的破局之道
Bonree博睿数据2025/11/12

全文约6500字  阅读时间约15分钟。 当前,人工智能正处于高速发展阶段,以前所未有的深度与广度重塑商业规则,推动企业数字化转型从规模化扩张迈入精细化深耕。面对这场汹涌而来的智能变革,运维领域正面临一道核心命题:AI究竟是提升效率的辅助工具,还是重构运维逻辑的颠覆性革命? 国内金融、制造等关键行业已步入数字化深水区,却普遍陷入运维复杂度激增、故障定位滞后、数据价值难以转化等行业焦虑。如何让AI技术真正落地运维场景?如何通过可观测性打通全链路数据孤岛?如何平衡技术创新与业务实用价值?


对于数据结构:堆的超详细保姆级解析——下(堆排序以及TOP-K问题)
ShineWinsu2025/11/10

开篇介绍: hello 大家,我们又见面了,在上一篇博客中,我们共同探索了如何实现堆这么一个数据结构,相信大家经过上篇博客的学习,对堆的了解程度以及掌握程度,都有了极大的水平提升。 那么堆,有什么用呢?首先,作为一个数据结构,它肯定具有存储数据的功能,这是毋庸置疑的,但是呢,我们知道,堆有大堆和小堆之分,那么那么,这一个知识点,蕴含着什么秘密呢? 诶,不错,就是我们标题所说的——堆排序,我们之前学过了冒泡排序以及qsort函数排序,但是呢说实话,这两种排序方式,效率都不高,说难听一点就是在


【Linux】进程初阶(1)——基本进程理解
终焉代码2025/11/8

目录 前言 1.1进程基本理解 1.2进程描述 1.3查看进程 1.4通过系统调用的基本进程操作 1.4.1通过系统调用获取pid 1.4.2通过系统调用创建进程 前言 操作系统对计算机软件与硬件进行管理的方式是先描述再组织,而PCB就是那个"描述",那到底什么是PCB?PCB又在描述什么呢?更多Linux学习内容看准Linux专栏 1.1进程基本理解 在操作系统中,我们运行的一个个软件本质上都是程序。例如我们在windows上打开浏览器时,本质是


Bash 的 cd 命令
hubenchang05152025/11/6

#Bash 的 cd 命令 cd [-L|-P] [DIRECTORY] 功能 切换工作目录。 类型 Bash 内置命令。 参数 OPTION 选项: -L - 逻辑路径;在跟踪符号链接之前解析 ..(默认) -P - 物理路径;在跟踪符号链接之后解析 .. DIRECTORY - 要切换到的目录路径;省略表示切换到用户目录,- 表示切换到上次的工作目录 #示例 基本示例 $ pwd # 查看当前路径 /home/user/primer


规训 AI Agent 实践
清沫2025/11/1

AI 编程工具目前的发展可谓是三十年河东三十年河西,时不时就会有爆炸性的能力提升。初步使用,效果极其惊艳,随着使用的加深,就会发现 AI 会时不时犯蠢。本文总结 AI 协作的一些实践,希望帮助你让 AI 成为更可靠的编程伙伴。 别着急动手, 先制定计划 很多人使用 AI 编程工具时,习惯直接让 AI "帮我实现 xxx 功能",然后 AI 就立即开始写代码。这种方式在简单需求下可以工作,但面对复杂任务时容易出问题。等待十几分钟漫长的生码过程后,发现结果与预期相差甚远,既费时又费 token。 因

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2025 聚合阅读