Loader constraint violation

Posted by 7gcisfzg on 2021-06-24 in Flink

I'm running a streaming job on a Flink 1.9.1 cluster and trying to feed a histogram of values into our Prometheus metrics collector. Following the advice in the Flink docs, I'm using the Dropwizard histogram implementation together with the wrapper provided by Flink, but when I submit the job to the cluster it crashes with the following traceback:

java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(Lcom/codahale/metrics/Histogram;)V" the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of the current class, com/example/foo/metrics/FooMeter, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature
    at com.example.foo.metrics.FooMeter.<init>(FooMeter.scala:11)
    at com.example.foo.transform.ValidFoos$.open(ValidFoos.scala:15)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

I found a similar error mentioned on the mailing list, but using the shadowJar Gradle plugin didn't help.
Is there something I'm missing?
The relevant code is here:

import com.codahale.metrics.{Histogram, SlidingWindowReservoir}
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.{MetricGroup, Histogram => FlinkHistogram}

class FooMeter(metricGroup: MetricGroup, name: String) {
  // Wrap a Dropwizard histogram (500-sample sliding window) in Flink's wrapper and register it.
  private var histogram: FlinkHistogram = metricGroup.histogram(
    name, new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(500))))

  def record(fooValue: Long): Unit = {
    histogram.update(fooValue)
  }
}

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.util.Try

object ValidFoos extends RichFlatMapFunction[Try[FooData], Foo] {
  @transient private var fooMeter: FooMeter = _

  override def open(parameters: Configuration): Unit = {
    fooMeter = new FooMeter(getRuntimeContext.getMetricGroup, "foo_values")
  }

  override def flatMap(value: Try[FooData], out: Collector[Foo]): Unit = {
    Transform.validFoo(value) foreach (foo => {
      fooMeter.record(foo.value)
      out.collect(foo)
    })
  }
}

build.gradle:

plugins {
    id 'scala'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

ext {
    flinkVersion = "1.9.1"
    scalaBinaryVersion = "2.11"
    scalaVersion = "2.11.12"
}

dependencies {
    implementation(
        "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-json:${flinkVersion}"
        "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}",
        "org.scala-lang:scala-library:${scalaVersion}",
    )
}

shadowJar {
    relocate("org.apache.flink.dropwizard", "com.example.foo.shaded.dropwizard")
    relocate("com.codahale", "com.example.foo.shaded.codahale")
}

jar {
    zip64 = true
    archiveName = rootProject.name + '-all.jar'
    manifest {
        attributes('Main-Class': 'com.example.foo.Foo')
    }
    from {
        configurations.compileClasspath.collect {
            it.isDirectory() ? it : zipTree(it)
        }
        configurations.runtimeClasspath.collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
}
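
For context, here is a sketch of the alternative packaging I am unsure about: the Flink modules that the cluster distribution already ships would be marked compileOnly so they stay out of the user jar, and the shadowJar output (with the same relocations as above) would be the artifact that gets submitted instead of the plain jar task's output. The coordinates are the same as in the build above; this is only an illustration, not something I know to be correct:

dependencies {
    // Provided by the cluster at runtime -- compile against them, but keep them out of the user jar.
    compileOnly "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
    compileOnly "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}"

    // Bundled into (and relocated inside) the shadow jar.
    implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-json:${flinkVersion}"
    implementation "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}"
    implementation "org.scala-lang:scala-library:${scalaVersion}"
}

shadowJar {
    zip64 = true
    archiveName = rootProject.name + '-all.jar'
    manifest {
        attributes('Main-Class': 'com.example.foo.Foo')
    }
    relocate "org.apache.flink.dropwizard", "com.example.foo.shaded.dropwizard"
    relocate "com.codahale", "com.example.foo.shaded.codahale"
}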

More information:

Running the code locally works.
The Flink cluster is a custom build with the following directory layout:


# find /usr/lib/flink/

/usr/lib/flink/
/usr/lib/flink/plugins
/usr/lib/flink/plugins/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/plugins/flink-s3-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/plugins/flink-cep_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-python_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-queryable-state-runtime_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-sql-client_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/plugins/flink-state-processor-api_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-oss-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/plugins/flink-swift-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-azure-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/plugins/flink-shaded-netty-tcnative-dynamic-2.0.25.Final-7.0.jar
/usr/lib/flink/plugins/flink-s3-fs-presto-1.9.1.jar
/usr/lib/flink/plugins/flink-cep-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly_2.11-1.9.1.jar
/usr/lib/flink/lib
/usr/lib/flink/lib/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/lib/flink-table_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/lib/log4j-1.2.17.jar
/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar
/usr/lib/flink/lib/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/lib/flink-table-blink_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-dist_2.11-1.9.1.jar
/usr/lib/flink/bin/...
