org.apache.flume.Channel.put()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(235)

本文整理了Java中org.apache.flume.Channel.put()方法的一些代码示例,展示了Channel.put()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.put()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:put

Channel.put介绍

[英]Puts the given event into the channel.

Note: This method must be invoked within an active Transaction boundary. Failure to do so can lead to unpredictable results.
[中]将给定事件放入通道。
注意:此方法必须在活动事务边界内调用。不这样做可能导致不可预测的结果。

代码示例

代码示例来源:origin: apache/flume

@Override
 public void run() {
  channel.put(event);
 }
});

代码示例来源:origin: apache/flume

@Override
 public void run() {
  for (Event event : events) {
   channel.put(event);
  }
 }
});

代码示例来源:origin: apache/flume

reqChannel.put(event);
optChannel.put(event);

代码示例来源:origin: apache/flume

tx.begin();
reqChannel.put(event);
tx.begin();
optChannel.put(event);

代码示例来源:origin: apache/flume

memoryChannel.put(event);
tx.commit();
tx.close();

代码示例来源:origin: apache/flume

memoryChannel.put(event);

代码示例来源:origin: apache/flume

tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();

代码示例来源:origin: apache/flume

@Test
public void testTopicAndKeyFromHeader() {
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "test-topic-and-key-from-header";
 Map<String, String> headers = new HashMap<String, String>();
 headers.put("topic", TestConstants.CUSTOM_TOPIC);
 headers.put("key", TestConstants.CUSTOM_KEY);
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes(), headers);
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
}

代码示例来源:origin: apache/flume

private Sink.Status prepareAndSend(Context context, String msg)
  throws EventDeliveryException {
 Sink kafkaSink = new KafkaSink();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes());
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 return kafkaSink.process();
}

代码示例来源:origin: apache/flume

tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();

代码示例来源:origin: apache/flume

@Test
public void testReplaceSubStringOfTopicWithHeaders() {
 String topic = TestConstants.HEADER_1_VALUE + "-topic";
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "test-replace-substring-of-topic-with-headers";
 Map<String, String> headers = new HashMap<>();
 headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes(), headers);
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, topic);
}

代码示例来源:origin: apache/flume

tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();

代码示例来源:origin: apache/flume

@Test
public void testDefaultTopic() {
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "default-topic-test";
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes());
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, DEFAULT_TOPIC);
}

代码示例来源:origin: forcedotcom/phoenix

channel.put(event);
transaction.commit();
transaction.close();

代码示例来源:origin: forcedotcom/phoenix

channel.put(event);
transaction.commit();
transaction.close();

代码示例来源:origin: forcedotcom/phoenix

channel.put(event);
transaction.commit();
transaction.close();

代码示例来源:origin: apache/phoenix

channel.put(event);
transaction.commit();
transaction.close();

代码示例来源:origin: apache/phoenix

channel.put(event);
transaction.commit();
transaction.close();

代码示例来源:origin: apache/phoenix

@Test
public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  final String fullTableName = "FLUME_JSON_TEST";
  initSinkContextWithDefaults(fullTableName);
  sink = new PhoenixSink();
  Configurables.configure(sink, sinkContext);
  assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
  final Channel channel = this.initChannel();
  sink.setChannel(channel);
  sink.start();
  final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
  final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
  // put event in channel
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  channel.put(event);
  transaction.commit();
  transaction.close();
  sink.process();
  int rowsInDb = countRows(fullTableName);
  assertEquals(0, rowsInDb);
  sink.stop();
  assertEquals(LifecycleState.STOP, sink.getLifecycleState());
  dropTable(fullTableName);
}

代码示例来源:origin: apache/phoenix

@Test
public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  final String fullTableName = "FLUME_CSV_TEST";
  initSinkContextWithDefaults(fullTableName);
  sink = new PhoenixSink();
  Configurables.configure(sink, sinkContext);
  assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
  final Channel channel = this.initChannel();
  sink.setChannel(channel);
  sink.start();
  final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
  final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
  // put event in channel
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  channel.put(event);
  transaction.commit();
  transaction.close();
  sink.process();
  int rowsInDb = countRows(fullTableName);
  assertEquals(0, rowsInDb);
  sink.stop();
  assertEquals(LifecycleState.STOP, sink.getLifecycleState());
  dropTable(fullTableName);
}

相关文章