kafka.javaapi.producer.Producer类的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.7k)|赞(0)|评价(0)|浏览(299)

本文整理了Java中kafka.javaapi.producer.Producer类的一些代码示例,展示了Producer类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer类的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer

Producer介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-pinot

/**
 * Sends one record to the "airlineStatsEvents" topic as the UTF-8 bytes of
 * {@code message.toString()}. When {@code keepIndexing} is false, nothing is sent;
 * the Avro stream is closed and its reference cleared instead.
 *
 * @param message record to publish
 * @throws IOException propagated from closing the stream or encoding the payload
 */
private void publish(JsonNode message)
  throws IOException {
 if (keepIndexing) {
  byte[] payload = message.toString().getBytes("UTF-8");
  producer.send(new KeyedMessage<String, byte[]>("airlineStatsEvents", payload));
  return;
 }
 // Indexing has been switched off: release the Avro stream and drop the reference.
 avroDataStream.close();
 avroDataStream = null;
}

代码示例来源:origin: apache/incubator-pinot

/**
 * Loads the schema from {@code schemaFile} and creates the Kafka producer used by this stream.
 * The producer targets {@code KafkaStarterUtils.DEFAULT_KAFKA_BROKER}, uses the default
 * (byte array) encoder and requests acknowledgement level "1".
 *
 * @param schemaFile file the schema is read from
 * @throws IOException declared by this constructor (source not visible here)
 * @throws URISyntaxException declared by this constructor (source not visible here)
 */
public MeetupRsvpStream(File schemaFile)
  throws IOException, URISyntaxException {
 schema = Schema.fromFile(schemaFile);

 Properties producerProps = new Properties();
 producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
 producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
 producerProps.put("request.required.acks", "1");

 producer = new Producer<String, byte[]>(new ProducerConfig(producerProps));
}

代码示例来源:origin: apache/incubator-pinot

/**
 * Stops this stream: clears the {@code keepIndexing} flag, drops the Avro stream
 * reference, closes and clears the producer, and finally shuts down {@code service}.
 */
public void shutdown() {
 // Flag is cleared before anything is torn down.
 keepIndexing = false;
 // NOTE(review): the reference is dropped without calling close() here — confirm the
 // stream is closed elsewhere.
 avroDataStream = null;
 producer.close();
 // Cleared after close so the closed producer is no longer reachable through this field.
 producer = null;
 service.shutdown();
}

代码示例来源:origin: apache/incubator-pinot

Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
try {
      bytes);
  producer.send(data);

代码示例来源:origin: linkedin/camus

/**
 * Generates {@code numOfMessages} messages from random integers, publishes each one to
 * {@code topic} as a JSON string keyed by its index, and returns the generated messages.
 * The producer is always closed, even when sending fails.
 *
 * @param topic         Kafka topic to publish to
 * @param numOfMessages number of messages to generate and send
 * @return the generated messages, in the order they were created
 */
private static List<Message> writeKafka(String topic, int numOfMessages) {
 List<Message> generated = new ArrayList<Message>();
 List<KeyedMessage<String, String>> outgoing = new ArrayList<KeyedMessage<String, String>>();
 int index = 0;
 while (index < numOfMessages) {
  Message message = new Message(RANDOM.nextInt());
  generated.add(message);
  String key = Integer.toString(index);
  outgoing.add(new KeyedMessage<String, String>(topic, key, gson.toJson(message)));
  index++;
 }

 // Both keys and values are sent as strings.
 Properties producerProps = cluster.getProps();
 String encoderName = StringEncoder.class.getName();
 producerProps.setProperty("serializer.class", encoderName);
 producerProps.setProperty("key.serializer.class", encoderName);

 ProducerConfig config = new ProducerConfig(producerProps);
 Producer<String, String> kafkaProducer = new Producer<String, String>(config);
 try {
  kafkaProducer.send(outgoing);
 } finally {
  kafkaProducer.close();
 }
 return generated;
}

代码示例来源:origin: linkedin/camus

