December 5, 2018
In one of our projects we need to capture user activity throughout the application. For example when a user updates projected distance of a delivery, the application should create an activity for that action. To create an activity we need the currently logged in user id since we need to associate the activity with that user.
We are using devise
gem for authentication
which provides current_user
method by default
to controllers.
Any business logic residing at controller level
can use current_user
to associate the activity
with the logged in user.
However, some business logics
reside in Sidekiq
where current_user
is not available.
current_user
to Sidekiq jobOne way to solve this issue
is to pass the current_user
directly
to the Sidekiq job.
Here's how we can do it.
class DeliveryController < ApplicationController
def update
# update attributes
DeliveryUpdateWorker.
perform_async(params[:delivery], current_user.login)
# render delivery
end
end
class DeliveryUpdateWorker
include Sidekiq::Worker
def perform(delivery, user_login)
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
That works. Now let's say we add another endpoint in which we need to track when delivery is deleted. Here's the updated code.
class DeliveryController < ApplicationController
def update
# update attributes
DeliveryUpdateWorker.
perform_async(params[:delivery], current_user.login)
# render delivery
end
def destroy
# delete attributes
DeliveryDeleteWorker.
perform_async(params[:delivery], current_user.login)
# render :ok
end
end
class DeliveryDeleteWorker
include Sidekiq::Worker
def perform(delivery, user_login)
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
Again we needed to pass
current_user
login
in the new endpoint.
You can notice a pattern here.
For each endpoint which needs
to track activity
we need to pass current_user
.
What if we could pass current_user
info
by default.
The main reason we want to
pass current_user
by default
is because we're tracking model attribute
changes in the model's before_save
callbacks.
For this we store current_user
info in Thread.current
and access
it in before_save
callbacks
of the model which
generated relevant activity.
This will work fine for
model attribute changes made in
controllers and services
where Thread.current
is
accessible and persisted.
However, for Sidekiq jobs
which changes the model attributes
whose activity is generated,
we need to pass the current_user
manually since Thread.current
is not available in Sidekiq jobs.
Again we can argue here that
we don't need to pass the current_user
by default. Instead we can pass it
in each Sidekiq job as an argument.
This will work in simple cases,
although for more complex cases
this will require extra effort.
For eg. let's say we're tracking
delivery's cost. We've three sidekiq
jobs, DeliveryDestinationChangeWorker
,
DeliveryRouteChangeWorker
and DeliveryCostChangeWorker
.
We call DeliveryDestinationChangeWorker
which changes the destination of a delivery.
This calls DeliveryRouteChangeWorker
which
calculates the new route and calls
DeliveryCostChangeWorker
. Now
DeliveryCostChangeWorker
changes the delivery
cost where the before_save
callback is called.
In this example you can see
that we need to pass current_user
through all three Sidekiq workers
and initialize Thread.current
in DeliveryCostChangeWorker
.
The nesting can go much deeper.
Passing current_user
by default
will make sure if the activity is
being generated in a model's
before_save
callback then
it can access the current_user
info from Thread.current
no
matter how much nested the Sidekiq
call chain is.
Also it makes sure that
if a developer adds another
Sidekiq worker class
in the future which
changes a model whose
attribute change needs to be tracked.
Then the developer need not
remember to pass
current_user
explicitly
to the Sidekiq worker.
Note the presented problem in this blog is an oversimplified version in order to better present the solution.
current_user
by defaultThe most basic solution to
pass current_user
by default
is to create a wrapper module.
This module will be responsible
for adding the current_user
when perform_async
is invoked.
Here's an example.
module SidekiqMediator
def perform_async(klass, *args)
args.push(current_user.login)
klass.send(:perform_async, *args)
end
end
class DeliveryController < ApplicationController
include SidekiqMediator
def update
# update attributes
perform_async(DeliveryUpdateWorker, params[:delivery])
# render delivery
end
def destroy
# delete attributes
perform_async(DeliveryDeleteWorker, params[:delivery])
# render :ok
end
end
class DeliveryDeleteWorker
include Sidekiq::Worker
def perform(delivery, user_login)
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
Now we don't need to pass
current_user
login in each call.
However we still need to
remember including SidekiqMediator
whenever we need to use current_user
in the Sidekiq job
for activity generation.
Another way to solve this problem
is to intercept the Sidekiq job
before it is pushed to redis.
Then we can include current_user
login by default.
current_user
by defaultSidekiq provides a client middleware
to run custom logic
before pushing the job in redis.
We can use the client middleware
to push current_user
as default argument
in the Sidekiq arguments.
Here's an example of Sidekiq client middleware.
class SidekiqClientMiddleware
def call(worker_class, job, queue, redis_pool = nil)
# Do something before pushing the job in redis
yield
end
end
We need a way to introduce
current_user
in the Sidekiq arguments.
The
job
payload contains the arguments passed to the Sidekiq worker.
Here's what the job
payload looks like.
{
"class": "DeliveryDeleteWorker",
"jid": "b4a577edbccf1d805744efa9",
"args": [1, "arg", true],
"created_at": 1234567890,
"enqueued_at": 1234567890
}
Notice here the args
key which is an array
containing the arguments passed
to the Sidekiq worker.
We can push the current_user
in the args
array.
This way each Sidekiq job
will have current_user
by default
as the last argument.
Here's the modified version
of the client middleware
which includes current_user
by default.
class SidekiqClientMiddleware
def call(_worker_class, job, _queue, _redis_pool = nil)
# Push current user login as the last argument by default
job['args'].push(current_user.login)
yield
end
end
Now we don't need to
pass current_user
login to Sidekiq workers
in the controller.
Here's how our controller logic
looks like now.
class DeliveryController < ApplicationController
def update
# update attributes
DeliveryUpdateWorker.perform_async(params[:data])
# render delivery
end
def destroy
# delete attributes
DeliveryDeleteWorker.perform_async(params[:data])
# render :ok
end
end
We don't need
SidekiqMediator
anymore.
The current_user
will automatically
be included as the last argument
in every Sidekiq job.
Although there's one issue here.
We included current_user
by default to every Sidekiq worker.
This means workers
which does not expect current_user
as an argument will also have
current_user
as their last argument.
This will raise ArgumentError: wrong number of arguments (2 for 1)
.
Here's an example.
class DeliveryCreateWorker
include Sidekiq::Worker
def perform(data)
# doesn't use current_user login to track activity when called
# this will get data, current_user_login as the arguments
end
end
To solve this we need to
extract current_user
argument from job['args']
before the worker starts processing.
current_user
loginSidekiq also provides
server middleware
which runs before processing
any Sidekiq job.
We used this to
extract current_user
from job['args']
and saved it in a global state.
This global state should persist when the server middleware execution is complete and the actual Sidekiq job processing has started. Here's the server middleware.
class SidekiqServerMiddleware
def call(_worker, job, _queue)
set_request_user(job['args'].pop)
yield
end
private
def set_request_user(request_user_login)
RequestStore.store[:request_user_login] = request_user_login
end
end
Notice here we used pop
to extract
the last argument.
Since we're setting
the last argument
to current_user
in the client middleware,
the last argument
will always be the
current_user
in server middleware.
Using pop
also removes
current_user
from job['args']
which ensures the worker
does not get
current_user
as an extra argument.
We used
request_store
to persist a global state.
RequestStore
provides
a per request global storage
using Thread.current
which stores info
as a key value pair.
Here's how we used it
in Sidekiq workers
to access the current_user
info.
class DeliveryDeleteWorker
include Sidekiq::Worker
def perform(delivery)
user_login = RequestStore.store[:request_user_login]
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
Now we don't need to pass
current_user
in the controller
when calling the Sidekiq worker.
Also we don't need to
add user_login
as an extra argument
in each Sidekiq worker
every time we need to
access current_user
.
By default Sidekiq
does not run
server middleware
in inline
and fake
mode.
Because of this
current_user
was being added in the
client middleware
but it's not being extracted
in the server middleware
since it's never called.
This resulted in
ArgumentError: wrong number of arguments (2 for 1)
failures in our test cases
which used Sidekiq in
inline
or fake
mode.
We solved this by adding following config:
Sidekiq::Testing.server_middleware do |chain|
chain.add SidekiqServerMiddleware
end
This ensures that SidekiqServerMiddleware
is called in
inline
and fake
mode
in our test cases.
However, we found an alternative
to this which was much
simpler and cleaner.
We noticed that job
payload
is a simple hash
which is pushed to redis
as it is
and is available
in the server middleware
as well.
Instead of adding the
current_user
as an argument in job['args']
we could add another key
in job
payload itself
which will hold the
current_user
.
Here's the modified logic.
class SidekiqClientMiddleware
def call(_worker_class, job, _queue, _redis_pool = nil)
# Set current user login in job payload
job['request_user_login'] = current_user.login if defined?(current_user)
yield
end
end
class SidekiqServerMiddleware
def call(_worker, job, _queue)
if job.key?('request_user_login')
set_request_user(job['request_user_login'])
end
yield
end
private
def set_request_user(request_user_login)
RequestStore.store[:request_user_login] = request_user_login
end
end
We used a unique key
request_user_login
which would not conflict with
the other keys in the
job
payload.
Additionally we added
a check if request_user_login
key
is present in the job
payload.
This is necessary
because if the user calls
the worker from console
then it'll not have current_user
set.
Apart from this we noticed
that we had multiple api services
talking to each other.
These services also generated user activity.
Few of them didn't use Devise
for authentication,
instead the requesting user info
was passed to them in each request
as param.
For these services
we set the request user info
in RequestStore.store
in our BaseApiController
and changed the client middleware
to use RequestStore.store
instead of current_user
method.
We also initialized RequestStore.store
in services
where we used Devise
to make it completely independent
of current_user
.
Here's how our client middleware
looks now.
class SidekiqClientMiddleware
def call(_worker_class, job, _queue, _redis_pool = nil)
# Set current user login in job payload
if RequestStore.store[:request_user_login]
job['request_user_login'] = RequestStore.store[:request_user_login]
end
yield
end
end
Lastly we needed to register the client and server middleware in Sidekiq.
To enable the middleware
with Sidekiq,
we need to register
the client middleware
and the server middleware
in config/initializers/sidekiq.rb
.
Here's how we did it.
Sidekiq.configure_client do |config|
config.client_middleware do |chain|
chain.add SidekiqClientMiddleware
end
end
Sidekiq.configure_server do |config|
config.client_middleware do |chain|
chain.add SidekiqClientMiddleware
end
config.server_middleware do |chain|
chain.add SidekiqServerMiddleware
end
end
Notice that we added SidekiqClientMiddleware
in both
configure_server
block
and configure_client
block,
this is because
a Sidekiq job can call
another Sidekiq job
in which case
the Sidekiq server itself
will act as the client.
To sum it up, here's how our client middleware and server middleware finally looked like.
class SidekiqClientMiddleware
def call(_worker_class, job, _queue, _redis_pool = nil)
# Set current user login in job payload
if RequestStore.store[:request_user_login]
job['request_user_login'] = RequestStore.store[:request_user_login]
end
yield
end
end
class SidekiqServerMiddleware
def call(_worker, job, _queue)
if job.key?('request_user_login')
set_request_user(job['request_user_login'])
end
yield
end
private
def set_request_user(request_user_login)
RequestStore.store[:request_user_login] = request_user_login
end
end
The controller example we mentioned initially looks like:
class DeliveryController < ApplicationController
def update
# update attributes
DeliveryUpdateWorker.perform_async(params[:delivery])
# render delivery
end
def destroy
# delete attributes
DeliveryDeleteWorker.perform_async(params[:delivery])
# render :ok
end
end
class DeliveryDeleteWorker
include Sidekiq::Worker
def perform(delivery)
user_login = RequestStore.store[:request_user_login]
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
class DeliveryUpdateWorker
include Sidekiq::Worker
def perform(delivery)
user_login = RequestStore.store[:request_user_login]
user = User.find_by(login: user_login)
ActivityGenerationService.new(delivery, user) if user
end
end
Now we don't need to
explicitly pass current_user
to each Sidekiq job.
It's available out of the box
without any changes
in all Sidekiq jobs.
As an alternative we can also use ActiveSupport::CurrentAttributes.
If this blog was helpful, check out our full blog archive.