A basic inter webrole broadcast communication on Azure using the service bus

by ingvar 19. maj 2011 21:48

In this blog post I'll try to show a bare bone setup that does inter webrole broadcast communication. The code is based on Valery M blog post. The code in his blog post is based on a customer solution and contains a lot more code than needed to get the setup working. But his code also provides a lot more robust broadcast communication with retries and other things that makes the communication reliable. I have omitted all this to make it, as easy to understand and recreate as possible. The idea is that the code I provide, could be used as a basis for setting up your own inter webrole broadcast communication. You can download the code here: InterWebroleBroadcastDemo.zip (17.02 kb)

Windows Azure AppFabric SDK and using Microsoft.ServiceBus.dll

We need a reference to Microsoft.ServiceBus.dll in order to do the inter webrole communication. The Microsoft.ServiceBus.dll assembly is a part of the Windows Azure AppFabric SDK found here.
When you use Microsoft.ServiceBus.dll you need to add it as a reference like any other assembly. You do this by browsing to the directory where the AppFabric SDK was installed. But unlike most other references you add, you need to set the "Copy local" property for the reference to true (default is false).
I have put all my code in a separate assembly and then the main classes are then used in the WebRole.cs file. Even if I have added Microsoft.ServiceBus.dll to my assembly () and setted the "Copy Local" to true, I still have to add it to the WebRole project and also set the "Copy Local" to true here. This is a very important detail!

Creating a new Service Bus Namespace

Here is a short step-guide on how to create a new Service Bus Namespace. If you have already done this, you can skip it and just use the already existing namespace and its values.

  1. Go to the section "Service Bus, Access Control & Caching"
  2. Click the button "New Namespace"
  3. Check "Service Bus"
  4. Enter the desired Namespace (This namespace is the one used for EndpointInformation.ServiceNamespace)
  5. Click "Create Namespace"
  6. Select the newly created namespace
  7. Under properties (To the right) find Default Key and click "View"
  8. Here you will find the Default Issuer (This value should be used for EndpointInformation.IssuerName) and Default Key (This value should be used for  EndpointInformation.IssuerSecret)

The code

Here I will go through all the classes in my sample code. The full project including the WebRole project can be download here: InterWebroleBroadcastDemo.zip (17.02 kb)

BroadcastEvent

We start with the BroadcastEvent class. This class represents the data we send across the wire. This is done with the class attribute DataContract and the member attribute DataMember. In this sample code I only send two simple strings. SenderInstanceId is not required but I use it to display where the message came from.

[DataContract(Namespace = BroadcastNamespaces.DataContract)]
public class BroadcastEvent
{
public BroadcastEvent(string senderInstanceId, string message)
{
this.SenderInstanceId = senderInstanceId;
this.Message = message;
}

[DataMember]
public string SenderInstanceId { get; private set; }

[DataMember]
public string Message { get; private set; }
}

BroadcastNamespaces

This class only contains some constants that are used by some of the other classes.

public static class BroadcastNamespaces
{
public const string DataContract = "http://broadcast.event.test/data";
public const string ServiceContract = "http://broadcast.event.test/service";
}

IBroadcastServiceContract

This interface defines the contract that the web roles uses when communication to each other. Here in this simple example, the contract only has one method, namely the Publish method. This method is, in the implementation of the contract (BroadcastService) used to send BroadcastEvent's to all web roles that have subscribed to this channel. There is another method, Subscribe, that is inherited from the IObservable. This method is used to subscribe to the BroadcastEvents when they are published by some web role. This method is also implemented in the BroadcastService class.

[ServiceContract(Name = "BroadcastServiceContract", 
Namespace = BroadcastNamespaces.ServiceContract)]
public interface IBroadcastServiceContract : IObservable<BroadcastEvent>
{
[OperationContract(IsOneWay = true)]
void Publish(BroadcastEvent e);
}

IBroadcastServiceChannel

This interface defines the channel which the web roles communicates through. This is done by adding the IClientChannel interface.

public interface IBroadcastServiceChannel : IBroadcastServiceContract, IClientChannel
{
}

BroadcastEventSubscriber

The web role subscribes to the channel by creating an instance of this class and registering it. For testing purpose, this implementation only logs when it receives any BroadcastEvent.

public class BroadcastEventSubscriber : IObserver<BroadcastEvent>
{
public void OnNext(BroadcastEvent value)
{
Logger.AddLogEntry(RoleEnvironment.CurrentRoleInstance.Id +
" got message from " + value.SenderInstanceId + " : " +
value.Message);
}

public void OnCompleted()
{
/* Handle on completed */
}

public void OnError(Exception error)
{
/* Handle on error */
}
}

BroadcastService

