Integrating with Airtable (Python)

Airtable is a popular cloud collaboration tool that lands somewhere between a spreadsheet and a database. As such, it can house all sorts of sensitive data that you may not want surfaced in a shared environment.

By utilizing Airtable's API in conjunction with Nightfall AI’s scan API, you can discover, classify, and remediate sensitive data within your Airtable bases.

Prerequisites

You will need a few things to follow along with this tutorial:

  • A Nightfall API key
  • An Airtable API key
  • A Detection Rule created in the Nightfall web app (referenced by its UUID)
  • Python 3 and pip to install the Nightfall SDK and the requests library

Installation

Install the Nightfall SDK and the requests library using pip.

pip install nightfall==1.2.0
pip install requests

Creating the Example

To start, import all the libraries we will be using.

The json, os, and csv libraries are part of Python so we don't need to install them.

import requests
import json
import os
import csv
from nightfall import Nightfall

We've configured the Airtable and Nightfall API keys as environment variables so they are not written directly into the code.

nightfall_api_key = os.environ.get('NIGHTFALL_API_KEY')
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')

Next we define the Detection Rule with which we wish to scan our data.

The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

We also instantiate the Nightfall client class from the SDK with our API key.

detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')

nightfall = Nightfall(nightfall_api_key)

The Airtable API doesn't list all bases in a workspace or all tables in a base; instead you must specifically call each table to get its contents.

In this example, we have set up a config.json file to store that information for the bases in the Airtable "My First Workspace" workspace. You may also wish to set up a separate Base and Table that stores your schema and retrieve that information with a call to the Airtable API; a rough sketch of that approach follows the config example below.

[
    {
        "base_id": "appp4vxoDwgURFwYp",
        "base_name": "Product Planning",
        "tables": [
            "Stories", 
            "Epics", 
            "Sprints", 
            "Release Milestones", 
            "Facets", 
            "App Sections"
        ]
    },
    {
        "base_id": "appwWnUfLVJhltYQv",
        "base_name": "Product Launch",
        "tables": [
            "Features",
            "Product Themes",
            "Monthly Newsletters"
        ]
    }
  ]
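
If you would rather not hard-code this file, here is a rough sketch of the schema-table approach mentioned above. It assumes a hypothetical base containing a Schema table whose records hold base_id, base_name, and a comma-separated tables field; none of these names come from the Airtable API itself.

import os
import requests

airtable_api_key = os.environ.get('AIRTABLE_API_KEY')
airtable_headers = {"Authorization": f"Bearer {airtable_api_key}"}

# Hypothetical base and table that store the scanning schema
schema_url = "https://api.airtable.com/v0/appYourSchemaBaseId/Schema"
schema_records = requests.get(schema_url, headers=airtable_headers).json()['records']

airtable_config = [
    {
        "base_id": rec['fields']['base_id'],
        "base_name": rec['fields']['base_name'],
        # tables are stored as a comma-separated string in this hypothetical schema
        "tables": [t.strip() for t in rec['fields']['tables'].split(',')],
    }
    for rec in schema_records
]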

As an extension of this exercise, you could write Nightfall findings back to another table within that Base.

Now we set up the parameters we will need to call the Airtable API using the previously referenced API key and config file.

airtable_config = json.load(open('config.json', 'r'))
airtable_base_url = 'https://api.airtable.com/v0'
airtable_headers = {
  "Authorization": f"Bearer {airtable_api_key}"
}

We will now call the Airtable API to retrieve the contents of our Airtable workspace. The data hierarchy in Airtable goes Workspace > Base > Table. We will need to perform a GET request on each table in turn.

As we go along, we will convert each data field into its own string enriched with identifying metadata so that we can locate and remediate the data later should sensitive findings occur.

🚧 Warning

If you are sending more than 50,000 items or more than 500KB, consider using the file API. You can learn more about how to use the file API in the Using the File Scanning Endpoint with Airtable section below.

all_airtable = []

for base in airtable_config:
    base_id = base['base_id']
    req_tables = [i.replace(' ', '%20') for i in base['tables']]

    for table in req_tables:
        airtable_url = f"{airtable_base_url}/{base_id}/{table}"
        airtable_response = requests.get(airtable_url, headers=airtable_headers)
        airtable_content = json.loads(airtable_response.text)

        for i in airtable_content['records']:
            # We enrich each datum with metadata so it can be easily located later
            cur_str = f"BaseName: {base['base_name']} -|- BaseID: {base_id} -|- Table: {table} -|- Record: {i['id']} -|- Field: "

            for j in i['fields']:
                str_to_send = f"{cur_str}{j} -|- Content: {i['fields'][j]}"
                all_airtable.append(str_to_send)

Before moving on we will define a helper function to use later so that we can unpack the metadata from the strings we send to the Nightfall API.

def str_parser(sent_str):
    split_str = sent_str.split(' -|- ')
    split_dict = {i[:i.find(': ')]: i[i.find(': ')+2:] for i in split_str[:5]}
    findertext = f" -|- Field: {split_dict['Field']} -|- Content: "
    split_dict['Content'] = sent_str[sent_str.find(findertext)+len(findertext):]
    return split_dict
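
For example, feeding one of the enriched strings through the helper yields a dictionary keyed by the metadata labels plus the original content (the record ID and field contents below are made up):

example = ("BaseName: Product Planning -|- BaseID: appp4vxoDwgURFwYp -|- "
           "Table: Stories -|- Record: recXXXXXXXXXXXXXX -|- "
           "Field: Notes -|- Content: Call Jane at 555-867-5309")

print(str_parser(example))
# {'BaseName': 'Product Planning', 'BaseID': 'appp4vxoDwgURFwYp',
#  'Table': 'Stories', 'Record': 'recXXXXXXXXXXXXXX',
#  'Field': 'Notes', 'Content': 'Call Jane at 555-867-5309'}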

We will begin constructing an all_findings object to collect our results. The first row of our all_findings object will constitute our headers, since we will dump this object to a CSV file later.

This example includes the full finding in the output. As the finding might be a piece of sensitive data, we recommend using the Redaction feature of the Nightfall API to mask your data; see the redaction sketch after the scanning code below.

all_findings = []
all_findings.append(
  [
    'base_name', 'base_id', 'table_name', 'record_id', 'field',
    'detector', 'confidence', 
    'finding_start', 'finding_end', 'finding'
  ]
)

Now we call the Nightfall API on content retrieved from Airtable. For every sensitive data finding we receive, we strip out the identifying metadata from the sent string and store it with the finding in all_findings so we can analyze it later.

findings, redactions = nightfall.scan_text(
    all_airtable,
    detection_rule_uuids=[detectionRuleUUID]
)

# This level of loop corresponds to each list item sent to the Nightfall API
for field_idx, field_findings in enumerate(findings):
    
    sent_str = all_airtable[field_idx]
    # We call the helper function we defined earlier to help us parse the string sent to the Nightfall API
    parsed_str = str_parser(sent_str)
    offset = len(sent_str) - len(parsed_str['Content'])

    # This loop corresponds to each finding within an item sent to the Nightfall API
    for finding in field_findings:

        # If a finding is returned within the metadata for the content, we discount it
        if finding.byte_range.start < offset:
            continue

        # Add finding data to all_findings
        all_findings.append([
            parsed_str['BaseName'],
            parsed_str['BaseID'],
            parsed_str['Table'],
            parsed_str['Record'],
            parsed_str['Field'],
            finding.detector_name,
            finding.confidence.value,
            finding.byte_range.start,
            finding.byte_range.end,
            finding.finding
        ])
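
As recommended above, you can have the API mask findings for you. If you define your Detection Rule inline rather than by UUID, you can attach a redaction config to each detector and the API will return a redacted copy of each payload. A minimal sketch, assuming the SDK's DetectionRule, Detector, RedactionConfig, and MaskConfig helpers and using a credit card detector purely as an example:

from nightfall import Confidence, DetectionRule, Detector, MaskConfig, RedactionConfig

# Example inline rule: mask credit card number findings, leaving the last
# four characters visible. The detector choice here is illustrative only.
masked_rule = DetectionRule([
    Detector(
        min_confidence=Confidence.LIKELY,
        nightfall_detector="CREDIT_CARD_NUMBER",
        display_name="Credit Card Number",
        redaction_config=RedactionConfig(
            remove_finding=False,
            mask_config=MaskConfig(masking_char="*", num_chars_to_leave_unmasked=4),
        ),
    )
])

# redactions then contains a masked copy of each item in all_airtable
findings, redactions = nightfall.scan_text(all_airtable, detection_rules=[masked_rule])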

Finally, we export our results to a .csv file so they can be easily reviewed.

if len(all_findings) > 1:
    with open('output_file.csv', 'w') as output_file:
        csv_writer = csv.writer(output_file, delimiter = ',')
        csv_writer.writerows(all_findings)
else:
    print('No sensitive data detected. Hooray!')

That's it! You now have insight into all of the sensitive data stored within your Airtable workspace!

As a next step, you could write your findings to a separate 'Nightfall Findings' Airtable base for review or you could update and redact confirmed findings in situ using the Airtable API.
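
For instance, here is a rough sketch of pushing each finding into a hypothetical 'Nightfall Findings' base via the Airtable records endpoint. The base ID and table name are assumptions, and the destination table's columns must match the header row we defined earlier.

# Hypothetical destination base and table for reviewing findings
findings_url = f"{airtable_base_url}/appYourFindingsBaseId/Nightfall%20Findings"
post_headers = {**airtable_headers, "Content-Type": "application/json"}

header_row = all_findings[0]
for row in all_findings[1:]:
    # Map each finding row onto the header names and create one Airtable record
    record = {"fields": dict(zip(header_row, row))}
    requests.post(findings_url, headers=post_headers, json={"records": [record]})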

Using the File Scanning Endpoint with Airtable

The example above is specific to the Nightfall Text Scanning API. To scan files, we can use a process similar to the one we used with the text scanning endpoint. The process is broken down into the sections below, as file scanning involves a few more steps.

File Scan Prerequisites

In order to utilize the File Scanning API you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
  • A Nightfall Detection Policy associated with a webhook URL
  • A web server configured to listen for file scanning results (more information below)

File Scan Implementation

Retrieve Airtable Data

Similar to the process at the beginning of this tutorial for the text scanning endpoint, we will now initialize our Airtable request parameters and retrieve the data we want from Airtable.

airtable_config = json.load(open('config.json', 'r'))
airtable_base_url = 'https://api.airtable.com/v0'
airtable_headers = {
  "Authorization": f"Bearer {airtable_api_key}"
}

all_airtable = []
all_airtable.append(
  ['base_name', 'base_id', 'table_name', 'record_id', 'field', 'content']
)

Now we iterate through the tables and write the data to a .csv file.

filename = "nf_airtable_input-" + str(int(time.time())) + ".csv"

for base in airtable_config:
    base_id = base['base_id']
    req_tables = [i.replace(' ', '%20') for i in base['tables']]

    for table in req_tables:
        airtable_url = f"{airtable_base_url}/{base_id}/{table}"
        airtable_response = requests.get(airtable_url, headers=airtable_headers)
        airtable_content = json.loads(airtable_response.text)

        for i in airtable_content['records']:
            for j in i['fields']:
                # We enrich each datum with metadata so it can be easily located later
                # BaseName, BaseID, Table, Record, Field, Content
                row = [base['base_name'], base_id, table, i['id'], j, i['fields'][j]]
                all_airtable.append(row)

with open(filename, 'w') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerows(all_airtable)

print("Airtable Data Written to: ", filename)

Upload to the Scan API

Using the above .csv file, begin the Scan API file upload process.

# WEBHOOK_URL is the publicly reachable URL of the webhook server described below
scan_id, message = nightfall.scan_file(
    filename,
    webhook_url=WEBHOOK_URL,
    detection_rule_uuids=[detectionRuleUUID],
)
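
The call returns right away with a scan ID and a status message; the findings themselves are delivered asynchronously to the webhook URL you supplied.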

Use the Scan Endpoint

Once the files have been uploaded, use the scan endpoint.

A webhook server is required for the scan endpoint to submit its results. See our example webhook server.
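
If you do not have a listener running yet, here is a minimal sketch of one using Flask (Flask, the /ingest route, and the port are assumptions, not anything prescribed by the Nightfall SDK). It echoes the validation challenge Nightfall sends when you register the webhook URL and logs the findings URL from incoming scan results; confirm the exact payload fields against the File Scanning Process documentation linked below.

from flask import Flask, request

app = Flask(__name__)

@app.route('/ingest', methods=['POST'])
def ingest():
    data = request.get_json(force=True, silent=True) or {}

    # Nightfall validates a new webhook URL by sending a challenge to echo back
    if 'challenge' in data:
        return data['challenge']

    # Scan results arrive here asynchronously once a file finishes processing
    if data.get('findingsPresent'):
        print('Findings available at:', data.get('findingsURL'))
    return '', 200

if __name__ == '__main__':
    app.run(port=8000)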

Webhook Handles the Response

The scanning endpoint will work asynchronously for the files uploaded, so you can monitor the webhook server to see the API responses and file scan findings as they come in.

Additional Resources

  • File Scanning Process Documentation
  • File Scan API Reference