Sunday 4 November 2012

Implementing Change Data Capture Part 2 – Preparing for SSIS

In the last post I discussed setting up Change Data Capture (CDC) on a database. This post deals with the first steps in setting up an incremental extract process. This would be, loosely speaking, the ‘E’ of a larger ETL (Extract, Transform, Load) process.

I feel that this is where CDC really shines. CDC is a transaction-log-based process, like log shipping and mirroring, and when it is set up as in the last post, with the CDC tables on a separate filegroup (or even a separate disk array), it is a very efficient method of extracting data to a data warehouse.

Microsoft BOL’s step-by-step guide calls this an ‘incremental load’, but I really see it as the extract part of the process. This post and the next deal with getting the data out of an OLTP database and preparing it for the transform and load parts of an ETL. As you will see, no transform occurs, and it is just incidental that the data is being loaded into the data warehouse.



So, in this and the next post, I will discuss preparing for and creating an SSIS package that will perform the incremental extract and load.

This post will cover two steps:
  1. Creating wrapper functions to retrieve the CDC data
  2. Creating staging tables in the data warehouse for the MERGE/UPSERT (optional)

Step 1 – Create wrapper functions

During CDC setup SQL Server automatically creates one or two table-valued functions (depending on whether you specify ‘@supports_net_changes = 1’ or not) for each tracked table to retrieve the change data. These system-generated functions (cdc.fn_cdc_get_all_changes_* and cdc.fn_cdc_get_net_changes_*) are very ‘naked’. So in order to make them more user and SSIS friendly we need to create so-called wrapper functions.

The wrapper functions serve two main purposes: they allow the use of datetime parameters rather than LSNs, and they provide metadata about the columns returned (required for SSIS) and, in turn, meaningful column headers for the result set.
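To make that concrete, here is a rough sketch of the difference, assuming a tracked dbo.Customer table (capture instance dbo_Customer) and a wrapper named dbo.fn_CDC_dbo_Customer like the ones generated further down; all of the object names here are illustrative:

declare @from_lsn binary(10), @to_lsn binary(10)

-- Without a wrapper the caller has to map datetimes to LSNs itself
select @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '20121101')
select @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', '20121102')
select * from cdc.fn_cdc_get_net_changes_dbo_Customer(@from_lsn, @to_lsn, 'all')

-- With a wrapper the extraction window is just passed in as datetimes
select * from dbo.fn_CDC_dbo_Customer('20121101', '20121102')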

The easiest way to create the wrapper functions is to run the following system stored procedure:
sys.sp_cdc_generate_wrapper_function
The above SP takes four optional parameters (@capture_instance, @closed_high_end_point, @column_list and @update_flag_list). If it is run without any parameters it will return a result set of CREATE FUNCTION scripts for all the tables being tracked by CDC, including all of the columns in each table. It is quick and easy to use, but I found it lacking for the following reasons:

  • Although specifying parameters gives you greater control over which tables to create wrapper functions for and which columns are retrieved for each table (see the sketch after this list), if you are tracking a long list of tables, many of which have lots of columns, you’ll be scripting out calls to the stored procedure for ages
  • Running the stored procedure without any parameters produces column names that are not very user friendly. It is similar to the default Primary and Foreign Key naming. I’m not sure why Microsoft does it this way
  • The wrapper functions are all created with the ‘all’ value for the @row_filter_option, which doesn’t allow you to use MERGE in your SSIS package. You can go through and change this if you like, but that’s a pain
  • Finally, the system stored procedure gives you no control over the names of the resulting wrapper functions
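That said, if you only need a handful of wrapper functions the parameters are straightforward. A minimal sketch, assuming a dbo_Customer capture instance with CustomerID and CustomerName columns (the instance and column names are examples only):

exec sys.sp_cdc_generate_wrapper_function
    @capture_instance = N'dbo_Customer',           -- limit the output to one capture instance
    @closed_high_end_point = 1,                    -- include changes committed at the upper boundary
    @column_list = N'[CustomerID],[CustomerName]', -- only script these columns
    @update_flag_list = N'[CustomerName]'          -- generate an update flag for this column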

I’ve created a script that generates customised wrapper functions: it lets you choose the function names, produces friendlier column names, lets you define which @row_filter_option you want to use, and lets you exclude certain column types. The WHERE clause can be modified to filter out columns either by data type or by column function, e.g. primary keys, computed columns, binary data types, etc. I’ve omitted the UO and UN CDC operators as I’ve not found a use for them. Below are scripts for both the standard generic stored procedure and my customised one:

-- Option 1
exec sys.sp_cdc_generate_wrapper_function
-- Option 2 (Create Wrapper function.sql)
declare @row_filter_option nvarchar(30) = 'all with merge' -- or 'all' / 'all with mask'

-- Builds a CREATE FUNCTION script for every CDC-tracked table in the current database.
-- Each generated function wraps cdc.fn_cdc_get_net_changes_<capture_instance>, accepts
-- datetime parameters instead of LSNs and returns friendly column names plus a
-- CDC_OPERATION column. The dbo.fn_CDC_ naming pattern and the excluded type ids are
-- only examples - adjust both to suit your own tables.
select 'create function dbo.fn_CDC_' + s.name + '_' + t.name + '
(@start_time datetime, @end_time datetime)
returns @' + t.name + ' table
( ' + substring((select ', [' + c.name + '] ' + ty.name
+ case when ty.name like 'n%char%' then ' (' + cast(c.max_length/2 as varchar(10)) + ')'
when ty.name like '%char%' then ' (' + cast(c.max_length as varchar(10)) + ')'
when ty.name = 'decimal' then ' (' + cast(c.precision as varchar(5)) + ',' + cast(c.scale as varchar(5)) + ')'
else '' end
from sys.columns as c
join sys.types as ty on c.user_type_id = ty.user_type_id
where t.object_id = c.object_id and ty.system_type_id not in (1,2,26,28,33)
for xml path('')),3,65536) + ', CDC_OPERATION char(1) )
as
begin
declare @from_lsn binary(10), @to_lsn binary(10)
if (@start_time is null) select @from_lsn = sys.fn_cdc_get_min_lsn(''' + s.name + '_' + t.name + ''')
else
select @from_lsn = sys.fn_cdc_increment_lsn(sys.fn_cdc_map_time_to_lsn(''largest less than or equal'',@start_time))
if (@end_time is null)
select @to_lsn = sys.fn_cdc_get_max_lsn()
else
select @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than or equal'',@end_time)
if (@from_lsn = sys.fn_cdc_increment_lsn(@to_lsn))
return
/* Query for change data */
insert into @' + t.name + '
select ' + substring((select ', [' + c.name + ']'
from sys.columns as c
join sys.types as ty on c.user_type_id = ty.user_type_id
where t.object_id = c.object_id and ty.system_type_id not in (1,2,26,28,33)
for xml path('')),3,65536) + ',
case __$operation
when 1 then ''D''
when 2 then ''I''
when 4 then ''U''
when 5 then ''M''
else null
end as CDC_OPERATION
from cdc.fn_cdc_get_net_changes_' + s.name + '_' + t.name + '(@from_lsn, @to_lsn, ''' + @row_filter_option + ''')
return
end
go'
FROM sys.tables as t
join sys.schemas as s on t.schema_id = s.schema_id
where is_tracked_by_cdc = 1
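For a hypothetical dbo.Customer table with CustomerID, CustomerName and ModifiedDate columns, the script above would emit something along these lines (one statement per tracked table):

create function dbo.fn_CDC_dbo_Customer
(@start_time datetime, @end_time datetime)
returns @Customer table
( [CustomerID] int, [CustomerName] nvarchar (50), [ModifiedDate] datetime, CDC_OPERATION char(1) )
as
begin
declare @from_lsn binary(10), @to_lsn binary(10)
if (@start_time is null) select @from_lsn = sys.fn_cdc_get_min_lsn('dbo_Customer')
else
select @from_lsn = sys.fn_cdc_increment_lsn(sys.fn_cdc_map_time_to_lsn('largest less than or equal',@start_time))
if (@end_time is null)
select @to_lsn = sys.fn_cdc_get_max_lsn()
else
select @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',@end_time)
if (@from_lsn = sys.fn_cdc_increment_lsn(@to_lsn))
return
/* Query for change data */
insert into @Customer
select [CustomerID], [CustomerName], [ModifiedDate],
case __$operation
when 1 then 'D'
when 2 then 'I'
when 4 then 'U'
when 5 then 'M'
else null
end as CDC_OPERATION
from cdc.fn_cdc_get_net_changes_dbo_Customer(@from_lsn, @to_lsn, 'all with merge')
return
end
go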

Step 2 – Create staging tables (DW staging tables.sql)

This step is only required if you created the wrapper functions using the ‘all with merge’ @row_filter_option. That option separates the change data into two categories, delete and merge, whereas the ‘all’ and ‘all with mask’ options break it into three: delete, insert and update. According to BOL, ‘all with merge’ is the more efficient way of running the function, but it does require you to create a staging table for each tracked table. In the end it is really up to you.
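To see what the staging table buys you: in the eventual load, the delete rows can be applied separately while everything else lands in the staging table, from which a single MERGE upserts into the warehouse table. A minimal sketch, assuming a stgCustomer staging table (as created by the script below) and a Customer table in the warehouse keyed on CustomerID; the table and column names are illustrative:

-- Upsert the staged 'merge' rows into the warehouse table (deletes handled elsewhere)
merge dbo.Customer as tgt
using stgCustomer as src
on tgt.CustomerID = src.CustomerID
when matched then
update set tgt.CustomerName = src.CustomerName,
tgt.ModifiedDate = src.ModifiedDate
when not matched by target then
insert (CustomerID, CustomerName, ModifiedDate)
values (src.CustomerID, src.CustomerName, src.ModifiedDate);

truncate table stgCustomer -- empty the staging table ready for the next extract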

I’ve created this script to create staging tables that mirror the source columns returned by the wrapper functions created above. Run it on the source database, then run the resulting CREATE TABLE statements on the data warehouse:
SELECT 'CREATE TABLE stg' + t.name +
' (' + substring((SELECT ', [' + c.name + '] ' + ty.name
+ CASE WHEN ty.name LIKE 'n%char%' THEN ' (' + cast(c.max_length/2 AS varchar(10)) + ')' -- nchar/nvarchar: max_length is in bytes
WHEN ty.name LIKE '%char%' THEN ' (' + cast(c.max_length AS varchar(10)) + ')'
WHEN ty.name = 'decimal' THEN ' (' + cast(c.precision AS varchar(5)) + ',' + cast(c.scale AS varchar(5)) + ')'
ELSE '' END
FROM sys.columns AS c
JOIN sys.types AS ty ON c.user_type_id = ty.user_type_id
WHERE t.object_id = c.object_id AND ty.user_type_id NOT IN (1,2,26,28,33)
FOR XML PATH('')),2,65536) + ')'
FROM sys.tables as t
WHERE is_tracked_by_cdc = 1
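For the hypothetical dbo.Customer table used earlier, the generated statement would look something like this:

CREATE TABLE stgCustomer ( [CustomerID] int, [CustomerName] nvarchar (50), [ModifiedDate] datetime)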

* A tracked table together with its query function(s) makes up a ‘capture instance’. The capture instance name is the schema name and table name concatenated with an underscore separating them, e.g. Sales_Orders
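If you want to check the capture instance names on your system, they can be listed directly from the tracked database:

-- List each capture instance, its source table and whether it supports net changes
select capture_instance,
object_schema_name(source_object_id) as source_schema,
object_name(source_object_id) as source_table,
supports_net_changes
from cdc.change_tables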
