Stuck: Could not find a suitable table factory

tp5buhyn · published 2021-06-24 in Flink
Follow (0) | Answers (2) | Views (488)

While playing around with Flink, I have been trying to insert data into Elasticsearch. My stdout shows this error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...
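For context (this is background, not part of the original post): Flink discovers table factories at runtime through Java's `ServiceLoader` mechanism, which reads the file `META-INF/services/org.apache.flink.table.factories.TableFactory` from the classpath. The sketch below shows the same lookup against a JDK service interface (`FileSystemProvider`) so it runs without Flink on the classpath; the interface choice is illustrative only.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

object ServiceLoaderDemo {
  // Returns the class names of every FileSystemProvider registered via
  // META-INF/services on the classpath -- the same discovery mechanism
  // Flink uses for org.apache.flink.table.factories.TableFactory.
  def factoryNames(): List[String] =
    ServiceLoader
      .load(classOf[java.nio.file.spi.FileSystemProvider])
      .asScala
      .map(_.getClass.getName)
      .toList

  def main(args: Array[String]): Unit =
    factoryNames().foreach(println)
}
```

If a fat jar keeps only one dependency's copy of a service file, every factory listed in the overwritten copies becomes invisible to this lookup, which is exactly what the "Could not find a suitable table factory" error reports.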

Here is my Scala Flink code:

def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row) from each Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
        .index("transfers-sum")
        .keyNullLiteral("n/a"))
      .withFormat(new Json().jsonSchema("{      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.FLOAT())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, SUM(los) AS `sum`
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I am building a fat jar and uploading it to my remote Flink instance. Here are my build.gradle dependencies:

compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'

And here is how the fat jar is built in Gradle:

jar {
    from {
        (configurations.compile).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
    manifest {
        attributes("Main-Class": "main" )
    }
}
task fatJar(type: Jar) {
    zip64 true
    manifest {
        attributes 'Main-Class': "flinkNamePull.Demo"
    }
    baseName = "${rootProject.name}"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

Could anyone help me spot what I am missing? I am fairly new to Flink and data streaming in general.
Thanks in advance!

3bygqnnd 1#

You should use the Shadow plugin to create the fat jar rather than assembling it by hand.
In particular, you need to merge the service descriptors.
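A minimal build.gradle sketch of this approach (the plugin version below is an example; pick one matching your Gradle version):

```groovy
plugins {
    // Shadow plugin; 5.x works with Gradle 5/6
    id 'com.github.johnrengelman.shadow' version '5.2.0'
}

shadowJar {
    zip64 true
    manifest {
        attributes 'Main-Class': 'flinkNamePull.Demo'
    }
    // Concatenate META-INF/services files from all dependencies instead of
    // letting one jar's copy silently overwrite the others
    mergeServiceFiles()
}
```

With this in place, run `gradle shadowJar` instead of the hand-written `fatJar` task.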

kh212irz 2#

Is the list under `The following factories have been considered:` complete? Does it contain `Elasticsearch6UpsertTableSinkFactory`? If not, then as far as I can tell something is wrong with the service discovery of the dependency.
How do you submit your job? Could you check whether the file `META-INF/services/org.apache.flink.table.factories.TableFactory` in the uber jar contains an entry for `Elasticsearch6UpsertTableSinkFactory`?
When using Maven, you have to add a transformer to merge the service files properly:

<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

I am not sure how to do that in Gradle.
EDIT: Thanks to Arvid Heise: in Gradle, when using the shadowJar plugin, you can merge the service files like this:

// Merging Service Files
shadowJar {
  mergeServiceFiles()
}
