Queuing

Coravel gives you a zero-configuration queue that runs in-memory.

This is useful to offload long-winded tasks to the background instead of making your users wait for their HTTP request to finish.

Setup

In your Startup file, in the ConfigureServices():

services.AddQueue();

Then you inject an instance of the Coravel.Queuing.Interfaces.IQueue interface into your controller, etc.

IQueue _queue;

public HomeController(IQueue queue) {
    this._queue = queue;
}

Queuing Jobs

Queuing Invocables

To queue an invocable, use QueueInvocable:

this._queue.QueueInvocable<GrabDataFromApiAndPutInDBInvocable>();

TIP

Queuing invocables is the recommended way to use Coravel's queuing.

To learn about creating and using invocables see here.

Queue With A Payload

Many times you want to queue a background job and also supply a payload/parameters.

For example, you might have an invocable SendWelcomeUserEmailInvocable. However, you need to supply a specific user's information so that the correct user will receive the email!

First, add the IInvocableWithPayload<T> interface to your existing invocable:

                                                         // This one 👇
public class SendWelcomeUserEmailInvocable : IInvocable, IInvocableWithPayload<UserModel>
{
  // This is the implementation of the interface 👇
  public UserModel Payload { get; set; }

  /* Constructor, etc. */

  public async Task Invoke()
  {
    // `this.Payload` will be available to use now.
  }
}

To queue this invocable, use the QueueInvocableWithPayload method:

var userModel = await _userService.Get(userId);
queue.QueueInvocableWithPayload<SendWelcomeUserEmailInvocable, UserModel>(userModel);

Now your job will be queued to execute in the background with the specific data it needs to run!

Queuing Async

Use the QueueAsyncTask to queue up an async task:

 this._queue.QueueAsyncTask(async() => {
    await Task.Delay(1000);
    Console.WriteLine("This was queued!");
 });

Queuing Synchronously

You use the QueueTask() method to add a task to the queue.

public IActionResult QueueTask() {
    this._queue.QueueTask(() => Console.WriteLine("This was queued!"));
    return Ok();
}

Queue Event Broadcast

Event broadcasting is great - but what if your event listeners are doing some heavy / long-winded tasks? You don't want that to happen on the same thread as your HTTP request!

By using QueueBroadcast, you can queue an event to be broadcasted in the background.

this._queue.QueueBroadcast(new OrderCreated(orderId)); 

Queuing Cancellable Invocables

Sometimes you have a long-running invocable that needs the ability to be cancelled (manually by you or by the system when the application is being shutdown).

By using QueueCancellableInvocable you can build invocables that can be cancelled:

 var (taskGuid, token) = queue.QueueCancellableInvocable<CancellableInvocable>();
 
 // Somewhere else....

 token.Cancel();

Add the Coravel.Queuing.Interfaces.ICancellableTask to your invocable and implement the property:

CancellationToken Token { get; set; }

In your Invoke method, you will have access to check if the token has been cancelled.

while(!this.Token.IsCancellationRequested)
{
  await ProcessNextRecord();
}

Metrics

You can gain some insight into how the queue is doing at a given moment in time.

var metrics = _queue.GetMetrics();

Available methods:

  • WaitingCount(): The number of tasks waiting to be executed.
  • RunningCount(): The number of tasks currently running.

Tracking Task Progress

Most of the methods on the IQueue interface will return a Guid that represents the unique id for the task you pushed to the queue. Also, Coravel's queue exposes some internal events that you can hook into.

Combining these: you can create listeners for the events QueueTaskStarted and QueueTaskCompleted that verify the progress of specific tasks in real-time. When a task/job crashes, then the event DequeuedTaskFailed will be emitted. Creating a listener for this one might be helpful too.

A basic listener that notifies some user interface of task progress might look something like this:

public class TaskStartedListener : IListener<QueueTaskStarted>
{
    // Constructor etc.

    public async Task HandleAsync(QueueTaskStarted broadcasted)
    {
        await this._uiNotifications.NotifyTaskStarted(broadcasted.Guid);
    }
}

Global Error Handling

In the Configure method of your Startup file:

var provider = app.ApplicationServices;
provider
    .ConfigureQueue()
    .OnError(e =>
    {
        //... handle the error
    });

Adjusting Consummation Delay

Normally, the queue will consume all of its queued tasks every 30 seconds. You can adjust this delay in the appsettings.json file.

"Coravel": {
  "Queue": {
    "ConsummationDelay": 1
  }
}

Logging Task Progress

Coravel uses the ILogger .NET Core interface to allow logging task progress:

var provider = app.ApplicationServices;
provider
    .ConfigureQueue()
    .LogQueuedTaskProgress(provider.GetService<ILogger<IQueue>>());