独立模式没有问题。但是Yarn模式,它不能工作。我在flink类路径(flink/lib)中添加了hadoopjar
https://issues.apache.org/jira/browse/flink-3373
https://issues.apache.org/jira/browse/flink-6126
https://issues.apache.org/jira/browse/flink-4587
是在flink 1.2里解决的?
Flinkversion:1.9.1
hadoop/Yarnversion:2.6.0
ElasticSearchversion:6.8.1
maven依赖项
<properties>
<compiler.version>1.8</compiler.version>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<log4j.version>2.6.2</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink steam api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
</dependencies>
堆栈跟踪:
2020-05-21 16:39:01,918 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingNHttpClientConnectionManager.java:553)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:241)
at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:238)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:209)
at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
at cn.ipaynow.flink.SinkToElasticsearch.<clinit>(SinkToElasticsearch.java:44)
at cn.ipaynow.Main.main(Main.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 more
1条答案
按热度按时间ibrsph3r1#
我用
Flink Elasticsearch connector
解决问题。我昨天用函数做的。