java—如何修改保存在一个Kafka主题中的twitter api消息并将其发送到另一个Kafka主题

e0bqpujr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(313)

我已经创建了kafka producer,它使用hbc核心从twitterapi生成一个主题的消息,我想修改这些消息,因为我只需要几个字段,比如tweet创建时间、id字符串、一些关于用户的基本信息和tweet的文本。我尝试使用kafka streams和pojo模型,但在提取文本时遇到问题,因为根据tweet是否被转发、是否有140多个标志等,全文可能位于不同的命名字段中。我的pojo模型:

"type": "object",
  "properties": {
    "created_at": { "type": "string" },
    "id_str": { "type": "string" },
    "user": {
      "type": "object",
      "properties": {
        "location": { "type": "string" },
        "followers_count": { "type": "integer" },
        "friends_count": { "type": "integer" },
        "created_at": { "type": "string" }
      }
    },
    "text": { "type": "string" }
  }
}

这是使用Kafka流的正确方法还是有更好的解决方案来提取这些字段并放到另一个主题?

xbp102n0

xbp102n01#

不需要中间客户机、系统、kafka流、花哨的util或奇迹框架,只需关注旧的简单方法:重用生产者和生成并发送完整pojo的代码。 Producers 是线程安全的,所以使用同一个producer示例通过不同的线程生成两个或多个主题是完全可以的。
这只是一个简化,因为我不知道你们实现的细节。我假设消息(pojo)是一个简单的 String . 想象一下,相信不同的字母是字段。从 fullPojo ,您希望发送只包含两个字段的消息,表示为 y 以及 v ,转到另一个主题。

String fullPojo = "xxxxxyxxxxv";
  //some logic to extract the desired fields
  String shortPojo = getDesiredFields(fullPojo);
  /* shortPojo="yv" */

在kafka集群上创建一个新主题,在本例中,它将被调用 shortPojoTopic .
就用同样的 producer 它通过第二次调用将完整数据发送到原始主题,以便用仅包含筛选值的消息填充短主题:

producer.send(new ProducerRecord<String, String>(fullPojoTopic,  fullPojo));
producer.send(new ProducerRecord<String, String>(shortPojoTopic, shortPojo));

第二个调用也可以从另一个辅助线程执行。如果希望在这里完成多线程处理,可以定义另一个线程来执行“过滤”任务。把原件递给我就行了 producer 引用第二个线程并用类似 FIFO 结构( deques, queues ,…)持有完整的POJO。
原始线程将fullpojo发送到 fullPojoTopic 主题,并将fullpojo推入队列。
这个次要的“filterer”线程将从queue/deque中删除top消息,提取创建shortpojo所需的字段,并将其发送到 shortPojoTopic (使用相同的 producer ,而不必担心生产者同步问题)。
如果其中一个主题处于错误状态并且不能接受更多消息,或者其中一个主题位于另一个刚刚失败的kafka集群上(在这种情况下,您还需要两个不同的生产者),那么第二个线程也可以避免锁定整个系统,或者即使过滤过程在过滤某些格式错误的消息时发现一些困难。例如,即使 shortPojoTopic 是out,这不会影响第一个线程的性能,因为它将继续发送他的fullpojo而不会出现问题/延迟。
始终注意内存使用:如果第二个线程被大量时间卡住,或者无法遵循第一个线程的节奏,则应该以某种方式限制/控制队列/队列大小,以避免oom。如果发生这种情况,它将无法足够快地读取/删除消息,从而产生一个可能导致上述oom问题的延迟
此外,即使没有主题/代理出现问题,这种分离也会提高总体性能,因为原始线程不必等待每次迭代时在其线程中发生的过滤过程。
第一个线程只发送pojo;第二个线程只是过滤并发送短pojo。简单的责任,都是并行的。
假设您可以控制生产者及其发送的内容,我建议您将逻辑直接放在那里,以避免其他中间系统(流…)。只需提取核心代码中的字段,并使用同一个生产者将恢复的pojo生成到另一个主题。只使用一个线程或尽可能多的你想要的。
我敢打赌我自己的房子和右手,这是远远,远远快于任何流实用程序,你能想到的。
如果您没有访问该代码的权限,则可以创建一个中间消费生产者服务,在下一节中继续。
如果原始pojo生成和生产的代码不可访问
如果您只能访问完整的pojo主题,而不能访问上一步(生成消息并将它们发送到主题的代码),那么第二个选项可以是创建一个中间kafka使用者生产者,它使用来自 fullPojoTopic ,提取字段并将过滤后的shortpojo生成到 shortPojoTopic .
请注意,该逻辑与第一种方法中的逻辑相同,但此解决方案意味着更大的资源浪费:新的生产者线程和使用者线程(相信我,它们会创建许多次线程)、要管理的新使用者组、在线路上双重传输fullpojo消息等。。
我的建议是,只有当您不能直接访问以第一种方式生成完整pojo的代码,或者您希望对过滤完整数据并将所需字段发送到另一个主题的微服务有更大的控制时,才应该使用此选项。

相关问题