
What is an Idempotent Pipeline?
An idempotent pipeline produces the same results:
- Regardless of the day you run it
- Regardless of how many times you run it
- Regardless of the hour that you run it
This means that both production runs and backfills yield the same results, ensuring consistency and reliability.
🔄 Characteristics of Idempotent Pipelines:
- Consistency: Produces the same results regardless of the day, time, or number of times it is run.
- Reliability: Ensures data integrity and consistency, making troubleshooting easier.
❌ The pains of not having idempotent pipelines
- Inconsistencies: Backfilling can cause discrepancies between old and restated data.
- Troubleshooting: Bugs are harder to identify and fix.
- Testing: Unit tests may not replicate production behavior.
- Silent failures: Issues may go unnoticed until they cause significant problems.
What can make a pipeline not idempotent?
1. INSERT INTO without TRUNCATE
Using INSERT INTO without truncating the table first can lead to duplicate data.
INSERT INTO users (user_id, name, email) VALUES (1, 'Ram', 'ram@example.com'), (2, 'Sita', 'sita@example.com');
Output (Run Twice):
user_id | name | email |
---|---|---|
1 | Ram | ram@example.com |
2 | Sita | sita@example.com |
1 | Ram | ram@example.com |
2 | Sita | sita@example.com |
Use MERGE or INSERT OVERWRITE to ensure idempotency; a MERGE sketch follows the output below.
INSERT OVERWRITE TABLE users SELECT * FROM (VALUES (1, 'Ram', 'ram@example.com'), (2, 'Sita', 'sita@example.com') ) AS new_data(user_id, name, email);
Output (Run Twice):
user_id | name | email |
---|---|---|
1 | Ram | ram@example.com |
2 | Sita | sita@example.com |
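Where the engine supports it, a MERGE upsert gives the same idempotent result by updating existing keys instead of appending duplicates. A minimal sketch, using ANSI-style MERGE syntax (the exact form of the USING clause varies by warehouse):

```sql
-- Minimal MERGE sketch (ANSI-style syntax; adapt to your warehouse's dialect).
-- Re-running it updates matching rows instead of inserting duplicates.
MERGE INTO users AS target
USING (
    VALUES (1, 'Ram', 'ram@example.com'),
           (2, 'Sita', 'sita@example.com')
) AS new_data(user_id, name, email)
ON target.user_id = new_data.user_id
WHEN MATCHED THEN
    UPDATE SET name = new_data.name, email = new_data.email
WHEN NOT MATCHED THEN
    INSERT (user_id, name, email)
    VALUES (new_data.user_id, new_data.name, new_data.email);
```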
2. Using StartDate without a corresponding EndDate
Using a start date without an end date can lead to non-idempotent results.
SELECT * FROM sales WHERE sale_date > '2025-01-10';
Output before making it idempotent (run on different days):
sale_id | sale_date | amount |
---|---|---|
1 | 2025-01-11 | 100 |
2 | 2025-01-11 | 150 |
3 | 2025-01-12 | 200 |
Always include an end date in the pipeline window to avoid this issue.
SELECT * FROM sales WHERE sale_date > '2025-01-10' AND sale_date < '2025-01-12';
Output after making it idempotent (run on different days):
sale_id | sale_date | amount |
---|---|---|
1 | 2025-01-11 | 100 |
2 | 2025-01-11 | 150 |
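In practice, the window bounds are usually not hard-coded but injected by the scheduler for each run, so a production run and a later backfill of the same day see exactly the same window. A minimal sketch, assuming hypothetical :window_start and :window_end parameters supplied by the orchestrator:

```sql
-- Bounded, parameterized window (:window_start and :window_end are hypothetical
-- bind parameters filled in by the scheduler). Re-running the same logical day,
-- whether in production or as a backfill, yields identical rows.
SELECT *
FROM sales
WHERE sale_date >= :window_start
  AND sale_date <  :window_end;
```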
3. Not using a full set of partition sensors
Without partition sensors on all inputs, the pipeline may run too early, when some inputs are missing or only partially loaded. A backfill run later in production, after all inputs have landed, will then produce different results from the original run.
Issue: Missing Partition Sensor
Some sales data is missing because the partition was not registered.
Sales Table:
sale_date | product | amount |
---|---|---|
2025-02-01 | Laptop | 1500 |
2025-02-02 | Phone | 800 ❌ (Missing in output) |
2025-02-03 | Tablet | 600 |
Partition Sensors Table (Incorrect):
sale_date |
---|
2025-02-01 |
2025-02-03 |
Incorrect Query:
SELECT sale_date, SUM(amount) AS total_sales FROM sales WHERE sale_date IN (SELECT sale_date FROM partition_sensors) GROUP BY sale_date;
Incorrect Output:
sale_date | total_sales |
---|---|
2025-02-01 | 1500 |
2025-02-03 | 600 |
Solution: Register Missing Partition
Add missing partitions before running the query.
INSERT INTO partition_sensors (sale_date) SELECT DISTINCT sale_date FROM sales WHERE sale_date NOT IN (SELECT sale_date FROM partition_sensors);
Partition Sensors Table (Fixed):
sale_date |
---|
2025-02-01 |
2025-02-02 ✅ |
2025-02-03 |
Correct Query:
SELECT sale_date, SUM(amount) AS total_sales FROM sales GROUP BY sale_date;
Correct Output:
sale_date | total_sales |
---|---|
2025-02-01 | 1500 |
2025-02-02 | 800 ✅ (Now included) |
2025-02-03 | 600 |
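As an extra safeguard, the pipeline itself can verify that every partition in its window has been registered before aggregating anything. A minimal sketch, assuming a hypothetical expected_dates helper that lists the dates the run requires:

```sql
-- Guard sketch: list the required dates that have no registered partition yet.
-- expected_dates is a hypothetical helper with the dates this run needs; if the
-- query returns any rows, the run should wait rather than produce a partial,
-- non-idempotent result.
SELECT d.sale_date
FROM expected_dates AS d
LEFT JOIN partition_sensors AS p
  ON p.sale_date = d.sale_date
WHERE p.sale_date IS NULL;
```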
4. Not using depends_on_past for cumulative pipelines
Problem: Without depends_on_past, the pipeline reprocesses all data instead of only new data.
Example: A daily sales summary table recalculates totals for all dates instead of just the latest.
Incorrect Approach:
INSERT INTO sales_summary (date, total_sales) SELECT date, SUM(amount) FROM sales GROUP BY date;
Issue: Each run duplicates calculations.
date | total_sales |
---|---|
2025-02-01 | 100 |
2025-02-02 | 200 |
2025-02-03 | 450 (Incorrect!) |
Root Cause: The pipeline does not track previously processed data and runs from scratch each time.
Solution: Use depends_on_past to ensure only new data is processed.
Correct Approach:
INSERT INTO sales_summary (date, total_sales) SELECT date, SUM(amount) FROM sales WHERE date > (SELECT MAX(date) FROM sales_summary) GROUP BY date;
Correct Output:
date | total_sales |
---|---|
2025-02-01 | 100 |
2025-02-02 | 200 |
2025-02-03 | 150 |
Why This Works:
- Ensures only new records are processed.
- Prevents recalculating already processed data.
- Keeps cumulative totals accurate.
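A related safeguard: because the query above only appends dates newer than the current maximum, reprocessing a specific day (for example after a partial failure or a restatement) stays idempotent only if that day's rows are replaced rather than appended again. A minimal sketch, assuming a hypothetical :run_date parameter supplied by the scheduler:

```sql
-- Sketch: replace, rather than append, the rows for the day being (re)processed.
-- :run_date is a hypothetical parameter supplied by the scheduler, so re-running
-- the same day always converges on the same summary row.
DELETE FROM sales_summary WHERE date = :run_date;

INSERT INTO sales_summary (date, total_sales)
SELECT date, SUM(amount)
FROM sales
WHERE date = :run_date
GROUP BY date;
```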
Final Note: In Apache Airflow, setting depends_on_past=True on DAG tasks ensures a task only runs if its previous execution was successful.
5. Relying on the “latest” partition of an improperly modeled table
Problem: Using the latest partition without proper modeling can lead to data inconsistencies.
Example: A sales report depends on the latest partition but does not handle late-arriving data.
Incorrect Approach:
SELECT * FROM sales WHERE partition_date = (SELECT MAX(partition_date) FROM sales);
Issue: Late-arriving data is ignored.
partition_date | sales_amount |
---|---|
2025-02-03 | 500 |
2025-02-02 (Late Data Ignored!) | 300 |
Root Cause: The query assumes the latest partition has all the data, but late-arriving records are excluded.
Solution: Use a proper data model that handles late-arriving data.
Correct Approach:
SELECT * FROM sales WHERE partition_date >= DATE_SUB((SELECT MAX(partition_date) FROM sales), INTERVAL 1 DAY);
Correct Output:
partition_date | sales_amount |
---|---|
2025-02-02 | 300 |
2025-02-03 | 500 |
Why This Works:
- Includes late-arriving data from previous partitions.
- Prevents missing important records.
- Ensures accuracy in cumulative reports.
Final Note: Proper partition handling ensures reliable reporting without data inconsistencies.
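If the report is materialized into its own table, one way to absorb late-arriving rows while staying idempotent is to rebuild a small lookback window on every run instead of touching only the latest partition. A minimal sketch, assuming a hypothetical daily_sales_report target table and the same one-day lookback as above:

```sql
-- Sketch: refresh the last two partitions of the report on every run so that
-- late rows are picked up and re-runs converge on the same totals.
-- daily_sales_report is a hypothetical target table.
DELETE FROM daily_sales_report
WHERE partition_date >= DATE_SUB((SELECT MAX(partition_date) FROM sales), INTERVAL 1 DAY);

INSERT INTO daily_sales_report (partition_date, total_sales)
SELECT partition_date, SUM(sales_amount)
FROM sales
WHERE partition_date >= DATE_SUB((SELECT MAX(partition_date) FROM sales), INTERVAL 1 DAY)
GROUP BY partition_date;
```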
6. Relying on the “Latest” partition of anything else
Problem: Using the latest partition without proper checks can lead to missing or inconsistent data.
Example: A log table is queried based on the latest partition, assuming it contains complete data.
Incorrect Approach:
SELECT * FROM logs WHERE partition_date = (SELECT MAX(partition_date) FROM logs);
Issue: Some logs arrive late and are missed in reports.
partition_date | log_entry |
---|---|
2025-02-03 | System started |
2025-02-02 (Missed!) | Error detected |
Solution: Consider a fallback mechanism to ensure all logs are included.
Correct Approach:
SELECT * FROM logs WHERE partition_date >= DATE_SUB((SELECT MAX(partition_date) FROM logs), INTERVAL 1 DAY);
Correct Output:
partition_date | log_entry |
---|---|
2025-02-02 | Error detected |
2025-02-03 | System started |
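Another fallback is to track partition completeness explicitly and read the latest partition known to be complete, rather than the raw maximum. A minimal sketch, assuming a hypothetical partition_audit table with an is_complete flag maintained by the loading job:

```sql
-- Sketch of a completeness check: read the newest partition marked complete
-- instead of MAX(partition_date). partition_audit and is_complete are
-- hypothetical and would be maintained by whichever job loads the logs.
SELECT *
FROM logs
WHERE partition_date = (
    SELECT MAX(partition_date)
    FROM partition_audit
    WHERE is_complete = TRUE
);
```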
Why is troubleshooting non-idempotent pipelines hard?
- Silent failures: Issues may not be immediately apparent.
- Data inconsistencies: Only noticeable when data discrepancies arise, often leading to frustration for data analysts.
References
After watching Zach's insightful video, I was inspired to present these ideas in a more visual way in this blog.