Assign patients to the nearest facility in Scala Spark

Posted by ippsafx7 on 2021-05-27 in Spark

I have a DataFrame as follows:

+-----------+----------+-----------+--------+-------------+
|LOCATION_ID|PATIENT_ID|FACILITY_ID|DISTANCE|rank_distance|
+-----------+----------+-----------+--------+-------------+
|LOC0001    |P1        |FAC003     |54      |2            |
|LOC0001    |P1        |FAC002     |45      |1            |
|LOC0001    |P2        |FAC003     |54      |2            |
|LOC0001    |P2        |FAC002     |45      |1            |
|LOC0010    |P3        |FAC006     |12      |1            |
|LOC0010    |P3        |FAC003     |54      |4            |
|LOC0010    |P3        |FAC005     |23      |2            |
|LOC0010    |P3        |FAC002     |45      |3            |
|LOC0010    |P4        |FAC002     |45      |3            |
|LOC0010    |P4        |FAC005     |23      |2            |
|LOC0010    |P4        |FAC003     |54      |4            |
|LOC0010    |P4        |FAC006     |12      |1            |
|LOC0010    |P5        |FAC006     |12      |1            |
|LOC0010    |P5        |FAC002     |45      |3            |
|LOC0010    |P5        |FAC005     |23      |2            |
|LOC0010    |P5        |FAC003     |54      |4            |
|LOC0010    |P6        |FAC006     |12      |1            |
|LOC0010    |P6        |FAC005     |23      |2            |
|LOC0010    |P6        |FAC002     |45      |3            |
|LOC0010    |P6        |FAC003     |54      |4            |
|LOC0043    |P7        |FAC004     |42      |1            |
|LOC0054    |P8        |FAC002     |24      |2            |
|LOC0054    |P8        |FAC006     |12      |1            |
|LOC0054    |P8        |FAC005     |76      |3            |
|LOC0054    |P8        |FAC007     |100     |4            |
|LOC0065    |P9        |FAC006     |32      |1            |
|LOC0065    |P9        |FAC005     |54      |2            |
|LOC0065    |P10       |FAC006     |32      |1            |
|LOC0065    |P10       |FAC005     |54      |2            |
+-----------+----------+-----------+--------+-------------+

For each patient, I have to assign the facility with the lowest rank. My output mapping should look like this:

P1 ---> FAC002 (because its rank is the lowest)
P2 ---> FAC002 (because its rank is the lowest)
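Ignoring capacity for a moment, the mapping above is just each patient's lowest-ranked row. The following is a minimal plain-Scala sketch of that step, using a hypothetical `Candidate` case class and only the P1/P2 rows from the table. In Spark the same step is usually a `row_number()` window partitioned by patient and ordered by `rank_distance`, keeping the rows where it equals 1.

```scala
// Hypothetical record type: one candidate facility for one patient.
final case class Candidate(patientId: String, facilityId: String, rank: Int)

object NearestFacility {
  // P1 and P2 rows copied from the question's table.
  val rows: Seq[Candidate] = Seq(
    Candidate("P1", "FAC003", 2),
    Candidate("P1", "FAC002", 1),
    Candidate("P2", "FAC003", 2),
    Candidate("P2", "FAC002", 1)
  )

  // Ignores capacity: every patient simply gets its lowest-ranked facility.
  def nearest(rows: Seq[Candidate]): Map[String, String] =
    rows.groupBy(_.patientId).map { case (patient, cands) => patient -> cands.minBy(_.rank).facilityId }

  def main(args: Array[String]): Unit =
    nearest(rows).toSeq.sorted.foreach { case (p, f) => println(s"$p ---> $f") }
}
```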

Note: every facility has a capacity of only 2, except FAC003, which has a capacity of 3.
So for P3, P4, P5 and P6 the output should be:

P3 ---> FAC006 (because its rank is 1)
P4 ---> FAC006 (because its rank is 1)
P5 ---> FAC005 (because FAC006 has reached its capacity of 2, and the next lowest
                rank is FAC005)
P6 ---> FAC005 (because FAC005 has one slot left)
P7 ---> FAC004
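The capacity rule above amounts to a greedy pass: visit the patients in order and give each one its lowest-ranked facility that still has a free slot. Below is a minimal plain-Scala sketch under two assumptions the question does not state: patients are served in numeric ID order (P1, P2, ..., P10), and a patient whose candidate facilities are all full gets `None`. `Candidate`, `GreedyAssignment` and `assign` are hypothetical names.

```scala
// Hypothetical record type: one candidate facility for one patient.
final case class Candidate(patientId: String, facilityId: String, rank: Int)

object GreedyAssignment {
  // (patient, facility, rank) triples copied from the question's table.
  val candidates: Seq[Candidate] = Seq(
    Candidate("P1", "FAC003", 2), Candidate("P1", "FAC002", 1),
    Candidate("P2", "FAC003", 2), Candidate("P2", "FAC002", 1),
    Candidate("P3", "FAC006", 1), Candidate("P3", "FAC003", 4), Candidate("P3", "FAC005", 2), Candidate("P3", "FAC002", 3),
    Candidate("P4", "FAC002", 3), Candidate("P4", "FAC005", 2), Candidate("P4", "FAC003", 4), Candidate("P4", "FAC006", 1),
    Candidate("P5", "FAC006", 1), Candidate("P5", "FAC002", 3), Candidate("P5", "FAC005", 2), Candidate("P5", "FAC003", 4),
    Candidate("P6", "FAC006", 1), Candidate("P6", "FAC005", 2), Candidate("P6", "FAC002", 3), Candidate("P6", "FAC003", 4),
    Candidate("P7", "FAC004", 1),
    Candidate("P8", "FAC002", 2), Candidate("P8", "FAC006", 1), Candidate("P8", "FAC005", 3), Candidate("P8", "FAC007", 4),
    Candidate("P9", "FAC006", 1), Candidate("P9", "FAC005", 2),
    Candidate("P10", "FAC006", 1), Candidate("P10", "FAC005", 2)
  )

  // Capacities from the question: FAC003 holds 3, every other facility holds 2.
  val capacity: Map[String, Int] =
    Map("FAC002" -> 2, "FAC003" -> 3, "FAC004" -> 2, "FAC005" -> 2, "FAC006" -> 2, "FAC007" -> 2)

  // Greedy pass: patients in numeric ID order, each takes its lowest-ranked facility
  // that still has a free slot; None if every candidate facility is already full.
  def assign(cands: Seq[Candidate], capacity: Map[String, Int]): Vector[(String, Option[String])] = {
    val byPatient = cands.groupBy(_.patientId).toVector.sortBy { case (patient, _) => patient.drop(1).toInt }
    val (_, assignments) = byPatient.foldLeft((capacity, Vector.empty[(String, Option[String])])) {
      case ((remaining, acc), (patient, options)) =>
        val choice = options.sortBy(_.rank).map(_.facilityId).find(f => remaining.getOrElse(f, 0) > 0)
        // Take one slot from the chosen facility; leave the map unchanged if nothing was free.
        val updated = choice.fold(remaining)(f => remaining.updated(f, remaining(f) - 1))
        (updated, acc :+ (patient -> choice))
    }
    assignments
  }

  def main(args: Array[String]): Unit =
    assign(candidates, capacity).foreach { case (p, f) => println(s"$p ---> ${f.getOrElse("(none)")}") }
}
```

On the question's data this reproduces the expected mapping for P1 to P7, and also yields P8 ---> FAC007, while P9 and P10 get no facility because FAC006 and FAC005 are both full by the time they are reached. Because the result depends on the order in which patients are served, that order is made explicit in the sort rather than left to how the data happens to be stored.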
Answer by h7appiyu:

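Please check the code below.
For each patient, it extracts that patient's rows from the DataFrame and checks the condition against the records extracted so far. If the condition is false, the record is skipped; otherwise it is merged (unioned) with the previously extracted records.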
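The condition described here compares the number of patients already assigned to a facility with that facility's capacity. Over a plain list of assignments it could look like the following sketch. `CapacityCondition` and `hasRoom` are hypothetical names; the code below does the equivalent with `filter(...).select("patient_id").distinct.count` on the accumulated DataFrame.

```scala
object CapacityCondition {
  // True if fewer distinct patients than capacity(facility) are already assigned to `facility`.
  // `assigned` holds the (patientId, facilityId) pairs extracted so far; unknown facilities have no room.
  def hasRoom(assigned: Seq[(String, String)], facility: String, capacity: Map[String, Int]): Boolean =
    assigned.collect { case (patient, f) if f == facility => patient }.distinct.size < capacity.getOrElse(facility, 0)

  def main(args: Array[String]): Unit = {
    val capacity = Map("FAC002" -> 2, "FAC003" -> 3, "FAC006" -> 2)
    val assigned = Seq("P1" -> "FAC002", "P2" -> "FAC002", "P3" -> "FAC006")
    println(hasRoom(assigned, "FAC002", capacity)) // false: FAC002 already has P1 and P2
    println(hasRoom(assigned, "FAC006", capacity)) // true: FAC006 has only P3
  }
}
```

Recounting the accumulated records on every check costs time proportional to the number of assignments made so far; keeping a map of remaining slots and decrementing it on each assignment turns each check into a single lookup.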

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {

  import spark.implicits._
  import org.apache.spark.sql.DataFrame         // needed for the Config case class below
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions._     // Window

  val df = Seq(
    ("LOC0001","P1","FAC003","54","2"), ("LOC0001","P1","FAC002","45","1"),
    ("LOC0001","P2","FAC003","54","2"), ("LOC0001","P2","FAC002","45","1"),
    ("LOC0010","P3","FAC006","12","1"), ("LOC0010","P3","FAC003","54","4"),
    ("LOC0010","P3","FAC005","23","2"), ("LOC0010","P3","FAC002","45","3"),
    ("LOC0010","P4","FAC002","45","3"), ("LOC0010","P4","FAC005","23","2"),
    ("LOC0010","P4","FAC003","54","4"), ("LOC0010","P4","FAC006","12","1"),
    ("LOC0010","P5","FAC006","12","1"), ("LOC0010","P5","FAC002","45","3"),
    ("LOC0010","P5","FAC005","23","2"), ("LOC0010","P5","FAC003","54","4"),
    ("LOC0010","P6","FAC006","12","1"), ("LOC0010","P6","FAC005","23","2"),
    ("LOC0010","P6","FAC002","45","3"), ("LOC0010","P6","FAC003","54","4"),
    ("LOC0043","P7","FAC004","42","1"),
    ("LOC0054","P8","FAC002","24","2"), ("LOC0054","P8","FAC006","12","1"),
    ("LOC0054","P8","FAC005","76","3"), ("LOC0054","P8","FAC007","100","4"),
    ("LOC0065","P9","FAC006","32","1"), ("LOC0065","P9","FAC005","54","2"),
    ("LOC0065","P10","FAC006","32","1"), ("LOC0065","P10","FAC005","54","2")
  ).toDF("location_id","patient_id","facility_id","distance","rank_distance")
    // facility_new starts out as each patient's rank-1 (nearest) facility
    .withColumn("facility_new", first($"facility_id").over(Window.partitionBy($"patient_id").orderBy($"rank_distance".asc)))
    // sort by the numeric part of patient_id (P1, P2, ..., P10)
    .orderBy(substring($"patient_id",2,3).cast("int").asc)

  // Collect all patient IDs to the driver, sorted numerically (P1 .. P10)
  val patients = df.select("patient_id").distinct.orderBy(substring($"patient_id",2,3).cast("int").asc).collect.map(_.getAs[String](0))

  val facilities = Map("FAC004" -> 2, "FAC003" -> 3, "FAC007" -> 2, "FAC002" -> 2, "FAC006" -> 2, "FAC005" -> 2) // capacity of each facility
  // Fold state: newDF = rows assigned so far, oldDF = the full input, facilities = capacities
  case class Config(newDF: DataFrame, oldDF: DataFrame, facilities: Map[String,Int])

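  // Fallback: a facility that appears in the rows assigned so far but is nobody's rank-1 choice
  // (except() does not preserve the preceding orderBy, so which one head returns is not guaranteed)
  def fetchFacilityId(config:Config) = {
    config.newDF.select("facility_id").distinct.orderBy(substring($"facility_id",5,6).cast("int").asc)
      .except(config.oldDF.select("facility_new").distinct.orderBy(substring($"facility_new",5,6).cast("int").asc)).head.getAs[String](0)
  }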

  // Returns (hasRoom, facility, rank) for the patient's facility at rank `index`. Once `index` is no
  // longer below the patient's highest rank (this includes that last rank), it falls back to fetchFacilityId.
  def findFacilityId(config:Config, patientId: String, index:Int):(Boolean,String,Int) = {
    val max_distance = config.oldDF.filter($"patient_id" === patientId).select("rank_distance").orderBy($"rank_distance".desc).head.getAs[String](0).toInt
    (index < max_distance) match {
      case true => {
        val fac = config.oldDF.filter($"patient_id" === patientId && $"rank_distance" === index).select("facility_id").distinct.map(_.getAs[String](0)).collect.head
        // room check: distinct patients already on `fac` (counted on their rank-`index` rows) vs. its capacity
        val bool = config.newDF.filter($"facility_new" === lit(fac) && $"rank_distance" === index).select("patient_id").distinct.count < config.facilities(fac)
        (bool, fac, index)
      }
      case false => (true, fetchFacilityId(config), 0)
    }
  }

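  // Assigns the patient to the first facility with room, trying rank 1, 2, ... in turn
  def process(config:Config, patientId:String, index:Int): DataFrame = findFacilityId(config, patientId, index) match {
    case (true, fac, _) =>
      // keep all of the patient's rows, tagged with the chosen facility
      config.newDF.union(config.oldDF.filter($"patient_id" === patientId).withColumn("facility_new", lit(fac)))
    case (false, _, _) =>
      // the facility at this rank is full: try the next rank
      process(config, patientId, index + 1)
  }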

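  // start with no assigned rows
  val config = Config(df.limit(0), df, facilities)

  // Fold over the patients in order: the first patient is taken as-is (already on its rank-1
  // facility); every later patient goes through process() starting at rank 1
  val updatedDF = patients.foldLeft(config) { (cfg, patientId) =>
    cfg.newDF.count match {
      case 0L => cfg.copy(newDF = cfg.newDF.union(cfg.oldDF.filter($"patient_id" === patientId)))
      case _  => cfg.copy(newDF = process(cfg, patientId, 1))
    }
  }.newDF.drop("facility_id").withColumnRenamed("facility_new", "facility_id")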

  updatedDF.show(31, false)
}

// Exiting paste mode, now interpreting.

+-----------+----------+--------+-------------+-----------+
|location_id|patient_id|distance|rank_distance|facility_id|
+-----------+----------+--------+-------------+-----------+
|LOC0001    |P1        |45      |1            |FAC002     |
|LOC0001    |P1        |54      |2            |FAC002     |
|LOC0001    |P2        |54      |2            |FAC002     |
|LOC0001    |P2        |45      |1            |FAC002     |
|LOC0010    |P3        |12      |1            |FAC006     |
|LOC0010    |P3        |54      |4            |FAC006     |
|LOC0010    |P3        |23      |2            |FAC006     |
|LOC0010    |P3        |45      |3            |FAC006     |
|LOC0010    |P4        |45      |3            |FAC006     |
|LOC0010    |P4        |23      |2            |FAC006     |
|LOC0010    |P4        |54      |4            |FAC006     |
|LOC0010    |P4        |12      |1            |FAC006     |
|LOC0010    |P5        |12      |1            |FAC005     |
|LOC0010    |P5        |45      |3            |FAC005     |
|LOC0010    |P5        |23      |2            |FAC005     |
|LOC0010    |P5        |54      |4            |FAC005     |
|LOC0010    |P6        |12      |1            |FAC005     |
|LOC0010    |P6        |23      |2            |FAC005     |
|LOC0010    |P6        |45      |3            |FAC005     |
|LOC0010    |P6        |54      |4            |FAC005     |
|LOC0043    |P7        |42      |1            |FAC003     |
|LOC0054    |P8        |24      |2            |FAC003     |
|LOC0054    |P8        |12      |1            |FAC003     |
|LOC0054    |P8        |76      |3            |FAC003     |
|LOC0054    |P8        |100     |4            |FAC003     |
|LOC0065    |P9        |32      |1            |FAC003     |
|LOC0065    |P9        |54      |2            |FAC003     |
|LOC0065    |P10       |32      |1            |FAC003     |
|LOC0065    |P10       |54      |2            |FAC003     |
+-----------+----------+--------+-------------+-----------+

Time taken: 31009 ms

scala>
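In the output above, P7, P8, P9 and P10 all end up on FAC003: four patients for a facility whose capacity is 3, and P7 gets FAC003 rather than the FAC004 expected in the question. This appears to come from `index < max_distance` in `findFacilityId`: P7 has a single candidate (highest rank 1), so the comparison is already false on the first call and the code falls back to `fetchFacilityId`. A quick capacity check over the final mapping can catch this kind of result. The following is a plain-Scala sketch with hypothetical names (`CapacityAudit`, `overCapacity`), using the patient-to-facility pairs read off the output table.

```scala
object CapacityAudit {
  // Facilities whose number of assigned patients exceeds their capacity, with the actual count.
  def overCapacity(assignment: Map[String, String], capacity: Map[String, Int]): Map[String, Int] =
    assignment.values
      .groupBy(identity)
      .map { case (facility, patients) => facility -> patients.size }
      .filter { case (facility, count) => count > capacity.getOrElse(facility, 0) }

  // Patient -> facility pairs read off the answer's output table.
  val answerOutput: Map[String, String] = Map(
    "P1" -> "FAC002", "P2" -> "FAC002", "P3" -> "FAC006", "P4" -> "FAC006", "P5" -> "FAC005",
    "P6" -> "FAC005", "P7" -> "FAC003", "P8" -> "FAC003", "P9" -> "FAC003", "P10" -> "FAC003")

  // Capacities from the question: FAC003 holds 3, every other facility holds 2.
  val capacity: Map[String, Int] =
    Map("FAC002" -> 2, "FAC003" -> 3, "FAC004" -> 2, "FAC005" -> 2, "FAC006" -> 2, "FAC007" -> 2)

  def main(args: Array[String]): Unit =
    println(overCapacity(answerOutput, capacity)) // Map(FAC003 -> 4)
}
```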
