kafka.javaapi.producer.Producer类的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.7k)|赞(0)|评价(0)|浏览(318)

本文整理了Java中kafka.javaapi.producer.Producer类的一些代码示例,展示了Producer类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer类的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer

Producer介绍

kafka.javaapi.producer.Producer 是 Kafka 旧版(0.8.x)Scala 客户端面向 Java 调用者的生产者封装类。从下文示例可以看出其典型用法:通过 Properties 构造 ProducerConfig 并创建 Producer<K, V> 实例,调用 send(KeyedMessage) 发送单条或批量消息,使用完毕后调用 close() 释放资源。注意:该旧版 API 在新版 Kafka 中已被 org.apache.kafka.clients.producer.KafkaProducer 取代,新项目建议使用新版客户端。

代码示例

代码示例来源:origin: apache/incubator-pinot

  1. private void publish(JsonNode message)
  2. throws IOException {
  3. if (!keepIndexing) {
  4. avroDataStream.close();
  5. avroDataStream = null;
  6. return;
  7. }
  8. KeyedMessage<String, byte[]> data =
  9. new KeyedMessage<String, byte[]>("airlineStatsEvents", message.toString().getBytes("UTF-8"));
  10. producer.send(data);
  11. }

代码示例来源:origin: apache/incubator-pinot

  1. public MeetupRsvpStream(File schemaFile)
  2. throws IOException, URISyntaxException {
  3. schema = Schema.fromFile(schemaFile);
  4. Properties properties = new Properties();
  5. properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  6. properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
  7. properties.put("request.required.acks", "1");
  8. ProducerConfig producerConfig = new ProducerConfig(properties);
  9. producer = new Producer<String, byte[]>(producerConfig);
  10. }

代码示例来源:origin: apache/incubator-pinot

  1. public void shutdown() {
  2. keepIndexing = false;
  3. avroDataStream = null;
  4. producer.close();
  5. producer = null;
  6. service.shutdown();
  7. }

代码示例来源:origin: apache/incubator-pinot

  1. Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
  2. try {
  3. bytes);
  4. producer.send(data);

代码示例来源:origin: linkedin/camus

  1. private static List<Message> writeKafka(String topic, int numOfMessages) {
  2. List<Message> messages = new ArrayList<Message>();
  3. List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
  4. for (int i = 0; i < numOfMessages; i++) {
  5. Message msg = new Message(RANDOM.nextInt());
  6. messages.add(msg);
  7. kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
  8. }
  9. Properties producerProps = cluster.getProps();
  10. producerProps.setProperty("serializer.class", StringEncoder.class.getName());
  11. producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
  12. Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
  13. try {
  14. producer.send(kafkaMessages);
  15. } finally {
  16. producer.close();
  17. }
  18. return messages;
  19. }

代码示例来源:origin: linkedin/camus

  1. try {
  2. KeyedMessage keyedMessage = new KeyedMessage("TrackingMonitoringEvent", message);
  3. producer.send(keyedMessage);
  4. break;
  5. } catch (Exception e) {
  6. } finally {
  7. if (producer != null) {
  8. producer.close();

代码示例来源:origin: stackoverflow.com

  1. public static void main(String[] args) throws InterruptedException {
  2. //Creating shared object
  3. BlockingQueue sharedQueue = new LinkedBlockingQueue();
  4. //Creating Producer and Consumer Thread
  5. Producer producer = new Producer(sharedQueue);
  6. Thread prodThread = new Thread(producer);
  7. Thread consThread = new Thread(new Consumer(sharedQueue));
  8. //Starting producer and Consumer thread
  9. prodThread.start();
  10. consThread.start();
  11. producer.pushItem(2000);
  12. }

代码示例来源:origin: apache/incubator-pinot

  1. Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
  2. producer.send(messagesToWrite);
  3. messagesToWrite.clear();
  4. producer.send(messagesToWrite);

代码示例来源:origin: linkedin/camus

  1. private Producer mockProducerSendThrowsException() {
  2. Producer mockProducer = EasyMock.createMock(Producer.class);
  3. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  4. EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
  5. mockProducer.close();
  6. EasyMock.expectLastCall().anyTimes();
  7. EasyMock.replay(mockProducer);
  8. return mockProducer;
  9. }

代码示例来源:origin: prestodb/presto

  1. @Override
  2. public void addResults(QueryStatusInfo statusInfo, QueryData data)
  3. {
  4. if (types.get() == null && statusInfo.getColumns() != null) {
  5. types.set(getTypes(statusInfo.getColumns()));
  6. }
  7. if (data.getData() != null) {
  8. checkState(types.get() != null, "Data without types received!");
  9. List<Column> columns = statusInfo.getColumns();
  10. for (List<Object> fields : data.getData()) {
  11. ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
  12. for (int i = 0; i < fields.size(); i++) {
  13. Type type = types.get().get(i);
  14. Object value = convertValue(fields.get(i), type);
  15. if (value != null) {
  16. builder.put(columns.get(i).getName(), value);
  17. }
  18. }
  19. producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), builder.build()));
  20. }
  21. }
  22. }

