我有一个源数据库,其中包含表 DATA 和 DB_CONNECTIONS。DB_CONNECTIONS 中保存了一组目标数据库,我想通过 Apache Camel 将 DATA 复制到这些目标数据库中。
为了以编程方式构建持久化单元,我参考了 Baeldung 的 JPA 引导指南(JPA bootstrapping guide),并实现了自己的 PersistenceUnitInfo:
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.persistence.SharedCacheMode;
import javax.persistence.ValidationMode;
import javax.persistence.spi.ClassTransformer;
import javax.persistence.spi.PersistenceUnitInfo;
import javax.persistence.spi.PersistenceUnitTransactionType;
import javax.sql.DataSource;
/**
 * Programmatic {@link PersistenceUnitInfo} used to bootstrap a Hibernate
 * persistence unit without a {@code persistence.xml}.
 *
 * <p>The unit name, the managed entity class names and the provider properties
 * are supplied by the caller; every other setting is a fixed default
 * (Hibernate provider, {@code RESOURCE_LOCAL} transactions, no mapping files).
 */
public class HibernatePersistenceUnitInfo implements PersistenceUnitInfo {

    /** JPA schema version reported by {@link #getPersistenceXMLSchemaVersion()}. */
    public static final String JPA_VERSION = "2.1";

    private final String persistenceUnitName;
    private final String persistenceProviderClassName = "org.hibernate.jpa.HibernatePersistenceProvider";
    private final PersistenceUnitTransactionType transactionType = PersistenceUnitTransactionType.RESOURCE_LOCAL;
    private final List<String> managedClassNames;
    private final List<String> mappingFileNames = new ArrayList<>();
    private final Properties properties;
    // Never assigned in this class: both data source getters therefore return null.
    private DataSource jtaDataSource;
    private DataSource nonjtaDataSource;
    private final List<ClassTransformer> transformers = new ArrayList<>();

    /**
     * @param persistenceUnitName name reported for this persistence unit
     * @param managedClassNames   fully qualified names of the managed entity classes
     * @param properties          provider properties handed to the persistence provider
     */
    public HibernatePersistenceUnitInfo(String persistenceUnitName, List<String> managedClassNames, Properties properties) {
        this.persistenceUnitName = persistenceUnitName;
        this.managedClassNames = managedClassNames;
        this.properties = properties;
    }

    @Override
    public String getPersistenceUnitName() {
        return persistenceUnitName;
    }

    @Override
    public PersistenceUnitTransactionType getTransactionType() {
        return transactionType;
    }

    @Override
    public DataSource getJtaDataSource() {
        return jtaDataSource;
    }

    @Override
    public DataSource getNonJtaDataSource() {
        return nonjtaDataSource;
    }

    @Override
    public List<String> getMappingFileNames() {
        return mappingFileNames;
    }

    @Override
    public List<String> getManagedClassNames() {
        return managedClassNames;
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public void addTransformer(ClassTransformer transformer) {
        transformers.add(transformer);
    }

    @Override
    public String getPersistenceProviderClassName() {
        return persistenceProviderClassName;
    }

    @Override
    public boolean excludeUnlistedClasses() {
        return false;
    }

    /**
     * @return an empty list: this unit has no additional jar files. An empty
     *         list is returned rather than {@code null} so callers can iterate
     *         without a null check.
     */
    @Override
    public List<URL> getJarFileUrls() {
        return new ArrayList<>();
    }

    /**
     * @return {@code null}; this programmatically built unit has no root URL
     */
    @Override
    public URL getPersistenceUnitRootUrl() {
        return null;
    }

    @Override
    public SharedCacheMode getSharedCacheMode() {
        return SharedCacheMode.UNSPECIFIED;
    }

    @Override
    public ValidationMode getValidationMode() {
        return ValidationMode.AUTO;
    }

    /**
     * @return {@link #JPA_VERSION}
     */
    @Override
    public String getPersistenceXMLSchemaVersion() {
        return JPA_VERSION;
    }

    /**
     * @return the current thread's context class loader, so the provider can
     *         resolve the names returned by {@link #getManagedClassNames()}
     */
    @Override
    public ClassLoader getClassLoader() {
        return Thread.currentThread().getContextClassLoader();
    }

    /**
     * @return {@code null}; no temporary class loader is provided
     */
    @Override
    public ClassLoader getNewTempClassLoader() {
        return null;
    }
}
以及一个 JpaEntityManagerFactory:
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.spi.PersistenceUnitInfo;
import javax.sql.DataSource;
import org.hibernate.jpa.boot.internal.EntityManagerFactoryBuilderImpl;
import org.hibernate.jpa.boot.internal.PersistenceUnitInfoDescriptor;
import oracle.jdbc.replay.OracleDataSourceImpl;
import my.package.Data;
/**
 * Builds a Hibernate-backed {@link EntityManagerFactory} for one database
 * without relying on a {@code persistence.xml} entry.
 *
 * <p>The connection settings are currently hard-coded in the constructor (see
 * the TODO there); the only managed entity is {@code Data}.
 */
public class JpaEntityManagerFactory {

