pandas 跨多个请求存储大型 Dataframe 的Flask

uinbv5nw  于 2023-01-19  发布在  其他
关注(0)|答案(3)|浏览(129)

我有一个使用大型DataFrame的Flask Web应用程序(数百兆). DataFrame在应用中用于几个不同的机器学习模型.我想在应用中只创建一次DataFrame,并跨多个请求使用它,以便用户可以基于相同的数据构建不同的模型. Flask会话不是为大数据构建的,所以这不是一个选项。我不想回去重新创建数据框,以防数据源是csv文件(讨厌)。be
我有一个可行的解决方案,但是我在堆栈溢出中找不到任何关于这个解决方案的讨论。这使我怀疑我的解决方案可能不是一个好的设计思想。我一直使用这样的假设:在软件开发中,一条被充分利用的道路是一条被很好地选择的道路。
我的解决方案是创建一个带有一个类变量的数据保持器类:

class DataHolder:
     dataFrameHolder = None

现在,dataFrameHolder在所有类示例中都是已知的(就像Java中的静态变量一样),因为它存储在服务器的内存中。
现在我可以创建DataFrame一次,将其放入DataHolder类中:

import pandas as pd
from dataholder import DataHolder

result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2',....]
DataHolder.dataFrameHolder = df

然后从导入DataHolder类的任何代码访问该DataFrame,然后我可以在应用程序的任何地方使用存储的DataFrame,包括跨不同的请求:

.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.

这是一个坏主意,一个好主意,还是有什么我不知道的东西已经解决了问题?

qxgroojn

qxgroojn1#

可以使用Redis。我的用例是较小的 Dataframe ,所以没有测试过较大的 Dataframe 。这允许我向多个浏览器客户端提供3秒的滴答声数据。pyarrow序列化/反序列化性能良好。在本地和AWS/GCloud和Azure上工作

GET路由

@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
    c = mycache()
    data = c.redis().get(key)
    resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
    resp.headers["key"] = key
    resp.headers["type"] = c.redis().get(f"{key}.type")
    resp.headers["size"] = sys.getsizeof(data)
    resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
    return resp

将 Dataframe 放入缓存的示例路由

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
    c = mycache()
    dfsensor = c.get("dfsensor")
    newsensor = json_normalize(request.get_json())
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])

    # add new data from serial interface to start of list (append old data to new data).
    # default time as now to new data
    dfsensor = newsensor.append(dfsensor, sort=False)
    # keep size down - only last 500 observations
    c.set("dfsensor", dfsensor[:500])
    del dfsensor

    return jsonify(result={"status":"ok"})

实用程序类

import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)

class mycache():
    __redisClient:Redis
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        try:
            ep = os.environ["REDIS_HOST"]
        except KeyError:
            if os.environ["HOST_ENV"] == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif os.environ["HOST_ENV"] == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif os.environ["HOST_ENV"] == "AZURE":
                #os.environ["REDIS_HOST"] = "redis://ignore:password@redis-sensorvenv.redis.cache.windows.net"
                pass # should be set in azure env variable
            elif os.environ["HOST_ENV"] == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                raise "could not initialise redis"
                return # no known redis setup

        #self.__redisClient = redis.Redis(host=os.environ["REDIS_HOST"])
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)
        self.alog = logenv.alog()

    def redis(self) -> Redis:
        return self.__redisClient

    def exists(self, key:str) -> bool:
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key:str) -> Union[DataFrame, str]:
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        if valuetype is None:
            if (key.split(".")[-1] == "pickle"):
                return pickle.loads(self.redis().get(key))
            else:
                ret = self.redis().get(key)
                if ret is None:
                    return ret
                else:
                    return ret.decode()
        elif valuetype.decode() == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(self.__redisClient.get(key))
            except pa.lib.ArrowIOError as err:
                self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
            except OSError as err:
                if "Expected IPC" in str(err):
                    self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                    return pickle.loads(self.redis().get(f"{key}.pickle"))
                else:
                    raise err

        elif valuetype.decode() == str(type({})):
            return json.loads(self.__redisClient.get(key).decode())
        else:
            return self.__redisClient.get(key).decode() # type: ignore

    def set(self, key:str, value:Union[DataFrame, str]) -> None:
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)

        if str(type(value)) == str(pd.DataFrame):
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                # issue should be transient through an upgrade....
                # once switched off data can go away
                self.redis().expire(f"{key}.pickle", 60*60*24)
        elif str(type(value)) == str(type({})):
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)

        self.__redisClient.set(keytype, str(type(value)))

if __name__ == '__main__':
    os.environ["HOST_ENV"] = "LOCAL"
    r = mycache()
    rr = r.redis()
    for k in rr.keys("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k.decode()))
k4ymrczo

k4ymrczo2#

我也遇到过类似的问题,当我导入CSV(数百MB)并为每个请求动态创建DataFrame时,正如您所说,这很糟糕!我还尝试了REDIS方式来缓存它,这提高了性能一段时间,直到我意识到对底层数据进行更改也意味着更新该高速缓存。
然后我发现了CSV之外的世界,以及更高性能的文件格式,如Pickle,FeatherParquet等。您可以阅读更多关于here的信息。您可以导入/导出所有您想要的CSV,但使用中间格式进行处理。
不过我确实遇到了一些问题。我读到Pickle有安全问题,尽管我仍然在使用它。Feather不允许我在数据中写入一些object类型,它需要它们categorized。您的里程可能会有所不同,但如果您有良好的干净数据,请使用Feather。
最近,我发现我使用Datatable而不是Pandas来管理大型数据,并将它们存储在Jay中,以获得更好的performance读/写性能。
不过这并不意味着将使用Pandas的代码重写到DataTable中,但我相信API非常similar。由于代码库非常大,我自己还没有这样做,但您可以给予一下。

00jrzges

00jrzges3#

我不知道我是否有一个答案,但我有同样的问题,你和我用的答案在下面,它为我工作:
How do I load a file on initialization in a flask application

相关问题