Using Rabbit publishers

As mentioned before, each publisher communicates with RabbitMQ Exchanges only, and not with queues.

Publishers overview

In the following image, It will be shown a common overview about how publishers communicates with RabbitMQ broker, and subscribers listen to their on related queues.

In this overview, It could be seen how publishers send messages into Exchanges, then they forward messages into specific queues, and this behaviour depends on the binding between Exchanges and Queues.

Exchanges is one of the most important component component in RabbitMq architecture, its task is to forward messages into rispective queues, all depend of 2 important factors:

  • Routing message
  • Exchange type

Currently RabbitMQ implements these Exchanges types:

  • Direct
  • Fanout
  • Topic
  • Headers

Prepares publishers

There are 4 kind of publishers: CommonPublisher, BinderPublisher, MultiBinderPublisher and RestPublisher

All publishers has two dependencies:

  • descriptor - used to configure own publisher, for example Broker URI, credentials, exchange | queue name ecc, for more details see here.
  • dataFormatter - It could be an encoder (for publishers), or decoder (for subscribers)

In the following examples, It will be used a Formatter for semplicity (this component implements the IDataFormatter contract, and this one extends IDataDecoder and IDataEncoder contracts).

// this variable wil be used in all below examples.
IDataFormatter dataFormatter = new JsonDataFormatter(jsonSettings, Encoding.UTF8);

otherwise you can use this another implementation

// this variable wil be used in all below examples.
IDataFormatter dataFormatter = new RwJsonDataFormatter(jsonSettings, Encoding.UTF8);

CommonPublisher class

This is a basic class for some category of publishers.
The usage of this class is when it's needed to publish messages without knowing the queue/s which will be bound with.

// define its own descriptor
var descriptor = new PublisherDescriptor
{
    Address = new Uri("amqp://localhost:5672"),
    Credentials = new Credentials { Username = "user", Password = "admin-pwd" },
    Exchange = { Name = "demo-direct", Type = ExchangeTypes.Direct }
};

// define a new publisher injecting its descriptor
var publisher = new CommonPublisher(descriptor, dataFormatter);

BinderPublisher class

This class inherents from CommonPublisher class, and this defines a queue which will be bound its exchange with.

// define its own descriptor
var descriptor = new BinderPublisherDescriptor
{
    Address = new Uri("amqp://localhost:5672"),
    Credentials = new Credentials { Username = "user", Password = "admin-pwd" },
    Exchange = { Name = "demo-direct", Type = ExchangeTypes.Direct },
    Queue = { Name = "demo" }
};

// define a new publisher injecting its descriptor
var publisher = new BinderPublisher(descriptor, dataFormatter);

The choice of one of them depend about how can publishers must communicate with brokers, in some cases a CommonPublisher could be only the unique solution for communicates with subscribers listening.

MultiBinderPublisher class

This class inherents from CommonPublisher class.

// define its own descriptor
var descriptor = new MultiBinderPublisherDescriptor
{
    Address = new Uri("amqp://localhost:5672"),
    Credentials = new Credentials { Username = "user", Password = "admin-pwd" },
    Exchange = { Name = "demo-direct", Type = ExchangeTypes.Direct },
    QueueBinders =
    {
        new QueueBinderDescriptor{ Queue = { Name = "aa" } },
        new QueueBinderDescriptor{ Queue = { Name = "bb" } },
        new QueueBinderDescriptor{ Queue = { Name = "cc" } }
    }
};

// define a new publisher injecting its descriptor
var publisher = new MultiBinderPublisher(descriptor, dataFormatter);

This implementation of publisher is able to make multi binding between its Exchange with many queues.

The real use case for this implementation could be:

  • When your application must forward messages with different routes.
  • Being able to forward the same message into many / selected queues at once using a routing key.
  • Custom and mmore complex routes algorithms (using Topic exchange).
  • Broadcast messages (into all queues bound, using Fanout exchange)

Now, after prepared a publisher client, you can send messages like this:

MyCustomPayload payload = PrepareMyPayload(); // returns a payload (the real message to send)
var message = new Message<MyCustomPayload>
{
  Id = "my message id",
  Route = "my route",
  Created = DateTime.Now,
  Ttl = TimeSpan.FromMinutes(30),
  Properties = { {"my custom property", "my custom value"} },
  Data = payload
};

publisher.Send(message);

// or in async way

publisher.SendAsync(message);

As seen the above example, the real message to send into RabbitMQ broker is incapsulated into another object called "Message", this type is generic so you can specify any kind of type as needed.