代码示例来源:origin: apache/incubator-pinot

  1. public AirlineDataStream(Schema pinotSchema, File avroFile)
  2. throws FileNotFoundException, IOException {
  3. this.pinotSchema = pinotSchema;
  4. this.avroFile = avroFile;
  5. createStream();
  6. Properties properties = new Properties();
  7. properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  8. properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
  9. properties.put("request.required.acks", "1");
  10. ProducerConfig producerConfig = new ProducerConfig(properties);
  11. producer = new Producer<String, byte[]>(producerConfig);
  12. service = Executors.newFixedThreadPool(1);
  13. Quickstart.printStatus(Quickstart.Color.YELLOW,
  14. "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****");
  15. }

代码示例来源:origin: apache/incubator-pinot

  1. Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
  2. producer.send(messagesToWrite);
  3. messagesToWrite.clear();
  4. producer.send(messagesToWrite);

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

  1. public void awaitShutdown() {
  2. try {
  3. shutdownComplete.await();
  4. producer.close();
  5. logger.info("Producer thread " + threadName + " shutdown complete");
  6. } catch(InterruptedException ie) {
  7. logger.warn("Interrupt during shutdown of ProducerThread", ie);
  8. }
  9. }
  10. }

代码示例来源:origin: linkedin/camus

  1. private Producer mockProducerThirdSendSucceed() {
  2. Producer mockProducer = EasyMock.createMock(Producer.class);
  3. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  4. EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
  5. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  6. EasyMock.expectLastCall().times(1);
  7. mockProducer.close();
  8. EasyMock.expectLastCall().anyTimes();
  9. EasyMock.replay(mockProducer);
  10. return mockProducer;
  11. }

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

  1. public void run() {
  2. try{
  3. while(true) {
  4. KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
  5. if(!data.equals(shutdownMessage)) {
  6. producer.send(data);
  7. if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
  8. }
  9. else
  10. break;
  11. }
  12. logger.info("Producer thread " + threadName + " finished running");
  13. } catch (Throwable t){
  14. logger.fatal("Producer thread failure due to ", t);
  15. } finally {
  16. shutdownComplete.countDown();
  17. }
  18. }

代码示例来源:origin: linkedin/camus

  1. protected Producer createProducer(Properties props) {
  2. return new Producer(new ProducerConfig(props));
  3. }

代码示例来源:origin: stackoverflow.com

  1. public class MainApp {
  2. public static void main(String[] args) throws Exception {
  3. Producer p = new Producer();
  4. for (int i = 0; i < 10000; i++)
  5. p.send(i);
  6. }
  7. }

代码示例来源:origin: HuaweiBigData/StreamCQL

  1. /**
  2. * {@inheritDoc}
  3. */
  4. @Override
  5. public void destroy()
  6. throws StreamingException
  7. {
  8. if (producer != null)
  9. {
  10. producer.close();
  11. }
  12. }

代码示例来源:origin: rakam-io/rakam

  1. @Override
  2. public void store(Event event) {
  3. GenericDatumWriter writer = new SourceFilteredRecordWriter(event.properties().getSchema(), GenericData.get(), sourceFields);
  4. ByteBuf buffer = Unpooled.buffer(100);
  5. BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(
  6. new ByteBufOutputStream(buffer), null);
  7. try {
  8. writer.write(event.properties(), encoder);
  9. } catch (Exception e) {
  10. throw new RuntimeException("Couldn't serialize event", e);
  11. }
  12. try {
  13. producer.send(new KeyedMessage<>(event.project() + "_" + event.collection(), buffer.array()));
  14. } catch (FailedToSendMessageException e) {
  15. throw new RuntimeException("Couldn't send event to Kafka", e);
  16. }
  17. }

代码示例来源:origin: rakam-io/rakam

  1. @Inject
  2. public KafkaEventStore(@Named("event.store.kafka") KafkaConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) {
  3. config = checkNotNull(config, "config is null");
  4. this.sourceFields = Sets.union(fieldDependency.dependentFields.keySet(),
  5. fieldDependency.constantFields.stream().map(SchemaField::getName)
  6. .collect(Collectors.toSet()));
  7. Properties props = new Properties();
  8. props.put("metadata.broker.list", config.getNodes().stream().map(HostAndPort::toString).collect(Collectors.joining(",")));
  9. props.put("serializer.class", config.SERIALIZER);
  10. ProducerConfig producerConfig = new ProducerConfig(props);
  11. this.producer = new Producer(producerConfig);
  12. CuratorFramework client = CuratorFrameworkFactory.newClient(config.getZookeeperNode().toString(),
  13. new ExponentialBackoffRetry(1000, 3));
  14. client.start();
  15. try {
  16. if (client.checkExists().forPath(ZK_OFFSET_PATH) == null)
  17. client.create().forPath(ZK_OFFSET_PATH);
  18. } catch (Exception e) {
  19. LOGGER.error(e, format("Couldn't create event offset path %s", ZK_OFFSET_PATH));
  20. }
  21. new LeaderSelector(client, ZK_OFFSET_PATH, this).start();
  22. }

相关文章