kafka.consumer.Consumer类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(13.4k)|赞(0)|评价(0)|浏览(331)

本文整理了Java中kafka.consumer.Consumer类的一些代码示例,展示了Consumer类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer类的具体详情如下:
包路径:kafka.consumer.Consumer
类名称:Consumer

Consumer介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-gobblin

public SimpleKafkaConsumer(Properties props, KafkaCheckpoint checkpoint)
{
 Config config = ConfigFactory.parseProperties(props);
 topic = config.getString("topic");
 String zkConnect = config.getString("zookeeper.connect");
 schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(props);
 deserializer = new LiAvroDeserializer(schemaRegistry);
 /** TODO: Make Confluent schema registry integration configurable
  * HashMap<String, String> avroSerDeConfig = new HashMap<>();
  * avroSerDeConfig.put("schema.registry.url", "http://localhost:8081");
  * deserializer = new io.confluent.kafka.serializers.KafkaAvroDeserializer();
  * deserializer.configure(avroSerDeConfig, false);
  *
  **/
 Properties consumeProps = new Properties();
 consumeProps.put("zookeeper.connect", zkConnect);
 consumeProps.put("group.id", "gobblin-tool-" + System.nanoTime());
 consumeProps.put("zookeeper.session.timeout.ms", "10000");
 consumeProps.put("zookeeper.sync.time.ms", "10000");
 consumeProps.put("auto.commit.interval.ms", "10000");
 consumeProps.put("auto.offset.reset", "smallest");
 consumeProps.put("auto.commit.enable", "false");
 //consumeProps.put("consumer.timeout.ms", "10000");
 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(ImmutableMap.of(topic, 1));
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this.topic);
 stream = streams.get(0);
 iterator = stream.iterator();
}

代码示例来源:origin: com.hurence.logisland/logisland-kafka-0-8-plugin

consumerProperties.put("auto.offset.reset", "smallest");
consumerProperties.put("consumer.timeout.ms", "500");
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topicName, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> events = javaConsumerConnector.createMessageStreams(topicMap);
List<KafkaStream<byte[], byte[]>> events1 = events.get(topicName);
final KafkaStream<byte[], byte[]> kafkaStreams = events1.get(0);
} finally {
  singleThread.shutdown();
  javaConsumerConnector.shutdown();

代码示例来源:origin: apache/incubator-gobblin

protected ConsumerConnector createConsumerConnector() {
 return Consumer.createJavaConsumerConnector(this.consumerConfig);
}

代码示例来源:origin: elodina/dropwizard-kafka-http

@GET
@Timed
public Response consume(
    @QueryParam("topic") String topic,
    @QueryParam("timeout") Integer timeout
) {
  if (Strings.isNullOrEmpty(topic))
    return Response.status(400)
        .entity(new String[]{"Undefined topic"})
        .build();
  Properties props = (Properties) consumerCfg.clone();
  if (timeout != null) props.put("consumer.timeout.ms", "" + timeout);
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
  Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(streamCounts);
  KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
  List<Message> messages = new ArrayList<>();
  try {
    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream)
      messages.add(new Message(messageAndMetadata));
  } catch (ConsumerTimeoutException ignore) {
  } finally {
    connector.commitOffsets();
    connector.shutdown();
  }
  return Response.ok(messages).build();
}

代码示例来源:origin: pinterest/secor

@Override
public void init(SecorConfig config) throws UnknownHostException {
  this.mConfig = config;
  mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
  if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
    throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
  }
  TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty() ? new Blacklist(mConfig.getKafkaTopicBlacklist()) :
      new Whitelist(mConfig.getKafkaTopicFilter());
  LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
  List<KafkaStream<byte[], byte[]>> streams =
      mConsumerConnector.createMessageStreamsByFilter(topicFilter);
  KafkaStream<byte[], byte[]> stream = streams.get(0);
  mIterator = stream.iterator();
  mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}

代码示例来源:origin: habren/KafkaExample

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
      new String(messageAndMetadata.message()));
  System.out.println(message);
  consumerConnector.commitOffsets();

代码示例来源:origin: com.hurence.logisland/logisland-kafka-0-8-plugin

