In one of our projects we need to capture user activity throughout the application. For example when a user updates projected distance of a delivery, the application should create an activity for that action. To create an activity we need the currently logged in user id since we need to associate the activity with that user.
We are using devise gem for authentication which provides current_user method by default to controllers. Any business logic residing at controller level can use current_user to associate the activity with the logged in user. However, some business logics reside in Sidekiq where current_user is not available.
Passing current_user to Sidekiq job
One way to solve this issue is to pass the current_user directly to the Sidekiq job. Here's how we can do it.
1 class DeliveryController < ApplicationController 2 def update 3 # update attributes 4 DeliveryUpdateWorker. 5 perform_async(params[:delivery], current_user.login) 6 # render delivery 7 end 8 end
1 class DeliveryUpdateWorker 2 include Sidekiq::Worker 3 4 def perform(delivery, user_login) 5 user = User.find_by(login: user_login) 6 ActivityGenerationService.new(delivery, user) if user 7 end 8 end
That works. Now let's say we add another endpoint in which we need to track when delivery is deleted. Here's the updated code.
1 class DeliveryController < ApplicationController 2 def update 3 # update attributes 4 DeliveryUpdateWorker. 5 perform_async(params[:delivery], current_user.login) 6 # render delivery 7 end 8 9 def destroy 10 # delete attributes 11 DeliveryDeleteWorker. 12 perform_async(params[:delivery], current_user.login) 13 # render :ok 14 end 15 end
1 class DeliveryDeleteWorker 2 include Sidekiq::Worker 3 4 def perform(delivery, user_login) 5 user = User.find_by(login: user_login) 6 ActivityGenerationService.new(delivery, user) if user 7 end 8 end
Again we needed to pass current_user login in the new endpoint. You can notice a pattern here. For each endpoint which needs to track activity we need to pass current_user. What if we could pass current_user info by default.
The main reason we want to pass current_user by default is because we're tracking model attribute changes in the model's before_save callbacks.
For this we store current_user info in Thread.current and access it in before_save callbacks of the model which generated relevant activity.
This will work fine for model attribute changes made in controllers and services where Thread.current is accessible and persisted. However, for Sidekiq jobs which changes the model attributes whose activity is generated, we need to pass the current_user manually since Thread.current is not available in Sidekiq jobs.
Again we can argue here that we don't need to pass the current_user by default. Instead we can pass it in each Sidekiq job as an argument. This will work in simple cases, although for more complex cases this will require extra effort.
For eg. let's say we're tracking delivery's cost. We've three sidekiq jobs, DeliveryDestinationChangeWorker, DeliveryRouteChangeWorker and DeliveryCostChangeWorker. We call DeliveryDestinationChangeWorker which changes the destination of a delivery. This calls DeliveryRouteChangeWorker which calculates the new route and calls DeliveryCostChangeWorker. Now DeliveryCostChangeWorker changes the delivery cost where the before_save callback is called.
In this example you can see that we need to pass current_user through all three Sidekiq workers and initialize Thread.current in DeliveryCostChangeWorker. The nesting can go much deeper.
Passing current_user by default will make sure if the activity is being generated in a model's before_save callback then it can access the current_user info from Thread.current no matter how much nested the Sidekiq call chain is.
Also it makes sure that if a developer adds another Sidekiq worker class in the future which changes a model whose attribute change needs to be tracked. Then the developer need not remember to pass current_user explicitly to the Sidekiq worker.
Note the presented problem in this blog is an oversimplified version in order to better present the solution.
Creating a wrapper module to include current_user by default
The most basic solution to pass current_user by default is to create a wrapper module. This module will be responsible for adding the current_user when perform_async is invoked. Here's an example.
1 module SidekiqMediator 2 def perform_async(klass, *args) 3 args.push(current_user.login) 4 klass.send(:perform_async, *args) 5 end 6 end
1 class DeliveryController < ApplicationController 2 include SidekiqMediator 3 4 def update 5 # update attributes 6 perform_async(DeliveryUpdateWorker, params[:delivery]) 7 # render delivery 8 end 9 10 def destroy 11 # delete attributes 12 perform_async(DeliveryDeleteWorker, params[:delivery]) 13 # render :ok 14 end 15 end
1 class DeliveryDeleteWorker 2 include Sidekiq::Worker 3 4 def perform(delivery, user_login) 5 user = User.find_by(login: user_login) 6 ActivityGenerationService.new(delivery, user) if user 7 end 8 end
Now we don't need to pass current_user login in each call. However we still need to remember including SidekiqMediator whenever we need to use current_user in the Sidekiq job for activity generation. Another way to solve this problem is to intercept the Sidekiq job before it is pushed to redis. Then we can include current_user login by default.
Using Sidekiq client middleware to pass current_user by default
Sidekiq provides a client middleware to run custom logic before pushing the job in redis. We can use the client middleware to push current_user as default argument in the Sidekiq arguments. Here's an example of Sidekiq client middleware.
1 class SidekiqClientMiddleware 2 def call(worker_class, job, queue, redis_pool = nil) 3 # Do something before pushing the job in redis 4 yield 5 end 6 end
We need a way to introduce current_user in the Sidekiq arguments. The job payload contains the arguments passed to the Sidekiq worker. Here's what the job payload looks like.
1 { 2 "class": "DeliveryDeleteWorker", 3 "jid": "b4a577edbccf1d805744efa9", 4 "args": [1, "arg", true], 5 "created_at": 1234567890, 6 "enqueued_at": 1234567890 7 }
Notice here the args key which is an array containing the arguments passed to the Sidekiq worker. We can push the current_user in the args array. This way each Sidekiq job will have current_user by default as the last argument. Here's the modified version of the client middleware which includes current_user by default.
1 class SidekiqClientMiddleware 2 def call(_worker_class, job, _queue, _redis_pool = nil) 3 # Push current user login as the last argument by default 4 job['args'].push(current_user.login) 5 yield 6 end 7 end
Now we don't need to pass current_user login to Sidekiq workers in the controller. Here's how our controller logic looks like now.
1 class DeliveryController < ApplicationController 2 def update 3 # update attributes 4 DeliveryUpdateWorker.perform_async(params[:data]) 5 # render delivery 6 end 7 8 def destroy 9 # delete attributes 10 DeliveryDeleteWorker.perform_async(params[:data]) 11 # render :ok 12 end 13 end
We don't need SidekiqMediator anymore. The current_user will automatically be included as the last argument in every Sidekiq job.
Although there's one issue here. We included current_user by default to every Sidekiq worker. This means workers which does not expect current_user as an argument will also have current_user as their last argument. This will raise ArgumentError: wrong number of arguments (2 for 1). Here's an example.
1 class DeliveryCreateWorker 2 include Sidekiq::Worker 3 4 def perform(data) 5 # doesn't use current_user login to track activity when called 6 # this will get data, current_user_login as the arguments 7 end 8 end
To solve this we need to extract current_user argument from job['args'] before the worker starts processing.
Using Sidekiq server middleware to extract current_user login
Sidekiq also provides server middleware which runs before processing any Sidekiq job. We used this to extract current_user from job['args'] and saved it in a global state.
This global state should persist when the server middleware execution is complete and the actual Sidekiq job processing has started. Here's the server middleware.
1 class SidekiqServerMiddleware 2 def call(_worker, job, _queue) 3 set_request_user(job['args'].pop) 4 yield 5 end 6 7 private 8 def set_request_user(request_user_login) 9 RequestStore.store[:request_user_login] = request_user_login 10 end 11 end
Notice here we used pop to extract the last argument. Since we're setting the last argument to current_user in the client middleware, the last argument will always be the current_user in server middleware.
Using pop also removes current_user from job['args'] which ensures the worker does not get current_user as an extra argument.
We used request_store to persist a global state. RequestStore provides a per request global storage using Thread.current which stores info as a key value pair. Here's how we used it in Sidekiq workers to access the current_user info.
1 class DeliveryDeleteWorker 2 include Sidekiq::Worker 3 4 def perform(delivery) 5 user_login = RequestStore.store[:request_user_login] 6 user = User.find_by(login: user_login) 7 ActivityGenerationService.new(delivery, user) if user 8 end 9 end
Now we don't need to pass current_user in the controller when calling the Sidekiq worker. Also we don't need to add user_login as an extra argument in each Sidekiq worker every time we need to access current_user.
Configure server middleware for Sidekiq test cases
By default Sidekiq does not run server middleware in inline and fake mode.
Because of this current_user was being added in the client middleware but it's not being extracted in the server middleware since it's never called.
This resulted in ArgumentError: wrong number of arguments (2 for 1) failures in our test cases which used Sidekiq in inline or fake mode. We solved this by adding following config:
1 Sidekiq::Testing.server_middleware do |chain| 2 chain.add SidekiqServerMiddleware 3 end
This ensures that SidekiqServerMiddleware is called in inline and fake mode in our test cases.
However, we found an alternative to this which was much simpler and cleaner. We noticed that job payload is a simple hash which is pushed to redis as it is and is available in the server middleware as well.
Instead of adding the current_user as an argument in job['args'] we could add another key in job payload itself which will hold the current_user. Here's the modified logic.
1 class SidekiqClientMiddleware 2 def call(_worker_class, job, _queue, _redis_pool = nil) 3 # Set current user login in job payload 4 job['request_user_login'] = current_user.login if defined?(current_user) 5 yield 6 end 7 end
1 class SidekiqServerMiddleware 2 def call(_worker, job, _queue) 3 if job.key?('request_user_login') 4 set_request_user(job['request_user_login']) 5 end 6 yield 7 end 8 9 private 10 def set_request_user(request_user_login) 11 RequestStore.store[:request_user_login] = request_user_login 12 end 13 end
We used a unique key request_user_login which would not conflict with the other keys in the job payload. Additionally we added a check if request_user_login key is present in the job payload. This is necessary because if the user calls the worker from console then it'll not have current_user set.
Apart from this we noticed that we had multiple api services talking to each other. These services also generated user activity. Few of them didn't use Devise for authentication, instead the requesting user info was passed to them in each request as param.
For these services we set the request user info in RequestStore.store in our BaseApiController and changed the client middleware to use RequestStore.store instead of current_user method.
We also initialized RequestStore.store in services where we used Devise to make it completely independent of current_user. Here's how our client middleware looks now.
1 class SidekiqClientMiddleware 2 def call(_worker_class, job, _queue, _redis_pool = nil) 3 # Set current user login in job payload 4 if RequestStore.store[:request_user_login] 5 job['request_user_login'] = RequestStore.store[:request_user_login] 6 end 7 yield 8 end 9 end
Lastly we needed to register the client and server middleware in Sidekiq.
Configuring Sidekiq middleware
To enable the middleware with Sidekiq, we need to register the client middleware and the server middleware in config/initializers/sidekiq.rb. Here's how we did it.
1Sidekiq.configure_client do |config| 2 config.client_middleware do |chain| 3 chain.add SidekiqClientMiddleware 4 end 5end 6 7Sidekiq.configure_server do |config| 8 config.client_middleware do |chain| 9 chain.add SidekiqClientMiddleware 10 end 11 config.server_middleware do |chain| 12 chain.add SidekiqServerMiddleware 13 end 14end
Notice that we added SidekiqClientMiddleware in both configure_server block and configure_client block, this is because a Sidekiq job can call another Sidekiq job in which case the Sidekiq server itself will act as the client.
To sum it up, here's how our client middleware and server middleware finally looked like.
1 class SidekiqClientMiddleware 2 def call(_worker_class, job, _queue, _redis_pool = nil) 3 # Set current user login in job payload 4 if RequestStore.store[:request_user_login] 5 job['request_user_login'] = RequestStore.store[:request_user_login] 6 end 7 yield 8 end 9 end
1 class SidekiqServerMiddleware 2 def call(_worker, job, _queue) 3 if job.key?('request_user_login') 4 set_request_user(job['request_user_login']) 5 end 6 yield 7 end 8 9 private 10 def set_request_user(request_user_login) 11 RequestStore.store[:request_user_login] = request_user_login 12 end 13 end
The controller example we mentioned initially looks like:
1 class DeliveryController < ApplicationController 2 def update 3 # update attributes 4 DeliveryUpdateWorker.perform_async(params[:delivery]) 5 # render delivery 6 end 7 8 def destroy 9 # delete attributes 10 DeliveryDeleteWorker.perform_async(params[:delivery]) 11 # render :ok 12 end 13 end
1 class DeliveryDeleteWorker 2 include Sidekiq::Worker 3 4 def perform(delivery) 5 user_login = RequestStore.store[:request_user_login] 6 user = User.find_by(login: user_login) 7 ActivityGenerationService.new(delivery, user) if user 8 end 9 end
1 class DeliveryUpdateWorker 2 include Sidekiq::Worker 3 4 def perform(delivery) 5 user_login = RequestStore.store[:request_user_login] 6 user = User.find_by(login: user_login) 7 ActivityGenerationService.new(delivery, user) if user 8 end 9 end
Now we don't need to explicitly pass current_user to each Sidekiq job. It's available out of the box without any changes in all Sidekiq jobs.
As an alternative we can also use ActiveSupport::CurrentAttributes.