org.apache.kafka.clients.producer.Producer.send()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(201)

本文整理了Java中org.apache.kafka.clients.producer.Producer.send()方法的一些代码示例,展示了Producer.send()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.send()方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:send

Producer.send介绍

[英]See KafkaProducer#send(ProducerRecord)
[中]参见卡夫卡制作人#发送(生产记录)

代码示例

代码示例来源:origin: QNJR-GROUP/EasyTransaction

  1. public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String,byte[]> record){
  2. return kafkaProducer.send(record);
  3. }
  4. }

代码示例来源:origin: alibaba/canal

  1. private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
  2. InterruptedException {
  3. ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
  4. partition,
  5. null,
  6. JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
  7. if (kafkaProperties.getTransaction()) {
  8. producer2.send(record);
  9. } else {
  10. producer2.send(record).get();
  11. }
  12. }

代码示例来源:origin: line/armeria

  1. @Override
  2. public void log(RequestLog log) {
  3. final V value = valueExtractor.apply(log);
  4. if (value == null) {
  5. return;
  6. }
  7. final K key = keyExtractor.apply(log);
  8. final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, value);
  9. producer.send(producerRecord, (metadata, exception) -> {
  10. if (exception != null) {
  11. logger.warn("Failed to send a record to Kafka: {}", producerRecord, exception);
  12. }
  13. });
  14. }

代码示例来源:origin: jmxtrans/jmxtrans

  1. @Override
  2. protected void internalWrite(Server server, Query query, ImmutableList<Result> results) throws Exception {
  3. for (Result result : results) {
  4. log.debug("Query result: [{}]", result);
  5. String message = resultSerializer.serialize(server, query, result);
  6. for (String topic : this.topics) {
  7. log.debug("Topic: [{}] ; Kafka Message: [{}]", topic, message);
  8. producer.send(new ProducerRecord<String, String>(topic, message));
  9. }
  10. }
  11. }

代码示例来源:origin: line/armeria

  1. @Override
  2. protected void writeLog(RequestLog log, L structuredLog) {
  3. final byte[] key = keySelector.selectKey(log, structuredLog);
  4. final ProducerRecord<byte[], L> producerRecord = new ProducerRecord<>(topic, key, structuredLog);
  5. producer.send(producerRecord, (metadata, exception) -> {
  6. if (exception != null) {
  7. logger.warn("failed to send service log to Kafka {}", producerRecord, exception);
  8. }
  9. });
  10. }

代码示例来源:origin: jmxtrans/jmxtrans

  1. @Override
  2. public void doWrite(Server server, Query query, Iterable<Result> results) throws Exception {
  3. for (Result result : results) {
  4. String message = resultSerializer.serialize(server, query, result);
  5. if (message != null) {
  6. producer.send(new ProducerRecord<String, String>(topic, message));
  7. }
  8. }
  9. }

代码示例来源:origin: apache/incubator-gobblin

  1. @Override
  2. public Future<WriteResponse> write(final D record, final WriteCallback callback) {
  3. return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() {
  4. @Override
  5. public void onCompletion(final RecordMetadata metadata, Exception exception) {
  6. if (exception != null) {
  7. callback.onFailure(exception);
  8. } else {
  9. callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
  10. }
  11. }
  12. }), WRITE_RESPONSE_WRAPPER);
  13. }

代码示例来源:origin: OryxProject/oryx

  1. @Override
  2. public void send(K key, M message) {
  3. getProducer().send(new ProducerRecord<>(topic, key, message));
  4. }

代码示例来源:origin: OryxProject/oryx

  1. @Override
  2. public void send(K key, M message) {
  3. getProducer().send(new ProducerRecord<>(topic, key, message));
  4. }

代码示例来源:origin: apache/incubator-gobblin

  1. @Override
  2. public Future<WriteResponse> write(final D record, final WriteCallback callback) {
  3. return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() {
  4. @Override
  5. public void onCompletion(final RecordMetadata metadata, Exception exception) {
  6. if (exception != null) {
  7. callback.onFailure(exception);
  8. } else {
  9. callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
  10. }
  11. }
  12. }), WRITE_RESPONSE_WRAPPER);
  13. }

代码示例来源:origin: openzipkin/brave

  1. @Benchmark public RecordMetadata send_baseCase() throws Exception {
  2. return producer.send(record).get();
  3. }

