一个简单的Azure函数:
[FunctionName("TestFunction")]
public async Task Run([CosmosDBTrigger(
databaseName: "Test",
containerName: "test",
CreateLeaseContainerIfNotExists = true,
MaxItemsPerInvocation = 200,
FeedPollDelay = 10000,
Connection = "TestConnection",
LeaseContainerName = "testLeases")]IReadOnlyList<Document> inputs,
[EventHub("test", Connection = "TestConnectionString")] IAsyncCollector<EventData> outputEvents,
ILogger log)
{
if (inputs != null && inputs.Count > 0)
{
try
{
await inputs.ForEachAsync(dop: 10, input =>
{
var id = input.GetPropertyValue<string>("id");
var eventData = new EventData(Encoding.UTF8.GetBytes(input.ToString()));
return outputEvents.AddAsync(eventData, id);
});
}
catch (Exception exception)
{
_logger.LogError(exception, $"Error occurred while sending reports feed to datalake: {exception.Message}");
}
}
}
字符串
我尝试模拟outputEvents并AssertAddAsync(eventData,id)已收到调用。虽然对于AddAsync(eventData)没有问题,但对于上述情况有点问题,因为我收到来自EventHubWebJobsExtensions的异常:
public static Task AddAsync(this IAsyncCollector<EventData> instance, EventData eventData, string partitionKey, CancellationToken cancellationToken = default(CancellationToken)) =>
instance switch
{
EventHubAsyncCollector ehCollector => ehCollector.AddAsync(eventData, partitionKey, cancellationToken),
_ => throw new InvalidOperationException("Adding with a partition key is only available when using the Event Hubs extension package.")
};
型
对于AddAsync(eventData),我简单地使用建议的方法:
public class MockAsyncCollector<T> : IAsyncCollector<T>
{
public readonly List<T> Items = new List<T>();
public Task AddAsync(T item, CancellationToken cancellationToken = default)
{
Items.Add(item);
return Task.FromResult(true);
}
public Task FlushAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(true);
}
}
型
这工作正常,但我无法找出AddAsync(eventData,id)的解决方案。
目前我的测试看起来像这样:
public class Tests
{
private readonly MockLogger<Test> _logger;
private readonly Test _processor;
public Tests()
{
_logger = Substitute.For<MockLogger<Test>>();
_processor = new Test(_logger);
}
[Fact]
public async Task Run_GivenProperMessage_ShouldNotThrowAndCallAddAsync()
{
// Arrange
var input = new List<Document>();
var inputDocument = new Document();
inputDocument.SetPropertyValue("id", Guid.NewGuid().ToString());
input.Add(inputDocument);
var output = Substitute.For<IAsyncCollector<EventData>>();
// Act
var act = async () => await _processor.Run(input, output, _logger);
// Assert
await act.Should().NotThrowAsync();
await output.Received(1).AddAsync(Arg.Any<EventData>(), Arg.Any<string>());
}
}
型
异常的原因似乎是一个开关,它需要EventHubAsyncCollector,而不是它得到了一个NSubstitute ObjectProxy。
1条答案
按热度按时间ut6juiuv1#
这个问题是由于测试中的
IAsyncCollector<EventData>
示例是NSubstitute的替代,并且由于AddAsync
扩展方法中的switch语句,AddAsync
方法没有被正确拦截。IAsyncCollector<EventData>
实现,该实现正确地实现了具有分区键支持的AddAsync
方法。字符串
测试:
型
以下是我的自定义实现,用于测试目的:
型
示例数据:
的数据
MyEventData
示例),表示从Cosmos DB触发器转换的数据,每个事件都与正确的分区键相关联。预期结果的具体细节取决于Azure函数中给出的输入文档。制作活动:
的