The real need to use a Message instance, is for specifying new attributes on messages, or maybe to indicate other important properties like:

  • Ttl - time to live, duration of messages on broker (queue)
  • Properties - custom dictionary used to hold extra information
  • Route - very important property, used to apply a routing message logic.

About Route property, this is used for Direct and Topic exchages only.

To understand better about routing, see documentation

RestPublisher

As mentioned in this tutorial, probably this kind of client is the most interesting one because it represents the bridge between ESB Broker and Rest Microservice architecture.

Currently, It exists 2 kind of Rest publishers, FullDuplexRestPublisher and StatelessRestPublisher, but these ones are internal publishers, so you cannot use directly, in fact the RestPublisher acts as decorator class using them internally.

In order to use one instead of the other one, it will depend on FullDuplex property value.

Let's start with a FullDuplex approach.

As can be deducted, the Rest API acts as subscriber, in reality the microservice process holds a second process which is the real subscriber, and behind the scenes, incoming messages are translated into HttpRequest instances, then are forwarded into all routes defined by Microservice API.

As can be understood, there are two queues, working queue is responsable to hold messages for subscribers, instead Callback Queue holds all responses from Subscribers sent to original publishers requests.

What happens if we have this situation?

In this example we have two indipendent publishers which try to consume the same Microservice API via RabbitMQ, in this case, each publsher will have a dedicated Callback Queue for their response, naturally they're not shareable with other publishers, in fact those queues are declared as "exclusives".

In the next It will be shown some examples for RestPublisher creation.

// defining a descriptor for publisher
var descriptor = new RestPublisherDescriptor
{
    Address = new Uri("amqp://localhost:5672"),
    Credentials = new Credentials { Username = "user", Password = "admin-pwd" },
    ServiceName = "MyAppDemo",
    Timeout = TimeSpan.FromMinutes(30),
    IncludeContent = true,
    Durable = true,
    FullDuplex = true
};

In this example, a descriptor instance defines some importants properties:

  • ServiceName - A common target name which identifies a Rest Microservice uniquely.
  • Timeout - a common timeout for each request, once elapsed the timeout It throws an exception.
  • IncludeContent - Indicates all responses will contain the real response payload represented as string.
  • Durable - Indicates each Broker components will be persisted on disk (exchanges, queues and possible messages).
  • FullDuplex - Indicates the capacity to receieve the Rest API responses

After defining the descriptor instance, you need other two dependencies for this publisher:

IDataFormatter

// defining a JsonSerializerSettings
var jsonSettings = new JsonSerializerSettings
{
    ContractResolver = new CamelCasePropertyNamesContractResolver()
};

// defining a data formatter, you can specify another kind of encoding, but pay attention by th other side, so subscriber must use the same encoding
var dataFormatter = new JsonDataFormatter(jsonSettings, Encoding.UTF8);

IHttpRestMessageSerializer

// defining IHttpRestMessageSerialzer instance
var restHttpSerializer = new HttpRestMessageSerializer(
    new FuncMemoryStreamResolver(() => new MemoryStream(), bytes => new MemoryStream(bytes)), Encoding.UTF8);

and then:

// defining the rest publisher
var client = new RestPublisher(descriptor, this.dataFormatter, restHttpSerializer);

at the end, you can send messages:

var request = new RestRequest("api/demo")
{
    Method = HttpMethods.Post,
    Payload = new { Id = i, Value = $"Custom value {i}", Price = i * 1.5 }
};

var tsk1 = client.SendAsync<object>(request);

//or 

object response = await client.SendAsync<object>(request);

The response is a generic Task, and depending on response structure, you can specify the generic type as pleasure, as long as It would be compatible.

Notice each publishers implements IDisposable, so It's needed to dispose once this client doesn't serve anymore, this is very important because this component is stateful, keeps a tcp connection with RabbitMQ, so keep in mind this is an application responsability.

About Stateless publisher approach:

It's almost identical, you need only setup like this:

// defining a descriptor for publisher
var descriptor = new RestPublisherDescriptor
{
    // other settings ..
    FullDuplex = false
};

In this case, no callback messages will be sent by remote service (Web APIs).

Pay attention with this approach, because RabbitMQ doesn't take care about bad routing, in that case messages arrives into remote services, but once arrived there, if no routing matches with any Web API routings, you won't be able to understand which messages will be lost !!!.

So, in order to avoid this situation, you need to test well your application workflow, making an end2end since publisher to REST Api reception.