过去几个月我一直在使用kafka connect,最近我加入了activemq源插件,以便读取一些包含json文件的jms主题消息,将它们放在kafka主题中,然后在ksqldb中创建一个流/表,将json文件中的一些键用作列。不过,插件将jms消息插入为带双引号的文本,因此在ksqldb中无法正确识别。我尝试了配置中的各种方法来修复它,但到目前为止没有任何效果。我还想使用json格式,而不是kafka connect中的avro(也没有运行模式注册表)。出于测试目的,我还尝试通过将头内容指定为“application/json”来发送jms消息,但仍然没有成功。
下面是我的activemq插件的样子
"config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}
下面是我的Kafka连接配置
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka_2.13-2.5.0/plugins
这里还有一个Kafka如何使用这些信息的例子
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": { \"debug\": \"on\", \"window\": { \"title\": \"Sample Konfabulator Widget\", \"name\": \"main_window\", \"width\": 500, \"height\": 500 }, \"image\": { \"src\": \"Images/Sun.png\", \"name\": \"sun1\", \"hOffset\": 250, \"vOffset\": 250, \"alignment\": \"center\" }, \"text\": { \"data\": \"Click Here\", \"size\": 36, \"style\": \"bold\", \"name\": \"text1\", \"hOffset\": 250, \"vOffset\": 100, \"alignment\": \"center\", \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}
如果任何其他信息是需要的,请让我知道任何帮助将不胜感激。
更新:最终,最好的解决方案是能够以某种方式配置连接器,使其不会转义负载中的引号。同样不幸的是,转义引号是从activemq本身生成的,并且不是初始消息的一部分
所以信息应该是这样的
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {
"widget": {
"debug": "on",
"window": {
"title": "Sample Konfabulator Widget",
"name": "main_window",
"width": 500,
"height": 500
},
"image": {
"src": "Images/Sun.png",
"name": "sun1",
"hOffset": 250,
"vOffset": 250,
"alignment":
"center"
}
}
1条答案
按热度按时间qojgxg4l1#
欢迎elen1no1yami!
在我看来问题是
text
消息的字段是一个字符串,其中包含您感兴趣的json负载,但该负载的双引号用\
烧焦。我假设activemq中的数据本身没有
\
查尔,但如果你能澄清这一点就好了。解决这个问题的方法是:
能够将连接器配置为不转义有效负载中的引号。所以信息看起来更像:
或者以某种方式让ksqldb处理消息,因为它仍然可以访问
text
现场。这就是你要找的东西吗?如果是,请更新您的问题以反映这一点(在你的问题中包含这样的细节是很好的,这样你的问题就清楚了。
至于答案。。。
我不是连接Maven,所以不能真正评论,也看不到连接器配置的任何细节,这些细节可能允许您更改
text
. 其他更了解连接的人也许能提供更多帮助。为了能够访问ksqldb中嵌入/转义的json,首先需要删除转义。有关使用ksqldb执行此操作的方法,请参见下文
使用ksqldb访问转义的json
在我们可以访问中的json文档之前
text
我们必须把逃逸的人移走。我能想到两种方法:
编写自定义自定义自定义项
最好的方法是编写一个自定义的udf'unescape\ujson',它可以删除转义。
如果编写正确,自定义udf方法将不会出现潜在的数据损坏问题
REPLACE
基于web的解决方案有很多缺点。使用replace删除转义
注意:这个解决方案是脆弱的:字符替换可以匹配和替换不应该匹配的内容,这取决于您的消息的内容!
让我们用更简单的测试数据来解释需要什么,例如我们想要转换:
收件人:
这需要三件事:
更换开口
"text": "{
与"text": {
全部替换\"
与"
.更换关闭
}"
与}
我们可以使用replace函数或regexp\u replace函数:当然,如果您的数据包含以下任何搜索项,则此解决方案可能会损坏您的数据:
"text": "{
,\"
或者"}
数据中的任何其他位置,例如。将错误地转换为
这就是为什么自定义自定义自定义项会更好。
更正输入内容(并将其写入新主题)后,可以正常导入数据: