我使用一个avro值转换,它生成如下模式(这只是一个子集,因为它太大了)
{
"type": "record",
"name": "Envelope",
"namespace": "mssql.dbo.InvTR_T",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "InvTR_ID",
"type": "int"
},
{
"name": "Type_CH",
"type": "string"
},
{
"name": "CalcType_CH",
"type": "string"
},
{
"name": "ER_CST_ID",
"type": "int"
},
{
"name": "ER_REQ_ID",
"type": "int"
},
{
"name": "Vendor_ID",
"type": "int"
},
{
"name": "VendInv_VC",
"type": "string"
},
{
"name": "Status_CH",
"type": "string"
},
{
"name": "Stage_TI",
"type": {
"type": "int",
"connect.type": "int16"
}
},
{
"name": "CheckOut_ID",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "ReCalcCk_LG",
"type": "boolean"
},
{
"name": "ReCalcAll_LG",
"type": "boolean"
},
{
"name": "PatMatch_LG",
"type": "boolean"
},
{
"name": "DocPatOvRd_LG",
"type": "boolean"
},
{
"name": "Locked_LG",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "SegErrFlag_LG",
"type": "boolean"
},
{
"name": "Hold_LG",
"type": "boolean"
},
{
"name": "Reason_ID",
"type": [
"null",
{
"type": "int",
"connect.type": "int16"
}
],
"default": null
},
{
"name": "HoldCom_VC",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AllSegFin_LG",
"type": "boolean"
},
{
"name": "InvAmt_MN",
"type": {
"type": "bytes",
"scale": 4,
"precision": 19,
"connect.version": 1,
"connect.parameters": {
"scale": "4",
"connect.decimal.precision": "19"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
当我运行以下命令来创建一个流时
CREATE STREAM stream_invtr_t_json WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');
然后我描述了那个流,模式是一种非常奇怪的格式。我想使用ksql来过滤特定的信息并适当地分散这些事件。但是,我不能从kafka topic=>ksql stream=>kafka topic=>sink开始。如果我从那条流中创建一个新的主题,并尝试将它消化到一个Flume中,我会得到
Expected Envelope for transformation, passing it unchanged
然后一个关于pk缺失的错误。我试图删除展开转换,只是为了看看它会如何出来,并收到了错误的。
BEFORE | STRUCT<INVTR_ID INTEGER, TYPE_CH VARCHAR(STRING), CALCTYPE_CH VARCHAR(STRING), ER_CST_ID INTEGER, ER_REQ_ID INTEGER, VENDOR_ID INTEGER, VENDINV_VC VARCHAR(STRING), STATUS_CH VARCHAR(STRING), STAGE_TI INTEGER, CHECKOUT_ID INTEGER, RECALCCK_LG BOOLEAN, RECALCALL_LG BOOLEAN, PATMATCH_LG BOOLEAN, DOCPATOVRD_LG BOOLEAN, LOCKED_LG BOOLEAN, SEGERRFLAG_LG BOOLEAN, HOLD_LG BOOLEAN, REASON_ID INTEGER, HOLDCOM_VC VARCHAR(STRING), ALLSEGFIN_LG BOOLEAN, INVDATE_DT BIGINT, SHIPDATE_DT BIGINT, PDTERMS_CH VARCHAR(STRING), PMTDUE_DT BIGINT, PMTTERMS_VC VARCHAR(STRING), BILLTERMS_CH VARCHAR(STRING), JOINT_LG BOOLEAN, COMMENT_VC VARCHAR(STRING), SOURCE_CH VARCHAR(STRING), ADDBY_ID VARCHAR(STRING), ADDED_DT BIGINT, CHGBY_ID VARCHAR(STRING), CHGED_DT BIGINT, APPROVED_LG BOOLEAN, MULTIPO_VC VARCHAR(STRING), PRVAUDITED_INVTR_ID INTEGER, PRVVENDOR_ID INTEGER, TRANSITDAYS_SI INTEGER, SHIP_NUM_VC VARCHAR(STRING), PRVTRANSITDAYS_SI INTEGER, PRVJOINT_LG BOOLEAN, CLONEDFROM_INVTR_ID INTEGER, LASTCALC_DT BIGINT, TMSFMANUAL_LG BOOLEAN, FRTRATERSOURCE_CH VARCHAR(STRING), ACTPICKUP_DT BIGINT, ROUTVEND_SI INTEGER, CALCVRSN_TI INTEGER, VENDORRANK_SI INTEGER, SEQ_SI INTEGER, PRVAUDITED_DT BIGINT, FRTRATERBATCHTYPE_CH VARCHAR(STRING), CURRENCY_TYPE_CD VARCHAR(STRING), EXCHANGE_DT BIGINT, EXCHANGE_RATE_LOCKED_LG BOOLEAN, EXCHANGE_DT_LOCKED_LG BOOLEAN, CUSTAPPROVED_LG BOOLEAN, FRTRATERMATCH_INVTR_ID INTEGER, CRC_INVOICE_LG BOOLEAN, RG_ROUTVEND_SI INTEGER, RG_PRVVE
1条答案
按热度按时间laik7k3q1#
好像是关于
UnwrapFromEnvelope
解决部分问题。这只剩下小数部分没有通过。查看连接器的文档:https://debezium.io/documentation/reference/1.1/connectors/postgresql.html
我可以看到有一个
decimal.handling.mode
设置,如jiri所说。默认值为precise
它看起来将以ksqldb可以识别的格式输出avro十进制,除非源数字或十进制类型没有任何刻度。在这一点上,你最终与STRUCT
数据结构,包括一个字节字段。这条规则有个例外。如果使用数字或十进制类型时没有任何比例约束,则表示来自数据库的值对每个值具有不同的(变量)比例。在这种情况下,将使用类型io.debezium.data.variablescaledecimal,它包含传递值的值和小数位数。
因此,要将数据导入ksqldb,您需要:
等到我们支持bytes数据类型(目前还不在我们的路线图上)
更改源表的架构以定义列的比例。
将decimal.handling.mode更改为其他设置。您可以使用string,然后在ksql中将值转换为十进制。