Real-time event processing is an interesting topic. Unlike batch mode, where data is written to a database and then retrieved on a web page at a certain refresh interval or on demand, here we deal with a stream of data. We don't know how big the stream is or when it ends; it just passes through at a given rate. In some cases a stream is a necessity because there is too much data to fit into storage; in others we need to react quickly to events and don't want to place locks. One simple scenario is a dashboard for monitoring logging events from all systems, shown as a live chart.
As time goes on, the lines slide left and eventually off the chart, while the current beat is on the right. Just looking at such a dashboard can give an instant clue about overall system health, because spikes or dips are usually early signs of trouble that we had better react to before our customers start calling us.

These are the key steps to build such a dashboard with SignalR and Angular on an ASP.NET MVC backend. TL;DR: here's the code.

Client side

@section JavascriptInHead
{
    <style>
        .huge {
            font-size: 40px;
        }
    </style>
}

<div ng-app="dashboardModule" ng-controller="DashboardController">
    <div class="huge">{{perMinute}}</div>
    <div>Events per minute</div>

    <wj-flex-chart items-source="trafficData" chart-type="Area" stacking="Stacked" binding-x="time">
        <wj-flex-chart-axis wj-property="axisX" format="mm:ss"></wj-flex-chart-axis>
        <wj-flex-chart-axis wj-property="axisY" min="0" title="Count"></wj-flex-chart-axis>
        <wj-flex-chart-series name="Fatal" binding="fatalCount"></wj-flex-chart-series>
        <wj-flex-chart-series name="Error" binding="errorCount"></wj-flex-chart-series>
        <wj-flex-chart-series name="Warning" binding="warrningCount"></wj-flex-chart-series>
        <wj-flex-chart-series name="Info" binding="infoCount"></wj-flex-chart-series>
        <wj-flex-chart-series name="Debug" binding="debugCount"></wj-flex-chart-series>
    </wj-flex-chart>
</div>

The chart is rendered by simply binding the Wijmo FlexChart control. What's nice about Wijmo is its ObservableArray, which we populate while keeping its size fixed:

"use strict";
dashboardModule.controller("DashboardController", function ($scope, $rootScope) {

    $scope.trafficData = new wijmo.collections.ObservableArray();
    $scope.perMinute = 0;
    var connection = $.connection.dashboardHub;

    // SignalR calls this
    connection.client.addBeat = function (data) {
        var maxLength = 20;
        // Drop the oldest beats so the chart keeps a fixed width
        while ($scope.trafficData.length >= maxLength) {
            $scope.trafficData.splice(0, 1);
        }
        // Assumed: append the newest beat (the push is not shown in the original listing)
        $scope.trafficData.push(data);

        $rootScope.$apply(function () {
            // Because Angular doesn't know it changed
            $scope.perMinute = data.perMinute;
        });
    };

    $.connection.hub.start().done(function () {});
});

All this code does is provide a function for SignalR to call and start the SignalR hub connection so that we begin receiving events.
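
The fixed-size buffer idea does not depend on Wijmo or SignalR. Here is a minimal sketch with a plain array; `pushBeat` and the sample beat objects are hypothetical names used only for illustration, not part of the project:

```javascript
// Keep at most maxLength beats, dropping the oldest first.
// A plain array stands in for the ObservableArray used in the controller.
function pushBeat(buffer, beat, maxLength) {
    while (buffer.length >= maxLength) {
        buffer.splice(0, 1); // remove the oldest beat (left edge of the chart)
    }
    buffer.push(beat); // the newest beat goes on the right
    return buffer;
}

// Usage: a window of 3 beats fed with 5 values.
var beats = [];
for (var i = 1; i <= 5; i++) {
    pushBeat(beats, { perMinute: i * 10 }, 3);
}
// beats now holds the last three entries: 30, 40, 50
```

Trimming before pushing means the array never holds more than `maxLength` items, so the chart never briefly renders an extra point.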

Server side

The project is built on my Angular + ASP.NET MVC setup, where all the work is done in this class:

public class Dashboard
{
    private readonly IHubConnectionContext<dynamic> _clients;
    private readonly BlockingCollection<Message> _messages;
    private readonly int _messageBufferSize;
    private readonly object _updateLastBeatLock = new object();
    private DateTime _lastBeat;

    public Dashboard(IHubConnectionContext<dynamic> clients, DateTime startDateTime, int messageBufferSize = 100)
    {
        _clients = clients;
        _messages = new BlockingCollection<Message>();
        _lastBeat = startDateTime;
        _messageBufferSize = messageBufferSize;
    }

    public void OnMessageReceived(string messageText)
    {
        var message = JsonConvert.DeserializeObject<Message>(messageText);
        if (message == null)
        {
            return; // nothing to record
        }

        // Assumed loop body (not shown in the original listing):
        // drop the oldest messages once the buffer is over its limit.
        while (_messages.Count > _messageBufferSize)
        {
            Message dropped;
            _messages.TryTake(out dropped);
        }

        _messages.Add(message); // assumed: store the new message
    }

    public void BroadcastBeat(DateTime beatDateTime)
    {
        var beat = new Beat {DateTime = beatDateTime, WindowSize = beatDateTime - _lastBeat};
        var messagesInWindow = _messages
            .Where(s => s.DateTime >= _lastBeat && s.DateTime <= beatDateTime)
            .ToList(); // snapshot, so the counts below agree with each other

        lock (_updateLastBeatLock)
        {
            _lastBeat = beatDateTime;
        }

        beat.InfoCount = messagesInWindow.Count(s => s.LogLevel == LogLevel.Info);
        beat.DebugCount = messagesInWindow.Count(s => s.LogLevel == LogLevel.Debug);
        beat.ErrorCount = messagesInWindow.Count(s => s.LogLevel == LogLevel.Error);
        beat.FatalCount = messagesInWindow.Count(s => s.LogLevel == LogLevel.Fatal);

        // Assumed: push the beat to every client; matches the addBeat handler on the client side.
        _clients.All.addBeat(beat);
    }
}

The class has two public methods. OnMessageReceived is called when a new JSON message is received from an external system. In my case it's Redis, but it can be anything. The messages are pumped out when a caller invokes BroadcastBeat, which aggregates the messages for the given window and deletes the aggregated messages. It was nice to discover BlockingCollection, which makes this easy.
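
To make the windowing concrete, here is a minimal sketch of the aggregation step, written in JavaScript rather than C# so it can be run on its own. The function name `aggregateBeat`, the message shape (`dateTime`, `logLevel`) and the per-minute formula are assumptions made for illustration; the real class may compute these differently.

```javascript
// Count messages per log level inside the window [lastBeat, beatTime].
// Timestamps are plain numbers (milliseconds).
function aggregateBeat(messages, lastBeat, beatTime) {
    var inWindow = messages.filter(function (m) {
        return m.dateTime >= lastBeat && m.dateTime <= beatTime;
    });
    var countOf = function (level) {
        return inWindow.filter(function (m) { return m.logLevel === level; }).length;
    };
    var windowMinutes = (beatTime - lastBeat) / 60000;
    return {
        time: beatTime,
        infoCount: countOf("Info"),
        debugCount: countOf("Debug"),
        errorCount: countOf("Error"),
        fatalCount: countOf("Fatal"),
        // Assumed formula: total events in the window scaled to a one-minute rate.
        perMinute: windowMinutes > 0 ? Math.round(inWindow.length / windowMinutes) : 0
    };
}

// Usage: a 30-second window that contains three of the four messages.
var sample = [
    { dateTime: 1000, logLevel: "Info" },
    { dateTime: 5000, logLevel: "Error" },
    { dateTime: 29000, logLevel: "Info" },
    { dateTime: 45000, logLevel: "Fatal" } // outside the window
];
var beat = aggregateBeat(sample, 0, 30000);
// beat.infoCount is 2, beat.errorCount is 1, beat.perMinute is 6
```

Both ends of the window are inclusive here, as in the C# filter above, so a message stamped exactly on a beat boundary would be counted in two consecutive windows. Making one end exclusive would avoid that if it matters for your data.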

Finally, there's a startup class:

public class Startup
{
    private Dashboard _dashboard;
    private Timer _timer;

    public void Configuration(IAppBuilder app)
    {
        var clients = GlobalHost.ConnectionManager.GetHubContext<DashboardHub>().Clients;

        IRedisListener redisListener;
#if TestRedisListener
        redisListener = new TestRedisListener(TimeSpan.FromSeconds(1));
#else
        redisListener = new RedisListener("localhost", "urn:events");
#endif
        var windowSize = TimeSpan.FromSeconds(30);
        _dashboard = new Dashboard(clients, DateTime.Now);
        _timer = new Timer(OnTimerCallback, null, windowSize, windowSize);
        redisListener.Subsribe(_dashboard.OnMessageReceived); // This doesn't block
    }

    private void OnTimerCallback(object state)
    {
        // Assumed body: emit one beat per timer tick
        _dashboard.BroadcastBeat(DateTime.Now);
    }
}

And that's really all that it takes to get a simple real-time dashboard up and running. The code is here.