postgresql transaction parallel c#当使用多个transaction时,它们不会通过重复检查

wgeznvg7  于 2023-06-22  发布在  PostgreSQL
关注(0)|答案(1)|浏览(138)

我正在并行运行DB同步方法

var dependedRules = clients.Select(x => SynchronizationPipeLine(x.Key,_scopeFactory.CreateScope().ServiceProvider.GetRequiredService<IUnitOfWork>()));
await Parallel.ForEachAsync(dependedRules, async (r, _) => await r);

在基本同步方法中,我做了一个事务

using (var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.RepeatableRead))
                {
                    try
                    {
                        // Check if the project is in the database, if not, add it before
                        if (!await _unitOfWork.BaseRepositories.ProjectRepository.CheckIfRecordExists(resultProject.ProjectId))
                        {
                            resultProject.Id = await _unitOfWork.BaseRepositories.ProjectRepository.AddAsync(resultProject);
                        }
                        else
                        {
                            // if there is a record, we find it including links
                            var projectResult = await _unitOfWork.BaseRepositories.ProjectRepository.GetByProjectId(resultProject.ProjectId);
                            resultProject = projectResult;
                        }
                        await _unitOfWork.BaseRepositories.ProjectRepository.SaveChangesAsync();
                        await transaction.CommitAsync();
                    }
                    catch (Exception)
                    {
                        await transaction.RollbackAsync();
                    }
                }


im using unitOfWork

    public UnitOfWork(DbContext context,
            ILogger<UnitOfWork> logger, rep, rep2)
        {
            _dbContext = context;
            _logger = logger;
            rep= rep;
            BaseRepositories = baseRepositories;
            rep2= rep2;
        }

        public Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
        {
            _dbContext.UpdatedChangedAtDateTimestamps();
            try
            {   
                return _dbContext.SaveChangesAsync(cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "An error occurred while saving changes");
                throw;
            }
        }

        public IDbContextTransaction BeginTransaction(IsolationLevel? isolationLevel)
        {
            return _dbContext.Database.BeginTransaction(isolationLevel.Value);
        }

        public async Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel? isolationLevel)
        {
            return await _dbContext.Database.BeginTransactionAsync(isolationLevel.Value);
        }

和baseClassRep

public abstract class BaseRepository<T> : IRepository<T> where T : BaseEntity
    {
        protected readonly PerformanceRiskAnalyzerDbContext DbContext;
        protected readonly DbSet<T> DbSet;

        public BaseRepository(DbContext dbContext)
        {
            DbContext = dbContext;
            DbSet = DbContext.Set<T>();
        }
        public async Task<IEnumerable<T>> GetAll() 
        {
            return await DbSet.AsNoTracking().ToListAsync();
         
        }

        public async Task<long> AddAsync(T entity)
        {
        
                var entry = await DbSet.AddAsync(entity);
                return entry.Entity.Id;
       
        }

        public async Task<bool> Update(T entity)
        {
        
             DbSet.Entry(entity).State = EntityState.Modified;
                return true;
          
        }

        public async Task AddRange(IEnumerable<T> entities)
        {
            await DbSet.AddRangeAsync(entities);
         
        }

        public async Task SaveChangesAsync()
        {
            await DbContext.SaveChangesAsync();
        }

Dependencies reg AddScoped和一个conext被使用。
问题的本质,当执行事务,一个错误发生在数据库重复(它发生)用户重复),我添加到数据库和后1事务一切正常2可能下降由于重复.虽然检查的地方是我把所有的用户都从数据库中拉出来的,但好像-在另一个线程中不知道新用户
Copyright © 2019 www.js.com All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. All rights reserved. if(user!= null);

var sqlString = $@"INSERT INTO public.""UserAndProject""(""ProjectsId"", ""UsersId"") VALUES ({projectId}, {userId}) ON CONFLICT DO NOTHING";
     var sql = FormattableStringFactory.Create(sqlString);
     await DbContext.Database.ExecuteSqlAsync(sql);`

这种情况发生在SaveChangesAsync时
我尝试了不同的交易范围,限制没有帮助

wdebmtf2

wdebmtf21#

即使你使用的是一个工作单元,你也需要注意Db Context的生命周期,因为它是有作用域的,并且可能会导致并行进程的问题,因为它不是线程安全的,最终会被并行进程共享。
看看这个答案是否能帮助你:https://stackoverflow.com/a/61841263/1402237
简而言之,为每个流程创建一个范围

public class MainClassInConsoleNoContext()
{
    private readonly IServiceScopeFactory _scopeFactory;
    public MainClassInConsoleNoContext(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    public void MyMainAction() {
        using(var scope = scopeFactory.CreateScope())
        {   
            scope.ServiceProvider.GetService<MyServiceWithContextAndScope>().MyScopedAction();           
        }
    }
}

一个好的做法是创建一个执行操作的新服务,与并行任务分离,然后在并行运行中使用该范围,以便它为每个流程生成一个新的范围

var dependedRules = clients.Select(x => SynchronizationPipeLine(x.Key,_scopeFactory.CreateScope().ServiceProvider.GetRequiredService<IUnitOfWork>()));
await Parallel.ForEachAsync(dependedRules, async (r, _) => {
    using(var scope = scopeFactory.CreateScope())
    {   
        await scope.ServiceProvider.GetService<MyServiceWithContextAndScope>().MyScopedAction();           
    }
});
    • 编辑**

如果你有并发问题,只要接受数据库验证,删除你验证是否存在的代码。只需尝试将实体保存在单独的事务中,您不需要任何特定的隔离级别,请确保提交事务,如果由于重复键而失败,请忽略错误并继续前进。
在一个单独的事务中,然后再次查询模型,您知道它必须存在,因为前一个事务应该已经创建了实体,或者因为它已经存在而失败。
这将为您节省验证实体是否存在的第一个查询,但是您需要始终在第二个事务中查询实体,以便您可以使用该实体。
另一种选择是使用本机查询,例如使用INSERT IGNOREsimilar

相关问题