通过Apache Camel路由将数据发送到引导的JPA数据库连接

kmbjn2e3  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(135)

我有一个包含表DATADB_CONNECTIONS的源数据库。DB_CONNECTIONS包含一组目标数据库,我想通过Apache Camel将DATA复制到这些目标数据库中。
为了以编程方式构建持久性单元,我遵循了Baeldung's JPA bootstrapping guide
我实现了自己的PersistenceUnitInfo

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.persistence.SharedCacheMode;
import javax.persistence.ValidationMode;
import javax.persistence.spi.ClassTransformer;
import javax.persistence.spi.PersistenceUnitInfo;
import javax.persistence.spi.PersistenceUnitTransactionType;
import javax.sql.DataSource;

public class HibernatePersistenceUnitInfo implements PersistenceUnitInfo {

    public static String JPA_VERSION = "2.1";
    private String persistenceUnitName;
    private String persistenceProviderClassName = "org.hibernate.jpa.HibernatePersistenceProvider";
    private PersistenceUnitTransactionType transactionType = PersistenceUnitTransactionType.RESOURCE_LOCAL;
    private List<String> managedClassNames;
    private List<String> mappingFileNames = new ArrayList<>();
    private Properties properties;
    private DataSource jtaDataSource;
    private DataSource nonjtaDataSource;
    private List<ClassTransformer> transformers = new ArrayList<>();

    public HibernatePersistenceUnitInfo(String persistenceUnitName, List<String> managedClassNames, Properties properties) {
        this.persistenceUnitName = persistenceUnitName;
        this.managedClassNames = managedClassNames;
        this.properties = properties;
    }

    @Override
    public String getPersistenceUnitName() {
        return persistenceUnitName;
    }

    @Override
    public PersistenceUnitTransactionType getTransactionType() {
        return transactionType;
    }

    @Override
    public DataSource getJtaDataSource() {
        return jtaDataSource;
    }

    @Override
    public DataSource getNonJtaDataSource() {
        return nonjtaDataSource;
    }

    @Override
    public List<String> getMappingFileNames() {
        return mappingFileNames;
    }

    @Override
    public List<String> getManagedClassNames() {
        return managedClassNames;
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public void addTransformer(ClassTransformer transformer) {
        transformers.add(transformer);
    }

    @Override
    public String getPersistenceProviderClassName() {
        return persistenceProviderClassName;
    }

    @Override
    public boolean excludeUnlistedClasses() {
        return false;
    }

    @Override
    public List<URL> getJarFileUrls() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public URL getPersistenceUnitRootUrl() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public SharedCacheMode getSharedCacheMode() {
        return SharedCacheMode.UNSPECIFIED;
    }

    @Override
    public ValidationMode getValidationMode() {
        return ValidationMode.AUTO;
    }

    @Override
    public String getPersistenceXMLSchemaVersion() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ClassLoader getClassLoader() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ClassLoader getNewTempClassLoader() {
        // TODO Auto-generated method stub
        return null;
    }
}

和一个JpaEntityManagerFactory

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.spi.PersistenceUnitInfo;
import javax.sql.DataSource;
import org.hibernate.jpa.boot.internal.EntityManagerFactoryBuilderImpl;
import org.hibernate.jpa.boot.internal.PersistenceUnitInfoDescriptor;
import oracle.jdbc.replay.OracleDataSourceImpl;
import my.package.Data;

public class JpaEntityManagerFactory {
    private String dbUrl;
    private String dbUser;
    private String dbPass;
    private Class[] entityClasses;
    private String persistenceUnitName;
    private String dbConnectionName;

    public JpaEntityManagerFactory(String persistenceUnitName, String dbConnectionName) {
        this.entityClasses = new Class[] {Data.class};
        this.dbConnectionName= dbConnectionName;
        this.persistenceUnitName = persistenceUnitName;

        // TODO: Fill values from DB_CONNECTIONS table by using dbConnectionName
        dbUrl = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=abc)(Port=123))(CONNECT_DATA=(SERVICE_NAME=XYZ)))";
        dbUser = "sa";
        dbPass = "sa";
    }

    public EntityManager getEntityManager() {
        return getEntityManagerFactory().createEntityManager();
    }

    public EntityManagerFactory getEntityManagerFactory() {
        PersistenceUnitInfo persistenceUnitInfo = getPersistenceUnitInfo(persistenceUnitName);
        Map<String, Object> configuration = new HashMap<>();
        return new EntityManagerFactoryBuilderImpl(new PersistenceUnitInfoDescriptor(persistenceUnitInfo), configuration).build();
    }

    protected HibernatePersistenceUnitInfo getPersistenceUnitInfo(String name) {
        return new HibernatePersistenceUnitInfo(name, getEntityClassNames(), getProperties());
    }

    protected List<String> getEntityClassNames() {
        return Arrays.asList(getEntities()).stream().map(Class::getName).collect(Collectors.toList());
    }

    protected Properties getProperties() {
        Properties properties = new Properties();
        properties.put("hibernate.dialect", "org.hibernate.dialect.Oracle8iDialect");
        properties.put("hibernate.show_sql", false);
        properties.put("hibernate.hbm2ddl.auto", "none");
        properties.put("hibernate.connection.datasource", getOracleDataSource());
        return properties;
    }

    protected Class[] getEntities() {
        return entityClasses;
    }

    protected DataSource getOracleDataSource() {
        OracleDataSourceImpl dataSource = new OracleDataSourceImpl();
        try {
            dataSource.setURL(dbUrl);
            dataSource.setUser(dbUser);
            dataSource.setPassword(dbPass);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return dataSource;
    }
}

