HDFS: saveAsTable error when trying to move a table to another path using Spark and the Hadoop API

mqxuamgl  posted on 2022-12-09  in  HDFS

I am trying to move a table from one path to another, and I get an "Unclosed character class" error that I cannot figure out.
Here is my code:

import os
import subprocess

from pyspark.sql.functions import input_file_name, regexp_replace
from pyspark.sql import SparkSession

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("some_url/some_ip"), Configuration())

desired_table = "some_table"
source_db = "some_source"
target_db = "some_target"

file_list = []

fixed_location = some_source + "/" + some_table + "/current"

target_path = fs.listStatus(Path('XXX'))
source_folder = fs.listStatus(Path('XXX' + fixed_location))

for file_status in source_folder:
    file_list.append(str(file_status.getPath()))
    spark = SparkSession.builder.master('local').appName('test').getOrCreate()

for file_status in file_list:
    df = spark.read.parquet(file_status)
    df.coalesce(50).write.option("path", target_path).mode('append').format('parquet').saveAsTable(desired_table)

This is the error:

An error occurred while calling o114.saveAsTable.
: java.io.IOException: Illegal file pattern: Unclosed character class near index 43
[Lorg.apache.hadoop.fs.FileStatus;@38099a5e
                                           ^
    at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71)
    at org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:192)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1697)
    at org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:244)
    at org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:254)
    at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:707)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)

I have spent hours on this, so any suggestions are welcome!

8tntrjer1#

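Your target_path = fs.listStatus(Path('XXX')) is not a string; check the documentation for the type of object it returns. The pattern quoted in the error, [Lorg.apache.hadoop.fs.FileStatus;@38099a5e, is the string form of a FileStatus array, and its leading [ is parsed as the start of a glob character class that is never closed. The log shows the error at saveAsTable because Spark executes your code lazily and only triggers the action at saveAsTable.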
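One way to catch this kind of mistake earlier is to keep every path a plain Python string and check it before handing it to Spark. The following is a minimal sketch, not something from the question: the helper names `status_paths` and `require_path_string` are hypothetical, and it assumes only that each listed entry exposes `getPath()`, as Hadoop's `FileStatus` does.

```python
def status_paths(statuses):
    """Turn FileStatus-like objects into plain path strings."""
    return [str(status.getPath()) for status in statuses]


def require_path_string(value):
    """Return value unchanged if it is a str, otherwise fail immediately."""
    if not isinstance(value, str):
        raise TypeError(
            "expected a path string, got %s" % type(value).__name__
        )
    return value
```

With these, `option("path", require_path_string(target_path))` would raise a `TypeError` on the line that builds the write, rather than an `IOException` from inside `saveAsTable`. The fix itself is to pass the target directory as a string (the `'XXX'` in the question) instead of the result of `fs.listStatus`.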
Also, I don't understand why you are creating the SparkSession inside the loop:

for file_status in source_folder:
    file_list.append(str(file_status.getPath()))
    spark = SparkSession.builder.master('local').appName('test').getOrCreate()

I think only one SparkSession is needed here.
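A possible restructuring, as a sketch only: the session is created once by the caller and passed in, and the target path is a plain string. The function name `append_files_to_table` and its parameters are assumptions for illustration; the read and write calls are the same ones used in the question.

```python
def append_files_to_table(spark, file_paths, target_path, table_name):
    """Append each Parquet file to table_name, stored under target_path.

    spark is an existing SparkSession created once by the caller;
    target_path must be a plain string, not a listStatus() result.
    Returns the number of files processed.
    """
    for file_path in file_paths:
        df = spark.read.parquet(file_path)
        (
            df.coalesce(50)
            .write.option("path", target_path)
            .mode("append")
            .format("parquet")
            .saveAsTable(table_name)
        )
    return len(file_paths)
```

The caller would run `spark = SparkSession.builder.master('local').appName('test').getOrCreate()` once, before the loops, and then call `append_files_to_table(spark, file_list, 'XXX', desired_table)`. Calling `getOrCreate()` repeatedly returns the session that already exists, so the original loop is redundant rather than harmful.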
