A Java Spark DataFrame fails to insert into a Cassandra UDT column with the error below.
How can I construct this UDT column in the DataFrame select, or build it some other way? Thanks in advance for any advice.
java.lang.IllegalArgumentException: Field "order_total" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)
Spark version: 2.0.2
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;

import static org.apache.spark.sql.functions.*;

public class testpoc {
    public static String host = "$$";
    public static String port = "$$";
    public static String username = "$$";
    public static String password = "$$";
    public static String keyspace = "$$";
    public static String tablename = "$$";
    public static String filePath = "$/Orders_00000.csv";

    // Spark-side schema mirroring the Cassandra UDT
    public static StructType overall_total_udt = new StructType()
            .add("order_total", DataTypes.StringType, false);

    // Spark-side schema mirroring the Cassandra table
    public static StructType orderType = new StructType()
            .add("id", DataTypes.StringType, false)
            .add("order_date", DataTypes.TimestampType, true)
            .add("totals", overall_total_udt, true);

    public static void main(String[] args) throws IOException {
        SparkConf sparkConf = new SparkConf();
        if (sparkConf.getOption("spark.master").isEmpty()) {
            sparkConf.setMaster("local[*]");
        }
        sparkConf.set("spark.cassandra.connection.host", host);
        sparkConf.set("spark.cassandra.connection.port", port);
        sparkConf.set("spark.cassandra.auth.username", username);
        sparkConf.set("spark.cassandra.auth.password", password);
        sparkConf.set("spark.cassandra.connection.ssl.enabled", "true");

        // Spark session
        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        // Read the tab-delimited CSV
        Dataset<Row> DF1 = spark
                .read().format("csv")
                .option("header", "true")
                .option("delimiter", "\t")
                .load(filePath);
        DF1.show(false);

        // Map the raw columns to the target table's layout
        Dataset<Row> DF2 = DF1.select(
                col("ORDER_NO").as("id"),
                unix_timestamp(col("ORDER_DATE"), "yyyy-MM-dd HH:mm:ss").cast("timestamp").as("order_date"),
                struct(col("TOTAL_AMOUNT")).as("totals")
        );

        // Attempt to force the rows into the UDT-shaped schema
        DF2 = DF2.map((MapFunction<Row, Row>) row -> row, RowEncoder.apply(orderType));
        DF2.show(false);

        DF2.write()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", keyspace)
                .option("table", tablename)
                .mode(SaveMode.Append)
                .save();

        /*
        **Cassandra table structure**
        CREATE TYPE myspace.overall_total_udt (order_total text);

        CREATE TABLE myspace.jorder (
            id text,
            order_date timestamp,
            totals frozen<myspace.overall_total_udt>,
            PRIMARY KEY (id)
        );
        */
        spark.stop();
    }
}
java.lang.IllegalArgumentException: Field "order_total" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:254)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:253)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1$$anonfun$3.apply(UserDefinedType.scala:53)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1$$anonfun$3.apply(UserDefinedType.scala:50)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:50)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(UserDefinedType.scala:36)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:54)
at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:36)
at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I followed this answer, but still could not get it to work: https://stackoverflow.com/a/51651345/2636642
1 Answer
The tax team came up with this solution.
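The UDT column has to be built as a struct whose field names exactly match the UDT's field names: the connector looks the UDT fields up by name (the StructType.fieldIndex("order_total") call in the stack trace), and struct(col("TOTAL_AMOUNT")) produces a struct field named "TOTAL_AMOUNT", so the lookup fails. A minimal sketch of the corrected select, assuming spark-cassandra-connector 2.x and the column names from the question:

// Alias the inner column so the struct field is named exactly like the UDT field
Dataset<Row> DF2 = DF1.select(
        col("ORDER_NO").as("id"),
        unix_timestamp(col("ORDER_DATE"), "yyyy-MM-dd HH:mm:ss").cast("timestamp").as("order_date"),
        struct(col("TOTAL_AMOUNT").as("order_total")).as("totals")
);

With the field names aligned, the map(...) / RowEncoder.apply(orderType) step should no longer be needed and can be dropped; the write() call stays the same.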