我正在尝试使用jsr223采样器在jmeter中使用kafka消费者来读取kafka消息。我无法理解这个错误
[响应消息:javax.script.scriptexception:javax.script.scriptexception:java.lang.classcastexception:[ljava.lang.string;无法转换为java.util.list]
请帮助我解决这个问题,以便我可以订阅和消费使用Kafka消费者的消息。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Properties props = new Properties();
String groupID = "REQUEST_RESPONSE_JOB_GROUP";
String clientID = "REQUEST_RESPONSE_JOB_CLIENT";
String BSID = "kafka:9092";
String topic = "PROC_REST_EVENTS";
props.put("bootstrap.servers", BSID);
props.put("group.id", groupID);
props.put("client.id", clientID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topic));
//print the topic name
System.out.println("Subscribed to topic " + topic);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
return records;
}
1条答案
按热度按时间z9zf31ra1#
很可能您从kafka主题中得到一个列表,而您的使用者需要一个字符串,您需要修改使用者配置以匹配来自该主题的类型。
尝试下面的groovy代码,它向
test
主题(如果它不存在,您将需要创建它)并在此之后阅读它们。您应该看到如下输出:
一旦完成,您应该能够使用上面的代码作为您自己的Kafka消息消费测试的基础。有关使用jmeter进行kafka负载测试的更多信息,请参阅apachekafka-how to load test with jmeter文章。