一、日志分析概述
日志分析是运维工程师解决系统故障,发现问题的主要手段
日志主要包括系统日志、应用程序日志和安全日志
系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因
经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误
二、日志分析的作用
分析日志时刻监控系统运行的状态
分析日志来定位程序的bug
分析日志监控网站的访问流量
分析日志可以知道哪些sql语句需要优化
三、ELK概述 ELFK 已经成为目前最流行的集中式日志解决方案,它主要是由 filebeat、Logstash、Elasticsearch、Kibana 等组件组成,来共同完成实时日志的收集,存储,展示等一站式的解决方案。本文将会介绍 ELK 常见的架构以及相关问题解决。
Filebeat:Filebeat 是一款轻量级,占用服务资源非常少的数据收集引擎,它是 ELK 家族的新成员,可以代替 Logstash 作为在应用服务器端的日志收集引擎,支持将收集到的数据输出到 Kafka,Redis 等队列。
Logstash:数据收集引擎,相较于 Filebeat 比较重量级,但它集成了大量的插件,支持丰富的数据源收集,对收集的数据可以过滤,分析,格式化日志格式。
Elasticsearch:分布式数据搜索引擎,基于 Apache Lucene 实现,可集群,提供数据的集中式存储,分析,以及强大的数据搜索和聚合功能。
Kibana:数据的可视化平台,通过该 web 平台可以实时的查看 Elasticsearch 中的相关数据,并提供了丰富的图表统计功能
四、日志采集常见部署架构 filebeat作为日志收集器
使用 Filebeat 作为日志采集器,通常可以将其直接作为日志收集的前端组件,负责读取日志文件并发送到后续Elasticsearch 中。之后kabana做出图展示。
引入缓存队列的部署架构
该架构在第二种架构的基础上引入了 Kafka 消息队列(还可以是其他消息队列),将 Filebeat 收集到的数据发送至 Kafka,然后在通过 Logstasth 读取 Kafka 中的数据,这种架构主要是解决大数据量下的日志收集方案,使用缓存队列主要是解决数据安全与均衡 Logstash 与 Elasticsearch 负载压力。
本次我们部署efk架构。filebeat作为日志采集器,Elasticsearch存储数据,kibana展示采集的数据。
部署 Elasticsearch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 1. 下载软件包 [root@node1 .local ~]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.22-amd64.deb 2. 安装 [root@node1 .local ~]# dpkg -i elasticsearch-7.17.22-amd64.deb 3. 修改配置文件 [root@node1 .local ~]# vim /etc/elasticsearch/elasticsearch.yml cluster.name: cherry path.data: /var/lib/elasticsearch path.logs: /var/log /elasticsearch network.host: 0.0 .0 .0 http.port: 9200 transport.port: 9300 discovery.type: "single-node" 4. 启动服务 [root@node1 .local ~]# systemctl enable --now elasticsearch.service Synchronizing state of elasticsearch.service with SysV service script with /usr/lib/systemd/systemd-sysv-install. Executing: /usr/lib/systemd/systemd-sysv-install enable elasticsearch Created symlink /etc/systemd/system/multi-user.target.wants/elasticsearch.service → /usr/lib/systemd/system/elasticsearch.service.5. 查看端口验证服务是否启动成功 [root@node1 .local ~]# ss -ntl | grep "9[2|3]00" LISTEN 0 65535 *:9300 *:* LISTEN 0 65535 *:9200 *:* 6. 访问 [root@node1 .local ~]# curl http://192.1.7.244:9200/ { "name" : "node1.local" , "cluster_name" : "cherry" , "cluster_uuid" : "1-AfkL1DRXKIMuhaQuoFLQ" , "version" : { "number" : "7.17.22" , "build_flavor" : "default" , "build_type" : "deb" , "build_hash" : "38e9ca2e81304a821c50862dafab089ca863944b" , "build_date" : "2024-06-06T07:35:17.876121680Z" , "build_snapshot" : false , "lucene_version" : "8.11.3" , "minimum_wire_compatibility_version" : "6.8.0" , "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }7. 查看节点信息 [root@node1 .local ~]# curl 192.1.7.244:9200/_cat/nodes 192.1 .7 .244 18 99 2 0.05 0.15 0.07 cdfhilmrstw * node1.local [root@node1 .local ~]# [root@node1 .local ~]# curl 192.1.7.244:9200/_cat/nodes?v ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name192.1 .7 .244 18 99 0 0.04 0.14 0.07 cdfhilmrstw * node1.local
部署kibana 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 1 .下载kibana [[email protected] ~]2 .安装 [[email protected] ~]3 .修改配置文件 [[email protected] ~] ... server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://192.1.7.244:9200" ] i18n.locale: "zh-CN" 4 .启动 [[email protected] ~] [[email protected] ~] LISTEN 0 511 0.0 .0.0 :5601 0.0 .0.0 :* 5 .访问 [[email protected] ~] HTTP/1.1 302 Foundlocation: /spaces/enter x-content-type-options: nosniffreferrer-policy: no-referrer-when-downgradecontent-security-policy: script-src 'unsafe-eval' 'self'; worker-src blob: 'self'; style-src 'unsafe-inline' 'self'kbn-name: node1.localkbn-license-sig: f4384e4807b75451c2f4feacc4d54e13f9397d4b4f115c922f956f9017453f91cache-control: private, no-cache, no-store, must-revalidatecontent-length: 0 Date: Thu, 09 Jan 2025 05 :51 :05 GMTConnection: keep-aliveKeep-Alive: timeout= 120
部署filebeat
1 2 3 4 5 6 7 8 1. 下载 [root@node1 .local ~]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.22-amd64.deb 2. 安装 [root@node1 .local ~]# dpkg -i filebeat-7.17.22-amd64.deb 3. 验证是否安装成功 [root@node1 .local ~]# filebeat -h
filebeat组成部分
input:数据从哪里来? output:数据去哪里?
filebeat结构
- input: 数据从哪来? - stdin - tcp - log
- output: 数据到哪去? - console - elasticsearch
五、测试 定义filebeat采集的日志索引模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 [[email protected] filebeat ]filebeat.inputs: - type: log paths: - /var/log/*.log output.elasticsearch: hosts: - "http://192.1.7.244:9200" index: "system-log" setup.ilm.enabled: false setup.template.name: "system" setup.template.pattern: "system-log*" setup.template.overwrite: false setup.template.settings: index.number_of_shards: 5 index.number_of_replicas: 0
执行filebeat采集日志
1 [root@node1 .local filebeat]# filebeat -e -c /root/filebeat/systemlog.yaml
查看索引
添加索引
查看日志
六、logstash filebeat是基于logstash再次开发的,logstash可以对自己公司自研产品进行自定义日志,选择要截取的日志。
Logstash 是一个具有实时管道功能的开源数据收集引擎。Logstash可以动态统一来自不同来源的数据,并将数据规范化到您选择的目标中。为了多样化的高级下游分析和可视化用例,清理和使所有数据平等化。
虽然 Logstash 最初在日志收集方面推动了创新,但它的能力远远超出了该用例。任何类型的事件都可以通过广泛的输入、过滤和输出插件进行增强和转换,许多本地编解码器进一步简化了摄入过程。Logstash 通过利用更多的数据量和种类加速您的洞察力。
工作原理
Logstash 事件处理管道有三个阶段:输入 → 过滤器 → 输出 。
inputs 模块负责收集数据,filters 模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs 模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。
inputs 和 outputs 支持编解码器,使您能够在数据进入或离开管道时对数据进行编码或解码,而无需使用单独的过滤器。
部署logstash
1 2 3 4 5 6 7 8 9 1 .下载logstash [root@node1 .local ~ ]# wget https:2 .安装logstash [root@node1 .local ~ ]# dpkg - i logstash- 7.17 .22 - amd64.deb3 .配置环境变量 [root@node1 .local ~ ]# ln - svf /usr/ share/logstash/ bin/logstash / usr/local/ sbin/ '/usr/ local/sbin/ logstash' -> '/usr/ share/logstash/ bin/ logstash'
logstash结构
- pipeline - input - stdin - file - filter - mutate - output - stdout - elasticsearch
fileter插件
grok:logstash 中最常用的日志解释和结构化插件。
grok :是一种采用组合多个预定义的正则表达式,用来匹配分割文本并映射到关键字的工具。 mutate :支持事件的变换,例如重命名、移除、替换、修改等
drop :完全丢弃事件
clone :克隆事件
geoip:添加关于 IP 地址的地理位置信息
date:专门用于处理日志的日期模块,将其作为时间戳可以替换”@timestamp”字段。
案例编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 [[email protected] conf.d] input { file { path => ["/var/log/*.log" ] start_position => "beginning" } } filter { mutate { split => { "message" => "|" } add_field => { "other" => "%{[message][0]}" "userId" => "%{[message][1]}" "action" => "%{[message][2]}" "svip" => "%{[message][3]}" "price" => "%{[message][4]}" } } mutate { split => { "other" => " " } add_field => { datetime => "%{[other][1]} %{[other][2]}" } remove_field => [ "message" , "other" , "@version" ] } mutate { convert => { "price" => "float" "userId" => "integer" } } date { match => [ "datetime" , "yyyy-MM-dd HH:mm:ss" ] } } output { stdout {} elasticsearch { hosts => ["http://192.1.7.244:9200" ] index => "system-%{+yyyy.MM.dd}" } }
启动logstash实例
1 [root@node1.local ~]# logstash -rf /etc/logstash/conf .d /03-file -filter-elasticsearch.conf
kibana查看索引
创建索引
查看日志数据
很明显,相对于filebeat来说。logstash这时候已经精准过滤出我们需要的字段了
七、常见问题及解决方案 问题:如何实现日志的多行合并功能?
系统应用中的日志一般都是以特定格式进行打印的,属于同一条日志的数据可能分多行进行打印,那么在使用 ELK 收集日志的时候就需要将属于同一条日志的多行数据进行合并。
解决方案:使用 Filebeat 或 Logstash 中的 multiline 多行合并插件来实现
在使用 multiline 多行合并插件的时候需要注意,不同的 ELK 部署架构可能 multiline 的使用方式也不同,如果是本文的第一种部署架构,那么 multiline 需要在 Logstash 中配置使用,如果是第二种部署架构,那么 multiline 需要在 Filebeat 中配置使用,无需再在 Logstash 中配置 multiline。
1、multiline 在 Filebeat 中的配置方式:
negate:默认为 false,表示匹配 pattern 的行合并到上一行;true 表示不匹配 pattern 的行合并到上一行
match:after 表示合并到上一行的末尾,before 表示合并到上一行的行首
如:该配置表示将不匹配 pattern 模式的行合并到上一行的末尾
1 2 3 pattern: negate: true match: after
2、multiline 在 Logstash 中的配置方式
(1)Logstash 中配置的 what 属性值为 previous,相当于 Filebeat 中的 after,Logstash 中配置的 what 属性值为 next,相当于 Filebeat 中的 before。 (2)pattern => “%{LOGLEVEL}\s*]“ 中的 LOGLEVEL 是 Logstash 预制的正则匹配模式,预制的还有好多常用的正则匹配模式,详细请看:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
问题:如何将 Kibana 中显示日志的时间字段替换为日志信息中的时间?
默认情况下,我们在 Kibana 中查看的时间字段与日志信息中的时间不一致,因为默认的时间字段值是日志收集时的当前时间,所以需要将该字段的时间替换为日志信息中的时间。
解决方案:使用 grok 分词插件与 date 时间格式化插件来实现
在 Logstash 的配置文件的过滤器中配置 grok 分词插件与 date 时间格式化插件,如:
如要匹配的日志格式为:“[DEBUG][20170811 10:07:31,359][DefaultBeanDefinitionDocumentReader:106] Loading bean definitions”,解析出该日志的时间字段的方式有:
① 通过引入写好的表达式文件,如表达式文件为 customer_patterns,内容为: CUSTOMER_TIME %{YEAR}%{MONTHNUM}%{MONTHDAY}\s+%{TIME} 注:内容格式为:[自定义表达式名称] [正则表达式] 然后 logstash 中就可以这样引用:
② 以配置项的方式,规则为:(?< 自定义表达式名称> 正则匹配规则),如:
1 2 3 4 5 filter { grok { match => [ "message" , "(?<customer_time>%{YEAR}%{MONTHNUM}%{MONTHDAY}\s+%{TIME})" ] } }
问题:如何在 Kibana 中通过选择不同的系统日志模块来查看数据
一般在 Kibana 中显示的日志数据混合了来自不同系统模块的数据,那么如何来选择或者过滤只查看指定的系统模块的日志数据?
解决方案:新增标识不同系统模块的字段或根据不同系统模块建 ES 索引
1、新增标识不同系统模块的字段,然后在 Kibana 中可以根据该字段来过滤查询不同模块的数据
这里以第二种部署架构讲解,在 Filebeat 中的配置内容为:
通过新增:log_from 字段来标识不同的系统模块日志
2、根据不同的系统模块配置对应的 ES 索引,然后在 Kibana 中创建对应的索引模式匹配,即可在页面通过索引模式下拉框选择不同的系统模块数据。 这里以第二种部署架构讲解,分为两步:
① 在 Filebeat 中的配置内容为:
通过 document_type 来标识不同系统模块
② 修改 Logstash 中 output 的配置内容为:
在 output 中增加 index 属性,%{type} 表示按不同的 document_type 值建 ES 索引