在hdfs中写入avro文件-存在

5kgi1eie  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(504)

目前我正在学习spark流媒体和avro,所以我的第一个例子是,读取spark rdd并构建通用记录,创建avro文件,这个文件我应该用hdfs编写。现在我可以打开avro文件,我是否附加到hdfs文件存在?
这段代码编写了一个avro文件,但是当我尝试添加或附加时,失败了。我使用Java8来实现这个

public static void saveAvro(GenericRecord record, Schema schema) throws IOException {

        DatumWriter<GenericRecord> bdPersonDatumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(bdPersonDatumWriter);

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro"),
                conf);
        Path F = new Path("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro");
        fs.setReplication(F, (short) 1);

        if (!fs.exists(F)) {
            System.out.println("File not exists.. creating....");
            OutputStream out = fs.create(F, (short) 1);
            System.out.println("OutputStream create.");
            dataFileWriter.create(schema, out);
            System.out.println("dataFileWriter create.");
            dataFileWriter.append(record);
            System.out.println("dataFileWriter append OK {0} .");

        } else {
            //Here fail, not open file.. avro stored in HDFS
            System.out.println("File exists....");
           // I want to add information to an existing avro file.
            dataFileWriter.append(record);
            System.out.println("dataFileWriter append OK {1} .");
        }
        dataFileWriter.close();
        System.out.println("dataFileWriter closed.");

    }

附加存在文件avro hdfs的堆栈跟踪:
线程“main”org.apache.avro.avroruntimeexception中出现异常:未在org.apache.avro.file.datafilewriter.assertopen(datafilewriter)打开。java:88)在org.apache.avro.file.datafilewriter.append(datafilewriter。java:311)在com.test.avro.app.saveavro(app。java:83)在com.test.avro.app.main(app。java:55)
datafilewriter appendto方法只接受java.nio文件。我想做的是正确的还是有别的方法?
编辑1。我想向现有文件添加信息。
第一个代码片段显示了您试图创建avro文件的实现。以下是我的spark流媒体框架代码:

JavaStreamingContext jssc = sparkConfigurationBuilder
                .buildJSC(sparkConfigurationBuilder.buildSparkConfiguration());

    jssc.sparkContext().checkpointFile("c:\\tmp");
    Map<String, Object> kafkaParams = sparkDriverUtils.getKafkaProperties();        
    Collection<String> topics = Arrays.asList(sparkDriverUtils.getTopics().trim().split(","));// 1 o more topics        
    LOGGER.warn("Lista de Topics: " + topics.toString());

...

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
//This DSTream resulto to avro..
JavaDStream<Transactions> transactionsDS = transactions.map(f-> {
            Transactions txn = jsonMapperUtil.rowToTransaction(f);
            LOGGER.warn("Retornar  : JavaDStream<Transactions>");
            return  txn;
        });

现在我想把transactionsds结果保存为hdfs中的avro文件。我有一个问题,javastreamingcontext我可以为数据集创建sparksession,或者我应该改变订阅kafka代理的方式吗?
当做。

wpcxdonn

wpcxdonn1#

datafilewriter appendto方法只接受java.nio文件
对的。avro与hdfs路径没有连接。
为了“附加到hdfs文件”,您需要在本地下载它们,然后覆盖它们的全部内容
除此之外,您还提到了spark流,但所示代码中没有任何部分实际使用spark api调用

相关问题