I ran into a problem when trying to set an AvroParquetWriter in a RollingSink: the sink path and the writer path seem to conflict.
Flink version: 1.1.3
parquet-avro version: 1.8.1
Error:
[...]
12/14/2016 11:19:34 Source: Custom Source -> Sink: Unnamed(8/8) switched to CANCELED
INFO JobManager - Status of job af0880ede809e0d699eb69eb385ca204 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /home/user/data/file
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:264)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:257)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:386)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:266)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:183)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:153)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:119)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:92)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:66)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at fr.test.SpecificParquetWriter.open(SpecificParquetWriter.java:28) // line in code => writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:451)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:371)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 7 more
INFO JobClientActor - 12/14/2016 11:19:34 Job execution switched to status FAILED.
12/14/2016 11:19:34 Job execution switched to status FAILED.
INFO JobClientActor - Terminate JobClientActor.
[...]
Main:
RollingSink sink = new RollingSink<String>("/home/user/data");
sink.setBucketer(new DateTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new SpecificParquetWriter());
stream.addSink(sink);
SpecificParquetWriter:
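import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SpecificParquetWriter<V> extends StreamWriterBase<V> {
    private transient AvroParquetWriter writer;
    private CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private int blockSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
    private int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + " { \"name\":\"str1\", \"type\":\"string\" },"
            + " { \"name\":\"str2\", \"type\":\"string\" },"
            + " { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public SpecificParquetWriter() {
    }

    // workaround
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // StreamWriterBase creates the part file chosen by RollingSink (`path`) and keeps its stream
        super.open(fs, path);
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        // The Parquet writer ignores `path` and creates a second, fixed file; the next part
        // file (or another parallel subtask) then fails with FileAlreadyExistsException
        writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
    }

    @Override
    public void write(V element) throws IOException {
        if (writer != null) {
            writer.write(element);
        }
    }

    @Override
    public Writer<V> duplicate() {
        return new SpecificParquetWriter<>();
    }
}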
I'm not sure whether I'm doing this right...
Is there a simpler way?
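The path conflict in the title can be modeled without Flink or Hadoop. This is a toy sketch under stated assumptions: `FixedPathDemo` and `openParts` are hypothetical names, `java.nio.file.Files.createFile` stands in for Hadoop's `FileSystem.create(path, false)`, and the part-file names are illustrative. A writer that ignores the path it is given and always creates one fixed file fails on the second part file, which is the same failure mode as the `FileAlreadyExistsException` in the trace; a writer that uses each sink-supplied path does not.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Toy model of the failure (hypothetical names; java.nio stands in for Hadoop's FileSystem).
final class FixedPathDemo {

    // Opens `parts` part files in a fresh temp directory and returns how many
    // succeeded before the first "file already exists" error.
    static int openParts(int parts, boolean useSinkPath) {
        try {
            Path dir = Files.createTempDirectory("rolling-demo");
            Path fixed = dir.resolve("file"); // plays the role of the hard-coded /home/user/data/file
            int opened = 0;
            for (int i = 0; i < parts; i++) {
                Path sinkPath = dir.resolve("part-0-" + i); // illustrative part-file name
                try {
                    // createFile fails if the target already exists, like create(path, overwrite = false)
                    Files.createFile(useSinkPath ? sinkPath : fixed);
                    opened++;
                } catch (FileAlreadyExistsException e) {
                    break; // the failure seen in the stack trace
                }
            }
            return opened;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`openParts(3, false)` stops after one file, while `openParts(3, true)` opens all three. Note that in the real writer, simply passing `path` to `AvroParquetWriter` would still collide, because `super.open(fs, path)` has already created that file with overwrite disabled.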
1 Answer
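The problem lies in the base class of the writer: Writer in RollingSink, or StreamWriterBase in BucketingSink. They only accept writers that work on an OutputStream handed to them by the sink, not writers that create and save their own file. For example:
writer = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, streamObject);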
AvroParquetWriter and ParquetWriter, on the other hand, take a file path:
writer = AvroParquetWriter.<V>builder(new Path("filePath"))
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withSchema(schema)
        .build();
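To make the mismatch concrete, here is a minimal sketch under stated assumptions: `StreamBackedWriter`, `LineWriter` and `FakeSink` are hypothetical names, not Flink's API. They only model the contract described above, where the sink creates the part file and the writer merely encodes records onto the OutputStream it is handed. A record-oriented format fits this contract; the Parquet writer does not, because, as the stack trace shows, it creates its own file through `FileSystem.create`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical contract: the sink owns the file and passes its stream to the writer.
interface StreamBackedWriter<T> {
    void open(OutputStream out);
    void write(T element) throws IOException;
    void flush() throws IOException;
}

// Fits the contract: each record is encoded straight onto the sink's stream.
final class LineWriter implements StreamBackedWriter<String> {
    private OutputStream out;

    @Override
    public void open(OutputStream out) {
        this.out = out;
    }

    @Override
    public void write(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }
}

// Simulates the sink side: it creates the "part file" (here an in-memory buffer).
final class FakeSink {
    static String writeAll(StreamBackedWriter<String> writer, List<String> records) {
        ByteArrayOutputStream part = new ByteArrayOutputStream();
        writer.open(part);
        try {
            for (String r : records) {
                writer.write(r);
            }
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(part.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the sink holds the stream, it can flush, check the position and roll to a new part file at any record boundary; a writer that opens its own path takes that control away from the sink.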
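I dug into ParquetWriter and realized that what we are trying to do doesn't make sense: Flink, being an event-at-a-time processing system like Storm, can't write a single record to a Parquet file, whereas Spark Streaming can, because it works on a micro-batch principle. With Storm plus Trident we can still write Parquet files, but with Flink we can't until Flink introduces something similar to micro-batches.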
So for this kind of use case, Spark Streaming is a better choice.
Or, if you want to use Flink, go for batch processing.
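Background for the micro-batch argument: Parquet is a columnar format, and ParquetWriter buffers records in memory and writes them a row group at a time, with the file footer (schema and row-group metadata) written only when the writer is closed; until then the file is not a readable Parquet file. Writing Parquet from a per-record stream therefore means grouping records yourself. The sketch below shows the micro-batch idea in plain Java; `MicroBatcher` is a hypothetical helper, and the `bulkWriter` callback stands in for whatever writes one batch (for example, one Parquet file per batch), which is an assumption rather than something the answer prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers records and hands them to a bulk writer in groups.
final class MicroBatcher<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> bulkWriter;
    private List<T> buffer = new ArrayList<>();

    MicroBatcher(int batchSize, Consumer<List<T>> bulkWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkWriter = bulkWriter;
    }

    // Adds one record; emits a batch as soon as the buffer is full.
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Emits whatever is buffered as one batch; does nothing if the buffer is empty.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        bulkWriter.accept(batch);
    }

    // Emits the final, possibly partial, batch.
    @Override
    public void close() {
        flush();
    }
}
```

With a batch size of 2, adding the records 1 to 5 and then calling `close()` produces the batches [1, 2], [3, 4] and [5].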