Persistent Staging Area Stored Procedure Delta

This code-generation template moves the data delta from the staging / landing object into the Persistent Staging Area (PSA). The template expects the data delta to already be available in the landing object, so any change data capture (CDC) processing must be implemented there for this template to work as expected.

The generated process performs a left join against the PSA to prevent any data from being accidentally inserted more than once, which would lead to a constraint violation.

The PSA, when made part of the solution architecture, is a foundational feature of the data solution. Its role as the ‘transaction log’ of all data transactions that have been presented to the data solution brings a number of considerations, including preventing record redundancy and being able to process all data, at all times.

The intent of this template is that the PSA process it generates can run continuously, or as part of a (micro) batch, without any further dependencies.

  • Snowflake

This template supports idempotence and re-runnability by implementing two lookups (left outer joins).

The first join checks the ‘oldest’ record for each key in the incoming data set against the ‘most recently arrived’ record in the PSA. If the two records are the same, meaning they have the same checksum and the same CDC operation, the incoming record is omitted. If not, it is inserted. Any subsequent records in the incoming data set have already been identified as change records, and are inserted into the PSA directly.
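
The comparison described above can be sketched in plain Python. This is only an illustration of the rule, not the generated SQL: the function name `select_rows_to_insert`, the field names (`key`, `inscription_ts`, `record_id`, `checksum`, `cdc_op`) and the sample values are all hypothetical. The defaults `'N/A'` and `'C'` for a key that has no PSA record yet mirror the `COALESCE` defaults in the template.

```python
from itertools import groupby
from operator import itemgetter


def select_rows_to_insert(incoming, latest_psa):
    """Return the incoming rows that should be inserted into the PSA.

    incoming   : list of dicts (hypothetical field names, see lead-in).
    latest_psa : dict mapping key -> (checksum, cdc_op) of the most recently
                 arrived, not logically deleted, PSA record for that key.
    """
    ordered = sorted(incoming, key=itemgetter("key", "inscription_ts", "record_id"))
    selected = []
    for key, rows in groupby(ordered, key=itemgetter("key")):
        for row_number, row in enumerate(rows, start=1):
            if row_number > 1:
                # Not the oldest row for this key: already known to be a change.
                selected.append(row)
                continue
            # Oldest incoming row: compare against the latest PSA record.
            lkp_checksum, lkp_cdc_op = latest_psa.get(key, ("N/A", "C"))
            if row["checksum"] != lkp_checksum or row["cdc_op"] != lkp_cdc_op:
                selected.append(row)
    return selected


latest_psa = {"A": ("h1", "C")}
incoming = [
    # Same checksum and CDC operation as the PSA record for A: omitted.
    {"key": "A", "inscription_ts": "2024-01-02", "record_id": 1, "checksum": "h1", "cdc_op": "C"},
    # Subsequent row for A: inserted directly.
    {"key": "A", "inscription_ts": "2024-01-03", "record_id": 2, "checksum": "h2", "cdc_op": "C"},
    # Key B has no PSA record yet: inserted.
    {"key": "B", "inscription_ts": "2024-01-02", "record_id": 3, "checksum": "h9", "cdc_op": "C"},
]
selected_ids = [row["record_id"] for row in select_rows_to_insert(incoming, latest_psa)]
```

With this sample data only the records with `record_id` 2 and 3 are selected; record 1 is dropped because it repeats what the PSA already holds.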

The second join ensures that the data has not been loaded before, for example because the same process was run more than once. Assuming an inscription timestamp is set on arrival, and made part of the PSA key, it is conceptually not possible to have multiple changes at the same point in time.

This PSA template detects this, so that it is possible to load, and re-load, all data at all times without encountering constraint violations.
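
The re-runnability that the second join provides can be tried out with a small sketch, assuming SQLite (from the Python standard library) as a stand-in for the actual database. The table names `stg` and `psa`, the column names and the sample rows are hypothetical, and the statement is a simplified version of the ‘prevent reprocessing’ join, without the checksum comparison.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stg (customer_id TEXT, inscription_ts TEXT, record_id INTEGER, checksum TEXT);
CREATE TABLE psa (customer_id TEXT, inscription_ts TEXT, record_id INTEGER, checksum TEXT,
                  PRIMARY KEY (customer_id, inscription_ts, record_id));
INSERT INTO stg VALUES ('A', '2024-01-02T00:00:00', 1, 'h1'),
                       ('B', '2024-01-02T00:00:00', 2, 'h2');
""")

# Only rows without a match on the full PSA key are inserted.
LOAD_SQL = """
INSERT INTO psa (customer_id, inscription_ts, record_id, checksum)
SELECT stg.customer_id, stg.inscription_ts, stg.record_id, stg.checksum
FROM stg
LEFT OUTER JOIN psa
  ON psa.customer_id = stg.customer_id
 AND psa.record_id = stg.record_id
 AND psa.inscription_ts = stg.inscription_ts
WHERE psa.customer_id IS NULL  -- prevent reprocessing
"""


def load(connection):
    """Run the load once and return the number of rows added to the PSA."""
    before = connection.execute("SELECT COUNT(*) FROM psa").fetchone()[0]
    connection.execute(LOAD_SQL)
    connection.commit()
    after = connection.execute("SELECT COUNT(*) FROM psa").fetchone()[0]
    return after - before


first_run = load(conn)   # both staging rows are new
second_run = load(conn)  # same staging data again: nothing left to insert
```

The second call finds every staging row already present in the PSA, so it inserts nothing and does not violate the primary key.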

  • The template requires both the source and target databases to be accessible, either via a Linked Server, an on-premises cross-database query feature, or because the source and target data sets are located in the same database.
  • The template requires a prior process to have determined the data delta. It does not perform any CDC itself.
{{#each dataObjectMappings~}}
CREATE OR ALTER PROCEDURE [SP_{{targetDataObject.name}}] @{{../conventions.auditTrailIdColumn}} INT
AS
--
-- Persistent Staging Area Stored Procedure for {{targetDataObject.name}} using a DIRECT control framework wrapper.
-- This template expects a data delta to already be prepared in the landing area, and performs a left join to prevent constraint violations when inserting data that might already be loaded into the PSA.
--
-- Generated from template '{{../templateMetadata.name}}'.
--
INSERT INTO [{{lookupExtension targetDataObject.extensions "datastore"}}].[{{lookupExtension targetDataObject.extensions "location"}}].[{{targetDataObject.name}}]
(
[{{../conventions.auditTrailIdColumn}}],
[{{../conventions.inscriptionTimeStampColumn}}],
[{{../conventions.sourceTimestampColumn}}],
[{{../conventions.inscriptionRecordIdColumn}}],
[{{../conventions.changeDataColumn}}],
[{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
[{{targetDataItem.name}}]{{#unless @last}},{{/unless}}
{{/each}}
)
SELECT
@{{../conventions.auditTrailIdColumn}} AS [{{../conventions.auditTrailIdColumn}}],
[{{../conventions.inscriptionTimeStampColumn}}],
[{{../conventions.sourceTimestampColumn}}],
[{{../conventions.inscriptionRecordIdColumn}}],
[{{../conventions.changeDataColumn}}],
[{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
[{{targetDataItem.name}}]{{#unless @last}},{{/unless}}
{{/each}}
{{!-- Debugging --}}
--[LKP_{{../conventions.checksumColumn}}],
--[LKP_{{../conventions.changeDataColumn}}],
--[KEY_ROW_NUMBER]
FROM
(
SELECT
STG.[{{../conventions.inscriptionTimeStampColumn}}],
STG.[{{../conventions.sourceTimestampColumn}}],
STG.[{{../conventions.inscriptionRecordIdColumn}}],
STG.[{{../conventions.changeDataColumn}}],
STG.[{{../conventions.checksumColumn}}],
{{#each dataItemMappings}}
STG.[{{sourceDataItems.0.name}}] AS [{{targetDataItem.name}}],
{{/each}}
COALESCE(maxsub.[LKP_{{../conventions.checksumColumn}}], 'N/A') AS [LKP_{{../conventions.checksumColumn}}],
COALESCE(maxsub.[LKP_{{../conventions.changeDataColumn}}], 'C') AS [LKP_{{../conventions.changeDataColumn}}],
CAST(ROW_NUMBER() OVER
( PARTITION BY
{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}STG.[{{sourceDataItems.0.name}}]{{#unless @last}},{{/unless}}
{{/each}}{{/each}}
ORDER BY
{{#each businessKeyDefinitions}}{{#each businessKeyComponentMappings}}STG.[{{sourceDataItems.0.name}}],
{{/each}}{{/each}}
STG.[{{../conventions.inscriptionTimeStampColumn}}],
STG.[{{../conventions.inscriptionRecordIdColumn}}]
) AS INT
) AS KEY_ROW_NUMBER
FROM [{{lookupExtension sourceDataObjects.0.extensions "datastore"}}].[{{lookupExtension sourceDataObjects.0.extensions "location"}}].[{{sourceDataObjects.0.name}}] STG
-- Prevent reprocessing
LEFT OUTER JOIN [{{lookupExtension targetDataObject.extensions "datastore"}}].[{{lookupExtension targetDataObject.extensions "location"}}].[{{targetDataObject.name}}] HSTG
ON
{{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
HSTG.[{{targetDataItem.name}}] = STG.[{{sourceDataItems.0.name}}] AND{{/each}} {{/each}}
HSTG.[{{../conventions.inscriptionRecordIdColumn}}] = STG.[{{../conventions.inscriptionRecordIdColumn}}] AND
HSTG.[{{../conventions.inscriptionTimeStampColumn}}] = STG.[{{../conventions.inscriptionTimeStampColumn}}]
-- Query the most recently arrived PSA record which is not logically deleted.
LEFT OUTER JOIN
(
SELECT {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
A.[{{targetDataItem.name}}], {{/each}} {{/each}}
A.[{{../conventions.inscriptionRecordIdColumn}}],
A.[{{../conventions.checksumColumn}}] AS [LKP_{{../conventions.checksumColumn}}],
A.[{{../conventions.changeDataColumn}}] AS [LKP_{{../conventions.changeDataColumn}}]
FROM [{{lookupExtension targetDataObject.extensions "datastore"}}].[{{lookupExtension targetDataObject.extensions "location"}}].[{{targetDataObject.name}}] A
JOIN
(
SELECT {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
B.[{{targetDataItem.name}}], {{/each}} {{/each}}
{{../conventions.inscriptionTimeStampColumn}} AS MAX_{{../conventions.inscriptionTimeStampColumn}},
MAX({{../conventions.inscriptionRecordIdColumn}}) OVER
(
PARTITION BY
{{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{targetDataItem.name}}], {{/each}} {{/each}}
{{../conventions.inscriptionTimeStampColumn}}
ORDER BY
{{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{targetDataItem.name}}], {{/each}} {{/each}}
{{../conventions.inscriptionTimeStampColumn}}
) AS MAX_{{../conventions.inscriptionRecordIdColumn}},
ROW_NUMBER() OVER
(
PARTITION BY {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{targetDataItem.name}}]{{#unless @last}},{{/unless}} {{/each}} {{/each}}
ORDER BY
{{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
[{{targetDataItem.name}}], {{/each}} {{/each}}
{{../conventions.inscriptionTimeStampColumn}} DESC
) AS MAX_ROWNUM
FROM [{{lookupExtension targetDataObject.extensions "datastore"}}].[{{lookupExtension targetDataObject.extensions "location"}}].[{{targetDataObject.name}}] B
) C ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
A.[{{targetDataItem.name}}] = C.[{{targetDataItem.name}}] AND{{/each}} {{/each}}
A.[{{../conventions.inscriptionRecordIdColumn}}] = C.[MAX_{{../conventions.inscriptionRecordIdColumn}}] AND
A.[{{../conventions.inscriptionTimeStampColumn}}] = C.[MAX_{{../conventions.inscriptionTimeStampColumn}}] AND
1 = C.[MAX_ROWNUM]
WHERE {{../conventions.changeDataColumn}} != 'D'
) maxsub ON {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}}
STG.[{{sourceDataItems.0.name}}] = maxsub.[{{targetDataItem.name}}] AND{{/each}} {{/each}}
STG.[{{../conventions.inscriptionRecordIdColumn}}] = maxsub.[{{../conventions.inscriptionRecordIdColumn}}]
WHERE {{#each businessKeyDefinitions}} {{#each businessKeyComponentMappings}} {{#if @first}}
HSTG.[{{targetDataItem.name}}] IS NULL -- prevent reprocessing{{/if}}{{/each}} {{/each}}
) sub
WHERE
(
KEY_ROW_NUMBER=1
AND
(
( {{../conventions.checksumColumn}} != LKP_{{../conventions.checksumColumn}} )
-- The checksums are different
OR
( [{{../conventions.checksumColumn}}] = [LKP_{{../conventions.checksumColumn}}] AND
[{{../conventions.changeDataColumn}}] != [LKP_{{../conventions.changeDataColumn}}] )
-- The checksums are the same but the CDC is different
-- In other words, if the hash is the same AND the CDC operation is the same then there is no change
)
)
OR
(
-- It's not the oldest change for the key in the set, so the record can be inserted as-is
KEY_ROW_NUMBER!=1
)
SELECT @@ROWCOUNT AS ROWS_INSERTED
GO
{{!-- Integration with the control framework --}}
{{#each extensions}}
{{#stringcompare key "hasControlFramework"}}
-- Integration with the control framework, module registration
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RegisterModule]
@ModuleCode = '150_{{../../targetDataObject.name}}'
,@ModuleAreaCode = 'PSA'
,@Executable = 'EXEC [{{lookupExtension ../../targetDataObject.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.extensions "location"}}].[SP_{{../../targetDataObject.name}}] @AUDIT_TRAIL_ID = @ModuleInstanceId'
,@ModuleDescription = 'Persistent Staging Area process for [{{../../sourceDataObjects.0.name}}]'
,@ModuleSourceDataObject = '[{{lookupExtension ../../sourceDataObjects.0.extensions "datastore"}}].[{{lookupExtension ../../sourceDataObjects.0.extensions "location"}}].[{{../../sourceDataObjects.0.name}}]'
,@ModuleTargetDataObject = '[{{lookupExtension ../../targetDataObject.extensions "datastore"}}].[{{lookupExtension ../../targetDataObject.extensions "location"}}].[{{../../targetDataObject.name}}]'
-- Run the process
EXEC [{{lookupExtension ../../extensions "controlFrameworkDataStore"}}].[{{lookupExtension ../../extensions "controlFrameworkLocation"}}].[RunModule] @ModuleCode = '150_{{../../targetDataObject.name}}', @Debug='Y', @ModuleInstanceIdColumnName='AUDIT_TRAIL_ID'
{{/stringcompare}}{{/each}}
{{/each}}