本文整理了Java中kafka.javaapi.producer.Producer类的一些代码示例,展示了Producer类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer类的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
暂无
代码示例来源:origin: apache/incubator-pinot
/**
 * Sends one JSON message to the "airlineStatsEvents" Kafka topic as UTF-8 bytes.
 *
 * <p>When {@code keepIndexing} is false nothing is sent; the Avro data stream is closed and
 * cleared (if it is still set) and the method returns.
 *
 * @param message the JSON node whose {@code toString()} form is published
 * @throws IOException if closing the Avro stream fails or the "UTF-8" charset is unsupported
 */
private void publish(JsonNode message)
    throws IOException {
  if (!keepIndexing) {
    // The stream reference can already be null here (another code path clears the field
    // without closing it), so guard the close instead of failing with a NullPointerException.
    if (avroDataStream != null) {
      avroDataStream.close();
      avroDataStream = null;
    }
    return;
  }

  byte[] payload = message.toString().getBytes("UTF-8");
  KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("airlineStatsEvents", payload);
  producer.send(data);
}
代码示例来源:origin: apache/incubator-pinot
/**
 * Creates the stream: loads the schema from the given file, then builds a Kafka producer
 * configured with the default broker, the default byte encoder and "1" required ack.
 *
 * @param schemaFile file the schema is read from
 * @throws IOException declared by the original signature
 * @throws URISyntaxException declared by the original signature
 */
public MeetupRsvpStream(File schemaFile)
    throws IOException, URISyntaxException {
  schema = Schema.fromFile(schemaFile);

  // Settings for the old (Scala-based) Kafka producer.
  Properties producerProps = new Properties();
  producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
  producerProps.put("request.required.acks", "1");

  producer = new Producer<String, byte[]>(new ProducerConfig(producerProps));
}
代码示例来源:origin: apache/incubator-pinot
/**
 * Stops indexing and releases the Kafka producer and the worker service.
 *
 * <p>Clears the {@code keepIndexing} flag, drops the Avro stream reference, closes the producer
 * and calls {@code service.shutdown()}. The producer is only closed while the field is still
 * set, so calling this method a second time no longer fails with a NullPointerException.
 */
public void shutdown() {
  keepIndexing = false;
  avroDataStream = null;
  // The field is nulled after closing; guard so a repeated shutdown() is a no-op here.
  if (producer != null) {
    producer.close();
    producer = null;
  }
  service.shutdown();
}
代码示例来源:origin: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
try {
bytes);
producer.send(data);
代码示例来源:origin: linkedin/camus
/**
 * Generates {@code numOfMessages} random messages, sends them to the given topic in one batch
 * (keyed by their index, JSON-encoded as the value) and returns the generated messages.
 *
 * @param topic Kafka topic to write to
 * @param numOfMessages number of messages to generate and send
 * @return the generated messages, in the order they were created
 */
private static List<Message> writeKafka(String topic, int numOfMessages) {
  List<Message> generated = new ArrayList<Message>();
  List<KeyedMessage<String, String>> outgoing = new ArrayList<KeyedMessage<String, String>>();

  int index = 0;
  while (index < numOfMessages) {
    Message payload = new Message(RANDOM.nextInt());
    generated.add(payload);
    String key = String.valueOf(index);
    outgoing.add(new KeyedMessage<String, String>(topic, key, gson.toJson(payload)));
    index++;
  }

  // Both keys and values are sent as strings.
  Properties props = cluster.getProps();
  String stringEncoder = StringEncoder.class.getName();
  props.setProperty("serializer.class", stringEncoder);
  props.setProperty("key.serializer.class", stringEncoder);

  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> kafkaProducer = new Producer<String, String>(config);
  try {
    kafkaProducer.send(outgoing);
  } finally {
    // Always release the producer, even when the send fails.
    kafkaProducer.close();
  }

  return generated;
}
代码示例来源:origin: linkedin/camus
try {
KeyedMessage keyedMessage = new KeyedMessage("TrackingMonitoringEvent", message);
producer.send(keyedMessage);
break;
} catch (Exception e) {
} finally {
if (producer != null) {
producer.close();
代码示例来源:origin: stackoverflow.com
/**
 * Wires a producer and a consumer to one shared queue, starts a thread for each and then
 * pushes the item 2000 through the producer.
 */
public static void main(String[] args) throws InterruptedException {
  // Single queue instance handed to both sides.
  BlockingQueue queue = new LinkedBlockingQueue();

  // Build the two workers and the threads that run them.
  Producer producer = new Producer(queue);
  Thread producerThread = new Thread(producer);
  Consumer consumer = new Consumer(queue);
  Thread consumerThread = new Thread(consumer);

  // Producer thread is started first, then the consumer thread.
  producerThread.start();
  consumerThread.start();

  producer.pushItem(2000);
}
代码示例来源:origin: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
producer.send(messagesToWrite);
messagesToWrite.clear();
producer.send(messagesToWrite);
代码示例来源:origin: linkedin/camus
/**
 * Builds an EasyMock producer whose {@code send} always throws a
 * {@code RuntimeException("dummyException")} and whose {@code close} may be called any
 * number of times.
 *
 * @return the mock, already switched to replay mode
 */
private Producer mockProducerSendThrowsException() {
  Producer failingProducer = EasyMock.createMock(Producer.class);

  // Record: any send(...) fails, however often it is invoked.
  RuntimeException failure = new RuntimeException("dummyException");
  failingProducer.send((KeyedMessage) EasyMock.anyObject());
  EasyMock.expectLastCall().andThrow(failure).anyTimes();

  // Record: close() is allowed any number of times.
  failingProducer.close();
  EasyMock.expectLastCall().anyTimes();

  EasyMock.replay(failingProducer);
  return failingProducer;
}
代码示例来源:origin: prestodb/presto
/**
 * Records column types the first time columns are available, then sends every data row to
 * Kafka as a map of column name to converted value. Null values are left out of the map.
 * Each message is keyed with the next value of {@code count}.
 */
@Override
public void addResults(QueryStatusInfo statusInfo, QueryData data)
{
    // Capture the types once, as soon as the status carries column information.
    if (types.get() == null && statusInfo.getColumns() != null) {
        types.set(getTypes(statusInfo.getColumns()));
    }

    if (data.getData() == null) {
        return;
    }

    checkState(types.get() != null, "Data without types received!");
    List<Column> columns = statusInfo.getColumns();
    for (List<Object> row : data.getData()) {
        ImmutableMap.Builder<String, Object> rowBuilder = ImmutableMap.builder();
        for (int col = 0; col < row.size(); col++) {
            Type columnType = types.get().get(col);
            Object converted = convertValue(row.get(col), columnType);
            if (converted == null) {
                // Null values are skipped rather than put into the builder.
                continue;
            }
            rowBuilder.put(columns.get(col).getName(), converted);
        }
        producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), rowBuilder.build()));
    }
}
代码示例来源:origin: apache/incubator-pinot
/**
 * Stores the schema and Avro file, opens the stream via {@code createStream()}, creates the
 * Kafka producer and a single-thread executor, and prints a status banner.
 *
 * @param pinotSchema schema kept on this instance
 * @param avroFile Avro file kept on this instance
 * @throws FileNotFoundException declared by the original signature
 * @throws IOException declared by the original signature
 */
