Verifying PubSub Services from Rails using Redis

Vipul

May 9, 2015

Let's say we have a service that reads from and writes to Redis. BaseRedisService manages the connection, RedisWriterService writes to Redis, and RedisReaderService reads from it.

require 'redis'

# Base class to manage connection
class BaseRedisService
  REDIS_HOST = '127.0.0.1'

  # Lazily create a single connection shared by all services
  def connection
    if !(defined?(@@connection) && @@connection)
      @@connection = Redis.new({ host: REDIS_HOST })
    end
    @@connection
  end

end

# Class to write to Redis
class RedisWriterService < BaseRedisService
  attr_reader :key, :value

  def initialize key, value
    @key, @value = key, value
  end

  def process
    connection.set key, value
  end

end

# Class to read from Redis
class RedisReaderService < BaseRedisService
  attr_reader :key

  def initialize key
    @key = key
  end

  def process
    connection.get key
  end

end

A test for these services might look like this.

require 'test_helper'

class RedisPubSubServiceTest < ActiveSupport::TestCase

  def test_writing_and_reading_value_using_redis
    value = 'Vipul A M'

    RedisWriterService.new('name', value).process
    assert_equal value, RedisReaderService.new('name').process
  end

end

But now we need to add PubSub to the service and verify that it sends the proper messages to Redis. To verify this, a test needs to listen for the messages sent to Redis. The problem is that subscribing blocks: the Redis client listens in a loop and does not return. We need to explicitly release control from that loop so that our tests can go ahead and verify a scenario.
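To make the blocking problem concrete, here is a minimal sketch that needs no Redis server. The Queue named inbox is a hypothetical stand-in for a channel, and the listener thread stands in for a subscribe loop; neither is part of the services in this post.

```ruby
# Stand-in for a blocking subscribe loop; no Redis server is involved.
inbox    = Queue.new # hypothetical stand-in for a PubSub channel
received = []

# The loop never returns on its own, so it runs in its own thread.
listener = Thread.new do
  loop do
    received << inbox.pop # blocks until something is "published"
  end
end

inbox << 'hello'   # "publish" from the main thread
listener.join(0.5) # wait up to 0.5s for the listener, then take control back
listener.kill      # the loop has to be stopped explicitly

puts received.inspect
```

Calling join with a timeout is what hands control back to the caller even though the loop itself never finishes.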

Let's look at how these services would look.

class RedisPublisherService < BaseRedisService
  attr_reader :options, :channel

  def initialize channel, options
    @channel = channel
    @options = options
  end

  def process
    # publish takes the channel first, then the message
    connection.publish(channel, options)
  end

end
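The argument order matters here: redis-rb's publish takes the channel first and the message second. One way to check that order without a running server is a recording double. The sketch below is only an illustration; RecordingConnection and PublisherSketch are hypothetical names, and the publisher takes its connection as an argument, which differs from the services in this post.

```ruby
# Hypothetical recording double: it mimics only the publish(channel, message)
# argument order and talks to no server.
class RecordingConnection
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(channel, message)
    @published << [channel, message]
    0 # a real PUBLISH returns the number of clients that received the message
  end
end

# Hypothetical publisher that accepts its connection, so a double can be injected.
class PublisherSketch
  def initialize(connection, channel, message)
    @connection = connection
    @channel    = channel
    @message    = message
  end

  def process
    @connection.publish(@channel, @message)
  end
end

fake = RecordingConnection.new
PublisherSketch.new(fake, 'test_channel', 'Vipul A M').process
p fake.published
```

A double like this only checks what the service asks the connection to do; the thread-based approach described in this post is still what exercises a real Redis round trip.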

Our subscriber looks like this.

class RedisSubscriberService < BaseRedisService
  attr_reader :channel

  def initialize channel
    @channel = channel
  end

  def process
    # subscribe blocks and runs the message callback for every message
    connection.subscribe(channel) do |on|
      on.message do |channel, message|
        puts message
      end
    end
  end

end

Notice that we cannot easily return a value from the subscribe loop. Right now we just print each message as it arrives.

Now, let's start storing the messages in an array in our service. We will also expose that array through a thread variable so that it can be accessed from outside the listen loop.

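class RedisSubscriberService < BaseRedisService
  attr_reader :channel, :messages, :messages_count

  def initialize channel, messages_count = 5
    @channel        = channel
    @messages       = []
    @messages_count = messages_count
  end

  def process
    # Expose the array up front so readers see [] instead of nil before any message arrives
    Thread.current[:messages] = messages

    # A subscribed connection generally cannot run other commands such as PUBLISH,
    # so the subscriber uses a dedicated connection rather than the shared one.
    Redis.new(host: REDIS_HOST).subscribe(channel) do |on|
      on.message do |channel, message|
        # Newest message goes to the front
        messages.unshift message
      end
    end
  end

end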

We now have a way to read any messages the service has received. Let's say we start a new RedisSubscriberService in a thread; we could then read its messages like this.

subscriber = Thread.new { RedisSubscriberService.new('payment_alerts').process }
# Print the most recent message received so far (prints an empty line if none has arrived yet)
puts Array(subscriber[:messages]).first
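The thread-variable mechanism can be tried in isolation. The following sketch uses no Redis; the worker thread and the ready queue are hypothetical stand-ins used only to show that a value stored with Thread.current[:key] inside a thread can be read from outside via thread[:key].

```ruby
ready = Queue.new # hypothetical signal so the main thread knows the worker has run

worker = Thread.new do
  messages = []
  Thread.current[:messages] = messages # expose the array to other threads
  messages.unshift 'first'
  messages.unshift 'second'            # unshift puts the newest message at the front
  ready << true
  sleep                                # stay alive, like a listen loop would
end

ready.pop                              # wait until the worker has stored its messages
latest = worker[:messages].first
puts latest                            # prints "second"
worker.kill
```

Because unshift prepends, first always returns the most recently received message.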

Armed with this, we can now define a set of Rails helpers to use in our Rails tests.


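module SubscriberHelpers
  THREAD_PROCESS_TIMEOUT = 0.5

  # Start the subscriber in a background thread
  def setup_subscriber channel = 'test_channel'
    @subscriber = Thread.new { RedisSubscriberService.new(channel).process }
  end

  def teardown_subscriber
    @subscriber.kill
  end

  # Run the block, then give the subscriber thread time to receive messages
  def with_subscriber
    yield
    @subscriber.join THREAD_PROCESS_TIMEOUT
  end

  # Most recent message, or nil if nothing has been received
  def message_from_subscription
    Array(@subscriber[:messages]).first
  end
end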

Notice the with_subscriber method. It runs the block passed to it, then joins the subscriber thread for up to THREAD_PROCESS_TIMEOUT seconds, which gives the subscriber time to read any messages sent and store them in messages.

The value of THREAD_PROCESS_TIMEOUT can be tuned by experiment to suit the system that's being verified.
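If a fixed timeout proves hard to tune, one alternative (not used in this post) is to poll until a message shows up or a deadline passes. The sketch below is an assumption-laden illustration: wait_until is a hypothetical helper, and the producer thread stands in for a subscriber that receives a message after a short delay.

```ruby
# Hypothetical helper: call the block repeatedly until it returns a truthy
# value or the timeout (in seconds) elapses. Returns nil on timeout.
def wait_until(timeout: 2.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = yield
    return result if result
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

messages = []
# Stand-in for a subscriber that receives a message a little later
producer = Thread.new do
  sleep 0.05
  messages.unshift 'paid'
end

found = wait_until { messages.first }
producer.join
puts found.inspect
```

With this shape the timeout is only an upper bound: the wait ends as soon as a message is available, so a generous value does not slow down passing tests.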

In our tests, we can now verify PubSub as follows.

require 'test_helper'

class RedisPubSubServiceTest < ActiveSupport::TestCase
  include SubscriberHelpers

  def setup
    setup_subscriber
  end

  def teardown
    teardown_subscriber
  end

  def test_writing_and_reading_back_values_from_pub_sub
    value = 'Vipul A M'

    with_subscriber do
      RedisPublisherService.new('test_channel', value).process
    end

    assert_equal value, message_from_subscription
  end
end

Summary

We looked at how PubSub-based services can be verified by running the subscriber in a thread and exposing the messages it receives. The approach can be tailored to any PubSub service similar to Redis, and makes it easy to verify, from Rails tests, the values our services publish.
