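I'm using two Docker Flink images and the sample code below. AMIDST is a probabilistic graphical model framework with Flink support.
One image runs as the jobmanager and the other as the taskmanager. The JM is reachable via DNS, and I supply my own log4j.properties, based on the startup script bin/flink-console.sh that these images use.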
package com.ness;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// The AMIDST imports (DataFlink, DataInstance, DataSetGenerator, DAG, DAGGenerator,
// ParameterLearningAlgorithm, ParallelMaximumLikelihood, BayesianNetwork) are omitted.

public class ParallelMLExample {

    private static final Logger LOG = LoggerFactory.getLogger(ParallelMLExample.class);

    public static void main(String[] args) throws Exception {
        // Set up the Flink session
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Stop the client from printing job progress messages to System.out
        env.getConfig().disableSysoutLogging();

        // Generate a random dataset
        DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env, 1234, 1000, 5, 0);

        // Create a DAG with the Naive Bayes structure for the random dataset
        DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
        LOG.info(dag.toString());

        // Create the learner object
        ParameterLearningAlgorithm learningAlgorithmFlink = new ParallelMaximumLikelihood();

        // Set the learning parameters
        learningAlgorithmFlink.setBatchSize(10);
        learningAlgorithmFlink.setDAG(dag);

        // Initialize the learning process
        learningAlgorithmFlink.initLearning();

        // Learn from the Flink data
        LOG.info("########## BEFORE UPDATEMODEL ##########");
        learningAlgorithmFlink.updateModel(dataFlink);
        LOG.info("########## AFTER UPDATEMODEL ##########");

        // Print the learnt Bayesian network
        BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
        LOG.info(bn.toString());
    }
}
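The problem is that I only see the LOG.info() entries up to the updateModel call. After that, silence. If I comment out this call, I see the remaining entries. I'm silencing Flink's own output on purpose.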
Creating flink_jobmanager_1 ... done
Creating flink_jobmanager_1 ...
Creating flink_taskmanager_1 ... done
Attaching to flink_jobmanager_1, flink_taskmanager_1
jobmanager_1 | Starting Job Manager
jobmanager_1 | config file:
taskmanager_1 | Starting Task Manager
jobmanager_1 | jobmanager.rpc.address: jobmanager
taskmanager_1 | config file:
jobmanager_1 | jobmanager.rpc.port: 6123
jobmanager_1 | jobmanager.heap.mb: 1024
taskmanager_1 | jobmanager.rpc.address: jobmanager
jobmanager_1 | taskmanager.heap.mb: 1024
taskmanager_1 | jobmanager.rpc.port: 6123
jobmanager_1 | taskmanager.numberOfTaskSlots: 1
taskmanager_1 | jobmanager.heap.mb: 1024
jobmanager_1 | taskmanager.memory.preallocate: false
taskmanager_1 | taskmanager.heap.mb: 1024
jobmanager_1 | parallelism.default: 1
taskmanager_1 | taskmanager.numberOfTaskSlots: 2
jobmanager_1 | web.port: 8081
taskmanager_1 | taskmanager.memory.preallocate: false
jobmanager_1 | blob.server.port: 6124
taskmanager_1 | parallelism.default: 1
jobmanager_1 | query.server.port: 6125
taskmanager_1 | web.port: 8081
jobmanager_1 | Starting jobmanager as a console application on host c16d9156ff68.
taskmanager_1 | blob.server.port: 6124
taskmanager_1 | query.server.port: 6125
taskmanager_1 | Starting taskmanager as a console application on host 76c78378d35c.
jobmanager_1 | 2018-02-18 15:31:42,809 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
taskmanager_1 | 2018-02-18 15:31:43,897 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
jobmanager_1 | 2018-02-18 15:32:18,667 INFO com.ness.ParallelMLExample - DAG
jobmanager_1 | DiscreteVar0 has 1 parent(s): {DiscreteVar4}
jobmanager_1 | DiscreteVar1 has 1 parent(s): {DiscreteVar4}
jobmanager_1 | DiscreteVar2 has 1 parent(s): {DiscreteVar4}
jobmanager_1 | DiscreteVar3 has 1 parent(s): {DiscreteVar4}
jobmanager_1 | DiscreteVar4 has 0 parent(s): {}
jobmanager_1 |
jobmanager_1 | 2018-02-18 15:32:18,679 INFO com.ness.ParallelMLExample - ########## BEFORE UPDATEMODEL ##########
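The updateModel method starts with a new Configuration() and then retrieves the DataSet. It then runs a map and a reduce over the supplied dataset and calls collect() on the result, but none of that seems to interfere with the root logger...
What am I missing?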
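In the Flink DataSet API, collect() triggers execution of the job and blocks the calling program until the results come back, so silence after updateModel could also mean the call is still waiting or failed somewhere the log does not show. A minimal diagnostic sketch using only the JDK: run the step on a separate daemon thread and report whether it returned, threw, or was still running after a deadline. `CallProbe`, `probe` and `Outcome` are hypothetical names introduced here for illustration; they are not part of Flink or AMIDST.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical diagnostic helper; not part of Flink or AMIDST.
public class CallProbe {

    // How a probed step ended within the deadline.
    enum Outcome { RETURNED, THREW, TIMED_OUT }

    // Runs `step` on a daemon thread and waits at most `timeoutMs` for it.
    static Outcome probe(Runnable step, long timeoutMs) {
        // Daemon thread, so a step that never finishes cannot keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "call-probe");
            t.setDaemon(true);
            return t;
        });
        Future<?> result = executor.submit(step);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.RETURNED;
        } catch (ExecutionException e) {
            // The step threw; print the cause so it is not silently lost.
            System.err.println("step threw: " + e.getCause());
            return Outcome.THREW;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            // The waiting thread was interrupted; treat it like a timeout.
            Thread.currentThread().interrupt();
            return Outcome.TIMED_OUT;
        } finally {
            // Interrupts the step if it is still running; the step may ignore it.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe(() -> { }, 1_000)); // prints RETURNED
        System.out.println(probe(() -> { throw new IllegalStateException("boom"); }, 1_000)); // prints THREW
        System.out.println(probe(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException ignored) {
                // interrupted by shutdownNow()
            }
        }, 100)); // prints TIMED_OUT
    }
}
```

In the program above this could wrap the suspicious call, e.g. `LOG.info("updateModel: {}", CallProbe.probe(() -> learningAlgorithmFlink.updateModel(dataFlink), 600_000));`, assuming `updateModel` declares no checked exceptions (otherwise the lambda needs its own try/catch). A TIMED_OUT result only says the call had not returned by the deadline; the step keeps running in the background.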