🧠 Architecture (what you're building)
┌──────────┐
│  Web UI  │
└────┬─────┘
     │
┌────▼─────┐       ┌───────────┐
│ Scheduler│ ────▶ │   Redis   │  (broker)
└────┬─────┘       └─────┬─────┘
     │                   │
┌────▼─────┐       ┌─────▼─────┐
│  Celery  │ ◀──── │  Worker   │  (n workers)
│ Executor │       └───────────┘
└────┬─────┘
     │
┌────▼─────┐
│ Postgres │  (metadata DB)
└──────────┘
📁 Directory structure
airflow-celery/
├── dags/
├── logs/
├── plugins/
├── docker-compose.yml
└── .env
📄 .env file
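The exact contents aren't reproduced here; a minimal sketch, assuming only the two variables that docker-compose.yml references (step 1 below overwrites the placeholder UID with your real one on Linux):

# Assumed minimal .env: only the variables referenced by docker-compose.yml
AIRFLOW_UID=50000
AIRFLOW_GID=0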
🐳 docker-compose.yml
version: "3.8"

x-airflow-common: &airflow-common
  image: apache/airflow:2.9.3
  environment:
    &airflow-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID}:${AIRFLOW_GID}"
  depends_on:
    postgres:
      condition: service_healthy
    redis:
      condition: service_healthy

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
    restart: always

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: >
      bash -c "
      airflow db migrate &&
      airflow users create
      --username admin
      --password admin
      --firstname Admin
      --lastname User
      --role Admin
      --email admin@example.com
      "
    restart: "no"

volumes:
  postgres-db-volume:
🚀 How to run (step-by-step)
1️⃣ Set permissions (important on Linux)
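The exact commands aren't shown, but a common sketch for this step is to create the bind-mounted folders and write your host UID into .env so the containers can write to ./logs and ./plugins:

# Create the directories mounted into every Airflow container
mkdir -p ./dags ./logs ./plugins
# Record the host UID/GID so files written by the containers stay editable on the host
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env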
2️⃣ Initialize Airflow
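Assuming Docker Compose v2, run the one-off airflow-init service defined above:

docker compose up airflow-init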
You should see DB migration + admin user creation.
3️⃣ Start all services
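Bring up everything else in the background:

docker compose up -d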
4️⃣ Access Airflow UI
Open http://localhost:8080 and log in with admin / admin (the user created by the airflow-init service).
🧪 Verify Celery + Redis are working
Check running containers
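For example, with Docker Compose v2:

docker compose ps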
You should see:
- airflow-webserver
- airflow-scheduler
- airflow-worker
- redis
- postgres
Check worker logs
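For example, by following the worker container's output:

docker compose logs -f airflow-worker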
You should see the worker connect to the Redis broker and report that it is ready (a startup line like celery@<hostname> ready.).
Check Redis manually
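For example, by pinging Redis from inside its container:

docker compose exec redis redis-cli ping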
Expected:
PONG
🧩 Sample DAG to test Celery execution
Create dags/test_celery.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def hello():
    # Runs inside a Celery worker container, so the print shows up in the task log
    print("Hello from Celery worker!")


with DAG(
    dag_id="celery_test",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # manual triggers only
    catchup=False,
):
    PythonOperator(
        task_id="hello_task",
        python_callable=hello,
    )
Trigger the DAG from the UI → watch the logs → the worker executes the task ✅
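If you prefer the CLI, roughly the same flow works from inside the webserver container; because AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION is "true" above, unpause the DAG first:

docker compose exec airflow-webserver airflow dags unpause celery_test
docker compose exec airflow-webserver airflow dags trigger celery_test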
⚙️ Scale Celery workers (real power)
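A sketch using Compose's --scale flag:

docker compose up -d --scale airflow-worker=3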
Now you have 3 parallel workers processing tasks.
🔥 Why this setup is production-grade
✅ CeleryExecutor (distributed)
✅ Redis as broker
✅ Postgres as metadata + result backend
✅ Horizontally scalable workers
✅ Clean separation of concerns
🧠 Next things to learn (recommended)
- Flower (Celery monitoring)
- Airflow Pools & Queues
- Task retries & SLA
- Remote logging (S3 / GCS)
- KubernetesExecutor vs CeleryExecutor