如何将list< objects>的字节[]解码到spark中的dataset< row>?

hts6caw3  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(285)

我在项目中使用spark-sql-2.3.1v、kafka和java8。我正在尝试将主题接收字节[]转换为kafka消费端的数据集。
以下是细节
我有

class Company{
    String companyName;
    Integer companyId;
}

我定义为

public static final StructType companySchema = new StructType(
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

但消息定义为

class Message{
    private List<Company> companyList;
    private String messageId;
}

我试图定义为

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

我使用序列化将消息作为byte[]发送到kafka主题。
我在消费者处成功接收到消息byte[]。我正在尝试将其转换为数据集??怎么做?

Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

  messagesDs.printSchema();

  root
         |-- companyList: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- companyName: string (nullable = true)
         |    |    |-- companyId: integer (nullable = true)
         |-- messageId: string (nullable = true)    

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)

Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

获取错误:
线程“main”org.apache.spark.sql.analysisexception中出现异常:无法解析' companyName '给定输入列:[col];
如何获取数据集记录,如何获取?

kx1ctssn

kx1ctssn1#

你的结构在爆炸时被命名为“col”。
因为您的bean类没有“col”属性,所以它会失败并出现上述错误。
线程“main”org.apache.spark.sql.analysisexception中出现异常:无法解析给定输入列的“companyname”:[col];
您可以执行以下选择操作,以将相关列作为普通列:如下所示:

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));

我还没有测试过语法,但一旦从struct中为每一行获得普通列,就必须执行下一步。

相关问题