    private final String dbUrl;
    private final String dbUser;
    private final String dbPass;
    private final Class<?>[] entityClasses;
    private final String persistenceUnitName;
    private final String dbConnectionName;

    // Built lazily and reused, so repeated calls do not each create (and leak) a new factory.
    private EntityManagerFactory entityManagerFactory;

    /**
     * @param persistenceUnitName name given to the programmatically built persistence unit
     * @param dbConnectionName    name of the connection to use; stored but not yet
     *                            used to look up the connection settings
     */
    public JpaEntityManagerFactory(String persistenceUnitName, String dbConnectionName) {
        this.entityClasses = new Class<?>[] {Data.class};
        this.dbConnectionName = dbConnectionName;
        this.persistenceUnitName = persistenceUnitName;
        // TODO: Fill values from DB_CONNECTIONS table by using dbConnectionName
        dbUrl = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=abc)(Port=123))(CONNECT_DATA=(SERVICE_NAME=XYZ)))";
        dbUser = "sa";
        dbPass = "sa";
    }

    /**
     * @return a new {@link EntityManager} created from {@link #getEntityManagerFactory()}
     */
    public EntityManager getEntityManager() {
        return getEntityManagerFactory().createEntityManager();
    }

    /**
     * Returns the factory for this persistence unit, building it on first use
     * and again if the previously built one has been closed.
     *
     * @return an open {@link EntityManagerFactory}
     */
    public synchronized EntityManagerFactory getEntityManagerFactory() {
        if (entityManagerFactory == null || !entityManagerFactory.isOpen()) {
            PersistenceUnitInfo persistenceUnitInfo = getPersistenceUnitInfo(persistenceUnitName);
            Map<String, Object> configuration = new HashMap<>();
            entityManagerFactory = new EntityManagerFactoryBuilderImpl(
                    new PersistenceUnitInfoDescriptor(persistenceUnitInfo), configuration).build();
        }
        return entityManagerFactory;
    }

    /**
     * @param name persistence unit name
     * @return the unit description built from the entity class names and provider properties
     */
    protected HibernatePersistenceUnitInfo getPersistenceUnitInfo(String name) {
        return new HibernatePersistenceUnitInfo(name, getEntityClassNames(), getProperties());
    }

    /**
     * @return the fully qualified names of the classes returned by {@link #getEntities()}
     */
    protected List<String> getEntityClassNames() {
        return Arrays.stream(getEntities()).map(Class::getName).collect(Collectors.toList());
    }

    /**
     * @return the Hibernate properties for this unit, including the data source
     *         from {@link #getOracleDataSource()}
     */
    protected Properties getProperties() {
        Properties properties = new Properties();
        properties.put("hibernate.dialect", "org.hibernate.dialect.Oracle8iDialect");
        properties.put("hibernate.show_sql", false);
        properties.put("hibernate.hbm2ddl.auto", "none");
        properties.put("hibernate.connection.datasource", getOracleDataSource());
        return properties;
    }

    /**
     * @return the managed entity classes
     */
    protected Class<?>[] getEntities() {
        return entityClasses;
    }

    /**
     * Creates an Oracle data source configured with this instance's URL, user and password.
     *
     * @return the configured data source
     * @throws IllegalStateException if the driver rejects one of the settings; failing here
     *                               avoids handing a half-configured data source to Hibernate
     */
    protected DataSource getOracleDataSource() {
        OracleDataSourceImpl dataSource = new OracleDataSourceImpl();
        try {
            dataSource.setURL(dbUrl);
            dataSource.setUser(dbUser);
            dataSource.setPassword(dbPass);
        } catch (SQLException e) {
            throw new IllegalStateException("Could not configure Oracle data source for " + dbUrl, e);
        }
        return dataSource;
    }
}
因为我们使用的是 Oracle 数据库(Baeldung 的示例使用的是 MySQL),所以我替换了 hibernate.connection.datasource 的值,但我不确定这样做是否正确。
最后,在 Camel 中,我使用自己的 JpaEntityManagerFactory 构建了多个 JPA 端点,每个端点使用不同的持久化单元:
/**
 * Registers the JPA component, the source consumer endpoint and one producer
 * endpoint per destination database, then adds the route and starts the Camel context.
 *
 * <p>Any exception raised while creating endpoints, adding the route or
 * starting the context is logged and not rethrown.
 */
