为sparkrdd的内容添加标签

wlwcrazw  于 2021-06-10  发布在  Hbase
关注(0)|答案(0)|浏览(205)

我们有出版物的数据,我们想把它们分为不同的类别。我们把它们储存在 HBase 从那里我们把他们拯救到 sparkrdd 然后用 scala 代码。hbase数据的示例如下所示:

PubEntity:Abstract                  timestamp=1476537886382, value=not found                                                                  
 PubEntity:Affiliations              timestamp=1476537886382, value=[]                                                                         
 PubEntity:Article_Title             timestamp=1476537886382, value=Formate assay in body fluids: application in methanol poisoning.           
 PubEntity:Author                    timestamp=1476537886382, value=[{'LastName': 'Makar', 'ForeName': 'A B', 'author_name': 'A B Makar', 'Init
                                     ials': 'AB', 'author_affiliation': 'not found'}, {'LastName': 'McMartin', 'ForeName': 'K E', 'author_name'
                                     : 'K E McMartin', 'Initials': 'KE', 'author_affiliation': 'not found'}, {'LastName': 'Palese', 'ForeName':
                                      'M', 'author_name': 'M Palese', 'Initials': 'M', 'author_affiliation': 'not found'}, {'LastName': 'Tephly
                                     ', 'ForeName': 'T R', 'author_name': 'T R Tephly', 'Initials': 'TR', 'author_affiliation': 'not found'}]  
 PubEntity:Journal_Title             timestamp=1476537886382, value=Biochemical medicine                                                       
 PubEntity:PMID                      timestamp=1476537886382, value=1                                                                          
 PubRemaining:Countries              timestamp=1476537886382, value=[]                                                                         
 PubRemaining:Created_At             timestamp=1476537886382, value=170812800.0                                                                
 PubRemaining:DOI                    timestamp=1476537886382, value=not found                                                                  
 PubRemaining:Date_Created           timestamp=1476537886382, value=19760116                                                                   
 PubRemaining:ISO_Abbreviation       timestamp=1476537886382, value=Biochem Med                                                                
 PubRemaining:ISSN                   timestamp=1476537886382, value=0006-2944                                                                  
 PubRemaining:Pub_Date               timestamp=1476537886382, value=01 Jun, 1975                                                               
 PubRemaining:Year                   timestamp=1476537886382, value=1975

我们已经成功地 RDD 如第一个答案中所述
如何使用spark读取hbase的代码如下:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]").set("spark.driver.allowMultipleContexts", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "name_of_the_database"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName) 

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println(" Number of Records found : " + hBaseRDD.count())
    println(hBaseRDD.first())
    sc.stop()
  }
}

在scala中运行此代码块后,我得到: defined module HBaseRead 然后我执行了hbaseread.main(array()),它输出找到的记录数,并将第一条记录读取为:
(31,键值={1/pubentity:abstract/1476537886382/put/vlen=9/mvcc=0, 1/entity:affiliations/1476537886382/put/vlen=2/mvcc=0, 1/entity:article_title/1476537886382/put/vlen=64/mvcc=0,1/entity:author/1476537886382/put/vlen=497/mvcc=0, 1/entity:journal_title/1476537886382/put/vlen=20/mvcc=0,1/entity:pmid/1476537886382/put/vlen=1/mvcc=0, 1/remaining:countries/1476537886382/put/vlen=2/mvcc=0, 1/remaining:created_at/1476537886382/put/vlen=11/mvcc=0,1/remaining:doi/1476537886382/put/vlen=9/mvcc=0, 1/remaining:date_created/1476537886382/put/vlen=8/mvcc=0,1/remaining:iso_abbreviation/1476537886382/put/vlen=11/mvcc=0,1/remaining:issn/1476537886382/put/vlen=9/mvcc=0, 1/remaining:pub_date/1476537886382/put/vlen=12/mvcc=0,1/remaining:year/1476537886382/put/vlen=4/mvcc=0})
现在在这个输出中,您将看到vlen=12/mvcc=0。在检查数据时,我发现vlen是每个单词/数字的长度。我不知道mvcc是什么意思。我们希望输出显示单词/数字,而不是vlen=4。此外,我们希望阅读这些条目,找到其中的某些单词和短语,并相应地标记它们。全都在斯卡拉。
任何链接到任何有用的在线资源在这方面将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题