Power Tools for Visual Studio Team System 2008 Database Edition

Gert and team announce the availability of Power Tools for Visual Studio Team System 2008 Database Edition!

Cool new stuff includes a couple new test conditions, file- and XML-based data generators, and (my favorite) SQL Static Code Analysis integrated into MSBuild / Team Build!

Download page:
http://www.microsoft.com/downloads/details.aspx?FamilyID=73ba5038-8e37-4c8e-812b-db14ede2c354&displaylang=en

Installer download:
http://download.microsoft.com/download/f/b/8/fb8d1c0d-c0c4-4004-ab86-12396b2a3ee3/VSTSDB2008PT.msi

Documentation download:
http://download.microsoft.com/download/f/b/8/fb8d1c0d-c0c4-4004-ab86-12396b2a3ee3/Power Tools 2008.doc

Great work Team!

:{> Andy

Introducing Change Data Capture, SSIS, and SQL Server 2008 CTP5 (Nov 2007)

Introduction 

On Thursday, 24 Jan 2008, I presented New Features In SSIS 2008 to the Richmond SQL Server Users Group.

Most of the presentation was dedicated to demonstrating Change Data Capture (CDC) interacting with SQL Server 2008 Integration Services. I started seriously working on this demo the first week of January, thinking I’d put 2 – 6 hours into it to get it running using the detailed instructions in Books Online. Things were going relatively well working through the demo until I hit calls from SSIS to table-valued functions created by CDC.

The TVFs didn’t exist.

Well, that’s not exactly true. Their renamed stubs exist in CTP5, but they only return 0. Bummer. Or is it? This is what’s cool about CTPs – you get a feel for where the development is and where it will likely go. And while working through some other stuff related to CDC I’d learned enough to cobble together a solution, so I did. What I built will demonstrate the principles of CDC.

<As_Is>

I assure you there will be differences in the RTM and would wager good money this post will be obsolete with CTP6. So if I get any questions / comments the first thing I will ask is “Which CTP are you using?” If it’s not CTP5 this post is not appropriate.
</As_Is>

You can build your own virtual server or PC and play along, or you can download a prebuilt virtual hard drive (vhd) file here. I built my own. Once I got the SQL Server 2008 November CTP up and running I logged in and began tinkering. Here’s what I did and what I learned:

Introducing CDC 101

The idea of Change Data Capture is pretty cool. The data is stored in a format similar to that used by some database engine transaction logs. In a transaction log, changes are represented as row-based before- and after-images. Inserts have empty before-images and the inserted data in the after-image. This makes sense because there’s no data there before, only after the insert. Deletes have the deleted data in the before-image and empty after-images. Updates have the existing row (or affected columns in the row) in the before-image and the updated row (or affected columns in the row) in the after-image.

In SQL Server 2008’s implementation of CDC, changes are stored in a table. The table is created in a special schema named “cdc” built in the database when you enable Change Data Capture. I had a cynical thought: I wonder if the Center for Disease Control uses SQL Server…

A table in the cdc schema is created for each table for which Change Tracking is enabled. The table is named cdc.schema_table_CT and contains a column for each column in the Tracked table plus metadata columns. In my example I enabled Change Tracking on a table named dbo.Contact. The table created in the cdc schema is cdc.dbo_Contact_CT.

One of the metadata columns is __$operation, which identifies the record (image) type. Inserts and Deletes each appear as a single row – the empty images are not stored. Deletes are represented in the row by an __$operation column value of 1; Inserts by an __$operation column value of 2. The before-image of an Update is represented in the row by an __$operation column value of 3; the after-image by an __$operation column value of 4.
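Once Change Tracking is enabled on dbo.Contact (we do that below), a quick way to see these operation codes for yourself is to group the change table by __$operation. This is only an illustration and assumes cdc.dbo_Contact_CT already holds some captured changes:

-- count captured change rows by operation type
-- (1 = Delete, 2 = Insert, 3 = Update before-image, 4 = Update after-image)
select __$operation
      ,count(*) as ChangeRows
from cdc.dbo_Contact_CT
group by __$operation
order by __$operation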

Setting It Up

I’m going to demonstrate using Change Data Capture to develop high-performance ETL with SSIS 2008. I started with the Improving Incremental Loads with Change Data Capture – accessible from Change Data Capture In Integration Services – topic in SQL Server 2008 CTP5 Books Online (ms-help://MS.SQLCC.v10/MS.SQLSVR.v10.en/s10is_1devconc/html/c4aaba1b-73e5-4187-a97b-61c10069cc5a.htm).

I first create two databases: CDCSource and CDCTarget. I use the following script:

use master
go
-- create CDCSource database...
if not exists(select name
from sys.databases
where name = 'CDCSource')
Create Database CDCSource
go
use CDCSource
go

-- create CDCTarget database...
if not exists(select name
from sys.databases
where name = 'CDCTarget')
Create Database CDCTarget
go

Next I create a table in each database to serve as my ETL source and destination. I use a portion of the AdventureWorks Person.Contact table:

use CDCTarget
go

-- create and populate dbo.Contact table...
if exists(select name
from sys.tables
where name = 'Contact')
Drop Table dbo.Contact
go

select ContactID
,NameStyle
,Title
,FirstName
,MiddleName
,LastName
,Suffix
,EmailAddress
,EmailPromotion
,Phone
,ModifiedDate
into dbo.Contact
from AdventureWorks.Person.Contact
go

use CDCSource
go

-- create and populate dbo.Contact table...
if exists(select name
from sys.tables
where name = 'Contact')
Drop Table dbo.Contact
go

select ContactID
,NameStyle
,Title
,FirstName
,MiddleName
,LastName
,Suffix
,EmailAddress
,EmailPromotion
,Phone
,ModifiedDate
into dbo.Contact
from AdventureWorks.Person.Contact
go

Change Tracking

Change Tracking is enabled on the database and then on individual tables. To enable Change Tracking in CTP5 use the following T-SQL statement:

-- enable CDC on database...
if not exists(select name, is_cdc_enabled
              from master.sys.databases
              where name = 'CDCSource'
               and is_cdc_enabled = 1)
 exec sys.sp_cdc_enable_db_change_data_capture
go
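If you want to confirm the database is now enabled, you can reuse the same check as a standalone query:

-- verify CDC is enabled on the database...
select name, is_cdc_enabled
from master.sys.databases
where name = 'CDCSource'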

Once Change Tracking is enabled on the database you can enable Change Data Capture on individual tables using a script similar to the following:

-- enable CDC on dbo.Contact table...
exec sys.sp_cdc_enable_table_change_data_capture
  @source_schema = 'dbo'
, @source_name = 'Contact'
, @role_name = 'cdc_admin';
go

Making It Work

Books Online has us using a table-valued function to access the cdc.dbo_Contact_CT table, but that function is merely a stub in CTP5. Here’s where I got creative to make it work.

Create a table-valued function cdc.uf_Contact:

-- build CDC table-valued function...
if exists(select name
          from sys.objects
          where name = 'uf_Contact')
 drop function cdc.uf_Contact
go

CREATE function cdc.uf_Contact (
@start_time datetime
,@end_time datetime
)
returns @Contact table (
ContactID int
,NameStyle bit
,Title nvarchar(8)
,FirstName nvarchar(50)
,MiddleName nvarchar(50)
,LastName nvarchar(50)
,Suffix nvarchar(10)
,EmailAddress nvarchar(50)
,EmailPromotion int
,Phone nvarchar(25)
,ModifiedDate datetime
,CDC_OPERATION varchar(1)
) as
begin
declare @from_lsn binary(10), @to_lsn binary(10)
if (@start_time is null)
select @from_lsn = sys.fn_cdc_get_min_lsn('Contact')
else
select @from_lsn = sys.fn_cdc_increment_lsn(sys.fn_cdc_map_time_to_lsn('largest less than or equal',@start_time))
if (@end_time is null)
select @to_lsn = sys.fn_cdc_get_max_lsn()
else
select @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',@end_time)
if (@from_lsn = sys.fn_cdc_increment_lsn(@to_lsn))
return
-- Query for change data
insert into @Contact
select ContactID
,NameStyle
,Title
,FirstName
,MiddleName
,LastName
,Suffix
,EmailAddress
,EmailPromotion
,Phone
,ModifiedDate
,case __$operation
 when 1 then 'D'
 when 2 then 'I'
 when 4 then 'U'
 else null
end as CDC_OPERATION
from cdc.dbo_Contact_CT
where __$start_lsn between @from_lsn and @to_lsn
 and __$operation != 3 --'all'
return
end
go

This is the function we will query from SSIS to get changes.

Change Tracking is now in place for CDCSource and dbo.Contact. Changes made to the dbo.Contact will be stored in the cdc.dbo_Contact_CT table.
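Before wiring this into SSIS, you can smoke-test the function directly in SSMS. This call is just a sanity check and assumes at least one change has been captured since CDC was enabled:

-- return all captured changes up to now
select *
from cdc.uf_Contact(null, getdate())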

The SSIS Package

Let’s build an SSIS package to take advantage of Change Tracking. Create a new SSIS 2008 project and rename the default package ChangeDataCapture.dtsx. Again, I’m using the Improving Incremental Loads with Change Data Capture – accessible from Change Data Capture In Integration Services – topic in SQL Server 2008 CTP5 Books Online (ms-help://MS.SQLCC.v10/MS.SQLSVR.v10.en/s10is_1devconc/html/c4aaba1b-73e5-4187-a97b-61c10069cc5a.htm).

Add the following package-scoped variables:

Name                     Data Type            Value
ExtractStartTime     DateTime             
ExtractEndTime      DateTime             
DataReady             Int32                    2
DelaySeconds        Int32                    10
IntervalID                Int32                    0
TimeoutCount         Int32                    0
TimeoutCeiling        Int32                    20
SQLDataQuery       String                  SELECT * FROM cdc.uf_Contact(null, '2008-01-16 01:00:00')

Right-click in the Connection Managers space at the bottom of the package and select “New ADO.NET Connection…”. Connect to an instance of SQL Server 2008 CTP5. My instance is the default instance on a virtual server named VPCSQL2K8CTP5. In the “Select or enter a database name” textbox, enter CDCSource:

Click the Test Connection button to make sure all is well with your connectivity. Correct any issues and click the OK button to close the editor.

You are free to leave the connection manager named as is, but I like to have the names make sense to me. Right-click the Connection Manager and rename it “CDCSource.ADO”.

Create two more Connection Managers. Make them OLEDB Connection Managers. Set up one for CDCSource and name it CDCSource.OLEDB. Set up the other for CDCTarget and name it (wait for it…) CDCTarget.OLEDB.

Drag an Execute SQL Task onto the Control Flow canvas and rename it Get Interval. Set the ConnectionType property to OLEDB and the Connection property to CDCSource.OLEDB. Set the SQLStatement property to:

SELECT DATEADD(hh,-2, GETDATE()) AS ExtractStartTime,
GetDate() AS ExtractEndTime

Set the ResultSet property to Single Row. On the Result Set page, add two results:

Result Name     Variable Name
0                      User::ExtractStartTime
1                      User::ExtractEndTime

Drag and drop a For Loop Container onto the Control Flow and connect the Get Interval Execute SQL Task to it using a Success precedence constraint. Double-click the For Loop to open the editor. Set the InitExpression property to @DataReady = 0 and the EvalExpression property to @DataReady == 0:

Drag an Execute SQL Task into the For Loop Container. Change the name to Check Change Data Status and double-click the Task to open the editor. Set the Connection to CDCSource.OLEDB and the SQLStatement property to the following script:

declare @DataReady int
declare @TimeoutCount int
if not exists (select tran_end_time
                   from cdc.lsn_time_mapping
                   where tran_end_time > ? )
select @DataReady = 0
else
if ? = 0
select @DataReady = 3
else
if not exists (select tran_end_time
                   from cdc.lsn_time_mapping
                   where tran_end_time <= ? )
select @DataReady = 1
else
select @DataReady = 2
select @TimeoutCount = ?
if (@DataReady = 0)
select @TimeoutCount = @TimeoutCount + 1
else
select @TimeoutCount = 0
if (@TimeoutCount > ?)
select @DataReady = 5
select @DataReady as DataReady, @TimeoutCount as TimeoutCount 

Set the Parameters as shown:

Set the ResultSet property to Single Row and the Result Set page as follows:

Click the OK button to close the editor. Drag a Script Task into the For Loop Container with the Execute SQL Task. Double-click it to open the editor. Set the Script Language to Microsoft Visual Basic 2005 (you can now also use Microsoft Visual C# 2005) and click the ellipsis in the ReadOnlyVariables property and select User::DelaySeconds:

I have to pause here and say “Kudos!” to the SSIS team for this interface. I usually fat-finger the variable names when typing them. This is much, much nicer. Thanks!

Click the OK button to close the Select Variables form. Click the Edit Script button to open the VSTA editor. Double-click the ScriptMain.vb class to open the script editor. Edit Public Sub Main() so it reads as follows:

Public Sub Main()

' Add your code here

System.Threading.Thread.Sleep(CType(Dts.Variables("DelaySeconds").Value, Integer) * 1000)
Dts.TaskResult = ScriptResults.Success
End Sub

Close the VSTA editor and click the OK button to close the Script Task editor.

Connect the Execute SQL Task to the Script Task with a Success Precedence Constraint. Double-click the constraint to open the editor. Dropdown the Evaluation operation and select “Expression and Constraint”. Make sure the Value is set to Success. Enter the following expression into the Expression textbox: “@DataReady == 0 && @TimeoutCount <= @TimeoutCeiling” as shown:

Click the Test button to test the validity of the expression. Click the OK button to close the Precedence Constraint editor.

I like the visual effects available for precedence constraints. When you hover over the Fx box you get a tooltip that displays the Expression value:

But note: if you select the precedence constraint and view properties (a quick shortcut is to press the F4 key) you can set the Annotation property from AsNeeded to ConstraintOptions, which will display the Expression property value always – how cool:

Add a second Script Task inside the For Loop Container and double-click to open the editor. Select Microsoft Visual Basic 2005 as the language and select User::DataReady, User::ExtractStartTime, System::PackageName, and System::StartTime as ReadOnlyVariables (the script reads the two system variables as well). Click the Edit Script button and open ScriptMain.vb. Replace the code in Public Sub Main() with the following VB.Net:

Public Sub Main()

' Add your code here

' User variables.
Dim dataReady As Integer = _
CType(Dts.Variables("DataReady").Value, Integer)
Dim extractStartTime As Date = _
CType(Dts.Variables("ExtractStartTime").Value, DateTime)
' System variables.
Dim packageName As String = _
Dts.Variables("PackageName").Value.ToString()
Dim executionStartTime As Date = _
CType(Dts.Variables("StartTime").Value, DateTime)
Dim eventMessage As New System.Text.StringBuilder()
If dataReady = 1 OrElse dataReady = 5 Then
If dataReady = 1 Then
eventMessage.AppendLine("Start Time Error")
Else
eventMessage.AppendLine("Timeout Error")
End If
With eventMessage
.Append("The package ")
.Append(packageName)
.Append(" started at ")
.Append(executionStartTime.ToString())
.Append(" and ended at ")
.AppendLine(DateTime.Now().ToString())
If dataReady = 1 Then
.Append("The specified ExtractStartTime was ")
.AppendLine(extractStartTime.ToString())
End If
End With
System.Windows.Forms.MessageBox.Show(eventMessage.ToString())
Dts.Log(eventMessage.ToString(), 0, Nothing)
Dts.TaskResult = ScriptResults.Failure
Else
Dts.TaskResult = ScriptResults.Success
End If
End Sub

Close the editor and click the OK button to close the Script Task editor.

Connect the Execute SQL Task to the new Script Task with a Success Precedence Constraint. Edit the Precedence Constraint, setting the Evaluation Operation to “Expression and Constraint” and the Expression to “@DataReady == 1 || @DataReady == 5”. Click the OK button to close the editor.

And Now For Something Completely Different

This next part isn’t included in the Books Online walk-through. I needed to add it because I was stopping a lot during my presentations. It doesn’t hurt to have this in your package. So here goes.

We’re now done with the For Loop Container. Add a new Execute SQL Task to the Control Flow canvas below the For Loop Container, and connect the For Loop Container to it using a Success Precedence Constraint. Double-click it to open the editor. Change the Name property to Update EndTime and the ResultSet property to Single Row. Set the Connection property to CDCSource.OLEDB and add the following statement to the SQLStatement property:

select GETDATE() as ExtractEndTime

Click the Result Set page and add a Result. Set the Name to 0 and the Variable Name property to User::ExtractEndTime. Click the OK button to close the editor.

Why did I do this? The scripts that retrieve the changed rows use time. As you’ll see when we walk through the demo portion, several minutes can pass between the time you initially set the ExtractEndTime variable in the original Execute SQL Task and the time you actually update data in the Change-Tracked table. This task makes certain that the value of ExtractEndTime is current when you exit the loop.

Back To Our Regularly Scheduled Programming

Next, add a Script Task and double-click it to open the editor. This time select Microsoft Visual C# as the ScriptLanguage property. Add User::DataReady, User::ExtractEndTime, and User::ExtractStartTime in the ReadOnlyVariables property, and User::SQLDataQuery in the ReadWriteVariables property:

Click the Edit Script button to open the VSTA editor and double-click ScriptMain.cs to open the class. Replace the public void Main() method code with the following code:

public void Main()
  {
    //string sEST;
    //sEST = Dts.Variables["ExtractStartTime"].Value.ToString();
    //System.Windows.Forms.MessageBox.Show(sEST);
    int dataReady;
    System.DateTime extractStartTime;
    System.DateTime extractEndTime;
    string sqlDataQuery;
    dataReady = (int)Dts.Variables["DataReady"].Value;
    extractStartTime = (System.DateTime)Dts.Variables["ExtractStartTime"].Value;
    extractEndTime = (System.DateTime)(Dts.Variables["ExtractEndTime"].Value);

    //string sExtractStart;
    //sExtractStart = extractStartTime.ToString();
    //System.Windows.Forms.MessageBox.Show(sExtractStart);
    if (dataReady == 2)
      {
        sqlDataQuery = "SELECT * FROM cdc.uf_Contact('" + string.Format("{0:yyyy-MM-dd hh:mm:ss}",
extractStartTime) + "', '" + string.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) + "')";
      }
    else
      {
        sqlDataQuery = "SELECT * FROM cdc.uf_Contact(null" + ", '" + string.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) + "')";
      }
    Dts.Variables["SQLDataQuery"].Value = sqlDataQuery;
    //system.windows.forms.messagebox[sqlDataQuery];
    System.Windows.Forms.MessageBox.Show(sqlDataQuery);
    Dts.TaskResult = (int)ScriptResults.Success;
  }

Close the VSTA editor and click the OK button to close the Script Task editor.

Add a Data Flow Task and connect the former Script Task to it via a Success Precedence Constraint. Double-click the Data Flow Task to open the editor. Drag an OLE DB Source Adapter onto the Data Flow canvas. Set the Connection Manager property to CDCSource.OLEDB, the Data Access Mode property to “SQL command from variable”, and the Variable name property to User::SQLDataQuery:

 

Click the OK button to close the editor.

As in SSIS 2005, a Conditional Split transformation accepts a single input and allows you to define multiple outputs using the SSIS Expression Language. Rows at the input are redirected to one and only one of the outputs. If you are familiar with the switch statement in C-ish languages or the Select Case statement in VB-ish languages, the Conditional Split operates much the same way. The else from these constructs is accomplished via the Default Output, which you can also label with a name.
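If it helps to see the routing logic outside of SSIS, the same decision this package’s Conditional Split will make can be sketched as a T-SQL CASE expression. This is an analogy only – the package performs the split in the pipeline, not in the database:

-- rough T-SQL equivalent of the Conditional Split outputs defined below
select *
      ,case CDC_OPERATION
         when 'I' then 'Inserts'
         when 'U' then 'Updates'
         when 'D' then 'Deletes'
         else 'Default Output'
       end as SplitOutput
from cdc.uf_Contact(null, getdate())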

Drag a Conditional Split Transformation onto the canvas. Add an output named Inserts with the Condition expression set to CDC_OPERATION == "I". Add another output named Updates with the Condition expression set to CDC_OPERATION == "U". Add a third output named Deletes with the Condition expression set to CDC_OPERATION == "D":

These three outputs will divide our data stream into rows to be inserted, updated, and deleted (respectively). For the Inserts, drag an OLEDB Destination Adapter onto the Data Flow canvas. For the Updates and Deletes, drag two OLEDB Command transformations onto the canvas. Rename the OLEDB Destination Adapter “Land Inserts”. Rename the OLEDB Command transformations “Apply Updates” and “Apply Deletes” (respectively).

Connect a data flow from the Conditional Split to the Destination Adapter. Because there are multiple outputs available from the Conditional Split, you will be prompted to select one. Select the Inserts output:

 

Click the OK button to close the Input Output Selection form. 

Double-click the Destination Adapter to open the editor. Set the OLEDB Connection Manager property to the CDCTarget.OLEDB connection manager. Select the dbo.Contact table from the “Name of the table or view” dropdown:

Click the OK button to close the editor. 

Drag a second data flow from the Conditional Split to the Apply Updates OLEDB Command transformation and select Updates when prompted for an output. Double-click the Apply Updates OLEDB Command transformation to open the Advanced Editor for Apply Updates (OLEDB Command transformations do not have a standard editor – at least not in CTP5). Select the CDCTarget.OLEDB connection manager on the Connection Managers tab. On the Component Properties tab, set the SqlCommand property to the following T-SQL statement:

Update dbo.Contact
 set
  NameStyle = ?
 ,Title = ?
 ,FirstName = ?
 ,MiddleName = ?
 ,LastName = ?
 ,Suffix = ?
 ,EmailAddress = ?
 ,EmailPromotion = ?
 ,Phone = ?
 ,ModifiedDate = ?
where ContactID = ?

Click the Column Mappings tab and map Available Input Columns to Available Destination Columns in one of two ways: 

  1. On the top half of the editor drag individual Input Columns and drop them on the desired Destination Columns.
  2. On the lower half of the editor select Input Columns to match to Destination Columns.

The question marks in the SqlCommand statement map to Parameter values (Param_n) by ordinal. The first question mark maps to Param_0, the second to Param_1, and so on. Our first question mark assigns a value to the NameStyle column, so map the NameStyle Input to Param_0 in the Destination. Continue this, mapping the following:

Input                                      Destination
NameStyle                             Param_0
Title                                       Param_1
FirstName                              Param_2
MiddleName                           Param_3
LastName                              Param_4
Suffix                                     Param_5
EmailAddress                         Param_6
EmailPromotion                      Param_7
Phone                                    Param_8
ModifiedDate                          Param_9
ContactID                               Param_10

 

Click the OK button to close the Advanced Editor.

Drag another output from the Conditional Split to the Apply Deletes OLEDB Command transformation and select Deletes when prompted to select an output. Double-click the OLEDB Command to open its Advanced Editor and select CDCTarget.OLEDB for the Connection Manager. On the Component Properties tab, enter the following statement in the SqlCommand property:

delete dbo.Contact where ContactID = ?

On the Column Mappings tab, map the ContactID input to the Param_0 Destination and click the OK button to close the Advanced Editor.

The Data Flow is complete and should appear similar to the following:

Go, Go, Change Data Capture!

Execute the SSIS package and view the Control Flow. If you do nothing more, you will see the For Loop execute 20 times, each time incrementing the @TimeoutCount variable value and checking to see if it has reached the @TimeoutCeiling variable value:

 However, if you open SQL Server Management Studio (SSMS) and execute a query that changes the underlying data in the CDCSource database, the SSIS package will detect the change and break out of this loop.

Restart the SSIS package (to reset the @TimeoutCount variable value). Use the following query to execute changes in the CDCSource.dbo.Contact table:

-----------------------
-- Execute Changes --
-----------------------

-- Update --
update dbo.Contact
set EmailAddress = '_' + EmailAddress
where ContactID % 5 = 0

-- Insert --
insert into dbo.Contact
(NameStyle
,Title
,FirstName
,MiddleName
,LastName
,Suffix
,EmailAddress
,EmailPromotion
,Phone
,ModifiedDate)
values
(0
,'Mr.'
,'Andy'
,'Mortimer'
,'Leonard'
,null
,'aleonard@solidq.com'
,0
,'123-456-6789'
,getdate()
)

-- Delete --
delete dbo.Contact
where ContactID % 27 = 0

Return to SSIS and note that on the next pass through the loop more code is executed:

If you didn’t comment it out, the C# script should display a message box containing the query (stored in User::SQLDataQuery) that will serve as the source of the OLEDB Source Adapter in the Data Flow:

Click the OK button to dismiss the non-modal message box.

Click the Data Flow tab to observe changes as they are applied to CDCTarget. When complete, the Data Flow should look similar to the following:

You can also look at the table containing the changes by executing the following statement against CDCSource in SSMS:

select
__$start_lsn
,__$end_lsn
,__$seqval
,__$operation
,__$update_mask
,ContactID
,NameStyle
,Title
,FirstName
,MiddleName
,LastName
,Suffix
,EmailAddress
,EmailPromotion
,Phone
,ModifiedDate
from cdc.dbo_Contact_CT

In my database the change tracking table appears as shown:

You will recall from building the function that the __$operation column is used to determine whether the change is an insert, update, or delete. I mentioned this earlier: the value in this column captures the type of change.

__$operation         Change Type
1                          Delete
2                          Insert
3                          Update (Before)
4                          Update (After)

The changes in the image above are all Updates.

Conclusion

This post represents one method to use SQL Server 2008 CTP5 (Nov 2007) Change Data Capture with SSIS 2008. CTP6 will be out soon and things will likely change. I will write an updated version at that time, and endeavor to keep my readers posted on other changes to this technology between now and SQL Server 2008 Release To Manufacturing (RTM).

:{> Andy

PS – The PASS Summit 2015 Call for Speakers closes at 9:00 PM PDT Sunday, 15 Mar 2015 (04:00 GMT 16 Mar 2015). There’s still time, but hurry!

Learn more: 
Linchpin People Blog: SSIS 
Stairway to Biml 
Stairway to Integration Services 


Database Testing on .Net Rocks!

I am honored to be the guest of a .Net Rocks! show on Database Testing! Richard Campbell, Carl Franklin, and I recorded the show last night and I had a lot of fun!

Richard and I commiserated about the “DBA condition” in the enterprise. Everyone shared “war stories” about projects. A good time was had by all.

I have to thank Chris Love – friend, tireless advocate of the Raleigh Developer Community, fellow Solid Quality Mentor, and (finally!) MVP – for setting this up. He was the gracious emcee of my recent presentation on Testing The Database to TriNUG (Triangle .Net User Group). Chris had heard a recent .Net Rocks! show where the discussion turned briefly to database testing, so he did the leg work to put Carl, Richard, and myself together for this show.

The show is scheduled for release 29 Jan 2008.

:{> Andy

Speaking in Raleigh and Philadelphia This Week

Now that our 5-month old son – Riley Cooper – is on the mend, I am hitting the speaking trail again!

09 Jan 2008: I’m presenting Testing The Database to the Triangle .Net User Group (TRINUG) Wednesday evening in Research Triangle Park, NC (location details).

12 Jan 2008: I’m presenting Incremental Loads With SSIS – by far my most popular SSIS Design Pattern post – at the Philadelphia Code Camp 2008.1.

If you read this blog and are attending, please introduce yourself!

 :{> Andy

VSTS 2008 VPC Images

Microsoft released a couple Visual Studio 2008 Team System VPC images over the holidays – one loaded with Team System 2008, Team Explorer, and TFS 2008; the other with TFS 2008 and Team Explorer only. The images expire 1 Apr 2008 and provide a cool way to tinker with the new stuff without the hassles of installation and configuration.

 :{> Andy

SSIS Design Pattern – ETL Instrumentation, Part 4

Introduction

This post is part of a series of posts on ETL Instrumentation.

In Part 1 we built a database to hold collected SSIS run time metrics and an SSIS package to demonstrate how and why we would load metrics into the database.

In Part 2 we expanded on our database and the SSIS package to annotate version metadata, manage error metrics capture, and report task status.

In Part 3, we started using the ETL Instrumentation infrastructure we have built to measure some actual ETL. We started by counting rows.

In Part 4, we continue instrumenting by adding yet another ETL process and again scaling our measurement capabilities.

A Brief History Of Our ETL Instrumentation Project

To review, our metrics database is named SSISRunTimeMetrics. It contains a schema named ssis. In this schema are eleven objects:
 – a table named ssis.RunTimeMetrics.
 – a table named ssis.RunTimeErrors.
 – a table named ssis.TaskMetrics.
 – a table named ssis.RowCounts.
 – a table named ssis.RowCountTypes.
 – a stored procedure named ssis.usp_RecordPackageStart.
 – a stored procedure named ssis.usp_RecordPackageEnd.
 – a stored procedure named ssis.usp_RecordPackageError.
 – a stored procedure named ssis.usp_RecordTaskStart.
 – a stored procedure named ssis.usp_RecordTaskEnd.
 – a stored procedure named ssis.usp_RecordRowCounts.

Our source database is AdventureWorks and our destination database is SSISRunTimeMetrics_Target. SSISRunTimeMetrics_Target contains one object:
 – a table named dbo.Contact.

We expanded the types of run-time data we are collecting. Part 1 introduced Status collection, in Part 2 we added Exception collection. We also introduced scope into both types of collection, recording Exception information on error and finalizing Status (reporting that an error occurred).

At the beginning of SSIS package execution, we call ssis.usp_RecordPackageStart from an Execute SQL Task. We pass the package start date and time, the package name, and the package version. We also pass in a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Package Load ID variable (iPackageLoadID).

At the beginning of a task or collection of tasks that define a process, we call ssis.usp_RecordTaskStart from an Execute SQL Task. We pass the task or process start date and time, the task (source) name, iPackageLoadID, and a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Task Load ID variable (iTaskLoadID).

We have a Data Flow Task to move rows from the AdventureWorks.Person.Contact table to a target database and table we created: SSISRunTimeMetrics_Target.dbo.Contact. We optimized the package for set-based updates and collect row count metrics, which are recorded by calling SSISRunTimeMetrics.ssis.usp_RecordRowCounts.

When this task completes, we call ssis.usp_RecordTaskEnd from an Execute SQL Task. We pass in the Task Load ID from the iTaskLoadID variable, the current date and time, and the status “Succeeded”.

On error, we capture Exception data and record an Error Status – both are crucial to knowing what happens when an exception is thrown.

When the package completes execution, we call ssis.usp_RecordPackageEnd from an Execute SQL Task. We pass in the Package Load ID from the variable, the current date and time, and the status “Succeeded”.

Let’s get started on the next step!

Version Control

First, update version information:

Remember to update Version properties:

 

Now we are ready to start developing.

Sell, Sell, Sell

Let’s extract and load some Sales data.

Open the SSISRunTimeMetrics package you built previously. Delete the Success Precedence Constraint between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

Drag a Sequence Container onto the Control Flow canvas. Move the “Log End of Package Execution” Execute SQL Task down some and position the new Sequence Container between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

 Connect the “Step 1 – Load Contact” Sequence Container to the new Sequence Container with a Success Precedence Constraint, and the new Sequence Container to the “Log End of Package Execution” Execute SQL Task with a Success Precedence Constraint.

Rename the new Sequence Container “Step 2 – Load Sales”. 

Good design is reusable. Maybe not 100%, but most good designs are at least partially reusable. Such is the case here – we have a good design in “Step 1 – Load Contact” – we will reuse lots of it in “Step 2 – Load Sales”. Let’s frame-out the flow, then fill in the details.

Drag two Execute SQL Tasks and a Data Flow Task into “Step 2 – Load Sales”.

Name the first Execute SQL Task “Load Sales” and double-click it to open the editor. Set the ResultSet property to “Single row” and the Connection property to “(local).SSISRunTimeMetrics”. Enter the following in the SQLStatement property:

declare @Now datetime
set @Now = GetDate()
exec ssis.usp_RecordTaskStart ?,NULL,@Now,?,'Started'

On the Parameter Mappings page, add two input parameters. Set Parameter 0 to Long data type and supply the User::iPackageLoadID variable. Set Parameter 1 to VarChar data type and supply the System::TaskName variable:

On the Result Set page, add one Result named 0 aimed at the User::iTaskLoadID variable: 

Click the OK button to close the editor.

Before proceeding, note that this Execute SQL Task is also the product of good design. In fact, the only difference between this task and its counterpart in “Step 1 – Load Contact” is the name of the task itself. Everything else is identical.

So why not copy and paste the task? Good question – we certainly could have! And we will copy and paste other tasks starting now.

Connect a Success Precedence Constraint from the “Load Sales” Execute SQL Task to the Data Flow Task. We need to do some cleanup here before proceeding. In the “Step 1 – Load Contact” Sequence Container there’s a Data Flow Task named “Data Flow Task”. We have one of those in our “Step 2 – Load Sales” Sequence Container as well. This is permissible because the objects are in different containers and have different scope.

However, it robs us of an important navigation facility – one we will likely need: the ability to use the Data Flow Task tab’s dropdown box. Have a look:

To remedy this, let’s rename the Data Flow Task in “Step 1 – Load Contact” to “Load Contact Data”. Similarly, let’s rename the “Step 2 – Load Sales” Data Flow Task to “Load Sales Data”.

There. Much better.

Connect the “Load Sales Data” Data Flow Task to the second Execute SQL Task with a Success Precedence Constraint and rename it (the second Execute SQL Task) “Apply Staged Updates”. Double-click it to open the editor and set the Connection property to “(local).SSISRunTimeMetrics_Target”. We will return to this task later – click the OK button to close the editor.

Copy the “Log Successful End of Task” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container and paste it onto the Control Flow canvas. Then drag it into the “Step 2 – Load Sales” Sequence Container.

Note: You can paste it directly into the “Step 2 – Load Sales” Sequence Container if you want to, but I recommend you not do this in SSIS 2005. The Sequence Container will expand to accommodate anything inside it, and the paste functionality in SSIS 2005 completely ignores the mouse pointer position (and everything else, so far as I can tell) when you paste from the clipboard. Combined, these behaviors cause sequence containers to grow unpredictably large when you paste directly into them.

No modifications are required for the “Log Successful End of Task” Execute SQL Task to function as desired in the new sequence container – how cool is that?

Copy the “Log Failed End Of Task” Execute SQL Task and paste it onto the Control Flow canvas. The new task will show up named “Log Failed End Of Task 1”. Again, a naming convention conflict. To resolve it, rename the original “Log Failed End Of Task” Execute SQL Task – connected to the “Step 1 – Load Contact” Sequence Container via a Failure Precedence Constraint – to “Log Failed End of Load Contact Task”.

Rename the newly pasted “Log Failed End Of Task 1” Execute SQL Task to “Log Failed End Of Load Sales Task” and connect the “Step 2 – Load Sales” Sequence Container to “Log Failed End Of Load Sales Task” via a Failure Precedence Constraint.

Copy the “Record Row Count” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container. Again, paste it onto the Control Flow canvas and then drag it into the “Step 2 – Load Sales” Sequence Container. Connect the “Log Successful End Of Task” Execute SQL Task to the “Record Row Count” Execute SQL Task with a Success Precedence Constraint and double-click the task to open the editor.

All is well with the General page, but the Parameter Mapping page reveals some poor variable-naming choices in the last exercise. We can fix this in the variable dropdown. Click the dropdown that currently contains the User::iContactCount variable and select <New variable…>:

When the Add Variable dialog displays, click the Container dropdown and select the package (“SSISRunTimeMetrics”). This determines the scope of the variable and we want a package-scoped variable.

Click the OK button to select the package scope. Set the Name of the variable to iSalesInputCount, the Value Type (data type) to Int32, and the Value to 0:

Click the OK button to close the Add Variable dialog.

Repeat the procedure above for the “Counts” variables. Name the remaining three Counts variables iSalesNewRowsCount, iSalesChangedRowsCount, and iSalesUnchangedRowsCount; respectively. When complete, the Parameter Mapping page should appear as shown:

Click the OK button to close the Execute SQL Task editor.

The flow is now framed-out. We are ready to begin our Sales-specific coding.

Building The LZ (Landing Zone)

We need a place for our Sales data to land in SSISRunTimeMetrics_Target database.

In this section I am going to walk through the first phase of the process of converting a well-designed OLTP schema into a denormalized schema.

We’ll start with the AdventureWorks Sales schema. First, let’s list all the tables in the Sales schema using the following query:

use AdventureWorks;
go

select s.name + '.' + t.name
from sys.tables t
inner join sys.schemas s on s.schema_id = t.schema_id
where s.name = 'Sales'

This gives us a list of tables in the Sales schema:

Sales.StoreContact
Sales.ContactCreditCard
Sales.CountryRegionCurrency
Sales.CreditCard
Sales.Currency
Sales.SalesOrderDetail
Sales.CurrencyRate
Sales.Customer
Sales.SalesOrderHeader
Sales.CustomerAddress
Sales.SalesOrderHeaderSalesReason
Sales.SalesPerson
Sales.SalesPersonQuotaHistory
Sales.SalesReason
Sales.Individual
Sales.SalesTaxRate
Sales.SalesTerritory
Sales.SalesTerritoryHistory
Sales.ShoppingCartItem
Sales.SpecialOffer
Sales.SpecialOfferProduct
Sales.Store

Let’s select Sales.SalesOrderDetail as our base table… we have to start somewhere. Open SQL Server Management Studio and connect the Object Browser to your local (or development) instance of SQL Server 2005. Expand Databases, then AdventureWorks, then Tables:

 

Scroll down to Sales.SalesOrderDetail. Right-click the table object in Object Browser. Hover over “Script Table as”, then “CREATE To”, and click “New Query Editor Window”:

 

This creates a nice CREATE script (and more) for the Sales.SalesOrderDetail table. I only need the CREATE TABLE portion so I remove the rest. I modify the script further – making the table part of the dbo schema. I discard the constraints, NOT NULLs, brackets, and extended properties and I’m left with:

CREATE TABLE dbo.SalesOrderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
rowguid uniqueidentifier,
ModifiedDate datetime NULL)

Repeating the process for the Sales.SalesOrderHeader table yields:

CREATE TABLE dbo.SalesOrderHeader(
SalesOrderID int NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
rowguid uniqueidentifier NULL,
ModifiedDate datetime NULL)

I can now combine these statements, removing the duplication, to create a destination table statement:

CREATE TABLE dbo.SalesOrderHeaderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
SalesOrderDetailrowguid uniqueidentifier,
SalesOrderDetailModifiedDate datetime NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
SalesOrderHeaderrowguid uniqueidentifier NULL,
SalesOrderHeaderModifiedDate datetime NULL)

Execute this statement against the SSISRunTimeMetrics_Target database to create our destination table.

Filling In The Blanks

Double-click the “Load Sales Data” Data Flow Task to switch to the Data Flow tab for editing. Drag an OLE DB Source Adapter onto the canvas and double-click it to open the editor. Select the (local).AdventureWorks connection manager. Change the Data access mode to Sql Command and enter the following SQL statement into the SQL Command Text textbox: 

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail

Click the OK button to close the editor. Right-click the Source Adapter and rename it “Sales Detail Source”:

Drag a second OLE DB Source Adapter onto the Data Flow canvas and double-click it to open the editor. Select “(local).AdventureWorks” as the connection manager and SQL Command as the Data Access Mode. Enter the following statement into the SQL Command Text textbox:

SELECT SalesOrderID
,RevisionNumber
,OrderDate
,DueDate
,ShipDate
,Status
,OnlineOrderFlag
,SalesOrderNumber
,PurchaseOrderNumber
,AccountNumber
,CustomerID
,ContactID
,SalesPersonID
,TerritoryID
,BillToAddressID
,ShipToAddressID
,ShipMethodID
,CreditCardID
,CreditCardApprovalCode
,CurrencyRateID
,SubTotal
,TaxAmt
,Freight
,TotalDue
,Comment
,rowguid
,ModifiedDate
FROM Sales.SalesOrderHeader

 

Click the OK button to close the editor and rename the Source Adapter “Sales Header Source”.

Drag a Merge Join onto the Data Flow canvas and connect a Data Flow Path (green arrow) from each Source Adapter to the Merge Join:

Note the Merge Join has an error – the Left Input is not sorted. (Neither is the Right Input, but validation fails on, and reports, the first error). To address this condition, right-click each Source Adapter and select Show Advanced Editor:

Click on the Input and Output Properties tab and expand the OLE DB Source Output object, then expand the Output Columns logical folder. Click on the OLE DB Source Output object (which represents the Output buffer) and change the IsSorted property to True:

In the Output Columns list, click on the SalesOrderID column and change the SortKeyPosition property to 1:

Click the OK button to close the Advanced Editor. Double-click the source adapter to open the editor and append an Order By clause to the SQL Command: “ORDER BY SalesOrderID”. Close the editor and repeat this process for the other Source Adapter.
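For reference, the Sales Detail source query now ends like this (the Sales Header query gets the same ORDER BY clause):

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail
ORDER BY SalesOrderID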

But wait – we still have an error:

 

Double-click the Merge Join to open the editor. Check every column from the Left input and every column except SalesOrderID from the Right Input. Two columns are named the same in both tables – rowguid and ModifiedDate. To differentiate, prepend each column’s Output Alias with the table name:

 

Click the OK button to close the editor. The error clears.

Ok

We did all that to set this up. 

Count ‘Em Up 

As with the Contacts data, we will count the rows entering the process. Depending on the nature of the process, source data, and desired measurement(s), you may choose to measure immediately after the Source Adapters or after the Merge Join – or both. We’ll start with the same kind of counts measurements we built in the Contacts data flow.

Drag a Row Count transformation onto the Data Flow canvas and connect the output of the Merge Join to its input. Double-click the Row Count to open the editor and assign the User::iSalesInputCount to the VariableName property:

 

As with the Contacts data, our next steps are to correlate and filter the data, so drag a Lookup and Conditional Split transformation onto the data flow canvas and connect them (in respective order) to the Row Count transformation:

Double-click the Lookup to open the editor and assign the following properties:
OLE DB Connection Manager: (local).SSISRunTimeMetrics_Target
Table or View: dbo.SalesOrderHeaderDetail

Click the Columns tab, right-click in the white-space, and click Select All Mappings. Right-click again and select Delete Selected Mappings. Connect the SalesOrderID and SalesOrderDetailID columns. Select every column in the Available Lookup Columns list by checking each checkbox, then prepend each Output Alias with “Dest_”:

Click the Configure Error Output button and change the Lookup Error from “Fail Component” to “Ignore Failure”. Remember, this converts the default INNER JOIN functionality of the Lookup transformation into a LEFT OUTER JOIN. Click the OK button to close the Error Output Configuration, then click the OK button again to close the Lookup editor.

We are loading the pipeline with lookup data in our data flow that matches data – by SalesOrderID and SalesOrderDetailID – in the destination.
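In T-SQL terms, the Lookup with “Ignore Failure” behaves roughly like the join sketched below. This is only an analogy to clarify why NULL destination columns identify new rows – the package performs the lookup in the pipeline, and the detail table here merely stands in for the merged rows flowing through it:

-- conceptual equivalent of the Lookup configured with "Ignore Failure"
select src.SalesOrderID
      ,src.SalesOrderDetailID
      ,dest.SalesOrderDetailID as Dest_SalesOrderDetailID  -- NULL when no matching destination row exists
from AdventureWorks.Sales.SalesOrderDetail src
left outer join SSISRunTimeMetrics_Target.dbo.SalesOrderHeaderDetail dest
  on dest.SalesOrderID = src.SalesOrderID
 and dest.SalesOrderDetailID = src.SalesOrderDetailID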

Connect the output data flow path of the Lookup transformation to the Conditional Split transformation and double-click the Conditional Split transformation to open the editor. Create a new output named “New Sales” with the Condition: “IsNull(Dest_SalesOrderDetailID)”. If the LEFT OUTER JOIN functionality of the Lookup returns a NULL Dest_SalesOrderDetailID – and really every destination column will be NULL if there’s no matching destination row, we could use any of them – then this is a new Sales data row.

Add a second condition named “Changed Sales” with the following condition expression:

(ISNULL(Dest_RevisionNumber) ? -1 : Dest_RevisionNumber) != (ISNULL(RevisionNumber) ? -1 : RevisionNumber) || (ISNULL(Dest_OrderDate) ? (DT_DBDate)0 : Dest_OrderDate) != (ISNULL(OrderDate) ? (DT_DBDate)0 : OrderDate) || (ISNULL(Dest_DueDate) ? (DT_DBDate)0 : Dest_DueDate) != (ISNULL(DueDate) ? (DT_DBDate)0 : DueDate) || (ISNULL(Dest_ShipDate) ? (DT_DBDate)0 : Dest_ShipDate) != (ISNULL(ShipDate) ? (DT_DBDate)0 : ShipDate) || (ISNULL(Dest_Status) ? 0 : Dest_Status) != (ISNULL(Status) ? 0 : Status) || (ISNULL(Dest_OnlineOrderFlag) ?  TRUE  : Dest_OnlineOrderFlag) != (ISNULL(OnlineOrderFlag) ?  TRUE  : OnlineOrderFlag) || (ISNULL(Dest_SalesOrderNumber) ? "NULL" : Dest_SalesOrderNumber) != (ISNULL(SalesOrderNumber) ? "NULL" : SalesOrderNumber) || (ISNULL(Dest_PurchaseOrderNumber) ? "NULL" : Dest_PurchaseOrderNumber) != (ISNULL(PurchaseOrderNumber) ? "NULL" : PurchaseOrderNumber) || (ISNULL(Dest_AccountNumber) ? "NULL" : Dest_AccountNumber) != (ISNULL(AccountNumber) ? "NULL" : AccountNumber) || (ISNULL(Dest_CustomerID) ? -1 : Dest_CustomerID) != (ISNULL(CustomerID) ? -1 : CustomerID) || (ISNULL(Dest_ContactID) ? -1 : Dest_ContactID) != (ISNULL(ContactID) ? -1 : ContactID) || (ISNULL(Dest_SalesPersonID) ? -1 : Dest_SalesPersonID) != (ISNULL(SalesPersonID) ? -1 : SalesPersonID) || (ISNULL(Dest_TerritoryID) ? -1 : Dest_TerritoryID) != (ISNULL(TerritoryID) ? -1 : TerritoryID) || (ISNULL(Dest_BillToAddressID) ? -1 : Dest_BillToAddressID) != (ISNULL(BillToAddressID) ? -1 : BillToAddressID) || (ISNULL(Dest_ShipToAddressID) ? -1 : Dest_ShipToAddressID) != (ISNULL(ShipToAddressID) ? -1 : ShipToAddressID) || (ISNULL(Dest_ShipMethodID) ? -1 : Dest_ShipMethodID) != (ISNULL(ShipMethodID) ? -1 : ShipMethodID) || (ISNULL(Dest_CreditCardID) ? -1 : Dest_CreditCardID) != (ISNULL(CreditCardID) ? -1 : CreditCardID) || (ISNULL(Dest_CreditCardApprovalCode) ? "NULL" : Dest_CreditCardApprovalCode) != (ISNULL(CreditCardApprovalCode) ? "NULL" : CreditCardApprovalCode) || (ISNULL(Dest_CurrencyRateID) ? -1 : Dest_CurrencyRateID) != (ISNULL(CurrencyRateID) ? -1 : CurrencyRateID) || (ISNULL(Dest_SubTotal) ? 0 : Dest_SubTotal) != (ISNULL(SubTotal) ? 0 : SubTotal) || (ISNULL(Dest_TaxAmt) ? 0 : Dest_TaxAmt) != (ISNULL(TaxAmt) ? 0 : TaxAmt) || (ISNULL(Dest_Freight) ? 0 : Dest_Freight) != (ISNULL(Freight) ? 0 : Freight) || (ISNULL(Dest_TotalDue) ? 0 : Dest_TotalDue) != (ISNULL(TotalDue) ? 0 : TotalDue) || (ISNULL(Dest_Comment) ? "NULL" : Dest_Comment) != (ISNULL(Comment) ? "NULL" : Comment) || (ISNULL(Dest_SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : Dest_SalesOrderHeaderModifiedDate) != (ISNULL(SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : SalesOrderHeaderModifiedDate) || (ISNULL(Dest_CarrierTrackingNumber) ? "NULL" : Dest_CarrierTrackingNumber) != (ISNULL(CarrierTrackingNumber) ? "NULL" : CarrierTrackingNumber) || (ISNULL(Dest_OrderQty) ? 0 : Dest_OrderQty) != (ISNULL(OrderQty) ? 0 : OrderQty) || (ISNULL(Dest_ProductID) ? -1 : Dest_ProductID) != (ISNULL(ProductID) ? -1 : ProductID) || (ISNULL(Dest_SpecialOfferID) ? -1 : Dest_SpecialOfferID) != (ISNULL(SpecialOfferID) ? -1 : SpecialOfferID)

Rename the default output “Unchanged Rows”. Click the OK button to close the editor.

Drag an OLE DB Destination Adapter onto the Data Flow canvas and rename it “New Sales Destination”. Connect an output of the Conditional Split to the new Destination Adapter. When prompted, select the “New Sales” output of the Conditional Split:

 

Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target. Set the Data Access Mode property to “Table or View” and select the dbo.SalesOrderHeaderDetail table. Click on the Mappings page to automap the pipeline fields to the table columns:

 

Click the OK button to close the editor.

Drag another OLE DB Destination Adapter onto the Data Flow canvas and rename it “stgSalesChangedRows”. Connect an output from the Conditional Split to the new Destination Adapter and select the “Changed Sales” output when prompted. Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target and set the Data Access Mode property to “Table or View”. Click the New button next to the “Name of the Table or View” dropdown:

Click the OK button to create the stgSalesChangedRows table. Click the Mappings page to automap the columns, then click the OK button to close the editor.

We now have Sales ETL. Cool

To complete our counts logic, add three Row Count transformations to the Data Flow canvas. Name them “New Rows Count”, “Changed Rows Count”, and “Unchanged Rows Count”. Position “New Rows Count” between the Conditional Split and the “New Sales Destination” Adapter. Double-click to open the editor and set the VariableName property to “User::iSalesNewRowsCount”. Click the OK button to close the editor.

Position the “Changed Rows Count” between the Conditional Split and the stgSalesChangedRows Destination Adapter. Open its editor and set the VariableName property to “User::iSalesChangedRowsCount”.

Connect the Conditional Split’s “Unchanged Rows” (default) output to the “Unchanged Rows Count” transformation. Open its editor and set the VariableName property to “User::iSalesUnchangedRowsCount”.

 Before we leave this section, let’s complete the staged updates by returning to the Control Flow and updating the SQLStatement property of the “Apply Staged Updates” Execute SQL Task inside the “Step 2 – Load Sales” Sequence Container with the following statement:

UPDATE dest
SET dest.SalesOrderID = stage.SalesOrderID
,dest.CarrierTrackingNumber = stage.CarrierTrackingNumber
,dest.OrderQty = stage.OrderQty
,dest.ProductID = stage.ProductID
,dest.SpecialOfferID = stage.SpecialOfferID
,dest.UnitPrice = stage.UnitPrice
,dest.UnitPriceDiscount = stage.UnitPriceDiscount
,dest.LineTotal = stage.LineTotal
,dest.SalesOrderDetailrowguid = stage.SalesOrderDetailrowguid
,dest.SalesOrderDetailModifiedDate = stage.SalesOrderDetailModifiedDate
,dest.RevisionNumber = stage.RevisionNumber
,dest.OrderDate = stage.OrderDate
,dest.DueDate = stage.DueDate
,dest.ShipDate = stage.ShipDate
,dest.Status = stage.Status
,dest.OnlineOrderFlag = stage.OnlineOrderFlag
,dest.SalesOrderNumber = stage.SalesOrderNumber
,dest.PurchaseOrderNumber = stage.PurchaseOrderNumber
,dest.AccountNumber = stage.AccountNumber
,dest.CustomerID = stage.CustomerID
,dest.ContactID = stage.ContactID
,dest.SalesPersonID = stage.SalesPersonID
,dest.TerritoryID = stage.TerritoryID
,dest.BillToAddressID = stage.BillToAddressID
,dest.ShipToAddressID = stage.ShipToAddressID
,dest.ShipMethodID = stage.ShipMethodID
,dest.CreditCardID = stage.CreditCardID
,dest.CreditCardApprovalCode = stage.CreditCardApprovalCode
,dest.CurrencyRateID = stage.CurrencyRateID
,dest.SubTotal = stage.SubTotal
,dest.TaxAmt = stage.TaxAmt
,dest.Freight = stage.Freight
,dest.TotalDue = stage.TotalDue
,dest.Comment = stage.Comment
,dest.SalesOrderHeaderrowguid = stage.SalesOrderHeaderrowguid
,dest.SalesOrderHeaderModifiedDate = stage.SalesOrderHeaderModifiedDate
FROM dbo.SalesOrderHeaderDetail dest
INNER JOIN dbo.stgSalesChangedRows stage ON stage.SalesOrderDetailID = dest.SalesOrderDetailID

There. Done and done. 

If You Build It…

Up until now, we’ve basically followed the same template used for Contacts to construct the Sales ETL. We added some complexity (by design) to grow our understanding of ETL along with our knowledge of SSIS.

Our ETL measurement is record counts and record counts only. Let’s expand on that some by also capturing a monetary sum. This will add even more confidence in our ETL, once we validate (Validation is Part 5).

Let’s begin by creating a new destination table to hold our sums: SSISRunTimeMetrics.ssis.RowSums. Use the following script to create the table:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- drop RowSums table, if it exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowSums'
           and s.name = 'ssis')
 drop table ssis.RowSums
go

Create Table ssis.RowSums
 (RowSumsID int identity(1,1)
 ,TaskMetricsID int null
 ,RunTimeMetricsId int not null
 ,ParentTaskMetricsID int null
 ,RowSum decimal(38,2) null
 ,RowSumColumnName varchar(255) null
 ,RowSumTypeID char(1) null)

This table is remarkably similar to the ssis.RowCounts table we created to hold Row Count data – and for good reason, the functions of these two tables are remarkably similar. As with the Row Counts data, we need to add a stored procedure to insert Sums data, and another table to hold Inserted Types… or do we? Instead of re-creating the functionality contained in the ssis.RowCountTypes table, let’s rename – and expand the purpose of – the table.

Executing the following script accomplishes this nicely:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- delete RowCountTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowCountTypes'
           and s.name = 'ssis')
 drop table ssis.RowCountTypes
go

-- delete RowTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowTypes'
           and s.name = 'ssis')
 drop table ssis.RowTypes
go

Create Table ssis.RowTypes
(RowTypeID char(1) not null
,RowTypeName varchar(25) null
,RowTypeDescription varchar(255) null)
go

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘I’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘I’
 ,’Selected Input Rows’
 ,’Input rows selected from a source’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘N’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘N’
 ,’New Rows’
 ,’New rows’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘C’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘C’
 ,’Changed Rows’
 ,’Changed rows’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘U’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘U’
 ,’Unchanged Rows’
 ,’No changes detected in rows’)
go
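A side note before moving on: the script above drops ssis.RowCountTypes outright, which discards any rows it held. If you would rather rename the existing table in place and keep its contents, sp_rename will do it – a minimal sketch, assuming the Part 3 column names carried the RowCount prefix (adjust to match your table):

use SSISRunTimeMetrics
go

-- Rename-in-place alternative (not what the script above does).
-- The column name below is an assumption; adjust it to match your Part 3 table.
exec sp_rename 'ssis.RowCountTypes', 'RowTypes'
exec sp_rename 'ssis.RowTypes.RowCountTypeID', 'RowTypeID', 'COLUMN'
go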

Our stored procedure to accomplish inserts:

use SSISRunTimeMetrics
go

if exists(select s.name + '.' + p.name
          from sys.procedures p
          inner join sys.schemas s on s.schema_id = p.schema_id
          where p.name = 'usp_RecordRowSum'
           and s.name = 'ssis')
 begin
  Drop Procedure ssis.usp_RecordRowSum
 end
go

Create Procedure ssis.usp_RecordRowSum
  @RunTimeMetricsID int
 ,@TaskMetricsID int
 ,@RowSum decimal(38,2)
 ,@RowSumTypeID char(1)
 ,@RowSumColumnName varchar(255) = null
 ,@ParentTaskMetricsID int = null
As
begin
 -- insert the row sum data...
 insert into ssis.RowSums
  (TaskMetricsID
  ,RunTimeMetricsId
  ,ParentTaskMetricsID
  ,RowSum
  ,RowSumColumnName
  ,RowSumTypeID)
 values
  (@TaskMetricsID
  ,@RunTimeMetricsID
  ,@ParentTaskMetricsID
  ,@RowSum
  ,@RowSumColumnName
  ,@RowSumTypeID)
end
go
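Before wiring the procedure into the package, a quick manual test confirms it inserts as expected – the values below are made up and purely illustrative:

use SSISRunTimeMetrics
go

-- Smoke test with made-up IDs; the values are illustrative only.
exec ssis.usp_RecordRowSum 1, 1, 12345.67, 'I', 'Sales.SalesOrderDetail.LineTotal'

-- Have a look at the inserted row...
select * from ssis.RowSums

-- ...then remove the test row before running the package.
delete from ssis.RowSums where RowSum = 12345.67
go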

Now that our infrastructure is built we can start using it to load SSIS run time metrics.  

Add ‘Em Up

We need variables to hold the sums we intend to collect. Right-click the Control Flow and click Variables. Click the New Variable button and add a package-scoped variable, data type Double, named iSalesInputAmount. Repeat the process for three other Double variables named iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount.

There are other ways to capture this type of data. The approach I demonstrate here is not the cleanest, but it clearly exercises the principles of ETL Instrumentation.

Return to the “Load Sales Data” Data Flow Task and add one Multicast and one Aggregate transformation. Position the Multicast between the Merge Join and Input Row Count transformations and connect them through it. Rename the Aggregate transformation “Input Line Total” and connect another output of the Multicast to it:

 

Double-click the Aggregate transformation to open the editor and check the LineTotal input column. Select Sum from the Operation column – this sums the LineTotal values of every row passing between the Merge Join and Input Row Count transformations: 
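Conceptually, the Aggregate emits a single row holding that total. The query below is only an illustration of what the transformation produces for a full extract – the package never executes it:

-- Illustration only: the Aggregate output is one row containing the sum
-- of LineTotal for every row flowing through the Multicast.
select sum(LineTotal) as LineTotal
from Sales.SalesOrderDetail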

Add a Script Component to the Data Flow. When prompted for Script Component Type, select Destination:

Rename the Script Component “Push InputLineTotal into Input Variable”, connect the “Input Line Total” Aggregate transformation to it, and double-click the Script Component to open the editor.

On the Input Columns page, check the LineTotal input column. On the Script page, enter iSalesInputAmount in the ReadWriteVariables property and click the Design Script button. In the script editor, enter the following code:

Dim iAmount As Double

Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)

  ' The Aggregate transformation sends a single summed row into this component,
  ' so a simple assignment captures the total.
  iAmount = CDbl(Row.LineTotal)

End Sub

Public Overrides Sub PostExecute()

  MyBase.PostExecute()
  ' ReadWriteVariables are only writable in PostExecute, so push the total
  ' into iSalesInputAmount here.
  Me.Variables.iSalesInputAmount = iAmount

End Sub

Close the Script Editor and click the OK button to close the Script Component editor. This should load the aggregated value into the iSalesInputAmount variable.

Drag two more Multicast transformations and three each of the Aggregate and Script Component transformations onto the Data Flow canvas. Repeat the procedure outlined above for the New, Changed, and Unchanged Conditional Split outputs – loading the iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount variables (respectively). Note you do not need a Multicast transformation for the Unchanged output. Sum the LineTotal fields for each output.

This is a lot of work and there is lots of room for error. Take your time. Double-check your work. Don’t take shortcuts. When complete, the New section will look something like this:

Load ‘Em Up

Return to the Control Flow – it’s time to capture these metrics!

In the “Step 2 – Load Sales” Sequence Container, rename the “Record Row Count” Execute SQL Task “Record Metrics”. Double-click it to open the editor. Click the ellipsis on the SQLStatement property and add the following script to the existing statement:

exec ssis.usp_RecordRowSum ?,?,?,'I','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'N','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'C','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'U','Sales.SalesOrderDetail.LineTotal'

Click the Parameter Mapping page and add twelve parameters. With the existing twelve parameters, the new twelve are numbered 12 – 23:

The parameters follow the pattern iPackageLoadID (Input, Long, Incrementally Numbered), iTaskLoadID (Input, Long, Incrementally Numbered), iSales___Amount (Input, Double, Incrementally Numbered).
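Following that pattern – and this is my reading of it, so double-check it against your own variable names – the twelve new mappings line up like this:

Parameter 12: User::iPackageLoadID (Input, Long)
Parameter 13: User::iTaskLoadID (Input, Long)
Parameter 14: User::iSalesInputAmount (Input, Double)
Parameter 15: User::iPackageLoadID (Input, Long)
Parameter 16: User::iTaskLoadID (Input, Long)
Parameter 17: User::iSalesNewAmount (Input, Double)
Parameter 18: User::iPackageLoadID (Input, Long)
Parameter 19: User::iTaskLoadID (Input, Long)
Parameter 20: User::iSalesChangedAmount (Input, Double)
Parameter 21: User::iPackageLoadID (Input, Long)
Parameter 22: User::iTaskLoadID (Input, Long)
Parameter 23: User::iSalesUnchangedAmount (Input, Double)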

Click the OK button to close the editor.

Testing, One, Two, Three…

Execute the package to test the Summing functionality. The following represents a better report query for our collected data:

use SSISRunTimeMetrics
go

select
 m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,s.RowSum as 'Measurement'
,'Sum' as 'MeasurementType'
,st.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowSums s on s.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes st on st.RowTypeID = s.RowSumTypeID
where m.id = (select Max(id)
              from ssis.RunTimeMetrics)
 and s.RowSum > 0

union

select
 m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,c.[RowCount] as 'Measurement'
,'Counts' as 'MeasurementType'
,ct.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowCounts c on c.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes ct on ct.RowTypeID = c.RowCountTypeID
where m.id = (select Max(id)
              from ssis.RunTimeMetrics)
 and c.[RowCount] > 0

Conclusion

Again, these examples are intended to demonstrate the principles and characteristics of ETL Instrumentation. They are not complete, Production-ready solutions. I make no claims that this is “the right way” or even a best practice for capturing ETL Run Time Metrics data. I do maintain that such data is useful in many ways – especially for troubleshooting and certain kinds of predictive performance analysis.

Next: Validation – putting this data to work.

:{> Andy

PS – The PASS Summit 2015 Call for Speakers closes at 9:00 PM PDT Sunday, 15 Mar 2015 (04:00 GMT 16 Mar 2015). There’s still time, but hurry!

Learn more: 
Linchpin People Blog: SSIS 
Stairway to Biml 
Stairway to Integration Services 


SSIS Design Pattern – ETL Instrumentation, Part 4

Introduction

This post is part of a series of posts on ETL Instrumentation.

In Part 1 we built a database to hold collected SSIS run time metrics and an SSIS package to deomnstrate how and why we would load metrics into the database.

In Part 2 we expanded on our database and the SSIS package to annotate version metadata, manage error metrics capture, and task status reporting.

In Part 3, we started using the ETL Instrumentation infrastructure we have built to measure some actual ETL. We started by counting rows.

In Part 4, we continue instrumenting by adding yet another ETL process and again scaling our measurement capabilities.

A Brief History Of Our ETL Instrumentation Project

To review, our metrics database is named SSISRunTimeMetrics. It contains a schema named ssis. In this schema are eleven objects:
 – a table named ssis.RunTimeMetrics.
 – a table named ssis.RunTimeErrors.
 – a table named ssis.TaskMetrics.
 – a table named ssis.RowCounts.
 – a table named ssis.RowCountTypes.
 – a stored procedure named ssis.usp_RecordPackageStart.
 – a stored procedure named ssis.usp_RecordPackageEnd.
 – a stored procedure named ssis.usp_RecordPackageError.
 – a stored procedure named ssis.usp_RecordTaskStart.
 – a stored procedure named ssis.usp_RecordTaskEnd.
 – a stored procedure named ssis.usp_RecordRowCounts.

Our source database is AdventureWorks and our destination database is SSISRunTimeMetrics_Target. SSISRunTimeMetrics_Target contains one object:
 – a table named dbo.Contact.

We expanded the types of run-time data we are collecting. Part 1 introduced Status collection, in Part 2 we added Exception collection. We also introduced scope into both types of collection, recording Exception information on error and finalizing Status (reporting that an error occurred).

At the beginning of SSIS package execution, we call ssis.usp_RecordPackageStart from an Execute SQL Task. We pass the package start date and time, the package name, and the package version. We also pass in a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Package Load ID variable (iPackageLoadID).

At the beginning of a task or collection of tasks that define a process, we call ssis.usp_RecordTaskStart from an Execute SQL Task. We pass the task or process start date and time, the task (source) name, iPackageLoadID, and a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Task Load ID variable (iTaskLoadID).

We have a Data Flow Task to move rows from the AdventureWorks.Person.Contact table to a target database and table we created: SSISRunTimeMetrics_Target.dbo.Contact. We optimized the package for set-based updates and collect row count metrics which are inserted into SSISRunTimeMetrics.ssis.usp_RecordRowCounts.

When this task completes, we call ssis.usp_RecordTaskEnd from an Execute SQL Task. We pass in the Task Load ID from the iTaskLoadID variable, the current date and time, and the status “Succeeded”.

On error, we capture Exception data and record an Error Status – both are crucial to knowing what happens when an exception is thrown.

When the package completes execution, we call ssis.usp_RecordPackageEnd from an Execute SQL Task. We pass in the Package Load ID from the variable, the current date and time, and the status “Succeeded”.

Let’s get started on the next step!

Version Control

First, update version information:

Remember to update Version properties:

 

Now we are ready to start developing.

Sell, Sell, Sell

Let’s extract and load some Sales data.

Open the SSISRunTimeMetrics package you built previously. Delete the Success Precedence Constraint between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

Drag a Sequence Container onto the Control Flow canvas. Move the “Log End of Package Execution” Execute SQL Task down some and position the new Sequence Container between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

 Connect the “Step 1 – Load Contact” Sequence Container to the new Sequence Container with a Success Precedence Constraint, and the new Sequence Container to the “Log End of Package Execution” Execute SQL Task with a Success Precedence Constraint.

Rename the new Sequence Container “Step 2 – Load Sales”. 

Good design is reuseable. Maybe not 100%, but most good designs are at least partially reuseable. Such is the case here – we have a good design in “Step 1 – Load Contact” – we will reuse lots of it in “Step 2 – Load Sales”. Let’s frame-out the flow, then fill in the details.

Drag two Execute SQL Tasks and a Data Flow Task into “Step 2 – Load Sales”.

Name the first Execute SQL Task “Load Sales” and double-click it to open the editor. Set the ResultSet property to “Single row” and the Connection property to “(local).SSISRunTimeMetrics”. Enter the following in the SQLStatement property:

declare @Now datetime
set @Now = GetDate()
exec ssis.usp_RecordTaskStart ?,NULL,@Now,?,’Started’

On the Parameter Mappings page, add two input parameters. Set Parameter 0 to Long data type and supply the User::iPackageLoadID variable. Set Parameter 1 to VarChar data type and supply the System::TaskName variable:

On the Result Set page, add one Result named 0 aimed at the User::iTaskLoadID variable: 

Click the OK button to close the editor.

Before proceeding, note that this Execute SQL Task is also the product of good design. In fact, the only difference task and it’s counterpart in “Step 1 – Load Contact” is the name of the task itself. Everything else is identical.

So why not copy and paste the task? Good question – we certainly could have! And we will copy and paste other tasks starting now.

Connect a Success Precedence Constraint from the “Load Sales” Execute SQL Task to the Data Flow Task. We need to do some cleanup here before proceeding. In the “Step 1 – Load Contact” Sequence Container there’s a Data Flow Task named “Data Flow Task”. We have one of those in our “Step 2 – Load Sales” Sequence Container as well. This is permissible because the objects are in different containers and have different scope.

It robs us of an important navigation facility – one we will likely need: the ability to use the Data Flow Task tab’s dropdown box. Have a look:

To remedy this, let’s rename the Data Flow Task in “Step 1 – Load Contact” “Load Contact Data”. Similary, let’s rename the “Step 2 – Load Sales” Data Flow Task “Load Sales Data”.

There. Much better.

Connect the “Load Sales Data” Data Flow Task to the second Execute SQL Task with a Success Precedence Constraint and rename it (the second Execute SQL Task) “Apply Staged Updates”. Double-click it to open the editor and set the Connection property to “(local).SSISRunTimeMetrics_Target”. We will return to this task later – click the OK button to close the editor.

Copy the “Log Successful End of Task” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container and paste it onto the Data Flow canvas. Then drag it into the the “Step 2 – Load Sales” Sequence Container.

Note: You can paste it directly into the “Step 2 – Load Sales” Sequence Container if you want to, but I recommend you not do this in SSIS 2005. The Sequence Container will expand to accomodate anything inside it, and the paste functionality in SSIS 2005 completely ignores the mouse pointer position (and eveything else, so far as I can tell) when you paste from the clipboard. Combined, these behaviors cause sequence containers to grow unpredictably large when you paste directly into them.

No modifications are required for the “Log Successful End of Task” Execute SQL Task to function as desired in the new sequence container – how cool is that?

Copy the “Log Failed End Of Task” Execute SQL Task and paste it onto the Control Flow canvas. The new task will show up named “Log Failed End Of Task 1”. Again, a naming convention conflict. To resolve it, rename the original “Log Failed End Of Task” Execute SQL Task – connected to the “Step 1 – Load Contact” Sequence Container via a Failure Precedence Constraint – to “Log Failed End of Load Contact Task”.

Rename the newly pasted “Log Failed End Of Task 1” Execute SQL Task to “Log Failed End Of Load Sales Task” and connect the “Step 2 – Load Sales” Sequence Container to “Log Failed End Of Load Sales Task” via a Failure Precedence Constraint.

Copy the “Record Row Count” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container. Again, paste it onto the Control Flow canvas and then drag it into the “Step 2 – Load Sales” Sequence Container. Connect the “Log Successful End Of Task” Execute SQL Task to the “Record Row Count” Execute SQL Task with a Success Precedence Constraint and double-click the task to open the editor.

All is well with the General page, but the Parameter Mapping page reveals some poor variable-naming choices in the last exercise. We can fix this in the variable dropdown. Click the dropdown that currently contains the User::iContactCount variable and select <New variable…>:

When the Add Variable dialog displays, click the Container dropdown and select the package (“SSISRunTimeMetrics”). This determines the scope of the variable and we want a package-scoped variable.

Click the OK button to select the package scope. Set the Name of the variable to iSalesInputCount, the Value Type (data type) to Int32, and the Value to 0:

Click the OK button to close the Add Variable dialog.

Repeat the procedure above for the “Counts” variables. Name the remaining three Counts variables iSalesNewRowsCount, iSalesChangedRowsCount, and iSalesUnchangedRowsCount; respectively. When complete, the Parameter Mapping page should appear as shown:

Click the OK button to close the Execute SQL Task editor.

The flow is now framed-out. We are ready to begin our Sales-specific coding.

Building The LZ (Landing Zone)

We need a place for our Sales data to land in SSISRunTimeMetrics_Target database.

In this section I am going to walk through the first phase of the process of converting a well-designed OLTP schema into a denormalized schema.

We’ll start with the AdventureWorks Sales schema. First, let’s list all the tables in the Sales schema using the following query:

use AdventureWorks;
go

select s.name + ‘.’ + t.name
from sys.tables t
inner join sys.schemas s on s.schema_id = t.schema_id
where s.name = ‘Sales’

This gives us a list of tables in the Sales schema:

Sales.StoreContact
Sales.ContactCreditCard
Sales.CountryRegionCurrency
Sales.CreditCard
Sales.Currency
Sales.SalesOrderDetail
Sales.CurrencyRate
Sales.Customer
Sales.SalesOrderHeader
Sales.CustomerAddress
Sales.SalesOrderHeaderSalesReason
Sales.SalesPerson
Sales.SalesPersonQuotaHistory
Sales.SalesReason
Sales.Individual
Sales.SalesTaxRate
Sales.SalesTerritory
Sales.SalesTerritoryHistory
Sales.ShoppingCartItem
Sales.SpecialOffer
Sales.SpecialOfferProduct
Sales.Store

Let’s select Sales.SalesOrderDetail as our base table… we have to start somewhere. Open SQL Server Management Studio and connect the Object Browser to your local (or development) instance of SQL Server 2005. Expand Databases, then AdventureWorks, then Tables:

 

Scroll down to Sales.SalesOrderDetail. Right-click the table object in Object Browser. Hover over “Script Table as”, then “CREATE To”, and click “New Query Editor Window”:

 

This creates a nice CREATE script (and more) for the Sales.SalesOrderDetail table. I only need the CREATE TABLE portion so I remove the rest. I modify the script further – making the table part of the dbo schema. I discard the constraints, NOT NULLs, brackets, and extended properties and I’m left with:

CREATE TABLE dbo.SalesOrderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
rowguid uniqueidentifier,
ModifiedDate datetime NULL)

Repeating the process for the Sale.SalesOrderHeader table yields:

CREATE TABLE dbo.SalesOrderHeader(
SalesOrderID int NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
rowguid uniqueidentifier NULL,
ModifiedDate datetime NULL)

I can now combine these statements, removing the duplication, to create a destination table statement:

CREATE TABLE dbo.SalesOrderHeaderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
SalesOrderDtatilrowguid uniqueidentifier,
SalesOrderDetailModifiedDate datetime NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
SalesOrderHeaderrowguid uniqueidentifier NULL,
SalesOrderHeaderModifiedDate datetime NULL)

Execute this statement against the SSISRunTimeMetrics_Target database to create our destination table.

Filling In The Blanks

Double-click the “Load Sales Data” Data Flow Task to switch to the Data Flow tab for editing. Drag an OLE DB Source Adapter onto the canvas and double-click it to open the editor. Select the (local).AdventureWorks connection manager. Change the Data access mode to Sql Command and enter the following SQL statement into the SQL Command Text textbox: 

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail

Click the OK button to close the editor. Right-click the Source Adapter and rename it “Sales Detail Source”:

Drag a second OLE DB Source Adapter onto the Data Flow canvas and double-click it to open the editor. Select “(local).AdventureWorks” as the connection manager and SQL Command as the Data Access Mode. Enter the following statement into the SQL Command Text textbox:

SELECT SalesOrderID
,RevisionNumber
,OrderDate
,DueDate
,ShipDate
,Status
,OnlineOrderFlag
,SalesOrderNumber
,PurchaseOrderNumber
,AccountNumber
,CustomerID
,ContactID
,SalesPersonID
,TerritoryID
,BillToAddressID
,ShipToAddressID
,ShipMethodID
,CreditCardID
,CreditCardApprovalCode
,CurrencyRateID
,SubTotal
,TaxAmt
,Freight
,TotalDue
,Comment
,rowguid
,ModifiedDate
FROM Sales.SalesOrderHeader

 

Click the OK button to close the editor and rename the Source Adapter “Sales Header Source”.

Drag a Merge Join onto the Data Flow canvas and connect a Data Flow Path (green arrow) from each Source Adapter to the Merge Join:

Note the Merge Join has an error – the Left Input is not sorted. (Neither is the Right Input, but validation fails on, and reports, the first error). To address this condition, right-click each Source Adapter and select Show Advanced Editor:

Click on the Input and Output Properties tab and expand the OLE DB Source Output object, then expand the Output Columns logical folder. Click on the OLE DB Source Output object (which represents the Output buffer) and change the IsSorted property to True:

In the Output Columns list, click on the SalesOrderID column and change the SortKeyPosition property to 1:

Click the OK button to close the Advanced Editor. Double-click the source adapter to open the editor and append an Order By clause to the SQL Command: “ORDER BY SalesOrderID”. Close the editor and repeat this process for the other Source Adapter.

But wait – we still have an error:

 

Double-click the Merge Join to open the editor. Click every column from the Left input and every column except SalesOrderID from the Right Input. Two columns are named the same in both tables – rowguid and ModifiedDate. To differentiate, prepend each column’s Output Alias with the table name:

 

Click the OK button to close the editor. The error clears.

Ok

We did all that to set this up. 

Count ‘Em Up 

As with the Contacts data, we will count the rows entering the process. Depending on the nature of the process, source data, and desired measurement(s); you may choose to measure immediately after the Source Adapters or after the Merge Join – or both. We’ll start with the same kind of counts measurements we built in the Contacts data flow.

Drag a Row Count transformation onto the Data Flow canvas and connect the output of the Merge Join to its input. Double-click the Row Count to open the editor and assign the User::iSalesInputCount to the VariableName property:

 

As with the Contacts data, our next steps are to correlate and filter the data, so drag a Lookup and Conditional Split transformation onto the data flow canvas and connect them (in respective order) to the Row Count transformation:

Double-click the Lookup to open the editor and assign the following properties:
OLE DB Connection Manager: (local).SSISRunTimeMetrics_Target
Table or View: dbo.SalesOrdeHeaderDetail 

Click the Columns tab, right-click in the white-space, and click Select All Mappings. Right-click again and select Delete Selected Mappings. Connect the SalesOrderID and SalesOrderDetailID columns. Select every column in the Available Lookup Columns list by checking each checkbox, then prepend each Output Alias with “Dest_”:

Click the Configure Error Output button and change the Lookup Error from “Fail Component” to “Ignore Failure”. Remember, this converts the default INNER JOIN functionality of the Lookup transformation into a LEFT OUTER JOIN. Click the OK button to close the Error Output Configuration, then click the OK button again to close the Lookup editor.

We are loading the pipeline with lookup data in our data flow that matches data – by SalesOrderID and SalesOrderDetailID – in the destination.

Connect the output data flow path of the Lookup of the Lookup transformation to the Conditional Split transformation and double-click the Conditional Split transformation to open the editor. Create a new output named “New Sales” with the Condition: “IsNull(Dest_SalesOrderDetailID)”. If the LEFT OUTER JOIN functionality of the Lookup returns a NULL Dest_SalesOrderDetailID – and really every destination column will be NULL if there’s no matching destination row, we could use any of them – then this is a new Sales data row.

Add a second condition named “Changed Sales” with the following condition expression:

(ISNULL(Dest_RevisionNumber) ? -1 : Dest_RevisionNumber) != (ISNULL(RevisionNumber) ? -1 : RevisionNumber) || (ISNULL(Dest_OrderDate) ? (DT_DBDate)0 : Dest_OrderDate) != (ISNULL(OrderDate) ? (DT_DBDate)0 : OrderDate) || (ISNULL(Dest_DueDate) ? (DT_DBDate)0 : Dest_DueDate) != (ISNULL(DueDate) ? (DT_DBDate)0 : DueDate) || (ISNULL(Dest_ShipDate) ? (DT_DBDate)0 : Dest_ShipDate) != (ISNULL(ShipDate) ? (DT_DBDate)0 : ShipDate) || (ISNULL(Dest_Status) ? 0 : Dest_Status) != (ISNULL(Status) ? 0 : Status) || (ISNULL(Dest_OnlineOrderFlag) ?  TRUE  : Dest_OnlineOrderFlag) != (ISNULL(OnlineOrderFlag) ?  TRUE  : OnlineOrderFlag) || (ISNULL(Dest_SalesOrderNumber) ? “NULL” : Dest_SalesOrderNumber) != (ISNULL(SalesOrderNumber) ? “NULL” : SalesOrderNumber) || (ISNULL(Dest_PurchaseOrderNumber) ? “NULL” : Dest_PurchaseOrderNumber) != (ISNULL(PurchaseOrderNumber) ? “NULL” : PurchaseOrderNumber) || (ISNULL(Dest_AccountNumber) ? “NULL” : Dest_AccountNumber) != (ISNULL(AccountNumber) ? “NULL” : AccountNumber) || (ISNULL(Dest_CustomerID) ? -1 : Dest_CustomerID) != (ISNULL(CustomerID) ? -1 : CustomerID) || (ISNULL(Dest_ContactID) ? -1 : Dest_ContactID) != (ISNULL(ContactID) ? -1 : ContactID) || (ISNULL(Dest_SalesPersonID) ? -1 : Dest_SalesPersonID) != (ISNULL(SalesPersonID) ? -1 : SalesPersonID) || (ISNULL(Dest_TerritoryID) ? -1 : Dest_TerritoryID) != (ISNULL(TerritoryID) ? -1 : TerritoryID) || (ISNULL(Dest_BillToAddressID) ? -1 : Dest_BillToAddressID) != (ISNULL(BillToAddressID) ? -1 : BillToAddressID) || (ISNULL(Dest_ShipToAddressID) ? -1 : Dest_ShipToAddressID) != (ISNULL(ShipToAddressID) ? -1 : ShipToAddressID) || (ISNULL(Dest_ShipMethodID) ? -1 : Dest_ShipMethodID) != (ISNULL(ShipMethodID) ? -1 : ShipMethodID) || (ISNULL(Dest_CreditCardID) ? -1 : Dest_CreditCardID) != (ISNULL(CreditCardID) ? -1 : CreditCardID) || (ISNULL(Dest_CreditCardApprovalCode) ? “NULL” : Dest_CreditCardApprovalCode) != (ISNULL(CreditCardApprovalCode) ? “NULL” : CreditCardApprovalCode) || (ISNULL(Dest_CurrencyRateID) ? -1 : Dest_CurrencyRateID) != (ISNULL(CurrencyRateID) ? -1 : CurrencyRateID) || (ISNULL(Dest_SubTotal) ? 0 : Dest_SubTotal) != (ISNULL(SubTotal) ? 0 : SubTotal) || (ISNULL(Dest_TaxAmt) ? 0 : Dest_TaxAmt) != (ISNULL(TaxAmt) ? 0 : TaxAmt) || (ISNULL(Dest_Freight) ? 0 : Dest_Freight) != (ISNULL(Freight) ? 0 : Freight) || (ISNULL(Dest_TotalDue) ? 0 : Dest_TotalDue) != (ISNULL(TotalDue) ? 0 : TotalDue) || (ISNULL(Dest_Comment) ? “NULL” : Dest_Comment) != (ISNULL(Comment) ? “NULL” : Comment) || (ISNULL(Dest_SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : Dest_SalesOrderHeaderModifiedDate) != (ISNULL(SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : SalesOrderHeaderModifiedDate) || (ISNULL(Dest_CarrierTrackingNumber) ? “NULL” : Dest_CarrierTrackingNumber) != (ISNULL(CarrierTrackingNumber) ? “NULL” : CarrierTrackingNumber) || (ISNULL(Dest_OrderQty) ? 0 : Dest_OrderQty) != (ISNULL(OrderQty) ? 0 : OrderQty) || (ISNULL(Dest_ProductID) ? -1 : Dest_ProductID) != (ISNULL(ProductID) ? -1 : ProductID) || (ISNULL(Dest_SpecialOfferID) ? -1 : Dest_SpecialOfferID) != (ISNULL(SpecialOfferID) ? -1 : SpecialOfferID)

Rename the default output “Unchanged Rows”. Click the OK button to close the editor.

Drag an OLE DB Destination Adapter onto the Data Flow canvas and rename it “New Sales Destination”. Connect an output of the Conditional Split to the new Destination Adapter. When prompted, select the “New Sales” output of the Conditional Split:

 

Double-click the Destination Adapter ot open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target. Set the Data Access Mode property to “Table or View” and select the dbo.SalesOrderHeaderDetail table. Click on the Mappings page to automap the pipeline fields to the table columns:

 

Click the OK button to close the editor.

Drag another OLE DB Destination Adapter onto the Data Flow canvas and rename it “stgSalesChangedRows”. Connect an output from the Conditional Split to the new Destination Adapter and select the “Changed Sales” output when prompted. Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target and set the Data Access Mode property to “Table or View”. Click the New button next to the “Name of the Table or View” dropdown:

Click the OK button to create the stgSalesChangedRows table. Click the Mappings page to automap the columns, then click the OK button to close the editor.

We now have Sales ETL. Cool

To complete our counts logic, add three Row Count transformations to the Data Flow canvas. Name them “New Rows Count”, “Changed Rows Count”, and “Unchanged Rows Count”. Position “New Rows Count” between the Conditional Split and the “New Sales Destination” Adapter. Double-click to open the editor and set the VariableName property to “User::iSalesNewRowsCount”. Click the OK button to close the editor.

Position the “Changed Rows Count” between the Conditional Split and the stgSalesChangedRows Destination Adapter. Open its editor and set the VariableName property to “User::iSalesChangedRowsCount”.

Open the editor for the “Unchanged Rows Count” transformation and set the VariableName property to “User::iSalesUnchangedRowsCount”.

 Before we leave this section, let’s complete the staged updates by returning to the Control Flow and updating the SQLStatement property of the “Apply Staged Updates” Execute SQL Task inside the “Step 2 – Load Sales” Sequence Container with the following statement:

UPDATE dest
SET dest.SalesOrderID = stage.SalesOrderID
,dest.CarrierTrackingNumber = stage.CarrierTrackingNumber
,dest.OrderQty = stage.OrderQty
,dest.ProductID = stage.ProductID
,dest.SpecialOfferID = stage.SpecialOfferID
,dest.UnitPrice = stage.UnitPrice
,dest.UnitPriceDiscount = stage.UnitPriceDiscount
,dest.LineTotal = stage.LineTotal
,dest.SalesOrderDtatilrowguid = stage.SalesOrderDtatilrowguid
,dest.SalesOrderDetailModifiedDate = stage.SalesOrderDetailModifiedDate
,dest.RevisionNumber = stage.RevisionNumber
,dest.OrderDate = stage.OrderDate
,dest.DueDate = stage.DueDate
,dest.ShipDate = stage.ShipDate
,dest.Status = stage.Status
,dest.OnlineOrderFlag = stage.OnlineOrderFlag
,dest.SalesOrderNumber = stage.SalesOrderNumber
,dest.PurchaseOrderNumber = stage.PurchaseOrderNumber
,dest.AccountNumber = stage.AccountNumber
,dest.CustomerID = stage.CustomerID
,dest.ContactID = stage.ContactID
,dest.SalesPersonID = stage.SalesPersonID
,dest.TerritoryID = stage.TerritoryID
,dest.BillToAddressID = stage.BillToAddressID
,dest.ShipToAddressID = stage.ShipToAddressID
,dest.ShipMethodID = stage.ShipMethodID
,dest.CreditCardID = stage.CreditCardID
,dest.CreditCardApprovalCode = stage.CreditCardApprovalCode
,dest.CurrencyRateID = stage.CurrencyRateID
,dest.SubTotal = stage.SubTotal
,dest.TaxAmt = stage.TaxAmt
,dest.Freight = stage.Freight
,dest.TotalDue = stage.TotalDue
,dest.Comment = stage.Comment
,dest.SalesOrderHeaderrowguid = stage.SalesOrderHeaderrowguid
,dest.SalesOrderHeaderModifiedDate = stage.SalesOrderHeaderModifiedDate
FROM dbo.SalesOrderHeaderDetail dest
INNER JOIN dbo.stgSalesChangedRows stage ON stage.SalesOrderDetailID = dest.SalesOrderDetailID

There. Done and done. 

If You Build It…

Up until now, we’ve basically followed the same template used for Contacts to construct the Sales ETL. We added some complexity (by design) to grow our understanding of ETL along with our knowledge of SSIS.

Our ETL measurement is record counts and record counts only. Let’s expand on that some by also capturing a monetary sum. This will add even more confidence in our ETL, once we validate (Validation is Part 5).

Let’s begin by creating a new destination table to hold our sums: SSISRunTimeMetrics.ssis.RowSums. Use the following script to create the table:

use SSISRunTimeMetrics
go

— vars…
declare @sql varchar(255)

— create ssis schema…
if not exists(select name
              from sys.schemas
              where name = ‘ssis’)
 begin
  set @sql = ‘Create Schema ssis’
  exec(@sql)
 end

— create RunTimeErrors table…
if exists(select s.name + ‘.’ + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = ‘RowSums’
           and s.name = ‘ssis’)
 drop table ssis.RowSums
go

Create Table ssis.RowSums
 (RowSumsID int identity(1,1)
 ,TaskMetricsID int null
 ,RunTimeMetricsId int not null
 ,ParentTaskMetricsID int null
 ,RowSum decimal(38,2) null
 ,RowSumColumnName varchar(255) null
 ,RowSumTypeID char(1) null)

This table is remarkably similar to the ssis.RowCounts table we created to hold Row Count data – and for good reason, the functions of these two tables are remarkably similar. As with the Row Counts data, we need to add a stored procedure to insert Sums data, and another table to hold Inserted Types… or do we? Instead of re-creating the functionality contained in the ssis.RowCountTypes table, let’s rename – and expand the purpose of – the table.

Executing the following script accomplishes this nicely:

use SSISRunTimeMetrics
go

— vars…
declare @sql varchar(255)

— create ssis schema…
if not exists(select name
              from sys.schemas
              where name = ‘ssis’)
 begin
  set @sql = ‘Create Schema ssis’
  exec(@sql)
 end

— delete RowCountTypes table, if exists…
if exists(select s.name + ‘.’ + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = ‘RowCountTypes’
           and s.name = ‘ssis’)
 drop table ssis.RowCountTypes
go

— delete RowTypes table, if exists…
if exists(select s.name + ‘.’ + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = ‘RowTypes’
           and s.name = ‘ssis’)
 drop table ssis.RowTypes
go

Create Table ssis.RowTypes
(RowTypeID char(1) not null
,RowTypeName varchar(25) null
,RowTypeDescription varchar(255) null)
go

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘I’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘I’
 ,’Selected Input Rows’
 ,’Input rows selected from a source’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘N’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘N’
 ,’New Rows’
 ,’New rows’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘C’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘C’
 ,’Changed Rows’
 ,’Changed rows’)

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = ‘U’)
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 (‘U’
 ,’Unchanged Rows’
 ,’No changes detected in rows’)
go

Our stored procedure to accomplish inserts:

use SSISRunTimeMetrics
go
if exists(select s.name + ‘.’ + p.name
from sys.procedures p
inner join sys.schemas s on s.schema_id = p.schema_id
where p.name = ‘usp_RecordRowSum’
and s.name = ‘ssis’)
begin
Drop Procedure ssis.usp_RecordRowSum
end
go
Create Procedure ssis.usp_RecordRowSum
@RunTimeMetricsID int
,@TaskMetricsID int
,@RowSum decimal(38,2)
,@RowSumTypeID char(1)
,@RowSumColumnName varchar(255) = null
,@ParentTaskMetricsID int = null
As
begin
— insert the run time errors data…
insert into ssis.RowSums
(TaskMetricsID
,RunTimeMetricsId
,ParentTaskMetricsID
,RowSum
,RowSumColumnName
,RowSumTypeID)
values
(@TaskMetricsID
,@RunTimeMetricsID
,@ParentTaskMetricsID
,@RowSum
,@RowSumColumnName
,@RowSumTypeID)
end

go

Now that our infrastructure is built we can start using it to load SSIS run time metrics.  

Add ‘Em Up

We need variables to hold the sums we intend to collect. Right-click the Control Flow and click Variables. Click the New Variable button and add a package-scoped variable, data type Double, named iSalesInputAmount. Repeat the process for three other Double variables named iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount.

There are other ways to load this type of data. The way I choose to demonstrate here is not the cleanest but it clearly exercises the principles of ETL Instrumentation.

Return to the “Load Sales Data” Data Flow Task and add one each Multicast and Aggregate transformations. Position the Mulitcast between Merge Join and Input Row Count transformations and connect them through it. Rename the Aggregate transformation “Input Line Total” and connect another output of the Multicast to it:

 

Double-click the Aggregate transformation to open the editor and check the LineTotal input column. Select Sum from the Operation column – this will add the total of all the LineTotal columns that pass between the Merge Join and Input Row Count transformations: 

Add a Script Component to the Data Flow. When prompted for Script Component Type, select Destination:

Rename the Script Component “Push InputLineTotal into Input Variable”, connect the “Input Line Total” Aggregate transformation to it, and double-click the Script Component to open the editor.

On the Input Columns page, check the Line Total input. On the Script page, enter iSalesInputAmount in the ReadWriteVariables property and click the Design Script button. In the script editor, enter the following code:

Dim iAmount As Double

Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)
  ‘
  ‘ Add your code here
  ‘

  iAmount = Row.LineTotal

End Sub

Public Overrides Sub PostExecute()

  MyBase.PostExecute()
  Me.Variables.iSalesInputAmount = iAmount
End Sub

Close the Script Editor and click the OK button to clase the Script Component editor. This should load the aggregated value into the iSalesInputAmount variable.

Drag two each Multicast and three each Aggregate, and Script Component transformations onto the Data Flow canvas. Repeat the procedure outlined above for the New, Changed, and Unchanged Conditional Split outputs – for the iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount variable values (respectively). Note you do not need a Multicase transformation for the Unchanged output. Sum the LineTotal fields for each output.

This is a lot of work and there is lots of room for error. Take your time. Double-check your work. Don’t take shortcuts. When complete, the New section will look something like this:

Load ‘Em Up

Return to the Control Flow – it’s time to captue these metrics!

In the “Step 2 – Load Sales” Sequence Container, rename the “Record Row Count” Execute SQL Task “Record Metrics”. Double-click it to open the editor. Click the ellipsis on the SQLStatement property and add the following script to the existing statement:

exec ssis.usp_RecordRowSum ?,?,?,’I’,’Sales.SalesOrderDetail.LineTotal’

exec ssis.usp_RecordRowSum ?,?,?,’N’,’Sales.SalesOrderDetail.LineTotal’

exec ssis.usp_RecordRowSum ?,?,?,’C’,’Sales.SalesOrderDetail.LineTotal’

exec ssis.usp_RecordRowSum ?,?,?,’U’,’Sales.SalesOrderDetail.LineTotal’

Click the Parameter Mapping page and add twelve parameters. With the existing twelve parameters, the new twelve are numbered 12 – 23:

The parameters follow the pattern iPackageLoadID (Input, Long, Incrementally Numbered), iTaskLoadID (Input, Long, Incrementally Numbered), iSales___Amount (Input, Double, Incrementally Numbered).

Click the Ok button to close the editor.

Testing, One, Two, Three…

Execute the package to test the Summing functionality. The following represents a better report query for our collected data:

use SSiSRunTimeMetrics
go

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as ‘packageRunTime’
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as ‘taskRunTime’
,t.TaskStatus
,s.RowSum as ‘Measurement’
,’Sum’ as ‘MeasurementType’
,st.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowSums s on s.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes st on st.RowTypeID = s.RowSumTypeID
where m.id = (select Max(id)
from ssis.RunTimeMetrics)
and s.RowSum > 0

union

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as ‘packageRunTime’
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as ‘taskRunTime’
,t.TaskStatus
,c.[RowCount] as ‘Measurement’
,’Counts’ as ‘MeasurementType’
,ct.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowCounts c on c.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes ct on ct.RowTypeID = c.RowCountTypeID
where m.id = (select Max(id)
from ssis.RunTimeMetrics)
and c.[RowCount] > 0

Conclusion

Again, these examples are intended to demonstrate the principles and characteristics of ETL Instrumentation. They are not complete and Production-ready. I make no claims that this is “the right way” or even a best practice to capture ETL Run Time Metrics data. I do maintain that such data is useful in many ways – especially for troubleshooting and certain performance predictive analytics.

Next: Validation – putting this data to work.

:{> Andy

PS – The PASS Summit 2015 Call for Speakers closes at 9:00 PM PDT Sunday, 15 Mar 2015 (04:00 GMT 16 Mar 2015). There’s still time, but hurry!

Learn more: 
Linchpin People Blog: SSIS 
Stairway to Biml 
Stairway to Integration Services 

SSIS2014DesignPatterns200 

SSIS Design Pattern – ETL Instrumentation, Part 4

Introduction

This post is part of a series of posts on ETL Instrumentation.

In Part 1 we built a database to hold collected SSIS run time metrics and an SSIS package to deomnstrate how and why we would load metrics into the database.

In Part 2 we expanded on our database and the SSIS package to annotate version metadata, manage error metrics capture, and task status reporting.

In Part 3, we started using the ETL Instrumentation infrastructure we have built to measure some actual ETL. We started by counting rows.

In Part 4, we continue instrumenting by adding yet another ETL process and again scaling our measurement capabilities.

A Brief History Of Our ETL Instrumentation Project

To review, our metrics database is named SSISRunTimeMetrics. It contains a schema named ssis. In this schema are eleven objects:
 – a table named ssis.RunTimeMetrics.
 – a table named ssis.RunTimeErrors.
 – a table named ssis.TaskMetrics.
 – a table named ssis.RowCounts.
 – a table named ssis.RowCountTypes.
 – a stored procedure named ssis.usp_RecordPackageStart.
 – a stored procedure named ssis.usp_RecordPackageEnd.
 – a stored procedure named ssis.usp_RecordPackageError.
 – a stored procedure named ssis.usp_RecordTaskStart.
 – a stored procedure named ssis.usp_RecordTaskEnd.
 – a stored procedure named ssis.usp_RecordRowCounts.

Our source database is AdventureWorks and our destination database is SSISRunTimeMetrics_Target. SSISRunTimeMetrics_Target contains one object:
 – a table named dbo.Contact.

We expanded the types of run-time data we are collecting. Part 1 introduced Status collection, in Part 2 we added Exception collection. We also introduced scope into both types of collection, recording Exception information on error and finalizing Status (reporting that an error occurred).

At the beginning of SSIS package execution, we call ssis.usp_RecordPackageStart from an Execute SQL Task. We pass the package start date and time, the package name, and the package version. We also pass in a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Package Load ID variable (iPackageLoadID).

At the beginning of a task or collection of tasks that define a process, we call ssis.usp_RecordTaskStart from an Execute SQL Task. We pass the task or process start date and time, the task (source) name, iPackageLoadID, and a status of “Started”. From this stored procedure we get the ID of the newly created row, which we then push into a Task Load ID variable (iTaskLoadID).

We have a Data Flow Task to move rows from the AdventureWorks.Person.Contact table to a target database and table we created: SSISRunTimeMetrics_Target.dbo.Contact. We optimized the package for set-based updates and collect row count metrics which are inserted into SSISRunTimeMetrics.ssis.usp_RecordRowCounts.

When this task completes, we call ssis.usp_RecordTaskEnd from an Execute SQL Task. We pass in the Task Load ID from the iTaskLoadID variable, the current date and time, and the status “Succeeded”.

On error, we capture Exception data and record an Error Status – both are crucial to knowing what happens when an exception is thrown.

When the package completes execution, we call ssis.usp_RecordPackageEnd from an Execute SQL Task. We pass in the Package Load ID from the variable, the current date and time, and the status “Succeeded”.

Let’s get started on the next step!

Version Control

First, update version information:

Remember to update Version properties:

 

Now we are ready to start developing.

Sell, Sell, Sell

Let’s extract and load some Sales data.

Open the SSISRunTimeMetrics package you built previously. Delete the Success Precedence Constraint between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

Drag a Sequence Container onto the Control Flow canvas. Move the “Log End of Package Execution” Execute SQL Task down some and position the new Sequence Container between the “Step 1 – Load Contact” Sequence Container and the “Log End of Package Execution” Execute SQL Task.

 Connect the “Step 1 – Load Contact” Sequence Container to the new Sequence Container with a Success Precedence Constraint, and the new Sequence Container to the “Log End of Package Execution” Execute SQL Task with a Success Precedence Constraint.

Rename the new Sequence Container “Step 2 – Load Sales”. 

Good design is reuseable. Maybe not 100%, but most good designs are at least partially reuseable. Such is the case here – we have a good design in “Step 1 – Load Contact” – we will reuse lots of it in “Step 2 – Load Sales”. Let’s frame-out the flow, then fill in the details.

Drag two Execute SQL Tasks and a Data Flow Task into “Step 2 – Load Sales”.

Name the first Execute SQL Task “Load Sales” and double-click it to open the editor. Set the ResultSet property to “Single row” and the Connection property to “(local).SSISRunTimeMetrics”. Enter the following in the SQLStatement property:

declare @Now datetime
set @Now = GetDate()
exec ssis.usp_RecordTaskStart ?,NULL,@Now,?,’Started’

On the Parameter Mappings page, add two input parameters. Set Parameter 0 to Long data type and supply the User::iPackageLoadID variable. Set Parameter 1 to VarChar data type and supply the System::TaskName variable:

On the Result Set page, add one Result named 0 aimed at the User::iTaskLoadID variable: 

Click the OK button to close the editor.

Before proceeding, note that this Execute SQL Task is also the product of good design. In fact, the only difference task and it’s counterpart in “Step 1 – Load Contact” is the name of the task itself. Everything else is identical.

So why not copy and paste the task? Good question – we certainly could have! And we will copy and paste other tasks starting now.

Connect a Success Precedence Constraint from the “Load Sales” Execute SQL Task to the Data Flow Task. We need to do some cleanup here before proceeding. In the “Step 1 – Load Contact” Sequence Container there’s a Data Flow Task named “Data Flow Task”. We have one of those in our “Step 2 – Load Sales” Sequence Container as well. This is permissible because the objects are in different containers and have different scope.

It robs us of an important navigation facility – one we will likely need: the ability to use the Data Flow Task tab’s dropdown box. Have a look:

To remedy this, let’s rename the Data Flow Task in “Step 1 – Load Contact” “Load Contact Data”. Similary, let’s rename the “Step 2 – Load Sales” Data Flow Task “Load Sales Data”.

There. Much better.

Connect the “Load Sales Data” Data Flow Task to the second Execute SQL Task with a Success Precedence Constraint and rename it (the second Execute SQL Task) “Apply Staged Updates”. Double-click it to open the editor and set the Connection property to “(local).SSISRunTimeMetrics_Target”. We will return to this task later – click the OK button to close the editor.

Copy the “Log Successful End of Task” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container and paste it onto the Data Flow canvas. Then drag it into the the “Step 2 – Load Sales” Sequence Container.

Note: You can paste it directly into the “Step 2 – Load Sales” Sequence Container if you want to, but I recommend you not do this in SSIS 2005. The Sequence Container will expand to accomodate anything inside it, and the paste functionality in SSIS 2005 completely ignores the mouse pointer position (and eveything else, so far as I can tell) when you paste from the clipboard. Combined, these behaviors cause sequence containers to grow unpredictably large when you paste directly into them.

No modifications are required for the “Log Successful End of Task” Execute SQL Task to function as desired in the new sequence container – how cool is that?

Copy the “Log Failed End Of Task” Execute SQL Task and paste it onto the Control Flow canvas. The new task will show up named “Log Failed End Of Task 1”. Again, a naming convention conflict. To resolve it, rename the original “Log Failed End Of Task” Execute SQL Task – connected to the “Step 1 – Load Contact” Sequence Container via a Failure Precedence Constraint – to “Log Failed End of Load Contact Task”.

Rename the newly pasted “Log Failed End Of Task 1” Execute SQL Task to “Log Failed End Of Load Sales Task” and connect the “Step 2 – Load Sales” Sequence Container to “Log Failed End Of Load Sales Task” via a Failure Precedence Constraint.

Copy the “Record Row Count” Execute SQL Task from the “Step 1 – Load Contact” Sequence Container. Again, paste it onto the Control Flow canvas and then drag it into the “Step 2 – Load Sales” Sequence Container. Connect the “Log Successful End Of Task” Execute SQL Task to the “Record Row Count” Execute SQL Task with a Success Precedence Constraint and double-click the task to open the editor.

All is well with the General page, but the Parameter Mapping page reveals some poor variable-naming choices in the last exercise. We can fix this in the variable dropdown. Click the dropdown that currently contains the User::iContactCount variable and select <New variable…>:

When the Add Variable dialog displays, click the Container dropdown and select the package (“SSISRunTimeMetrics”). This determines the scope of the variable and we want a package-scoped variable.

Click the OK button to select the package scope. Set the Name of the variable to iSalesInputCount, the Value Type (data type) to Int32, and the Value to 0:

Click the OK button to close the Add Variable dialog.

Repeat the procedure above for the “Counts” variables. Name the remaining three Counts variables iSalesNewRowsCount, iSalesChangedRowsCount, and iSalesUnchangedRowsCount; respectively. When complete, the Parameter Mapping page should appear as shown:

Click the OK button to close the Execute SQL Task editor.

The flow is now framed-out. We are ready to begin our Sales-specific coding.

Building The LZ (Landing Zone)

We need a place for our Sales data to land in SSISRunTimeMetrics_Target database.

In this section I am going to walk through the first phase of the process of converting a well-designed OLTP schema into a denormalized schema.

We’ll start with the AdventureWorks Sales schema. First, let’s list all the tables in the Sales schema using the following query:

use AdventureWorks;
go

select s.name + ‘.’ + t.name
from sys.tables t
inner join sys.schemas s on s.schema_id = t.schema_id
where s.name = ‘Sales’

This gives us a list of tables in the Sales schema:

Sales.StoreContact
Sales.ContactCreditCard
Sales.CountryRegionCurrency
Sales.CreditCard
Sales.Currency
Sales.SalesOrderDetail
Sales.CurrencyRate
Sales.Customer
Sales.SalesOrderHeader
Sales.CustomerAddress
Sales.SalesOrderHeaderSalesReason
Sales.SalesPerson
Sales.SalesPersonQuotaHistory
Sales.SalesReason
Sales.Individual
Sales.SalesTaxRate
Sales.SalesTerritory
Sales.SalesTerritoryHistory
Sales.ShoppingCartItem
Sales.SpecialOffer
Sales.SpecialOfferProduct
Sales.Store

Let’s select Sales.SalesOrderDetail as our base table… we have to start somewhere. Open SQL Server Management Studio and connect the Object Browser to your local (or development) instance of SQL Server 2005. Expand Databases, then AdventureWorks, then Tables:

 

Scroll down to Sales.SalesOrderDetail. Right-click the table object in Object Browser. Hover over “Script Table as”, then “CREATE To”, and click “New Query Editor Window”:

 

This creates a nice CREATE script (and more) for the Sales.SalesOrderDetail table. I only need the CREATE TABLE portion so I remove the rest. I modify the script further – making the table part of the dbo schema. I discard the constraints, NOT NULLs, brackets, and extended properties and I’m left with:

CREATE TABLE dbo.SalesOrderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money NULL,
rowguid uniqueidentifier NULL,
ModifiedDate datetime NULL)

Repeating the process for the Sales.SalesOrderHeader table yields:

CREATE TABLE dbo.SalesOrderHeader(
SalesOrderID int NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
rowguid uniqueidentifier NULL,
ModifiedDate datetime NULL)

I can now combine these statements, removing the duplication, to create a destination table statement:

CREATE TABLE dbo.SalesOrderHeaderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money NULL,
SalesOrderDetailrowguid uniqueidentifier NULL,
SalesOrderDetailModifiedDate datetime NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
SalesOrderHeaderrowguid uniqueidentifier NULL,
SalesOrderHeaderModifiedDate datetime NULL)

Execute this statement against the SSISRunTimeMetrics_Target database to create our destination table.

Filling In The Blanks

Double-click the “Load Sales Data” Data Flow Task to switch to the Data Flow tab for editing. Drag an OLE DB Source Adapter onto the canvas and double-click it to open the editor. Select the (local).AdventureWorks connection manager. Change the Data access mode to Sql Command and enter the following SQL statement into the SQL Command Text textbox: 

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail

Click the OK button to close the editor. Right-click the Source Adapter and rename it “Sales Detail Source”:

Drag a second OLE DB Source Adapter onto the Data Flow canvas and double-click it to open the editor. Select “(local).AdventureWorks” as the connection manager and SQL Command as the Data Access Mode. Enter the following statement into the SQL Command Text textbox:

SELECT SalesOrderID
,RevisionNumber
,OrderDate
,DueDate
,ShipDate
,Status
,OnlineOrderFlag
,SalesOrderNumber
,PurchaseOrderNumber
,AccountNumber
,CustomerID
,ContactID
,SalesPersonID
,TerritoryID
,BillToAddressID
,ShipToAddressID
,ShipMethodID
,CreditCardID
,CreditCardApprovalCode
,CurrencyRateID
,SubTotal
,TaxAmt
,Freight
,TotalDue
,Comment
,rowguid
,ModifiedDate
FROM Sales.SalesOrderHeader

 

Click the OK button to close the editor and rename the Source Adapter “Sales Header Source”.

Drag a Merge Join onto the Data Flow canvas and connect a Data Flow Path (green arrow) from each Source Adapter to the Merge Join:

Note the Merge Join has an error – the Left Input is not sorted. (Neither is the Right Input, but validation fails on, and reports, the first error). To address this condition, right-click each Source Adapter and select Show Advanced Editor:

Click on the Input and Output Properties tab and expand the OLE DB Source Output object, then expand the Output Columns logical folder. Click on the OLE DB Source Output object (which represents the Output buffer) and change the IsSorted property to True:

In the Output Columns list, click on the SalesOrderID column and change the SortKeyPosition property to 1:

Click the OK button to close the Advanced Editor. Double-click the source adapter to open the editor and append an Order By clause to the SQL Command: “ORDER BY SalesOrderID”. Close the editor and repeat this process for the other Source Adapter.
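For reference, the amended SQL Command for the “Sales Detail Source” adapter now reads as follows; the “Sales Header Source” query gets the same ORDER BY SalesOrderID clause appended:

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail
ORDER BY SalesOrderID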

But wait – we still have an error:

 

Double-click the Merge Join to open the editor. Click every column from the Left input and every column except SalesOrderID from the Right Input. Two columns are named the same in both tables – rowguid and ModifiedDate. To differentiate, prepend each column’s Output Alias with the table name:

 

Click the OK button to close the editor. The error clears.

OK. We did all that just to set this up.

Count ‘Em Up 

As with the Contacts data, we will count the rows entering the process. Depending on the nature of the process, the source data, and the desired measurement(s), you may choose to measure immediately after the Source Adapters or after the Merge Join – or both. We’ll start with the same kind of counts measurements we built in the Contacts data flow.

Drag a Row Count transformation onto the Data Flow canvas and connect the output of the Merge Join to its input. Double-click the Row Count to open the editor and assign the User::iSalesInputCount variable to the VariableName property:

 

As with the Contacts data, our next steps are to correlate and filter the data, so drag a Lookup and a Conditional Split transformation onto the Data Flow canvas and connect them, in that order, downstream of the Row Count transformation:

Double-click the Lookup to open the editor and assign the following properties:
OLE DB Connection Manager: (local).SSISRunTimeMetrics_Target
Table or View: dbo.SalesOrderHeaderDetail 

Click the Columns tab, right-click in the white-space, and click Select All Mappings. Right-click again and select Delete Selected Mappings. Connect the SalesOrderID and SalesOrderDetailID columns. Select every column in the Available Lookup Columns list by checking each checkbox, then prepend each Output Alias with “Dest_”:

Click the Configure Error Output button and change the Lookup Error from “Fail Component” to “Ignore Failure”. Remember, this converts the default INNER JOIN functionality of the Lookup transformation into a LEFT OUTER JOIN. Click the OK button to close the Error Output Configuration, then click the OK button again to close the Lookup editor.

The Lookup loads the pipeline with destination data that matches the incoming rows on SalesOrderID and SalesOrderDetailID.
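If it helps to visualize what the Lookup with “Ignore Failure” is doing, here is a rough T-SQL equivalent of the correlation. This is a conceptual sketch only; the package does this work in the pipeline, not in the database:

select src.SalesOrderID
      ,src.SalesOrderDetailID
      ,dst.SalesOrderDetailID as Dest_SalesOrderDetailID -- NULL when no matching destination row exists
from AdventureWorks.Sales.SalesOrderDetail src
left outer join SSISRunTimeMetrics_Target.dbo.SalesOrderHeaderDetail dst
 on dst.SalesOrderID = src.SalesOrderID
 and dst.SalesOrderDetailID = src.SalesOrderDetailID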

Connect the output data flow path of the Lookup transformation to the Conditional Split transformation, then double-click the Conditional Split transformation to open the editor. Create a new output named “New Sales” with the Condition “IsNull(Dest_SalesOrderDetailID)”. If the LEFT OUTER JOIN functionality of the Lookup returns a NULL Dest_SalesOrderDetailID, this is a new Sales data row. (Every destination column will be NULL when there is no matching destination row, so any of them would work for this test.)

Add a second condition named “Changed Sales” with the following condition expression:

(ISNULL(Dest_RevisionNumber) ? -1 : Dest_RevisionNumber) != (ISNULL(RevisionNumber) ? -1 : RevisionNumber) ||
(ISNULL(Dest_OrderDate) ? (DT_DBDATE)0 : Dest_OrderDate) != (ISNULL(OrderDate) ? (DT_DBDATE)0 : OrderDate) ||
(ISNULL(Dest_DueDate) ? (DT_DBDATE)0 : Dest_DueDate) != (ISNULL(DueDate) ? (DT_DBDATE)0 : DueDate) ||
(ISNULL(Dest_ShipDate) ? (DT_DBDATE)0 : Dest_ShipDate) != (ISNULL(ShipDate) ? (DT_DBDATE)0 : ShipDate) ||
(ISNULL(Dest_Status) ? 0 : Dest_Status) != (ISNULL(Status) ? 0 : Status) ||
(ISNULL(Dest_OnlineOrderFlag) ? TRUE : Dest_OnlineOrderFlag) != (ISNULL(OnlineOrderFlag) ? TRUE : OnlineOrderFlag) ||
(ISNULL(Dest_SalesOrderNumber) ? "NULL" : Dest_SalesOrderNumber) != (ISNULL(SalesOrderNumber) ? "NULL" : SalesOrderNumber) ||
(ISNULL(Dest_PurchaseOrderNumber) ? "NULL" : Dest_PurchaseOrderNumber) != (ISNULL(PurchaseOrderNumber) ? "NULL" : PurchaseOrderNumber) ||
(ISNULL(Dest_AccountNumber) ? "NULL" : Dest_AccountNumber) != (ISNULL(AccountNumber) ? "NULL" : AccountNumber) ||
(ISNULL(Dest_CustomerID) ? -1 : Dest_CustomerID) != (ISNULL(CustomerID) ? -1 : CustomerID) ||
(ISNULL(Dest_ContactID) ? -1 : Dest_ContactID) != (ISNULL(ContactID) ? -1 : ContactID) ||
(ISNULL(Dest_SalesPersonID) ? -1 : Dest_SalesPersonID) != (ISNULL(SalesPersonID) ? -1 : SalesPersonID) ||
(ISNULL(Dest_TerritoryID) ? -1 : Dest_TerritoryID) != (ISNULL(TerritoryID) ? -1 : TerritoryID) ||
(ISNULL(Dest_BillToAddressID) ? -1 : Dest_BillToAddressID) != (ISNULL(BillToAddressID) ? -1 : BillToAddressID) ||
(ISNULL(Dest_ShipToAddressID) ? -1 : Dest_ShipToAddressID) != (ISNULL(ShipToAddressID) ? -1 : ShipToAddressID) ||
(ISNULL(Dest_ShipMethodID) ? -1 : Dest_ShipMethodID) != (ISNULL(ShipMethodID) ? -1 : ShipMethodID) ||
(ISNULL(Dest_CreditCardID) ? -1 : Dest_CreditCardID) != (ISNULL(CreditCardID) ? -1 : CreditCardID) ||
(ISNULL(Dest_CreditCardApprovalCode) ? "NULL" : Dest_CreditCardApprovalCode) != (ISNULL(CreditCardApprovalCode) ? "NULL" : CreditCardApprovalCode) ||
(ISNULL(Dest_CurrencyRateID) ? -1 : Dest_CurrencyRateID) != (ISNULL(CurrencyRateID) ? -1 : CurrencyRateID) ||
(ISNULL(Dest_SubTotal) ? 0 : Dest_SubTotal) != (ISNULL(SubTotal) ? 0 : SubTotal) ||
(ISNULL(Dest_TaxAmt) ? 0 : Dest_TaxAmt) != (ISNULL(TaxAmt) ? 0 : TaxAmt) ||
(ISNULL(Dest_Freight) ? 0 : Dest_Freight) != (ISNULL(Freight) ? 0 : Freight) ||
(ISNULL(Dest_TotalDue) ? 0 : Dest_TotalDue) != (ISNULL(TotalDue) ? 0 : TotalDue) ||
(ISNULL(Dest_Comment) ? "NULL" : Dest_Comment) != (ISNULL(Comment) ? "NULL" : Comment) ||
(ISNULL(Dest_SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : Dest_SalesOrderHeaderModifiedDate) != (ISNULL(SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : SalesOrderHeaderModifiedDate) ||
(ISNULL(Dest_CarrierTrackingNumber) ? "NULL" : Dest_CarrierTrackingNumber) != (ISNULL(CarrierTrackingNumber) ? "NULL" : CarrierTrackingNumber) ||
(ISNULL(Dest_OrderQty) ? 0 : Dest_OrderQty) != (ISNULL(OrderQty) ? 0 : OrderQty) ||
(ISNULL(Dest_ProductID) ? -1 : Dest_ProductID) != (ISNULL(ProductID) ? -1 : ProductID) ||
(ISNULL(Dest_SpecialOfferID) ? -1 : Dest_SpecialOfferID) != (ISNULL(SpecialOfferID) ? -1 : SpecialOfferID)

Rename the default output “Unchanged Rows”. Click the OK button to close the editor.
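If it helps to think about that expression in set terms, it is doing by hand roughly what T-SQL’s EXCEPT operator does: return rows whose column values differ from the corresponding destination rows (EXCEPT, like the ISNULL handling above, treats two NULLs as equal). This is a conceptual sketch only; it is not part of the package, and EXCEPT alone would also return brand-new rows, which the Conditional Split has already peeled off into the “New Sales” output:

select SalesOrderID, SalesOrderDetailID, OrderQty, UnitPrice, LineTotal -- ...and the other compared columns
from AdventureWorks.Sales.SalesOrderDetail
except
select SalesOrderID, SalesOrderDetailID, OrderQty, UnitPrice, LineTotal -- ...and the other compared columns
from SSISRunTimeMetrics_Target.dbo.SalesOrderHeaderDetail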

Drag an OLE DB Destination Adapter onto the Data Flow canvas and rename it “New Sales Destination”. Connect an output of the Conditional Split to the new Destination Adapter. When prompted, select the “New Sales” output of the Conditional Split:

 

Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target. Set the Data Access Mode property to “Table or View” and select the dbo.SalesOrderHeaderDetail table. Click on the Mappings page to automap the pipeline fields to the table columns:

 

Click the OK button to close the editor.

Drag another OLE DB Destination Adapter onto the Data Flow canvas and rename it “stgSalesChangedRows”. Connect an output from the Conditional Split to the new Destination Adapter and select the “Changed Sales” output when prompted. Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target and set the Data Access Mode property to “Table or View”. Click the New button next to the “Name of the Table or View” dropdown:

Click the OK button to create the stgSalesChangedRows table. Click the Mappings page to automap the columns, then click the OK button to close the editor.

We now have Sales ETL. Cool

To complete our counts logic, add three Row Count transformations to the Data Flow canvas. Name them “New Rows Count”, “Changed Rows Count”, and “Unchanged Rows Count”. Position “New Rows Count” between the Conditional Split and the “New Sales Destination” Adapter. Double-click to open the editor and set the VariableName property to “User::iSalesNewRowsCount”. Click the OK button to close the editor.

Position the “Changed Rows Count” between the Conditional Split and the stgSalesChangedRows Destination Adapter. Open its editor and set the VariableName property to “User::iSalesChangedRowsCount”.

Open the editor for the “Unchanged Rows Count” transformation and set the VariableName property to “User::iSalesUnchangedRowsCount”.

 Before we leave this section, let’s complete the staged updates by returning to the Control Flow and updating the SQLStatement property of the “Apply Staged Updates” Execute SQL Task inside the “Step 2 – Load Sales” Sequence Container with the following statement:

UPDATE dest
SET dest.SalesOrderID = stage.SalesOrderID
,dest.CarrierTrackingNumber = stage.CarrierTrackingNumber
,dest.OrderQty = stage.OrderQty
,dest.ProductID = stage.ProductID
,dest.SpecialOfferID = stage.SpecialOfferID
,dest.UnitPrice = stage.UnitPrice
,dest.UnitPriceDiscount = stage.UnitPriceDiscount
,dest.LineTotal = stage.LineTotal
,dest.SalesOrderDetailrowguid = stage.SalesOrderDetailrowguid
,dest.SalesOrderDetailModifiedDate = stage.SalesOrderDetailModifiedDate
,dest.RevisionNumber = stage.RevisionNumber
,dest.OrderDate = stage.OrderDate
,dest.DueDate = stage.DueDate
,dest.ShipDate = stage.ShipDate
,dest.Status = stage.Status
,dest.OnlineOrderFlag = stage.OnlineOrderFlag
,dest.SalesOrderNumber = stage.SalesOrderNumber
,dest.PurchaseOrderNumber = stage.PurchaseOrderNumber
,dest.AccountNumber = stage.AccountNumber
,dest.CustomerID = stage.CustomerID
,dest.ContactID = stage.ContactID
,dest.SalesPersonID = stage.SalesPersonID
,dest.TerritoryID = stage.TerritoryID
,dest.BillToAddressID = stage.BillToAddressID
,dest.ShipToAddressID = stage.ShipToAddressID
,dest.ShipMethodID = stage.ShipMethodID
,dest.CreditCardID = stage.CreditCardID
,dest.CreditCardApprovalCode = stage.CreditCardApprovalCode
,dest.CurrencyRateID = stage.CurrencyRateID
,dest.SubTotal = stage.SubTotal
,dest.TaxAmt = stage.TaxAmt
,dest.Freight = stage.Freight
,dest.TotalDue = stage.TotalDue
,dest.Comment = stage.Comment
,dest.SalesOrderHeaderrowguid = stage.SalesOrderHeaderrowguid
,dest.SalesOrderHeaderModifiedDate = stage.SalesOrderHeaderModifiedDate
FROM dbo.SalesOrderHeaderDetail dest
INNER JOIN dbo.stgSalesChangedRows stage ON stage.SalesOrderDetailID = dest.SalesOrderDetailID

There. Done and done. 
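As a quick sanity check after a test execution, I like to compare the destination and staging row counts to the Row Count variables captured in the data flow. This is optional and not part of the package:

use SSISRunTimeMetrics_Target
go

select count(*) as DestinationRows
from dbo.SalesOrderHeaderDetail

select count(*) as StagedChangedRows
from dbo.stgSalesChangedRows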

If You Build It…

Up until now, we’ve basically followed the same template used for Contacts to construct the Sales ETL. We added some complexity (by design) to grow our understanding of ETL along with our knowledge of SSIS.

Our ETL measurement is record counts and record counts only. Let’s expand on that some by also capturing a monetary sum. This will add even more confidence in our ETL, once we validate (Validation is Part 5).

Let’s begin by creating a new destination table to hold our sums: SSISRunTimeMetrics.ssis.RowSums. Use the following script to create the table:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- drop the RowSums table, if it exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowSums'
           and s.name = 'ssis')
 drop table ssis.RowSums
go

Create Table ssis.RowSums
 (RowSumsID int identity(1,1)
 ,TaskMetricsID int null
 ,RunTimeMetricsId int not null
 ,ParentTaskMetricsID int null
 ,RowSum decimal(38,2) null
 ,RowSumColumnName varchar(255) null
 ,RowSumTypeID char(1) null)

This table is remarkably similar to the ssis.RowCounts table we created to hold Row Count data, and for good reason: the two tables serve nearly identical purposes. As with the Row Counts data, we need a stored procedure to insert Sums data and another table to hold the types of inserted rows… or do we? Instead of re-creating the functionality contained in the ssis.RowCountTypes table, let’s rename that table and expand its purpose.

Executing the following script accomplishes this nicely:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- delete RowCountTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowCountTypes'
           and s.name = 'ssis')
 drop table ssis.RowCountTypes
go

-- delete RowTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowTypes'
           and s.name = 'ssis')
 drop table ssis.RowTypes
go

Create Table ssis.RowTypes
(RowTypeID char(1) not null
,RowTypeName varchar(25) null
,RowTypeDescription varchar(255) null)
go

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'I')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('I'
 ,'Selected Input Rows'
 ,'Input rows selected from a source')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'N')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('N'
 ,'New Rows'
 ,'New rows')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'C')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('C'
 ,'Changed Rows'
 ,'Changed rows')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'U')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('U'
 ,'Unchanged Rows'
 ,'No changes detected in rows')
go

Our stored procedure to accomplish inserts:

use SSISRunTimeMetrics
go

if exists(select s.name + '.' + p.name
          from sys.procedures p
          inner join sys.schemas s on s.schema_id = p.schema_id
          where p.name = 'usp_RecordRowSum'
           and s.name = 'ssis')
 begin
  Drop Procedure ssis.usp_RecordRowSum
 end
go

Create Procedure ssis.usp_RecordRowSum
  @RunTimeMetricsID int
 ,@TaskMetricsID int
 ,@RowSum decimal(38,2)
 ,@RowSumTypeID char(1)
 ,@RowSumColumnName varchar(255) = null
 ,@ParentTaskMetricsID int = null
As
begin

 -- insert the row sum data...
 insert into ssis.RowSums
  (TaskMetricsID
  ,RunTimeMetricsId
  ,ParentTaskMetricsID
  ,RowSum
  ,RowSumColumnName
  ,RowSumTypeID)
 values
  (@TaskMetricsID
  ,@RunTimeMetricsID
  ,@ParentTaskMetricsID
  ,@RowSum
  ,@RowSumColumnName
  ,@RowSumTypeID)

end
go
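Before wiring the procedure into the package, you can smoke-test it from Management Studio. The ID values below are placeholders only, not values from a real package execution:

-- smoke-test the new procedure with placeholder IDs...
exec ssis.usp_RecordRowSum
  @RunTimeMetricsID = 1
 ,@TaskMetricsID = 1
 ,@RowSum = 123.45
 ,@RowSumTypeID = 'I'
 ,@RowSumColumnName = 'Sales.SalesOrderDetail.LineTotal'

-- inspect the result...
select * from ssis.RowSums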

Now that our infrastructure is built we can start using it to load SSIS run time metrics.  

Add ‘Em Up

We need variables to hold the sums we intend to collect. Right-click the Control Flow and click Variables. Click the New Variable button and add a package-scoped variable, data type Double, named iSalesInputAmount. Repeat the process for three other Double variables named iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount.

There are other ways to load this type of data. The way I choose to demonstrate here is not the cleanest but it clearly exercises the principles of ETL Instrumentation.

Return to the “Load Sales Data” Data Flow Task and add a Multicast and an Aggregate transformation. Position the Multicast between the Merge Join and the Row Count transformation and connect them through it. Rename the Aggregate transformation “Input Line Total” and connect another output of the Multicast to it:

 

Double-click the Aggregate transformation to open the editor and check the LineTotal input column. Select Sum in the Operation column – this sums the LineTotal values of every row flowing between the Merge Join and the Row Count transformations: 
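As a cross-check, the value this Aggregate produces should match a straight T-SQL sum over the source table (every SalesOrderDetail row has a matching SalesOrderHeader row, so the Merge Join should not drop any):

use AdventureWorks
go

select sum(LineTotal) as InputLineTotal
from Sales.SalesOrderDetail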

Add a Script Component to the Data Flow. When prompted for Script Component Type, select Destination:

Rename the Script Component “Push InputLineTotal into Input Variable”, connect the “Input Line Total” Aggregate transformation to it, and double-click the Script Component to open the editor.

On the Input Columns page, check the LineTotal input. On the Script page, enter iSalesInputAmount in the ReadWriteVariables property and click the Design Script button. In the script editor, enter the following code:

Dim iAmount As Double

Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)

  ' The Aggregate sends a single row containing the summed LineTotal;
  ' capture it in a local variable...
  iAmount = Row.LineTotal

End Sub

Public Overrides Sub PostExecute()

  MyBase.PostExecute()

  ' ReadWriteVariables are only writable in PostExecute, so push the
  ' captured amount into the package variable here...
  Me.Variables.iSalesInputAmount = iAmount

End Sub

Close the Script Editor and click the OK button to close the Script Component editor. This loads the aggregated value into the iSalesInputAmount variable.

Drag two more Multicast transformations, three more Aggregate transformations, and three more Script Components onto the Data Flow canvas. Repeat the procedure outlined above for the New, Changed, and Unchanged Conditional Split outputs, loading the iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount variables, respectively. Note you do not need a Multicast transformation for the Unchanged output. Sum the LineTotal field for each output.

This is a lot of work and there is lots of room for error. Take your time. Double-check your work. Don’t take shortcuts. When complete, the New section will look something like this:

Load ‘Em Up

Return to the Control Flow – it’s time to capture these metrics!

In the “Step 2 – Load Sales” Sequence Container, rename the “Record Row Count” Execute SQL Task “Record Metrics”. Double-click it to open the editor. Click the ellipsis on the SQLStatement property and add the following script to the existing statement:

exec ssis.usp_RecordRowSum ?,?,?,'I','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'N','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'C','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'U','Sales.SalesOrderDetail.LineTotal'

Click the Parameter Mapping page and add twelve parameters. With the existing twelve parameters, the new twelve are numbered 12 – 23:

The parameters follow the pattern iPackageLoadID (Input, Long, Incrementally Numbered), iTaskLoadID (Input, Long, Incrementally Numbered), iSales___Amount (Input, Double, Incrementally Numbered).
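Spelled out, and assuming the original statement already consumes parameters 0 through 11, the new mappings would look something like this (adjust the ordinals and variable names to match your package):

Parameter 12: User::iPackageLoadID (Input, Long)
Parameter 13: User::iTaskLoadID (Input, Long)
Parameter 14: User::iSalesInputAmount (Input, Double)
Parameter 15: User::iPackageLoadID (Input, Long)
Parameter 16: User::iTaskLoadID (Input, Long)
Parameter 17: User::iSalesNewAmount (Input, Double)
Parameter 18: User::iPackageLoadID (Input, Long)
Parameter 19: User::iTaskLoadID (Input, Long)
Parameter 20: User::iSalesChangedAmount (Input, Double)
Parameter 21: User::iPackageLoadID (Input, Long)
Parameter 22: User::iTaskLoadID (Input, Long)
Parameter 23: User::iSalesUnchangedAmount (Input, Double)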

Click the OK button to close the editor.

Testing, One, Two, Three…

Execute the package to test the Summing functionality. The following represents a better report query for our collected data:

use SSISRunTimeMetrics
go

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,s.RowSum as 'Measurement'
,'Sum' as 'MeasurementType'
,st.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowSums s on s.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes st on st.RowTypeID = s.RowSumTypeID
where m.id = (select Max(id)
              from ssis.RunTimeMetrics)
 and s.RowSum > 0

union

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,c.[RowCount] as 'Measurement'
,'Counts' as 'MeasurementType'
,ct.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowCounts c on c.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes ct on ct.RowTypeID = c.RowCountTypeID
where m.id = (select Max(id)
              from ssis.RunTimeMetrics)
 and c.[RowCount] > 0

Conclusion

Again, these examples are intended to demonstrate the principles and characteristics of ETL Instrumentation. They are not complete or Production-ready. I make no claims that this is “the right way” or even a best practice for capturing ETL Run Time Metrics data. I do maintain that such data is useful in many ways – especially for troubleshooting and for predictive performance analytics.

Next: Validation – putting this data to work.

:{> Andy

PS – The PASS Summit 2015 Call for Speakers closes at 9:00 PM PDT Sunday, 15 Mar 2015 (04:00 GMT 16 Mar 2015). There’s still time, but hurry!

Learn more: 
Linchpin People Blog: SSIS 
Stairway to Biml 
Stairway to Integration Services 
