Create Subscriber¶
This page shows how to create a RabbitMQ subscriber manually.
The concrete client is:
SubscriberBound<TData>
It requires:
SubscriberBoundDescriptorIDataDecoder- RabbitMQ channel
Create Connection And Channel¶
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
DispatchConsumersAsync = true
};
using var connection =
factory.CreateConnection("subscriber-demo");
using var channel =
connection.CreateChannel();
Create Formatter¶
This guide assumes that an IDataFormatter instance has already been created.
See:
- Getting Started → Create Formatter
IDataFormatter formatter = ...;
Create Descriptor¶
var descriptor = new SubscriberBoundDescriptor
{
EnableChannelEvents = true,
PrefetchCount = 10,
Queue = new QueueDescriptor
{
Name = "orders.created",
Durable = true,
AutoDelete = false
},
Exchange = new ExchangeDescriptor
{
Name = "orders",
Type = ExchangeTypes.Direct,
Durable = true,
AutoDelete = false
},
Binder = new BinderDescriptor
{
RoutingKey = "orders.created"
},
RetryPolicy = new RetryPolicyDescriptor
{
Enabled = true,
MaxRetry = 5,
Delayed = TimeSpan.FromSeconds(10),
Durable = true,
AutoDelete = false
}
};
Create Subscriber¶
await using var subscriber =
new SubscriberBound<OrderCreated>(
descriptor,
formatter,
channel);
Register A Message Handler¶
subscriber.Received.Add(
async (sender, args) =>
{
try
{
var message = args.Message.Data;
Console.WriteLine(
$"Received: {message.OrderId}");
args.Acknowledged = true;
args.Requeue = false;
}
catch
{
args.Acknowledged = false;
args.Requeue = true;
throw;
}
await Task.CompletedTask;
});
Start Subscriber¶
Subscribers implement:
IAsyncRunnable
Start the subscriber before processing messages.
await subscriber.Start();
You can verify the current state using:
if (subscriber.IsRunning)
{
Console.WriteLine("Subscriber is running.");
}
Stop Subscriber¶
await subscriber.Stop();
Complete Example¶
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
DispatchConsumersAsync = true
};
using var connection =
factory.CreateConnection("subscriber-demo");
using var channel =
connection.CreateChannel();
IDataFormatter formatter = ...;
var descriptor = new SubscriberBoundDescriptor
{
EnableChannelEvents = true,
PrefetchCount = 10,
Queue = new QueueDescriptor
{
Name = "orders.created",
Durable = true
},
Exchange = new ExchangeDescriptor
{
Name = "orders",
Type = ExchangeTypes.Direct,
Durable = true
},
Binder = new BinderDescriptor
{
RoutingKey = "orders.created"
},
RetryPolicy = new RetryPolicyDescriptor
{
Enabled = true,
MaxRetry = 5,
Delayed = TimeSpan.FromSeconds(10)
}
};
await using var subscriber =
new SubscriberBound<OrderCreated>(
descriptor,
formatter,
channel);
subscriber.Received.Add(
async (sender, args) =>
{
try
{
var message = args.Message.Data;
Console.WriteLine(message.OrderId);
args.Acknowledged = true;
args.Requeue = false;
}
catch
{
args.Acknowledged = false;
args.Requeue = true;
throw;
}
await Task.CompletedTask;
});
await subscriber.Start();
Console.WriteLine("Press ENTER to stop.");
Console.ReadLine();
await subscriber.Stop();
Message Contract¶
public sealed class OrderCreated
{
public Guid OrderId { get; init; }
public DateTime CreatedAtUtc { get; init; }
}
Next Step¶
Continue with:
- Consuming
- Configure Retry Policies
- Channel Events
- Reference → SubscriberBoundDescriptor