我正在使用pyspark对hive中的一个表进行文本分析。我使用以下代码
from pyspark.sql import SQLContext, Row, HiveContext
from pyspark.sql.functions import col, udf, StringType
from pyspark.sql.types import *
from pyspark import SparkContext
hc = HiveContext(sc)
df=hc.sql("select * from table1")
def cleaning_text(sentence):
sentence=sentence.lower()
sentence=re.sub('\'',' ',sentence)
cleaned=' '.join([w for w in cleaned.split() if not len(w)<=2 ])
return cleaned
org_val=udf(cleaning_text,StringType())
data=df.withColumn("cleaned",org_val(df.text))
data_1=data.select('uniqueid','cleaned','parsed')#2630789 #2022395
tokenizer = Tokenizer(inputCol="cleaned", outputCol="words")
wordsData = tokenizer.transform(data_1)
hc.sql("SET spark.sql.hive.convertMetastoreParquet=false")
hc.sql("create table table2 (uniqueid string, cleaned string, parsed string)")
wordsData.insertInto('table2')
我能做到
words_data.show(2)
然而,当我试图导出它时,它给了我这个错误
INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
Exception in thread "stdout writer for python" 17/02/02 15:18:44 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
我不介意它是否也作为文本文件导出。
2条答案
按热度按时间hsvhsicv1#
在向表中插入时,应该在hivecontext中编写insert语句,因为它正在向配置单元表写入。
hc.sql("SET spark.sql.hive.convertMetastoreParquet=false") hc.sql("create table table2 (uniqueid string, cleaned string, parsed string)") wordsData.registerTempTable("tb1") val df1 = hc.sql("insert into table table2 select * from tb1")
如果上面的方法不起作用或者您不满意,请尝试下面的方法,您可以直接保存一个表(确保已经在您想要的模式中创建了一个表)wordsData.write.mode("append").saveAsTable("sample_database.sample_tablename")
如果您尝试上面的错误,请将错误粘贴到这里,我将进一步帮助您zbq4xfa02#
我在sparkshell上运行这个脚本,默认驱动程序内存为1g。
我在启动sparkshell时运行下面的语句来更改它
这解决了我的问题