pyspark regexp_replace not working as expected for the following pattern

4zcjmb1e posted on 2021-05-27 in Spark

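I'm using Spark Streaming to consume a topic and transform the data. One of the transformations is a regex replacement. The `regexp_replace` function from `pyspark.sql.functions` does not replace the following pattern (I tested it beforehand on regex101.com, with Python's `re`, etc.): `df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])', r'$1'))` Here is part of a record: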

```
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500,
```

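This is the "target" of the regex pattern: `someMessage=Test. [BL056]`. The pattern should match the whole target, split it into two groups, and replace it with the first group alone (hence `r'$1'`).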
These patterns don't work either:
`df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)', ''))`
`df.withColumn('value', f.regexp_replace('value', '(\[[A-Z,a-z,0-9]+\])', ''))`
This one does work:
`df.withColumn('value', f.regexp_replace('value', 'someMessage=Test. [BL056]', ''))`
Why is that? Is there something special about Spark's regex engine? What is the correct pattern for what I'm trying to do?
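For reference, here is a minimal sketch that runs the question's two-group and bracket-only patterns through Python's `re` on a shortened copy of the record (only the first four fields kept). It assumes `re` and Spark's engine (`java.util.regex`) agree for these constructs (character classes, a lazy `*?`, escaped brackets), which they should. Two details worth noting: inside `[A-Z,a-z,0-9]` the commas are literal characters rather than separators, and Python writes the group-1 backreference as `\1` where Spark's `regexp_replace` uses `$1`.

```python
import re

# Shortened from the sample record: only the first four fields are kept.
record = "{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE}"

# The two-group pattern from the question, as a raw string so Python leaves
# the backslashes alone. The commas inside [...] match literal commas.
two_groups = r"([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])"

# The bracket-only pattern from the question.
bracket_only = r"(\[[A-Z,a-z,0-9]+\])"

# Keep only group 1 (Python spells it \1; Spark's regexp_replace spells it $1).
print(re.sub(two_groups, r"\1", record))
# Drop the bracketed code outright.
print(re.sub(bracket_only, "", record))
```

Both calls print `{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. , someOrigin=MOBILE}`; the space before `[BL056]` survives because it is part of group 1.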
The sample data and the whole script are shown below.
Here is a sample value of the `value` column:

```
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, someNumber=9872329, someAppOrigin=APP_PADRAO, someId=c3ASAUSQTiWvl_YA9DYpDV:APA91bGfVcLNNGL20hfmaDDS0D8TuzJDuCjj4tgbRNcJcYASIBRVEE2FnA4exnE4ZWTuupRX7FQkdcJiMWkNEatk8lktkFcpR7P7mehb4r_SVnabIabGInjagGZ6pGyweDkxW2JUGK8g, someType=00001, someOriginOpen=null, someOS=null, eventSubType=TESTLOGON, someToken=, ip=error, somePair=0.4220043,-1.084015, eventType=SUCESSO, someMag=aWg4V01qSxDMjAvWmlEWGJ6aExnc2nZJbWZVPQ==, macAddress=33d94a3f7d2f8aff, someJSON={"ip":"error","hostname":null,"type":null,"concode":null,"continent":null,"country":null,"country_name":null,"code":null,"name":null,"city":null,"zip":null,"latitude":null,"longitude":null,"anotherJSON":{"id":null,"capital":null,"languages":null,"flag":null,"flag_emoji":null,"flag_emoji_unicode":null,"calling_code":null,"is_eu":null},"time_zone":{"id":null,"current_time":null,"gmt_offset":null,"code":null,"is_daylight_saving":null},"currency":{"code":null,"name":null,"plural":null,"symbol":null,"symbol_native":null},"connection":{"asn":null,"isp":null},"security":{"is_proxy":null,"proxy_type":null,"is_crawler":null,"crawler_name":null,"crawler_type":null,"is_tor":null,"threat_level":null,"threat_types":null}}, organization=IBPF, codigoCliente=440149, device=Android SDK built for x86, eventDate=6/1/20 4:03 PM}
```

The full code is:

```python
import re
import json
import pyhocon
import fastavro
import requests
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# SCHEMA_REGISTRY, SUBSCRIBE_TOPIC, HOST, KEYSTORE_PASSWORD and KEYSTORE_PATH
# are configuration values defined elsewhere (not shown in the post).
spark = SparkSession.builder.getOrCreate()

def decode(msg, schema):
    bytes_io = BytesIO(msg)
    # Skip the 5-byte header in front of the Avro payload (presumably the
    # Confluent wire format: 1 magic byte + 4-byte schema ID).
    bytes_io.seek(5)
    msg = fastavro.schemaless_reader(bytes_io, schema)
    return msg

def parse(msg):
    # Parse HOCON text and convert it to a Python object via JSON.
    conf = pyhocon.ConfigParser.parse(msg)
    msg_converter = pyhocon.tool.HOCONConverter.to_json(conf)
    msg = json.loads(msg_converter)
    return msg

def get_schema(registry_url, topic):
    # Fetch the latest schema registered for the topic.
    URL = f'{registry_url}/subjects/{topic}/versions/latest'
    response = requests.get(url=URL, verify=False)
    subject = response.json()
    schema_id = subject['id']
    schema = json.loads(subject['schema'])
    return [schema_id, schema]

schema_id, schema = get_schema(registry_url=SCHEMA_REGISTRY, topic=SUBSCRIBE_TOPIC)
# Registered without a return type, so both UDFs return StringType:
# the dict produced by decode() reaches regexp_replace as a string.
spark.udf.register('decode', lambda value: decode(value, schema))
spark.udf.register('parse', parse)

spark.readStream \
    .format('kafka') \
    .option('subscribe', SUBSCRIBE_TOPIC) \
    .option('startingOffsets', 'earliest') \
    .option('kafka.bootstrap.servers', HOST) \
    .option('kafka.security.protocol', 'SSL') \
    .option('kafka.ssl.key.password', KEYSTORE_PASSWORD) \
    .option('kafka.ssl.keystore.location', KEYSTORE_PATH) \
    .option('kafka.ssl.truststore.location', KEYSTORE_PATH) \
    .option('kafka.ssl.keystore.password', KEYSTORE_PASSWORD) \
    .option('kafka.ssl.truststore.password', KEYSTORE_PASSWORD) \
    .load() \
    .selectExpr('decode(value) as value') \
    .withColumn('value', f.regexp_replace('value', r'([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])', '$1')) \
    .writeStream \
    .format('console') \
    .option('truncate', 'false') \
    .start()
```
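One difference stands out between the script and the pattern quoted at the top of the question: the script's character class is `[A-Z,a-z,1-9]`, which covers the digits 1 to 9 but not `0`, so it cannot match `BL056`. The sketch below checks both variants with Python's `re` on a shortened record, assuming `re` behaves like Spark's `java.util.regex` for these constructs.

```python
import re

# Shortened from the sample record.
record = "{someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE}"

# Pattern as quoted in the question text: digits 0-9.
quoted = r"([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])"
# Pattern as used in the streaming script: digits 1-9 only, so no "0".
in_script = r"([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])"

for name, pattern in (("quoted", quoted), ("in_script", in_script)):
    match = re.search(pattern, record)
    # re.search returns None when the pattern matches nowhere in the string.
    print(name, match.group(0) if match else None)
```

This prints `quoted someMessage=Test. [BL056]` and `in_script None`. When a pattern matches nowhere, `regexp_replace` returns the value unchanged, so this alone could explain why the streaming output looks untouched.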
Answer 1 (8fsztsew)

IIUC, use `regexp_extract` if you only need to pull the text out, and `regexp_replace` if you want to replace it.
The regexes that worked for me are:

```python
from pyspark.sql.functions import regexp_extract, regexp_replace

df.select(regexp_extract('value', r'someMessage=\w+\.\ \[\w+\]', 0)).show(2, False)
# and
df.select(regexp_extract('value', r'someMessage=(.*)]', 0)).show(2, False)
# +-------------------------------------------+
# |val                                        |
# +-------------------------------------------+
# |someMessage=Test. [BL056]                  |
# |someMessage=Test. [BL056]                  |
# +-------------------------------------------+

# And if you want to replace it, use this:
df.select(regexp_replace('value', r'someMessage=(.*)]', ''))
```
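A minimal sketch of what the answer's expressions do, using Python's `re` on a shortened record and assuming it behaves like Spark's `java.util.regex` for these patterns. Note that replacing `someMessage=(.*)]` removes the whole `someMessage` field rather than only `[BL056]`, and that the greedy `(.*)` runs to the last `]` in the value. The `tags=[a]` field is hypothetical, added only to show that effect; `[^,]*` is one way to keep the match inside a single field.

```python
import re

# Shortened from the sample record.
record = "{someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE}"

# The answer's extraction pattern: \w+ matches "Test", \. a literal dot,
# "\ " a literal space, and \[\w+\] the bracketed code.
extract = r"someMessage=\w+\.\ \[\w+\]"
print(re.search(extract, record).group(0))  # someMessage=Test. [BL056]

# The answer's replacement pattern drops the whole someMessage field.
greedy = r"someMessage=(.*)]"
print(re.sub(greedy, "", record))  # {someUnitName=IB, , someOrigin=MOBILE}

# Hypothetical record with a later "]": the greedy (.*) runs to the LAST "]",
# swallowing the fields in between.
other = "{someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, tags=[a]}"
print(re.sub(greedy, "", other))  # {someUnitName=IB, }

# [^,]* cannot cross a comma, so the match stays inside one field.
scoped = r"someMessage=[^,]*\]"
print(re.sub(scoped, "", other))  # {someUnitName=IB, , someOrigin=MOBILE, tags=[a]}
```

On the sample record there is no later `]`, which is why the greedy version works there.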