org.apache.flume.Channel.getTransaction()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(293)

本文整理了Java中org.apache.flume.Channel.getTransaction()方法的一些代码示例,展示了Channel.getTransaction()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.getTransaction()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:getTransaction

Channel.getTransaction介绍

暂无

代码示例

代码示例来源:origin: apache/flume

  1. /**
  2. * Enter the transaction boundary. This will either begin a new transaction
  3. * if one didn't already exist. If we're already in a transaction boundary,
  4. * then this method does nothing.
  5. *
  6. * @param channel The Sink's channel
  7. * @throws EventDeliveryException There was an error starting a new batch
  8. * with the failure policy.
  9. */
  10. private void enterTransaction(Channel channel) throws EventDeliveryException {
  11. // There's no synchronization around the transaction instance because the
  12. // Sink API states "the Sink#process() call is guaranteed to only
  13. // be accessed by a single thread". Technically other methods could be
  14. // called concurrently, but the implementation of SinkRunner waits
  15. // for the Thread running process() to end before calling stop()
  16. if (transaction == null) {
  17. this.transaction = channel.getTransaction();
  18. transaction.begin();
  19. failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
  20. }
  21. }

代码示例来源:origin: apache/ignite

  1. Channel channel = getChannel();
  2. Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

  1. @Override
  2. public Status process() throws EventDeliveryException {
  3. Status result = Status.READY;
  4. Channel channel = getChannel();
  5. Transaction transaction = channel.getTransaction();
  6. Event event = null;
  7. try {
  8. transaction.begin();
  9. event = channel.take();
  10. if (event != null) {
  11. if (logger.isInfoEnabled()) {
  12. logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
  13. }
  14. } else {
  15. // No event found, request back-off semantics from the sink runner
  16. result = Status.BACKOFF;
  17. }
  18. transaction.commit();
  19. } catch (Exception ex) {
  20. transaction.rollback();
  21. throw new EventDeliveryException("Failed to log event: " + event, ex);
  22. } finally {
  23. transaction.close();
  24. }
  25. return result;
  26. }
  27. }

代码示例来源:origin: apache/flume

  1. Status status = Status.READY;
  2. Channel channel = getChannel();
  3. Transaction txn = channel.getTransaction();
  4. List<Row> actions = new LinkedList<Row>();
  5. List<Increment> incs = new LinkedList<Increment>();

代码示例来源:origin: apache/flume

  1. Status status = Status.READY;
  2. Channel channel = getChannel();
  3. Transaction txn = channel.getTransaction();
  4. List<Row> actions = new LinkedList<>();
  5. List<Increment> incs = new LinkedList<>();

代码示例来源:origin: apache/flume

  1. Transaction transaction = channel.getTransaction();
  2. Event event = null;
  3. long eventCounter = counterGroup.get("events.success");

