java.lang.nosuchmethoderror:com.datastax.driver.core.boundstatement.set

e5nqia27  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(458)

我对春靴和Cassandra还不熟悉。我试图通过flink cassandra连接器将数据保存到cassandra表中。
pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-cassandra</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--Flink-->                                                        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!--Flink Cassandra-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

日期.java

package com.example.demo;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;

@Table(keyspace = "testing" ,name = "dates")
public class Date {

    public Date(int patientid, long date, long timestamp) {
        super();
        this.patientid = patientid;
        this.date = date;
        this.timestamp = timestamp;
    }
    @PrimaryKeyColumn(name = "patientid", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private int patientid;
    @PrimaryKeyColumn(name = "date", ordinal = 1, type = PrimaryKeyType.CLUSTERED)
    private long date;

    @Column(name = "timestamp")
    private long timestamp;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public int getPatientid() {
        return patientid;
    }

    public void setPatientid(int patientid) {
        this.patientid = patientid;
    }

    public long getDate() {
        return date;
    }

    public void setDate(long date) {
        this.date = date;
    }

}

测试.java

package com.example.demo;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Test implements CommandLineRunner {

    private final static Collection<Date> collection = new ArrayList<>(2);

    static {
        for (int i = 1; i <= 2; ++i) {
            collection.add(new Date(i, i, i));
        }
    }

    @Override
    public void run(String... strings) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Date> fromCollection = env.fromCollection(collection);

        try {

            CassandraSink
                    .addSink(fromCollection)
                    .setHost("192.168.1.20")
                    .build();

            env.execute();
        } catch (Exception e) {

        }
    }
}

完整堆栈跟踪

java.lang.NoSuchMethodError: com.datastax.driver.core.BoundStatement.set(ILjava/lang/Object;Lorg/apache/flink/cassandra/shaded/com/google/common/reflect/TypeToken;)Lcom/datastax/driver/core/BoundStatement;
    at com.datastax.driver.mapping.Mapper.setObject(Mapper.java:230)
    at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:206)
    at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
    at com.datastax.driver.mapping.Mapper.saveAsync(Mapper.java:271)
    at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:65)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:75)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
    at java.lang.Thread.run(Thread.java:745)

我找不到合适的解决办法。请帮帮我。提前谢谢。

vql8enpb

vql8enpb1#

java.lang.nosuchmethoderror:com.datastax.driver.core.boundstatement.set(iljava/lang/object;lorg/apache/flink/cassandra/shaded/com/google/common/reflect/typetoken;)lcom/数据税/驱动程序/核心/边界声明;在com.datastax.driver.mapping.mapper.setobject(mapper。java:230)
上面的方法表明,没有方法 setObjectMapper 上课时间 com.datastax.driver.mapping 包裹。
看了jar版本的 cassandra-driver-mapping 在这里,您很可能下载了旧版本的驱动程序 flink pom中指定的依赖项。
你能确认你是否有旧版本的吗 com.datastax.driver.mapping.Mapper 类在类路径中?
希望这有帮助,祝你好运!

0lvr5msh

0lvr5msh2#

在我的例子中,我有一个复合分区键:主键(metric\u id,data\u type)
通过向我的对象的coresponding属性添加@partitionkey注解,所有操作都很好:

@PartitionKey(0)
    @Column(name = "metric_id")
    private UUID metricId;
    @PartitionKey(1)
    @Column(name = "data_type")
    private Integer dataType;

以下是pom.xml中的依赖项:

<flink.version>1.8.0</flink.version>
    ....

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

链接:https://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/
希望有帮助!
当做,
阿里

相关问题