org.apache.flume.Channel.put()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(295)

本文整理了Java中org.apache.flume.Channel.put()方法的一些代码示例,展示了Channel.put()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.put()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:put

Channel.put介绍

[英]Puts the given event into the channel.

Note: This method must be invoked within an active Transaction boundary. Failure to do so can lead to unpredictable results.
[中]将给定事件放入通道。
注意:此方法必须在活动事务边界内调用。不这样做可能导致不可预测的结果。

代码示例

代码示例来源:origin: apache/flume

  1. @Override
  2. public void run() {
  3. channel.put(event);
  4. }
  5. });

代码示例来源:origin: apache/flume

  1. @Override
  2. public void run() {
  3. for (Event event : events) {
  4. channel.put(event);
  5. }
  6. }
  7. });

代码示例来源:origin: apache/flume

  1. reqChannel.put(event);
  2. optChannel.put(event);

代码示例来源:origin: apache/flume

  1. tx.begin();
  2. reqChannel.put(event);
  3. tx.begin();
  4. optChannel.put(event);

代码示例来源:origin: apache/flume

  1. memoryChannel.put(event);
  2. tx.commit();
  3. tx.close();

代码示例来源:origin: apache/flume

  1. memoryChannel.put(event);

代码示例来源:origin: apache/flume

  1. tx.begin();
  2. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  3. memoryChannel.put(event);
  4. tx.commit();
  5. tx.close();

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testTopicAndKeyFromHeader() {
  3. Sink kafkaSink = new KafkaSink();
  4. Context context = prepareDefaultContext();
  5. Configurables.configure(kafkaSink, context);
  6. Channel memoryChannel = new MemoryChannel();
  7. Configurables.configure(memoryChannel, context);
  8. kafkaSink.setChannel(memoryChannel);
  9. kafkaSink.start();
  10. String msg = "test-topic-and-key-from-header";
  11. Map<String, String> headers = new HashMap<String, String>();
  12. headers.put("topic", TestConstants.CUSTOM_TOPIC);
  13. headers.put("key", TestConstants.CUSTOM_KEY);
  14. Transaction tx = memoryChannel.getTransaction();
  15. tx.begin();
  16. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  17. memoryChannel.put(event);
  18. tx.commit();
  19. tx.close();
  20. try {
  21. Sink.Status status = kafkaSink.process();
  22. if (status == Sink.Status.BACKOFF) {
  23. fail("Error Occurred");
  24. }
  25. } catch (EventDeliveryException ex) {
  26. // ignore
  27. }
  28. checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
  29. }

代码示例来源:origin: apache/flume

  1. private Sink.Status prepareAndSend(Context context, String msg)
  2. throws EventDeliveryException {
  3. Sink kafkaSink = new KafkaSink();
  4. Configurables.configure(kafkaSink, context);
  5. Channel memoryChannel = new MemoryChannel();
  6. Configurables.configure(memoryChannel, context);
  7. kafkaSink.setChannel(memoryChannel);
  8. kafkaSink.start();
  9. Transaction tx = memoryChannel.getTransaction();
  10. tx.begin();
  11. Event event = EventBuilder.withBody(msg.getBytes());
  12. memoryChannel.put(event);
  13. tx.commit();
  14. tx.close();
  15. return kafkaSink.process();
  16. }

代码示例来源:origin: apache/flume

  1. tx.begin();
  2. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  3. memoryChannel.put(event);
  4. tx.commit();
  5. tx.close();

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testReplaceSubStringOfTopicWithHeaders() {
  3. String topic = TestConstants.HEADER_1_VALUE + "-topic";
  4. Sink kafkaSink = new KafkaSink();
  5. Context context = prepareDefaultContext();
  6. context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
  7. Configurables.configure(kafkaSink, context);
  8. Channel memoryChannel = new MemoryChannel();
  9. Configurables.configure(memoryChannel, context);
  10. kafkaSink.setChannel(memoryChannel);
  11. kafkaSink.start();
  12. String msg = "test-replace-substring-of-topic-with-headers";
  13. Map<String, String> headers = new HashMap<>();
  14. headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
  15. Transaction tx = memoryChannel.getTransaction();
  16. tx.begin();
  17. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  18. memoryChannel.put(event);
  19. tx.commit();
  20. tx.close();
  21. try {
  22. Sink.Status status = kafkaSink.process();
  23. if (status == Sink.Status.BACKOFF) {
  24. fail("Error Occurred");
  25. }
  26. } catch (EventDeliveryException ex) {
  27. // ignore
  28. }
  29. checkMessageArrived(msg, topic);
  30. }

