Wednesday, July 31, 2013

Porting the Microsoft BI stack to Amazon Redshift - part 6


Over the previous 5 posts we covered the phase 1 tasks specific to the source SQL Server user tables, Sqoop, Avro, Pig and Redshift. Now we need to address CRUD (Create-Read-Update-Delete) transactions, as well as the business requirement that our Redshift databases have referential integrity. As previously noted, though Redshift supports the definition of primary key, foreign key and uniqueness constraints, it does not enforce them. Consequently, any consumer of Redshift whose databases must have referential integrity must implement their own mechanisms for enforcing these constraints across the data sets managed by Redshift.

CRUDer Data Services

As noted previously, our source SQL Server data sets are sharded across numerous user databases. This fact has a direct bearing on how data sets are loaded from S3 into Redshift. Moreover, when loading data into Redshift it is critically important to leverage Redshift's massively parallel processing (MPP) architecture to read and load those source data files. 

The most efficient (and therefore most cost-effective) way to load data into Redshift is to use its COPY command. The COPY command can read multiple data files simultaneously and perform the load process in parallel. Taking advantage of these features is therefore critically important when loading the baseline data sets into Redshift. The benefits of the COPY command are equally desirable when loading an incremental data set, though there we must also handle updates and deletes. Let's investigate the loading of baseline data sets first.
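As a sketch, a baseline load via COPY might look like the following. The bucket, table name and credential placeholders are hypothetical; the '|' delimiter matches the output of our Pig scripts, and pointing COPY at an S3 prefix lets it read all the files under that prefix in parallel:

```sql
-- Load every file under the baseline prefix for this table in parallel.
-- <key> and <secret> are placeholders for your AWS credentials.
COPY orders
FROM 's3://my-bi-bucket/baseline/orders/'
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
DELIMITER '|';
```

Because the prefix contains many files (one or more per shard), each Redshift slice can work on its own file, which is exactly the MPP behavior we want.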

When the Sqoop baseline load scripts completed, multiple data files were generated for each shard of a given user table. Though having multiple data files to feed into the COPY command lets us take advantage of the command's MPP read/load capabilities, we cannot be certain that the records in those files are in sort key order, nor can we be certain that there are no duplicate records across the shards. Handling these issues is the responsibility of the CRUDer data services, specifically its BaselineLoader service.

BaselineLoader Service

The BaselineLoader service is implemented as three mechanisms: a 'De-duplicate' Pig job, a 'Total Order Sort' Elastic MapReduce job (written in Java), and a generated Redshift COPY script (note: we will be using Oozie to schedule and coordinate the sequential execution of these mechanisms). In the last blog I explained why Pig is needed to change the Avro formatted data files into a delimited format compatible with Redshift. And now that you understand the need to guarantee that duplicate records are not uploaded into Redshift tables, I can explain another important feature of these generated phase 1 Pig scripts.

First, the Pig script reads the Avro formatted data files, stored in their S3 bucket, using the LOAD operator. Next, it invokes the DISTINCT operator to remove duplicate records. Unlike SQL's DISTINCT, which can operate on individual columns, Pig's DISTINCT works on the entire record. These de-duplicated record sets are then stored back into S3, but this time using PigStorage (not Avro), with the record fields delimited by a '|'. Please note that at this point we have a collection of unique, unsorted records, but that is not to assert that these records comply with the uniqueness constraints bound to their target Redshift table: more on this later. Where politically feasible, it is worthwhile providing reports on duplicate records to the parties responsible for the data quality of the source SQL Server user tables.
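The generated script boils down to three statements. The bucket paths and table name below are hypothetical, and the AvroStorage loader shown is the piggybank one that ships with Pig:

```pig
-- Read the Avro files Sqoop produced for every shard of this table
raw = LOAD 's3://my-bi-bucket/avro/orders/*'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage();

-- DISTINCT compares whole records, dropping exact duplicates across shards
unique = DISTINCT raw;

-- Write back as '|'-delimited text, the format Redshift's COPY expects
STORE unique INTO 's3://my-bi-bucket/delimited/orders'
      USING PigStorage('|');
```

The whole-record semantics of DISTINCT are why this step catches cross-shard duplicates but says nothing about column-level uniqueness constraints.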

Loading our sharded data sets in sort key order is necessary if we are concerned about query performance and wish to avoid the cost of a VACUUM process. The difficulty is ensuring that the records are sorted 'globally' across all source data files: that is the goal of the 'Total Order Sort' job. In this context, records are considered sorted globally if, were all source files concatenated into a single file, the data would be in sort key order from record to record. We are not, of course, going to literally concatenate the source files; the concatenation is simply a way of picturing the requirement.

The Total Order Sort job has two phases. In the first phase the range of data values that will produce equal-sized subsets of data has to be computed. In the second phase the data is actually sorted globally across all output data files. This is a rather complicated MapReduce job, certainly too complicated to cover in a blog posting, and so it is recommended that you read all about this type of MapReduce design pattern in Donald Miner's and Adam Shook's most excellent book 'MapReduce Design Patterns.'
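While the full job is out of scope here, the two phases map directly onto Hadoop's InputSampler and TotalOrderPartitioner classes. The driver sketch below assumes the Hadoop 2 MapReduce API and records keyed by the table's sort key; input/output paths, formats and the identity map/reduce wiring are elided:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSortDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "total-order-sort");
        job.setJarByClass(TotalOrderSortDriver.class);
        // Input/output paths, formats and identity map/reduce setup elided.

        // Phase 1: sample the input to compute split points that yield
        // roughly equal-sized partitions, written to a partition file.
        Path partitionFile = new Path(args[0]);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        // Phase 2: each reducer receives one contiguous key range; the
        // shuffle sorts within each range, so the numbered output files,
        // taken in order, form one globally sorted data set.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The partition file is what ties the two phases together: phase 1 writes the key-range boundaries, and the partitioner reads them in phase 2 to route each record to the correct reducer.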

Now we have data files which are guaranteed not to contain duplicate records, with all records in sort key order. At this point, to save on storage costs, you'll want to automatically delete the input files to the Total Order Sort job. And, by this point, you have already executed the corresponding table DDL within Redshift, and granted the appropriate permissions to users and groups on that table. The last step in the BaselineLoader service is to execute the generated COPY command which bulk-loads the baseline data set into the target Redshift table.
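For reference, the table DDL and grant might look like the following. The table, columns and group name are hypothetical; the important point is that the SORTKEY must match the key used by the Total Order Sort job, so the COPY arrives pre-sorted:

```sql
-- Hypothetical target table; SORTKEY matches the Total Order Sort key
CREATE TABLE orders (
    order_id    BIGINT        NOT NULL,
    customer_id BIGINT        NOT NULL,
    order_date  DATE,
    total       DECIMAL(12,2)
)
DISTKEY (order_id)
SORTKEY (order_id);

GRANT SELECT ON orders TO GROUP bi_readers;
```

Remember that the PRIMARY KEY clause could also be declared here, but as discussed above, Redshift will not enforce it.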

IncrementalLoader Service

As you recall from the first blog of this series, after the baseline data sets are uploaded into Redshift, we need to periodically upload the incremental data sets as well. It is the sole purpose of the IncrementalLoader service to handle the incremental data sets.

This requirement to upload incremental data sets opens up a whole other can of worms. Fortunately, we are using Microsoft SQL Server's Change Data Capture (CDC) feature to identify the updated, newly inserted and deleted records, and we have generated Sqoop scripts which output '|' delimited data files into S3 for each category of record.

The newly inserted records are processed in the same manner as the baseline loads: remove duplicate records, put the records in globally sorted order, and COPY them into the Redshift table. However, updated records and deleted records require different processing.

To process the updated records, first create an 'update' table in Redshift which will be used to store the updated records. Next, execute the generated COPY command which loads those updated records into the 'update' table. Third, execute the generated UPDATE script. The UPDATE script contains a DML statement which updates the target Redshift table based on the result of a subquery (of the update table) in the WHERE clause. Lastly, truncate the update table.
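A sketch of the four steps, using hypothetical table names and the same credential placeholders as before. Note this sketch uses Redshift's UPDATE ... FROM join form, which is equivalent to the generated subquery form described above:

```sql
-- 1. Staging table with the same DDL as the target
CREATE TABLE orders_update (LIKE orders);

-- 2. Bulk-load the updated records from S3
COPY orders_update
FROM 's3://my-bi-bucket/incremental/orders/updates/'
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
DELIMITER '|';

-- 3. Apply the updates, matching on the primary key
UPDATE orders
SET customer_id = u.customer_id,
    order_date  = u.order_date,
    total       = u.total
FROM orders_update u
WHERE orders.order_id = u.order_id;

-- 4. Empty the staging table for the next incremental run
TRUNCATE orders_update;
```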

To process the deleted records, first create a 'delete' table in Redshift which will be used to store the deleted records. Next, execute the generated COPY command which loads those deleted records into the 'delete' table. Third, execute the generated DELETE script. The DELETE script contains a DML statement which includes the USING keyword and a WHERE clause. For example, 'delete from target_table using delete_table where target_table.primary_key=delete_table.primary_key'. Lastly, truncate the delete table.
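Fleshing out those four steps with the same hypothetical names gives:

```sql
-- 1. Staging table with the same DDL as the target
CREATE TABLE orders_delete (LIKE orders);

-- 2. Bulk-load the deleted records from S3
COPY orders_delete
FROM 's3://my-bi-bucket/incremental/orders/deletes/'
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
DELIMITER '|';

-- 3. Remove the matching rows from the target, joining on the primary key
DELETE FROM orders
USING orders_delete
WHERE orders.order_id = orders_delete.order_id;

-- 4. Empty the staging table for the next incremental run
TRUNCATE orders_delete;
```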

Now, since the DDL of the update table and the delete table is identical to the target table, one can readily use a single table which functions as both the update and the delete table. It's your choice. But after any update or delete DML handling an incremental data load, we must, once the IncrementalLoader service has completed its three tasks, execute the generated VACUUM script (for reasons previously explained) to ensure consistent query performance.
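The generated script amounts to the following (table name hypothetical); ANALYZE is included here as well, since stale planner statistics after a large incremental load hurt query plans just as unsorted rows do:

```sql
-- Re-sort rows and reclaim space left by the UPDATE and DELETE steps,
-- then refresh the query planner's statistics
VACUUM orders;
ANALYZE orders;
```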

In the next blog we will cover the RIValidator Service, which enforces primary key, foreign key and uniqueness constraints. In case you're curious, the 'RI' stands for referential integrity.
