在多处理方案中,使用共享同一控制台,在单独的连续行中打印3个java程序的输出

9lowa7mx  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(374)

我有3个 java 具有单个 main functions 由父程序在3个独立线程上调用。这三个程序都使用下面的代码在一个控制台中打印计数器的值
对于程序#1

System.out.print(ANSI_PURPLE + "  \r  EEG Sensor count =" + Sensor1_tuple_count);
   System.out.flush();

对于程序#2

System.out.print(ANSI_PURPLE + " \r  BP Sensor count = " + Sensor2_tuple_count + " ");
    System.out.flush();

对于程序#3

System.out.print(ANSI_PURPLE + "\r  ECG Sensor count =" + Sensor3_tuple_count);
    System.out.flush();

所有这些都是在lambda函数中更新的,现在所有这些值都在互相覆盖,如何得到这样的输出

EEG Sensor count = X
BP Sensor count  = Y
ECG Sensor count = Z

编辑1

下面给出了使用flink的单个程序的代码

public class bp_sensor {

    public static final String ANSI_RED     = "\u001B[31m";
    public static final String ANSI_BLUE    = "\u001B[34m";
    public static final String ANSI_PURPLE  = "\u001B[35m";

    public static void main(String[] args)  throws Exception {

        Constants constants = null;

        //setting the envrionment variable as StreamExecutionEnvironment
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.getExecutionEnvironment();

        envrionment.setParallelism(1);

        DataStream<Sensor_Event> bp_stream  = envrionment
                .addSource(new EventGenerator_bp_sensor(constants.bp_data_rate,constants.bp_run_time_sec,1,1))
                .name("BP stream")
                .setParallelism(1);

        if(constants.send_to_timekeeper){

            //Sending the stream to timekeeper
            bp_stream.map(new RichMapFunction<Sensor_Event, String>() {
                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    System.out.println(tuple);
                    return tuple + "\n";
                }
            }).writeToSocket(constants.timekeeper_ip, 8003, new SimpleStringSchema() );

        }

        // Sending the stream to mobile phone

        if(constants.send_to_android){

            DataStreamSink<String> total_tuples = bp_stream.map(new RichMapFunction<Sensor_Event, String>() {

                IntCounter Sensor2_tuple_count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.Sensor2_tuple_count = getRuntimeContext().getIntCounter("total_tuples");
                }

                @Override
                public String map(Sensor_Event event) throws Exception {
                    String tuple = event.toString();
                    Sensor2_tuple_count.add(1);

                    System.lineSeparator();
                    System.out.print(ANSI_PURPLE + " \r  BP Sensor count = " + Sensor2_tuple_count + " ");
                    System.out.flush();

//                    System.out.println(ANSI_BLUE + tuple);

                    return tuple + "\n";
                }
            }).writeToSocket(constants.mobile_ip, 7003, new SimpleStringSchema() );

        }

        //start the execution
        JobExecutionResult executionResult = envrionment.execute();

        Integer number_of_tuples = (Integer) executionResult.getAllAccumulatorResults().get("total_tuples");
        int input_rate = number_of_tuples/constants.bp_run_time_sec;

        System.out.println("\n");
        System.out.println(ANSI_BLUE   + "  Expected Input rate of BP Sensor     = " + constants.bp_data_rate + " tuples/second");
        System.out.println(ANSI_RED    + "  Actual Input rate of BP Sensor       = " + input_rate + " tuples/second");
        System.out.println(ANSI_PURPLE + "  Total # of tuples sent by BP Sensor  = " + number_of_tuples );

    }// main

} //class

父程序代码为

public class start_sensors {

    public static void main(String[] args) throws Exception {

        ecg_sensor ecg_sensor = null;
        bp_sensor bp_sensor = null;
        eeg_sensor eeg_sensor = null;

        Thread thread1 = new Thread() {

            @Override
            public void run() {
                try {

                    ecg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread thread2 = new Thread() {

            @Override
            public void run() {
                try {
                    bp_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread thread3 = new Thread() {

            @Override
            public void run() {
                try {
                    eeg_sensor.main(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        thread1.start();
        thread2.start();
        thread3.start();

    } //main
} //class
7lrncoxx

7lrncoxx1#

至于线程,要控制它们的工作顺序,必须使用某种同步。至于控制台的行为,它超出了java的范围,依赖于环境。

相关问题