我已经建立了一个实验性的kafka环境,其中有3个代理,一个主题有3个分区,我有一个生产者和一个消费者。我想为特定使用者修改分区的偏移量。我在kafka文档中读到,kafka中的consumercommit/fetchapi可以提交特定的偏移量或获取consumer读取的最新偏移量。以下是api的链接:
https://cwiki.apache.org/confluence/display/kafka/a+guide+to+the+kafka+protocol#aguidetothekafkaprotocol-offsetcommit/fetchapi
我使用了下面页面中的代码来编写代码,以便从特定使用者获取偏移量。但是,fetch api为请求的偏移量返回“-1”的值。下面是示例代码:
https://cwiki.apache.org/confluence/display/kafka/committing+and+fetching+consumer+offsets+in+kafka
我还在第一个链接中读到“如果在该使用者组下没有与主题分区相关联的偏移量,那么代理不会设置错误代码(因为它实际上不是一个错误),而是返回空元数据并将偏移量字段设置为-1。”
但是,我已经生成了一些消息,我的消费者已经消费了这些消息,并输出了每条已读消息的偏移量。
如果有人能帮上忙,我会非常感激的。我想知道我的代码哪一部分是错的。或者api有问题。请不要犹豫,提出任何有用的意见。我的代码与我提供的链接中的代码完全相同。不过,如果你需要看我的代码,请告诉我把它放在这里。
Kafka版本是0.10.2.0
我的Kafka的配置是:
代理1:端口9093
代理2:端口9094
代理3:端口9095
主题:“testpic3”
......................
使用者配置:
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
................
这是我的密码:
public class KafkaOffsetManage {
public static void main(String[] args) {
BlockingChannel channel = new BlockingChannel("localhost", 9095,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "test";
final String MY_CLIENTID = "MyConsumer";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
System.out.println("+++++++++++++++++++++++++++");
System.out.println(metadataResponse.errorCode());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
System.out.println("Connected to Offset Manager");
System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port());
} else {
// retry (after backoff)
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
//partitions.add(testPartition1);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
MY_GROUP,
partitions,
(short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
correlationId,
MY_CLIENTID);
try {
channel.send(fetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
channel.disconnect();
// Go to step 1 and retry the offset fetch
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
// retry the offset fetch (after backoff)
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
System.out.println(retrievedMetadata);
System.out.println(result.toString());
}
}
catch (Exception e) {
channel.disconnect();
// Go to step 1 and then retry offset fetch after backoff
}
}
}
代码输出如下:
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
一件奇怪的事是关于Kafka的依赖性。添加此依赖项时,代码无法识别程序中的某些类:
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
“consumermetadatarequest”和“consumermetadataresponse”类无法识别。
所以我添加了这个依赖项:
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
谢谢您,
2条答案
按热度按时间w1e3prcc1#
这种情况的发生是因为补偿过期。Kafka有两个参数控制着这种行为。首先是“\u consumer\u offset”主题的“retention.ms”设置。它应该等于-1以禁用该主题内记录的过期。我假设使用Kafka1.1.x版。使用以下命令检查主题配置:
如果不符合配置设置,请使用以下命令进行更改:
假设设置了保留策略,接下来需要检查主题中是否有任何提交的消息。默认情况下,Kafka不允许阅读内部主题。要更改此行为,请使用使用者设置创建一个文件:
之后,使用以下命令阅读“\u consumer\u offsets”主题:
如果主题中有内容,则输出将如下所示:
这里expirationtime值是有意义的。组协调器将在加载偏移量时只读取未过期的记录,即now()<expirationtime,这些值将返回到客户端的偏移量获取请求。
expirationtime是在客户端提交偏移量时使用以下公式计算的:
offsets.retention.minutes是代理级别的设置,默认情况下等于1440(24小时)。从命令输出中解码committime和expirationtime,我们看到
正好是24小时。
因此,不正确偏移量问题的解决方案是增加“offsets.retention.minutes”设置,记住这会影响代理内存的使用,即当系统中有大量死亡使用者组时,还会定期提交未更改的偏移量以增加过期时间。
5fjcxozz2#
我想你已经加了
作为你的依靠。这就是Kafka本身。消费/制作Kafka0.10.2所需的是:
用于消费(和操作给定消费者的偏移量)使用类
KafkaConsumer
它有详细的javadoc,比kafka中提交和获取消费者补偿更方便。除此之外,如果仍要使用链接的示例中的代码,则可能存在以下问题:
您只添加了一个分区,并且这个分区上可能没有消息(您有3个分区,所以您发送的消息可能在其他两个分区上)。在Kafka中,每个分区是独立的,每个分区的用户组有不同的偏移量。