MSMVPS.COM
The Ultimate Destination for Blogs by Current and Former Microsoft Most Valuable Professionals.

A simple MSMQ Listener helper class

MSMQ simply provides wonderful capabilities when it comes to building asynchronous messaging based applications. You typically run into situations where two applications exchange messages through a queue – one application sends messages to a known MSMQ destination and the other application “listens” to this queue and picks messages up as and when they arrive. The application which listens usually is a windows service which either has a dedicated thread making a blocking call on the Receive method or uses a timer of sorts to periodically check the queue for new messages. Either ways, to maximize throughput, you need to write the queue-listener code in such a way that you can have multiple threads simultaneously processing different messages that arrive at the queue.

MSMQ triggers provide one such solution where we can associate incoming messages to a COM component, based on certain rules, which can be application specific. MSMQ triggers are pretty powerful and also easy to administer. But there is one constraint, which has to do with the fact that only COM components are supported in this scheme, or in other words, message processors should be COM components. In a way, it isn’t that bad a choice after all. You still can create a CCW for a managed class and register the trigger. But, somehow, I didn’t like doing this. 

To circumvent this problem, I decided to write a C# helper class, which doesn’t really do anything much, but helps realize a simple multithreaded job (message processing) dispatcher. The helper itself doesn’t do any message processing as such; whenever a message arrives at a queue, it just fires an event (on a new thread pool thread) with the message contents being in the event arguments.

public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);

public class MSMQListener
{
    
private bool _listen;
    
private Type[] _types;
    
private MessageQueue _queue;

    
public event MessageReceivedEventHandler MessageReceived;

    
public Type[] FormatterTypes
    {
        
get return _types; }
        
set { _types = value; }
    }

    
public MSMQListener(string queuePath)
    {
        _queue = 
new MessageQueue(queuePath);    
    }

    
public void Start()
    {
        _listen = 
true;
        
        
if (_types != null && _types.Length > 0)
        {
            
// Using only the XmlMessageFormatter. You can use other formatters as well
            
_queue.Formatter = new XmlMessageFormatter(_types);
        }

        _queue.PeekCompleted += 
new PeekCompletedEventHandler(OnPeekCompleted);
        _queue.ReceiveCompleted += 
new ReceiveCompletedEventHandler(OnReceiveCompleted);

        StartListening();
    }
    
    
public void Stop()
    {
        _listen = 
false;
        _queue.PeekCompleted -= 
new PeekCompletedEventHandler(OnPeekCompleted);
        _queue.ReceiveCompleted -= 
new ReceiveCompletedEventHandler(OnReceiveCompleted);

    }

    
private void StartListening()
    {
        
if (!_listen)
        {
            
return;
        }

        
// The MSMQ class does not have a BeginRecieve method that can take in a 
        // MSMQ transaction object. This is a workaround - we do a BeginPeek and then 
        // recieve the message synchronously in a transaction.
        // Check documentation for more details
        
if (_queue.Transactional)
        {
            _queue.BeginPeek();
        }
        
else
        
{
            _queue.BeginReceive();
        }
    }

    
private void OnPeekCompleted(object sender, PeekCompletedEventArgs e)
    {
        _queue.EndPeek(e.AsyncResult);
        MessageQueueTransaction trans = 
new MessageQueueTransaction();
        Message msg = 
null;
        
try
        
{
            trans.Begin();
            msg = _queue.Receive(trans);
            trans.Commit();

            StartListening();

            FireRecieveEvent(msg.Body);
        }
        
catch
        
{
            trans.Abort();
        }
    }

    
private void FireRecieveEvent(object body)
    {
        
if (MessageReceived != null)
        {
            MessageReceived(
thisnew MessageEventArgs(body));
        }
    }

    
private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        Message msg = _queue.EndReceive(e.AsyncResult);
        
        StartListening();

        FireRecieveEvent(msg.Body);
    }

}

 
public class MessageEventArgs : EventArgs
 {
    
private object _messageBody;

    
public object MessageBody
    {
        
get return _messageBody; }
    }

    
public MessageEventArgs(object body)
    {
        _messageBody = body;

    }
 }

As you can see, the helper class is indeed simple. One thing to notice though is that, I switch to BeginPeek if the queue is transactional. The reason behind this is there is no overload of the BeginRecieve method that takes a MessageQueueTransaction object and the MSDN documentation also explains the same. The workaround is to use BeginPeek and then do call Receive (synchronous/blocking operation) in the EndPeek method.

At this point of time, this class does not provide much control to the consumer. For instance, it does not allow you to set the maximum number of concurrent threads that can be processing messages - This class queues as many jobs as that supported by the .NET Thread pool. Also, I haven’t tested this code in all scenarios – I have not seen if this works well in the context of a distributed (DTC based) transaction. I shall update this version every now and then to make it more powerful and to work seamlessly in all scenarios.     


Posted Oct 16 2005, 08:00 PM by Manoj G
Filed under:

Comments