代码示例来源:origin: apache/flume

  1. tx.begin();
  2. Event event = EventBuilder.withBody(msg.getBytes(), headers);
  3. memoryChannel.put(event);
  4. tx.commit();
  5. tx.close();

代码示例来源:origin: apache/flume

  1. @Test
  2. public void testDefaultTopic() {
  3. Sink kafkaSink = new KafkaSink();
  4. Context context = prepareDefaultContext();
  5. Configurables.configure(kafkaSink, context);
  6. Channel memoryChannel = new MemoryChannel();
  7. Configurables.configure(memoryChannel, context);
  8. kafkaSink.setChannel(memoryChannel);
  9. kafkaSink.start();
  10. String msg = "default-topic-test";
  11. Transaction tx = memoryChannel.getTransaction();
  12. tx.begin();
  13. Event event = EventBuilder.withBody(msg.getBytes());
  14. memoryChannel.put(event);
  15. tx.commit();
  16. tx.close();
  17. try {
  18. Sink.Status status = kafkaSink.process();
  19. if (status == Sink.Status.BACKOFF) {
  20. fail("Error Occurred");
  21. }
  22. } catch (EventDeliveryException ex) {
  23. // ignore
  24. }
  25. checkMessageArrived(msg, DEFAULT_TOPIC);
  26. }

代码示例来源:origin: forcedotcom/phoenix

  1. channel.put(event);
  2. transaction.commit();
  3. transaction.close();

代码示例来源:origin: forcedotcom/phoenix

  1. channel.put(event);
  2. transaction.commit();
  3. transaction.close();

代码示例来源:origin: forcedotcom/phoenix

  1. channel.put(event);
  2. transaction.commit();
  3. transaction.close();

代码示例来源:origin: apache/phoenix

  1. channel.put(event);
  2. transaction.commit();
  3. transaction.close();

代码示例来源:origin: apache/phoenix

  1. channel.put(event);
  2. transaction.commit();
  3. transaction.close();

代码示例来源:origin: apache/phoenix

  1. @Test
  2. public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  3. final String fullTableName = "FLUME_JSON_TEST";
  4. initSinkContextWithDefaults(fullTableName);
  5. sink = new PhoenixSink();
  6. Configurables.configure(sink, sinkContext);
  7. assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
  8. final Channel channel = this.initChannel();
  9. sink.setChannel(channel);
  10. sink.start();
  11. final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
  12. final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
  13. // put event in channel
  14. Transaction transaction = channel.getTransaction();
  15. transaction.begin();
  16. channel.put(event);
  17. transaction.commit();
  18. transaction.close();
  19. sink.process();
  20. int rowsInDb = countRows(fullTableName);
  21. assertEquals(0, rowsInDb);
  22. sink.stop();
  23. assertEquals(LifecycleState.STOP, sink.getLifecycleState());
  24. dropTable(fullTableName);
  25. }

代码示例来源:origin: apache/phoenix

  1. @Test
  2. public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  3. final String fullTableName = "FLUME_CSV_TEST";
  4. initSinkContextWithDefaults(fullTableName);
  5. sink = new PhoenixSink();
  6. Configurables.configure(sink, sinkContext);
  7. assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
  8. final Channel channel = this.initChannel();
  9. sink.setChannel(channel);
  10. sink.start();
  11. final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
  12. final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
  13. // put event in channel
  14. Transaction transaction = channel.getTransaction();
  15. transaction.begin();
  16. channel.put(event);
  17. transaction.commit();
  18. transaction.close();
  19. sink.process();
  20. int rowsInDb = countRows(fullTableName);
  21. assertEquals(0, rowsInDb);
  22. sink.stop();
  23. assertEquals(LifecycleState.STOP, sink.getLifecycleState());
  24. dropTable(fullTableName);
  25. }

相关文章