Automating Real-Time Data Ingestion and Analysis with Snowflake

Snowflake is a cloud-based data warehousing platform for storing, processing, and analyzing large volumes of data. Its Snowpipe, stream, and task features support continuous, near real-time data ingestion and processing. In this project, I use Snowflake to load data generated in a JupyterLab environment into Snowflake tables by way of an S3 bucket.

Setting Up Snowflake for Real-Time Data Loading

Setting up Snowflake for real-time data loading takes the following steps. For a more detailed walkthrough, see the Snowflake quickstart on this topic.

  • Step 1: Snowflake Account: To get started, you need a Snowflake account. If you don't have one, sign up for Snowflake and set up your account; the SQL in the following steps runs in that account.
  • Step 2: Database and Schema: In Snowflake, data is organized into databases and schemas. A database is a logical container for your data, and a schema is a logical container within a database. Create the database and schema that will receive the incoming real-time data first, because they determine how the data is structured for analysis.
    • The script below creates the scd_demo database and the scd2 schema, then three tables: customer_raw (the landing table that Snowpipe loads), customer (the current version of each customer), and customer_history (one row per version of a customer, with start_time, end_time, and is_current columns). It also creates the stream customer_table_changes on customer, which Step 4 describes.

    
    create database if not exists scd_demo;
    use database scd_demo;
    create schema if not exists scd2;
    use schema scd2;
    show tables;

    create or replace table customer (
        customer_id number,
        first_name varchar,
        last_name varchar,
        email varchar,
        street varchar,
        city varchar,
        state varchar,
        country varchar,
        update_timestamp timestamp_ntz default current_timestamp());

    create or replace table customer_history (
        customer_id number,
        first_name varchar,
        last_name varchar,
        email varchar,
        street varchar,
        city varchar,
        state varchar,
        country varchar,
        start_time timestamp_ntz default current_timestamp(),
        end_time timestamp_ntz default current_timestamp(),
        is_current boolean
        );

    create or replace table customer_raw (
        customer_id number,
        first_name varchar,
        last_name varchar,
        email varchar,
        street varchar,
        city varchar,
        state varchar,
        country varchar);

    create or replace stream customer_table_changes on table customer;
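The customer_history columns (start_time, end_time, is_current) follow a Slowly Changing Dimension Type 2 layout: each change closes the customer's current version and opens a new one. The SQL that fills customer_history is not part of this step, so the following is only a minimal Python sketch of that bookkeeping. It assumes history rows are plain dicts and that an open version has is_current=True and end_time=None (the table itself defaults end_time to current_timestamp()). The helper apply_change is hypothetical.

```python
from datetime import datetime

# Attributes whose change creates a new version (customer_id is the key).
TRACKED = ("first_name", "last_name", "email", "street", "city", "state", "country")


def apply_change(history, new_row, now):
    """Hypothetical helper: apply one customer record to an SCD2 history list.

    Closes the customer's current version if any tracked attribute changed,
    then appends a new current version. Returns True if a version was added.
    """
    current = next(
        (h for h in history
         if h["customer_id"] == new_row["customer_id"] and h["is_current"]),
        None,
    )
    if current is not None:
        if all(current[col] == new_row[col] for col in TRACKED):
            return False  # nothing changed, keep the open version
        # Close the old version at the moment the new one starts.
        current["end_time"] = now
        current["is_current"] = False
    history.append({**new_row, "start_time": now, "end_time": None, "is_current": True})
    return True


if __name__ == "__main__":
    history = []
    base = {"customer_id": 1, "first_name": "Ann", "last_name": "Lee",
            "email": "ann@example.com", "street": "1 Main St", "city": "Austin",
            "state": "TX", "country": "US"}
    apply_change(history, base, datetime(2024, 1, 1, 9, 0))                       # first version
    apply_change(history, {**base, "city": "Dallas"}, datetime(2024, 1, 2, 9, 0))  # address change
    print([(h["city"], h["is_current"]) for h in history])
    # [('Austin', False), ('Dallas', True)]
```

Closing the old version at the same timestamp that opens the new one keeps the versions contiguous, so a point-in-time lookup finds exactly one row per customer.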
                                
  • Step 3: Snowpipe: Snowpipe is a Snowflake feature that automates loading data from external stages such as S3 buckets. You configure a pipe so that new data files in a specific S3 bucket are loaded as soon as S3 event notifications announce them.
    • First, connect Snowflake to the S3 bucket with an external stage, define the CSV file format, and then create the pipe:

    
    // Create the external stage (replace the bucket, path, and keys with your own)
    CREATE OR REPLACE STAGE SCD_DEMO.SCD2.customer_ext_stage
        url='s3://[bucket-name]/path-to-data-folder'
        credentials=(aws_key_id='provide_key' aws_secret_key='provide_secret_key');

    // CSV files with one header line; string options take single quotes
    CREATE OR REPLACE FILE FORMAT SCD_DEMO.SCD2.CSV
        TYPE = CSV
        FIELD_DELIMITER = ','
        SKIP_HEADER = 1;

    SHOW STAGES;
    LIST @customer_ext_stage;

    // auto_ingest = true: S3 event notifications trigger each load
    // (send the bucket's notifications to the notification_channel shown by SHOW PIPES)
    CREATE OR REPLACE PIPE customer_s3_pipe
        auto_ingest = true
        AS
        COPY INTO customer_raw
        FROM @customer_ext_stage
        FILE_FORMAT = (FORMAT_NAME = 'SCD_DEMO.SCD2.CSV');

    show pipes;
    select SYSTEM$PIPE_STATUS('customer_s3_pipe');

    SELECT count(*) FROM customer_raw;

    TRUNCATE TABLE customer_raw;
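COPY INTO customer_raw maps CSV fields to table columns by position. The file format above skips one header line, splits on commas, and does not set FIELD_OPTIONALLY_ENCLOSED_BY. A file produced by the data generator therefore needs a header row followed by the eight customer_raw columns in table order, with no value that would need quoting. A minimal sketch of such a writer using Python's csv module, assuming the generator produces dicts keyed by column name. The function name is hypothetical, and uploading the file to the bucket is left out.

```python
import csv
import io

# Column order of customer_raw; COPY INTO maps CSV fields by position.
CUSTOMER_RAW_COLUMNS = [
    "customer_id", "first_name", "last_name", "email",
    "street", "city", "state", "country",
]


def write_customer_csv(rows, out):
    """Hypothetical helper: write rows (dicts) as CSV with one header line.

    The file format sets SKIP_HEADER = 1 and a comma delimiter but no
    FIELD_OPTIONALLY_ENCLOSED_BY, so quoted values would not be unquoted on
    load. Values containing commas, quotes, or newlines are rejected.
    """
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(CUSTOMER_RAW_COLUMNS)  # skipped by SKIP_HEADER = 1
    for row in rows:
        values = [str(row[col]) for col in CUSTOMER_RAW_COLUMNS]
        for value in values:
            if any(ch in value for ch in ',"\r\n'):
                raise ValueError(f"value would need quoting: {value!r}")
        writer.writerow(values)


if __name__ == "__main__":
    buf = io.StringIO()
    write_customer_csv(
        [{"customer_id": 1, "first_name": "Ann", "last_name": "Lee",
          "email": "ann@example.com", "street": "1 Main St", "city": "Austin",
          "state": "TX", "country": "US"}],
        buf,
    )
    print(buf.getvalue(), end="")
    # customer_id,first_name,last_name,email,street,city,state,country
    # 1,Ann,Lee,ann@example.com,1 Main St,Austin,TX,US
```

If the generated data can contain commas (for example, in street addresses), adding FIELD_OPTIONALLY_ENCLOSED_BY = '"' to the file format and dropping the check would be the alternative.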
                                
  • Step 4: Snowflake Stream: A Snowflake stream records the changes (inserts, updates, and deletes) made to a table since the stream was last consumed, so downstream processing can pick up only what changed.
    • In this project, the stream customer_table_changes (created in the Step 2 script) tracks the customer table. The script below is what changes customer: a MERGE upserts the rows Snowpipe loaded into customer_raw, the stored procedure pdr_scd_demo() runs that MERGE and then truncates customer_raw, and the task tsk_scd_raw calls the procedure every minute. Every insert and update the MERGE makes is captured by the stream, which can then feed further processing such as maintaining customer_history.
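To show what "changes since the stream was last consumed" means, here is a toy Python model of a standard stream, built as a snapshot diff. It mirrors the METADATA$ACTION and METADATA$ISUPDATE columns a stream exposes: an update shows up as a DELETE of the old row plus an INSERT of the new row, both flagged as updates. Changes are netted, so a row inserted and then updated before the stream is read appears as a single INSERT. This is an illustration under stated assumptions, not how Snowflake implements streams. It assumes rows are identified by a key (customer_id), whereas Snowflake tracks rows internally. The class TableStream is hypothetical.

```python
import copy


class TableStream:
    """Hypothetical toy model of a standard stream on a keyed table.

    Remembers the table as of its offset and reports the net difference
    to the current table as (action, is_update, row) tuples.
    """

    def __init__(self, table):
        self.table = table                   # live dict: key -> row dict
        self.offset = copy.deepcopy(table)   # table as of the stream offset

    def changes(self):
        out = []
        for key, old in self.offset.items():
            new = self.table.get(key)
            if new is None:
                out.append(("DELETE", False, old))
            elif new != old:
                # An update is a DELETE of the old row plus an INSERT of the new one.
                out.append(("DELETE", True, old))
                out.append(("INSERT", True, new))
        for key, new in self.table.items():
            if key not in self.offset:
                out.append(("INSERT", False, new))
        return out

    def consume(self):
        """Return pending changes and advance the offset, as a committed DML read does."""
        pending = self.changes()
        self.offset = copy.deepcopy(self.table)
        return pending


if __name__ == "__main__":
    customer = {1: {"customer_id": 1, "city": "Austin"}}
    stream = TableStream(customer)
    customer[1] = {"customer_id": 1, "city": "Dallas"}  # update
    customer[2] = {"customer_id": 2, "city": "Reno"}    # insert...
    customer[2] = {"customer_id": 2, "city": "Boise"}   # ...then update before reading
    for action, is_update, row in stream.consume():
        print(action, is_update, row)
    # DELETE True {'customer_id': 1, 'city': 'Austin'}
    # INSERT True {'customer_id': 1, 'city': 'Dallas'}
    # INSERT False {'customer_id': 2, 'city': 'Boise'}
    print(stream.changes())  # []
```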

    
    -- Upsert the staged rows into customer. customer_id is the join key, so
    -- only the other attributes are compared; IS DISTINCT FROM also counts a
    -- change to or from NULL, which <> would miss.
    merge into customer c
    using customer_raw cr
        on c.customer_id = cr.customer_id
    when matched and (c.first_name is distinct from cr.first_name or
                      c.last_name  is distinct from cr.last_name  or
                      c.email      is distinct from cr.email      or
                      c.street     is distinct from cr.street     or
                      c.city       is distinct from cr.city       or
                      c.state      is distinct from cr.state      or
                      c.country    is distinct from cr.country) then update
        set c.first_name  = cr.first_name
           ,c.last_name   = cr.last_name
           ,c.email       = cr.email
           ,c.street      = cr.street
           ,c.city        = cr.city
           ,c.state       = cr.state
           ,c.country     = cr.country
           ,update_timestamp = current_timestamp()
    when not matched then insert
            (c.customer_id,c.first_name,c.last_name,c.email,c.street,c.city,c.state,c.country)
        values (cr.customer_id,cr.first_name,cr.last_name,cr.email,cr.street,cr.city,cr.state,cr.country);

    -- Stored procedure: run the same MERGE, then empty the staging table
    CREATE OR REPLACE PROCEDURE pdr_scd_demo()
    returns string not null
    language javascript
    as
    $$
        var cmd = `
            merge into customer c
            using customer_raw cr
                on c.customer_id = cr.customer_id
            when matched and (c.first_name is distinct from cr.first_name or
                              c.last_name  is distinct from cr.last_name  or
                              c.email      is distinct from cr.email      or
                              c.street     is distinct from cr.street     or
                              c.city       is distinct from cr.city       or
                              c.state      is distinct from cr.state      or
                              c.country    is distinct from cr.country) then update
                set c.first_name  = cr.first_name
                   ,c.last_name   = cr.last_name
                   ,c.email       = cr.email
                   ,c.street      = cr.street
                   ,c.city        = cr.city
                   ,c.state       = cr.state
                   ,c.country     = cr.country
                   ,update_timestamp = current_timestamp()
            when not matched then insert
                    (c.customer_id,c.first_name,c.last_name,c.email,c.street,c.city,c.state,c.country)
                values (cr.customer_id,cr.first_name,cr.last_name,cr.email,cr.street,cr.city,cr.state,cr.country);
        `;
        var cmd1 = "truncate table SCD_DEMO.SCD2.customer_raw;";
        snowflake.createStatement({sqlText: cmd}).execute();
        // Note: rows that Snowpipe loads between the MERGE and the TRUNCATE
        // are deleted without being merged.
        snowflake.createStatement({sqlText: cmd1}).execute();
        // Return the executed statements for logging
        return cmd + '\n' + cmd1;
    $$;
    call pdr_scd_demo();
    
    
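    -- Set up the TASKADMIN role
    use role securityadmin;
    create or replace role taskadmin;
    -- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
    use role accountadmin;
    grant execute task on account to role taskadmin;

    -- Set the active role to SECURITYADMIN to show that this role can grant a role to another role
    use role securityadmin;
    grant role taskadmin to role sysadmin;

    -- Switch back to a role that owns SCD_DEMO (here ACCOUNTADMIN, assuming it created the database)
    use role accountadmin;

    -- Call the procedure every minute. With ERROR_ON_NONDETERMINISTIC_MERGE = FALSE,
    -- several staged rows for one customer_id make the MERGE pick one of them instead of failing.
    create or replace task tsk_scd_raw
        warehouse = COMPUTE_WH
        schedule = '1 minute'
        ERROR_ON_NONDETERMINISTIC_MERGE = FALSE
    as
        call pdr_scd_demo();
    show tasks;

    -- New tasks are created suspended; RESUME starts the schedule
    alter task tsk_scd_raw resume;
    show tasks;
    -- alter task tsk_scd_raw suspend;  -- stop the task when you are done

    -- Seconds until the next scheduled run
    select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state
    from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;

    select * from customer where customer_id=0;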
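To make the procedure's effect concrete, here is a minimal in-memory Python sketch of what one run of pdr_scd_demo() does. It upserts every staged row into customer, touching update_timestamp only when an attribute actually changed, and then empties the staging table. Tables are modeled as a dict keyed by customer_id and a list of dicts. The sketch assumes customer_raw holds at most one row per customer_id and rejects duplicates, since Snowflake's handling of duplicates (nondeterministic for updates) is not modeled. The function run_scd_merge is hypothetical.

```python
from datetime import datetime

ATTRS = ("first_name", "last_name", "email", "street", "city", "state", "country")


def run_scd_merge(customer, customer_raw, now):
    """Hypothetical model of one pdr_scd_demo() run.

    customer: dict customer_id -> row dict (including update_timestamp)
    customer_raw: list of staged row dicts, one per customer_id
    Returns (inserted, updated) counts and empties customer_raw.
    """
    ids = [r["customer_id"] for r in customer_raw]
    if len(ids) != len(set(ids)):
        raise ValueError("sketch assumes one staged row per customer_id")
    inserted = updated = 0
    for raw in customer_raw:
        current = customer.get(raw["customer_id"])
        if current is None:
            # WHEN NOT MATCHED THEN INSERT (update_timestamp takes its default)
            customer[raw["customer_id"]] = {**raw, "update_timestamp": now}
            inserted += 1
        elif any(current[a] != raw[a] for a in ATTRS):
            # WHEN MATCHED AND <an attribute differs> THEN UPDATE.
            # Python's != treats None as a value, like IS DISTINCT FROM.
            current.update({a: raw[a] for a in ATTRS})
            current["update_timestamp"] = now
            updated += 1
        # Matched and unchanged: the row is left alone.
    customer_raw.clear()  # the procedure's TRUNCATE
    return inserted, updated


if __name__ == "__main__":
    customer = {}
    base = {"customer_id": 1, "first_name": "Ann", "last_name": "Lee",
            "email": "ann@example.com", "street": "1 Main St", "city": "Austin",
            "state": "TX", "country": "US"}
    print(run_scd_merge(customer, [dict(base)], datetime(2024, 1, 1)))                  # (1, 0)
    print(run_scd_merge(customer, [{**base, "city": "Dallas"}], datetime(2024, 1, 2)))  # (0, 1)
    print(run_scd_merge(customer, [{**base, "city": "Dallas"}], datetime(2024, 1, 3)))  # (0, 0)
```

The third call shows why the MERGE compares attributes before updating: re-delivered, unchanged rows leave update_timestamp alone and generate no change records in the stream.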
                                
  • Step 5: Snowflake Task: A Snowflake task runs a SQL statement, such as a COPY INTO or a stored procedure call, on a schedule, optionally only when a condition is met. Here Snowpipe loads new files from the S3 bucket into customer_raw as they arrive, and the task tsk_scd_raw calls pdr_scd_demo() every minute to merge those rows into customer.
  • Step 6: S3 Integration: Make sure Snowflake can read your S3 bucket, either through the credentials on the external stage (as in Step 3) or through a storage integration. For auto_ingest to work, configure the bucket's event notifications to send to the SQS queue listed in the notification_channel column of SHOW PIPES.
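Because the task in Step 5 runs on a fixed interval rather than once per file, a row that Snowpipe loads into customer_raw waits in staging until the next task run. A small Python sketch of that waiting time follows. It assumes runs fall at exact multiples of the schedule interval counted from when the task was resumed. Real runs can start slightly later, and Snowpipe adds its own load latency. The function next_task_run is hypothetical.

```python
from datetime import datetime, timedelta


def next_task_run(resumed_at, loaded_at, interval=timedelta(minutes=1)):
    """Hypothetical helper: first scheduled run at or after loaded_at.

    Assumes runs happen at resumed_at + k * interval for k = 1, 2, ...
    """
    if loaded_at <= resumed_at:
        return resumed_at + interval
    # Round the elapsed time up to a whole number of intervals.
    k, remainder = divmod(loaded_at - resumed_at, interval)
    if remainder:
        k += 1
    return resumed_at + k * interval


if __name__ == "__main__":
    resumed = datetime(2024, 1, 1, 9, 0, 0)
    loaded = datetime(2024, 1, 1, 9, 2, 15)
    run = next_task_run(resumed, loaded)
    print(run, "wait:", run - loaded)  # 2024-01-01 09:03:00 wait: 0:00:45
```

Under this assumption, the worst-case wait is one full interval, so shortening the schedule lowers latency at the cost of more frequent warehouse use.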