THE SQL Server Blog Spot on the Web

Andy Leonard

Andy Leonard is CSO of Linchpin People and SQLPeople, an SSIS Trainer, Consultant, and developer; a Business Intelligence Markup Language (Biml) developer; SQL Server database and data warehouse developer, community mentor, engineer, and farmer. He is a co-author of SQL Server 2012 Integration Services Design Patterns. His background includes web application architecture and development, VB, and ASP. Andy loves the SQL Server Community!
Note: Comments are moderated. Spam shall not pass! </GandalfVoice>

SSIS Design Pattern - ETL Instrumentation, Part 4

Introduction

This post is part of a series of posts on ETL Instrumentation.

In Part 1 we built a database to hold collected SSIS run time metrics and an SSIS package to demonstrate how and why we would load metrics into the database.

In Part 2 we expanded on our database and the SSIS package to annotate version metadata, manage error metrics capture, and task status reporting.

In Part 3, we started using the ETL Instrumentation infrastructure we have built to measure some actual ETL. We started by counting rows.

In Part 4, we continue instrumenting by adding yet another ETL process and again scaling our measurement capabilities.

A Brief History Of Our ETL Instrumentation Project

To review, our metrics database is named SSISRunTimeMetrics. It contains a schema named ssis. In this schema are eleven objects:
 - a table named ssis.RunTimeMetrics.
 - a table named ssis.RunTimeErrors.
 - a table named ssis.TaskMetrics.
 - a table named ssis.RowCounts.
 - a table named ssis.RowCountTypes.
 - a stored procedure named ssis.usp_RecordPackageStart.
 - a stored procedure named ssis.usp_RecordPackageEnd.
 - a stored procedure named ssis.usp_RecordPackageError.
 - a stored procedure named ssis.usp_RecordTaskStart.
 - a stored procedure named ssis.usp_RecordTaskEnd.
 - a stored procedure named ssis.usp_RecordRowCounts.

Our source database is AdventureWorks and our destination database is SSISRunTimeMetrics_Target. SSISRunTimeMetrics_Target contains one object:
 - a table named dbo.Contact.

We expanded the types of run-time data we are collecting. Part 1 introduced Status collection; in Part 2 we added Exception collection. We also introduced scope into both types of collection, recording Exception information on error and finalizing Status (reporting that an error occurred).

At the beginning of SSIS package execution, we call ssis.usp_RecordPackageStart from an Execute SQL Task. We pass the package start date and time, the package name, and the package version. We also pass in a status of "Started". From this stored procedure we get the ID of the newly created row, which we then push into a Package Load ID variable (iPackageLoadID).

At the beginning of a task or collection of tasks that define a process, we call ssis.usp_RecordTaskStart from an Execute SQL Task. We pass the task or process start date and time, the task (source) name, iPackageLoadID, and a status of "Started". From this stored procedure we get the ID of the newly created row, which we then push into a Task Load ID variable (iTaskLoadID).

We have a Data Flow Task to move rows from the AdventureWorks.Person.Contact table to a target database and table we created: SSISRunTimeMetrics_Target.dbo.Contact. We optimized the package for set-based updates and collect row count metrics, which are recorded by calling SSISRunTimeMetrics.ssis.usp_RecordRowCounts.

When this task completes, we call ssis.usp_RecordTaskEnd from an Execute SQL Task. We pass in the Task Load ID from the iTaskLoadID variable, the current date and time, and the status "Succeeded".

On error, we capture Exception data and record an Error Status - both are crucial to knowing what happens when an exception is thrown.

When the package completes execution, we call ssis.usp_RecordPackageEnd from an Execute SQL Task. We pass in the Package Load ID from the variable, the current date and time, and the status "Succeeded".
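
The start/end pattern described above can be sketched outside SSIS. The following is a minimal Python illustration, not the series' actual stored procedures: the run_time_metrics table, its columns, the version string, and the "Failed" status value are all assumptions made for the example. It shows the key idea - the "start" call returns the ID of the new row (the role iPackageLoadID plays), and the "end" call uses that ID to finalize the status.

```python
import sqlite3
from datetime import datetime

def record_package_start(conn, package_name, version):
    """Insert a 'Started' row and return its ID (the role iPackageLoadID plays)."""
    cur = conn.execute(
        "INSERT INTO run_time_metrics "
        "(package_name, package_version, start_time, status) "
        "VALUES (?, ?, ?, 'Started')",
        (package_name, version, datetime.now().isoformat()),
    )
    return cur.lastrowid

def record_package_end(conn, package_load_id, status):
    """Finalize the row created at package start."""
    conn.execute(
        "UPDATE run_time_metrics SET end_time = ?, status = ? WHERE id = ?",
        (datetime.now().isoformat(), status, package_load_id),
    )

# Hypothetical, simplified metrics table (not the real ssis.RunTimeMetrics schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE run_time_metrics ("
    "id INTEGER PRIMARY KEY, package_name TEXT, package_version TEXT, "
    "start_time TEXT, end_time TEXT, status TEXT)"
)

package_load_id = record_package_start(conn, "SSISRunTimeMetrics", "1.0.0")
try:
    pass  # the ETL work would run here
    record_package_end(conn, package_load_id, "Succeeded")
except Exception:
    # On error, record a final status before re-raising.
    record_package_end(conn, package_load_id, "Failed")
    raise

status = conn.execute(
    "SELECT status FROM run_time_metrics WHERE id = ?", (package_load_id,)
).fetchone()[0]
```

The same shape repeats one level down for tasks: a task-level "start" returns an ID that the task-level "end" later closes out.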

Let's get started on the next step!

Version Control

First, update version information:

Remember to update Version properties:

 

Now we are ready to start developing.

Sell, Sell, Sell

Let's extract and load some Sales data.

Open the SSISRunTimeMetrics package you built previously. Delete the Success Precedence Constraint between the "Step 1 - Load Contact" Sequence Container and the "Log End of Package Execution" Execute SQL Task.

Drag a Sequence Container onto the Control Flow canvas. Move the "Log End of Package Execution" Execute SQL Task down some and position the new Sequence Container between the "Step 1 - Load Contact" Sequence Container and the "Log End of Package Execution" Execute SQL Task.

 Connect the "Step 1 - Load Contact" Sequence Container to the new Sequence Container with a Success Precedence Constraint, and the new Sequence Container to the "Log End of Package Execution" Execute SQL Task with a Success Precedence Constraint.

Rename the new Sequence Container "Step 2 - Load Sales". 

Good design is reusable. Maybe not 100%, but most good designs are at least partially reusable. Such is the case here - we have a good design in "Step 1 - Load Contact" - we will reuse lots of it in "Step 2 - Load Sales". Let's frame-out the flow, then fill in the details.

Drag two Execute SQL Tasks and a Data Flow Task into "Step 2 - Load Sales".

Name the first Execute SQL Task "Load Sales" and double-click it to open the editor. Set the ResultSet property to "Single row" and the Connection property to "(local).SSISRunTimeMetrics". Enter the following in the SQLStatement property:

declare @Now datetime
set @Now = GetDate()
exec ssis.usp_RecordTaskStart ?,NULL,@Now,?,'Started'

On the Parameter Mappings page, add two input parameters. Set Parameter 0 to Long data type and supply the User::iPackageLoadID variable. Set Parameter 1 to VarChar data type and supply the System::TaskName variable:

On the Result Set page, add one Result named 0 aimed at the User::iTaskLoadID variable: 

Click the OK button to close the editor.

Before proceeding, note that this Execute SQL Task is also the product of good design. In fact, the only difference between this task and its counterpart in "Step 1 - Load Contact" is the name of the task itself. Everything else is identical.

So why not copy and paste the task? Good question - we certainly could have! And we will copy and paste other tasks starting now.

Connect a Success Precedence Constraint from the "Load Sales" Execute SQL Task to the Data Flow Task. We need to do some cleanup here before proceeding. In the "Step 1 - Load Contact" Sequence Container there's a Data Flow Task named "Data Flow Task". We have one of those in our "Step 2 - Load Sales" Sequence Container as well. This is permissible because the objects are in different containers and have different scope.

But sharing a name robs us of an important navigation facility - one we will likely need: the ability to use the Data Flow Task tab's dropdown box. Have a look:

To remedy this, let's rename the Data Flow Task in "Step 1 - Load Contact" to "Load Contact Data". Similarly, let's rename the "Step 2 - Load Sales" Data Flow Task to "Load Sales Data".

There. Much better.

Connect the "Load Sales Data" Data Flow Task to the second Execute SQL Task with a Success Precedence Constraint and rename it (the second Execute SQL Task) "Apply Staged Updates". Double-click it to open the editor and set the Connection property to "(local).SSISRunTimeMetrics_Target". We will return to this task later - click the OK button to close the editor.

Copy the "Log Successful End of Task" Execute SQL Task from the "Step 1 - Load Contact" Sequence Container and paste it onto the Control Flow canvas. Then drag it into the "Step 2 - Load Sales" Sequence Container.

Note: You can paste it directly into the "Step 2 - Load Sales" Sequence Container if you want to, but I recommend you not do this in SSIS 2005. The Sequence Container will expand to accommodate anything inside it, and the paste functionality in SSIS 2005 completely ignores the mouse pointer position (and everything else, so far as I can tell) when you paste from the clipboard. Combined, these behaviors cause sequence containers to grow unpredictably large when you paste directly into them.

No modifications are required for the "Log Successful End of Task" Execute SQL Task to function as desired in the new sequence container - how cool is that?

Copy the "Log Failed End Of Task" Execute SQL Task and paste it onto the Control Flow canvas. The new task will show up named "Log Failed End Of Task 1". Again, a naming convention conflict. To resolve it, rename the original "Log Failed End Of Task" Execute SQL Task - connected to the "Step 1 - Load Contact" Sequence Container via a Failure Precedence Constraint - to "Log Failed End of Load Contact Task".

Rename the newly pasted "Log Failed End Of Task 1" Execute SQL Task to "Log Failed End Of Load Sales Task" and connect the "Step 2 - Load Sales" Sequence Container to "Log Failed End Of Load Sales Task" via a Failure Precedence Constraint.

Copy the "Record Row Count" Execute SQL Task from the "Step 1 - Load Contact" Sequence Container. Again, paste it onto the Control Flow canvas and then drag it into the "Step 2 - Load Sales" Sequence Container. Connect the "Log Successful End Of Task" Execute SQL Task to the "Record Row Count" Execute SQL Task with a Success Precedence Constraint and double-click the task to open the editor.

All is well with the General page, but the Parameter Mapping page reveals some poor variable-naming choices in the last exercise. We can fix this in the variable dropdown. Click the dropdown that currently contains the User::iContactCount variable and select <New variable...>:

When the Add Variable dialog displays, click the Container dropdown and select the package ("SSISRunTimeMetrics"). This determines the scope of the variable and we want a package-scoped variable.

Click the OK button to select the package scope. Set the Name of the variable to iSalesInputCount, the Value Type (data type) to Int32, and the Value to 0:

Click the OK button to close the Add Variable dialog.

Repeat the procedure above for the "Counts" variables. Name the remaining three Counts variables iSalesNewRowsCount, iSalesChangedRowsCount, and iSalesUnchangedRowsCount, respectively. When complete, the Parameter Mapping page should appear as shown:

Click the OK button to close the Execute SQL Task editor.

The flow is now framed-out. We are ready to begin our Sales-specific coding.

Building The LZ (Landing Zone)

We need a place for our Sales data to land in the SSISRunTimeMetrics_Target database.

In this section I am going to walk through the first phase of the process of converting a well-designed OLTP schema into a denormalized schema.

We'll start with the AdventureWorks Sales schema. First, let's list all the tables in the Sales schema using the following query:

use AdventureWorks;
go

select s.name + '.' + t.name
from sys.tables t
inner join sys.schemas s on s.schema_id = t.schema_id
where s.name = 'Sales'

This gives us a list of tables in the Sales schema:

Sales.StoreContact
Sales.ContactCreditCard
Sales.CountryRegionCurrency
Sales.CreditCard
Sales.Currency
Sales.SalesOrderDetail
Sales.CurrencyRate
Sales.Customer
Sales.SalesOrderHeader
Sales.CustomerAddress
Sales.SalesOrderHeaderSalesReason
Sales.SalesPerson
Sales.SalesPersonQuotaHistory
Sales.SalesReason
Sales.Individual
Sales.SalesTaxRate
Sales.SalesTerritory
Sales.SalesTerritoryHistory
Sales.ShoppingCartItem
Sales.SpecialOffer
Sales.SpecialOfferProduct
Sales.Store

Let's select Sales.SalesOrderDetail as our base table... we have to start somewhere. Open SQL Server Management Studio and connect the Object Explorer to your local (or development) instance of SQL Server 2005. Expand Databases, then AdventureWorks, then Tables:

 

Scroll down to Sales.SalesOrderDetail. Right-click the table object in Object Explorer. Hover over "Script Table as", then "CREATE To", and click "New Query Editor Window":

 

This creates a nice CREATE script (and more) for the Sales.SalesOrderDetail table. I only need the CREATE TABLE portion so I remove the rest. I modify the script further - making the table part of the dbo schema. I discard the constraints, NOT NULLs, brackets, and extended properties and I'm left with:

CREATE TABLE dbo.SalesOrderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
rowguid uniqueidentifier,
ModifiedDate datetime NULL)

Repeating the process for the Sales.SalesOrderHeader table yields:

CREATE TABLE dbo.SalesOrderHeader(
SalesOrderID int NULL,
RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
rowguid uniqueidentifier NULL,
ModifiedDate datetime NULL)

I can now combine these statements, removing the duplication, to create a destination table statement:

CREATE TABLE dbo.SalesOrderHeaderDetail(
SalesOrderID int NULL,
SalesOrderDetailID int NULL,
CarrierTrackingNumber nvarchar(25) NULL,
OrderQty smallint NULL,
ProductID int NULL,
SpecialOfferID int NULL,
UnitPrice money NULL,
UnitPriceDiscount money NULL,
LineTotal money,
SalesOrderDtatilrowguid uniqueidentifier,
SalesOrderDetailModifiedDate datetime NULL,

RevisionNumber tinyint NULL,
OrderDate datetime NULL,
DueDate datetime NULL,
ShipDate datetime NULL,
Status tinyint NULL,
OnlineOrderFlag bit NULL,
SalesOrderNumber nvarchar(25) NULL,
PurchaseOrderNumber nvarchar(25) NULL,
AccountNumber nvarchar(15) NULL,
CustomerID int NULL,
ContactID int NULL,
SalesPersonID int NULL,
TerritoryID int NULL,
BillToAddressID int NULL,
ShipToAddressID int NULL,
ShipMethodID int NULL,
CreditCardID int NULL,
CreditCardApprovalCode varchar(15) NULL,
CurrencyRateID int NULL,
SubTotal money NULL,
TaxAmt money NULL,
Freight money NULL,
TotalDue money NULL,
Comment nvarchar(128) NULL,
SalesOrderHeaderrowguid uniqueidentifier NULL,
SalesOrderHeaderModifiedDate datetime NULL)

Execute this statement against the SSISRunTimeMetrics_Target database to create our destination table.

Filling In The Blanks

Double-click the "Load Sales Data" Data Flow Task to switch to the Data Flow tab for editing. Drag an OLE DB Source Adapter onto the canvas and double-click it to open the editor. Select the (local).AdventureWorks connection manager. Change the Data access mode to SQL Command and enter the following SQL statement into the SQL Command Text textbox:

SELECT SalesOrderID
,SalesOrderDetailID
,CarrierTrackingNumber
,OrderQty
,ProductID
,SpecialOfferID
,UnitPrice
,UnitPriceDiscount
,LineTotal
,rowguid
,ModifiedDate
FROM Sales.SalesOrderDetail

Click the OK button to close the editor. Right-click the Source Adapter and rename it "Sales Detail Source":

Drag a second OLE DB Source Adapter onto the Data Flow canvas and double-click it to open the editor. Select "(local).AdventureWorks" as the connection manager and SQL Command as the Data Access Mode. Enter the following statement into the SQL Command Text textbox:

SELECT SalesOrderID
,RevisionNumber
,OrderDate
,DueDate
,ShipDate
,Status
,OnlineOrderFlag
,SalesOrderNumber
,PurchaseOrderNumber
,AccountNumber
,CustomerID
,ContactID
,SalesPersonID
,TerritoryID
,BillToAddressID
,ShipToAddressID
,ShipMethodID
,CreditCardID
,CreditCardApprovalCode
,CurrencyRateID
,SubTotal
,TaxAmt
,Freight
,TotalDue
,Comment
,rowguid
,ModifiedDate
FROM Sales.SalesOrderHeader

 

Click the OK button to close the editor and rename the Source Adapter "Sales Header Source".

Drag a Merge Join onto the Data Flow canvas and connect a Data Flow Path (green arrow) from each Source Adapter to the Merge Join:

Note the Merge Join has an error - the Left Input is not sorted. (Neither is the Right Input, but validation fails on, and reports, only the first error.) To address this condition, right-click each Source Adapter and select Show Advanced Editor:

Click on the Input and Output Properties tab and expand the OLE DB Source Output object, then expand the Output Columns logical folder. Click on the OLE DB Source Output object (which represents the Output buffer) and change the IsSorted property to True:

In the Output Columns list, click on the SalesOrderID column and change the SortKeyPosition property to 1:

Click the OK button to close the Advanced Editor. Double-click the source adapter to open the editor and append an Order By clause to the SQL Command: "ORDER BY SalesOrderID". Close the editor and repeat this process for the other Source Adapter.
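
Why does the Merge Join insist on sorted inputs? A merge join walks both inputs once, in step, and only works if both are ordered by the join key. The sketch below is a generic Python illustration of that algorithm, not SSIS's implementation; the sample rows and the merge_join helper are made up for the example. It also shows why setting IsSorted to True without adding the ORDER BY clause is dangerous: the join does not fail, it silently drops matches.

```python
def merge_join(left, right, key):
    """Inner join two lists of dicts that are BOTH sorted ascending by `key`."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1          # left row has no partner yet; advance left
        elif lk > rk:
            j += 1          # right row is behind; advance right
        else:
            # Emit every right row that shares this key with the current left row.
            k = j
            while k < len(right) and right[k][key] == lk:
                out.append({**right[k], **left[i]})
                k += 1
            i += 1
    return out

# Hypothetical sample rows: two detail lines for order 1, one for order 2.
details = [
    {"SalesOrderID": 1, "SalesOrderDetailID": 10},
    {"SalesOrderID": 1, "SalesOrderDetailID": 11},
    {"SalesOrderID": 2, "SalesOrderDetailID": 12},
]
headers = [
    {"SalesOrderID": 1, "Status": 5},
    {"SalesOrderID": 2, "Status": 5},
]

joined = merge_join(details, headers, "SalesOrderID")

# Same data, but the header input is no longer sorted by the join key.
joined_unsorted = merge_join(details, list(reversed(headers)), "SalesOrderID")
```

With sorted inputs every detail row finds its header. With the header input reversed, the algorithm skips past rows it can never revisit, so some detail rows are lost without any error being raised.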

But wait - we still have an error:

 

Double-click the Merge Join to open the editor. Click every column from the Left input and every column except SalesOrderID from the Right Input. Two columns are named the same in both tables - rowguid and ModifiedDate. To differentiate, prepend each column's Output Alias with the table name:

 

Click the OK button to close the editor. The error clears.

Ok

We did all that to set this up. 

Count 'Em Up 

As with the Contacts data, we will count the rows entering the process. Depending on the nature of the process, source data, and desired measurement(s), you may choose to measure immediately after the Source Adapters or after the Merge Join - or both. We'll start with the same kind of counts measurements we built in the Contacts data flow.

Drag a Row Count transformation onto the Data Flow canvas and connect the output of the Merge Join to its input. Double-click the Row Count to open the editor and assign the User::iSalesInputCount to the VariableName property:

 

As with the Contacts data, our next steps are to correlate and filter the data, so drag a Lookup and Conditional Split transformation onto the data flow canvas and connect them (in respective order) to the Row Count transformation:

Double-click the Lookup to open the editor and assign the following properties:
OLE DB Connection Manager: (local).SSISRunTimeMetrics_Target
Table or View: dbo.SalesOrderHeaderDetail

Click the Columns tab, right-click in the white-space, and click Select All Mappings. Right-click again and select Delete Selected Mappings. Connect the SalesOrderID and SalesOrderDetailID columns. Select every column in the Available Lookup Columns list by checking each checkbox, then prepend each Output Alias with "Dest_":

Click the Configure Error Output button and change the Lookup Error from "Fail Component" to "Ignore Failure". Remember, this converts the default INNER JOIN functionality of the Lookup transformation into a LEFT OUTER JOIN. Click the OK button to close the Error Output Configuration, then click the OK button again to close the Lookup editor.

The Lookup now adds to each row in the pipeline the destination columns (aliased with the "Dest_" prefix) from the destination row that matches on SalesOrderID and SalesOrderDetailID.

Connect the output data flow path of the Lookup transformation to the Conditional Split transformation and double-click the Conditional Split transformation to open the editor. Create a new output named "New Sales" with the Condition: "IsNull(Dest_SalesOrderDetailID)". If the LEFT OUTER JOIN functionality of the Lookup returns a NULL Dest_SalesOrderDetailID - and really every destination column will be NULL if there's no matching destination row, so we could use any of them - then this is a new Sales data row.

Add a second condition named "Changed Sales" with the following condition expression:

(ISNULL(Dest_RevisionNumber) ? -1 : Dest_RevisionNumber) != (ISNULL(RevisionNumber) ? -1 : RevisionNumber)
|| (ISNULL(Dest_OrderDate) ? (DT_DBDATE)0 : Dest_OrderDate) != (ISNULL(OrderDate) ? (DT_DBDATE)0 : OrderDate)
|| (ISNULL(Dest_DueDate) ? (DT_DBDATE)0 : Dest_DueDate) != (ISNULL(DueDate) ? (DT_DBDATE)0 : DueDate)
|| (ISNULL(Dest_ShipDate) ? (DT_DBDATE)0 : Dest_ShipDate) != (ISNULL(ShipDate) ? (DT_DBDATE)0 : ShipDate)
|| (ISNULL(Dest_Status) ? 0 : Dest_Status) != (ISNULL(Status) ? 0 : Status)
|| (ISNULL(Dest_OnlineOrderFlag) ? TRUE : Dest_OnlineOrderFlag) != (ISNULL(OnlineOrderFlag) ? TRUE : OnlineOrderFlag)
|| (ISNULL(Dest_SalesOrderNumber) ? "NULL" : Dest_SalesOrderNumber) != (ISNULL(SalesOrderNumber) ? "NULL" : SalesOrderNumber)
|| (ISNULL(Dest_PurchaseOrderNumber) ? "NULL" : Dest_PurchaseOrderNumber) != (ISNULL(PurchaseOrderNumber) ? "NULL" : PurchaseOrderNumber)
|| (ISNULL(Dest_AccountNumber) ? "NULL" : Dest_AccountNumber) != (ISNULL(AccountNumber) ? "NULL" : AccountNumber)
|| (ISNULL(Dest_CustomerID) ? -1 : Dest_CustomerID) != (ISNULL(CustomerID) ? -1 : CustomerID)
|| (ISNULL(Dest_ContactID) ? -1 : Dest_ContactID) != (ISNULL(ContactID) ? -1 : ContactID)
|| (ISNULL(Dest_SalesPersonID) ? -1 : Dest_SalesPersonID) != (ISNULL(SalesPersonID) ? -1 : SalesPersonID)
|| (ISNULL(Dest_TerritoryID) ? -1 : Dest_TerritoryID) != (ISNULL(TerritoryID) ? -1 : TerritoryID)
|| (ISNULL(Dest_BillToAddressID) ? -1 : Dest_BillToAddressID) != (ISNULL(BillToAddressID) ? -1 : BillToAddressID)
|| (ISNULL(Dest_ShipToAddressID) ? -1 : Dest_ShipToAddressID) != (ISNULL(ShipToAddressID) ? -1 : ShipToAddressID)
|| (ISNULL(Dest_ShipMethodID) ? -1 : Dest_ShipMethodID) != (ISNULL(ShipMethodID) ? -1 : ShipMethodID)
|| (ISNULL(Dest_CreditCardID) ? -1 : Dest_CreditCardID) != (ISNULL(CreditCardID) ? -1 : CreditCardID)
|| (ISNULL(Dest_CreditCardApprovalCode) ? "NULL" : Dest_CreditCardApprovalCode) != (ISNULL(CreditCardApprovalCode) ? "NULL" : CreditCardApprovalCode)
|| (ISNULL(Dest_CurrencyRateID) ? -1 : Dest_CurrencyRateID) != (ISNULL(CurrencyRateID) ? -1 : CurrencyRateID)
|| (ISNULL(Dest_SubTotal) ? 0 : Dest_SubTotal) != (ISNULL(SubTotal) ? 0 : SubTotal)
|| (ISNULL(Dest_TaxAmt) ? 0 : Dest_TaxAmt) != (ISNULL(TaxAmt) ? 0 : TaxAmt)
|| (ISNULL(Dest_Freight) ? 0 : Dest_Freight) != (ISNULL(Freight) ? 0 : Freight)
|| (ISNULL(Dest_TotalDue) ? 0 : Dest_TotalDue) != (ISNULL(TotalDue) ? 0 : TotalDue)
|| (ISNULL(Dest_Comment) ? "NULL" : Dest_Comment) != (ISNULL(Comment) ? "NULL" : Comment)
|| (ISNULL(Dest_SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : Dest_SalesOrderHeaderModifiedDate) != (ISNULL(SalesOrderHeaderModifiedDate) ? (DT_DBDATE)0 : SalesOrderHeaderModifiedDate)
|| (ISNULL(Dest_CarrierTrackingNumber) ? "NULL" : Dest_CarrierTrackingNumber) != (ISNULL(CarrierTrackingNumber) ? "NULL" : CarrierTrackingNumber)
|| (ISNULL(Dest_OrderQty) ? 0 : Dest_OrderQty) != (ISNULL(OrderQty) ? 0 : OrderQty)
|| (ISNULL(Dest_ProductID) ? -1 : Dest_ProductID) != (ISNULL(ProductID) ? -1 : ProductID)
|| (ISNULL(Dest_SpecialOfferID) ? -1 : Dest_SpecialOfferID) != (ISNULL(SpecialOfferID) ? -1 : SpecialOfferID)

Rename the default output "Unchanged Rows". Click the OK button to close the editor.
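
The Lookup plus Conditional Split logic boils down to a three-way classification. Here is a small Python sketch of the same idea, offered as an illustration only: the classify helper, the sample rows, and the short list of compared columns are assumptions for the example, not part of the package. A missing destination row plays the part of the NULL Dest_ columns; Python compares None to None as equal, which is the job the sentinel substitutions (-1, "NULL", and so on) do in the expression above.

```python
def classify(source_rows, destination_rows, key_cols, compare_cols):
    """Split source rows into the three Conditional Split outputs."""
    # Index the destination by key - the role the Lookup plays.
    dest_by_key = {tuple(r[k] for k in key_cols): r for r in destination_rows}
    outputs = {"New Sales": [], "Changed Sales": [], "Unchanged Rows": []}
    for row in source_rows:
        dest = dest_by_key.get(tuple(row[k] for k in key_cols))
        if dest is None:
            # No match: equivalent to IsNull(Dest_SalesOrderDetailID).
            outputs["New Sales"].append(row)
        elif any(row[c] != dest[c] for c in compare_cols):
            # At least one compared column differs.
            outputs["Changed Sales"].append(row)
        else:
            # Default output: matched and identical.
            outputs["Unchanged Rows"].append(row)
    return outputs

# Hypothetical sample data.
destination = [
    {"SalesOrderID": 1, "SalesOrderDetailID": 10, "OrderQty": 1, "Comment": None},
    {"SalesOrderID": 1, "SalesOrderDetailID": 11, "OrderQty": 2, "Comment": None},
]
source = [
    {"SalesOrderID": 1, "SalesOrderDetailID": 10, "OrderQty": 1, "Comment": None},
    {"SalesOrderID": 1, "SalesOrderDetailID": 11, "OrderQty": 3, "Comment": None},
    {"SalesOrderID": 2, "SalesOrderDetailID": 12, "OrderQty": 4, "Comment": None},
]

outputs = classify(
    source, destination,
    key_cols=("SalesOrderID", "SalesOrderDetailID"),
    compare_cols=("OrderQty", "Comment"),
)
counts = {name: len(rows) for name, rows in outputs.items()}
```

Note the order of the tests matters, just as the order of the Conditional Split conditions does: the "new" test must come first, because a row with no destination match would otherwise look "changed" in every column.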

Drag an OLE DB Destination Adapter onto the Data Flow canvas and rename it "New Sales Destination". Connect an output of the Conditional Split to the new Destination Adapter. When prompted, select the "New Sales" output of the Conditional Split:

 

Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target. Set the Data Access Mode property to "Table or View" and select the dbo.SalesOrderHeaderDetail table. Click on the Mappings page to automap the pipeline fields to the table columns:

 

Click the OK button to close the editor.

Drag another OLE DB Destination Adapter onto the Data Flow canvas and rename it "stgSalesChangedRows". Connect an output from the Conditional Split to the new Destination Adapter and select the "Changed Sales" output when prompted. Double-click the Destination Adapter to open the editor. Set the Connection Manager property to (local).SSISRunTimeMetrics_Target and set the Data Access Mode property to "Table or View". Click the New button next to the "Name of the Table or View" dropdown:

Click the OK button to create the stgSalesChangedRows table. Click the Mappings page to automap the columns, then click the OK button to close the editor.

We now have Sales ETL. Cool

To complete our counts logic, add three Row Count transformations to the Data Flow canvas. Name them "New Rows Count", "Changed Rows Count", and "Unchanged Rows Count". Position "New Rows Count" between the Conditional Split and the "New Sales Destination" Adapter. Double-click to open the editor and set the VariableName property to "User::iSalesNewRowsCount". Click the OK button to close the editor.

Position the "Changed Rows Count" between the Conditional Split and the stgSalesChangedRows Destination Adapter. Open its editor and set the VariableName property to "User::iSalesChangedRowsCount".

Connect the "Unchanged Rows" output of the Conditional Split to the "Unchanged Rows Count" transformation. Open its editor and set the VariableName property to "User::iSalesUnchangedRowsCount".

 Before we leave this section, let's complete the staged updates by returning to the Control Flow and updating the SQLStatement property of the "Apply Staged Updates" Execute SQL Task inside the "Step 2 - Load Sales" Sequence Container with the following statement:

UPDATE dest
SET dest.SalesOrderID = stage.SalesOrderID
,dest.CarrierTrackingNumber = stage.CarrierTrackingNumber
,dest.OrderQty = stage.OrderQty
,dest.ProductID = stage.ProductID
,dest.SpecialOfferID = stage.SpecialOfferID
,dest.UnitPrice = stage.UnitPrice
,dest.UnitPriceDiscount = stage.UnitPriceDiscount
,dest.LineTotal = stage.LineTotal
,dest.SalesOrderDtatilrowguid = stage.SalesOrderDtatilrowguid
,dest.SalesOrderDetailModifiedDate = stage.SalesOrderDetailModifiedDate
,dest.RevisionNumber = stage.RevisionNumber
,dest.OrderDate = stage.OrderDate
,dest.DueDate = stage.DueDate
,dest.ShipDate = stage.ShipDate
,dest.Status = stage.Status
,dest.OnlineOrderFlag = stage.OnlineOrderFlag
,dest.SalesOrderNumber = stage.SalesOrderNumber
,dest.PurchaseOrderNumber = stage.PurchaseOrderNumber
,dest.AccountNumber = stage.AccountNumber
,dest.CustomerID = stage.CustomerID
,dest.ContactID = stage.ContactID
,dest.SalesPersonID = stage.SalesPersonID
,dest.TerritoryID = stage.TerritoryID
,dest.BillToAddressID = stage.BillToAddressID
,dest.ShipToAddressID = stage.ShipToAddressID
,dest.ShipMethodID = stage.ShipMethodID
,dest.CreditCardID = stage.CreditCardID
,dest.CreditCardApprovalCode = stage.CreditCardApprovalCode
,dest.CurrencyRateID = stage.CurrencyRateID
,dest.SubTotal = stage.SubTotal
,dest.TaxAmt = stage.TaxAmt
,dest.Freight = stage.Freight
,dest.TotalDue = stage.TotalDue
,dest.Comment = stage.Comment
,dest.SalesOrderHeaderrowguid = stage.SalesOrderHeaderrowguid
,dest.SalesOrderHeaderModifiedDate = stage.SalesOrderHeaderModifiedDate
FROM dbo.SalesOrderHeaderDetail dest
INNER JOIN dbo.stgSalesChangedRows stage ON stage.SalesOrderDetailID = dest.SalesOrderDetailID

There. Done and done. 
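
To see the staged-update pattern in miniature, here is a Python sketch using the standard library's sqlite3 module. It is an illustration under assumptions: the two-column tables and sample values are invented, and the UPDATE is written with correlated subqueries rather than the T-SQL UPDATE ... FROM ... INNER JOIN form used above, so that it runs on any SQLite version. The point is the same - changed rows land in a staging table, then one set-based statement applies them all to the destination.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical cut-down destination and staging tables.
conn.executescript("""
CREATE TABLE dest  (SalesOrderDetailID INTEGER PRIMARY KEY, OrderQty INTEGER, LineTotal REAL);
CREATE TABLE stage (SalesOrderDetailID INTEGER PRIMARY KEY, OrderQty INTEGER, LineTotal REAL);
INSERT INTO dest  VALUES (1, 1, 10.0), (2, 2, 20.0), (3, 3, 30.0);
INSERT INTO stage VALUES (2, 5, 50.0);
""")

# One statement updates every destination row that has a staged counterpart.
conn.execute("""
UPDATE dest
SET OrderQty  = (SELECT s.OrderQty  FROM stage s
                 WHERE s.SalesOrderDetailID = dest.SalesOrderDetailID),
    LineTotal = (SELECT s.LineTotal FROM stage s
                 WHERE s.SalesOrderDetailID = dest.SalesOrderDetailID)
WHERE EXISTS (SELECT 1 FROM stage s
              WHERE s.SalesOrderDetailID = dest.SalesOrderDetailID)
""")

rows = conn.execute(
    "SELECT SalesOrderDetailID, OrderQty, LineTotal FROM dest "
    "ORDER BY SalesOrderDetailID"
).fetchall()
```

Only the row with a staged counterpart changes; the others are untouched. The WHERE EXISTS clause matters in this form, because without it the unmatched rows would be set to NULL.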

If You Build It...

Up until now, we've basically followed the same template used for Contacts to construct the Sales ETL. We added some complexity (by design) to grow our understanding of ETL along with our knowledge of SSIS.

Our ETL measurement is record counts and record counts only. Let's expand on that some by also capturing a monetary sum. This will add even more confidence in our ETL, once we validate (Validation is Part 5).

Let's begin by creating a new destination table to hold our sums: SSISRunTimeMetrics.ssis.RowSums. Use the following script to create the table:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- drop RowSums table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowSums'
           and s.name = 'ssis')
 drop table ssis.RowSums
go

Create Table ssis.RowSums
 (RowSumsID int identity(1,1)
 ,TaskMetricsID int null
 ,RunTimeMetricsId int not null
 ,ParentTaskMetricsID int null
 ,RowSum decimal(38,2) null
 ,RowSumColumnName varchar(255) null
 ,RowSumTypeID char(1) null)

This table is remarkably similar to the ssis.RowCounts table we created to hold Row Count data - and for good reason: the functions of these two tables are remarkably similar. As with the Row Counts data, we need to add a stored procedure to insert Sums data, and another table to hold Inserted Types... or do we? Instead of re-creating the functionality contained in the ssis.RowCountTypes table, let's rename - and expand the purpose of - the table.

Executing the following script accomplishes this nicely:

use SSISRunTimeMetrics
go

-- vars...
declare @sql varchar(255)

-- create ssis schema...
if not exists(select name
              from sys.schemas
              where name = 'ssis')
 begin
  set @sql = 'Create Schema ssis'
  exec(@sql)
 end

-- delete RowCountTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowCountTypes'
           and s.name = 'ssis')
 drop table ssis.RowCountTypes
go

-- delete RowTypes table, if exists...
if exists(select s.name + '.' + t.name
          from sys.tables t
          inner join sys.schemas s on s.schema_id = t.schema_id
          where t.name = 'RowTypes'
           and s.name = 'ssis')
 drop table ssis.RowTypes
go

Create Table ssis.RowTypes
(RowTypeID char(1) not null
,RowTypeName varchar(25) null
,RowTypeDescription varchar(255) null)
go

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'I')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('I'
 ,'Selected Input Rows'
 ,'Input rows selected from a source')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'N')
 insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('N'
 ,'New Rows'
 ,'New rows')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'C')
 
insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('C'
 ,'Changed Rows'
 ,'Changed rows')

if not exists(select RowTypeID
              from ssis.RowTypes
              where RowTypeID = 'U')
 
insert into ssis.RowTypes
 (RowTypeID
 ,RowTypeName
 ,RowTypeDescription)
 values
 ('U'
 ,'Unchanged Rows'
 ,'No changes detected in rows')
go

Our stored procedure to accomplish inserts:

use SSISRunTimeMetrics
go
if exists(select s.name + '.' + p.name
from sys.procedures p
inner join sys.schemas s on s.schema_id = p.schema_id
where p.name = 'usp_RecordRowSum'
and s.name = 'ssis')
begin
Drop Procedure ssis.usp_RecordRowSum
end
go
Create Procedure ssis.usp_RecordRowSum
 @RunTimeMetricsID int
,@TaskMetricsID int
,@RowSum decimal(38,2)
,@RowSumTypeID char(1)
,@RowSumColumnName varchar(255) = null
,@ParentTaskMetricsID int = null
As
begin
-- insert the row sums data...
insert into ssis.RowSums
(TaskMetricsID
,RunTimeMetricsId
,ParentTaskMetricsID
,RowSum
,RowSumColumnName
,RowSumTypeID)
values
(@TaskMetricsID
,@RunTimeMetricsID
,@ParentTaskMetricsID
,@RowSum
,@RowSumColumnName
,@RowSumTypeID)
end

go

Now that our infrastructure is built we can start using it to load SSIS run time metrics.  

Add 'Em Up

We need variables to hold the sums we intend to collect. Right-click the Control Flow and click Variables. Click the New Variable button and add a package-scoped variable, data type Double, named iSalesInputAmount. Repeat the process for three other Double variables named iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount.

There are other ways to load this type of data. The way I choose to demonstrate here is not the cleanest but it clearly exercises the principles of ETL Instrumentation.

Return to the "Load Sales Data" Data Flow Task and add one Multicast and one Aggregate transformation. Position the Multicast between the Merge Join and Input Row Count transformations and connect them through it. Rename the Aggregate transformation "Input Line Total" and connect another output of the Multicast to it:

 

Double-click the Aggregate transformation to open the editor and check the LineTotal input column. Select Sum from the Operation column - this sums the LineTotal values of all rows that pass between the Merge Join and Input Row Count transformations:

Add a Script Component to the Data Flow. When prompted for Script Component Type, select Destination:

Rename the Script Component "Push InputLineTotal into Input Variable", connect the "Input Line Total" Aggregate transformation to it, and double-click the Script Component to open the editor.

On the Input Columns page, check the LineTotal input. On the Script page, enter iSalesInputAmount in the ReadWriteVariables property and click the Design Script button. In the script editor, enter the following code:

Dim iAmount As Double

Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)
  ' The upstream Aggregate emits a single summary row,
  ' so a plain assignment captures the sum.

  iAmount = Row.LineTotal

End Sub

Public Overrides Sub PostExecute()

  MyBase.PostExecute()
  Me.Variables.iSalesInputAmount = iAmount
End Sub

Close the Script Editor and click the OK button to close the Script Component editor. This should load the aggregated value into the iSalesInputAmount variable.
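
If the two-step shape of that script looks odd, the following Python sketch mimics it outside SSIS. The class and function names are mine and purely illustrative. The idea is that the Aggregate hands the Script Component a single summary row, the row handler remembers its value, and the package variable is only written once all rows have been processed, which is why the script above assigns it in PostExecute.

```python
def aggregate_sum(rows, column):
    """Stand-in for an Aggregate transformation with a Sum operation.

    Returns a list holding one summary row (no grouping columns).
    """
    return [{column: sum(row[column] for row in rows)}]


class PushInputLineTotal:
    """Stand-in for the Script Component destination (illustrative only)."""

    def __init__(self):
        self.amount = 0.0     # plays the role of iAmount
        self.variables = {}   # plays the role of Me.Variables

    def process_input_row(self, row):
        # Called once per incoming row; here that is the single summary row.
        self.amount = row["LineTotal"]

    def post_execute(self):
        # Runs after the last row: write the package variable exactly once.
        self.variables["iSalesInputAmount"] = self.amount


detail_rows = [{"LineTotal": 10.0}, {"LineTotal": 2.5}, {"LineTotal": 7.5}]

component = PushInputLineTotal()
for summary_row in aggregate_sum(detail_rows, "LineTotal"):
    component.process_input_row(summary_row)
component.post_execute()

print(component.variables)
```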

Drag two Multicast, three Aggregate, and three Script Component transformations onto the Data Flow canvas. Repeat the procedure outlined above for the New, Changed, and Unchanged Conditional Split outputs - for the iSalesNewAmount, iSalesChangedAmount, and iSalesUnchangedAmount variable values (respectively). Note you do not need a Multicast transformation for the Unchanged output. Sum the LineTotal fields for each output.
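
One way to double-check this wiring is to compare the four sums afterwards. The Python sketch below assumes every input row leaves the Conditional Split through exactly one of the New, Changed, or Unchanged outputs. If your split has another output, the check does not apply. The "output" tag on each row and the sample amounts are made up for illustration.

```python
from decimal import Decimal


def sum_by_output(rows):
    """Total LineTotal per Conditional Split output (N, C, U)."""
    totals = {"N": Decimal("0"), "C": Decimal("0"), "U": Decimal("0")}
    for row in rows:
        totals[row["output"]] += row["LineTotal"]
    return totals


# Made-up rows, each already tagged with the output it was routed to.
rows = [
    {"LineTotal": Decimal("10.00"), "output": "N"},
    {"LineTotal": Decimal("2.50"), "output": "C"},
    {"LineTotal": Decimal("7.25"), "output": "U"},
    {"LineTotal": Decimal("1.25"), "output": "N"},
]

# The input sum is taken before the split; the other three after it.
input_total = sum((row["LineTotal"] for row in rows), Decimal("0"))
totals = sum_by_output(rows)

# Under the assumption above, input = new + changed + unchanged.
reconciles = input_total == sum(totals.values(), Decimal("0"))
print(input_total, totals, reconciles)
```

If the four variables do not reconcile like this after a run, a miswired Multicast or Aggregate is a good first suspect.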

This is a lot of work and there is lots of room for error. Take your time. Double-check your work. Don't take shortcuts. When complete, the New section will look something like this:

Load 'Em Up

Return to the Control Flow - it's time to capture these metrics!

In the "Step 2 - Load Sales" Sequence Container, rename the "Record Row Count" Execute SQL Task "Record Metrics". Double-click it to open the editor. Click the ellipsis on the SQLStatement property and add the following script to the existing statement:

exec ssis.usp_RecordRowSum ?,?,?,'I','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'N','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'C','Sales.SalesOrderDetail.LineTotal'

exec ssis.usp_RecordRowSum ?,?,?,'U','Sales.SalesOrderDetail.LineTotal'

Click the Parameter Mapping page and add twelve parameters. Because the existing twelve parameters are numbered 0 - 11, the new twelve are numbered 12 - 23:

The parameters follow the pattern iPackageLoadID (Input, Long, Incrementally Numbered), iTaskLoadID (Input, Long, Incrementally Numbered), iSales___Amount (Input, Double, Incrementally Numbered).
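
Twelve hand-entered mappings are easy to get out of order, so it can help to write the expected list down first. This Python sketch generates it from the pattern just described. The function name is hypothetical, and the order of the four sum variables is assumed to follow the order of the four exec statements (I, N, C, U).

```python
# Sum variables in the same order as the four exec statements (I, N, C, U).
SUM_VARIABLES = [
    "iSalesInputAmount",
    "iSalesNewAmount",
    "iSalesChangedAmount",
    "iSalesUnchangedAmount",
]


def build_parameter_mappings(first_ordinal=12):
    """Return (variable, direction, data type, parameter name) tuples."""
    mappings = []
    ordinal = first_ordinal
    for sum_variable in SUM_VARIABLES:
        # Each exec statement consumes three ? placeholders, in this order.
        for variable, data_type in (
            ("iPackageLoadID", "Long"),
            ("iTaskLoadID", "Long"),
            (sum_variable, "Double"),
        ):
            mappings.append((variable, "Input", data_type, ordinal))
            ordinal += 1
    return mappings


mappings = build_parameter_mappings()
for variable, direction, data_type, ordinal in mappings:
    print(ordinal, variable, direction, data_type)
```

Each group of three lines in the output corresponds to one exec statement, which makes it easier to spot a sum variable mapped to the wrong row type.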

Click the OK button to close the editor.

Testing, One, Two, Three...

Execute the package to test the summing functionality. The following represents a better report query for our collected data:

use SSISRunTimeMetrics
go

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,s.RowSum as 'Measurement'
,'Sum' as 'MeasurementType'
,st.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowSums s on s.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes st on st.RowTypeID = s.RowSumTypeID
where m.id = (select Max(id)
from ssis.RunTimeMetrics)
and s.RowSum > 0

union

select
m.packageName
,m.packageStartDateTime
,DateDiff(ss, m.packageStartDateTime, m.packageEndDateTime) as 'packageRunTime'
,m.packageStatus
,t.SourceName
,t.TaskStartDateTime
,DateDiff(ss, t.TaskStartDateTime, t.TaskEndDateTime) as 'taskRunTime'
,t.TaskStatus
,c.[RowCount] as 'Measurement'
,'Counts' as 'MeasurementType'
,ct.RowTypeName
from ssis.TaskMetrics t
inner join ssis.RunTimeMetrics m on t.RunTimeMetricsID = m.id
inner join ssis.RowCounts c on c.TaskMetricsID = t.TaskMetricsID
inner join ssis.RowTypes ct on ct.RowTypeID = c.RowCountTypeID
where m.id = (select Max(id)
from ssis.RunTimeMetrics)
and c.[RowCount] > 0
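
The shape of this query (two branches with identical columns, a literal label to tell sums from counts, and a filter on zero measurements) can be tried on a small scale. The Python sketch below uses in-memory SQLite tables that are heavily simplified stand-ins for the ssis tables. The sample values are made up and the package and task columns are left out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table RowTypes (RowTypeID text, RowTypeName text);
create table RowSums (TaskMetricsID integer, RowSum real, RowSumTypeID text);
create table RowCounts (TaskMetricsID integer, [RowCount] integer,
                        RowCountTypeID text);

insert into RowTypes values ('I', 'Selected Input Rows');
insert into RowTypes values ('N', 'New Rows');

-- One task: three input rows totalling 250.75, and no new rows at all.
insert into RowSums values (1, 250.75, 'I');
insert into RowSums values (1, 0, 'N');
insert into RowCounts values (1, 3, 'I');
insert into RowCounts values (1, 0, 'N');
""")

report = conn.execute("""
select s.RowSum as Measurement
      ,'Sum' as MeasurementType
      ,st.RowTypeName
from RowSums s
inner join RowTypes st on st.RowTypeID = s.RowSumTypeID
where s.RowSum > 0

union

select c.[RowCount]
      ,'Counts'
      ,ct.RowTypeName
from RowCounts c
inner join RowTypes ct on ct.RowTypeID = c.RowCountTypeID
where c.[RowCount] > 0
order by MeasurementType
""").fetchall()

for row in report:
    print(row)
```

The zero-valued New rows drop out of both branches, so the report only lists outputs that actually carried data in the run.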

Conclusion

Again, these examples are intended to demonstrate the principles and characteristics of ETL Instrumentation. They are neither complete nor Production-ready. I make no claims that this is "the right way" or even a best practice to capture ETL Run Time Metrics data. I do maintain that such data is useful in many ways - especially for troubleshooting and certain kinds of predictive performance analytics.

Next: Validation - putting this data to work.

:{> Andy

Published Wednesday, December 26, 2007 3:50 AM by andyleonard

Comments

 

jerryhung said:

December 27, 2007 10:55 AM
 

andyleonard said:

Thanks Jerry!

  I corrected the link in the post.

:{> Andy

December 27, 2007 3:06 PM
 

Winston said:

Hi Andy,

I've been following along with your posts as they really do help us beginners out here. But I'm having a problem with this Part 4... On executing the package it hangs at the point where the Aggregate Tasks should be receiving columns to aggregate for the various variables. The problem I believe lies in the fact that conditional splits can result in no records being sent down a particular line and therefore no aggregation can be done (not even to return zero???). Have you experienced this issue; how did you manage to resolve it? I can't seem to find a solution to this anywhere...

Thanks in advance,

winston

December 22, 2009 1:37 PM
 

Valdy said:

Thank you for sharing this. We've reviewed our BI logging and try to combine that we have and yours. Excellent article.

March 17, 2010 10:37 PM
