So, I'm trying to move a table from one path to another, and I'm getting an "Unclosed character class" error that I can't figure out.
Here is my code:
import os
import subprocess
from pyspark.sql.functions import input_file_name, regexp_replace
from pyspark.sql import SparkSession
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("some_url/some_ip"), Configuration())
desired_table = "some_table"
source_db = "some_source"
target_db = "some_target"
file_list = []
fixed_location = source_db + "/" + desired_table + "/current"
target_path = fs.listStatus(Path('XXX'))
source_folder = fs.listStatus(Path('XXX' + fixed_location))
for file_status in source_folder:
    file_list.append(str(file_status.getPath()))
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
for file_status in file_list:
    df = spark.read.parquet(file_status)
    df.coalesce(50).write.option("path", target_path).mode('append').format('parquet').saveAsTable(desired_table)
And this is the error:
An error occurred while calling o114.saveAsTable.
: java.io.IOException: Illegal file pattern: Unclosed character class near index 43
[Lorg.apache.hadoop.fs.FileStatus;@38099a5e
^
at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71)
at org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50)
at org.apache.hadoop.fs.Globber.glob(Globber.java:192)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1697)
at org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:244)
at org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:254)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:707)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
I've spent hours on this, suggestions welcome!
1 Answer
8tntrjer1#
Your target_path = fs.listStatus(Path('XXX')) is not of type string; check the documentation for the object type that call returns. The log shows the error at saveAsTable because Spark executes your code lazily, and the action is only triggered at saveAsTable.
Also, I don't understand why you would create the SparkSession in a loop; I think only one SparkSession is needed here.
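A minimal sketch of the fix (database and table names are the placeholders from the question; the base path stays as the question's 'XXX'): keep target_path as a plain string instead of the FileStatus array that fs.listStatus() returns. The array's Java string form ("[Lorg.apache.hadoop.fs.FileStatus;@...") contains an unmatched "[", which Hadoop then tries to parse as a glob pattern, producing the "Unclosed character class" error.

```python
source_db = "some_source"
desired_table = "some_table"

# Build the destination as a plain string; do NOT pass the result of
# fs.listStatus() to .option("path", ...), since that is a Java
# FileStatus[] array, not a path string.
fixed_location = source_db + "/" + desired_table + "/current"
target_path = "XXX/" + fixed_location

# The write itself needs a live SparkSession and is shown for context only:
# df.coalesce(50).write.option("path", target_path) \
#     .mode("append").format("parquet").saveAsTable(desired_table)
```

With a string target_path, Hadoop's globber sees an ordinary path and saveAsTable can resolve the destination normally.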