Scala: convert a ResultSet to a DataFrame

eqqqjvef · published 2023-06-06 in Scala

I'm hoping one of you can guide me in converting a Scala (or Java) ResultSet into a Spark DataFrame.
I cannot use this notation:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
  .option("dbtable", "pg_partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

So please take that into account before pointing me to this similar question.
The reason I can't use that notation is that I need a JDBC option that doesn't exist in the Spark version I'm currently on (2.2.0): I want the "queryTimeout" option that was only added in Spark 2.4, which is why I have to work from the ResultSet directly.
Any help would be greatly appreciated.
Thanks in advance.
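
For reference, the JDBC-level counterpart of the missing option is java.sql.Statement.setQueryTimeout, which is presumably how a raw ResultSet with a timeout gets produced in the first place. A minimal sketch, assuming placeholder connection details:

import java.sql.{DriverManager, ResultSet}

// set the timeout on the Statement itself, since Spark 2.2.0's DataFrameReader
// has no "queryTimeout" option (connection URL and credentials are placeholders)
val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "password")
val stmt = conn.createStatement()
stmt.setQueryTimeout(30) // seconds; the driver cancels the query once exceeded
val rs: ResultSet = stmt.executeQuery("select * from pg_partner")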


uemypmqf1#

A working example against the public MySQL source:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
import spark.implicits._

val url = "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam"
val username = "rfamro"
val password = ""

// JdbcRDD opens one connection per partition and binds the two "?" placeholders
// to each partition's lower/upper bound (here rows 1..100 over 10 partitions)
val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select rfam_id, noise_cutoff from family limit ?, ?",
  1, 100, 10,
  r => r.getString("rfam_id") + ", " + r.getString("noise_cutoff"))

val DF = myRDD.toDF
DF.show

Returns:

+-------------------+
|              value|
+-------------------+
|    5_8S_rRNA, 41.9|
|           U1, 39.9|
|           U2, 45.9|
|         tRNA, 28.9|
|        Vault, 33.9|
|          U12, 52.9|
....
....
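
Note the rows come back as a single concatenated value column because the row-mapper builds one String per record. An untested variant of the same call that maps each row to a tuple instead, so toDF can name two typed columns (this assumes noise_cutoff is numeric):

// map each row to a (String, Double) tuple instead of one concatenated String
val tupleRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select rfam_id, noise_cutoff from family limit ?, ?",
  1, 100, 10,
  r => (r.getString("rfam_id"), r.getDouble("noise_cutoff")))

tupleRDD.toDF("rfam_id", "noise_cutoff").show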

de90aj5v2#

Give this a try
(I haven't tested it, but it should work with minor tweaks):

import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, SparkSession}

// assuming the ResultSet comprises rows of (String, Int)
def resultSetToDataFrame(resultSet: ResultSet)(implicit spark: SparkSession): DataFrame = {
  val resultSetAsList: List[(String, Int)] = new Iterator[(String, Int)] {
    // hasNext advances the ResultSet cursor, so it must be called exactly
    // once per row, which is what toStream does
    override def hasNext: Boolean = resultSet.next()

    override def next(): (String, Int) = {
      // JDBC columns are 1-indexed; column labels can be used instead of indices
      (resultSet.getString(1), resultSet.getInt(2))
    }
  }.toStream.toList

  import spark.implicits._
  val listAsDataFrame: DataFrame = resultSetAsList.toDF("column_name_1", "column_name_2")

  listAsDataFrame
}
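
A hypothetical usage sketch (the connection details and the people table with name/age columns are invented for illustration, not part of the answer):

import java.sql.DriverManager
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// placeholder connection; any query whose first two columns are (String, Int) works
val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "password")
val rs = conn.createStatement().executeQuery("select name, age from people")
resultSetToDataFrame(rs).show()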



ddrv8njm3#

import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def parallelizeResultSet(spark: SparkSession, rs: ResultSet): DataFrame = {

  // build the schema from the metadata before consuming the ResultSet;
  // every column is treated as a nullable String
  val md = rs.getMetaData
  val columns = Seq.range(1, md.getColumnCount + 1).map(md.getColumnName)
  val schema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))

  // read every column of the current row by label
  def parseResultSet(rs: ResultSet): Row = {
    val resultSetRecord = columns.map(c => rs.getString(c))
    Row(resultSetRecord: _*)
  }

  // wrap the ResultSet cursor in an Iterator; hasNext advances the cursor,
  // so it must be called exactly once per next()
  def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] = new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
  }

  // materialize all rows on the driver, then distribute them as an RDD
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
  spark.createDataFrame(rdd, schema)
}

parallelizeResultSet(spark, rs)
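
Since parallelizeResultSet reads every column as StringType, you will likely want to cast columns to their real types afterwards. A small sketch (the amount column name is a made-up example):

import org.apache.spark.sql.functions.col

// cast a string column to its real type after loading (column name is hypothetical)
val typed = parallelizeResultSet(spark, rs)
  .withColumn("amount", col("amount").cast("double"))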
