I am trying to use the class ZKStringSerializer, and this is the import I have:
import kafka.utils.ZKStringSerializer
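According to the entire internet, and even to my own code before I restarted my computer, this should allow my code to work. However, I now get a very confusing compile error: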
object ZKStringSerializer in package utils cannot be accessed in package kafka.utils
This is confusing because this file should not be in any package, and I have not specified a package anywhere. Here is my code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.ZkConnection
import java.util.Properties
import org.apache.kafka.clients.admin
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object SpeedTester {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("SpeedTester")
      .config("spark.driver.memory", "8g")
      .getOrCreate()
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
    import spark.implicits._

    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    val zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)

    val topicName = "testTopic"
    val numPartitions = 8
    val replicationFactor = 1
    val topicConfig = new Properties
    val isSecureKafkaCluster = false
    val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster)
    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)

    // Create producer for topic testTopic and actually push values to the topic
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9592")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val TOPIC = "testTopic"
    for (i <- 1 to 50) {
      val record = new ProducerRecord(TOPIC, "key", s"hello $i")
      producer.send(record)
    }
    val record = new ProducerRecord(TOPIC, "key", "the end" + new java.util.Date)
    producer.send(record)
    producer.flush()
    producer.close()
  }
}
1 Answer
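I know this is very late, but for anyone else who runs into the same problem:
In recent versions of Kafka, the ZooKeeper helpers in kafka.utils (such as ZkUtils and ZKStringSerializer) are deprecated, so use the Kafka admin client API to create the topic instead.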
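As a rough sketch of what that could look like, the topic creation from the question can be done with `AdminClient` from the `kafka-clients` library (available from Kafka 0.11.0 onward), which talks to the brokers directly and needs neither `ZkClient` nor `ZkUtils`. The object name `CreateTopicSketch`, the helper `adminProps`, and the broker address `localhost:9092` are assumptions made for illustration; the topic name, partition count, and replication factor are taken from the question.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicSketch {
  // Hypothetical helper: builds the admin client configuration.
  // The caller supplies the broker address (not the ZooKeeper address).
  def adminProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a broker is listening on localhost:9092; adjust as needed.
    val admin = AdminClient.create(adminProps("localhost:9092"))
    try {
      // Same topic settings as in the question: 8 partitions, replication factor 1.
      val topic = new NewTopic("testTopic", 8, 1.toShort)
      // createTopics is asynchronous; all().get() blocks until the broker responds
      // and throws if the request fails (for example, if the topic already exists).
      admin.createTopics(Collections.singletonList(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}
```

With this approach the `kafka.utils.ZKStringSerializer`, `kafka.utils.ZkUtils`, `kafka.admin.AdminUtils`, and `org.I0Itec.zkclient` imports are no longer needed; the producer part of the question's code can stay as it is.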