How do I use SQL on a Flink-Kafka stream?

ipakzgxi · posted 2021-06-24 in Flink

I load a rules table from a PostgreSQL database as a Flink table, then read Kafka messages and classify each message against those rules. The code looks like this:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.enableCheckpointing(5000)
val stenv = StreamTableEnvironment.create(senv)
val streamsource = senv.createInput(inputFormat)
stenv.registerDataStream("rules", streamsource)

val properties = new Properties()
properties.setProperty("bootstrap.servers", KAFKA_BROKER)
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
properties.setProperty("group.id", TRANSACTION_GROUP)
val fkp = new FlinkKafkaProducer010[String](TOPIC1, new SimpleStringSchema(), properties)
val fkc = new FlinkKafkaConsumer010[String](TOPIC, new SimpleStringSchema(), properties)
val stream = senv.addSource(fkc).setParallelism(3)
val jsons = stream.map { r =>
  val sub = JSON.parseObject(r.toString)
  val value = sub.getDouble("value")
  val time = sub.getLong("time")
  val tag = sub.getString("name")
  val error = sub.getString("error")
  val t = stenv.sqlQuery("select * from rules").where("nodeid=" + tag) // error is here
  // todo
}

The error is:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:686)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1143)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
    at cettest$.main(cettest.scala:63)
    at cettest.main(cettest.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 7 more

I have tried many ways to solve this, but all of them have failed!


nnt7mjpx1#

Welcome to Stack Overflow! It would help if you listed what you have tried so far, but the solution to your problem seems fairly straightforward: it looks like StreamTableEnvironmentImpl does not extend the Serializable trait: https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch12s08.html
That said, touching Flink's @Internal classes does not seem right anyway. I would rather create my own serializable class, or most likely a case class, which is serializable by default. See the sketch below.
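Here is a minimal sketch of what I mean (the Message case class, the field names, and the nodeid join key are assumptions based on your snippet): parse each record into a case class inside map, so the closure only captures serializable things, then register that stream as a table and express the classification as a single SQL join instead of calling sqlQuery per record.

import com.alibaba.fastjson.JSON // assuming fastjson, as in your snippet
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical case class for the parsed Kafka record; case classes are serializable by default
case class Message(name: String, value: Double, time: Long, error: String)

// The map closure no longer references stenv, so the ClosureCleaner can serialize it
val messages: DataStream[Message] = stream.map { r =>
  val sub = JSON.parseObject(r)
  Message(sub.getString("name"), sub.getDouble("value"), sub.getLong("time"), sub.getString("error"))
}

// Register the parsed stream and let the planner join it against the rules table;
// this assumes your rules table exposes a nodeid column that matches the message name
stenv.registerDataStream("messages", messages, 'name, 'value, 'time, 'error)
val classified = stenv.sqlQuery(
  "SELECT m.name, m.`value`, m.`time`, m.error, r.* FROM messages m JOIN rules r ON m.name = r.nodeid")

This way the table environment only lives in the main method where the job graph is assembled, and nothing non-serializable ends up inside an operator closure.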
Hope this helps!
