kafka流以特定键作为输入连接

gajydyqb  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(265)

我在schema registry中有3个不同的主题和3个avro文件,我想流式处理这些主题并将它们连接在一起,并将它们写入一个主题。问题是我想要加入的键与我将数据写入每个主题的键不同。
假设我们有3个avro文件:
警报:

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the  network element ID that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The severity field is the default severity associated to the alarm ",
    "default" : null
  }]
}

事件:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "Respective Alarm"
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

维护:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }]
}

在我的Kafka中,我有3个主题,分别是关于这些avro的(ley's say alarm\u raw,incident\u raw,maintenance\u raw),每当我想写这些主题时,我都使用ne\u id作为键(因此主题被ne\u id分割)。现在我想加入这三个主题,并获得一个新的记录,并写进一个新的主题。问题是我想基于报警id和报警源id加入报警和事件,基于网元id加入报警和维护。我想避免创建新主题并重新分配新密钥。我在加入时是否指定了密钥?

wlp8pajw

wlp8pajw1#

这取决于你想使用哪种连接(c.f。https://cwiki.apache.org/confluence/display/kafka/kafka+streams+join+semantics)
对于kstream kstream join,当前( v0.10.2 和更早)除了设置一个新键(例如,使用 selectKey() )重新分区。
对于kstream ktable join,kafka 0.10.2 (将在未来几周发布)包含一个名为 GlobalKTables (成本加运费)。https://cwiki.apache.org/confluence/display/kafka/kip-99%3a+add+global+tables+to+kafka+streams). 这允许您在ktable上执行非键联接(即kstream globalktable联接,因此不需要重新划分globalktable中的数据)。
注意:kstream globalktable联接与kstream ktable联接具有不同的语义。与后者相比,它不是时间同步的,因此,就globalktable更新而言,连接在设计上是不确定的;i、 例如,无法保证哪个kstream记录将第一个“看到”globalktable更新并因此与更新的globalktable记录联接。
还计划添加一个ktable globalktable连接。这可能在中提供 0.10.3 . 不过,目前还没有计划添加“全局”kstream kstream连接。

1cklez4t

1cklez4t2#

您可以通过修改来维护同一个密钥。
你可以用 KeyValueMapper 通过它,您可以修改您的键以及值。
您应该按如下方式使用它:

val modifiedStream = kStream.map[String,String](
    new KeyValueMapper[String, String,KeyValue[String,String]]{
        override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue("modifiedKey", value)
    }
)

您可以将上述逻辑应用于多个 Kstream 对象来维护用于连接的单个键 KStream s。

相关问题