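I have a project that reads Avro files from a TCP socket and writes them to HDFS with Spark Streaming.
Right now my program reads from a Java TCP socket server. The server creates Avro records and writes them to the socket so that the connected Spark Streaming client can read them.
Spark Streaming then writes them to local disk (the eventual target is HDFS). The Spark Streaming consumer does not need to understand the content; I am only using it because it has a TCP connector, socketTextStream. The goal is to be able to read the data back once it has been stored.
The problem is that when I later try to read the stored Avro file with the Avro API, I get the following error:
Exception in thread "main" org.apache.avro.InvalidAvroMagicException: Not an Avro data file
The Spark Streaming code that reads from the TCP socket is as follows: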
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DataConsumer {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("DataConsumer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> datas = jssc.socketTextStream("localhost", 9999);
        datas.dstream().saveAsTextFiles("datas/data", ".avro");
        jssc.start();
        jssc.awaitTermination();
    }
}
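One thing that may be worth checking in the consumer above: socketTextStream yields String elements, so the incoming bytes are decoded as text before they are saved. As far as I understand, that decoding is line-based UTF-8, which is not guaranteed to preserve arbitrary binary data. The following is only a minimal, JDK-only sketch of that effect, not the Spark implementation; the class name TextRoundTripDemo, the helper roundTripAsText, and the sample bytes are hypothetical and chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TextRoundTripDemo {
    // Decode raw bytes as UTF-8 text and encode them again. This mimics, under
    // the assumption stated above, what a text-oriented pipeline does to input.
    public static byte[] roundTripAsText(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical binary payload: 0xC3 followed by 0x28 is not valid UTF-8,
        // 0xFF never is, and 0x0A would look like a line break to a line reader.
        byte[] raw = {0x02, (byte) 0xC3, 0x28, 0x0A, (byte) 0xFF};
        byte[] back = roundTripAsText(raw);

        System.out.println("original:   " + Arrays.toString(raw));
        System.out.println("round trip: " + Arrays.toString(back));
        System.out.println("identical:  " + Arrays.equals(raw, back));
    }
}
```

Invalid byte sequences are replaced during decoding, so the bytes that come back differ from the ones that went in. If the same thing happens to the Avro bytes in the stream, the saved files would no longer contain what the server sent.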
The code of the TCP socket server that writes the records is as follows:
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public class DummyDataWriter {
    private static String DATA_SCHEMA_PATH = "./src/main/resources/data-schema.avsc";

    // Build one dummy record; the field names must match data-schema.avsc.
    private static GenericData.Record createDummyDataAvroRecord(Schema schema, String instrument) {
        GenericData.Record dataRecord = new GenericData.Record(schema);
        dataRecord.put("name", instrument);
        dataRecord.put("info1", (double) System.nanoTime());
        dataRecord.put("info2", (double) System.nanoTime());
        return dataRecord;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Schema schema = new Schema.Parser().parse(new File(DATA_SCHEMA_PATH));
        GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
        // Wait for a single client (the Spark Streaming consumer) on port 9999.
        ServerSocket serverSocket = new ServerSocket(9999);
        Socket accept = serverSocket.accept();
        OutputStream outputStream = accept.getOutputStream();
        try {
            while (true) {
                // Note: instruments and randomNum are not defined in the posted snippet.
                GenericData.Record dummyDataAvroRecord = createDummyDataAvroRecord(schema, instruments.get(randomNum));
                // Write the bare binary-encoded record straight to the socket.
                BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(outputStream, null);
                genericDatumWriter.write(dummyDataAvroRecord, binaryEncoder);
                binaryEncoder.flush();
                Thread.sleep(2500);
            }
        } finally {
            outputStream.close();
        }
    }
}
The encoding is surely wrong somewhere, but I cannot pinpoint the problem.
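A possible way to narrow this down is to look at the first bytes of the stored file. Per the Avro specification, an object container file starts with the four bytes 'O', 'b', 'j', 1, and a missing header is what the "Not an Avro data file" error complains about. A record written with a bare BinaryEncoder, as in the server above, carries no such header. The sketch below uses only the JDK; the class name AvroMagicCheck, the helper startsWithAvroMagic, and the sample byte arrays are hypothetical.

```java
import java.util.Arrays;

public class AvroMagicCheck {
    // Per the Avro specification, an object container file begins with
    // the ASCII bytes 'O', 'b', 'j' followed by the byte 1.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // Returns true if the data starts with the container-file magic bytes.
    public static boolean startsWithAvroMagic(byte[] data) {
        return data.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(data, MAGIC.length), MAGIC);
    }

    public static void main(String[] args) {
        // Hypothetical start of a container file: magic followed by more bytes.
        byte[] containerLike = {'O', 'b', 'j', 1, 0x04};
        // The Avro binary encoding of the string "foo" on its own:
        // zigzag-encoded length 3 (0x06) followed by the UTF-8 bytes.
        byte[] bareValue = {0x06, 'f', 'o', 'o'};

        System.out.println("container-like: " + startsWithAvroMagic(containerLike));
        System.out.println("bare value:     " + startsWithAvroMagic(bareValue));
    }
}
```

If the stored files fail this check, one direction to explore (not verified here) is writing through DataFileWriter, which the server imports but never uses, so that the header and schema travel with the data, and moving the bytes through Spark without converting them to text.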