代码示例来源:origin: apache/incubator-druid

  1. private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
  2. {
  3. ObjectContainer<String> objectToSend;
  4. try {
  5. while (true) {
  6. objectToSend = recordQueue.take();
  7. producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
  8. }
  9. }
  10. catch (InterruptedException e) {
  11. log.warn(e, "Failed to take record from queue!");
  12. }
  13. }

代码示例来源:origin: apache/kylin

  1. protected void send(String topic, Record record, Callback callback) {
  2. producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
  3. }

代码示例来源:origin: confluentinc/ksql

  1. @Test
  2. public void shouldSendCommandCorrectly() throws Exception {
  3. // When
  4. commandTopic.send(commandId1, command1);
  5. // Then
  6. verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1));
  7. verify(future).get();
  8. }

代码示例来源:origin: confluentinc/ksql

  1. @Before
  2. @SuppressWarnings("unchecked")
  3. public void setup() {
  4. commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandProducer);
  5. when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future);
  6. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void withKeyExtractor() {
  3. final RequestLog log = mock(RequestLog.class);
  4. when(log.authority()).thenReturn("kawamuray");
  5. when(log.decodedPath()).thenReturn("kyuto");
  6. final KafkaAccessLogWriter<String, String> service =
  7. new KafkaAccessLogWriter<>(producer, TOPIC_NAME,
  8. RequestLog::decodedPath, RequestLog::authority);
  9. service.log(log);
  10. verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  11. final ProducerRecord<String, String> record = captor.getValue();
  12. assertThat(record.key()).isEqualTo("kyuto");
  13. assertThat(record.value()).isEqualTo("kawamuray");
  14. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void withoutKeyExtractor() {
  3. final RequestLog log = mock(RequestLog.class);
  4. when(log.authority()).thenReturn("kawamuray");
  5. final KafkaAccessLogWriter<String, String> service =
  6. new KafkaAccessLogWriter<>(producer, TOPIC_NAME, RequestLog::authority);
  7. service.log(log);
  8. verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  9. final ProducerRecord<String, String> record = captor.getValue();
  10. assertThat(record.key()).isNull();
  11. assertThat(record.value()).isEqualTo("kawamuray");
  12. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void testWithKeySelector() {
  3. final KafkaStructuredLoggingServiceExposed service = new KafkaStructuredLoggingServiceExposed(
  4. producer, (res, log) -> log.name.getBytes(), false);
  5. final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
  6. service.writeLog(null, log);
  7. verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  8. final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
  9. assertThat(record.key()).isNotNull();
  10. assertThat(new String(record.key())).isEqualTo(log.name);
  11. assertThat(record.value()).isEqualTo(log);
  12. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void testServiceWithoutKeySelector() {
  3. final KafkaStructuredLoggingServiceExposed service =
  4. new KafkaStructuredLoggingServiceExposed(producer, null, false);
  5. final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
  6. service.writeLog(null, log);
  7. verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  8. final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
  9. assertThat(record.key()).isNull();
  10. assertThat(record.value()).isEqualTo(log);
  11. }

代码示例来源:origin: apache/kafka

  1. @Test
  2. public void testSendToInvalidTopic() throws Exception {
  3. Map<String, Object> configs = new HashMap<>();
  4. configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  5. configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
  6. Time time = new MockTime();
  7. MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
  8. Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  9. metadata.update(initialUpdateResponse, time.milliseconds());
  10. MockClient client = new MockClient(time, metadata);
  11. Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
  12. metadata, client, null, time);
  13. String invalidTopicName = "topic abc"; // Invalid topic name due to space
  14. ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
  15. List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
  16. topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
  17. invalidTopicName, false, Collections.emptyList()));
  18. MetadataResponse updateResponse = new MetadataResponse(
  19. new ArrayList<>(initialUpdateResponse.brokers()),
  20. initialUpdateResponse.clusterId(),
  21. initialUpdateResponse.controller().id(),
  22. topicMetadata);
  23. client.prepareMetadataUpdate(updateResponse);
  24. Future<RecordMetadata> future = producer.send(record);
  25. assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
  26. metadata.fetch().invalidTopics());
  27. TestUtils.assertFutureError(future, InvalidTopicException.class);
  28. producer.close(Duration.ofMillis(0));
  29. }

相关文章