I am using PySpark 3.0.1 (the latest version as of today) to run some simple streaming operations, processing in micro-batches rather than plain batch processing.
Looking at the streaming inspector in the Spark web UI, I can see that the "addBatch" step accounts for most of the overhead of my small jobs, so I decided to switch to the "continuous trigger" mode to get rid of it:
(The light yellow in the chart is the addBatch time, >80% of the total: https://ibb.co/7r6chgj)
Note: my simple job uses the foreach function of the streaming API.
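To make the two modes concrete, here is a minimal side-by-side sketch (the rate source and the interval values are placeholders for illustration): in the default micro-batch mode every trigger interval runs an addBatch job, while the continuous trigger keeps long-running tasks alive.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-comparison").getOrCreate()
df = spark.readStream.format("rate").load()   # placeholder streaming source

# Default micro-batch mode: every trigger interval runs an addBatch job.
micro = df.writeStream.format("console").trigger(processingTime="1 second")

# Continuous mode: tasks stay alive; "1 second" is only the checkpoint interval.
cont = df.writeStream.format("console").trigger(continuous="1 second")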
To enable continuous trigger mode I took three steps:
1. Define the class that will handle the foreach calls:
class ForeachWriter:
    def __init__(self, *args):
        print("initialized")

    def open(self, partition_id, epoch_id=0):
        return True

    def process(self, row):
        pass

    def close(self, error):
        pass

writer = ForeachWriter()
2. Create the structured write stream (createContext is my own helper; see the sketch after step 3):
ssc, spark = createContext(...)
lines = ssc.load()
ws = lines.writeStream
ws.foreach(writer)
3. Enable the continuous trigger:
ws = ws.trigger(continuous="1 second")
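For reference, a minimal sketch of what the createContext helper could look like, with the three steps chained into a started query, is below; the rate source, its options, and the checkpoint path are placeholders rather than my actual code.

from pyspark.sql import SparkSession

def createContext(app_name="continuous-foreach-demo"):
    # Build (or reuse) a SparkSession and a streaming reader.
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    # Continuous processing supports only the Kafka and rate sources;
    # "rate" stands in for the real source here.
    reader = spark.readStream.format("rate").option("rowsPerSecond", 10)
    return reader, spark

ssc, spark = createContext()
lines = ssc.load()

query = (lines.writeStream
         .foreach(ForeachWriter())                           # step 1 writer
         .option("checkpointLocation", "/tmp/foreach-ckpt")  # placeholder path
         .trigger(continuous="1 second")                     # step 3
         .start())
query.awaitTermination()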
Now the query is built correctly, but the job always fails because of this runtime exception:
"Could not get batch id from TaskContext"
It is raised by this method: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/streaming.html#datastreamwriter.foreach
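The relevant portion of DataStreamWriter.foreach in the linked source (it reappears almost verbatim in my workaround below) reads the micro-batch id from the task's local properties and raises when it is missing, which is apparently what happens under a continuous trigger:

def func_with_open_process_close(partition_id, iterator):
    # 'streaming.sql.batchId' is set for micro-batch execution; under a
    # continuous trigger it evidently comes back empty, hence the error above.
    epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
    if epoch_id:
        epoch_id = int(epoch_id)
    else:
        raise Exception("Could not get batch id from TaskContext")
    # ... the rest of the per-partition open/process/close logic follows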
Proposed solution
So I worked around it by creating a custom class that wraps the DataStreamWriter and overrides the foreach() method with a copy of Spark's implementation in which the missing batch id no longer raises (epoch_id simply defaults to 0):
from pyspark.sql.streaming import DataStreamWriter

class CustomDataStreamWriter(DataStreamWriter):
    def __init__(self, dsw):
        self.dsw = dsw
        # .... override all other methods

    def foreachBatch(self, func):
        return self.dsw.foreachBatch(func)

    def foreach(self, f):
        from pyspark.rdd import _wrap_function
        from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
        from pyspark.taskcontext import TaskContext

        if callable(f):
            # The provided object is a callable function that is supposed to be called on each row.
            # Construct a function that takes an iterator and calls the provided function on each
            # row.
            def func_without_process(_, iterator):
                for x in iterator:
                    f(x)
                return iter([])

            func = func_without_process
        else:
            # The provided object is not a callable function. Then it is expected to have a
            # 'process(row)' method, and optional 'open(partition_id, epoch_id)' and
            # 'close(error)' methods.
            if not hasattr(f, 'process'):
                raise Exception("Provided object does not have a 'process' method")

            if not callable(getattr(f, 'process')):
                raise Exception("Attribute 'process' in provided object is not callable")

            def doesMethodExist(method_name):
                exists = hasattr(f, method_name)
                if exists and not callable(getattr(f, method_name)):
                    raise Exception(
                        "Attribute '%s' in provided object is not callable" % method_name)
                return exists

            open_exists = doesMethodExist('open')
            close_exists = doesMethodExist('close')

            def func_with_open_process_close(partition_id, iterator):
                epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
                if epoch_id:
                    epoch_id = int(epoch_id)
                else:
                    epoch_id = 0
                    # raise Exception("Could not get batch id from TaskContext")

                # Check if the data should be processed
                should_process = True
                if open_exists:
                    should_process = f.open(partition_id, epoch_id)

                error = None

                try:
                    if should_process:
                        for x in iterator:
                            f.process(x)
                except Exception as ex:
                    error = ex
                finally:
                    if close_exists:
                        f.close(error)
                    if error:
                        raise error

                return iter([])

            func = func_with_open_process_close

        serializer = AutoBatchedSerializer(PickleSerializer())
        dsw = self.dsw
        wrapped_func = _wrap_function(dsw._spark._sc, func, serializer, serializer)
        jForeachWriter = \
            dsw._spark._sc._jvm.org.apache.spark.sql.execution.python.PythonForeachWriter(
                wrapped_func, dsw._df._jdf.schema())
        dsw._jwrite.foreach(jForeachWriter)
        return dsw
Then, right after step 2 above and before step 3, I added:
ws = CustomDataStreamWriter(ws)
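Spelled out, the wiring with the wrapper looks roughly like this; it is a sketch in which foreach is called on the wrapper so that the overridden implementation is the one registered, and the checkpoint path is a placeholder:

ssc, spark = createContext(...)
lines = ssc.load()

ws = CustomDataStreamWriter(lines.writeStream)   # wrap before registering the sink
ws = ws.foreach(ForeachWriter())                 # overridden foreach; returns the underlying DataStreamWriter
ws = ws.trigger(continuous="1 second")
query = ws.option("checkpointLocation", "/tmp/custom-foreach-ckpt").start()
query.awaitTermination()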
I re-ran the script. This time it loaded and initialized correctly in Spark, and in the Spark web UI I could see a never-ending job (which is what I would expect from continuous trigger mode).
However, the open/process/close callbacks of the foreach handler are never invoked, so I am wondering whether anyone else has found a good workaround, or whether I have to wait for a future Spark release that fixes this.
As you can imagine, such a workaround would dramatically improve my application's performance, so any help is greatly appreciated.