public void startup() {
    // Single factory for the source persistence unit. The component's transaction
    // manager and the consumer endpoint must share this same instance; otherwise
    // the transaction manager would manage a different factory than the one the
    // consumer's EntityManager comes from.
    var factory = Persistence.createEntityManagerFactory(SOURCE_DB_PERSISTENCE_UNIT_NAME);
    var jpaComponent = new JpaComponent();
    jpaComponent.setJoinTransaction(false);
    jpaComponent.setEntityManagerFactory(factory);
    jpaComponent.setTransactionManager(new JpaTransactionManager(factory));
    jpaComponent.setSharedEntityManager(true);
    jpaComponent.setLazyStartProducer(true);
    jpaComponent.setAliases(getAliases());
    context.addComponent("jpaComponent", jpaComponent);
    try {
        // setting up the source
        var consumer = (JpaEndpoint) jpaComponent.createEndpoint("jpa://Data");
        consumer.setNamedQuery(NAMED_QUERY_DATA);
        consumer.setConsumeDelete(false);
        consumer.setPersistenceUnit(SOURCE_DB_PERSISTENCE_UNIT_NAME);
        // Reuse the factory the transaction manager above was created with,
        // instead of creating a second one for the same persistence unit.
        consumer.setEntityManagerFactory(factory);
        consumer.setTimeUnit(TIME_UNIT);
        consumer.setInitialDelay(INITIAL_DELAY);
        consumer.setDelay(DELAY);
        consumer.setMaxMessagesPerPoll(MAX_MESSAGES_PER_POLL);
        consumer.setJoinTransaction(false);
        consumer.setExchangePattern(ExchangePattern.InOut);
        context.addEndpoint("jpa://Data", consumer);

        var persistenceUnits = getPersistenceUnitsMap(); // contains an enum of my destination databases
        // setting up the multiple destinations
        for (var entry : persistenceUnits.entrySet()) {
            var database = entry.getKey();
            var databaseName = database.name();
            var persistenceUnit = entry.getValue();
            // NOTE(review): JpaEntityManagerFactory's constructor takes
            // (persistenceUnitName, dbConnectionName); the arguments here look
            // swapped relative to that order - confirm which is intended.
            var entityManagerFactory = new JpaEntityManagerFactory(databaseName, persistenceUnit).getEntityManagerFactory();
            var producerUri = "jpa://Data" + databaseName;
            var producer = (JpaEndpoint) jpaComponent.createEndpoint(producerUri);
            producer.setEntityManagerFactory(entityManagerFactory);
            producer.setPersistenceUnit(persistenceUnit);
            // Each destination gets its own transaction manager bound to its own factory.
            producer.setTransactionManager(new JpaTransactionManager(entityManagerFactory));
            producer.setJoinTransaction(false);
            context.addEndpoint(producerUri, producer);
        }
        context.addRoutes(new DataRoute());
        context.start();
    } catch (final Exception e) {
        logger.error("Error starting up camel", e);
    }
}
/**
 * Builds the alias map that gives each destination endpoint its own name
 * (jpa://DataDB1, jpa://DataDB2 etc); every alias maps to the same entity class.
 *
 * @return a map from "jpa://Data" + database name to {@code Data.class}
 */
private Map<String, Class<?>> getAliases() {
    Map<String, Class<?>> aliasesByUri = new HashMap<>();
    getPersistenceUnitsMap().keySet()
            .forEach(db -> aliasesByUri.put("jpa://Data" + db.name(), Data.class));
    return aliasesByUri;
}
我的 Camel 路由如下所示:
/**
 * Defines the "dataRoute" route: messages consumed from the "jpa://Data"
 * endpoint are dispatched by the body's targetDatabase value to the endpoint
 * named "jpa://Data" + the matching Database constant's name.
 */
@Override
public void configure() throws Exception {
// Source: the jpa://Data consumer endpoint.
from("jpa://Data").routeId("dataRoute")
// One branch per destination: compare body.targetDatabase with the database name.
.choice().when(simple("${body.targetDatabase} == 'DB1' ")).to("jpa://Data" + Database.DB1.name()).endChoice()
.when(simple("${body.targetDatabase} == 'DB2' ")).to("jpa://Data" + Database.DB2.name()).endChoice()
.when(simple("${body.targetDatabase} == 'DB3' ")).to("jpa://Data" + Database.DB3.name()).endChoice()
.......
// Fallback when no branch above matched: only runs the processor below.
.otherwise().process(ex -> {
// some logging
}).endChoice().end();
}
现在,我遇到的问题是下面这个异常:
org.apache.camel.component.jpa.JpaConsumer (Camel (camel-1) thread #24 - jpa://Data) ..component.jpa.JpaConsumer (361) Acquiring exclusive lock on entity: my.package.Data@768d98ea
org.apache.camel.component.jpa.JpaConsumer (Camel (camel-1) thread #24 - jpa://Data) ..component.jpa.JpaConsumer (369) Failed to achieve lock on entity: my.package.Data@768d98ea. Reason: no transaction is in progress: javax.persistence.TransactionRequiredException: no transaction is in progress
at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.AbstractSharedSessionContract.checkTransactionNeededForUpdateOperation(AbstractSharedSessionContract.java:398)
at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.checkTransactionNeededForUpdateOperation(SessionImpl.java:3589)
at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.lock(SessionImpl.java:3615)
at org.hibernate@5.3.11.SP1-redhat-00001//org.hibernate.internal.SessionImpl.lock(SessionImpl.java:3609)
at deployment.camel-db-connector-0.0.1-SNAPSHOT.war//org.apache.camel.component.jpa.JpaConsumer.lockEntity(JpaConsumer.java:365)
看起来没有启动任何事务。我需要修改什么?
1条答案
按热度按时间uqjltbpv1#
您可能需要让消费者(consumer)端点开启事务。