This question already has answers here:
Hive merge command is not working in Spark HiveContext (2 answers)
Closed 3 years ago.
I am running a merge query from PySpark, but Spark does not recognize the keyword "merge".
17/11/27 14:39:34 ERROR JobScheduler: Error running job streaming job 1511793570000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/repos/dataconnect/connect/spark/stream_kafka_consumer.py", line 66, in sendRecord
COLUMNS='sub.id, sub.name, sub.age'))
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 384, in sql
return self.sparkSession.sql(sqlQuery)
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 545, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nmismatched input 'merge' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nmerge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1 else 0 end as delete_flag, all_updates.id as match_key, all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id union all select 0, null, all_updates.* from all_updates, customer_partitioned where all_updates.id = customer_partitioned.id ) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age);\n^^^\n"
I can copy the query directly into the Hive View, and it runs there without any problems:
merge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1 else 0 end as delete_flag, all_updates.id as match_key, all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id union all select 0, null, all_updates.* from all_updates, customer_partitioned where all_updates.id = customer_partitioned.id ) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age);
My code is as follows:
from pyspark.sql import HiveContext
sqlcontext = HiveContext(sc)
sql = 'merge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1 else 0 end as delete_flag, all_updates.id as match_key, all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id union all select 0, null, all_updates.* from all_updates, customer_partitioned where all_updates.id = customer_partitioned.id ) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age);'
sqlcontext.sql(sql)
1 Answer
"I can copy the query directly into the Hive View, and it runs there without any problems."
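Spark is not Hive, even with Hive support enabled. Its query language is designed to implement a subset of the SQL:2003 standard and keeps only partial compatibility with HQL.
As a result, many Hive features are not supported, including MERGE, as well as updates and fine-grained inserts in general.
TL;DR: Just because you can do something in Hive doesn't mean you can do the same thing in Spark SQL.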
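The ParseException in the question lists the keywords this particular Spark parser accepts at the start of a statement, and MERGE is not among them. As a minimal sketch, assuming you want a streaming job to fail with a clearer message before the statement reaches Spark, the check below compares a statement's first keyword against that list. The names `ACCEPTED_LEADING_KEYWORDS`, `leading_keyword`, and `check_statement` are hypothetical helpers, not part of PySpark, and the keyword set is copied from the error message above, so it may differ for other Spark versions. Passing the check does not guarantee that Spark supports the rest of the statement.

```python
import re

# Leading keywords copied from the ParseException in the question
# (Spark 2 on HDP 2.6.1). Assumption: other Spark versions may differ.
ACCEPTED_LEADING_KEYWORDS = {
    "SELECT", "FROM", "ADD", "DESC", "WITH", "VALUES", "CREATE", "TABLE",
    "INSERT", "DELETE", "DESCRIBE", "EXPLAIN", "SHOW", "USE", "DROP",
    "ALTER", "MAP", "SET", "RESET", "START", "COMMIT", "ROLLBACK",
    "REDUCE", "REFRESH", "CLEAR", "CACHE", "UNCACHE", "DFS", "TRUNCATE",
    "ANALYZE", "LIST", "REVOKE", "GRANT", "LOCK", "UNLOCK", "MSCK",
    "EXPORT", "IMPORT", "LOAD",
}


def leading_keyword(statement):
    """Return the first word of a SQL statement in upper case, or '' if none.

    Leading whitespace and opening parentheses are skipped, because the
    parser also accepts '(' at position 0.
    """
    match = re.match(r"[\s(]*([A-Za-z_]+)", statement)
    return match.group(1).upper() if match else ""


def check_statement(statement):
    """Hypothetical helper: reject statements whose first keyword is not listed."""
    keyword = leading_keyword(statement)
    if keyword not in ACCEPTED_LEADING_KEYWORDS:
        raise ValueError(
            "Statement starts with %r, which this Spark SQL parser does not "
            "accept; run it in Hive instead or rewrite it." % keyword
        )
    return statement
```

With the code from the question, this would be called as `sqlcontext.sql(check_statement(sql))`, which raises a `ValueError` naming MERGE instead of a ParseException wrapped in a Py4J traceback.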