This article collects code examples for the Java method kafka.javaapi.producer.Producer.<init>() and shows how Producer.<init>() is used in practice. The examples are taken from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are useful references and should help you get started. Details of the Producer.<init>() method are as follows:
Package path: kafka.javaapi.producer.Producer
Class name: Producer
Method name: <init>
Description: none available.
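As a quick orientation before the project examples, here is a minimal sketch of how Producer.<init>() is typically called with Kafka's 0.8-era Scala client: build a Properties object, wrap it in a ProducerConfig, and pass that config to the constructor. The broker address, topic name, and key/value strings below are placeholders chosen for illustration, not values taken from any of the projects cited later.
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerInitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092"); // placeholder broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    // Producer.<init>() takes a ProducerConfig built from the Properties
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    try {
      producer.send(new KeyedMessage<String, String>("example-topic", "key", "value"));
    } finally {
      producer.close();
    }
  }
}
Most of the project examples below follow this same shape, differing mainly in the serializer class and in whether the key/value types are String or byte[].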
Code example source: apache/incubator-pinot
public MeetupRsvpStream(File schemaFile)
    throws IOException, URISyntaxException {
  schema = Schema.fromFile(schemaFile);
  Properties properties = new Properties();
  properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
  properties.put("request.required.acks", "1");
  ProducerConfig producerConfig = new ProducerConfig(properties);
  producer = new Producer<String, byte[]>(producerConfig);
}
Code example source: apache/incubator-pinot
public AirlineDataStream(Schema pinotSchema, File avroFile)
    throws FileNotFoundException, IOException {
  this.pinotSchema = pinotSchema;
  this.avroFile = avroFile;
  createStream();
  Properties properties = new Properties();
  properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
  properties.put("request.required.acks", "1");
  ProducerConfig producerConfig = new ProducerConfig(properties);
  producer = new Producer<String, byte[]>(producerConfig);
  service = Executors.newFixedThreadPool(1);
  Quickstart.printStatus(Quickstart.Color.YELLOW,
      "***** Offline data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****");
}
Code example source: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
Code example source: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
Code example source: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
Code example source: linkedin/camus
protected Producer createProducer(Properties props) {
  return new Producer(new ProducerConfig(props));
}
Code example source: rakam-io/rakam
@Inject
public KafkaEventStore(@Named("event.store.kafka") KafkaConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) {
  config = checkNotNull(config, "config is null");
  this.sourceFields = Sets.union(fieldDependency.dependentFields.keySet(),
      fieldDependency.constantFields.stream().map(SchemaField::getName)
          .collect(Collectors.toSet()));
  Properties props = new Properties();
  props.put("metadata.broker.list", config.getNodes().stream().map(HostAndPort::toString).collect(Collectors.joining(",")));
  props.put("serializer.class", config.SERIALIZER);
  ProducerConfig producerConfig = new ProducerConfig(props);
  this.producer = new Producer(producerConfig);
  CuratorFramework client = CuratorFrameworkFactory.newClient(config.getZookeeperNode().toString(),
      new ExponentialBackoffRetry(1000, 3));
  client.start();
  try {
    if (client.checkExists().forPath(ZK_OFFSET_PATH) == null)
      client.create().forPath(ZK_OFFSET_PATH);
  } catch (Exception e) {
    LOGGER.error(e, format("Couldn't create event offset path %s", ZK_OFFSET_PATH));
  }
  new LeaderSelector(client, ZK_OFFSET_PATH, this).start();
}
Code example source: linkedin/camus
private static List<Message> writeKafka(String topic, int numOfMessages) {
  List<Message> messages = new ArrayList<Message>();
  List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
  for (int i = 0; i < numOfMessages; i++) {
    Message msg = new Message(RANDOM.nextInt());
    messages.add(msg);
    kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
  }
  Properties producerProps = cluster.getProps();
  producerProps.setProperty("serializer.class", StringEncoder.class.getName());
  producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
  Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
  try {
    producer.send(kafkaMessages);
  } finally {
    producer.close();
  }
  return messages;
}
Code example source: linkedin/camus
@Override
protected Producer createProducer(Properties props) {
  switch (producerType) {
    case REGULAR:
      return new Producer(new ProducerConfig(props));
    case SEND_THROWS_EXCEPTION:
      return mockProducerSendThrowsException();
    case SEND_SUCCEED_THIRD_TIME:
      return mockProducerThirdSendSucceed();
    default:
      throw new RuntimeException("producer type not found");
  }
}
Code example source: org.apache.kafka/kafka_2.9.2
kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
Producer producer = new Producer(producerConfig_08);
ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
producerThread.start();
Code example source: locationtech/geowave
private synchronized Producer<String, T> getProducerCreateIfNull(
    final String typeName,
    final GeoWaveAvroFormatPlugin<?, ?> plugin) {
  if (!cachedProducers.containsKey(typeName)) {
    final ProducerConfig producerConfig = new ProducerConfig(properties);
    final Producer<String, T> producer = new Producer<String, T>(producerConfig);
    cachedProducers.put(typeName, producer);
  }
  return cachedProducers.get(typeName);
}
Code example source: stackoverflow.com
public class SomeClient {
  public void start() {
    // Note: this "Producer" is a producer/consumer threading example, not the Kafka client
    Queue sharedQueue = new LinkedList();
    producer = new Producer(sharedQueue);
    consumer = new Consumer(sharedQueue);
    producer.start();
    consumer.start();
  }
}
Code example source: wurstmeister/storm-kafka-0.8-plus
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
  Properties properties = new Properties();
  properties.putAll(configMap);
  ProducerConfig config = new ProducerConfig(properties);
  producer = new Producer<K, V>(config);
  this.topic = (String) stormConf.get(TOPIC);
  this.collector = collector;
}
Code example source: stackoverflow.com
public class MainApp {
  public static void main(String[] args) throws Exception {
    Producer p = new Producer();
    for (int i = 0; i < 10000; i++)
      p.send(i);
  }
}
Code example source: org.apache.apex/malhar-contrib
public void startServer() throws IOException {
  ProducerConfig producerConfig = new ProducerConfig(configProperties);
  producer = new Producer<String, T>(producerConfig);
  serverSocket = new ServerSocket(port);
}
Code example source: stackoverflow.com
class Program {
  public static void main(String[] args) {
    // Producer/consumer threading example; ArrayBlockingQueue requires an explicit capacity
    BlockingQueue<Double> queue = new ArrayBlockingQueue<>(100);
    Producer producer = new Producer(queue);
    Consumer consumer = new Consumer(queue);
    new Thread(producer).start();
    new Thread(consumer).start();
  }
}
Code example source: Stratio/Decision
protected Producer<String, String> getProducer() {
  if (producer == null) {
    Properties properties = new Properties();
    properties.put("serializer.class", "kafka.serializer.StringEncoder");
    properties.put("metadata.broker.list", kafkaQuorum);
    properties.put("producer.type", "async");
    producer = new Producer<String, String>(new ProducerConfig(properties));
  }
  return producer;
}
Code example source: Stratio/Decision
@Bean
public Producer<String, byte[]> avroProducer() {
  Properties properties = new Properties();
  properties.put("metadata.broker.list", configurationContext.getKafkaHostsQuorum());
  properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer");
  properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer");
  return new Producer<String, byte[]>(new ProducerConfig(properties));
}
Code example source: Stratio/Decision
@Bean
public Producer<String, String> producer() {
  Properties properties = new Properties();
  properties.put("serializer.class", "kafka.serializer.StringEncoder");
  properties.put("metadata.broker.list", configurationContext.getKafkaHostsQuorum());
  properties.put("producer.type", "async");
  return new Producer<String, String>(new ProducerConfig(properties));
}
Code example source: thilinamb/flume-ng-kafka-sink
@Override
public synchronized void start() {
  // instantiate the producer
  ProducerConfig config = new ProducerConfig(producerProps);
  producer = new Producer<String, String>(config);
  super.start();
}