容器启动exitcode=2的异常,可能与flink sql配置单元连接器有关

n3schb8v  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(1168)

flink(scala)exitcode=2在mvn清洁包之后

我有一个简单的flink任务,它从一个hive表的两列中读取数据 mysource ,将列相加,然后将结果写入另一个配置单元表 mysink ,其中 mysource 有2列 a bigint 以及 b bigint ,和 mysink 只有一列 c bigint .
作业提交成功,但我观察到它一直在重试。

我点击每一次尝试,他们只是显示这个。

AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page:http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt

然而,“日志”没有有用的信息-它抱怨日志库,但我相信它们实际上是警告,而不是错误。

LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out

这是用scala写的作业。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
          |INSERT INTO mysink
          |SELECT a + b
          |FROM mysource
          |""".stripMargin)
  }
}

这是pom.xml。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>exmple</groupId>
    <artifactId>featurepipelines</artifactId>
    <version>0.1.1</version>
    <packaging>jar</packaging>

    <name>Feature Pipelines</name>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <shadedClassifierName>Shade</shadedClassifierName>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

我就是这样 Package jar的。

mvn clean package

我就是这样做的。

flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar

flink(scala)via intellij工作正常?!

因为我的本地环境能够访问上面提到的配置单元环境,所以我尝试运行 HiveToyExample 在intellij中,单击“运行”按钮。。。它工作得很好!

pyflink重写很好?!

因为逻辑很简单,所以我用pyflink重写了作业,看看会发生什么。这里显示了pyflink重写。

import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# There exists such a jar in the path

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()

我就是这样做的。

flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py

令人惊讶的是,作业运行得很好—它很快就完成了,并将结果写入 mysink table。

为什么?

考虑到比较,我高度怀疑第一次运行失败是因为它打包不正确,即使我遵循flink文档,这可以通过查看我的pom来验证。
如果您正在构建自己的程序,那么mvn文件中需要以下依赖项。建议不要在结果jar文件中包含这些依赖项。您应该在运行时添加上述依赖项。

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

此外,我还将flink-sql-connector-hive-2.2.0\u 2.11-1.12.0.jar包含在我的flink发行版的/lib中,如flink docs中所建议的:
添加依赖项的推荐方法是使用绑定的jar。只有当捆绑的jar不能满足您的需要时,才应该使用单独的jar。
我错过了什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题