Subscribe To Queue

This page shows how to configure a subscriber that consumes messages from a RabbitMQ queue.

The subscriber topology is defined through SubscriberBoundDescriptor, which describes the queue, the exchange, and the binding between them.


Topology

graph LR
    Exchange["orders"]
    Queue["orders.created"]
    Subscriber["SubscriberBound<OrderCreated>"]

    Exchange --> Queue
    Queue --> Subscriber

Message Contract

public sealed class OrderCreated
{
    public Guid OrderId { get; init; }

    public DateTime CreatedAtUtc { get; init; }
}

Subscriber Descriptor

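var descriptor = new SubscriberBoundDescriptor
{
    // Upper bound on unacknowledged messages delivered to this subscriber.
    PrefetchCount = 10,

    Queue = new QueueDescriptor
    {
        Name = "orders.created",
        Durable = true,      // survives a broker restart
        AutoDelete = false   // kept when the last consumer disconnects
    },

    Exchange = new ExchangeDescriptor
    {
        Name = "orders",
        Type = ExchangeTypes.Direct,  // routes on an exact routing-key match
        Durable = true,
        AutoDelete = false
    },

    // Binds the queue to the exchange under this routing key.
    Binder = new BinderDescriptor
    {
        RoutingKey = "orders.created"
    }
};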

Create Subscriber

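// formatter and channel are not defined on this page;
// they are assumed to have been created earlier.
var subscriber = new SubscriberBound<OrderCreated>(
    descriptor,
    formatter,
    channel);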

Handle Messages

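subscriber.Received.Add(
    async (sender, args) =>
    {
        // The deserialized OrderCreated payload.
        var message = args.Message.Data;

        Console.WriteLine(
            $"order created: {message.OrderId}");

        // Acknowledge the message and do not requeue it.
        args.Acknowledged = true;
        args.Requeue = false;

        // No asynchronous work in this example; the await keeps the
        // async lambda free of compiler warnings.
        await Task.CompletedTask;
    });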
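
The handler above always acknowledges. A minimal sketch of how a failing handler might set the same two flags follows. It is not the library's API: ReceivedArgsSketch and SafeHandler are hypothetical stand-ins, and the sketch assumes that Acknowledged = false is treated as a negative acknowledgement and that Requeue = true asks the broker to redeliver the message.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the library's event args; only the two
// flags used on this page are modelled.
public sealed class ReceivedArgsSketch
{
    public bool Acknowledged { get; set; }
    public bool Requeue { get; set; }
}

public static class SafeHandler
{
    // Runs the handler body and sets the flags from the outcome.
    public static async Task Invoke(ReceivedArgsSketch args, Func<Task> body)
    {
        try
        {
            await body();
            args.Acknowledged = true;   // processed: confirm the message
            args.Requeue = false;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"handler failed: {ex.Message}");
            args.Acknowledged = false;  // assumption: treated as a negative acknowledgement
            args.Requeue = true;        // assumption: asks the broker to redeliver
        }
    }
}
```

Requeueing on every failure can redeliver a message that will never succeed, so a real handler would probably requeue only for transient errors and reject the rest.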

Start The Subscriber

Subscribers implement IAsyncRunnable.

The subscriber lifecycle is controlled through explicit start and stop operations.

await subscriber.Start();

Check Runtime State

The current execution state can be inspected through IsRunning.

if (subscriber.IsRunning)
{
    Console.WriteLine("Subscriber is running.");
}

Stop The Subscriber

When the application is shutting down, stop the subscriber explicitly.

await subscriber.Stop();
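
One way to make sure Stop() runs on shutdown is to pair it with Start() in a try/finally block. The sketch below is an illustration only: RunnableHost is a hypothetical helper, and it takes plain delegates so that it does not depend on the library's types. It assumes that Start() and Stop() take no arguments and return Task, as the calls on this page suggest.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RunnableHost
{
    // Starts, waits until the token is cancelled, then always stops.
    public static async Task RunUntilCancelled(
        Func<Task> start, Func<Task> stop, CancellationToken token)
    {
        await start();
        try
        {
            // Completes only by cancellation.
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Expected when shutdown is requested.
        }
        finally
        {
            await stop();
        }
    }
}
```

With a subscriber in scope, the call could look like RunnableHost.RunUntilCancelled(() => subscriber.Start(), () => subscriber.Stop(), cts.Token), where cts is a CancellationTokenSource that the application cancels on shutdown, for example from a Console.CancelKeyPress handler.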

Runtime Lifecycle

stateDiagram-v2

    [*] --> Created

    Created --> Running : Start()

    Running --> Stopped : Stop()

    Stopped --> Running : Start()

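Implementations should tolerate repeated calls: calling Start() on a running subscriber, or Stop() on one that is not running, should not throw or leave the subscriber in an inconsistent state.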
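
As an illustration of that guidance, the sketch below makes repeated Start() and Stop() calls no-ops by guarding the state with an atomic flag. It is not the library's implementation: IAsyncRunnableSketch is a hypothetical stand-in whose shape is inferred from the calls on this page, and the counters exist only to make the behaviour observable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in; the real IAsyncRunnable may have a different shape.
public interface IAsyncRunnableSketch
{
    bool IsRunning { get; }
    Task Start();
    Task Stop();
}

public sealed class IdempotentRunnable : IAsyncRunnableSketch
{
    private int _running; // 0 = not running, 1 = running

    // Number of calls that actually changed state.
    public int StartCount { get; private set; }
    public int StopCount { get; private set; }

    public bool IsRunning => Volatile.Read(ref _running) == 1;

    public Task Start()
    {
        // Only the call that flips 0 -> 1 does the work.
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
            return Task.CompletedTask;

        StartCount++; // a real subscriber would begin consuming here
        return Task.CompletedTask;
    }

    public Task Stop()
    {
        // Only the call that flips 1 -> 0 does the work.
        if (Interlocked.CompareExchange(ref _running, 0, 1) != 1)
            return Task.CompletedTask;

        StopCount++; // a real subscriber would cancel its consumer here
        return Task.CompletedTask;
    }
}
```

The flag changes before any asynchronous work would complete, so an implementation whose start or stop takes time may prefer to serialize the two operations, for example with a SemaphoreSlim.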


Notes

The queue and exchange names are logical descriptor names.

The physical RabbitMQ names are described in the Descriptor Reference section, where Name, Prefix and FullName are explained in detail.