How to Use Elasticsearch and Filebeat integration with Kinetic Structured Logs
Overview
In our Kinops SaaS offering, we're leveraging our structured logs with Elasticsearch and Kibana to provide us with enhanced troubleshooting, analytics, and reporting capabilities.
Below are some screenshots to demonstrate some of the benefits of leveraging Elasticsearch and Kibana.
Custom dashboards for displaying activity, performance, and health in your environment.
Perform queries to find errors in the logs for a specific user. Maybe providing the capability to avoid having to ask the question 'when did you experience the problem' and slowing down the investigation.
The steps below go over how to setup Elasticsearch, Filebeat, and Kibana to produce some Kibana dashboards/visualizations and allow aggregate log querying. The steps below assume you already have an Elasticsearch and Kibana environment.
Elasticsearch Configuration
Create an Elasticsearch index template
The following is what we use for our index template - you may need to change some settings for your Elasticsearch environment and data retention requirements. Your Elasticsearch administrator can help determine this.
PUT _template/kinetic-request
{
"order": 0,
"template": "kinetic-request-*",
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"_meta": {
"version": "5.0.0"
},
"dynamic_templates": [
{
"strings_as_keyword": {
"mapping": {
"ignore_above": 1024,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"_all": {
"norms": false
},
"properties": {
"access.addressChain": {
"type": "text"
},
"access.originAddress": {
"type": "ip"
},
"access.remoteAddress": {
"type": "ip"
},
"access.requestPath": {
"type": "text"
},
"access.requestProtocol": {
"type": "keyword"
},
"access.requestMethod": {
"type": "keyword"
},
"access.requestQuery": {
"type": "text"
},
"access.responseStatus": {
"type": "keyword"
},
"access.responseTime": {
"type": "integer"
},
"app.correlationId": {
"type": "keyword"
},
"app.sessionId": {
"type": "keyword"
},
"app.instanceId": {
"type": "keyword"
},
"app.requestId": {
"type": "keyword"
},
"app.space": {
"type": "keyword"
},
"app.user": {
"type": "keyword"
},
"auth.event": {
"type": "keyword"
},
"auth.message": {
"type": "text"
},
"auth.principal": {
"type": "keyword"
},
"auth.strategy": {
"type": "keyword"
},
"@timestamp": {
"format": "yyyy-MM-dd HH:mm:ss,SSS||yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"type": "date"
},
"catchall": {
"type": "text"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text"
},
"thread": {
"type": "keyword"
},
"timestamp": {
"format": "yyyy-MM-dd HH:mm:ss,SSS||yyyy-MM-dd'T'HH:mm:ss.SSSZ||EEE MMM d HH:mm:ss z yyyy||EEE MMM dd HH:mm:ss z yyyy",
"type": "date"
}
}
}
}
}
Create Elasticsearch ingestion pipelines
There are four ingestion pipelines that we created, one for each structured log. These pipelines help parse the log message into document fields inside of the indexes and allows for querying the log files by app.user, app.space, error level, response times, and more.
Note: The following pipelines assume you're using the default structured log format. If you've modified your %DATA_DIR%/config/log4j.xml or %DATA_DIR%/config/log4j-default.xml file for the structured log files 'ConversionPattern' then you will have to change the ingestion pipeline patterns listed below.
PUT _ingest/pipeline/kinetic-request-structured-access-v1
{
"description": "kinetic-request-structured-access-v1",
"on_failure": [
{
"grok": {
"field": "message",
"patterns": [
"%{MULTILINEDATA:catchall}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*"
}
}
},
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
}
],
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) \\- %{GREEDYDATA:message}"
],
"pattern_definitions": {
"SLUG": "[a-zA-Z0-9-]+",
"ADDRESS_CHAIN": "(%{IP},?)+"
}
}
},
{
"grok": {
"field": "access.addressChain",
"ignore_missing": true,
"patterns": [
"^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
],
"pattern_definitions": {
"MULTIPLEIPS": "(%{IP},?)+"
}
}
},
{
"grok": {
"field": "message",
"patterns": [
"%{HTTPMETHOD:access.requestMethod} %{URIPATH:access.requestPath}%{URIPARAM:access.requestQuery}? %{NUMBER:access.responseStatus} %{NUMBER:access.responseTime}ms"
],
"pattern_definitions": {
"HTTPMETHOD": "GET|HEAD|POST|PUT|DELETE|CONNECT|OPTIONS|TRACE|PATCH"
}
}
}
]
}
PUT _ingest/pipeline/kinetic-request-structured-auth-v1
{
"description": "kinetic-request-structured-auth-v1",
"on_failure": [
{
"grok": {
"field": "message",
"patterns": [
"%{MULTILINEDATA:catchall}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*"
}
}
},
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
}
],
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) \\- +%{GREEDYDATA:message}"
],
"pattern_definitions": {
"SLUG": "[a-zA-Z0-9-]+",
"ADDRESS_CHAIN": "(%{IP},?)+"
}
}
},
{
"grok": {
"field": "access.addressChain",
"ignore_missing": true,
"patterns": [
"^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
],
"pattern_definitions": {
"MULTIPLEIPS": "(%{IP},?)+"
}
}
},
{
"grok": {
"field": "message",
"patterns": [
"%{NOTSPACE:auth.event} %{NOTSPACE:auth.strategy} ((PRINCIPAL\\:%{DATA:auth.principal} MESSAGE\\:%{GREEDYDATA:auth.message})|(PRINCIPAL\\:%{GREEDYDATA:auth.principal}))"
]
}
}
]
}
PUT _ingest/pipeline/kinetic-request-structured-application-v1
{
"description": "kinetic-request-structured-application-v1",
"on_failure": [
{
"grok": {
"field": "message",
"patterns": [
"%{MULTILINEDATA:catchall}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*"
}
}
},
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
}
],
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) +%{LOGLEVEL:level} \\[%{NOTSPACE:thread}\\] \\- %{MULTILINEDATA:message}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*",
"SLUG": "[a-zA-Z0-9-]+",
"ADDRESS_CHAIN": "(%{IP},?)+"
}
}
},
{
"grok": {
"field": "access.addressChain",
"ignore_missing": true,
"patterns": [
"^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
],
"pattern_definitions": {
"MULTIPLEIPS": "(%{IP},?)+"
}
}
}
]
}
PUT _ingest/pipeline/kinetic-request-structured-system-v1
{
"description": "kinetic-request-structured-system-v1",
"on_failure": [
{
"grok": {
"field": "message",
"patterns": [
"%{MULTILINEDATA:catchall}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*"
}
}
},
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
}
],
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} +%{LOGLEVEL:level} \\[%{NOTSPACE:thread}\\] \\- %{MULTILINEDATA:message}"
],
"pattern_definitions": {
"MULTILINEDATA": "(.|\\r|\\n)*"
}
}
}
]
}
Filebeat configuration
Filebeat.yml
filebeat.prospectors:
- input_type: log
paths:
- /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.access.log
fields:
logfile: access
application: core
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
multiline.negate: true
multiline.match: after
pipeline: kinetic-request-structured-access-v1
- input_type: log
paths:
- /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.application.log
fields:
logfile: application
application: core
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
multiline.negate: true
multiline.match: after
pipeline: kinetic-request-structured-application-v1
- input_type: log
paths:
- /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.authentication.log
fields:
logfile: authentication
application: core
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
multiline.negate: true
multiline.match: after
pipeline: kinetic-request-structured-auth-v1
- input_type: log
paths:
- /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.system.log
fields:
logfile: system
application: core
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
multiline.negate: true
multiline.match: after
pipeline: kinetic-request-structured-system-v1
output.elasticsearch:
hosts: ["elasticsearch.yourdomain.com:9200"]
index: "kinetic-request-%{+yyyy.MM.dd}"
Be sure to use your own server names, ports, paths etc in the Filebeat.yml. The file paths need to be the full filesystem path to your structured log files and the hosts needs match the Elasticsearch host information for your environment.
Kibana configuration
Visualizations
Import our sample visualizations, kinetic-request-kibana-visualizations.json, into your Kibana environment through the management console.
Saved Searches
Import our sample saved searches into your Kibana environment through the management console.
Dashboards
Import our sample saved dashboard into your Kibana environment through the management console.
Updated 3 months ago