Integrating with Amazon RDS (Python)

RDS is a managed service for relational databases and supports several database engines. This tutorial demonstrates connectivity with a PostgreSQL database but can be modified to support other database options.

This tutorial shows you how to scan your RDS-managed databases using the Nightfall API/SDK.

You will need the following to use this tutorial:

  • An AWS account with at least one RDS database (this example uses PostgreSQL but can be modified to support other varieties of SQL)
  • A Nightfall API key
  • An existing Nightfall Detection Rule
  • A Python 3 environment (version 3.6 or later)
  • The Nightfall Python SDK

First, install the required version of the Nightfall SDK:

pip install nightfall==0.6.0

We will be using Python and importing the following libraries:

import requests
import psycopg2
import os
import sys
import json
from nightfall import Nightfall

We will set the size and length limits for data allowed by the Nightfall API per request:

size_limit = 500000
length_limit = 50000
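
Note that `sys.getsizeof` on a list, as used later in this tutorial, measures only the list object itself, not the strings it contains, so the size checks are rough estimates. A stricter estimate (a sketch, not part of the original tutorial) also sums the item sizes:

```python
import sys

# Illustrative payload: one column's values rendered as strings.
payload = ["x" * 100 for _ in range(1000)]

container_size = sys.getsizeof(payload)  # size of the list object only
total_size = container_size + sum(sys.getsizeof(s) for s in payload)

print(container_size, total_size)
```

If your columns hold long text values, the summed estimate is the safer one to compare against the API limits.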

Next we read our API key from the environment and instantiate a Nightfall client from the SDK:

nightfall = Nightfall(os.environ['NIGHTFALL_API_KEY'])

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

First we will set up the connection to the PostgreSQL table in RDS and retrieve the data to be scanned from there.

Note: we store the RDS authentication information in the environment variables below and reference the values from there:

  • 'RDS_ENDPOINT'
  • 'RDS_USER'
  • 'RDS_PASSWORD'
  • 'RDS_DATABASE'
  • 'RDS_TABLE'
  • 'RDS_PRIMARYKEY'

connection = psycopg2.connect(
        host = os.environ.get('RDS_ENDPOINT'),
        port = 5432,
        user = os.environ.get('RDS_USER'),
        password = os.environ.get('RDS_PASSWORD'),
        database = os.environ.get('RDS_DATABASE')
    )
table_name = os.environ.get('RDS_TABLE')
primary_key = os.environ.get('RDS_PRIMARYKEY')
cursor = connection.cursor()

sql = f"""
SELECT *
FROM {table_name}
"""

cursor.execute(sql)
connection.commit()

cols = [i.name for i in cursor.description]
data = cursor.fetchall()

We can then check the data size; as long as it is below the aforementioned limits, the data can be run through the API in a single request.

If the data payloads are larger than the size or length limits of the API, extra code will be required to further chunk the data into smaller bits that are processable by the Nightfall scan API.

This can be seen in the second and third code panes below:

primary_key_col = []

if len(data) == 0:
    raise Exception('Table is empty! No data to scan.')

all_findings = []
for col_idx, col in enumerate(cols):
    payload = [str(i[col_idx]) for i in data]
    if col == primary_key:
        primary_key_col = payload
    col_size = sys.getsizeof(payload)

    if col_size < size_limit:
        resp = nightfall.scanText(
            [payload],
            detection_rule_uuids=[detectionRuleUUID])
        col_resp = json.loads(resp)

# Add location within the source table
for item_idx, item in enumerate(col_resp):
    if item is not None:
        for finding in item:
            finding['column'] = col
            try:
                finding['index'] = primary_key_col[item_idx]
            except IndexError:
                finding['index'] = item_idx
            all_findings.append(finding)
col_resp = []
chunks = []
chunk = []
running_size = 0
big_items = []

for item_idx, item in enumerate(payload):
    item_size = sys.getsizeof(item)
    if (running_size + item_size < size_limit) and (len(chunk) < length_limit):
        chunk.append(item)
        running_size += item_size
    elif item_size < size_limit:
        chunks.append(chunk)
        chunk = [item]
        running_size = item_size
    else:
        # Item is too large to scan on its own; hold its place with an
        # empty string and record its index for separate handling.
        if len(chunk) < length_limit:
            chunk.append('')
        else:
            chunks.append(chunk)
            chunk = ['']
        big_items.append(item_idx)
chunks.append(chunk)

chunk_cursor = 0

for chunk in chunks:
    resp = nightfall.scanText(
        [chunk],
        detection_rule_uuids=[detectionRuleUUID])
    col_resp.extend(json.loads(resp))
    chunk_cursor += len(chunk)

for item_idx, item in enumerate(col_resp):
    if item is not None:
        for finding in item:
            finding['column'] = col
            try:
                finding['index'] = primary_key_col[item_idx]
            except IndexError:
                finding['index'] = item_idx
            all_findings.append(finding)
for big in big_items:
    item = payload[big]
    item_size = sys.getsizeof(item)
    chunks_req = (item_size // size_limit) + 1
    chunk_len = len(item) // chunks_req
    cursor = 0
    item_findings = []
    for _ in range(chunks_req):
        p = item[cursor : min(cursor + chunk_len, len(item))]
        resp = nightfall.scanText(
            [[p]],
            detection_rule_uuids=[detectionRuleUUID])
        item_findings.extend(json.loads(resp))
        cursor += chunk_len

    if item_findings == []:
        raise Exception(f"Error while scanning large item at column {col}, index {primary_key_col[big]}")
    for find_chunk in item_findings:
        if find_chunk is not None:
            for finding in find_chunk:
                finding['column'] = col
                try:
                    finding['index'] = primary_key_col[big]
                except IndexError:
                    finding['index'] = big
                all_findings.append(finding)

To review the results, we will print the number of findings, and write the findings to an output file:

print(f"{len(all_findings)} sensitive findings in {os.environ.get('RDS_TABLE')}")
with open('rds_findings.json', 'w') as output_file:
  json.dump(all_findings, output_file)
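
Once the findings file is written, it can be post-processed however you like. As a quick illustration (the sample findings below are hypothetical, but carry the 'column' and 'index' keys this tutorial attaches), here is a sketch that counts findings per column:

```python
from collections import Counter

# Hypothetical findings in the shape this tutorial produces.
all_findings = [
    {"column": "email", "index": 1},
    {"column": "email", "index": 2},
    {"column": "ssn", "index": 1},
]

# Tally how many findings landed in each column.
per_column = Counter(f["column"] for f in all_findings)
print(dict(per_column))  # e.g. {'email': 2, 'ssn': 1}
```

A per-column tally like this is a quick way to spot which tables and columns need remediation first.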

Please find the full script below, broken into functions, which can be run in full:

import requests
import psycopg2
import os
import sys
import json
from nightfall import Nightfall

size_limit = 500000
length_limit = 50000

nightfall = Nightfall(os.environ['NIGHTFALL_API_KEY'])

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

def get_from_rds():
    '''
    Gets data to be scanned from postgres table in RDS.
    Inputs: 
        None (all required info should be stored as environment variables)
    Returns: 
        data, list[tuple()] - data from postgres table as a list of tuples
        cols, list[str] - list of column names
        primary_key, str - name of primary key column
    '''
    connection = psycopg2.connect(
        host = os.environ.get('RDS_ENDPOINT'),
        port = 5432,
        user = os.environ.get('RDS_USER'),
        password = os.environ.get('RDS_PASSWORD'),
        database = os.environ.get('RDS_DATABASE')
    )
    table_name = os.environ.get('RDS_TABLE')
    primary_key = os.environ.get('RDS_PRIMARYKEY')
    cursor = connection.cursor()

    sql = f"""
    SELECT *
    FROM {table_name}
    """

    cursor.execute(sql)
    connection.commit()

    cols = [i.name for i in cursor.description]
    data = cursor.fetchall()

    return data, cols, primary_key

def nightfall_scan(payload):
    '''
    Calls the Nightfall scan API on input text
    Inputs:
        payload, list[str] - list of strings to be scanned
    Returns:
        resp - http response from the Nightfall API containing scan results
    '''
    return nightfall.scanText(
        [payload],
        detection_rule_uuids=[detectionRuleUUID])

def craft_chunks(payload, size_limit, length_limit):
    '''
    Chunks a payload into smaller bits processable by the Nightfall scan API
    Inputs:
        payload, list[str] - list of strings to be scanned
        size_limit, int - maximum data size allowed by Nightfall API per request
        length_limit, int - maximum no. of items allowed by Nightfall API per request
    Returns:
        chunks, list[list[str]] - list of lists of strings to be scanned
        big_items, list[int] - list of indices of items that exceed the size_limit on their own
    '''
    chunks = []
    chunk = []
    running_size = 0
    big_items = []
    for item_idx, item in enumerate(payload):
        item_size = sys.getsizeof(item)
        if (running_size + item_size < size_limit) and (len(chunk) < length_limit):
            chunk.append(item)
            running_size += item_size
        elif item_size < size_limit:
            chunks.append(chunk)
            chunk = [item]
            running_size = item_size
        else:
            if len(chunk) < length_limit:
                chunk.append('')
            else:
                chunks.append(chunk)
                chunk = ['']
            big_items.append(item_idx)
    chunks.append(chunk)
    return chunks, big_items

def scan_big_item(item, size_limit):
    '''
    Chunks a single large block of text and sends it to the Nightfall API
    in processable bits
    Inputs:
        item, str - a single string to be scanned by the Nightfall API
        size_limit, int - maximum data size allowed by Nightfall API per request
    Returns:
        item_findings, list[list[dict]] - findings from the Nightfall API for the entire item
    '''
    item_size = sys.getsizeof(item)
    chunks_req = (item_size // size_limit) + 1
    chunk_len = len(item) // chunks_req
    cursor = 0
    item_findings = []
    for _ in range(chunks_req):
        p = item[cursor : min(cursor + chunk_len, len(item))]
        resp = nightfall.scanText(
          [[p]],
          detection_rule_uuids=[detectionRuleUUID])
        item_findings.extend(json.loads(resp))
        cursor += chunk_len
    return item_findings

if __name__ == '__main__':
    # This script will be for Postgres but other SQL varieties 
    # will work with modifications
    data, columns, primary_key = get_from_rds()
    primary_key_col = []
    if len(data) == 0:
        raise Exception('Table is empty! No data to scan.')

    all_findings = []
    for col_idx, col in enumerate(columns):
        payload = [str(i[col_idx]) for i in data]
        if col == primary_key:
            primary_key_col = payload
        col_size = sys.getsizeof(payload)

        if col_size < size_limit:
            resp = nightfall.scanText(
                [payload],
                detection_rule_uuids=[detectionRuleUUID])
            col_resp = json.loads(resp)
        else:
            col_resp = []
            chunks, big_items = craft_chunks(payload, size_limit, length_limit)
            chunk_cursor = 0
            for chunk in chunks:
                resp = nightfall.scanText(
                  [chunk],
                  detection_rule_uuids=[detectionRuleUUID])
                col_resp.extend(json.loads(resp))
                chunk_cursor += len(chunk)
            
            for big in big_items:
                item_resp = scan_big_item(payload[big], size_limit)
                if item_resp == []:
                    raise Exception(f"Error while scanning large item at column {col}, Index {primary_key_col[big]}")
                for find_chunk in item_resp:
                    if find_chunk is not None:
                        for finding in find_chunk:
                            finding['column'] = col
                            try:
                                finding['index'] = primary_key_col[big]
                            except IndexError:
                                finding['index'] = big
                            all_findings.append(finding)

        # Add location within source Table
        for item_idx, item in enumerate(col_resp):
            if item is not None:
                for finding in item:
                    finding['column'] = col
                    try:
                        finding['index'] = primary_key_col[item_idx]
                    except IndexError:
                        finding['index'] = item_idx
                    all_findings.append(finding)

    print(f"{len(all_findings)} sensitive findings in {os.environ.get('RDS_TABLE')}")
    with open('rds_findings.json', 'w') as output_file:
        json.dump(all_findings, output_file)
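
To see the chunking behavior in isolation, the craft_chunks helper from the script above can be exercised with artificially small limits (the limits below are toy values, not the API's real ones):

```python
import sys

def craft_chunks(payload, size_limit, length_limit):
    # Same logic as craft_chunks in the full script above.
    chunks, chunk, running_size, big_items = [], [], 0, []
    for item_idx, item in enumerate(payload):
        item_size = sys.getsizeof(item)
        if (running_size + item_size < size_limit) and (len(chunk) < length_limit):
            chunk.append(item)
            running_size += item_size
        elif item_size < size_limit:
            chunks.append(chunk)
            chunk = [item]
            running_size = item_size
        else:
            if len(chunk) < length_limit:
                chunk.append('')
            else:
                chunks.append(chunk)
                chunk = ['']
            big_items.append(item_idx)
    chunks.append(chunk)
    return chunks, big_items

# Toy limits: at most 2 items per chunk; the oversized item is flagged
# in big_items and replaced with a placeholder in its chunk.
items = ["a", "bb", "ccc", "x" * 10_000]
chunks, big_items = craft_chunks(items, size_limit=200, length_limit=2)
print(len(chunks), big_items)
```

Running the helper on toy data like this makes it easy to confirm that oversized items are excluded from normal chunks and routed to the big-item path.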

The following are potential ways to continue building upon this service:

  • Writing Nightfall results to a database and reading that into a visualization tool
  • Adding to this script to support other varieties of SQL
  • Redacting sensitive findings in place once they are detected, either automatically or as a follow-up script once findings have been reviewed

Using Redaction to Mask Findings

With the Nightfall API, you are also able to redact and mask your RDS findings. You can add a Redaction Config as part of your Detection Rule. For more information on how to use redaction, and its specific options, please refer to the guide here.
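
For illustration only, a masking-style redaction config might be shaped like the sketch below. The field names here are assumptions drawn from Nightfall's redaction guide, not verified against SDK version 0.6.0; consult the guide for the authoritative options.

```python
# Hypothetical shape of a masking redaction config; field names are
# assumptions and should be checked against the Nightfall redaction guide.
redaction_config = {
    "maskConfig": {
        "maskingChar": "*",            # character used to mask findings
        "numCharsToLeaveUnmasked": 4,  # keep a small suffix visible
        "maskLeftToRight": False,
        "charsToIgnore": ["-"],        # leave separators in place
    }
}
```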

Using the File Scanning Endpoint with RDS

The example above is specific to the Nightfall text scanning API. To scan files, we can use a process similar to the one we used with the text scanning endpoint. The process is broken down in the sections below, as file scanning is more involved.

Prerequisites:

In order to utilize the File Scanning API you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

The steps to use the endpoint are as follows:

  1. Retrieve data from RDS

Similar to the process at the beginning of this tutorial for the text scanning endpoint, we now initialize our AWS RDS connection. Once the session is established, we can query RDS.

connection = psycopg2.connect(
        host = os.environ.get('RDS_ENDPOINT'),
        port = 5432,
        user = os.environ.get('RDS_USER'),
        password = os.environ.get('RDS_PASSWORD'),
        database = os.environ.get('RDS_DATABASE')
    )
table_name = os.environ.get('RDS_TABLE')
primary_key = os.environ.get('RDS_PRIMARYKEY')
cursor = connection.cursor()

sql = f"""
SELECT *
FROM {table_name}
"""

cursor.execute(sql)
connection.commit()

cols = [i.name for i in cursor.description]
data = cursor.fetchall()

Now we go through the data and write it to a .csv file.

import csv
import time

if len(data) == 0:
    raise Exception('Table is empty! No data to scan.')

filename = "nf_rds_input-" + str(int(time.time())) + ".csv"

with open(filename, 'w', newline='') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerow(cols)
    csv_writer.writerows(data)

print("RDS Data Written to:", filename)
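
Before uploading, it can be worth sanity-checking the file you just wrote. This standalone sketch (with inline sample data standing in for the RDS rows) writes a CSV the same way and reads it back:

```python
import csv

filename = "nf_rds_input-sample.csv"

# Sample header and rows standing in for the data fetched from RDS.
cols = ["id", "email"]
data = [(1, "alice@example.com"), (2, "bob@example.com")]

# Write the table out, header first, then one row per record.
with open(filename, "w", newline="") as output_file:
    writer = csv.writer(output_file, delimiter=",")
    writer.writerow(cols)
    writer.writerows(data)

# Read it back to confirm the layout before uploading to the Scan API.
with open(filename, newline="") as f:
    rows = list(csv.reader(f))

print(rows[0])  # header row
```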
  2. Begin the file upload process to the Scan API with the .csv file written above, as shown here.

  3. Once the file has been uploaded, use the scan endpoint mentioned here. Note: as described in the documentation, the scan endpoint requires a webhook server to which it will send the scanning results. An example webhook server setup can be seen here.

  4. The scanning endpoint works asynchronously on the uploaded files, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
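
As a sketch of the webhook side, a minimal receiver that collects incoming scan events could look like the following. This is illustrative only: the route, payload handling, and class name are assumptions, and a production receiver should also validate Nightfall's webhook signature before trusting a request.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    """Collects scan-result events POSTed by the file scan API."""
    events = []  # collected payloads, for demonstration only

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            event = json.loads(body)
        except json.JSONDecodeError:
            event = {"raw": body.decode("utf-8", "replace")}
        WebhookHandler.events.append(event)
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence per-request logging

# To run: HTTPServer(("0.0.0.0", 8000), WebhookHandler).serve_forever()
```

Point your Nightfall webhook URL at wherever this server is exposed, then inspect `WebhookHandler.events` (or persist them) as findings arrive.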

Resources:

File Scanning Process Documentation
File Scan API Reference