我正在并行运行DB同步方法
var dependedRules = clients.Select(x => SynchronizationPipeLine(x.Key,_scopeFactory.CreateScope().ServiceProvider.GetRequiredService<IUnitOfWork>()));
await Parallel.ForEachAsync(dependedRules, async (r, _) => await r);
在基本同步方法中,我做了一个事务
using (var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.RepeatableRead))
{
try
{
// Check if the project is in the database, if not, add it before
if (!await _unitOfWork.BaseRepositories.ProjectRepository.CheckIfRecordExists(resultProject.ProjectId))
{
resultProject.Id = await _unitOfWork.BaseRepositories.ProjectRepository.AddAsync(resultProject);
}
else
{
// if there is a record, we find it including links
var projectResult = await _unitOfWork.BaseRepositories.ProjectRepository.GetByProjectId(resultProject.ProjectId);
resultProject = projectResult;
}
await _unitOfWork.BaseRepositories.ProjectRepository.SaveChangesAsync();
await transaction.CommitAsync();
}
catch (Exception)
{
await transaction.RollbackAsync();
}
}
im using unitOfWork
public UnitOfWork(DbContext context,
ILogger<UnitOfWork> logger, rep, rep2)
{
_dbContext = context;
_logger = logger;
rep= rep;
BaseRepositories = baseRepositories;
rep2= rep2;
}
public Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
_dbContext.UpdatedChangedAtDateTimestamps();
try
{
return _dbContext.SaveChangesAsync(cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while saving changes");
throw;
}
}
public IDbContextTransaction BeginTransaction(IsolationLevel? isolationLevel)
{
return _dbContext.Database.BeginTransaction(isolationLevel.Value);
}
public async Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel? isolationLevel)
{
return await _dbContext.Database.BeginTransactionAsync(isolationLevel.Value);
}
和baseClassRep
public abstract class BaseRepository<T> : IRepository<T> where T : BaseEntity
{
protected readonly PerformanceRiskAnalyzerDbContext DbContext;
protected readonly DbSet<T> DbSet;
public BaseRepository(DbContext dbContext)
{
DbContext = dbContext;
DbSet = DbContext.Set<T>();
}
public async Task<IEnumerable<T>> GetAll()
{
return await DbSet.AsNoTracking().ToListAsync();
}
public async Task<long> AddAsync(T entity)
{
var entry = await DbSet.AddAsync(entity);
return entry.Entity.Id;
}
public async Task<bool> Update(T entity)
{
DbSet.Entry(entity).State = EntityState.Modified;
return true;
}
public async Task AddRange(IEnumerable<T> entities)
{
await DbSet.AddRangeAsync(entities);
}
public async Task SaveChangesAsync()
{
await DbContext.SaveChangesAsync();
}
Dependencies reg AddScoped和一个conext被使用。
问题的本质,当执行事务,一个错误发生在数据库重复(它发生)用户重复),我添加到数据库和后1事务一切正常2可能下降由于重复.虽然检查的地方是我把所有的用户都从数据库中拉出来的,但好像-在另一个线程中不知道新用户
Copyright © 2019 www.js.com All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. if(user!= null);
var sqlString = $@"INSERT INTO public.""UserAndProject""(""ProjectsId"", ""UsersId"") VALUES ({projectId}, {userId}) ON CONFLICT DO NOTHING";
var sql = FormattableStringFactory.Create(sqlString);
await DbContext.Database.ExecuteSqlAsync(sql);`
这种情况发生在SaveChangesAsync时
我尝试了不同的交易范围,限制没有帮助
1条答案
按热度按时间wdebmtf21#
即使你使用的是一个工作单元,你也需要注意Db Context的生命周期,因为它是有作用域的,并且可能会导致并行进程的问题,因为它不是线程安全的,最终会被并行进程共享。
看看这个答案是否能帮助你:https://stackoverflow.com/a/61841263/1402237
简而言之,为每个流程创建一个范围
一个好的做法是创建一个执行操作的新服务,与并行任务分离,然后在并行运行中使用该范围,以便它为每个流程生成一个新的范围
如果你有并发问题,只要接受数据库验证,删除你验证是否存在的代码。只需尝试将实体保存在单独的事务中,您不需要任何特定的隔离级别,请确保提交事务,如果由于重复键而失败,请忽略错误并继续前进。
在一个单独的事务中,然后再次查询模型,您知道它必须存在,因为前一个事务应该已经创建了实体,或者因为它已经存在而失败。
这将为您节省验证实体是否存在的第一个查询,但是您需要始终在第二个事务中查询实体,以便您可以使用该实体。
另一种选择是使用本机查询,例如使用
INSERT IGNORE
或similar