Using Concurrent Ruby in a Ruby on Rails Application

Midhun Krishna

Midhun Krishna

June 5, 2018

Concurrent Ruby is a concurrency toolkit that builds on a lot of interesting ideas from many functional languages and classic concurrency patterns. When it comes to writing threaded code in Rails applications, look no further since concurrent ruby is already included in Rails via Active Support.

Using Concurrent::Future

In one of our applications, to improve performance we added threaded code using Concurrent::Future. It worked really well for us until one day it stopped working.

"Why threads?" one might ask. The code in question was a textbook threading use case. It had a few API calls, some DB requests and finally an action that was performed on all the data that was aggregated.

Let us look at what this code looks like.

Non threaded code

selected_shipping_companies.each do | carrier |
  # api calls
  distance_in_miles = find_distance_from_origin_to_destination
  historical_average_rate = historical_average_for_this_particular_carrier

  # action performed
  build_price_details_for_this_carrier(distance_in_miles,
                                       historical_average_rate)
end

Converting the above code to use Concurrent::Future is trivial.

futures = selected_shipping_companies.map do |carrier|
  Concurrent::Future.execute do
    # api calls
    distance_in_miles = find_distance_from_origin_to_destination
    historical_average_rate = historical_average_for_this_particular_carrier

    # action performed
    build_price_details_for_this_carrier(distance_in_miles,
                                         historical_average_rate)
  end
end

futures.map(&:value)

A bit more about Concurrent::Future

It is often intimidating to work with threads. They can bring in complexity and can have unpredictable behaviors due to lack of thread-safety. Ruby, being a language of mutable references, we often find it difficult to write 100% thread-safe code.

Inspired by Clojure's Future function, Concurrent::Future is a primitive that guarantees thead safety. It takes a block of work and performs the work asynchronously using Concurrent Ruby's global thread-pool. Once a block of work is scheduled, Concurrent Ruby gives us a handle to this future work, on which when #value (or #deref) is called block's value is returned.

The Bug

Usually, when an exception occurs in the main thread, the interpreter stops and gathers the exception data. In the case of Ruby Threads, any unhandled exceptions are reported only when Thread#join is called. Setting Thread#abort_on_exception to true, is an better alternative which will cause all threads to exit when an exception is raised in any running thread. We published a blog recently which talks about this in great detail.

Exception handling in Concurrent Ruby


future = Concurrent::Future.execute {
            raise StandardError.new("Boom!")
          }

sleep(0.1) # giving arbitrary time for future to execute

future.value     #=> nil

Where did the exception go? This code fails silently and swallows the exceptions. How can we find out if the code executed successfully?

future = Concurrent::Future.execute {
              raise StandardError.new("Boom!")
          }

sleep(0.1) # giving arbitrary time for future to execute

future.value     #=> nil

future.rejected? #=> true
future.reason    #=> "#<StandardError: Boom!>"

How we fixed our issue

We found places in our application where Concurrent::Future was used in a way that would swallow exceptions. It is also a possibility that people might overlook the explicit need to manually report exception. We addressed these concerns with the following wrapper class.

module ConcurrentExecutor
  class Error < StandardError
    def initialize(exceptions)
      @exceptions = exceptions
      super
    end

    def message
      @exceptions.map { | e | e.message }.join "\n"
    end

    def backtrace
      traces = @exceptions.map { |e| e.backtrace }
      ["ConcurrentExecutor::Error START", traces, "END"].flatten
    end
  end

  class Future
    def initialize(pool: nil)
      @pool = pool || Concurrent::FixedThreadPool.new(20)
      @exceptions = Concurrent::Array.new
    end

    # Sample Usage
    # executor = ConcurrentExecutor::Future.new(pool: pool)
    # executor.execute(carriers) do | carrier |
    #   ...
    # end
    #
    # values = executor.resolve

    def execute array, &block
      @futures = array.map do | element |
        Concurrent::Future.execute({ executor: @pool }) do
          yield(element)
        end.rescue do | exception |
          @exceptions << exception
        end
      end

      self
    end

    def resolve
      values = @futures.map(&:value)

      if @exceptions.length > 0
        raise ConcurrentExecutor::Error.new(@exceptions)
      end

      values
    end
  end
end

Please note that using Concurrent Ruby Futures caused segmentation fault while running specs in Circle CI. As of this writing, we are using normal looping instead of Futures in Circle CI until the reason for the segfault is isolated and fixed.

Update

Concurrent::Future also gives us another API which not only returns the value of the block but also posts/raises any exceptions that occur into the main thread.

thread_pool = Concurrent::FixedThreadPool.new(20)
executors = [1, 2, 3, 4].map do |random_number|
  Concurrent::Future.execute({ executor: thread_pool }) do
    random_number / (random_number.even? ? 0 : 1)
  end
end

executors.map(&:value)
=> [1, nil, 3, nil]

executors.map(&:value!)

> ZeroDivisionError: divided by 0
> from (pry):4:in `/'

We thank Jonathan Rochkind for pointing us to this undocumented api in his reddit post.

If this blog was helpful, check out our full blog archive.

Stay up to date with our blogs.

Subscribe to receive email notifications for new blog posts.