java.lang.IllegalArgumentException: Field "order_total" does not exist

7z5jn7bk · asked 2021-06-14 · in Cassandra

A Java Spark DataFrame insert into a Cassandra UDT column fails with the error below.
How can I construct this UDT column in the DataFrame select, or by some other means? Thanks in advance for your input.

java.lang.IllegalArgumentException: Field "order_total" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)

Spark version 2.0.2

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import static org.apache.spark.sql.functions.*;

public class testpoc {

  public static String host = "$$";
  public static String port = "$$";
  public static String username = "$$";
  public static String password = "$$";
  public static String keyspace = "$$";
  public static String tablename = "$$";
  public static String filePath = "$/Orders_00000.csv";

  public static StructType overall_total_udt = new StructType()
         .add("order_total", DataTypes.StringType, false);

  public static StructType orderType = new StructType()
         .add("id", DataTypes.StringType, false)
         .add("order_date", DataTypes.TimestampType, true)
         .add("totals",  overall_total_udt, true);

  public static void main (String[] args) throws IOException {

      SparkConf sparkConf = new SparkConf();
      if (sparkConf.getOption("spark.master").isEmpty()) { sparkConf.setMaster("local[*]"); }
      sparkConf.set("spark.cassandra.connection.host", host);
      sparkConf.set("spark.cassandra.connection.port", port);
      sparkConf.set("spark.cassandra.auth.username",username);
      sparkConf.set("spark.cassandra.auth.password", password);
      sparkConf.set("spark.cassandra.connection.ssl.enabled", "true");

      //Spark Session
      SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

      Dataset<Row> DF1 = spark
              .read().format("csv")
              .option("header", "true")
              .option("delimiter", "\t")
              .load(filePath);

      DF1.show(false);

      Dataset<Row>  DF2 = DF1.select(
                col("ORDER_NO").as("id"),
                unix_timestamp(col("ORDER_DATE"), "yyyy-MM-dd HH:mm:ss").cast("timestamp").as("order_date"),
                struct(col("TOTAL_AMOUNT")).as("totals")
        );
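      // Note: struct(col("TOTAL_AMOUNT")) names the nested field "TOTAL_AMOUNT",
      // not "order_total". The connector resolves UDT fields by name, which is
      // the fieldIndex lookup that fails in the stack trace below.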

      DF2 = DF2.map((MapFunction<Row,Row>) row -> row, RowEncoder.apply(orderType));
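      // The identity map re-encodes DF2 against the declared orderType schema,
      // yet per the stack trace the connector still cannot find "order_total"
      // when converting the nested struct to the UDT.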
      DF2.show(false);

      DF2.write()
         .format("org.apache.spark.sql.cassandra")
         .option("keyspace", keyspace)
         .option("table", tablename)
         .mode(SaveMode.Append)
         .save();

/*

**Cassandra table structure**

CREATE TYPE myspace.overall_total_udt (order_total text);

CREATE TABLE myspace.jorder (
id text, 
order_date timestamp, 
totals frozen<myspace.overall_total_udt>,
PRIMARY KEY (id)
) ;

*/

      spark.stop();
    }

}
The full stack trace:

java.lang.IllegalArgumentException: Field "order_total" does not exist.
    at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)
    at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:253)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1$$anonfun$3.apply(UserDefinedType.scala:53)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1$$anonfun$3.apply(UserDefinedType.scala:50)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:50)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(UserDefinedType.scala:36)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:54)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:36)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
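
The failing call in the trace is StructType.fieldIndex("order_total") on the schema of the nested struct. A minimal standalone reproduction of just that lookup (the class name FieldIndexRepro is only for illustration):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FieldIndexRepro {
    public static void main(String[] args) {
        // Schema produced by struct(col("TOTAL_AMOUNT")): a single field named TOTAL_AMOUNT
        StructType totals = new StructType()
                .add("TOTAL_AMOUNT", DataTypes.StringType, true);
        // The connector's UDT converter looks the field up by the UDT's field name:
        totals.fieldIndex("order_total"); // throws IllegalArgumentException: Field "order_total" does not exist.
    }
}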

I referred to this answer, but still couldn't get it to work: https://stackoverflow.com/a/51651345/2636642

k3bvogb1 #1

The DataStax team provided this solution: build the nested column with NAMED_STRUCT, so that the struct field carries the UDT's field name, order_total.

Dataset<Row> DF3 = DF2.selectExpr(
        "CAST(id AS STRING) AS id",
        "CAST(order_date AS TIMESTAMP) AS order_date",
        "NAMED_STRUCT('order_total', CAST(totals AS STRING)) AS totals");
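
Equivalently, the nested field can be named in the typed column API by aliasing the inner column to the UDT field name. A sketch (untested, reusing the question's imports) that replaces the original DF2 select:

Dataset<Row> DF2 = DF1.select(
        col("ORDER_NO").as("id"),
        unix_timestamp(col("ORDER_DATE"), "yyyy-MM-dd HH:mm:ss").cast("timestamp").as("order_date"),
        // Alias the inner column so the struct field matches the UDT field "order_total".
        struct(col("TOTAL_AMOUNT").cast("string").as("order_total")).as("totals"));

With the struct field named to match the UDT, the identity map with RowEncoder should no longer be necessary.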
