When I try to sink a data stream into Elasticsearch 7.17.9, I get an error. Here are my Maven dependencies:
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.17.9</elasticsearch.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.16.2</flink.version>
<flink.client.version>1.14.6</flink.client.version>
</properties>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>uk.co.jemos.podam</groupId>
<artifactId>podam</artifactId>
<version>7.2.11.RELEASE</version>
</dependency>
<!-- sink function approach begins -->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-java</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.33</version>
</dependency>
The key code is:
log.info("Starting the flink sink to elasticsearch job!");
SourceFunction<DataChangeDTO> sqlServerSource =
SqlServerSource.<DataChangeDTO>builder()
.hostname(sourceConfiguration.getHostName())
.port(sourceConfiguration.getPort())
.database(sourceConfiguration.getDatabase())
.tableList(String.join(",", sourceConfiguration.getTableList()))
.username(sourceConfiguration.getUserName())
.password(sourceConfiguration.getPassword())
.deserializer(new CustomJsonDebeziumDeserializationSchema())
//.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8080);
HttpHost httpHost = new HttpHost(targetConfiguration.getHostName(), targetConfiguration.getPort(), "http");
//ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder = new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
//esSinkBuilder.setBulkFlushMaxActions(1);
env
.addSource(sqlServerSource)
.filter(new DataStreamFilter())
.sinkTo(
new Elasticsearch7SinkBuilder<DataChangeDTO>()
// The following setting makes the sink commit after every element;
// otherwise elements would be buffered before being flushed
.setBulkFlushMaxActions(1)
.setHosts(httpHost)
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)))
.build())
//.print()
//.addSink(esSinkBuilder.build())
.setParallelism(1)
.name("flinkCDC to elasticsearch process");
//.setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute(" SqlServer to Elastic ");
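The createIndexRequest helper is not shown above; for completeness, a hypothetical sketch of what it might look like follows, based on the usual Elasticsearch7SinkBuilder emitter pattern. The index name "my-index" and the use of a Map-based document source are assumptions, not taken from the original code:

```java
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Hypothetical reconstruction of the helper referenced by the emitter above.
// Builds one IndexRequest per change event; field names are placeholders.
private static IndexRequest createIndexRequest(DataChangeDTO element) {
    Map<String, Object> document = new HashMap<>();
    document.put("payload", element.toString()); // assumed serialization
    return Requests.indexRequest()
            .index("my-index")   // assumed target index name
            .source(document);
}
```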
The error details I get are:
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/unit/TimeValue
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:109) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:69) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.createBulkProcessor(ElasticsearchWriter.java:198) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.<init>(ElasticsearchWriter.java:105) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:90) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-runtime-1.16.2.jar:1.16.2]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_292]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_292]
... 19 common frames omitted
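A NoClassDefFoundError for org.elasticsearch.common.unit.TimeValue typically means the Elasticsearch client jar actually on the classpath is a different 7.x version than the one the connector was compiled against. One way to check which client version Maven really resolves (a sketch, assuming a standard Maven project layout):

```shell
# Show every org.elasticsearch artifact in the resolved dependency tree,
# including the path through which each version was pulled in.
mvn dependency:tree -Dincludes=org.elasticsearch
```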
I don't understand this. Thanks.
I tried other approaches, for example:
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-base</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>1.14.6</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- <version>7.17.9</version>-->
<!-- </dependency>-->
and then using env.addSink instead of sinkTo... but the problem still occurred.
I also tried removing the CDC step and building a bounded stream sinking to Elasticsearch, and I got the same error.
1 Answer
The Elasticsearch version you are trying to use is not compatible with the Flink Elasticsearch connector. The 1.14 release of the connector uses 7.5.1, as can be seen at https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-elasticsearch7/pom.xml#L40
Flink cannot upgrade to 7.17.9, because newer Elasticsearch versions use the SSPL license, which is incompatible with the Apache 2.0 license.
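One possible fix, assuming the 7.17.9 client was being forced onto the classpath through the elasticsearch.version property (a property that dependency-management frameworks such as Spring Boot honor): drop the override and let the connector bring in the client version it was built against. A sketch of the adjusted properties block:

```xml
<properties>
    <java.version>1.8</java.version>
    <!-- Do NOT force the 7.17.9 client; let flink-connector-elasticsearch7
         resolve its own compatible Elasticsearch client version. -->
    <!-- <elasticsearch.version>7.17.9</elasticsearch.version> -->
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.16.2</flink.version>
</properties>
```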