我使用了Kafka的SimpleConsumer(简单消费者)。我只想从生产者那里获取消息,然后在Storm中处理这些数据,再保存到Cassandra。一切都很顺利,但问题是,每当我增大maxReads的值时,Kafka消费者就会进入无限循环,数据处理和保存到Cassandra的步骤永远不会执行。所以我的问题是:这里的maxReads变量有什么意义?我怎样才能让这个消费者按如下方式工作:当生产者发送消息时,它获取消息并把这个元组交给Storm bolt;当生产者停止发送时,消费者进入停止(等待)状态;一段时间后,如果生产者再次发送消息,它会消费该消息,把它传给Storm bolt,然后再次进入停止状态,如此循环往复。
这是我的Kafka消费者
package com.sethiyaji.kafka;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
/**
 * Reads messages from a single Kafka topic partition with the low-level
 * {@code SimpleConsumer} API and prints each one to standard output.
 *
 * <p>The instance remembers the replica hosts of the partition it last
 * resolved so that a new leader can be looked up after a fetch error.
 * The class keeps mutable state without any synchronization, so an
 * instance should be used from one thread only.
 */
public class ConsumerKafka {
    /** Socket timeout, in milliseconds, passed to every SimpleConsumer. */
    private static final int SOCKET_TIMEOUT_MS = 100000;
    /** Socket buffer size, in bytes, passed to every SimpleConsumer. */
    private static final int BUFFER_SIZE_BYTES = 64 * 1024;
    /** Maximum number of bytes requested per fetch. */
    private static final int FETCH_SIZE_BYTES = 100000;
    /** The read loop gives up once more than this many fetches fail in a row. */
    private static final int MAX_CONSECUTIVE_ERRORS = 5;
    /** Pause, in milliseconds, after an empty fetch or a failed leader lookup. */
    private static final long BACKOFF_MS = 1000L;
    /** Number of attempts made to locate a new leader. */
    private static final int LEADER_LOOKUP_ATTEMPTS = 3;

    /** Replica hosts of the most recently resolved partition. */
    private final List<String> m_replicaBrokers;

    public ConsumerKafka() {
        m_replicaBrokers = new ArrayList<String>();
    }

