Skip to content

Create Subscriber

This page shows how to create a RabbitMQ subscriber manually.

The concrete client is:

SubscriberBound<TData>

It requires:

  • SubscriberBoundDescriptor
  • IDataDecoder
  • RabbitMQ channel

Create Connection And Channel

var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest",
    DispatchConsumersAsync = true
};

using var connection =
    factory.CreateConnection("subscriber-demo");

using var channel =
    connection.CreateChannel();

Create Formatter

This guide assumes that an IDataFormatter instance has already been created.

See:

  • Getting Started → Create Formatter
IDataFormatter formatter = ...;

Create Descriptor

var descriptor = new SubscriberBoundDescriptor
{
    EnableChannelEvents = true,

    PrefetchCount = 10,

    Queue = new QueueDescriptor
    {
        Name = "orders.created",
        Durable = true,
        AutoDelete = false
    },

    Exchange = new ExchangeDescriptor
    {
        Name = "orders",
        Type = ExchangeTypes.Direct,
        Durable = true,
        AutoDelete = false
    },

    Binder = new BinderDescriptor
    {
        RoutingKey = "orders.created"
    },

    RetryPolicy = new RetryPolicyDescriptor
    {
        Enabled = true,
        MaxRetry = 5,
        Delayed = TimeSpan.FromSeconds(10),
        Durable = true,
        AutoDelete = false
    }
};

Create Subscriber

await using var subscriber =
    new SubscriberBound<OrderCreated>(
        descriptor,
        formatter,
        channel);

Register A Message Handler

subscriber.Received.Add(
    async (sender, args) =>
    {
        try
        {
            var message = args.Message.Data;

            Console.WriteLine(
                $"Received: {message.OrderId}");

            args.Acknowledged = true;
            args.Requeue = false;
        }
        catch
        {
            args.Acknowledged = false;
            args.Requeue = true;

            throw;
        }

        await Task.CompletedTask;
    });

Start Subscriber

Subscribers implement:

IAsyncRunnable

Start the subscriber before processing messages.

await subscriber.Start();

You can verify the current state using:

if (subscriber.IsRunning)
{
    Console.WriteLine("Subscriber is running.");
}

Stop Subscriber

await subscriber.Stop();

Complete Example

var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest",
    DispatchConsumersAsync = true
};

using var connection =
    factory.CreateConnection("subscriber-demo");

using var channel =
    connection.CreateChannel();

IDataFormatter formatter = ...;

var descriptor = new SubscriberBoundDescriptor
{
    EnableChannelEvents = true,

    PrefetchCount = 10,

    Queue = new QueueDescriptor
    {
        Name = "orders.created",
        Durable = true
    },

    Exchange = new ExchangeDescriptor
    {
        Name = "orders",
        Type = ExchangeTypes.Direct,
        Durable = true
    },

    Binder = new BinderDescriptor
    {
        RoutingKey = "orders.created"
    },

    RetryPolicy = new RetryPolicyDescriptor
    {
        Enabled = true,
        MaxRetry = 5,
        Delayed = TimeSpan.FromSeconds(10)
    }
};

await using var subscriber =
    new SubscriberBound<OrderCreated>(
        descriptor,
        formatter,
        channel);

subscriber.Received.Add(
    async (sender, args) =>
    {
        try
        {
            var message = args.Message.Data;

            Console.WriteLine(message.OrderId);

            args.Acknowledged = true;
            args.Requeue = false;
        }
        catch
        {
            args.Acknowledged = false;
            args.Requeue = true;

            throw;
        }

        await Task.CompletedTask;
    });

await subscriber.Start();

Console.WriteLine("Press ENTER to stop.");
Console.ReadLine();

await subscriber.Stop();

Message Contract

public sealed class OrderCreated
{
    public Guid OrderId { get; init; }

    public DateTime CreatedAtUtc { get; init; }
}

Next Step

Continue with:

  • Consuming
  • Configure Retry Policies
  • Channel Events
  • Reference → SubscriberBoundDescriptor