Multiple Consumers¶
Multiple consumers can consume from the same queue.
This pattern is useful when you want to scale processing throughput horizontally.
Competing Consumers¶
graph LR
Queue["orders.created"]
Queue --> ConsumerA["Consumer A"]
Queue --> ConsumerB["Consumer B"]
Queue --> ConsumerC["Consumer C"]
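RabbitMQ dispatches messages from the queue to the available consumers in round-robin order, subject to each consumer's prefetch limit.
Each message is delivered to exactly one consumer. It is redelivered to another consumer only if it is rejected with requeue or the original consumer disconnects before acknowledging it.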
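The competing-consumers behaviour can be tried without a broker. The following is a minimal in-memory sketch that uses System.Threading.Channels as a stand-in for the queue; it does not use RabbitMQ or the subscriber library, and the names CompetingConsumersDemo and RunAsync are hypothetical. It only illustrates that several readers on one queue each receive a distinct subset of the messages.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo type: an in-memory channel stands in for the "orders.created" queue.
public static class CompetingConsumersDemo
{
    // Publishes messageCount order ids to one queue and lets consumerCount
    // workers compete for them. Returns a map of order id -> consumer name.
    public static async Task<ConcurrentDictionary<int, string>> RunAsync(
        int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var processedBy = new ConcurrentDictionary<int, string>();

        // Every worker reads from the same channel; each item is handed to only one reader.
        var workers = Enumerable.Range(0, consumerCount)
            .Select(index => Task.Run(async () =>
            {
                var name = $"Consumer {(char)('A' + index)}";
                await foreach (var orderId in queue.Reader.ReadAllAsync())
                {
                    // TryAdd fails if another worker already handled this id.
                    if (!processedBy.TryAdd(orderId, name))
                    {
                        throw new InvalidOperationException(
                            $"Order {orderId} was processed twice.");
                    }

                    await Task.Yield(); // give other workers a chance to run
                }
            }))
            .ToArray();

        for (var orderId = 1; orderId <= messageCount; orderId++)
        {
            await queue.Writer.WriteAsync(orderId);
        }

        queue.Writer.Complete();      // no more messages; workers drain and exit
        await Task.WhenAll(workers);
        return processedBy;
    }
}
```

Calling `await CompetingConsumersDemo.RunAsync(30, 3)` returns one entry per order id. How the ids are split between the workers depends on scheduling and will vary between runs.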
Descriptor¶
All consumers can use the same queue descriptor.
    var descriptor = new SubscriberBoundDescriptor
    {
        PrefetchCount = 10,
        Queue = new QueueDescriptor
        {
            Name = "orders.created",
            Durable = true
        },
        Exchange = new ExchangeDescriptor
        {
            Name = "orders",
            Type = ExchangeTypes.Direct,
            Durable = true
        },
        Binder = new BinderDescriptor
        {
            RoutingKey = "orders.created"
        }
    };
Consumer Instance¶
    var subscriber = new SubscriberBound<OrderCreated>(
        descriptor,
        formatter,
        channel);
Handler¶
    subscriber.Received.Add(
        async (sender, args) =>
        {
            var message = args.Message.Data;
            Console.WriteLine(
                $"consumer processed order: {message.OrderId}");
            args.Acknowledged = true;
            args.Requeue = false;
            await Task.CompletedTask;
        });
Start Each Subscriber¶
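Each subscriber instance implements IAsyncRunnable.
Create one instance per consumer (here subscriberA, subscriberB, and subscriberC, each constructed like subscriber above) and start every instance independently.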
await subscriberA.Start();
await subscriberB.Start();
await subscriberC.Start();
You can inspect the runtime state through IsRunning.
    if (subscriberA.IsRunning)
    {
        Console.WriteLine("subscriberA is running.");
    }
Stop Each Subscriber¶
Each subscriber should also be stopped explicitly during application shutdown.
await subscriberA.Stop();
await subscriberB.Stop();
await subscriberC.Stop();
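When the number of subscribers grows, the start and stop calls above can be wrapped in small helpers. The sketch below is one possible shape, not part of the library: IRunnableStandIn is a hypothetical stand-in that mirrors only the Start, Stop, and IsRunning members used on this page (with Task return types assumed), and SubscriberGroup and FakeSubscriber are hypothetical names. In real code the library's IAsyncRunnable would take the place of the stand-in, provided its members match.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the library's IAsyncRunnable (assumed shape, for illustration only).
public interface IRunnableStandIn
{
    bool IsRunning { get; }
    Task Start();
    Task Stop();
}

// Hypothetical helpers for managing several subscribers together.
public static class SubscriberGroup
{
    // Starts every subscriber, one after another.
    public static async Task StartAll(IReadOnlyList<IRunnableStandIn> subscribers)
    {
        foreach (var subscriber in subscribers)
        {
            await subscriber.Start();
        }
    }

    // Stops every subscriber in reverse order. A failure in one Stop call
    // does not prevent the remaining subscribers from being stopped.
    public static async Task StopAll(IReadOnlyList<IRunnableStandIn> subscribers)
    {
        var errors = new List<Exception>();
        for (var i = subscribers.Count - 1; i >= 0; i--)
        {
            try
            {
                await subscribers[i].Stop();
            }
            catch (Exception ex)
            {
                errors.Add(ex);
            }
        }

        if (errors.Count > 0)
        {
            throw new AggregateException(errors);
        }
    }
}

// Fake subscriber used only to exercise the helpers without a broker.
public sealed class FakeSubscriber : IRunnableStandIn
{
    public bool IsRunning { get; private set; }

    public Task Start()
    {
        IsRunning = true;
        return Task.CompletedTask;
    }

    public Task Stop()
    {
        IsRunning = false;
        return Task.CompletedTask;
    }
}
```

A typical use would call StartAll at application startup and StopAll inside a finally block or shutdown hook, so that every subscriber gets a chance to stop even if one of them throws.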
Scaling Model¶
graph TD
Queue["orders.created"]
Instance1["Application Instance 1"]
Instance2["Application Instance 2"]
Instance3["Application Instance 3"]
Queue --> Instance1
Queue --> Instance2
Queue --> Instance3
Use multiple application instances when:
- processing is CPU-intensive;
- message volume exceeds what a single instance can handle;
- consumers spend much of their time waiting on I/O operations;
- the workload must be distributed across several machines.
Prefetch Count¶
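When using multiple consumers, tune PrefetchCount carefully.
The prefetch count limits how many unacknowledged messages RabbitMQ delivers to a consumer at a time.
A high prefetch count can cause one consumer to hold too many messages while others remain idle; a low value spreads work more evenly but requires more round trips to the broker.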
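The effect can be illustrated with a small calculation. The sketch below is a deliberately simplified model, not RabbitMQ's actual dispatch algorithm: it assumes the queue already holds a backlog, that consumers attach one after another, and that nothing is acknowledged yet, so each consumer immediately buffers up to its prefetch limit. PrefetchModel and BufferedPerConsumer are hypothetical names.

```csharp
using System;

// Hypothetical, simplified model of how a backlog is buffered by consumers.
public static class PrefetchModel
{
    // Returns how many messages each consumer holds unacknowledged when
    // consumers attach in order to a queue that already contains `backlog` messages.
    public static int[] BufferedPerConsumer(int backlog, int consumerCount, int prefetch)
    {
        var buffered = new int[consumerCount];
        var remaining = backlog;

        for (var i = 0; i < consumerCount; i++)
        {
            // A consumer takes as many messages as its prefetch window allows.
            var taken = Math.Min(prefetch, remaining);
            buffered[i] = taken;
            remaining -= taken;
        }

        return buffered;
    }
}
```

Under these assumptions, `PrefetchModel.BufferedPerConsumer(20, 3, 20)` yields `[20, 0, 0]`: the first consumer buffers the whole backlog and the other two have nothing to do. With a prefetch of 5 the result is `[5, 5, 5]`, and the remaining five messages stay in the queue for whichever consumer acknowledges first.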