This class implements the IBroadcastServiceContract interface. It handles the publish scenario by calling the OnNext method on all subscribes in parallel. The reason for doing this parallel, is that the OnNext method is blocking, so there is a good change that there is a okay performance gain by doing this in parallel.
The other method is Subscribe. This method adds the BroadcastEvent observer to the subscribers and returns a object of type UnsubscribeCallbackHandler that, when disposed unsubscribe itself. This is a part of the IObserver/IObservable pattern.

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, 
ConcurrencyMode = ConcurrencyMode.Multiple)]
public class BroadcastService : IBroadcastServiceContract
{
private readonly IList<IObserver<BroadcastEvent>> _subscribers =
new List<IObserver<BroadcastEvent>>();

public void Publish(BroadcastEvent e)
{
ParallelQuery<IObserver<BroadcastEvent>> subscribers =
from sub in _subscribers.AsParallel().AsUnordered()
select sub;

subscribers.ForAll((subscriber) =>
{
try
{
subscriber.OnNext(e);
}
catch (Exception ex)
{
try
{
subscriber.OnError(ex);
}
catch (Exception)
{
/* Ignore exception */
}
}
});
}

public IDisposable Subscribe(IObserver<BroadcastEvent> subscriber)
{
if (!_subscribers.Contains(subscriber))
{
_subscribers.Add(subscriber);
}

return new UnsubscribeCallbackHandler(_subscribers, subscriber);
}


private class UnsubscribeCallbackHandler : IDisposable
{
private readonly IList<IObserver<BroadcastEvent>> _subscribers;
private readonly IObserver<BroadcastEvent> _subscriber;

public UnsubscribeCallbackHandler(IList<IObserver<BroadcastEvent>> subscribers,
IObserver<BroadcastEvent> subscriber)
{
_subscribers = subscribers;
_subscriber = subscriber;
}

public void Dispose()
{
if ((_subscribers != null) && (_subscriber != null) &&
(_subscribers.Contains(_subscriber)))
{
_subscribers.Remove(_subscriber);
}
}
}
}

ServiceBusClient

The main purpose of the ServiceBusClient class is setup and create a ChannelFactory<IBroadcastServiceChannel> and a IBroadcastServiceChannel instance through the factory. The channel is used by the web role to send BroadcastEvent's through the publish method. It is in this class all the Azure service bus magic happens. Setting up the binding and endpoint. A few service bus related constants is used here and they are all kept in the EndpointInformation class. 

public class ServiceBusClient<T> where T : class, IClientChannel, IDisposable
{
private readonly ChannelFactory<T> _channelFactory;
private readonly T _channel;
private bool _disposed = false;

public ServiceBusClient()
{
Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);

NetEventRelayBinding binding = new NetEventRelayBinding(
EndToEndSecurityMode.None,
RelayEventSubscriberAuthenticationType.None);

TransportClientEndpointBehavior credentialsBehaviour =
new TransportClientEndpointBehavior();
credentialsBehaviour.CredentialType =
TransportClientCredentialType.SharedSecret;
credentialsBehaviour.Credentials.SharedSecret.IssuerName =
EndpointInformation.IssuerName;
credentialsBehaviour.Credentials.SharedSecret.IssuerSecret =
EndpointInformation.IssuerSecret;

ServiceEndpoint endpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(T)), binding,
new EndpointAddress(address));
endpoint.Behaviors.Add(credentialsBehaviour);

_channelFactory = new ChannelFactory<T>(endpoint);

_channel = _channelFactory.CreateChannel();
}

public T Client
{
get
{
if (_channel.State == CommunicationState.Opening) return null;

if (_channel.State != CommunicationState.Opened)
{
_channel.Open();
}

return _channel;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
try
{
if (_channel.State == CommunicationState.Opened)
{
_channel.Close();
}
else
{
_channel.Abort();
}
}
catch (Exception)
{
/* Ignore exceptions */
}


try
{
if (_channelFactory.State == CommunicationState.Opened)
{
_channelFactory.Close();
}
else
{
_channelFactory.Abort();
}
}
catch (Exception)
{
/* Ignore exceptions */
}

_disposed = true;
}
}
}

~ServiceBusClient()
{
Dispose(false);
}
}

ServiceBusHost

The main purpose of the ServiceBusHost class is to setup, create and open a ServiceHost. The service host is used by the web role to receive BroadcastEvent's through registering a BroadcastEventSubsriber instance. Like the ServiceBusClient it is in this class all the Azure service bus magic happens.

