
Enriching your Snowflake data with external functions

Dave Hughes, October 28, 2020

Data enrichment is a powerful technique for adding valuable context to your customer data, and it’s simple in theory – given a bit of data about a record (e.g. a user's email or the IP of a web session), tell me more information about it so I can take effective action (e.g. map the email domain to a company or map the IP to a geolocation). In practice, however, it can be difficult to integrate a data enrichment process into an existing ELT pipeline, for a few reasons:

  • Maintaining Freshness: How recent are the results being correlated with my data? This may be a function of how often external datasets are loaded or how up-to-date your third-party data provider’s information is.
  • Ensuring Consistency: Once data is enriched with new attributes, can I easily push those attributes to my warehouse and SaaS tools to maintain a synchronized data model?
  • Limiting Resource Usage: Enrichment processes can be expensive in storage costs (to maintain huge third-party datasets), processing time, and API quota usage for paid enrichment services. And enrichment processes shouldn’t unnecessarily repeat the same work over and over again.

In this post, I’ll share our recipe for enriching our own customer data at Census using the Clearbit API, Snowflake External Functions, and dbt to enrich data correctly and efficiently over large datasets.

One note before we begin, in case you're new to data enrichment: there's a key difference between enriching with a dataset and enriching through an API. Some enrichment pipelines make use of third-party datasets that can be purchased in their entirety – you can see many examples of these in the Snowflake Data Marketplace. This kind of data can be easier to work with because you can rely on your warehouse to efficiently LEFT JOIN to these datasets and materialize the result on a schedule. An enrichment API, however, doesn't give you the full dataset to work with; it replies to your queries (one at a time or in bulk) in a request-reply style, telling you whether the API has additional data for the company or person in question. These are more challenging to work with in a data warehousing context, and we built this pipeline so that we can perform both types of enrichment without leaving the comfort of our warehouse SQL environment.
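To make the contrast concrete, the dataset flavor is just a join that the warehouse can run on a schedule. Here's a minimal sketch, assuming a hypothetical firmographics table purchased from the Marketplace and keyed by company domain:

-- Hypothetical dataset-style enrichment: join users to a purchased
-- firmographics table on the email domain
SELECT
  u.*,
  f.company_name,
  f.employee_count
FROM users u
LEFT JOIN firmographics f
  ON SPLIT_PART(u.email, '@', 2) = f.domain;

The API flavor, which is what the rest of this recipe covers, needs a bit more machinery.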

Snowflake's External Functions

External functions are a recent addition to modern data warehouses. An external function is like a traditional user-defined function (UDF), but it allows you to break free of the database sandbox by calling an external handler to calculate results. For example, Redshift has basic support for external functions that call Lambda functions to compute their results.

Snowflake's external functions call out to an AWS API Gateway endpoint to retrieve results, which allows us to connect many different handlers, including Lambda functions. Calls from SQL will be batched into JSON HTTP requests and proxied to the configured handler, which needs to unwrap the batch items, calculate appropriate values, and then return a batch of results as JSON. Setting up a gateway and handlers in AWS for the first time isn’t trivial, but the design allows a lot of flexibility. I won't go through that process in this post but you can read about it in Snowflake's documentation. We automate this using a Terraform module, which we hope to open source soon. Feel free to contact us if you’re curious!

Architecture diagram of our external functions with backing resources

Once the remote gateway is set up, we need to create an API integration in Snowflake and one or more external functions that use it:


CREATE OR REPLACE API INTEGRATION clearbit_api_integration
  api_provider=aws_api_gateway
  -- This role is created during gateway setup and is the role that a Snowflake-managed user will assume
  api_aws_role_arn='arn:aws:iam::123456789012:role/gateway_access_role'
  api_allowed_prefixes=('https://<gateway-id>.execute-api.us-west-2.amazonaws.com/production')
  enabled=true;

CREATE OR REPLACE EXTERNAL FUNCTION clearbit_person(email_col VARCHAR)
  RETURNS VARIANT
  API_INTEGRATION = clearbit_api_integration
  AS 'https://<gateway-id>.execute-api.us-west-2.amazonaws.com/production/clearbit_person';

CREATE OR REPLACE EXTERNAL FUNCTION clearbit_company(domain_col VARCHAR)
  RETURNS VARIANT
  API_INTEGRATION = clearbit_api_integration
  AS 'https://<gateway-id>.execute-api.us-west-2.amazonaws.com/production/clearbit_company';

Once the API integration and external functions are created (and assuming the backing gateway and the Lambda that forwards requests to Clearbit's API are set up correctly), we should be able to call them like any other SQL function:

> SELECT CLEARBIT_PERSON('dave@getcensus.com');                                                                                     
+----------------------------------------------------------------------------------------------------------------------------------+
| CLEARBIT_PERSON('DAVE@GETCENSUS.COM')                                                                                            |
|----------------------------------------------------------------------------------------------------------------------------------|
| {                                                                                                                                |
|   "company": {                                                                                                                   |
|     "category": {                                                                                                                |
|       "industry": null,                                                                                                          |
|       "industryGroup": null,                                                                                                     |
|       "naicsCode": null,                                                                                                         |
|       "sector": null,                                                                                                            |
|       "sicCode": null,                                                                                                           |
|       "subIndustry": null                                                                                                        |
|     },                                                                                                                           |
|     "crunchbase": {                                                                                                              |
|       "handle": null                                                                                                             |
|     },                                                                                                                           |
|     "description": "Census is the easiest way to sync your data warehouse to the apps you use. No engineering favors required.", |
|     "domain": "getcensus.com",                                                                                                   |
|     "domainAliases": [                                                                                                           |
|       "sutrolabs.com"                                                                                                            |
|     ],                                                                                                                           |
|     "emailProvider": false,                                                                                                      |
|     ...                                                                                                                          |
|   },                                                                                                                             |
|   "person": {                                                                                                                    |
|     "avatar": null,                                                                                                              |
|     "bio": null,                                                                                                                 |
|     "email": "dave@getcensus.com",                                                                                               |
|     "emailProvider": false,                                                                                                      |
|     "employment": {                                                                                                              |
|       "domain": "getcensus.com",                                                                                                 |
|       "name": "Census",                                                                                                          |
|       "role": "engineering",                                                                                                     |
|       "seniority": null,                                                                                                         |
|       "subRole": "software_engineer",                                                                                            |
|       "title": "Software Engineer"                                                                                               |
|     },                                                                                                                           |
|     ...                                                                                                                          |
|   }                                                                                                                              |
| }                                                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------+
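Individual attributes can then be pulled out of the returned VARIANT using Snowflake's path syntax, which is how we'll flatten these results into columns later on. A quick example:

-- Pull a single field out of the VARIANT result using the path syntax
SELECT clearbit_data:person.employment.title AS title
FROM (SELECT CLEARBIT_PERSON('dave@getcensus.com') AS clearbit_data);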

Caching for Performance (and saving your API quotas)

Although our CLEARBIT_PERSON function now works properly, we have another issue to contend with: every time our ELT pipeline steps run, we will call Clearbit's API through the external function handler, eating into our API quota and running slowly due to the overhead of retrieving remote data. Since we don't expect the enrichment data to change with high frequency and can probably tolerate some level of staleness, we can save API invocations and execution time by caching the lookup results.

Let's start with a simple example with a users model that we'd like to enrich with each user's title, company name, and company domain. We'll accomplish this with a handful of dbt models that combine to form these logical data flows:

Subgraphs for fetching and joining enrichment data

Notice that there is some overlap between the model nodes, as these flows form the following topology when combined.

Combined user enrichment flow

The pieces that we'll want in place are:

users – The primary user data table

+----+---------------------------+--------------------+
| ID | CREATED_AT                | EMAIL              |
|----+---------------------------+--------------------|
|  1 | 2020-10-15 13:05:22 -0700 | dave@getcensus.com |
+----+---------------------------+--------------------+

user_clearbit_data_all – A table to hold Clearbit data for each email we look up, along with a timestamp of when it was fetched. This may have multiple records per email when data goes stale and needs to be refetched.

+-------+-------------------------------+---------------+
| EMAIL | FETCHED_AT                    | CLEARBIT_DATA |
|-------+-------------------------------+---------------|
| ...   | ...                           | {...}         |
+-------+-------------------------------+---------------+

Since we're using dbt to express our models, this ends up as part of the definition of the user_clearbit_data_all model, which uses an incremental materialization strategy (meaning that each run's query results are appended to the existing table rather than overwriting it).




-- new rows to insert into user_clearbit_data_all
SELECT
  CURRENT_TIMESTAMP AS fetched_at,
  email,
  CLEARBIT_PERSON(email) AS clearbit_data
FROM {{ ref('users_to_enrich') }}
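
For reference, here's a minimal sketch of what the full model file might look like (the file path and exact config are illustrative assumptions, not a copy of our production model):

-- models/user_clearbit_data_all.sql (sketch)
{{ config(materialized='incremental') }}

-- Each dbt run appends freshly fetched Clearbit payloads for emails that
-- still need enrichment; rows from previous runs are left in place.
SELECT
  CURRENT_TIMESTAMP AS fetched_at,
  email,
  CLEARBIT_PERSON(email) AS clearbit_data
FROM {{ ref('users_to_enrich') }}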

user_clearbit_data – A view that filters user_clearbit_data_all down to the most recently fetched version for each email and excludes any stale data


WITH
  fresh_fetches AS (
    SELECT *
    FROM {{ ref('user_clearbit_data_all') }}
    -- impose definition of stale -> fetched more than 7 days ago
    WHERE fetched_at > DATEADD('day', -7, CURRENT_TIMESTAMP)
  ),

  ranked_fetches_by_recency AS (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY email
        ORDER BY fetched_at DESC
      ) AS rank
    FROM fresh_fetches
  ),

  most_recent_fetches_by_email AS (
    SELECT
      fetched_at,
      email,
      clearbit_data
    FROM ranked_fetches_by_recency
    WHERE rank = 1
  )

SELECT * FROM most_recent_fetches_by_email
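
As a side note, Snowflake also supports the QUALIFY clause, so the same "keep only the freshest fetch per email" logic could be written more compactly. A sketch of an equivalent view:

-- Equivalent view using QUALIFY to keep only the most recent fetch per email
SELECT
  fetched_at,
  email,
  clearbit_data
FROM {{ ref('user_clearbit_data_all') }}
WHERE fetched_at > DATEADD('day', -7, CURRENT_TIMESTAMP)
QUALIFY ROW_NUMBER() OVER (PARTITION BY email ORDER BY fetched_at DESC) = 1

Either form works; the CTE version above just spells out each step explicitly.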

users_enriched – A view joining users and user_clearbit_data into combined, enriched records.


-- users_enriched
SELECT
  u.*,
  clearbit_data:person.employment.title AS title,
  clearbit_data:company.name AS company_name,
  clearbit_data:company.domain AS company_domain
FROM {{ source('enrichment', 'users') }} u
LEFT JOIN {{ ref('user_clearbit_data') }} c ON u.email = c.email

users_to_enrich – A view that computes the email values in users that aren't yet represented in user_clearbit_data (we'll need this to update the enrichment data)


-- users_to_enrich
SELECT u.email
FROM {{ source('enrichment', 'users') }} u
LEFT JOIN {{ source('enrichment', 'user_clearbit_data') }} c ON u.email = c.email
WHERE c.email IS NULL

With these model definitions in place, we can execute dbt run --models +users_enriched to build the tables and views, then SELECT * FROM users_enriched to view the results.

Deploying this in production

The ability to incorporate API data results into the declarative ELT flows enabled by SQL/dbt is a game-changer, and provides a worthy replacement for external workers that periodically run enrichment jobs against the data warehouse. However, it may be complex for some teams to set up and maintain these patterns, so it’s worth checking comfort levels with the stack components before taking the plunge. Here at Census, we are exploring ways to make this process more turnkey for our customers, so watch this space for updates.

  1. One pitfall that dbt users may recognize in the topology above is that the diff step introduces a cycle in the model graph, which isn't allowed in dbt since its dependency graph is a DAG. To avoid introducing a true dependency cycle, we use a source reference (rather than a ref) to user_clearbit_data in the users_to_enrich model, effectively changing dbt's instructions from "compute this dependency" to "use the contents of this table/view as-is". This is definitely something of a hack and has some downsides (like needing to be defined separately but kept in sync with the user_clearbit_data model), but it enables us to do something here that we couldn't do otherwise.
