Automating Real-Time Data Ingestion and Analysis with Snowflake
Snowflake is a cloud-based data warehousing platform for storing, processing, and analyzing large amounts of data, with features for real-time data streaming and processing. In this project, I use Snowflake to load the data generated in the JupyterLab environment into Snowflake tables.
Setting Up Snowflake for Real-Time Data Loading
Setting up Snowflake for real-time data loading takes the steps below. For more detail on each step, see the Snowflake quickstart.
- Step 1: Snowflake Account: To get started with Snowflake, you need to have a Snowflake account. If you don't have one, sign up for Snowflake and set up your account.
- All of the real-time processing in this walkthrough runs inside that account, which gives you access to Snowflake's storage, management, and analysis features with scalability and performance in mind.
- Step 2: Database and Schema: In Snowflake, you typically work within a specific database and schema. Ensure you have the appropriate database and schema created to receive the incoming data.
A database is a logical container for your data, and a schema is a logical container within a database. Structuring them up front keeps the incoming data aligned with your analysis requirements. The statements below create the database scd_demo and the schema scd2, followed by three tables: customer (the current state of each customer), customer_history (versioned rows with start_time, end_time, and is_current), and customer_raw (the landing table for incoming files). The last statement creates a stream that tracks changes to customer.
-- Create the database and schema used throughout (no-ops if they already exist)
create database if not exists scd_demo;
use database scd_demo;
create schema if not exists scd2;
use schema scd2;
show tables;

-- Current state of each customer; this is the target of the merge
create or replace table customer (
    customer_id number,
    first_name varchar,
    last_name varchar,
    email varchar,
    street varchar,
    city varchar,
    state varchar,
    country varchar,
    update_timestamp timestamp_ntz default current_timestamp()
);

-- Versioned customer rows with a validity window and a current-row flag
create or replace table customer_history (
    customer_id number,
    first_name varchar,
    last_name varchar,
    email varchar,
    street varchar,
    city varchar,
    state varchar,
    country varchar,
    start_time timestamp_ntz default current_timestamp(),
    end_time timestamp_ntz default current_timestamp(),
    is_current boolean
);

-- Landing table that receives the incoming files
create or replace table customer_raw (
    customer_id number,
    first_name varchar,
    last_name varchar,
    email varchar,
    street varchar,
    city varchar,
    state varchar,
    country varchar
);

-- Stream that records row-level changes made to customer
create or replace stream customer_table_changes on table customer;
Next, set up the connection between the S3 bucket and Snowflake:
// External stage pointing at your own S3 bucket (replace the placeholders)
CREATE OR REPLACE STAGE SCD_DEMO.SCD2.customer_ext_stage
    url='s3://[bucket-name]/path-to-data-folder'
    credentials=(aws_key_id='provide_key' aws_secret_key='provide_secret_key');

// File format for comma-separated files with one header row
CREATE OR REPLACE FILE FORMAT SCD_DEMO.SCD2.CSV
    TYPE = CSV
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1;

SHOW STAGES;
LIST @customer_ext_stage;

// Pipe that copies each newly arrived file from the stage into customer_raw
CREATE OR REPLACE PIPE customer_s3_pipe
    auto_ingest = true
AS
COPY INTO customer_raw
FROM @customer_ext_stage
FILE_FORMAT = (FORMAT_NAME = 'SCD_DEMO.SCD2.CSV');

show pipes;
select SYSTEM$PIPE_STATUS('customer_s3_pipe');

// Check how many rows have landed, then empty the landing table
SELECT count(*) FROM customer_raw;
TRUNCATE customer_raw;
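With auto_ingest = true, the pipe loads a file only after S3 sends an event notification to the queue shown in the notification_channel column of show pipes. Files that were already in the bucket before that notification was configured are not picked up automatically. The following is a minimal sketch of two follow-up checks, assuming the pipe and table names used above; the one-hour look-back window is an arbitrary value chosen for illustration.

```sql
-- Queue files that are already present in the stage
-- (Snowflake only considers files staged within the last 7 days)
alter pipe customer_s3_pipe refresh;

-- Review what was loaded into customer_raw during the last hour
select file_name, status, row_count, first_error_message
from table(information_schema.copy_history(
    table_name => 'CUSTOMER_RAW',
    start_time => dateadd(hour, -1, current_timestamp())
))
order by last_load_time desc;
```

If the second query returns no rows even though files exist in the bucket, the S3 event notification is a likely place to look first.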
A Snowflake stream tracks the changes made to a table. It acts as a changelog that records data modifications (inserts, updates, and deletes), so downstream steps can work from the latest changes instead of rescanning the whole table. The stream customer_table_changes created earlier tracks the customer table. The merge statement below brings customer up to date from customer_raw: it updates customers whose attributes changed and inserts customers that are not yet present.
-- Apply the rows landed in customer_raw to customer.
-- Note: <> evaluates to NULL when either side is NULL, so a change to or from NULL is not detected here.
merge into customer c
using customer_raw cr
    on c.customer_id = cr.customer_id
-- Existing customer with at least one changed attribute: overwrite it
when matched and (
       c.customer_id <> cr.customer_id
    or c.first_name <> cr.first_name
    or c.last_name <> cr.last_name
    or c.email <> cr.email
    or c.street <> cr.street
    or c.city <> cr.city
    or c.state <> cr.state
    or c.country <> cr.country
) then update
    set c.customer_id = cr.customer_id
       ,c.first_name = cr.first_name
       ,c.last_name = cr.last_name
       ,c.email = cr.email
       ,c.street = cr.street
       ,c.city = cr.city
       ,c.state = cr.state
       ,c.country = cr.country
       ,update_timestamp = current_timestamp()
-- Customer not seen before: insert it
when not matched then insert
    (c.customer_id,c.first_name,c.last_name,c.email,c.street,c.city,c.state,c.country)
    values (cr.customer_id,cr.first_name,cr.last_name,cr.email,cr.street,cr.city,cr.state,cr.country);
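Once the merge above has run, the stream on customer should hold the resulting change records. The following is a minimal sketch of how one might inspect them, assuming the stream name customer_table_changes from the setup script and a session that still points at scd_demo.scd2. The selected business columns are only an illustrative subset.

```sql
-- Pending change records; a plain SELECT reads the stream without consuming it
select customer_id,
       first_name,
       email,
       metadata$action,    -- INSERT or DELETE
       metadata$isupdate,  -- TRUE when the row is one half of an UPDATE
       metadata$row_id     -- identifier of the changed row
from customer_table_changes;

-- Returns TRUE while the stream holds unconsumed change records
select system$stream_has_data('customer_table_changes');
```

An update appears as a DELETE and INSERT pair with metadata$isupdate set to TRUE. The stream's offset only advances when the stream is read by a DML statement that commits, so a later step that writes these rows into another table would consume them.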
-- Wrap the merge and the cleanup of the landing table in one procedure
CREATE OR REPLACE PROCEDURE pdr_scd_demo()
returns string not null
language javascript
as
$$
    // Step 1: merge the landed rows into customer
    var cmd = `
        merge into customer c
        using customer_raw cr
            on c.customer_id = cr.customer_id
        when matched and (
               c.customer_id <> cr.customer_id
            or c.first_name <> cr.first_name
            or c.last_name <> cr.last_name
            or c.email <> cr.email
            or c.street <> cr.street
            or c.city <> cr.city
            or c.state <> cr.state
            or c.country <> cr.country
        ) then update
            set c.customer_id = cr.customer_id
               ,c.first_name = cr.first_name
               ,c.last_name = cr.last_name
               ,c.email = cr.email
               ,c.street = cr.street
               ,c.city = cr.city
               ,c.state = cr.state
               ,c.country = cr.country
               ,update_timestamp = current_timestamp()
        when not matched then insert
            (c.customer_id,c.first_name,c.last_name,c.email,c.street,c.city,c.state,c.country)
            values (cr.customer_id,cr.first_name,cr.last_name,cr.email,cr.street,cr.city,cr.state,cr.country);
    `;
    // Step 2: empty the landing table so the same rows are not merged again
    var cmd1 = "truncate table SCD_DEMO.SCD2.customer_raw;";
    var sql = snowflake.createStatement({sqlText: cmd});
    var sql1 = snowflake.createStatement({sqlText: cmd1});
    // If the merge throws, the truncate below is never reached
    var result = sql.execute();
    var result1 = sql1.execute();
    // Return the executed statements as the procedure's result
    return cmd+'\n'+cmd1;
$$;

-- Run the procedure once by hand
call pdr_scd_demo();
--Set up TASKADMIN role
use role securityadmin;
create or replace role taskadmin;
-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;
-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role
use role securityadmin;
grant role taskadmin to role sysadmin;
-- Run the procedure every minute on the COMPUTE_WH warehouse
create or replace task tsk_scd_raw
    warehouse = COMPUTE_WH
    schedule = '1 minute'
    ERROR_ON_NONDETERMINISTIC_MERGE = FALSE
as
call pdr_scd_demo();

show tasks;
-- Tasks are created suspended: resume to start the schedule, suspend to stop it
alter task tsk_scd_raw resume;
-- alter task tsk_scd_raw suspend;
show tasks;

-- Seconds until the next scheduled run
select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state
from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;

-- Spot-check a single customer
select * from customer where customer_id=0;
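To confirm that the task is firing and that the procedure succeeds, one option is to look at recent runs rather than only the next scheduled one. The following is a sketch, assuming the task name tsk_scd_raw from above; the limit of 10 rows is arbitrary.

```sql
-- Trigger a single run immediately instead of waiting for the schedule
execute task tsk_scd_raw;

-- Most recent runs and their outcome; error_message is populated for failed runs
select name, state, scheduled_time, completed_time, error_message
from table(information_schema.task_history(task_name => 'TSK_SCD_RAW'))
order by scheduled_time desc
limit 10;
```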