public class ServiceBusHost<T> where T : class
{
private readonly ServiceHost _serviceHost;
private bool _disposed = false;

public ServiceBusHost()
{
Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);

NetEventRelayBinding binding = new NetEventRelayBinding(
EndToEndSecurityMode.None,
RelayEventSubscriberAuthenticationType.None);

TransportClientEndpointBehavior credentialsBehaviour =
new TransportClientEndpointBehavior();
credentialsBehaviour.CredentialType =
TransportClientCredentialType.SharedSecret;
credentialsBehaviour.Credentials.SharedSecret.IssuerName =
EndpointInformation.IssuerName;
credentialsBehaviour.Credentials.SharedSecret.IssuerSecret =
EndpointInformation.IssuerSecret;

ServiceEndpoint endpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(T)), binding,
new EndpointAddress(address));
endpoint.Behaviors.Add(credentialsBehaviour);

_serviceHost = new ServiceHost(Activator.CreateInstance(typeof(T)));

_serviceHost.Description.Endpoints.Add(endpoint);

_serviceHost.Open();
}

public T ServiceInstance
{
get
{
return _serviceHost.SingletonInstance as T;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
try
{
if (_serviceHost.State == CommunicationState.Opened)
{
_serviceHost.Close();
}
else
{
_serviceHost.Abort();
}
}
catch
{
/* Ignore exceptions */
}
finally
{
_disposed = true;
}
}
}
}

~ServiceBusHost()
{
Dispose(false);
}
}

EndpointInformation

This class keeps all the service bus related constants. I have put a dummy constant for the ServiceNamespace, IssuerName and IssuerSecret. These you have to find in the Windows Azure Management Portal [URL]. Read below how to create a new Service Bus and obtain these values.

public static class EndpointInformation
{
public const string ServiceNamespace = "CHANGE THIS TO YOUR NAMESPACE";
public const string ServicePath = "BroadcastService";
public const string IssuerName = "CHANGE THIS TO YOUR ISSUER NAME";
public const string IssuerSecret = "CHANGE THIS TO YOUR ISSUER SECRET";
}


BroadcastCommunicator

This class abstracts all the dirty details away and is the main class that the web role uses. It has two methods. Publish for publishing BroadcastEvent instances. And Subscribe for subscribing to the broadcast events by creating an instanse of the BroadcastEventSubscriber and handing it to the Subscribe method.

