My question builds on a previous one: Scala Play Framework does not execute all DB queries
For smaller files the code runs fine. For bigger files (>500,000 lines of JSON), however, it simply stops, without any error. For the same file it always stops at the same JSON entry. Note: the process takes a lot of time.
I have tried my application on my local machine and on a server, against both a local MySQL database and an AWS database. I have also tried different Slick configurations and different JVM Xms, Xmx and Xss values, but nothing changed the result.
Here is my Slick configuration:
slick {
  dbs {
    default {
      profile = "slick.jdbc.MySQLProfile$"
      db {
        driver = "com.mysql.jdbc.Driver"
        url = "jdbc:mysql://url:3306/db"
        user = "user"
        password = "password"
        queueSize = 1000
        numThreads = 50
        connectionTimeout = 30000
        maximumPoolSize = 100
        cacheServerConfiguration = true
        rewriteBatchedStatements = true
        useServerPrepStmts = true
        cachePrepStmts = true
        prepStmtCacheSize = 500
        prepStmtCacheSqlLimit = 50000
      }
    }
  }
}
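As far as I understand (not verified), MySQL driver flags such as rewriteBatchedStatements, useServerPrepStmts and the cachePrepStmts family are not HikariCP pool settings, so as top-level db keys they may be ignored; they would instead go into a properties block or onto the JDBC URL. A sketch of the alternative form I mean:

slick.dbs.default.db {
  # Sketch (assumption): pass driver flags via "properties" so HikariCP
  # forwards them to the MySQL driver instead of dropping them.
  properties = {
    rewriteBatchedStatements = true
    useServerPrepStmts = true
    cachePrepStmts = true
    prepStmtCacheSize = 500
    prepStmtCacheSqlLimit = 50000
  }
  # or, equivalently, appended to the URL:
  # url = "jdbc:mysql://url:3306/db?rewriteBatchedStatements=true&useServerPrepStmts=true"
}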
Any suggestions on how to improve my code? As mentioned, everything works fine if I reduce the file size. How can I speed up the DB inserts, or increase the cache sizes further?
Thanks in advance!
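(For reference, the kind of speed-up I have in mind is multi-row batching; below is a minimal sketch with a hypothetical table, not my real schema. Slick's ++= issues one batched INSERT per chunk, which rewriteBatchedStatements can rewrite into a single multi-row statement.)

import slick.jdbc.MySQLProfile.api._

// Hypothetical minimal table, only to illustrate the batched call.
case class Value_Model(value1Id: Option[Long], v1val1: String)

class ValueTable(tag: Tag) extends Table[Value_Model](tag, "value_table") {
  def value1Id = column[Long]("value1_id", O.PrimaryKey, O.AutoInc)
  def v1val1   = column[String]("v1val1")
  def * = (value1Id.?, v1val1) <> ((Value_Model.apply _).tupled, Value_Model.unapply)
}
val valueTable = TableQuery[ValueTable]

// ++= inserts the whole chunk with one batched statement instead of one
// round trip per row.
def insertChunk(rows: Seq[Value_Model]): DBIO[Option[Int]] =
  valueTable ++= rows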
---- UPDATE ----
Insert code (itemSeq is the validated JSON; that part works fine):
import javax.inject._
import play.Logger
import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider}
import play.api.mvc.{AbstractController, ControllerComponents}
import slick.jdbc.{JdbcProfile, TransactionIsolation}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import ... // DTOs
@Singleton
class ExampleController @Inject()(protected val dbConfigProvider: DatabaseConfigProvider, cc: ControllerComponents,
otherRestController: OtherRestController,
... // DTOs
) extends AbstractController(cc) with HasDatabaseConfigProvider[JdbcProfile] {
import profile.api._
def insertEntries(allEntries: exampleModel): Future[Seq[Any]] = {
val insertActions = allEntries.seqEntries.map(itemOfSeq => {
if (itemOfSeq.pbDate.isDefined &&
itemOfSeq.value1.descr.isDefined && itemOfSeq.value1.descr.get.descr_data.isDefined && itemOfSeq.value1.descr.get.descr_data.get.head.value.isDefined &&
itemOfSeq.value2.isDefined
&& itemOfSeq.value1.aff.value3.value3_data.nonEmpty && itemOfSeq.value1.aff.value3.value3_data.head.value4.value4_data.nonEmpty) {
val pbDateSql: Date = new Date(format.parse(itemOfSeq.pbDate.get).getTime)
val modDateSql: Date = new Date(format.parse(itemOfSeq.modDate.get).getTime)
val result = for (
value3Entry <- value3DTO.getOrCreate(Value3_Model(None, itemOfSeq.value1.aff.value3.value3_data.head.name));
value1Exists <- value1DTO.checkIfEntryExists(itemOfSeq.value1.meta_data.ID);
value1Entry <- value1DTO.getOrCreate(Value1_Model(None, itemOfSeq.value1.meta_data.ID, value3Entry.value3Id.get, itemOfSeq.value1.descr.get.descr_data.get.head.value.get,
pbDateSql, modDateSql), value1Exists);
value4Entry <- value4DTO.getOrCreate(Value4_Model(None, value3Entry.value3Id.get, itemOfSeq.value1.aff.value3.value3_data.head.value4.value4_data.head.v4value1))
) yield (value1Exists, value1Entry, value4Entry)
result.flatMap({
case (value1Exists, value1Entry, value4Entry) => {
if (!value1Exists) {
otherRestController.checkAllMatches(itemOfSeq)
DBIO.sequence(itemOfSeq.value1.values5.value5_data.map(val5 => {
var value5 = Value5_Model(None, val5.v5val1, val5.v5val2)
if (value5.v5val2.contains("http")) {
value5 = Value5_Model(None, val5.v5val2, val5.v5val1)
}
value5DTO
.getOrCreate(value5)
.flatMap(v1v5 => {
value1Value5DTO.getOrCreate(
Value1_Value5_Model(value1Entry.value1Id.get,
v1v5.v5Id.get)
).andThen(DBIO.sequence(val5.values6.map(val6 => {
Value6DTO.getOrCreate(Value6_Model(None, val6)).flatMap(v6 => {
value5value6DTO.getOrCreate(Value5_Value6_Model(v1v5.v5Id.get, v6.v6Id.get))
})
})))
})
})
).>>(DBIO.sequence(itemOfSeq.value1.aff.value3.value3_data.head.value4.value4_data.head.value7.value7_data.map(val7 => {
value7DTO.getOrCreate(Value7_Model(None, value4Entry.value4Id.get, val7.v4val1)).flatMap(v7 => {
Value1Value7DTO.getOrCreate(Value1_Value7_Model(value1Entry.value1Id.get, v7.value7Id.get))
})
}))
).>>(if (itemOfSeq.value2.get.value8.isDefined && itemOfSeq.value2.get.value9.isDefined) {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v8 <- value8DTO.getOrCreate(
Value8_Model(None, i.value10Id.get, itemOfSeq.value2.get.value8.get.value8_data.v8val1,
itemOfSeq.value2.get.value8.get.value8_data.v8val2, itemOfSeq.value2.get.value8.get.value8_data.v8val3)
)
v9 <- baseMetricV3DTO.getOrCreate(
Value9_Model(None, i.value10Id.get, itemOfSeq.value2.get.value9.get.value9_data.v9val1,
itemOfSeq.value2.get.value9.get.value9_data.v9val2, itemOfSeq.value2.get.value9.get.value9_data.v9val3)
)
} yield (i, v8, v9)
}
else if (itemOfSeq.value2.get.value8.isDefined && itemOfSeq.value2.get.value9.isEmpty) {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v8 <- value8DTO.getOrCreate(
Value8_Model(None, i.value10Id.get, itemOfSeq.value2.get.value8.get.value8_data.v8val1,
itemOfSeq.value2.get.value8.get.value8_data.v8val2, itemOfSeq.value2.get.value8.get.value8_data.v8val3)
)
} yield (i, v8)
}
else {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v9 <- baseMetricV3DTO.getOrCreate(
Value9_Model(None, i.value10Id.get, itemOfSeq.value2.get.value9.get.value9_data.v9val1,
itemOfSeq.value2.get.value9.get.value9_data.v9val2, itemOfSeq.value2.get.value9.get.value9_data.v9val3)
)
} yield (i, v9)
})
} else {
DBIO.sequence(itemOfSeq.value1.values5.value5_data.map(val5 => {
var value5 = Value5_Model(None, val5.v5val1, val5.v5val2)
if (value5.v5val2.contains("http")) {
value5 = Value5_Model(None, val5.v5val2, val5.v5val1)
}
value5DTO
.getOrCreate(value5)
.flatMap(v1v5 => {
value1Value5DTO.getOrCreate(
Value1_Value5_Model(value1Entry.value1Id.get,
v1v5.v5Id.get)
).andThen(DBIO.sequence(val5.values6.map(val6 => {
Value6DTO.getOrCreate(Value6_Model(None, val6)).flatMap(v6 => {
value5value6DTO.getOrCreate(Value5_Value6_Model(v1v5.v5Id.get, v6.v6Id.get))
})
})))
})
})
).>>(if (itemOfSeq.value2.get.value8.isDefined && itemOfSeq.value2.get.value9.isDefined) {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v8 <- value8DTO.getOrCreate(
Value8_Model(None, i.value10Id.get, itemOfSeq.value2.get.value8.get.value8_data.v8val1,
itemOfSeq.value2.get.value8.get.value8_data.v8val2, itemOfSeq.value2.get.value8.get.value8_data.v8val3)
)
v9 <- baseMetricV3DTO.getOrCreate(
Value9_Model(None, i.value10Id.get, itemOfSeq.value2.get.value9.get.value9_data.v9val1,
itemOfSeq.value2.get.value9.get.value9_data.v9val2, itemOfSeq.value2.get.value9.get.value9_data.v9val3)
)
} yield (i, v8, v9)
}
else if (itemOfSeq.value2.get.value8.isDefined && itemOfSeq.value2.get.value9.isEmpty) {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v8 <- value8DTO.getOrCreate(
Value8_Model(None, i.value10Id.get, itemOfSeq.value2.get.value8.get.value8_data.v8val1,
itemOfSeq.value2.get.value8.get.value8_data.v8val2, itemOfSeq.value2.get.value8.get.value8_data.v8val3)
)
} yield (i, v8)
}
else {
for {
i <- value10DTO
.getOrCreate(Value10_Model(None, value1Entry.value1Id.get))
v9 <- baseMetricV3DTO.getOrCreate(
Value9_Model(None, i.value10Id.get, itemOfSeq.value2.get.value9.get.value9_data.v9val1,
itemOfSeq.value2.get.value9.get.value9_data.v9val2, itemOfSeq.value2.get.value9.get.value9_data.v9val3)
)
} yield (i, v9)
})
}
}
case _ => DBIO.failed(new Exception("Not all entries defined"))
})
}
else {
DBIO.successful("Not all objects defined - skipping")
}
})
db.run(DBIO.sequence(insertActions).transactionally.withTransactionIsolation(TransactionIsolation.Serializable))
}
}
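For the record, the direction I am considering (only a sketch, not my current code): instead of wrapping the insertActions for the whole file in a single serializable transaction, run them in smaller sequential chunks, so that one huge transaction cannot stall the entire import. Something like this, placed inside the controller where db is in scope:

// Sketch (assumed chunk size of 1000, to be tuned): one transaction per
// chunk, chunks executed strictly one after another.
def runChunked(insertActions: Seq[DBIO[Any]]): Future[Unit] =
  insertActions.grouped(1000).foldLeft(Future.successful(())) { (acc, chunk) =>
    acc.flatMap(_ => db.run(DBIO.sequence(chunk).transactionally).map(_ => ()))
  }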
Sample getOrCreate code:
def getOrCreate(model: Value_Model): DBIO[Value_Model] = {
for {
exists <- valueTable.filter(b => b.v1val1 === model.v1val1).exists.result
entry <- if (exists) {
valueTable.filter(b => b.v1val1 === model.v1val1)
.map(b => (b.v1val1, b.v1val2, b.v1val3))
.update((model.v1val1, model.v1val2, model.v1val3))
.>>(valueTable.filter(b => b.v1val1 === model.v1val1).result.head)
} else
(valueTable
returning valueTable.map(_.value1Id)
into ((params, id) => params.copy(value1Id = Some(id)))
) += model
} yield entry
}
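As a side note, I wonder whether Slick's insertOrUpdate could collapse the exists check, the update/insert and the re-select into a single statement (on MySQL it maps to INSERT ... ON DUPLICATE KEY UPDATE). A sketch against the hypothetical table from above:

// Sketch: one upsert statement instead of exists + update/insert + select.
// Matching happens on the primary key, so the key must be set accordingly.
def upsert(model: Value_Model): DBIO[Int] =
  valueTable.insertOrUpdate(model)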
The whole code runs without any problems, but only for smaller files / seqEntries. Today I tested again with >10,000 itemOfSeq and a file size of more than 100 MB. Validation etc. works fine. During the tests I also tried the standard Slick configuration, but that had no effect on the result either:
slick {
  dbs {
    default {
      profile = "slick.jdbc.MySQLProfile$"
      db {
        driver = "com.mysql.jdbc.Driver"
        url = "jdbc:mysql://url:3306/db"
        user = "user"
        password = "password"
        connectionPool = disabled
      }
    }
  }
}
It always stops at the same itemOfSeq, without any error or warning.
I have already enabled logging for slick.jdbc, e.g.:
<logger name="slick.jdbc.JdbcBackend.statement" level="DEBUG"/>
<logger name="slick.jdbc.JdbcBackend.parameter" level="DEBUG"/>
<logger name="slick.jdbc.JdbcBackend.benchmark" level="DEBUG"/>
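One more diagnostic I have not enabled yet (a sketch, in case it is useful): logging the connection pool itself, plus HikariCP's leak detection, which warns when a connection is held longer than the given threshold:

<!-- Sketch: log the HikariCP pool itself. -->
<logger name="com.zaxxer.hikari" level="DEBUG"/>

# Sketch: warn if a connection is held for more than 60 s (value in ms).
slick.dbs.default.db.leakDetectionThreshold = 60000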
The log files do not contain any errors or warnings either.
Even when the DB insert process appears to have stopped or died, the application itself is still reachable.
I hope this code helps with further debugging. Thanks in advance.