使用Rocksdb状态后端的LZ 4压缩对Flink-SQL应用程序的影响

jv4diomz  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(203)

我正在使用flink 1.17.1并运行flink-sql应用程序,如

create table enrich_table (
col1 STRING,
col2....
)
WITH ( 'connector' = 'upsert-kafka',
    'topic' = 'enrich_table',
    'properties.bootstrap.servers' = '${KAFKA_BROKERS}',
    'properties.group.id' = '${CONSUMER_GROUP_ID}',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = '${SCHEMA_REGISTRY_URL}');

Insert into enrich_table as 
select
    col1,col2...

from 
    table1 t1 
    inner join table2 t2 on ..
    inner join table3 t2 on ...
    .....

字符串
通过创建自定义工厂将LZ 4压缩设置为rocksdb

package my.org.abc

import org.rocksdb.{ColumnFamilyOptions, DBOptions}
import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, DefaultConfigurableOptionsFactory, RocksDBOptionsFactory}

import org.apache.flink.configuration.ConfigOptions
import org.apache.flink.configuration.ConfigOption

import org.apache.flink.configuration.description.LinkElement.link
import org.rocksdb.CompressionType
import org.apache.flink.configuration.description.Description

class RocksDBTuningJobOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  private val defaultFactory = new DefaultConfigurableOptionsFactory
  private var configuration: ReadableConfig = _
  private val COMPRESSION: ConfigOption[CompressionType] =
    ConfigOptions.key("state.backend.rocksdb.custom.compression")
      .enumType(classOf[CompressionType])
      .noDefaultValue()
      .withDescription(
        Description.builder()
          .text("Configures RocksDB compression")
          .linebreak()
          .text(
            "For more information, please refer to %s",
            link("https://github.com/facebook/rocksdb/wiki/Compression")
          )
          .build()
      )

  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    defaultFactory.configure(configuration)
    this.configuration = configuration
    this
  }

  override def createDBOptions(currentOptions: DBOptions, handlesToClose: java.util.Collection[AutoCloseable]): DBOptions = {
    defaultFactory.createDBOptions(currentOptions, handlesToClose)
  }

  override def createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: java.util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    var updatedOptions = defaultFactory.createColumnOptions(currentOptions, handlesToClose)

    configuration.getOptional(COMPRESSION).ifPresent(currentOptions.setCompressionType)
    updatedOptions
  }
}


并提供如下配置

........
state.backend.rocksdb.use-bloom-filter: "true"
state.backend.rocksdb.options-factory: 'com.mdsol.streaming.util.RocksDBTuningJobOptionsFactory'
state.backend.rocksdb.custom.compression: "LZ4_COMPRESSION"


问题1:根据讨论here,LZ 4提高了性能。只是想知道社区使用什么来处理大状态,高容量和速度的基于flink-sql(没有临时连接)的应用程序?
Q#2:为了设置Lz 4压缩,我们必须扩展可扩展RocksDBOptionsFactory吗?或者有更好的方法吗?
问题3:将压缩(Lz 4)更改为已运行并具有检查点的现有作业是否有任何影响

6yoyoihd

6yoyoihd1#

关于Speedb Hive:
Q1默认压缩是Snappy。Q2 Flink问题。Q3 RocksDB和Speedb允许随时更改压缩类型。如果Flink不支持,我们在Speedb创建了一个后门来更改运行时的任何可变选项。
如果您有更多问题或需要其他信息,您可以找到Speedb hive here和(注册后)包含您的问题here的线程链接

相关问题