org.apache.flume.Channel类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(231)

本文整理了Java中org.apache.flume.Channel类的一些代码示例,展示了Channel类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel类的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel

Channel介绍

[英]A channel connects a Source to a Sink. The source acts as producer while the sink acts as a consumer of events. The channel itself is the buffer between the two.

A channel exposes a Transaction interface that can be used by its clients to ensure atomic #put(Event) and #take() semantics. This is necessary to guarantee single hop reliability between agents. For instance, a source will successfully produce an Eventif and only if that event can be committed to the source's associated channel. Similarly, a sink will consume an event if and only if its respective endpoint can accept the event. The extent of transaction support varies for different channel implementations ranging from strong to best-effort semantics.

Channels are associated with unique NamedComponent that can be used for separating configuration and working namespaces.

Channels must be thread safe, protecting any internal invariants as no guarantees are given as to when and by how many sources/sinks they may be simultaneously accessed by.
[中]通道将源连接到接收器。源充当事件的生产者,而接收器充当事件的消费者。通道本身是两者之间的缓冲区。
通道公开一个事务接口,该接口可供其客户端使用,以确保原子的put(事件)和take()语义。这对于保证代理之间的单跳可靠性是必要的。例如,一个源将成功地生成一个Eventif并且仅当该事件可以提交到源的关联通道时。类似地,当且仅当其各自的端点可以接受事件时,接收器才会使用事件。事务支持的范围因不同的通道实现而异,从强语义到尽力而为语义不等。
通道与唯一的NamedComponent关联,该组件可用于分离配置和工作名称空间。
通道必须是线程安全的,可以保护任何内部不变量,因为无法保证何时以及由多少个源/汇同时访问它们。

代码示例

代码示例来源:origin: apache/flume

private Sink.Status prepareAndSend(Context context, String msg)
  throws EventDeliveryException {
 Sink kafkaSink = new KafkaSink();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes());
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 return kafkaSink.process();
}

代码示例来源:origin: apache/ignite

Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
  transaction.begin();
    Event event = channel.take();
      sinkCounter.incrementBatchUnderflowCount();
    else
      sinkCounter.incrementBatchCompleteCount();
    sinkCounter.incrementBatchEmptyCount();
  sinkCounter.addToEventDrainAttemptCount(batch.size());
  transaction.commit();
  sinkCounter.addToEventDrainSuccessCount(batch.size());
    transaction.rollback();
  throw new EventDeliveryException(e);

代码示例来源:origin: apache/flume

@Override
 public String toString() {
  return this.getClass().getName() + "{name:" + name + ", channel:" + channel.getName() + "}";
 }
}

代码示例来源:origin: apache/flume

@Override
 public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  Event event = null;

  try {
   transaction.begin();
   event = channel.take();

   if (event != null) {
    if (logger.isInfoEnabled()) {
     logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
    }
   } else {
    // No event found, request back-off semantics from the sink runner
    result = Status.BACKOFF;
   }
   transaction.commit();
  } catch (Exception ex) {
   transaction.rollback();
   throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
   transaction.close();
  }

  return result;
 }
}

代码示例来源:origin: apache/flume

Transaction txn = ch.getTransaction();
txn.begin();
 Event event = ch.take();
  sinkCounter.incrementEventDrainAttemptCount();
  LOG.debug("Sending request : " + new String(event.getBody()));
     txn.rollback();
    } else {
     txn.commit();
     sinkCounter.incrementEventDrainSuccessCount();
   sinkCounter.incrementEventWriteFail();

代码示例来源:origin: apache/flume

serializerType, serializerContext, outputStream);
  serializer.afterCreate();
  sinkCounter.incrementConnectionCreatedCount();
  throw new EventDeliveryException("Failed to open file "
    + pathController.getCurrentFile() + " while delivering event", e);
