How to set up Python Celery and RabbitMQ on Ubuntu 20.04

Overview:

This post shows how to get started quickly with RabbitMQ and Celery on a local machine running Ubuntu 20.04.

Repository

Testbed:

  1. Ubuntu 20.04
  2. Python 3.8.10
  3. Celery 5.2.6 (dawn-chorus)
  4. RabbitMQ 3.8.2

Core concepts:

  1. Celery is an asynchronous task queue. It can be used for anything that needs to be run asynchronously. It uses so-called workers to execute jobs from queues.
  2. RabbitMQ is a message broker that is widely used with Celery. It transports task messages between the producers that send them and the workers that consume them.

Basic scheme:

scheme.png

Explanation:

  1. A producer publishes messages to an exchange.
  2. A consumer receives messages from a queue.
  3. RabbitMQ uses so-called bindings to connect an exchange to a queue; each binding carries a binding key.
  4. The exchange routes each incoming task message by comparing the message's routing key with the binding keys of the queues bound to it.
  5. By default RabbitMQ uses a special nameless exchange that is created automatically; it compares the routing key with the queue name.
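
The routing rules above can be sketched in a few lines of plain Python. This is only a toy model for illustration, not how RabbitMQ is implemented: the DirectExchange class and its bind/publish methods are hypothetical names, and the queue is a plain list. The queue name "celery" is used because that is the queue Celery sends tasks to when nothing else is configured.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of an exchange that routes by exact key match (hypothetical)."""

    def __init__(self):
        # binding key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue, binding_key):
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key, message):
        # Deliver the message to every queue whose binding key equals the routing key.
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(message)
        return len(queues)


celery_queue = []
exchange = DirectExchange()
# The nameless default exchange behaves as if each queue were bound under its own name.
exchange.bind(celery_queue, binding_key="celery")

delivered = exchange.publish("celery", {"task": "longtime_add", "args": [1, 2]})
dropped = exchange.publish("unknown", {"task": "ignored"})
print(delivered, dropped, celery_queue)
```

A message whose routing key matches no binding key reaches no queue in this model, which is why the second publish call delivers nothing.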

Project initialization

  1. Create a project folder:
    mkdir test_celery
    
    cd test_celery
    
  2. Install RabbitMQ server:
    sudo apt-get install rabbitmq-server
    
    To completely remove RabbitMQ from your machine (optional):
    sudo apt-get remove --auto-remove rabbitmq-server
    
    sudo apt-get purge --auto-remove rabbitmq-server
    
  3. Create a Python virtualenv:
    virtualenv venv
    
    Activate it:
    source venv/bin/activate
    
    To deactivate (optional) use:
    deactivate
    
  4. Install celery:
    pip install celery
    
  5. Configure RabbitMQ for Celery: create a user and a virtual host, then set permissions.
    Add a user with a password:
    sudo rabbitmqctl add_user maxat password123
    
    Add virtual host:
    sudo rabbitmqctl add_vhost maxat_vhost
    
    Set user tag:
    sudo rabbitmqctl set_user_tags maxat maxat_tag
    
    Set permission for user:
    sudo rabbitmqctl set_permissions -p maxat_vhost maxat ".*" ".*" ".*"
    
    RabbitMQ distinguishes three kinds of operations on resources: configure, write and read. The three ".*" patterns, in that order, grant the user all three permissions on every resource in the virtual host. More info here
  6. Optional commands for RabbitMQ.
    To list users:
    sudo rabbitmqctl list_users
    
    To shut down the node:
    sudo rabbitmqctl shutdown
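
The three permission patterns passed to set_permissions in step 5 are regular expressions that RabbitMQ matches against resource names (queues and exchanges). The sketch below uses Python's re module to show the idea. It assumes Python's regex behaviour is close enough to RabbitMQ's for these simple patterns; is_allowed is a hypothetical helper and "other_queue" is a made-up resource name.

```python
import re


def is_allowed(pattern, resource_name):
    # Hypothetical helper: the operation is permitted when the pattern matches the name.
    # The patterns are anchored with ^ and $ so a partial match cannot slip through.
    return re.search(pattern, resource_name) is not None


grant_all = is_allowed(".*", "celery")                 # ".*" matches every name
only_celery = is_allowed("^celery.*$", "other_queue")  # restricted to names starting with "celery"
grant_none = is_allowed("^$", "celery")                # "^$" matches only the empty name
print(grant_all, only_celery, grant_none)
```

For a local test setup, granting ".*" for all three operations is the simplest choice; narrower patterns are mainly useful when several applications share one broker.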
    

celery.py:

  1. Create a celery.py file in the test_celery directory with the connection credentials for RabbitMQ:
    from __future__ import absolute_import  # no-op on Python 3

    from celery import Celery

    app = Celery('test_celery',
                 broker='amqp://maxat:password123@localhost/maxat_vhost',
                 backend='rpc://',
                 include=['test_celery.tasks'])
    
  2. The broker URL has the form amqp://[user]:[password]@[hostname]:[port]/[virtual_host]. The port can be omitted, as in the example above.
  3. The first argument of Celery is just the name of the project package.
  4. The broker argument specifies the broker URL.
  5. The backend argument specifies a backend URL. A backend in Celery is used for storing the task results. So if you need to access the results of your task when it is finished, you should set a backend for Celery.
  6. The include argument specifies a list of modules that you want to import when Celery worker starts.
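
To make the broker URL format concrete, here is a small sketch that assembles the URL from its parts and then splits it again with the standard library. build_broker_url is a hypothetical helper, not part of Celery; 5672 is the standard AMQP port, and percent-encoding the credentials is a precaution for passwords that contain characters such as "@" or "/".

```python
from urllib.parse import quote, urlsplit


def build_broker_url(user, password, host, vhost, port=5672):
    # Hypothetical helper: percent-encode each part so special characters stay intact.
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


url = build_broker_url("maxat", "password123", "localhost", "maxat_vhost")
parts = urlsplit(url)
print(url)
print(parts.username, parts.hostname, parts.port, parts.path.lstrip("/"))
```

The resulting string can be passed as the broker argument of Celery in place of the hard-coded URL.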

tasks.py

  1. Create tasks.py file:
    from __future__ import absolute_import

    import time

    from test_celery.celery import app


    @app.task
    def longtime_add(x, y):
        print('long time task begins')
        # sleep 5 seconds
        time.sleep(5)
        print('long time task finished')
        return x + y
    
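  2. To register a function as a Celery task, decorate it with @app.task. Calling longtime_add.delay(...) then sends the call to the task queue instead of running it locally.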

run_tasks.py

  1. Create run_tasks.py file:
    import time

    from .tasks import longtime_add

    if __name__ == '__main__':
        result = longtime_add.delay(1, 2)
        # at this point the task has not finished, so ready() returns False
        print('Task finished? ', result.ready())
        print('Task result: ', result.result)
        # sleep 10 seconds to ensure the task has finished
        time.sleep(10)
        # now the task should be finished and ready() returns True
        print('Task finished? ', result.ready())
        print('Task result: ', result.result)
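
Sleeping for a fixed 10 seconds works for a demo, but polling ready() with a timeout returns as soon as the task is done. The sketch below shows that idea with a hypothetical wait_for_result helper. FakeResult is a stand-in written for this example so the snippet runs without a broker; it only imitates the ready() method and result attribute used above. In real code, Celery's own result.get(timeout=10) is probably the simpler choice.

```python
import time


def wait_for_result(async_result, timeout=10.0, interval=0.5):
    """Poll ready() until the task finishes or the timeout expires (hypothetical helper)."""
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)
    return async_result.result


class FakeResult:
    """Stand-in for a Celery result object that becomes ready after `delay` seconds."""

    def __init__(self, value, delay):
        self._value = value
        self._done_at = time.monotonic() + delay

    def ready(self):
        return time.monotonic() >= self._done_at

    @property
    def result(self):
        return self._value if self.ready() else None


outcome = wait_for_result(FakeResult(3, delay=0.2), timeout=2.0, interval=0.05)
print(outcome)
```

With the real task, the call would look like wait_for_result(longtime_add.delay(1, 2)).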
    

Start celery worker:

  1. From the directory that contains the test_celery package, start a Celery worker with the log level set to info (use debug for more verbose output):
    celery -A test_celery worker --loglevel=info
    
  2. Output:

celery.png

Run tasks

  1. Start another terminal window and run your tasks:
    python3 -m test_celery.run_tasks
    
  2. Celery worker output: celery2.png
  3. Output of the task run: celery3.png

Reference:

  1. Celery
  2. RabbitMQ
  3. RabbitMQ in 5 Minutes
  4. What is a Message Queue?
  5. Main article