无法将消息发布到消息中心

xe55xuns  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(650)
// Asynchronous response from Message Hub / Kafka.
kafkaProducer.send(record,
new Callback() {
   public void onCompletion(RecordMetadata m, Exception e) {
       if(e != null) {
       e.printStackTrace();
       } else { 
    log.debug("****Message sent, offset: " + m.offset() + 
    " @ partition " + m.partition());
    log.debug(" <<<< " +
    " document_id " + key +
    " @ " + account.getActivityId());
    }
   }
});

当尝试使用上述代码将消息发布到消息中心时,我们总是会遇到以下错误。
2016-06-21 18:38:22-[info]com.ibm.cloudant.streaming.messagehub.client.send(476):>>>>发送文档\u id julia30@my\u database 2016-06-21 18:38:22-[debug]org.apache.kafka.clients.networkclient$defaultmetadataupdater.maybeupdate(623):初始化到节点-1的连接以发送元数据请求2016-06-21 18:38:22-[debug]org.apache.kafka.clients.networkclient.initiateconnect(487):启动到kafka01-prod01.messagehub.services.us-south.bluemix节点-1的连接。net:9093. 2016-06-21 18:38:22-[调试]org.apache.kafka.common.security.authenticator.saslclientauthenticator$1.运行(105):创建saslclient:client=multiuser-adapter@multiuser.messagehub.ibm.com;服务=Kafka;serviceostname=kafka01-prod01.messagehub.services.us-south.bluemix.net;mechs=[gssapi]2016-06-21 18:38:22-[debug]com.ibm.cloudant.streaming.messagehub.accountmanager.(53):进程id:15825 2016-06-21 18:38:22-[info]org.apache.kafka.common.network.saslchannelbuilder.buildchannel(92):由于org.apache.kafka.common.kafkaexception:未能在上配置saslclientauthenticatororg.apache.kafka.common.security.authenticator.saslclientauthenticator.configure(saslclientauthenticator。java:96)在org.apache.kafka.common.network.saslchannelbuilder.buildchannel(saslchannelbuilder。java:89)在org.apache.kafka.common.network.selector.connect(selector。java:162)在org.apache.kafka.clients.networkclient.initiateconnect(网络客户端。java:489)在org.apache.kafka.clients.networkclient.access$400(networkclient。java:47)在org.apache.kafka.clients.networkclient$defaultmetadataupdater.maybeupdate(networkclient。java:624)在org.apache.kafka.clients.networkclient$defaultmetadataupdater.maybeupdate(networkclient。java:543)在org.apache.kafka.clients.networkclient.poll(networkclient。java:254)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:216)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:128)在java.lang.thread.run(线程。java:745)原因:org.apache.kafka.common.kafkaexception:未能在org.apache.kafka.common.security.authenticator.saslclientauthenticator.createsaslclient(saslclientauthenticator)上创建saslclient。java:112)在org.apache.kafka.common.security.authenticator.saslclientauthenticator.configure(saslclientauthenticator。java:94) ... 还有10个原因:javax.security.sasl.saslexception:plain:必须在com.sun.security.sasl.plainclient.(plainclient)中指定授权id和密码。java:58)在com.sun.security.sasl.clientfactoryimpl.createsaslclient(clientfactoryimpl。java:97)在javax.security.sasl.sasl.createsaslclient(sasl。java:384)在com.ibm.messagehub.login.messagehubsalslclientfactory.createsaslclient(messagehubsalslclientfactory)。java:77)在javax.security.sasl.sasl.createsaslclient(sasl。java:384)在org.apache.kafka.common.security.authenticator.saslclientauthenticator$1.run(saslclientauthenticator。java:107)在org.apache.kafka.common.security.authenticator.saslclientauthenticator$1.run(saslclientauthenticator。java:102)位于javax.security.auth.subject.doas(subject)的java.security.accesscontroller.doprivileged(本机方法)。java:422)在org.apache.kafka.common.security.authenticator.saslclientauthenticator.createsaslclient(saslclientauthenticator。java:102) ... 11更多2016-06-21 18:38:22-[错误]org.apache.kafka.clients.producer.internals.sender.run(130):kafka producer i/o线程中未捕获的错误:org.apache.kafka.common.kafkaexception:org.apache.kafka.common.kafkaexception:未能在上配置saslclientauthenticatororg.apache.kafka.common.network.saslchannelbuilder.buildchannel(saslchannelbuilder)。java:93)在org.apache.kafka.common.network.selector.connect(selector。java:162)在org.apache.kafka.clients.networkclient.initiateconnect(networkclient。java:489)在org.apache.kafka.clients.networkclient.access$400(networkclient。java:47)在org.apache.kafka.clients.networkclient$defaultmetadataupdater.maybeupdate(networkclient。java:624)在org.apache.kafka.clients.networkclient$defaultmetadataupdater.maybeupdate(networkclient。java:543)在org.apache.kafka.clients.networkclient.poll(networkclient。java:254)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:216)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:128)在java.lang.thread.run(线程。java:745)原因:org.apache.kafka.common.kafkaexception:未能在org.apache.kafka.common.security.authenticator.saslclientauthenticator.configure(saslclientauthenticator)中配置saslclientauthenticator。java:96)在org.apache.kafka.common.network.saslchannelbuilder.buildchannel(saslchannelbuilder)。java:89) ... 9其他原因:org.apache.kafka.common.kafkaexception:未能在org.apache.kafka.common.security.authenticator.saslclientauthenticator.createsaslclient(saslclientauthenticator)上创建saslclient。java:112)在org.apache.kafka.common.security.authenticator.saslclientauthenticator.configure(saslclientauthenticator。java:94) ... 还有10个原因:javax.security.sasl.saslexception:plain:必须在com.sun.security.sasl.plainclient.(plainclient)中指定授权id和密码。java:58)在com.sun.security.sasl.clientfactoryimpl.createsaslclient(clientfactoryimpl。java:97)在javax.security.sasl.sasl.createsaslclient(sasl。java:384)在com.ibm.messagehub.login.messagehubsalslclientfactory.createsaslclient(messagehubsalslclientfactory)。java:77)在javax.security.sasl.sasl.createsaslclient(sasl。java:384)在org.apache.kafka.common.security.authenticator.saslclientauthenticator$1.run(saslclientauthenticator。java:107)在org.apache.kafka.common.security.authenticator.saslclientauthenticator$1.run(saslclientauthenticator。java:102)位于javax.security.auth.subject.doas(subject)的java.security.accesscontroller.doprivileged(本机方法)。java:422)在org.apache.kafka.common.security.authenticator.saslclientauthenticator.createsaslclient(saslclientauthenticator。java:102) ... 11更多2016-06-21 18:38:22-[错误]org.apache.kafka.clients.producer.internals.sender.run(130):kafka producer i/o线程中未捕获的错误:java.lang.nullpointerexception at org.apache.kafka.common.network.selector.poll(selector)。java:268)在org.apache.kafka.clients.networkclient.poll(networkclient。java:256)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:216)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:128)在java.lang.thread.run(线程。java:745)
producer的设置如下所示:

compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9093]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [hidden]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = kafka01-prod01.messagehub.services.us-south.bluemix.net%3A9093_8qp87X32V6PK5epv.1
ssl.endpoint.identification.algorithm = HTTPS
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
acks = -1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
retries = 1
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = /Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 0

使用上述设置,使用MessageHub rest api创建主题没有问题。尝试发布消息时出现问题。
任何想法都非常感谢。

nhjlsmyf

nhjlsmyf1#

结果发现,这个问题与身份验证完全无关,与密钥/值序列化有关。正在从更改

props.setProperty("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer")

props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

已解决javax.security.sasl.saslexception
奇怪的是,一旦您成功地使用了bytearrayserialization,您甚至可以切换回stringserialization,事情将继续工作。

czfnxgou

czfnxgou2#

messagehub rest客户机api以与javakafka客户机不同的方式进行身份验证。
我看到您的日志中存在身份验证错误: javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified at com.sun.security.sasl.PlainClient. 要为messagehub sasl身份验证配置java客户端,请参阅以下java示例:
https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl
请注意,生产者属性应包括以下内容:
https://github.com/ibm-messaging/message-hub-samples/blob/master/java/message-hub-kafka-ssl/resources/producer.properties
您的jaas.conf文件应该如下所示 KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName="kafka" username="your-username" password="your-password"; }; 并且必须在类路径中有messagehub登录jar。
嗯,江户

9wbgstp7

9wbgstp73#

我可以使用stringserializer运行消费者或生产者示例,而且从来没有必要进行切换。序列化程序不适用于sasl身份验证。

相关问题