Integrating with Airtable (Python)

Airtable is a popular cloud collaboration tool that lands somewhere between a spreadsheet and a database. As such, it can house all sorts of sensitive data that you may not want surfaced in a shared environment. By using Airtable's API in conjunction with Nightfall AI's scan API, you can discover, classify, and remediate sensitive data within your Airtable bases.

You will need a few things to follow along with this tutorial:

  • An Airtable account and API key
  • A Nightfall API key
  • An existing Nightfall Detection Rule
  • A Python 3 environment (version 3.6 or later)
  • The Nightfall Python SDK (this tutorial uses version 0.6.0)

To accomplish this, we will install the required version of the Nightfall SDK:

pip install nightfall==0.6.0

We will be using Python and importing the following libraries:

import requests 
import json 
import os 
import csv
import sys
from nightfall import Nightfall

We've configured the Airtable and Nightfall API keys as environment variables so they don't need to be committed directly into our code.

nightfall_api_key = os.environ.get('NIGHTFALL_API_KEY')
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

We also instantiate a Nightfall object from the SDK using our API key.

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

nightfall = Nightfall(nightfall_api_key)

The Airtable API doesn't list all bases in a workspace or all tables in a base; instead you must specifically call each table to get its contents. In this example we have set up a config.json file to store that information for the Airtable My First Workspace bases. You may also wish to consider setting up a separate Base and Table that stores your schema and retrieve that information with a call to the Airtable API.
As an extension of this exercise, you could even write Nightfall findings back to another table within that Base.

[
    {
        "base_id": "appp4vxoDwgURFwYp",
        "base_name": "Product Planning",
        "tables": [
            "Stories", 
            "Epics", 
            "Sprints", 
            "Release Milestones", 
            "Facets", 
            "App Sections"
        ]
    },
    {
        "base_id": "appwWnUfLVJhltYQv",
        "base_name": "Product Launch",
        "tables": [
            "Features",
            "Product Themes",
            "Monthly Newsletters"
        ]
    }
  ]
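If you would rather not maintain config.json by hand, you could build it from Airtable's Metadata API instead. The sketch below assumes the GET https://api.airtable.com/v0/meta/bases/{base_id}/tables endpoint and its response shape; confirm both against Airtable's current documentation. The response fragment below is hypothetical.

```python
def schema_to_config(base_id, base_name, meta_response):
    """Convert a Metadata API response into the same shape as a config.json entry."""
    return {
        "base_id": base_id,
        "base_name": base_name,
        "tables": [t["name"] for t in meta_response.get("tables", [])],
    }

# Hypothetical fragment of a response from
# GET https://api.airtable.com/v0/meta/bases/{base_id}/tables
meta_response = {"tables": [{"id": "tblAAAAAAAAAAAAAA", "name": "Stories"},
                            {"id": "tblBBBBBBBBBBBBBB", "name": "Epics"}]}
entry = schema_to_config("appp4vxoDwgURFwYp", "Product Planning", meta_response)
print(entry["tables"])  # ['Stories', 'Epics']
```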

Now we set up the parameters we will need to call the Airtable API using the previously referenced API key and config file.

airtable_config = json.load(open('config.json', 'r'))
airtable_base_url = 'https://api.airtable.com/v0'
airtable_headers = {
  "Authorization": f"Bearer {airtable_api_key}"
}

We will now call the Airtable API to retrieve the contents of our Airtable workspace. The data hierarchy in Airtable goes Workspace > Base > Table; we will need to perform a GET request on each table in turn.

As we go along, we will convert each data field into its own string enriched with identifying metadata so that we can locate and remediate the data later should sensitive findings occur. We will also keep a running tally of our payload size (both in memory and in list items) in case we have to break the payload down across multiple requests to the Nightfall SDK.

# Payload limits for the Nightfall scan API; adjust these to match the
# limits documented for your plan
nightfall_payload_limit = 500000   # approximate request size in bytes
nightfall_payload_length = 50000   # maximum number of list items per request

all_airtable = []
running_size = 0
indices_to_chunk = [0]

for base in airtable_config:
  base_id = base['base_id']
  req_tables = [i.replace(' ', '%20') for i in base['tables']]

  for table in req_tables:
    airtable_url = f"{airtable_base_url}/{base_id}/{table}"
    airtable_response = requests.get(airtable_url, headers=airtable_headers)
    airtable_content = json.loads(airtable_response.text)

    for i in airtable_content['records']:
      # We enrich each datum with metadata so it can be easily located later
      cur_str = f"BaseName: {base['base_name']} -|- BaseID: {base_id} -|- Table: {table} -|- Record: {i['id']} -|- Field: "

      for j in i['fields']:
        str_to_send = f"{cur_str}{j} -|- Content: {i['fields'][j]}"
        # We'll check payload size as we go in case we need to break this up
        running_size += sys.getsizeof(str_to_send)
        memory_test = running_size >= nightfall_payload_limit
        length_test = (len(all_airtable) - indices_to_chunk[-1]) >= nightfall_payload_length

        # Start a new chunk whenever either limit would be exceeded
        if memory_test or length_test:
          indices_to_chunk.append(len(all_airtable))
          running_size = sys.getsizeof(str_to_send)
        all_airtable.append(str_to_send)

Before moving on we will define a helper function to use later so that we can unpack the metadata from the strings we send to the Nightfall API.

def str_parser(sent_str):
    split_str = sent_str.split(' -|- ')
    split_dict = {i[:i.find(': ')]: i[i.find(': ')+2:] for i in split_str[:5]}
    findertext = f" -|- Field: {split_dict['Field']} -|- Content: "
    split_dict['Content'] = sent_str[sent_str.find(findertext)+len(findertext):]
    return split_dict
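To sanity-check the round trip, here is str_parser applied to a sample enriched string. The helper is repeated so the snippet runs standalone; the record ID and field content are made up:

```python
def str_parser(sent_str):
    # Identical to the helper defined above, repeated so this example runs standalone
    split_str = sent_str.split(' -|- ')
    split_dict = {i[:i.find(': ')]: i[i.find(': ')+2:] for i in split_str[:5]}
    findertext = f" -|- Field: {split_dict['Field']} -|- Content: "
    split_dict['Content'] = sent_str[sent_str.find(findertext)+len(findertext):]
    return split_dict

sample = ("BaseName: Product Planning -|- BaseID: appp4vxoDwgURFwYp -|- Table: Stories"
          " -|- Record: recAAAAAAAAAAAAAA -|- Field: Notes -|- Content: Call back at 415-555-0100")
parsed = str_parser(sample)
print(parsed['Table'])    # Stories
print(parsed['Content'])  # Call back at 415-555-0100
```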

We will begin constructing an all_findings object to collect our results. The first row of our all_findings object will constitute our headers, since we will dump this object to a CSV file later.

This example will include the full finding below. As the finding might be a piece of sensitive data, we would recommend using the Redaction feature of the Nightfall API to mask your data. More information can be seen in the 'Using Redaction to Mask Findings' section below.

all_findings = []
all_findings.append(
  [
    'base_name', 'base_id', 'table_name', 'record_id', 'field',
    'detector', 'confidence',
    'finding_start', 'finding_end', 'finding'
  ]
)

Now we call the Nightfall API on content retrieved from Airtable, chunked into slices that fit into the Nightfall API's payload limit.
For every sensitive data finding we receive, we strip out the identifying metadata from the sent string and store it with the finding in all_findings so we can analyze it later.
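A toy example may make the chunking scheme clearer; the boundary values below are made up:

```python
# indices_to_chunk holds the start index of each chunk; appending the total
# length closes out the final chunk, and each adjacent pair defines a slice.
items = ['a', 'b', 'c', 'd', 'e', 'f']
indices_to_chunk = [0, 2, 5]          # hypothetical chunk boundaries
indices_to_chunk.append(len(items))   # -> [0, 2, 5, 6]

chunks = []
idx_start = 0
for chunk in indices_to_chunk[1:]:
    chunks.append(items[idx_start:chunk])
    idx_start = chunk

print(chunks)  # [['a', 'b'], ['c', 'd', 'e'], ['f']]
```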

indices_to_chunk.append(len(all_airtable))
idx_start = 0
for chunk in indices_to_chunk[1:]:

  # Break the Airtable content into a digestible chunk for the Nightfall API
  send_content = all_airtable[idx_start:chunk]

  nightfall_response = nightfall.scan_text(
    send_content,
    detection_rule_uuids=[detectionRuleUUID]
  )

  findings = json.loads(nightfall_response)

  # This level of loop corresponds to each list item sent to the Nightfall API
  for field_idx, field_findings in enumerate(findings):

    sent_str = all_airtable[idx_start + field_idx]
    # We call the helper function we defined earlier to parse the string sent to the Nightfall API
    parsed_str = str_parser(sent_str)
    offset = len(sent_str) - len(parsed_str['Content'])

    # This loop corresponds to each finding within an item sent to the Nightfall API
    for finding in field_findings:

      fragment_start = finding['location']['byteRange']['start']
      fragment_end = finding['location']['byteRange']['end']

      # If a finding is returned within the metadata for the content, we discount it
      if fragment_start < offset:
        continue

      fragment_start -= offset
      fragment_end -= offset
      chars_before = fragment_start
      chars_after = len(parsed_str['Content']) - fragment_end
      # Context around the finding, available if you want to extend the CSV columns
      pre_context = parsed_str['Content'][fragment_start - min(20, chars_before):fragment_start]
      post_context = parsed_str['Content'][fragment_end:fragment_end + min(20, chars_after)]

      # Add finding data to all_findings
      all_findings.append([
        parsed_str['BaseName'],
        parsed_str['BaseID'],
        parsed_str['Table'],
        parsed_str['Record'],
        parsed_str['Field'],
        finding['detector']['name'],
        finding['confidence'],
        fragment_start,
        fragment_end,
        finding['finding']
      ])

  # Re-set the starting index for the next chunk
  idx_start = chunk

Finally, we export our results to a csv so they can be easily reviewed.

if len(all_findings) > 1:
  with open('output_file.csv', 'w') as output_file:
    csv_writer = csv.writer(output_file, delimiter = ',')
    csv_writer.writerows(all_findings)
else:
  print('No sensitive data detected. Hooray!')

That's it! You now have insight into all of the sensitive data stored within your Airtable workspace!

As a next step, you could write your findings to a separate 'Nightfall Findings' Airtable base for review or you could update and redact confirmed findings in situ using the Airtable API.
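As a rough sketch of that first option, the helper below batches all_findings rows into request bodies for Airtable's create-records endpoint, which accepts at most 10 records per request. The 'Nightfall Findings' table and findings_base_id are hypothetical names; swap in your own.

```python
def findings_to_airtable_bodies(all_findings):
    """Turn the all_findings rows (header row first) into create-records request bodies.

    Airtable's create-records endpoint accepts at most 10 records per
    request, so we yield one body per batch of 10.
    """
    headers, rows = all_findings[0], all_findings[1:]
    for batch_start in range(0, len(rows), 10):
        batch = rows[batch_start:batch_start + 10]
        yield {"records": [{"fields": dict(zip(headers, row))} for row in batch]}

# Sketch of the POST itself (base ID and table name are hypothetical):
# for body in findings_to_airtable_bodies(all_findings):
#     requests.post(f"{airtable_base_url}/{findings_base_id}/Nightfall%20Findings",
#                   headers={**airtable_headers, "Content-Type": "application/json"},
#                   json=body)

sample = [["base_name", "finding"], ["Product Planning", "415-555-0100"]]
print(next(findings_to_airtable_bodies(sample)))
```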

Using Redaction to Mask Findings

With the Nightfall API, you are also able to redact and mask your findings. You can add a Redaction Config, as part of your Detection Rule. For more information on how to use redaction, and its specific options, please refer to the guide here.
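As a rough illustration, a mask-style redaction config might look like the dict below. Treat the field names as assumptions and confirm them against the redaction guide before use.

```python
# Hedged sketch only: field names are assumptions -- confirm against the docs.
redaction_config = {
    "maskConfig": {
        "maskingChar": "*",
        "numCharsToLeaveUnmasked": 4,  # e.g. keep the last four characters visible
    }
}
print(redaction_config)
```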

Using the File Scanning Endpoint with Airtable

The example above is specific to the Nightfall text scanning API. To scan files, we can use a similar process to the one we used with the text scanning endpoint. The process is broken down in the sections below, as file scanning is more involved.

Prerequisites:

In order to utilize the File Scanning API you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

The steps to use the endpoint are as follows:

  1. Retrieve data from Airtable

Similar to the process in the beginning of this tutorial for the text scanning endpoint, we will now initialize our Airtable parameters and retrieve the data we would like to scan from Airtable.

airtable_config = json.load(open('config.json', 'r'))
airtable_base_url = 'https://api.airtable.com/v0'
airtable_headers = {
  "Authorization": f"Bearer {airtable_api_key}"
}

Now we iterate through the records and write the enriched strings to a .csv file.

import time

filename = "nf_airtable_input-" + str(int(time.time())) + ".csv"
scan_logs = []

for base in airtable_config:
  base_id = base['base_id']
  req_tables = [i.replace(' ', '%20') for i in base['tables']]

  for table in req_tables:
    airtable_url = f"{airtable_base_url}/{base_id}/{table}"
    airtable_response = requests.get(airtable_url, headers=airtable_headers)
    airtable_content = json.loads(airtable_response.text)

    for i in airtable_content['records']:
      # We enrich each datum with metadata so it can be easily located later
      cur_str = f"BaseName: {base['base_name']} -|- BaseID: {base_id} -|- Table: {table} -|- Record: {i['id']} -|- Field: "

      for j in i['fields']:
        str_to_send = f"{cur_str}{j} -|- Content: {i['fields'][j]}"
        scan_logs.append([str_to_send])

with open(filename, 'w') as output_file:
  csv_writer = csv.writer(output_file, delimiter=',')
  csv_writer.writerows(scan_logs)

print("Airtable Data Written to: ", filename)

  2. Begin the file upload process to the Scan API, with the above written .csv file, as shown here.

  3. Once the files have been uploaded, begin using the scan endpoint mentioned here. Note: as the documentation explains, a webhook server is required for the scan endpoint, to which it will send the scanning results. An example webhook server setup can be seen here.

  4. The scanning endpoint works asynchronously on the uploaded files, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
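For reference, a minimal webhook listener can be sketched with the standard library alone, as below. The challenge-echo behavior and payload shape are assumptions based on the webhook documentation; in production you would use a proper web framework and verify Nightfall's request signatures.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def handle_webhook_payload(payload):
    """Return the response body for a webhook POST (sketch).

    Nightfall validates a new webhook URL by sending a challenge value
    that must be echoed back (the exact field name is an assumption
    here); any other payload carries scan results to log or persist.
    """
    if "challenge" in payload:
        return payload["challenge"]
    print("Received scan results:", json.dumps(payload)[:200])
    return "OK"

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        body = handle_webhook_payload(payload).encode()
        self.send_response(200)
        self.end_headers()
        self.wfile.write(body)

# To listen (blocks forever; expose via a public URL for Nightfall to reach):
# HTTPServer(("", 8075), WebhookHandler).serve_forever()
```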

Resources:

File Scanning Process Documentation
File Scan API Reference