    /**
     * Consumes messages from {@code topic}/{@code partition}, starting at the
     * earliest available offset, and prints each one as {@code offset:payload}.
     *
     * <p>{@code maxReads} is the number of messages to process before the
     * method returns. The loop only ends when that many messages have been
     * read, when more than {@value #MAX_CONSECUTIVE_ERRORS} consecutive
     * fetches fail, or when the thread is interrupted. If the partition holds
     * fewer than {@code maxReads} messages, the method therefore keeps
     * polling (one fetch per second) until more arrive.
     *
     * @param maxReads    number of messages to process before returning
     * @param topic       topic to read from
     * @param partition   partition id within the topic
     * @param seedBrokers broker hosts used to discover the partition leader
     * @param port        broker port, used for every broker
     * @throws Exception if no new leader can be found after a fetch error, or
     *                   if a broker call fails
     */
    public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
        PartitionMetadata partitionMetaData = findLeader(seedBrokers, port, topic, partition);
        if (partitionMetaData == null) {
            System.out.println("Metadata not found");
            return;
        }
        if (partitionMetaData.leader() == null) {
            System.out.println("Leader Not Found");
            return;
        }
        String leadBroker = partitionMetaData.leader().host();
        String clientName = "Client_" + topic + "_" + partition;
        SimpleConsumer simpleConsumer =
                new SimpleConsumer(leadBroker, port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
        try {
            long readOffset = getLastOffset(simpleConsumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
            int numErrors = 0;
            while (maxReads > 0) {
                // The consumer is dropped after a fetch error; reconnect to the current leader.
                if (simpleConsumer == null) {
                    simpleConsumer =
                            new SimpleConsumer(leadBroker, port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
                }
                FetchRequest fetchRequest = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partition, readOffset, FETCH_SIZE_BYTES)
                        .build();
                FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
                if (fetchResponse.hasError()) {
                    numErrors++;
                    short code = fetchResponse.errorCode(topic, partition);
                    if (numErrors > MAX_CONSECUTIVE_ERRORS) {
                        break;
                    }
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // The requested offset is not valid on the broker; restart from the latest one.
                        readOffset = getLastOffset(simpleConsumer, topic, partition, OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    // Any other error: drop the connection and look for a (possibly new) leader.
                    simpleConsumer.close();
                    simpleConsumer = null;
                    leadBroker = findNewLeader(leadBroker, topic, partition, port);
                    continue;
                }
                numErrors = 0;
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                    // A fetched batch may hold more messages than are still wanted; honor the limit.
                    if (maxReads <= 0) {
                        break;
                    }
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        // The batch can start before the requested offset; skip what was already seen.
                        System.out.println("Found Old Offset:" + currentOffset + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ":" + new String(bytes, "UTF-8"));
                    numRead++;
                    maxReads--;
                }
                if (numRead == 0) {
                    // Nothing new on the partition: back off before polling again.
                    try {
                        Thread.sleep(BACKOFF_MS);
                    } catch (InterruptedException e) {
                        System.out.println("Error:" + e);
                        // Keep the interrupt visible to the caller and stop consuming.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        } finally {
            // Release the broker connection on every exit path.
            if (simpleConsumer != null) {
                simpleConsumer.close();
            }
        }
    }

    /**
     * Asks the broker for one offset of {@code topic}/{@code partition}
     * relative to {@code whichTime}.
     *
     * @param consumer   open connection to the broker to query
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  time marker for the lookup; the callers in this class
     *                   pass {@code OffsetRequest.EarliestTime()} or
     *                   {@code OffsetRequest.LatestTime()}
     * @param clientName client id sent with the request
     * @return the first offset returned by the broker, or {@code 0} when the
     *         response reports an error or contains no offsets
     */
    public long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request at most one offset.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest offsetRequest =
                new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
        if (offsetResponse.hasError()) {
            System.out.println("Error feching data oddset Data the broker reaseon:" + offsetResponse.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = offsetResponse.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            // Treated like the error case above instead of failing on offsets[0].
            System.out.println("No offsets returned by the broker for [" + topic + ", " + partition + "]");
            return 0;
        }
        return offsets[0];
    }

    /**
     * Looks up the leader of {@code topic}/{@code partition} among the
     * remembered replica brokers, trying up to
     * {@value #LEADER_LOOKUP_ATTEMPTS} times with a one-second pause between
     * attempts.
     *
     * @param oldLeader host of the leader that just failed
     * @param topic     topic name
     * @param partition partition id
     * @param port      broker port
     * @return host of the leader to use next
     * @throws Exception if no leader is found within the allowed attempts
     */
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < LEADER_LOOKUP_ATTEMPTS; i++) {
            PartitionMetadata metaData = findLeader(m_replicaBrokers, port, topic, partition);
            boolean leaderKnown = metaData != null && metaData.leader() != null;
            // On the first attempt an unchanged leader is not accepted; presumably this
            // gives the cluster time to finish electing a replacement.
            boolean unchangedOnFirstTry = leaderKnown
                    && i == 0
                    && oldLeader.equalsIgnoreCase(metaData.leader().host());
            if (leaderKnown && !unchangedOnFirstTry) {
                return metaData.leader().host();
            }
            try {
                Thread.sleep(BACKOFF_MS);
            } catch (InterruptedException e) {
                System.out.println("Error:" + e);
                // Keep the interrupt visible to the caller and stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("Unable to find new Leader after broker failure.Exiting");
        throw new Exception("Unable to find new Leader after broker failure.Exiting.");
    }

    /**
     * Queries each seed broker in turn for the metadata of
     * {@code topic}/{@code partition} and stops at the first broker that
     * knows the partition. On success the remembered replica hosts are
     * replaced with that partition's replicas.
     *
     * @param seedBrokers broker hosts to query
     * @param port        broker port
     * @param topic       topic name
     * @param partition   partition id
     * @return the partition metadata, or {@code null} if no seed broker
     *         returned it
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMetadata = null;
        seeds:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, "id7");
                List<String> topicsList = Collections.singletonList(topic);
                TopicMetadataRequest request = new TopicMetadataRequest(topicsList);
                TopicMetadataResponse response = consumer.send(request);
                for (TopicMetadata item : response.topicsMetadata()) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetadata = part;
                            break seeds;
                        }
                    }
                }
            } catch (Exception e) {
                // Best effort: a failing seed is logged and the next one is tried.
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for[" + topic + ", " + partition + "]Reason:" + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        // Updated only after the loop: seedBrokers may be m_replicaBrokers itself, and
        // the metadata is null when no seed knew the partition.
        if (returnMetadata != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetadata.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetadata;
    }
}
1条答案
按热度按时间0dxa2lsx1#
我不确定您的SimpleConsumer中的具体问题出在哪里,但就您的需求而言,使用由spout(本例中为Kafka spout)和bolt组成的合适拓扑会更为合适。