从jdbc源代码迁移数据时如何优化分区?

oxalkeyp  于 2021-06-25  发布在  Hive
关注(0)|答案(3)|浏览(496)

我正在尝试将数据从postgresql表中的表移动到hdfs上的配置单元表。为此,我想出了以下代码:

val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

数据插入到配置单元表中,并根据 prtn_String_columns: source_system_name, period_year, period_num 使用的spark submit:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

在执行器日志中生成以下错误消息:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

我在日志中看到,在给定的分区数下,读取正在正确执行,如下所示:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

以下是各阶段执行人的状态:




数据分区不正确。一个分区变小,而另一个分区变大。这里有一个倾斜的问题。将数据插入配置单元表时,作业在以下行失败: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") 但我知道这是因为数据倾斜的问题。
我尝试增加执行器的数量,增加执行器内存,驱动程序内存,尝试只保存为csv文件,而不是将Dataframe保存到配置单元表中,但给出异常不会影响执行:

java.lang.OutOfMemoryError: GC overhead limit exceeded

代码中有什么我需要更正的吗?有人能告诉我怎么解决这个问题吗?

inn6fuwd

inn6fuwd1#

在给定输入数据量和集群资源的情况下,确定需要多少分区。根据经验,除非严格必要,否则最好将分区输入保持在1gb以下。严格小于块大小限制。
您之前已经说过,迁移1tb的数据值(5-70)很可能会降低,以确保过程的顺利进行。
尽量使用不需要进一步修改的值 repartitioning .
了解你的数据。
分析数据集中可用的列,以确定是否有任何具有高基数且分布均匀的列要分布在所需数量的分区中。这是一个很好的进口过程的候选人。此外,还应确定值的精确范围。
具有不同中心度和偏度度量的聚合以及按键的直方图和基本计数是很好的探索工具。对于这一部分,最好直接分析数据库中的数据,而不是将其提取到spark。
取决于您可能使用的rdbms width_bucket (postgresql,oracle)或等效函数,以获得加载后数据如何在spark中分布的合理想法 partitionColumn , lowerBound , upperBound , numPartitons .

s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
FROM t
GROUP BY bucket) as tmp)"""

如果没有满足上述标准的列,请考虑:
创建一个自定义的,并通过。风景。多个独立列上的散列通常是不错的选择。请参考您的数据库手册来确定这里可以使用的函数( DBMS_CRYPTO 在甲骨文中, pgcrypto 在postgresql中)*。
使用一组相互独立的列可以提供足够高的基数。
或者,如果要写入已分区的配置单元表,应该考虑包含配置单元分区列。它可能会限制以后生成的文件数。
准备分区参数
如果在前面步骤中选择或创建的列是数字(或spark中的日期/时间戳>=2.4),则直接将其作为 partitionColumn 并使用之前确定的范围值进行填充 lowerBound 以及 upperBound .
如果绑定值不能反映数据的属性( min(col) 为了 lowerBound , max(col) 为了 upperBound )它可能会导致严重的数据倾斜,因此请仔细阅读。在最坏的情况下,当边界不覆盖数据范围时,所有记录都将由一台机器获取,这并不比完全没有分区好。
如果在前面的步骤中选择的列是分类的或是一组列,则生成完全覆盖数据的互斥 predicate 列表,其形式可以在 SQL where子句。
例如,如果有一列 A 有价值观{ a1 , a2 , a3 }和列 B 有价值观{ b1 , b2 , b3 }:

val predicates = for {
  a <- Seq("a1", "a2", "a3")
  b <- Seq("b1", "b2", "b3")
} yield s"A = $a AND B = $b"

再次检查条件是否重叠,所有组合是否覆盖。如果不满足这些条件,将分别导致重复记录或丢失记录。
传递数据为 predicates 论据 jdbc 打电话。注意,分区的数量将正好等于 predicate 的数量。
将数据库置于只读模式(任何正在进行的写入都可能导致数据不一致。如果可能的话,您应该在开始整个过程之前锁定数据库,但是如果不可能,则在您的组织中锁定数据库)。
如果分区数与所需的输出负载数据匹配,则 repartition 并直接转储到接收器,如果没有,可以尝试按照步骤1中相同的规则重新分区。
如果仍然遇到任何问题,请确保已正确配置spark内存和gc选项。
如果以上都不起作用:
考虑将数据转储到网络/使用诸如 COPY TO 直接从那里读。
请注意,您通常需要一个与posix兼容的文件系统或标准数据库实用程序,因此hdfs通常不需要。
这种方法的优点是不需要担心列属性,也不需要将数据置于只读模式,以确保一致性。
使用专用的批量传输工具,如apachesqoop,然后重塑数据。

  • 不要在sparkjdbc中使用pseudocolumns-pseudocolumn。
xt0899hw

xt0899hw2#

你的另一个问题是重复的

'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

如果问题是处理读取后在Dataframe中分区的数据,您是否考虑过增加“numpartitions”值?

.option("numPartitions",50)
``` `lowerBound, upperBound` 生成的where子句表达式和numpartitions的窗体分区步长决定了拆分的数量。
例如,sometable有column-id(我们选择它作为 `partitionColumn` ) ; 我们在表中看到的列的值范围- `ID` 是从1到1000,我们希望通过运行 `select * from sometable` ,所以我们使用lowerbound=1,upperbound=1000和numpartition=4
通过基于我们的提要构建sql,这将产生一个由4个分区组成的dataframe,其中包含每个查询的结果 `(lowerbound = 1 & upperbound = 1000 and numpartition = 4)` ```
select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

如果我们表中的大多数记录都在 ID(500,750) . 这就是你现在的处境。
当我们增加numpartition时,分割会进一步发生,这会减少同一分区中的记录量,但这不是一个好办法。
而不是Spark分裂 partitioncolumn 根据我们提供的边界,如果您想自己输入分割,那么数据可以被平均分割。您需要切换到另一个jdbc方法,其中 (lowerbound,upperbound & numpartition) 我们可以直接提供 predicate 。

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame

链接

7vux5j2d

7vux5j2d3#

根据我的经验,有4种不同的记忆设置:
a) [1]用于存储数据以进行处理的内存与[2]用于保存程序堆栈的堆空间
b) [1]驱动程序与[2]执行器内存
到目前为止,通过增加适当的内存,我总能成功地运行spark作业:
因此,a2-b1将是驱动程序上用来保存程序堆栈的可用内存。等。
属性名称如下:
a1-b1) executor-memory a1-b2) driver-memory a2-b1) spark.yarn.executor.memoryOverhead a2-b2) spark.yarn.driver.memoryOverhead 请记住,all*-b1之和必须小于worker上的可用内存,all*-b2之和必须小于driver节点上的内存。
我敢打赌,罪魁祸首是一个醒目标记的堆设置。

相关问题