How-To Guides
Use Pharox with Task Queues

This guide shows how to run Pharox-backed workers under Celery, RQ, or Airflow while sharing state through the PostgreSQL adapter.

1) Build a shared manager

Create the storage and manager once per worker process so all tasks reuse the same connection pool.

from sqlalchemy import create_engine
from pharox import ProxyManager
from pharox.storage.postgres import PostgresStorage

engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/pharox")
storage = PostgresStorage(engine)
manager = ProxyManager(storage=storage)

Register lifecycle callbacks here (e.g., Prometheus/structured logging) so every task emits the same telemetry.
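
As a sketch only (the callback signature below and how it is attached to the manager are assumptions, not a documented Pharox API), a shared structured-logging callback might look like:

import logging

logger = logging.getLogger("pharox.telemetry")


def log_lease_event(event: str, lease) -> None:
    # Shared structured-logging callback; attach it via whatever lifecycle
    # hook your installed Pharox version exposes on the manager.
    logger.info(
        "lease event",
        extra={"event": event, "proxy_id": getattr(lease, "proxy_id", None)},
    )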

2) Celery example

Define the manager at module import time and use with_retrying_lease in tasks.

from celery import Celery
from pharox import SelectorStrategy
from worker_core import process_job  # your app logic

# `manager` and `storage` are the module-level objects built in step 1;
# import them from wherever you defined them so every task shares one pool.

app = Celery("scraper", broker="redis://localhost:6379/0")


@app.task(bind=True, max_retries=5)
def run_scrape(self, pool: str, payload: dict):
    with manager.with_retrying_lease(
        pool_name=pool,
        consumer_name="celery-worker",
        selector=SelectorStrategy.ROUND_ROBIN,
        max_attempts=3,
        backoff_seconds=0.5,
    ) as lease:
        if lease is None:
            raise self.retry(exc=RuntimeError("no proxy available"))
        proxy = storage.get_proxy_by_id(lease.proxy_id)
        process_job(proxy.url, payload)
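
Producers enqueue work with the usual Celery call; the pool name and payload below are placeholders:

run_scrape.delay("us-residential", {"url": "https://example.com"})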

Add a Celery beat task to run maintenance:

@app.task
def sweep_pools():
    cleaned = manager.cleanup_expired_leases()
    return {"cleaned": cleaned}
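
To run the sweep on an interval, register it in the beat schedule (the five-minute cadence and the task path are examples; adjust them to your project layout):

app.conf.beat_schedule = {
    "sweep-pools-every-5-minutes": {
        "task": "tasks.sweep_pools",  # module path of the sweep_pools task above
        "schedule": 300.0,
    },
}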

3) RQ or Airflow pattern

The same manager works inside RQ jobs or Airflow tasks: initialise it once in the worker process and reuse it across tasks.

# rq job
def rq_job(pool: str, payload: dict):
    with manager.with_lease(pool_name=pool, consumer_name="rq") as lease:
        if not lease:
            raise RuntimeError("no proxy available")
        proxy = storage.get_proxy_by_id(lease.proxy_id)
        process_job(proxy.url, payload)
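
Enqueue the job with the standard RQ API; the queue name and payload are placeholders:

from redis import Redis
from rq import Queue

queue = Queue("scrapes", connection=Redis())
queue.enqueue(rq_job, "us-residential", {"url": "https://example.com"})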

For Airflow, place the manager in a module imported by tasks and call it inside your PythonOperator or TaskFlow function.
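
A minimal TaskFlow sketch (Airflow 2.x), assuming the step-1 objects live in a shared module; proxy_setup, the pool name, and the payload below are placeholders:

from airflow.decorators import dag, task
import pendulum


@dag(schedule="@hourly", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def pharox_scrape():
    @task
    def scrape(pool: str = "us-residential"):
        # Import inside the task so the engine and manager are created in the
        # worker process rather than during DAG parsing on the scheduler.
        from proxy_setup import manager, storage  # placeholder module holding step-1 objects
        from worker_core import process_job

        with manager.with_lease(pool_name=pool, consumer_name="airflow") as lease:
            if not lease:
                raise RuntimeError("no proxy available")
            proxy = storage.get_proxy_by_id(lease.proxy_id)
            process_job(proxy.url, {"source": "airflow"})

    scrape()


pharox_scrape()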

4) Health sweeps on a schedule

Run periodic health checks from the same scheduler to keep pool state fresh:

from pharox import HealthCheckOrchestrator

orchestrator = HealthCheckOrchestrator(storage=storage)

async def sweep_health(pool: str):
    proxies = storage.find_proxies(pool)  # implement on your adapter
    async for result in orchestrator.stream_health_checks(proxies):
        ...  # act on each result, e.g. mark or quarantine unhealthy proxies

Wire this to Celery beat, RQ scheduler, or an Airflow DAG to enforce SLAs and quarantine unhealthy proxies.
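
For example, a Celery task can drive the async sweep with asyncio.run, assuming sweep_health and the Celery app from section 2 are importable from the same module (the default pool name is a placeholder):

import asyncio


@app.task
def sweep_health_task(pool: str = "us-residential"):
    # Bridge the async health sweep into a synchronous Celery task.
    asyncio.run(sweep_health(pool))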

Tips

  • Use dedicated consumers per queue (consumer_name) to segment metrics.
  • Keep the manager a singleton per worker; do not create one per task (see the sketch below).
  • Handle misses by retrying upstream (Celery retry, RQ requeue) instead of sleeping inside tasks.
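
One way to enforce the per-worker singleton, assuming a small helper module of your own (the function name and connection URL are illustrative):

from functools import lru_cache

from sqlalchemy import create_engine
from pharox import ProxyManager
from pharox.storage.postgres import PostgresStorage


@lru_cache(maxsize=1)
def get_manager() -> ProxyManager:
    # Built lazily, once per worker process, then cached for every task.
    engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/pharox")
    return ProxyManager(storage=PostgresStorage(engine))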