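I am trying to use the Flink batch API to read JSON files from HDFS and register them as a table source or sink, but it does not work. When I use the filesystem connector with the CSV format, both reading and writing work.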
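// Imports assume the Flink 1.9-era Scala batch Table API; adjust to your version.
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.descriptors.{FileSystem, Json}

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
// Filesystem connector combined with the JSON format descriptor
tableEnv.connect(new FileSystem().path("hdfs:///tmp/test.json"))
  .withFormat(new Json()
    .deriveSchema()
    .failOnMissingField(true))
  .withSchema(...) // schema elided in the original post
  .registerTableSource("test_json")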
It throws the following exception:
Exception in thread "main" java.lang.Exception: csv2SqlServer running failed.
at cn.company.main.FlinkBrick$.main(FlinkBrick.scala:43)
at cn.company.test.FlinkTest$.main(FlinkTest.scala:8)
at cn.company.test.FlinkTest.main(FlinkTest.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in the classpath.
Reason: No context matches.
The following properties are requested:
connector.path=hdfs://cdhNM:8020/tmp/test.json
connector.property-version=1
connector.type=filesystem
format.derive-schema=true
format.fail-on-missing-field=true
format.property-version=1
format.type=json
schema.0.name=busi_time
schema.0.type=TIMESTAMP
schema.1.name=seq_no
schema.1.type=INT
schema.2.name=stock_code
schema.2.type=VARCHAR
schema.3.name=stock_name
schema.3.type=VARCHAR
schema.4.name=trade_time
schema.4.type=ANY<java.util.Date, xsdgsda……>
schema.5.name=yes_close_price
schema.5.type=INT
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
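One way to read "Reason: No context matches" is that none of the considered factories is both a batch table source and willing to accept connector.type=filesystem together with format.type=json. The sketch below is a simplified model of that kind of property-based lookup, not Flink's actual implementation. The Factory case class, the kind strings, and the required-context maps are all hypothetical, guessed from the factory names and requested properties in the trace above.

```scala
// Simplified, hypothetical model of property-based factory lookup.
// This is NOT Flink's code; it only illustrates the matching idea.
object FactoryMatchSketch {
  // Stand-in for a table factory: the interface it implements plus the
  // property values it insists on (its "required context").
  final case class Factory(name: String, kind: String, requiredContext: Map[String, String])

  // Assumed required contexts, guessed from the factory names in the trace.
  val factories: Seq[Factory] = Seq(
    Factory("CsvBatchTableSourceFactory", "BatchTableSourceFactory",
      Map("connector.type" -> "filesystem", "format.type" -> "csv")),
    Factory("JsonRowFormatFactory", "DeserializationSchemaFactory",
      Map("format.type" -> "json")),
    Factory("KafkaTableSourceSinkFactory", "StreamTableSourceFactory",
      Map("connector.type" -> "kafka"))
  )

  // A factory matches only if it has the requested kind and every entry
  // of its required context equals the corresponding requested property.
  def findMatches(kind: String, requested: Map[String, String]): Seq[Factory] =
    factories.filter { f =>
      f.kind == kind &&
        f.requiredContext.forall { case (k, v) => requested.get(k).contains(v) }
    }

  def main(args: Array[String]): Unit = {
    // The two properties from the trace that decide the outcome.
    val requested = Map("connector.type" -> "filesystem", "format.type" -> "json")
    println(findMatches("BatchTableSourceFactory", requested).map(_.name))
    // Same request with the CSV format instead of JSON.
    val csvRequest = requested.updated("format.type", "csv")
    println(findMatches("BatchTableSourceFactory", csvRequest).map(_.name))
  }
}
```

Under these assumptions the JSON request matches no batch source factory, while the CSV request matches the CSV batch source factory. That is consistent with the observation that the filesystem connector works with CSV, and it suggests the JSON format factory on the classpath is not usable as a filesystem batch source in this setup.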