Integrating with Datadog (Python)

Datadog is a monitoring and analytics platform for IT and DevOps teams, used to track performance metrics and monitor events across infrastructure and cloud services. This tutorial demonstrates how to scan your Datadog logs, metrics, and events with the Nightfall API and Python SDK.

You will need a few things to follow this tutorial:

  • A Datadog account with an API key and Application key
  • A Nightfall API key
  • An existing Nightfall Detection Rule
  • A Python 3 environment (version 3.6 or later)
  • The Nightfall Python SDK

To get started, install the required version of the Nightfall SDK:

pip install nightfall==0.6.0

In our Python script, we import the following libraries:

import argparse
import csv
import json
import os
import sys
import time
import collections

import requests
from nightfall import Nightfall

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

Note that we store the Datadog and Nightfall credentials in the environment variables below and read the values from there:

  • DD_API_KEY
  • DD_APPLICATION_KEY
  • NIGHTFALL_API_KEY
  • DETECTION_RULE_UUID

dd_api_key = os.environ.get('DD_API_KEY')
dd_application_key = os.environ.get('DD_APPLICATION_KEY')
nightfall_api_key = os.environ.get('NIGHTFALL_API_KEY')
detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

Next we instantiate a Nightfall client from the SDK with our API key.

nightfall = Nightfall(nightfall_api_key)

Now we set up the connection with Datadog and retrieve the data to be scanned.

The three code samples below correspond to the three types of Datadog data available to scan:

  1. logs - Scans the 100 most recent logs from Datadog.
  2. metrics - Scans all active metric tags from the last 24 hours.
  3. events - Scans all events from the last 24 hours.

Choose one option and follow it through the matching panes in the steps below:

# This will return the most recent 100 logs from Datadog.

dd_url = 'https://api.datadoghq.com/api/v2/logs/events?page[limit]=100'

dd_headers = {
  'Content-Type': 'application/json',
  'DD-API-KEY': dd_api_key,
  'DD-APPLICATION-KEY': dd_application_key
}

try:
  response = requests.get(
    url=dd_url,
    headers=dd_headers
  )

  response.raise_for_status()

except requests.HTTPError:
  msg = f"ERROR: Datadog API returned: {response.status_code}"
  sys.exit(msg)

# Key each log's content by its ID so findings can be mapped back to a log later.
scan_logs = {
  log["id"]: json.dumps(log["attributes"]) for log in response.json()["data"]
}
"""Uses Datadog API to retrieve all metric names submitted in the last 24 hours
    from datadog, then iterates over all the tags attached """

from_time = int(time.time()) - 60 * 60 * 24 * 1
dd_list_metrics_url = f"https://api.datadoghq.com/api/v1/metrics?from={from_time}"
dd_metric_metadata_url = "https://api.datadoghq.com/api/v2/metrics/{metric_name}/all-tags"

dd_headers = {
  'Content-Type': 'application/json',
  'DD-API-KEY': dd_api_key,
  'DD-APPLICATION-KEY': dd_application_key
}

try:
  response = requests.get(
    url=dd_list_metrics_url,
    headers=dd_headers
  )

  response.raise_for_status()

except requests.HTTPError:
  msg = f"ERROR: Datadog API returned: {response.status_code}"
  sys.exit(msg)

  tags = {}
  for metric_name in response.json()["metrics"]:
    try:
      response = requests.get(
        url=dd_metric_metadata_url.format(metric_name=metric_name),
        headers=dd_headers
      )

      response.raise_for_status()

    except requests.HTTPError:
      msg = f"ERROR: Datadog API returned: {response.status_code}"
      sys.exit(msg)

      json_resp = response.json()["data"]
      tags[json_resp["id"]] = json_resp["attributes"]["tags"]

    json.dump( tags, open( "tags.json", 'w' ) )
  return tags
"""Uses Datadog API to retrieve all events submitted in the last 24 hours
    from datadog and extracts scannable content"""

to_time = int(time.time())
from_time = to_time - 60 * 60 * 24 * 1
dd_list_events_url = f"https://api.datadoghq.com/api/v1/events"

dd_headers = {
  'Content-Type': 'application/json',
  'DD-API-KEY': dd_api_key,
  'DD-APPLICATION-KEY': dd_application_key
}


events = []
while from_time < to_time:
  dd_query = {
    'start': from_time,
    'end': to_time,
  }

  try:
    response = requests.get(
      url=dd_list_events_url,
      headers=dd_headers,
      params=dd_query,
    )

    response.raise_for_status()

  except requests.HTTPError:
    msg = f"ERROR: Datadog API returned: {response.status_code}"
    sys.exit(msg)

  events += response.json()["events"]
  if len(response.json()["events"]) < 1000:
    break

  from_time = events[-1]["date_happened"]

data_to_scan = {}
for e in events:
  data_to_scan[e["url"]] = str(Event(e["tags"], e["title"], e["text"]))

We then run a scan on the aggregated data from Datadog using the Nightfall SDK. Each pane scans the payload values and maps the findings back to their IDs:

# logs: scan_text returns one list of findings per input string, so we can
# zip the results back onto the log IDs.
results = nightfall.scan_text(
    list(scan_logs.values()),
    detection_rule_uuids=[detectionRuleUUID]
)
findings = dict(zip(scan_logs.keys(), results))

# metrics: stringify each metric's tag list, then map findings back to metric names.
scan_tags = {metric_name: str(ts) for metric_name, ts in tags.items()}

results = nightfall.scan_text(
    list(scan_tags.values()),
    detection_rule_uuids=[detectionRuleUUID]
)
findings = dict(zip(scan_tags.keys(), results))

# events: scan the extracted event content, then map findings back to event URLs.
results = nightfall.scan_text(
    list(data_to_scan.values()),
    detection_rule_uuids=[detectionRuleUUID]
)
findings = dict(zip(data_to_scan.keys(), results))

To review the results, we write the findings to an output CSV file. Again, follow the pane that matches your data type:

# logs: write one row per finding, keyed by log ID.
all_findings = []
all_findings.append(
  [
    'event_id', 'event_url', 'detector', 'confidence',
    'byte_start', 'byte_end', 'codepoint_start', 'codepoint_end', 'finding'
  ]
)

for log_id, finding in findings.items():
  if finding is None:
    continue

  for item in finding:
    fragment = item['finding']

    row = [
      log_id,
      f"https://app.datadoghq.com/logs?event={log_id}",
      item['detector']['name'],
      item['confidence'],
      item['location']['byteRange']['start'],
      item['location']['byteRange']['end'],
      item['location']['codepointRange']['start'],
      item['location']['codepointRange']['end'],
      fragment
    ]
    all_findings.append(row)

if len(all_findings) > 1:
  filename = "nf_datadog_output-" + str(int(time.time())) + ".csv"
  with open(filename, 'w') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerows(all_findings)
  print("Output findings written to", filename)
else:
  print('No sensitive data detected. Hooray!')

# metrics: write one row per finding, keyed by metric name.
all_findings = []
all_findings.append(
  [
    'metric_name', 'detector', 'confidence',
    'byte_start', 'byte_end', 'codepoint_start', 'codepoint_end', 'finding'
  ]
)

# Mask all but the first three characters of each finding before writing it out.
do_mask = True

for metric_name, finding in findings.items():
  if finding is None:
    continue

  for item in finding:
    if do_mask:
      if len(item['finding']) <= 3:
        fragment = "***"
      else:
        fragment = item['finding'][:3] + \
          ('*' * (len(item['finding']) - 3))
    else:
      fragment = item['finding']

    row = [
      metric_name,
      item['detector']['name'],
      item['confidence'],
      item['location']['byteRange']['start'],
      item['location']['byteRange']['end'],
      item['location']['codepointRange']['start'],
      item['location']['codepointRange']['end'],
      fragment
    ]
    all_findings.append(row)

if len(all_findings) > 1:
  filename = "nf_datadog_output-" + str(int(time.time())) + ".csv"
  with open(filename, 'w') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerows(all_findings)
  print("Output findings written to", filename)
else:
  print('No sensitive data detected. Hooray!')

# events: write one row per finding, keyed by event URL.
all_findings = []
all_findings.append(
  [
    'event_url', 'detector', 'confidence',
    'byte_start', 'byte_end', 'codepoint_start', 'codepoint_end', 'fragment'
  ]
)

do_mask = True

for event_url, finding in findings.items():
  if finding is None:
    continue

  for item in finding:
    if do_mask:
      if len(item['finding']) <= 3:
        fragment = "***"
      else:
        fragment = item['finding'][:3] + \
          ('*' * (len(item['finding']) - 3))
    else:
      fragment = item['finding']

    row = [
      "https://app.datadoghq.com" + event_url,
      item['detector']['name'],
      item['confidence'],
      item['location']['byteRange']['start'],
      item['location']['byteRange']['end'],
      item['location']['codepointRange']['start'],
      item['location']['codepointRange']['end'],
      fragment
    ]
    all_findings.append(row)

if len(all_findings) > 1:
  filename = "nf_datadog_output-" + str(int(time.time())) + ".csv"
  with open(filename, 'w') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerows(all_findings)
  print("Output findings written to", filename)
else:
  print('No sensitive data detected. Hooray!')

Note:
Results of the scan will be written to a file named nf_datadog_output-TIMESTAMP.csv.

These examples write the finding itself to the output file. As the finding might be a piece of sensitive data, we recommend using the Redaction feature of the Nightfall API to mask your data. More information can be found in the 'Using Redaction to Mask Findings' section below.

Using Redaction to Mask Findings

With the Nightfall API, you can also redact and mask your Datadog findings by adding a Redaction Config to your Detection Rule. For more information on how to use redaction and its specific options, please refer to the guide here. A rough sketch of a mask configuration is shown below.
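
For illustration only, here is a sketch of what a mask configuration on a detector might look like when a detection rule is defined inline rather than by UUID. This is an assumption, not output from this tutorial: the field names (redactionConfig, maskConfig, maskingChar, numCharsToLeaveUnmasked, maskLeftToRight) follow the Nightfall redaction guide and should be verified against the current API reference.

# Hypothetical sketch of a detector carrying a redaction (mask) config.
# Verify field names against the Nightfall redaction guide before use.
detector_with_redaction = {
    "detectorType": "NIGHTFALL_DETECTOR",
    "nightfallDetector": "CREDIT_CARD_NUMBER",
    "displayName": "Credit Card Number",
    "minConfidence": "LIKELY",
    "minNumFindings": 1,
    "redactionConfig": {
        "maskConfig": {
            "maskingChar": "*",            # character substituted for masked content
            "numCharsToLeaveUnmasked": 4,  # leave a short prefix visible
            "maskLeftToRight": True,
        }
    },
}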

Using the File Scanning Endpoint with Datadog

The example above uses the Nightfall Text Scanning API. To scan files, we can follow a process similar to the one for the text scanning endpoint. The process is broken down into the sections below, as file scanning is more involved.

Prerequisites:

To use the File Scanning API, you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

The steps to use the endpoint are as follows:

  1. Retrieve data from Datadog

Similar to the process at the beginning of this tutorial for the text scanning endpoint, we now initialize our Datadog connection and retrieve the data we would like to scan. This can be logs, metrics, or events; the example below shows logs:

# This will return the most recent 100 logs from Datadog.

dd_url = 'https://api.datadoghq.com/api/v2/logs/events?page[limit]=100'

dd_headers = {
  'Content-Type': 'application/json',
  'DD-API-KEY': dd_api_key,
  'DD-APPLICATION-KEY': dd_application_key
}

try:
  response = requests.get(
    url=dd_url,
    headers=dd_headers
  )

  response.raise_for_status()

except requests.HTTPError:
  msg = f"ERROR: Datadog API returned: {response.status_code}"
  sys.exit(msg)

scan_logs = response.json()['data']

Now we write the retrieved logs to a .csv file.

filename = "nf_datadog_input-" + str(int(time.time())) + ".csv"  

with open(filename, 'w') as output_file:
  csv_writer = csv.writer(output_file, delimiter=',')
  csv_writer.writerows(scan_logs)
     
print("Datadog Logs Written to: ", filename)
  2. Begin the file upload process to the Scan API with the .csv file written above, as shown here.

  3. Once the file has been uploaded, trigger the scan endpoint mentioned here. Note: as the documentation describes, the scan endpoint requires a webhook server to which it sends the scanning results. An example webhook server setup can be seen here, and a minimal sketch follows this list.

  4. The scanning endpoint works asynchronously on the uploaded files, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
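
Below is a minimal sketch of the upload-and-scan flow against the raw HTTP endpoints (initialize, upload chunks, finish, scan). It reuses nightfall_api_key, detectionRuleUUID, and the filename written above. The endpoint paths and field names (fileSizeBytes, chunkSize, X-UPLOAD-OFFSET, webhookURL, detectionRuleUUIDs) are taken from the file scanning documentation linked in the resources and should be verified against the current API reference; the webhook URL is a placeholder for your own server.

# Hedged sketch of the file upload and scan flow; verify endpoint paths and
# field names against the Nightfall file scanning documentation.
NIGHTFALL_BASE = "https://api.nightfall.ai"
auth_headers = {"Authorization": f"Bearer {nightfall_api_key}"}

# 1. Initialize the upload session; the API returns an upload ID and chunk size.
init = requests.post(
    f"{NIGHTFALL_BASE}/v3/upload",
    headers=auth_headers,
    json={"fileSizeBytes": os.path.getsize(filename)},
)
init.raise_for_status()
upload_id = init.json()["id"]
chunk_size = init.json()["chunkSize"]

# 2. Upload the file in chunks at the offsets the API expects.
with open(filename, "rb") as f:
    offset = 0
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        chunk_headers = dict(auth_headers, **{"X-UPLOAD-OFFSET": str(offset)})
        requests.patch(
            f"{NIGHTFALL_BASE}/v3/upload/{upload_id}",
            headers=chunk_headers,
            data=chunk,
        ).raise_for_status()
        offset += len(chunk)

# 3. Mark the upload as complete.
requests.post(
    f"{NIGHTFALL_BASE}/v3/upload/{upload_id}/finish",
    headers=auth_headers,
).raise_for_status()

# 4. Trigger the asynchronous scan; findings are delivered to the webhook URL.
requests.post(
    f"{NIGHTFALL_BASE}/v3/upload/{upload_id}/scan",
    headers=auth_headers,
    json={
        "policy": {
            "webhookURL": "https://example.com/nightfall-webhook",  # placeholder
            "detectionRuleUUIDs": [detectionRuleUUID],
        }
    },
).raise_for_status()

For the webhook listener itself, the sketch below uses Flask as an assumption; any web framework works. Nightfall validates new webhook URLs with a challenge request that must be echoed back, and signs deliveries with headers described in the webhook documentation, so a production listener should also verify those signatures.

# Hypothetical minimal webhook listener; requires `pip install flask`.
from flask import Flask, request

app = Flask(__name__)

@app.route("/nightfall-webhook", methods=["POST"])
def nightfall_webhook():
    body = request.get_json(silent=True) or {}
    # Echo back the validation challenge when Nightfall registers the URL.
    if "challenge" in body:
        return body["challenge"]
    # Otherwise the request body contains the asynchronous file scan findings.
    print("Received findings:", body)
    return "", 200

if __name__ == "__main__":
    app.run(port=8000)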

Resources:

File Scanning Process Documentation
File Scan API Reference: