我正在使用JavaServlet。在web.xml文件中,我初始化了quartz调度器。我有两份不同的工作和两个触发器。我正在使用资源文件夹中的quartz.properties文件。下面是我的web.xml文件。
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
<display-name>Archetype Created Web Application</display-name>
<context-param>
<param-name>quartz:config-file</param-name>
<param-value>quartz.properties</param-value>
</context-param>
<context-param>
<param-name>quartz:shutdown-on-unload</param-name>
<param-value>true</param-value>
</context-param>
<context-param>
<param-name>quartz:wait-on-shutdown</param-name>
<param-value>true</param-value>
</context-param>
<context-param>
<param-name>quartz:start-on-load</param-name>
<param-value>true</param-value>
</context-param>
<listener>
<listener-class>
org.quartz.ee.servlet.QuartzInitializerListener
</listener-class>
</listener>
<listener>
<listener-class>com.hpe.statistics.StatisticsQuartzListener</listener-class>
</listener>
<servlet>
<servlet-name>StatisticsIvr</servlet-name>
<display-name>StatisticsIvr</display-name>
<description></description>
<servlet-class>StatisticsIvr</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>StatisticsIvr</servlet-name>
<url-pattern>/StatisticsIvr</url-pattern>
</servlet-mapping>
</web-app>
statisticsivr是我的servlet名称。statisticsquartzlistener是我初始化所有作业和触发器的类。com.hpe.statistics。是我拥有statisticsquartzlistener类的包名。下面是实现servletcontextlistener的statisticsquartzlistener类。
public void contextInitialized(ServletContextEvent ctx)
{
JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
.withIdentity("fileJob", "group1").build();
JobDetail newlinejob = JobBuilder.newJob(NewFileJob.class)
.withIdentity("lineJob", "group2").build();
Trigger filetrigger = TriggerBuilder
.newTrigger()
.withIdentity("fileTrigger", "group1")
.startNow()
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(6).repeatForever())
.build();
Trigger linetrigger = TriggerBuilder
.newTrigger()
.withIdentity("lineTrigger", "group2")
.startNow()
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(2).repeatForever())
.build();
try {
scheduler1 = ((StdSchedulerFactory) ctx.getServletContext()
.getAttribute(
QuartzInitializerListener.QUARTZ_FACTORY_KEY))
.getScheduler();
scheduler1.scheduleJob(newfilejob, filetrigger);
} catch (SchedulerException e) {
}
try {
scheduler2 = ((StdSchedulerFactory) ctx.getServletContext()
.getAttribute(
QuartzInitializerListener.QUARTZ_FACTORY_KEY))
.getScheduler();
scheduler2.scheduleJob(newlinejob, linetrigger);
} catch (SchedulerException e) {
}
}
下面是quartz.properties文件
# Main Quartz configuration
org.quartz.scheduler.skipUpdateCheck = true
org.quartz.scheduler.instanceName = StatisticsQuartzListener
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 5
在newlinejob(第二个作业)中,我只想执行一次部分代码。newlinejob类如下
public class NewLineJob implements Job{
public static final String KAFKAHOST="kafkahost";
public static final String KAFKAPORT="kafkaport";
public static final String KAFKATOPICNAME="topicname";
KafkaConsumer<String, String> consumer;
HashMap<String,Long> consumerMap=new HashMap<String,Long>();
File lastModifiedFile;
String flag="off";
public void execute(JobExecutionContext context)
throws JobExecutionException
{
String kafkahost=dataMap.getString(KAFKAHOST);
String kafkaport=dataMap.getString(KAFKAPORT);
String topicname=dataMap.getString(KAFKATOPICNAME);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
Date d = new Date();
String timeStamp = sf.format(d);
int i=0;
String writeString;
if(flag.equals("off"))
{
flag="on";
Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
propsConsumer.put("group.id", "test");
propsConsumer.put("enable.auto.commit", "true");
propsConsumer.put("auto.commit.interval.ms", "1000");
propsConsumer.put("session.timeout.ms", "30000");
propsConsumer.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
propsConsumer.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(propsConsumer);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicname));
}
///some other operations which I want according to sceduler
}
我只想执行一次的部分是
Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
propsConsumer.put("group.id", "test");
propsConsumer.put("enable.auto.commit", "true");
propsConsumer.put("auto.commit.interval.ms", "1000");
propsConsumer.put("session.timeout.ms", "30000");
propsConsumer.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
propsConsumer.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(propsConsumer);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicname));
因为我不想每次调度器触发时都创建一个新的消费者。每次调度器触发时,我只想在代码的其余部分使用相同的kafka消费者。甚至我也试着把我只需要一次的代码部分放到构造函数中。但由于key.deserializer没有默认值,因此它会得到一个错误。我要做什么?提前谢谢
暂无答案!
目前还没有任何答案,快来回答吧!