3 Best Practices for Background Workers

Most web applications today use some form of background job processing framework, like Sidekiq or Celery. A job processing framework provides many benefits, like the ability to process jobs in parallel and to scale your async workers independently of your other application servers. But these benefits come at the cost of introducing new failure modes that our application needs to handle to avoid ending up in an inconsistent state.

To illustrate what problems can occur, let's consider the following worker and database.

class TransferFundsWorker
  include Sidekiq::Worker

  def perform(source_user_id, destination_user_id, amount)
    source_user = User.find source_user_id
    destination_user = User.find destination_user_id

    if source_user.balance < amount
      # log error somewhere
      return
    end

    source_user.balance -= amount
    destination_user.balance += amount

    source_user.save!
    destination_user.save!
  end
end

users
id | balance
1  | 100
2  | 200
3  | 50

Now let's call TransferFundsWorker.new.perform(1, 2, 50). Assuming this is the only worker running at this particular time, and our server runs without any faults, our application will behave as expected. But what happens in the following cases?

  1. Your application dies midway through the function, or there is a database error while committing one of the db writes?
  2. Your job scheduler runs multiple workers for transfers from the same source account at the same time?
  3. Your job scheduler calls the same worker twice sequentially?

In each of these cases, there is a possibility that our application ends up in an inconsistent state. Here are 3 best practices you can follow to maintain database consistency in each of the three cases described above.

Sidenote: In addition to Sidekiq, I will be using abstractions provided by ActiveRecord, but the same concepts should apply regardless of what programming language or framework you are using. Most relational databases today will provide you with a way to implement each of the solutions I describe below.

1. Make your workers transactional

In a distributed system, there is always the possibility that one of your servers can go down while your application is running. For this reason, we want our application to be tolerant of these types of failures by making sure our application code is stateless and groups of related state changes are transactional.

In general, we want the state of our application to be saved by a database that runs independently of our application code so that our servers can go up and down without losing any business-critical state. Similarly, we want our application to make groups of related state changes in a single atomic transaction so that if our app server goes down our database is not in an inconsistent state. For example, what happens if we call TransferFundsWorker.new.perform(1, 2, 50) and our app server goes down after running source_user.save!? User:1 will have had $50 deducted from their balance but User:2 will still have a balance of $200. Even worse, a job processing system like Sidekiq implements retries so this worker will run again and deduct another $50 from User:1's account.

ApplicationRecord.transaction do
  source_user.save!
  destination_user.save!
end

Now we can use the fact that database transactions are atomic to guarantee that both of these db writes happen together. That way, even if there is an error between the two db writes, the initial write will be rolled back and our database will remain in a consistent state regardless of the failure.
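To see the rollback behaviour concretely, here is a toy sketch in plain Ruby (no real database): the hypothetical `transfer` helper snapshots both balances and restores them if anything raises partway through, mimicking what the database transaction does for us.

```ruby
# A toy sketch of atomicity, assuming in-memory balances instead of a db:
# snapshot on entry, restore on failure, just like a transaction rollback.
def transfer(balances, source_id, destination_id, amount, fail_after_debit: false)
  snapshot = balances.dup # "begin transaction"
  balances[source_id] -= amount
  raise "server died mid-transfer" if fail_after_debit # simulated crash between writes
  balances[destination_id] += amount
rescue
  balances.replace(snapshot) # "rollback": undo the partial write
end

balances = { 1 => 100, 2 => 200 }
transfer(balances, 1, 2, 50, fail_after_debit: true)
balances # => { 1 => 100, 2 => 200 } -- the debit was rolled back
```

Without the rescue/restore step, the simulated crash would leave User:1 debited with no matching credit, which is exactly the inconsistent state the transaction block prevents.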

2. Use database locks to prevent race conditions

Consider the case where we have two workers running at approximately the same time, TransferFundsWorker.new.perform(1, 2, 50) and TransferFundsWorker.new.perform(1, 3, 50). It's possible that both workers read source_user's balance before the other makes any changes to it. In this case, both workers will read a balance of 100 for User:1. Then both workers will proceed to run source_user.balance -= amount, which sets User:1's balance to 100 - 50 = 50. After saving changes to the database, we are left with the following state:

users
id | balance
1  | 50
2  | 250
3  | 100

User:1 should have a balance of 0, not 50. The race condition caused User:1's balance to be set incorrectly.

Solution: Use row-level locks

We can grab a row-level lock on both the source_user and destination_user records in the users table. Grabbing a lock effectively throttles the TransferFundsWorker, so that workers concurrently processing transfers that touch a common user are sequenced one after the other.

def perform(source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)

    if source_user.balance < amount
      # log error somewhere
      return
    end

    source_user.balance -= amount
    destination_user.balance += amount

    source_user.save!
    destination_user.save!
  end
end

What about deadlocks?

With the solution above, we allow the potential for a deadlock. For example, consider the following workers TransferFundsWorker.new.perform(1, 2, 50) and TransferFundsWorker.new.perform(2, 1, 50) running concurrently. It is possible that worker 1 grabs a lock on User:1, and worker 2 grabs a lock on User:2, and then both threads are waiting for the other worker to release the lock on User:2 and User:1 respectively.

Most relational databases like PostgreSQL have a mechanism to detect deadlocks and throw an error. In this case, both workers will fail and Sidekiq will automatically retry both jobs. We could implement something fancy like a scheduling algorithm to prevent deadlocks, but deadlocks are rare enough that we can simply retry both jobs and hope the deadlock doesn't occur again. (Ostrich algorithm)
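For the curious, the "something fancy" can be as simple as always acquiring locks in a consistent order. Here is a plain-Ruby sketch of that idea: the Mutexes stand in for row-level locks, and the `transfer` helper is hypothetical, not part of the worker above.

```ruby
# Deadlock avoidance via lock ordering: always lock in ascending user id,
# so two opposite transfers can never each hold one lock while waiting
# on the other. Mutexes simulate row-level locks on an in-memory ledger.
accounts = {
  1 => { balance: 100, lock: Mutex.new },
  2 => { balance: 200, lock: Mutex.new },
}

def transfer(accounts, source_id, destination_id, amount)
  first, second = [source_id, destination_id].sort # consistent lock order
  accounts[first][:lock].synchronize do
    accounts[second][:lock].synchronize do
      return if accounts[source_id][:balance] < amount
      accounts[source_id][:balance] -= amount
      accounts[destination_id][:balance] += amount
    end
  end
end

# Opposite-direction transfers run concurrently without deadlocking.
t1 = Thread.new { transfer(accounts, 1, 2, 50) }
t2 = Thread.new { transfer(accounts, 2, 1, 50) }
[t1, t2].each(&:join)
accounts[1][:balance] # => 100
```

In a real database you would get the same effect by locking the rows in a deterministic order (for example, sorting the ids before the lock.find_by calls), but as noted, simply retrying after a detected deadlock is usually good enough.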

3. Make your workers idempotent

As soon as you move to a distributed system, it becomes impossible to guarantee exactly-once delivery semantics. That means it is possible that your job scheduler runs your job multiple times sequentially. How do we structure our worker so that if TransferFundsWorker.new.perform(1, 2, 50) is run twice, we only transfer $50 from User:1 to User:2 once?

Solution: Exactly Once Processing

What we want is that if the TransferFundsWorker is called multiple times for a single transfer, it only processes the transfer once. This is what it means for an operation to be idempotent: you can call it as many times as you want with the same inputs and the result is the same. In our case, the inputs are the parameters of the worker, and the output is the change of state in the application (which is saved in the db).
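Before touching the database, the idea can be shown with a toy sketch in plain Ruby, where the worker's parameters themselves act as the identity of the operation: a `processed` set records parameter tuples that have already run, so repeats become no-ops.

```ruby
# Idempotent processing in miniature, assuming an in-memory ledger:
# remember which parameter tuples we've seen and skip duplicates.
require "set"

balances = { 1 => 100, 2 => 200 }
processed = Set.new

transfer = lambda do |source_id, destination_id, amount|
  key = [source_id, destination_id, amount]
  next if processed.include?(key) # already processed: do nothing
  processed << key
  balances[source_id] -= amount
  balances[destination_id] += amount
end

transfer.call(1, 2, 50)
transfer.call(1, 2, 50) # a retry has no further effect
balances # => { 1 => 50, 2 => 250 }
```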

So how do we go about actually guaranteeing exactly once processing semantics? Let's try refactoring our function to keep track of the transfers with a new table.

def perform(source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)

    return if Transfer.where(source_user: source_user,
                             destination_user: destination_user,
                             amount: amount).exists?

    if source_user.balance < amount
      # log error somewhere
      return
    end

    source_user.balance -= amount
    destination_user.balance += amount

    Transfer.create!(source_user: source_user,
                     destination_user: destination_user,
                     amount: amount)
    source_user.save!
    destination_user.save!
  end
end

Now if we call TransferFundsWorker.new.perform(1, 2, 50) twice, we can guarantee that it will only process the transfer once. But we still have a problem: what if User:1 tries to transfer $50 to User:2 on two different days? Our worker is technically idempotent, but the behaviour of our system does not match what our application should be doing. We need some way of uniquely identifying a transfer request, so we can differentiate between two different requests with the same parameters.

Idempotency Keys

To uniquely identify a transfer, we can generate a UUID to act as our idempotency key. We can then pass this key to our worker, so our worker can uniquely identify a transfer request, and ensure idempotency.

def perform(transfer_request_id, source_user_id, destination_user_id, amount)
  ApplicationRecord.transaction do
    source_user = User.lock.find_by(id: source_user_id)
    destination_user = User.lock.find_by(id: destination_user_id)

    return if Transfer.where(transfer_request_id: transfer_request_id).exists?

    if source_user.balance < amount
      # log error somewhere
      return
    end

    source_user.balance -= amount
    destination_user.balance += amount

    Transfer.create!(transfer_request_id: transfer_request_id, 
                     source_user: source_user,
                     destination_user: destination_user, 
                     amount: amount)
    source_user.save!
    destination_user.save!
  end
end
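Generating the key itself needs nothing beyond Ruby's standard library: SecureRandom.uuid produces a random UUID. The perform_async call below is illustrative, matching the worker signature above; the important part is that the caller generates the key once and reuses it on any retry of the enqueue.

```ruby
# Generating an idempotency key at enqueue time with Ruby's stdlib.
require "securerandom"

transfer_request_id = SecureRandom.uuid # e.g. "f81d4fae-7dec-11d0-a765-00a0c91e6bf6"
# The caller holds on to this id, so re-enqueueing the same request reuses it:
# TransferFundsWorker.perform_async(transfer_request_id, 1, 2, 50)
```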

Our idempotency key is the transfer_request_id, and now the worker can uniquely identify a transfer request. As a best practice, we should also add a unique constraint on the transfer_request_id column in the transfers table. But depending on your job scheduling system, you may still want the guard clause: Sidekiq will automatically retry a job regardless of what error is thrown, so checking for an existing transfer with this key lets the duplicate job return gracefully instead of failing and being retried over and over.

The solution above is specific to the use case we presented, but you could generalize it to work for all your workers as a middleware. We could create a jobs table with a flag representing whether each job has completed, automatically generate an idempotency key for each job, and toggle the flag in the same transaction as the worker's database writes. It would have the same effect as the solution we implemented above, except it will work for all of your jobs.
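The shape of that generalized wrapper can be sketched framework-agnostically in plain Ruby. Here a completed-jobs registry stands in for the jobs table, and the `run_once` helper is hypothetical, playing the role the middleware would.

```ruby
# A sketch of a generic run-once wrapper: check the registry, run the job
# body, then mark the job complete. Names here are illustrative.
require "set"

COMPLETED_JOBS = Set.new

def run_once(job_id)
  return :skipped if COMPLETED_JOBS.include?(job_id)
  yield
  # In a real system, flipping this flag would happen in the same database
  # transaction as the job's own writes.
  COMPLETED_JOBS << job_id
  :ran
end

counter = 0
run_once("job-123") { counter += 1 } # => :ran
run_once("job-123") { counter += 1 } # => :skipped
counter # => 1
```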

I will omit implementing the general solution because it will depend on what job processing framework you are using, but the example above should give you a good idea of how to implement one if you choose to do so.

Sidenote: Some job processing systems provide a mechanism to make your workers idempotent out of the box. If your framework already provides this mechanism, you should prefer the existing solution over writing your own.

TLDR

  • Wrap related database writes in a transaction block to prevent your database from entering an invalid state
  • Use row-level database locks to prevent race conditions in your application
  • Make your workers idempotent with idempotency keys