apacheflink:如何并行执行但保持消息的顺序?

wvt8vs2t  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(351)

关于Flink的平行性,我有几个问题。这是我的设置:
我有1个主节点和2个从节点。在flink中,我创建了3个Kafka消费者,每个消费者都从不同的主题消费。
因为元素的顺序对我很重要,所以每个主题只有一个分区,我有flink设置来使用事件时间。
然后在每个数据流上运行以下管道(伪代码):

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)

直到现在我才开始我的flink程序 -p 2 假设这将允许我使用两个节点。结果不是我所希望的,因为我的输出顺序有时会混乱。
在阅读完flink文档并试图更好地理解它之后,有人能确认我的以下“经验教训”吗?
1.)通过 -p 2 仅配置任务并行性,即任务的最大并行示例数(例如 map(deserialize) )将被分成两部分。如果我想通过整个管道维持秩序,我必须使用 -p 1 .
2.)这在我看来似乎是矛盾的/混乱的:即使并行度设置为1,不同的任务仍然可以并行运行(同时)。因此,我的3条管道也将平行运行,如果我通过 -p 1 .
作为一个后续问题:有没有办法找出哪些任务Map到哪个任务槽,以便我自己确认并行执行?
如果您有任何意见,我将不胜感激!
更新
这是Flink的行刑计划 -p 2 .

gcuhipw9

gcuhipw91#

请在下面找到一个使用边输出和插槽组进行本地扩展的示例。

package org.example

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * This example shows an implementation of WordCount with data from a text socket.
  * To run the example make sure that the service providing the text data is already up and running.
  *
  * To start an example socket text stream on your local machine run netcat from a command line,
  * where the parameter specifies the port number:
  *
  * {{{
  *   nc -lk 9999
  * }}}
  *
  * Usage:
  * {{{
  *   SocketTextStreamWordCount <hostname> <port> <output path>
  * }}}
  *
  * This example shows how to:
  *
  *   - use StreamExecutionEnvironment.socketTextStream
  *   - write a simple Flink Streaming program in scala.
  *   - write and use user-defined functions.
  */
object SocketTextStreamWordCount {

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt
    val outputTag1 = OutputTag[String]("side-1")
    val outputTag2 = OutputTag[String]("side-2")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    //Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
    val counts = text.flatMap {
      _.toLowerCase.split("\\W+") filter {
        _.nonEmpty
      }
    }
      .process(new ProcessFunction[String, String] {
        override def processElement(
                                     value: String,
                                     ctx: ProcessFunction[String, String]#Context,
                                     out: Collector[String]): Unit = {
          if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
          else ctx.output(outputTag2, String.valueOf(value))
        }
      })

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

    val output1 = sideOutputStream1.map {
      (_, 1)
    }.slotSharingGroup("map1")
      .keyBy(0)
      .sum(1)

    val output2 = sideOutputStream2.map {
      (_, 1)
    }.slotSharingGroup("map2")
      .keyBy(0)
      .sum(1)

    output1.print()
    output2.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

}
4nkexdtk

4nkexdtk2#

在问了apache flink用户电子邮件列表上的问题之后,下面是答案:
(1)the -p 选项定义每个作业的任务并行度。如果选择的并行度高于1,并且数据被重新分配(例如,通过rebalance()或keyby()),则不能保证顺序。
2.)与 -p 设置为1仅使用1个任务插槽,即1个cpu内核。因此,可能有多个线程同时在一个内核上运行,但不是并行运行。
至于我的要求:为了并行运行多个管道并保持顺序,我可以运行多个flink作业,而不是在同一个flink作业中运行所有管道。

x6492ojm

x6492ojm3#

我会尽量用我所知道的来回答。
1) 是的,对于cli客户端,parallelism参数可以用-p指定。你说得对,这是并行示例的最大数目。但是,我没有看到并行性和顺序之间的联系?据我所知,顺序是由flink用事件中提供的时间戳或他自己的摄取时间戳来管理的。如果您想在不同的数据源中保持秩序,我觉得这很复杂,或者您可以将这些不同的数据源合并为一个。
2) 如果将parallelism设置为3,则3条管道可以并行运行。我认为这里的平行性意味着在不同的插槽上。
后续问题)您可以在以下位置检查哪些任务Map到jobmanager的web前端上的哪个任务槽:http://localhost:8081.

相关问题