Transaction transaction = channel.getTransaction();
Event event = null;
Status result = Status.READY;
 transaction.begin();
 int eventAttemptCounter = 0;
 for (int i = 0; i < batchSize; i++) {
  event = channel.take();
 transaction.commit();
 transaction.rollback();
 throw new EventDeliveryException("Failed to process transaction", ex);

代码示例来源:origin: apache/flume

Transaction transaction = channel.getTransaction();
transaction.begin();
boolean success = false;
try {
 transaction.commit();
 success = true;
 return Status.BACKOFF;
} catch (Exception e) {
 sinkCounter.incrementEventWriteOrChannelFail(e);
 throw new EventDeliveryException(e);
} finally {
 if (!success) {
  transaction.rollback();
 transaction.close();

代码示例来源:origin: apache/flume

partitionMap.put(i, new ArrayList<Event>());
Transaction tx = memoryChannel.getTransaction();
tx.begin();
 memoryChannel.put(event);
tx.commit();
tx.close();
                        numMsgs);
memoryChannel.stop();
kafkaSink.stop();
deleteTopic(topic);

代码示例来源:origin: apache/flume

Event event = channel.take();
  } catch (EventDeliveryException ex) {
   LOG.warn("Error closing writer there may be temp files that need to"
     + " be manually recovered: " + ex.getLocalizedMessage());
   LOG.debug("Exception follows.", ex);
 throw new EventDeliveryException(th);
 counter.incrementBatchEmptyCount();
 return Status.BACKOFF;
} else if (processedEvents < batchSize) {
 counter.incrementBatchUnderflowCount();
} else {
 counter.incrementBatchCompleteCount();

代码示例来源:origin: apache/flume

for (; txnEventCount < batchSize; ++txnEventCount) {
 Event event = channel.take();
 if (event == null) {
  break;
 sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
 sinkCounter.incrementBatchCompleteCount();
} else {
 sinkCounter.incrementBatchUnderflowCount();
sinkCounter.addToEventDrainAttemptCount(txnEventCount);

代码示例来源:origin: apache/flume

/**
 * Enter the transaction boundary. This will either begin a new transaction
 * if one didn't already exist. If we're already in a transaction boundary,
 * then this method does nothing.
 *
 * @param channel The Sink's channel
 * @throws EventDeliveryException There was an error starting a new batch
 *                                with the failure policy.
 */
private void enterTransaction(Channel channel) throws EventDeliveryException {
 // There's no synchronization around the transaction instance because the
 // Sink API states "the Sink#process() call is guaranteed to only
 // be accessed  by a single thread". Technically other methods could be
 // called concurrently, but the implementation of SinkRunner waits
 // for the Thread running process() to end before calling stop()
 if (transaction == null) {
  this.transaction = channel.getTransaction();
  transaction.begin();
  failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
 }
}

代码示例来源:origin: Stratio/ingestion

private List<Event> takeEventsFromChannel(Channel channel) {
  List<Event> events = new ArrayList<Event>();
  for (int i = 0; i < this.batchsize; i++) {
    this.sinkCounter.incrementEventDrainAttemptCount();
    events.add(channel.take());
  }
  events.removeAll(Collections.singleton(null));
  return events;
}

代码示例来源:origin: Stratio/ingestion

private Event buildEvent(Channel channel) {
  final Event takenEvent = channel.take();
  final ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
  Event event = null;
  if (takenEvent != null) {
    event = EventBuilder.withBody(objectNode.toString().getBytes(Charsets.UTF_8),
        takenEvent.getHeaders());
  }
  return event;
}

代码示例来源:origin: apache/flume

@Override
 public Event call() {
  return channel.take();
 }
});

代码示例来源:origin: apache/flume

while (ch.getLifecycleState() != LifecycleState.START
  && !supervisor.isComponentInErrorState(ch)) {
 try {
  logger.info("Waiting for channel: " + ch.getName() +
    " to start. Sleeping for 500 ms");
  Thread.sleep(500);

代码示例来源:origin: apache/flume

@Override
 public void run() {
  channel.put(event);
 }
});

代码示例来源:origin: apache/flume

Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
 tx.begin();
 reqChannel.put(event);
 tx.commit();
} catch (Throwable t) {
 tx.rollback();
 if (t instanceof Error) {
  LOG.error("Error while writing to required channel: " + reqChannel, t);
Transaction tx = null;
try {
 tx = optChannel.getTransaction();
 tx.begin();
 optChannel.put(event);
 tx.commit();

代码示例来源:origin: org.apache.flume.flume-ng-sinks/flume-hive-sink

Transaction transaction = channel.getTransaction();
transaction.begin();
boolean success = false;
try {
 transaction.commit();
 success = true;
 return Status.BACKOFF;
} catch (Exception e) {
 sinkCounter.incrementEventWriteOrChannelFail(e);
 throw new EventDeliveryException(e);
} finally {
 if (!success) {
  transaction.rollback();
 transaction.close();

代码示例来源:origin: Stratio/ingestion

private List<Event> takeEventsFromChannel(Channel channel, int eventsToTake) {
  List<Event> events = new ArrayList<Event>();
  for (int i = 0; i < eventsToTake; i++) {
    this.sinkCounter.incrementEventDrainAttemptCount();
    events.add(channel.take());
  }
  events.removeAll(Collections.singleton(null));
  return events;
}

代码示例来源:origin: apache/flume

@Override
 public List<Event> call() {
  List<Event> events = new ArrayList<Event>(max);
  while (events.size() < max) {
   Event event = channel.take();
   if (event == null) {
    break;
   }
   events.add(event);
  }
  return events;
 }
});

相关文章