我正试图通过遵循正式的fastapi文档,使用python、fastapi和异步sqlalchemy构建一个简单的作业板。我在通过检索数据库中的数据时遇到一些问题 ID
.
希望以下是最小的可复制代码段:
schemas/jobs.py
from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime
class JobBase(BaseModel):
title: Optional[str] = None
company_name: Optional[str] = None
company_url: Optional[str] = None
location: Optional[str] = "remote"
description: Optional[str] = None
date_posted: Optional[date] = datetime.now().date()
class JobCreate(JobBase):
title: str
company_name: str
location: str
description: str
class ShowJob(JobBase):
title: str
company_name: str
company_url: Optional[str]
location: str
date_posted: date
description: str
class Config():
orm_mode = True
routes/route_jobs.py
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db
router = APIRouter()
@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
owner_id = 1
return await jobs.create_new_job(Job, owner_id)
@router.get("/get/{id}")
def retrieve_job_by_id(id:int, job_board = Depends(get_db)):
#print(type(session))
job_id = job_board.retrieve_job(job_board, id=id)
if not job_id:
HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return job_id
db/repository/job_board_dal.py
from sqlalchemy.orm import Session, query
from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher
class job_board():
def __init__(self, db_session: Session):
self.db_session = db_session
async def register_user(self, user: UserCreate):
new_user = User(username=user.username,
email=user.email,
hashed_password=Hasher.get_password_hash(user.password),
is_active = False,
is_superuser=False
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def create_new_job(self, job: JobCreate, owner_id: int):
new_job = Job(**job.dict(), owner_id = owner_id)
self.db_session.add(new_job)
await self.db_session.flush()
return new_job
def retrieve_job(self, id:int):
item = self.db_session.query(Job).filter(Job.id == id).first()
return item
那就看你了
from db.session import async_session
from db.repository.job_board_dal import job_board
async def get_db():
async with async_session() as session:
async with session.begin():
yield job_board(session)
我试图改变依赖关系
@router.get("/get/{id}")
def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
#print(type(session))
job_id = job_board.retrieve_job(id_job, id=id)
if not job_id:
HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return job_id
它给了我这个错误 AttributeError: 'AsyncSession' object has no attribute 'query'
. 任何帮助都将不胜感激。
1条答案
按热度按时间dzjeubhm1#
AsyncSession
类不声明query
财产。你可以用
AsyncSession.get
取回Job
与给定记录匹配的记录id
. 此方法返回None
如果未找到与所述记录匹配的记录id
.例如