- Independent Consumption: Each message will be consumed independently by each consumer set up to handle it.
- Periodic Querying: Each instance of your app will periodically query the
consumer_messages
table for a batch of available messages to process. - The query takes place at the frequency defined by
ProcessorMaxDelay
, if a full batch is returned it will delay byProcessorMinDelay
. - Concurrency: Each app instance can run multiple parallel consumer processors defined by
ConsumerMessageProcessorCount
, unless usingAsyncMonolith.Ef
. - Batching: Consumer messages will be read from the
consumer_messages
table in batches defined byConsumerMessageBatchSize
. - Idempotency: Ensure your Consumers are idempotent, since they will be retried on failure.
- Timeout: Consumers timeout after the number of seconds defined by the
ConsumerTimeout
attribute or theDefaultConsumerTimeout
if not set.
Example
[ConsumerTimeout(5)] // Consumer timeouts after 5 seconds
[ConsumerAttempts(1)] // Consumer messages moved to poisoned table after 1 failed attempt
public class DeleteUsersPosts : BaseConsumer<UserDeleted>
{
private readonly ApplicationDbContext _dbContext;
public DeleteUsersPosts(ApplicationDbContext dbContext)
{
_dbContext = dbContext;
}
public override Task Consume(UserDeleted message, CancellationToken cancellationToken)
{
...
await _dbContext.SaveChangesAsync(cancellationToken);
}
}
Consumer Failures 💢
- Retry Logic: Messages will be retried up to
MaxAttempts
times (with aAttemptDelay
seconds between attempts) until they are moved to thepoisoned_messages
table. Add the[ConsumerAttempts(1)]
attribute to override this behavior. - Manual Intervention: If a message is moved to the
poisoned_messages
table, it will need to be manually removed from the database or moved back to theconsumer_messages
table to be retried. Note that the poisoned message will only be retried a single time unless you setattempts
back to 0. - Monitoring: Periodically monitor the
poisoned_messages
table to ensure there are not too many failed messages.