scala—使用play framework从mysql读取并插入mongo的脚本运行问题(多线程问题)

klr1opcd  于 2021-06-15  发布在  Mysql
关注(0)|答案(1)|浏览(377)

我有一个脚本,我想运行它来遍历有大约4m条记录的mysql表,对于每一个,我执行另一个调用另一个mysql表来获取更多的数据,从这些数据我创建一个新的对象并将其插入mongodb。
所以基本上我想执行从mysql到mongodb的迁移
我正在使用quill,它是一个在scala项目(qdsl)上使用sql的库。

my script class is very short and looks like:

class MigrateScript @Inject()(dao: PeopleDao) {

  lazy val ctx = new MysqlAsyncContext(SnakeCase, "mysql")

  import ctx._

      def getNextPerson(idx: Int): Future[Person] = {
        val q = quote {
          query[Person].drop(lift(idx)).take(1)
        }
        ctx.run(q.take(1)).map(_.head) recover {
          case t: NoSuchElementException =>
            println(s"problem with index: $idx")
            throw new RuntimeException
        }
      }

      def getPersonKids(personId: Int): Future[List[Kid]] = {
        val q = quote {
          query[Kid].filter(kid => kid.parent_id == lift(personId))
        }
        ctx.run(q)
      }

      def runScript(numberOfRecords: Int): Unit = {
        for (a <- 0 to numberOfRecords) {
          getNextPerson(a).map(person => {
            getPersonKids(person.id).map(kids => {
              // create obj and insert to mongo
              val parentWithKidsObjectToInsert = // creating new object using person & kids

              dao.insert(parentWithKidsObjectToInsert) // this returns Future[String]
            })
          })
        }
      }

}

要运行它,我从我的控制器执行如下操作:

def insertMySqlRecordsToMongo(numberOfRecords: Int) = Action { request =>
    mScript.runScript(numberOfRecords)
    Ok
  }

问题:
当我这样运行它时,脚本在100+条记录后被卡住,我的日志中出现错误:
java.util.concurrent.timeoutexception:未来在[5秒]后超时
以及

WARN   [ousekeeper] - c.z.h.p.HikariPool(755)        - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m51s17ms).

感觉应用程序的运行速度超过了mysql连接池的处理速度。。。
所以我试着在getnextperson上面添加await.result,它工作得很好,但是速度很慢。它每分钟只插入300条记录,而超过400万条记录需要几天的时间。。。
有什么解决办法吗?感谢所有花时间理解这一点的人:)

bxfogqkk

bxfogqkk1#

我真的,真的建议你研究一下spark来做到这一点,我听起来像是一个典型的etl用例。问题是,您正在将数千条记录具体化到内存中,这会杀死gc并使未来的工作停滞。而且你必须一张一张地做这件事,这使得它非常,非常慢。如果将其加载到sparkDataframe中,我将占用更少的空间,因为spark实际上不会将记录具体化到内存中(它们使用一个非常紧凑的二进制内存序列化,如果需要的话,它会“溢出”到磁盘),这样可以使堆免于gc湮灭。它还并行地对许多记录执行加载和转换。它将给你的性能特点,使你的问题容易处理。
我大概会这么做:
使用 spark.read.jdbc 通过父记录联接数据集和组
使用mongospark收集器将记录写到mongodb
代码本身应该如下所示:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff

SparkSession.builder()
      .master("local")
      .appName("Load records to mongo")
       // Configure the spark->mongo connector
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .enableHiveSupport()
      .getOrCreate()

case class PersonWithKids(person:Person, kids:List[Kid])

// make sure the MySQL jdbc driver is not the classpath because spark is about to use it
val people = spark.jdbc.read("(select * from people) as data", "jdbc://...").as[Person]
val kids = spark.jdbc.read("(select * from kids) as data", "jdbc://...").as[Kid]
val joined = 
  people
    .joinWith(kids, people("id") === kids("parent_id"))
    .map({case (person, kid) => PersonWithKids(person, List(kid))})
    .groupByKey(_.person)
    .flatMapGroups({case (person, personWithKidIter) => 
        PersonWithKids(person, personWithKidIter.toList.flatMap(_.kids))
    })

// make sure you did stuff correctly
// joined.show()
// joined.take(100).write.json("someFileWhereYouCanDoubleCheck.json")

MongoSpark.save(joined)

您可能需要以下sbt依赖项:

"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now

祝你好运!

相关问题