Flink checkpoint returns No Route to Host when the active HDFS NameNode is shut down in HA mode

dgtucam1 posted on 2023-08-01 in HDFS

I deployed HDFS in HA mode following the Stackable approach, and it works fine.
This is my simple HDFS cluster manifest on k8s:

apiVersion: hdfs.stackable.tech/v1alpha1
kind: HdfsCluster
metadata:
  name: simple-hdfs
spec:
  image:
    productVersion: 3.3.4
    stackableVersion: 23.4.0
  clusterConfig:
    zookeeperConfigMapName: simple-hdfs-znode
    listenerClass: external-unstable
    dfsReplication: 2
  nameNodes:
    roleGroups:
      default:
        replicas: 2
        configOverrides:
          core-site.xml:
            fs.defaultFS: hdfs://simple-hdfs
          hdfs-site.xml:
            dfs.nameservices: simple-hdfs
            dfs.client.failover.proxy.provider.simple-hdfs: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
            dfs.permissions.enabled: "false"
            dfs.ha.namenodes.simple-hdfs: nn1, nn2
            dfs.namenode.rpc-address.simple-hdfs.nn1: simple-hdfs-namenode-default-0:8020
            dfs.namenode.rpc-address.simple-hdfs.nn2: simple-hdfs-namenode-default-1:8020
            dfs.namenode.http-address.simple-hdfs.nn1: simple-hdfs-namenode-default-0:9870
            dfs.namenode.http-address.simple-hdfs.nn2: simple-hdfs-namenode-default-1:9870
            dfs.namenode.shared.edits.dir: qjournal://simple-hdfs-journalnode-default-0:8485/simple-hdfs

  dataNodes:
    roleGroups:
      default:
        replicas: 1
  journalNodes:
    roleGroups:
      default:
        replicas: 1

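I am trying to test whether Flink checkpoints are still saved to HDFS when one NameNode is down. For example, I delete the pod of the active NameNode, but then I see the following error in the pod logs: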

2023-06-13 07:01:27,445 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Flat Map -> Sink: Print to Std. Out (2/2) (f7ef99f032a93b4bf4808ad02550230b_20ba6b65f97481d5570070de90e4e791_1_68) switched from INITIALIZING to RUNNING.
        2023-06-13 07:01:29,301 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 139896 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1686639689281 for job b5209b7b03b4197859796a7d13db8163.
        2023-06-13 07:02:24,792 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 139896 by task f7ef99f032a93b4bf4808ad02550230b_20ba6b65f97481d5570070de90e4e791_0_68 of job b5209b7b03b4197859796a7d13db8163 at basic-checkpoint-ha-example-taskmanager-1-78 @ 10.42.1.92 (dataPort=36231).
        org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.16.1.jar:1.16.1]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
                at java.lang.Thread.run(Unknown Source) [?:?]
        Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 139896 for operator Flat Map -> Sink: Print to Std. Out (1/2)#68.
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 4 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
                at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
                at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
                at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:543) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not open output stream for state backend
                at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:463) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:310) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:268) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.util.ForwardingOutputStream.write(ForwardingOutputStream.java:42) ~[flink-dist-1.16.1.jar:1.16.1]
                at java.io.DataOutputStream.writeInt(Unknown Source) ~[?:?]

                at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:540) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.net.NoRouteToHostException: No Route to Host from  basic-checkpoint-ha-example-taskmanager-1-78/10.42.1.92 to simple-hdfs-namenode-default:8020 failed on socket timeout exception: java.net.NoRouteToHostException: No route to host; For more details see:  http://wiki.apache.org/hadoop/NoRouteToHost
                at jdk.internal.reflect.GeneratedConstructorAccessor60.newInstance(Unknown Source) ~[?:?]
                at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
                at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
                at com.sun.proxy.$Proxy33.create(Unknown Source) ~[?:?]
  
                at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:540) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.net.NoRouteToHostException: No route to host
                at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
                at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:?]

                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:297) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at jdk.internal.reflect.GeneratedMethodAccessor71.invoke(Unknown Source) ~[?:?]
                at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
                at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
                at com.sun.proxy.$Proxy34.create(Unknown Source) ~[?:?]

        2023-06-13 07:02:24,794 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 139896 for job b5209b7b03b4197859796a7d13db8163. (0 consecutive failed attempts so far)
        org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
                at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.16.1.jar:1.16.1]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
                at java.lang.Thread.run(Unknown Source) [?:?]
        Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 139896 for operator Flat Map -> Sink: Print to Std. Out (1/2)#68.
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more
        Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
                at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
                at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
                at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:543) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.16.1.jar:1.16.1]
                at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
                ... 3 more

        Caused by: org.apache.flink.util.SerializedThrowable: java.net.NoRouteToHostException: No Route to Host from  basic-checkpoint-ha-example-taskmanager-1-78/10.42.1.92 to simple-hdfs-namenode-default:8020 failed on socket timeout exception: java.net.NoRouteToHostException: No route to host; For more details see:  http://wiki.apache.org/hadoop/NoRouteToHost
                at jdk.internal.reflect.GeneratedConstructorAccessor60.newInstance(Unknown Source) ~[?:?]
                at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
                at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
                at com.sun.proxy.$Proxy34.create(Unknown Source) ~[?:?]
 
        Caused by: org.apache.flink.util.SerializedThrowable: java.net.NoRouteToHostException: No route to host
                at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
                at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:?]
                at com.sun.proxy.$Proxy33.create(Unknown Source) ~[?:?]

        2023-06-13 07:02:24,796 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
        org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    
                at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
                at java.lang.Thread.run(Unknown Source) ~[?:?]
        2023-06-13 07:02:24,796 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 4 tasks will be restarted to recover from a global failure.
        2023-06-13 07:02:24,796 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job State machine job (b5209b7b03b4197859796a7d13db8163) switched from state RUNNING to RESTARTING.


Finally, this is my Flink deployment manifest:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-checkpoint-ha-example
spec:
#  image: flink:1.17
  image: razzaghib/flink:latest
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"   
    state.backend: filesystem
    fs.defaultFS: "hdfs://simple-hdfs"
    fs.default.name: "hdfs://simple-hdfs"
    dfs.nameservices: simple-hdfs
    dfs.client.failover.proxy.provider.simple-hdfs: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    dfs.permissions.enabled: "false"
    dfs.ha.namenodes.simple-hdfs: simple-hdfs-namenode-default-0, simple-hdfs-namenode-default-1
    dfs.namenode.rpc-address.simple-hdfs.simple-hdfs-namenode-default-0: simple-hdfs-namenode-default-0.simple-hdfs-namenode-default.default.svc.cluster.local:8020
    dfs.namenode.rpc-address.simple-hdfs.simple-hdfs-namenode-default-1: simple-hdfs-namenode-default-1.simple-hdfs-namenode-default.default.svc.cluster.local:8020
    dfs.namenode.http-address.simple-hdfs.simple-hdfs-namenode-default-0: simple-hdfs-namenode-default-0.simple-hdfs-namenode-default.default.svc.cluster.local:9870
    dfs.namenode.http-address.simple-hdfs.simple-hdfs-namenode-default-1: simple-hdfs-namenode-default-1.simple-hdfs-namenode-default.default.svc.cluster.local:9870
    dfs.namenode.shared.edits.dir: qjournal://simple-hdfs-journalnode-default-0.simple-hdfs-journalnode-default.default.svc.cluster.local:8485/simple-hdfs

    execution.checkpointing.interval: 10s
    state.savepoints.dir: hdfs://simple-hdfs-namenode-default/savepoints
    state.checkpoints.dir: hdfs://simple-hdfs-namenode-default/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs://simple-hdfs-namenode-default/ha

  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
          - mountPath: /flink-data
            name: flink-volume
      volumes:
      - name: flink-volume
        hostPath:
          # directory location on host
          path: /tmp/flink
          # this field is optional
#          type: Directory
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0


Answer 1

To use the Flink Kubernetes operator with HDFS HA:

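  • Mount the HDFS configs (core-site.xml, hdfs-site.xml) into the Flink container using a ConfigMap. For example, I mount them at /usr/local/hadoop/etc/hadoop.
  • Add environment variables to the container so that Flink picks up the configs: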
podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: flink
      containers:
        # Do not change the main container name
        - name: flink-main-container
          env:
            - name: PATH_HADOOP_CONFIG
              value: /usr/local/hadoop/etc/hadoop
            - name: HADOOP_CLASSPATH
              value: /usr/local/hadoop/etc/hadoop
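
The pod template above only sets environment variables; the ConfigMap from the first step still has to be mounted at the same path. Below is a minimal sketch, assuming a ConfigMap named `simple-hdfs` that contains `core-site.xml` and `hdfs-site.xml` (the Stackable HDFS operator normally publishes a discovery ConfigMap named after the HdfsCluster, but check `kubectl get configmap` for the actual name). The volume name `hadoop-config` is hypothetical. The sketch also sets `HADOOP_CONF_DIR`, which is one of the locations Flink's HadoopUtils checks for Hadoop configuration files. In the question's manifest these entries would be merged into the existing podTemplate, which already mounts `flink-volume`.

```yaml
podTemplate:
  apiVersion: v1
  kind: Pod
  metadata:
    name: pod-template
  spec:
    serviceAccount: flink
    containers:
      # Do not change the main container name
      - name: flink-main-container
        env:
          # Directory holding core-site.xml and hdfs-site.xml
          - name: HADOOP_CONF_DIR
            value: /usr/local/hadoop/etc/hadoop
          - name: HADOOP_CLASSPATH
            value: /usr/local/hadoop/etc/hadoop
        volumeMounts:
          # Mount the HDFS client configs where the env vars point
          - name: hadoop-config        # hypothetical volume name
            mountPath: /usr/local/hadoop/etc/hadoop
            readOnly: true
    volumes:
      - name: hadoop-config
        configMap:
          # Assumption: discovery ConfigMap published by the Stackable operator
          name: simple-hdfs
```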

After that, you can configure the checkpoint and savepoint directories using the HDFS nameservice (note: replace myhdfscluster with your HDFS nameservice):

flinkConfiguration:
    # scheduler-mode: REACTIVE
    taskmanager.numberOfTaskSlots: "2"
    state.backend: "rocksdb"
    state.checkpoints.dir: "hdfs://myhdfscluster/checkpoints/test"
    state.savepoints.dir: "hdfs://myhdfscluster/savepoints/test"
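
Applied to the cluster in the question, the nameservice is `simple-hdfs` (the `dfs.nameservices` value in the HdfsCluster manifest). The question's checkpoint, savepoint and HA storage URIs instead point at the single Kubernetes service host `simple-hdfs-namenode-default`, which the log shows the client dialing directly on port 8020; that is the most likely reason no failover happens. A sketch of the relevant `flinkConfiguration` entries with the nameservice substituted, assuming the hdfs-site.xml with the HA settings is already visible to Flink as described in the first step:

```yaml
flinkConfiguration:
  taskmanager.numberOfTaskSlots: "2"
  state.backend: filesystem
  execution.checkpointing.interval: 10s
  # Use the nameservice, not a single NameNode host, so the HDFS client
  # reads both NameNode addresses from hdfs-site.xml and can fail over
  state.checkpoints.dir: hdfs://simple-hdfs/checkpoints
  state.savepoints.dir: hdfs://simple-hdfs/savepoints
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: hdfs://simple-hdfs/ha
```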


For more details on how Flink locates and loads the Hadoop configuration, see HadoopUtils.java: https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
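
If mounting XML files is inconvenient, HadoopUtils (linked above) also copies Flink configuration keys that start with `flink.hadoop.` into the Hadoop configuration, with the prefix stripped. Plain `dfs.*` keys, like those in the question's `flinkConfiguration`, are probably not forwarded to the HDFS client this way. A minimal sketch, assuming the Flink version in use supports this prefix; the NameNode IDs `nn1`/`nn2` mirror the HdfsCluster manifest and the hostnames are taken from the question's deployment:

```yaml
flinkConfiguration:
  # Assumption: keys prefixed with "flink.hadoop." are copied into the
  # Hadoop Configuration with the prefix removed (see HadoopUtils)
  flink.hadoop.dfs.nameservices: simple-hdfs
  flink.hadoop.dfs.ha.namenodes.simple-hdfs: nn1,nn2
  flink.hadoop.dfs.namenode.rpc-address.simple-hdfs.nn1: simple-hdfs-namenode-default-0.simple-hdfs-namenode-default.default.svc.cluster.local:8020
  flink.hadoop.dfs.namenode.rpc-address.simple-hdfs.nn2: simple-hdfs-namenode-default-1.simple-hdfs-namenode-default.default.svc.cluster.local:8020
  # Client-side proxy that retries against the other NameNode on failure
  flink.hadoop.dfs.client.failover.proxy.provider.simple-hdfs: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
```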
