How to Integrate Elasticsearch and Filebeat with Kinetic Structured Logs

Overview

In our Kinops SaaS offering, we're leveraging our structured logs with Elasticsearch and Kibana to provide us with enhanced troubleshooting, analytics, and reporting capabilities.

Below are a few screenshots demonstrating some of the benefits of leveraging Elasticsearch and Kibana.

[Screenshot: kinops Kibana dashboard]
Custom dashboards for displaying activity, performance, and health in your environment.

[Screenshot: aggregate log searching in Kibana]
Perform queries to find errors in the logs for a specific user. This can remove the need to ask "when did you experience the problem?", a question that often slows down an investigation.

The steps below walk through how to set up Elasticsearch, Filebeat, and Kibana to produce Kibana dashboards/visualizations and allow aggregate log querying. They assume you already have an Elasticsearch and Kibana environment.
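To confirm the prerequisites are in place, a quick cluster health check from Kibana's Dev Tools console (or curl against your Elasticsearch host) will verify that Elasticsearch is reachable before you begin:

GET _cluster/health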

Elasticsearch Configuration

Create an Elasticsearch index template

The following is the index template we use - you may need to change some settings for your Elasticsearch environment and data retention requirements. Your Elasticsearch administrator can help determine this.

PUT _template/kinetic-request
{
    "order": 0,
    "template": "kinetic-request-*",
    "settings": {
      "index": {
        "refresh_interval": "5s"
      }
    },
    "mappings": {
      "_default_": {
        "_meta": {
          "version": "5.0.0"
        },
        "dynamic_templates": [
          {
            "strings_as_keyword": {
              "mapping": {
                "ignore_above": 1024,
                "type": "keyword"
              },
              "match_mapping_type": "string"
            }
          }
        ],
        "_all": {
          "norms": false
        },
        "properties": {
          "access.addressChain": {
            "type": "text"
          },
          "access.originAddress": {
            "type": "ip"
          },
          "access.remoteAddress": {
            "type": "ip"
          },
          "access.requestPath": {
            "type": "text"
          },
          "access.requestProtocol": {
            "type": "keyword"
          },
          "access.requestMethod": {
            "type": "keyword"
          },
          "access.requestQuery": {
            "type": "text"
          },
          "access.responseStatus": {
            "type": "keyword"
          },
          "access.responseTime": {
            "type": "integer"
          },
          "app.correlationId": {
            "type": "keyword"
          },
          "app.sessionId": {
            "type": "keyword"
          },
          "app.instanceId": {
            "type": "keyword"
          },
          "app.requestId": {
            "type": "keyword"
          },
          "app.space": {
            "type": "keyword"
          },
          "app.user": {
            "type": "keyword"
          },
          "auth.event": {
            "type": "keyword"
          },
          "auth.message": {
            "type": "text"
          },
          "auth.principal": {
            "type": "keyword"
          },
          "auth.strategy": {
            "type": "keyword"
          },
          "@timestamp": {
            "format": "yyyy-MM-dd HH:mm:ss,SSS||yyyy-MM-dd'T'HH:mm:ss.SSSZ",
            "type": "date"
          },
          "catchall": {
            "type": "text"
          },
          "level": {
            "type": "keyword"
          },
          "message": {
            "type": "text"
          },
          "thread": {
            "type": "keyword"
          },
          "timestamp": {
            "format": "yyyy-MM-dd HH:mm:ss,SSS||yyyy-MM-dd'T'HH:mm:ss.SSSZ||EEE MMM d HH:mm:ss z yyyy||EEE MMM dd HH:mm:ss z yyyy",
            "type": "date"
          }
        }
      }
    }
}
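After running the PUT request above, you can optionally verify that the template was stored before moving on:

GET _template/kinetic-request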

Create Elasticsearch ingestion pipelines

We created four ingestion pipelines, one for each structured log. These pipelines parse the log message into document fields within the indexes, which allows querying the logs by app.user, app.space, error level, response times, and more.

Note: The following pipelines assume you're using the default structured log format. If you've modified the 'ConversionPattern' for the structured log files in your %DATA_DIR%/config/log4j.xml or %DATA_DIR%/config/log4j-default.xml file, you will need to change the ingestion pipeline patterns listed below accordingly.

PUT _ingest/pipeline/kinetic-request-structured-access-v1
{
  "description": "kinetic-request-structured-access-v1",
  "on_failure": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{MULTILINEDATA:catchall}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*"
        }
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ],
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) \\- %{GREEDYDATA:message}"
        ],
        "pattern_definitions": {
          "SLUG": "[a-zA-Z0-9-]+",
          "ADDRESS_CHAIN": "(%{IP},?)+"
        }
      }
    },
    {
      "grok": {
        "field": "access.addressChain",
        "ignore_missing": true,
        "patterns": [
          "^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
        ],
        "pattern_definitions": {
          "MULTIPLEIPS": "(%{IP},?)+"
        }
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{HTTPMETHOD:access.requestMethod} %{URIPATH:access.requestPath}%{URIPARAM:access.requestQuery}? %{NUMBER:access.responseStatus} %{NUMBER:access.responseTime}ms"
        ],
        "pattern_definitions": {
          "HTTPMETHOD": "GET|HEAD|POST|PUT|DELETE|CONNECT|OPTIONS|TRACE|PATCH"
        }
      }
    }
  ]
}

PUT _ingest/pipeline/kinetic-request-structured-auth-v1
{
  "description": "kinetic-request-structured-auth-v1",
  "on_failure": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{MULTILINEDATA:catchall}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*"
        }
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ],
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) \\- +%{GREEDYDATA:message}"
        ],
        "pattern_definitions": {
          "SLUG": "[a-zA-Z0-9-]+",
          "ADDRESS_CHAIN": "(%{IP},?)+"
        }
      }
    },
    {
      "grok": {
        "field": "access.addressChain",
        "ignore_missing": true,
        "patterns": [
          "^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
        ],
        "pattern_definitions": {
          "MULTIPLEIPS": "(%{IP},?)+"
        }
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{NOTSPACE:auth.event} %{NOTSPACE:auth.strategy} ((PRINCIPAL\\:%{DATA:auth.principal} MESSAGE\\:%{GREEDYDATA:auth.message})|(PRINCIPAL\\:%{GREEDYDATA:auth.principal}))"
        ]
      }
    }
  ]
}

PUT _ingest/pipeline/kinetic-request-structured-application-v1
{
  "description": "kinetic-request-structured-application-v1",
  "on_failure": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{MULTILINEDATA:catchall}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*"
        }
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ],
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} %{NOTSPACE:app.sessionId} %{NOTSPACE:app.correlationId} %{NOTSPACE:app.requestId} %{SLUG:app.space} %{DATA:app.user} (%{ADDRESS_CHAIN:access.addressChain}|NO_ADDRESS) +%{LOGLEVEL:level} \\[%{NOTSPACE:thread}\\] \\- %{MULTILINEDATA:message}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*",
          "SLUG": "[a-zA-Z0-9-]+",
          "ADDRESS_CHAIN": "(%{IP},?)+"
        }
      }
    },
    {
      "grok": {
        "field": "access.addressChain",
        "ignore_missing": true,
        "patterns": [
          "^%{IP:access.originAddress},%{MULTIPLEIPS},%{IP:access.remoteAddress}$|^%{IP:access.originAddress},%{IP:access.remoteAddress}$|^%{IP:access.remoteAddress}$"
        ],
        "pattern_definitions": {
          "MULTIPLEIPS": "(%{IP},?)+"
        }
      }
    }
  ]
}

PUT _ingest/pipeline/kinetic-request-structured-system-v1
{
  "description": "kinetic-request-structured-system-v1",
  "on_failure": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{MULTILINEDATA:catchall}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*"
        }
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ],
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:app.instanceId} +%{LOGLEVEL:level} \\[%{NOTSPACE:thread}\\] \\- %{MULTILINEDATA:message}"
        ],
        "pattern_definitions": {
          "MULTILINEDATA": "(.|\\r|\\n)*"
        }
      }
    }
  ]
}
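Before pointing Filebeat at these pipelines, you can sanity-check them with the ingest simulate API. The example below runs the access pipeline against a fabricated log line that follows the default structured log format described above; substitute a line from your own structured.access.log to test your actual format:

POST _ingest/pipeline/kinetic-request-structured-access-v1/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2017-06-01 14:05:32,118 8a8ab7d1 5f2c9e41 1c3d5e7f 9b2d4f6a acme jane.doe@example.com 10.0.0.1,192.168.1.10 - GET /kinetic/app/api/v1/kapps?include=details 200 42ms"
      }
    }
  ]
}

If the grok patterns match, the response shows the parsed app.*, access.*, and timestamp fields; if they don't, the on_failure handlers place the raw line in the catchall field instead.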

Filebeat configuration

filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.access.log
  fields:
    logfile: access
    application: core
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
  multiline.negate: true
  multiline.match: after
  pipeline: kinetic-request-structured-access-v1
- input_type: log
  paths:
    - /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.application.log
  fields:
    logfile: application
    application: core
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
  multiline.negate: true
  multiline.match: after
  pipeline: kinetic-request-structured-application-v1
- input_type: log
  paths:
    - /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.authentication.log
  fields:
    logfile: authentication
    application: core
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
  multiline.negate: true
  multiline.match: after
  pipeline: kinetic-request-structured-auth-v1
- input_type: log
  paths:
    - /path/to/your/tomcat8/webapps/kinetic/WEB-INF/logs/structured.system.log
  fields:
    logfile: system
    application: core
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} '
  multiline.negate: true
  multiline.match: after
  pipeline: kinetic-request-structured-system-v1

output.elasticsearch:
  hosts: ["elasticsearch.yourdomain.com:9200"]
  index: "kinetic-request-%{+yyyy.MM.dd}"

Be sure to use your own server names, ports, paths, etc. in filebeat.yml. The file paths need to be the full filesystem paths to your structured log files, and the hosts value needs to match the Elasticsearch host information for your environment.
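Once Filebeat is running, you can confirm that documents are arriving in the daily indexes with a quick count from the Kibana Dev Tools console:

GET kinetic-request-*/_count

A non-zero count means Filebeat is shipping and the ingest pipelines are accepting the log lines; entries that failed to parse will still be indexed, but with their raw content in the catchall field.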

Kibana configuration

Visualizations

Import our sample visualizations, kinetic-request-kibana-visualizations.json, into your Kibana environment through the management console.

Saved Searches

Import our sample saved searches into your Kibana environment through the management console.
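The saved searches are simply stored queries against the kinetic-request-* indexes. As a point of reference, the kind of query shown in the aggregate log searching screenshot above, errors for a specific user, looks like this as a raw Elasticsearch search (the user value is only an example; in Kibana's search bar the equivalent is app.user:"jane.doe@example.com" AND level:ERROR):

GET kinetic-request-*/_search
{
  "query": {
    "query_string": {
      "query": "app.user:\"jane.doe@example.com\" AND level:ERROR"
    }
  },
  "sort": [
    { "timestamp": { "order": "desc" } }
  ]
}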

Dashboards

Import our sample saved dashboard into your Kibana environment through the management console.