django 不能使用celery 延迟保存形式:对象不可序列化

5hcedyr0  于 2023-10-21  发布在  Go
关注(0)|答案(4)|浏览(100)

使用Django 1.8,我想在视图中保存表单后触发延迟celery 函数

def new_topic(request, forum_id):
    form = TopicForm()
    uid = request.user.id
    if request.method == 'POST':
        tform = TopicForm(request.POST)
        if tform.is_valid():
            topic = tform.save(commit=False)
            topic.title = clean_title(tform.cleaned_data['title'])
            topic.description = clean_desc(tform.cleaned_data['description'])
            topic.save()
            notify_new_topic.delay( uid, topic) #<--problem here
            #rest of the views

但我得到

EncodeError at /add/topic/
<Topic: Topic object> is not JSON serializable

如果我从celery任务中删除delay,我不会得到任何错误。
任务是:

@shared_task
def notify_new_topic(flwd_id, topic):
    title = topic.title
    link = topic.slug

    flwd= cached_user(flwd_id) #User.objects.get(id = flwd_id)
    print 'flwd is', flwd.username
    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
    flwrs = User.objects.filter(id__in= flwr_ids).values('id', 'username','email') 

    for f in flwrs:
        print 'flwr username:',  f['username']
        if notify_flwdp_applies(int(f['id'])):
            print 'notify flwdp applies'
            make_alerts_new_topic(flwd_id, f['id'], topic)
            print 'back from make_alerts_new_topic'

我想知道如何调试/修复这个问题?

vyswwuz2

vyswwuz21#

任务的参数应该是可序列化的(即string、int等)。要修复错误,您可以将topic_id作为参数传递,并在任务方法中获取主题对象:

notify_new_topic.delay( uid, topic.id)

@shared_task
def notify_new_topic(flwd_id, topic_id):
    topic = Topic.objects.get(pk=topic_id)
    title = topic.title
    link = topic.slug

    flwd= cached_user(flwd_id) #User.objects.get(id = flwd_id)
    print 'flwd is', flwd.username
    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
    flwrs = User.objects.filter(id__in= flwr_ids).values('id', 'username','email') 

    for f in flwrs:
        print 'flwr username:',  f['username']
        if notify_flwdp_applies(int(f['id'])):
            print 'notify flwdp applies'
            make_alerts_new_topic(flwd_id, f['id'], topic)
            print 'back from make_alerts_new_topic'
vzgqcmou

vzgqcmou2#

既然已经提供了解决方案,我将尝试解释为什么我们不能将不可序列化的对象传递给 celery tasks

为什么要将可序列化对象传递给celery任务?

对于celery,我们使用 * 消息代理 *(如 RedisRabbitMQ)。假设我们使用 Redis。当一个 celery task 被调用时,参数被传递给 Redis,这样broker就可以读取它们。要做到这一点,这些参数的数据类型应该得到 Redis 的支持。

解决方案

假设你想把python dictionary作为参数传递给一个 *celery任务 *,把这些值添加到celery配置中:

task_serializer = "json"  
result_serializer = "json"
accept_content = ["json"]

或者你可以

celery.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"]
)

其他情况,请将上表中的json替换为picklexml等。

典型的基于文本的序列化格式有csvjsonxmlyamltoml等。基于二进制的格式是protobufavro。Python还有几个包,如picklenumpypandas,支持将自定义对象序列化为byte。您还可以自定义序列化程序。

这些配置有什么作用?

1.指示celery首先序列化python对象,然后将它们传递给 * 消息代理 *。
1.将来自 * 消息代理 * 的对象格式化,然后将它们提供给 celery worker

参考文献

li9yvcax

li9yvcax3#

更改为拾取enoding

app.conf.event_serializer = 'pickle' # this event_serializer is optional. 
app.conf.task_serializer = 'pickle'
app.conf.result_serializer = 'pickle'
app.conf.accept_content = ['application/json', 'application/x-python-serialize']
wh6knrhe

wh6knrhe4#

你可以将示例作为json传递,然后从json重新创建示例,不需要像这样再次进行数据库查询:

serialized_topic = topic.__dict__
serialized_topic.pop('_state')    
notify_new_topic.delay(uid, serialized_topic)

现在,在任务内部,从序列化的主题重新创建示例

@shared_task
def notify_new_topic(flwd_id, serialized_topic):
    topic = Topic(**serialized_topic)
    # reset of the code

相关问题