我正在设置一个新的kafka流应用程序,并希望使用rocksdb自定义状态存储。这对于将数据放入状态存储并从中获取可查询的状态存储并对数据进行迭代非常有效,但是,在大约72小时后,我观察到存储中缺少数据。kafka streams或rocksdb中状态存储的数据是否有默认保留时间?
我正在使用rocksdb使用自定义状态存储,这样我们就可以利用列族特性,这是kstreams的嵌入式rocksdb实现所不能使用的。我使用keyvaluestore接口实现了自定义存储。还有我自己的店面供应商、店面建造者、店面类型和店面 Package 器。已经为应用程序创建了一个changelog主题,但是还没有数据进入它(还没有研究这个问题)。
将数据放入这个自定义状态存储并从中获取可查询的状态存储工作正常。然而,我看到数据丢失后,大约72小时从商店。我通过获取state store目录的大小、将数据导出到文件并检查条目数来进行检查。
使用快速压缩和通用压缩
简单拓扑:
final StreamsBuilder builder = new StreamsBuilder();
String storeName = "store-name"
List<String> cfNames = new ArrayList<>();
// Hybrid custom store
final StoreBuilder customStore = new RocksDBColumnFamilyStoreBuilder(storeName, cfNames);
builder.addStateStore(customStore);
KStream<String, String> inputstream = builder.stream(
inputTopicName,
Consumed.with(Serdes.String(), Serdes.String()
));
inputstream
.transform(() -> new CurrentTransformer(storeName), storeName);
Topology tp = builder.build();
来自自定义存储实现的代码段:
RocksDBColumnFamilyStore(final String name, final String parentDir, List<String> columnFamilyNames) {
.....
......
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
.setBlockCache(cache)
.setBlockSize(BLOCK_SIZE)
.setCacheIndexAndFilterBlocks(true)
.setPinL0FilterAndIndexBlocksInCache(true)
.setFilterPolicy(filter)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
;
cfOptions = new ColumnFamilyOptions()
.setCompressionType(CompressionType.SNAPPY_COMPRESSION)
.setCompactionStyle(CompactionStyle.UNIVERSAL)
.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS)
.setOptimizeFiltersForHits(true)
.setLevelCompactionDynamicLevelBytes(true)
.setTableFormatConfig(tableConfig);
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));
columnFamilyNames.stream().forEach((cfName) -> columnFamilyDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(), cfOptions)));
}
@SuppressWarnings("unchecked")
public void openDB(final ProcessorContext context) {
Options opts = new Options()
.prepareForBulkLoad();
options = new DBOptions(opts)
.setCreateIfMissing(true)
.setErrorIfExists(false)
.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
.setMaxOpenFiles(-1)
.setWriteBufferManager(writeBufferManager)
.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2))
.setCreateMissingColumnFamilies(true);
fOptions = new FlushOptions();
fOptions.setWaitForFlush(true);
dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
Files.createDirectories(dbDir.getParentFile().toPath());
db = RocksDB.open(options, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
columnFamilyHandles.stream().forEach((handle) -> {
try {
columnFamilyMap.put(new String(handle.getName()), handle);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
});
} catch (RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
open = true;
}
预期状态存储(rocksdb)将无限期地保留数据,直到手动删除或存储磁盘关闭。我不知道Kafka流已经推出了与国有商店ttl尚未。
暂无答案!
目前还没有任何答案,快来回答吧!