CON002 Hands-On Lab
Lab Manual
Reliable, Transacted and Instrumented Messaging with the Windows Communication Foundation
This lab is designed for use with the .NET Framework 3 Beta 2.
Information in this document is subject to change without notice. The example companies, organizations, products, people, and events depicted herein are fictitious. No association with any real company, organization, product, person or event is intended or should be inferred. Complying with all applicable copyright laws is the responsibility of the user. Without limiting the rights under copyright, no part of this document may be reproduced, stored in or introduced into a retrieval system, or transmitted in any form or by any means (electronic, mechanical, photocopying, recording, or otherwise), or for any purpose, without the express written permission of Microsoft Corporation.

Microsoft may have patents, patent applications, trademarks, copyrights, or other intellectual property rights covering subject matter in this document. Except as expressly provided in any written license agreement from Microsoft, the furnishing of this document does not give you any license to these patents, trademarks, copyrights, or other intellectual property.

© 2005 Microsoft Corporation. All rights reserved.

Microsoft, MS-DOS, MS, Windows, Windows NT, MSDN, Active Directory, BizTalk, SQL Server, SharePoint, Outlook, PowerPoint, FrontPage, Visual Basic, Visual C++, Visual J++, Visual InterDev, Visual SourceSafe, Visual C#, Visual J#, and Visual Studio are either registered trademarks or trademarks of Microsoft Corporation in the U.S.A. and/or other countries. Other product and company names herein may be the trademarks of their respective owners.
Contents

Reliable, Transacted and Instrumented Messaging with the Windows Communication Foundation
    Lab Objective
    Exercise 1 – Define a data contract for use in a derivatives trading service
        Task 1 – Create the Derivatives Trading Service Project
        Task 2 – Add a Reference to the Windows Communication Foundation to the TradingService Project
        Task 3 – Define the Contract that the Trading Service will Expose
        Task 4 – Implement the Contract that the Trading Service Exposes
        Task 5 – Host the Derivatives Trading Service in a .NET Executable
        Task 6 – Configure the Trading Service
        Task 7 – Deploy the Derivatives Trading Service
        Task 8 – Build a Client for the Derivatives Trading Service
        Task 9 – Use the Derivatives Trading Service
    Exercise 2 – Manage state within a service
        Task 1 – Understand the Problem
        Task 2 – Control the Sequence in which the Operations of the Trading Service are Invoked
        Task 3 – Separate the Execution of Operations on behalf of Different Clients
        Task 4 – Keep Track of Data between the Invocations of Operations on behalf of a Single Client
        Task 5 – Make the Client Compatible with the Modified Derivatives Trading Service
        Task 6 – Test the Enhanced Solution
    Exercise 3 – Experiment with Reliable Sessions
        Task 1 – Understand the Problem
        Task 2 – Turn Reliable Sessions On
        Task 3 – Test the Enhanced Solution
        Task 4 – Introduce Uncertainty for which Reliable Sessions can Compensate
        Task 5 – See the Windows Communication Foundation’s Reliable Message Facility Compensate for Lost Messages
    Exercise 4 – Build a queued messaging service for recording derivatives trades
        Task 1 – Install MSMQ if Necessary
        Task 2 – Add the Trade Recording Service to the Solution
        Task 3 – Provide a Host for the Trade Recording Service
        Task 4 – Configure the Trade Recording Service
        Task 5 – Deploy the Trade Recording Service
        Task 6 – Use the Trade Recording Service
        Task 7 – Witness the Superior Availability of the Trade Recording Service
    Exercise 5 – Wrap a set of messages in a transaction
        Task 1 – Program the Client
        Task 2 – Enhance the Derivatives Trading Service
        Task 3 – Modify the Client to Conform to the Service
        Task 4 – See the Composition of Multiple Distributed Operations into a Single Unit of Work
    Exercise 6 – Add performance counters to a service
        Task 1 – Define a Custom Trading Volume Performance Counter
        Task 2 – Update the Value of the Custom Trading Volume Performance Counter
        Task 3 – Monitor the Custom Trading Volume Performance Counter
Reliable, Transacted and Instrumented Messaging with the Windows Communication Foundation

Lab Objective

Estimated time to complete this lab: 60 minutes

The objective of this lab is to demonstrate the Windows Communication Foundation’s facilities for reliable, queued and transacted messaging, and for state management. The lab also shows how to instrument Windows Communication Foundation services with custom performance counters.

• Exercise 1 – Define a data contract for use in a derivatives trading service
• Exercise 2 – Manage state within a service
• Exercise 3 – Experiment with Reliable Sessions
• Exercise 4 – Build a queued messaging service for recording derivatives trades
• Exercise 5 – Wrap a set of messages in a transaction
• Exercise 6 – Add performance counters to a service
Exercise 1 – Define a data contract for use in a derivatives trading service

In this exercise, you will build a Windows Communication Foundation service that uses a data contract to define the format of data in the messages it exchanges. The service is for trading in derivatives.

A derivative is a financial entity whose value is derived from that of another. Here is an example. The value of a single share of Microsoft Corporation stock was $24.41 on October 11, 2005. Given that value, one might offer for sale an option to buy 1,000 of those shares for $25 each on November 11, 2005. Such an option, which is known as a call, might be purchased by someone who anticipates that the price of the shares will rise above $25 by November 11, 2005, and sold by someone who anticipates that the price of the shares will drop. The call is a derivative, its value being derived from the value of Microsoft Corporation stock.

Pricing a derivative is a complex task. Indeed, estimating the value of derivatives is perhaps the most high-profile problem in modern microeconomics. In the case of our example, the quantity of the stock and the current and past prices of the Microsoft Corporation stock are clearly factors to consider, but other factors might be based on analyses of the values of quantities that are thought to affect the price of the stock, such as the values of various stock market indices, or the interest rate of the U.S. Federal Reserve Bank. In fact, one can say that, in general, the price of a derivative is some function of one or more quantities, one or more market values, and the outcome of one or more quantitative analytical functions.

Thus, the service that you will construct in this exercise is one that allows a client to define a derivative trade, identify any number of quantitative analytical functions to be used in estimating the value of the derivative to be traded, and, optionally, make the trade.

Task 1 – Create the Derivatives Trading Service Project

1. Log on to the virtual PC with the username Administrator and the password pass@word1. A folder with an electronic copy of this manual is located on the desktop, which will be useful for copying and pasting code into Visual Studio. There is one particularly long passage of code that would certainly have to be copied and pasted rather than typed.

2. Open Visual Studio 2005, and create a new blank solution called TradingService in c:\Windows Communication Foundation\Labs, as shown in figure 1.1, below.

Figure 1.1 Creating a blank Visual Studio solution

3. Add a C# Class Library project called TradingService to the solution, as shown in figure 1.2, below.

Figure 1.2 Adding a Class Library project to the solution
4. In the properties for the project, set the default namespace to Fabrikam.

Task 2 – Add a Reference to the Windows Communication Foundation to the TradingService Project

1. Add references to the System.ServiceModel and System.Runtime.Serialization .NET assemblies to the TradingService project, as shown in figure 1.3, below.

Figure 1.3 Adding references to the Windows Communication Foundation assemblies
Task 3 – Define the Contract that the Trading Service will Expose

2. Rename the class file, Class1.cs, in the TradingService project to ITradingService.cs, and modify its content to read as follows:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        [ServiceContract]
        public interface ITradingService
        {
            [OperationContract]
            string BeginDeal();

            [OperationContract]
            void AddTrade(Trade trade);

            [OperationContract]
            void AddFunction(string function);

            [OperationContract]
            decimal Calculate();

            [OperationContract]
            void Purchase();

            [OperationContract]
            void EndDeal();
        }
    }

3. Thus, you have defined an interface, ITradingService, with a method, AddTrade(), that accepts an instance of a class named Trade as a parameter. Furthermore, the AddTrade() method has the [OperationContract] attribute, which implies that parameters may be passed to it from a remote client. Therefore, not only must a Trade class be defined, but a data contract must also be defined for that class to specify the format in which instances of the class will be transmitted to the AddTrade() method from remote clients. So, add a class file named Trade.cs to the project, and modify its contents to look like this:

    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using System.Text;

    namespace Fabrikam
    {
        [DataContract(Namespace="Fabrikam",Name="Trade")]
        public class Trade
        {
            [DataMember]
            public string Symbol;

            [DataMember]
            public long? Count;

            [DataMember]
            public DateTime? Date;

            public override string ToString()
            {
                // Show "?" for any value that has not been supplied.
                string symbol = string.IsNullOrEmpty(this.Symbol) ? "?" : this.Symbol;
                string date = (this.Date != null) ? this.Date.Value.ToShortDateString() : "?";
                string count = (this.Count != null) ? this.Count.ToString() : "?";
                return string.Format("{0}x{1} on {2}", count, symbol, date);
            }
        }
    }
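One way to see what the data contract in step 3 specifies is to serialize a Trade outside of any service. The following is a minimal sketch, assuming the DataContractSerializer class as it exists in released versions of the .NET Framework (Beta 2 may differ). TradeContractDemo and ToXml are illustrative names that are not part of the lab, and the Trade class is repeated here without ToString() only so that the block is self-contained.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Text;

// Same data contract as the lab's Trade class (ToString omitted for brevity).
[DataContract(Namespace = "Fabrikam", Name = "Trade")]
public class Trade
{
    [DataMember] public string Symbol;
    [DataMember] public long? Count;
    [DataMember] public DateTime? Date;
}

// Hypothetical helper, not part of the lab solution.
public static class TradeContractDemo
{
    // Serializes a Trade to XML according to its data contract.
    public static string ToXml(Trade trade)
    {
        DataContractSerializer serializer = new DataContractSerializer(typeof(Trade));
        using (MemoryStream stream = new MemoryStream())
        {
            serializer.WriteObject(stream, trade);
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }
}
```

Calling TradeContractDemo.ToXml on a Trade whose Symbol is "MSFT" should produce a Trade element in the Fabrikam XML namespace with one child element for each [DataMember]. The element name and namespace, rather than the .NET class name, are what a client has to match.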
Task 4 – Implement the Contract that the Trading Service Exposes

4. Add a class file named TradingSystem.cs to the project, and modify its contents in this way:

    using System;
    using System.Collections.Generic;
    using System.Text;

    namespace Fabrikam
    {
        public class TradingSystem: ITradingService
        {
            #region ITradingService Members

            string ITradingService.BeginDeal()
            {
                string dealIdentifier = Guid.NewGuid().ToString();
                return dealIdentifier;
            }

            void ITradingService.AddTrade(Trade trade)
            {
                Console.WriteLine("Added trade for {0}",trade);
            }

            void ITradingService.AddFunction(string function)
            {
                Console.WriteLine("Added function {0}",function);
            }

            decimal ITradingService.Calculate()
            {
                // Placeholder valuation: integer division gives a value from 0 to 99.
                Decimal value = DateTime.Now.Millisecond/10;
                Console.WriteLine("Calculated value as {0}",value);
                return value;
            }

            void ITradingService.Purchase()
            {
                Console.WriteLine("Purchased!");
            }

            void ITradingService.EndDeal()
            {
                Console.WriteLine("Completed deal.");
            }

            #endregion
        }
    }

5. Compile the TradingService project to confirm that there are no syntax errors.
Task 5 – Host the Derivatives Trading Service in a .NET Executable

6. Add a C# Console Application project called TradingServiceHost to the TradingService solution.

7. Add a reference to the System.ServiceModel .NET assembly to the TradingServiceHost project.

8. Add a reference to the TradingService project to the TradingServiceHost project, as shown in figure 1.4, below.

Figure 1.4 Adding a reference to the TradingService project

9. Modify the Program.cs class in the TradingServiceHost project to read as follows:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        public class Program
        {
            public static void Main(string[] args)
            {
                Uri[] baseAddresses = new Uri[]{new Uri("net.tcp://localhost:8001/")};
                using(ServiceHost host = new ServiceHost(typeof(TradingSystem),baseAddresses))
                {
                    host.Open();
                    Console.WriteLine("The trading service is available.");
                    Console.ReadKey();
                    host.Close();
                }
            }
        }
    }

10. Compile the TradingServiceHost project to ensure that there are no syntax errors.
Task 6 – Configure the Trading Service

11. Add an application configuration file named app.config to the TradingServiceHost project in the TradingService solution.

12. Modify the contents of that file as follows:

    <configuration>
        <system.serviceModel>
            <services>
                <service name="Fabrikam.TradingSystem">
                    <endpoint address="TradingService"
                              binding="netTcpBinding"
                              contract="Fabrikam.ITradingService"/>
                </service>
            </services>
        </system.serviceModel>
    </configuration>
Task 7 – Deploy the Derivatives Trading Service

13. Build the TradingService solution.

14. Start a new instance of the TradingServiceHost project.

15. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window to terminate it.

Task 8 – Build a Client for the Derivatives Trading Service

16. Add a C# Console Application project called Client to the TradingService solution.

17. Add references to the System.ServiceModel and System.Runtime.Serialization .NET assemblies to the Client project.

18. Add a class module to the Client project, name it ITradingService.cs, and modify its contents to look like this:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

    namespace Client
    {
        [ServiceContract]
        public interface ITradingService
        {
            [OperationContract]
            string BeginDeal();

            [OperationContract]
            void AddTrade(Trade trade);

            [OperationContract]
            void AddFunction(string function);

            [OperationContract]
            decimal Calculate();

            [OperationContract]
            void Purchase();

            [OperationContract]
            void EndDeal();
        }
    }

19. Add another class module to the Client project, name it Trade.cs, and alter its contents in this way:

    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using System.Text;

    namespace Client
    {
        [DataContract(Namespace="Fabrikam",Name="Trade")]
        public class Trade
        {
            [DataMember]
            public string Symbol;

            [DataMember]
            public long? Count;

            [DataMember]
            public DateTime? Date;
        }
    }
By so doing, you have defined a data contract in the client application that is compatible with the data contract defined in the derivatives trading service, although the class used to define the data contract in the client application is different from the class used to define the data contract in the service.

20. Add an application configuration file named app.config to the Client project and change its contents to look like this:

    <configuration>
        <system.serviceModel>
            <client>
                <endpoint name="TradingServiceConfiguration"
                          address="net.tcp://localhost:8001/TradingService"
                          binding="netTcpBinding"
                          contract="Client.ITradingService"/>
            </client>
        </system.serviceModel>
    </configuration>

21. Modify the code in the Program.cs module of the Client project as follows:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.Text;

    namespace Client
    {
        public class Program
        {
            public static void Main(string[] args)
            {
                Console.WriteLine("Press any key when the service is ready.");
                Console.ReadKey();

                // Create a proxy from the endpoint named in app.config.
                ITradingService dealProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
                dealProxy.BeginDeal();

                Trade trade = new Trade();
                trade.Count = 10;
                trade.Symbol = "MSFT";
                trade.Date = DateTime.Now.AddMonths(2);
                dealProxy.AddTrade(trade);

                dealProxy.AddFunction("InterestRateEstimation");
                dealProxy.AddFunction("TechnologyStockEstimation");

                decimal dealValue = dealProxy.Calculate();
                Console.WriteLine("Deal value estimated at ${0}",dealValue);

                dealProxy.Purchase();
                dealProxy.EndDeal();
                ((IChannel)dealProxy).Close();

                Console.WriteLine();
                Console.WriteLine("Done.");
                Console.ReadKey();
            }
        }
    }

22. Build the Client project to confirm the absence of any syntax errors.
Task 9 – Use the Derivatives Trading Service

23. Modify the Startup Project properties of the TradingService solution as shown in figure 1.5, below.

Figure 1.5 The Startup Project Properties of the TradingService Solution

24. Choose Debug and then Start Debugging from the Visual Studio menus.

25. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 1.6, below, while the output in the console application window of the Client should be similar to the output in figure 1.7. The numbers shown in your console application windows may vary slightly from the numbers shown in the figures, due to variations in the prevailing market conditions over time.

Figure 1.6 Output from the Trading Service

Figure 1.7 Output from the Trading Service Client

26. Enter a keystroke into the console application window of the Client executable to terminate it, and enter a keystroke into the console application window of the TradingServiceHost executable to terminate the trading service.
Exercise 2 – Manage state within a service

In this exercise, you will use the Windows Communication Foundation’s facilities for managing state within a service.

Task 1 – Understand the Problem

27. Examine the code of the TradingSystem class in the TradingSystem.cs module of the TradingService project of the TradingService solution constructed in the previous exercise:

    using System;
    using System.Collections.Generic;
    using System.Text;

    namespace Fabrikam
    {
        public class TradingSystem: ITradingService
        {
            #region ITradingService Members

            string ITradingService.BeginDeal()
            {
                string dealIdentifier = Guid.NewGuid().ToString();
                return dealIdentifier;
            }

            void ITradingService.AddTrade(Trade trade)
            {
                Console.WriteLine(string.Format("Added trade for {0}",trade));
            }

            void ITradingService.AddFunction(string function)
            {
                Console.WriteLine(string.Format("Added function {0}",function));
            }

            decimal ITradingService.Calculate()
            {
                Decimal value = DateTime.Now.Millisecond/10;
                Console.WriteLine(string.Format("Calculated value as {0}",value));
                return value;
            }

            void ITradingService.Purchase()
            {
                Console.WriteLine("Purchased!");
            }

            void ITradingService.EndDeal()
            {
                Console.WriteLine("Completed deal.");
            }

            #endregion
        }
    }
By default, a new instance of the TradingSystem class will be created every time any of the class’s methods is invoked via the Windows Communication Foundation. Therefore, by default, if the AddTrade() method is invoked one or more times, and then the Purchase() method, none of the data received from the invocations of the AddTrade() method will be available in the execution of the Purchase() method.

Now, suppose that problem is overcome, and when the Purchase() method is invoked, all of the data received from preceding invocations of the AddTrade() method are available. An additional problem is that the client on behalf of which the Purchase() method is being invoked may not be the same client on behalf of which all of the calls to the AddTrade() method had been made, so one client’s activities might inadvertently result in the execution of trades defined by another client.

Furthermore, nothing in the code of the TradingSystem class ensures that the methods of the class are invoked in a valid sequence. Thus, not only is it possible for the Purchase() method to be invoked before the AddTrade() method has ever been invoked, but it is also possible for the EndDeal() method to be invoked before the BeginDeal() method has ever been invoked.

You will solve all of these problems quickly in the next few tasks. They will be tackled in reverse order.

Task 2 – Control the Sequence in which the Operations of the Trading Service are Invoked

28. Alter the code in the ITradingService.cs module of the TradingService project as shown here:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        [ServiceContract(Session=true)]
        public interface ITradingService
        {
            [OperationContract(IsInitiating=true,IsTerminating=false)]
            string BeginDeal();

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
            void AddTrade(Trade trade);

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
            void AddFunction(string function);

            [OperationContract(IsInitiating=false,IsTerminating=false)]
            decimal Calculate();

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=false)]
            void Purchase();

            [OperationContract(IsInitiating=false,IsTerminating=true,IsOneWay=true)]
            void EndDeal();
        }
    }
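The effect of the IsInitiating and IsTerminating settings in step 28 can be pictured as a small state machine. The sketch below is only an illustration of the rule, not how the Windows Communication Foundation implements it: the DealSequence class and its TryInvoke method are hypothetical names, and the operation names are hard-coded to match the contract above.

```csharp
using System;

// Hypothetical model of the sequencing rule; not part of the lab solution.
public sealed class DealSequence
{
    private enum State { NotStarted, Open, Closed }

    private State state = State.NotStarted;

    // Returns true if the named operation is acceptable at this point
    // in the session, and advances the session state accordingly.
    public bool TryInvoke(string operation)
    {
        bool initiating = operation == "BeginDeal";   // IsInitiating=true
        bool terminating = operation == "EndDeal";    // IsTerminating=true

        switch (state)
        {
            case State.NotStarted:
                // Only an initiating operation may start a session.
                if (!initiating) return false;
                state = State.Open;
                return true;

            case State.Open:
                // Any operation is accepted; a terminating one ends the session.
                if (terminating) state = State.Closed;
                return true;

            default:
                // Nothing is accepted once the session has ended.
                return false;
        }
    }
}
```

In this model, a call to Purchase before BeginDeal is rejected, and so is any call made after EndDeal, which are the two sequencing faults described in Task 1.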
Task 3 – Separate the Execution of Operations on behalf of Different Clients

29. Include the System.ServiceModel namespace in the TradingSystem.cs module of the TradingService project.

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

30. Alter the code in the TradingSystem.cs module of the TradingService project in this way:

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
    public class TradingSystem: ITradingService
Task 4 – Keep Track of Data between the Invocations of Operations on behalf of a Single Client

31. Add this DealData class to the Fabrikam namespace in the TradingSystem.cs module of the TradingService project:

    internal class DealData: IExtension<InstanceContext>
    {
        private string dealIdentifier = null;
        private List<Trade> trades = null;

        public DealData(string dealIdentifier)
        {
            this.dealIdentifier = dealIdentifier;
            this.trades = new List<Trade>();
        }

        public string DealIdentifier
        {
            get
            {
                return this.dealIdentifier;
            }
        }

        public void AddTrade(Trade trade)
        {
            trades.Add(trade);
        }

        public Trade[] Trades
        {
            get
            {
                return this.trades.ToArray();
            }
        }

        #region IExtension Members

        void IExtension<InstanceContext>.Attach(InstanceContext owner)
        {
        }

        void IExtension<InstanceContext>.Detach(InstanceContext owner)
        {
        }

        #endregion
    }
The DealData class defines the structure of the state information that will be stored between the invocations of the operations of the Derivatives Trading Service.

32. Alter the BeginDeal() method of the TradingSystem class in this way, to keep track of the identifier for a deal by storing it in a DealData object within the extensions to the Windows Communication Foundation’s InstanceContext:

    string ITradingService.BeginDeal()
    {
        string dealIdentifier = Guid.NewGuid().ToString();
        OperationContext.Current.InstanceContext.Extensions.Add(new DealData(dealIdentifier));
        Console.WriteLine("Started deal {0}",dealIdentifier);
        return dealIdentifier;
    }

33. Modify the AddTrade() method of the TradingSystem class to keep track of trades added to deals:

    void ITradingService.AddTrade(Trade trade)
    {
        DealData dealData = OperationContext.Current.InstanceContext.Extensions.Find<DealData>();
        dealData.AddTrade(trade);
        Console.WriteLine("Added trade for {0}",trade);
    }

34. Change the Purchase() method of the TradingSystem class so that it operates on the trades that have been added to a deal:

    void ITradingService.Purchase()
    {
        DealData dealData = OperationContext.Current.InstanceContext.Extensions.Find<DealData>();
        foreach(Trade trade in dealData.Trades)
        {
            Console.WriteLine("Purchased {0}",trade);
        }
    }

35. Alter the EndDeal() method to use the deal identifier that was stored by the BeginDeal() method:

    void ITradingService.EndDeal()
    {
        DealData dealData = OperationContext.Current.InstanceContext.Extensions.Find<DealData>();
        Console.WriteLine("Completed deal: {0}",dealData.DealIdentifier);
    }
Task 5 – Make the Client Compatible with the Modified Derivatives Trading Service

36. Alter the interface, ITradingService, in the ITradingService.cs module of the Client project in the TradingService solution, in this way:

    [ServiceContract(Session=true)]
    public interface ITradingService
    {
        [OperationContract(IsInitiating=true,IsTerminating=false)]
        string BeginDeal();

        [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
        void AddTrade(Trade trade);

        [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
        void AddFunction(string function);

        [OperationContract(IsInitiating=false,IsTerminating=false)]
        decimal Calculate();

        [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=false)]
        void Purchase();

        [OperationContract(IsInitiating=false,IsTerminating=true,IsOneWay=true)]
        void EndDeal();
    }
Task 6 – Test the Enhanced Solution

37. Choose Debug and then Start Debugging from the Visual Studio menus.

38. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 2.1, below, indicating that the service is keeping track of a deal over a sequence of requests from the client. The numbers shown in your console application windows may vary slightly from the numbers shown in the figures, due to variations in the prevailing market conditions over time.

39. Enter a keystroke into the console application window of the Client executable to terminate it, and enter a keystroke into the console application window of the TradingServiceHost executable to terminate the trading service.

Figure 2.1 Output from the enhanced Trading Service
Exercise 3 – Experiment with Reliable Sessions

In this exercise, you will witness the Windows Communication Foundation’s reliable session capabilities.

Task 1 – Understand the Problem

40. Examine the ITradingService interface in the TradingService project.

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        [ServiceContract(Session=true)]
        public interface ITradingService
        {
            [OperationContract(IsInitiating=true,IsTerminating=false)]
            string BeginDeal();

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
            void AddTrade(Trade trade);

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
            void AddFunction(string function);

            [OperationContract(IsInitiating=false,IsTerminating=false)]
            decimal Calculate();

            [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=false)]
            void Purchase();

            [OperationContract(IsInitiating=false,IsTerminating=true,IsOneWay=true)]
            void EndDeal();
        }
    }
The IsInitiating and IsTerminating parameters that we have provided for the [OperationContract] attribute ensure that any sequence of invocations of the operations defined by the interface begins with an invocation of the BeginDeal() method, and ends with an invocation of the EndDeal() method. However, if a client invokes the BeginDeal() method, and then the AddTrade() method twice, followed by the Purchase() method and then the EndDeal() method, but one of the invocations of the AddTrade() method never reaches the service, the operations of the service will still be invoked in a valid sequence, and the missing AddTrade() invocation will go unnoticed. Similarly, if the invocations of the AddTrade() method both arrive, but one is delayed and arrives after the invocation of the Purchase() method, then, once again, the operations of the service will still be invoked in a valid sequence, but not the sequence that the client intended.

Fortunately, with the Windows Communication Foundation, guarding against these problems is very easy. In the next task, you will configure reliable sessions to ensure that no messages are lost and that messages are delivered in the order in which they were sent.

Task 2 – Turn Reliable Sessions On

41. Alter the definition of the ITradingService interface in the TradingService project in this way:

    [ServiceContract(Session=true)]
    [DeliveryRequirements(RequireOrderedDelivery=true)]
    public interface ITradingService

42. Adjust the definition of the ITradingService interface in the Client project in the same way:

    [ServiceContract(Session=true)]
    [DeliveryRequirements(RequireOrderedDelivery=true)]
    public interface ITradingService

43. Modify the app.config file in the TradingServiceHost project of the TradingService solution, configuring the binding of the Derivatives Trading Service to incorporate the Windows Communication Foundation’s implementation of the WS-ReliableMessaging protocol:

    <configuration>
        <system.serviceModel>
            <services>
                <service name="Fabrikam.TradingSystem">
                    <endpoint address="TradingService"
                              binding="netTcpBinding"
                              bindingConfiguration="ReliableBinding"
                              contract="Fabrikam.ITradingService"/>
                </service>
            </services>
        </system.serviceModel>
    </configuration>

44. Make corresponding modifications to the app.config file in the Client project:

    <configuration>
        <system.serviceModel>
            <client>
                <endpoint name="TradingServiceConfiguration"
                          address="net.tcp://localhost:8001/TradingService"
                          binding="netTcpBinding"
                          bindingConfiguration="ReliableBinding"
                          contract="Client.ITradingService"/>
            </client>
        </system.serviceModel>
    </configuration>
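The endpoints in steps 43 and 44 refer to a binding configuration named ReliableBinding, whose definition does not appear in this copy of the steps. As a hedged sketch of what such a definition would need to switch on, here is the programmatic equivalent, assuming the NetTcpBinding API of the released .NET Framework (the Beta 2 API may differ). ReliableBindingSketch and Create are illustrative names that are not part of the lab.

```csharp
using System;
using System.ServiceModel;

// Hypothetical helper, not part of the lab solution.
public static class ReliableBindingSketch
{
    // Builds a TCP binding with a reliable, ordered session switched on.
    public static NetTcpBinding Create()
    {
        NetTcpBinding binding = new NetTcpBinding();

        // Detect and retransmit lost messages within the session.
        binding.ReliableSession.Enabled = true;

        // Deliver messages in the order in which they were sent, which is
        // what [DeliveryRequirements(RequireOrderedDelivery=true)] demands.
        binding.ReliableSession.Ordered = true;

        return binding;
    }
}
```

In app.config, the same settings would normally be expressed as a reliableSession element, with enabled and ordered attributes, inside a binding element named ReliableBinding under bindings/netTcpBinding. The host and the client each need a matching definition.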
Task 3 – Test the Enhanced Solution
45. Choose Debug and then Start Debugging from the Visual Studio menus.
46. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 2.1, above, while the output in the console application window of the Client should be similar to the output in figure 1.7. The numbers shown in your console application windows may vary slightly from the numbers shown in the figures, due to variations in the prevailing market conditions over time.
47. Enter a keystroke into the console application window of the Client executable to terminate it, and enter a keystroke into the console application window of the TradingServiceHost executable to terminate the trading service.
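Before transmissions are deliberately discarded in the next task, it may help to picture the kind of recovery a reliable messaging protocol performs. The following is a minimal sketch under stated assumptions, not the Windows Communication Foundation's implementation: LossyLink and RetransmissionDemo are hypothetical names, delivery is treated as an immediate acknowledgement, and the sender simply retries each message until it gets through. The drop test mirrors the one used by the MessageInterceptor class in Task 4.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only: these types are not part of WCF or of the lab solution.
public class LossyLink
{
    private readonly Random randomizer;
    private readonly int dropRate;
    private int dropped;

    public LossyLink(int seed, int dropRate)
    {
        this.randomizer = new Random(seed);
        this.dropRate = dropRate;
    }

    public int Dropped
    {
        get { return dropped; }
    }

    // Returns true when the message reaches the receiver, which stands in
    // for the sender getting an acknowledgement back.
    public bool TrySend(string message, IList<string> receiverLog)
    {
        // Next(10) yields 0 through 9; values from 0 through dropRate are discarded.
        if (dropRate != 0 && randomizer.Next(10) <= dropRate)
        {
            dropped++;
            return false;
        }
        receiverLog.Add(message);
        return true;
    }
}

public static class RetransmissionDemo
{
    // Sends each message, retrying until it is acknowledged or attempts run out.
    public static List<string> SendAll(string[] messages, LossyLink link, int maxAttempts)
    {
        List<string> receiverLog = new List<string>();
        foreach (string message in messages)
        {
            bool acknowledged = false;
            for (int attempt = 1; attempt <= maxAttempts && !acknowledged; attempt++)
            {
                acknowledged = link.TrySend(message, receiverLog);
            }
            if (!acknowledged)
            {
                throw new TimeoutException("No acknowledgement for " + message);
            }
        }
        return receiverLog;
    }

    public static void Main()
    {
        string[] deal = { "BeginDeal", "AddTrade", "AddTrade", "Purchase", "EndDeal" };
        LossyLink link = new LossyLink(42, 3);
        List<string> received = SendAll(deal, link, 100);
        Console.WriteLine(string.Join(", ", received.ToArray()));
        Console.WriteLine("Transmissions discarded: " + link.Dropped);
    }
}
```

Because a message is only logged once it gets through, the receiver sees every message exactly once and in order, however many transmissions the link discards along the way.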
Task 4 – Introduce Uncertainty for which Reliable Sessions can Compensate
By completing the previous task, you will have seen that the Derivatives Trading Service and its client can communicate with one another without error via a reliable messaging protocol. However, because you were not able to see transmissions between the client and the service going astray, you were also not able to see for yourself how the reliable messaging protocol would compensate for such mishaps. Now you will add some logic to intercept transmissions from the client and randomly and visibly discard some of them.
48. Add a new class module called InterceptorBinding.cs to the Client project, and replace its default contents with this code:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.ComponentModel.Design.Serialization;
using System.Configuration;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Configuration;
using System.Text;
using System.Threading;
using System.Xml;
namespace Client
{
    public class MessageInterceptor : IMessageInterceptor
    {
        private Random randomizer = null;
        private int dropRate = 3;

        public MessageInterceptor()
        {
            this.randomizer = new Random(DateTime.Now.Millisecond);
        }

        public void ProcessSend(ref Message message)
        {
            string action = message.Headers.Action;
            Console.WriteLine(string.Format("Voting on message {0} ...", action));
            // Only messages addressed to the trading service contract are candidates for dropping.
            if (!(action.Contains("ITradingService")))
            {
                return;
            }
            // Next(10) returns 0 through 9, so with dropRate = 3 the values 0, 1, 2 and 3
            // (about 4 messages in 10) are dropped.
            int randomNumber = this.randomizer.Next(10);
            if ((this.dropRate != 0) && (randomNumber <= this.dropRate))
            {
                Console.WriteLine("Dropping message.");
                message = null;
                return;
            }
            Console.WriteLine("Forwarding message.");
        }

        public void ProcessReceive(ref Message message)
        {
        }
    }

    public interface IMessageInterceptor
    {
        void ProcessReceive(ref Message message);
        void ProcessSend(ref Message message);
    }

    public class InterceptorBindingElement : BindingElement
    {
        IMessageInterceptor interceptor;

        public InterceptorBindingElement()
            : this(new MessageInterceptor())
        {
        }

        public InterceptorBindingElement(IMessageInterceptor interceptor)
        {
            this.interceptor = interceptor;
        }

        protected InterceptorBindingElement(InterceptorBindingElement other)
            : base(other)
        {
            this.interceptor = other.interceptor;
        }

        public IMessageInterceptor Interceptor
        {
            get { return this.interceptor; }
            set { this.interceptor = value; }
        }

        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            InterceptorChannelFactory<TChannel> result = new InterceptorChannelFactory<TChannel>(this.interceptor, context.Binding);
            result.InnerChannelFactory = context.BuildInnerChannelFactory<TChannel>();
            return result;
        }

        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            InterceptorChannelListener<TChannel> result = new InterceptorChannelListener<TChannel>(this.interceptor, context.Binding);
            result.InnerChannelListener = context.BuildInnerChannelListener<TChannel>();
            return result;
        }

        public override BindingElement Clone()
        {
            return new InterceptorBindingElement(this);
        }

        public override T GetProperty<T>(BindingContext context)
        {
            return context.GetInnerProperty<T>();
        }
    }

    class InterceptorChannelBase<TChannel> : ChannelBase
        where TChannel : IChannel
    {
        TChannel innerChannel;
        IMessageInterceptor interceptor;

        protected InterceptorChannelBase(ChannelManagerBase manager, TChannel innerChannel, IMessageInterceptor interceptor)
            : base(manager)
        {
            this.innerChannel = innerChannel;
            this.interceptor = interceptor;
        }

        internal TChannel InnerChannel
        {
            get { return this.innerChannel; }
        }

        internal IMessageInterceptor Interceptor
        {
            get { return this.interceptor; }
        }

        protected override void OnAbort()
        {
            this.innerChannel.Abort();
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannel.BeginClose(timeout, callback, state);
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannel.BeginOpen(timeout, callback, state);
        }

        protected override void OnClose(TimeSpan timeout)
        {
            this.innerChannel.Close(timeout);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            this.innerChannel.EndClose(result);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            this.innerChannel.EndOpen(result);
        }

        protected override void OnOpen(TimeSpan timeout)
        {
            this.innerChannel.Open(timeout);
        }

        internal virtual void OnDropMessage()
        {
        }
    }

    class NullMessageInterceptor : IMessageInterceptor
    {
        public void ProcessSend(ref Message message)
        {
        }

        public void ProcessReceive(ref Message message)
        {
        }
    }

    class InterceptorChannelFactory<TChannel> : ChannelFactoryBase<TChannel>
    {
        IMessageInterceptor interceptor;
        IChannelFactory<TChannel> innerChannelFactory;

        public InterceptorChannelFactory(IMessageInterceptor interceptor, IDefaultCommunicationTimeouts timeouts)
            : base(timeouts)
        {
            if (interceptor == null)
            {
                this.interceptor = new NullMessageInterceptor();
            }
            else
            {
                this.interceptor = interceptor;
            }
        }

        internal IChannelFactory<TChannel> InnerChannelFactory
        {
            get { return this.innerChannelFactory; }
            set { this.innerChannelFactory = value; }
        }

        public IMessageInterceptor Interceptor
        {
            get { return interceptor; }
        }

        public override MessageVersion MessageVersion
        {
            get { return this.innerChannelFactory.MessageVersion; }
        }

        public override string Scheme
        {
            get { return this.innerChannelFactory.Scheme; }
        }

        protected override void OnOpen(TimeSpan timeout)
        {
            this.innerChannelFactory.Open(timeout);
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelFactory.BeginOpen(timeout, callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            this.innerChannelFactory.EndOpen(result);
        }

        protected override void OnClose(TimeSpan timeout)
        {
            this.innerChannelFactory.Close(timeout);
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelFactory.BeginClose(timeout, callback, state);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            this.innerChannelFactory.EndClose(result);
        }

        protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
        {
            TChannel innerChannel = this.InnerChannelFactory.CreateChannel(remoteAddress, via);
            if (typeof(TChannel) == typeof(IOutputChannel))
            {
                return (TChannel)(object)new InterceptorOutputChannel(this, this.interceptor, (IOutputChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IRequestChannel))
            {
                return (TChannel)(object)new InterceptorRequestChannel(this, this.interceptor, (IRequestChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IDuplexChannel))
            {
                return (TChannel)(object)new InterceptorDuplexChannel(this, (IDuplexChannel)innerChannel, this.interceptor);
            }
            else if (typeof(TChannel) == typeof(IOutputSessionChannel))
            {
                return (TChannel)(object)new InterceptorOutputSessionChannel(this, this.interceptor, (IOutputSessionChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IRequestSessionChannel))
            {
                return (TChannel)(object)new InterceptorRequestSessionChannel(this, this.interceptor, (IRequestSessionChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IDuplexSessionChannel))
            {
                return (TChannel)(object)new InterceptorDuplexSessionChannel(this, (IDuplexSessionChannel)innerChannel, this.interceptor);
            }
            throw new ArgumentException("Channel type is not supported.", "TChannel");
        }

        class InterceptorOutputChannel : InterceptorChannelBase<IOutputChannel>, IOutputChannel
        {
            public InterceptorOutputChannel(ChannelFactoryBase<TChannel> factory, IMessageInterceptor interceptor, IOutputChannel innerChannel)
                : base(factory, innerChannel, interceptor)
            {
            }

            public EndpointAddress RemoteAddress
            {
                get { return this.InnerChannel.RemoteAddress; }
            }

            public Uri Via
            {
                get { return this.InnerChannel.Via; }
            }

            public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new SendAsyncResult<IOutputChannel>(this, message, callback, state);
            }

            public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new SendAsyncResult<IOutputChannel>(this, message, timeout, callback, state);
            }

            public void EndSend(IAsyncResult result)
            {
                SendAsyncResult<IOutputChannel>.End(result);
            }

            public void Send(Message message)
            {
                ThrowIfDisposedOrNotOpen();
                Interceptor.ProcessSend(ref message);
                if (message == null)
                {
                    OnDropMessage();
                }
                else
                {
                    this.InnerChannel.Send(message);
                }
            }

            public void Send(Message message, TimeSpan timeout)
            {
                ThrowIfDisposedOrNotOpen();
                Interceptor.ProcessSend(ref message);
                if (message == null)
                {
                    OnDropMessage();
                }
                else
                {
                    this.InnerChannel.Send(message, timeout);
                }
            }
        }

        class InterceptorRequestChannel : InterceptorChannelBase<IRequestChannel>, IRequestChannel
        {
            public InterceptorRequestChannel(ChannelFactoryBase<TChannel> factory, IMessageInterceptor interceptor, IRequestChannel innerChannel)
                : base(factory, innerChannel, interceptor)
            {
            }

            public EndpointAddress RemoteAddress
            {
                get { return this.InnerChannel.RemoteAddress; }
            }

            public Uri Via
            {
                get { return this.InnerChannel.Via; }
            }

            public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new RequestAsyncResult(this, message, callback, state);
            }

            public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new RequestAsyncResult(this, message, timeout, callback, state);
            }

            public Message EndRequest(IAsyncResult result)
            {
                return RequestAsyncResult.End(result);
            }

            public Message Request(Message message)
            {
                ThrowIfDisposedOrNotOpen();
                Interceptor.ProcessSend(ref message);
                if (message == null)
                {
                    OnDropMessage();
                    return Message.CreateMessage(MessageVersion.Soap12WSAddressing10,
                        new DroppedMessageFault("Request Message dropped by interceptor."),
                        "http://www.w3.org/2005/08/addressing/fault");
                }
                else
                {
                    Message reply = this.InnerChannel.Request(message);
                    Interceptor.ProcessReceive(ref reply);
                    if (reply == null)
                    {
                        OnDropMessage();
                        return Message.CreateMessage(MessageVersion.Soap12WSAddressing10,
                            new DroppedMessageFault("Reply Message dropped by the interceptor."),
                            "http://www.w3.org/2005/08/addressing/fault");
                    }
                    else
                    {
                        return reply;
                    }
                }
            }

            public Message Request(Message message, TimeSpan timeout)
            {
                ThrowIfDisposedOrNotOpen();
                Interceptor.ProcessSend(ref message);
                if (message == null)
                {
                    OnDropMessage();
                    // The interceptor has set message to null, so message.Version cannot be
                    // read here; use the same version as the overload above.
                    return Message.CreateMessage(MessageVersion.Soap12WSAddressing10,
                        new DroppedMessageFault("Request Message dropped by interceptor."),
                        "http://www.w3.org/2005/08/addressing/fault");
                }
                else
                {
                    Message reply = this.InnerChannel.Request(message, timeout);
                    Interceptor.ProcessReceive(ref reply);
                    if (reply == null)
                    {
                        OnDropMessage();
                        return Message.CreateMessage(message.Version,
                            new DroppedMessageFault("Reply Message dropped by the interceptor."),
                            "http://www.w3.org/2005/08/addressing/fault");
                    }
                    else
                    {
                        return reply;
                    }
                }
            }
        }

        class InterceptorOutputSessionChannel : InterceptorOutputChannel, IOutputSessionChannel
        {
            IOutputSessionChannel innerSessionChannel;

            public InterceptorOutputSessionChannel(ChannelFactoryBase<TChannel> factory, IMessageInterceptor interceptor, IOutputSessionChannel innerChannel)
                : base(factory, interceptor, innerChannel)
            {
                this.innerSessionChannel = innerChannel;
            }

            public IOutputSession Session
            {
                get { return innerSessionChannel.Session; }
            }

            internal override void OnDropMessage()
            {
                Fault();
                innerSessionChannel.Abort();
            }
        }

        class InterceptorRequestSessionChannel : InterceptorRequestChannel, IRequestSessionChannel
        {
            IRequestSessionChannel innerSessionChannel;

            public InterceptorRequestSessionChannel(ChannelFactoryBase<TChannel> factory, IMessageInterceptor interceptor, IRequestSessionChannel innerChannel)
                : base(factory, interceptor, innerChannel)
            {
                this.innerSessionChannel = innerChannel;
            }

            public IOutputSession Session
            {
                get { return innerSessionChannel.Session; }
            }

            internal override void OnDropMessage()
            {
                Fault();
                innerSessionChannel.Abort();
            }
        }
    }

    class InterceptorConverter : TypeConverter
    {
        public override bool CanConvertFrom(ITypeDescriptorContext context, Type sourceType)
        {
            if (typeof(string) == sourceType)
            {
                return true;
            }
            return base.CanConvertFrom(context, sourceType);
        }

        public override bool CanConvertTo(ITypeDescriptorContext context, Type destinationType)
        {
            if (typeof(InstanceDescriptor) == destinationType)
            {
                return true;
            }
            return base.CanConvertTo(context, destinationType);
        }

        public override object ConvertFrom(ITypeDescriptorContext context, System.Globalization.CultureInfo culture, object value)
        {
            if (value is string)
            {
                if (null == value)
                {
                    throw new ArgumentNullException("value");
                }
                Type type = Type.GetType((string)value, true); // fusion
                object retval = Activator.CreateInstance(type);
                return retval;
            }
            return base.ConvertFrom(context, culture, value);
        }

        public override object ConvertTo(ITypeDescriptorContext context, System.Globalization.CultureInfo culture, object value, Type destinationType)
        {
            if (typeof(string) == destinationType && value is IMessageInterceptor)
            {
                if (null == value)
                {
                    throw new ArgumentNullException("value");
                }
                return value.GetType().Namespace;
            }
            return base.ConvertTo(context, culture, value, destinationType);
        }
    }

    class InterceptorDuplexChannel : InterceptorChannelBase<IDuplexChannel>, IDuplexChannel
    {
        public InterceptorDuplexChannel(ChannelManagerBase manager, IDuplexChannel innerChannel, IMessageInterceptor interceptor)
            : base(manager, innerChannel, interceptor)
        {
        }

        public EndpointAddress LocalAddress
        {
            get { return this.InnerChannel.LocalAddress; }
        }

        public EndpointAddress RemoteAddress
        {
            get { return this.InnerChannel.RemoteAddress; }
        }

        public Uri Via
        {
            get { return this.InnerChannel.Via; }
        }

        public IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            return this.BeginReceive(DefaultReceiveTimeout, callback, state);
        }

        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.BeginTryReceive(timeout, callback, state);
        }

        public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
        {
            return this.BeginSend(message, DefaultSendTimeout, callback, state);
        }

        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfDisposedOrNotOpen();
            return new SendAsyncResult<IDuplexChannel>(this, message, timeout, callback, state);
        }

        public Message EndReceive(IAsyncResult result)
        {
            Message message;
            this.EndTryReceive(result, out message);
            return message;
        }

        public void EndSend(IAsyncResult result)
        {
            SendAsyncResult<IDuplexChannel>.End(result);
        }

        public bool TryReceive(TimeSpan timeout, out Message message)
        {
            ThrowIfDisposedOrNotOpen();
            // Keep receiving until a message survives the interceptor or the inner channel gives up.
            do
            {
                if (this.InnerChannel.TryReceive(timeout, out message))
                {
                    if (message == null)
                    {
                        return true;
                    }
                    else
                    {
                        Interceptor.ProcessReceive(ref message);
                        if (message == null)
                        {
                            OnDropMessage();
                        }
                    }
                }
                else
                {
                    return false;
                }
            } while (message == null);
            return true;
        }

        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfDisposedOrNotOpen();
            return new TryReceiveAsyncResult<IDuplexChannel>(this, timeout, callback, state);
        }

        public bool EndTryReceive(IAsyncResult result, out Message message)
        {
            message = TryReceiveAsyncResult<IDuplexChannel>.End(result);
            return true;
        }

        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.InnerChannel.WaitForMessage(timeout);
        }

        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
        }

        public bool EndWaitForMessage(IAsyncResult result)
        {
            return this.InnerChannel.EndWaitForMessage(result);
        }

        public Message Receive()
        {
            return this.Receive(DefaultReceiveTimeout);
        }

        public Message Receive(TimeSpan timeout)
        {
            Message message;
            if (this.TryReceive(timeout, out message))
            {
                return message;
            }
            else
            {
                throw new TimeoutException("Receive timed out.");
            }
        }

        public void Send(Message message)
        {
            this.Send(message, DefaultSendTimeout);
        }

        public void Send(Message message, TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen();
            Interceptor.ProcessSend(ref message);
            if (message == null)
            {
                OnDropMessage();
            }
            else
            {
                this.InnerChannel.Send(message, timeout);
            }
        }
    }

    class InterceptorDuplexSessionChannel : InterceptorDuplexChannel, IDuplexSessionChannel
    {
        IDuplexSessionChannel innerSessionChannel;

        public InterceptorDuplexSessionChannel(ChannelManagerBase manager, IDuplexSessionChannel innerChannel, IMessageInterceptor interceptor)
            : base(manager, innerChannel, interceptor)
        {
            this.innerSessionChannel = innerChannel;
        }

        public IDuplexSession Session
        {
            get { return innerSessionChannel.Session; }
        }

        internal override void OnDropMessage()
        {
            //Fault();
            //innerSessionChannel.Abort();
        }
    }

    class InterceptorChannelListener<TChannel> : ChannelListenerBase<TChannel>
        where TChannel : class, IChannel
    {
        IMessageInterceptor interceptor;

        public InterceptorChannelListener(IMessageInterceptor interceptor, IDefaultCommunicationTimeouts timeouts)
            : base(timeouts)
        {
            if (interceptor == null)
            {
                this.interceptor = new NullMessageInterceptor();
            }
            else
            {
                this.interceptor = interceptor;
            }
        }

        public new IChannelListener<TChannel> InnerChannelListener
        {
            get { return (IChannelListener<TChannel>)base.InnerChannelListener; }
            set { base.InnerChannelListener = value; }
        }

        public IMessageInterceptor Interceptor
        {
            get { return interceptor; }
        }

        public override Uri Uri
        {
            get { return GetInnerListenerSnapshot().Uri; }
        }

        public override MessageVersion MessageVersion
        {
            get { return GetInnerListenerSnapshot().MessageVersion; }
        }

        public override string Scheme
        {
            get { return GetInnerListenerSnapshot().Scheme; }
        }

        protected override void OnOpen(TimeSpan timeout)
        {
            this.InnerChannelListener.Open(timeout);
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannelListener.BeginOpen(timeout, callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            this.InnerChannelListener.EndOpen(result);
        }

        protected override void OnClose(TimeSpan timeout)
        {
            this.InnerChannelListener.Close(timeout);
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannelListener.BeginClose(timeout, callback, state);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            this.InnerChannelListener.EndClose(result);
        }

        void ThrowIfInnerListenerNotSet()
        {
            if (this.InnerChannelListener == null)
            {
                throw new InvalidOperationException("Inner listener is not set.");
            }
        }

        IChannelListener<TChannel> GetInnerListenerSnapshot()
        {
            IChannelListener<TChannel> innerChannelListener = this.InnerChannelListener;
            if (innerChannelListener == null)
            {
                throw new InvalidOperationException("Inner listener is not set.");
            }
            return innerChannelListener;
        }

        protected override TChannel OnAcceptChannel(TimeSpan timeout)
        {
            TChannel innerChannel = this.InnerChannelListener.AcceptChannel(timeout);
            return OnAcceptChannel(innerChannel);
        }

        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannelListener.BeginAcceptChannel(timeout, callback, state);
        }

        protected override TChannel OnEndAcceptChannel(IAsyncResult result)
        {
            TChannel innerChannel = this.InnerChannelListener.EndAcceptChannel(result);
            return OnAcceptChannel(innerChannel);
        }

        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            return this.InnerChannelListener.WaitForChannel(timeout);
        }

        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannelListener.BeginWaitForChannel(timeout, callback, state);
        }

        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            return this.InnerChannelListener.EndWaitForChannel(result);
        }

        TChannel OnAcceptChannel(TChannel innerChannel)
        {
            if (innerChannel == null)
            {
                return null;
            }
            if (typeof(TChannel) == typeof(IInputChannel))
            {
                return (TChannel)(object)new InterceptorInputChannel(this, (IInputChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IReplyChannel))
            {
                return (TChannel)(object)new InterceptorReplyChannel(this, (IReplyChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IDuplexChannel))
            {
                return (TChannel)(object)new InterceptorDuplexChannel(this, (IDuplexChannel)innerChannel, this.interceptor);
            }
            else if (typeof(TChannel) == typeof(IInputSessionChannel))
            {
                return (TChannel)(object)new InterceptorInputSessionChannel(this, (IInputSessionChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IReplySessionChannel))
            {
                return (TChannel)(object)new InterceptorReplySessionChannel(this, (IReplySessionChannel)innerChannel);
            }
            else if (typeof(TChannel) == typeof(IDuplexSessionChannel))
            {
                return (TChannel)(object)new InterceptorDuplexSessionChannel(this, (IDuplexSessionChannel)innerChannel, this.interceptor);
            }
            // Cannot wrap this channel.
            return innerChannel;
        }

        class InterceptorInputChannel : InterceptorChannelBase<IInputChannel>, IInputChannel
        {
            InterceptorChannelListener<TChannel> listener;

            public InterceptorInputChannel(InterceptorChannelListener<TChannel> listener, IInputChannel innerChannel)
                : base(listener, innerChannel, listener.interceptor)
            {
                this.listener = listener;
            }

            public EndpointAddress LocalAddress
            {
                get { return this.InnerChannel.LocalAddress; }
            }

            public IAsyncResult BeginReceive(AsyncCallback callback, object state)
            {
                return this.BeginReceive(listener.DefaultReceiveTimeout, callback, state);
            }

            public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.BeginTryReceive(timeout, callback, state);
            }

            public Message EndReceive(IAsyncResult result)
            {
                Message message;
                this.EndTryReceive(result, out message);
                return message;
            }

            public bool TryReceive(TimeSpan timeout, out Message message)
            {
                ThrowIfDisposedOrNotOpen();
                // Keep receiving until a message survives the interceptor or the inner channel gives up.
                do
                {
                    if (this.InnerChannel.TryReceive(timeout, out message))
                    {
                        if (message == null)
                        {
                            return true;
                        }
                        else
                        {
                            Interceptor.ProcessReceive(ref message);
                            if (message == null)
                            {
                                OnDropMessage();
                            }
                        }
                    }
                    else
                    {
                        return false;
                    }
                } while (message == null);
                return true;
            }

            public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new TryReceiveAsyncResult<IInputChannel>(this, timeout, callback, state);
            }

            public bool EndTryReceive(IAsyncResult result, out Message message)
            {
                message = TryReceiveAsyncResult<IInputChannel>.End(result);
                return true;
            }

            public bool WaitForMessage(TimeSpan timeout)
            {
                return this.InnerChannel.WaitForMessage(timeout);
            }

            public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
            }

            public bool EndWaitForMessage(IAsyncResult result)
            {
                return this.InnerChannel.EndWaitForMessage(result);
            }

            public Message Receive()
            {
                return this.Receive(listener.DefaultReceiveTimeout);
            }

            public Message Receive(TimeSpan timeout)
            {
                Message message;
                if (this.TryReceive(timeout, out message))
                {
                    return message;
                }
                else
                {
                    throw new TimeoutException("Receive timed out.");
                }
            }
        }

        class InterceptorReplyChannel : InterceptorChannelBase<IReplyChannel>, IReplyChannel
        {
            InterceptorChannelListener<TChannel> listener;

            public InterceptorReplyChannel(InterceptorChannelListener<TChannel> listener, IReplyChannel innerChannel)
                : base(listener, innerChannel, listener.interceptor)
            {
                this.listener = listener;
            }

            public EndpointAddress LocalAddress
            {
                get { return this.InnerChannel.LocalAddress; }
            }

            public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
            {
                return this.BeginReceiveRequest(listener.DefaultReceiveTimeout, callback, state);
            }

            public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.BeginTryReceiveRequest(timeout, callback, state);
            }

            public IRequestContext EndReceiveRequest(IAsyncResult result)
            {
                IRequestContext requestContext;
                this.EndTryReceiveRequest(result, out requestContext);
                return requestContext;
            }

            public bool TryReceiveRequest(TimeSpan timeout, out IRequestContext requestContext)
            {
                ThrowIfDisposedOrNotOpen();
                Message resultMessage;
                IRequestContext innerRequestContext;
                requestContext = null;
                do
                {
                    if (this.InnerChannel.TryReceiveRequest(timeout, out innerRequestContext))
                    {
                        if (innerRequestContext == null)
                        {
                            return true;
                        }
                        else
                        {
                            resultMessage = innerRequestContext.RequestMessage;
                            Interceptor.ProcessReceive(ref resultMessage);
                            if (resultMessage == null)
                            {
                                OnDropMessage();
                            }
                            else
                            {
                                requestContext = new InterceptorRequestContext(resultMessage, this, innerRequestContext, timeout);
                            }
                        }
                    }
                    else
                    {
                        return false;
                    }
                } while (resultMessage == null);
                return true;
            }

            public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
            {
                ThrowIfDisposedOrNotOpen();
                return new TryReceiveRequestAsyncResult(this, timeout, callback, state);
            }

            public bool EndTryReceiveRequest(IAsyncResult result, out IRequestContext requestContext)
            {
                IRequestContext context;
                Message message = TryReceiveRequestAsyncResult.End(result, out context);
                if (context == null)
                {
                    requestContext = null;
                }
                else
                {
                    requestContext = new InterceptorRequestContext(message, this, context, listener.DefaultSendTimeout);
                }
                return true;
            }

            public bool WaitForRequest(TimeSpan timeout)
            {
                return this.InnerChannel.WaitForRequest(timeout);
            }

            public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.InnerChannel.BeginWaitForRequest(timeout, callback, state);
            }

            public bool EndWaitForRequest(IAsyncResult result)
            {
                return this.InnerChannel.EndWaitForRequest(result);
            }

            public IRequestContext ReceiveRequest()
            {
                return this.ReceiveRequest(listener.DefaultReceiveTimeout);
            }

            public IRequestContext ReceiveRequest(TimeSpan timeout)
            {
                IRequestContext requestContext;
                if (this.TryReceiveRequest(timeout, out requestContext))
                {
                    return requestContext;
                }
                else
                {
                    throw new TimeoutException("Receive request timed out.");
                }
            }

            class InterceptorRequestContext : IRequestContext
            {
                Message message;
                InterceptorReplyChannel channel;
                IRequestContext innerContext;
                TimeSpan defaultSendTimeout;

                public InterceptorRequestContext(Message message, InterceptorReplyChannel channel, IRequestContext innerContext, TimeSpan defaultSendTimeout)
                {
                    this.channel = channel;
                    this.innerContext = innerContext;
                    this.message = message;
                    this.defaultSendTimeout = defaultSendTimeout;
                }

                public Message RequestMessage
                {
                    get { return this.message; }
                }

                public void Abort()
                {
                    this.innerContext.Abort();
                }

                public IAsyncResult BeginReply(Message message, AsyncCallback callback, object state)
                {
                    return this.BeginReply(message, defaultSendTimeout, callback, state);
                }

                public IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
                {
                    return new ReplyAsyncResult(channel, message, timeout, innerContext, callback, state);
                }

                public void Close()
                {
                    this.innerContext.Close();
                }

                public void Close(TimeSpan timeout)
                {
                    this.innerContext.Close(timeout);
                }

                public void Dispose()
                {
                    this.innerContext.Dispose();
                }

                public void EndReply(IAsyncResult result)
                {
                    ReplyAsyncResult.End(result);
                }

                public void Reply(Message message)
                {
                    this.Reply(message, defaultSendTimeout);
                }

                public void Reply(Message message, TimeSpan timeout)
                {
                    channel.Interceptor.ProcessSend(ref message);
                    if (message == null)
                    {
                        channel.OnDropMessage();
                    }
                    else
                    {
                        this.innerContext.Reply(message, timeout);
                    }
                }
            }
        }

        class InterceptorInputSessionChannel : InterceptorInputChannel, IInputSessionChannel
        {
            IInputSessionChannel innerSessionChannel;

            public InterceptorInputSessionChannel(InterceptorChannelListener<TChannel> listener, IInputSessionChannel innerChannel)
                : base(listener, innerChannel)
            {
                this.innerSessionChannel = innerChannel;
            }

            public IInputSession Session
            {
                get { return innerSessionChannel.Session; }
            }

            internal override void OnDropMessage()
            {
                Fault();
                innerSessionChannel.Abort();
            }
        }

        class InterceptorReplySessionChannel : InterceptorReplyChannel, IReplySessionChannel
        {
            IReplySessionChannel innerSessionChannel;

            public InterceptorReplySessionChannel(InterceptorChannelListener<TChannel> listener, IReplySessionChannel innerChannel)
                : base(listener, innerChannel)
            {
                this.innerSessionChannel = innerChannel;
            }

            public IInputSession Session
            {
                get { return innerSessionChannel.Session; }
            }

            internal override void OnDropMessage()
            {
                Fault();
                innerSessionChannel.Abort();
            }
        }
    }

    class InterceptorSection : BindingElementExtensionSection
    {
        const string ClassType = "type";

        public override Type BindingElementType
        {
            get { return typeof(InterceptorBindingElement); }
        }

        [ConfigurationProperty(ClassType)]
        [TypeConverter(typeof(InterceptorConverter))]
        public IMessageInterceptor Interceptor
        {
            get { return (IMessageInterceptor)base[ClassType]; }
            set { base[ClassType] = value; }
        }

        protected override BindingElement CreateBindingElement()
        {
            InterceptorBindingElement bindingElement = new InterceptorBindingElement((IMessageInterceptor)base[ClassType]);
            return bindingElement;
        }
    }

    class ReplyAsyncResult : AsyncResult
    {
        Message message;
        TimeSpan timeout;
        IRequestContext innerContext;
        InterceptorChannelBase<IReplyChannel> channel;

        public ReplyAsyncResult(InterceptorChannelBase<IReplyChannel> channel, Message message, TimeSpan timeout, IRequestContext innerContext, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.message = message;
            this.timeout = timeout;
            this.innerContext = innerContext;
            this.channel = channel;
            channel.Interceptor.ProcessSend(ref this.message);
            if (this.message == null)
            {
                channel.OnDropMessage();
                this.message = Message.CreateMessage(message.Version,
                    new DroppedMessageFault("Reply Message dropped by interceptor."),
                    "http://www.w3.org/2005/08/addressing/fault");
                Complete(true);
            }
            else
            {
                innerContext.BeginReply(this.message, new AsyncCallback(HandleCallback), null);
            }
        }

        void HandleCallback(IAsyncResult asyncResult)
        {
            try
            {
                innerContext.EndReply(asyncResult);
                Complete(false);
            }
            catch (Exception e)
            {
                Complete(false, e);
            }
        }

        public new WaitHandle AsyncWaitHandle
        {
            get { return base.AsyncWaitHandle; }
        }

        public new bool IsCompleted
        {
            get { return base.IsCompleted; }
        }

        public new bool CompletedSynchronously
        {
            get { return base.CompletedSynchronously; }
        }

        public static void End(IAsyncResult result)
        {
            if (result == null)
            {
                throw new ArgumentNullException("result");
            }
            ReplyAsyncResult requestResult = result as ReplyAsyncResult;
            if (requestResult == null)
            {
                throw new ArgumentException("Invalid AsyncResult", "result");
            }
            AsyncResult.End(requestResult);
        }
    }

    class RequestAsyncResult : AsyncResult
    {
        Message message;
        TimeSpan timeout;
        InterceptorChannelBase<IRequestChannel> channel;

        public RequestAsyncResult(InterceptorChannelBase<IRequestChannel> channel, Message message, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.message = message;
            this.channel = channel;
            channel.Interceptor.ProcessSend(ref this.message);
            if (this.message == null)
            {
                channel.OnDropMessage();
                this.message = Message.CreateMessage(MessageVersion.Soap12WSAddressing10,
                    new DroppedMessageFault("Request Message dropped by interceptor."),
                    "http://www.w3.org/2005/08/addressing/fault");
                Complete(true);
            }
            else
            {
                channel.InnerChannel.BeginRequest(this.message, new AsyncCallback(HandleCallback), null);
            }
        }

        public RequestAsyncResult(InterceptorChannelBase<IRequestChannel> channel, Message message, TimeSpan timeout, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.message = message;
            this.timeout = timeout;
            this.channel = channel;
            channel.Interceptor.ProcessSend(ref this.message);
            if (this.message == null)
            {
                channel.OnDropMessage();
                this.message = Message.CreateMessage(message.Version,
                    new DroppedMessageFault("Request Message dropped by interceptor."),
                    "http://www.w3.org/2005/08/addressing/fault");
                Complete(true);
            }
            else
            {
                channel.InnerChannel.BeginRequest(this.message, timeout, new AsyncCallback(HandleCallback), null);
            }
        }

        void HandleCallback(IAsyncResult asyncResult)
        {
            try
            {
                Message reply = channel.InnerChannel.EndRequest(asyncResult);
                channel.Interceptor.ProcessReceive(ref reply);
                if (reply == null)
                {
                    channel.OnDropMessage();
                    this.message = Message.CreateMessage(MessageVersion.Soap12WSAddressing10,
                        new DroppedMessageFault("Reply Message dropped by interceptor."),
                        "http://www.w3.org/2005/08/addressing/fault");
                }
                else
                {
                    this.message = reply;
                }
                Complete(false);
            }
            catch (Exception e)
            {
                Complete(false, e);
            }
        }

        public new WaitHandle AsyncWaitHandle
        {
            get { return base.AsyncWaitHandle; }
        }

        public new bool IsCompleted
        {
            get { return base.IsCompleted; }
        }

        public new bool CompletedSynchronously
        {
            get { return base.CompletedSynchronously; }
        }

        public static Message End(IAsyncResult result)
        {
            if (result == null)
            {
                throw new ArgumentNullException("result");
            }
            RequestAsyncResult requestResult = result as RequestAsyncResult;
            if (requestResult == null)
            {
                throw new ArgumentException("Invalid AsyncResult", "result");
            }
            AsyncResult.End(requestResult);
            return requestResult.message;
        }
} class SendAsyncResult : AsyncResult where TChannel : IOutputChannel { Message message; TimeSpan timeout; InterceptorChannelBase channel; Page 48

        public SendAsyncResult(InterceptorChannelBase<TChannel> channel, Message message, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.message = message;
            this.channel = channel;
            channel.Interceptor.ProcessSend(ref this.message);
            if (this.message == null)
            {
                // The interceptor dropped the message: nothing is sent.
                channel.OnDropMessage();
                Complete(true);
            }
            else
            {
                channel.InnerChannel.BeginSend(this.message, new AsyncCallback(HandleCallback), null);
            }
        }

        public SendAsyncResult(InterceptorChannelBase<TChannel> channel, Message message, TimeSpan timeout, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.message = message;
            this.timeout = timeout;
            this.channel = channel;
            channel.Interceptor.ProcessSend(ref this.message);
            if (this.message == null)
            {
                channel.OnDropMessage();
                Complete(true);
            }
            else
            {
                channel.InnerChannel.BeginSend(this.message, timeout, new AsyncCallback(HandleCallback), null);
            }
        }

        void HandleCallback(IAsyncResult asyncResult)
        {
            try
            {
                channel.InnerChannel.EndSend(asyncResult);
                Complete(false);
            }
            catch (Exception e)
            {
                Complete(false, e);
            }
        }

        public new WaitHandle AsyncWaitHandle { get { return base.AsyncWaitHandle; } }
        public new bool IsCompleted { get { return base.IsCompleted; } }
        public new bool CompletedSynchronously { get { return base.CompletedSynchronously; } }

        public static void End(IAsyncResult result)
        {
            if (result == null)
            {
                throw new ArgumentNullException("result");
            }
            SendAsyncResult<TChannel> outputResult = result as SendAsyncResult<TChannel>;
            if (outputResult == null)
            {
                throw new ArgumentException("Invalid AsyncResult", "result");
            }
            AsyncResult.End(outputResult);
        }
    }

    class TryReceiveAsyncResult<TChannel> : AsyncResult where TChannel : IInputChannel
    {
        Message message;
        TimeSpan timeout;
        InterceptorChannelBase<TChannel> channel;

        public TryReceiveAsyncResult(InterceptorChannelBase<TChannel> channel, TimeSpan timeout, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.channel = channel;
            this.timeout = timeout;
            channel.InnerChannel.BeginTryReceive(timeout, new AsyncCallback(HandleCallback), null);
        }

        void HandleCallback(IAsyncResult asyncResult)
        {
            Message message;
            try
            {
                if (channel.InnerChannel.EndTryReceive(asyncResult, out message))
                {
                    if (message == null)
                    {
                        this.message = null;
                        Complete(false);
                    }
                    else
                    {
                        channel.Interceptor.ProcessReceive(ref message);
                        if (message == null)
                        {
                            // The interceptor dropped the message: wait for the next one.
                            channel.OnDropMessage();
                            asyncResult = channel.InnerChannel.BeginTryReceive(timeout, new AsyncCallback(HandleCallback), null);
                        }
                        else
                        {
                            this.message = message;
                            Complete(false);
                        }
                    }
                }
                else
                {
                    Complete(false, new TimeoutException("Receive request timed out."));
                }
            }
            catch (Exception e)
            {
                Complete(false, e);
            }
        }

        public static Message End(IAsyncResult asyncResult)
        {
            if (asyncResult == null)
            {
                throw new ArgumentNullException("asyncResult");
            }
            TryReceiveAsyncResult<TChannel> inputResult = asyncResult as TryReceiveAsyncResult<TChannel>;
            if (inputResult == null)
            {
                throw new ArgumentException("Invalid AsyncResult", "asyncResult");
            }
            AsyncResult.End(inputResult);
            return inputResult.message;
        }
    }

    class TryReceiveRequestAsyncResult : AsyncResult
    {
        Message message;
        IRequestContext context;
        TimeSpan timeout;
        InterceptorChannelBase<IReplyChannel> channel;

        public TryReceiveRequestAsyncResult(InterceptorChannelBase<IReplyChannel> channel, TimeSpan timeout, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.timeout = timeout;
            this.channel = channel;
            channel.InnerChannel.BeginTryReceiveRequest(timeout, new AsyncCallback(HandleCallback), null);
        }

        void HandleCallback(IAsyncResult asyncResult)
        {
            IRequestContext context;
            try
            {
                if (channel.InnerChannel.EndTryReceiveRequest(asyncResult, out context))
                {
                    if (context == null)
                    {
                        this.message = null;
                        this.context = null;
                        Complete(false);
                    }
                    else
                    {
                        Message result = context.RequestMessage;
                        channel.Interceptor.ProcessReceive(ref result);
                        if (result == null)
                        {
                            // The interceptor dropped the request: wait for the next one.
                            // Do not complete here, because the pending receive will call
                            // back into this method and an AsyncResult may complete only once.
                            channel.OnDropMessage();
                            asyncResult = channel.InnerChannel.BeginTryReceiveRequest(timeout, new AsyncCallback(HandleCallback), null);
                        }
                        else
                        {
                            this.message = result;
                            this.context = context;
                            Complete(false);
                        }
                    }
                }
                else
                {
                    Complete(false, new TimeoutException("Receive request timed out."));
                }
            }
            catch (Exception e)
            {
                Complete(false, e);
            }
        }

        public static Message End(IAsyncResult asyncResult, out IRequestContext context)
        {
            if (asyncResult == null)
            {
                throw new ArgumentNullException("asyncResult");
            }
            TryReceiveRequestAsyncResult replyResult = asyncResult as TryReceiveRequestAsyncResult;
            if (replyResult == null)
            {
                throw new ArgumentException("Invalid AsyncResult", "asyncResult");
            }
            AsyncResult.End(replyResult);
            context = replyResult.context;
            return replyResult.message;
        }
    }

    class DroppedMessageFault : MessageFault
    {
        FaultReason faultReason;
        FaultCode faultCode;

        public DroppedMessageFault(string reason)
        {
            faultReason = new FaultReason(reason);
            faultCode = new FaultCode("DroppedMessageFault");
        }

        public override FaultCode Code { get { return faultCode; } }
        public override bool HasDetail { get { return false; } }
        public override FaultReason Reason { get { return faultReason; } }

        protected override void OnWriteDetailContents(XmlDictionaryWriter writer)
        {
            throw new NotImplementedException("The method or operation is not implemented.");
        }
    }

    abstract class AsyncResult : IAsyncResult
    {
        AsyncCallback callback;
        object state;
        bool completedSynchronously;
        bool endCalled;
        Exception exception;
        bool isCompleted;
        ManualResetEvent manualResetEvent;

        protected AsyncResult(AsyncCallback callback, object state)
        {
            this.callback = callback;
            this.state = state;
        }

        public object AsyncState { get { return state; } }

        public WaitHandle AsyncWaitHandle
        {
            get
            {
                if (manualResetEvent != null)
                {
                    return manualResetEvent;
                }
                lock (ThisLock)
                {
                    if (manualResetEvent == null)
                    {
                        manualResetEvent = new ManualResetEvent(isCompleted);
                    }
                }
                return manualResetEvent;
            }
        }

        public bool CompletedSynchronously { get { return completedSynchronously; } }
        public bool IsCompleted { get { return isCompleted; } }
        object ThisLock { get { return this; } }

        protected void Complete(bool completedSynchronously)
        {
            if (isCompleted)
            {
                throw new InvalidOperationException("AsyncResults can only be completed once.");
            }
            ManualResetEvent manualResetEvent = null;
            this.completedSynchronously = completedSynchronously;
            if (completedSynchronously)
            {
                // If we completedSynchronously, then there's no chance that the manualResetEvent
                // was created so we don't need to worry about a race
                this.isCompleted = true;
                if (this.manualResetEvent != null)
                {
                    throw new InvalidOperationException("No ManualResetEvent should be created for a synchronous AsyncResult.");
                }
            }
            else
            {
                lock (ThisLock)
                {
                    this.isCompleted = true;
                    manualResetEvent = this.manualResetEvent;
                }
            }
            try
            {
                if (callback != null)
                {
                    callback(this);
                }
                if (manualResetEvent != null)
                {
                    manualResetEvent.Set();
                }
            }
            catch (Exception unhandledException)
            {
                // The callback raising an exception is equivalent to Main raising an exception
                // w/out a catch. Queue it onto another thread and throw it there to ensure that
                // there's no other handler in place and the default unhandled exception behavior
                // occurs. Because the stack trace gets lost on a rethrow, we're wrapping it in a
                // generic exception so the stack trace is preserved.
                unhandledException = new Exception("AsyncCallbackException", unhandledException);
                ThreadPool.UnsafeQueueUserWorkItem(new WaitCallback(RaiseUnhandledException), unhandledException);
            }
        }

        protected void Complete(bool completedSynchronously, Exception exception)
        {
            this.exception = exception;
            Complete(completedSynchronously);
        }

        protected static void End(AsyncResult asyncResult)
        {
            if (asyncResult == null)
            {
                throw new ArgumentNullException("asyncResult");
            }
            if (asyncResult.endCalled)
            {
                throw new InvalidOperationException("Async object already ended.");
            }
            asyncResult.endCalled = true;
            if (!asyncResult.isCompleted)
            {
                using (WaitHandle waitHandle = asyncResult.AsyncWaitHandle)
                {
                    waitHandle.WaitOne();
                }
            }
            if (asyncResult.exception != null)
            {
                throw asyncResult.exception;
            }
        }

        void RaiseUnhandledException(object o)
        {
            Exception exception = (Exception)o;
            throw exception;
        }
    }
}
49. Examine the ProcessSend() method of the MessageInterceptor class that you have added to the InterceptorBindingElement module.

public void ProcessSend(ref Message message)
{
    string action = message.Headers.Action;
    Console.WriteLine(string.Format("Voting on message {0} ...", action));
    if (!(action.Contains("ITradingService")))
    {
        return;
    }
    int randomNumber = this.randomizer.Next(10);
    if ((this.dropRate != 0) && (randomNumber <= this.dropRate))
    {
        Console.WriteLine("Dropping message.");
        message = null;
        return;
    }
    Console.WriteLine("Forwarding message.");
}
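
How often ProcessSend() drops a message depends on the value of dropRate, which is not shown in this listing. As a rough check, the sketch below counts how many of the ten values that Random.Next(10) can return (0 through 9) pass the same test. DropMath and DropFraction are hypothetical names used only for this illustration; they are not part of the lab's code.

```csharp
using System;

static class DropMath
{
    // Fraction of ITradingService messages that the test in ProcessSend()
    // would drop for a given dropRate, assuming Random.Next(10) is uniform.
    public static decimal DropFraction(int dropRate)
    {
        if (dropRate == 0)
        {
            return 0m; // a drop rate of zero disables dropping altogether
        }
        int dropped = 0;
        for (int randomNumber = 0; randomNumber < 10; randomNumber++)
        {
            if (randomNumber <= dropRate)
            {
                dropped++;
            }
        }
        return dropped / 10m;
    }
}

static class Program
{
    static void Main()
    {
        for (int dropRate = 0; dropRate <= 3; dropRate++)
        {
            Console.WriteLine("dropRate {0}: fraction dropped {1}", dropRate, DropMath.DropFraction(dropRate));
        }
    }
}
```

Because the comparison is "less than or equal", a non-zero dropRate of n drops (n + 1) in 10 messages under this reading, so a drop rate of about thirty percent would correspond to a dropRate of 2.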
This method receives messages and randomly annihilates about thirty percent of them. You will now have the Windows Communication Foundation send all messages outbound from the client to the Derivatives Trading Service through this method directly after they have been processed by the reliable messaging system within the client. By doing so, you will simulate the effect of messages getting lost in transmission between the client and the server, and you will see the Windows Communication Foundation's reliable session facilities compensate for the loss.

50. Add a reference to the System.Configuration assembly to the Client project of the Trading Service solution.
51. Add a new class module called ReliableBinding.cs to the Client project, and alter the code therein in this way to define a custom binding that incorporates your message interceptor:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;

namespace Client
{
    public class ReliableBinding : CustomBinding, IBindingDeliveryCapabilities
    {
        public ReliableBinding()
        {
            this.Elements.Add(new ReliableSessionBindingElement(true));
            this.Elements.Add(new InterceptorBindingElement());
            this.Elements.Add(new TcpTransportBindingElement());
        }

        #region IBindingCapabilities Members
        bool IBindingDeliveryCapabilities.AssuresOrderedDelivery
        {
            get { return true; }
        }

        bool IBindingDeliveryCapabilities.QueuedDelivery
        {
            get { return false; }
        }
        #endregion
    }
}

52. Save
all of the changes, and then copy the ReliableBinding.cs class module in the Client project and paste it into the TradingServiceHost project, so that there is a copy of that class module in both projects.
53. Modify the copy in the TradingServiceHost project to use the Fabrikam namespace and to omit the message interceptor by commenting out the line of code which adds it to the binding elements. It will suffice for messages to be lost on the way from the client:

namespace Fabrikam
{
    public class ReliableBinding : CustomBinding, IBindingDeliveryCapabilities
    {
        public ReliableBinding()
        {
            this.Elements.Add(new ReliableSessionBindingElement(true));
            //this.Elements.Add(new InterceptorBindingElement());
            this.Elements.Add(new TcpTransportBindingElement());
        }

        // The IBindingDeliveryCapabilities members are unchanged.
    }
}

54. Modify
the code in the Program.cs module of the Client project to use the new custom binding when transmitting to the Derivatives Trading Service:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;

namespace Client
{
    public class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press any key when the service is ready.");
            Console.ReadKey();
            do
            {
                ITradingService dealProxy = new ChannelFactory<ITradingService>(new ReliableBinding(), new EndpointAddress("net.tcp://localhost:8001/TradingServiceExperiment")).CreateChannel();
                dealProxy.BeginDeal();
                Trade trade = new Trade();
                trade.Count = 10;
                trade.Symbol = "MSFT";
                trade.Date = DateTime.Now.AddMonths(2);
                dealProxy.AddTrade(trade);
                dealProxy.AddFunction("InterestRateEstimation");
                dealProxy.AddFunction("TechnologyStockEstimation");
                decimal dealValue = dealProxy.Calculate();
                Console.WriteLine(string.Format("Deal value estimated at ${0}", dealValue));
                dealProxy.Purchase();
                dealProxy.EndDeal();
                ((IChannel)dealProxy).Close();
                Console.WriteLine("Press q to quit or any other key to continue.");
            }
            while (Console.ReadKey().KeyChar != 'q');
            Console.WriteLine();
            Console.WriteLine("Done.");
            Console.ReadKey();
        }
    }
}

55. Change
the code in the Program.cs module of the TradingServiceHost project to add an endpoint to the Derivatives Trading Service that uses the new custom binding as well:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Text;

namespace Fabrikam
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Uri[] baseAddresses = new Uri[] { new Uri("net.tcp://localhost:8001/") };
            using (ServiceHost host = new ServiceHost(typeof(TradingSystem), baseAddresses))
            {
                host.AddServiceEndpoint(typeof(ITradingService), new ReliableBinding(), "TradingServiceExperiment");
                host.Open();
                Console.WriteLine("The trading service is available.");
                Console.ReadKey();
                host.Close();
            }
        }
    }
}

Task 5 – See the Windows Communication Foundation's Reliable Message Facility Compensate for Lost Messages

56. Choose Debug and then Start Debugging from the Visual Studio menus.
57. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 2.1, above, almost unchanged from how it has been hitherto. However, the output in the console application window of the Client executable should be similar to the output in figure 3.1, below, where one can see that a message has been destroyed at random and automatically retried.

Figure 3.1 Automatic Compensation for Lost Messages

58. Choose Debug and Stop Debugging from the Visual Studio menus.
59. You will not need the service endpoint that you configured to use the custom binding again, so remove it from the code in the Program.cs module of the TradingServiceHost project. You will be adding an endpoint in the next task. Note that your application will not run again until you have completed Exercise 4.

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Text;

namespace Fabrikam
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Uri[] baseAddresses = new Uri[] { new Uri("net.tcp://localhost:8001/") };
            using (ServiceHost host = new ServiceHost(typeof(TradingSystem), baseAddresses))
            {
                //host.AddServiceEndpoint(typeof(ITradingService),new ReliableBinding(),"net.tcp://localhost:8001/TradingService");
                host.Open();
                Console.WriteLine("The trading service is available.");
                Console.ReadKey();
                host.Close();
            }
        }
    }
}

Exercise 4 – Build a queued messaging service for recording derivatives trades

In this exercise, you will build a Windows Communication Foundation service that receives messages via an MSMQ message queue. The service will be used for recording trades of derivatives.

Task 1 – Install MSMQ if Necessary

60. Choose Add or Remove Programs from the Windows Control Panel.
61. Click Add/Remove Windows Components.
62. Select Application Server in the Windows Components Wizard window as shown in figure 4.1, and click on the Details button.

Figure 4.1 The Windows Components Wizard

63. If Message Queuing is not checked in the Application Server window, as shown in figure 4.2, below, then check it, click on the button labeled OK, then on the button labeled Next in the Windows Components Wizard, and follow the instructions on the subsequent screens to install MSMQ. Otherwise, click on the button labeled Cancel in the Application Server window and in the Windows Components Wizard. Close the Add or Remove Programs window.

Figure 4.2 The Application Server Window of the Windows Components Wizard

Task 2 – Add the Trade Recording Service to the Solution

64. Add a C# class library project called TradeRecordingService to the TradingService solution.
65. Add references to the System.ServiceModel and System.Runtime.Serialization assemblies to the project.
66. In the TradeRecordingService project, rename the Class1.cs class module to ITradeRecorder.cs, and modify its code to look like this:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Text;

namespace Fabrikam
{
    [ServiceContract]
    [DeliveryRequirements(QueuedDeliveryRequirements = QueuedDeliveryRequirementsMode.Required)]
    public interface ITradeRecorder
    {
        [OperationContract(IsOneWay=true)]
        void RecordTrades(Trade[] trades);
    }
}

67. Copy the class module, Trade.cs, from the TradingService project and paste it into the TradeRecordingService project so that both projects have a copy of the module.
68. Add a class module called TradeRecorder.cs to the TradeRecordingService project, and modify its code to look like this:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Text;

namespace Fabrikam
{
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
    public class TradeRecorder : ITradeRecorder
    {
        public TradeRecorder()
        {
        }

        #region ITradeRecorder Members
        void ITradeRecorder.RecordTrades(Trade[] trades)
        {
            Console.WriteLine("Recording trade ...");
            foreach (Trade trade in trades)
            {
                Console.WriteLine(string.Format("Recorded trade for {0}", trade));
            }
        }
        #endregion
    }
}

69. Compile the TradeRecordingService project to ensure that there are no syntax errors.

Task 3 – Provide a Host for the Trade Recording Service

70. Add a C# console application project called TradeRecordingServiceHost to the TradingService solution.
71. Add references to the System.Messaging and System.ServiceModel assemblies to the project.
72. Add a reference to the TradeRecordingService project to the TradeRecordingServiceHost project.
73. In the TradeRecordingServiceHost project, modify the code of the Program.cs module to look like this:

using System;
using System.Collections.Generic;
using System.Messaging;
using System.ServiceModel;
using System.Text;

namespace Fabrikam
{
    public class Program
    {
        private const string queueName = @".\private$\traderecording";

        public static void Main(string[] args)
        {
            // Create the private, transactional queue if it does not exist yet.
            if (!(MessageQueue.Exists(queueName)))
            {
                MessageQueue.Create(queueName, true);
            }
            Uri[] baseAddresses = new Uri[] { new Uri("net.msmq://localhost/private/") };
            TradeRecorder tradeRecorder = new TradeRecorder();
            using (ServiceHost host = new ServiceHost(tradeRecorder, baseAddresses))
            {
                host.Open();
                Console.WriteLine("The trade recording service is available.");
                Console.ReadKey();
                host.Close();
            }
        }
    }
}

Task 4 – Configure the Trade Recording Service

74. Add an application configuration file called app.config to the TradeRecordingServiceHost project, and alter its contents to look like this:

<configuration>
  <system.serviceModel>
    <services>
      <service name="Fabrikam.TradeRecorder">
        <endpoint address="TradeRecording"
                  binding="netMsmqBinding"
                  bindingConfiguration="QueuedBinding"
                  contract="Fabrikam.ITradeRecorder"/>
      </service>
    </services>
    <bindings>
      <netMsmqBinding>
        <binding name="QueuedBinding">
          <security mode="None"/>
        </binding>
      </netMsmqBinding>
    </bindings>
  </system.serviceModel>
</configuration>

Task 5 – Deploy the Trade Recording Service

75. Build the TradingService solution.
76. Start a new instance of the TradeRecordingServiceHost project.
77. When you see a message in the console application window of the TradeRecordingServiceHost executable confirming that the trade recording service is available, enter a keystroke into the console application window to terminate it.

Task 6 – Use the Trade Recording Service

78. Copy the module, ITradeRecorder.cs, from the TradeRecordingService project and paste it into the TradingService project so that both projects have a copy of the module.
79. Modify the code of the Purchase() method of the TradingSystem class in the TradingService project so that the Trading Service uses the Trade Recording Service to record derivatives trades:

void ITradingService.Purchase()
{
    DealData dealData = OperationContext.Current.InstanceContext.Extensions.Find<DealData>();
    ITradeRecorder proxy = new ChannelFactory<ITradeRecorder>("TradeRecordingService").CreateChannel();
    proxy.RecordTrades(dealData.Trades);
    foreach (Trade trade in dealData.Trades)
    {
        Console.WriteLine(string.Format("Purchased {0}", trade));
    }
}

80. Modify the app.config file of the TradingServiceHost project to specify how the Trading Service is to communicate with the Trade Recording Service:

<system.serviceModel>
  <services>
    <service name="Fabrikam.TradingSystem">
      <endpoint address="net.tcp://localhost:8001/TradingService"
                binding="netTcpBinding"
                bindingConfiguration="ReliableBinding"
                contract="Fabrikam.ITradingService"/>
    </service>
  </services>
  <client>
    <endpoint name="TradeRecordingService"
              address="net.msmq://localhost/private/TradeRecording"
              binding="netMsmqBinding"
              bindingConfiguration="QueuedBinding"
              contract="Fabrikam.ITradeRecorder"/>
  </client>
  <bindings>
    <!-- The existing netTcpBinding configuration named ReliableBinding stays here unchanged. -->
    <netMsmqBinding>
      <binding name="QueuedBinding">
        <security mode="None"/>
      </binding>
    </netMsmqBinding>
  </bindings>
</system.serviceModel>

81. Modify
the code in the Program.cs module of the Client project of the TradingService solution so as to circumvent the mechanism for discarding messages that you used in the last exercise:

do
{
    ITradingService dealProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
    dealProxy.BeginDeal();
    Trade trade = new Trade();
    trade.Count = 10;
    trade.Symbol = "MSFT";
    trade.Date = DateTime.Now.AddMonths(2);
    dealProxy.AddTrade(trade);
    dealProxy.AddFunction("InterestRateEstimation");
    dealProxy.AddFunction("TechnologyStockEstimation");
    decimal dealValue = dealProxy.Calculate();
    Console.WriteLine(string.Format("Deal value estimated at ${0}", dealValue));
    dealProxy.Purchase();
    dealProxy.EndDeal();
    ((IChannel)dealProxy).Close();
    Console.WriteLine("Press q to quit or any other key to continue.");
}
while (Console.ReadKey().KeyChar != 'q');

82. Set
the Startup Project properties of the TradingService solution as shown in figure 4.3, below.

Figure 4.3 The Startup Project Properties of the TradingService Solution

83. Choose Debug and then Start Debugging from the Visual Studio menus.
84. When you see a message in the console application window of the TradeRecordingServiceHost confirming that the Trade Recording Service is available, and a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 2.1, above. The output in the console application window of the Client should be similar to the output in figure 1.7, above. The output in the console application window of the TradeRecordingServiceHost should be similar to the output shown in figure 4.4, below.

Figure 4.4 Output from the Trade Recording Service

85. Choose Debug and Stop Debugging from the Visual Studio menus.
86. Choose Debug and then Start Debugging from the Visual Studio menus.

Task 7 – Witness the Superior Availability of the Trade Recording Service

87. Choose Debug and then Start Debugging from the Visual Studio menus.
88. Close the console application window of the TradeRecordingServiceHost.
89. When you see a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application window of the TradingServiceHost should be similar to the output shown in figure 2.1, above. The output in the console application window of the Client should be similar to the output in figure 1.7, above.
90. Choose Debug and Stop Debugging from the Visual Studio menus.
91. Start a new instance of the TradeRecordingServiceHost.
92. The output in the console application window of the TradeRecordingServiceHost should be similar to the output shown in figure 4.4, above, thereby proving that the Trade Recording Service can process messages that were sent to it when it was not available.
93. Choose Debug and Stop Debugging from the Visual Studio menus.

Exercise 5 – Wrap a set of messages in a transaction

In this exercise, you will witness the Windows Communication Foundation's ability to compose multiple distributed operations into a single unit of work, or transaction. We will assume that a client using the Derivatives Trading Service that we have been constructing will want to estimate the price of two deals: a primary deal, and another one that is intended as a hedge against the possibility of losses in the first. The client will either commit to both deals together, or to neither one. The client will commit to both deals only if the estimated value of the primary deal exceeds the estimated value of the hedge by more than a set amount.

Task 1 – Program the Client

94. Begin by adding a reference to the System.Transactions assembly to the Client project of the TradingService solution.
95. Now modify the client to conform to the procedure described for hedging deals, altering the code in the Program.cs class of the Client project in this way:

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;
using System.Transactions;

namespace Client
{
    public class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press any key when the service is ready.");
            Console.ReadKey();
            do
            {
                Console.WriteLine();
                using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    decimal? dealValue = null;
                    decimal? hedgeValue = null;
                    ITradingService dealProxy = null;
                    ITradingService hedgeProxy = null;
                    try
                    {
                        dealProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
                        dealProxy.BeginDeal();
                        Trade trade = new Trade();
                        trade.Count = 10;
                        trade.Symbol = "MSFT";
                        trade.Date = DateTime.Now.AddMonths(2);
                        dealProxy.AddTrade(trade);
                        dealProxy.AddFunction("InterestRateEstimation");
                        dealProxy.AddFunction("TechnologyStockEstimation");
                        dealValue = dealProxy.Calculate();
                        dealProxy.Purchase();
                        dealProxy.EndDeal();

                        hedgeProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
                        hedgeProxy.BeginDeal();
                        trade = new Trade();
                        trade.Count = 10;
                        trade.Symbol = "MSFT";
                        trade.Date = DateTime.Now.AddMonths(2);
                        hedgeProxy.AddTrade(trade);
                        hedgeProxy.AddFunction("InterestRateEstimation");
                        hedgeProxy.AddFunction("TechnologyStockEstimation");
                        hedgeValue = hedgeProxy.Calculate();
                        hedgeProxy.Purchase();
                        hedgeProxy.EndDeal();

                        if ((dealValue != null) && (hedgeValue != null))
                        {
                            Console.WriteLine("Deal value is ${0}, and hedge value is ${1}", dealValue.Value, hedgeValue.Value);
                            if ((dealValue.Value - hedgeValue.Value) > 20m)
                            {
                                Console.WriteLine("Voting to complete trade!");
                                scope.Complete();
                            }
                            else
                            {
                                Console.WriteLine(@"Voting NOT to complete trade! Transaction will not complete. Keep trying until a transaction commits.");
                            }
                        }
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine(exception.Message);
                    }
                    finally
                    {
                        // Either proxy is still null if an earlier call failed before it was created.
                        if (dealProxy != null)
                        {
                            ((IChannel)dealProxy).Close();
                        }
                        if (hedgeProxy != null)
                        {
                            ((IChannel)hedgeProxy).Close();
                        }
                    }
                }
                Console.WriteLine("Press q to quit or any other key to continue.");
            }
            while (Console.ReadKey().KeyChar != 'q');
            Console.WriteLine();
            Console.WriteLine("Done.");
            Console.ReadKey();
        }
    }
}
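
The client's vote can be tried on its own, without any service. The sketch below is an illustration only: ScopeDemo and Run are hypothetical names, and it relies solely on types from the System.Transactions assembly. It opens a TransactionScope, optionally calls Complete(), and records the outcome reported by the transaction's TransactionCompleted event.

```csharp
using System;
using System.Transactions;

static class ScopeDemo
{
    // Opens a new transaction, votes to commit only when 'vote' is true,
    // and returns the final status of the transaction.
    public static TransactionStatus Run(bool vote)
    {
        TransactionStatus status = TransactionStatus.Active;
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            // Capture the outcome when the ambient transaction finishes.
            Transaction.Current.TransactionCompleted +=
                delegate(object sender, TransactionEventArgs e)
                {
                    status = e.Transaction.TransactionInformation.Status;
                };
            if (vote)
            {
                scope.Complete();
            }
        } // Leaving the scope commits if Complete() was called; otherwise it rolls back.
        return status;
    }
}

static class Program
{
    static void Main()
    {
        Console.WriteLine("With a vote to complete: {0}", ScopeDemo.Run(true));
        Console.WriteLine("Without a vote: {0}", ScopeDemo.Run(false));
    }
}
```

This is why the client calls scope.Complete() only on the branch where it accepts both deals: leaving the using block without that call rolls back whatever work was enlisted in the transaction.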

Task 2 – Enhance the Derivatives Trading Service

96. Now enhance the Derivatives Trading Service so that it will record a trade only if the client commits to all of the trades that it is considering together. To do so, add an attribute to the Purchase() method of the TradingSystem class in the TradingSystem.cs module of the TradingService project.

[OperationBehavior(TransactionScopeRequired=true,TransactionAutoComplete=true)]
void ITradingService.Purchase()
{
    DealData dealData = OperationContext.Current.InstanceContext.Extensions.Find<DealData>();
    ITradeRecorder proxy = new ChannelFactory<ITradeRecorder>("TradeRecordingService").CreateChannel();
    proxy.RecordTrades(dealData.Trades);
    Console.WriteLine("Purchased!");
}

97. Add an attribute indicating that the Purchase() operation can be composed into a transaction in the declaration of the operation in the ITradingService.cs module of the TradingService project:

public interface ITradingService
{
    [OperationContract(IsInitiating=true,IsTerminating=false)]
    string BeginDeal();

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
    void AddTrade(Trade trade);

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
    void AddFunction(string function);

    [OperationContract(IsInitiating=false,IsTerminating=false)]
    decimal Calculate();

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=false)]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Purchase();

    [OperationContract(IsInitiating=false,IsTerminating=true,IsOneWay=true)]
    void EndDeal();
}

98. Modify the app.config file of the TradingServiceHost project to specify that information about any transaction in progress is to be passed to the Derivatives Trading Service:
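
A minimal sketch of such a binding configuration follows. It assumes that the netTcpBinding configuration is still named ReliableBinding, as in the endpoint configured earlier, and that it enables reliable sessions; both are assumptions rather than settings confirmed by this lab. The transactionFlow attribute is the setting that allows transaction information to be passed to the service.

```xml
<bindings>
  <netTcpBinding>
    <!-- Assumed name and reliableSession setting; transactionFlow="true" is the relevant change. -->
    <binding name="ReliableBinding" transactionFlow="true">
      <reliableSession enabled="true"/>
    </binding>
  </netTcpBinding>
</bindings>
```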

Task 3 – Modify the Client to Conform to the Service

99. In the Client project of the TradingService solution, alter the definition of the ITradingService interface to conform to the new definition in the TradingService project:

public interface ITradingService
{
    [OperationContract(IsInitiating=true,IsTerminating=false)]
    string BeginDeal();

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
    void AddTrade(Trade trade);

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=true)]
    void AddFunction(string function);

    [OperationContract(IsInitiating=false,IsTerminating=false)]
    decimal Calculate();

    [OperationContract(IsInitiating=false,IsTerminating=false,IsOneWay=false)]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    void Purchase();

    [OperationContract(IsInitiating=false,IsTerminating=true,IsOneWay=true)]
    void EndDeal();
}

100. Change the configuration of the binding in the app.config file of the Client to match the definition of the binding in the configuration of the service:

Task 4 – See the Composition of Multiple Distributed Operations into a Single Unit of Work

101. In the TradingService solution in Visual Studio, choose Debug and then Start Debugging from the menus.
102. When you see a message in the console application window of the TradeRecordingServiceHost confirming that the Trade Recording Service is available, and a message in the console application window of the TradingServiceHost executable confirming that the derivatives trading service is available, enter a keystroke into the console application window of the Client executable. The output in the console application windows of the Client executable, the TradingServiceHost executable, and the TradeRecordingServiceHost executable should be similar to the output in figures 5.1, 5.2, and 5.3, below. There we see that the client has invoked the Purchase() operation of the service four times, but only committed to the first pair of purchases, and, consequently, only two trades show up in the output of the Trade Recording Service.

Figure 5.1 Output from the Trading Service Client

Figure 5.2 Output from the Trading Service

Figure 5.3 Output from the Trade Recording Service

103. Choose Debug and Stop Debugging from the Visual Studio menus.
Exercise 6 – Add performance counters to a service
In this exercise, you will add custom performance counters to enhance the built-in instrumentation of the Trade Recording Service.
Task 1 – Define a Custom Trading Volume Performance Counter
104. To the TradeRecorder.cs module in the TradeRecordingService project, add code to create a custom performance counter category called TradeRecording, and code to add a custom performance counter called Trade Volume to that category:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        [ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
        public class TradeRecorder: ITradeRecorder
        {
            private const string CounterCategoryName = "TradeRecording";
            private const string VolumeCounterName = "Trade Volume";
            private PerformanceCounterCategory counterCategory = null;

            public TradeRecorder()
            {
                // Remove any earlier registration of the category so that
                // it is recreated with the current counter definition.
                if (PerformanceCounterCategory.Exists(TradeRecorder.CounterCategoryName))
                {
                    PerformanceCounterCategory.Delete(TradeRecorder.CounterCategoryName);
                }

                // Define the Trade Volume counter and register it in a
                // new multi-instance category.
                CounterCreationData volumeCounter = new CounterCreationData(
                    TradeRecorder.VolumeCounterName,
                    "Volume of trading.",
                    PerformanceCounterType.NumberOfItemsHEX32);
                CounterCreationDataCollection counterCollection =
                    new CounterCreationDataCollection(new CounterCreationData[] { volumeCounter });
                this.counterCategory = PerformanceCounterCategory.Create(
                    TradeRecorder.CounterCategoryName,
                    "Trade Recording Data",
                    PerformanceCounterCategoryType.MultiInstance,
                    counterCollection);
            }

            #region ITradeRecorder Members
            void ITradeRecorder.RecordTrades(Trade[] trades)
            {
                Console.WriteLine("Recording trade ...");
                foreach(Trade trade in trades)
                {
                    Console.WriteLine(string.Format("Recorded trade for {0}",trade));
                }
            }
            #endregion
        }
    }
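The constructor in step 104 deletes and recreates the category each time the service starts. As a possible variation, the sketch below separates the counter definition from its registration and registers the category only when it is missing. The class CounterSetup and its methods BuildCounters and EnsureCategory are hypothetical names, not part of the lab solution. Registering a category normally requires administrative rights, and a category that already exists with a different set of counters would be left unchanged by this approach.

```csharp
using System;
using System.Diagnostics;

public static class CounterSetup
{
    public const string CategoryName = "TradeRecording";
    public const string VolumeCounterName = "Trade Volume";

    // Describes the counters of the category without touching the system.
    public static CounterCreationDataCollection BuildCounters()
    {
        CounterCreationData volume = new CounterCreationData(
            VolumeCounterName,
            "Volume of trading.",
            PerformanceCounterType.NumberOfItemsHEX32);
        return new CounterCreationDataCollection(new CounterCreationData[] { volume });
    }

    // Registers the category only if it is not already registered.
    public static void EnsureCategory()
    {
        if (!PerformanceCounterCategory.Exists(CategoryName))
        {
            PerformanceCounterCategory.Create(
                CategoryName,
                "Trade Recording Data",
                PerformanceCounterCategoryType.MultiInstance,
                BuildCounters());
        }
    }
}
```

Calling CounterSetup.EnsureCategory() once at start-up would take the place of the delete-and-create sequence in the constructor.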
Task 2 – Update the Value of the Custom Trading Volume Performance Counter
105. For each instance of the Trade Recording Service to be able to update the value of the newly defined trading volume performance counter for trades communicated across its own receiving queue, it must access its own instance of the performance counter. That can be accomplished using the facilities of Windows Management Instrumentation, so add a reference to the System.Management assembly to the TradeRecordingService project in the TradingService solution.
106. Include the System.Management, System.Management.Instrumentation, System.ServiceModel.Description, and System.Runtime.InteropServices namespaces in the TradeRecorder.cs module of the TradeRecordingService project:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Management;
    using System.Management.Instrumentation;
    using System.Runtime.InteropServices;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.Text;
107. To the TradeRecorder class in the TradeRecorder.cs module, add the InitializeCounters() method for retrieving the instance of the trading volume performance counter associated with the current instance of the Trade Recording Service:

    private PerformanceCounter volumeCounter = null;

    public void InitializeCounters(ServiceEndpointCollection endpoints)
    {
        // Compose the name under which the service is looked up in WMI,
        // one name per endpoint.
        List<string> names = new List<string>();
        foreach (ServiceEndpoint endpoint in endpoints)
        {
            names.Add(string.Format("{0}@{1}", this.GetType().Name, endpoint.Address.ToString()));
        }

        // Repeat the WMI query until it runs without a COMException.
        while (true)
        {
            try
            {
                foreach (string name in names)
                {
                    string condition = string.Format("SELECT * FROM Service WHERE Name=\"{0}\"", name);
                    SelectQuery query = new SelectQuery(condition);
                    ManagementScope managementScope = new ManagementScope(@"\\.\root\ServiceModel", new ConnectionOptions());
                    ManagementObjectSearcher searcher = new ManagementObjectSearcher(managementScope, query);
                    ManagementObjectCollection instances = searcher.Get();
                    foreach (ManagementBaseObject instance in instances)
                    {
                        // Open the writable counter instance that belongs to this service.
                        PropertyData data = instance.Properties["CounterInstanceName"];
                        this.volumeCounter = new PerformanceCounter(TradeRecorder.CounterCategoryName, TradeRecorder.VolumeCounterName, data.Value.ToString());
                        this.volumeCounter.ReadOnly = false;
                        this.volumeCounter.RawValue = 0;
                        break;
                    }
                }
                break;
            }
            catch(COMException)
            {
            }
        }

        if(this.volumeCounter != null)
        {
            Console.WriteLine("Volume counter initialized.");
        }
        Console.WriteLine("Counters initialized.");
    }
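To make the lookup in step 107 easier to follow, the sketch below isolates the two string-building steps of InitializeCounters(): composing the service name from the type name and the endpoint address, and embedding that name in the WMI query. The class ServiceQueryBuilder and its methods are hypothetical names, and the address used in Main() assumes the endpoint address that appears in the app.config fragment of the Trade Recording Service host.

```csharp
using System;

public static class ServiceQueryBuilder
{
    // Mirrors the "{type}@{address}" naming used in InitializeCounters().
    public static string BuildInstanceName(string serviceTypeName, string endpointAddress)
    {
        return string.Format("{0}@{1}", serviceTypeName, endpointAddress);
    }

    // Mirrors the query text passed to SelectQuery in InitializeCounters().
    public static string BuildQuery(string instanceName)
    {
        return string.Format("SELECT * FROM Service WHERE Name=\"{0}\"", instanceName);
    }

    public static void Main()
    {
        string name = BuildInstanceName(
            "TradeRecorder",
            "net.msmq://localhost/private/TradeRecording");
        Console.WriteLine(BuildQuery(name));
    }
}
```

Printing the query in this way can help when the lookup returns no instances, because the exact text can then be compared with the names that the WMI provider actually exposes.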
108. Include the System.Threading namespace in the TradeRecorder.cs module of the TradeRecordingService project:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Management;
    using System.Management.Instrumentation;
    using System.Runtime.InteropServices;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.Text;
    using System.Threading;
109. Modify the RecordTrades() method of the TradeRecorder class in the TradeRecorder.cs module to update the value of the trading volume performance counter:

    private long tradeCount = 0;

    void ITradeRecorder.RecordTrades(Trade[] trades)
    {
        Console.WriteLine("Recording trade ...");

        // Wait until InitializeCounters() has located the counter instance.
        lock (this)
        {
            while (this.volumeCounter == null)
            {
                Thread.Sleep(100);
            }
        }

        foreach(Trade trade in trades)
        {
            // Add the size of the trade to the running total and publish it.
            this.tradeCount+=((trade.Count != null)?trade.Count.Value:0);
            this.volumeCounter.RawValue = this.tradeCount;
            Console.WriteLine(string.Format("Recorded trade for {0}",trade));
        }
    }
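The running total in step 109 is updated outside the lock. That is sufficient as long as calls to the single service instance are processed one at a time. Should the service ever be configured to process calls concurrently, the total could be kept with the Interlocked class instead, as in the sketch below. The class VolumeAccumulator is a hypothetical helper, and the sketch assumes that Trade.Count is a nullable integer, as the null test in RecordTrades() suggests.

```csharp
using System;
using System.Threading;

public class VolumeAccumulator
{
    private long total = 0;

    // Adds the count of one trade (treating a missing count as zero)
    // and returns the new running total.
    public long Add(int? count)
    {
        long amount = count.HasValue ? count.Value : 0;
        return Interlocked.Add(ref this.total, amount);
    }

    // Reads the running total atomically.
    public long Total
    {
        get { return Interlocked.Read(ref this.total); }
    }

    public static void Main()
    {
        VolumeAccumulator accumulator = new VolumeAccumulator();
        accumulator.Add(10);
        accumulator.Add(null);
        accumulator.Add(5);
        Console.WriteLine(accumulator.Total);
    }
}
```

The value returned by Add() could then be assigned to the RawValue property of the counter in place of the tradeCount field.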
110. Modify the app.config file of the TradeRecordingServiceHost project to expose data via performance counters and via Windows Management Instrumentation:

    <system.serviceModel>
        <services>
            <service name="Fabrikam.TradeRecorder">
                <endpoint address="net.msmq://localhost/private/TradeRecording"
                          binding="netMsmqBinding"
                          bindingConfiguration="QueuedBinding"
                          contract="Fabrikam.ITradeRecorder"/>
                <security mode="None"/>
111. Alter the code in the Program.cs module of the TradeRecordingServiceHost project to call the InitializeCounters() method that you added to the TradeRecorder class:

    using System;
    using System.Collections.Generic;
    using System.Messaging;
    using System.ServiceModel;
    using System.Text;

    namespace Fabrikam
    {
        class Program
        {
            // Static so that it can be used from the static Main() method.
            static string queueName = ".\\private$\\TradeRecording";

            static void Main(string[] args)
            {
                // Create the transactional queue if it does not yet exist.
                if(!(MessageQueue.Exists(queueName)))
                {
                    MessageQueue.Create(queueName,true);
                }

                TradeRecorder tradeRecorder = new TradeRecorder();
                using(ServiceHost host = new ServiceHost(tradeRecorder))
                {
                    host.Open();

                    // Locate the counter instance once the host is open.
                    tradeRecorder.InitializeCounters(host.Description.Endpoints);
                    Console.WriteLine("The trade recording service is available.");
                    Console.ReadKey();
                    host.Close();
                }
            }
        }
    }
Task 3 – Monitor the Custom Trading Volume Performance Counter
112. Modify the code in the Program.cs file of the Client project so as to generate a large number of trades:

    using System;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.Text;
    using System.Transactions;

    namespace Client
    {
        public class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Press any key when the service is ready.");
                Console.ReadKey();
                //do
                for(int i = 0;i < 500;i++)
                {
                    Console.WriteLine();
                    using(TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                    {
                        decimal? dealValue = null;
                        decimal? hedgeValue = null;
                        ITradingService dealProxy = null;
                        ITradingService hedgeProxy = null;
                        try
                        {
                            // Price and purchase the deal.
                            dealProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
                            dealProxy.BeginDeal();
                            Trade trade = new Trade();
                            trade.Count = 10;
                            trade.Symbol = "MSFT";
                            trade.Date = DateTime.Now.AddMonths(2);
                            dealProxy.AddTrade(trade);
                            dealProxy.AddFunction("InterestRateEstimation");
                            dealProxy.AddFunction("TechnologyStockEstimation");
                            dealValue = dealProxy.Calculate();
                            dealProxy.Purchase();
                            dealProxy.EndDeal();

                            // Price and purchase the hedge.
                            hedgeProxy = new ChannelFactory<ITradingService>("TradingServiceConfiguration").CreateChannel();
                            hedgeProxy.BeginDeal();
                            trade = new Trade();
                            trade.Count = 10;
                            trade.Symbol = "MSFT";
                            trade.Date = DateTime.Now.AddMonths(2);
                            hedgeProxy.AddTrade(trade);
                            hedgeProxy.AddFunction("InterestRateEstimation");
                            hedgeProxy.AddFunction("TechnologyStockEstimation");
                            hedgeValue = hedgeProxy.Calculate();
                            hedgeProxy.Purchase();
                            hedgeProxy.EndDeal();

                            if((dealValue != null)&&(hedgeValue != null))
                            {
                                Console.WriteLine("Deal value is ${0}, and hedge value is ${1}",dealValue.Value,hedgeValue.Value);

                                // Commit both purchases only if the spread is large enough.
                                if((dealValue.Value - hedgeValue.Value) > 20m)
                                {
                                    Console.WriteLine("Voting to complete trade!");
                                    scope.Complete();
                                }
                            }
                        }
                        catch(Exception exception)
                        {
                            Console.WriteLine(exception.Message);
                        }
                        finally
                        {
                            // A proxy is null if its creation failed.
                            if(dealProxy != null)
                            {
                                ((IChannel)dealProxy).Close();
                            }
                            if(hedgeProxy != null)
                            {
                                ((IChannel)hedgeProxy).Close();
                            }
                        }
                    }
                    //Console.WriteLine("Press q to quit or any other key to continue.");
                }
                //while(Console.ReadKey().KeyChar != 'q');
                Console.WriteLine();
                Console.WriteLine("Done.");
                Console.ReadKey();
            }
        }
    }
113. Choose Debug and then Start Debugging from the Visual Studio menus.
114. When you see a message in the console application window of the TradeRecordingServiceHost confirming that the Trade Recording Service is available, and a message in the TradingServiceHost executable confirming that the derivatives trading service is available, choose Run from the Windows Start menu, enter
    PerfMon
in the Run dialog box, and click on the OK button.
115. In the Performance window, click the Add button, and then scroll through the list of performance counter categories in the list labeled Performance Object, as shown in figure 6.1, below. Examine the counters in the built-in categories of Windows Communication Foundation performance counters. Those categories are called ServiceModelService, ServiceModelEndpoint, and ServiceModelOperation, and they provide counters for monitoring Windows Communication Foundation applications at the level of the service, the endpoints of a service, and the individual operations of a service endpoint.
Figure 6.1
Output from the Trade Recording Service
116. Select the custom TradeRecording category, click on the Add button, and then close the dialog.
117. In the console application window of the Client executable, enter a keystroke.
118. Observe how, as soon as the client starts committing to trades of derivatives, the graph of the value of the custom Trade Volume counter begins to show activity in the Performance window, as depicted in figure 6.2, below.
119. The value of the Trade Volume counter will quickly exceed the default scale of the graph, so right-click on the entry for the Trade Volume counter in the list in the bottom right-hand corner of the Performance window, as shown in figure 6.3, below, and alter the scale to 0.01, as shown in figure 6.4, below.
Figure 6.2
Monitoring the Custom Trade Volume Performance Counter
Figure 6.3
Accessing the Performance Counter Display Properties
Figure 6.4
Adjusting the Scale of the Display of a Performance Counter
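The scale chosen in step 119 is a multiplier applied to the raw counter value before it is plotted. The sketch below shows the arithmetic behind picking such a multiplier, assuming the default vertical axis of the graph, which runs from 0 to 100. The class ScaleHelper and the method PickScale are hypothetical names used only for this illustration.

```csharp
using System;

public static class ScaleHelper
{
    // Returns the largest power-of-ten multiplier, no greater than 1,
    // that keeps the given raw value within a 0 to 100 axis.
    public static decimal PickScale(long maxRawValue)
    {
        decimal scale = 1m;
        while (maxRawValue * scale > 100m)
        {
            scale /= 10m;
        }
        return scale;
    }

    public static void Main()
    {
        long rawValue = 10000;
        decimal scale = PickScale(rawValue);
        Console.WriteLine("Scale {0} plots {1} at {2}", scale, rawValue, rawValue * scale);
    }
}
```

With a scale of 0.01, a raw Trade Volume of 10,000 is drawn at 100, the top of the default axis, so larger totals call for a smaller scale or a larger axis maximum.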
Lab Summary
In this lab you performed the following exercises.
• Defined a data contract for use in a derivatives trading service
• Managed state within a service
• Experimented with Reliable Sessions
• Built a queued messaging service for recording derivatives trades
• Wrapped a set of messages in a transaction
• Added performance counters to a service and monitored its activity
Thus, you have seen the capability of the Windows Communication Foundation to readily support the requirements of demanding distributed programming solutions.