Building Sustainable Data Pipelines
REA started building a data warehouse on Microsoft SQL Server around 2008. It has powered our listing campaign reports on consumer interactions for more than 10 years. Maintaining a large and complex data warehouse proved challenging, yet it was also a valuable learning ground.
This blog summarises the issues we’ve encountered and suggests a few best practices for building data pipelines and managing data warehouses more sustainably.
Techniques in Building Data Pipelines
Firstly, let us define some core concepts.
What is a data pipeline? A data pipeline is software that runs either continuously (stream processing) or on a schedule (batch processing) to move and process data from a source to a destination. Its function is similar to an ETL (Extract, Transform, Load) process; however, a data pipeline can contain many steps, and each step can be a separate ETL flow, so a single pipeline may contain multiple ETLs. Using data pipelines helps automate data processing jobs and makes them repeatable without human intervention.
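To make the idea concrete, here is a minimal Python sketch of a batch pipeline expressed as an ordered list of steps, each a small ETL flow in its own right. The step names and their contents are illustrative, not our actual jobs.

```python
# Illustrative sketch only: a batch pipeline as an ordered list of ETL steps.
from typing import Callable, List

def ingest_raw_events() -> None:
    """Extract: copy yesterday's raw events from the source into a raw table."""
    ...

def enrich_events() -> None:
    """Transform: join raw events with listing metadata into an enriched table."""
    ...

def publish_campaign_metrics() -> None:
    """Load: aggregate enriched events and overwrite the reporting table."""
    ...

# Each step is a self-contained ETL flow; the pipeline is just their ordering.
PIPELINE: List[Callable[[], None]] = [
    ingest_raw_events,
    enrich_events,
    publish_campaign_metrics,
]

def run_pipeline() -> None:
    for step in PIPELINE:
        step()  # a scheduler would normally handle retries, alerting and logging

if __name__ == "__main__":
    run_pipeline()
```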
The following sections describe how we apply software development principles to build data pipelines.
Version control and automated deployment
We treat data pipelines like any other software system at REA. That means they are version controlled, have rollback strategies in place, and have a build pipeline to deploy to the staging and production environments. This is a critical first step: it lets team members confidently make changes knowing they can revert to the previous version safely.
One of the dreaded tasks of being a Data Engineer is having to do data fixes, which are expensive and error-prone. We must carefully pick which rows to delete and then rerun the processing. These two steps can span many hours and can introduce new issues, especially when we are dealing with large amounts of data. We can manage this complexity by building idempotent data pipelines.
“Idempotence” is a well-known concept in the functional programming paradigm: an idempotent function produces the same output no matter how many times it is called. Similarly, “idempotent” data pipelines produce the same output when their inputs and processing logic have not changed. The article Functional Data Engineering explains this concept in more detail.
Overwriting the output destination is key to building idempotent data pipelines, and the principle applies to any destination: database tables, S3 buckets, document stores, and so on.
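As a rough sketch of this pattern using the BigQuery Python client (project, dataset, and table names here are invented for illustration), recomputing the full output and writing it with a truncate disposition keeps the run idempotent:

```python
# Sketch: recompute the output and overwrite the destination table on every run.
# Project, dataset, and table names are illustrative.
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")

TRANSFORM_SQL = """
SELECT listing_id, SUM(page_views) AS total_page_views
FROM `my-analytics-project.raw.listing_events`
GROUP BY listing_id
"""

job_config = bigquery.QueryJobConfig(
    destination="my-analytics-project.reporting.listing_interactions",
    # WRITE_TRUNCATE replaces the table contents, so rerunning the job with
    # unchanged inputs produces exactly the same output: idempotence.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

client.query(TRANSFORM_SQL, job_config=job_config).result()
```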
We applied this principle when building the campaign data pipeline that processes total consumer interaction metrics for each listing on the REA website. To calculate a running total, we could have retrieved the previous calculations and added today’s numbers on top. Though this approach sounded simple and efficient, the processing logic had to account for late-arriving data. We knew that our mobile application, due to the nature of offline event collection, was often late by a day or two. To account for these late-arriving data points, we would have needed to keep track of what had changed and invalidate previous calculations. An easier approach was to recalculate everything on every run instead of building on previous outputs. As we only needed to calculate consumer interactions for currently active listings on the REA website, our data volume was reasonably constant, and this approach was feasible.
By recalculating and overwriting the output on every run, we did not need to store external state in the form of previous calculations. We also gained these advantages:
- Ability to stop data pipelines halfway through to fix any issues and rerun them without side effects
- Quick turnaround time to support adding new business metrics
When overwriting an entire database table is not possible, we consider using the table’s daily partitions and overwriting only those portions. We apply this approach in our new data pipelines writing to BigQuery tables, as sketched below.
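A similar sketch for partition-level overwrites (again with illustrative names) writes to a single day’s partition via the `$YYYYMMDD` table decorator, so each daily run only replaces its own slice:

```python
# Sketch: overwrite only one day's partition of a date-partitioned table.
# Names are illustrative; the `$YYYYMMDD` decorator targets a single partition.
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")
run_date = "20240101"  # normally supplied by the scheduler

DAILY_SQL = f"""
SELECT listing_id, event_date, SUM(page_views) AS total_page_views
FROM `my-analytics-project.raw.listing_events`
WHERE event_date = PARSE_DATE('%Y%m%d', '{run_date}')
GROUP BY listing_id, event_date
"""

job_config = bigquery.QueryJobConfig(
    # Writing to `table$20240101` with WRITE_TRUNCATE replaces only that
    # partition, so reruns for a given day remain idempotent.
    destination=f"my-analytics-project.reporting.daily_interactions${run_date}",
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

client.query(DAILY_SQL, job_config=job_config).result()
```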
Modularity – breaking up large SQL files
When all the data is in a table format and the data warehouse has a lot of processing power, SQL becomes a compelling data transformation tool. In the past, we had a lot of processing logic in SQL files. They grew organically and became very large and complex, sometimes with more than a thousand lines of code in a single file. Large SQL files were unreadable and unmaintainable, especially when we were not the original authors. Approaching this problem with a modular mindset, we broke SQL transformation logic into user-defined functions (UDFs), intermediate tables, and views to encapsulate business logic in more manageable chunks.
Another benefit of modularity is reusability. Once we have smaller components, we can reuse them in related data processing and save on development cost.
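As a hedged illustration of what that modularity can look like on BigQuery (the function, view, and column names are invented for this example), shared business logic can live in a persistent UDF and a view that downstream steps select from:

```python
# Sketch: encapsulate shared logic in a UDF and a view instead of one huge SQL file.
# Dataset, function, and column names are illustrative.
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")

# A small, reusable piece of business logic as a SQL user-defined function.
client.query("""
CREATE OR REPLACE FUNCTION `my-analytics-project.reporting.is_active_listing`(status STRING)
RETURNS BOOL AS (status IN ('current', 'under_offer'));
""").result()

# A view that uses the UDF; downstream steps select from the view rather than
# repeating the filtering logic in every SQL file.
client.query("""
CREATE OR REPLACE VIEW `my-analytics-project.reporting.active_listing_events` AS
SELECT listing_id, event_date, page_views
FROM `my-analytics-project.raw.listing_events`
WHERE `my-analytics-project.reporting.is_active_listing`(listing_status);
""").result()
```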
Resiliency – dealing with bad records: close enough data vs. no data at all
Bad records always exist. We often see a batch job fail to process millions of rows because of a single bad one. This failure may be desirable, depending on the business rules and how strictly the output will be used, but we need to question and validate that assumption. It is more maintainable to build resilient data pipelines that can tolerate some errors and to have the ability to replay bad records after fixing them. To achieve this, we route the error rows into a separate table and set up monitoring so that we can fix the data. Depending on the urgency of the data, we either build a small step to process the corrected records or feed them to the data pipeline as input on the next run.
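The sketch below shows one way this routing could look in Python; the parsing logic, table names, and file path are all illustrative rather than our actual implementation:

```python
# Sketch: tolerate bad records by routing them to an error table instead of
# failing the whole batch. Names and parsing logic are illustrative.
import json
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")

def process_batch(raw_lines):
    good, bad = [], []
    for line in raw_lines:
        try:
            event = json.loads(line)
            good.append({"listing_id": event["listing_id"],
                         "page_views": int(event["page_views"])})
        except (ValueError, KeyError) as err:
            # Keep the raw payload and the reason so the record can be fixed
            # and replayed on a later run.
            bad.append({"raw_payload": line, "error": str(err)})
    return good, bad

good_rows, bad_rows = process_batch(open("events.jsonl"))

client.insert_rows_json("my-analytics-project.reporting.listing_interactions", good_rows)
if bad_rows:
    client.insert_rows_json("my-analytics-project.reporting.listing_interactions_errors", bad_rows)
```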
Test-driven – data quality
As with any software system, the quality of the data output by our pipelines is crucial. To make sure our SQL transformation logic is correct, we have developed a SQL unit testing process as part of our in-house ETL framework. In each continuous integration build, we run SQL transformations against seeded data and assert that the outputs are identical to the expected ones.
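Our in-house framework is not public, but a generic pytest-style sketch of the same idea (with invented dataset and column names) looks roughly like this: seed known rows, run the transformation SQL, and compare the result to the expected output.

```python
# Sketch of a SQL unit test: seed known input rows, run the transformation,
# and assert the output matches the expected rows. Names are illustrative;
# a real test would create and tear down its own fixture table.
from google.cloud import bigquery

def test_listing_interaction_totals():
    client = bigquery.Client(project="my-analytics-project")

    # Seed an input fixture into a scratch table used only by tests.
    seed = [
        {"listing_id": "L1", "page_views": 2},
        {"listing_id": "L1", "page_views": 3},
        {"listing_id": "L2", "page_views": 5},
    ]
    client.insert_rows_json("my-analytics-project.test_fixtures.listing_events", seed)

    # Run the transformation under test against the seeded data.
    rows = client.query("""
        SELECT listing_id, SUM(page_views) AS total_page_views
        FROM `my-analytics-project.test_fixtures.listing_events`
        GROUP BY listing_id
        ORDER BY listing_id
    """).result()

    actual = [(row.listing_id, row.total_page_views) for row in rows]
    assert actual == [("L1", 5), ("L2", 5)]
```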
When there is source data to compare against, we perform reconciliation checks of our output data against the source. In the case of Adobe Analytics, we query Adobe for the total event count it received and compare it to our totals.
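A hedged sketch of such a reconciliation check, with the source total passed in rather than fetched from Adobe’s API, and with an illustrative tolerance and table name:

```python
# Sketch of a reconciliation check: compare the warehouse total against the
# total reported by the source system. Names and tolerance are illustrative.
from google.cloud import bigquery

def reconcile_event_counts(source_total: int, tolerance: float = 0.001) -> None:
    client = bigquery.Client(project="my-analytics-project")
    row = next(iter(client.query("""
        SELECT COUNT(*) AS event_count
        FROM `my-analytics-project.raw.listing_events`
        WHERE event_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    """).result()))
    warehouse_total = row.event_count

    drift = abs(warehouse_total - source_total) / max(source_total, 1)
    if drift > tolerance:
        raise ValueError(
            f"Reconciliation failed: source={source_total}, "
            f"warehouse={warehouse_total}, drift={drift:.4%}"
        )
```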
When there is no reconciliation source, we perform analysis to capture the min, max, average, and trend lines of our data. We use Great Expectations to automatically validate our data outputs against a set of predefined expectations: uniqueness, not-null, and min/max ranges.
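A minimal sketch with Great Expectations, using the older pandas-dataset style API (method names may differ between versions, and the column names are illustrative):

```python
# Sketch of automated output validation with Great Expectations.
# Column names and thresholds are illustrative.
import great_expectations as ge
import pandas as pd

# In practice this dataframe would be read from the pipeline's output table.
output = pd.DataFrame({
    "listing_id": ["L1", "L2", "L3"],
    "total_page_views": [12, 7, 40],
})

dataset = ge.from_pandas(output)
dataset.expect_column_values_to_be_unique("listing_id")
dataset.expect_column_values_to_not_be_null("total_page_views")
dataset.expect_column_values_to_be_between("total_page_views", min_value=0, max_value=100000)

results = dataset.validate()
if not results.success:
    raise ValueError(f"Data quality checks failed: {results}")
```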
Traceability – data lineage
Data lineage describes the flow of data from its source through the transformation steps to the output. Having a data lineage tool is analogous to having an IDE for code: just as an IDE lets us find every usage of a variable across a project and confidently rename or delete it, a lineage tool lets us do the same for data assets.
Data lineage has many benefits: it helps with governance, compliance, data quality, and business impact analysis. One use case is when two dashboards show different figures; with a data lineage tool, we can efficiently track down the source of the discrepancy. Another is data decommissioning: we can confidently delete unused data assets after checking their usage.
While there are commercial tools for data lineage, support for BigQuery is still limited. We are extracting BigQuery usage logs to build our own data lineage in the Neo4j graph database.
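One possible sketch of that extraction, using INFORMATION_SCHEMA job metadata as the usage log and the Neo4j Python driver (region, project, and credentials are placeholders):

```python
# Sketch: derive table-to-table lineage edges from BigQuery job metadata and
# load them into Neo4j. Region, project, and credentials are placeholders.
from google.cloud import bigquery
from neo4j import GraphDatabase

bq = bigquery.Client(project="my-analytics-project")
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

JOBS_SQL = """
SELECT destination_table, referenced_tables
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE job_type = 'QUERY'
  AND destination_table.table_id IS NOT NULL
  AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""

with driver.session() as session:
    for job in bq.query(JOBS_SQL).result():
        dest = f"{job.destination_table['dataset_id']}.{job.destination_table['table_id']}"
        for src in job.referenced_tables or []:
            source = f"{src['dataset_id']}.{src['table_id']}"
            # MERGE keeps nodes and edges unique no matter how often we reload.
            session.run(
                "MERGE (s:Table {name: $source}) "
                "MERGE (d:Table {name: $dest}) "
                "MERGE (s)-[:FEEDS]->(d)",
                source=source, dest=dest,
            )
```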
Managing the Data Warehouse
While building data pipelines is necessary, it is only half of the picture. Ultimately the data is stored in the data warehouse, which we also need to manage effectively.
Simplification – Star schema vs. Denormalised tables
In the star schema approach, we organise the data warehouse into fact tables whose foreign keys link to dimension tables. A fact table’s rows do not hold all the information by themselves; we must join with the dimension tables to get the values.
Organising data in a star schema has certain advantages over the denormalised version: data integrity, efficiency of data updates, and disk space savings. However, data processing jobs become more complex: we have to normalise data into dimension tables on write and join tables on read.
Denormalisation is the opposite of the star schema approach: we store data in a flat structure. In our raw tables in BigQuery, we store data as it arrives without much processing. In the later transformation steps, we apply business logic and enrich the data rather than normalise it into dimensions. By using a denormalised data model, we simplify our processing and retain the history of the data as it was at ingestion time. The downside is an increase in storage cost, which is small in BigQuery.
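To illustrate the difference in read patterns (with invented table and column names), the same metric looks like this under each model:

```python
# Sketch contrasting the two read patterns; all names are illustrative.

# Star schema: the fact table holds keys and measures, so reads join dimensions.
STAR_SCHEMA_SQL = """
SELECT a.suburb, SUM(f.page_views) AS total_page_views
FROM `dw.fact_listing_events` AS f
JOIN `dw.dim_listing` AS l ON f.listing_key = l.listing_key
JOIN `dw.dim_address` AS a ON l.address_key = a.address_key
GROUP BY a.suburb
"""

# Denormalised: the enriched table already carries the attributes we need.
DENORMALISED_SQL = """
SELECT suburb, SUM(page_views) AS total_page_views
FROM `dw.listing_events_enriched`
GROUP BY suburb
"""
```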
We are not suggesting denormalising and flattening all data. We need to design the data model to support the business use cases adequately.
Untangling – avoiding having all tables in one spaghetti pot
We built our data warehouse using a single schema containing all tables and views. While this simplified access control and let us join any tables, we ended up with a ‘one big spaghetti pot’ problem. When we decommissioned a table, we had to check all our processing steps, stored procedures, and views to make sure nothing still referenced it.
A better approach is to organise tables into their domains (aka data marts). In this design, we can separate data marts and data pipelines into parallel streams rather than cross-referencing them. When we no longer use a data mart, we can safely delete the entire data mart and its processing jobs without affecting anything else.
Pruning dead branches – decommission when no longer used
As business requirements change, some tables and data processing jobs are no longer needed. Often people do not tell you this, as they have moved to a different division or are no longer with the company. We need to keep track of the usage log for every table in the data warehouse to help with decommissioning efforts. Frequent cleaning saves processing power and storage space, shortens the duration of batch jobs, and, most importantly, reduces the cognitive load on the people maintaining the data warehouse.
BigQuery has a built-in usage log. We have built a dashboard to visualise this data and keep track of low-usage tables for decommissioning purposes. Using the data lineage technique mentioned above, we can safely and confidently delete unused tables.
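A sketch of the kind of usage query behind such a dashboard, with illustrative region and project names:

```python
# Sketch: count how often each table has been referenced recently, to spot
# decommissioning candidates. Region and project names are illustrative.
from google.cloud import bigquery

client = bigquery.Client(project="my-analytics-project")

USAGE_SQL = """
SELECT
  CONCAT(ref.dataset_id, '.', ref.table_id) AS table_name,
  COUNT(*) AS reference_count,
  MAX(creation_time) AS last_referenced
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT,
  UNNEST(referenced_tables) AS ref
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
GROUP BY table_name
ORDER BY reference_count ASC
"""

for row in client.query(USAGE_SQL).result():
    print(row.table_name, row.reference_count, row.last_referenced)
```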
Data engineering is a challenging field due to the sheer volume of data involved, the myriad technology options and vendors, and the wild claims of silver-bullet solutions. We believe that data pipelines and data warehouses should be built and run with the same high level of software engineering practice that we apply to any other software system at REA – modular, testable, safe to deploy, easy to roll back, and fun to work on. We have been using the techniques outlined in this blog post to achieve that goal.
Linh is a Lead Developer working on building data ingestion pipelines and on publishing data for internal REA usage via data APIs. He loves exploring data flows and interdependencies between systems to improve their design.