数据税Cassandra散货船

8oomwypt  于 2022-09-27  发布在  Cassandra
关注(0)|答案(2)|浏览(124)

我正在尝试使用JMX BulkLoader将数据从远程节点ETL到Cassandra集群
https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java
然而,在成功建立JMX连接之后,似乎无法进行批量加载。
请注意,批量加载是从远程节点发送到cassandra集群的。
似乎它希望在cassandra集群的本地运行(即localhost到cassandra-cluster)
我在这里遗漏了什么吗。谁能给我提个建议吗
以下异常
java.lang.IllegalArgumentException:位于sun.reflect.GeneratedMethodAccessor21.invoke(未知源)的org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)的目录/XXXXXXXX无效sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)的sun.refract.misc.Trampoline.invoke(MethodUtil.java:75)的java.lang.reflect.Method.invoket(Method.java:606)的java.lang.reflect.MethodAccess的java.invoke的DelegatingMethodAssessorImpl.invooke(Delegating MethodaccessorImple.java:43)com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBean Introspector.java:112),位于com.sun.jmx.mbenserver.StandardMBeanInterospector.involkeM2,位于com.sun.jmx.mbeanserver.MBeanInterospector.invokeM(MBeanIndrospector.java:237),位于com.sun.jmmx.mbeansserver.PerInterface.invoke(PerInterface.java:138),位于com/sun.jmx.mbeanserver.MBean支持invoke(MBeanSupport.java:252)位于com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBearServerIntercertor.java:819),位于com.son.jmx.mbeanserver.JmxMBeanServer.invoket(JmxMB eanServer)。

class JmxBulkLoader(host: String, port: Int) {

  private var connector: JMXConnector = _

  private var storageBean: StorageServiceMBean = _

  private var timer: Timer = new Timer()

  connect("http://hostip , 7199)

 private def connect(host: String, port: Int) {

    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,

      port))

    Logger.info(" Connected to JMX Entity " + jmxUrl)

    val env = new HashMap[String, Any]()

    connector = JMXConnectorFactory.connect(jmxUrl, env)

    val mbeanServerConn = connector.getMBeanServerConnection

    val name = new ObjectName("org.apache.cassandra.db:type=StorageService")

    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])

  }

  def close() {

    connector.close()

  }

  def bulkLoad(path: String): Boolean = {

    try {

      val timer = new Stopwatch().start

      val result = storageBean.bulkLoadAsync(path)

      timer.stop

      Logger.info("Async Result of Bulk Load " + result)

      Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")

      true

    } catch {

      case e: Exception =>

        Logger.error("Error in Bulk Loading " + e.printStackTrace())

        false

    }

  }

}
m1m5dgzv

m1m5dgzv1#

似乎它希望在cassandra集群的本地运行(即localhost到cassandra-cluster)
不完全是。但想想看:您正在使用字符串参数调用Cassandra节点的mbean函数。此调用由您正在调用的Cassandra进程执行(即连接到)。该参数指定连接到的节点一侧的路径。
您必须确保目标上存在路径,并保存您期望的数据(例如,通过共享存储或预先复制文件)。

puruo6ea

puruo6ea2#

1.该表应存在于Cassandra中
1.目录应可供cassandra节点访问(本地)。
1.目录应以键空间和目标表名结尾:
/some_path/$KeySpaceName/$TableName

相关问题