How to setup Python Celery and RabbitMQ on Ubuntu 20.04


Hey, I'm a postgraduate in Cyber Security with practical experience in Software Engineering and DevOps Operations. Top player on the TryHackMe platform, multilingual speaker (Kazakh, Russian, English, Spanish, and Turkish), curious person, bookworm, geek, sports lover, and just a good guy to talk with!

Overview:

This post shows how to get started quickly with RabbitMQ and Celery on a local machine running Ubuntu 20.04.

Repository

Testbed:

  1. Ubuntu 20.04
  2. Python 3.8.10
  3. Celery 5.2.6 (dawn-chorus)
  4. RabbitMQ 3.8.2

Core concepts:

  1. Celery is an asynchronous task queue. It can be used for anything that needs to run asynchronously. It uses workers to execute jobs taken from queues.
  2. RabbitMQ is a message broker that is widely used with Celery. It acts as the message transport between the producers that submit tasks and the workers that consume them.

Basic scheme:

scheme.png

Explanation:

  1. The producer publishes messages to an exchange
  2. The consumer receives messages from a queue
  3. RabbitMQ uses bindings to connect an exchange to a queue; each binding has a binding key
  4. The exchange routes each incoming message by comparing the message's routing key with the binding keys of its bindings
  5. By default, RabbitMQ uses a special nameless exchange that is created automatically and compares the routing key with the queue name
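
The routing rule described above can be sketched as a small in-memory model. This is only a toy illustration and not RabbitMQ code: the DirectExchange class and the "emails" and "mail" names are hypothetical, and it models only exact matching between routing key and binding key.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of an exchange that routes on exact key equality."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self.bindings = defaultdict(list)
        # queue name -> messages delivered to that queue
        self.queues = defaultdict(list)

    def bind(self, queue, binding_key):
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        # Deliver to every queue whose binding key equals the routing key.
        matched = self.bindings.get(routing_key, [])
        for queue in matched:
            self.queues[queue].append(body)
        return list(matched)


exchange = DirectExchange()
# Binding key equal to the queue name, as with the nameless default exchange.
exchange.bind("celery", binding_key="celery")
exchange.bind("emails", binding_key="mail")

delivered_to = exchange.publish("celery", "longtime_add(1, 2)")
unrouted = exchange.publish("unknown", "no matching binding")
```

In this model a message whose routing key matches no binding is delivered to no queue, which is why the second publish returns an empty list.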

Project initialization

  1. Create a project folder:
    mkdir test_celery
    
    cd test_celery
    
  2. Install RabbitMQ server:
    sudo apt-get install rabbitmq-server
    
    To completely remove RabbitMQ from your machine (optional):
    sudo apt-get remove --auto-remove rabbitmq-server
    
    sudo apt-get purge --auto-remove rabbitmq-server
    
  3. Create a Python virtualenv:
    virtualenv venv
    
    Activate it:
    source venv/bin/activate
    
    To deactivate (optional) use:
    deactivate
    
  4. Install celery:
    pip install celery
    
  5. Configure RabbitMQ for Celery by creating a user and a virtual host and setting permissions.
    Add a user with a password:
    sudo rabbitmqctl add_user maxat password123
    
    Add virtual host:
    sudo rabbitmqctl add_vhost maxat_vhost
    
    Set user tag:
    sudo rabbitmqctl set_user_tags maxat maxat_tag
    
    Set permission for user:
    sudo rabbitmqctl set_permissions -p maxat_vhost maxat ".*" ".*" ".*"
    
    RabbitMQ has three kinds of operations: configure, write and read. The three patterns are regular expressions for these operations, in that order, so ".*" ".*" ".*" grants all permissions on every resource in the virtual host. More info here
  6. Optional commands for RabbitMQ.
    To list users:
    sudo rabbitmqctl list_users
    
    To shut down the node:
    sudo rabbitmqctl shutdown
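
The permission patterns passed to set_permissions in step 5 are regular expressions matched against resource names (queues and exchanges). The sketch below mimics that check in plain Python. It is an approximation under two assumptions: that patterns are applied as unanchored searches, and that an empty pattern grants nothing. The is_allowed helper and the read_only example are hypothetical.

```python
import re


def is_allowed(pattern, resource_name):
    """Return True if the permission pattern covers the resource name.

    Assumption: unanchored regex search; an empty pattern is treated
    like '^$', which matches no non-empty resource name.
    """
    if pattern == "":
        pattern = "^$"
    return re.search(pattern, resource_name) is not None


# The three patterns in set_permissions order: configure, write, read.
grant_all = {"configure": ".*", "write": ".*", "read": ".*"}
read_only = {"configure": "^$", "write": "^$", "read": ".*"}

can_declare = is_allowed(grant_all["configure"], "celery")
can_read = is_allowed(read_only["read"], "celery")
can_write = is_allowed(read_only["write"], "celery")
```

With grant_all, every operation on every resource name passes. With the hypothetical read_only set, only the read check passes.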
    

celery.py:

  1. Create a celery.py file in the test_celery directory with the connection credentials for RabbitMQ:
    from __future__ import absolute_import
    from celery import Celery
    # Celery application: RabbitMQ as the broker, RPC as the result backend
    app = Celery('test_celery',
                 broker='amqp://maxat:password123@localhost/maxat_vhost',
                 backend='rpc://',
                 include=['test_celery.tasks'])
    
  2. Broker connection scheme: amqp://[user]:[password]@[hostname]:[port]/[virtual_host]
  3. The first argument to Celery is the name of the project package, here test_celery.
  4. The broker argument specifies the broker URL.
  5. The backend argument specifies the result backend URL. Celery uses the backend to store task results, so set one if you need to read a task's result after it finishes.
  6. The include argument lists the modules to import when the Celery worker starts.
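
To make the broker URL scheme above concrete, here is a minimal sketch that splits the URL from celery.py into its parts using only the standard library. The parse_broker_url helper is hypothetical and is not part of Celery. The fallback port 5672 is the standard AMQP port, used here on the assumption that an omitted port means the default.

```python
from urllib.parse import urlsplit

DEFAULT_AMQP_PORT = 5672  # standard AMQP port, used when the URL omits one


def parse_broker_url(url):
    """Split amqp://[user]:[password]@[hostname]:[port]/[virtual_host]."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port or DEFAULT_AMQP_PORT,
        # Drop the single slash that separates the host from the vhost.
        "vhost": parts.path[1:],
    }


broker = parse_broker_url("amqp://maxat:password123@localhost/maxat_vhost")
```

The user, password and vhost fields correspond to the values created earlier with rabbitmqctl add_user and add_vhost.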

tasks.py

  1. Create tasks.py file:
    from __future__ import absolute_import
    import time
    from test_celery.celery import app
    @app.task
    def longtime_add(x, y):
        """Add two numbers after a simulated 5-second delay."""
        print('long time task begins')
        # sleep 5 seconds to simulate a slow job
        time.sleep(5)
        print('long time task finished')
        return x + y
    
  2. To register a function as a Celery task, decorate it with @app.task

run_tasks.py

  1. Create run_tasks.py file:
    import time
    from .tasks import longtime_add
    if __name__ == '__main__':
        # send the task to the queue; delay() returns immediately
        result = longtime_add.delay(1, 2)
        # the task has not finished yet, so ready() returns False
        print('Task finished? ', result.ready())
        print('Task result: ', result.result)
        # sleep 10 seconds to ensure the task has finished
        time.sleep(10)
        # now the task should be finished, so ready() returns True
        print('Task finished? ', result.ready())
        print('Task result: ', result.result)
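
The "not ready, then ready" pattern in run_tasks.py can be tried without a broker using a standard-library analogy. This sketch uses concurrent.futures in place of Celery, so it shows only the polling idea: Future.done() plays roughly the role of result.ready(), and Future.result(timeout=...) is comparable to waiting on the Celery result with result.get(timeout=...) instead of sleeping for a fixed time. The release event is a hypothetical stand-in for the slow part of the task.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # lets the demo control when the task finishes


def longtime_add(x, y):
    # Stands in for the slow part of the task; wait until released.
    release.wait(timeout=5)
    return x + y


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(longtime_add, 1, 2)
    ready_before = future.done()      # the task is still waiting
    release.set()                     # allow the task to finish
    total = future.result(timeout=5)  # block until the result is available
    ready_after = future.done()       # the result has been produced
```

Blocking on the result with a timeout avoids guessing how long to sleep before the task has finished.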
    

Start celery worker:

  1. From the directory that contains the test_celery package, start the Celery worker with the log level set to info:
    celery -A test_celery worker --loglevel=info
    
  2. Output:

celery.png

Run tasks

  1. Open another terminal window, activate the virtualenv, and run the tasks from the directory that contains the test_celery package:
    python3 -m test_celery.run_tasks
    
  2. Celery worker output: celery2.png
  3. Output of the task run: celery3.png

Reference:

  1. Celery
  2. RabbitMQ
  3. RabbitMQ in 5 Minutes
  4. What is a Message Queue?
  5. Main article


Maxat Akbanov's blog
