Unable to write to S3 with the S3 sink when using StreamExecutionEnvironment - Apache Flink 1.1.4

Posted by x4shl7ld on 2021-05-29 in Hadoop

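I created a simple Apache Flink project that reads data from a Kafka topic and writes it to an S3 bucket. When I run the project I get no errors, and it successfully reads every message from the Kafka topic, but nothing is written to my S3 bucket. Because there are no errors, it is hard to debug what is going on. My project and configuration are below. This only happens when I use a StreamExecutionEnvironment. If I write to S3 from a regular batch ExecutionEnvironment, it works.
S3Test Java program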

import java.util.UUID;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Test {

    public static void main(String[] args) throws Exception {
        // parse input arguments from the properties file passed as args[0]
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // write kafka stream to standard out.
        //messageStream.print();
        String id = UUID.randomUUID().toString();
        // this is the call that produces no output in the S3 bucket
        messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

        env.execute("Write to S3 Example");
    }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>

    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

core-site.xml (Hadoop configuration)

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
</property>

<property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
 large results prior to transmitting them to S3. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
    <name>fs.s3a.access.key</name>
    <value>***************</value>
</property>

<!-- set your AWS secret key -->
<property>
    <name>fs.s3a.secret.key</name>
    <value>****************</value>
</property>

</configuration>
Answer #1 (9rbhqvlz)

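Persisting a Kafka topic to S3 through Flink requires using the RollingSink. RollingSink uses a Bucketer to determine the name of the directory that part files are saved to. DateTimeBucketer is the default Bucketer, but you can also write a custom one. Whenever the maximum batch size is reached, the current part file is saved and closed, and a new part file is created. The following code works: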

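import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// RollingSink, DateTimeBucketer and StringWriter come from the
// flink-connector-filesystem module, which must be on the classpath.
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TestRollingSink {

    public static void main(String[] args) {
        // ConfigUtils is a helper class that is not shown in this answer
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);
        // note: the stream returned here is never used by the job
        env.socketTextStream("localhost", 9092);

        DataStream<String> parsed = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);

        RollingSink<String> sink = new RollingSink<String>("s3://flink-test/" + "TEST");
        // one directory per minute, named after the date-time pattern
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(200); // maximum part file size in bytes before rolling
        sink.setPendingPrefix("file-");
        sink.setPendingSuffix(".txt");
        parsed.print();
        parsed.addSink(sink).setParallelism(1);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}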
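
To make the bucketing and rolling behaviour described in this answer more concrete, here is a rough plain-Java sketch of the idea. It is not Flink's RollingSink and does not touch S3: the class RollingSketch, its methods, and the example timestamp are all hypothetical names made up for illustration, and the rolling rule is simplified (it rolls as soon as the byte count reaches the batch size).

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical names, not part of Flink. */
class RollingSketch {

    /** Maps a timestamp to a bucket directory under basePath using a date-time pattern. */
    static String bucketPath(String basePath, Instant time, String pattern, ZoneId zone) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return basePath + "/" + fmt.format(time);
    }

    /** Collects records into in-memory "part files" and starts a new one at batchSize bytes. */
    static class RollingWriter {
        private final long batchSize;
        private final List<List<String>> closedParts = new ArrayList<>();
        private List<String> currentPart = new ArrayList<>();
        private long currentBytes = 0;

        RollingWriter(long batchSize) {
            this.batchSize = batchSize;
        }

        void write(String record) {
            currentPart.add(record);
            // count the record plus one byte for the trailing newline
            currentBytes += record.getBytes(StandardCharsets.UTF_8).length + 1;
            if (currentBytes >= batchSize) {
                roll();
            }
        }

        /** Closes the current part (if it holds anything) and opens an empty one. */
        void roll() {
            if (!currentPart.isEmpty()) {
                closedParts.add(currentPart);
                currentPart = new ArrayList<>();
                currentBytes = 0;
            }
        }

        int closedPartCount() {
            return closedParts.size();
        }

        int openRecordCount() {
            return currentPart.size();
        }
    }

    public static void main(String[] args) {
        // Same base path and pattern as the answer; the timestamp is an arbitrary example.
        String dir = bucketPath("s3://flink-test/TEST",
                Instant.parse("2017-01-05T14:30:00Z"), "yyyy-MM-dd--HHmm", ZoneId.of("UTC"));
        System.out.println("bucket directory: " + dir);

        RollingWriter writer = new RollingWriter(200);
        for (int i = 0; i < 100; i++) {
            writer.write("message-" + i);
        }
        System.out.println("closed part files: " + writer.closedPartCount());
        System.out.println("records in open part: " + writer.openRecordCount());
    }
}
```

With a small batch size such as 200 bytes, a stream of short messages is split across many part files, which is why the value passed to setBatchSize in the answer is worth tuning for real workloads.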

Answer #2 (p8ekf7hl)

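A simple way to get some debugging information is to turn on logging for the S3 bucket that should be receiving the Kafka data. This gives you more information to help determine the source of the error from S3's perspective: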
http://docs.aws.amazon.com/amazons3/latest/ug/managingbucketlogging.html

Answer #3 (bpzcxfmw)

IAM permissions: make sure the role you are using is allowed to write to the S3 bucket.
