erlang 专辑中文名:Elixir百老汇

ndh0cuux  于 2023-03-27  发布在  Erlang
关注(0)|答案(1)|浏览(234)

我试图使用partition_by选项在broadway,使同一分区的消息去同一处理器.我实际上有20处理器和消息数据是从0到9

....
     processors: [
              default: [
                  max_demand: 50,
                  concurrency: 20
                  
              ]        
     ],
     partition_by: &partition/1
     )
  end
    
  defp partition(msg) do
       msg.data 
  end

  def handle_message(_processor, msg, _ctx) do
    Logger.info "pid #{inspect(self())}"
    ...
    msg
  end

奇怪的是,当我在handle消息回调中添加一些日志来监视处理器PID时,我总是为所有传入的消息获得相同的处理器PID。但是当我删除partition_by行时,我有不同的处理器PID。你知道为什么分区不起作用吗?

whlutmcx

whlutmcx1#

我刚刚自己在玩百老汇的分区,它和预期的一样好用,具有相同分区键的消息最终位于相同的生产者示例上(由PID标识)。从我在代码中看到的,不清楚从partition方法返回什么。您需要注意,应该从那里返回一个数字。如果你的message.data是一个字符串或其他东西,那么你可能会在同一个分区中结束。如果你的消息中没有一个数字可以用于分区,你可以帮助自己使用内置的erlang方法并哈希字符串(:erlang.phash2/1)。

相关问题