PySpark: convert a PipelinedRDD to a Spark DataFrame

aiqt4smr posted on 2021-05-27 in Spark

I am using Spark 2.3.1 and doing NLP in Spark. When I print the type of my RDD it shows <class 'pyspark.rdd.PipelinedRDD'>, and the output of rdd.collect() is:
['embodiment present invention include pairing two wireless device placing at least one two device pairing mode performing at least one pairing motion event at least one wireless device meeting at least one pairing condition detecting meeting at least one pairing condition pairing two wireless device responsive detecting meeting at least one pairing condition provided', 'present invention relates wireless communication system specifically present invention relates method transmitting control information pucch wireless communication system apparatus comprising step obtaining plurality second modulation symbol stream corresponding plurality scfdma single carrier frequency division multiplexing symbol spreading plurality first modulation symbol stream first slot forming first modulation symbol stream corresponding scfdma symbol obtaining plurality complex symbol stream performing dft discrete fourier transform precoding process plurality second modulation symbol stream transmitting plurality complex symbol stream pucch wherein plurality second modulation symbol stream modulation symbol stream scrambled scfdma symbol level dog church aardwolf abacus']
I want to create a DataFrame like the one below, with each word added as its own row:

+--------------+
|    text      |
+--------------+
|  embodiment  |
|  present     |
|  invention   |
....
....
|  aardwolf    |
|  abacus      |
+--------------+

Here is my code:

import pyspark
import nltk
import string

from pyspark import SparkContext
from nltk.stem import WordNetLemmatizer

from pyspark.ml.feature import NGram
from pyspark.sql.types import ArrayType,StructType,StructField,StringType

from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('Spark Example').getOrCreate()

Source_path = "Folder_of_multiple_text_file"   # directory containing the input text files

data=sc.textFile(Source_path)

lower_casetext = data.map(lambda x:x.lower())

# splitting_rdd = lower_casetext.map(lambda x:x.split(" "))

# print(splitting_rdd.collect())

# Function to perform sentence tokenization

def sent_TokenizeFunct(x):
    return nltk.sent_tokenize(x)

sentencetokenization_rdd = lower_casetext.map(sent_TokenizeFunct)

# Function to perform Word tokenization

def word_TokenizeFunct(x):
    splitted = [word for line in x for word in line.split()]
    return splitted

wordtokenization_rdd = sentencetokenization_rdd.map(word_TokenizeFunct)

# Remove Stop Words

def removeStopWordsFunct(x):
    from nltk.corpus import stopwords
    stop_words=set(stopwords.words('english'))
    filteredSentence = [w for w in x if not w in stop_words]
    return filteredSentence
stopwordRDD = wordtokenization_rdd.map(removeStopWordsFunct)

# Remove Punctuation marks

def removePunctuationsFunct(x):
    list_punct = list(string.punctuation)
    filtered = [''.join(c for c in s if c not in list_punct) for s in x]
    filtered_space = [s for s in filtered if s]  # drop tokens that became empty strings
    return filtered_space
rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)

# Perform Lemmatization

def lemma(x):
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(s) for s in x]

lem_wordsRDD = rmvPunctRDD.map(lemma)

# Join tokens

def joinTokensFunct(x):
    return " ".join(x)

joinedTokensRDD = lem_wordsRDD.map(joinTokensFunct)

print(joinedTokensRDD.collect())
print(type(joinedTokensRDD))

Answer 1 (t98cgbkg):

I changed the code by removing the join-tokens step, and converted lem_wordsRDD directly to a DataFrame with the following code.

from pyspark.sql.functions import explode

# wrap each token list in a single "features" array column
df = lem_wordsRDD.map(lambda x: (x, )).toDF(["features"])

# explode the array so every word becomes its own row
exploded_df = df.withColumn("values", explode("features"))

tokenized_df = exploded_df.select("values")
tokenized_df.show()
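
As a side note (not part of the original answer), the explode step can also be skipped: a minimal alternative sketch, assuming lem_wordsRDD holds one list of tokens per input file as in the question, is to flatMap the lists into individual words and build a single-column DataFrame directly.

# Alternative sketch (assumes lem_wordsRDD is an RDD of token lists)
words_df = (lem_wordsRDD
            .flatMap(lambda tokens: tokens)   # one word per RDD element
            .map(lambda word: (word, ))       # wrap each word in a tuple so toDF() can build a row
            .toDF(["text"]))                  # single "text" column, as in the expected output
words_df.show()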

Answer 2 (o4tp2gmn):

Something like this, but adjust it to your own data:

data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

rdd = spark.sparkContext.parallelize(data)
rdd.collect()

# generate a pipelined RDD with some dummy logic

rdd = rdd.filter(lambda x: x[2] == x[2])

from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()
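
Applied to the question's data, the same explicit-schema pattern might look like the sketch below; word_schema and word_rdd are hypothetical names, and the flatMap assumes lem_wordsRDD contains one token list per input file.

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical single-column schema matching the expected "text" output
word_schema = StructType([StructField('text', StringType(), True)])

# Flatten the token lists and wrap each word in a tuple so it maps to one row
word_rdd = lem_wordsRDD.flatMap(lambda tokens: tokens).map(lambda w: (w, ))

words_df = spark.createDataFrame(word_rdd, word_schema)
words_df.show()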