consumerProperties.put("auto.offset.reset", "smallest");
consumerProperties.put("consumer.timeout.ms", "500");
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topicName, 1);
Map<String, List<KafkaStream<String, String>>> events = javaConsumerConnector.createMessageStreams(topicMap, stringDecoder, stringDecoder);
List<KafkaStream<String, String>> events1 = events.get(topicName);
final KafkaStream<String, String> kafkaStreams = events1.get(0);
} finally {
  singleThread.shutdown();
  javaConsumerConnector.shutdown();

代码示例来源:origin: io.zipkin.zipkin2/zipkin-collector-kafka08

ZookeeperConsumerConnector get() {
 if (connector == null) {
  synchronized (this) {
   if (connector == null) {
    connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
   }
  }
 }
 return connector;
}

代码示例来源:origin: Graylog2/graylog2-server

cc = Consumer.createJavaConsumerConnector(consumerConfig);
final List<KafkaStream<byte[], byte[]>> streams = cc.createMessageStreamsByFilter(filter, numThreads);
final ExecutorService executor = executorService(numThreads);

代码示例来源:origin: apache/incubator-gobblin

KafkaConsumerSuite(String zkConnectString, String topic)
{
 _topic = topic;
 Properties consumeProps = new Properties();
 consumeProps.put("zookeeper.connect", zkConnectString);
 consumeProps.put("group.id", _topic+"-"+System.nanoTime());
 consumeProps.put("zookeeper.session.timeout.ms", "10000");
 consumeProps.put("zookeeper.sync.time.ms", "10000");
 consumeProps.put("auto.commit.interval.ms", "10000");
 consumeProps.put("_consumer.timeout.ms", "10000");
 _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
   _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
 _stream = streams.get(0);
 _iterator = _stream.iterator();
}

代码示例来源:origin: com.hurence.logisland/logisland-kafka-0-8-plugin

consumerProperties.put("auto.offset.reset", "smallest");
consumerProperties.put("consumer.timeout.ms", "500");
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topicName, 1);
Map<String, List<KafkaStream<String, String>>> events = javaConsumerConnector.createMessageStreams(topicMap, stringDecoder, stringDecoder);
List<KafkaStream<String, String>> events1 = events.get(topicName);
final KafkaStream<String, String> kafkaStreams = events1.get(0);
} finally {
  singleThread.shutdown();
  javaConsumerConnector.shutdown();

代码示例来源:origin: org.apache.gobblin/gobblin-runtime

protected ConsumerConnector createConsumerConnector() {
 return Consumer.createJavaConsumerConnector(this.consumerConfig);
}

代码示例来源:origin: uber/chaperone

private void init() {
 // register kafka offset lag metrics, one Gauge is for per consumer level granularity
 MetricRegistry registry = Metrics.getRegistry();
 try {
  fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
  failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
  kafkaOffsetLagGauge =
    registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge(
      new ObjectName(maxLagMetricName), "Value"));
 } catch (MalformedObjectNameException | IllegalArgumentException e) {
  logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
 }
 TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
 logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);
 this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
 KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0);
 iterator = stream.iterator();
 logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());
 if (AuditConfig.INGESTER_ENABLE_DEDUP) {
  deduplicator =
    new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
      AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
      AuditConfig.INGESTER_HOSTS_WITH_DUP);
  deduplicator.open();
 } else {
  deduplicator = null;
 }
}

代码示例来源:origin: apache/incubator-gobblin

KafkaConsumerSuite(String zkConnectString, String topic)
{
 _topic = topic;
 Properties consumeProps = new Properties();
 consumeProps.put("zookeeper.connect", zkConnectString);
 consumeProps.put("group.id", _topic+"-"+System.nanoTime());
 consumeProps.put("zookeeper.session.timeout.ms", "10000");
 consumeProps.put("zookeeper.sync.time.ms", "10000");
 consumeProps.put("auto.commit.interval.ms", "10000");
 consumeProps.put("_consumer.timeout.ms", "10000");
 _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
   _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
 _stream = streams.get(0);
 _iterator = _stream.iterator();
}

代码示例来源:origin: uk.gov.hmrc/kafka-unit

consumerProperties.put("auto.offset.reset", "smallest");
consumerProperties.put("consumer.timeout.ms", "500");
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties(new Properties()));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topicName, 1);
Map<String, List<KafkaStream<String, String>>> events = javaConsumerConnector.createMessageStreams(topicMap, stringDecoder, stringDecoder);
List<KafkaStream<String, String>> events1 = events.get(topicName);
final KafkaStream<String, String> kafkaStreams = events1.get(0);
} finally {
  singleThread.shutdown();
  javaConsumerConnector.shutdown();

代码示例来源:origin: com.linkedin.gobblin/gobblin-runtime

protected ConsumerConnector createConsumerConnector() {
 return Consumer.createJavaConsumerConnector(this.consumerConfig);
}

代码示例来源:origin: apache/streams

@Override
public void startStream() {
 Properties props = new Properties();
 props.setProperty("serializer.encoding", "UTF8");
 ConsumerConfig consumerConfig = new ConsumerConfig(props);
 consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 Whitelist topics = new Whitelist(config.getTopic());
 VerifiableProperties vprops = new VerifiableProperties(props);
 inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
 for (final KafkaStream stream : inStreams) {
  executor.submit(new KafkaPersistReaderTask(this, stream));
 }
}

代码示例来源:origin: apache/incubator-pinot

kafka.consumer.Consumer.createJavaConsumerConnector(kafkaHighLevelStreamConfig.getKafkaConsumerConfig());
consumer.createMessageStreams(kafkaHighLevelStreamConfig.getTopicMap(1)).
  get(kafkaHighLevelStreamConfig.getKafkaTopicName()).get(0).iterator();

代码示例来源:origin: apache/eagle

try {
  ConsumerConfig ccfg = new ConsumerConfig(props);
  jcc = Consumer.createJavaConsumerConnector(ccfg);
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(TEST_TOPIC_NAME, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> topicMap = jcc.createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> cstrm = topicMap.get(TEST_TOPIC_NAME).get(0);
  for (MessageAndMetadata<byte[], byte[]> mm : cstrm) {
    jcc.shutdown();

代码示例来源:origin: apache/incubator-edgent

private synchronized ConsumerConnector client() {
  if (consumer == null)
    consumer = Consumer.createJavaConsumerConnector(
                      createConsumerConfig());
  return consumer;
}

相关文章