如果要由dataflow处理的输入文件有数据,我正在执行一项清除memorystore缓存的任务。这意味着,如果输入文件没有记录,则不会刷新memorystore,但输入文件甚至有一条记录,则应该刷新memorystore,然后处理输入文件。
我的数据流应用程序是一个多管道应用程序,它读取、处理数据,然后将数据存储在memorystore中。管道正在成功执行。但是,内存存储的刷新正在工作,但是在刷新之后,插入没有发生。
我已经编写了一个函数,在检查输入文件是否有记录之后刷新memorystore。
flushingmemorystore.java文件
package com.click.example.functions;
import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class FlushingMemorystore {
private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
public static FlushingMemorystore.Read read() {
return (new AutoValue_FlushingMemorystore_Read.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
}
@AutoValue
public abstract static class Read extends PTransform<PCollection<Long>, PDone> {
public Read() {
}
@Nullable
abstract RedisConnectionConfiguration connectionConfiguration();
@Nullable
abstract Long expireTime();
abstract FlushingMemorystore.Read.Builder toBuilder();
public FlushingMemorystore.Read withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
public FlushingMemorystore.Read withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
public FlushingMemorystore.Read withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}
public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
}
public PDone expand(PCollection<Long> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
return PDone.in(input.getPipeline());
}
private static class ReadFn extends DoFn<Long, String> {
private static final int DEFAULT_BATCH_SIZE = 1000;
private final FlushingMemorystore.Read spec;
private transient Jedis jedis;
private transient Pipeline pipeline;
private int batchCount;
public ReadFn(FlushingMemorystore.Read spec) {
this.spec = spec;
}
@Setup
public void setup() {
this.jedis = this.spec.connectionConfiguration().connect();
}
@StartBundle
public void startBundle() {
this.pipeline = this.jedis.pipelined();
this.pipeline.multi();
this.batchCount = 0;
}
@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
Long count = c.element();
batchCount++;
if(count==null && count < 0) {
LOGGER.info("No Records are there in the input file");
} else {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
jedis.flushDB();
}
LOGGER.info("*****The memorystore is flushed*****");
}
}
@FinishBundle
public void finishBundle() {
if (this.pipeline.isInMulti()) {
this.pipeline.exec();
this.pipeline.sync();
}
this.batchCount=0;
}
@Teardown
public void teardown() {
this.jedis.close();
}
}
@AutoValue.Builder
abstract static class Builder {
Builder() {
}
abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
abstract FlushingMemorystore.Read build();
abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
}
}
}
我正在我的启动程序管道代码中使用函数。
正在使用函数的启动程序管道的代码段:
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from(options.getInputFile()));
/**
* Flushing the Memorystore if there are records in the input file
*/
lines.apply("Checking Data in input file", Count.globally())
.apply("Flushing the data store", FlushingMemorystore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
清除缓存后要插入的已处理数据的代码段:
dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
.withMethod(RedisIO.Write.Method.SADD)
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
数据流执行得很好,它也会刷新memorystore,但之后插入就不起作用了。你能指出我哪里出错了吗?任何解决这个问题的办法都是值得赞赏的。提前谢谢!
编辑:
按评论中的要求提供补充资料
使用的运行时是Java11,它使用的是ApacheBeamSDK for 2.24.0
如果输入文件有记录,它将用一些逻辑处理数据。例如,如果输入文件包含如下数据:
abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423
数据流将统计记录的数量,在这种情况下是2,并根据逻辑处理id、名字等,然后存储在memorystore中。这个输入文件每天都会出现,因此,如果输入文件有记录,应该清除(或刷新)memorystore。
虽然管道没有破裂,但我想我遗漏了什么。
2条答案
按热度按时间gxwragnw1#
这个问题在我申请wait.on转换后就解决了,pablo的回答已经解释过了。但是,我不得不将flushingmemorystore.java重写为flushsignal标志的pcollection。
函数如下:
zxlwwiss2#
我怀疑这里的问题是,您需要确保“flush”步骤在redisio.write步骤发生之前运行(并完成)。beam有一个wait.on变换,您可以使用它。
为了实现这一点,我们可以使用flushing ptransform的输出作为我们已经刷新了数据库的信号—并且我们只在完成刷新之后才写入数据库。这个
process
呼叫您的冲洗dofn如下所示:一旦有信号指出数据库已被刷新,我们就可以使用它等待,然后再将新数据写入其中: