pyspark: need to run Hive queries from a file column

dauxcl2d asked on 2021-06-24 in Hive

I have a file (sample.csv) containing rows like the following:

    Id,Query
    T1012,"Select * from employee_dim limit 100"
    T1212,"Select * from department_dim limit 100"
    T1231,"Select dept_number,location,dept_name from locations"

I need to iterate over this file (sample.csv), take the second column ("Query") of each row, run that query against the Hive database, and save the result to a file named after the first column, e.g. T1012_result.csv, doing the same for every row.
Can you help?
I tried reading the file with Spark and converting it to a list, then executing the SQL through the SparkSession, but it doesn't work.

    from pyspark.sql import SparkSession,HiveContext
    spark=SparkSession.builder.enableHiveSupport().getOrCreate()
    spark.sql("use sample")
    input=spark.read.csv("sample.csv")
    # input.select('_c1').show()
    import pandas as pd
    a=input.toPandas().values.tolist()
    for i in a :
        print i[1]
        spark.sql('pd.DataFrame(i)')
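
The last line above executes the literal string 'pd.DataFrame(i)' as SQL, which cannot work, and print i[1] is Python 2 syntax. A minimal sketch of the intended loop, assuming sample.csv has a header row, each query is valid in the current database, and the results are small enough to collect, might look like this:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    spark.sql("use sample")

    # read with header=True so the "Id,Query" line is not treated as data
    queries = spark.read.option("header", "true").csv("sample.csv")

    for row_id, query in queries.toPandas().values.tolist():
        result = spark.sql(query)  # execute the query text itself
        # toPandas() lets us write one plain local CSV per Id, e.g. T1012_result.csv
        result.toPandas().to_csv(row_id + "_result.csv", index=False)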

ivqmmu1c #1

Update: Spark

    file_path="file:///user/vikrant/inputfiles/multiquery.csv"
    df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)

which loads the queries into a DataFrame:

    +---+-------------------------------+
    |id |query                          |
    +---+-------------------------------+
    |1  |select * from exampledate     |
    |2  |select * from test            |
    |3  |select * from newpartitiontable|
    +---+-------------------------------+
    def customFunction(df):
        # run each row's query and save the result under that row's id
        for row in df.rdd.collect():
            filename = row[0]
            query = str(row[1])
            newdf = spark.sql(query)
            savedataframe(newdf, filename)

    def savedataframe(newdf, filename):
        # coalesce(1) writes a single part file inside the output directory
        newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")

    customFunction(df)
which produces one output directory per query:

    drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
    drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
    drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv
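
A note on this approach: write.csv with coalesce(1) creates a directory named file_<id>.csv holding a single part file, which is why the listing above shows drwxr-xr-x directory entries rather than bare CSV files. Also, df.rdd.collect() pulls every row to the driver; that is reasonable here because the query list is small and spark.sql has to be issued from the driver one query at a time anyway.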

Update: using pandas. I have a few test tables on SQL Server; I read the queries into a pandas DataFrame, as you mentioned in the question, and save each query's result to a separate file named after the DataFrame's first column:

    import pandas as pd
    import pyodbc
    from pandas import DataFrame

    connection = pyodbc.connect('Driver={ODBC Driver 13 for SQL Server};SERVER=yourservername;DATABASE=some_db;UID=username;PWD=password')
    cursor = connection.cursor()

    # output location must be defined before the queries run
    out_file_location = r'G:\Testing\OutputFile\outfile'

    data = [['1', 'select * from User_Stage_Table'], ['2', 'select * from User_temp_Table']]
    df = pd.DataFrame(data, columns=['id', 'query'])

    def get_query(df):
        a = df.values.tolist()
        for i in a:
            query = i[1]     # reading second column value as query
            filename = i[0]  # reading first column value as filename
            write_query(query, filename)  # calling write_query function

    def write_query(query, filename):
        df = pd.read_sql_query(query, connection)
        df.to_csv(out_file_location + filename + ".txt", sep=',', encoding='utf-8', index=None, mode='a')

    get_query(df)  # calling get_query function to run every query

The output files are: outfile1.txt (this will have the data of table User_Stage_Table) and outfile2.txt (this will have the data of table User_temp_Table). Let me know if this solves your problem or if you face any further issues.
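
One detail worth noting: mode='a' makes to_csv append, so re-running the script keeps growing the files. If each run should overwrite instead, the mode argument can simply be dropped, since pandas defaults to mode='w':

    # overwrite instead of append on each run (pandas' default mode is 'w')
    df.to_csv(out_file_location + filename + ".txt", sep=',', encoding='utf-8', index=None)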

