我在应用程序中使用 Kafka Streams 和持久化键值存储(persistent key-value store)。应用中有两个键值存储和两个处理器,我遇到的问题出在两个处理器共享的状态存储(state store)上:NameProcessor 将数据写入 nameStore,EventProcessor 从 nameStore 读取数据。调试时可以看到 NameProcessor 已成功写入数据,但 EventProcessor 从 nameStore 读取时却取不到任何数据。下面是应用程序类、拓扑、NameProcessor 和 EventProcessor 的代码片段。另外,我使用的是 Spring Boot parent 2.4.3、kafka-streams 2.2.0 和 kafka-clients 2.2.0。
/**
 * Application entry point: boots the Spring context, builds the processor
 * topology and starts the Kafka Streams instance.
 *
 * @param args command-line arguments, forwarded to Spring Boot
 */
public static void main(String[] args) {
    SpringApplication.run(Application.class, args);

    Properties configs = getKafkaStreamProperties();
    Topology builder = new Topology();
    // The ApplicationTopology constructor registers sources, processors, stores and sink on `builder`.
    new ApplicationTopology(builder);

    KafkaStreams stream = new KafkaStreams(builder, configs);
    stream.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
        // Pass the throwable itself as the last argument so the logger prints the
        // ORIGINAL stack trace. Calling fillInStackTrace() here would overwrite it
        // with the handler's own stack and only append throwable.toString().
        logger.error("Uncaught exception in stream, MessageDetail: "
                + ExceptionUtils.getRootCauseMessage(throwable), throwable);
        // halt() terminates the JVM immediately, without running shutdown hooks.
        Runtime.getRuntime().halt(1);
    });

    // Close the streams instance on a regular JVM shutdown.
    Runtime.getRuntime().addShutdownHook(new Thread(stream::close));
    stream.start();
}
/**
 * Assembles the configuration passed to the {@code KafkaStreams} constructor.
 *
 * <p>Every value is obtained from the corresponding {@code get...()} accessor
 * and stored as a string property.
 *
 * @return a freshly created {@link Properties} holding the streams settings
 */
private static Properties getKafkaStreamProperties() {
    final Properties streamProps = new Properties();

    // Settings keyed by StreamsConfig constants.
    streamProps.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, getApplicationId());
    streamProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
    streamProps.setProperty(StreamsConfig.RETRIES_CONFIG, getRetries());
    streamProps.setProperty(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackOffMs());
    streamProps.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, getReplicationFactor());

    // Setting keyed by a ConsumerConfig constant.
    streamProps.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());

    return streamProps;
}
/**
 * Wires the Processor-API topology: an event pipeline
 * (source -> TRANSFORMER -> sink) and a sensor-update pipeline
 * (nameStoreSource -> process), with "nameStore" connected to both processors.
 */
public class ApplicationTopology {

