Harnessing CDC for Healthcare Data Management in Snowflake
This document provides a technical overview of automating data updates and change tracking using Snowflake’s Change Data Capture (CDC) features. Although the examples focus on healthcare data, the principles can be applied to various scenarios.
Introduction
In this article, we explore how to implement Change Data Capture (CDC) with Snowflake streams. Using healthcare patient records as the running example, we demonstrate updating existing records, creating a change log table, and automating the entire process with Snowflake tasks.
In the healthcare sector, maintaining up-to-date information is crucial. With Snowflake’s CDC capabilities, we can automatically log every alteration made to data in standard tables. This involves configuring stream objects, which are Snowflake features designed to capture any changes within the data.
We will detail the setup of these streams, their role in capturing data changes, and their use of offset storage to track modifications accurately. The article also describes how a task we will create, apply_healthcare_changes, processes the stream on a regular schedule. This task takes the logged changes and writes them to a change log table named healthcare_logs_current, so that the latest changes are always recorded.
By the conclusion of this article, you will understand how to effectively use CDC in Snowflake for managing healthcare data, ensuring continuous updates and accuracy.
What Is Change Data Capture?
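Change Data Capture (CDC) is the practice of identifying and recording changes made to data so that downstream systems can act on them. In Snowflake, stream objects are created to track changes made to source tables, including updates, deletions, and insertions. A stream acts as a queryable change log for its source table.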
The following example code illustrates the syntax for creating a stream object:
create or replace stream my_stream on table my_table;
Every change made to my_table will be exposed through my_stream. Rather than copying the data, the stream stores an offset: a point in time in the table's version history that serves as a checkpoint separating changes already processed from those still pending. This mechanism is known as offset storage, as illustrated in the following diagram.
Each step in the diagram will be explained in subsequent sections, along with the objects we will utilize later in the process.
Source Table Insert/Update/Delete (healthcare_logs)
The initial point for data updates is the source table, which we will establish later in this demonstration and which holds logs for a healthcare system. You can think of it as a central log in a hospital where patient information is regularly updated. Whenever new test results come in, medications are modified, or any details are added, removed, or altered, it initiates a specific set of procedures to ensure everything is accurately documented.
Creating Snowflake Stream Objects and Capturing Data Changes (healthcare_logs_stream)
Snowflake’s stream objects function like digital recorders, consistently monitoring source data for updates. They document every addition, modification, or deletion of information, maintaining a complete history. In a hospital environment, this could involve capturing new diagnoses, changes to medication plans, or updates to patient details.
Offset Storage in Stream Creation
The healthcare_logs_stream tracks changes to the source table as they are committed. Rather than tagging each change individually, the stream keeps a single offset: a marker for the point in time in the table's version history up to which changes have been consumed. When the stream's contents are used in a DML statement that commits successfully, the offset advances. This prevents an update from being processed more than once and lets healthcare_logs_stream remember its last position, which is vital for data accuracy and for preserving the correct sequence throughout the process.
Checkpoints in Stream Processing
The offset also works as a checkpoint, so data processing can resume from the last successfully processed point rather than starting over. For example, if a processing task fails partway through, the transaction that consumed the stream is rolled back and the offset does not advance; the next run sees the same pending changes and continues from there. This keeps the data consistent and reliable throughout the process.
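The checkpoint behaviour can be sketched as follows. This is a minimal illustration, assuming the healthcare_logs_stream and healthcare_logs_current objects built later in this article and assuming both expose patient_id and patient_event columns. The explicit transaction is only for demonstration and is not necessarily how the article's task is written.

```sql
-- Check whether the stream has changes beyond its current offset.
-- Calling this function does not advance the offset.
SELECT SYSTEM$STREAM_HAS_DATA('healthcare_logs_stream');

BEGIN;

-- Consuming the stream in a DML statement marks its changes as processed,
-- but only once the enclosing transaction commits.
INSERT INTO healthcare_logs_current (patient_id, patient_event)
SELECT patient_id, patient_event
FROM healthcare_logs_stream;

-- On COMMIT the offset advances. If the transaction is rolled back instead,
-- the offset stays where it was and the same changes remain pending.
COMMIT;
```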
CDC Data Pipeline Flow
The diagram below illustrates a CDC Data Pipeline Flow in a hypothetical healthcare setting, where data activities such as inserts, updates, and deletions are directed to the healthcare_logs table. A stream object, healthcare_logs_stream, is then created to capture and log these changes along with their associated metadata. The apply_healthcare_changes task processes these collected changes and implements the updates.
In the following sections, we will delve deeper into the components involved in these steps.
Inserts/Updates/Deletes:
The process initiates with data modifications occurring in the Source table, which includes the insertion, updating, and deletion of data entries. This is generally where changes to patient records or healthcare logs commence within the system.
Create Stream (healthcare_logs_stream):
Changes to the source table must be captured as they happen. This is achieved through the “Create Stream” step, which establishes the healthcare_logs_stream. A stream records changes made after it is created, so it should exist before the modifications you want to track. It is specifically designed to observe and log all changes (both data and metadata) occurring in the source table.
Healthcare_logs_stream (Capture Data Changes):
Within the healthcare_logs_stream, every alteration to the source table (insertion, update, or deletion) is recorded. The stream acts as a continuous feed, exposing each changed row along with metadata columns that describe the nature of the change: METADATA$ACTION (INSERT or DELETE), METADATA$ISUPDATE (whether the row is part of an update), and METADATA$ROW_ID (a unique row identifier). This phase is crucial for ensuring that no modification is missed.
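To see what the stream exposes, it can be queried like a table. The sketch below assumes the source table has patient_id and patient_event columns (both appear in the update statement later in this article); the METADATA$ columns are added by Snowflake to every stream.

```sql
-- Inspect pending changes. A plain SELECT does not consume the stream
-- or move its offset.
SELECT
    patient_id,
    patient_event,
    METADATA$ACTION,    -- 'INSERT' or 'DELETE'
    METADATA$ISUPDATE,  -- TRUE when the row belongs to an UPDATE
    METADATA$ROW_ID     -- unique identifier for the changed row
FROM healthcare_logs_stream;
```

In a standard stream, an UPDATE shows up as a pair of rows: a DELETE carrying the old values and an INSERT carrying the new ones, both with METADATA$ISUPDATE set to TRUE.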
Apply_healthcare_changes Task:
Once the stream has recorded the changes, the next step is to process them. The apply_healthcare_changes task automates the integration of these changes into another table, healthcare_logs_current. Each run reads the captured changes from the stream and loads them into this table, ensuring that it is up to date and accurately reflects all modifications.
Transform, Analysis, Monitor and Analytics:
After the data has been applied and stored in the healthcare_logs_current table, it may undergo additional transformations (if necessary) to prepare it for analysis. This could involve SQL queries or stored procedures to format or clean the data for effective examination. Following transformation, the data is utilized for monitoring and analytics, providing insights and decision support based on the most current information.
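As one illustration of such a downstream query, the sketch below summarizes the recorded changes by event type. It assumes only that healthcare_logs_current carries the patient_event column; the alias change_count is chosen here for illustration.

```sql
-- Count recorded changes per event type in the change log table.
SELECT
    patient_event,
    COUNT(*) AS change_count
FROM healthcare_logs_current
GROUP BY patient_event
ORDER BY change_count DESC;
```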
CDC Practical Implementation
In this section, we will outline a practical, step-by-step implementation of CDC in Snowflake, beginning with the creation of databases, tables, and initial data insertion. We will then proceed to set up streams and change tables to capture data modifications, followed by automating the ingestion of updates into the healthcare_logs_current table using Snowflake tasks. Finally, we will discuss the processes involved in updating and verifying the data to ensure its accuracy and reliability for downstream analysis.
Creating Database, Tables, and Inserting Data
At this stage, we will establish a healthcare database in Snowflake. Begin by creating a new database:
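A minimal sketch of this step follows. The database name healthcare_db is a placeholder chosen for illustration, since the original name is not given here.

```sql
-- healthcare_db is a hypothetical name; substitute your own.
CREATE DATABASE IF NOT EXISTS healthcare_db;
USE DATABASE healthcare_db;

-- List the schemas that come with the new database.
SHOW SCHEMAS IN DATABASE healthcare_db;
```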
After the database is created, you will notice that two schemas—PUBLIC and INFORMATION_SCHEMA—are already present. The INFORMATION_SCHEMA contains views that provide information about the database.
For this demonstration, you will create your views, tables, tasks, and streams in the default PUBLIC schema.
With the database in place, we create the healthcare log table, healthcare_logs, and define its columns. This table stores patient data and also serves as the source table for the stream. The SQL code is displayed below.
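One possible definition is sketched here. Only patient_id and patient_event are confirmed by the statements later in this article; the event_timestamp column and all data types are assumptions made for illustration.

```sql
CREATE OR REPLACE TABLE healthcare_logs (
    patient_id      NUMBER,        -- used by the UPDATE later in the article
    patient_event   STRING,        -- used by the UPDATE later in the article
    event_timestamp TIMESTAMP_NTZ  -- hypothetical column, for illustration only
);
```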
After the table is created, we will use the SQL code below to insert data. Note that this is sample data and not actual healthcare data. Once the insert completes, the healthcare log table contains 1,000 records.
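One way to produce 1,000 synthetic rows is Snowflake's GENERATOR table function. The sketch below assumes a healthcare_logs table with patient_id, patient_event, and a hypothetical event_timestamp column; the event names are invented sample values, not real data.

```sql
INSERT INTO healthcare_logs (patient_id, patient_event, event_timestamp)
SELECT
    -- Gap-free ids 1..1000, so each patient_id is unique.
    ROW_NUMBER() OVER (ORDER BY SEQ4()),
    -- Pick one of four made-up event names at random.
    DECODE(UNIFORM(1, 4, RANDOM()),
           1, 'Admission',
           2, 'Lab Test',
           3, 'Prescription',
           4, 'Discharge'),
    -- Random moment within roughly the past year.
    DATEADD(minute, -UNIFORM(0, 525600, RANDOM()),
            CURRENT_TIMESTAMP()::TIMESTAMP_NTZ)
FROM TABLE(GENERATOR(ROWCOUNT => 1000));

-- Verify the row count.
SELECT COUNT(*) AS row_count FROM healthcare_logs;
```

Keeping patient_id unique matters later, because the update example selects a single patient_id and relies on it matching exactly one row.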
Executing this should yield the following result:
Creating Stream and Change Table
During this phase, we will establish a stream on the healthcare_logs table to ensure that any changes to patient journeys are reflected in the healthcare_logs_stream. We will also create a table named healthcare_logs_current to store these modifications.
Create this stream using the following code:
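Following the syntax shown earlier for my_stream, the statement would look roughly like this. The SHOW STREAMS check is an optional addition.

```sql
-- Track inserts, updates, and deletes on the source table.
CREATE OR REPLACE STREAM healthcare_logs_stream ON TABLE healthcare_logs;

-- Optional: confirm that the stream exists.
SHOW STREAMS LIKE 'healthcare_logs_stream';
```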
Next, we create the healthcare_logs_current table, which stores the changes made to the source table. Note that we will use Snowflake’s task functionality to populate this change log table.
Consequently, we will not enter any data manually; an automated process handles that, as explained later in this document.
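A possible shape for the change log table is sketched below. It assumes the source columns patient_id, patient_event, and a hypothetical event_timestamp, and adds three illustrative columns (change_action, is_update, recorded_at) for the stream metadata. These names are assumptions, not the original definition.

```sql
CREATE OR REPLACE TABLE healthcare_logs_current (
    patient_id      NUMBER,
    patient_event   STRING,
    event_timestamp TIMESTAMP_NTZ,   -- hypothetical source column
    change_action   STRING,          -- filled from METADATA$ACTION
    is_update       BOOLEAN,         -- filled from METADATA$ISUPDATE
    recorded_at     TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()  -- load time
);
```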
Automation of Healthcare_Logs_Current Table Ingestion via Task
In a real-world electronic health record scenario, patient journeys change constantly (medical tests, prescriptions, medication changes, and so on), and tracking all these updates manually would be tedious for data engineers and administrators. To address this, Snowflake provides tasks, a feature that runs SQL statements automatically.
Snowflake tasks can be started in different ways, including:
- Event Driven
- Time Driven
In our healthcare scenario, we will implement a time-driven task to manage ongoing updates in patient journeys. The task runs on a schedule, picks up the DML changes captured from the source table by the Snowflake stream, and loads the change data and relevant metadata into the healthcare_logs_current table for a comprehensive overview. For more information on this implementation, see the code provided below.
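A scheduled task along these lines might look like the sketch below. The warehouse name compute_wh, the one-minute schedule, and the target columns change_action and is_update (plus the hypothetical event_timestamp) are all assumptions made for illustration.

```sql
CREATE OR REPLACE TASK apply_healthcare_changes
    WAREHOUSE = compute_wh      -- hypothetical warehouse name
    SCHEDULE  = '1 MINUTE'      -- assumed interval
    -- Skip the run entirely when the stream has nothing new.
    WHEN SYSTEM$STREAM_HAS_DATA('healthcare_logs_stream')
AS
    INSERT INTO healthcare_logs_current
        (patient_id, patient_event, event_timestamp, change_action, is_update)
    SELECT
        patient_id,
        patient_event,
        event_timestamp,
        METADATA$ACTION,
        METADATA$ISUPDATE
    FROM healthcare_logs_stream
    -- Drop the "before" image of an UPDATE so each update yields one row.
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE);
```

Because the INSERT reads from the stream, each successful run advances the stream's offset, so the next run sees only changes made since then. The WHERE filter is a design choice in this sketch: without it, every update would be logged twice, once with the old values and once with the new.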
After establishing the task, the next step is to activate it. Snowflake tasks are created in a suspended state and must be explicitly resumed to become operational. You can do this by executing the following script:
-- Start the task
ALTER TASK apply_healthcare_changes RESUME;
Data Update and Verification
First, let’s inspect our healthcare_logs_current table before any updates are made. You will observe that currently, this table contains no records.
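A simple count is enough for this check. The alias row_count is chosen here for illustration.

```sql
-- Before any change is made to the source table, this should report zero rows.
SELECT COUNT(*) AS row_count FROM healthcare_logs_current;
```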
Now, let’s update a patient record using the SQL code below. The update sets patient_event to 'Checkup' for one record in the healthcare_logs table. The LIMIT clause makes the subquery return a single patient_id, so exactly one row is updated as long as patient_id values are unique. Because the subquery has no ORDER BY, which record is picked is not guaranteed.
UPDATE healthcare_logs SET patient_event = 'Checkup' WHERE patient_id = (SELECT patient_id FROM healthcare_logs LIMIT 1);
As previously mentioned, any updates made to the source table will be reflected in the change log table through the task. Next, to verify the successful automation, you should navigate to ‘task run history’ and view the ‘show query profile’ to monitor the task.
When you click on ‘show query profile,’ you will see that one update has been recorded in the healthcare_logs_current table.
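The same check can be made in SQL with the TASK_HISTORY table function, as an alternative to the web interface. This sketch assumes the task was created with an unquoted name, which Snowflake stores in upper case.

```sql
-- Show the ten most recent runs of the task and how each one ended.
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME    => 'APPLY_HEALTHCARE_CHANGES',
    RESULT_LIMIT => 10))
ORDER BY scheduled_time DESC;
```

A row with state SUCCEEDED indicates a completed run; a FAILED row carries the reason in error_message.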
Afterwards, we’ll check our change log table using the query below and examine the output.
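A query along these lines would surface the change. The filter on 'Checkup' is an assumption based on the update performed above.

```sql
-- Look for the updated record in the change log table.
SELECT *
FROM healthcare_logs_current
WHERE patient_event = 'Checkup';
```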
As demonstrated, the update made to the patient event in the healthcare logs table is now reflected in the healthcare_logs_current table.
Conclusion
In this article, we outlined a comprehensive approach to implementing Change Data Capture (CDC) in Snowflake for managing healthcare data. This process regularly collects and logs patient records, ensuring data integrity and facilitating timely changes.
We also discussed setting up and using Snowflake’s features for healthcare data management. We started by creating a database and tables such as healthcare_logs to track patient information. We then demonstrated how to automatically capture changes to this data with a Snowflake stream, healthcare_logs_stream.
Additionally, we emphasized the importance of tasks in Snowflake, such as the apply_healthcare_changes task, which automates updates to the healthcare_logs_current table with new data from the healthcare_logs table. This ensures that patient records remain current without manual intervention.
Finally, we validated our work by updating records and reviewing these changes in Snowflake to confirm that everything was functioning correctly. This process illustrates that Snowflake can effectively manage patient data, providing healthcare providers with timely and accurate information.
By following these steps, healthcare organizations can enhance their management of patient data, leading to improved care and more efficient operations.