我用spark结构化流式api制作spark-java客户机代码。这些代码从kafka中提取csv类型的字符串
SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();
Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092"))
.option("subscribe", "topicForMongoDB")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)");
df.show();
返回的结果是成功的。这些代码打印csv类型的字符串。
+--------------------+
| value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
然后我尝试将这些字符串转换为sparksql中的sparkDataframe。首先,下面的代码是javapojo类
public class EntityMongoDB implements Serializable {
private Date date;
private float value;
private String id;
private String title;
private String state;
private String frequency_short;
private String units_short;
private String seasonal_adjustment_short;
private static StructType structType = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("date", DataTypes.DateType, false),
DataTypes.createStructField("value", DataTypes.FloatType, false),
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("title", DataTypes.StringType, false),
DataTypes.createStructField("state", DataTypes.StringType, false),
DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
DataTypes.createStructField("units_short", DataTypes.StringType, false),
DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
});
public static StructType getStructType() {
return structType;
}
}
我编写代码将csv类型的字符串转换成Dataframe
Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
.as("entityMongoDB"))
.selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id",
"entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short",
"entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();
dfs.show();
dfs.printSchema();
打印的架构是正确的。
|-- date: date (nullable = true)
|-- value: float (nullable = true)
|-- id: string (nullable = true)
|-- title: string (nullable = true)
|-- state: string (nullable = true)
|-- frequency_short: string (nullable = true)
|-- units_short: string (nullable = true)
|-- seasonal_adjustment_short: string (nullable = true)
但是生成的列充满了空值
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value| id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null| null| null| null|
|null| null|null| null| null| null| null| null|
|null| null|null| null| null| null| null| null|
|null| null|null| null| null| null| null| null|
|null| null|null| null| null| null| null| null|
我认为dataframe的模式生成正确,但是提取数据部分有一些问题。任何答复都将感激不尽。致以最诚挚的问候
1条答案
按热度按时间yjghlzjz1#
你手上的弦
value
列不是有效的json,所以from_json
在这里不行。对于spark 3+,可以使用
from_csv
正如@mck在评论中指出的:对于3之前的spark版本,您可以
split
然后,逗号表示的值从结果数组转换为多列:另外,如果值中有列名,可以过滤掉该行。