    /**
     * Registers sources, processors, state stores and the sink on the given topology.
     *
     * @param builder the topology to populate
     */
    public ApplicationTopology (Topology builder) {
        StoreBuilder<KeyValueStore<String, Sensor>> nameStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore("nameStore"), Serdes.String(), CustomSerdes.getNameSerde())
                .withCachingEnabled()
                .withLoggingEnabled(new HashMap<>());
        StoreBuilder<KeyValueStore<String, String>> stateStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore("stateStore"), Serdes.String(), Serdes.String())
                .withCachingEnabled()
                .withLoggingEnabled(new HashMap<>());

        // Event pipeline: inbound events -> EventProcessor -> outbound topic.
        builder.addSource(AutoOffsetReset.LATEST, "source", Serdes.String().deserializer(), CustomSerdes.getIncomingEventSerde().deserializer(), getInboundTopic())
                .addProcessor(TRANSFORMER, () -> new EventProcessor(), "source")
                .addSink("sink", getOutboundTopic(), Serdes.String().serializer(), CustomSerdes.getIncomingEventSerde().serializer(), TRANSFORMER);

        // Reset to earliest for the sensor-update topic, as some entries could already be on the topic.
        builder.addSource(AutoOffsetReset.EARLIEST, "nameStoreSource", Serdes.String().deserializer(), CustomSerdes.getSensorSerde().deserializer(), getInboundSensorUpdateTopic())
                .addProcessor("process", () -> new NameProcessor(), "nameStoreSource");

        // Register each store exactly once, after every processor that uses it exists.
        // Adding the same store twice is rejected by Topology.addStateStore, and a
        // processor can only look up stores it has been connected to.
        builder.addStateStore(nameStoreBuilder, TRANSFORMER, "process");
        // EventProcessor also looks up "stateStore", so it must be registered and connected too.
        builder.addStateStore(stateStoreBuilder, TRANSFORMER);

        // NOTE(review): a non-global store is partitioned per task, so EventProcessor
        // presumably only sees entries NameProcessor wrote in the same task. That requires
        // both input topics to be co-partitioned on the lookup key. Confirm this, or
        // consider a global store for lookup data.
    }

    public ApplicationTopology() {}
} }
/**
 * Keeps the "nameStore" state store in sync with incoming sensor records.
 *
 * <p>For each record with a non-blank key, the key is expanded into one or more
 * external device ids. A null value deletes those ids from the store; a
 * non-null value is written under each of them.
 *
 * <p>NOTE(review): the value type is {@code Sensor} to match this class's
 * {@code AbstractProcessor<String, Sensor>} declaration. Confirm that the serde
 * configured for "nameStore" produces the same type.
 */
public class NameProcessor extends AbstractProcessor<String, Sensor> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NameProcessor.class);
    ProcessorContext context;
    private KeyValueStore<String, Sensor> nameStore;
    // Not referenced anywhere in this class as shown.
    private static List<String> externalDeviceIdList = new ArrayList<>();

    /**
     * Captures the processor context and resolves the "nameStore" state store.
     *
     * @param processorContext context supplied by Kafka Streams
     */
    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        // Let AbstractProcessor record the context so that context() keeps working.
        super.init(processorContext);
        this.context = processorContext;
        this.nameStore = (KeyValueStore<String, Sensor>) processorContext.getStateStore("nameStore");
    }

    /**
     * Applies one sensor record to the store.
     *
     * @param externalDeviceId record key; blank keys are ignored
     * @param sensor record value; {@code null} removes the derived ids from the store
     */
    @Override
    public void process(String externalDeviceId, Sensor sensor) {
        if (StringUtils.isNotBlank(externalDeviceId)) {
            String[] externalDeviceIds = SensorUtils.getExternalDeviceIdsWithoutSuffix(externalDeviceId);
            if (Objects.isNull(sensor)) {
                for (String id : externalDeviceIds) {
                    nameStore.delete(id);
                }
            } else {
                addOrUpdateNameInStore(sensor, externalDeviceIds);
            }
        }
    }

    /** Writes the sensor into the store under every given id, overwriting existing entries. */
    private void addOrUpdateNameInStore(Sensor sensor, String[] externalDeviceIds) {
        for (String id : externalDeviceIds) {
            nameStore.put(id, sensor);
        }
    }
}
public class EventProcessor extends AbstractProcessor<String, IncomingEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
ProcessorContext context;
private KeyValueStore<String, Name> nameStore;
private KeyValueStore<String, String> stateStore;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.nameStore = (KeyValueStore<String, Name>) context.getStateStore("nameStore");
this.stateStore = (KeyValueStore<String, String>) context.getStateStore("stateStore");
}
@Override
public void process(String key, IncomingEvent value) {
String correlationId = UUID.randomUUID().toString();
try {
String externalDeviceId = value.getExternalDeviceId();
Name nameFromStore = nameStore.get(externalDeviceId);
}
}
}
The nameFromStore variable never receives a value, even after the entry has been stored by NameProcessor.
暂无答案!
目前还没有任何答案,快来回答吧!