在生产sidekiq中将作业添加到历史记录中,但不执行它(sidekiq+redis+ec2+cloud66)

5ssjco0h  于 2021-06-07  发布在  Redis
关注(0)|答案(2)|浏览(310)

我的应用程序有一个导入功能,可以执行sidekiq worker并导入一堆csv行,将它们保存到我的数据库中。当我在本地机器上执行sidekiq时,这可以正常工作,但是当我将代码部署到生产环境时,sidekiq只会正确执行作业一次。当我第二次使用import函数时,作业直接进入sidekiq中的历史记录堆,而worker中的逻辑永远不会执行。这真的很奇怪,因为它不会抛出错误,就像作业被正确执行一样。对于暂存,我在aws弹性缓存中使用redis。

redis_version: 5.0.6

rails, "5.0.7"
sidekiq, "6.0.5"
sidekiq-failures, "1.0.0"
sidekiq-history, "0.0.11"
sidekiq-limit_fetch, "3.4.0"
sidekiq-pro, "5.0.1"
sidekiq-unique-jobs, "6.0.15"

我将感谢任何提示有关的问题,你以前面临类似的,或任何其他我可以做的调试这个问题。我已经跑了

Sidekiq.redis { |conn| conn.ping }
=> "PONG"

看来redis连接正常。
项目工作人员


# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?

    path = import.file.expiring_url(10)
    file = open(path)

    csv = CSV.parse(file.read, headers: true)
    import.update!( number_of_lines_in_csv: csv.size,
                    import_started_at: DateTime.now)

    created_transactions = []
    csv.each do |row|
      guid = row["TransactionUniqueId"]
      next if guid.blank?

      existing_transaction = Transaction.find_by(transaction_unique_id: guid)
      next if existing_transaction.present?

      attributes = Transaction.convert_attributes(import, row).merge(imported_at: Time.now)

      transaction = Transaction.create!(attributes)
      created_transactions << [transaction.id, guid]
      Rails.logger.info "Transaction #{row["TransactionUniqueId"]} created."
    end

    import.update!(import_finished_at: DateTime.now,
                   imported:           true)
    send_mail(import_id, created_transactions)
  end

  def send_mail(import_id, created_transactions)
    ["email0@example.com", "email1@example.com"].each do |email|
      ImportTransactionsMailer.import_processed(import_id, email, created_transactions).deliver
    end
  end
end

编辑1:抱歉,我忘了说我正在使用cloud66部署我的应用程序,如果这有任何帮助的话。

8yoxcaq7

8yoxcaq71#

sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

如果发生错误怎么办?作业是否将被丢弃,但唯一性锁将保留,从而阻止更多作业排队?

hlswsv35

hlswsv352#

我发现了问题所在。所以,我在一个 after_create 钩在我的进口模型如下。


# frozen_string_literal: true

class Import < ApplicationRecord
  has_many :transactions
  belongs_to :admin_user

  has_attached_file :file, s3_protocol: :https
  validates_attachment_content_type :file, content_type: ["text/plain",
                                                          "text/csv",
                                                          "application/vnd.ms-excel",
                                                          "application/octet-stream"]
  validates :file, attachment_presence: true
  has_paper_trail
  after_create :run_import_in_background

  def run_import_in_background
    ImportWorker.perform_async(id)
  end
end

但是当工作人员执行第一行以查找导入时

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?
...

如果导入是 nil 它应该会回来。问题是,我以为进口永远不会 nil ,因为这是由 after_create 钩子,但实际上是 nil . 当我把回程线改为 raise StandardError.new("Empty import object.") if import.blank? 工人开始失败了。
所以我也换了我的工人 sidekiq_optionsretry: falseretry: 3 在第二次尝试中,worker执行了ok,因为它现在可以找到具有指定id的导入 after_create 钩子和侧钩。这可能与在这个设置中使用s3gem有关。在s3中保存文件可能会导致在db中保存对象的延迟。
您可以在下面看到最终的worker代码。


# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: 3

  def perform(import_id)
    import = Import.find_by(id: import_id)
    raise StandardError.new("Empty import object.") if import.blank?
...

相关问题