Landing Area Stored Procedure Delta

This code-generation template copies all data from the source object into the staging (landing) object. The template implements neither change data capture (CDC) nor any other form of data differential detection. Its purpose is to get the data into the data solution environment for further processing, including the detection of data changes.

Getting data into the data solution is often one of the hardest features to implement. This is due to the great variety of possible data sources (systems, APIs, technologies), compounded by company-specific limitations, policies, and the velocity, veracity and volume of the data itself.

In some cases, the only reliable way to detect data changes is a Full Outer Join (FOJ) comparison between the current source data and the previously loaded data. For a wide range of reasons, it may not be possible to run this comparison against the source data object directly. As an alternative, the data can be copied into the landing object using this template and processed further within the data solution platform.
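As an illustration of the downstream comparison mentioned above, here is a minimal T-SQL sketch of a Full Outer Join delta detection. It is not part of the template. The objects `[landing].[Customer]` and `[psa].[Customer_Current]`, the business key `[CustomerId]`, the precomputed `[HashValue]` column and the 'C' / 'U' / 'D' indicator codes are all assumptions made for this example.

```sql
-- Hypothetical objects: [landing].[Customer] holds the latest full copy,
-- [psa].[Customer_Current] holds the previously processed state.
-- [CustomerId] is the assumed (non-nullable) business key and
-- [HashValue] an assumed full-row checksum present on both sides.
SELECT
    COALESCE(src.[CustomerId], tgt.[CustomerId]) AS [CustomerId],
    CASE
        WHEN tgt.[CustomerId] IS NULL THEN 'C'  -- key exists only in the new copy: new row
        WHEN src.[CustomerId] IS NULL THEN 'D'  -- key exists only in the previous state: logical delete
        ELSE 'U'                                -- key exists on both sides and the checksum differs
    END AS [ChangeIndicator]
FROM [landing].[Customer] AS src
FULL OUTER JOIN [psa].[Customer_Current] AS tgt
    ON src.[CustomerId] = tgt.[CustomerId]
WHERE tgt.[CustomerId] IS NULL
   OR src.[CustomerId] IS NULL
   OR src.[HashValue] <> tgt.[HashValue];
```

Because the landing object always contains a complete copy of the source, a key that is missing on the `src` side can be interpreted as deleted in the source. That interpretation would not hold for a partial extract.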

  • SQL Server family databases; the template uses procedural SQL (T-SQL) syntax.

This template requires a dedicated, transient staging (landing) area. The procedure it generates truncates the target data object (the Landing Area object) and then loads all source data into it.
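To make the truncate-and-load behaviour concrete, the following sketch shows roughly what a generated procedure could look like for a two-column source. It is a simplified, hand-written approximation rather than actual template output. Every object and column name (`[SourceDb]`, `[Landing]`, `STG_CUSTOMER`, `AUDIT_TRAIL_ID`, `HASH_FULL_RECORD`, `CHANGE_DATA_INDICATOR`, `SOURCE_TIMESTAMP`) is an assumed value for the corresponding metadata or convention.

```sql
-- Hypothetical rendered output; all object and column names are assumptions.
CREATE OR ALTER PROCEDURE [SP_STG_CUSTOMER] @AUDIT_TRAIL_ID INT = 0
AS
-- Empty the landing object so it only ever holds the latest full copy.
TRUNCATE TABLE [Landing].[dbo].[STG_CUSTOMER];

-- Reload every row from the source; no filtering or change detection is applied.
INSERT INTO [Landing].[dbo].[STG_CUSTOMER]
(
    [AUDIT_TRAIL_ID],
    [CUSTOMER_ID],
    [CUSTOMER_NAME],
    [HASH_FULL_RECORD],
    [CHANGE_DATA_INDICATOR],
    [SOURCE_TIMESTAMP]
)
SELECT
    @AUDIT_TRAIL_ID,
    [CustomerId],
    [CustomerName],
    -- Full-row checksum over all mapped columns, using a delimiter between values.
    HASHBYTES('MD5',
        ISNULL(RTRIM(CONVERT(NVARCHAR(100), [CustomerId])), 'N/A') + '#~!' +
        ISNULL(RTRIM(CONVERT(NVARCHAR(100), [CustomerName])), 'N/A') + '#~!'
    ),
    'C',            -- always 'C', because no change detection happens at this stage
    SYSDATETIME()
FROM [SourceDb].[dbo].[Customer];
GO

-- Example call, passing a hypothetical audit trail id.
EXEC [SP_STG_CUSTOMER] @AUDIT_TRAIL_ID = 42;
```

Truncating before the load keeps the procedure safe to rerun: a second execution ends with the same landing contents as the first, apart from the audit trail id and timestamp.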

  • The template requires both the source and target databases to be accessible from the procedure: via a Linked Server, via a cross-database query on the same (on-premises) instance, or because the source and target data sets are located in the same database.
  • This pattern has limited scalability because it copies the full data set on every run; it is best suited to smaller data sets.
  • Subsequent data delta / differential detection is recommended downstream, to be able to detect logical deletes.
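The three access options listed above correspond to different object-name forms in T-SQL. The sketch below uses hypothetical server, database, schema and table names purely to show the difference.

```sql
-- All server, database, schema and table names below are hypothetical.

-- 1. Source and target in the same database: two-part name.
SELECT COUNT(*) FROM [dbo].[Customer];

-- 2. Cross-database query on the same instance: three-part name.
SELECT COUNT(*) FROM [SourceDb].[dbo].[Customer];

-- 3. Linked Server: four-part name (server.database.schema.object).
SELECT COUNT(*) FROM [SourceServer].[SourceDb].[dbo].[Customer];
```

The template composes three-part names from the `datastore` and `location` connection extensions, so the Linked Server case presumably needs an extra step. One possible option is a local synonym that points at the four-part name.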
{{#each dataObjectMappings~}}
--WIP
CREATE OR ALTER PROCEDURE [SP_{{targetDataObject.name}}] @{{../conventions.auditTrailIdColumn}} INT = 0 -- Applying 0 as default value
AS
--
-- Staging Area Stored Procedure for {{targetDataObject.name}} using a DIRECT control framework wrapper.
-- This template copies all data available in the source data object into the landing area.
-- When the data is available in the landing area, subsequent processes can be used to load the data into downstream objects or used to derive data delta.
--
-- Generated from template '{{../templateMetadata.name}}'.
--
TRUNCATE TABLE [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}];
WITH SOURCE_CTE AS
(
-- The source data object, which will be copied in its entirety.
SELECT
{{#each dataItemMappings}}
[{{sourceDataItems.0.name}}] AS [{{stringupper targetDataItem.name}}],
{{/each}}
HASHBYTES('MD5',
{{#each dataItemMappings}}
ISNULL(RTRIM(CONVERT(NVARCHAR(100),[{{sourceDataItems.0.name}}])), 'N/A') + '#~!'{{#unless @last}} +{{/unless}}
{{/each}}
) AS [{{../conventions.checksumColumn}}],
{{!-- Source row Id implemented for when not using an identity column in the staging table --}}
ROW_NUMBER() OVER ( ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}[{{sourceDataItems.0.name}}] {{#unless @last}},{{/unless}}{{/each}}{{/each}}) AS {{../conventions.inscriptionRecordIdColumn}}
FROM [{{lookupExtension sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension sourceDataObjects.0.dataConnection.extensions "location"}}].[{{sourceDataObjects.0.name}}]
)
INSERT INTO [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{stringupper targetDataObject.name}}]
(
[{{../conventions.auditTrailIdColumn}}],
{{#each dataItemMappings}}
[{{stringupper targetDataItem.name}}],
{{/each}}
[{{../conventions.checksumColumn}}],
[{{../conventions.changeDataColumn}}],
{{!-- Source row Id implemented for when not using an identity column in the staging table --}}
--[{{../conventions.inscriptionRecordIdColumn}}],
[{{../conventions.sourceTimestampColumn}}]
)
SELECT
@{{../conventions.auditTrailIdColumn}} AS [{{../conventions.auditTrailIdColumn}}],
{{#each dataItemMappings}}
[{{stringupper targetDataItem.name}}],
{{/each}}
[{{../conventions.checksumColumn}}],
{{!-- Because no CDC is applied, the change data indicator is always 'C'--}}
'C' AS [{{../conventions.changeDataColumn}}],
{{!-- Source row Id implemented for when not using an identity column in the staging table --}}
--ROW_NUMBER() OVER ( ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}CASE WHEN SOURCE_CTE.[{{sourceDataItems.0.name}}] IS NULL THEN PSA_CTE.[{{sourceDataItems.0.name}}] ELSE SOURCE_CTE.[{{sourceDataItems.0.name}}] END{{#unless @last}},{{/unless}}{{/each}}{{/each}}) AS {{../conventions.inscriptionRecordIdColumn}},
SYSDATETIME() AS [{{../conventions.sourceTimestampColumn}}]
FROM SOURCE_CTE
GO
{{!-- Integration with the control framework --}}
/*
{{#each extensions}}
{{#stringcompare key "hasControlFramework"}}
-- Integration with the control framework, module registration.
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RegisterModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleAreaCode = '{{../../../conventions.landingAreaObjectPrefix}}'
,@Executable = 'EXEC [{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[SP_{{../../targetDataObject.name}}] @{{../../../conventions.auditTrailIdColumn}} = @ModuleInstanceId'
,@ModuleDescription = 'Staging Area process for [{{../../sourceDataObjects.0.name}}]'
,@ModuleSourceDataObject = '[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "location"}}].[{{../../sourceDataObjects.0.name}}]'
,@ModuleTargetDataObject = '[{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[{{../../targetDataObject.name}}]'
-- Example: run the process.
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RunModule]
@ModuleCode = 'm_{{../../targetDataObject.name}}'
,@ModuleInstanceIdColumnName = '{{../../../conventions.auditTrailIdColumn}}'
{{/stringcompare}}{{/each}}*/
{{/each}}