Usage and code examples of the kafka.javaapi.producer.Producer.<init>() method

x33g5p2x · Reposted on 2022-01-26 · Category: Other

This article collects code examples of the kafka.javaapi.producer.Producer.<init>() method in Java and shows how Producer.<init>() is used in practice. The examples come from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical references. Details of the Producer.<init>() method:
Package: kafka.javaapi.producer.Producer
Class: Producer
Method: <init>

About Producer.<init>()

No description is available in the source.
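
Since the source provides no description, here is a minimal sketch of what constructing the legacy (Scala 0.8.x client) producer typically looks like. The broker address, serializer choice, and class/method names are illustrative assumptions, not taken from the article:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    public class ProducerInitSketch {
      public static Producer<String, String> newProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");             // assumed broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // String keys and values
        props.put("request.required.acks", "1");                         // wait for the leader's ack

        // Producer.<init>(ProducerConfig) is the constructor used throughout the examples below.
        return new Producer<String, String>(new ProducerConfig(props));
      }
    }

All of the project examples that follow use this same pattern, varying only the property values and the key/value types.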

Code examples

Code example source: apache/incubator-pinot

    public MeetupRsvpStream(File schemaFile)
        throws IOException, URISyntaxException {
      schema = Schema.fromFile(schemaFile);
      Properties properties = new Properties();
      properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
      properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
      properties.put("request.required.acks", "1");
      ProducerConfig producerConfig = new ProducerConfig(properties);
      producer = new Producer<String, byte[]>(producerConfig);
    }

Code example source: apache/incubator-pinot

    public AirlineDataStream(Schema pinotSchema, File avroFile)
        throws FileNotFoundException, IOException {
      this.pinotSchema = pinotSchema;
      this.avroFile = avroFile;
      createStream();
      Properties properties = new Properties();
      properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
      properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
      properties.put("request.required.acks", "1");
      ProducerConfig producerConfig = new ProducerConfig(properties);
      producer = new Producer<String, byte[]>(producerConfig);
      service = Executors.newFixedThreadPool(1);
      Quickstart.printStatus(Quickstart.Color.YELLOW,
          "***** Offline data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****");
    }

Code example source: apache/incubator-pinot

    Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
    try {

Code example source: apache/incubator-pinot

    Producer<byte[], byte[]> producer = new Producer<>(producerConfig);

Code example source: apache/incubator-pinot

    Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
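
The three Pinot fragments above are truncated in the source. For context, here is a minimal, self-contained sketch of the send-and-close pattern that usually follows such a construction; the broker address, topic name, and payloads are illustrative assumptions, not taken from the Pinot code:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ByteArrayProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");              // assumed broker address
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] payloads
        ProducerConfig producerConfig = new ProducerConfig(props);

        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
        try {
          // Batch up a few records and send them in a single call.
          List<KeyedMessage<byte[], byte[]>> batch = new ArrayList<KeyedMessage<byte[], byte[]>>();
          for (int i = 0; i < 3; i++) {
            byte[] payload = ("record-" + i).getBytes(StandardCharsets.UTF_8);
            batch.add(new KeyedMessage<byte[], byte[]>("example-topic", payload)); // topic is assumed
          }
          producer.send(batch);
        } finally {
          producer.close();
        }
      }
    }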

Code example source: linkedin/camus

    protected Producer createProducer(Properties props) {
      return new Producer(new ProducerConfig(props));
    }

Code example source: rakam-io/rakam

    @Inject
    public KafkaEventStore(@Named("event.store.kafka") KafkaConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) {
      config = checkNotNull(config, "config is null");
      this.sourceFields = Sets.union(fieldDependency.dependentFields.keySet(),
          fieldDependency.constantFields.stream().map(SchemaField::getName)
              .collect(Collectors.toSet()));
      Properties props = new Properties();
      props.put("metadata.broker.list", config.getNodes().stream().map(HostAndPort::toString).collect(Collectors.joining(",")));
      props.put("serializer.class", config.SERIALIZER);
      ProducerConfig producerConfig = new ProducerConfig(props);
      this.producer = new Producer(producerConfig);
      CuratorFramework client = CuratorFrameworkFactory.newClient(config.getZookeeperNode().toString(),
          new ExponentialBackoffRetry(1000, 3));
      client.start();
      try {
        if (client.checkExists().forPath(ZK_OFFSET_PATH) == null)
          client.create().forPath(ZK_OFFSET_PATH);
      } catch (Exception e) {
        LOGGER.error(e, format("Couldn't create event offset path %s", ZK_OFFSET_PATH));
      }
      new LeaderSelector(client, ZK_OFFSET_PATH, this).start();
    }

Code example source: linkedin/camus

    private static List<Message> writeKafka(String topic, int numOfMessages) {
      List<Message> messages = new ArrayList<Message>();
      List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
      for (int i = 0; i < numOfMessages; i++) {
        Message msg = new Message(RANDOM.nextInt());
        messages.add(msg);
        kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
      }
      Properties producerProps = cluster.getProps();
      producerProps.setProperty("serializer.class", StringEncoder.class.getName());
      producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
      Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
      try {
        producer.send(kafkaMessages);
      } finally {
        producer.close();
      }
      return messages;
    }

Code example source: linkedin/camus

    @Override
    protected Producer createProducer(Properties props) {
      switch (producerType) {
        case REGULAR:
          return new Producer(new ProducerConfig(props));
        case SEND_THROWS_EXCEPTION:
          return mockProducerSendThrowsException();
        case SEND_SUCCEED_THIRD_TIME:
          return mockProducerThirdSendSucceed();
        default:
          throw new RuntimeException("producer type not found");
      }
    }

Code example source: org.apache.kafka/kafka_2.9.2

    kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
    ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
    Producer producer = new Producer(producerConfig_08);
    ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
    producerThread.start();

Code example source: locationtech/geowave

    private synchronized Producer<String, T> getProducerCreateIfNull(
        final String typeName,
        final GeoWaveAvroFormatPlugin<?, ?> plugin) {
      if (!cachedProducers.containsKey(typeName)) {
        final ProducerConfig producerConfig = new ProducerConfig(properties);
        final Producer<String, T> producer = new Producer<String, T>(producerConfig);
        cachedProducers.put(typeName, producer);
      }
      return cachedProducers.get(typeName);
    }

Code example source: stackoverflow.com (note: the Producer here is a custom producer/consumer thread class, not kafka.javaapi.producer.Producer)

    public class SomeClient {
      public void start() {
        Queue sharedQueue = new LinkedList();
        producer = new Producer(sharedQueue);
        consumer = new Consumer(sharedQueue);
        producer.start();
        consumer.start();
      }
    }

Code example source: wurstmeister/storm-kafka-0.8-plus

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
      Properties properties = new Properties();
      properties.putAll(configMap);
      ProducerConfig config = new ProducerConfig(properties);
      producer = new Producer<K, V>(config);
      this.topic = (String) stormConf.get(TOPIC);
      this.collector = collector;
    }

Code example source: stackoverflow.com (note: Producer here is a user-defined class with a send(int) method, not the Kafka client)

    public class MainApp {
      public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
          p.send(i);
      }
    }

Code example source: org.apache.apex/malhar-contrib

    public void startServer() throws IOException {
      ProducerConfig producerConfig = new ProducerConfig(configProperties);
      producer = new Producer<String, T>(producerConfig);
      serverSocket = new ServerSocket(port);
    }

Code example source: stackoverflow.com (note: Producer and Consumer here are user-defined Runnables in a BlockingQueue example, not Kafka classes)

    class Program {
      public static void main(String[] args) {
        BlockingQueue<Double> queue = new ArrayBlockingQueue<>(10); // ArrayBlockingQueue requires a capacity
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
      }
    }

Code example source: Stratio/Decision

    protected Producer<String, String> getProducer() {
      if (producer == null) {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", kafkaQuorum);
        properties.put("producer.type", "async");
        producer = new Producer<String, String>(new ProducerConfig(properties));
      }
      return producer;
    }

Code example source: Stratio/Decision

    @Bean
    public Producer<String, byte[]> avroProducer() {
      Properties properties = new Properties();
      properties.put("metadata.broker.list", configurationContext.getKafkaHostsQuorum());
      properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.ByteArraySerializer");
      properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
      return new Producer<String, byte[]>(new ProducerConfig(properties));
    }
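
Note that this example passes new-client (org.apache.kafka.clients.producer) property names to the legacy ProducerConfig, which reads the older keys "serializer.class" and "key.serializer.class" instead. As a point of comparison, here is a hedged sketch of the same byte-array producer expressed with the legacy property keys; the class name, factory method, and broker-list parameter are illustrative assumptions:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    public class LegacyAvroProducerConfigSketch {
      // Hypothetical factory method for illustration only.
      public static Producer<String, byte[]> avroProducer(String brokerList) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        // Legacy-producer equivalents of the serializer settings above:
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");    // byte[] values
        properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); // String keys
        return new Producer<String, byte[]>(new ProducerConfig(properties));
      }
    }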

Code example source: Stratio/Decision

    @Bean
    public Producer<String, String> producer() {
      Properties properties = new Properties();
      properties.put("serializer.class", "kafka.serializer.StringEncoder");
      properties.put("metadata.broker.list", configurationContext.getKafkaHostsQuorum());
      properties.put("producer.type", "async");
      return new Producer<String, String>(new ProducerConfig(properties));
    }

Code example source: thilinamb/flume-ng-kafka-sink

    @Override
    public synchronized void start() {
      // instantiate the producer
      ProducerConfig config = new ProducerConfig(producerProps);
      producer = new Producer<String, String>(config);
      super.start();
    }
