Does the Kafka streamer run in transactional mode?

fxnxkyjh  posted on 2021-06-06  in Kafka

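I have a distributed database in Apache Ignite and an Apache Kafka streamer service that streams data into the Ignite cluster. The Kafka streamer works as follows:
Create an Ignite node to discover the cluster
Start the Kafka streamer in the cluster as a cluster singleton service
Close the Ignite node
The Ignite cluster is in transactional mode, but I am not sure whether that guarantees ACID or merely enables it. Can this service streaming into Ignite be considered ACID?
Here is the code of the Kafka streamer: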

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.stream.kafka.KafkaStreamer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
import org.json.JSONObject; // assumed to be org.json, which provides getString() and accumulate()

public class IgniteKafkaStreamerService implements Service {
    private static final long serialVersionUID = 1L;

    // Injected by Ignite on the node where the service is deployed.
    @IgniteInstanceResource
    private Ignite ignite;

    private KafkaStreamer<String, JSONObject> kafkaStreamer = new KafkaStreamer<>();
    private IgniteLogger logger;

    public static void main(String[] args) throws InterruptedException {
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
        // Set Multicast group.
        //ipFinder.setMulticastGroup("228.10.10.157");
        // Set initial IP addresses.
        // Note that you can optionally specify a port or a port range.
        ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
        spi.setIpFinder(ipFinder);

        IgniteConfiguration cfg = new IgniteConfiguration();
        // Override default discovery SPI.
        cfg.setDiscoverySpi(spi);
        Ignite ignite = Ignition.getOrStart(cfg);

        // Deploy data streamer service on the server nodes.
        ClusterGroup forServers = ignite.cluster().forServers();
        IgniteKafkaStreamerService streamer = new IgniteKafkaStreamerService();
        ignite.services(forServers).deployClusterSingleton("KafkaService", streamer);

        // The deploying node leaves the cluster; the service keeps running on a server node.
        ignite.close();
    }

    @Override
    public void init(ServiceContext ctx) {
        logger = ignite.log();

        // Writes go through a data streamer: batched, flushed at least once per second.
        IgniteDataStreamer<String, JSONObject> stmr = ignite.dataStreamer("my_cache");
        stmr.allowOverwrite(true);
        stmr.autoFlushFrequency(1000);

        List<String> topics = new ArrayList<>();
        topics.add(0, "IoTData");

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);
        kafkaStreamer.setThreads(4);
        kafkaStreamer.setTopic(topics);

        // Note: application.id is a Kafka Streams setting; a plain Kafka consumer
        // identifies its consumer group through group.id.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "NiFi-consumer");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.242:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", "hello");
        kafkaStreamer.setConsumerConfig(props);

        // Turn each Kafka record into one cache entry keyed by "<id>,<record timestamp>".
        kafkaStreamer.setSingleTupleExtractor(msg -> {
            JSONObject jsonObj = new JSONObject(msg.value().toString());
            String key = jsonObj.getString("id") + "," + new Date(msg.timestamp());
            JSONObject value = jsonObj.accumulate("date", new Date(msg.timestamp()));
            return new AbstractMap.SimpleEntry<>(key, value);
        });
    }

    @Override
    public void execute(ServiceContext ctx) {
        kafkaStreamer.start();
        logger.info("KafkaStreamer started.");
    }

    @Override
    public void cancel(ServiceContext ctx) {
        kafkaStreamer.stop();
        logger.info("KafkaStreamer stopped.");
    }
}
nkhmeac6  #1

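KafkaStreamer uses IgniteDataStreamer under the hood. IgniteDataStreamer is not transactional by nature, so you get no transactional guarantees, even when the target cache itself is configured as transactional.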
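To make the practical difference concrete, here is a minimal toy model in plain Java. It is not Ignite code: the class `StreamerVsTransaction`, its methods, and the failing key `"bad"` are all hypothetical names made up for illustration. It only sketches the general idea that streamed writes become visible as they are applied, so a failure part-way leaves earlier writes in place, whereas a transactional write publishes everything or nothing.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not Ignite code: streamed writes versus all-or-nothing writes. */
public class StreamerVsTransaction {

    /** Simulated write: the hypothetical key "bad" always fails. */
    static void write(Map<String, String> target, String key, String value) {
        if ("bad".equals(key)) {
            throw new IllegalStateException("simulated failure on key " + key);
        }
        target.put(key, value);
    }

    /** Streamer-style: each entry is applied as it arrives, so earlier writes survive a failure. */
    public static Map<String, String> streamed(List<String[]> entries) {
        Map<String, String> cache = new LinkedHashMap<>();
        try {
            for (String[] e : entries) {
                write(cache, e[0], e[1]);
            }
        } catch (IllegalStateException ex) {
            // Nothing to roll back: what was already written stays visible.
        }
        return cache;
    }

    /** Transaction-style: writes are staged and published only if every one succeeds. */
    public static Map<String, String> transactional(List<String[]> entries) {
        Map<String, String> cache = new LinkedHashMap<>();
        Map<String, String> staged = new LinkedHashMap<>();
        try {
            for (String[] e : entries) {
                write(staged, e[0], e[1]);
            }
            cache.putAll(staged); // "commit": publish all staged writes at once
        } catch (IllegalStateException ex) {
            // "rollback": staged writes are discarded, the cache is untouched.
        }
        return cache;
    }

    /** Sample input whose third entry fails. */
    public static List<String[]> sample() {
        return Arrays.asList(
            new String[] {"a", "1"},
            new String[] {"b", "2"},
            new String[] {"bad", "3"},
            new String[] {"c", "4"});
    }

    public static void main(String[] args) {
        System.out.println("streamed:      " + streamed(sample()).keySet());      // [a, b]
        System.out.println("transactional: " + transactional(sample()).keySet()); // []
    }
}
```

In this model the streamed run ends with a partial result (`a` and `b`), while the transactional run leaves the cache empty. If all-or-nothing behaviour is required in Ignite, the writes would presumably have to go through explicit transactions on a transactional cache (for example `ignite.transactions().txStart()` followed by `commit()`) rather than through the data streamer, at the cost of the streamer's throughput.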
