Embed Pharox in a Worker

This guide shows how to integrate Pharox into a long-running worker that leases proxies, performs work, and emits observability events.

Scenario

You operate a scraping worker scheduled by Celery or a cron job. Each job needs an exclusive proxy, must release it after use, and should record metrics about lease success/failure.

1. Set Up the Manager and Callbacks

Define callbacks once at process startup to centralise logging/metrics.

import logging
from pharox import (
    AcquireEventPayload,
    InMemoryStorage,
    ProxyManager,
    ReleaseEventPayload,
    SelectorStrategy,
)

logger = logging.getLogger("pharox.worker")

storage = InMemoryStorage()
manager = ProxyManager(storage=storage)

def on_acquire(event: AcquireEventPayload):
    outcome = "success" if event.lease else "failure"
    logger.info(
        "proxy.acquire",
        extra={
            "pool": event.pool_name,
            "consumer": event.consumer_name,
            "outcome": outcome,
            "duration_ms": event.duration_ms,
            "selector": event.selector.value,
            "filters": event.filters.model_dump() if event.filters else None,
            "available": event.pool_stats.available_proxies
            if event.pool_stats
            else None,
        },
    )

def on_release(event: ReleaseEventPayload):
    logger.info(
        "proxy.release",
        extra={
            "proxy_id": event.lease.proxy_id,
            "lease_duration_ms": event.lease_duration_ms,
            "available": event.pool_stats.available_proxies
            if event.pool_stats
            else None,
        },
    )

manager.register_acquire_callback(on_acquire)
manager.register_release_callback(on_release)
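The callbacks above put their metadata in logging's extra= fields. The standard logging.Formatter only prints those fields when the format string names them, and an unconfigured root logger drops INFO records entirely, so you need a handler and a formatter for the metadata to show up. Below is a minimal stdlib sketch that emits one JSON object per record. JsonExtraFormatter is a hypothetical name and is not part of Pharox.

```python
import json
import logging

# Attributes every LogRecord carries by default; anything else arrived via extra=.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class JsonExtraFormatter(logging.Formatter):
    """Render the message plus any extra= fields as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Copy only the non-standard attributes, i.e. the extra= metadata.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps non-JSON values (enums, datetimes) from raising.
        return json.dumps(payload, default=str)


handler = logging.StreamHandler()
handler.setFormatter(JsonExtraFormatter())
worker_logger = logging.getLogger("pharox.worker")
worker_logger.addHandler(handler)
worker_logger.setLevel(logging.INFO)
```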

Tip

Swap logging for your telemetry stack (OpenTelemetry, StatsD, Prometheus) by pushing the same metadata to counters/histograms.
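As a rough sketch of that swap, the callback below aggregates the same fields into a labelled counter and a per-pool list of latency samples, which are the in-memory equivalents of a counter and a histogram. InMemoryMetrics and FakeAcquireEvent are hypothetical stand-ins. FakeAcquireEvent mimics only the pool_name, lease, and duration_ms fields this guide reads from AcquireEventPayload.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FakeAcquireEvent:
    """Hypothetical stand-in exposing the fields the guide reads from AcquireEventPayload."""
    pool_name: str
    duration_ms: float
    lease: Optional[Any] = None


@dataclass
class InMemoryMetrics:
    """Toy metrics sink: labelled counters plus latency samples per pool."""
    counters: Counter = field(default_factory=Counter)
    latencies: dict = field(default_factory=lambda: defaultdict(list))

    def on_acquire(self, event) -> None:
        outcome = "success" if event.lease else "failure"
        # Counterpart of a counter such as proxy_acquire_total{pool, outcome}.
        self.counters[(event.pool_name, outcome)] += 1
        # Counterpart of a histogram observation for acquire latency.
        self.latencies[event.pool_name].append(event.duration_ms)
```

In a real worker you would pass metrics.on_acquire to manager.register_acquire_callback(...) and forward the values to your StatsD or Prometheus client instead of keeping them in memory.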

2. Write a Worker Function

Wrap the work in the with_lease context manager. When no proxy can be leased, it yields None instead of a lease, so you can requeue the job or back off gracefully.

def process_account(account_id: str) -> None:
    with manager.with_lease(
        pool_name="residential",
        consumer_name="worker",
        selector=SelectorStrategy.ROUND_ROBIN,
    ) as lease:
        if not lease:
            # Raising hands the job back to the scheduler's retry policy.
            logger.warning("No proxy available for account %s; requeueing", account_id)
            raise RuntimeError("proxy unavailable")

        proxy = storage.get_proxy_by_id(lease.proxy_id)
        run_job(account_id, proxy.url)  # run_job is your own scraping logic

Use your orchestration tool (Celery, RQ, APScheduler) to call process_account with retry policies that fit your workload.
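If your scheduler has no retry policy of its own, a minimal sketch of one looks like this. run_with_retries is a hypothetical helper and is not part of Pharox. It retries on RuntimeError because process_account raises that when no proxy is available. Celery users can get similar behaviour from the task's own retry options, such as autoretry_for.

```python
import time
from typing import Callable, Tuple, Type


def run_with_retries(
    job: Callable[[], None],
    attempts: int = 3,
    delay_s: float = 5.0,
    retry_on: Tuple[Type[BaseException], ...] = (RuntimeError,),
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call job until it succeeds or attempts run out; return the attempt that succeeded."""
    for attempt in range(1, attempts + 1):
        try:
            job()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: let the scheduler record the failure
            sleep(delay_s)  # fixed pause between attempts
    raise ValueError("attempts must be >= 1")
```

Usage: run_with_retries(lambda: process_account("acct-42")), where the account ID is illustrative. The sleep parameter is injectable so tests can avoid real waiting.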

Need bounded backoff before giving up? Swap the context manager for manager.with_retrying_lease(...) or use manager.acquire_proxy_with_retry(...) to wait for capacity with exponential delays.
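For intuition about what exponential delays with an upper bound look like, here is a generic sketch of a capped exponential schedule with optional "full jitter". backoff_delays and its parameters are hypothetical. They do not describe the parameters or internals of with_retrying_lease or acquire_proxy_with_retry.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    base_s: float = 0.5,
    factor: float = 2.0,
    max_s: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delay before each retry: base_s * factor**n, capped at max_s."""
    delays = []
    for n in range(attempts):
        delay = min(max_s, base_s * factor**n)
        if rng is not None:
            # Full jitter: pick uniformly in [0, delay] so workers don't retry in lockstep.
            delay = rng.uniform(0.0, delay)
        delays.append(delay)
    return delays
```

The cap keeps the total wait bounded. Jitter matters when many workers hit an exhausted pool at the same moment.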

3. Handle Errors Safely

Because the context manager releases the lease in a finally block, an exception raised inside the with block never leaks the proxy. If you need custom recovery logic, put a try/except inside the block:

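class ProviderTimeout(Exception):
    """Placeholder for whatever timeout error your job code raises."""


def process_with_recovery() -> None:  # hypothetical wrapper so `return` is valid
    with manager.with_lease(pool_name="residential") as lease:
        if not lease:
            return
        try:
            run_job(...)
        except ProviderTimeout:
            # logger.exception already attaches the active traceback.
            logger.exception("Work failed, marking proxy as suspect")
            # Optionally adjust proxy state in storage here.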
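To see why the lease cannot leak, here is a toy model of the same pattern built on contextlib.contextmanager. ToyPool and its with_lease method are hypothetical and only illustrate the try/finally idea. This is not Pharox's implementation. It yields None when the pool is empty and returns the proxy in finally, which runs whether the block exits normally or raises.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class ToyPool:
    """Illustrative stand-in for a proxy pool; not Pharox's implementation."""

    def __init__(self, proxy_ids: List[str]) -> None:
        self.free = list(proxy_ids)

    @contextmanager
    def with_lease(self) -> Iterator[Optional[str]]:
        proxy_id = self.free.pop(0) if self.free else None
        try:
            yield proxy_id  # None signals "no capacity"
        finally:
            # Runs on normal exit and when the with-block raises.
            if proxy_id is not None:
                self.free.append(proxy_id)
```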

4. Periodic Cleanup and Health Checks

Schedule a background job that releases expired leases and runs health sweeps:

import asyncio
from pharox import HealthCheckOrchestrator

checker = HealthCheckOrchestrator(storage=storage)

async def sweep_proxies():
    proxies = storage.list_proxies(status="active")
    async for result in checker.stream_health_checks(proxies):
        logger.info(
            "proxy.health",
            extra={
                "proxy_id": result.proxy_id,
                "status": result.status.value,
                "latency_ms": result.latency_ms,
            },
        )

def nightly_maintenance():
    released = manager.cleanup_expired_leases()
    logger.info("Expired leases cleaned", extra={"released": released})
    asyncio.run(sweep_proxies())
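Conceptually, releasing expired leases means finding leases whose expiry time has passed and returning their proxies to the pool. This is typically the safety net for a worker that dies before its finally block runs. The toy sketch below shows only the selection step. It assumes each lease carries an expires_at timestamp, and ToyLease and split_expired are hypothetical names, not how Pharox models or stores leases.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


@dataclass
class ToyLease:
    """Hypothetical lease record; Pharox's own lease model may differ."""
    proxy_id: str
    expires_at: datetime


def split_expired(
    leases: List[ToyLease], now: datetime
) -> Tuple[List[ToyLease], List[ToyLease]]:
    """Partition leases into (still_active, expired) relative to now."""
    active = [lease for lease in leases if lease.expires_at > now]
    expired = [lease for lease in leases if lease.expires_at <= now]
    return active, expired
```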

Tie nightly_maintenance to a cron trigger or a lightweight scheduler.
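If you have no cron or scheduler available, a minimal stdlib loop can serve as the "lightweight scheduler". The sketch below assumes a daily run at a fixed local wall-clock time. seconds_until and run_daily are hypothetical helpers.

```python
import time
from datetime import datetime, timedelta
from typing import Callable


def seconds_until(hour: int, minute: int, now: datetime) -> float:
    """Seconds from now until the next occurrence of hour:minute."""
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # already passed today: schedule for tomorrow
    return (target - now).total_seconds()


def run_daily(job: Callable[[], None], hour: int = 2, minute: int = 0) -> None:
    """Block forever, calling job once a day at hour:minute local time."""
    while True:
        time.sleep(seconds_until(hour, minute, datetime.now()))
        job()
```

Usage: run_daily(nightly_maintenance, hour=2). Because the arithmetic uses naive local time, a DST change can shift one run by an hour. Wrap job in try/except if a single failure should not stop the loop.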

5. Promote to External Storage

When you are ready for persistence, implement the IStorage interface or follow the PostgreSQL adapter walkthrough. The worker code above continues to work unchanged once the storage backend is swapped.
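The reason the worker code survives a backend swap is that it depends only on an interface. The sketch below illustrates this with typing.Protocol. LeaseStore, DictLeaseStore, and do_work are hypothetical, and their acquire/release methods are not the methods of Pharox's IStorage. A PostgreSQL-backed class with the same two methods could be passed to do_work unchanged.

```python
from typing import Dict, List, Optional, Protocol


class LeaseStore(Protocol):
    """Hypothetical minimal interface; Pharox's real IStorage has its own methods."""

    def acquire(self, pool: str) -> Optional[str]: ...

    def release(self, proxy_id: str) -> None: ...


class DictLeaseStore:
    """In-memory backend, analogous in spirit to InMemoryStorage."""

    def __init__(self, pools: Dict[str, List[str]]) -> None:
        self.pools = {name: list(ids) for name, ids in pools.items()}
        self.owner: Dict[str, str] = {}  # proxy_id -> pool it was leased from

    def acquire(self, pool: str) -> Optional[str]:
        free = self.pools.get(pool, [])
        if not free:
            return None
        proxy_id = free.pop(0)
        self.owner[proxy_id] = pool
        return proxy_id

    def release(self, proxy_id: str) -> None:
        self.pools[self.owner.pop(proxy_id)].append(proxy_id)


def do_work(store: LeaseStore, pool: str) -> bool:
    """Worker logic that only touches the interface, so backends are swappable."""
    proxy_id = store.acquire(pool)
    if proxy_id is None:
        return False
    try:
        pass  # real work using proxy_id would happen here
    finally:
        store.release(proxy_id)
    return True
```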