pyspark: split a tab-delimited string into separate columns

dxxyhpgq · published 2023-10-15 in Spark
Follow (0) | Answers (2) | Views (115)

In my Foundry environment I have a PySpark dataset with a single column named "data".
Each row holds one string that looks like a TSV record, for example:

ott-akamai-logs-processor   srv 2023-07-29 17:46:50.134 2023-07-29 17:46:49.358     unstruct    103b9271-777        ott node-3.13.1 ssc-2.8.2-kinesis   snowplow-enrich-kinesis-3.7.0       3.65.234.x              12345679    DE  HE  Karachi 60313   50.1188 8.6843  Malta                                                                                                                                       {"schema":"iglu:com.xxx/1-0-0","data":{"schema":"xxx/hls_manifest_requested/jsonschema/1-0-1","data":{"channel":"bildtv-broadcast","session_id":"xxx","request_id":"xxx","total_bytes":351,"referrer":"^","geo_country":"DE","geo_state":"Berlin","geo_city":"-","variant_name":"6.m3u8"}}}                                                                         snowplow-nodejs-tracker/3.13.1                                                                                                                                                      Europe/Berlin               2023-07-29 17:46:49.281         {"schema":"xxx/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-4","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemVersionMajor":"??","layoutEngineNameVersion":"Unknown ??","operatingSystemNameVersion":"Unknown ??","agentInformationEmail":"Unknown","networkType":"Unknown","webviewAppNameVersionMajor":"Unknown ??","layoutEngineNameVersionMajor":"Unknown ??","operatingSystemName":"Unknown","agentVersionMajor":"3","layoutEngineVersionMajor":"??","webviewAppName":"Unknown","deviceClass":"Unknown","agentNameVersionMajor":"Snowplow-Nodejs-Tracker 3","operatingSystemNameVersionMajor":"Unknown ??","webviewAppVersionMajor":"??","operatingSystemClass":"Unknown","webviewAppVersion":"??","layoutEngineName":"Unknown","agentName":"Snowplow-Nodejs-Tracker","agentVersion":"3.13.1","layoutEngineClass":"Unknown","agentNameVersion":"Snowplow-Nodejs-Tracker 
3.13.1","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??","agentInformationUrl":"Unknown"}},{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentFamily":"Other","useragentMajor":null,"useragentMinor":null,"useragentPatch":null,"useragentVersion":"Other","osFamily":"Other","osMajor":null,"osMinor":null,"osPatch":null,"osPatchMinor":null,"osVersion":"Other","deviceFamily":"Other"}}]}        2023-07-29 17:46:09.938 com.axelspringer.ott    hls_manifest_requested  jsonschema  1-0-1       2023-07-29 17:46:09.938

The fields are separated by tabs, and I want each tab-separated value to go into its own column. How can I do this?

def unnamed_1(my_df):
    # TODO: split the tab-delimited 'data' column into separate columns
    df = my_df

    return df

xmjla07d1#

You can do this with Spark's split function: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.split.html

import pyspark.sql.functions as f

split_col = f.split(df['data'], '\t')

formatted_df = (
    df
    .withColumn('column_a', split_col.getItem(0))
    .withColumn('column_b', split_col.getItem(1))
    .withColumn('column_c', split_col.getItem(2))
    # ...
    .drop('data')
)

If you have a list of the desired column names in the right order, you can automate this by iterating over it:

columns = [
    'column_a',
    'column_b',
    'column_c',
    # ...
]

formatted_df = df
for i, column_name in enumerate(columns):
    formatted_df = formatted_df.withColumn(column_name, split_col.getItem(i))

formatted_df = formatted_df.drop('data')

If the data you receive is well-formed TSV to begin with, you can parse it directly while reading by setting the delimiter option:

df = (
    spark.read.format("csv")
    .option("delimiter", "\t")
    .load('data.tsv')
)

brgchamk2#

You can do it with pandas.read_csv:

import pandas as pd
from io import StringIO

string = 'ott-akamai-logs-processor   srv 2023-07-29 17:46:50.134 2023-07-29 17:46:49.358     unstruct    103b9271-777        ott node-3.13.1 ssc-2.8.2-kinesis   snowplow-enrich-kinesis-3.7.0       3.65.234.x              12345679    DE  HE  Karachi 60313   50.1188 8.6843  Malta                                                                                                                                       {"schema":"iglu:com.xxx/1-0-0","data":{"schema":"xxx/hls_manifest_requested/jsonschema/1-0-1","data":{"channel":"bildtv-broadcast","session_id":"xxx","request_id":"xxx","total_bytes":351,"referrer":"^","geo_country":"DE","geo_state":"Berlin","geo_city":"-","variant_name":"6.m3u8"}}}                                                                         snowplow-nodejs-tracker/3.13.1                                                                                                                                                      Europe/Berlin               2023-07-29 17:46:49.281         {"schema":"xxx/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-4","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemVersionMajor":"??","layoutEngineNameVersion":"Unknown ??","operatingSystemNameVersion":"Unknown ??","agentInformationEmail":"Unknown","networkType":"Unknown","webviewAppNameVersionMajor":"Unknown ??","layoutEngineNameVersionMajor":"Unknown ??","operatingSystemName":"Unknown","agentVersionMajor":"3","layoutEngineVersionMajor":"??","webviewAppName":"Unknown","deviceClass":"Unknown","agentNameVersionMajor":"Snowplow-Nodejs-Tracker 3","operatingSystemNameVersionMajor":"Unknown ??","webviewAppVersionMajor":"??","operatingSystemClass":"Unknown","webviewAppVersion":"??","layoutEngineName":"Unknown","agentName":"Snowplow-Nodejs-Tracker","agentVersion":"3.13.1","layoutEngineClass":"Unknown","agentNameVersion":"Snowplow-Nodejs-Tracker 
3.13.1","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??","agentInformationUrl":"Unknown"}},{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentFamily":"Other","useragentMajor":null,"useragentMinor":null,"useragentPatch":null,"useragentVersion":"Other","osFamily":"Other","osMajor":null,"osMinor":null,"osPatch":null,"osPatchMinor":null,"osVersion":"Other","deviceFamily":"Other"}}]}        2023-07-29 17:46:09.938 com.axelspringer.ott    hls_manifest_requested  jsonschema  1-0-1       2023-07-29 17:46:09.938'
tsvString = StringIO(string)
# header=None: the single line is data, not a header row
df = pd.read_csv(tsvString, sep='\t', header=None)
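Since the question is about a Spark dataset, the pandas result can then be handed back to Spark. A minimal sketch with a toy string and hypothetical column names (the Spark step assumes an active SparkSession, so it is shown commented out):

```python
import pandas as pd
from io import StringIO

# Toy TSV string standing in for the log line from the question.
string = "a\tb\tc"
pdf = pd.read_csv(
    StringIO(string),
    sep="\t",
    header=None,                                   # the line is data, not a header
    names=["column_a", "column_b", "column_c"],    # hypothetical names
)

# Convert to a Spark DataFrame if needed:
# spark_df = spark.createDataFrame(pdf)  # assumes an active SparkSession
```

Note this materializes everything on the driver, so it only makes sense for small datasets; for large ones, stay with the split-based approach from the first answer.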
