Kafka consumer does not consume when called from a step definition

Posted by zed5wv10 on 2021-06-07 in Kafka

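I have written a Kafka consumer program that works very well and consumes all messages when I run it on its own. However, when the program is invoked as a method from a step definition, no messages are consumed.
Here is my code: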

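import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerKafka {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "serv1,serv2,serv3");
        // Producer-only setting; the consumer ignores it and logs a warning.
        props.put("acks", "all");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put("schema.registry.url", "url");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");

        Consumer<String, Object> consumer = null;

        try {
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic"));

            // Single poll that waits up to 5 seconds for records.
            ConsumerRecords<String, Object> records = consumer.poll(5000);
            for (ConsumerRecord<String, Object> record : records) {
                Map<String, Object> data = new HashMap<>();
                data.put("partition", record.partition());
                data.put("offset", record.offset());
                data.put("value", record.value());
                System.out.println(data);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Guard against a NullPointerException if the constructor threw.
            if (consumer != null) {
                consumer.close();
            }
        }

    }

}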

Here is my Cucumber step definition:

@Given("^Consume the new kafka messages published$")
public static void consumer() throws InterruptedException, Exception
{
    ConsumerKafka.main(null);
}
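
The consumer above calls poll once, so a run that is still joining the group or hitting fetch errors during that one call returns nothing. One way a test step could wait for messages is to poll repeatedly until a deadline. The following is only a minimal sketch of that loop, not a confirmed fix for this question. The names PollUntilDeadline and pollUntil and the fake poll source are hypothetical; in a real step the supplier would wrap a call such as consumer.poll(500) and copy the record values into a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative helper (hypothetical): polls repeatedly until a deadline. */
final class PollUntilDeadline {

    /**
     * Calls pollOnce until at least minRecords items were collected or
     * timeoutMs elapsed. pollOnce stands in for a blocking call such as
     * consumer.poll(500); it is an assumption, not part of the question.
     */
    static <T> List<T> pollUntil(Supplier<List<T>> pollOnce, int minRecords, long timeoutMs) {
        List<T> collected = new ArrayList<>();
        long start = System.nanoTime();
        long timeoutNanos = timeoutMs * 1_000_000L;
        while (collected.size() < minRecords && System.nanoTime() - start < timeoutNanos) {
            collected.addAll(pollOnce.get());
        }
        return collected;
    }

    public static void main(String[] args) {
        // Fake poll source: the first two polls are empty, the third returns data.
        int[] calls = {0};
        Supplier<List<String>> fakePoll = () -> {
            calls[0]++;
            return calls[0] < 3
                ? Collections.<String>emptyList()
                : Arrays.asList("msg-1", "msg-2");
        };
        List<String> records = pollUntil(fakePoll, 1, 2000);
        System.out.println(records + " after " + calls[0] + " polls");
        // prints "[msg-1, msg-2] after 3 polls"
    }
}
```

The loop stops as soon as enough records have arrived, so a step that expects a known number of messages does not wait for the full timeout.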

Here is the log4j output when it is invoked by Cucumber:

INFO main org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [serv1:9092, serv2:9092, serv3:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = 
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.KafkaAvroDeserializer
group.id = test-consumer-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

DEBUG main org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
DEBUG main org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [serv1:9092 (id: -2 rack: null), serv2:9092 (id: -1 rack: null), serv3:9092 (id: -3 rack: null)], partitions = [])
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
INFO main org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [serv1:9092, serv2:9092, serv3:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.KafkaAvroDeserializer
group.id = test-consumer-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
INFO main io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values:
schema.registry.url = [http://url]
max.schemas.per.subject = 1000
specific.avro.reader = false

DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
WARN main org.apache.kafka.clients.consumer.ConsumerConfig - The configuration acks = all was supplied but isn't a known config.
INFO main org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
INFO main org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
DEBUG main org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
DEBUG main org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): test_topic
DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending coordinator request for group test-consumer-group to broker serv2:9092 (id: -3 rack: null)
DEBUG main org.apache.kafka.clients.NetworkClient - Initiating connection to node -3 at serv2:9092.
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-sent
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-received
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node--3.latency
DEBUG main org.apache.kafka.clients.NetworkClient - Completed connection to node -3
DEBUG main org.apache.kafka.clients.NetworkClient - Sending metadata request {topics=[test_topic]} to node -3
DEBUG main org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [serv1:9092 (id: 2 rack: null), serv2:9092 (id: 3 rack: null), serv3:9092 (id: 1 rack: null)], partitions = [Partition(topic = test_topic, partition = 4, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test_topic, partition = 5, leader = 2, replicas = [2,], isr = [2,], Partition(topic = test_topic, partition = 6, leader = 3, replicas = [3,], isr = [3,], Partition(topic = test_topic, partition = 7, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test_topic, partition = 0, leader = 3, replicas = [3,], isr = [3,], Partition(topic = test_topic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test_topic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = test_topic, partition = 3, leader = 3, replicas = [3,], isr = [3,]])
DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1522941657152, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@21362712, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=test-consumer-group}), createdTimeMs=1522941656943, sendTimeMs=1522941657054), responseBody={error_code=0,coordinator={node_id=1,host=serv3,port=9092}})
INFO main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator serv3:9092 (id: 2147483646 rack: null) for group test-consumer-group.
DEBUG main org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147483646 at serv3:9092.
INFO main org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group test-consumer-group
INFO main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group test-consumer-group
DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=test-consumer-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=58 cap=58]}]}) to coordinator serv3:9092 (id: 2147483646 rack: null)
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483646.bytes-sent
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483646.bytes-received
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483646.latency
DEBUG main org.apache.kafka.clients.NetworkClient - Completed connection to node 2147483646
DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful join group response for group test-consumer-group: {error_code=0,generation_id=182,group_protocol=range,leader_id=consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2,member_id=consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2,members=[{member_id=consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=58 cap=58]}]}
DEBUG main org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group test-consumer-group using strategy range with subscriptions {consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2=Subscription(topics=[test_topic])}
DEBUG main org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Finished assignment for group test-consumer-group: {consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2=Assignment(partitions=[test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5, test_topic-6, test_topic-7])}
DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending leader SyncGroup for group test-consumer-group to coordinator serv3:9092 (id: 2147483646 rack: null): {group_id=test-consumer-group,generation_id=182,member_id=consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2,group_assignment=[{member_id=consumer-1-b4f31749-b0a7-460c-88b7-f62f41ed0fd2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=94 cap=94]}]}
INFO main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group test-consumer-group with generation 182
INFO main org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test_topic-4, test_topic-5, test_topic-6, test_topic-7, test_topic-0, test_topic-1, test_topic-2, test_topic-3] for group test-consumer-group
DEBUG main org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group test-consumer-group fetching committed offsets for partitions: [test_topic-4, test_topic-5, test_topic-6, test_topic-7, test_topic-0, test_topic-1, test_topic-2, test_topic-3]
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-4 to the committed offset 33
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-5 to the committed offset 32
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-6 to the committed offset 43
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-7 to the committed offset 31
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-0 to the committed offset 103
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-1 to the committed offset 29
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-2 to the committed offset 35
DEBUG main org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-3 to the committed offset 29
DEBUG main org.apache.kafka.clients.NetworkClient - Initiating connection to node 3 at serv2:9092.
DEBUG main org.apache.kafka.clients.NetworkClient - Initiating connection to node 1 at serv3:9092.
DEBUG main org.apache.kafka.clients.NetworkClient - Initiating connection to node 2 at serv1:9092.
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-sent
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.bytes-received
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-3.latency
DEBUG main org.apache.kafka.clients.NetworkClient - Completed connection to node 3
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1.bytes-sent
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1.bytes-received
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1.latency
DEBUG main org.apache.kafka.clients.NetworkClient - Completed connection to node 1
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2.bytes-sent
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2.bytes-received
DEBUG main org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2.latency
DEBUG main org.apache.kafka.clients.NetworkClient - Completed connection to node 2
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-4
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-7
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-1
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-5
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-2
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-4
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-7
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-1
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-5
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-2
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-4
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-7
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-1
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-5
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-2
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-4
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-7
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-1
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-5
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-2
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-4
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-7
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-1
WARN main org.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition test_topic-5
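
The fetch warnings in the log name only partitions 1, 2, 4, 5 and 7. According to the cluster metadata line in the same log, those partitions have leaders 1 and 2, while partitions 0, 3 and 6 (leader 3) never appear in a warning. This is only an observation from the excerpt, not a diagnosis. A small sketch for tallying such warnings per partition is shown below; the class name FetchWarningTally and the method countByPartition are hypothetical, and the sample input is the first six warning lines from the log above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative helper (hypothetical): counts fetch warnings per partition. */
final class FetchWarningTally {

    // Matches the Fetcher warning text and captures topic name and partition number.
    private static final Pattern FETCH_WARN =
        Pattern.compile("Unknown error fetching data for topic-partition (\\S+)-(\\d+)");

    /** Returns a map from partition number to the number of warnings seen. */
    static Map<Integer, Integer> countByPartition(String log) {
        Map<Integer, Integer> counts = new TreeMap<>();
        Matcher m = FETCH_WARN.matcher(log);
        while (m.find()) {
            counts.merge(Integer.parseInt(m.group(2)), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String prefix = "WARN main org.apache.kafka.clients.consumer.internals.Fetcher - "
            + "Unknown error fetching data for topic-partition ";
        // First six warning lines from the log excerpt.
        String sample = String.join("\n",
            prefix + "test_topic-4",
            prefix + "test_topic-7",
            prefix + "test_topic-1",
            prefix + "test_topic-5",
            prefix + "test_topic-2",
            prefix + "test_topic-4");
        System.out.println(countByPartition(sample));
        // prints {1=1, 2=1, 4=2, 5=1, 7=1}
    }
}
```

Comparing the resulting partition numbers with the leader assignments in the metadata line shows which brokers the failing fetches were sent to.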