akshay wrote re: A simple MSMQ Listener helper class
on 04-25-2006 9:45
Thats cool...
I was looking at creating a MSMQ listner... This will help a lot.

One question I have : Is can you share any ideas; if I wish to create multiple listener to the same queue, is there are way to synchronize these listners sitting on different machines...
i.e.
Server A and Server B are listening to the same queue, will both the servers recieve an event when a messge arrives on the queue. Yes they would. - How can I ask one of the servers to process and the other to stand down...

Any Ideas ???

Thanks
Akshay.
Ravi wrote re: A simple MSMQ Listener helper class
on 06-27-2006 6:44
Hi,

i am tryig to do an async peek, by calling BeginPeek, the Peek Complete event get triggered immediatly after a call to BeginPeek, even though the Queue is empty.
and when endPeek is called, it throws an exception, with no message, but, it has a MessageQueueErrorCode : 0xC0000120

I was getting this exception.
Then after Searching found ur code. then again i am getting the same exception.
Any Idea?

Regards
B, Ravi Shankar
sloan wrote Wow. Thanks!
on 07-21-2006 16:10
manoj g,

thanks.. that really, really helps with my msmq development.

I added one overloaded constructor.... since I binary format my objects. (below)
(and theres a small spelling error: FireReceiveEvent)

but minus my caveat, this thing works great.

There's probably a hole with .. if you use the overload and set the Types[] , so don't trust my implementation too far.

public MSMQListener(string queuePath)
{
this.ConstructorCommon(queuePath);
}

private void ConstructorCommon(string queuePath)
{
_queue = new MessageQueue(queuePath);    


//non manoj stuff//MessageQueue.EnableConnectionCache = false;
//non manoj stuff//this._queue.SetPermissions("Everyone", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);
}

public MSMQListener(string queuePath , System.Messaging.IMessageFormatter formatter)
{
this.ConstructorCommon(queuePath);
this._queue.Formatter = formatter; // Customize the formatter type
}
sloan wrote re: A simple MSMQ Listener helper class
on 09-18-2006 16:56
Ravi, I was able to put the fix in this way: In the OnPeekCompletedMethod .. I added the following (extra exception handling) code: catch (MessageQueueException msmqex) { switch ( (int)msmqex.MessageQueueErrorCode ) { case -1073741536: Console.WriteLine("Msmq Error -1073741536 : Sleeping, and then ReListening"); System.Threading.Thread.Sleep(3000); StartListening(); break; default: trans.Abort(); throw msmqex; //break; } } catch { trans.Abort(); } The issue is that there is a missing enum value for -1073741536 See this url: http://www.koders.com/csharp/fidE632059BF610D9D912C019A96DCF7A2DD76C847F.aspx
Dharan Prakash wrote re: A simple MSMQ Listener helper class
on 12-01-2006 5:50

Hi Manoj,

Can you please give me an answer or references for Akshay's question? i.e

Server A and Server B are listening to the same queue, will both the servers recieve an event when a messge arrives on the queue. Yes they would. - How can I ask one of the servers to process and the other to stand down...

Thanks

Manoj G wrote re: A simple MSMQ Listener helper class
on 12-02-2006 8:48

Hi Akshay,

Transactional Receive should solve your problem.

Thanks,

Manoj

Rahul wrote re: A simple MSMQ Listener helper class
on 02-28-2007 23:11

I am using the same code and created windows services in my local system but if i run my windows service to receive the message. Access denaid the  error message i got.

Can u specify the the format name for the Queue Path. Or any security setting has to be enabled.

Filip wrote re: A simple MSMQ Listener helper class
on 06-03-2007 21:08
Nice code, would you be able to provide an example of the test class as well on how to call this? (Sorry, I'm a c# beginner)
JM wrote re: A simple MSMQ Listener helper class
on 08-07-2007 20:55

I worte a Q listener as a window service, it is working for read the local transactional Q. But when I cluster the MSMQ, the Q listenser service can not read the Q any more.

any idea why?

Sloan wrote re: A simple MSMQ Listener helper class
on 10-03-2007 9:16

//Quote//

I am using the same code and created windows services in my local system but if i run my windows service to receive the message. Access denaid the  error message i got.

Can u specify the the format name for the Queue Path. Or any security setting has to be enabled.

//End Quote

You have to go thru "Manage Computer" and find the Queue.  And give permissions ON THE QUEUE...to the "SYSTEM" account (the SYSTEM account local to that computer).

You don't have to do "Full Control", but Send/Receive are necessary.  I usually give Get/Set Permissions as well.

Add a Comment

(required)  
(optional)
(required)  
Remember Me?


Copyright © is the original authors. Blog site is an independent site not sponsored by Microsoft. The Yoda blog server and the Brianna SQL server would like to thank www.ownwebnow.com and www.exchangedefender.com. They wouldn't be here and broadcasting without the generosity of Vlad Mazek and his companies.

Powered by Community Server (Commercial Edition), by Telligent Systems