.NET: Mocking EventHubAsyncCollector in an Azure Function with NSubstitute

uidvcgyl  posted on 2023-11-20 in .NET

A simple Azure Function:

    [FunctionName("TestFunction")]
    public async Task Run([CosmosDBTrigger(
        databaseName: "Test",
        containerName: "test",
        CreateLeaseContainerIfNotExists = true,
        MaxItemsPerInvocation = 200,
        FeedPollDelay = 10000,
        Connection = "TestConnection",
        LeaseContainerName = "testLeases")]IReadOnlyList<Document> inputs,
        [EventHub("test", Connection = "TestConnectionString")] IAsyncCollector<EventData> outputEvents,
        ILogger log)
    {
        if (inputs != null && inputs.Count > 0)
        {
            try
            {
                await inputs.ForEachAsync(dop: 10, input =>
                {
                    var id = input.GetPropertyValue<string>("id");
                    var eventData = new EventData(Encoding.UTF8.GetBytes(input.ToString()));
                    return outputEvents.AddAsync(eventData, id);
                });
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, $"Error occurred while sending reports feed to datalake: {exception.Message}");
            }
        }
    }

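I am trying to mock outputEvents and assert that AddAsync(eventData, id) received a call. AddAsync(eventData) works without problems, but the partition-key overload does not, because I get an exception from EventHubWebJobsExtensions: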

    public static Task AddAsync(this IAsyncCollector<EventData> instance, EventData eventData, string partitionKey, CancellationToken cancellationToken = default(CancellationToken)) =>
        instance switch
        {
            EventHubAsyncCollector ehCollector => ehCollector.AddAsync(eventData, partitionKey, cancellationToken),
            _ => throw new InvalidOperationException("Adding with a partition key is only available when using the Event Hubs extension package.")
        };
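
To make the failure mode concrete, here is a minimal sketch of the same pattern using stand-in types. ICollector<T>, RealCollector, FakeCollector and CollectorExtensions are hypothetical names chosen for illustration and are not part of the Azure SDK. The sketch only assumes what the extension method above shows: the call is routed through a static extension method that checks the runtime type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for IAsyncCollector<T> (hypothetical, for illustration only).
public interface ICollector<T>
{
    Task AddAsync(T item, CancellationToken cancellationToken = default);
}

// Stand-in for the one concrete type the extension method accepts.
public sealed class RealCollector : ICollector<string>
{
    public Task AddAsync(string item, CancellationToken cancellationToken = default) => Task.CompletedTask;

    public Task AddAsync(string item, string partitionKey, CancellationToken cancellationToken = default) => Task.CompletedTask;
}

// Stand-in for any test double, hand-written or generated by a mocking library.
public sealed class FakeCollector : ICollector<string>
{
    public Task AddAsync(string item, CancellationToken cancellationToken = default) => Task.CompletedTask;

    // Not reachable through an ICollector<string> reference, because the
    // interface itself has no overload that takes a partition key.
    public Task AddAsync(string item, string partitionKey, CancellationToken cancellationToken = default) => Task.CompletedTask;
}

public static class CollectorExtensions
{
    // Same shape as the extension method above: a runtime type check
    // that rejects everything except the one known implementation.
    public static Task AddAsync(this ICollector<string> instance, string item, string partitionKey, CancellationToken cancellationToken = default) =>
        instance switch
        {
            RealCollector real => real.AddAsync(item, partitionKey, cancellationToken),
            _ => throw new InvalidOperationException("Partition keys require the real collector.")
        };
}
```

Calling AddAsync("payload", "key") on a variable typed as ICollector<string> binds to the extension method at compile time, so a FakeCollector behind that reference ends up in the throwing branch even though it declares a matching method of its own.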


For AddAsync(eventData), I simply use the commonly suggested approach:

    public class MockAsyncCollector<T> : IAsyncCollector<T>
    {
        public readonly List<T> Items = new List<T>();

        public Task AddAsync(T item, CancellationToken cancellationToken = default)
        {
            Items.Add(item);
            return Task.FromResult(true);
        }

        public Task FlushAsync(CancellationToken cancellationToken = default)
        {
            return Task.FromResult(true);
        }
    }


This works fine, but I cannot find a solution for AddAsync(eventData, id).
At the moment my test looks like this:

    public class Tests
    {
        private readonly MockLogger<Test> _logger;
        private readonly Test _processor;

        public Tests()
        {
            _logger = Substitute.For<MockLogger<Test>>();
            _processor = new Test(_logger);
        }

        [Fact]
        public async Task Run_GivenProperMessage_ShouldNotThrowAndCallAddAsync()
        {
            // Arrange
            var input = new List<Document>();
            var inputDocument = new Document();
            inputDocument.SetPropertyValue("id", Guid.NewGuid().ToString());
            input.Add(inputDocument);
            var output = Substitute.For<IAsyncCollector<EventData>>();

            // Act
            var act = async () => await _processor.Run(input, output, _logger);

            // Assert
            await act.Should().NotThrowAsync();
            await output.Received(1).AddAsync(Arg.Any<EventData>(), Arg.Any<string>());
        }
    }


The cause of the exception seems to be the switch, which expects an EventHubAsyncCollector but instead receives an NSubstitute ObjectProxy.

Answer 1 (ut6juiuv)

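The problem is that the IAsyncCollector<EventData> instance in the test is an NSubstitute substitute. AddAsync(eventData, id) is an extension method, so NSubstitute cannot intercept it, and the switch expression inside that extension method throws for any instance that is not an EventHubAsyncCollector.

  • Create a custom IAsyncCollector<EventData> implementation that provides an AddAsync method with partition-key support. Note that this overload is only chosen when the call is made on the concrete type; a call made through an IAsyncCollector<EventData> reference still binds to the extension method.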
    public class MockEventHubAsyncCollector : IAsyncCollector<EventData>
    {
        public readonly List<(EventData, string)> Items = new List<(EventData, string)>();

        // Required by IAsyncCollector<EventData>; records the item without a partition key.
        public Task AddAsync(EventData item, CancellationToken cancellationToken = default)
        {
            Items.Add((item, null));
            return Task.CompletedTask;
        }

        public Task AddAsync(EventData item, string partitionKey, CancellationToken cancellationToken = default)
        {
            Items.Add((item, partitionKey));
            return Task.CompletedTask;
        }

        public Task FlushAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
    }


Test:

    // Arrange
    var input = new List<Document>();
    var inputDocument = new Document();
    var id = Guid.NewGuid().ToString();
    inputDocument.SetPropertyValue("id", id);
    input.Add(inputDocument);
    var output = new MockEventHubAsyncCollector();

    // Act
    var act = async () => await _processor.Run(input, output, _logger);

    // Assert
    await act.Should().NotThrowAsync();
    output.Items.Should().HaveCount(1);
    output.Items[0].Item2.Should().Be(id); // Check the partition key
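
Because the partition-key call in the function goes through a static extension method, another option is to put a small interface between the function logic and the collector. The following is a minimal sketch of that idea, not the Event Hubs extension's own API: IPartitionedSink<T>, RecordingSink<T> and FeedForwarder are hypothetical names, and plain strings stand in for Document and EventData so the example needs no Azure packages.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical seam: the function logic depends on this interface instead of
// calling the partition-key extension method directly.
public interface IPartitionedSink<T>
{
    Task AddAsync(T item, string partitionKey);
}

// Test double that records every (item, partition key) pair it receives.
public sealed class RecordingSink<T> : IPartitionedSink<T>
{
    public List<(T Item, string PartitionKey)> Items { get; } =
        new List<(T Item, string PartitionKey)>();

    public Task AddAsync(T item, string partitionKey)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }
}

// Logic extracted from the function body so it can be tested in isolation.
public static class FeedForwarder
{
    public static async Task ForwardAsync(
        IEnumerable<(string Id, string Payload)> inputs,
        IPartitionedSink<string> sink)
    {
        foreach (var (id, payload) in inputs)
        {
            // The document id doubles as the partition key, as in the question.
            await sink.AddAsync(payload, id);
        }
    }
}
```

In the real function, an adapter implementing IPartitionedSink<EventData> would wrap the bound IAsyncCollector<EventData> and forward to its AddAsync(eventData, partitionKey) call. Since AddAsync is then an ordinary interface member, Substitute.For<IPartitionedSink<EventData>>() can be used with Received(1) in the usual way.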


Here is my custom implementation for testing purposes:

    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs; // IAsyncCollector<T> and FunctionName live here
    using Microsoft.Extensions.Logging;

    namespace FunctionApp23
    {
        public class Function1
        {
            private readonly ILogger _logger;

            public Function1(ILoggerFactory loggerFactory)
            {
                _logger = loggerFactory.CreateLogger<Function1>();
            }

            // MyDocument is not defined in this answer; it is assumed to expose
            // Id, Text, Number and Boolean properties.
            [FunctionName("Function1")]
            public async Task Run(
                [CosmosDBTrigger(
                    databaseName: "databaseName",
                    collectionName: "collectionName",
                    ConnectionStringSetting = "CONN_STRING",
                    LeaseCollectionName = "leases")] IReadOnlyList<MyDocument> input,
                [EventHub("test", Connection = "EventHubConnectionString")] IAsyncCollector<MyEventData> outputEvents)
            {
                if (input != null && input.Count > 0)
                {
                    _logger.LogInformation("Documents modified: " + input.Count);
                    _logger.LogInformation("First document Id: " + input[0].Id);

                    // Create a custom mock for IAsyncCollector<MyEventData>
                    var mockOutput = new MockEventHubAsyncCollector();

                    // Process the documents and send to Event Hub
                    foreach (var document in input)
                    {
                        var eventData = new MyEventData
                        {
                            Id = document.Id,
                            Text = document.Text,
                            Number = document.Number,
                            Boolean = document.Boolean
                        };

                        // The partition-key overload shown in the question is declared for
                        // IAsyncCollector<EventData>, so this call assumes an equivalent
                        // overload exists for IAsyncCollector<MyEventData>.
                        await outputEvents.AddAsync(eventData, document.Id);
                    }

                    // Use the mock to assert expectations
                    // For example: Assert that AddAsync was called with the correct parameters
                    // mockOutput.Items should contain the expected values
                }
            }
        }

        public class MyEventData
        {
            public string Id { get; set; }
            public string Text { get; set; }
            public int Number { get; set; }
            public bool Boolean { get; set; }
        }

        public class MockEventHubAsyncCollector : IAsyncCollector<MyEventData>
        {
            public readonly List<(MyEventData, string)> Items = new List<(MyEventData, string)>();

            // Required by IAsyncCollector<MyEventData>; records the item without a partition key.
            public Task AddAsync(MyEventData item, CancellationToken cancellationToken = default)
            {
                Items.Add((item, null));
                return Task.CompletedTask;
            }

            public Task AddAsync(MyEventData item, string partitionKey, CancellationToken cancellationToken = default)
            {
                Items.Add((item, partitionKey));
                return Task.CompletedTask;
            }

            public Task FlushAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
        }
    }

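Sample data:

  • A set of events (MyEventData instances) representing the data transformed from the Cosmos DB trigger input, each associated with the correct partition key. The exact expected result depends on the input documents passed to the Azure Function.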

