dotnet core series: Background tasks with hosted services

Dear 丶 2021-11-01 09:34

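This post briefly covers background tasks in ASP.NET Core.

Packages used:

Microsoft.AspNetCore.App metapackage

or add a reference to

Microsoft.Extensions.Hosting

1. Timed background tasks

A timed background task uses the System.Threading.Timer class. The timer triggers the task's DoWork method. The timer is stopped in StopAsync and released in Dispose.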

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    internal class TimedHostedService : IHostedService, IDisposable
    {
        private readonly ILogger _logger;
        private Timer _timer;

        public TimedHostedService(ILogger<TimedHostedService> logger)
        {
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is starting.");

            // Run DoWork immediately, then again every 5 seconds.
            _timer = new Timer(DoWork, null, TimeSpan.Zero,
                TimeSpan.FromSeconds(5));

            return Task.CompletedTask;
        }

        private void DoWork(object state)
        {
            _logger.LogInformation("Timed Background Service is working.");
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is stopping.");

            // Stop scheduling further callbacks; the timer is disposed later.
            _timer?.Change(Timeout.Infinite, 0);

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            _timer?.Dispose();
        }
    }
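The timer calls used above can also be tried on their own, without a host. The following is only a minimal sketch: `TimerSketch` and its members are hypothetical names invented for illustration, and the class simply counts callbacks instead of logging. It assumes nothing beyond `System.Threading.Timer` itself.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the article's sample.
public sealed class TimerSketch : IDisposable
{
    private readonly Timer _timer;
    private int _ticks;

    public TimerSketch(TimeSpan period)
    {
        // A due time of TimeSpan.Zero queues the first callback right away;
        // later callbacks follow at the given period.
        _timer = new Timer(DoWork, null, TimeSpan.Zero, period);
    }

    // Number of callbacks that have run so far.
    public int Ticks => Volatile.Read(ref _ticks);

    private void DoWork(object state)
    {
        Interlocked.Increment(ref _ticks);
    }

    // Same call the hosted service makes in StopAsync: no further
    // callbacks are scheduled, but the timer object stays alive.
    public void Stop()
    {
        _timer.Change(Timeout.Infinite, 0);
    }

    // Same call the hosted service makes in Dispose.
    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

Creating a `TimerSketch`, waiting a few periods, and calling `Stop()` should leave `Ticks` unchanged afterwards, apart from a callback that was already queued when `Stop()` ran.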

The service is registered in Startup.ConfigureServices with the AddHostedService extension method:

    services.AddHostedService<TimedHostedService>();

2. Consuming a scoped service in a background task

To use scoped services within an IHostedService, create a scope. No scope is created for a hosted service by default.

The scoped background task service contains the background task's logic. In the following example, an ILogger is injected into the service:

    using Microsoft.Extensions.Logging;

    internal interface IScopedProcessingService
    {
        void DoWork();
    }

    internal class ScopedProcessingService : IScopedProcessingService
    {
        private readonly ILogger _logger;

        public ScopedProcessingService(ILogger<ScopedProcessingService> logger)
        {
            _logger = logger;
        }

        public void DoWork()
        {
            _logger.LogInformation("Scoped Processing Service is working.");
        }
    }

The hosted service creates a scope and resolves the scoped background task service in order to call its DoWork method:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    internal class ConsumeScopedServiceHostedService : IHostedService
    {
        private readonly ILogger _logger;

        public ConsumeScopedServiceHostedService(IServiceProvider services,
            ILogger<ConsumeScopedServiceHostedService> logger)
        {
            Services = services;
            _logger = logger;
        }

        public IServiceProvider Services { get; }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is starting.");

            DoWork();

            return Task.CompletedTask;
        }

        private void DoWork()
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is working.");

            // Scoped services must be resolved from a scope, not from the
            // root provider; the scope is disposed when the work is done.
            using (var scope = Services.CreateScope())
            {
                var scopedProcessingService =
                    scope.ServiceProvider
                        .GetRequiredService<IScopedProcessingService>();

                scopedProcessingService.DoWork();
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is stopping.");

            return Task.CompletedTask;
        }
    }

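The services are registered in Startup.ConfigureServices. The IHostedService implementation is registered with the AddHostedService extension method:

    services.AddHostedService<ConsumeScopedServiceHostedService>();
    services.AddScoped<IScopedProcessingService, ScopedProcessingService>();

3. Queued background tasks

The queue below stores work items in a ConcurrentQueue, and a SemaphoreSlim signals the consumer whenever an item becomes available: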

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

        Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken);
    }

    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
            new ConcurrentQueue<Func<CancellationToken, Task>>();

        // Counts queued items; starts at 0 so DequeueAsync waits when empty.
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

        public void QueueBackgroundWorkItem(
            Func<CancellationToken, Task> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            _workItems.Enqueue(workItem);
            _signal.Release();
        }

        public async Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken)
        {
            // Waits until an item is available or the token is cancelled.
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);

            return workItem;
        }
    }
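To see how the queue behaves without a host, the pattern above can be exercised directly. This is a rough sketch under stated assumptions: `SketchTaskQueue` is a condensed, renamed copy of the queue above, and `QueueSketch` with its two methods is a hypothetical helper invented for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the queue pattern above, renamed so it does not clash
// with the article's BackgroundTaskQueue. For illustration only.
public sealed class SketchTaskQueue
{
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _items =
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void Enqueue(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));
        _items.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _items.TryDequeue(out var workItem);
        return workItem;
    }
}

// Hypothetical driver that plays the role of the hosted service's loop.
public static class QueueSketch
{
    // Queues three work items, drains them with a single consumer, and
    // returns the order in which they ran.
    public static async Task<List<int>> RunAsync()
    {
        var queue = new SketchTaskQueue();
        var order = new List<int>();

        for (int i = 1; i <= 3; i++)
        {
            int id = i; // copy so each lambda captures its own value
            queue.Enqueue(token =>
            {
                order.Add(id);
                return Task.CompletedTask;
            });
        }

        for (int i = 0; i < 3; i++)
        {
            var workItem = await queue.DequeueAsync(CancellationToken.None);
            await workItem(CancellationToken.None);
        }

        return order;
    }

    // With nothing queued, DequeueAsync keeps waiting until the token is
    // cancelled, at which point it throws OperationCanceledException.
    public static async Task<bool> EmptyQueueCancelsAsync()
    {
        var queue = new SketchTaskQueue();
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
        {
            try
            {
                await queue.DequeueAsync(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```

With one consumer, the items run in the order they were queued. The second method shows why a consumer loop that passes the host's shutdown token to `DequeueAsync` leaves the wait by exception when the host stops.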

In QueuedHostedService, background tasks in the queue are dequeued and executed by a BackgroundService. BackgroundService is an abstract base class that implements the IHostedService interface.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class QueuedHostedService : BackgroundService
    {
        private readonly ILogger _logger;

        public QueuedHostedService(IBackgroundTaskQueue taskQueue,
            ILoggerFactory loggerFactory)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }

        public IBackgroundTaskQueue TaskQueue { get; }

        protected async override Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");

            while (!cancellationToken.IsCancellationRequested)
            {
                // Waits for the next item; throws OperationCanceledException
                // if the host shuts down while the queue is empty.
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);

                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception ex)
                {
                    // A failing work item is logged and must not stop the loop.
                    _logger.LogError(ex,
                        "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }

            _logger.LogInformation("Queued Hosted Service is stopping.");
        }
    }

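The services are registered in the Startup.ConfigureServices method. The IHostedService implementation is registered with the AddHostedService extension method:

    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

In the Index page model class:

  • IBackgroundTaskQueue is injected into the constructor and assigned to Queue.
  • An IServiceScopeFactory is injected and assigned to _serviceScopeFactory. The factory is used to create IServiceScope instances, which in turn are used to create services within a scope. A scope is created so that the app's AppDbContext (a scoped service) can be used to write database records from a work item held in the IBackgroundTaskQueue (a singleton service).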

    using Microsoft.AspNetCore.Mvc;
    using Microsoft.AspNetCore.Mvc.RazorPages;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class IndexModel : PageModel
    {
        private readonly AppDbContext _db;
        private readonly ILogger _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;

        public IndexModel(AppDbContext db, IBackgroundTaskQueue queue,
            ILogger<IndexModel> logger, IServiceScopeFactory serviceScopeFactory)
        {
            _db = db;
            _logger = logger;
            Queue = queue;
            _serviceScopeFactory = serviceScopeFactory;
        }

        public IBackgroundTaskQueue Queue { get; }

        // The Add Task handler shown below also belongs to this class.
    }

When the Add Task button on the Index page is selected, the OnPostAddTask handler is executed. QueueBackgroundWorkItem is called to enqueue the work item.

    public IActionResult OnPostAddTask()
    {
        Queue.QueueBackgroundWorkItem(async token =>
        {
            var guid = Guid.NewGuid().ToString();

            // The work item may run after the request has ended, so it
            // creates its own scope to resolve the scoped AppDbContext.
            using (var scope = _serviceScopeFactory.CreateScope())
            {
                var scopedServices = scope.ServiceProvider;
                var db = scopedServices.GetRequiredService<AppDbContext>();

                for (int delayLoop = 1; delayLoop < 4; delayLoop++)
                {
                    try
                    {
                        db.Messages.Add(
                            new Message()
                            {
                                Text = $"Queued Background Task {guid} has " +
                                    $"written a step. {delayLoop}/3"
                            });
                        await db.SaveChangesAsync();
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex,
                            "An error occurred writing to the " +
                            "database. Error: {Message}", ex.Message);
                    }

                    await Task.Delay(TimeSpan.FromSeconds(5), token);
                }
            }

            _logger.LogInformation(
                "Queued Background Task {Guid} is complete. 3/3", guid);
        });

        // Return immediately; the queued work continues in the background.
        return RedirectToPage();
    }

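4. Summary

Note that all of the approaches above have one thing in common: they implement the IHostedService interface, either directly or indirectly (through BackgroundService).

IHostedService interface

Hosted services implement the IHostedService interface. The interface defines two methods for objects that are managed by the host:

  • StartAsync - contains the logic to start the background task.

  • StopAsync - triggered when the host is performing a graceful shutdown. StopAsync contains the logic to end the background task. Implement IDisposable and finalizers to release any unmanaged resources.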
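The two-method contract can be pictured with a small stand-alone sketch. Note the assumptions: the `IHostedService` interface below is a local stand-in redeclared in a hypothetical `HostedServiceSketch` namespace so the example compiles without the hosting package (real code would use the one from Microsoft.Extensions.Hosting), and `RecordingService` and `Lifecycle` are invented names. The driver only imitates, in a simplified way, the order in which a host calls into a service.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace HostedServiceSketch
{
    // Local stand-in with the same two-method shape as
    // Microsoft.Extensions.Hosting.IHostedService. Redeclared here only
    // so the sketch has no package dependency.
    public interface IHostedService
    {
        Task StartAsync(CancellationToken cancellationToken);
        Task StopAsync(CancellationToken cancellationToken);
    }

    // Hypothetical service that records each lifecycle call.
    public sealed class RecordingService : IHostedService, IDisposable
    {
        public List<string> Events { get; } = new List<string>();

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Events.Add("start");
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            Events.Add("stop");
            return Task.CompletedTask;
        }

        public void Dispose()
        {
            Events.Add("dispose");
        }
    }

    public static class Lifecycle
    {
        // Simplified imitation of a host: start at application startup,
        // stop at graceful shutdown, dispose last.
        public static async Task<IReadOnlyList<string>> RunOnceAsync()
        {
            var service = new RecordingService();
            await service.StartAsync(CancellationToken.None);
            await service.StopAsync(CancellationToken.None);
            service.Dispose();
            return service.Events;
        }
    }
}
```

Anything a service acquires in StartAsync (such as the timer in the first section) is therefore expected to be quiesced in StopAsync and released in Dispose.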

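You can add background tasks of this kind to any application, such as a web API, an MVC app, or a console app, because hosted services are started when the application starts. They are registered as services in the dependency injection container, and the host starts and stops them.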
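As a rough illustration of the console case, the sketch below builds a generic host with `HostBuilder` and registers a hosted service the same way Startup.ConfigureServices does. It assumes the Microsoft.Extensions.Hosting package is referenced; `TickCounter`, `HeartbeatService`, and `ConsoleHostSketch` are hypothetical names that do not appear in the article's sample.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical shared counter so the service's work can be observed.
public sealed class TickCounter
{
    private int _count;
    public int Count => Volatile.Read(ref _count);
    public void Increment() => Interlocked.Increment(ref _count);
}

// Hypothetical background service; it increments the counter every 100 ms.
public sealed class HeartbeatService : BackgroundService
{
    private readonly TickCounter _counter;

    public HeartbeatService(TickCounter counter)
    {
        _counter = counter;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                _counter.Increment();
                await Task.Delay(TimeSpan.FromMilliseconds(100), stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host shuts down during the delay.
        }
    }
}

public static class ConsoleHostSketch
{
    // Builds a generic host without any web server components.
    public static IHost BuildHost()
    {
        return new HostBuilder()
            .ConfigureServices((context, services) =>
            {
                services.AddSingleton<TickCounter>();
                services.AddHostedService<HeartbeatService>();
            })
            .Build();
    }
}
```

In a console app's Main method, `await ConsoleHostSketch.BuildHost().RunAsync();` would start the service and keep the process running until shutdown is requested, for example with Ctrl+C.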

Reference:

https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.2&tabs=visual-studio

Reposted from: https://www.cnblogs.com/Vincent-yuan/p/11048748.html
