使用ApacheSpark从/到elasticsearch处理纯文本

kr98yfug  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(254)

我想利用apachespark对elasticsearch索引中的文本数据进行预处理。使用elasticsearch hadoop,我正在检索apachespark的索引。我得到一个rdd类型:rdd[(string,scala.collection.map[string,anyref])]
第一个元素看起来像:document:(string,scala.collection.map[string,anyref])=(file:document_id,Map(创建->周一1月20日11:50:35 cet 2014,修改->周五10月23日12:46:40 cest 2015,索引->周五3月25日18:05:37 cet 2016,mimetype->应用程序/pdf,内容->文档的纯文本)
现在关键的部分是使用nlp工具箱处理上述内容字段,并将结果存储在elasticsearch中。第一部分很好。我使用stanfordcorenlp在stackoverflow上发现了一个类似的问题(不幸的是spark本身没有提供这个,我无法直接从elasticsearch检索令牌)。因此,我得到了每个文档的标记rdd[seq[string]],但我不知道如何将其引入elasticsearch。
显然,我需要一个outputdd来连接文档和相关的令牌。类似于:map(“document\u id\u 1”->“tokens for id\u 1”,“document\u id\u 2”->“tokens for id\u 2”)。也许有人想提供一个如何到达那里的提示,或者有一个更好的想法来解决问题。非常感谢您的帮助。

  1. import org.apache.spark._
  2. import org.apache.spark.rdd.RDD
  3. import scala.collection.mutable.ArrayBuffer
  4. import org.elasticsearch.spark._
  5. import org.elasticsearch.spark.rdd._
  6. import edu.stanford.nlp.pipeline._
  7. import edu.stanford.nlp.ling.CoreAnnotations._
  8. import scala.collection.JavaConversions._
  9. import java.util.Properties
  10. object Stemming {
  11. def main(args: Array[String]) {
  12. val conf = new SparkConf() .setMaster("local[*]") .setAppName("SparkLemma")
  13. .set("es.nodes", "hadoop-m:9200" )
  14. .set("es.write.operation", "upsert")
  15. .set("es.mapping.id", "id")
  16. val esIndex = "elastic/documents"
  17. val sc = new SparkContext(conf)
  18. // Read data from ES
  19. val esRDD = sc.esRDD(esIndex)
  20. val id:RDD[String] = esRDD.map(_._1.toString)
  21. val content:RDD[String] = esRDD.map(_._2("content").toString)
  22. val plainText: RDD[(String, String)] = id.zip(content)
  23. val stopWords = sc.broadcast(scala.io.Source.fromFile("stopwords.txt").getLines().toSet).value
  24. def createNLPPipeline(): StanfordCoreNLP = {
  25. val props = new Properties()
  26. props.put("annotators", "tokenize, ssplit, pos, lemma")
  27. new StanfordCoreNLP(props)
  28. }
  29. def plainTextToLemmas(content: String, stopWords: Set[String], nlp: StanfordCoreNLP) : Seq[String] = {
  30. val doc = new Annotation(content)
  31. nlp.annotate(doc)
  32. val lemmas = new ArrayBuffer[String]()
  33. val sentences = doc.get(classOf[SentencesAnnotation])
  34. for (sentence <- sentences;
  35. token <- sentence.get(classOf[TokensAnnotation])) {
  36. val lemma = token.get(classOf[LemmaAnnotation])
  37. if (lemma.length > 3 && !stopWords.contains(lemma)) {
  38. if (lemmas.isEmpty) {
  39. lemmas += id += lemma.toLowerCase
  40. }
  41. else {
  42. lemmas += lemma.toLowerCase
  43. }
  44. }
  45. }
  46. lemmas
  47. }
  48. val lemmatized: RDD[Seq[String]] = plainText.mapPartitions(strings => {
  49. val nlp = createNLPPipeline()
  50. strings.map{case(id, content) => plainTextToLemmas(content, stopWords, nlp)}
  51. })
  52. def writeTokensToES(row:Seq[String]): Map[String,String] = {
  53. val tokens = row.drop(1).mkString(" ")
  54. Map("id" -> row.head, "content" -> tokens, "last-run" -> getDate())
  55. }
  56. val outputRDD = lemmatized.map(row => writeTokensToES(row))
  57. EsSpark.saveToEs(outputRDD, esIndex)
  58. sc.stop()
  59. }
  60. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题