我正在尝试使用JMX BulkLoader将数据从远程节点ETL到Cassandra集群
https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java
然而,在成功建立JMX连接之后,似乎无法进行批量加载。
请注意,批量加载是从远程节点发送到cassandra集群的。
似乎它希望在cassandra集群的本地运行(即localhost到cassandra-cluster)
我在这里遗漏了什么吗。谁能给我提个建议吗
以下异常
java.lang.IllegalArgumentException:位于sun.reflect.GeneratedMethodAccessor21.invoke(未知源)的org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)的目录/XXXXXXXX无效sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)的sun.refract.misc.Trampoline.invoke(MethodUtil.java:75)的java.lang.reflect.Method.invoket(Method.java:606)的java.lang.reflect.MethodAccess的java.invoke的DelegatingMethodAssessorImpl.invooke(Delegating MethodaccessorImple.java:43)com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBean Introspector.java:112),位于com.sun.jmx.mbenserver.StandardMBeanInterospector.involkeM2,位于com.sun.jmx.mbeanserver.MBeanInterospector.invokeM(MBeanIndrospector.java:237),位于com.sun.jmmx.mbeansserver.PerInterface.invoke(PerInterface.java:138),位于com/sun.jmx.mbeanserver.MBean支持invoke(MBeanSupport.java:252)位于com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBearServerIntercertor.java:819),位于com.son.jmx.mbeanserver.JmxMBeanServer.invoket(JmxMB eanServer)。
class JmxBulkLoader(host: String, port: Int) {
private var connector: JMXConnector = _
private var storageBean: StorageServiceMBean = _
private var timer: Timer = new Timer()
connect("http://hostip , 7199)
private def connect(host: String, port: Int) {
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,
port))
Logger.info(" Connected to JMX Entity " + jmxUrl)
val env = new HashMap[String, Any]()
connector = JMXConnectorFactory.connect(jmxUrl, env)
val mbeanServerConn = connector.getMBeanServerConnection
val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
}
def close() {
connector.close()
}
def bulkLoad(path: String): Boolean = {
try {
val timer = new Stopwatch().start
val result = storageBean.bulkLoadAsync(path)
timer.stop
Logger.info("Async Result of Bulk Load " + result)
Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")
true
} catch {
case e: Exception =>
Logger.error("Error in Bulk Loading " + e.printStackTrace())
false
}
}
}
2条答案
按热度按时间m1m5dgzv1#
似乎它希望在cassandra集群的本地运行(即localhost到cassandra-cluster)
不完全是。但想想看:您正在使用字符串参数调用Cassandra节点的mbean函数。此调用由您正在调用的Cassandra进程执行(即连接到)。该参数指定连接到的节点一侧的路径。
您必须确保目标上存在路径,并保存您期望的数据(例如,通过共享存储或预先复制文件)。
puruo6ea2#
1.该表应存在于Cassandra中
1.目录应可供cassandra节点访问(本地)。
1.目录应以键空间和目标表名结尾:
/some_path/$KeySpaceName/$TableName