public AirlineDataStream(Schema pinotSchema, File avroFile)
    throws FileNotFoundException, IOException {
  this.pinotSchema = pinotSchema;
  this.avroFile = avroFile;
  createStream();

  // Settings for the old (Scala-based) Kafka producer.
  Properties producerProps = new Properties();
  producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
  producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
  producerProps.put("request.required.acks", "1");
  producer = new Producer<String, byte[]>(new ProducerConfig(producerProps));

  service = Executors.newFixedThreadPool(1);

  String banner =
      "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****";
  Quickstart.printStatus(Quickstart.Color.YELLOW, banner);
}
代码示例来源:origin: apache/incubator-pinot
Producer<byte[], byte[]> producer = new Producer<>(producerConfig);
producer.send(messagesToWrite);
messagesToWrite.clear();
producer.send(messagesToWrite);
代码示例来源:origin: org.apache.kafka/kafka_2.9.2
/**
 * Blocks until {@code shutdownComplete} is released, then closes the producer and logs
 * completion.
 *
 * <p>If the wait is interrupted, the producer is not closed, a warning is logged and the
 * thread's interrupt status is restored so that callers can still observe the interruption.
 */
public void awaitShutdown() {
  try {
    shutdownComplete.await();
    producer.close();
    logger.info("Producer thread " + threadName + " shutdown complete");
  } catch (InterruptedException ie) {
    logger.warn("Interrupt during shutdown of ProducerThread", ie);
    // Catching InterruptedException clears the flag; set it again instead of swallowing it.
    Thread.currentThread().interrupt();
  }
}
}
代码示例来源:origin: linkedin/camus
/**
 * Builds an EasyMock producer whose first two {@code send} calls throw a
 * {@code RuntimeException("dummyException")} and whose next {@code send} succeeds once;
 * {@code close} may be called any number of times.
 *
 * @return the mock, already switched to replay mode
 */
private Producer mockProducerThirdSendSucceed() {
  Producer flakyProducer = EasyMock.createMock(Producer.class);

  // Record: the first two sends fail.
  RuntimeException failure = new RuntimeException("dummyException");
  flakyProducer.send((KeyedMessage) EasyMock.anyObject());
  EasyMock.expectLastCall().andThrow(failure).times(2);

  // Record: the following send goes through exactly once.
  flakyProducer.send((KeyedMessage) EasyMock.anyObject());
  EasyMock.expectLastCall().times(1);

  // Record: close() is allowed any number of times.
  flakyProducer.close();
  EasyMock.expectLastCall().anyTimes();

  EasyMock.replay(flakyProducer);
  return flakyProducer;
}
代码示例来源:origin: org.apache.kafka/kafka_2.9.2
/**
 * Forwards messages from {@code producerDataChannel} to the producer until the shutdown
 * message is received.
 *
 * <p>Any {@code Throwable} is logged as fatal and ends the loop. {@code shutdownComplete} is
 * counted down in all cases so that a waiter on it is released.
 */
