I presented a paper at SAS Global Forum: Orchestrating analytical workloads on Kubernetes powered by GKE on GCP. The link to my paper is SAS on Kubernetes, and my presentation is at https://www.slideshare.net/SumanthYamala/sas-on-kubernetes
At Core Compete we enable analytic workloads on containers. SAS on Kubernetes and SAS on Containers articulate our approach.
Cloud Data Warehouse
Describes cloud data warehousing: the processes involved in building a cloud data warehouse - data extraction, data validation, building data pipelines, orchestration engines, and monitoring of data pipelines.
Saturday, May 4, 2019
Saturday, January 19, 2019
Idempotency in data pipelines
Every data pipeline will experience a failure; there are no exceptions. How a pipeline handles these failures and recovers gracefully is what differentiates a well designed data pipeline from a poorly designed one. Many applications in the cloud need to be designed to behave in a predictable way. Idempotency in data pipelines is the focus of this blog.
Idempotence, according to Wikipedia, is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application.
Data pipelines are a sequence of steps to transform, and in some cases migrate, data. There are applications in the cloud marketplace that help us create these pipelines - Airflow, AWS Glue, GCP Dataflow, and Azure Data Factory, to name a few. Data pipelines involve integration across disparate systems and may involve processing engines like EMR or Dataproc, with the purpose of transforming and moving data. These pipelines have multiple points of failure, and it becomes imperative to plan for failures to ensure data integrity is maintained.
The option to commit or roll back the entire pipeline as one transaction never exists; almost all big data systems do not support multi-statement transactions. The ideal way to handle recovery from failures in a data pipeline is to enforce idempotent behavior, i.e. the pipeline can be called with the same set of parameters any number of times and the output will still be the same. Many of the data orchestration applications that run these pipelines nudge us to design in an idempotent way, but it is not guaranteed. It needs to be ensured by the developer designing the pipelines.
Points of Failures:
- The pipeline failed before it started
- The pipeline failed halfway through
- The pipeline completed but it failed to return a success code
A data pipeline's design needs to accommodate these failures, handle them gracefully, and self-heal. Relying on manual intervention to solve these problems increases the cost of supporting the application and is not sustainable. In many cases, asking the "what if" question helps us think through the problem.
We will go over two scenarios. Scenario 1 highlights issues that can arise even when the flow is idempotent. Scenario 2 shows how a minor variation in design can make a flow idempotent.
Scenario 1:
Let's take a simple pipeline that looks like the following:
- A RESTful data API that extracts data from a third party data source (daily frequency)
- Write the data from the REST API to Cloud Storage (GCS)
- Read data from GCS and apply Spark transforms
- Load transformed data into BigQuery (BQ)
There are multiple approaches to ensure the design of a pipeline is idempotent. One needs to consider processing time, volume of data and, most importantly, the properties of the underlying storage sub-system.
Approach 1:
1) GET data from API
2) Put data on GCS with a naming convention gs://inbound_data/data_<date>.csv
3) Truncate partition and load data into BQ partition
In this approach, when all things work as planned there are no issues. But what if step 3 fails after the truncate? The systems querying BQ (typically Tableau or Looker) will not have any data until we re-run the entire flow. The sequence of operations matters - remember, there is no transaction across statements.
Approach 2:
1) GET data from API
2) Put data on GCS with a naming convention gs://inbound/data_<date>.csv
3) Apply spark transforms on input gs://inbound/data_<date>.csv
4) Overwrite the data in the BigQuery partition
Both approaches are idempotent, but approach 1 could lead to a situation where a downstream application has no data until we re-run the entire flow to fix it.
REST API idempotency:
The behavior of third party REST APIs varies. One needs to understand whether the REST APIs are implemented in an idempotent way (not the case in most scenarios). In most cases, one needs to study the API and identify the parameters that govern the data being returned, then test and validate. There is no silver bullet here.
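As a minimal sketch (the endpoint and parameter names are hypothetical), one way to make the extract deterministic is to always request an explicit business date rather than "latest":

import requests

def fetch_daily_extract(run_date):
    # Hypothetical endpoint and parameter names; pinning the request to an explicit
    # business date makes repeated calls for the same date return the same payload.
    response = requests.get(
        "https://api.example.com/v1/daily-extract",
        params={"date": run_date},   # e.g. "2019-01-19"
        timeout=60,
    )
    response.raise_for_status()
    return response.text

csv_payload = fetch_daily_extract("2019-01-19")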
GCS write idempotency:
Here we use a characteristic of object storage to our advantage, namely that it treats an object write as transactional, by coming up with a naming strategy. Re-running the write step merely overwrites the existing file rather than creating a new one.
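A minimal sketch of such a write, assuming the google-cloud-storage Python client and the bucket and naming convention above (both illustrative); re-running the step overwrites the same object:

from google.cloud import storage

def write_daily_object(csv_payload, run_date):
    # Deterministic object name: the same run date always maps to the same object,
    # so a re-run overwrites gs://inbound_data/data_<date>.csv instead of adding a new file.
    client = storage.Client()
    bucket = client.bucket("inbound_data")           # bucket name is illustrative
    blob = bucket.blob("data_{}.csv".format(run_date))
    blob.upload_from_string(csv_payload, content_type="text/csv")

write_daily_object("id,name\n1,alice\n", "2019-01-19")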
Spark transforms idempotency:
Spark transforms produce a predictable and deterministic output, provided the input is the same. The idempotent GCS write above ensures the input is the same.
BigQuery write idempotency:
BigQuery lets us overwrite a partition, assuming the partition key is based on the <date> column.
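A minimal sketch of an idempotent partition load, assuming a date-partitioned table and the google-cloud-bigquery Python client (project, dataset and table names are illustrative); WRITE_TRUNCATE against the partition decorator replaces only that day's partition, so a re-run converges to the same state:

from google.cloud import bigquery

def load_partition(run_date):
    # run_date "2019-01-19" -> partition decorator "20190119"
    partition = run_date.replace("-", "")
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    # The $decorator targets a single partition; only that partition is replaced.
    uri = "gs://inbound_data/data_{}.csv".format(run_date)
    load_job = client.load_table_from_uri(
        uri, "my_project.analytics.daily_sales${}".format(partition), job_config=job_config)
    load_job.result()  # wait for completion

load_partition("2019-01-19")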
Scenario 2:
Consider an intraday pipeline that reads data from an OLTP system and puts it into HDFS every n minutes.
Approach 1 ( not idempotent):
1) Read data from the OLTP database using Spark JDBC where timestamp > <lastTimeIRan>
2) Apply Spark transform
3) Write to HDFS
4) Update variable <lastTimeIRan>
Approach 2 (idempotent):
1) Read from HDFS and identify maxTimeStamp in the data
2) Read data from the OLTP database using Spark JDBC where timestamp > maxTimeStamp
3) Write to HDFS
The second approach is idempotent: you can run it any number of times, and the system can break down at any step, yet data integrity is guaranteed.
In the first approach, however, if there is a failure between step 3 and step 4 (the variable <lastTimeIRan> never got updated), the next run will end up with duplicate records and the data gets corrupted, which in turn corrupts downstream data as well. It only takes one bad design to corrupt your entire downstream data pipeline and the analytics or business intelligence that comes out of it.
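A minimal PySpark sketch of Approach 2, with illustrative connection details, paths and column names; because the high-water mark is derived from the target data itself, a failed run can simply be re-run:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("intraday_ingest").getOrCreate()

# Connection details and paths are illustrative placeholders
jdbc_url = "jdbc:postgresql://oltp-host:5432/sales"
target_path = "hdfs:///data/warehouse/orders"

# 1) Derive maxTimeStamp from the data already landed on HDFS
#    (a first run would need a seed value; omitted here for brevity)
existing = spark.read.parquet(target_path)
max_ts = existing.agg(F.max("updated_at").alias("max_ts")).collect()[0]["max_ts"]

# 2) Read only the rows newer than what is already on HDFS
incremental = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("user", "etl_user")
    .option("password", "secret")
    .option("dbtable",
            "(select * from oltp.orders where updated_at > '{}') t".format(max_ts))
    .load())

# 3) Append to HDFS; the filter above keeps a re-run from creating duplicates
incremental.write.mode("append").parquet(target_path)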
Idempotent design helps:
1) Operational stability of the solution
2) Building an automated system
3) Ensuring data integrity
4) A good night's sleep!
Summary:
Idempotency helps in graceful recovery from failures in data pipelines. Understanding the storage subsystem (HDFS, object storage, databases, cloud data warehouses, SAS) helps in designing an idempotent pipeline. Data pipelines that do not adhere to an idempotent design result in significantly higher operational costs, manual interventions and, most importantly, questionable data integrity. An additional thing to consider when designing data pipelines is avoiding wasteful reprocessing of data.
At Core Compete we build idempotent and reliable data pipelines by going through a checklist.
Wednesday, December 26, 2018
Spark JDBC vs Sqoop (Use Spark JDBC)
In one of my recent projects at Core Compete, which involved data warehouse modernization and transitioning the customer's on-premise data warehouse to the cloud, data ingestion was a key component - creating a data lake on HDFS. The underlying platform is HDP. Various high performance data transforms were developed using PySpark to transform data read from the data lake. A custom tool was built to orchestrate incremental and full data loads as described in this page, with some variation (prefer Apache Airflow). This article focuses on my experience using Spark JDBC to enable data ingestion. The same approach can be used for cloud data warehouse migration.
Sqoop (http://sqoop.apache.org/) is a popular tool used to extract data in bulk from a relational database to HDFS. Sqoop is a wrapper around the JDBC process. Using Sqoop, we ran into a few minor issues:
- The version we used did not support ORC format
- Timestamps needed further data processing
- Additional step needed to convert from AVRO to ORC
While the above issues were no big obstacles, the key issue we had was having a separate process. The most efficient approach is to have the data ingest process more integrated with the data transforms developed in Spark, so it can leverage the data while in memory to apply additional transforms like Type 2 SCD. The team at Core Compete designed a reusable framework that downloads data efficiently, in parallel, and lays down the data on HDFS using a partitioning strategy that helps accelerate the business transforms, applying a Type 2 SCD if the table needs one.
It is very important to remember that Spark JDBC/Sqoop will not be comparable in performance to a native database utility like TPT for Teradata, so those options need to be considered and evaluated.
It is also very important to understand the different parameters in Spark JDBC and what they mean when using the load function in Spark.
A load statement will look like the following (an illustration in PySpark):
#df1
df = spark.read.format("jdbc")\
    .option("url", jdbc_url)\
    .option("driver", <driver name>)\
    .option("user", <user>)\
    .option("password", <password>)\
    .option("dbtable", "(select * from dm.employee) as emp")\
    .load()
Note: The dbtable option can also be a SQL query, not necessarily a table. Ensure you enclose the SQL in parentheses and give it an alias.
The above statement will open a single connection to the database and extract the data, which can be very, very slow.
So how do we change that?
- Analyze the table or the data being extracted.
- Is there a key like employee_id with a reasonably uniform distribution, essentially a key that ensures the data is not skewed?
- Once the key is identified, identify its lower and upper bounds; for example, the first employee id is 1 and the max employee id is 100.
- Set these values as the lower and upper bounds below.
- Set the partitionColumn to be the key (employee_id).
- numPartitions determines two things:
  - the maximum number of parallel JDBC connections that will be fired, and
  - the number of Spark partitions the data is read into (and typically written to HDFS with).
- Be careful that the database can handle these concurrent connections; check with your DBA.
- Set the lower and upper bounds based on the partition key range.
#df2
df = spark.read.format("jdbc")\
    .option("url", jdbc_url)\
    .option("driver", <driver name>)\
    .option("user", <user>)\
    .option("password", <password>)\
    .option("dbtable", "(select * from dm.employee) as emp")\
    .option("numPartitions", 20)\
    .option("partitionColumn", "employee_id")\
    .option("lowerBound", 1)\
    .option("upperBound", 20000)\
    .load()
Note: The above statement fires 20 queries (potentially all concurrent) to extract data from the employee table. Each query will have a predicate on the partition column appended to it.
Spark internally breaks up the query along the partition column, roughly:
select * from (<sql>) where employee_id >= 1 and employee_id < 1001 --> mapper 1
select * from (<sql>) where employee_id >= 1001 and employee_id < 2001 --> mapper 2
and so on, until mapper 20. Note that lowerBound and upperBound only control how the range is split; the first and last partitions are left unbounded, so rows outside the range are still read.
Almost there. We might still have a problem: what happens if the lower and upper bounds are dynamic, i.e. the employee ids are not static?
How about we identify the min and max values with a query like
(select min(employee_id) as min_val, max(employee_id) as max_val from dm.employee) t
and use it as the value for the "dbtable" option in a separate bounds query:
bounded_df = spark.read.format("jdbc")\
    .option("url", jdbc_url)\
    .option("driver", <driver name>)\
    .option("user", <user>)\
    .option("password", <password>)\
    .option("dbtable", "(select min(employee_id) as min_val, max(employee_id) as max_val from dm.employee) t")\
    .load()
bounded_values = bounded_df.collect()[0]
lower_bound = bounded_values["min_val"]
upper_bound = bounded_values["max_val"]
Use these values in #df2 for upper and lower bounds:
df = spark.read.format("jdbc")\
    .option("url", jdbc_url)\
    .option("driver", <driver name>)\
    .option("user", <user>)\
    .option("password", <password>)\
    .option("dbtable", "(select * from dm.employee) as emp")\
    .option("numPartitions", 20)\
    .option("partitionColumn", "employee_id")\
    .option("lowerBound", lower_bound)\
    .option("upperBound", upper_bound)\
    .load()
With the approach described, one can use Spark JDBC as a framework to ingest data.
The entire data ingest process was developed as a component driven by configuration identifying all the meta information (table, partitioning keys, data partition key on HDFS, Type 2 SCD). Having a component like this accelerates the creation of a data lake!
The target system can be HDFS, S3 or GCS.
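As an illustration of what such a configuration-driven component can look like (the configuration keys, table names and helper below are hypothetical sketches, not the actual Core Compete framework):

# Hypothetical configuration for one table; the framework loops over many such entries
table_config = {
    "source_sql": "(select * from dm.employee) as emp",
    "bounds_sql": "(select min(employee_id) as min_val, max(employee_id) as max_val from dm.employee) t",
    "partition_column": "employee_id",
    "num_partitions": 20,
    "target_path": "hdfs:///lake/dm/employee",   # could equally be an s3:// or gs:// path
}

def ingest_table(spark, jdbc_url, driver_name, user, password, cfg):
    # Discover the current bounds of the partition column
    bounds = (spark.read.format("jdbc")
              .option("url", jdbc_url).option("driver", driver_name)
              .option("user", user).option("password", password)
              .option("dbtable", cfg["bounds_sql"])
              .load()
              .collect()[0])
    # Partitioned, parallel extract driven entirely by the configuration
    df = (spark.read.format("jdbc")
          .option("url", jdbc_url).option("driver", driver_name)
          .option("user", user).option("password", password)
          .option("dbtable", cfg["source_sql"])
          .option("partitionColumn", cfg["partition_column"])
          .option("numPartitions", cfg["num_partitions"])
          .option("lowerBound", bounds["min_val"])
          .option("upperBound", bounds["max_val"])
          .load())
    # Overwriting the target path keeps re-runs of a full load idempotent
    df.write.mode("overwrite").parquet(cfg["target_path"])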
Other things to consider as part of the data ingest process, which we address for our customers as reusable components:
- Idempotency is a very important attribute that all modules need to adhere to, especially when you are extracting data onto HDFS. This needs to be incorporated in the design: what happens when we run an incremental load twice - does the data get corrupted with duplicates? An idempotent design ensures data is not corrupted.
- Data validation from the source data warehouse to HDFS is needed to ensure data is consistent. Numerical and statistical validation, including sampling techniques, needs to be built.
- Type 2 SCD - in this specific scenario it was a fast changing dimension, so we had to come up with an approach to do this in parallel and efficiently in Spark; a minimal sketch follows this list.
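A minimal PySpark sketch of a Type 2 SCD merge, under assumed column names (customer_id as the business key, address as the tracked attribute) and illustrative paths; this is one possible approach, not the exact framework used on the project:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("type2_scd").getOrCreate()

# The dimension carries is_current / eff_date / end_date; the staging snapshot
# carries the same business columns (customer_id, address).
dim = spark.read.parquet("hdfs:///lake/dim_customer")
inc = spark.read.parquet("hdfs:///lake/staging/customer")

current = dim.filter(F.col("is_current") == 1)

# Business keys whose tracked attribute changed, or that are brand new
changed = (inc.alias("i")
           .join(current.alias("c"), "customer_id", "left")
           .filter(F.col("c.address").isNull() | (F.col("c.address") != F.col("i.address")))
           .select("customer_id"))

# Close out the current versions of the changed keys
closed = (current.join(changed, "customer_id", "inner")
          .withColumn("is_current", F.lit(0))
          .withColumn("end_date", F.current_date()))

# Open new versions for the changed keys
new_rows = (inc.join(changed, "customer_id", "inner")
            .withColumn("is_current", F.lit(1))
            .withColumn("eff_date", F.current_date())
            .withColumn("end_date", F.lit(None).cast("date")))

# History rows and unchanged current rows stay untouched
untouched = dim.join(changed, "customer_id", "left_anti")

# Write to a new location and swap afterwards, rather than overwriting the path being read
result = untouched.unionByName(closed).unionByName(new_rows)
result.write.mode("overwrite").parquet("hdfs:///lake/dim_customer_new")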