文章前言
在这篇博文中Anand Tiwari将讲述他在建立这样一个监控和警报系统时的经历和面临的挑战
背景介绍
在2017年OWASP引入了一个新的风险\\”日志记录和监控不足\\”,作为其三年一次更新的Web应用程序十大风险列表的一部分,虽然不是直接的漏洞但是OWASP将日志记录和监控不足列为有效的日志记录和监控是一项重要的防御措施,通过持续监控日志文件来快速检测异常情况可以帮助公司快速识别和响应攻击,从而潜在地预防攻击
OWASP建议:
-
应该有一个系统来记录各种认证和授权事件,例如:失败的登录尝试,暴力等
-
应该建立一个有效的监测和警报系统来发现可疑活动并及时做出反应
-
应该采用行业标准事件响应和恢复计划,例如:NIST 800-61第2版或更高版本
-
如果出现妥协,必须通知相关团队
不管我们是拥有一个小型组织还是企业级组织,我们必须考虑的一件事是监控对我们的应用程序和网络的攻击,我们需要一个实时监控系统来保护我们的应用以防有人试图攻击,我们可以识别攻击并阻止它或采取必要的措施,为了实现这一点,我们需要一个集中式系统,在该系统中用户可以持续监控日志并在仪表板中可视化数据,且拥有一个通知系统,在该系统中可以通知攻击
在这篇博客中我们将讨论如何在应用程序前将ModSecurity设置为Web应用程序防火墙(WAF),以及应用程序如何将其日志假脱机到ELK (Elasticsearch,Logstash,Kibana)堆栈以进行监控,并假脱机到ElastAlert以发出警报,这可以用于现有的SIEM(安全事故和事件监控)解决方案,或者作为使用开源解决方案的独立主动监控系统
工作流程
使用ModSecurity和ELK的连续监控和警报系统的高级工作流程可描述如下::
-
实现ModSecurity WAF
-
监控警报攻击模式和来源IP
-
使用ELK stack进行分析和可视化
-
分析ModSecurity WA日志中OWASP前10大风险
在网络中使用ModSecurity和ELK进行监控和报警的图示如下所示:
图表中编号实体的工作/角色如下:
-
WAF阻止了恶意请求
-
ModSecurity作为WAF运行
-
已配置Nginx代理服务器
-
服务器上承载的Web应用程序
-
WAF的日志通过Nginx日志和应用程序日志生成
-
Beats:将日志从服务器发送到Logstash
-
Logstash:一个开源数据处理管道,从多个来源获取数据
-
Elasticsearch:轻松大规模存储、搜索和分析数据
-
Kibana:可视化Elasticsearch数据,并为所需信息提供配置仪表板的选项
-
ElastAlert是一个开源框架,用于根据Elasticsearch中数据的给定模式发出警报
-
通过电子邮件/其他通信渠道收到的警报
现在让我们详细讨论一下每个实体:
ModSecurity
ModSecurity是一个WAF(Web Application Firewall),一个开源的工具包,它为Web应用程序防护者提供了对HTTP流量的可见性和针对攻击的高级保护,下面是将ModSecurity WAF设置为Nginx反向代理的参考:
https://github.com/andrewnk/docker-alpine-nginx-modsec
ModSecurity安装后将生成一个日志文件,其中包含所有被阻止的请求,基本上有三个日志文件将被配置到Nginx和ModSecurity配置文件中:
A、Error Logs
当在服务器上遇到错误或任何恶意尝试时会生成错误日志,因为我们已经用Nginx配置了我们的设置,所以所有的错误日志(包括Nginx错误)都是在同一个文件\\”error.log\\”中生成的,该文件默认位于以下路径:
/var/log/nginx/
B、Debug Logs
调试日志用于调试目的,对故障排除非常有用,它可以通过\\”modsecurity.conf\\”文件启用
C、Audit Logs
审计日志包含ModSecurity检测到恶意事件时生成的日志的详细信息,并包含有关系统客户端请求的有用信息,包括客户端标头和数据有效负载,默认情况下它是不启用的,可以通过\\”modsecurity.conf\\”配置文件进行配置,这里我们将只关注\\”error.log\\”并为我们的分析解析该信息
Elasticsearch,Logstash,Kibana
下面让我们了解一下Filebeat和ELK的作用:
-
Filebeat:Filebeat负责将所有日志转发给Logstash,后者可以进一步将其传递到管道中,它是轻量级的且支持SSL和TLS加密,非常可靠
-
Logstash:Logstash是一个用来解析日志并将其发送到Elasticsearch的工具,它功能强大,创建了一个管道和索引事件或日志,它可以用于弹性搜索生态系统
-
ElasticSearch:ES是一个高度可扩展的开源分析引擎,它允许我们快速存储、搜索和分析数据,当我们处理复杂的搜索特性和需求时,它通常是有用的,它还能够在Lucene标准分析器的基础上提供一个分布式系统来进行索引
-
Kibana:这是一个与Elasticsearch集群交互并可视化Elasticsearch数据的UI工具
现在让我们分析日志并了解在创建监控可视化时有用的所有参数,下图显示了一个示例攻击错误日志:
上面截图中的每个编号部分解释如下:
-
我们首先需要的是生成请求的客户机IP地址
-
下一个重要信息是ModSecurity配置文件路径,ModSecurity在其中定义了攻击规则,将从路径中提取攻击名称,在上图中文件路径为/usr/local/owasp-modsecurity-crs/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf
-
接下来的信息是ModSecurity生成的一条消息,这对我们今后的工作很有用
-
包含攻击参数和有效载荷的数据
-
最重要的我们从日志中提取的URI
-
用于跟踪的Unique_id值
Configuring ELK
你可以参考Rohit Salecha写的博文,在你的系统中配置Filebeat、Elasticsearch、Logstash和Kibana:
https://www.rohitsalecha.com/post/practical_devops_continous_monitoring_elasticsearch_logstash_kibana_filebeat/
Configure Logs with Filebeat
安装了Filebeat后我们将需要在Filebeat配置文件中提供日志,以便它可以将日志发送到Logstash,此外Logstash会将它们发送到Elasticsearch
Filebeat.yml配置文件:
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
filebeat.modules:
- module: logstash
log:
enabled: true
filebeat.inputs:
- type: log
enabled: true
paths:
/etc/log/nginx/error.log
output.logstash:
enabled: true
hosts: logstash_server
ssl.enabled: false
Configuring Logstash
Logstash配置文件采用JSON格式,位于\\”/etc/logstash/conf.d\\”中,配置文件由三部分组成:输入、过滤器、输出,我们创建了一个配置文件\\”beats-input.conf\\”,然后我们设置了我们的\\”Filebeat\\”输入:
beats-input.conf:
input {
beats {
port => 5044
codec => \\\"json\\\"
}
}
Filter {
if [message] =~ \\\"\\\\A\\\\{.+\\\\}\\\\z\\\" {
json {
source => \\\"message\\\"
target => \\\"httpRequest\\\"
}
mutate {
remove_field => [ \\\"json\\\", \\\"message\\\" ]
}
}
mutate {
remove_field => [ \\\"json\\\", \\\"agent\\\" ]
remove_field => [ \\\"json\\\", \\\"tags\\\" ]
remove_field => [ \\\"json\\\", \\\"thread_name\\\" ]
}
}
output {
elasticsearch {
hosts => [“elasticsearch_server:9200”]
manage_template => false
index => \\\"logstash-%{+YYYY.MM.dd}\\\"
}
}
Feeding Data into Elasticsearch
一切就绪后数据就被解析并发送到Elasticsearch服务器,它将快速索引和分析数据,接下来是使用Kibana进行可视化的Elasticsearch的设置
Setting up Kibana
为了从Elasticsearch获取数据,我们需要首先在Kibana中创建一个\\”索引模式\\”,然后按照下图所示的步骤操作:
Step 1:通过在索引模式字段中将索引模式定义为logstash-*来创建索引模式
Step 2:接下来在时间过滤器字段中提供@timestamp,这将确保按时间过滤您的数据
Step 3:点击\\”发现\\”图标查看您的日志
您应该看到所有WAF错误日志都反映在消息字段中
在Elasticsearch中输入日志后我们会将个人信息(如下所述)分离出来作为索引,这样我们就可以在仪表板上可视化所需的信息,我们需要仪表板日志中的以下信息:
-
Client IP Address
-
Attack Request
-
Attack Name
-
Attack Pattern (Payloads)
-
Attack URL
当日志从Logstash发送到Elasticsearch并在Kibana中呈现时,数据在\\”消息\\”字段中以非结构化的方式发送,在这种情况下查询有意义的信息会很麻烦,因为所有的日志数据都存储在一个键下,应该更好地组织日志消息,因此我们使用了Grok,它是Logstash中的一个过滤器插件,它将非结构化数据解析成结构化和可查询的数据,它使用文本模式来匹配日志文件中的行
如果你仔细观察原始数据你会发现它实际上是由不同的部分组成的,每个部分之间用一个空格隔开,让我们利用Logstash Grok过滤器并使用Grok过滤器模式创建结构化数据,Logstash Grok filter带有100多种用于结构化非结构化数据的内置模式,由于我们在modsecurity \\”error.log\\”数据的内置模式方面运气不好,我们使用一个名为Grok debugger的在线工具和一些有用的Grok模式构建了一个自定义的Grok模式
Grok支持正则表达式,Grok使用的正则表达式库是Oniguruma,更多细节可以访问Grok filter插件站点,使用oniguruma正则表达式可以匹配一段文本并将其保存为字段,语法如下:
(?<field_name>the pattern here)
首先让我们使用以下语法从消息数据中过滤时间戳:
(?<timestamp>%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME})
现在我们将使用语法%{GREEDYDATA:field_name}作为攻击字段,隔离可以过滤的未格式化数据,GREEDYDATA的意思是\\”.*\\” ,根据周围的限制它们扩展到尽可能多的字符
我们已经通过使用Grok filter %{IP:client}过滤了客户端IP,该过滤器主要从日志数据中过滤IP地址:
下面是上述案例的Grok片段,解释了将无格式数据分离为攻击字段并删除消息字段
grok {
match => { \\\"message\\\" => \\\"(?<timestamp>%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME}) %{GREEDYDATA:attack}, client: %{IP:client}, server: %{GREEDYDATA:server}\\\"}
remove_field => [\\\"message\\\"]
}
grok输出如下:
[error] 34#34: *215 [client 192.168.33.1] ModSecurity: Access denied with code 403 (phase 2). detected XSS using libinjection. [file \\\"/usr/local/owasp-modsecurity-crs/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf\\\"] [line \\\"37\\\"] [id \\\"941100\\\"] [rev \\\"\\\"] [msg \\\"XSS Attack Detected via libinjection\\\"] [data \\\"Matched Data: XSS data found within ARGS:email: \\\"><Script>alert(0)</script>\\\"] [severity \\\"2\\\"] [ver \\\"OWASP_CRS/3.2.0\\\"] [maturity \\\"0\\\"] [accuracy \\\"0\\\"] [tag \\\"application-multi\\\"] [tag \\\"language-multi\\\"] [tag \\\"platform-multi\\\"] [tag \\\"attack-xss\\\"] [tag \\\"OWASP_CRS\\\"] [tag \\\"OWASP_CRS/WEB_ATTACK/XSS\\\"] [tag \\\"WASCTC/WASC-8\\\"] [tag \\\"WASCTC/WASC-22\\\"] [tag \\\"OWASP_TOP_10/A3\\\"] [tag \\\"OWASP_AppSensor/IE1\\\"] [tag \\\"CAPEC-242\\\"] [hostname \\\"172.18.0.2\\\"] [uri \\\"/login.action\\\"] [unique_id \\\"158625916198.227197\\\"] [ref \\\"v661,27t:utf8toUnicode,t:urlDecodeUni,t:htmlEntityDecode,t:jsDecode,t:cssDecode,t:removeNulls\\\"]
现在我们需要从攻击字段数据中过滤出未格式化的值:
-
Attack Name
-
Attack Request
-
Attack Pattern (Payloads)
-
Attack URL
由于我们没有其他无格式值的Grok模式,我们可以使用正则表达式来查找无格式值,下面我们使用正则表达式来查找单个攻击名称,您可以使用此网站进行在线正则表达式创建、测试和调试-https://regex101.com/
如下图所示,在Grok调试器中我们提取了路径值,然后将/usr/local/owasp-modsecurity-CRS/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf路径值中的名称剥离为REQUEST-941-APPLICATION-ATTACK-XSS
grok {
match => {\\\"attack\\\" => \\\"(?<attack_file>\\\\[file \\\\\\\".+\\\\/(.*?).conf\\\\\\\"\\\\])\\\"}
}
grok {
match => {\\\"attack_file\\\" => \\\"(?<attack_name>[A-Z][^.]+)\\\"}
remove_field => [\\\"attack_file\\\"]
}
类似地我们从攻击字段数据中去除了其他值,并创建了一个包含所有隔离值的完整Logstash配置文件,完整日志存储配置
input
{
beats
{
ssl => false
port => 5000
codec => \\\"json\\\"
}
}
filter {
grok {
match => { \\\"message\\\" => \\\"(?<timestamp>%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME}) \\\\[%{LOGLEVEL:severity}\\\\] %{POSINT:pid}#%{NUMBER:threadid}\\\\: \\\\*%{NUMBER:connectionid} %{GREEDYDATA:attack}, client: %{IP:client}, server: %{GREEDYDATA:server}\\\"}
remove_field => [\\\"message\\\"]
}
grok {
match => {\\\"attack\\\" => \\\"(?<attack_file>\\\\[file \\\\\\\".+\\\\/(.*?).conf\\\\\\\"\\\\])\\\"}
}
grok {
match => {\\\"attack_file\\\" => \\\"(?<attack_name>[A-Z][^.]+)\\\"}
remove_field => [\\\"attack_file\\\"]
}
grok {
match => {\\\"attack\\\" => \\\"(?<attack_message>\\\\[msg \\\\\\\"(.*?)\\\\\\\"\\\\])\\\"}
}
grok {
match => {\\\"attack\\\" => \\\"(?<attack_data>\\\\[data \\\\\\\"(.*?)\\\\\\\"\\\\])\\\"}
}
grok {
match => {\\\"attack\\\" => \\\"(?<attack_uri>\\\\[uri \\\\\\\"(.*?)\\\\\\\"\\\\])\\\"}
remove_field => [\\\"attack\\\"]
}
grok {
match => {\\\"attack_uri\\\" => \\\"(?<attack_url>[/].+\\\\\\\")\\\"}
}
if [message] =~ \\\"\\\\A\\\\{.+\\\\}\\\\z\\\" {
json {
source => \\\"message\\\"
target => \\\"httpRequest\\\"
}
mutate {
remove_field => [ \\\"json\\\", \\\"message\\\" ]
}
}
mutate {
remove_field => [ \\\"json\\\", \\\"agent\\\" ]
remove_field => [ \\\"json\\\", \\\"tags\\\" ]
remove_field => [ \\\"json\\\", \\\"thread_name\\\" ]
}
}
output {
elasticsearch {
hosts => [\\\"{{elasticsearch_server}}\\\"]
manage_template => false
index => \\\"logstash-%{+YYYY.MM.dd}\\\"
}
}
如您所见,现在Elasticsearch索引中有多个字段,它可以过滤单个值
Attack Dashboard
现在让我们创建一个包括所有攻击计数和模式的控制面板,根据我们的要求我们也可以将它可视化为饼状图或任何东西
参考链接
https://resources.infosecinstitute.com/topic/analyzing-mod-security-logs/
https://hackernoon.com/structuring-unstructured-data-with-grok-bcdbb240fcd1
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
https://logz.io/blog/filebeat-vs-logstash/
https://owasp.org/www-project-top-ten/2017/A10_2017-Insufficient_Logging%2526Monitoring.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_regular_expressions
原创文章,作者:七芒星实验室,如若转载,请注明出处:https://www.sudun.com/ask/34090.html