Please see the code snippet below. I am using s3a, and I set the access key, secret, and the bucket's endpoint in the Hadoop configuration. The value of fs.s3a.endpoint is s3.us-east-2.amazonaws.com.
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

# config_data is my own settings module (credentials, table and bucket names)
spark_session = SparkSession.builder \
    .appName("Clear_Backup_Old_Statistics") \
    .getOrCreate()
sc = spark_session.sparkContext

# S3A credentials and endpoint; fs.s3a.endpoint here is s3.us-east-2.amazonaws.com
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", config_data.AWS_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config_data.AWS_ACCESS_SECRET)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", config_data.AWS_S3_ENDPOINT)

# Select rows older than 150 days; the subquery alias is required by the JDBC source
filter_date = datetime.utcnow() - timedelta(days=150)
query = f"(select * from {config_data.DAILY_TABLE} where end_time <= '{filter_date}') as t"
df = spark_session.read.format("jdbc") \
    .option("url", f"jdbc:mysql://{config_data.MYSQL_DB}?characterEncoding=latin1") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", query) \
    .option("user", config_data.MYSQL_DB_USER) \
    .option("password", config_data.MYSQL_DB_PASSWORD) \
    .load()
df.show()

# Back the rows up to S3 as CSV
file_name = 'backup_' + str(datetime.utcnow())
df.write.option('header', 'true').csv(f's3a://{config_data.AWS_BACKUP_BUCKET}/{file_name}.csv')
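For reference, the same S3A settings can also be supplied when the session is built, via the spark.hadoop.* prefix, instead of mutating hadoopConfiguration() afterwards. A minimal sketch, assuming the same config_data module as above:

from pyspark.sql import SparkSession

# Keys under the spark.hadoop.* prefix are copied into the Hadoop configuration,
# so these are equivalent to the sc._jsc.hadoopConfiguration().set(...) calls above.
spark_session = SparkSession.builder \
    .appName("Clear_Backup_Old_Statistics") \
    .config("spark.hadoop.fs.s3a.access.key", config_data.AWS_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", config_data.AWS_ACCESS_SECRET) \
    .config("spark.hadoop.fs.s3a.endpoint", config_data.AWS_S3_ENDPOINT) \
    .getOrCreate()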
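One detail worth noting about the last two lines: str(datetime.utcnow()) yields a name like 'backup_2024-01-01 12:00:00.123456' (spaces and colons), and .csv(path) writes a directory of part files at that path rather than a single file. A hedged variant that formats the timestamp into a cleaner key (the strftime pattern here is just an illustrative choice):

from datetime import datetime

# Format the timestamp so the S3 key avoids spaces and colons
file_name = 'backup_' + datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')
df.write.option('header', 'true').csv(f's3a://{config_data.AWS_BACKUP_BUCKET}/{file_name}.csv')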