public void run() {
  try {
    while (true) {
      KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
      if (data.equals(shutdownMessage)) {
        break;
      }
      producer.send(data);
      if (logger.isDebugEnabled()) {
        // The previous form, "Sending message %s".format(x), resolves to the static
        // String.format(x): the payload became the format string, the prefix was dropped and
        // a '%' in the payload could throw. Pass the template as the format instead.
        // NOTE(review): new String(byte[]) decodes with the platform charset - confirm intended.
        logger.debug(String.format("Sending message %s", new String(data.message())));
      }
    }
    logger.info("Producer thread " + threadName + " finished running");
  } catch (Throwable t) {
    logger.fatal("Producer thread failure due to ", t);
  } finally {
    shutdownComplete.countDown();
  }
}
代码示例来源:origin: linkedin/camus
/**
 * Creates a Kafka producer from the given properties.
 *
 * @param props producer settings, wrapped in a {@code ProducerConfig}
 * @return a new producer built from that configuration
 */
protected Producer createProducer(Properties props) {
  ProducerConfig producerConfig = new ProducerConfig(props);
  return new Producer(producerConfig);
}
代码示例来源:origin: stackoverflow.com
/** Small driver that sends the integers 0 through 9999 through a {@code Producer}. */
public class MainApp {
  /**
   * Creates one producer and sends each value from 0 (inclusive) to 10000 (exclusive) in order.
   */
  public static void main(String[] args) throws Exception {
    Producer sender = new Producer();
    int value = 0;
    while (value < 10000) {
      sender.send(value);
      value++;
    }
  }
}
代码示例来源:origin: HuaweiBigData/StreamCQL
/**
 * {@inheritDoc}
 *
 * <p>Closes the producer if one is set; otherwise does nothing.
 */
@Override
public void destroy()
    throws StreamingException
{
    if (producer == null)
    {
        // Nothing to release.
        return;
    }
    producer.close();
}
代码示例来源:origin: rakam-io/rakam
/**
 * Serializes the event's properties with Avro binary encoding and sends the bytes to the
 * Kafka topic named {@code <project>_<collection>}.
 *
 * @param event the event to serialize and publish
 * @throws RuntimeException if serialization fails or Kafka rejects the message; the original
 *     exception is kept as the cause
 */
@Override
public void store(Event event) {
    GenericDatumWriter writer = new SourceFilteredRecordWriter(event.properties().getSchema(), GenericData.get(), sourceFields);
    ByteBuf buffer = Unpooled.buffer(100);
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(
            new ByteBufOutputStream(buffer), null);
    try {
        writer.write(event.properties(), encoder);
    } catch (Exception e) {
        throw new RuntimeException("Couldn't serialize event", e);
    }

    // ByteBuf.array() exposes the whole backing array (at least the 100-byte initial
    // capacity), including bytes that were never written. Copy only the written region so
    // the Kafka message holds exactly the serialized record.
    byte[] payload = new byte[buffer.readableBytes()];
    buffer.readBytes(payload);

    try {
        producer.send(new KeyedMessage<>(event.project() + "_" + event.collection(), payload));
    } catch (FailedToSendMessageException e) {
        throw new RuntimeException("Couldn't send event to Kafka", e);
    }
}
代码示例来源:origin: rakam-io/rakam
/**
 * Builds the event store: computes the source field names, creates the Kafka producer from
 * the configured nodes, starts a ZooKeeper client, makes sure the offset path exists and
 * starts a leader selector on that path with this instance as the listener.
 *
 * @param config Kafka settings; must not be null
 * @param fieldDependency supplies the dependent and constant fields used as source fields
 */
@Inject
public KafkaEventStore(@Named("event.store.kafka") KafkaConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) {
    config = checkNotNull(config, "config is null");

    this.sourceFields = Sets.union(
            fieldDependency.dependentFields.keySet(),
            fieldDependency.constantFields.stream()
                    .map(SchemaField::getName)
                    .collect(Collectors.toSet()));

    // Producer settings: comma-separated broker list plus the configured serializer.
    Properties producerProps = new Properties();
    String brokerList = config.getNodes().stream()
            .map(HostAndPort::toString)
            .collect(Collectors.joining(","));
    producerProps.put("metadata.broker.list", brokerList);
    producerProps.put("serializer.class", config.SERIALIZER);
    this.producer = new Producer(new ProducerConfig(producerProps));

    CuratorFramework curator = CuratorFrameworkFactory.newClient(
            config.getZookeeperNode().toString(),
            new ExponentialBackoffRetry(1000, 3));
    curator.start();

    try {
        boolean offsetPathMissing = curator.checkExists().forPath(ZK_OFFSET_PATH) == null;
        if (offsetPathMissing) {
            curator.create().forPath(ZK_OFFSET_PATH);
        }
    } catch (Exception e) {
        // A failure here is logged only; construction continues.
        LOGGER.error(e, format("Couldn't create event offset path %s", ZK_OFFSET_PATH));
    }

    new LeaderSelector(curator, ZK_OFFSET_PATH, this).start();
}
内容来源于网络,如有侵权,请联系作者删除!