ApacheFlink—如何使用睡眠时间在java中每秒生成1000多个事件

8wigbo56  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(499)

我有一个生成flink cep事件的生成器,下面给出了它的代码。基本上,我用的是 Thread.sleep() 我在某个地方读到,即使我们使用java,它的睡眠时间也不能少于1毫秒 System.nanoTime() . 发电机代码为

public class RR_interval_Gen extends RichParallelSourceFunction<RRIntervalStreamEvent> {

Integer InputRate  ;  // events/second
Integer Sleeptime ;
Integer NumberOfEvents;

public RR_interval_Gen(Integer inputRate, Integer numberOfEvents ) {
    this.InputRate = inputRate;
    Sleeptime = 1000 / InputRate;
    NumberOfEvents = numberOfEvents;
}

@Override
public void run(SourceContext<RRIntervalStreamEvent> sourceContext) throws Exception {

    long currentTime;
    Random random = new Random();
    int RRInterval;
    int Sensor_id;

   for(int i = 1 ; i <= NumberOfEvents ; i++) {
        Sensor_id =  2;
       currentTime = System.currentTimeMillis();

       // int randomNum = rand.nextInt((max - min) + 1) + min;
       RRInterval =  10 + random.nextInt((20-10)+ 1);

        RRIntervalStreamEvent stream = new RRIntervalStreamEvent(Sensor_id,currentTime,RRInterval);

        synchronized (sourceContext.getCheckpointLock())
        {
            sourceContext.collect(stream);
             }
        Thread.sleep(Sleeptime);
    }
}

@Override
public void cancel() {

}

}

我将在这里用简单的文字说明我的要求。我想让generator类生成事件,比如说一个1200赫兹的ecg流。这个生成器将接受输入速率和生成流所需的总时间等参数。
到目前为止还不错,问题是我需要发送超过1000个事件/秒。如何使用生成值的生成器函数来实现这一点 U[10,20] ?
另外,请让我知道,如果我使用错误的方式,以产生x个事件/秒在上述以下。

Sleeptime = 1000 / InputRate;

提前谢谢

cyej8jka

cyej8jka1#

在windows系统中,睡眠时间最短为10ms,在linux和macintosh中,睡眠时间最短为1毫秒。
睡眠的粒度通常受到线程调度程序的中断周期的限制。在linux中,这个中断周期在最近的内核中通常是1ms。在windows中,调度程序的中断周期通常在10或15毫秒左右
通过我的研究,我了解到在java中使用nano-time sleep在操作系统级别上并没有帮助。如果你想在 arrival rate > 1000 在一种受控的方式下,可以使用实时操作系统(rtos)来完成,因为它们可以睡眠不到一毫秒。现在,我想出了另一种方法,但是在这个解决方案中,到达间隔时间不会一直分布。
假设你想要的是 3000 events/ second ,则可以创建 for loop 它在每次迭代中迭代3次以发送数据,然后休眠 1ms . 因此,对于3个元组,到达间隔时间将彼此接近,但问题将得到解决。这也许是一个愚蠢的解决方案,但它是有效的。
请告诉我是否有更好的解决办法。

相关问题