try {
    KeyedMessage keyedMessage = new KeyedMessage("TrackingMonitoringEvent", message);
    producer.send(keyedMessage);
    break;
   } catch (Exception e) {
} finally {
 if (producer != null) {
  producer.close();

代码示例来源:origin: stackoverflow.com

/**
 * Wires a producer and a consumer to one shared queue, starts a thread for each,
 * and then calls {@code pushItem(2000)} on the producer from the calling thread.
 */
public static void main(String[] args) throws InterruptedException {
   // A single queue instance is handed to both sides.
   BlockingQueue queue = new LinkedBlockingQueue();

   Producer itemProducer = new Producer(queue);
   Thread producerThread = new Thread(itemProducer);
   Consumer itemConsumer = new Consumer(queue);
   Thread consumerThread = new Thread(itemConsumer);

   // Producer thread is started first, then the consumer thread.
   producerThread.start();
   consumerThread.start();

   itemProducer.pushItem(2000);
}

代码示例来源:origin: apache/incubator-pinot

Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
     producer.send(messagesToWrite);
     messagesToWrite.clear();
   producer.send(messagesToWrite);

代码示例来源:origin: linkedin/camus

/**
 * Builds an EasyMock {@code Producer}, already in replay state, whose
 * {@code send(KeyedMessage)} throws a {@code RuntimeException("dummyException")} on every
 * call and whose {@code close()} may be called any number of times.
 *
 * @return the replayed mock
 */
private Producer mockProducerSendThrowsException() {
 Producer alwaysFailing = EasyMock.createMock(Producer.class);

 // Record: any send(...) throws, however often it is called.
 alwaysFailing.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();

 // Record: close() is permitted any number of times.
 alwaysFailing.close();
 EasyMock.expectLastCall().anyTimes();

 EasyMock.replay(alwaysFailing);
 return alwaysFailing;
}

代码示例来源:origin: prestodb/presto

/**
 * Publishes every row of {@code data} to Kafka as a map from column name to converted value.
 * Column types are resolved once, the first time {@code statusInfo} carries column metadata.
 * Values that convert to {@code null} are left out of the map. Each row is sent to
 * {@code topicName} with the next value of {@code count} as its key.
 *
 * @param statusInfo query status carrying the column metadata
 * @param data       query data; ignored when it holds no rows
 */
@Override
public void addResults(QueryStatusInfo statusInfo, QueryData data)
{
  if (types.get() == null && statusInfo.getColumns() != null) {
    types.set(getTypes(statusInfo.getColumns()));
  }

  if (data.getData() == null) {
    return;
  }

  checkState(types.get() != null, "Data without types received!");
  List<Column> columnList = statusInfo.getColumns();
  for (List<Object> row : data.getData()) {
    ImmutableMap.Builder<String, Object> record = ImmutableMap.builder();
    for (int col = 0; col < row.size(); col++) {
      Type columnType = types.get().get(col);
      Object converted = convertValue(row.get(col), columnType);
      if (converted == null) {
        // Null values are omitted rather than stored in the record.
        continue;
      }
      record.put(columnList.get(col).getName(), converted);
    }
    producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), record.build()));
  }
}

代码示例来源:origin: apache/incubator-pinot

/**
 * Stores the schema and Avro file, opens the stream via {@code createStream()}, creates the
 * Kafka producer (default broker, default encoder, acks "1") and a one-thread executor,
 * then prints a status banner.
 *
 * @param pinotSchema schema kept for this stream
 * @param avroFile    Avro file kept for this stream
 * @throws FileNotFoundException declared by this constructor (source not visible here)
 * @throws IOException           declared by this constructor (source not visible here)
 */
public AirlineDataStream(Schema pinotSchema, File avroFile)
  throws FileNotFoundException, IOException {
 this.pinotSchema = pinotSchema;
 this.avroFile = avroFile;
 createStream();

 Properties producerProps = new Properties();
 producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
 producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
 producerProps.put("request.required.acks", "1");
 producer = new Producer<String, byte[]>(new ProducerConfig(producerProps));

 service = Executors.newFixedThreadPool(1);

 String banner =
   "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****";
 Quickstart.printStatus(Quickstart.Color.YELLOW, banner);
}

代码示例来源:origin: apache/incubator-pinot

Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
    producer.send(messagesToWrite);
    messagesToWrite.clear();
  producer.send(messagesToWrite);

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

/**
 * Waits on {@code shutdownComplete}, then closes the producer and logs completion.
 * If the wait is interrupted, the producer is not closed; a warning is logged and the
 * calling thread's interrupt status is restored so callers can still observe it.
 */
public void awaitShutdown() {
  try {
   shutdownComplete.await();
   producer.close();
   logger.info("Producer thread " + threadName + " shutdown complete");
  } catch(InterruptedException ie) {
   logger.warn("Interrupt during shutdown of ProducerThread", ie);
   // Catching InterruptedException clears the flag; set it again instead of swallowing it.
   Thread.currentThread().interrupt();
  }
}
}

代码示例来源:origin: linkedin/camus

/**
 * Builds an EasyMock {@code Producer}, already in replay state, whose
 * {@code send(KeyedMessage)} throws a {@code RuntimeException("dummyException")} on the
 * first two calls and succeeds on the third. {@code close()} may be called any number of times.
 *
 * @return the replayed mock
 */
private Producer mockProducerThirdSendSucceed() {
 Producer flakyProducer = EasyMock.createMock(Producer.class);

 // Record: the first two send(...) calls fail.
 flakyProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);

 // Record: exactly one further send(...) call completes normally.
 flakyProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().times(1);

 // Record: close() is permitted any number of times.
 flakyProducer.close();
 EasyMock.expectLastCall().anyTimes();

 EasyMock.replay(flakyProducer);
 return flakyProducer;
}

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

/**
 * Forwards messages from {@code producerDataChannel} to the producer until the shutdown
 * message is received. Any failure is logged as fatal and ends the loop.
 * {@code shutdownComplete} is counted down on every exit path.
 */
