Embed Pharox in a Worker¶
This guide shows how to integrate Pharox into a long-running worker that leases proxies, performs work, and emits observability events.
Scenario
You operate a scraping worker scheduled by Celery or a cron job. Each job needs an exclusive proxy, must release it after use, and should record metrics about lease success/failure.
1. Set Up the Manager and Callbacks¶
Define callbacks once at process startup to centralise logging/metrics.
import logging

from pharox import (
    AcquireEventPayload,
    InMemoryStorage,
    ProxyManager,
    ReleaseEventPayload,
    SelectorStrategy,
)

logger = logging.getLogger("pharox.worker")
storage = InMemoryStorage()
manager = ProxyManager(storage=storage)


def on_acquire(event: AcquireEventPayload):
    outcome = "success" if event.lease else "failure"
    logger.info(
        "proxy.acquire",
        extra={
            "pool": event.pool_name,
            "consumer": event.consumer_name,
            "outcome": outcome,
            "duration_ms": event.duration_ms,
            "selector": event.selector.value,
            "filters": event.filters.model_dump() if event.filters else None,
            "available": event.pool_stats.available_proxies
            if event.pool_stats
            else None,
        },
    )


def on_release(event: ReleaseEventPayload):
    logger.info(
        "proxy.release",
        extra={
            "proxy_id": event.lease.proxy_id,
            "lease_duration_ms": event.lease_duration_ms,
            "available": event.pool_stats.available_proxies
            if event.pool_stats
            else None,
        },
    )


manager.register_acquire_callback(on_acquire)
manager.register_release_callback(on_release)
Tip
Swap logging for your telemetry stack (OpenTelemetry, StatsD, Prometheus)
by pushing the same metadata to counters/histograms.
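For instance, a minimal Prometheus sketch could register a second acquire callback alongside the logging one. This assumes the prometheus_client package is installed; the metric names below are illustrative, not part of Pharox.

from prometheus_client import Counter, Histogram

# Illustrative metric names; adjust to your own naming conventions.
ACQUIRE_TOTAL = Counter(
    "pharox_acquire_total", "Proxy lease acquisitions", ["pool", "outcome"]
)
ACQUIRE_DURATION = Histogram(
    "pharox_acquire_duration_ms", "Time spent acquiring a lease (ms)"
)


def on_acquire_metrics(event: AcquireEventPayload):
    outcome = "success" if event.lease else "failure"
    ACQUIRE_TOTAL.labels(pool=event.pool_name, outcome=outcome).inc()
    if event.duration_ms is not None:
        ACQUIRE_DURATION.observe(event.duration_ms)


manager.register_acquire_callback(on_acquire_metrics)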
2. Write a Worker Function¶
Wrap the work in the with_lease context manager. If no proxy can be leased, the
context manager yields None, letting you requeue the job or back off gracefully.
def process_account(account_id: str) -> None:
    with manager.with_lease(
        pool_name="residential",
        consumer_name="worker",
        selector=SelectorStrategy.ROUND_ROBIN,
    ) as lease:
        if not lease:
            logger.warning("No proxy available; retrying account %s", account_id)
            raise RuntimeError("proxy unavailable")

        proxy = storage.get_proxy_by_id(lease.proxy_id)
        run_job(account_id, proxy.url)
Use your orchestration tool (Celery, RQ, APScheduler) to call process_account
with retry policies that fit your workload.
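For example, a Celery binding might look like the following sketch; the app name, broker URL, and retry settings are assumptions, not part of Pharox.

from celery import Celery

# Assumed app name and broker URL; configure for your environment.
app = Celery("scraper", broker="redis://localhost:6379/0")


@app.task(autoretry_for=(RuntimeError,), retry_backoff=True, max_retries=5)
def process_account_task(account_id: str) -> None:
    # The RuntimeError raised by process_account when no proxy is available
    # triggers Celery's automatic retry with exponential backoff.
    process_account(account_id)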
Need bounded backoff before giving up? Swap the context manager for
manager.with_retrying_lease(...) or use manager.acquire_proxy_with_retry(...)
to wait for capacity with exponential delays.
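As a rough sketch only: the keyword arguments shown below (max_attempts, backoff_seconds) are assumptions, so check the API reference for the actual signature of with_retrying_lease.

def process_account_with_backoff(account_id: str) -> None:
    with manager.with_retrying_lease(
        pool_name="residential",
        consumer_name="worker",
        max_attempts=5,       # assumed parameter name
        backoff_seconds=2.0,  # assumed parameter name
    ) as lease:
        if not lease:
            logger.warning("No proxy after retries for account %s", account_id)
            return
        proxy = storage.get_proxy_by_id(lease.proxy_id)
        run_job(account_id, proxy.url)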
3. Handle Errors Safely¶
Because the context manager releases the lease in a finally block, any raised
exceptions do not leak the proxy. If you need custom recovery logic, use a
try/except inside the context:
with manager.with_lease(pool_name="residential") as lease:
    if not lease:
        return
    try:
        run_job(...)
    except ProviderTimeout as exc:
        logger.exception("Work failed, marking proxy as suspect", exc_info=exc)
        # Optionally adjust proxy state in storage here.
4. Periodic Cleanup and Health Checks¶
Schedule a background job that releases expired leases and runs health sweeps:
import asyncio

from pharox import HealthCheckOrchestrator

checker = HealthCheckOrchestrator(storage=storage)


async def sweep_proxies():
    proxies = storage.list_proxies(status="active")
    async for result in checker.stream_health_checks(proxies):
        logger.info(
            "proxy.health",
            extra={
                "proxy_id": result.proxy_id,
                "status": result.status.value,
                "latency_ms": result.latency_ms,
            },
        )


def nightly_maintenance():
    released = manager.cleanup_expired_leases()
    logger.info("Expired leases cleaned", extra={"released": released})
    asyncio.run(sweep_proxies())
Tie nightly_maintenance to a cron trigger or a lightweight scheduler.
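For instance, with APScheduler (an assumption; any scheduler works) the nightly run could be wired up like this:

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
# Run the maintenance sweep every night at 03:00.
scheduler.add_job(nightly_maintenance, trigger="cron", hour=3)
scheduler.start()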
5. Promote to External Storage¶
When you are ready for persistence, implement the IStorage interface or follow
the PostgreSQL adapter walkthrough. The worker code above
continues to work unchanged once the storage backend is swapped.
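As a rough sketch of what that swap looks like: the class name and constructor below are hypothetical, only the storage methods used in this guide are listed, and the import assumes IStorage is exposed from the top-level package. The real IStorage interface defines the full contract.

from pharox import IStorage


class PostgresStorage(IStorage):
    """Hypothetical adapter; implement every method IStorage requires."""

    def __init__(self, dsn: str):
        self.dsn = dsn

    def get_proxy_by_id(self, proxy_id):
        ...  # look up a single proxy row

    def list_proxies(self, status=None):
        ...  # query proxies, optionally filtered by status


manager = ProxyManager(storage=PostgresStorage("postgresql://user:pass@host/db"))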