public class BroadcastCommunicator : IDisposable
{
private ServiceBusClient<IBroadcastServiceChannel> _publisher;
private ServiceBusHost<BroadcastService> _subscriber;
private bool _disposed = false;

public void Publish(BroadcastEvent e)
{
if (this.Publisher.Client != null)
{
this.Publisher.Client.Publish(e);
}
}

public IDisposable Subscribe(IObserver<BroadcastEvent> subscriber)
{
return this.Subscriber.ServiceInstance.Subscribe(subscriber);
}

private ServiceBusClient<IBroadcastServiceChannel> Publisher
{
get
{
if (_publisher == null)
{
_publisher = new ServiceBusClient<IBroadcastServiceChannel>();
}

return _publisher;
}
}

private ServiceBusHost<BroadcastService> Subscriber
{
get
{
if (_subscriber == null)
{
_subscriber = new ServiceBusHost<BroadcastService>();
}

return _subscriber;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool disposing)
{
if (!_disposed && disposing)
{
try
{
_subscriber.Dispose();
_subscriber = null;
}
catch
{
/* Ignore exceptions */
}

try
{
_publisher.Dispose();
_publisher = null;
}
catch
{
/* Ignore exceptions */
}

_disposed = true;
}
}

~BroadcastCommunicator()
{
Dispose(false);
}
}

WebRole

This is a pretty strait forward web role. In the OnStart method a instance of the BroadcastCommunicator is created and an instance of BroadcastEventSubscriber is used to subscribe to the channel.
The Run method is a endless loop with a random sleep in every loop, for testing purpose. In every loop it sends a "Hello World" message including its own role instance id.
The OnStep method cleans up by disposing disposable objects.

public class WebRole : RoleEntryPoint
{
private volatile BroadcastCommunicator _broadcastCommunicator;
private volatile BroadcastEventSubscriber _broadcastEventSubscriber;
private volatile IDisposable _broadcastSubscription;
private volatile bool _keepLooping = true;


public override bool OnStart()
{
_broadcastCommunicator = new BroadcastCommunicator();
_broadcastEventSubscriber = new BroadcastEventSubscriber();

_broadcastSubscription =
_broadcastCommunicator.Subscribe(_broadcastEventSubscriber);

return base.OnStart();
}



public override void Run()
{
/* Just keep sending messasges */
while (_keepLooping)
{
int secs = ((new Random()).Next(30) + 60);

Thread.Sleep(secs * 1000);
try
{
BroadcastEvent broadcastEvent =
new BroadcastEvent(RoleEnvironment.CurrentRoleInstance.Id,
"Hello world!");

_broadcastCommunicator.Publish(broadcastEvent);
}
catch (Exception ex)
{
Logger.AddLogEntry(ex);
}
}
}

public override void OnStop()
{
_keepLooping = false;

if (_broadcastCommunicator != null)
{
_broadcastCommunicator.Dispose();
}

if (_broadcastSubscription != null)
{
_broadcastSubscription.Dispose();
}

base.OnStop();
}
}


Logger

The logger class is used some places in the code. If a logger action has been set, logging will be done. Read more about how I did logging below.

public static class Logger
{
private static Action<string> AddLogEntryAction { get; set; }

public static void Initialize(Action<string> addLogEntry)
{
AddLogEntryAction = addLogEntry;
}

public static void AddLogEntry(string entry)
{
if (AddLogEntryAction != null)
{
AddLogEntryAction(entry);
}
}

public static void AddLogEntry(Exception ex)
{
while (ex != null)
{
AddLogEntry(ex.ToString());

ex = ex.InnerException;
}
}
}

Simple but effective logging

When I developed this demo I used a web service on another server for logging. This web service just have one method taking one string argument, the line to log. Then I have a page for displaying and clearing the log. This is a very simple way of doing logging, but it gets the job done.

The output

Below is the output from a run of the demo project with 4 web role instances. Here the first two lines are most interesting. Here you can see that only web role instance WebRole_IN_0 and WebRole_IN_2 are ready to receive (and send) events. The reason for this is the late creation (create when needed) of the ServiceBusClient and ServiceBusHost in the BroadcastCommunicator class and the sleep period in the WebRole. This illustrates that web roles can join the broadcast channel at any time and start sending and receiving events.
 
20:30:40.4976 : WebRole_IN_2 got message from WebRole_IN_0 : Hello world!
20:30:40.7576 : WebRole_IN_0 got message from WebRole_IN_0 : Hello world!
20:30:43.0912 : WebRole_IN_2 got message from WebRole_IN_3 : Hello world!
20:30:43.0912 : WebRole_IN_1 got message from WebRole_IN_3 : Hello world!
20:30:43.0912 : WebRole_IN_0 got message from WebRole_IN_3 : Hello world!
20:30:43.1068 : WebRole_IN_3 got message from WebRole_IN_3 : Hello world!
20:30:45.4505 : WebRole_IN_0 got message from WebRole_IN_2 : Hello world!
20:30:45.4505 : WebRole_IN_3 got message from WebRole_IN_2 : Hello world!
20:30:45.4505 : WebRole_IN_1 got message from WebRole_IN_2 : Hello world!
20:30:45.4662 : WebRole_IN_2 got message from WebRole_IN_2 : Hello world!
20:30:59.4816 : WebRole_IN_0 got message from WebRole_IN_1 : Hello world!
20:30:59.4816 : WebRole_IN_3 got message from WebRole_IN_1 : Hello world!
20:30:59.4972 : WebRole_IN_2 got message from WebRole_IN_1 : Hello world!
20:30:59.4972 : WebRole_IN_1 got message from WebRole_IN_1 : Hello world!
20:31:59.1371 : WebRole_IN_2 got message from WebRole_IN_3 : Hello world!
20:31:59.2621 : WebRole_IN_1 got message from WebRole_IN_3 : Hello world!
20:31:59.3871 : WebRole_IN_0 got message from WebRole_IN_3 : Hello world!
20:31:59.5746 : WebRole_IN_3 got message from WebRole_IN_3 : Hello world!
20:32:03.1683 : WebRole_IN_2 got message from WebRole_IN_0 : Hello world!
20:32:03.1683 : WebRole_IN_0 got message from WebRole_IN_0 : Hello world!
20:32:03.1683 : WebRole_IN_1 got message from WebRole_IN_0 : Hello world!
20:32:03.1839 : WebRole_IN_3 got message from WebRole_IN_0 : Hello world!

Tags:

.NET | Azure | C#

Comments (2) -

Oskar Austegard
Oskar Austegard United States
27-06-2012 23:46:24 #

Isn't this an example of INTRA (inside) not inter (between) - role communication ?  Unless I'm reading the example wrong, all communication takes place between instances within the same webrole.

ingvar
ingvar Denmark
28-06-2012 05:19:17 #

It is true that when looking at the naming use by Microsoft it goes like this:
Hosted Service -> Deployment -> Role -> Instance

So you are correct that I have misused the role term and should have used the instance term.

But, none the less, the code shows how to do inter communication between role instances, which is properly what other people are looking for Smile

About the author

Martin Ingvar Kofoed Jensen

Architect and Senior Developer at Composite on the open source project Composite C1 - C#/4.0, LINQ, Azure, Parallel and much more!

Follow me on Twitter

Read more about me here.

Read press and buzz about my work and me here.

Stack Overflow

Month List