How can I use Spark SQL and its execution engine to query Hive databases and tables without invoking any part of the Hive execution engine?

Posted by oogrdqng on 2021-06-27 in Hive

I have SELECT and join statements that I can run in the Hive CLI and/or the Beeline CLI and/or Spark (2.3.1) with enableHiveSupport=TRUE. (Note: I am using SparkR as my API.)
The join and write take 30 minutes through Beeline, but through Spark with enableHiveSupport=TRUE they take 3.5 hours. That means either Spark and its connectors are garbage, or I am not using Spark the way I should be... and all the "Spark is the best thing since sliced bread" commentary I keep reading suggests I am probably not using it correctly.
I want to read from the Hive tables, but I do not want Hive to do any of the work. I want to join the monthly tables, run a regression over the monthly deltas for each record, and write my final slopes/betas to an output table in Parquet that can be read from Hive if needed... preferably partitioned the same way as the Hive tables I use as input.
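
To make the goal concrete, here is a rough sketch of the Spark-only path I have in mind: read two monthly Hive tables, join them in Spark, and write Parquet partitioned like the inputs. The monthly table suffixes and the output path are placeholders, and it assumes write.df accepts a partitionBy argument in this Spark version.

  # Rough sketch only -- table suffixes and the output path are placeholders;
  # partitionBy on write.df is assumed to be available in this Spark version.
  library(SparkR)
  sparkR.session(master = "yarn", enableHiveSupport = TRUE)  # full config as in the script below

  # Hive is only consulted for metadata here; the scans themselves run in Spark.
  m1 <- tableToDF("historicdata.export_historic_archive_records_201901")
  m2 <- tableToDF("historicdata.export_historic_archive_records_201902")

  # Join in Spark on the same keys used in the SQL version (scf, idkey01, ssn7).
  joined <- select(
    join(m1, m2,
         m1$idkey01 == m2$idkey01 & m1$ssn7 == m2$ssn7 & m1$scf == m2$scf),
    m1$idkey01, m1$ssn7, m1$scf,
    alias(m1$recordid, "recordid_201901"),
    alias(m2$recordid, "recordid_201902"))

  # Parquet output, partitioned the same way as the input tables.
  write.df(joined, path = "/tmp/histdata_regression_parquet",
           source = "parquet", mode = "overwrite", partitionBy = "scf")
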
Below is the actual code, as requested... though I do not think it will tell you much; queries over this much data are not going to give reproducible results.

  Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client")
  sessionInfo()
  library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
  sparkR.stop()
  Sys.setenv(SPARKR_SUBMIT_ARGS="--master yarn sparkr-shell") #--master yarn-client sparkr-shell
  Sys.setenv(LOCAL_DIRS="/tmp")
  config = list()
  config$spark.cores.max <- 144L
  config$spark.executor.cores <- 2L
  config$spark.executor.memory <- '8g'
  config$spark.driver.cores <- 6L
  config$spark.driver.maxResultSize <- "0"
  config$spark.driver.memory <- "32g"
  config$spark.shuffle.service.enabled <- TRUE
  config$spark.dynamicAllocation.enabled <- FALSE
  config$spark.scheduler.mode <- 'FIFO'
  config$spark.ui.port <- 4044L
  sparkR.session(master = "yarn",
                 sparkHome = Sys.getenv("SPARK_HOME"),
                 sparkConfig = config,
                 enableHiveSupport = TRUE)
  print("Connected!")
  ############ SET HIVE CONFIG
  collect(sql("SET hive.exec.dynamic.partition"))
  sql("SET hive.exec.dynamic.partition=true")
  collect(sql("SET hive.exec.dynamic.partition.mode"))
  sql("SET hive.exec.dynamic.partition.mode=nonstrict")
  ##
  start_time <- Sys.time()
  ############### READ IN DATA {FROM HIVE}
  sql('use historicdata')
  data_tables <- collect(sql('show tables'))
  exporttabs <- grep(pattern = 'export_historic_archive_records', x = data_tables$tableName, value = TRUE)
  jointabs <- sort(exporttabs)[length(exporttabs)-(nMonths-1):0]
  currenttab <- jointabs[6]
  ############### CREATE TABLE AND INSERT SCRIPTS
  sql(paste0('use ', hivedb))
  sql(paste0('DROP TABLE IF EXISTS histdata_regression', tab_suffix))
  sSelect <- paste0("Insert Into TABLE histdata_regression", tab_suffix, " partition (scf) SELECT a.idkey01, a.ssn7")
  sCreateQuery <- paste0("CREATE TABLE histdata_regression", tab_suffix, " (idkey01 string, ssn7 string")
  sFrom <- paste0("FROM historicdata.", jointabs[nMonths], " a")
  sAlias <- letters[nMonths:1]
  DT <- gsub(pattern = "export_historic_archive_records_", replacement = "", jointabs)
  DT <- paste0(DT)
  for (i in nMonths:1) {
    sSelect <- paste0(sSelect, ", ", sAlias[i], ".", hdAttr, " as ", hdAttr, "_", i, ", ", sAlias[i], ".recordid as recordid_", DT[i])
    sCreateQuery <- paste0(sCreateQuery, ", ", hdAttr, "_", i, " int, recordid_", DT[i], " int")
    if (i==1) sCreateQuery <- paste0(sCreateQuery, ') PARTITIONED BY (scf string) STORED AS ORC')
    if (i==1) sSelect <- paste0(sSelect, ", a.scf")
    if (i!=nMonths) sFrom <- paste0(sFrom, " inner join historicdata.", jointabs[i], " ", sAlias[i], " on ",
                                    paste(paste0(paste0("a.", c("scf","idkey01","ssn7")), "=",
                                                 paste0(sAlias[i], ".", c("scf","idkey01","ssn7"))), collapse=" AND "))
  }
  ############### RUN THE GENERATED SQL THROUGH BEELINE (i.e. HiveServer2)
  system(paste0('beeline -u "jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;',
                'serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "', sCreateQuery, '"'))
  system(paste0("beeline -u \"jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;",
                "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2\" -e \"", sSelect, " ", sFrom, "\""))
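
For completeness, here is a rough, untested sketch of what that last step might look like if the generated statements were submitted through the SparkR session instead of being shelled out to beeline, so that Spark plans and executes them itself. The "_pq" table name and its short column list are placeholders I made up, not my real schema.

  ############### SUBMIT VIA SPARK INSTEAD OF BEELINE (untested sketch)
  # sql() hands the generated statements to Spark's own planner/executor
  # rather than sending them back through HiveServer2.
  sql(sCreateQuery)
  sql(paste(sSelect, sFrom))

  # Or build a Spark-native Parquet table in one CTAS, partitioned like the
  # inputs ("_pq" and the short column list are placeholders):
  sql(paste0("CREATE TABLE histdata_regression", tab_suffix, "_pq ",
             "USING parquet PARTITIONED BY (scf) AS ",
             "SELECT a.idkey01, a.ssn7, a.recordid, a.scf ", sFrom))
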

No answers yet.
