How do I connect to the MinIO file system from the Flink SQL client?

Posted by svmlkihl on 2022-12-09 in Apache

I'm trying to build a data pipeline with Flink, using MinIO as the storage. Currently I can sink data to a MinIO bucket successfully, but when I create a table WITH a MinIO path and query it, I always get a Connection refused error:

Flink SQL> CREATE TABLE WordCountTable (
>   word STRING,
>   `count` INT
> )  WITH (
>   'connector' = 'filesystem',         
>   'path' = 's3://test/wordcount2', 
>   'format' = 'csv',     
>   'csv.field-delimiter'=' '
> );
[INFO] Execute statement succeed.

Flink SQL> select * from WordCountTable;
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused

I tried googling it, and the only useful post I found is https://github.com/fhueske/flink-sql-demo (see its MinIO section), but it is already out of date.

Here is the docker compose file:

version: '3'
services:
  minio:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_storage:/data
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
    command: server --console-address ":9001" /data

  jobmanager:
    image: flink:1.15.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        
        state.backend: filesystem
        state.checkpoints.dir: s3://state/checkpoint
        s3.endpoint: http://minio:9000
        s3.path.style.access: true
        s3.access-key: minio
        s3.secret-key: minio123

  taskmanager:
    image: flink:1.15.0-scala_2.12
    links:
      - jobmanager
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2        
        state.backend: filesystem
        state.checkpoints.dir: s3://state/checkpoint
        s3.endpoint: http://minio:9000
        s3.path.style.access: true
        s3.access-key: minio
        s3.secret-key: minio123

  sql-client:
    image: flink:1.15.0-scala_2.12
    command: bin/sql-client.sh
    links:
      - jobmanager
    depends_on:
      - jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager

volumes:
  minio_storage: { }

Thanks in advance.

UPDATE: Today I tried to verify the network connection from the sql-client container with ping and nc, and everything seems to be OK:

root@0e452dd7385e:/usr/bin# ping jobmanager
PING jobmanager (192.168.128.3) 56(84) bytes of data.
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=1 ttl=64 time=3.39 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=2 ttl=64 time=0.193 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=3 ttl=64 time=0.339 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=4 ttl=64 time=0.186 ms
root@0e452dd7385e:/usr/bin# nc -zv jobmanager 6123
Connection to jobmanager (192.168.128.3) 6123 port [tcp/*] succeeded!

But in the SQL client log I found a "Connection refused: /0.0.0.0:8081" error:

2022-07-28 06:44:16,870 WARN  org.apache.flink.client.program.rest.RestClusterClient       [] - Attempt to submit job 'collect' (80b7f32d13c2e3f1deeee4db3df6b923) to 'http://0.0.0.0:8081' has failed.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
  at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476) ~[flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) [flink-dist-1.15.0.jar:1.15.0]

What confuses me is why the Flink SQL client tries to connect to 0.0.0.0:8081. Why not jobmanager:8081?
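
A possible direction, offered as a sketch under assumptions rather than a confirmed diagnosis: the sql-client service in the compose file sets only FLINK_JOBMANAGER_HOST and no FLINK_PROPERTIES, so the client presumably takes its REST target from the image's default configuration, which would explain the 0.0.0.0:8081 in the log. The fragment below assumes that the image's entrypoint applies FLINK_PROPERTIES to the sql-client container the same way it does for the jobmanager and taskmanager services, and that `rest.address` is the option the client reads when submitting jobs. It replaces only the sql-client service; everything else stays as in the original file.

```yaml
  sql-client:
    image: flink:1.15.0-scala_2.12
    command: bin/sql-client.sh
    links:
      - jobmanager
    depends_on:
      - jobmanager
    # Assumption: pointing rest.address at the jobmanager service name makes the
    # client submit jobs to jobmanager:8081 instead of 0.0.0.0:8081.
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
```

Note that the nc check above probed port 6123 (RPC), while the failing request in the log targets port 8081 (REST), so running `nc -zv jobmanager 8081` from the sql-client container would be the more direct connectivity test.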
