mongoose 使用bull进行节点cron作业调度

qnyhuwrf  于 2022-11-13  发布在  Go
关注(0)|答案(3)|浏览(176)

在我的Node-Express应用程序中,我需要根据用户从UI上可用的日历中选择的开始时间-日期和结束时间-日期来调度作业(与LinkedIn配置文件连接)。作业被调度为在几个选定的数据批上执行,直到日期-时间要求匹配为止。
我正在使用npm包Bull来处理作业队列和调度,但它似乎不起作用。作业(连接LinkedIn配置文件)或者说简单的控制台(用于测试目的),一旦设置了定时器或从UI创建了一个新作业,就立即执行,而不是在定义的开始时间执行。
下面是我的代码:

const Queue = require("bull");
const mongoose = require("mongoose");
const schedulerJobs = require("../api/models/schedulerJobs");

var id = mongoose.Types.ObjectId();
var HTMLParser = require("node-html-parser");
var { fetchSendConnectionData } = require("./services");
var { setLeadName } = require("./utils");

module.exports = async function connectionScheduler(
  userId,
  cookieName,
  leads,
  start,
  end,
  campaign_name,
  connection_message,
  campaign_id
) {
  var i = 0;
  var parse_message = HTMLParser.parse(
    connection_message
  ).structuredText.toString();

  // 1. Initiating the Queue
  const campaignQueue = new Queue("Campaign", {
    redis: {
      host: "127.0.0.1",
      port: 6379,
    },
  });

  const data = leads;

  const options = {
    attempts: 2,
    delay: 5000,
    repeat: {
      cron: "*/2 * * * * *",
      tz: "America/Los_Angeles",
      startDate: start,
      endDate: end,
    },
  };

  // 2. Adding a Job to the Queue
  campaignQueue.add(`${campaign_name}`, data, options);

  // 3. Consumer
  campaignQueue.process(async (job) => {
    campaignQueue.close();

    return await console.log("CRON started.....");

    console.log(
      `Connection Cron For ${campaign_name} Now Points To Index => ${i}`
    );
    fetchSendConnectionData({
      name: cookieName,
      profile_links: job[i].profileUrl,
      message: setLeadName(parse_message, job[i].name),
    })
      .then(({ data: { message } }) => {
        if (message === "Connection send") {
          console.log(
            `Its Been 5 Minutes And Connection Request Is Sent To => ${job[i].name}`
          );
        } else {
          console.log(
            `Connection Request To => ${job[i].name} Has Failed So Now We Move On To The Next One`
          );
        }
        i = i + 1;
        if (i === job.length) {
          job.close();
        }
      })
      .catch((err) => {
        console.log(
          `Connection Request To => ${job[i].name} Has Failed Due To => ${err.message}`
        );
        i = i + 1;
        if (i === job.length) {
          job.close();
        }
      });
  });

  // 4. Listener
  campaignQueue.on("error", (err) => {
    // console.log(`Job completed with result ${result}`);
  });

  campaignQueue.on("progress", function (job, progress) {
    // A job's progress was updated!
  });

  campaignQueue.on("completed", (job, result) => {
    // console.log("job completed", job);

    //save job completed to database
    const jobdetail = {
      userId,
      start,
      end,
      campaign_id,
      campaign_name,
      connection_message,
      jobId: job.opts.jobId,
    };

    const schedulerjobs = new schedulerJobs(jobdetail);
    schedulerjobs.save().then((scheduledjob) => console.log("Job saved to db"));
  });

  campaignQueue.on("failed", function (job, err) {
    // A job failed with reason `err`!
  });
};

服务器一启动,就立即打印输出:

CRON started.....
Job saved to db

它不等待start,并且只运行一次,直到end才保持运行。
请帮助解决此问题

vkc1a9a2

vkc1a9a21#

可以使用bull的重复功能

queue.add({your_data:""},{repeat:{cron:"5 * * * *"}});

上述代码将每5分钟运行一次作业。
https://optimalbits.github.io/bull/
请参阅上述文档可重复部分。

bis0qfac

bis0qfac2#

对于每5分钟重复一次的作业,应使用以下代码:

const myJob = await myqueue.add(
  { foo: 'bar' },
  {
    repeat: {
      every: 5*60*1000,
    }
  }
);
wgx48brx

wgx48brx3#

return之后的任何代码行都将被忽略。请尝试以下操作:

// return await console.log("CRON started.....");
    await console.log("CRON started.....");

如果需要,在函数的末尾放置空的return

相关问题