My stack trace:
java.lang.RuntimeException: [B is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:125)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Problem description: I want to write a Spark SQL Dataset to a Kafka topic, but I am hitting a "not a valid external type" exception.
I understand this is caused by a mismatch between the value types and the schema types.
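For context, the write itself is the standard batch Kafka sink call; a minimal sketch of that path (the exact call is not in the original post, and the broker address and topic name below are placeholders):

    // Minimal sketch of writing a Dataset<Row> named df to Kafka via the batch sink.
    // "localhost:9092" and "my-topic" are placeholder values, not from the original post.
    df.write()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "my-topic")
      .save();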
Here is my Kafka schema:
private static StructType KAFKA_SCHEMA() {
List<StructField> fs = new ArrayList<>();
// key|value|topic|partition|offset|timestamp|timestampType
fs.add(DataTypes.createStructField("key", DataTypes.ByteType, true));// for Binary key schema
fs.add(DataTypes.createStructField("value", DataTypes.ByteType, true));
fs.add(DataTypes.createStructField("topic", DataTypes.StringType, true));
return new StructType(fs.toArray(new StructField[fs.size()]));
}
I also tried DataTypes.BinaryType, and got the same problem.
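For BinaryType columns, the external Java value must actually be a byte[]; a hypothetical row that would match such a schema (the topic name is a placeholder):

    // Hypothetical example: the external value for a BinaryType column must be byte[].
    Row row = RowFactory.create(
        "k1".getBytes(StandardCharsets.UTF_8),  // key   -> byte[] matches BinaryType
        "v1".getBytes(StandardCharsets.UTF_8),  // value -> byte[] matches BinaryType
        "my-topic"                              // topic -> String matches StringType
    );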
See my debug screenshot (not reproduced here):
According to the screenshot, the types and the values correspond one to one.
Why does the program still report this error?
1 Answer
The Structured Streaming Kafka sink only supports BinaryType and StringType for the key and value columns.
The schema should be:
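The code block in the original answer did not survive extraction; a minimal sketch of the likely intended schema, with key and value declared as BinaryType (StringType also works if the data is string):

    private static StructType KAFKA_SCHEMA() {
        List<StructField> fs = new ArrayList<>();
        // key and value carry raw bytes: use BinaryType (a byte[]),
        // not ByteType (a single byte)
        fs.add(DataTypes.createStructField("key", DataTypes.BinaryType, true));
        fs.add(DataTypes.createStructField("value", DataTypes.BinaryType, true));
        fs.add(DataTypes.createStructField("topic", DataTypes.StringType, true));
        return new StructType(fs.toArray(new StructField[0]));
    }

Note the exception message itself: [B is the JVM descriptor for byte[], so the runtime received a byte[] for a column whose schema declared string. Whichever schema is actually applied must agree with the external Java types the Dataset produces.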