代码示例来源:origin: apache/flume

  1. Transaction tx = reqChannel.getTransaction();
  2. Preconditions.checkNotNull(tx, "Transaction object must not be null");
  3. try {
  4. Transaction tx = null;
  5. try {
  6. tx = optChannel.getTransaction();
  7. tx.begin();

代码示例来源:origin: apache/flume

  1. Transaction transaction = channel.getTransaction();
  2. transaction.begin();
  3. boolean success = false;

代码示例来源:origin: apache/flume

  1. Status status = Status.READY;
  2. Channel channel = getChannel();
  3. Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

  1. Status status = Status.READY;
  2. Channel channel = getChannel();
  3. Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

  1. Status status = Status.READY;
  2. Channel channel = getChannel();
  3. Transaction txn = channel.getTransaction();
  4. try {
  5. txn.begin();

代码示例来源:origin: apache/flume

  1. createTopic(topic, 5);
  2. Transaction tx = memoryChannel.getTransaction();
  3. tx.begin();

代码示例来源:origin: apache/flume

  1. partitionMap.put(i, new ArrayList<Event>());
  2. Transaction tx = memoryChannel.getTransaction();
  3. tx.begin();

代码示例来源:origin: apache/flume

  1. headers.put("key", TestConstants.CUSTOM_KEY);
  2. headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
  3. Transaction tx = memoryChannel.getTransaction();
  4. tx.begin();
  5. Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testTopicAndKeyFromHeader() {
  3. Sink kafkaSink = new KafkaSink();
  4. Context context = prepareDefaultContext();
  5. Configurables.configure(kafkaSink, context);
  6. Channel memoryChannel = new MemoryChannel();
  7. Configurables.configure(memoryChannel, context);
  8. kafkaSink.setChannel(memoryChannel);
  9. kafkaSink.start();
  10. String msg = "test-topic-and-key-from-header";
  11. Map<String, String> headers = new HashMap<String, String>();
  12. headers.put("topic", TestConstants.CUSTOM_TOPIC);
  13. headers.put("key", TestConstants.CUSTOM_KEY);
  14. Transaction tx = memoryChannel.getTransaction();
  15. tx.begin();
  16. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  17. memoryChannel.put(event);
  18. tx.commit();
  19. tx.close();
  20. try {
  21. Sink.Status status = kafkaSink.process();
  22. if (status == Sink.Status.BACKOFF) {
  23. fail("Error Occurred");
  24. }
  25. } catch (EventDeliveryException ex) {
  26. // ignore
  27. }
  28. checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
  29. }

代码示例来源:origin: apache/flume

  1. private Sink.Status prepareAndSend(Context context, String msg)
  2. throws EventDeliveryException {
  3. Sink kafkaSink = new KafkaSink();
  4. Configurables.configure(kafkaSink, context);
  5. Channel memoryChannel = new MemoryChannel();
  6. Configurables.configure(memoryChannel, context);
  7. kafkaSink.setChannel(memoryChannel);
  8. kafkaSink.start();
  9. Transaction tx = memoryChannel.getTransaction();
  10. tx.begin();
  11. Event event = EventBuilder.withBody(msg.getBytes());
  12. memoryChannel.put(event);
  13. tx.commit();
  14. tx.close();
  15. return kafkaSink.process();
  16. }

代码示例来源:origin: apache/flume

  1. headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
  2. headers.put("foo", TestConstants.CUSTOM_TOPIC);
  3. Transaction tx = memoryChannel.getTransaction();
  4. tx.begin();
  5. Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testReplaceSubStringOfTopicWithHeaders() {
  3. String topic = TestConstants.HEADER_1_VALUE + "-topic";
  4. Sink kafkaSink = new KafkaSink();
  5. Context context = prepareDefaultContext();
  6. context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
  7. Configurables.configure(kafkaSink, context);
  8. Channel memoryChannel = new MemoryChannel();
  9. Configurables.configure(memoryChannel, context);
  10. kafkaSink.setChannel(memoryChannel);
  11. kafkaSink.start();
  12. String msg = "test-replace-substring-of-topic-with-headers";
  13. Map<String, String> headers = new HashMap<>();
  14. headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
  15. Transaction tx = memoryChannel.getTransaction();
  16. tx.begin();
  17. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  18. memoryChannel.put(event);
  19. tx.commit();
  20. tx.close();
  21. try {
  22. Sink.Status status = kafkaSink.process();
  23. if (status == Sink.Status.BACKOFF) {
  24. fail("Error Occurred");
  25. }
  26. } catch (EventDeliveryException ex) {
  27. // ignore
  28. }
  29. checkMessageArrived(msg, topic);
  30. }

代码示例来源:origin: apache/flume

  1. Map<String, String> headers = new HashMap<String, String>();
  2. headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
  3. Transaction tx = memoryChannel.getTransaction();
  4. tx.begin();
  5. Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testDefaultTopic() {
  3. Sink kafkaSink = new KafkaSink();
  4. Context context = prepareDefaultContext();
  5. Configurables.configure(kafkaSink, context);
  6. Channel memoryChannel = new MemoryChannel();
  7. Configurables.configure(memoryChannel, context);
  8. kafkaSink.setChannel(memoryChannel);
  9. kafkaSink.start();
  10. String msg = "default-topic-test";
  11. Transaction tx = memoryChannel.getTransaction();
  12. tx.begin();
  13. Event event = EventBuilder.withBody(msg.getBytes());
  14. memoryChannel.put(event);
  15. tx.commit();
  16. tx.close();
  17. try {
  18. Sink.Status status = kafkaSink.process();
  19. if (status == Sink.Status.BACKOFF) {
  20. fail("Error Occurred");
  21. }
  22. } catch (EventDeliveryException ex) {
  23. // ignore
  24. }
  25. checkMessageArrived(msg, DEFAULT_TOPIC);
  26. }

相关文章