Sunday 11 November 2012

Implementing Change Data Capture Part 3 – Creating an SSIS Package



In the last post I covered creating wrapper functions for use within an SSIS package, along with the staging tables in the data warehouse. This post deals with creating the SSIS package itself.

As I mentioned in my first post about Change Data Capture (CDC), SQL Server Books Online has a very good step-by-step how-to on setting up CDC and the related SSIS package to perform incremental loads to a data warehouse. My series of posts is meant to make that setup and implementation a little easier by somewhat automating the script and object generation process.




I’ve created a sample dtsx file that you can download. Below I will take you through the various control flow items and how to generate the SQL and C#/VB.NET scripts needed for each. However, I don’t intend to explain in detail how to create the package; please see the MSDN or BOL help files for a detailed, step-by-step guide.
The package comprises the following steps broken out into two main sections:


1. Control Flow items that are run once for the entire CDC incremental load:
a. Truncating any existing staging tables (only if using the ‘all with merge’ option in the wrapper functions).
b. Calculating the data retrieval interval – essentially you are setting the start and end dates and times of the data you want to load. In my example I’m grabbing 4 hours’ worth of data between 6 and 2 hours ago.
c. Checking whether the data is ready – due to latency and other transaction-related lags or delays, there is a chance that not all the data will be ready for loading. So this package has a built-in loop that checks for the data a certain number of times before giving up. This is also why I’ve added the 2-hour padding into the retrieval interval above.
2. Control Flow items that need to be created for each capture instance. These can also be set up as independent packages launched by the package steps above. However, I’m sticking with doing everything within one SSIS package.
a. Dynamically preparing the query to retrieve data from each capture instance.
b. Retrieving the data and loading it by either
i. Applying the insert, update and delete changes (if ‘all with merge’ IS NOT being used), or
ii. Applying the delete and merge changes (if ‘all with merge’ IS being used).
3. The following variables will need to be added at the package scope level. The names need not be exactly as stated below, but these are what I used in my sample package:
a. DataReady Int32
b. DelaySeconds Int32
c. ExtractStartTime DateTime
d. ExtractEndTime DateTime
e. IntervalID Int32
f. SqlDataQuery String
g. TimeoutCeiling Int32
h. TimeoutCount Int32


Section 1
1. Truncate staging tables
a. Create an ‘Execute SQL Task’ Control Flow item in the SSIS package.
b. Run the following script on the source database and add the resulting TRUNCATE TABLE statements into the Execute SQL Task:
select 'TRUNCATE TABLE stg' + t.name
from sys.tables as t
where is_tracked_by_cdc = 1
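For example, assuming two tracked source tables named JobCandidate and Employee, with staging tables following the stg prefix convention from the previous post, the script would return statements like:

TRUNCATE TABLE stgJobCandidate
TRUNCATE TABLE stgEmployee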
2. Calculating the interval for the incremental load – create an ‘Execute SQL Task’ with the ResultSet set to Single row. Map the result set columns ExtractStartTime and ExtractEndTime to the variables User::ExtractStartTime and User::ExtractEndTime respectively, and use the following query:
SELECT CAST(CONVERT (CHAR(15),DATEADD(hh,-6, GETDATE()),113) + '00:00' AS DATETIME) AS ExtractStartTime,
CAST(CONVERT (CHAR(15),DATEADD(hh,-2, GETDATE()),113) + '00:00' AS DATETIME) AS ExtractEndTime
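The CHAR(15) truncation of the style-113 date string is what rounds each timestamp down to the whole hour. A minimal worked example (the date is illustrative):

-- Suppose GETDATE() returns '20 Nov 2012 14:35:12'
SELECT CONVERT(CHAR(15), DATEADD(hh, -6, GETDATE()), 113)
-- returns '20 Nov 2012 08:' (the first 15 characters of the style-113 string)
SELECT CAST(CONVERT(CHAR(15), DATEADD(hh, -6, GETDATE()), 113) + '00:00' AS DATETIME)
-- returns 2012-11-20 08:00:00.000, the rounded-down start time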
3. Check whether the data is ready
a. Add a For Loop container into the package and inside this container add:
i. Execute SQL Task: Set the ResultSet to Single row. Map the result set columns DataReady and TimeoutCount to the variables User::DataReady and User::TimeoutCount respectively, then add the following query. The DataReady value it returns drives the loop: 0 means the interval’s data has not all arrived yet (keep waiting), 1 means the data at the start of the interval is no longer available, 2 means the data is ready, 3 flags an initial load (IntervalID = 0), and 5 means the timeout ceiling has been exceeded.
declare @DataReady int, @TimeoutCount int

if not exists (select tran_end_time from cdc.lsn_time_mapping
        where tran_end_time > ?) --@ExtractEndTime
    select @DataReady = 0
else
    if ? = 0 --@IntervalID
        select @DataReady = 3
    else
        if not exists (select tran_end_time from cdc.lsn_time_mapping
                where tran_end_time <= ?) --@ExtractStartTime
            select @DataReady = 1
        else
            select @DataReady = 2

select @TimeoutCount = ?
if (@DataReady = 0)
    select @TimeoutCount = @TimeoutCount + 1
else
    select @TimeoutCount = 0

if (@TimeoutCount > ?) --@TimeoutCeiling
    select @DataReady = 5

select @DataReady as DataReady, @TimeoutCount as TimeoutCount

Map the input parameters as follows:

Variable name             Parameter name
User::ExtractEndTime      0
User::IntervalID          1
User::ExtractStartTime    2
User::TimeoutCount        3
User::TimeoutCeiling      4
ii. Add two Script Tasks: one as a delay if the above query establishes that the data is not ready and, in order to avoid an infinite loop, one to quit the package and raise/log an error. These can be written in T-SQL, VB.NET or C# (if using T-SQL, replace the Script Task with an Execute SQL Task; of the three it is the least efficient). My example uses C# for the delay and VB.NET for the exit loop (I’m not proficient in either, so I used what I best understood). A T-SQL sketch of the Execute SQL Task alternative follows below.
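If you do take the Execute SQL Task route, a minimal T-SQL sketch might look like the following (the ten-second delay and the error message are placeholders of mine, not taken from the sample package):

-- Delay task: wait before re-checking whether the CDC data is ready.
-- In practice, build the delay string from the User::DelaySeconds variable.
WAITFOR DELAY '00:00:10';

-- Exit task (a separate Execute SQL Task): raise an error so the package
-- fails out of the For Loop instead of looping forever.
RAISERROR ('CDC data did not become ready before the timeout ceiling was reached.', 16, 1);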


Section 2
1. Create the SQL statement to execute the wrapper function for each table (or, more precisely, each capture instance). Once again, this can be written either as a Script Task in C#/VB.NET or as an Execute SQL Task in T-SQL. My example uses C#. Run the following script on the source database to return a C# script for each capture instance. I’ve included the parameter @function_prefix in order to identify the wrapper functions in the database by their name prefix. When I created my custom wrapper functions in the last post, I named them using the prefix ‘fn_wrp_’, but use whatever fits your naming conventions best.
Specify the following as the ReadOnlyVariables: User::DataReady,User::ExtractEndTime,User::ExtractStartTime (the script reads DataReady, so it must be listed as well)
Specify as the ReadWriteVariable: User::SqlDataQuery
DECLARE @function_prefix varchar(10) --the name prefix to search for in the wrapper functions
   SET @function_prefix = 'fn_wrp_'
   SELECT name as fn_name, 'int dataReady;
            System.DateTime extractStartTime;
            System.DateTime extractEndTime;
            string sqlDataQuery;

            dataReady = (int)Dts.Variables["DataReady"].Value;
            extractStartTime = (System.DateTime)Dts.Variables["ExtractStartTime"].Value;
            extractEndTime = (System.DateTime)Dts.Variables["ExtractEndTime"].Value;

            if (dataReady == 2)
            {
                sqlDataQuery = "SELECT * FROM dbo.' + name + '(''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractStartTime) + "'', ''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractEndTime) + "'',''all with merge'')";
            }
            else
            {
                sqlDataQuery = "SELECT * FROM dbo.' + name + '(null" + ", ''" + string.Format("{0:yyyy-MM-dd HH:mm:ss}", extractEndTime) + "'',''all with merge'')";
            }

            Dts.Variables["SqlDataQuery"].Value = sqlDataQuery;
            Dts.TaskResult = (int)ScriptResults.Success;' AS [.NET Script]
FROM sys.objects
WHERE name like '%' + @function_prefix + '%'
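For a wrapper function named fn_wrp_HumanResources_JobCandidate (following the naming from the previous post; the table itself is just an illustration), the generated script would leave something like this in User::SqlDataQuery once the data is ready:

SELECT * FROM dbo.fn_wrp_HumanResources_JobCandidate('2012-11-20 08:00:00', '2012-11-20 12:00:00', 'all with merge')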


2. Add the Data Flow Task that uses the above prepared query to load the data into the data warehouse. This Data Flow Task contains several items:
a. OLE DB Source item
This is defined with the Data Access Mode set to ‘SQL command from variable’ and uses the variable populated by the Script Task created in step 1 above. Select the Variable name User::SqlDataQuery, then select the columns you want in the result set.

b. Conditional Split item – based on the CDC operation, point to the appropriate OLE DB destination or command. Since my example uses the ‘all with merge’ @row_filter_option, I have set two conditions:
i. Inserts or updates: CDC_OPERATION == "M"
ii. Deletes: CDC_OPERATION == "D"

c. OLE DB Destination items
i. If using the ‘all with merge’ @row_filter_option:
1. OLE DB Command item for the delete
Run the following query on the source database to get delete statements for all the tables tracked by CDC (it assumes the data warehouse tables are named the same as the source tables):
SELECT 'delete from dbo.' + t.name + ' where ' + c.name + ' = ?'
FROM sys.tables as t
JOIN sys.columns as c on c.object_id = t.object_id
where t.is_tracked_by_cdc = 1 and c.is_identity = 1
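For instance, for a hypothetical tracked table JobCandidate whose identity column is JobCandidateID, this produces:

delete from dbo.JobCandidate where JobCandidateID = ?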
2. OLE DB Destination item for the insert into the staging tables – as this is a standard OLE DB Destination, you will need to create it manually.
ii. If using the ‘all’ or ‘all with mask’ @row_filter_option:
1. OLE DB Command items for the delete – see above.
2. OLE DB Command items for the update.
3. OLE DB Destination item for the insert – as this is a standard OLE DB Destination, you will need to create it manually.
3. Lastly, add an Execute SQL Task item to perform the MERGE. This step will look at a staging table and insert rows that don’t yet exist in the data warehouse table, or update them if they do. I’ve created a script to aid in the creation of the MERGE statement. The resulting SQL statements NEED to be reviewed before implementing; I do not guarantee the results. Run the following query on the source database (it assumes the data warehouse tables are named the same as the source tables):
select t.name, 'MERGE dbo.' + isnull(t.name , '') + ' AS target
USING (SELECT ' + substring((SELECT ', ' + isnull(c.name , '')
                                 from sys.columns as c
                                 where t.object_id = c.object_id
                           for xml path('')),2,65536) + '
         FROM dbo.stg' + isnull(t.name , '') + ') AS source (' + substring((SELECT ', ' + isnull(c.name , '')
                                 from sys.columns as c
                                 where t.object_id = c.object_id
                           for xml path('')),2,65536) + ')
ON (target.' + (select isnull(c.name , '')
                     from sys.columns as c
                     where t.object_id = c.object_id and c.is_identity = 1) + ' = source.' +
                     (select isnull(c.name , '')
                     from sys.columns as c
                     where t.object_id = c.object_id and c.is_identity = 1) + ')
WHEN MATCHED THEN
    UPDATE SET ' + substring((SELECT ', ' + isnull(c.name , '') + ' = source.' + isnull(c.name , '')
                                 from sys.columns as c
                                 where t.object_id = c.object_id and c.is_identity != 1
                           for xml path('')),2,65536) + '
WHEN NOT MATCHED THEN
    INSERT (' + substring((SELECT ', ' + isnull(c.name , '')
                                 from sys.columns as c
                                 where t.object_id = c.object_id
                           for xml path('')),2,65536) + ')
    VALUES (' + substring((SELECT ', source.' + isnull(c.name , '')
                                 from sys.columns as c
                                 where t.object_id = c.object_id
                           for xml path('')),2,65536) + ');'
FROM sys.tables as t
where t.is_tracked_by_cdc = 1
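As a trimmed illustration (the table and columns are hypothetical), the generator emits statements along these lines for a table JobCandidate with identity column JobCandidateID:

MERGE dbo.JobCandidate AS target
USING (SELECT JobCandidateID, Resume, ModifiedDate
         FROM dbo.stgJobCandidate) AS source (JobCandidateID, Resume, ModifiedDate)
ON (target.JobCandidateID = source.JobCandidateID)
WHEN MATCHED THEN
    UPDATE SET Resume = source.Resume, ModifiedDate = source.ModifiedDate
WHEN NOT MATCHED THEN
    INSERT (JobCandidateID, Resume, ModifiedDate)
    VALUES (source.JobCandidateID, source.Resume, source.ModifiedDate);

Do review each generated statement before running it, particularly the identity handling: inserting into an identity column on the warehouse side requires either that the target column not be an identity, or that IDENTITY_INSERT be enabled.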

I’ve now outlined the main steps in creating an SSIS package to incrementally extract data from an OLTP database into a data warehouse using CDC-tracked tables. The above is a very basic solution; in a real-life scenario the data may be dealt with rather differently. For instance, many organisations do not want to delete data from the data warehouse. Any delete CDC operations might instead be handled as updates in the DW, marking a row as having been deleted but keeping the data for reporting or auditing reasons. In some cases the base data warehouse tables may not update rows at all, instead inserting a new row for each change or delete against a particular primary key. There may also be instances where some of the transforms are performed during the extract phase using the wrapper functions.
All my queries above can be modified to suit these and other scenarios. The main goal of this and the previous few posts, and I hope I’ve achieved it, was to provide an overview of CDC and one of its uses, incrementally extracting data with an SSIS package, as well as to provide some shortcuts to make the implementation a bit easier.