Reading and writing Avro files using only Spark Streaming

trnvg8h3 posted on 2021-07-13 in Spark

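I have a project that reads Avro files from a TCP socket and writes them to HDFS with Spark Streaming.
Currently, my program reads from a Java TCP socket server. The TCP socket server creates Avro records and puts them on the socket so that a connected Spark Streaming client can read them.
Spark Streaming then writes them to local disk (HDFS is the later goal). The Spark Streaming consumer does not need to understand the content of the files; I only use it because it has a TCP connector, socketTextStream, and the goal is to be able to read the data back once it is stored. The problem is that when I later try to read the Avro files with the Avro API, I get the following error:
Exception in thread "main" org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
The Spark Streaming code that reads from the TCP socket is as follows: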

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DataConsumer {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("DataConsumer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        JavaReceiverInputDStream<String> datas = jssc.socketTextStream("localhost", 9999);

        datas.dstream().saveAsTextFiles("datas/data", ".avro");

        jssc.start();
        jssc.awaitTermination();
    }
}
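
As a side note on the consumer above: socketTextStream interprets the received bytes as UTF-8 text split on newlines, so binary data may not survive the trip. The following is a minimal sketch using only the JDK (no Spark or Avro), under the assumption that the payload contains Avro-style binary doubles, which are written as 8 little-endian bytes. The class name TextRoundTripDemo and its helper methods are hypothetical, introduced only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class: shows what happens to binary bytes that are
// decoded as UTF-8 text and then encoded back to bytes.
final class TextRoundTripDemo {

    // Encodes a double as 8 little-endian bytes (the layout Avro's binary
    // encoding uses for the "double" type).
    static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(8)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putDouble(value)
                .array();
    }

    // Decodes the bytes as UTF-8 text and re-encodes the resulting String.
    // Malformed sequences are replaced with U+FFFD during decoding.
    static byte[] roundTripThroughUtf8(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] original = encodeDouble((double) System.nanoTime());
        byte[] roundTripped = roundTripThroughUtf8(original);

        System.out.println("original     : " + Arrays.toString(original));
        System.out.println("round-tripped: " + Arrays.toString(roundTripped));
        System.out.println("identical    : " + Arrays.equals(original, roundTripped));
    }
}
```

Bytes that happen to be valid ASCII pass through unchanged, which is why the damage can be easy to miss. A byte equal to 0x0A inside the binary payload would additionally be treated as a line break by a line-based reader.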

The code of the server that writes to the TCP socket is as follows:

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public class DummyDataWriter {
    private static String DATA_SCHEMA_PATH = "./src/main/resources/data-schema.avsc";

    private static GenericData.Record createDummyDataAvroRecord(Schema schema, String instrument) {
        GenericData.Record dataRecord = new GenericData.Record(schema);

        dataRecord.put("name", instrument);
        dataRecord.put("info1", (double) System.nanoTime());
        dataRecord.put("info2", (double) System.nanoTime());

        return dataRecord;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Schema schema = new Schema.Parser().parse(new File(DATA_SCHEMA_PATH));

        GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
        ServerSocket serverSocket = new ServerSocket(9999);
        Socket accept = serverSocket.accept();
        OutputStream outputStream = accept.getOutputStream();

        try {
            while (true) {
                // Note: instruments and randomNum are not defined in the code as posted.
                GenericData.Record dummyDataAvroRecord = createDummyDataAvroRecord(schema, instruments.get(randomNum));

                BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(outputStream, null);
                genericDatumWriter.write(dummyDataAvroRecord, binaryEncoder);
                binaryEncoder.flush();
                Thread.sleep(2500);
            }
        } finally {
            outputStream.close();
        }
    }
}

The encoding is certainly wrong, but I cannot pinpoint the problem.
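
One way to narrow this down is to look at the first bytes of a stored file. An Avro object container file starts with the four bytes 'O', 'b', 'j', 1, and the "Not an Avro data file" error indicates that this header was not found. A raw BinaryEncoder writes only the encoded datum and no such header. The sketch below uses only the JDK; the class name AvroMagicCheck and its methods are hypothetical helpers, and the file paths passed on the command line are whatever part files Spark produced under the datas/ output directories.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: checks whether data starts with the Avro container magic.
final class AvroMagicCheck {

    // Header of an Avro object container file: ASCII "Obj" followed by the byte 1.
    private static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

    // Returns true if the given bytes begin with the Avro magic header.
    static boolean hasAvroMagic(byte[] head) {
        if (head.length < AVRO_MAGIC.length) {
            return false;
        }
        for (int i = 0; i < AVRO_MAGIC.length; i++) {
            if (head[i] != AVRO_MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    // Reads up to the first four bytes of the file and checks them.
    static boolean isAvroContainerFile(Path file) throws IOException {
        byte[] head = new byte[AVRO_MAGIC.length];
        int total = 0;
        try (InputStream in = Files.newInputStream(file)) {
            while (total < head.length) {
                int n = in.read(head, total, head.length - total);
                if (n < 0) {
                    break; // file is shorter than the header
                }
                total += n;
            }
        }
        return total == head.length && hasAvroMagic(head);
    }

    public static void main(String[] args) throws IOException {
        // Each argument is a path to a file written by the streaming job.
        for (String arg : args) {
            System.out.println(arg + " -> " + isAvroContainerFile(Paths.get(arg)));
        }
    }
}
```

If the check returns false for the stored files, the Avro file reader cannot open them regardless of whether the individual records were encoded correctly.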
