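I have 3 Java programs, each with a single main function, that a parent program invokes on 3 separate threads. All three programs print a counter value to the same console using the code below.
For program #1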
System.out.print(ANSI_PURPLE + " \r EEG Sensor count =" + Sensor1_tuple_count);
System.out.flush();
For program #2
System.out.print(ANSI_PURPLE + " \r BP Sensor count = " + Sensor2_tuple_count + " ");
System.out.flush();
For program #3
System.out.print(ANSI_PURPLE + "\r ECG Sensor count =" + Sensor3_tuple_count);
System.out.flush();
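All of these counters are updated inside a lambda function, and right now the values overwrite one another on the same console line. How can I get output like this instead?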
EEG Sensor count = X
BP Sensor count = Y
ECG Sensor count = Z
Edit 1
The code of one of the programs, which uses Flink, is given below
public class bp_sensor {
    public static final String ANSI_RED = "\u001B[31m";
    public static final String ANSI_BLUE = "\u001B[34m";
    public static final String ANSI_PURPLE = "\u001B[35m";

    public static void main(String[] args) throws Exception {
        Constants constants = null;
        //setting the envrionment variable as StreamExecutionEnvironment
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.getExecutionEnvironment();
        envrionment.setParallelism(1);
        DataStream<Sensor_Event> bp_stream = envrionment
                .addSource(new EventGenerator_bp_sensor(constants.bp_data_rate,constants.bp_run_time_sec,1,1))
                .name("BP stream")
                .setParallelism(1);
        if(constants.send_to_timekeeper){
            //Sending the stream to timekeeper
            bp_stream.map(new RichMapFunction<Sensor_Event, String>() {
                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    System.out.println(tuple);
                    return tuple + "\n";
                }
            }).writeToSocket(constants.timekeeper_ip, 8003, new SimpleStringSchema() );
        }
        // Sending the stream to mobile phone
        if(constants.send_to_android){
            DataStreamSink<String> total_tuples = bp_stream.map(new RichMapFunction<Sensor_Event, String>() {
                IntCounter Sensor2_tuple_count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.Sensor2_tuple_count = getRuntimeContext().getIntCounter("total_tuples");
                }

                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    Sensor2_tuple_count.add(1);
                    System.lineSeparator();
                    System.out.print(ANSI_PURPLE + " \r BP Sensor count = " + Sensor2_tuple_count + " ");
                    System.out.flush();
                    // System.out.println(ANSI_BLUE + tuple);
                    return tuple + "\n";
                }
            }).writeToSocket(constants.mobile_ip, 7003, new SimpleStringSchema() );
        }
        //start the execution
        JobExecutionResult executionResult = envrionment.execute();
        Integer number_of_tuples = (Integer) executionResult.getAllAccumulatorResults().get("total_tuples");
        int input_rate = number_of_tuples/constants.bp_run_time_sec;
        System.out.println("\n");
        System.out.println(ANSI_BLUE + " Expected Input rate of BP Sensor = " + constants.bp_data_rate + " tuples/second");
        System.out.println(ANSI_RED + " Actual Input rate of BP Sensor = " + input_rate + " tuples/second");
        System.out.println(ANSI_PURPLE + " Total # of tuples sent by BP Sensor = " + number_of_tuples );
    }// main
} //class
The parent program's code is
public class start_sensors {
    public static void main(String[] args) throws Exception {
        ecg_sensor ecg_sensor = null;
        bp_sensor bp_sensor = null;
        eeg_sensor eeg_sensor = null;
        Thread thread1 = new Thread() {
            @Override
            public void run() {
                try {
                    ecg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread thread2 = new Thread() {
            @Override
            public void run() {
                try {
                    bp_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread thread3 = new Thread() {
            @Override
            public void run() {
                try {
                    eeg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        thread1.start();
        thread2.start();
        thread3.start();
    } //main
} //class
1 answer
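As for the threads: to control the order in which they do their work, you have to use some form of synchronization. As for the console's behavior: that is outside Java's scope and depends on the environment (the terminal or IDE console that displays the output).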
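One way to combine both points is sketched below. It is not taken from the question's code: `StatusBoard`, `renderFrame`, and `print` are hypothetical names introduced for illustration. The sketch assumes that all three sensors run in the same JVM (as they do when the parent program starts them on threads) and that the console honors ANSI escape sequences, which the question already relies on for colors. A single shared object owns the three console lines; every update is synchronized, moves the cursor back up over the previous frame with `ESC[<n>A`, clears each line with `ESC[2K`, and rewrites all three counters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: one shared object owns the console lines of all sensors. */
final class StatusBoard {
    private static final String ESC = "\u001B[";
    // Insertion order is kept, so the lines always appear in the same order.
    private final Map<String, Long> counts = new LinkedHashMap<>();
    private boolean drawn = false;

    StatusBoard(String... labels) {
        for (String label : labels) {
            counts.put(label, 0L);
        }
    }

    /** Stores the new value and builds the text that redraws every line. */
    synchronized String renderFrame(String label, long value) {
        if (!counts.containsKey(label)) {
            throw new IllegalArgumentException("Unknown label: " + label);
        }
        counts.put(label, value);
        StringBuilder frame = new StringBuilder();
        if (drawn) {
            // Move the cursor up to the first line of the previous frame.
            frame.append(ESC).append(counts.size()).append('A');
        }
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            frame.append('\r').append(ESC).append("2K") // erase the whole line
                 .append(entry.getKey()).append(" count = ")
                 .append(entry.getValue()).append('\n');
        }
        drawn = true;
        return frame.toString();
    }

    /** Updates one counter and redraws the block as a single atomic step. */
    synchronized void print(String label, long value) {
        System.out.print(renderFrame(label, value));
        System.out.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        String[] labels = {"EEG Sensor", "BP Sensor", "ECG Sensor"};
        StatusBoard board = new StatusBoard(labels);
        Thread[] workers = new Thread[labels.length];
        for (int i = 0; i < labels.length; i++) {
            String label = labels[i];
            // Each thread stands in for one sensor program updating its counter.
            workers[i] = new Thread(() -> {
                for (long n = 1; n <= 1000; n++) {
                    board.print(label, n);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

In the question's programs, each `map` function would call something like `board.print("BP Sensor", count)` on one shared instance instead of printing with `\r` on its own. Any other output written to the same console between frames (for example the `System.out.println(tuple)` in the timekeeper branch) would shift the lines and break the layout, and a console without ANSI support, such as some IDE output panes, will show the escape sequences as raw text.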