在接收Kafka流时看不到消息,在flink 1.2中看不到打印消息

wrrgggsh  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(683)

我的目标是使用kafka读入json格式的字符串,对字符串进行过滤,然后将消息输出(仍然是json字符串格式)。
出于测试目的,我的输入字符串消息如下所示:

{"a":1,"b":2}

我的执行准则是:

def main(args: Array[String]): Unit = {

// parse input arguments
val params = ParameterTool.fromArgs(args)

if (params.getNumberOfParameters < 4) {
  println("Missing parameters!\n"
    + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
    + "--bootstrap.servers <kafka brokers> "
    + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
  return
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
                      && node.get("b").asText.equals("2"))

messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
  }, params.getProperties
))

env.execute("Kafka 0.10 Example")
}

可以看出,我想将消息流打印到控制台,并将过滤后的消息接收到kafka。然而,他们两个我都看不见。
有趣的是,如果我将kafkaconsumer的模式从jsonkeyvaluedeserializationschema修改为simplestringschema,我可以看到messagestream打印到控制台。代码如下:

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

这让我觉得如果我使用jsonkeyvaluedeserializationschema,我的输入消息实际上不会被kafka接受。但这看起来很奇怪,和网上的文件有很大的不同(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)
希望有人能帮我!

mqxuamgl

mqxuamgl1#

jsonkeyvaluedeserializationschema()期望每个kafka消息都有消息键,我假设在生成json消息并通过kafka主题发送时没有提供任何键。
因此,要解决这个问题,请尝试使用jsondeserializationschema(),它只需要消息,并根据收到的消息创建一个对象节点。

相关问题