
Persistent Staging Area Stored Procedure Full Outer Join

This code-generation template uses a Full Outer Join (FOJ), a full comparison between a source data set and the Persistent Staging Area (PSA), to derive a data differential. The identified data changes are inserted directly into the PSA.
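The comparison logic can be modelled in a few lines of Python. This is a simplified sketch, not the generated T-SQL. The PSA is an in-memory list of rows with hypothetical field names (`key`, `inscription_ts`, `inscription_record_id`, `cdc`, `checksum`) that stand in for the template's naming conventions, and the source is reduced to a mapping from business key to checksum. As in the template, only the latest PSA row per key is compared, keys whose latest row is a logical delete ('D') are left out, and rows with equal checksums produce no delta.

```python
from typing import Dict, List, Tuple


def latest_active_psa(psa: List[dict]) -> Dict[str, dict]:
    """Most recent PSA row per business key, dropping keys whose latest row is a logical delete."""
    latest: Dict[str, dict] = {}
    for row in psa:
        current = latest.get(row["key"])
        # Latest = highest inscription timestamp, then highest inscription record id.
        if current is None or (row["inscription_ts"], row["inscription_record_id"]) > (
            current["inscription_ts"], current["inscription_record_id"]
        ):
            latest[row["key"]] = row
    return {key: row for key, row in latest.items() if row["cdc"] != "D"}


def derive_delta(source: Dict[str, str], psa: List[dict]) -> List[Tuple[str, str, str]]:
    """Full outer join of source checksums and active PSA rows; returns (key, cdc, checksum)."""
    active = latest_active_psa(psa)
    delta = []
    for key in sorted(source.keys() | active.keys()):  # union of keys = full outer join
        if key not in source:
            delta.append((key, "D", active[key]["checksum"]))  # gone from source: logical delete
        elif key not in active:
            delta.append((key, "C", source[key]))  # new key: inserts are also 'C'
        elif source[key] != active[key]["checksum"]:
            delta.append((key, "C", source[key]))  # checksum differs: change
        # Equal checksums mean 'No Change' and are filtered out.
    return delta


psa = [
    {"key": "k1", "inscription_ts": 1, "inscription_record_id": 1, "cdc": "C", "checksum": "h1"},
    {"key": "k2", "inscription_ts": 1, "inscription_record_id": 2, "cdc": "C", "checksum": "h2"},
    {"key": "k3", "inscription_ts": 1, "inscription_record_id": 3, "cdc": "C", "checksum": "h3"},
    {"key": "k3", "inscription_ts": 2, "inscription_record_id": 1, "cdc": "D", "checksum": "h3"},
    {"key": "k5", "inscription_ts": 1, "inscription_record_id": 4, "cdc": "C", "checksum": "h5"},
]
source = {"k1": "h1", "k2": "h2x", "k4": "h4"}
delta = derive_delta(source, psa)
# [('k2', 'C', 'h2x'), ('k4', 'C', 'h4'), ('k5', 'D', 'h5')]
```

Key `k3` is not deleted a second time because its most recent PSA row is already a logical delete, which mirrors the `!= 'D'` filter the template applies to the latest record.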

Getting data into the data solution is often one of the hardest features to implement. This is due to the great variety of possible data sources (systems, APIs, technologies), compounded by company-specific limitations, policies, and the velocity, veracity and volume of the data itself.

A FOJ mechanism is one of many ways to detect changes in data and, as such, is one of the options to consider for ingesting a new data delta.

  • SQL Server family databases; the template uses procedural SQL (T-SQL) syntax.

This template can be used to skip a dedicated, transient staging (landing) area and insert any identified data delta directly into the PSA.
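Because no landing area is involved, each identified change is written straight into the PSA as a new row carrying the framework columns. The append step can be sketched in Python as below. The field names are hypothetical stand-ins for the template's naming conventions, and `audit_trail_id` plays the role of the procedure's audit trail parameter (default 0, as in the template). The record id restarts at 1 for every batch, like the template's `ROW_NUMBER()` ordered by business key, so a row is presumably identified by the combination of key, inscription timestamp and record id.

```python
from datetime import datetime
from typing import List, Tuple


def append_delta(psa: List[dict], delta: List[Tuple[str, str, str]], audit_trail_id: int = 0) -> List[dict]:
    """Append (key, cdc, checksum) delta rows to an insert-only PSA, adding framework columns."""
    inscription_ts = datetime.now()  # one timestamp for the whole batch
    new_rows = [
        {
            "key": key,
            "inscription_ts": inscription_ts,
            "inscription_record_id": record_id,  # sequence ordered by business key
            "cdc": cdc,
            "audit_trail_id": audit_trail_id,
            "checksum": checksum,
        }
        for record_id, (key, cdc, checksum) in enumerate(sorted(delta), start=1)
    ]
    psa.extend(new_rows)  # existing PSA rows are never updated or deleted
    return new_rows


psa: List[dict] = []
rows = append_delta(psa, [("k2", "C", "h2x"), ("k1", "D", "h1")], audit_trail_id=42)
# rows[0] is k1 with inscription_record_id 1, rows[1] is k2 with inscription_record_id 2
```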

  • The template requires both the source and target databases to be accessible from a single query, either via a Linked Server, via the on-premises cross-database query feature, or because the source and target data sets are located in the same database.
  • Because every run reads the complete source data set and compares it against the PSA, this pattern has limited scalability and is best applied to smaller data sets.
  • A Full Outer Join mechanism only detects the net difference at the moment it runs. If a record has changed multiple times since the previous run, only its state at the time of the comparison is captured; the intermediate changes are not recorded.
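The effect of the last point can be illustrated with a minimal Python model of a snapshot comparison. It is an illustrative sketch, not the generated T-SQL: the PSA is reduced to the latest value per key, and the keys and values are made up.

```python
from typing import Dict, List, Optional, Tuple


def compare_snapshot(source: Dict[str, str], psa_latest: Dict[str, str]) -> List[Tuple[str, str, Optional[str]]]:
    """Compare the current source snapshot with the latest PSA value per key."""
    delta = []
    for key in sorted(source.keys() | psa_latest.keys()):
        if key not in source:
            delta.append((key, "D", None))  # key disappeared from the source
        elif source[key] != psa_latest.get(key):
            delta.append((key, "C", source[key]))  # new or changed key
    return delta


# Value recorded in the PSA by the previous run.
psa_latest = {"k1": "red"}

# Between runs, k1 changes red -> green -> blue; the run only sees the final snapshot.
multiple_changes = compare_snapshot({"k1": "blue"}, psa_latest)  # [('k1', 'C', 'blue')]: 'green' is never captured

# Between runs, k1 changes red -> yellow -> red; the net effect is no difference at all.
change_and_revert = compare_snapshot({"k1": "red"}, psa_latest)  # []: the intermediate change is lost
```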
{{#each dataObjectMappings~}}
CREATE OR ALTER PROCEDURE [SP_{{targetDataObject.name}}] @{{../conventions.auditTrailIdColumn}} INT = 0 --Applying 0 as default value.
AS
--
-- Persistent Staging Area Stored Procedure for {{targetDataObject.name}} using a DIRECT control framework wrapper.
-- This template expects a full copy of the incoming data to be available in the source object (e.g. the source system or landing area), and performs a full outer join to detect data changes.
--
-- Generated from template '{{../templateMetadata.name}}'.
--
WITH STG_CTE AS
(
SELECT
{{#each dataItemMappings}}
[{{stringupper sourceDataItems.0.name}}] AS [{{ stringupper targetDataItem.name}}],
{{/each}}
HASHBYTES('MD5',
{{#each dataItemMappings}}
ISNULL(RTRIM(CONVERT(NVARCHAR(100),[{{stringupper sourceDataItems.0.name}}])), 'N/A') + '#~!'{{#unless @last}} +{{/unless}}
{{/each}}
) AS [{{../conventions.checksumColumn}}]
FROM [{{lookupExtension sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension sourceDataObjects.0.dataConnection.extensions "location"}}].[{{sourceDataObjects.0.name}}]
),
PSA_CTE AS
(
-- For each business key, select the most recently arrived PSA record for comparison, unless that record is a logical delete.
SELECT
A.[{{../conventions.checksumColumn}}] AS [{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
A.[{{sourceDataItems.0.name}}] AS [{{targetDataItem.name}}]{{#unless @last}},{{/unless}}
{{/each}}
FROM [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{relatedDataObjects.0.name}}] A
JOIN
(
SELECT {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
B.[{{sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}] AS [MAX_{{../conventions.inscriptionTimeStampColumn}}],
MAX([{{../conventions.inscriptionRecordIdColumn}}]) OVER
(
PARTITION BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}]
ORDER BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}]
) AS [MAX_{{../conventions.inscriptionRecordIdColumn}}],
ROW_NUMBER() OVER
(
PARTITION BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{sourceDataItems.0.name}}]{{#unless @last}},{{/unless}} {{/each}} {{/each}}
ORDER BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{sourceDataItems.0.name}}], {{/each}} {{/each}}
[{{../conventions.inscriptionTimeStampColumn}}] DESC
) AS MAX_ROWNUM
FROM [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{relatedDataObjects.0.name}}] B
) C ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
A.[{{targetDataItem.name}}] = C.[{{targetDataItem.name}}] AND{{/each}} {{/each}}
A.[{{../conventions.inscriptionRecordIdColumn}}] = C.[MAX_{{../conventions.inscriptionRecordIdColumn}}] AND
A.[{{../conventions.inscriptionTimeStampColumn}}] = C.[MAX_{{../conventions.inscriptionTimeStampColumn}}] AND
1 = C.[MAX_ROWNUM]
WHERE A.[{{../conventions.changeDataColumn}}] != 'D'
)
INSERT INTO [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}]
(
[{{../conventions.inscriptionTimeStampColumn}}],
[{{../conventions.inscriptionRecordIdColumn}}],
[{{../conventions.sourceTimestampColumn}}],
[{{../conventions.changeDataColumn}}],
[{{../conventions.auditTrailIdColumn}}],
[{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
[{{stringupper sourceDataItems.0.name}}]{{#unless @last}},{{/unless}}
{{/each}}
)
SELECT
-- Framework columns
SYSDATETIME() [{{../conventions.inscriptionTimeStampColumn}}],
ROW_NUMBER() OVER
(ORDER BY
{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}CASE WHEN STG_CTE.[{{sourceDataItems.0.name}}] IS NULL THEN PSA_CTE.[{{sourceDataItems.0.name}}] ELSE STG_CTE.[{{sourceDataItems.0.name}}] END{{#unless @last}},{{/unless}}{{/each}}{{/each}}
) AS [{{../conventions.inscriptionRecordIdColumn}}],
SYSDATETIME() [{{../conventions.sourceTimestampColumn}}],
CASE
WHEN STG_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL THEN 'D'
WHEN PSA_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL THEN 'C' --Inserts are also C
WHEN STG_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NOT NULL
AND PSA_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NOT NULL
AND STG_CTE.[{{../conventions.checksumColumn}}] != PSA_CTE.[{{../conventions.checksumColumn}}] THEN 'C' ELSE 'No Change'
END AS {{../conventions.changeDataColumn}},
@{{../conventions.auditTrailIdColumn}} AS [{{../conventions.auditTrailIdColumn}}],
CASE
WHEN STG_CTE.{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL
THEN PSA_CTE.[{{../conventions.checksumColumn}}]
ELSE STG_CTE.[{{../conventions.checksumColumn}}]
END AS [{{../conventions.checksumColumn}}],
-- Regular columns
{{#each dataItemMappings}}
CASE
WHEN STG_CTE.{{#each ../businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}[{{stringupper sourceDataItems.0.name}}]{{/if}}{{/each}}{{/each}} IS NULL
THEN PSA_CTE.[{{stringupper sourceDataItems.0.name}}]
ELSE STG_CTE.[{{stringupper sourceDataItems.0.name}}]
END AS [{{stringupper sourceDataItems.0.name}}]{{#unless @last}},{{/unless}}{{/each}}
FROM STG_CTE
FULL OUTER JOIN PSA_CTE ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
PSA_CTE.[{{stringupper sourceDataItems.0.name}}] = STG_CTE.[{{stringupper sourceDataItems.0.name}}]{{#unless @last}} AND{{/unless}}{{/each}} {{/each}}
WHERE
(
CASE
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}STG_CTE.[{{stringupper sourceDataItems.0.name}}] IS NULL THEN 'D'{{/if}}{{/each}}{{/each}}
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}PSA_CTE.[{{stringupper sourceDataItems.0.name}}] IS NULL THEN 'C'{{/if}}{{/each}}{{/each}}
WHEN {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}STG_CTE.[{{stringupper sourceDataItems.0.name}}] IS NOT NULL AND PSA_CTE.[{{stringupper sourceDataItems.0.name}}] IS NOT NULL AND STG_CTE.[{{../../../conventions.checksumColumn}}] != PSA_CTE.[{{../../../conventions.checksumColumn}}] THEN 'C'{{/if}}{{/each}}{{/each}}
ELSE 'No Change'
END
) != 'No Change'
GO
{{!-- Integration with the control framework --}}
{{#each extensions}}
{{#stringcompare key "hasControlFramework"}}
/*
-- Integration with the control framework, module registration
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RegisterModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleAreaCode = '{{../../../conventions.persistentStagingAreaObjectPrefix}}'
,@Executable = 'EXEC [{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[SP_{{../../targetDataObject.name}}] @{{../../../conventions.auditTrailIdColumn}} = @ModuleInstanceId'
,@ModuleDescription = 'Persistent Staging Area process for [{{../../sourceDataObjects.0.name}}]'
,@ModuleSourceDataObject = '[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "location"}}].[{{../../sourceDataObjects.0.name}}]'
,@ModuleTargetDataObject = '[{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[{{../../targetDataObject.name}}]'
-- Run the process
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RunModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleInstanceIdColumnName='AUDIT_TRAIL_ID'
*/
{{/stringcompare}}{{/each}}
{{/each}}
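The change detection hinges on the checksum computed in `STG_CTE`. The hypothetical Python function `psa_checksum` below approximates that expression for one row, under two assumptions: each non-NULL value is passed in as the string SQL Server's `CONVERT(NVARCHAR(100), ...)` would produce (Python's own formatting of dates or decimals will generally differ), and values contain only Basic Multilingual Plane characters. Because the concatenated expression is `NVARCHAR`, `HASHBYTES` hashes its UTF-16LE bytes, which is what the sketch encodes.

```python
import hashlib
from typing import Optional, Sequence


def psa_checksum(values: Sequence[Optional[str]]) -> bytes:
    """Approximate HASHBYTES('MD5', ISNULL(RTRIM(CONVERT(NVARCHAR(100), col)), 'N/A') + '#~!' + ...)."""
    parts = []
    for value in values:
        if value is None:
            parts.append("N/A")  # ISNULL(..., 'N/A')
        else:
            parts.append(value[:100].rstrip(" "))  # CONVERT(NVARCHAR(100), ...) truncates, then RTRIM
        parts.append("#~!")  # the delimiter follows every column, including the last one
    # An NVARCHAR expression is hashed as UTF-16LE bytes; MD5 yields 16 bytes.
    return hashlib.md5("".join(parts).encode("utf-16-le")).digest()


# Trailing spaces are removed by RTRIM, so these rows compare as unchanged.
same_after_rtrim = psa_checksum(["a ", None]) == psa_checksum(["a", None])
# A NULL and the literal string 'N/A' produce the same checksum.
null_equals_na = psa_checksum([None]) == psa_checksum(["N/A"])
# The delimiter keeps ('ab', 'c') and ('a', 'bc') apart.
delimiter_separates = psa_checksum(["ab", "c"]) != psa_checksum(["a", "bc"])
```

Two properties follow directly from the template's expression and are worth keeping in mind: character values are only compared on their first 100 characters (after trimming trailing spaces), and a NULL cannot be distinguished from the literal value 'N/A'.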