Integration testing of components that communicate through asynchronous messages presents interesting challenges. I was adding a feature to somebody else's code which was listening for Redis message and doing some processing. I added a few lines of code to the end of this code to publish another Redis message:

public void WaitOnQueue()  
{
    // Wait for message
    // and when received call
    //PublishContractSigned
}
private void PublishContractSigned(ProcessResponse response)  
{
    using (IRedisClient client = new RedisClient(ConfigurationManager.AppSettings["RedisHost"]))
    {
            var messageData = JsonSerializer.SerializeToString(new ContractSignedMessage {AccountId = response.AccountId.Value});
            client.PublishMessage("urn:contractsigned", messageData);
    }    
}

So the code essentially receives one message and sends another one. In my integration test I needed to do the following steps:

  1. Begin waiting on queue by calling WaitOnQueue
  2. Setup my own message listener for the message that will be published by the code if everything goes well
  3. Initiate test by sending message
  4. Make sure the message is received within timeout.

This is how it all ended up being in test with many simplifications:

[TestFixture]
public class PublishMessageTests  
{
    private SignProc _proc;
    private static bool _messageReceived = false;
    private static int _accountIdReceived;
    private static readonly object _locker = new object();

    [Test]
    public void Proc_WaitOnQueue()
    {
        // Arrange
        var messageReceived = false;
        Task.Factory.StartNew(_proc.WaitOnQueue); 
        Task.Factory.StartNew(WaitForResponseMessage);
        Thread.Sleep(500);

        // Act
        SendMessage();

        // Assert
        const int timeoutSeconds = 60;
        var start = DateTime.Now;
        while ((DateTime.Now - start).Seconds < timeoutSeconds)
        {
            lock (_locker)
            {
                if (_messageReceived)
                {
                    break;
                }
            }
            Thread.Sleep(50);
        }
        Assert.IsTrue(_messageReceived, string.Format("No message within {0}s", timeoutSeconds));
        Assert.That(_accountIdReceived, Is.EqualTo(_control.AccountId));
    }

    private void WaitForResponseMessage()
    {
        using (IRedisClient client = new RedisClient(ConfigurationManager.AppSettings["RedisHost"]))
        {
            var sub = client.CreateSubscription();
            sub.OnMessage = (c, m) =>
            {
                lock (_locker)
                {
                    _messageReceived = true;
                    var message = JsonSerializer.DeserializeFromString<SignProcessing.ContractSignedMessage>(m.ToString());
                    _accountIdReceived = message.AccountId;
                }
            };
            sub.SubscribeToChannels("urn:contractsigned"); // this blocks
        }
    }