Getting Data into Snowflake using Snowpipe

Enable Heap data in Snowflake using our Retroactive S3 offering, the Snowflake connector, and the Snowpipe REST API.

Snowpipe is a managed, serverless ETL solution for ingesting data into Snowflake in near real time. Users can make calls to the Snowpipe REST API to ingest Heap data into target tables using Snowflake-provided compute resources.

Integrating with Retroactive S3

Customers on Retroactive S3 can call a public REST endpoint with the list of files in the sync manifest and a referenced pipe name (Java/Python SDKs provided for convenience) to ingest data into Snowflake.

Preparation Steps

Step 1: Create a stage for the data export S3 bucket

The stage does not need to specify a path beyond the bucket name, as the specific paths of the import files are passed in during the ingest process.

CREATE OR REPLACE STAGE <stage_name> 
    URL='s3://<bucket_name>' 
    CREDENTIALS=(AWS_KEY_ID='ABC' AWS_SECRET_KEY='XYZ');

Step 2: Prepare to use the Snowpipe API

Snowflake has preparation guidelines for setting up Snowpipe API access. Users need to complete the following:

  • Create an ingest user and configure key pair authentication for the user.
  • Install the client SDK (Java or Python) for calling the Snowpipe public REST endpoints. A Python sketch covering both items follows below.
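
Below is a minimal Python sketch of both preparation items, assuming the snowflake-ingest package is installed (pip install snowflake-ingest) and the ingest user's key pair was generated following Snowflake's key pair authentication instructions. The account, host, user, pipe name, and key path are placeholders, not values defined by this guide.

# Sketch: construct a Snowpipe ingest client that authenticates with the
# ingest user's key pair. All identifiers below are placeholders.
from snowflake.ingest import SimpleIngestManager

# Read the ingest user's private key (unencrypted PKCS#8 PEM).
with open('/path/to/rsa_key.p8') as key_file:
    private_key_text = key_file.read()

ingest_manager = SimpleIngestManager(
    account='myaccount',                         # Snowflake account identifier
    host='myaccount.snowflakecomputing.com',     # account endpoint
    user='INGEST_USER',                          # dedicated ingest user
    pipe='MYDB.PUBLIC.HEAP_EVENTS_PIPE',         # fully qualified pipe name
    private_key=private_key_text)                # signs the JWT sent with each request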

Recurring Sync Steps

These steps can be automated using the Snowflake connector and Snowpipe REST APIs. Snowflake provides sample Java and Python programs that illustrate how to call the REST endpoints. Your program should read our manifest file for each sync and call the Snowpipe REST API to kick off the ingestion process.

Step 1: Kick off the ingest process

A manifest containing information about the export is uploaded to the bucket when a new export is available. This new manifest file should trigger your ingestion process. Your application should either poll for new manifests or use a notification service (e.g. Amazon SNS) that sends a message when a new manifest file is detected. AWS Lambda is a good fit for hosting this ingest script, as it can be configured to run automatically when a new manifest file is detected via an S3 event notification.
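
For illustration, here is a minimal AWS Lambda handler in Python (boto3 is available in the Lambda runtime) triggered by an S3 event notification. The manifest key filter and the process_manifest stub are assumptions standing in for the ingest logic described in Step 2.

import json
import boto3

s3 = boto3.client('s3')

def process_manifest(manifest):
    # Placeholder for the Step 2 logic: create any missing tables/pipes and
    # call the Snowpipe REST API with each table's file list.
    pass

def lambda_handler(event, context):
    # Each record describes an object that triggered the S3 event notification.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Only act on manifest files; the exact key pattern depends on your export.
        if 'manifest' not in key:
            continue

        # Download and parse the manifest, then hand it off to the ingest logic.
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        process_manifest(json.loads(body))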

Step 2: Ingest data using the Snowpipe API and the sync manifest's file list

The manifest file contains all syncing tables and a list of files that will be used to update each table. If new tables are present in this manifest that were not in the last export (e.g. you are starting to sync new events), you should create new tables (production and staging) and pipes before attempting to ingest data for these tables. For Avro files, Snowflake only allows tables with a single VARIANT column; however, you can build views that traverse the VARIANT column after ingestion is complete.
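
The sketch below submits one table's file list to its pipe using the Python SDK's SimpleIngestManager (configured as in the preparation step). The manifest fields used here (name and files) and the pipe naming convention are assumptions for illustration; check your manifest for the actual field names.

from snowflake.ingest import SimpleIngestManager, StagedFile

def ingest_table(manifest_table, private_key_text):
    # manifest_table is assumed to carry the table name and its list of file
    # paths relative to the stage (i.e. relative to the bucket root).
    ingest_manager = SimpleIngestManager(
        account='myaccount',
        host='myaccount.snowflakecomputing.com',
        user='INGEST_USER',
        pipe='MYDB.PUBLIC.{}_PIPE'.format(manifest_table['name'].upper()),
        private_key=private_key_text)

    # StagedFile takes the file path and an optional size hint.
    staged_files = [StagedFile(path, None) for path in manifest_table['files']]

    # Kicks off an asynchronous load; per-file status can be checked later
    # via ingest_manager.get_history().
    return ingest_manager.ingest_files(staged_files)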

Changing Schemas

Note that schemas may change over time (e.g. additional properties are added on users), so you will need to update the views built on these tables when that occurs. We provide the columns for each table in the sync's metadata file, so you can check for new columns and modify the views accordingly.
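
One way to automate this, as a sketch: regenerate each table's view from the column list in the metadata file. The VARIANT column name (raw_data), the view naming convention, and casting every column to string are assumptions made for illustration only.

def build_view_sql(table_name, columns):
    # columns is assumed to be the list of column names that the sync's
    # metadata file reports for this table.
    select_list = ',\n        '.join(
        'raw_data:{0}::string AS {0}'.format(col) for col in columns)
    return (
        'CREATE OR REPLACE VIEW {0}_view AS\n'
        '    SELECT\n'
        '        {1}\n'
        '    FROM {0};'.format(table_name, select_list))

You would execute the generated statement through the connector (as in the table/pipe creation sketch below) whenever the metadata lists columns that the current view does not include.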

Snowflake provides good examples of using the connector to automate new table/pipe creation and of calling the ingest API with a list of files.

CREATE OR REPLACE TABLE <table_name> (<json_object_column> VARIANT);

CREATE OR REPLACE PIPE <pipe_name> 
    AS COPY INTO <table_name> 
    FROM @<stage_name> FILE_FORMAT = (TYPE = 'AVRO');
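
A sketch of automating those statements with the Python connector (snowflake-connector-python). The connection parameters, the staging/pipe naming convention, and the VARIANT column name are assumptions for illustration; the pipe here loads into the staging table so the incremental flag logic below can decide how to apply the data.

import snowflake.connector

def create_table_and_pipe(cursor, table_name, stage_name):
    # Create the production and staging tables, each with a single VARIANT column.
    for target in (table_name, table_name + '_staging'):
        cursor.execute(
            'CREATE TABLE IF NOT EXISTS {} (raw_data VARIANT)'.format(target))
    # Create a pipe that copies Avro files from the stage into the staging table.
    cursor.execute(
        "CREATE PIPE IF NOT EXISTS {0}_pipe AS "
        "COPY INTO {0}_staging FROM @{1} FILE_FORMAT = (TYPE = 'AVRO')"
        .format(table_name, stage_name))

# Example usage; credentials and object names are placeholders.
conn = snowflake.connector.connect(
    user='INGEST_USER', password='...', account='myaccount',
    database='MYDB', schema='PUBLIC')
create_table_and_pipe(conn.cursor(), 'pageviews', 'heap_export_stage')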

Incremental Flag Logic

For each table, the manifest file contains an incremental flag that represents whether the files in the export should be used to append to or replace the table. You can use a staging table to handle the merge or replace action depending on the incremental flag for each table in the Heap manifest file:

  • For tables that have the incremental flag set to false, drop the production table and rename the staging table.
  • For tables that have the incremental flag set to true, merge the staging table into the production table. Merges should be handled on the primary key, which will be user_id for the users table and event_id for the events table(s). A sketch combining both cases follows the MERGE example below.

MERGE INTO users AS u  
    USING users_staging AS s ON u.user_id = s.user_id
    WHEN MATCHED THEN UPDATE SET u.row_value = s.row_value
    WHEN NOT MATCHED THEN INSERT (row_value) VALUES (s.row_value);
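
A sketch of dispatching on that flag with the Python connector; the staging table naming and the optional cleanup step are assumptions, and merge_sql is a statement like the MERGE shown above.

def apply_staged_data(cursor, table_name, incremental, merge_sql):
    if not incremental:
        # Full resync: replace the production table with the freshly loaded
        # staging table.
        cursor.execute('DROP TABLE IF EXISTS {}'.format(table_name))
        cursor.execute('ALTER TABLE {0}_staging RENAME TO {0}'.format(table_name))
    else:
        # Incremental sync: merge the staged rows into production on the
        # primary key (user_id for users, event_id for event tables).
        cursor.execute(merge_sql)
        # Optional cleanup so the next sync starts from an empty staging table.
        cursor.execute('TRUNCATE TABLE {}_staging'.format(table_name))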

Step 3: Apply the user migration data

user_migrations is a fully materialized mapping of from_user_ids to to_user_ids. You will need to join this with events/users tables to determine which user each event belongs to.
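
A hedged example of doing this with a view, executed through the connector. The column names from_user_id and to_user_id follow the mapping described above but should be verified against your export, and the events table name is a placeholder.

def create_resolved_view(cursor, events_table):
    # Map each event to its final user; events whose user was never migrated
    # keep their original user_id.
    cursor.execute("""
        CREATE OR REPLACE VIEW {0}_resolved AS
            SELECT
                COALESCE(m.to_user_id, e.user_id) AS resolved_user_id,
                e.*
            FROM {0} AS e
            LEFT JOIN user_migrations AS m
                ON e.user_id = m.from_user_id
    """.format(events_table))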

Step 4: Compute defined properties

Get the defined properties JSON file (the file path is specified in the manifest) and convert it to SQL. You can apply the definitions via a view on all tables where the properties are needed.

Example of converting the sample defined properties JSON file below into a view:

[
  {
    "property_name": "channel",
    "type": "event",
    "cases": [
      {
        "value": {
          "type": "constant",
          "value": "Social"
        },
        "condition": {
          "boolean_operator": "or",
          "clauses": [
            {
              "boolean_operator": "and",
              "clauses": [
                {
                  "property_name": "campaign_name",
                  "operator": "=",
                  "value": "myfavoritecampaign"
                },
                {
                  "property_name": "utm_source",
                  "operator": "=",
                  "value": "facebook"
                }
              ]
            },
            {
              "property_name": "utm_source",
              "operator": "=",
              "value": "instagram"
            }
          ]
        }
      },
      {
        "value": {
          "type": "property",
          "value": "utm_source" // This is a property on the event
        },
        "condition": {
          "boolean_operator": "or",
          "clauses": [
            {
              "property_name": "utm_source",
              "operator": "=",
              "value": "google"
            },
            {
              "property_name": "utm_source",
              "operator": "=",
              "value": "bing"
            }
          ]
        }
      }
    ],
    "default_value": {
      "type": "constant",
      "value": "Unknown"
    }
  }
]

CREATE VIEW "event_table_with_defined_props_columns" AS
    SELECT
        <existing_columns_in_event_table>,
        CASE
            WHEN ((campaign_name = 'myfavoritecampaign' AND utm_source = 'facebook') OR utm_source = 'instagram') THEN 'Social'
            WHEN (utm_source = 'google' OR utm_source = 'bing') THEN utm_source 
            ELSE 'Unknown'
        END AS channel
    FROM event_table;