Servlets: how to make part of a job run only once in the Java Quartz scheduler (a Kafka consumer)

7cjasjjr  posted 2021-06-06  in Kafka

I am using Java servlets. In the web.xml file I initialize the Quartz scheduler. I have two different jobs and two triggers, and I am using a quartz.properties file from the resources folder. Below is my web.xml file.

<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<display-name>Archetype Created Web Application</display-name>

 <context-param>
     <param-name>quartz:config-file</param-name>
     <param-value>quartz.properties</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:shutdown-on-unload</param-name>
     <param-value>true</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:wait-on-shutdown</param-name>
     <param-value>true</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:start-on-load</param-name>
     <param-value>true</param-value>
 </context-param> 

 <listener>
     <listener-class>
         org.quartz.ee.servlet.QuartzInitializerListener
     </listener-class>
 </listener>
 <listener>
     <listener-class>com.hpe.statistics.StatisticsQuartzListener</listener-class>
 </listener>
 <servlet>
     <servlet-name>StatisticsIvr</servlet-name>
     <display-name>StatisticsIvr</display-name>
     <description></description>
     <servlet-class>StatisticsIvr</servlet-class>
 </servlet>
 <servlet-mapping>
     <servlet-name>StatisticsIvr</servlet-name>
     <url-pattern>/StatisticsIvr</url-pattern>
 </servlet-mapping>
</web-app>

StatisticsIvr is my servlet name. StatisticsQuartzListener is the class where I initialize all the jobs and triggers, and com.hpe.statistics is the package that contains it. Below is the StatisticsQuartzListener class, which implements ServletContextListener.

public void contextInitialized(ServletContextEvent ctx)
{

    JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
            .withIdentity("fileJob", "group1").build();

    JobDetail newlinejob = JobBuilder.newJob(NewLineJob.class)
            .withIdentity("lineJob", "group2").build();

    Trigger filetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("fileTrigger", "group1")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(6).repeatForever())
            .build();
    Trigger linetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("lineTrigger", "group2")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(2).repeatForever())
            .build();

    try {
            scheduler1 = ((StdSchedulerFactory) ctx.getServletContext()
                    .getAttribute(
                            QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                    .getScheduler();
            scheduler1.scheduleJob(newfilejob, filetrigger);
        } catch (SchedulerException e) {
            e.printStackTrace(); // at minimum, surface the scheduling failure
        }

     try {
            scheduler2 = ((StdSchedulerFactory) ctx.getServletContext()
                    .getAttribute(
                            QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                    .getScheduler();
            scheduler2.scheduleJob(newlinejob, linetrigger);
        } catch (SchedulerException e) {
            e.printStackTrace(); // at minimum, surface the scheduling failure
        }

}

Below is the quartz.properties file:


# Main Quartz configuration

org.quartz.scheduler.skipUpdateCheck = true
org.quartz.scheduler.instanceName = StatisticsQuartzListener
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 5

In NewLineJob (the second job), I want to execute part of the code only once. The NewLineJob class is below.

public class NewLineJob implements Job{

public static final String KAFKAHOST="kafkahost";
public static final String KAFKAPORT="kafkaport";
public static final String KAFKATOPICNAME="topicname";
KafkaConsumer<String, String> consumer;
 HashMap<String,Long> consumerMap=new HashMap<String,Long>();
 File lastModifiedFile;

 String flag="off";

public void execute(JobExecutionContext context)
        throws JobExecutionException
        {
            // dataMap was not declared in the original snippet; it comes from the job's JobDataMap
            JobDataMap dataMap = context.getJobDetail().getJobDataMap();

            String kafkahost=dataMap.getString(KAFKAHOST);
            String kafkaport=dataMap.getString(KAFKAPORT);
            String topicname=dataMap.getString(KAFKATOPICNAME);

              SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
              Date d = new Date();
              String timeStamp = sf.format(d);

              int i=0;
              String writeString;
              if(flag.equals("off"))
              {
                  flag="on";
              Properties propsConsumer = new Properties();

                propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
                propsConsumer.put("group.id", "test");
                propsConsumer.put("enable.auto.commit", "true");
                propsConsumer.put("auto.commit.interval.ms", "1000");
                propsConsumer.put("session.timeout.ms", "30000");
                propsConsumer.put("key.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");
                propsConsumer.put("value.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");         
                consumer = new KafkaConsumer<String, String>(propsConsumer);

                  //Kafka Consumer subscribes list of topics here.
                consumer.subscribe(Arrays.asList(topicname)); 

              }

              // ...other operations that should run on every scheduler firing
        }

The part that I want to execute only once is:

Properties propsConsumer = new Properties();

propsConsumer.put("bootstrap.servers", kafkahost + ":" + kafkaport);
propsConsumer.put("group.id", "test");
propsConsumer.put("enable.auto.commit", "true");
propsConsumer.put("auto.commit.interval.ms", "1000");
propsConsumer.put("session.timeout.ms", "30000");
propsConsumer.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
propsConsumer.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(propsConsumer);

// Kafka Consumer subscribes to the list of topics here.
consumer.subscribe(Arrays.asList(topicname));

This is because I do not want to create a new consumer every time the scheduler fires; I want the rest of the code to keep using the same Kafka consumer on every firing. I even tried moving the one-time code into the constructor, but then it fails with an error saying key.deserializer has no default value. What should I do? Thanks in advance.
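For context: Quartz creates a fresh Job instance for every trigger firing, so an instance field like `flag` is reset on each execution and cannot act as a run-once guard. A minimal sketch of one common workaround, assuming a lazily-initialized static holder with double-checked locking; `Resource` here is a hypothetical stand-in for `KafkaConsumer<String, String>` so the sketch stays self-contained:

```java
// Sketch: the expensive setup runs exactly once per JVM, no matter how
// many job executions ask for the resource. `Resource` is a hypothetical
// placeholder for the real KafkaConsumer.
class SharedResourceHolder {

    static class Resource {
        final String endpoint;
        Resource(String endpoint) {
            this.endpoint = endpoint; // real code would build the consumer here
        }
    }

    // volatile so the fully-constructed instance is visible to all threads
    private static volatile Resource instance;

    static Resource getInstance(String endpoint) {
        if (instance == null) {                          // fast path, no lock
            synchronized (SharedResourceHolder.class) {
                if (instance == null) {                  // re-check under the lock
                    instance = new Resource(endpoint);
                }
            }
        }
        return instance;
    }
}
```

In `execute()` the guarded block would then shrink to a `SharedResourceHolder.getInstance(...)`-style lookup. An alternative that avoids statics is to build the consumer once in the ServletContextListener and stash it in the SchedulerContext or ServletContext, then read it back inside the job.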

No answers yet.