MSMQ simply provides wonderful capabilities when it comes to building asynchronous messaging based applications. You typically run into situations where two applications exchange messages through a queue – one application sends messages to a known MSMQ destination and the other application “listens” to this queue and picks messages up as and when they arrive. The application which listens usually is a windows service which either has a dedicated thread making a blocking call on the Receive method or uses a timer of sorts to periodically check the queue for new messages. Either ways, to maximize throughput, you need to write the queue-listener code in such a way that you can have multiple threads simultaneously processing different messages that arrive at the queue.
MSMQ triggers provide one such solution where we can associate incoming messages to a COM component, based on certain rules, which can be application specific. MSMQ triggers are pretty powerful and also easy to administer. But there is one constraint, which has to do with the fact that only COM components are supported in this scheme, or in other words, message processors should be COM components. In a way, it isn’t that bad a choice after all. You still can create a CCW for a managed class and register the trigger. But, somehow, I didn’t like doing this.
To circumvent this problem, I decided to write a C# helper class, which doesn’t really do anything much, but helps realize a simple multithreaded job (message processing) dispatcher. The helper itself doesn’t do any message processing as such; whenever a message arrives at a queue, it just fires an event (on a new thread pool thread) with the message contents being in the event arguments.
public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);
public class MSMQListener
{
private bool _listen;
private Type[] _types;
private MessageQueue _queue;
public event MessageReceivedEventHandler MessageReceived;
public Type[] FormatterTypes
{
get { return _types; }
set { _types = value; }
}
public MSMQListener(string queuePath)
{
_queue = new MessageQueue(queuePath);
}
public void Start()
{
_listen = true;
if (_types != null && _types.Length > 0)
{
// Using only the XmlMessageFormatter. You can use other formatters as well
_queue.Formatter = new XmlMessageFormatter(_types);
}
_queue.PeekCompleted += new PeekCompletedEventHandler(OnPeekCompleted);
_queue.ReceiveCompleted += new ReceiveCompletedEventHandler(OnReceiveCompleted);
StartListening();
}
public void Stop()
{
_listen = false;
_queue.PeekCompleted -= new PeekCompletedEventHandler(OnPeekCompleted);
_queue.ReceiveCompleted -= new ReceiveCompletedEventHandler(OnReceiveCompleted);
}
private void StartListening()
{
if (!_listen)
{
return;
}
// The MSMQ class does not have a BeginRecieve method that can take in a
// MSMQ transaction object. This is a workaround - we do a BeginPeek and then
// recieve the message synchronously in a transaction.
// Check documentation for more details
if (_queue.Transactional)
{
_queue.BeginPeek();
}
else
{
_queue.BeginReceive();
}
}
private void OnPeekCompleted(object sender, PeekCompletedEventArgs e)
{
_queue.EndPeek(e.AsyncResult);
MessageQueueTransaction trans = new MessageQueueTransaction();
Message msg = null;
try
{
trans.Begin();
msg = _queue.Receive(trans);
trans.Commit();
StartListening();
FireRecieveEvent(msg.Body);
}
catch
{
trans.Abort();
}
}
private void FireRecieveEvent(object body)
{
if (MessageReceived != null)
{
MessageReceived(this, new MessageEventArgs(body));
}
}
private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
Message msg = _queue.EndReceive(e.AsyncResult);
StartListening();
FireRecieveEvent(msg.Body);
}
}
public class MessageEventArgs : EventArgs
{
private object _messageBody;
public object MessageBody
{
get { return _messageBody; }
}
public MessageEventArgs(object body)
{
_messageBody = body;
}
}
As you can see, the helper class is indeed simple. One thing to notice though is that, I switch to BeginPeek if the queue is transactional. The reason behind this is there is no overload of the BeginRecieve method that takes a MessageQueueTransaction object and the MSDN documentation also explains the same. The workaround is to use BeginPeek and then do call Receive (synchronous/blocking operation) in the EndPeek method.
At this point of time, this class does not provide much control to the consumer. For instance, it does not allow you to set the maximum number of concurrent threads that can be processing messages - This class queues as many jobs as that supported by the .NET Thread pool. Also, I haven’t tested this code in all scenarios – I have not seen if this works well in the context of a distributed (DTC based) transaction. I shall update this version every now and then to make it more powerful and to work seamlessly in all scenarios.
Posted
Oct 16 2005, 08:00 PM
by
Manoj G