spark作业不在本地并行(使用本地文件系统中的parquet+avro)

hgqdbh6s  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(339)

编辑2
通过将rdd重新划分为8个分区间接地解决了这个问题。遇到了avro对象不可“java序列化”的障碍,在这里找到了一个将avro序列化委托给kryo的片段。原来的问题仍然存在。
编辑1:删除Map函数中的局部变量引用
我正在编写一个驱动程序,使用parquet和avro for io/schema在spark上运行一个计算量很大的作业。我好像不能让spark用我所有的核心。我做错什么了?是因为我把键设为空吗?
我只是想弄清楚hadoop是如何组织文件的。因为我的文件有一个千兆字节的原始数据,我应该期望看到与默认块和页面大小并行的东西。
etl my input for processing的函数如下所示:

def genForum {
    class MyWriter extends AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema) {
      override def write(t: Topic) {
        synchronized {
          super.write(t)
        }
      }
    }

    def makeTopic(x: ForumTopic): Topic = {
      // Ommited to save space
    }

    val writer = new MyWriter

    val q =
      DBCrawler.db.withSession {
        Query(ForumTopics).filter(x => x.crawlState === TopicCrawlState.Done).list()
      }

    val sz = q.size
    val c = new AtomicInteger(0)

    q.par.foreach {
      x =>
        writer.write(makeTopic(x))
        val count = c.incrementAndGet()
        print(f"\r${count.toFloat * 100 / sz}%4.2f%%")
    }
    writer.close()
  }

我的转变如下:

def sparkNLPTransformation() {
    val sc = new SparkContext("local[8]", "forumAddNlp")

    // io configuration
    val job = new Job()
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[Topic]])
    ParquetOutputFormat.setWriteSupportClass(job,classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, Topic.getClassSchema)

    // configure annotator
    val props = new Properties()
    props.put("annotators", "tokenize,ssplit,pos,lemma,parse")
    val an = DAnnotator(props)

    // annotator function
    def annotatePosts(ann : DAnnotator, top : Topic) : Topic = {
      val new_p = top.getPosts.map{ x=>
        val at = new Annotation(x.getPostText.toString)
        ann.annotator.annotate(at)
        val t = at.get(classOf[SentencesAnnotation]).map(_.get(classOf[TreeAnnotation])).toList

        val r = SpecificData.get().deepCopy[Post](x.getSchema,x)
        if(t.nonEmpty) r.setTrees(t)
        r
      }
      val new_t = SpecificData.get().deepCopy[Topic](top.getSchema,top)
      new_t.setPosts(new_p)
      new_t
    }

    // transformation
    val ds = sc.newAPIHadoopFile("forum_dataset.parq", classOf[ParquetInputFormat[Topic]], classOf[Void], classOf[Topic], job.getConfiguration)
    val new_ds = ds.map(x=> ( null, annotatePosts(x._2) ) )

    new_ds.saveAsNewAPIHadoopFile("annotated_posts.parq",
      classOf[Void],
      classOf[Topic],
      classOf[ParquetOutputFormat[Topic]],
      job.getConfiguration
    )
  }
jslywgbw

jslywgbw1#

你能确认数据确实在hdfs的多个块中吗?forum\u dataset.parq文件上的总块数

相关问题