
Landing Area Stored Procedure Delta

This code-generation template uses a Full Outer Join (FOJ, or full comparison) between a source data set and the Persistent Staging Area (PSA) to derive a data differential (delta). The detected data changes are inserted into the staging (landing) area.
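The comparison can be illustrated outside SQL Server. The following Python sketch is not produced by the template. It assumes both sides have already been reduced to one checksum per business key, and the dictionaries and the function name `full_outer_join_delta` are hypothetical. It applies the same classification rules as the change-data CASE expression in the generated procedure.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-in: business key -> row checksum.
Snapshot = Dict[str, str]


def full_outer_join_delta(source: Snapshot, psa: Snapshot) -> List[Tuple[str, str]]:
    """Return (business_key, change_indicator) pairs for keys that differ.

    Classification, as in the template's CASE expression:
      - key only in the PSA          -> 'D' (no longer present in the source)
      - key only in the source       -> 'C' (new record; inserts are also 'C')
      - key in both, checksum differs -> 'C' (changed record)
      - otherwise                     -> no change, the key is left out
    """
    delta: List[Tuple[str, str]] = []
    # A FULL OUTER JOIN visits the union of keys from both sides.
    for key in sorted(source.keys() | psa.keys()):
        if key not in source:
            delta.append((key, "D"))
        elif key not in psa:
            delta.append((key, "C"))
        elif source[key] != psa[key]:
            delta.append((key, "C"))
    return delta


if __name__ == "__main__":
    source = {"K1": "h1", "K2": "h2-new", "K4": "h4"}
    psa = {"K1": "h1", "K2": "h2", "K3": "h3"}
    print(full_outer_join_delta(source, psa))  # [('K2', 'C'), ('K3', 'D'), ('K4', 'C')]
```

Keys with identical checksums on both sides are dropped, which corresponds to the generated procedure filtering out rows evaluated as 'No Change'.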

Getting data into the data solution is often one of the hardest features to implement. This is due to the great variety of possible data sources (systems, APIs, technologies), compounded by company-specific limitations, policies, and the velocity, veracity and volume of the data itself.

A FOJ mechanism is one of many ways to detect changes in data. As such, it is one of the options to consider for ingesting new data deltas.

  • Target platform: SQL Server family databases; the template uses procedural SQL (T-SQL) syntax.

This template requires a dedicated, transient staging (landing) area. The procedure it generates first truncates the target data object, which is the staging area object.

It then loads the identified data delta into the staging object by comparing the most recently received, not logically deleted, state of each record in the Persistent Staging Area (PSA) with the current state of the data in the source object.
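Selecting the latest PSA state is the subtle part of the comparison. The Python sketch below, for illustration only, picks the newest row per business key by inscription timestamp, breaks ties on the inscription record ID, and only then discards keys whose newest row is a logical delete. The `PsaRow` type and its field names are hypothetical stand-ins for the columns the template reads from its naming conventions.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class PsaRow:
    # Hypothetical field names; the template derives the real column names
    # from its conventions (inscription timestamp, inscription record ID, etc.).
    business_key: str
    inscription_timestamp: int  # stand-in for the inscription timestamp column
    inscription_record_id: int  # tie-breaker within the same timestamp
    change_indicator: str       # 'C' (change) or 'D' (logical delete)
    checksum: str


def latest_active_psa_state(rows: Iterable[PsaRow]) -> Dict[str, PsaRow]:
    """Return the newest PSA row per business key, excluding keys whose
    newest row is a logical delete ('D')."""
    latest: Dict[str, PsaRow] = {}
    for row in rows:
        current = latest.get(row.business_key)
        candidate_order = (row.inscription_timestamp, row.inscription_record_id)
        if current is None or candidate_order > (
            current.inscription_timestamp,
            current.inscription_record_id,
        ):
            latest[row.business_key] = row
    # The delete filter runs only after the newest row has been chosen.
    return {key: row for key, row in latest.items() if row.change_indicator != "D"}


if __name__ == "__main__":
    history = [
        PsaRow("K1", 1, 1, "C", "h1"),
        PsaRow("K1", 2, 2, "C", "h1b"),  # newer version of K1
        PsaRow("K2", 1, 3, "C", "h2"),
        PsaRow("K2", 3, 4, "D", "h2"),   # K2 was deleted most recently
    ]
    state = latest_active_psa_state(history)
    print({key: row.checksum for key, row in state.items()})  # {'K1': 'h1b'}
```

The order of operations matters. If the delete filter were applied before choosing the newest row, a key that was already deleted would be compared against its last non-deleted version, producing a repeated 'D' on every run for as long as it stays absent from the source.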

  • The template requires both the source and target databases to be accessible, either via a Linked Server or an on-premises cross-database query, or because the source and target data sets are located in the same database.
  • This pattern has limited scalability and is best applied to smaller data sets, because every run reads the full source data set and the latest state of every key in the PSA.
  • A Full Outer Join mechanism only detects data changes as they exist at runtime. If a record has changed multiple times between runs, only its state at the moment the comparison is run is detected; intermediate changes are not captured.
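The runtime limitation is easy to reproduce. In this hypothetical Python sketch, which covers only inserts and updates for brevity, the source value for one key changes twice between two runs. The comparison sees only the final value, so the intermediate value never reaches the staging area.

```python
from typing import Dict, List, Tuple


def detect_changes(source: Dict[str, str], psa: Dict[str, str]) -> List[Tuple[str, str]]:
    """Return (key, current source value) for keys that are new or changed.

    Deletes are ignored here to keep the example short.
    """
    return [(key, value) for key, value in sorted(source.items()) if psa.get(key) != value]


# State captured in the PSA by the previous run.
psa = {"K1": "v1"}

# Between runs, the source record is updated twice.
source = {"K1": "v1"}
source["K1"] = "v2"
source["K1"] = "v3"

# The next run only sees the final state: 'v2' is never captured.
delta = detect_changes(source, psa)
print(delta)  # [('K1', 'v3')]
```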
{{#each dataObjectMappings~}}
CREATE OR ALTER PROCEDURE [{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[SP_{{targetDataObject.name}}] @{{../conventions.auditTrailIdColumn}} INT = 0 -- Applying 0 as default value
AS
--
-- Staging Area Stored Procedure for {{targetDataObject.name}} using a DIRECT control framework wrapper.
-- This template prepares a data delta / differential using a full outer join, and loads this into the landing area.
-- The resulting data set contains the data delta at the point of execution.
--
-- Generated from template '{{../templateMetadata.name}}'.
--
TRUNCATE TABLE [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}];
WITH SOURCE_CTE AS
(
-- Incoming data: a full data set (not a data delta) is expected to be available.
SELECT
{{#each dataItemMappings}}
[{{sourceDataItems.0.name}}] AS [{{targetDataItem.name}}],
{{/each}}
HASHBYTES('MD5',
{{#each dataItemMappings}}
ISNULL(RTRIM(CONVERT(NVARCHAR(100),[{{sourceDataItems.0.name}}])), 'N/A') + '#~!'{{#unless @last}} +{{/unless}}
{{/each}}
) AS [{{../conventions.checksumColumn}}]
FROM [{{lookupExtension sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension sourceDataObjects.0.dataConnection.extensions "location"}}].[{{sourceDataObjects.0.name}}]
),
PSA_CTE AS
(
-- For each business key, select the most recently arrived PSA record, provided it is not logically deleted, for comparison.
SELECT
A.[{{../conventions.checksumColumn}}] AS [{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
A.[{{stringupper sourceDataItems.0.name}}] AS [{{stringupper targetDataItem.name}}]{{#unless @last}},{{/unless}}
{{/each}}
FROM [{{lookupExtension relatedDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension relatedDataObjects.0.dataConnection.extensions "location"}}].[{{relatedDataObjects.0.name}}] A
JOIN
(
SELECT {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
B.[{{stringupper sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}] AS [MAX_{{../conventions.inscriptionTimeStampColumn}}],
MAX([{{../conventions.inscriptionRecordIdColumn}}]) OVER
(
PARTITION BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{stringupper sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}]
ORDER BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{stringupper sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}]
) AS [MAX_{{../conventions.inscriptionRecordIdColumn}}],
ROW_NUMBER() OVER
(
PARTITION BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{stringupper sourceDataItems.0.name}}]{{#unless @last}},{{/unless}} {{/each}} {{/each}}
ORDER BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{stringupper sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}] DESC
) AS MAX_ROWNUM
FROM [{{lookupExtension relatedDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension relatedDataObjects.0.dataConnection.extensions "location"}}].[{{relatedDataObjects.0.name}}] B
) C ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
A.[{{stringupper sourceDataItems.0.name}}] = C.[{{stringupper sourceDataItems.0.name}}] AND{{/each}} {{/each}}
A.[{{../conventions.inscriptionRecordIdColumn}}] = C.[MAX_{{../conventions.inscriptionRecordIdColumn}}] AND
A.[{{../conventions.inscriptionTimeStampColumn}}] = C.[MAX_{{../conventions.inscriptionTimeStampColumn}}] AND
1 = C.[MAX_ROWNUM]
WHERE A.[{{../conventions.changeDataColumn}}] != 'D'
)
INSERT INTO [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{stringupper targetDataObject.name}}]
(
[{{../conventions.auditTrailIdColumn}}],
{{#each dataItemMappings}}
[{{stringupper sourceDataItems.0.name}}],
{{/each}}
[{{../conventions.checksumColumn}}],
[{{../conventions.changeDataColumn}}],
{{!-- Source row ID, for use when the staging table does not have an identity column --}}
--[{{../conventions.inscriptionRecordIdColumn}}],
[{{../conventions.sourceTimestampColumn}}]
)
SELECT
@{{../conventions.auditTrailIdColumn}} AS [{{../conventions.auditTrailIdColumn}}],
{{#each dataItemMappings}}
CASE
WHEN SOURCE_CTE.{{#each ../businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL
THEN PSA_CTE.[{{stringupper sourceDataItems.0.name}}]
ELSE SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}]
END AS [{{stringupper sourceDataItems.0.name}}],{{/each}}
CASE
WHEN SOURCE_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL
THEN PSA_CTE.[{{../conventions.checksumColumn}}]
ELSE SOURCE_CTE.[{{../conventions.checksumColumn}}]
END AS [{{../conventions.checksumColumn}}],
CASE
WHEN SOURCE_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL THEN 'D'
WHEN PSA_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL THEN 'C' --Inserts are also C
WHEN SOURCE_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NOT NULL
AND PSA_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NOT NULL
AND SOURCE_CTE.[{{../conventions.checksumColumn}}] != PSA_CTE.[{{../conventions.checksumColumn}}] THEN 'C' ELSE 'No Change'
END AS [{{../conventions.changeDataColumn}}],
{{!-- Source row ID, for use when the staging table does not have an identity column --}}
--ROW_NUMBER() OVER ( ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}CASE WHEN SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}] IS NULL THEN PSA_CTE.[{{stringupper sourceDataItems.0.name}}] ELSE SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}] END{{#unless @last}},{{/unless}}{{/each}}{{/each}}) AS {{../conventions.inscriptionRecordIdColumn}},
SYSDATETIME() AS [{{../conventions.sourceTimestampColumn}}]
FROM SOURCE_CTE
FULL OUTER JOIN PSA_CTE ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
PSA_CTE.[{{stringupper sourceDataItems.0.name}}] = SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}]{{#unless @last}} AND{{/unless}}{{/each}} {{/each}}
WHERE
(
CASE
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}] IS NULL THEN 'D'{{/if}}{{/each}}{{/each}}
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}PSA_CTE.[{{stringupper sourceDataItems.0.name}}] IS NULL THEN 'C'{{/if}}{{/each}}{{/each}}
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}SOURCE_CTE.[{{stringupper sourceDataItems.0.name}}] IS NOT NULL AND PSA_CTE.[{{stringupper sourceDataItems.0.name}}] IS NOT NULL AND SOURCE_CTE.[{{../../../conventions.checksumColumn}}] != PSA_CTE.[{{../../../conventions.checksumColumn}}] THEN 'C'{{/if}}{{/each}}{{/each}}
ELSE 'No Change'
END
) != 'No Change'
GO
{{!-- Integration with the control framework --}}
/*
{{#each extensions}}
{{#stringcompare key "hasControlFramework"}}
-- Integration with the control framework, module registration.
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RegisterModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleAreaCode = '{{../../../conventions.landingAreaObjectPrefix}}'
,@Executable = 'EXEC [{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[SP_{{../../targetDataObject.name}}] @{{../../../conventions.auditTrailIdColumn}} = @ModuleInstanceId'
,@ModuleDescription = 'Staging Area process for [{{../../sourceDataObjects.0.name}}]'
,@ModuleSourceDataObject = '[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "location"}}].[{{../../sourceDataObjects.0.name}}]'
,@ModuleTargetDataObject = '[{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[{{../../targetDataObject.name}}]'
-- Example: run the process.
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RunModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleInstanceIdColumnName='AUDIT_TRAIL_ID'
{{/stringcompare}}{{/each}}*/
{{/each}}
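Because the generated procedure compares the checksum computed in SOURCE_CTE with the checksum stored in the PSA, both must be calculated in exactly the same way. The Python sketch below approximates that HASHBYTES expression for checking or reproducing values outside the database: each value is converted to text, truncated to 100 characters, right-trimmed, replaced by 'N/A' when NULL, and suffixed with the '#~!' delimiter, and the concatenation is hashed with MD5. It assumes the expression is NVARCHAR and is therefore hashed as UTF-16LE bytes, and it uses Python's `str()` as a stand-in for SQL Server's CONVERT rules, which differ for types such as dates and floating-point numbers. The function name `sql_style_checksum` is hypothetical.

```python
import hashlib
from typing import Optional, Sequence, Union

Value = Optional[Union[str, int]]


def sql_style_checksum(values: Sequence[Value]) -> bytes:
    """Approximate HASHBYTES('MD5', ISNULL(RTRIM(CONVERT(NVARCHAR(100), col)), 'N/A') + '#~!' + ...)."""
    parts = []
    for value in values:
        if value is None:
            # CONVERT and RTRIM propagate NULL, so ISNULL substitutes 'N/A'.
            text = "N/A"
        else:
            # str() stands in for CONVERT(NVARCHAR(100), ...); slicing mimics the
            # 100-character truncation and rstrip(" ") mimics RTRIM.
            text = str(value)[:100].rstrip(" ")
        # Every column, including the last one, is followed by the delimiter.
        parts.append(text + "#~!")
    # The concatenated expression is NVARCHAR, which is hashed as UTF-16LE bytes.
    return hashlib.md5("".join(parts).encode("utf-16-le")).digest()


if __name__ == "__main__":
    print(sql_style_checksum(["K1", None, 42]).hex())
```

Two consequences follow directly from the expression: a NULL and the literal string 'N/A' produce the same checksum, and changes that only add or remove trailing spaces, or that fall beyond the first 100 characters, are not detected. The delimiter, on the other hand, prevents values shifting between adjacent columns (for example 'ab' + 'c' versus 'a' + 'bc') from producing the same input string.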