我有一个脚本,我想运行它来遍历有大约4m条记录的mysql表,对于每一个,我执行另一个调用另一个mysql表来获取更多的数据,从这些数据我创建一个新的对象并将其插入mongodb。
所以基本上我想执行从mysql到mongodb的迁移
我正在使用quill,它是一个在scala项目(qdsl)上使用sql的库。
my script class is very short and looks like:
class MigrateScript @Inject()(dao: PeopleDao) {
lazy val ctx = new MysqlAsyncContext(SnakeCase, "mysql")
import ctx._
def getNextPerson(idx: Int): Future[Person] = {
val q = quote {
query[Person].drop(lift(idx)).take(1)
}
ctx.run(q.take(1)).map(_.head) recover {
case t: NoSuchElementException =>
println(s"problem with index: $idx")
throw new RuntimeException
}
}
def getPersonKids(personId: Int): Future[List[Kid]] = {
val q = quote {
query[Kid].filter(kid => kid.parent_id == lift(personId))
}
ctx.run(q)
}
def runScript(numberOfRecords: Int): Unit = {
for (a <- 0 to numberOfRecords) {
getNextPerson(a).map(person => {
getPersonKids(person.id).map(kids => {
// create obj and insert to mongo
val parentWithKidsObjectToInsert = // creating new object using person & kids
dao.insert(parentWithKidsObjectToInsert) // this returns Future[String]
})
})
}
}
}
要运行它,我从我的控制器执行如下操作:
def insertMySqlRecordsToMongo(numberOfRecords: Int) = Action { request =>
mScript.runScript(numberOfRecords)
Ok
}
问题:
当我这样运行它时,脚本在100+条记录后被卡住,我的日志中出现错误:
java.util.concurrent.timeoutexception:未来在[5秒]后超时
以及
WARN [ousekeeper] - c.z.h.p.HikariPool(755) - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m51s17ms).
感觉应用程序的运行速度超过了mysql连接池的处理速度。。。
所以我试着在getnextperson上面添加await.result,它工作得很好,但是速度很慢。它每分钟只插入300条记录,而超过400万条记录需要几天的时间。。。
有什么解决办法吗?感谢所有花时间理解这一点的人:)
1条答案
按热度按时间bxfogqkk1#
我真的,真的建议你研究一下spark来做到这一点,我听起来像是一个典型的etl用例。问题是,您正在将数千条记录具体化到内存中,这会杀死gc并使未来的工作停滞。而且你必须一张一张地做这件事,这使得它非常,非常慢。如果将其加载到sparkDataframe中,我将占用更少的空间,因为spark实际上不会将记录具体化到内存中(它们使用一个非常紧凑的二进制内存序列化,如果需要的话,它会“溢出”到磁盘),这样可以使堆免于gc湮灭。它还并行地对许多记录执行加载和转换。它将给你的性能特点,使你的问题容易处理。
我大概会这么做:
使用
spark.read.jdbc
通过父记录联接数据集和组使用mongospark收集器将记录写到mongodb
代码本身应该如下所示:
您可能需要以下sbt依赖项:
祝你好运!