flume-如何为jms源(tibco)指定用户名/密码

qcbq4gxm  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(445)

我最近一直在试验Flume。我目前正在使用jms源代码(http://flume.apache.org/flumeuserguide.html#jms-(来源)
但是,当我尝试运行flume代理时,它成功地创建了通道和接收器,但是在创建源时失败了。我使用的是tibco ems,我需要使用ems用户名/密码组合进行身份验证。
文档要求我定义一个密码文件,其中包含目标/提供者的密码。有没有一个正确的方法来创建这个文件(我目前有一个文本文件中存储的密码)。
下面是错误堆栈(我非常确定用户/密码是正确的)

13/11/17 11:49:08 INFO source.DefaultSourceFactory: Creating instance of source WeatherData, type jms
13/11/17 11:49:08 ERROR node.AbstractConfigurationProvider: Source WeatherData has been removed due to an error during configuration
org.apache.flume.FlumeException: Could not lookup ConnectionFactory
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:222)
        at org.apache.flume.source.BasicSourceSemantics.configure(BasicSourceSemantics.java:65)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: javax.naming.AuthenticationException: Not permitted: invalid name or password [Root exception is javax.jms.JMSSecurityException: invalid name or password]
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:668)
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:489)
        at javax.naming.InitialContext.lookup(InitialContext.java:392)
        at org.apache.flume.source.jms.JMSSource.doConfigure(JMSSource.java:219)
        ... 14 more
Caused by: javax.jms.JMSSecurityException: invalid name or password
        at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:543)
        at com.tibco.tibjms.TibjmsConnection._create(TibjmsConnection.java:1044)
        at com.tibco.tibjms.TibjmsConnection.<init>(TibjmsConnection.java:2707)
        at com.tibco.tibjms.TibjmsQueueConnection.<init>(TibjmsQueueConnection.java:36)
        at com.tibco.tibjms.TibjmsxCFImpl._createImpl(TibjmsxCFImpl.java:186)
        at com.tibco.tibjms.TibjmsxCFImpl._createConnection(TibjmsxCFImpl.java:239)
        at com.tibco.tibjms.TibjmsQueueConnectionFactory.createQueueConnection(TibjmsQueueConnectionFactory.java:87)
        at com.tibco.tibjms.naming.TibjmsContext$Messenger.request(TibjmsContext.java:325)
        at com.tibco.tibjms.naming.TibjmsContext.lookup(TibjmsContext.java:655)
        ... 17 more
snvhrwxg

snvhrwxg1#

如果密码文件包含特殊字符(如重音符号),请确保它是用utf-8编码的。flume使用这个字符集读取文件。

nbysray5

nbysray52#

flume源中存在一个缺陷,无法将凭据正确地传递到jndi服务器。flume-2095解决了这个问题,并提出了一个补丁。您指定用户名/密码的方式可能没有问题,因为如果没有此修复程序,它将永远无法工作。
我所知道的唯一解决方法是关闭jms级别的访问控制,这将完全消除对凭据的需要,但会使队列/主题完全开放。

a8jjtwal

a8jjtwal3#

关于密码文件不确定,那似乎是特定于flume的。tibco ems在两个级别上进行身份验证:
查找连接工厂
查找目的地
一些“预构建”配置,如spring等(可能是flume),很难做到这一点,需要一些返工。
检查下面的代码,您可以看到在两个级别上应用user/pw以从tibco ems获取queue.sample:
`/**

  • @param args
    */
    public static void main(String[] args) {
    String userName = "user";
    String password = "password";
    try {
    // Obtain a JNDI connection
    Properties env = new Properties();
    env.put(Context.INITIAL_CONTEXT_FACTORY,
    "com.tibco.tibjms.naming.TibjmsInitialContextFactory");
    env.put(Context.PROVIDER_URL, "tibjmsnaming://localhost:7222");
    env.put(Context.SECURITY_PRINCIPAL, userName);
    env.put(Context.SECURITY_CREDENTIALS, password);
    // ... specify the JNDI properties specific to the vendor
    InitialContext jndi = new InitialContext(env);
    ConnectionFactory factory = (ConnectionFactory) jndi
    .lookup("ConnectionFactory");
    try {
    Connection connection = factory.createConnection(userName,
    password);
    Session session = connection.createSession();
    Queue sampleTopic = (Queue) jndi.lookup("queue.sample");
    } catch (JMSException je) {
    je.printStackTrace();
    }
    } catch (NamingException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }`

相关问题