How can I use a pretrained Keras model to make predictions with distributed Dask?

soat7uwm · asked on 2023-05-18

I am loading a pretrained Keras model and then trying to use Dask to parallelize prediction over a large amount of input data. Unfortunately, I am running into some issues related to how the Dask array is created. Any guidance would be greatly appreciated!
Setup:
First, I cloned this repository: https://github.com/sanchit2843/dlworkshop.git
Reproducible code example:

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import train_test_split
from keras.models import load_model
from keras import backend as K  # used by contrastive_loss below
import keras
from keras.models import Sequential
from keras.layers import Dense
from dask.distributed import Client
import mlflow
import mlflow.keras  # used inside load_and_predict
import warnings
import dask.array as da  # aliased as `da` to match da.from_array below
warnings.filterwarnings('ignore')

dataset = pd.read_csv('data/train.csv')
X = dataset.drop(['price_range'], axis=1).values
y = dataset[['price_range']].values

# scale data
sc = StandardScaler()
X = sc.fit_transform(X)
ohe = OneHotEncoder()
y = ohe.fit_transform(y).toarray()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Neural network
model = Sequential()
model.add(Dense(16, input_dim=20, activation="relu"))
model.add(Dense(12, activation="relu"))
model.add(Dense(4, activation="softmax"))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=100, batch_size=64)

# Use dask
client = Client()
def load_and_predict(input_data_chunk):

    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    # load the most recently logged model from MLflow
    mlflow.set_tracking_uri('<uri>')
    mlflow.set_experiment('clean_parties_ml')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

da_input_data = da.from_array(X_test, chunks=(100, None))
prediction_results = da_input_data.map_blocks(load_and_predict, dtype=X_test.dtype).compute()

The error I receive:

AttributeError: '_thread._local' object has no attribute 'value'

bejyjqdl1#

So I think the delayed interface works best here. I ended up using Dask's delayed interface to distribute a VGG16 model, with the layers modified to extract features from images, and yes, I got this working with Dask distributed. Something like this:

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.models import Model

def calculate_features(model, image):
    # decode the raw bytes back into a 224x224 RGB frame for VGG16
    frame = np.frombuffer(image, dtype=np.uint8)
    frame = frame.reshape(1, 224, 224, 3)
    frame = preprocess_input(frame)
    features = model.predict(frame, verbose=0)
    return features.tobytes()

@dask.delayed
def build_vgg16_model():
    vgg16 = VGG16()
    vgg16 = Model(inputs=vgg16.input, outputs=vgg16.layers[-2].output)
    return vgg16

@dask.delayed
def get_features(model, df):
    rows = []
    for row in df.itertuples():
        features = calculate_features(model, row.image)
        rows.append([features])

    return pd.DataFrame(rows, columns=['features'])

dfi = dd.read_parquet(
    's3://bucket/images',
    engine='pyarrow',
    storage_options=minio_options  # S3/MinIO credentials defined elsewhere
)

dfid = dfi.to_delayed(optimize_graph=True)

model = build_vgg16_model()
features = [get_features(model, df) for df in dfid]

dff = dd.from_delayed(features, verify_meta=False, meta={
    'features': 'bytes'
})

Interestingly, though: the model ended up running on a single machine, and all the CPU work was confined to that machine. Boo... I believe this approach loads the model only once in the Dask graph and sends it to the first worker that needs it. I assumed map_blocks or map_partitions would try to ship the model out to every partition across the nodes, but that could be very inefficient and cause a lot of network traffic and memory usage. (The VGG16 model is fairly large.)
I was able to work around this with a little trick like this…

def partition(arr, chunks):
    # split a list into `chunks` interleaved sublists
    return [arr[i::chunks] for i in range(chunks)]

features = []
for chunk in partition(dfid, 16):
    model = build_vgg16_model()
    features.extend([get_features(model, df) for df in chunk])

and then called dd.from_delayed on features. This limits the model to being loaded only 16 times in the Dask graph. I did see significantly more CPU load being distributed across the cluster with this approach, without any increase in network overhead.
I haven't dug into this further yet, but I'm hoping I can load the model exactly X times, where X equals the number of workers I have, ensure each worker gets one copy of the model, and then start streaming the partitions through. I'll update my post as my investigation continues...
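
One way I might get that one-copy-per-worker behavior is to build the model lazily inside the task and cache it on the worker, instead of passing it in as a delayed object. This is a minimal, untested sketch of the idea: it reuses calculate_features from above, and the _vgg16 cache attribute on the worker is a name I made up for illustration:

from dask.distributed import get_worker

def get_features_cached(df):
    # build the model at most once per worker and reuse it for every
    # partition that later lands on the same worker
    worker = get_worker()
    model = getattr(worker, '_vgg16', None)  # hypothetical cache attribute
    if model is None:
        vgg16 = VGG16()
        model = Model(inputs=vgg16.input, outputs=vgg16.layers[-2].output)
        worker._vgg16 = model
    rows = [[calculate_features(model, row.image)] for row in df.itertuples()]
    return pd.DataFrame(rows, columns=['features'])

# only the function is serialized, so the model never travels over the network
dff = dfi.map_partitions(get_features_cached, meta={'features': 'bytes'})

With this pattern each worker pays the load cost exactly once, regardless of how many partitions it processes.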


wlzqhblo2#

Keras/TensorFlow doesn't play nicely with other threading systems. There is an ongoing issue on this topic here: https://github.com/dask/dask-examples/issues/35
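
Since the '_thread._local' error in the question comes from sharing Keras/TensorFlow state across threads, one common mitigation is to run your Dask workers as single-threaded processes so that each model lives in its own process. A minimal sketch, with illustrative worker counts:

from dask.distributed import Client

# process-based workers with one thread each: tasks never share
# thread-local Keras/TensorFlow state within a process
client = Client(processes=True, n_workers=4, threads_per_worker=1)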
