我对java有点陌生,所以我希望得到一些关于处理kafka predicate 中多个条件的建议。我有下面的代码,我能够有基于动态输入的动态过滤器,并避免下面的'如果/elseifs'?。我保持代码简单和愚蠢,以便更容易理解我要做什么。我试着理解,作为用户输入的结果,应用/附加多少过滤器的推荐方法是什么。我还想知道是否有可能有基于用户输入的比较运算符(等于/包含/…)。
public Topology buildTopology(Properties envProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String inputTopic = envProps.getProperty("input.topic.name");
final String streamsOutputTopic = envProps.getProperty("streams.approved.topic.name");
final String tableOutputTopic = envProps.getProperty("table.output.topic.name");
final Serde<String> stringSerde = Serdes.String();
final KStream<String, ClientTask> stream = builder.stream(inputTopic, Consumed.with(stringSerde,StreamsSerdes.ClientTask()));
String[] Filters = {"State=Work In Progress","Priorit=High"};
final KStream<String, ClientTask> filter_stream = stream.filter(IsGoodtoGoFilter(Filters));
filter_stream.to(streamsOutputTopic, Produced.with(stringSerde, StreamsSerdes.ClientTask()));
return builder.build();
}
public static Predicate<String, ClientTask> IsGoodtoGoFilter(String[] Filters) {
ArrayList<String> GetColumn = new ArrayList<>();
ArrayList<String> GetValue = new ArrayList<>();
for (String FilterColumn: Filters) {
String[] ColumnAndValue = FilterColumn.split("=");
GetColumn.add(ColumnAndValue[0]);
GetValue.add(ColumnAndValue[1]);
}
if (GetColumn.size() == 1) {
return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0));
}
else if (GetColumn.size() == 2) {
return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0)) &&
v.get(GetColumn.get(1)).toString().equals(GetValue.get(1));
}
return (k, v) -> v.getStatus().equals("Open"); // replace by empty predicate later
}
暂无答案!
目前还没有任何答案,快来回答吧!