因为我们使用的是Oracle数据库,所以我从Baeldung(他们使用的是MySQL数据库)交换了hibernate.connection.datasource值,但我不确定我是否做对了。
最后,在Camel中,我使用我的JpaEntityManagerFactory构建了多个具有不同持久性单元的JPA端点:

public void startup() {
        var factory = Persistence.createEntityManagerFactory(SOURCE_DB_PERSISTENCE_UNIT_NAME);

        var jpaComponent = new JpaComponent();
        jpaComponent.setJoinTransaction(false);
        jpaComponent.setEntityManagerFactory(factory);
        jpaComponent.setTransactionManager(new JpaTransactionManager(factory));
        jpaComponent.setSharedEntityManager(true);
        jpaComponent.setLazyStartProducer(true);
        jpaComponent.setAliases(getAliases());
        context.addComponent("jpaComponent", jpaComponent);

        try {
            // setting up the source
            var consumer = (JpaEndpoint) jpaComponent.createEndpoint("jpa://Data");
            consumer.setNamedQuery(NAMED_QUERY_DATA);
            consumer.setConsumeDelete(false);
            consumer.setPersistenceUnit(SOURCE_DB_PERSISTENCE_UNIT_NAME);
            consumer.setEntityManagerFactory(Persistence.createEntityManagerFactory(SOURCE_DB_PERSISTENCE_UNIT_NAME));
            consumer.setTimeUnit(TIME_UNIT);
            consumer.setInitialDelay(INITIAL_DELAY);
            consumer.setDelay(DELAY);
            consumer.setMaxMessagesPerPoll(MAX_MESSAGES_PER_POLL);
            consumer.setJoinTransaction(false);
            consumer.setExchangePattern(ExchangePattern.InOut);
            context.addEndpoint("jpa://Data", consumer);

            var persistenceUnits = getPersistenceUnitsMap(); // contains an enum of my destination databases

            // setting up the multiple destinations
            for (var entry : persistenceUnits.entrySet()) {
                var database = entry.getKey();
                var databaseName = database.name();
                var persistenceUnit = entry.getValue();

                var entityManagerFactory = new JpaEntityManagerFactory(databaseName, persistenceUnit).getEntityManagerFactory();

                var producer = (JpaEndpoint) jpaComponent.createEndpoint("jpa://Data" + databaseName);
                producer.setEntityManagerFactory(entityManagerFactory);
                producer.setPersistenceUnit(persistenceUnit);
                producer.setTransactionManager(new JpaTransactionManager(entityManagerFactory));
                producer.setJoinTransaction(false);
                context.addEndpoint("jpa://Data" + databaseName, producer);
            }

            context.addRoutes(new DataRoute());
            context.start();
        } catch (final Exception e) {
            logger.error("Error starting up camel", e);
        }
    }

    // used to give different names (jpa://DataDB1, jpa://DataDB2 etc) to the different endpoints, all mapping to the same Entity though
    private Map<String, Class<?>> getAliases() {
        Map<String, Class<?>> aliases = new HashMap<>();

        for (var entry : getPersistenceUnitsMap().entrySet()) {
            var database = entry.getKey();
            aliases.put("jpa://Data" + database.name(), Data.class);
        }
        return aliases;
    }

我的“ Camel "路线如下所示:

@Override
    public void configure() throws Exception {

        from("jpa://Data").routeId("dataRoute")
        .choice().when(simple("${body.targetDatabase} == 'DB1' ")).to("jpa://Data" + Database.DB1.name()).endChoice()
        .when(simple("${body.targetDatabase} == 'DB2' ")).to("jpa://Data" + Database.DB2.name()).endChoice()
        .when(simple("${body.targetDatabase} == 'DB3' ")).to("jpa://Data" + Database.DB3.name()).endChoice()
        .......
        .otherwise().process(ex -> {
            // some logging
        }).endChoice().end();
    }

现在,我面临的问题是以下例外:

org.apache.camel.component.jpa.JpaConsumer (Camel (camel-1) thread #24 - jpa://Data) ..component.jpa.JpaConsumer (361) Acquiring exclusive lock on entity: my.package.Data@768d98ea
org.apache.camel.component.jpa.JpaConsumer (Camel (camel-1) thread #24 - jpa://Data) ..component.jpa.JpaConsumer (369) Failed to achieve lock on entity: my.package.Data@768d98ea. Reason: no transaction is in progress: javax.persistence.TransactionRequiredException: no transaction is in progress
    at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.AbstractSharedSessionContract.checkTransactionNeededForUpdateOperation(AbstractSharedSessionContract.java:398)
    at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.checkTransactionNeededForUpdateOperation(SessionImpl.java:3589)
    at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.lock(SessionImpl.java:3615)
    at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.lock(SessionImpl.java:3609)
    at deployment.camel-db-connector-0.0.1-SNAPSHOT.war//org.apache.camel.component.jpa.JpaConsumer.lockEntity(JpaConsumer.java:365)

似乎没有启动任何事务。我必须更改什么?

uqjltbpv

uqjltbpv1#

您可能需要告诉消费者开始交易

from("jpa://Data?transacted=true")

相关问题