Flink TaskManager timeout

Asked by iugsix8n on 2021-06-21 in Flink

My program gets very slow as more and more records are processed. I initially thought this was due to excessive memory consumption, since my program is String-intensive (I am using Java 11, so compact strings should be used wherever possible), so I increased the JVM heap:

    -Xms2048m
    -Xmx6144m

I also increased the task manager's memory and the heartbeat timeout in flink-conf.yaml:

    jobmanager.heap.size: 6144m
    heartbeat.timeout: 5000000

However, none of this helped. The program still becomes very slow at about the same point: after processing roughly 3.5 million records, with only about 0.5 million left to go. As it approaches the 3.5 million mark, it slows to a crawl until it eventually times out, with a total execution time of around 11 minutes.
I checked the memory consumption in VisualVM, but it never exceeds about 700 MB. My Flink pipeline looks like this:

    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
    environment.setParallelism(1);
    DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
    stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
    environment.execute("FlinkDataService");

Most of the work is done in the process function, where I implement database join algorithms and store the columns as Strings. Specifically, I implement Query 3 of the TPC-H benchmark; check it here if you like: https://examples.citusdata.com/tpch_queries.html.
The timeout error is the following:

    java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <id> timed out.

Once, I also got this error:

    Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java heap space

Also, here is my VisualVM monitoring; the screenshot was taken at a point where things were already very slow: [screenshot not included]

Here is the run loop of my source function:

    while (run) {
        readers.forEach(reader -> {
            try {
                String line = reader.readLine();
                if (line != null) {
                    Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
                    if (tuple != null && isValidTuple(tuple)) {
                        sourceContext.collect(tuple);
                    }
                } else {
                    closedReaders.add(reader);
                    if (closedReaders.size() == filePaths.size()) {
                        System.out.println("ALL FILES HAVE BEEN STREAMED");
                        cancel();
                    }
                }
                counter.getAndIncrement();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

Basically, I read every line of the 3 files I need and, based on the order of the files, construct a tuple object (my custom class Tuple, representing a row of a table), then emit that tuple if it is valid, i.e. if it fulfills certain conditions on the date.
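
For reference, a purely illustrative sketch of what lineToTuple and isValidTuple might look like (the real Tuple class is custom and not shown here; the pipe delimiter comes from the TPC-H .tbl format, and the cutoff below is just the standard Query 3 date parameter):

    import java.time.LocalDate;

    // Hypothetical stand-in for the custom Tuple class (not the original code).
    final class RowTuple {
        final int relation;     // which of the 3 input files the row came from
        final String[] columns; // TPC-H .tbl files are pipe-delimited

        RowTuple(int relation, String[] columns) {
            this.relation = relation;
            this.columns = columns;
        }
    }

    final class SourceHelpers {
        static RowTuple lineToTuple(String line, int relationIndex) {
            return new RowTuple(relationIndex, line.split("\\|"));
        }

        // e.g. a predicate like Query 3's o_orderdate < '1995-03-15'
        static boolean isValidTuple(RowTuple t, int dateColumn, LocalDate cutoff) {
            return LocalDate.parse(t.columns[dateColumn]).isBefore(cutoff);
        }
    }
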
I also hint the JVM to perform garbage collection at the 1 millionth, 1.5 millionth, 2 millionth and 2.5 millionth record, like so:

    System.gc()
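
Wired to the record counter, that hint looks roughly like this (a sketch; counter stands in for the source's own counter from the run loop above):

    long c = counter.get();
    if (c == 1_000_000L || c == 1_500_000L || c == 2_000_000L || c == 2_500_000L) {
        System.gc(); // only a request; the JVM is free to ignore it
    }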

Any ideas on how I can optimize this?


4jb9z9bj1#

These are the properties that I changed on my Flink standalone cluster to compute TPC-H query 03.

    jobmanager.memory.process.size: 1600m
    heartbeat.timeout: 100000
    taskmanager.memory.process.size: 8g # default: 1728m

I implemented the query streaming only the Order table and holding the other tables as state. Also, I computed it as a windowless query, which I think makes more sense and is faster.

    public class TPCHQuery03 {

        private final String topic = "topic-tpch-query-03";

        public TPCHQuery03() {
            this(PARAMETER_OUTPUT_LOG, "127.0.0.1", false, false, -1);
        }

        public TPCHQuery03(String output, String ipAddressSink, boolean disableOperatorChaining, boolean pinningPolicy, long maxCount) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                if (disableOperatorChaining) {
                    env.disableOperatorChaining();
                }

                DataStream<Order> orders = env
                    .addSource(new OrdersSource(maxCount)).name(OrdersSource.class.getSimpleName()).uid(OrdersSource.class.getSimpleName());

                // Filter market segment "AUTOMOBILE"
                // customers = customers.filter(new CustomerFilter());

                // Filter all Orders with o_orderdate < 12.03.1995
                DataStream<Order> ordersFiltered = orders
                    .filter(new OrderDateFilter("1995-03-12")).name(OrderDateFilter.class.getSimpleName()).uid(OrderDateFilter.class.getSimpleName());

                // Join customers with orders and package them into a ShippingPriorityItem
                DataStream<ShippingPriorityItem> customerWithOrders = ordersFiltered
                    .keyBy(new OrderKeySelector())
                    .process(new OrderKeyedByCustomerProcessFunction(pinningPolicy)).name(OrderKeyedByCustomerProcessFunction.class.getSimpleName()).uid(OrderKeyedByCustomerProcessFunction.class.getSimpleName());

                // Join the last join result with Lineitems
                DataStream<ShippingPriorityItem> result = customerWithOrders
                    .keyBy(new ShippingPriorityOrderKeySelector())
                    .process(new ShippingPriorityKeyedProcessFunction(pinningPolicy)).name(ShippingPriorityKeyedProcessFunction.class.getSimpleName()).uid(ShippingPriorityKeyedProcessFunction.class.getSimpleName());

                // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
                DataStream<ShippingPriorityItem> resultSum = result
                    .keyBy(new ShippingPriority3KeySelector())
                    .reduce(new SumShippingPriorityItem(pinningPolicy)).name(SumShippingPriorityItem.class.getSimpleName()).uid(SumShippingPriorityItem.class.getSimpleName());

                // emit result
                if (output.equalsIgnoreCase(PARAMETER_OUTPUT_MQTT)) {
                    resultSum
                        .map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
                        .addSink(new MqttStringPublisher(ipAddressSink, topic, pinningPolicy)).name(OPERATOR_SINK).uid(OPERATOR_SINK);
                } else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_LOG)) {
                    resultSum.print().name(OPERATOR_SINK).uid(OPERATOR_SINK);
                } else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_FILE)) {
                    StreamingFileSink<String> sink = StreamingFileSink
                        .forRowFormat(new Path(PATH_OUTPUT_FILE), new SimpleStringEncoder<String>("UTF-8"))
                        .withRollingPolicy(
                            DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024).build())
                        .build();
                    resultSum
                        .map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
                        .addSink(sink).name(OPERATOR_SINK).uid(OPERATOR_SINK);
                } else {
                    System.out.println("discarding output");
                }

                System.out.println("Stream job: " + TPCHQuery03.class.getSimpleName());
                System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
                env.execute(TPCHQuery03.class.getSimpleName());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws Exception {
            new TPCHQuery03();
        }
    }

The UDFs here are OrdersSource, OrderKeyedByCustomerProcessFunction, ShippingPriorityKeyedProcessFunction, and SumShippingPriorityItem. I use com.google.common.collect.ImmutableList since the state will not be updated. Also, I keep only the necessary columns in the state, such as ImmutableList<Tuple2<Long, Double>> lineItemList.
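
For illustration, a minimal sketch of how such pruned, immutable state might be declared inside one of the keyed process functions (the descriptor name and the open() wiring below are assumptions, not the exact original code):

    import com.google.common.collect.ImmutableList;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

    // Sketch: pruned lineitem columns kept as immutable keyed state.
    public abstract class LineItemStateSketch<K, I, O> extends KeyedProcessFunction<K, I, O> {

        // only the columns the query needs, kept as an immutable list
        protected transient ValueState<ImmutableList<Tuple2<Long, Double>>> lineItemList;

        @Override
        public void open(Configuration parameters) {
            lineItemList = getRuntimeContext().getState(
                new ValueStateDescriptor<>(
                    "lineItemList", // state name (assumed)
                    TypeInformation.of(new TypeHint<ImmutableList<Tuple2<Long, Double>>>() {})));
        }
    }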


new9mtju2#

String intern() saved me. I interned every string before storing it in my maps, and it worked like a charm.
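
For illustration, the idea looks roughly like this (the class and field names are made up, not the original code):

    import java.util.HashMap;
    import java.util.Map;

    final class InternedStore {
        // stands in for whatever map the strings are kept in
        private final Map<String, String> rows = new HashMap<>();

        void put(String key, String value) {
            // intern() returns the canonical copy from the JVM string pool, so
            // millions of equal strings share one object instead of each keeping
            // its own copy on the heap
            rows.put(key.intern(), value.intern());
        }
    }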
