Spark SQL: java.lang.RuntimeException: [B is not a valid external type for schema of string

v64noz0r  posted 2021-05-29 in Spark
Follow (0) | Answers (1) | Views (411)

My stack trace:
java.lang.RuntimeException: [B is not a valid external type for schema of string
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:125)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Problem description: I want to write a Spark SQL Dataset to a Kafka topic, but I get this "valid external type" exception.
I understand it is caused by a mismatch between the value types and the schema types.
This is my Kafka schema:
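On the mismatch itself: `[B` in the error message is the JVM's runtime name for a `byte[]` array, so the exception is saying that a `byte[]` value arrived where the schema declared a scalar type. A minimal, Spark-free check of these runtime names (class and variable names here are just for illustration):

```java
// Demonstrates the JVM's internal names for array vs. scalar types,
// which is where the "[B" in the Spark error message comes from.
public class ArrayTypeNames {
    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes();
        // byte[] is reported as "[B" at runtime
        System.out.println(bytes.getClass().getName());
        // a String is reported as "java.lang.String"
        System.out.println("hello".getClass().getName());
    }
}
```

So "[B is not a valid external type for schema of string" means a `byte[]` was supplied for a column whose Catalyst type expects something else.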

private static StructType KAFKA_SCHEMA() {
    List<StructField> fs = new ArrayList<>();
    // key|value|topic|partition|offset|timestamp|timestampType
    fs.add(DataTypes.createStructField("key", DataTypes.ByteType, true)); // for binary key schema
    fs.add(DataTypes.createStructField("value", DataTypes.ByteType, true));
    fs.add(DataTypes.createStructField("topic", DataTypes.StringType, true));
    return new StructType(fs.toArray(new StructField[fs.size()]));
}

I also tried DataTypes.BinaryType, with the same problem.
See the debug screenshot:

(debug screenshot not reproduced)

According to the screenshot, the types and the values match one-to-one.
Why does the program report this error?


trnvg8h3 1#

The Structured Streaming Kafka sink only supports BinaryType and StringType for the key and value columns.
The schema should be:

fs.add(DataTypes.createStructField("key", DataTypes.BinaryType, true));
fs.add(DataTypes.createStructField("value", DataTypes.BinaryType, true));
fs.add(DataTypes.createStructField("topic", DataTypes.StringType, true));
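With `BinaryType` columns, the row values you supply must actually be `byte[]` (not `String`, and not a single `Byte` as `ByteType` would imply). A Spark-free sketch of preparing such values; the key, value, and topic contents below are made-up examples:

```java
import java.nio.charset.StandardCharsets;

// Sketch: values destined for BinaryType columns must be byte[],
// while the topic column (StringType) stays a plain String.
public class KafkaRowValues {
    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);            // BinaryType
        byte[] value = "{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8); // BinaryType
        String topic = "events";                                            // StringType

        // byte[] reports "[B" at runtime, matching what BinaryType expects
        System.out.println(key.getClass().getName());
        System.out.println(new String(value, StandardCharsets.UTF_8));
        System.out.println(topic);
    }
}
```

If your Dataset currently holds `String` keys/values, either convert them to `byte[]` like this before applying the schema, or declare those columns as `StringType` instead; the Kafka sink accepts both.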
