spark作业来聚合cassandra数据

mitkmikd  于 2021-06-15  发布在  Cassandra
关注(0)|答案(0)|浏览(460)

我是新来的。我在Cassandra有下表:

CREATE TABLE cust_actions (
   orgid text,
   empid int,
   custid int,
   date timestamp,
   action text
   PRIMARY KEY (orgid, empid, custid, date)
) WITH CLUSTERING ORDER BY (empid ASC, custid ASC, date DESC)

此表包含员工对客户采取的每个操作的数据。这张table每天有1000多万张插页。我有一个3节点的cassandra集群,运行在18个核心机器上,每个机器有32g内存。
我要汇总每天的数据,即在特定的一天对一个客户采取了多少操作。为此,我创建了另一个表:

CREATE TABLE daily_cust_actions (
    custid int,
    date date,
    action text,
    count int,
    PRIMARY KEY (custid, date, action)
) WITH CLUSTERING ORDER BY (date ASC, action ASC)

为了做到这一点,我想使用Spark(请建议,如果这是错误的,或有一些其他替代品以及)。我在其中一台cassandra机器上运行spark(如上所述),其中一台主机和一台从机有9个执行器,每个执行器有1g ram和2个内核。
table尺寸约为70克。我无法汇总这些数据。不过,对于较小的数据集来说,这很好。这是我的星火剧本:

object DailyAggregation {

  def main(args: Array[String]): Unit = {

  val conf = new SparkConf(true).set("spark.cassandra.connection.host", "host1,host2,host3")
    .set("spark.cassandra.auth.username", "cassandra")            
    .set("spark.cassandra.auth.password", "cassandra")            
    .set("spark.cassandra.input.split.size_in_mb", "10") //have tried multiple options here

val sc = new SparkContext("spark://host", "spark-cassandra", conf)

val rdd = sc.cassandraTable("mykeyspace","cust_actions")

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

val df = new SimpleDateFormat("yyyy-MM-dd")

val startDate = df.parse("2018-08-13")
val endDate = df.parse("2018-09-14")

sc.parallelize(
  rdd.select("custid", "date", "action")
    .where("date >= ? and date < ?", startDate, endDate)
    .keyBy(row => (
      row.getInt("custid"),
      df.format(row.getLong("date")),
      row.getString("action"))).map { case (key, value) => (key, 1) }
    .reduceByKey(_ + _).collect()
    .map { case (key, value) => (key._1, key._2, key._3, value) })
  .saveToCassandra("mykeyspace", "daily_cust_actions")

sc.stop()
 }
}

我试过不同的方法,增加/减少记忆/执行者,增加/减少 spark.cassandra.input.split.size_in_mb 评估和调整一些spark环境变量。但每次我都会犯不同的错误。它分为两个阶段,第一阶段运行平稳,第二阶段运行失败。
我见过很多不同的错误。目前我得到以下错误: 2018-09-15 16:36:05 INFO TaskSetManager:54 - Task 158.1 in stage 1.1 (TID 1293) failed, but the task will not be re-executed (either because t he task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded). 2018-09-15 16:36:05 WARN TaskSetManager:66 - Lost task 131.1 in stage 1.1 (TID 1286, 127.0.0.1, executor 18): FetchFailed(null, shuffleId=0, m apId=-1, reduceId=131, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题