如何使用带有kerberos的kafka的spark流?

mpbci0fu  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(501)

我在使用kerberized hadoop集群中的spark流应用程序使用kafka的消息时遇到了一些问题。我尝试了以下两种方法:
基于接收器的方法: KafkaUtils.createStream 直接进近(无接收器): KafkaUtils.createDirectStream 基于接收机的方法( KafkaUtils.createStream )抛出两种类型的异常(不同的异常,无论我处于本地模式( --master local[*] )或Yarn模式( --master yarn --deploy-mode client ) :
奇怪的 kafka.common.BrokerEndPointNotAvailableException 在spark本地应用程序中
YarnSpark应用程序中的缩放器超时。我曾经设法使这项工作(连接到zookeeper成功),但没有收到任何消息
在两种模式(本地或Yarn)中,直接方法( KafkaUtils.createDirectStream )返回无法解释的 EOFException (详见下文)。
我的最终目标是在yarn上启动spark流媒体工作,所以我将spark本地工作放在一边。
这是我的测试环境:
cloudera cdh 5.7.0版
Spark1.6.0
Kafka0.10.1.0
我正在处理单节点群集(主机名= quickstart.cloudera )用于测试目的。对于那些有兴趣复制测试的人,我正在开发一个基于 cloudera/quickstart (Git回购)。
下面是我在一个应用程序中使用的示例代码 spark-shell . 当然,这段代码在未启用kerberos时可以工作:消息由 kafka-console-producer 由spark应用程序接收。

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))

val topics = Map("test-kafka" -> 1)

def readFromKafkaReceiver(): Unit = {
    val kafkaParams = Map(
        "zookeeper.connect" -> "quickstart.cloudera:2181",
        "group.id" -> "gid1",
        "client.id" -> "cid1",
        "zookeeper.session.timeout.ms" -> "5000",
        "zookeeper.connection.timeout.ms" -> "5000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
    stream.print
}

def readFromKafkaDirectStream(): Unit = {
    val kafkaDirectParams = Map(
        "bootstrap.servers" -> "quickstart.cloudera:9092",
        "group.id" -> "gid1",
        "client.id" -> "cid1"
    )

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
    directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()

ssc.start

Thread.sleep(20000)

ssc.stop(stopSparkContext = false, stopGracefully = true)

启用kerberos后,此代码不起作用。我遵循以下指南:配置kafka安全性,并创建了两个配置文件: jaas.conf :

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
``` `client.properties` :

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

我可以通过以下方式生成消息:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer
--broker-list quickstart.cloudera:9092
--topic test-kafka
--producer.config client.properties

但我不能从spark流应用程序中使用这些消息。发射 `spark-shell` 在 `yarn-client` 模式,我刚刚创建了一个新的jaas配置( `jaas_with_zk_yarn.conf` ),带有zookeeper部分( `Client` ),并且对keytab的引用仅为文件名(然后传递keytab) `--keytab` 选项):

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

此新文件已传入 `--files` 选项:

spark-shell --master yarn --deploy-mode client
--num-executors 2
--files /home/simpleuser/jaas_with_zk_yarn.conf
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf"
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf"
--keytab /home/simpleuser/simpleuser.keytab
--principal simpleuser

我使用了与前面相同的代码,只是添加了另外两个kafka参数,对应于 `consumer.properties` 文件:

"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"
``` readFromKafkaReceiver() 启动spark流式处理上下文后引发以下错误(无法连接到zookeeper):

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

有时会建立到zk的连接(未达到超时),但随后不会收到任何消息。 readFromKafkaDirectStream() 调用此方法后立即抛出以下错误:

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)

没有更多的解释,只是 EOFException . 我想spark和Kafka经纪人之间存在沟通问题,但没有更多的解释。我也试过了 metadata.broker.list 而不是 bootstrap.servers ,但没有成功。
也许我在jaas配置文件或kafka参数中遗漏了什么?也许是Spark的选择( extraJavaOptions )是无效的吗?我尝试了太多的可能性,我有点迷路了。
我会很高兴,如果有人能帮我解决至少一个这些问题(直接方法或接收器为基础)。谢谢:)

t3psigkw

t3psigkw1#

如cloudera文档中所述,spark 1.6不支持它:
spark streaming在开始使用kafka 0.9 consumer api之前无法从secure kafka使用
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api
spark streaming在1.6中使用了旧的consumer api,不支持安全消费。
您可以使用支持安全Kafka的spark 2.1:https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/

相关问题