在和flink玩的时候,我一直在尝试将数据插入elasticsearch。我的stdout有个错误:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={ \"curr_careUnit\": {\"type\": \"text\"}, \"sum\": {\"type\": \"float\"} }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...
下面是我的scala-flink代码:
def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Set properties per KafkaConsumer API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
properties.setProperty("group.id", "test")
// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
// Read from beginning of topic
myKConsumer.setStartFromEarliest()
val streamSource = env
.addSource(myKConsumer)
// Transform CSV (with a header row per Kafka event into a Transfers object
val streamTransfers = streamSource.map(new TransfersMapper())
// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)
// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
tEnv.createTemporaryView("transfers", tblTransfers)
tEnv.connect(
new Elasticsearch()
.version("6")
.host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
.index("transfers-sum")
.keyNullLiteral("n/a")
.withFormat(new Json().jsonSchema("{ \"curr_careUnit\": {\"type\": \"text\"}, \"sum\": {\"type\": \"float\"} }"))
.withSchema(new Schema()
.field("curr_careUnit", DataTypes.STRING())
.field("sum", DataTypes.FLOAT())
)
.inUpsertMode()
.createTemporaryTable("transfersSum")
val result = tEnv.sqlQuery(
"""
|SELECT curr_careUnit, sum(los)
|FROM transfers
|GROUP BY curr_careUnit
|""".stripMargin)
result.insertInto("transfersSum")
env.execute("Flink Streaming Demo Dump to Elasticsearch")
}
}
我正在创建一个胖jar并将其上载到我的远程flink示例。以下是我的build.gradle依赖项:
compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'
下面是如何为gradle构建farjar命令:
jar {
from {
(configurations.compile).collect {
it.isDirectory() ? it : zipTree(it)
}
}
manifest {
attributes("Main-Class": "main" )
}
}
task fatJar(type: Jar) {
zip64 true
manifest {
attributes 'Main-Class': "flinkNamePull.Demo"
}
baseName = "${rootProject.name}"
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
with jar
}
谁能帮我看看我缺了什么吗?一般来说,我对flink和数据流还比较陌生。呵呵
提前谢谢!
2条答案
按热度按时间3bygqnnd1#
您应该使用shadow插件来创建fat jar,而不是手动创建。
特别是,您需要合并服务描述符。
kh212irz2#
名单在吗
The following factories have been considered:
完成了吗?里面有吗Elasticsearch6UpsertTableSinkFactory
? 如果不是我所能说的,服务发现依赖项有问题。你如何提交你的工作?你能查一下你有没有档案吗
META-INF/services/org.apache.flink.table.factories.TableFactory
在uber jar中输入Elasticsearch6UpsertTableSinkFactory
?使用maven时,必须添加转换器才能正确合并服务文件:
我不知道你在grad尔是怎么做到的。
编辑:感谢gradle的arvid heise在使用shadowjar插件时,您可以通过以下方式合并服务文件: