Symfony Messenger component and Redis transport: messages are processed on all queues

Posted by nmpmafwu on 2023-11-22 in Redis

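I have 4 message queues that use Redis as the transport. Each one is consumed by its own worker (started via Supervisor), but whenever a message arrives on one of the queues, it gets processed on all of them. The documentation is clear on this point: it explains that the environment variable MESSENGER_CONSUMER_NAME must be set in the worker to determine which messages are processed on which queue.
The problem is that I can't find a working solution. With Doctrine as the transport this is simple, because messages are distinguished by configuring the queue name.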

#messenger.yaml:
framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            audit: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
            async_backup_task:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 0
            failed: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            sync: 'sync://'
        routing:
            Symfony\Component\Mercure\Update: async
            App\Message\UsuarioTraceMessage: audit
            App\Message\BackupDirectoryToRemoteStorageMessage: async_backup_task

This is the Supervisor configuration file:

[program:squidmgr-messenger-async]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log

[program:squidmgr-messenger-audit]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume audit --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log

[program:squidmgr-messenger-backup-task]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async_backup_task --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log

[program:squidmgr-messenger-failed]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume failed --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log


The environment variable set in the worker may need to be referenced in each queue's DSN, but I'm not sure about that.

Answer #1 (wsewodh2)

You also need to set the consumer name in the Messenger configuration:

#messenger.yaml:
framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                  consumer: '%env(MESSENGER_CONSUMER_NAME)%'
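
The same option can be repeated on every transport. The sketch below also gives each transport its own `stream`. That rests on an assumption the original post does not confirm: because all four transports share one DSN, they may also share one Redis stream, which would let any worker pick up any message. The stream names are illustrative only. `stream` and `consumer` are options of the Redis transport, but check them against the documentation for your Symfony version.

```yaml
# messenger.yaml (sketch only; the stream names below are hypothetical)
framework:
    messenger:
        failure_transport: failed
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # Assumption: a separate stream per transport keeps queues apart
                    stream: async
                    # Value must be quoted: a YAML plain scalar cannot start with '%'
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
            audit:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: audit
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
                retry_strategy:
                    max_retries: 3
            async_backup_task:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: async_backup_task
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
                retry_strategy:
                    max_retries: 0
            failed:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: failed
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
```

If this assumption holds, the `stream` option would play roughly the role that the queue name plays for the Doctrine transport mentioned in the question, while `consumer` only distinguishes workers reading from the same stream.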

