Concurrent Ruby is a concurrency toolkit that builds on ideas from many functional languages and classic concurrency patterns. For threaded code in Rails applications there is no need to look further, since Concurrent Ruby is already included in Rails via Active Support.
In one of our applications, to improve performance we added threaded code using Concurrent::Future. It worked really well for us until one day it stopped working.
"Why threads?" one might ask. The code in question was a textbook threading use case. It had a few API calls, some DB requests and finally an action that was performed on all the data that was aggregated.
Let us look at what this code looks like.
selected_shipping_companies.each do |carrier|
  # api calls
  distance_in_miles = find_distance_from_origin_to_destination
  historical_average_rate = historical_average_for_this_particular_carrier

  # action performed
  build_price_details_for_this_carrier(distance_in_miles,
                                       historical_average_rate)
end
Converting the above code to use Concurrent::Future is trivial.
futures = selected_shipping_companies.map do |carrier|
  Concurrent::Future.execute do
    # api calls
    distance_in_miles = find_distance_from_origin_to_destination
    historical_average_rate = historical_average_for_this_particular_carrier

    # action performed
    build_price_details_for_this_carrier(distance_in_miles,
                                         historical_average_rate)
  end
end

futures.map(&:value)
Working with threads is often intimidating. They add complexity and can behave unpredictably when code is not thread-safe. Because Ruby is a language of mutable references, it is often difficult to write 100% thread-safe code.
Inspired by Clojure's future function, Concurrent::Future is a primitive that guarantees thread safety. It takes a block of work and performs it asynchronously on Concurrent Ruby's global thread pool. Once the block is scheduled, Concurrent Ruby returns a handle to the pending work; calling #value (or #deref) on that handle returns the block's value.
Usually, when an exception occurs in the main thread, the interpreter stops and gathers the exception data. In the case of Ruby threads, any unhandled exceptions are reported only when Thread#join is called. Setting Thread#abort_on_exception to true is a better alternative, which will cause all threads to exit when an exception is raised in any running thread. We published a blog recently which talks about this in great detail.
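The behavior of plain threads can be seen in a minimal sketch that uses only Ruby's standard library. The variable names `worker` and `joined_error` are illustrative, and `Thread.report_on_exception` is switched off here only to keep stderr quiet (on Ruby 2.5 and later it defaults to true and prints a report when a thread dies from an exception).

```ruby
# Silence the default stderr report so only our own output is shown.
Thread.report_on_exception = false

# The thread dies with an unhandled exception.
worker = Thread.new { raise StandardError, "Boom!" }

sleep(0.1) # the thread is already dead, yet nothing is raised in the main thread

# Thread#join re-raises the thread's exception in the calling thread.
joined_error = begin
  worker.join
  nil
rescue StandardError => e
  e
end

puts joined_error.message
```

Until `join` (or `value`) is called, the main thread carries on as if nothing had gone wrong.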
future = Concurrent::Future.execute {
  raise StandardError.new("Boom!")
}

sleep(0.1) # giving arbitrary time for future to execute

future.value #=> nil
Where did the exception go? This code fails silently and swallows the exceptions. How can we find out if the code executed successfully?
future = Concurrent::Future.execute {
  raise StandardError.new("Boom!")
}

sleep(0.1) # giving arbitrary time for future to execute

future.value #=> nil

future.rejected? #=> true
future.reason #=> #<StandardError: Boom!>
We found places in our application where Concurrent::Future was used in a way that would swallow exceptions. It is also easy to overlook the need to report exceptions manually. We addressed these concerns with the following wrapper class.
module ConcurrentExecutor
  class Error < StandardError
    def initialize(exceptions)
      @exceptions = exceptions
      super
    end

    def message
      @exceptions.map { |e| e.message }.join "\n"
    end

    def backtrace
      traces = @exceptions.map { |e| e.backtrace }
      ["ConcurrentExecutor::Error START", traces, "END"].flatten
    end
  end

  class Future
    def initialize(pool: nil)
      @pool = pool || Concurrent::FixedThreadPool.new(20)
      @exceptions = Concurrent::Array.new
    end

    # Sample Usage
    #   executor = ConcurrentExecutor::Future.new(pool: pool)
    #   executor.execute(carriers) do |carrier|
    #     ...
    #   end
    #
    #   values = executor.resolve

    def execute(array, &block)
      @futures = array.map do |element|
        Concurrent::Future.execute(executor: @pool) do
          begin
            yield(element)
          rescue StandardError => exception
            # Record the failure; this future then resolves to nil.
            @exceptions << exception
            nil
          end
        end
      end

      self
    end

    def resolve
      # Blocks until every future has finished.
      values = @futures.map(&:value)

      if @exceptions.length > 0
        raise ConcurrentExecutor::Error.new(@exceptions)
      end

      values
    end
  end
end
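Where a full wrapper class is more than is needed, the same idea can be sketched as a small helper that inspects each future once it has resolved, relying only on the `value`, `rejected?` and `reason` methods shown earlier. This is a hedged illustration, not part of our application: `resolve_all` and `StubFuture` are hypothetical names, and `StubFuture` is a stand-in for Concurrent::Future so the sketch runs without the gem.

```ruby
# Hypothetical helper: resolve every future, then raise if any was rejected.
# Accepts any objects that respond to #value, #rejected? and #reason.
def resolve_all(futures)
  values = futures.map(&:value) # with real futures this blocks until each one completes
  failures = futures.select(&:rejected?).map(&:reason)
  return values if failures.empty?

  # Re-raise a single failure as-is; combine several into one error message.
  raise failures.first if failures.size == 1
  raise RuntimeError, failures.map(&:message).join("\n")
end

# Stand-in for Concurrent::Future (an assumption made to keep the sketch self-contained).
StubFuture = Struct.new(:value, :reason) do
  def rejected?
    !reason.nil?
  end
end

ok = [StubFuture.new(1, nil), StubFuture.new(2, nil)]
p resolve_all(ok)

mixed = [StubFuture.new(1, nil),
         StubFuture.new(nil, ZeroDivisionError.new("divided by 0"))]
begin
  resolve_all(mixed)
rescue ZeroDivisionError => e
  puts "raised: #{e.message}"
end
```

With real futures the call would look like `resolve_all(futures)` in place of `futures.map(&:value)`.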
Please note that using Concurrent Ruby Futures caused a segmentation fault while running specs in CircleCI. As of this writing, we are using normal looping instead of Futures in CircleCI until the reason for the segfault is isolated and fixed.
Concurrent::Future also gives us another API, value!, which not only returns the value of the block but also re-raises any exception that occurred in the block in the calling thread.
thread_pool = Concurrent::FixedThreadPool.new(20)
executors = [1, 2, 3, 4].map do |random_number|
  Concurrent::Future.execute({ executor: thread_pool }) do
    random_number / (random_number.even? ? 0 : 1)
  end
end

executors.map(&:value)
#=> [1, nil, 3, nil]

executors.map(&:value!)
# ZeroDivisionError: divided by 0
# from (pry):4:in `/'
We thank Jonathan Rochkind for pointing us to this undocumented API in his Reddit post.