Hi, I am trying to read a Parquet file with Apache Beam, then apply a preprocessing function, and finally split the dataset into a training set and a validation set. So far, the pipeline works up to the dataset split.
import random

import apache_beam as beam
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

# CATEGORICAL_FEATURE_KEYS, OTHER_CATEGORICAL_FEATURE_KEYS, LABEL_KEY and
# preprocessing_fn are defined elsewhere in the module.

def beam_transformation(working_dir, eval_percent=20.0):
    '''
    METADATA
    '''
    RAW_DATA_FEATURE_SPEC = dict(
        [(name, tf.io.FixedLenFeature([], tf.string)) for name in CATEGORICAL_FEATURE_KEYS] +
        [(name, tf.io.VarLenFeature(tf.string)) for name in OTHER_CATEGORICAL_FEATURE_KEYS] +
        [(name, tf.io.FixedLenFeature([], tf.int64)) for name in LABEL_KEY]
    )
    RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
        schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))
    tf_transform_output = tft.TFTransformOutput(working_dir)
    '''
    Beam TensorFlow Transform Pipeline
    '''
    with beam.Pipeline() as pipeline:
        with tft_beam.Context(temp_dir=working_dir):
            # Read data
            raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet(
                f'{working_dir}/dataset.parquet')
            # Analyze and transform the data with the preprocessing function
            transformed_dataset_and_metadata, transform_fn = (
                (raw_data, RAW_DATA_METADATA)
                | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
            transformed_data, transformed_metadata = transformed_dataset_and_metadata
            # Split the dataset into a training set and an evaluation set
            assert 0 < eval_percent < 100, 'eval_percent must be in the range (0-100)'
            train_dataset, eval_dataset = (
                transformed_data
                | 'Split dataset' >> beam.Partition(
                    lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))
I used this sample as a reference: https://github.com/googlecloudplatform/cloudml-samples/blob/master/molecules/preprocess.py
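The split step itself is just the standard Beam Partition idiom from that sample. For comparison, a minimal self-contained sketch (made-up elements, no TFT involved) runs under the default DirectRunner:

import random
import apache_beam as beam

eval_percent = 20.0

with beam.Pipeline() as pipeline:
    elements = pipeline | 'Create' >> beam.Create([{'id': i} for i in range(10)])
    # Partition routes each element to bucket 0 (train) or 1 (eval)
    # without materializing the PCollection.
    train, eval_ = (
        elements
        | 'Split dataset' >> beam.Partition(
            lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))
    train | 'PrintTrain' >> beam.Map(lambda elem: print('train', elem))
    eval_ | 'PrintEval' >> beam.Map(lambda elem: print('eval', elem))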
The problem seems to be that the data has to be collected before it can be split. The error I get is the following:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
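One variant I am considering (a sketch only, not verified on the Spark runner) is to make the split deterministic and pass eval_percent to beam.Partition as an explicit argument instead of capturing it in a lambda, so the partition function carries no closure state. Here split_train_eval is a name I made up, and keying the element with json.dumps assumes the transformed elements can be stringified:

import hashlib
import json

def split_train_eval(elem, num_partitions, eval_percent):
    # Derive a stable bucket in [0, 100) from the element itself, so the
    # same element always lands in the same split.
    key = json.dumps(elem, sort_keys=True, default=str).encode('utf-8')
    bucket = int(hashlib.md5(key).hexdigest(), 16) % 100
    return int(bucket < eval_percent)

# Extra positional arguments after the number of partitions are forwarded
# to the partition function, so nothing needs to live in a closure:
# train_dataset, eval_dataset = (
#     transformed_data
#     | 'Split dataset' >> beam.Partition(split_train_eval, 2, eval_percent))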