3 Best Practices for your Sidekiq Jobs
Most web applications today use some form of background job processing or job scheduling system, such as Sidekiq or Celery. A job processing framework provides many benefits, like being able to process jobs in parallel and scale your async workers independently of your other application servers. But these benefits come at a cost: they introduce new failure modes that our application must handle to avoid entering an inconsistent state.
To illustrate what problems can occur, let's consider the following worker and database.
class TransferFundsWorker
  include Sidekiq::Worker

  def perform(source_user_id, destination_user_id, amount)
    source_user = User.find source_user_id
    destination_user = User.find destination_user_id
    if source_user.balance < amount
      # log error somewhere
      return
    end
    source_user.balance -= amount
    destination_user.balance += amount
    source_user.save!
    destination_user.save!
  end
end
users
id | balance
1 | 100
2 | 200
3 | 50
Now let's call TransferFundsWorker.new.perform(1, 2, 50). Assuming this is the only worker running at this particular time, and our server runs without any faults, our application will run as expected. But what happens in the following cases?
- Your application dies midway through the function, or there is a database error while committing one of the db writes?
- Your job scheduler runs multiple workers for transfers from the same source account at the same time?
- Your job scheduler calls the same worker twice sequentially?
In each of these cases, there is a possibility that our application ends up in an inconsistent state. Here are 3 best practices you can follow to maintain database consistency in each of the three cases described above.
Sidenote: In addition to Sidekiq I will be using abstractions provided by ActiveRecord, but the same concepts should apply regardless of what programming language or framework you are using. Most relational databases today provide a way to implement each of the solutions I describe below.
1. Make your workers transactional
In a distributed system, there is always the possibility that one of your servers can go down while your application is running. For this reason, we want our application to be tolerant of these types of failures by making sure our application code is stateless and groups of related state changes are transactional.
In general, we want the state of our application to be saved by a database that runs independently of our application code so that our servers can go up and down without losing any business-critical state. Similarly, we want our application to make groups of related state changes in a single atomic transaction so that if our app server goes down our database is not left in an inconsistent state. For example, what happens if we call TransferFundsWorker.new.perform(1, 2, 50) and our app server goes down after running source_user.save!? User:1 will have had $50 deducted from their balance but User:2 will still have a balance of $200. Even worse, a job processing system like Sidekiq implements retries, so this worker may run again and deduct another $50 from User:1's account.
Solution: Wrap related database writes in a transaction block
ApplicationRecord.transaction do
  source_user.save!
  destination_user.save!
end
Now we can use the fact that database transactions are atomic to guarantee that both of these db writes happen together. That way, even if there is an error in between the two db writes, the initial write will be rolled back and our database will remain in a consistent state regardless of the failure.
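To make the all-or-nothing contract concrete without a database, here is a minimal in-memory sketch. ToyStore and its transaction method are hypothetical stand-ins for the database and ApplicationRecord.transaction; a real database implements atomicity very differently, so treat this only as an illustration of what the caller can rely on.

```ruby
# Toy model of an atomic transaction: either every write in the block is
# kept, or none are. ToyStore is a hypothetical stand-in, not an
# ActiveRecord API.
class ToyStore
  attr_reader :balances

  def initialize(balances)
    @balances = balances.dup
  end

  # Snapshot the state, run the block, and restore the snapshot if it raises.
  def transaction
    snapshot = @balances.dup
    yield self
  rescue StandardError
    @balances = snapshot
    raise
  end
end

toy_store = ToyStore.new({ 1 => 100, 2 => 200 })
simulate_crash = true

begin
  toy_store.transaction do |store|
    store.balances[1] -= 50                      # first write succeeds
    raise "simulated crash" if simulate_crash    # failure between the writes
    store.balances[2] += 50                      # skipped on this attempt
  end
rescue RuntimeError
  # A job framework would retry here; the first write has been undone.
end

toy_result = toy_store.balances
puts toy_result.inspect
```

Because the first write is undone, a retry of the job starts from the original balances instead of deducting from User:1 a second time.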
2. Use database locks to prevent race conditions
Consider the case where we have two workers running at approximately the same time, TransferFundsWorker.new.perform(1, 2, 50) and TransferFundsWorker.new.perform(1, 3, 50). It's possible that both workers read the source_user's balance before the other worker makes any changes to that user's balance. In this case, both workers will read a balance of 100 for User:1. Then both workers will proceed to run source_user.balance -= amount, which sets User:1's balance to 100 - 50, which is 50. After saving changes to the database we are left with the following state:
users
id | balance
1 | 50
2 | 250
3 | 100
User:1 should have a balance of 0, not 50. The race condition causes the second write to overwrite the first, so one of the two $50 deductions is lost.
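The interleaving is easier to see when replayed step by step. The sketch below is a deterministic, single-threaded replay of that schedule, using a plain Hash as a stand-in for the users table; the variable names are illustrative only.

```ruby
# Deterministic replay of the lost-update interleaving.
# A plain Hash stands in for the users table (illustrative only).
users = { 1 => 100, 2 => 200, 3 => 50 }
total_before = users.values.sum

# Both workers read User:1's balance before either one writes.
worker_a_read = users[1]
worker_b_read = users[1]

# Worker A: transfer 50 from User:1 to User:2.
users[1] = worker_a_read - 50
users[2] += 50

# Worker B: transfer 50 from User:1 to User:3, using its stale read.
users[1] = worker_b_read - 50   # overwrites worker A's deduction
users[3] += 50

total_after = users.values.sum
puts "User:1 balance: #{users[1]}"
puts "money created out of thin air: #{total_after - total_before}"
```

The total amount of money in the system grows by 50, which is a quick way to notice this class of bug in a ledger.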
Solution: Use row level locks
We can grab a row-level lock on both the source_user and destination_user records in the users table. Grabbing a lock essentially serializes the TransferFundsWorker, so that workers processing transfers on a common user concurrently are sequenced one after the other.
def perform(source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)
    if source_user.balance < amount
      # log error somewhere
      return
    end
    source_user.balance -= amount
    destination_user.balance += amount
    source_user.save!
    destination_user.save!
  end
end
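The effect of the row locks can be imitated in memory with one Mutex per row. This is only an analogy: LockedLedger is a hypothetical class, and a Mutex is not how a database implements row locks, but the sequencing behaviour is the same. The sleep is there to widen the window in which the race would otherwise occur.

```ruby
# In-memory analogue of row-level locking: one Mutex per "row".
# LockedLedger is a hypothetical stand-in for the users table.
class LockedLedger
  attr_reader :balances

  def initialize(balances)
    @balances = balances.dup
    @row_locks = balances.keys.to_h { |id| [id, Mutex.new] }
  end

  # Locks the source row, then the destination row, like the worker does.
  # Assumes source_id and destination_id are different rows.
  def transfer(source_id, destination_id, amount)
    @row_locks[source_id].synchronize do
      @row_locks[destination_id].synchronize do
        balance = @balances[source_id]
        return false if balance < amount

        sleep 0.01   # without the locks, the other worker would read here
        @balances[source_id] = balance - amount
        @balances[destination_id] += amount
        true
      end
    end
  end
end

locked_ledger = LockedLedger.new({ 1 => 100, 2 => 200, 3 => 50 })

# The two concurrent transfers from the race-condition example.
locked_threads = [[1, 2, 50], [1, 3, 50]].map do |args|
  Thread.new { locked_ledger.transfer(*args) }
end
locked_threads.each(&:join)

puts "User:1 balance: #{locked_ledger.balances[1]}"
```

Whichever worker takes the lock on User:1 first finishes its read and write before the other one is allowed to read, so both deductions are applied.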
What about deadlocks?
With the solution above, we allow the potential for a deadlock. For example, consider the workers TransferFundsWorker.new.perform(1, 2, 50) and TransferFundsWorker.new.perform(2, 1, 50) running concurrently. It is possible that worker 1 grabs a lock on User:1 and worker 2 grabs a lock on User:2, and then each worker waits for the other to release the lock on User:2 and User:1 respectively.
Most relational databases, PostgreSQL included, have a mechanism to detect deadlocks and abort one of the transactions involved with an error. In this case, the aborted worker will fail and Sidekiq will automatically retry its job, while the other worker proceeds. We could implement something fancy like a scheduling algorithm to prevent deadlocks, but deadlocks are rare enough that we can simply retry the failed job and hope the deadlock doesn't occur again. (Ostrich algorithm)
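If retries ever prove insufficient, one common and fairly cheap alternative to a scheduler is to always acquire locks in a fixed order, for example lowest id first, whichever role the user plays in the transfer. The sketch below shows the idea in memory with Mutexes; OrderedLedger is a hypothetical class, and this is a swapped-in technique rather than what the worker above does.

```ruby
# Deadlock avoidance by consistent lock ordering (in-memory sketch).
# OrderedLedger is hypothetical; Mutexes stand in for row locks.
class OrderedLedger
  attr_reader :balances

  def initialize(balances)
    @balances = balances.dup
    @row_locks = balances.keys.to_h { |id| [id, Mutex.new] }
  end

  # Assumes source_id and destination_id are different rows.
  def transfer(source_id, destination_id, amount)
    # Always lock the lower id first, so two opposite transfers can never
    # each hold the lock the other one is waiting for.
    first_id, second_id = [source_id, destination_id].sort
    @row_locks[first_id].synchronize do
      @row_locks[second_id].synchronize do
        return false if @balances[source_id] < amount

        @balances[source_id] -= amount
        @balances[destination_id] += amount
        true
      end
    end
  end
end

ordered_ledger = OrderedLedger.new({ 1 => 100, 2 => 200 })

# Ten transfers in each direction, all running concurrently.
ordered_threads = 10.times.flat_map do
  [
    Thread.new { ordered_ledger.transfer(1, 2, 5) },
    Thread.new { ordered_ledger.transfer(2, 1, 5) }
  ]
end
ordered_threads.each(&:join)

puts "User:1: #{ordered_ledger.balances[1]}, User:2: #{ordered_ledger.balances[2]}"
```

Locking source first and destination second, as in the worker, is what makes the opposite-direction case dangerous; sorting the ids removes that dependence on direction.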
3. Make your workers idempotent
As soon as you move to a distributed system, it becomes impossible to guarantee exactly-once delivery semantics. That means it is possible that your job scheduler runs your job multiple times sequentially. How do we structure our worker so that if TransferFundsWorker.new.perform(1, 2, 50) is run twice, we only transfer $50 from User:1 to User:2 once?
Solution: Exactly Once Processing
What we want is that if the TransferFundsWorker is called multiple times for a single transfer, it only processes the transfer once. This is what it means for an operation to be idempotent: you can call it as many times as you want with the same inputs and the result is the same. In our case the inputs are the parameters of the worker, and the output is the change of state in the application (which is saved in the db).
So how do we go about actually guaranteeing exactly once processing semantics? Let's try refactoring our function to keep track of the transfers with a new table.
def perform(source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)
    return if Transfer.where(source_user: source_user,
                             destination_user: destination_user,
                             amount: amount).exists?
    if source_user.balance < amount
      # log error somewhere
      return
    end
    source_user.balance -= amount
    destination_user.balance += amount
    Transfer.create!(source_user: source_user,
                     destination_user: destination_user,
                     amount: amount)
    source_user.save!
    destination_user.save!
  end
end
Now if we call TransferFundsWorker.new.perform(1, 2, 50) twice, we can guarantee that it will only process the transfer once. But we still have a problem: what if User:1 tries to transfer $50 to User:2 on two different days? Our worker is technically idempotent, but the behaviour of our system does not match what our application should be doing. We need some way of uniquely identifying a transfer request, so we can differentiate between two different requests with the same parameters.
Idempotency Keys
To uniquely identify a transfer, we can generate a UUID to act as our idempotency key. We can then pass this key to our worker, so our worker can uniquely identify a transfer request, and ensure idempotency.
def perform(transfer_request_id, source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)
    return if Transfer.where(transfer_request_id: transfer_request_id).exists?
    if source_user.balance < amount
      # log error somewhere
      return
    end
    source_user.balance -= amount
    destination_user.balance += amount
    Transfer.create!(transfer_request_id: transfer_request_id,
                     source_user: source_user,
                     destination_user: destination_user,
                     amount: amount)
    source_user.save!
    destination_user.save!
  end
end
Our idempotency key is the transfer_request_id, and now the worker can uniquely identify a transfer request. As a best practice, we should also add a unique constraint on the transfer_request_id column in the transfers table. Depending on your job scheduling system, you might still want the guard clause so that a duplicate job completes gracefully: Sidekiq retries a job whenever it raises an error, so a unique-constraint violation alone would cause the duplicate to fail and be retried repeatedly. The guard clause checks for an existing transfer with this key and returns early instead.
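The behaviour of the guard clause can be sketched in memory as follows. IdempotentLedger is a hypothetical class and its Set of processed keys stands in for the transfers table with a unique transfer_request_id column. The sketch assumes the key is generated once, where the request originates, and passed along unchanged on every delivery of that request.

```ruby
require "set"
require "securerandom"

# In-memory sketch of idempotency keys. IdempotentLedger is hypothetical;
# @processed_keys stands in for the transfers table.
class IdempotentLedger
  attr_reader :balances

  def initialize(balances)
    @balances = balances.dup
    @processed_keys = Set.new
  end

  def transfer(transfer_request_id, source_id, destination_id, amount)
    # Guard clause: a duplicate delivery returns gracefully.
    return :already_processed if @processed_keys.include?(transfer_request_id)
    return :insufficient_funds if @balances[source_id] < amount

    @balances[source_id] -= amount
    @balances[destination_id] += amount
    @processed_keys.add(transfer_request_id)
    :processed
  end
end

idempotent_ledger = IdempotentLedger.new({ 1 => 100, 2 => 200 })

# One key per transfer request, generated by the caller, not by the worker.
first_request_id  = SecureRandom.uuid
second_request_id = SecureRandom.uuid

transfer_results = [
  idempotent_ledger.transfer(first_request_id, 1, 2, 50),   # first delivery
  idempotent_ledger.transfer(first_request_id, 1, 2, 50),   # duplicate delivery
  idempotent_ledger.transfer(second_request_id, 1, 2, 50)   # new request, same parameters
]

puts transfer_results.inspect
```

The duplicate delivery is ignored, while the second request with identical parameters is still processed because it carries its own key.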
The solution we provided above is very specific to the use case we presented, but you could generalize this solution to work for all your workers as a middleware. We could create a jobs table with a database flag representing whether it was completed or not. Then we can automatically generate an idempotency key corresponding to that job, and toggle the flag in a transaction block with the worker being run. It would have the same effect as the solution we implemented above, except it will work for all of your jobs.
I will omit implementing the general solution because it will depend on what job processing framework you are using, but the example above should give you a good idea of how to implement one if you choose to do so.
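As a rough, framework-agnostic illustration of the jobs-table idea, here is an in-memory sketch. JobLog and run_once are hypothetical names, the Hash stands in for the jobs table, and the Mutex stands in for the transaction around the flag and the job's work. With a real database, the flag update and the job's own writes would need to commit in the same transaction, which this sketch cannot show.

```ruby
# Generic "run this job at most once per key" wrapper (in-memory sketch).
# JobLog is hypothetical; @completed stands in for a jobs table with a
# completed flag per idempotency key.
class JobLog
  def initialize
    @completed = {}
    @lock = Mutex.new
  end

  # Runs the block unless a job with this key has already completed.
  # If the block raises, the flag is not set, so a retry will run it again.
  def run_once(idempotency_key)
    @lock.synchronize do
      return :skipped if @completed[idempotency_key]

      yield
      @completed[idempotency_key] = true
      :ran
    end
  end
end

job_log = JobLog.new

# A duplicate delivery of a completed job is skipped.
side_effects = 0
first_run  = job_log.run_once("job-123") { side_effects += 1 }
second_run = job_log.run_once("job-123") { side_effects += 1 }

# A failed attempt does not mark the job as completed, so the retry runs.
attempts = 0
begin
  job_log.run_once("job-456") do
    attempts += 1
    raise "simulated failure"
  end
rescue RuntimeError
  # The job framework would schedule a retry here.
end
retry_run = job_log.run_once("job-456") { attempts += 1 }

puts [first_run, second_run, retry_run].inspect
```

In a middleware, the wrapper would sit around every worker's perform call, with the key supplied by whoever enqueues the job.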
Sidenote: Some job processing systems provide a mechanism to make your workers idempotent out of the box. If your framework already provides this mechanism, you should prefer the existing solution over writing your own.
TLDR
- Wrap related database writes in a transaction block to prevent your database from entering an invalid state
- Use row-level database locks to prevent race conditions in your application
- Make your workers idempotent with idempotency keys