Integrating with Snowflake (Python)

Snowflake is a data warehouse built on top of the Amazon Web Services or Microsoft Azure cloud infrastructure. This tutorial demonstrates how to scan a Snowflake database using the Nightfall API and Python SDK.

You will need a few things first to use this tutorial:

  • A Snowflake account with at least one database
  • A Nightfall API key
  • An existing Nightfall Detection Rule
  • The most recent version of the Nightfall Python SDK

We will first install the required Snowflake Python connector modules and the Nightfall SDK that we need to work with:

pip install snowflake-connector-python
pip install nightfall==0.6.0

To accomplish this, we will be using Python and importing the following libraries:

import requests
import snowflake.connector
import os
import sys
import json
from nightfall import Nightfall

We will set the size and length limits for data allowed by the Nightfall API per request:

size_limit = 500000
length_limit = 50000
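
As a rough illustration of how these two limits can be checked before sending a request, here is a minimal sketch. It assumes the size limit is a byte count of the UTF-8 encoded text and the length limit is an item count; the helper names `payload_bytes` and `within_limits` are hypothetical and not part of the Nightfall SDK.

```python
SIZE_LIMIT = 500000    # assumed: maximum payload size in bytes per request
LENGTH_LIMIT = 50000   # assumed: maximum number of items per request


def payload_bytes(items):
    """Return the total UTF-8 encoded size of a list of strings."""
    return sum(len(item.encode("utf-8")) for item in items)


def within_limits(items, size_limit=SIZE_LIMIT, length_limit=LENGTH_LIMIT):
    """True if the payload respects both the size and the item-count limit."""
    return len(items) <= length_limit and payload_bytes(items) <= size_limit
```

Measuring the encoded text is a design choice: `sys.getsizeof` reports the in-memory size of a Python object, which only approximates what is sent over the wire.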

Next we read our API key from the environment and use it to instantiate the Nightfall client from the SDK.

nightfall = Nightfall(os.environ['NIGHTFALL_API_KEY'])

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')
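
Because `os.environ.get` returns None when a variable is not set, a missing value would only surface later as a confusing API error. One way to fail early is sketched below; the helper name `require_env` is hypothetical, and the `env` parameter exists only to make the function easy to test.

```python
import os


def require_env(names, env=None):
    """Return {name: value} for the given names, raising if any are missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

For example, `require_env(['NIGHTFALL_API_KEY', 'DETECTION_RULE_UUID'])` could be called once at the top of the script.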

Next we will set up the connection with Snowflake and retrieve the data to be scanned.

Note, we are setting the Snowflake authentication information as the below environment variables, and referencing the values from there:

  • SNOWFLAKE_USER
  • SNOWFLAKE_PASSWORD
  • SNOWFLAKE_ACCOUNT
  • SNOWFLAKE_DATABASE
  • SNOWFLAKE_SCHEMA
  • SNOWFLAKE_TABLE
  • SNOWFLAKE_PRIMARY_KEY

connection = snowflake.connector.connect(
  user=os.environ.get('SNOWFLAKE_USER'),
  password=os.environ.get('SNOWFLAKE_PASSWORD'),
  account=os.environ.get('SNOWFLAKE_ACCOUNT'),
  schema=os.environ.get('SNOWFLAKE_SCHEMA'),
  database=os.environ.get('SNOWFLAKE_DATABASE')
)
table_name = os.environ.get('SNOWFLAKE_TABLE')
primary_key = os.environ.get('SNOWFLAKE_PRIMARY_KEY')

cursor = connection.cursor()

sql = f"""
        SELECT *
        FROM {table_name}
        LIMIT 1000;
        """

cursor.execute(sql)

cols = [i[0] for i in cursor.description]
data = cursor.fetchall()
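
The query above interpolates the table name from an environment variable directly into the SQL string. If that value could ever come from an untrusted source, it may be worth validating it first. The sketch below is one possible approach; the helper name `build_select` and the identifier pattern (unquoted names made of letters, digits, `_` and `$`, optionally qualified as database.schema.table) are assumptions, not part of the Snowflake connector.

```python
import re

# Assumed pattern: unquoted identifiers, optionally qualified with up to
# two dot-separated prefixes (database.schema.table).
_IDENTIFIER = re.compile(
    r"[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}"
)


def build_select(table_name, limit=1000):
    """Build the SELECT statement only if the table name looks like an identifier."""
    if not _IDENTIFIER.fullmatch(table_name):
        raise ValueError(f"Unsafe table name: {table_name!r}")
    return f"SELECT * FROM {table_name} LIMIT {int(limit)}"
```

The returned string can then be passed to `cursor.execute` in place of the f-string above.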

We can then check the data size; as long as it is below the limits set above, it can be run through the API.

If the data payloads are larger than the size or length limits of the API, extra code will be required to further chunk the data into smaller bits that are processable by the Nightfall scan API.

This can be seen in the second and third code panes below:

primary_key_col = []

if len(data) == 0:
  raise Exception('Table is empty! No data to scan.')

all_findings = []
for col_idx, col in enumerate(cols):
  # Flatten one column into a list of strings, one entry per row.
  payload = [str(i[col_idx]) for i in data]
  if col == primary_key:
    # Keep the primary key values so findings can be mapped back to rows.
    primary_key_col = payload
  col_size = sys.getsizeof(payload)

  if col_size < size_limit:
    # The whole column fits in a single request.
    resp = nightfall.scanText(
      [payload],
      detection_rule_uuids=[detectionRuleUUID])
    col_resp = json.loads(resp)

    # Tag each finding with its column and row identifier.
    for item_idx, item in enumerate(col_resp):
      if item is not None:
        for finding in item:
          finding['column'] = col
          try:
            finding['index'] = primary_key_col[item_idx]
          except IndexError:
            finding['index'] = item_idx
          all_findings.append(finding)

# Second pane: runs inside the column loop when col_size >= size_limit.
col_resp = []
chunks = []
chunk = []
running_size = 0
big_items = []

for item_idx, item in enumerate(payload):
  item_size = sys.getsizeof(item)
  if (running_size + item_size < size_limit) and (len(chunk) < length_limit):
    chunk.append(item)
    running_size += item_size
  elif item_size < size_limit:
    # Current chunk is full: store it and start a new one with this item.
    chunks.append(chunk)
    chunk = [item]
    running_size = item_size
  else:
    # The item alone exceeds the size limit: keep an empty placeholder so
    # indexes stay aligned, and remember the item for separate scanning.
    if len(chunk) < length_limit:
      chunk.append('')
    else:
      chunks.append(chunk)
      chunk = ['']
      running_size = 0
    big_items.append(item_idx)

# Store the last, partially filled chunk.
chunks.append(chunk)

for chunk in chunks:
  # Same call form as the first pane.
  resp = nightfall.scanText(
    [chunk],
    detection_rule_uuids=[detectionRuleUUID])
  col_resp.extend(json.loads(resp))

for item_idx, item in enumerate(col_resp):
  if item is not None:
    for finding in item:
      finding['column'] = col
      try:
        finding['index'] = primary_key_col[item_idx]
      except IndexError:
        finding['index'] = item_idx
      all_findings.append(finding)

# Third pane: scan each oversized item on its own, split into smaller pieces.
for big in big_items:
  item = payload[big]
  item_size = sys.getsizeof(item)
  chunks_req = (item_size // size_limit) + 1
  # Ceiling division so the final piece is not dropped.
  chunk_len = -(-len(item) // chunks_req)
  offset = 0
  item_findings = []
  for _ in range(chunks_req):
    p = item[offset : min(offset + chunk_len, len(item))]
    # Same call form as the first pane.
    resp = nightfall.scanText(
      [[p]],
      detection_rule_uuids=[detectionRuleUUID])
    item_findings.extend(json.loads(resp))
    offset += chunk_len

  if item_findings == []:
    raise Exception(f"Error while scanning large item at column {col}, Index {primary_key_col[big]}")
  for find_chunk in item_findings:
    if find_chunk is not None:
      for finding in find_chunk:
        finding['column'] = col
        try:
          finding['index'] = primary_key_col[big]
        except IndexError:
          finding['index'] = big
        all_findings.append(finding)
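
The chunking idea in the panes above can also be expressed as a small standalone helper, which may be easier to test in isolation. This is only a sketch: the name `chunk_items` is hypothetical, sizes are measured as UTF-8 bytes rather than with `sys.getsizeof`, and each chunk keeps (index, item) pairs so that findings can be mapped back to rows without placeholders.

```python
def chunk_items(items, size_limit=500000, length_limit=50000):
    """Split a list of strings into chunks that respect both limits.

    Returns (chunks, oversized): chunks is a list of lists of (index, item)
    pairs, and oversized holds the indexes of items that are too large to
    fit in any single request and need to be split separately.
    """
    chunks, oversized = [], []
    current, current_size = [], 0
    for index, item in enumerate(items):
        item_size = len(item.encode("utf-8"))
        if item_size > size_limit:
            oversized.append(index)
            continue
        # Close the current chunk if adding this item would break a limit.
        if current and (current_size + item_size > size_limit
                        or len(current) >= length_limit):
            chunks.append(current)
            current, current_size = [], 0
        current.append((index, item))
        current_size += item_size
    if current:
        chunks.append(current)
    return chunks, oversized
```

Each chunk's text can then be extracted with `[item for _, item in chunk]` before it is sent to the scan call.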

To review the results, we will print the number of findings, and write the findings to an output file:

print(f"{len(all_findings)} sensitive findings in {os.environ.get('SNOWFLAKE_TABLE')}")
with open('snowflake_findings.json', 'w') as output_file:
  json.dump(all_findings, output_file)
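
Beyond the total count, a per-column breakdown can make the results easier to review. The sketch below relies only on the 'column' key that this tutorial adds to each finding and assumes findings behave like dictionaries; the function name `findings_per_column` is hypothetical.

```python
from collections import Counter


def findings_per_column(findings):
    """Count findings by the 'column' key added earlier in this tutorial."""
    return dict(Counter(f.get("column", "unknown") for f in findings))
```

Calling `findings_per_column(all_findings)` returns a dictionary mapping each column name to its number of findings.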

The following are potential ways to continue building upon this service:

  • Writing Nightfall results to a database and reading that into a visualization tool
  • Redacting sensitive findings in place once they are detected, either automatically or as a follow-up script once findings have been reviewed
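
As one possible starting point for the first idea, the sketch below stores findings in a SQLite database, which stands in here for whatever database your visualization tool reads from. The table name `nightfall_findings`, its columns, and the function name `store_findings` are all hypothetical; only the 'column' and 'index' keys come from this tutorial, and the rest of each finding is kept as JSON text.

```python
import json
import sqlite3


def store_findings(conn, table_name, findings):
    """Insert findings into a results table and return the number of rows written."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS nightfall_findings ("
        "source_table TEXT, column_name TEXT, row_index TEXT, finding_json TEXT)"
    )
    rows = [
        (table_name, str(f.get("column")), str(f.get("index")), json.dumps(f))
        for f in findings
    ]
    conn.executemany("INSERT INTO nightfall_findings VALUES (?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)
```

A call such as `store_findings(sqlite3.connect('findings.db'), table_name, all_findings)` would persist the results of one scan.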

Using Redaction to Mask Findings

With the Nightfall API, you can also redact and mask your Snowflake findings by adding a Redaction Config as part of your Detection Rule. For more information on how to use redaction and its specific options, please refer to the guide here.

Using the File Scanning Endpoint with Snowflake

The example above is specific to the Nightfall Text Scanning API. To scan files, we can follow a similar process to the one we used for the text scanning endpoint. Because the file scanning process is more involved, it is broken down in the sections below.

Prerequisites:

In order to utilize the File Scanning API you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

The steps to use the endpoint are as follows:

  1. Retrieve data from Snowflake

Similar to the process in the beginning of this tutorial for the text scanning endpoint, we will now initialize our Snowflake Connection. Once the session is established, we can query from Snowflake.

connection = snowflake.connector.connect(
  user=os.environ.get('SNOWFLAKE_USER'),
  password=os.environ.get('SNOWFLAKE_PASSWORD'),
  account=os.environ.get('SNOWFLAKE_ACCOUNT'),
  schema=os.environ.get('SNOWFLAKE_SCHEMA'),
  database=os.environ.get('SNOWFLAKE_DATABASE')
)
table_name = os.environ.get('SNOWFLAKE_TABLE')
primary_key = os.environ.get('SNOWFLAKE_PRIMARY_KEY')

cursor = connection.cursor()

sql = f"""
        SELECT *
        FROM {table_name}
        LIMIT 1000;
        """

cursor.execute(sql)

cols = [i[0] for i in cursor.description]
data = cursor.fetchall()
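
The query above reads at most 1000 rows in a single call. For larger tables, one option is to read the result set in batches with `fetchmany`, which is part of the Python DB-API that the Snowflake connector's cursor also implements. The sketch below uses an in-memory SQLite table as a stand-in for Snowflake so that it runs on its own; the helper name `iter_batches` is hypothetical.

```python
import sqlite3


def iter_batches(cursor, batch_size=1000):
    """Yield lists of rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# Demo with an in-memory SQLite table standing in for Snowflake.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE t (id INTEGER, email TEXT)")
demo.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [(i, f"user{i}@example.com") for i in range(5)],
)
demo_cursor = demo.execute("SELECT * FROM t ORDER BY id")
batch_sizes = [len(batch) for batch in iter_batches(demo_cursor, batch_size=2)]
```

Each batch could be written to its own .csv file, which keeps individual uploads small.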

Now we go through the data and write to a .csv file.

import csv
import time

if len(data) == 0:
  raise Exception('Table is empty! No data to scan.')

filename = "nf_snowflake_input-" + str(int(time.time())) + ".csv"

# Write the column names as a header row, then one CSV row per table row.
with open(filename, 'w', newline='') as output_file:
  csv_writer = csv.writer(output_file, delimiter=',')
  csv_writer.writerow(cols)
  csv_writer.writerows([[str(value) for value in row] for row in data])

print("Snowflake Data Written to: ", filename)

  2. Begin the file upload process to the Scan API with the .csv file written above, as shown here.

  3. Once the files have been uploaded, begin using the scan endpoint mentioned here. Note: as described in the documentation, the scan endpoint requires a webhook server, to which it sends the scanning results. An example webhook server setup can be seen here.

  4. The scanning endpoint works asynchronously on the uploaded files, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
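
To make the webhook requirement more concrete, here is a minimal sketch of a listener built on Python's standard library. It assumes that Nightfall first sends a JSON body containing a `challenge` field that must be echoed back as plain text, and that later requests carry scan results; please verify both assumptions against the webhook documentation. The names `handle_event`, `WebhookHandler`, and `serve` are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_event(body):
    """Return (status_code, response_text) for a raw webhook request body."""
    try:
        event = json.loads(body)
    except (ValueError, TypeError):
        return 400, "invalid JSON"
    if isinstance(event, dict) and "challenge" in event:
        # Assumed handshake: echo the challenge value back as plain text.
        return 200, str(event["challenge"])
    # Any other payload is treated as a scan notification and logged.
    print("Received scan event:", json.dumps(event)[:200])
    return 200, "ok"


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        status, text = handle_event(self.rfile.read(length))
        self.send_response(status)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(text.encode("utf-8"))


def serve(port=8000):
    """Block forever, serving webhook requests on the given port."""
    HTTPServer(("", port), WebhookHandler).serve_forever()
```

Running `serve()` starts the listener locally; a production server would also need to be reachable from the internet and should authenticate incoming requests.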

Resources:

File Scanning Process Documentation
File Scan API Reference