一个初学者的问题:我正在尝试创建一个FastAPI路由,用于处理Prometheus警报。警报本身只是一个有效负载,如下所示:
{
"endsAt": "0001-01-01T00:00:00Z",
"labels.prometheus": "label/prometheus-stack",
"labels.namespace": "namespace_here",
"labels.alertname": "JobFailed",
"labels.severity": "critical",
"labels.job_name": "uuid-job",
"labels.purpose": "purpose_here",
"labels.team": "team_here",
"status": "firing",
"startsAt": "2023-03-13T21:05:48.433Z",
"annotations.summary": "job failed",
"annotations.message": "Job XY failed.",
"fingerprint": "1234123412341234",
"generatorURL": "https://URL.stuff"
}
对于API,我有一个pydantic类“Alert”,它看起来像这样:
class Alert(BaseModel):
fingerprint: str
status: str
startsAt: datetime
endsAt: datetime
endsAt: str
annotations: Dict[str, str]
labels: Dict[str, str]
我所要做的就是能够将Prometheus有效负载发布到以下FastAPI路由,并能够访问所有属性并对其执行某些操作。
@api.post("/alerts/jobs", tags=["Alerts"])
async def recieve_failed_job_alert(alert: Alert):
print(alert)
print(alert.fingerprint)
print(alert.annotations.summary)
print(alert.labels.alertname)
如果我有上面提到的pydantic类,FastAPI会回答“422 Unprocessable Entity”,因为字段“annotations”和“labels”是必需的。它们本身没有任何内容,只有“子字段”(?)像标签一样。命名空间、注解。留言,...有内容。
如果我编辑pydantic类,给予这些字段成为默认值,并使它们不再需要,就像这样:
annotations: Dict[str, str] = None
labels: Dict[str, str] = None
则Post请求成功,但是值不是期望的。给定的有效载荷被给出为
fingerprint='1234123412341234' status='firing' starts_at=datetime.datetime(2023, 3, 13, 21, 5, 48, 433000, tzinfo=datetime.timezone.utc) ends_at=datetime.datetime(1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) generator_url='https://URL.stuff' annotations=None labels=None labels.purpose='purpose_here' labels.prometheus='label/prometheus-stack' labels.namespace='namespace_here' labels.severity='critical' annotations.summary='job failed' annotations.message='Job XY failed.' labels.alertname='JobFailed' labels.team='team_here' labels.job_name='uuid-job'
请注意“。..annotations=None labels=None...”此外,嵌套的标签和注解似乎不能以任何方式订阅
print(alert.labels.namespace)
^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'namespace'
我确信我只是不明白像这样的嵌套字段的处理,远远不够。但由于这一点,我也不知道如何帮助自己,使这项工作。任何帮助是赞赏!
亲切的问候
1条答案
按热度按时间egdjgwm81#
你最后的评论显示了一些关于Python语法的困惑。如果你的JSON数据有一个名为
labels.prometheus
的字段,你不能要求你的模型提供alert.labels.prometheus
;这意味着Alert
对象有一个名为labels
的字段(该字段又有一个名为prometheus
的字段)。你有一个扁平的数据结构。没有层次结构允许您尝试使用的点式表达式。
您需要创建一个模型,将这些带点的字段名(
labels.prometheus
)Map到与Python语法兼容的字段名(例如,例如,labels_prometheus
)。我们可以使用字段别名来实现这一点;例如:如果我们将示例JSON文档交付给此API,我们将在控制台上看到打印:
阅读更多in the Pydantic documentation.
或者,如果你想要一个分层的数据结构,你可以通过
root_validator
转换JSON输入数据。可能是这样的:这将打印: