SnowFlake -使用Azure管道进行CDC数据复制

7nbnzgx9  于 2023-06-06  发布在  其他
关注(0)|答案(1)|浏览(110)

有一个要求,我需要从Snowflake复制CDC数据,而不使用Stream。
我选择了CHANGES子句,通过Alter表使用,并能够在Snowflake中获取更改数据,并添加3个元数据列:但与我使用Pipeline从Azure Copy Activity阅读查询复制Activity的方式相同,但我无法复制数据。

积分:
1.查找读取所有表列表

  1. For Each活动迭代每个表并执行其他操作
    1.对于每个表,我都执行一个脚本,该脚本捕获时间并基于时间进行提取。4、复制数据
    我正在使用的查询:
SET ts1 = (SELECT LASTRUNTIME as Time FROM CDC_TABLE);
SELECT *,$ts1 as Time FROM CDC_TABLE;

 -- change data with all meta data.
SELECT * FROM AGENTS CHANGES(INFORMATION => DEFAULT)  AT (timestamp => $ts1 )  ORDER BY 'AGENT_CODE';

-- change data excluding meta data.
SELECT *  
EXCLUDE (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID)
,CASE WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = 'TRUE' THEN 'U'
              WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = 'FALSE' THEN 'I'
              ELSE 'D'
         END AS CHANGE_OPERATION
FROM (SELECT * FROM AGENTS   CHANGES(INFORMATION => DEFAULT) AT (timestamp => $ts1 ) )
WHERE  METADATA$ACTION != 'DELETE';                                                             

  
  UPDATE CDC_TABLE 
  SET PREVIOUS_RUNTIME = LASTRUNTIME, LASTRUNTIME = CURRENT_TIMESTAMP
  WHERE TABLE_NAME = 'AGENTS';

上述查询在复制数据活动中不起作用,显示以下错误:

请在这件事上帮助我。寻求Maven的建议。

nue99wik

nue99wik1#

“errorCode”:“2200”,“消息”:“ErrorCode=UserErrorOdbcOperationFailed,'Type =Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=ERROR [42000] SQL编译错误:\n位置157处的语法错误行1意外';'.\n位置11处的语法错误行5意外........
此错误是因为在复制活动源查询中,应给出返回要复制到接收器的数据的查询。您可以使用查找活动/脚本活动来获取最新的运行时间戳值,并在复制活动源查询中使用查找活动的输出。更新查询应该在单独的脚本活动中给出,而不是在复制活动中给出。每个活动内部的管道流程应如下图所示。

相关问题