Hello!

I know that I can define an output environment variable in a pre-job event and use it in the task's replication query (excellently documented here and here).

I'm probably stretching here, but can I define that output variable to read from a file (or any other source) and then run the sync task for ALL values of that variable? Kind of a loop?

 

Thanks in advance for your answers!

Hi @CJIW,
Apologies for the late reply here. Yes, it is possible to achieve such a scenario in Sync using pre- and post-job events.

To illustrate how this can be accomplished, I will walk through an example below.

 

Let's suppose that we need to replicate data from SQL Server to Snowflake.


> In one SQL Server database, the 'T_Parent' table is used only by the HR department and contains detailed information for all registered users.
> On another database, the 'T_Child' table contains only the users that have made at least one transaction. It is accessed by the Accounting department: when a user performs a transaction, a row is added to this table and linked with a Transaction_ID (which is always greater than the previous values).
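
For concreteness, here is a rough sketch of what minimal schemas for the two tables might look like. The column names other than Id, User_Id, and Transaction_Id are hypothetical, and in practice each table lives in its own database:

-- Hypothetical minimal schemas; each table lives in a separate SQL Server database.
CREATE TABLE T_Parent (
    Id   INT PRIMARY KEY,       -- user identifier, maintained by HR
    Name NVARCHAR(100)          -- placeholder for the detailed user information
);

CREATE TABLE T_Child (
    Transaction_Id INT PRIMARY KEY,  -- always greater than previous values
    User_Id        INT NOT NULL      -- the user who made the transaction
);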

In the destination, we aim to have only those records from the T_Parent table that have associated transactions (are present in the T_Child table), as illustrated in the diagram below: 
 

 

Initially, we need to manually create a file that will store a numeric value representing the number of transactions in the 'T_Child' table, serving as a counter. The initial value stored in this file should be 1.

Note: This file should be placed in a location where Sync has read and write access. 
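
If you would rather have Sync create this file itself, here is a minimal sketch using the same fileWrite operation that appears in the scripts below (the path is a placeholder; use any location Sync can access):

<api:set attr="input.file" value="[your_counter_file_full_path]" />
<api:set attr="input.data" value="1" /> <!-- initial counter value -->
<api:call op="fileWrite" in="input" out="result"/>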

 

Pre-job event: 

In the pre-job event, we need to check whether new records have been added to the T_Child table that have not yet been replicated to the destination. If so, we retrieve the User_Id of that record so it can be used in the replicate query.

Pre-job event script: 

<!-- First, read the value saved in the counter file. -->
<!-- This value corresponds to the number of already-replicated transactions/rows + 1. -->
<api:set attr="input.file" value="[your_file_full_path]" />
<api:call op="fileRead" in="input" out="result" >

  <!-- Open the connection to the T_Child database: -->
  <api:set attr="db.driver" value="cdata.jdbc.sql.SQLDriver"/>
  <api:set attr="db.conn" value="jdbc:sql:Server=[your_server];Database=[your_db];User=[your_user];Password=[your_pass];"/>

  <!-- Get the number of records in the table. -->
  <api:set attr="db.query" value="Select count(User_Id) as row_counter from T_Child" />
  <api:call op="dbQuery" in="db" out="result2">
    <api:set attr="c2" value="[result2.db:row_counter]" />

    <!-- Check whether the value saved in the counter file is lower than the number of records in the table (i.e., whether new rows have been added). -->
    <api:set attr="diff" value="[c2 | subtract([result.file:data | ceiling()])]" />

    <!-- If so, get the User_Id of the newly added record... -->
    <api:if exp="[diff | greaterthan(-1)]" >
      <api:set attr="db.query" value="select User_Id from (select i.*, row_number() over (order by Transaction_Id) as seqnm from T_Child i) i where seqnm = '[result.file:data]'" />
      <api:call op="dbQuery" in="db" out="result3">

        <!-- ...and expose it as an environment variable for the replicate query. -->
        <api:set attr="output.env:c4" value="[result3.db:User_Id | ceiling()]" />

      </api:call>
      <api:push item="output" />
    </api:if>
  </api:call>
</api:call>
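
To make the row_number() selection concrete, here is an illustration with made-up sample data (the rows and values are purely hypothetical):

-- Hypothetical T_Child contents, ordered by Transaction_Id:
-- User_Id | Transaction_Id | seqnm
--    17   |      1001      |   1
--    42   |      1005      |   2
--    17   |      1009      |   3
-- With a counter value of 2, the pre-job query resolves to:
select User_Id from
  (select i.*, row_number() over (order by Transaction_Id) as seqnm from T_Child i) i
where seqnm = '2';
-- and returns User_Id = 42, which is stored in env:c4.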

 

Post-job event: 

In the post-job event, we should check whether all the records have been replicated. If not, and the last run was successful, we should increase the value of the counter file and rerun the job.

Post-job event script: 
 

<!-- Read the value saved in the counter file. -->
<api:set attr="input.file" value="[counter_file_full_path]" />
<api:call op="fileRead" in="input" out="result" >

  <!-- Open the connection to the T_Child database and get the number of records. -->
  <api:set attr="db.driver" value="cdata.jdbc.sql.SQLDriver"/>
  <api:set attr="db.conn" value="[db_connection_string];"/>
  <api:set attr="db.query" value="Select count(User_Id) as row_counter from T_Child" />
  <api:call op="dbQuery" in="db" out="result2">
    <api:set attr="c2" value="[result2.db:row_counter]" />
    <api:set attr="diff" value="[c2 | subtract([result.file:data | ceiling()])]" />

    <api:set attr="job.JobName" value="Confluence_question_test"/>
    <api:set attr="job.ExecutionType" value="Run"/>
    <api:set attr="job.WaitForResults" value="true"/> <!-- Setting this to true waits for the job to complete. -->
    <api:set attr="JobStatus1" value="FAILED"/>

    <!-- Check whether there are still records left to replicate... -->
    <api:if exp="[diff | greaterthan(0)]" >
      <!-- ...and whether the previous run succeeded. -->
      <api:if attr="JobStatus1" value="[_input.jobstatus]" operator="notequals" >
        <!-- If so, increase the counter and re-trigger the job. -->
        <api:set attr="input.data" value="[result.file:data | ceiling() | add(1)]" />
        <api:call op="fileWrite" in="input" out="result">
          <api:call op="~/api/executeJob.rsb" httpmethod="post" authtoken="[your_user_auth_token]" in="job"/>
        </api:call>
      </api:if>
    <api:else>
      <!-- If all the records have been replicated, increase the counter for the next run (to avoid replicating the last transaction twice). -->
      <api:if attr="JobStatus1" value="[_input.jobstatus]" operator="notequals" >
        <api:set attr="input.data" value="[result.file:data | ceiling() | add(1)]" />
        <api:call op="fileWrite" in="input" out="result"/>
      </api:if>
    </api:else>
    </api:if>
  </api:call>
</api:call>
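
To see how the two scripts drive the loop, suppose (purely for illustration) the counter file holds 1 and T_Child contains three rows. Run 1: the pre-job event picks the User_Id at seqnm 1 and the job replicates it; the post-job event computes diff = 2, sees the run succeeded, writes 2 to the file, and re-triggers the job. Run 2 handles seqnm 2 the same way. Run 3 handles seqnm 3; diff is now 0, so the else branch only bumps the counter to 4, and the loop stops until new transactions arrive.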


Replicate query: 

REPLICATE [T_Parent] SELECT *, '{env:c4}' AS [User_ID] FROM [dbo].[T_Parent] WHERE Id = (SELECT '{env:c4}');
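
For instance, if the pre-job event stored the (hypothetical) value 42 in env:c4, the query Sync executes would effectively be:

REPLICATE [T_Parent] SELECT *, '42' AS [User_ID] FROM [dbo].[T_Parent] WHERE Id = (SELECT '42');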


Please let me know if this helped. 