public void run() {
 try{
  while(true) {
   KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
   if(data.equals(shutdownMessage)) {
    break;
   }
   producer.send(data);
   if(logger.isDebugEnabled()) {
    // String.format is static: calling it on the "Sending message %s" literal would use
    // the payload as the format string (losing the prefix and throwing on payloads that
    // contain '%'). Pass the pattern and the payload explicitly.
    logger.debug(String.format("Sending message %s", new String(data.message())));
   }
  }
  logger.info("Producer thread " + threadName + " finished running");
 } catch (Throwable t){
  logger.fatal("Producer thread failure due to ", t);
 } finally {
  shutdownComplete.countDown();
 }
}

代码示例来源:origin: linkedin/camus

/**
 * Creates a producer configured from the given properties.
 *
 * @param props producer configuration properties
 * @return a new producer built from a {@code ProducerConfig} wrapping {@code props}
 */
protected Producer createProducer(Properties props) {
 ProducerConfig config = new ProducerConfig(props);
 return new Producer(config);
}

代码示例来源:origin: stackoverflow.com

/** Demo entry point: creates a {@code Producer} and sends the integers 0 through 9999 in order. */
public class MainApp {
  public static void main(String[] args) throws Exception {
    Producer sender = new Producer();
    int next = 0;
    while (next < 10000) {
      sender.send(next);
      next++;
    }
  }
}

代码示例来源:origin: HuaweiBigData/StreamCQL

/**
 * {@inheritDoc}
 * <p>
 * Closes the producer when one exists; otherwise does nothing.
 */
@Override
public void destroy()
  throws StreamingException
{
  if (producer == null)
  {
    return;
  }
  producer.close();
}

代码示例来源:origin: rakam-io/rakam

/**
 * Serializes the event's properties with a binary encoder and sends the result to Kafka.
 * The topic name is {@code event.project() + "_" + event.collection()}.
 *
 * @param event event to store
 * @throws RuntimeException if the event cannot be serialized or the message cannot be sent;
 *                          the original exception is kept as the cause
 */
@Override
public void store(Event event) {
  GenericDatumWriter writer = new SourceFilteredRecordWriter(event.properties().getSchema(), GenericData.get(), sourceFields);
  ByteBuf buffer = Unpooled.buffer(100);
  BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(
      new ByteBufOutputStream(buffer), null);
  try {
    writer.write(event.properties(), encoder);
  } catch (Exception e) {
    throw new RuntimeException("Couldn't serialize event", e);
  }

  // buffer.array() exposes the whole backing array (its capacity), not just the bytes that
  // were written, so sending it would append unused padding to every message. Copy only
  // the readable region instead.
  byte[] payload = new byte[buffer.readableBytes()];
  buffer.getBytes(buffer.readerIndex(), payload);

  try {
    producer.send(new KeyedMessage<>(event.project() + "_" + event.collection(), payload));
  } catch (FailedToSendMessageException e) {
    throw new RuntimeException("Couldn't send event to Kafka", e);
  }
}

代码示例来源:origin: rakam-io/rakam

/**
 * Creates the event store: computes the source field names, builds the Kafka producer from
 * the configured broker nodes and serializer, starts a Curator client, makes sure
 * {@code ZK_OFFSET_PATH} exists (failures are logged, not rethrown), and starts a leader
 * selector on that path with this instance as the listener.
 *
 * @param config          Kafka settings; must not be null
 * @param fieldDependency supplies the dependent and constant fields used as source fields
 */
@Inject
public KafkaEventStore(@Named("event.store.kafka") KafkaConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) {
  config = checkNotNull(config, "config is null");

  this.sourceFields = Sets.union(fieldDependency.dependentFields.keySet(),
      fieldDependency.constantFields.stream().map(SchemaField::getName)
          .collect(Collectors.toSet()));

  // Comma-separated "host:port" list of the configured broker nodes.
  String brokerList = config.getNodes().stream()
      .map(HostAndPort::toString)
      .collect(Collectors.joining(","));

  Properties producerProps = new Properties();
  producerProps.put("metadata.broker.list", brokerList);
  producerProps.put("serializer.class", config.SERIALIZER);
  this.producer = new Producer(new ProducerConfig(producerProps));

  String zookeeperAddress = config.getZookeeperNode().toString();
  CuratorFramework curator = CuratorFrameworkFactory.newClient(zookeeperAddress,
      new ExponentialBackoffRetry(1000, 3));
  curator.start();

  try {
    // Create the offset path only when it is not there yet.
    if (curator.checkExists().forPath(ZK_OFFSET_PATH) == null) {
      curator.create().forPath(ZK_OFFSET_PATH);
    }
  } catch (Exception e) {
    LOGGER.error(e, format("Couldn't create event offset path %s", ZK_OFFSET_PATH));
  }

  new LeaderSelector(curator, ZK_OFFSET_PATH, this).start();
}

相关文章