postgresql Flask REST API 使用 SQLAlchemy 连接 Cloud SQL PostgreSQL

2guxujil  于 2023-02-18  发布在  PostgreSQL
关注(0)|答案(1)|浏览(144)

我的Flask Rest API应用程序中的Cloud Sql Postgres出现连接问题。我有一个db.py文件:

import os
from flask_sqlalchemy import SQLAlchemy

import sqlalchemy

db = SQLAlchemy()

def connect_unix_socket() -> sqlalchemy.engine.base.Engine:
    """Initialize a Unix socket connection pool for a Cloud SQL Postgres instance.

    Reads ``DB_USER``, ``DB_PASS``, ``DB_NAME`` and ``INSTANCE_UNIX_SOCKET``
    from the environment and builds a ``postgresql+pg8000`` engine from them.

    Returns:
        The engine created by ``sqlalchemy.create_engine``.

    Raises:
        KeyError: If any of the four environment variables is not set.
    """
    # Note: Saving credentials in environment variables is convenient, but not
    # secure - consider a more secure solution such as
    # Cloud Secret Manager (https://cloud.google.com/secret-manager) to help
    # keep secrets safe.
    db_user = os.environ["DB_USER"]  # e.g. 'my-database-user'
    db_pass = os.environ["DB_PASS"]  # e.g. 'my-database-password'
    db_name = os.environ["DB_NAME"]  # e.g. 'my-database'
    unix_socket_path = os.environ["INSTANCE_UNIX_SOCKET"]  # e.g. '/cloudsql/project:region:instance'

    pool = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                         ?unix_sock=<INSTANCE_UNIX_SOCKET>/.s.PGSQL.5432
        # Note: Some drivers require the `unix_sock` query parameter to use a different key.
        # For example, 'psycopg2' uses the path set to `host` in order to connect successfully.
        sqlalchemy.engine.url.URL.create(
            drivername="postgresql+pg8000",
            username=db_user,
            password=db_pass,
            database=db_name,
            query={"unix_sock": f"{unix_socket_path}/.s.PGSQL.5432"},
        ),
        # [START_EXCLUDE]
        # Pool size is the maximum number of permanent connections to keep.
        pool_size=5,

        # Temporarily exceeds the set pool_size if no connections are available.
        max_overflow=2,

        # The total number of concurrent connections for your application will be
        # a total of pool_size and max_overflow.

        # 'pool_timeout' is the maximum number of seconds to wait when retrieving a
        # new connection from the pool. After the specified amount of time, an
        # exception will be thrown.
        pool_timeout=30,  # 30 seconds

        # 'pool_recycle' is the maximum number of seconds a connection can persist.
        # Connections that live longer than the specified amount of time will be
        # re-established
        pool_recycle=1800,  # 30 minutes
        # [END_EXCLUDE]
    )
    return pool

我将db.py文件导入到app.py文件中:

import os
import sqlalchemy

from flask import Flask
from flask_smorest import Api
from flask_sqlalchemy import SQLAlchemy

from db import db, connect_unix_socket
import models

from resources.user import blp as UserBlueprint

# pylint: disable=C0103
app = Flask(__name__)

def init_connection_pool() -> sqlalchemy.engine.base.Engine:
    """Create the database engine for the configured connection type.

    Returns:
        The engine returned by ``connect_unix_socket()``.

    Raises:
        ValueError: If ``INSTANCE_UNIX_SOCKET`` is unset or empty.
    """
    # Read the socket path from the environment here: this module has no
    # ``unix_socket_path`` global, so referencing one would raise NameError.
    # use a Unix socket when INSTANCE_UNIX_SOCKET (e.g. /cloudsql/project:region:instance) is defined
    if os.environ.get("INSTANCE_UNIX_SOCKET"):
        return connect_unix_socket()
    raise ValueError(
        "Missing database connection type. Please define one of INSTANCE_HOST, INSTANCE_UNIX_SOCKET, or INSTANCE_CONNECTION_NAME"
    )

# NOTE(review): this rebinds the name ``db`` that was imported from db.py
# earlier in this module, so within app.py it no longer refers to that
# imported object.
db = None

@app.before_first_request
def init_db() -> None:
    """Create the connection pool and store it in the module-level ``db``."""
    global db
    # ``db`` now holds whatever init_connection_pool() returns; nothing here
    # calls ``init_app`` on the object defined in db.py.
    db = init_connection_pool()

    
api = Api(app)

@app.route("/api")
def user_route():
    """Return the plain-text greeting served at ``/api``."""
    greeting = "Welcome user API!"
    return greeting

api.register_blueprint(UserBlueprint)

if __name__ == '__main__':
    # Take the port from the PORT environment variable, falling back to '8080'.
    server_port = os.environ.get('PORT', '8080')
    # NOTE(review): debug=True combined with host='0.0.0.0' looks intended for
    # local development only - confirm before deploying.
    app.run(debug=True, port=server_port, host='0.0.0.0')

应用程序可以正常启动,但当我调用端点获取或创建用户时,应用程序崩溃,并报出以下错误:
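RuntimeError: The current Flask app is not registered with this 'SQLAlchemy' instance. Did you forget to call 'init_app', or did you create multiple 'SQLAlchemy' instances?(运行时错误:当前 Flask 应用未注册到此 "SQLAlchemy" 实例。是否忘记调用 "init_app",或者是否创建了多个 "SQLAlchemy" 实例?)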
这是我的User.py类:

from sqlalchemy.exc import SQLAlchemyError, IntegrityError

from db import db
from models import UserModel
from schemas import UserSchema

blp = Blueprint("Users", "users", description="Operations on users")

@blp.route("/user/<string:user_id>")
class User(MethodView):
    # Endpoints for a single user addressed by ``user_id``.
    # Plain comments are used instead of docstrings so the text cannot be
    # picked up by API-documentation tooling.

    @blp.response(200, UserSchema)
    def get(self, user_id):
        # Fetch the user via get_or_404 and return it as the response payload.
        return UserModel.query.get_or_404(user_id)

    def delete(self, user_id):
        # Look the user up first; get_or_404 runs before the session is touched.
        target = UserModel.query.get_or_404(user_id)
        session = db.session
        session.delete(target)
        session.commit()
        return {"message": "User deleted"}, 200

@blp.route("/user")
class UserList(MethodView):
    # Collection endpoint for users.

    @blp.response(200, UserSchema(many=True))
    def get(self):
        # Return the result of UserModel.query.all() as the response payload.
        users = UserModel.query.all()
        return users

我如何解决这个问题?

o7jaxewo

o7jaxewo1#

@dev_ 您的问题在于,您把 SQLAlchemy Core 与 SQLAlchemy ORM 当作同一个东西混合使用。通过 sqlalchemy.create_engine 创建的 SQLAlchemy 连接池使用的是 Core API,而 Flask-SQLAlchemy 使用的是 SQLAlchemy ORM 模型,这正是问题的根本原因。只使用其中一种会更简单。
我建议您只使用 Flask-SQLAlchemy,并配合 cloud-sql-python-connector 库,这会让您的工作轻松很多。
为了简单起见,我将去掉db.py,从而使app.py文件如下所示:

import os

from flask import Flask
from flask_smorest import Api
from flask_sqlalchemy import SQLAlchemy
from google.cloud.sql.connector import Connector, IPTypes

from resources.user import blp as UserBlueprint

# load env vars
db_user = os.environ["DB_USER"]  # e.g. 'my-database-user'
db_pass = os.environ["DB_PASS"]  # e.g. 'my-database-password'
db_name = os.environ["DB_NAME"]  # e.g. 'my-database'
instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]  # e.g. 'project:region:instance'

# Python Connector database connection function
def getconn():
    """Open and return one pg8000 connection through the Cloud SQL Python Connector.

    Uses the module-level ``instance_connection_name``, ``db_user``,
    ``db_pass`` and ``db_name`` values.
    """
    # NOTE(review): a fresh Connector is created and closed on every call;
    # the connector docs may recommend sharing one instance - confirm.
    with Connector() as cloud_sql:
        return cloud_sql.connect(
            instance_connection_name,  # Cloud SQL Instance Connection Name
            "pg8000",
            user=db_user,
            password=db_pass,
            db=db_name,
            ip_type=IPTypes.PUBLIC,  # IPTypes.PRIVATE for private IP
        )

app = Flask(__name__)

# Point Flask-SQLAlchemy at the Python Connector: the URI carries only the
# dialect and driver, and ``getconn`` is passed as the engine's ``creator``.
app.config.update(
    SQLALCHEMY_DATABASE_URI="postgresql+pg8000://",
    SQLALCHEMY_ENGINE_OPTIONS={"creator": getconn},
)
# Bind the extension to this app at construction time.
db = SQLAlchemy(app)

# rest of your code    
api = Api(app)

# ...

希望这有助于解决您的问题!

相关问题