Integrating with Elasticsearch (Python)

Elasticsearch is a popular tool for storing, searching, and analyzing all kinds of structured and unstructured data, especially as part of the larger ELK stack. However, as with any data storage tool, there is significant potential for unintentionally leaking sensitive data. By using Elastic's own REST APIs in conjunction with Nightfall AI’s Scan API, you can discover, classify, and remediate sensitive data within your Elastic stack.

You can follow along with your own instance or spin up a sample instance with the commands listed below. By default, you can download and interact with sample datasets from the ELK instance at localhost:5601, and query your data at localhost:9200. The "Add sample data" function can be found underneath the Observability section on the Home page; in this tutorial we reference the "Sample Web Logs" dataset.

docker pull sebp/elk

docker run -p 5601:5601 -p 9200:9200 -p 5044:5044 -it --name elk sebp/elk

You will need a few things to follow along with this tutorial:

  • An Elasticsearch instance with data to query
  • A Nightfall API key
  • An existing Nightfall Detection Rule
  • A Python 3 environment (version 3.6 or later)
  • Python Nightfall SDK

We start by installing the required version of the Nightfall SDK:

pip install nightfall==0.6.0

We will be using Python and importing the following libraries:

import requests
import json
import os
import csv
from nightfall import Nightfall

We first configure the URL to communicate with. If you are following along with the Sample Web Logs dataset mentioned at the beginning of this article, you can copy this Elasticsearch URL. If not, your URL will probably take the format http://<hostname>/<index_name>/_search.

elasticsearch_base_url = 'http://localhost:9200/kibana_sample_data_logs/_search'

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

We also create a Nightfall client object from the SDK, using our API key.

nightfall_api_key = os.environ.get('NIGHTFALL_API_KEY')

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

nightfall = Nightfall(nightfall_api_key)
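
Because os.environ.get returns None when a variable is unset, a missing key or rule UUID would only surface later as a failed API call. A minimal sketch of failing early is shown below. The helper name require_env is hypothetical and is not part of the Nightfall SDK.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} must be set")
    return value
```

With this helper, the two lookups above could be written as require_env('NIGHTFALL_API_KEY') and require_env('DETECTION_RULE_UUID').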

We now construct the payload and headers for our call to Elasticsearch. The payload represents whichever subset of data you wish to query. In this example, we are querying all results from the previous hour.

We then make our call to the Elasticsearch data store and save the resulting response.

elasticsearch_payload = json.dumps({"query": {"range": {"timestamp": {"gte": "now-1h"}}}})

elasticsearch_headers = {
  'Content-Type': 'application/json'
}

elasticsearch_response = requests.get(
  url = elasticsearch_base_url, 
  headers = elasticsearch_headers, 
  data = elasticsearch_payload
)

logs = json.loads(elasticsearch_response.text)['hits']['hits']
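
As a sketch, the query body can be built from a Python dictionary and the hits extracted defensively, so that an error response from Elasticsearch does not surface as a KeyError. The helper names build_range_query and extract_hits are hypothetical, and the default field name and time window simply mirror the example above. The size value of 100 is an arbitrary illustration; without a size, Elasticsearch returns only the first 10 hits.

```python
import json


def build_range_query(field="timestamp", since="now-1h", size=100):
    """Serialize a range query for documents whose `field` is at or after `since`."""
    body = {
        "size": size,  # Elasticsearch returns only 10 hits unless size is set
        "query": {"range": {field: {"gte": since}}},
    }
    return json.dumps(body)


def extract_hits(response_text):
    """Return the list of hit documents from a raw _search response body."""
    parsed = json.loads(response_text)
    if "error" in parsed:
        raise ValueError(f"Elasticsearch returned an error: {parsed['error']}")
    return parsed.get("hits", {}).get("hits", [])
```

With these helpers, the request body would be build_range_query() and the logs would be extract_hits(elasticsearch_response.text).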

Now we send our Elasticsearch query results to the Nightfall SDK for scanning.

nightfall_response = nightfall.scan_text(
  [json.dumps(i) for i in logs],
  detection_rule_uuids=[detectionRuleUUID]
)

findings = json.loads(nightfall_response)
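
If the query returns many logs, sending them all in a single request may exceed the size limits of the scan endpoint. The sketch below splits the serialized logs into batches. The helper name batch_payloads is hypothetical, and the max_items and max_bytes defaults are illustrative assumptions rather than documented Nightfall limits, so check the API documentation for the real values.

```python
import json


def batch_payloads(logs, max_items=1000, max_bytes=400000):
    """Yield lists of JSON strings, each list within the item and byte budgets."""
    batch, batch_bytes = [], 0
    for log in logs:
        item = json.dumps(log)
        item_bytes = len(item.encode("utf-8"))
        # Start a new batch when adding this item would exceed either budget.
        if batch and (len(batch) >= max_items or batch_bytes + item_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(item)
        batch_bytes += item_bytes
    if batch:
        yield batch
```

Each batch can then be passed to nightfall.scan_text in place of the single list used above. When batching, keep track of each batch's starting offset so that findings can still be matched back to the right log.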

We will create an all_findings object to store Nightfall Scan results. The first row of our all_findings object will constitute our headers, since we will dump this object to a CSV file later.

This example will include the full finding below. As the finding might be a piece of sensitive data, we would recommend using the Redaction feature of the Nightfall API to mask your data. More information can be seen in the 'Using Redaction to Mask Findings' section below.

all_findings = []
all_findings.append(
  [
    'index_name', 'log_id', 'detector', 'confidence',
    'byte_start', 'byte_end', 'codepoint_start', 'codepoint_end',
    'finding'
  ]
)

Next we go through our findings from the Nightfall Scan API and match them to the identifying fields from the Elasticsearch index so we can find them and remediate them in situ.

Finding locations here are positions within the log serialized as a string. Each finding reports its location both as a byte offset (byteRange) and as a character offset (codepointRange).

# Each top level item in findings corresponds to one log
for log_idx, log_findings in enumerate(findings):
  for finding_idx, finding in enumerate(log_findings):

    index_name = logs[log_idx]['_index']
    log_id = logs[log_idx]['_id']

    row = [
      index_name,
      log_id,
      finding['detector']['name'],
      finding['confidence'],
      finding['location']['byteRange']['start'],
      finding['location']['byteRange']['end'],
      finding['location']['codepointRange']['start'],
      finding['location']['codepointRange']['end'],
      finding['finding']
    ] 
    all_findings.append(row)
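
The two location ranges recorded above differ whenever the scanned string contains characters that take more than one byte. The sketch below shows how each range would be used to recover a finding from the original string. It assumes that byteRange counts UTF-8 bytes, which is an assumption and not something confirmed here; the helper names and the sample string are purely illustrative.

```python
def slice_by_codepoints(text, start, end):
    """Return the substring addressed by character (code point) offsets."""
    return text[start:end]


def slice_by_bytes(text, start, end, encoding="utf-8"):
    """Return the substring addressed by byte offsets into the encoded text."""
    return text.encode(encoding)[start:end].decode(encoding)


# Illustrative string only: "é" is one code point but two UTF-8 bytes,
# so the same substring has different offsets in each scheme.
sample = "café 4242"
print(slice_by_codepoints(sample, 5, 9))  # 4242
print(slice_by_bytes(sample, 6, 10))      # 4242
```

Note that json.dumps escapes non-ASCII characters by default, so for logs serialized as in this tutorial the two ranges will usually coincide.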

Finally, we export our results to a CSV file so they can be easily reviewed.

if len(all_findings) > 1:
  with open('output_file.csv', 'w', newline='') as output_file:
    csv_writer = csv.writer(output_file, delimiter = ',')
    csv_writer.writerows(all_findings)
else:
  print('No sensitive data detected. Hooray!')

That's it! You now have insight into all sensitive data shared inside your Elasticsearch instance within the past hour.

However, in use cases such as this, where the data is well structured, it can be more informative to call out which fields contain sensitive data rather than where in the serialized log the data sits. While the above script is easy to implement without modifying the queried data, it does not provide insight into these fields.
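
One way to get field-level insight, sketched here under stated assumptions, is to flatten each log's _source document into (field path, value) pairs and scan the values individually, so that the position of each finding in the response identifies the field it came from. The helper name flatten_fields is hypothetical, and the dotted path format is just one possible convention.

```python
def flatten_fields(document, prefix=""):
    """Return (field_path, string_value) pairs for every leaf in a nested dict."""
    pairs = []
    for key, value in document.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, extending the dotted path.
            pairs.extend(flatten_fields(value, path))
        elif isinstance(value, list):
            for idx, item in enumerate(value):
                if isinstance(item, dict):
                    pairs.extend(flatten_fields(item, f"{path}[{idx}]"))
                else:
                    pairs.append((f"{path}[{idx}]", str(item)))
        else:
            pairs.append((path, str(value)))
    return pairs
```

For a single hit, pairs = flatten_fields(log['_source']) gives the values to scan as [value for _, value in pairs], and the field name for the n-th item's findings is pairs[n][0].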

Using Redaction to Mask Findings

With the Nightfall API, you can also redact and mask your Elasticsearch findings by adding a Redaction Config as part of your Detection Rule. For more information on how to use redaction and its specific options, please refer to the guide here.
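
The Redaction Config itself is set up on the Nightfall side and is not reproduced here. As a purely local stopgap, and not the Nightfall Redaction feature, a finding can also be masked in Python before it is written to the CSV file. The helper name mask_finding and its defaults are hypothetical.

```python
def mask_finding(value, visible=4, mask_char="*"):
    """Mask all but the last `visible` characters; mask everything if the value is short."""
    if visible <= 0 or len(value) <= visible:
        return mask_char * len(value)
    return mask_char * (len(value) - visible) + value[-visible:]
```

In the export loop above, this would mean storing mask_finding(finding['finding']) in the row instead of the raw finding.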

Using the File Scanning Endpoint with Elasticsearch

The example above is specific to the Nightfall Text Scanning API. To scan files, we can use a process similar to the one we used for the text scanning endpoint. Because file scanning is more involved, the process is broken down in the sections below.

Prerequisites:

In order to utilize the File Scanning API you need the following:

  • An active API key authorized for file scanning, passed via the Authorization: Bearer header (see Authentication and Security)
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

The steps to use the endpoint are as follows:

  1. Retrieve data from Elasticsearch

As at the beginning of this tutorial for the text scanning endpoint, we first configure our request and retrieve the data we want from Elasticsearch:

elasticsearch_base_url = 'http://localhost:9200/kibana_sample_data_logs/_search'

elasticsearch_payload = json.dumps({"query": {"range": {"timestamp": {"gte": "now-1h"}}}})

elasticsearch_headers = {
  'Content-Type': 'application/json'
}

elasticsearch_response = requests.get(
  url = elasticsearch_base_url, 
  headers = elasticsearch_headers, 
  data = elasticsearch_payload
)

logs = json.loads(elasticsearch_response.text)['hits']['hits']

Now we write the logs to a .csv file.

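import time

filename = "nf_elasticsearch_input-" + str(int(time.time())) + ".csv"

with open(filename, 'w', newline='') as output_file:
  csv_writer = csv.writer(output_file, delimiter=',')
  # Each hit is a dict, so serialize it to one JSON string per CSV row
  csv_writer.writerows([json.dumps(log)] for log in logs)

print("Elasticsearch data written to:", filename)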

  2. Begin the file upload process to the Scan API with the .csv file written above, as shown here.

  3. Once the files have been uploaded, begin using the scan endpoint mentioned here. Note: as the documentation describes, the scan endpoint requires a webhook server, to which it sends the scanning results. An example webhook server setup can be seen here.

  4. The scan endpoint processes the uploaded files asynchronously, so you can monitor the webhook server to see the API responses and file scan findings as they come in.

Resources:

File Scanning Process Documentation
File Scan API Reference