Persistent Staging Area Stored Procedure Delta
Purpose
This code-generation template moves the data delta from the staging / landing object into the Persistent Staging Area (PSA). The template expects the data delta to already be available in the landing object. In other words, change data capture (CDC) processing must already be implemented there for this template to work as expected.
The generated process performs a left outer join to prevent any data from accidentally being inserted more than once, which would lead to a constraint violation.
Motivation
The PSA, when made part of the solution architecture, is a foundational feature of the data solution. Its role as the ‘transaction log’ of all data transactions presented to the data solution requires a number of considerations. These include the prevention of record redundancy and the ability to process all data at all times.
The intent of this template is that the PSA process it generates can run continuously, or as part of a (micro) batch, without any further dependencies.
Applicability
- SQL Server family databases; the template uses procedural SQL (T-SQL) syntax.
Design Pattern
Schema Type
Output Type
Implementation guidelines
This template supports idempotence and re-runnability by implementing two lookups (left outer joins).
The first join checks the ‘oldest’ record for each business key in the incoming data set against the ‘most recently arrived’ record for that key in the PSA. If these records are the same (identical checksum and CDC operation), the incoming record is omitted; otherwise it is inserted. Any subsequent records for the same key have already been identified as change records by the upstream CDC processing, and are inserted into the PSA directly.
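The comparison made by the first lookup can be sketched as a plain Python predicate that mirrors the outer `WHERE` clause of the generated query. The function and parameter names are hypothetical. The `'N/A'` and `'C'` fallbacks mirror the `COALESCE` defaults the template applies when no previous PSA record exists for the business key.

```python
from typing import Optional


def should_insert(key_row_number: int, checksum: str, cdc: str,
                  lkp_checksum: Optional[str], lkp_cdc: Optional[str]) -> bool:
    """Decide whether an incoming landing record is written to the PSA.

    Hypothetical helper. lkp_checksum / lkp_cdc describe the most recently
    arrived, not logically deleted PSA record for the same business key,
    or are None when no such record exists.
    """
    # Mirror the COALESCE defaults applied to the lookup columns.
    lkp_checksum = "N/A" if lkp_checksum is None else lkp_checksum
    lkp_cdc = "C" if lkp_cdc is None else lkp_cdc

    if key_row_number != 1:
        # Not the oldest record for this key in the delta: the upstream CDC
        # has already identified it as a change, so it is inserted as-is.
        return True

    # Oldest record for this key in the delta: insert unless both the
    # checksum AND the CDC operation equal those of the latest PSA record.
    return checksum != lkp_checksum or cdc != lkp_cdc
```

For example, `should_insert(1, "abc", "U", "abc", "U")` is `False` because nothing changed. `should_insert(1, "abc", "D", "abc", "U")` is `True` because the CDC operation differs, even though the checksum is the same.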
The second join ensures that the incoming data has not been loaded before, for example by an earlier run of the same process. Assuming an inscription timestamp is set on arrival and made part of the PSA key, it is conceptually not possible to have multiple changes at the same point in time.
The template uses this to detect records that are already present in the PSA and skip them. As a result, all data can be loaded, and re-loaded, at all times without encountering constraint violations.
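The following is a minimal sketch of the reprocessing guard. It uses Python's built-in `sqlite3` module and an in-memory SQLite database as a stand-in for SQL Server, an assumption made so the example is self-contained. The tables `STG_CUSTOMER` and `PSA_CUSTOMER` and their columns are hypothetical. The PSA key combines the business key with the inscription timestamp and inscription record id.

```python
import sqlite3

# Hypothetical landing table and PSA; the PSA primary key includes the
# inscription timestamp and inscription record id.
DDL = """
CREATE TABLE STG_CUSTOMER (
    CUSTOMER_ID INTEGER, INSCRIPTION_TIMESTAMP TEXT,
    INSCRIPTION_RECORD_ID INTEGER, NAME TEXT);
CREATE TABLE PSA_CUSTOMER (
    CUSTOMER_ID INTEGER, INSCRIPTION_TIMESTAMP TEXT,
    INSCRIPTION_RECORD_ID INTEGER, NAME TEXT,
    PRIMARY KEY (CUSTOMER_ID, INSCRIPTION_TIMESTAMP, INSCRIPTION_RECORD_ID));
"""

# Unguarded load: copies everything in the landing table.
NAIVE_LOAD = "INSERT INTO PSA_CUSTOMER SELECT * FROM STG_CUSTOMER"

# Guarded load: the LEFT OUTER JOIN matches rows already in the PSA and the
# IS NULL filter keeps only rows that were not matched.
GUARDED_LOAD = """
INSERT INTO PSA_CUSTOMER
SELECT STG.*
FROM STG_CUSTOMER STG
LEFT OUTER JOIN PSA_CUSTOMER HSTG
  ON  HSTG.CUSTOMER_ID = STG.CUSTOMER_ID
  AND HSTG.INSCRIPTION_TIMESTAMP = STG.INSCRIPTION_TIMESTAMP
  AND HSTG.INSCRIPTION_RECORD_ID = STG.INSCRIPTION_RECORD_ID
WHERE HSTG.CUSTOMER_ID IS NULL
"""


def run_twice(load_sql):
    """Run the same load twice; return (PSA row count, error or None)."""
    con = sqlite3.connect(":memory:")
    con.executescript(DDL)
    con.executemany("INSERT INTO STG_CUSTOMER VALUES (?, ?, ?, ?)", [
        (1, "2024-01-01 10:00", 1, "Alice"),
        (2, "2024-01-01 10:00", 2, "Bob"),
    ])
    error = None
    try:
        for _ in range(2):
            con.execute(load_sql)
    except sqlite3.IntegrityError as exc:
        error = exc  # the second unguarded run violates the primary key
    count = con.execute("SELECT COUNT(*) FROM PSA_CUSTOMER").fetchone()[0]
    con.close()
    return count, error


if __name__ == "__main__":
    print(run_twice(NAIVE_LOAD))    # second run raises IntegrityError
    print(run_twice(GUARDED_LOAD))  # second run inserts nothing
```

In both cases the PSA ends up with two rows. The difference is that only the guarded version can be re-run without raising a constraint violation.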
Considerations and consequences
- The template requires both the source and target databases to be accessible. This can be via a Linked Server, via the on-premises cross-database query feature, or because the source and target data sets are located in the same database.
- The template requires a prior process to have determined the data delta. It does not perform any CDC itself.
Extensions
{{#each dataObjectMappings~}}
CREATE OR ALTER PROCEDURE [{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[SP_{{targetDataObject.name}}]
  @{{../conventions.auditTrailIdColumn}} INT = 0 -- Applying 0 as default value.
AS
--
-- Persistent Staging Area Stored Procedure for {{targetDataObject.name}} using a DIRECT control framework wrapper.
-- This template expects a data delta to already be prepared in the landing area, and performs a left join to prevent constraint violations when inserting data that might already be loaded into the PSA.
--
-- Generated from template '{{../templateMetadata.name}}'.
--
INSERT INTO [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}]
(
  [{{../conventions.auditTrailIdColumn}}],
  [{{../conventions.inscriptionTimeStampColumn}}],
  [{{../conventions.sourceTimestampColumn}}],
  [{{../conventions.inscriptionRecordIdColumn}}],
  [{{../conventions.changeDataColumn}}],
  [{{../conventions.checksumColumn}}],
  {{#each dataItemMappings}}
  [{{stringupper targetDataItem.name}}]{{#unless @last}},{{/unless}}
  {{/each}}
)
SELECT
  @{{../conventions.auditTrailIdColumn}} AS [{{../conventions.auditTrailIdColumn}}],
  [{{../conventions.inscriptionTimeStampColumn}}],
  [{{../conventions.sourceTimestampColumn}}],
  [{{../conventions.inscriptionRecordIdColumn}}],
  [{{../conventions.changeDataColumn}}],
  [{{../conventions.checksumColumn}}],
  {{#each dataItemMappings}}
  [{{stringupper sourceDataItems.0.name}}]{{#unless @last}},{{/unless}}
  {{/each}}
  {{!-- Debugging --}}
  --[LKP_{{../conventions.checksumColumn}}],
  --[LKP_{{../conventions.changeDataColumn}}],
  --[KEY_ROW_NUMBER]
FROM
(
  SELECT
    STG.[{{../conventions.inscriptionTimeStampColumn}}],
    STG.[{{../conventions.sourceTimestampColumn}}],
    STG.[{{../conventions.inscriptionRecordIdColumn}}],
    STG.[{{../conventions.changeDataColumn}}],
    STG.[{{../conventions.checksumColumn}}],
    {{#each dataItemMappings}}
    STG.[{{sourceDataItems.0.name}}],
    {{/each}}
    COALESCE(maxsub.[LKP_{{../conventions.checksumColumn}}], 'N/A') AS [LKP_{{../conventions.checksumColumn}}],
    COALESCE(maxsub.[LKP_{{../conventions.changeDataColumn}}], 'C') AS [LKP_{{../conventions.changeDataColumn}}],
    CAST(
      ROW_NUMBER() OVER (
        PARTITION BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}STG.[{{sourceDataItems.0.name}}]{{#unless @last}},{{/unless}} {{/each}}{{/each}}
        ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}STG.[{{sourceDataItems.0.name}}], {{/each}}{{/each}}STG.[{{../conventions.inscriptionTimeStampColumn}}], STG.[{{../conventions.inscriptionRecordIdColumn}}]
      ) AS INT
    ) AS KEY_ROW_NUMBER
  FROM [{{lookupExtension sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension sourceDataObjects.0.dataConnection.extensions "location"}}].[{{sourceDataObjects.0.name}}] STG
  -- Prevent reprocessing
  LEFT OUTER JOIN [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}] HSTG
    ON {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}HSTG.[{{targetDataItem.name}}] = STG.[{{sourceDataItems.0.name}}] AND {{/each}}{{/each}}
       HSTG.[{{../conventions.inscriptionRecordIdColumn}}] = STG.[{{../conventions.inscriptionRecordIdColumn}}]
   AND HSTG.[{{../conventions.inscriptionTimeStampColumn}}] = STG.[{{../conventions.inscriptionTimeStampColumn}}]
  -- Query the most recently arrived PSA record which is not logically deleted.
  LEFT OUTER JOIN
  (
    SELECT
      {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}A.[{{targetDataItem.name}}], {{/each}}{{/each}}
      A.[{{../conventions.inscriptionRecordIdColumn}}],
      A.[{{../conventions.checksumColumn}}] AS [LKP_{{../conventions.checksumColumn}}],
      A.[{{../conventions.changeDataColumn}}] AS [LKP_{{../conventions.changeDataColumn}}]
    FROM [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}] A
    JOIN
    (
      SELECT
        {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}B.[{{targetDataItem.name}}], {{/each}}{{/each}}
        [{{../conventions.inscriptionTimeStampColumn}}] AS [MAX_{{../conventions.inscriptionTimeStampColumn}}],
        MAX([{{../conventions.inscriptionRecordIdColumn}}]) OVER (
          PARTITION BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}[{{targetDataItem.name}}], {{/each}}{{/each}}[{{../conventions.inscriptionTimeStampColumn}}]
          ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}[{{targetDataItem.name}}], {{/each}}{{/each}}[{{../conventions.inscriptionTimeStampColumn}}]
        ) AS [MAX_{{../conventions.inscriptionRecordIdColumn}}],
        ROW_NUMBER() OVER (
          PARTITION BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}[{{targetDataItem.name}}]{{#unless @last}},{{/unless}} {{/each}}{{/each}}
          ORDER BY {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}[{{targetDataItem.name}}], {{/each}}{{/each}}[{{../conventions.inscriptionTimeStampColumn}}] DESC
        ) AS [MAX_ROWNUM]
      FROM [{{lookupExtension targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension targetDataObject.dataConnection.extensions "location"}}].[{{targetDataObject.name}}] B
    ) C
      ON {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}A.[{{targetDataItem.name}}] = C.[{{targetDataItem.name}}] AND {{/each}}{{/each}}
         A.[{{../conventions.inscriptionRecordIdColumn}}] = C.[MAX_{{../conventions.inscriptionRecordIdColumn}}]
     AND A.[{{../conventions.inscriptionTimeStampColumn}}] = C.[MAX_{{../conventions.inscriptionTimeStampColumn}}]
     AND 1 = C.[MAX_ROWNUM]
    WHERE A.[{{../conventions.changeDataColumn}}] != 'D'
  ) maxsub
    ON {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}STG.[{{sourceDataItems.0.name}}] = maxsub.[{{targetDataItem.name}}] AND {{/each}}{{/each}}
       STG.[{{../conventions.inscriptionRecordIdColumn}}] = maxsub.[{{../conventions.inscriptionRecordIdColumn}}]
  WHERE {{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}{{#if @first}}HSTG.[{{targetDataItem.name}}] IS NULL -- prevent reprocessing{{/if}}{{/each}}{{/each}}
) sub
WHERE
(
  KEY_ROW_NUMBER = 1
  AND
  (
    ( [{{../conventions.checksumColumn}}] != [LKP_{{../conventions.checksumColumn}}] ) -- The checksums are different
    OR
    ( [{{../conventions.checksumColumn}}] = [LKP_{{../conventions.checksumColumn}}] AND [{{../conventions.changeDataColumn}}] != [LKP_{{../conventions.changeDataColumn}}] ) -- The checksums are the same but the CDC is different
    -- In other words, if the hash is the same AND the CDC operation is the same then there is no change
  )
)
OR
(
  -- It's not the first (oldest) change for this key in the set, so it is already known to be a change and can be inserted as-is
  KEY_ROW_NUMBER != 1
)
GO
{{!-- Integration with the control framework --}}
{{#each extensions}}
{{#stringcompare key "hasControlFramework"}}
/*
-- Integration with the control framework, module registration
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RegisterModule]
  @ModuleCode = 'm_{{../../targetDataObject.name}}',
  @ModuleAreaCode = '{{../../../conventions.persistentStagingAreaObjectPrefix}}',
  @Executable = 'EXEC [{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[SP_{{../../targetDataObject.name}}] @AUDIT_TRAIL_ID = @ModuleInstanceId',
  @ModuleDescription = 'Persistent Staging Area process for [{{../../sourceDataObjects.0.name}}]',
  @ModuleSourceDataObject = '[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../sourceDataObjects.0.dataConnection.extensions "location"}}].[{{../../sourceDataObjects.0.name}}]',
  @ModuleTargetDataObject = '[{{lookupExtension ../../targetDataObject.dataConnection.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.dataConnection.extensions "location"}}].[{{../../targetDataObject.name}}]'

-- Run the process
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RunModule]
  @ModuleCode = 'm_{{../../targetDataObject.name}}',
  @ModuleInstanceIdColumnName = 'AUDIT_TRAIL_ID'
*/
{{/stringcompare}}
{{/each}}
{{/each}}
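The following sketch illustrates what a generated procedure does at run time. It renders the logic for a hypothetical table with a single business-key column and runs it against an in-memory SQLite database through Python's `sqlite3` module. SQLite stands in for SQL Server here, which is an assumption made for a self-contained example, and window functions require SQLite 3.25 or later. The column names `ID`, `TS`, `REC`, `CDC` and `CHECKSUM` are placeholders for the business key, inscription timestamp, inscription record id, change data and checksum conventions.

Two simplifications are deliberate:

- The most recent PSA record is selected with a single `ROW_NUMBER()` instead of the template's `MAX() OVER` / `ROW_NUMBER()` combination.
- That record is matched to the landing data on the business key only, following the description in the implementation guidelines. The template additionally matches on the inscription record id.

```python
import sqlite3

# Hypothetical single-key landing table and PSA.
DDL = """
CREATE TABLE STG (ID INTEGER, TS TEXT, REC INTEGER, CDC TEXT, CHECKSUM TEXT);
CREATE TABLE PSA (ID INTEGER, TS TEXT, REC INTEGER, CDC TEXT, CHECKSUM TEXT,
                  PRIMARY KEY (ID, TS, REC));
"""

LOAD = """
INSERT INTO PSA (ID, TS, REC, CDC, CHECKSUM)
SELECT ID, TS, REC, CDC, CHECKSUM
FROM (
  SELECT STG.ID, STG.TS, STG.REC, STG.CDC, STG.CHECKSUM,
         COALESCE(maxsub.LKP_CHECKSUM, 'N/A') AS LKP_CHECKSUM,
         COALESCE(maxsub.LKP_CDC, 'C') AS LKP_CDC,
         ROW_NUMBER() OVER (PARTITION BY STG.ID ORDER BY STG.TS, STG.REC)
           AS KEY_ROW_NUMBER
  FROM STG
  -- Prevent reprocessing: match rows that are already in the PSA.
  LEFT OUTER JOIN PSA HSTG
    ON HSTG.ID = STG.ID AND HSTG.TS = STG.TS AND HSTG.REC = STG.REC
  -- Most recently arrived PSA row per key, unless logically deleted.
  LEFT OUTER JOIN (
    SELECT ID, CHECKSUM AS LKP_CHECKSUM, CDC AS LKP_CDC
    FROM (
      SELECT ID, CDC, CHECKSUM,
             ROW_NUMBER() OVER (PARTITION BY ID ORDER BY TS DESC, REC DESC) AS RN
      FROM PSA
    ) latest
    WHERE RN = 1 AND CDC != 'D'
  ) maxsub ON maxsub.ID = STG.ID
  WHERE HSTG.ID IS NULL
) sub
WHERE (KEY_ROW_NUMBER = 1 AND (CHECKSUM != LKP_CHECKSUM OR CDC != LKP_CDC))
   OR KEY_ROW_NUMBER != 1
"""


def load(con, rows):
    """Replace the landing delta with `rows`, run the load, return the PSA."""
    con.execute("DELETE FROM STG")
    con.executemany("INSERT INTO STG VALUES (?, ?, ?, ?, ?)", rows)
    con.execute(LOAD)
    return con.execute("SELECT * FROM PSA ORDER BY ID, TS, REC").fetchall()


if __name__ == "__main__":
    con = sqlite3.connect(":memory:")
    con.executescript(DDL)
    batch1 = [(1, "2024-01-01 10:00", 1, "C", "h1"),
              (2, "2024-01-01 10:00", 2, "C", "h2")]
    load(con, batch1)
    load(con, batch1)  # re-run: nothing new is inserted
    batch2 = [(1, "2024-01-02 10:00", 1, "C", "h1"),  # unchanged: skipped
              (2, "2024-01-02 10:00", 2, "U", "h3"),  # changed: inserted
              (2, "2024-01-03 10:00", 3, "U", "h2")]  # later change: inserted
    for row in load(con, batch2):
        print(row)
```

Re-running the first batch inserts nothing because of the reprocessing join. In the second batch, the unchanged record for key 1 is skipped by the checksum/CDC comparison. Both records for key 2 are inserted: the first because its checksum differs, the second because it is not the oldest record for that key.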