试图阅读Kafka的资料。我想从收到的消息中提取时间戳来进行结构化的spark流。Kafka(版本0.10.0.0)spark streaming(版本2.0.1)
lf5gs5x21#
我建议两件事:假设您通过最新的kafka流api(0.10 kafka)创建流e、 g.使用依赖关系: "org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1 然后根据上面的文档创建流:
"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "broker1:9092,broker2:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[ByteArrayDeserializer], "group.id" -> "spark-streaming-test", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val sparkConf = new SparkConf() // suppose you have 60 second window val ssc = new StreamingContext(sparkConf, Seconds(60)) ssc.checkpoint("checkpoint") val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
您的流将是consumerrecord[string,array[byte]]的数据流,您可以获得时间戳和键值,如下所示:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
希望有帮助。
yzckvree2#
spark.read .format("kafka") .option("kafka.bootstrap.servers", "your.server.com:9092") .option("subscribe", "your-topic") .load() .select($"timestamp", $"value")
字段“timestamp”是您要查找的内容。键入-java.sql.timestamp。确保您正在连接到0.10 kafka服务器。早期版本中没有时间戳。此处描述的字段的完整列表-http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-querys查询
2条答案
按热度按时间lf5gs5x21#
我建议两件事:
假设您通过最新的kafka流api(0.10 kafka)创建流
e、 g.使用依赖关系:
"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
然后根据上面的文档创建流:您的流将是consumerrecord[string,array[byte]]的数据流,您可以获得时间戳和键值,如下所示:
希望有帮助。
yzckvree2#
字段“timestamp”是您要查找的内容。键入-java.sql.timestamp。确保您正在连接到0.10 kafka服务器。早期版本中没有时间戳。此处描述的字段的完整列表-http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-querys查询