上篇讲到 ,这篇讲述一下MSMQ在.NET Compact Framework下的开发。
所谓MQ就是Message Queue,消息队列。消息队列可以作为不同应用程序之间,甚至不同机器之间通信的渠道。在消息队列下进行通信的内容称为消息(Message),在C#程序下Message就是对象。
MSMQ就是Microsoft公司提供的MQ服务程序。MQ服务程序负责管理消息队列,保证消息在消息队列这一渠道下能无误的发送到对端,MQ支持离线交易,有时候消息会缓存在MQ服务程序中,当接收方再线时候在提取消息。这一特性使得MQ可以广泛使用在移动领域,因为移动应用的网络不能保证7×24的长连接。
生成队列
在CF.net下开发MQ,需要引用System.Messaging库。
using System.Messaging; public class MQService { private const string mMachinePrefix = @".\" ; private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\" ; private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$" ; private MessageQueue mServiceQueue; private void InitServiceQueue() { // create the message queue try { // check to make sure the message queue does not exist already if (! MessageQueue.Exists(mServiceQueuePath)) { // create the new message queue and make it transactional mServiceQueue = MessageQueue.Create(mServiceQueuePath); mServiceQueue.Close(); } else { mServiceQueue = new MessageQueue(mServiceQueuePath); } Type[] types = new Type[1 ]; types[0] = typeof(string ); mServiceQueue.Formatter = new XmlMessageFormatter(types); mServiceQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MessageListenerEventHandler); // Begin the asynchronous receive operation. mServiceQueue.BeginReceive(); mServiceQueue.Close(); } // show message if we used an invalid message queue name; catch (MessageQueueException MQException) { Console.WriteLine(MQException.Message); } return ; } }
在建立Q之前先检查该Q是否存在,如果存在就生成Q的处理对象,如果不存在就先在队列管理器建立这个Q。建立Q的时候,输入参数为一个string,这个string可以为path(路径),或者。使用path的相对广泛,在例子中使用path作为输入参数。Path由 and 组成,建立的Q可以分为Public,Private,Journal和DeadLetter。使用广泛的是Public和Private,Public的Q由 and 组成,格式如\,而Private的Q的格式为\Private$\,比Public的Q多了一个标识Private$,在例子中使用了Private的Q。路径“.\”指的是本地机器。
Property Formatter十分重要,他定义了消息体的格式,所谓消息体的格式就是通过这个Q通信的消息的数据类型,一个Q可以传递多个不同的数据类型,需要在Type进行定义然后赋值给Formatter。
Event ReceiveCompleted用来注册接收处理函数,当Q接收到消息后,使用注册的函数进行处理。使用ReceiveCompleted注册处理函数以后,必须调用BeginReceive让这个Q进入异步接收状态。
下面讲述MQ应用中两种常见的应用模式,第一种为请求回应模式,第二种为注册广播模式。
请求回应模式
public class MQService { private const string mMachinePrefix = @".\" ; private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\" ; private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$" ; private System.Messaging.MessageQueue mServiceQueue; private void InitServiceQueue() { // create the message queue try { // check to make sure the message queue does not exist already if (! System.Messaging.MessageQueue.Exists(mServiceQueuePath)) { // create the new message queue and make it transactional mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath); mServiceQueue.Close(); } else { mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath); } Type[] types = new Type[1 ]; types[0] = typeof(string ); mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(types); mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler); // Begin the asynchronous receive operation. mServiceQueue.BeginReceive(); mServiceQueue.Close(); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } return ; } private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e) { try { // Connect to the queue. System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender; // End the asynchronous receive operation. System.Messaging.Message msg = mq.EndReceive(e.AsyncResult); if (msg.Body.ToString() == "mq_reques_1" ) { msg.ResponseQueue.Send("mq_respond_1" ); } else if (msg.Body.ToString() == "mq_reques_2" ) { msg.ResponseQueue.Send(true ); } // Restart the asynchronous receive operation. mq.BeginReceive(); } catch (System.Messaging.MessageQueueException ex) { // Handle sources of MessageQueueException. Console.WriteLine(ex.Message); } return ; } } public class MQClient { private const string mMachinePrefix = @".\" ; private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\" ; private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$" ; private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$" ; private System.Messaging.MessageQueue mServiceQueue; private System.Messaging.MessageQueue mClientQueue; public void InitQueues() { // create the message queue try { mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath); // check to make sure the message queue does not exist already if (! System.Messaging.MessageQueue.Exists(mClientQueuePath)) { // create the new message queue and make it transactional mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath); mClientQueue.Close(); } else { mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath); } Type[] types = new Type[2 ]; types[0] = typeof(string ); types[1] = typeof(bool ); mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(types); mClientQueue.Close(); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } return ; } private void SendRequest() { try { System.Messaging.Message message = new System.Messaging.Message("mq_reques_1" ); message.ResponseQueue = mClientQueue; mClientQueue.Purge(); mServiceQueue.Send(message); System.Messaging.Message msg = mClientQueue.Receive(new TimeSpan(0, 0, 4 )); //handle the result. Console.WriteLine(msg.Body.ToString()); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } } }
MQService是服务程序,负责服务队列"的建立和管理,当有新消息发送到该服务队列时MessageListenerEventHandler函数就会callback,取出消息进行分析处理和发送返回,返回是通过client原先建立的Q进行返回,不是通过原服务Q返回,因为MQ的队列是单向的。MQClient负责客户端队列"的建立,在发送请求的时候把客户端队列赋值到properties ResponseQueue里,让服务程序可以返回到这个客户端的队列里面,同时在等待返回的时候有超时控制。
注册广播模式
注册广播模式是Observer模式的一种应用,Observer模式可见。
客户端可以往服务端注册关心的消息,服务端通过MQ自动广播消息到客户端。
public class MQService { private const string mMachinePrefix = @".\" ; private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\" ; private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$" ; private System.Messaging.MessageQueue mServiceQueue; private Dictionary<string, MessageQueue> mmClientQueues = new Dictionary<string, MessageQueue> (); private void InitServiceQueue() { // create the message queue try { // check to make sure the message queue does not exist already if (! System.Messaging.MessageQueue.Exists(mServiceQueuePath)) { // create the new message queue and make it transactional mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath); mServiceQueue.Close(); } else { mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath); } Type[] types = new Type[1 ]; types[0] = typeof(string ); mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(types); mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler); // Begin the asynchronous receive operation. mServiceQueue.BeginReceive(); mServiceQueue.Close(); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } return ; } private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e) { try { // Connect to the queue. System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender; // End the asynchronous receive operation. System.Messaging.Message msg = mq.EndReceive(e.AsyncResult); if(msg.Body.ToString() == "mq_register_1" ) { mmClientQueues.Add(msg.Label, msg.ResponseQueue); } else if(msg.Body.ToString() == "mq_unregister_1" ) { mmClientQueues[msg.Label].Purge(); mmClientQueues.Remove(msg.Label); } // Restart the asynchronous receive operation. mq.BeginReceive(); } catch (System.Messaging.MessageQueueException ex) { // Handle sources of MessageQueueException. Console.WriteLine(ex.Message); } return ; } private void Notify(string str) { if (mmClientQueues.Count > 0 ) { foreach(MessageQueue mq in mmClientQueues.Values) { mq.Send(str); } } } } public class MQClient { private const string mMachinePrefix = @".\" ; private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\" ; private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$" ; private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$" ; private System.Messaging.MessageQueue mServiceQueue; private System.Messaging.MessageQueue mClientQueue; public void InitQueues() { // create the message queue try { mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath); // check to make sure the message queue does not exist already if (! System.Messaging.MessageQueue.Exists(mClientQueuePath)) { // create the new message queue and make it transactional mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath); mClientQueue.Close(); } else { mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath); } Type[] types = new Type[2 ]; types[0] = typeof(string ); types[1] = typeof(bool ); mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(types); // Initiate the asynchronous receive operation by telling the Message // Queue to begin receiving messages and notify the event handler // when finished mClientQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(ClientQueueReceiveCompleted); mClientQueue.BeginReceive(); mClientQueue.Close(); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } return ; } private void RegisterService() { try { System.Messaging.Message message = new System.Messaging.Message("mq_register_1" ); message.Label = "client1" ; message.ResponseQueue = mClientQueue; mServiceQueue.Send(message); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } } private void UnregisterService() { try { System.Messaging.Message message = new System.Messaging.Message("mq_unregister_1" ); message.Label = "client1" ; mServiceQueue.Send(message); Thread.Sleep(500 ); mClientQueue.Purge(); } // show message if we used an invalid message queue name; catch (System.Messaging.MessageQueueException MQException) { Console.WriteLine(MQException.Message); } } private void ClientQueueReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult) { try { // End the Asynchronous Receive Operation Message message = mClientQueue.EndReceive(asyncResult.AsyncResult); if (message.Body is string ) { Console.WriteLine(message.Body.ToString()); } } catch (MessageQueueException e) { Console.WriteLine (String.Format(System.Globalization.CultureInfo.CurrentCulture, "Failed to receive Message: {0} " , e.ToString())); } //Begin the next Asynchronous Receive Operation mClientQueue.BeginReceive(); }} 和请求回应模式相比MQService使用容器保存所有注册的客户端的Q,当需要notify的时候遍历所有客户端Q进行广播。MQClient建立广播Q,然后注册函数ClientQueueReceiveCompleted处理广播事件。MQ的应用能把Oberver模式应用跨进程和跨系统,消息订阅广播机制可以借助MQ和observer模式来实现。
参考文献
本文转自Jake Lin博客园博客,原文链接:http://www.cnblogs.com/procoder/archive/2009/03/23/1419440.html,如需转载请自行联系原作者