Build a PostgreSQL Adapter
Pharox ships with an in-memory adapter for tests and demos. This guide shows how to create a PostgreSQL-backed implementation using SQLAlchemy. The same approach applies to other ORMs or async drivers—swap libraries as needed.
Scope
This is a reference implementation you can adapt to your organisation's
standards. It focuses on the required IStorage methods and leaves schema
migrations to tools like Alembic. A maintained template (code + migrations +
Docker Compose) lives under examples/postgres/ in the repository so you
can clone it directly.
Install optional dependencies
The toolkit exposes a postgres extra that bundles SQLAlchemy, psycopg, and
Alembic. Install it via pip install 'pharox[postgres]' (or
poetry install --extras postgres) before running the examples below.
1. Define the Schema
Create tables for pools, proxies, consumers, and leases. Below is a simplified schema that captures the fields Pharox expects.
-- Named groups of proxies. find_available_proxy joins proxies to this table
-- and looks pools up by their unique name.
CREATE TABLE proxy_pool (
    id UUID PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    description TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Lease holders. ensure_consumer upserts with ON CONFLICT (name), which
-- requires the UNIQUE constraint on name below.
CREATE TABLE consumer (
    id UUID PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Individual proxies. Includes every column the adapter and contract-test
-- snippets read or write (city, checked_at, last_latency_ms included).
CREATE TABLE proxy (
    id UUID PRIMARY KEY,
    pool_id UUID NOT NULL REFERENCES proxy_pool(id),
    host TEXT NOT NULL,
    port INTEGER NOT NULL,
    protocol TEXT NOT NULL,
    status TEXT NOT NULL,
    max_concurrency INTEGER NOT NULL DEFAULT 1,
    -- Incremented by create_lease, decremented on release/expiry; a proxy is
    -- only handed out while current_leases < max_concurrency.
    current_leases INTEGER NOT NULL DEFAULT 0,
    asn INTEGER,
    country TEXT,
    city TEXT,
    source TEXT,
    latitude DOUBLE PRECISION,
    longitude DOUBLE PRECISION,
    -- Written by apply_health_check_result. Widen last_latency_ms to
    -- DOUBLE PRECISION if your health checker reports fractional latencies.
    checked_at TIMESTAMPTZ,
    last_latency_ms INTEGER,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- One row per lease. status starts as 'active' and becomes 'released'
-- (release_lease) or 'expired' (cleanup_expired_leases); released_at is set
-- on either transition.
CREATE TABLE lease (
    id UUID PRIMARY KEY,
    proxy_id UUID NOT NULL REFERENCES proxy(id),
    consumer_id UUID NOT NULL REFERENCES consumer(id),
    status TEXT NOT NULL,
    acquired_at TIMESTAMPTZ NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    released_at TIMESTAMPTZ
);
-- Tracks round-robin cursors per pool/strategy. last_proxy_id holds the
-- cursor position (presumably the proxy handed out most recently).
-- Rows are deleted together with their pool (ON DELETE CASCADE). Because this
-- table references proxy_pool, PostgreSQL will only TRUNCATE proxy_pool if this
-- table is truncated in the same command (or CASCADE is used).
CREATE TABLE pool_selector_state (
    pool_id UUID NOT NULL REFERENCES proxy_pool(id) ON DELETE CASCADE,
    strategy VARCHAR(32) NOT NULL,
    last_proxy_id UUID,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (pool_id, strategy)
);
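Add indexes that match your filter workloads (e.g., a composite index on
proxy(pool_id, status), or a geospatial index such as PostGIS or a GiST index
on a point built from latitude/longitude for location-based filters).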
2. Implement the Adapter
Use SQLAlchemy's ORM or Core—below we use Core for clarity.
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import Optional
from uuid import uuid4
from sqlalchemy import select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Connection
from pharox.models import HealthCheckResult, Lease, Proxy, ProxyFilters
from pharox.storage import IStorage
from .tables import consumer_table, lease_table, pool_table, proxy_table
class PostgresStorage(IStorage):
    """Reference ``IStorage`` implementation on PostgreSQL using SQLAlchemy Core.

    Every method executes on the single ``Connection`` given to the
    constructor. This class never begins or commits a transaction; that is
    left to the caller that owns the connection.
    """

    def __init__(self, conn: Connection):
        self._conn = conn

    def find_available_proxy(
        self, pool_name: str, filters: Optional[ProxyFilters] = None
    ) -> Optional[Proxy]:
        """Return the oldest active proxy in ``pool_name`` with spare capacity.

        Args:
            pool_name: Name of the ``proxy_pool`` to search.
            filters: Optional filters; ``country``, ``source`` and ``asn`` are
                translated into SQL equality clauses.

        Returns:
            The first match ordered by ``created_at``, or ``None``.
        """
        query = (
            select(proxy_table)
            .join(pool_table, proxy_table.c.pool_id == pool_table.c.id)
            .where(
                pool_table.c.name == pool_name,
                proxy_table.c.status == "active",
                proxy_table.c.current_leases < proxy_table.c.max_concurrency,
            )
        )
        if filters:
            # Translate simple equality clauses into SQL first.
            if filters.country:
                query = query.where(proxy_table.c.country == filters.country)
            if filters.source:
                query = query.where(proxy_table.c.source == filters.source)
            if filters.asn is not None:
                query = query.where(proxy_table.c.asn == filters.asn)
            # NOTE(review): composite clauses (all_of/any_of/none_of) and Python
            # predicates are not evaluated here. A complete adapter should
            # fetch candidates and check them with `filters.matches()` before
            # returning one.
        # Ordering and limit go last so the statement reads top to bottom.
        query = query.order_by(proxy_table.c.created_at.asc()).limit(1)
        row = self._conn.execute(query).mappings().fetchone()
        return Proxy.model_validate(dict(row)) if row else None

    def create_lease(
        self, proxy: Proxy, consumer_name: str, duration_seconds: int
    ) -> Lease:
        """Record an active lease on ``proxy`` and take one of its slots.

        Args:
            proxy: Proxy being leased.
            consumer_name: Consumer name; the row is created if missing.
            duration_seconds: Lease lifetime, measured from acquisition.

        Returns:
            The created lease, with the same timestamps that were stored.
        """
        # Take a single timestamp so the stored and returned lease agree.
        acquired_at = datetime.now(UTC)
        expires_at = acquired_at + timedelta(seconds=duration_seconds)
        consumer_id = self.ensure_consumer(consumer_name)
        lease_id = uuid4()
        self._conn.execute(
            lease_table.insert().values(
                id=lease_id,
                proxy_id=proxy.id,
                consumer_id=consumer_id,
                status="active",
                acquired_at=acquired_at,
                expires_at=expires_at,
            )
        )
        # Increment in SQL rather than from `proxy.current_leases`: the
        # in-memory model may be stale, and writing it back would lose
        # increments made by other leases in the meantime.
        self._conn.execute(
            update(proxy_table)
            .where(proxy_table.c.id == proxy.id)
            .values(current_leases=proxy_table.c.current_leases + 1)
        )
        return Lease(
            id=lease_id,
            proxy_id=proxy.id,
            consumer_id=consumer_id,
            status="active",
            acquired_at=acquired_at,
            expires_at=expires_at,
        )

    def ensure_consumer(self, consumer_name: str):
        """Return the id of the consumer named ``consumer_name``, creating it.

        ``INSERT ... ON CONFLICT DO NOTHING RETURNING`` yields no row when the
        name already exists, so in that case the id is read back with a SELECT.
        """
        stmt = (
            insert(consumer_table)
            .values(id=uuid4(), name=consumer_name)
            .on_conflict_do_nothing(index_elements=["name"])
            .returning(consumer_table.c.id)
        )
        row = self._conn.execute(stmt).fetchone()
        if row:
            return row.id
        query = select(consumer_table.c.id).where(
            consumer_table.c.name == consumer_name
        )
        return self._conn.execute(query).scalar_one()

    def release_lease(self, lease: Lease) -> None:
        """Mark ``lease`` as released and free its slot on the proxy.

        Only a lease that is still ``active`` is transitioned. Releasing a
        lease that was already released or expired is a no-op, so the proxy's
        ``current_leases`` is never decremented twice for the same lease.
        """
        result = self._conn.execute(
            update(lease_table)
            .where(
                lease_table.c.id == lease.id,
                lease_table.c.status == "active",
            )
            .values(status="released", released_at=datetime.now(UTC))
        )
        if result.rowcount == 0:
            return
        self._conn.execute(
            update(proxy_table)
            .where(proxy_table.c.id == lease.proxy_id)
            .values(current_leases=proxy_table.c.current_leases - 1)
        )

    def cleanup_expired_leases(self) -> int:
        """Expire every active lease past ``expires_at`` and free its slot.

        Returns:
            The number of leases that were transitioned to ``expired``.
        """
        now = datetime.now(UTC)
        # One UPDATE ... RETURNING both checks and changes the status, so a
        # lease released concurrently cannot also be counted here.
        expired_proxy_ids = (
            self._conn.execute(
                update(lease_table)
                .where(
                    lease_table.c.status == "active",
                    lease_table.c.expires_at <= now,
                )
                .values(status="expired", released_at=now)
                .returning(lease_table.c.proxy_id)
            )
            .scalars()
            .all()
        )
        # Group by proxy so each proxy row is updated once.
        freed_per_proxy = {}
        for proxy_id in expired_proxy_ids:
            freed_per_proxy[proxy_id] = freed_per_proxy.get(proxy_id, 0) + 1
        for proxy_id, freed in freed_per_proxy.items():
            self._conn.execute(
                update(proxy_table)
                .where(proxy_table.c.id == proxy_id)
                .values(current_leases=proxy_table.c.current_leases - freed)
            )
        return len(expired_proxy_ids)

    def apply_health_check_result(
        self, result: HealthCheckResult
    ) -> Optional[Proxy]:
        """Store a health-check outcome on the proxy and return the fresh row.

        Returns:
            The updated proxy, or ``None`` if ``result.proxy_id`` is unknown.
        """
        self._conn.execute(
            update(proxy_table)
            .where(proxy_table.c.id == result.proxy_id)
            .values(
                status=result.status.value,
                checked_at=result.checked_at,
                last_latency_ms=result.latency_ms,
            )
        )
        refreshed = (
            self._conn.execute(
                select(proxy_table).where(proxy_table.c.id == result.proxy_id)
            )
            .mappings()
            .fetchone()
        )
        return Proxy.model_validate(dict(refreshed)) if refreshed else None
Selector strategies
The adapter shipped with Pharox (pharox.storage.postgres.PostgresStorage)
extends find_available_proxy with an optional SelectorStrategy parameter and
stores round-robin cursors in the pool_selector_state table. Copy that
behaviour if you need deterministic load-balancing across workers.
A production implementation should guard against race conditions and handle
geospatial filters. For example, select the candidate with
SELECT ... FOR UPDATE SKIP LOCKED so two workers cannot claim the last free slot
on the same proxy. Start simple, then iterate based on scale. Remember that
ProxyFilters can include nested all_of / any_of / none_of clauses plus a Python
predicate. SQL adapters typically push as much as possible into the query
builder, then evaluate the remaining predicates in Python before returning a
proxy.
Health result contract
Align your apply_health_check_result with the guidance in
Storage › Best Practices
so every adapter exposes consistent status/latency data to the toolkit.
3. Run Contract Tests
Before adopting the adapter in production, reuse the storage contract suite bundled with Pharox:
import os

import pytest
from sqlalchemy import create_engine, text

from pharox.models import Proxy, ProxyPool
from pharox.storage.postgres import PostgresStorage
from pharox.tests.adapters import (
    StorageContractFixtures,
    storage_contract_suite,
)
# Honour PHAROX_TEST_POSTGRES_URL so the suite can target a disposable
# database without editing this file; fall back to the local default.
engine = create_engine(
    os.environ.get(
        "PHAROX_TEST_POSTGRES_URL",
        "postgresql+psycopg://user:pass@localhost/pharox",
    )
)
def make_storage() -> PostgresStorage:
    """Empty every Pharox table and return a storage for a fresh contract run.

    ``pool_selector_state`` must be truncated in the same command as
    ``proxy_pool``. PostgreSQL refuses to TRUNCATE a table that another table
    references through a foreign key unless the referencing table is included
    (or CASCADE is used).
    """
    with engine.begin() as conn:
        conn.execute(
            text(
                "TRUNCATE pool_selector_state, lease, proxy, consumer, proxy_pool "
                "RESTART IDENTITY"
            )
        )
    # NOTE(review): the adapter sketched in section 2 takes a Connection; this
    # passes the Engine, assuming pharox.storage.postgres.PostgresStorage
    # accepts one. Confirm against the shipped adapter.
    return PostgresStorage(engine)
def seed_pool(storage: PostgresStorage, pool: ProxyPool) -> ProxyPool:
    """Insert ``pool`` straight into ``proxy_pool`` and hand it back unchanged.

    ``storage`` is not used; it is accepted to match the fixture callback
    signature passed to ``StorageContractFixtures``.
    """
    insert_pool = text(
        "INSERT INTO proxy_pool (id, name, description) VALUES (:id, :name, :description)"
    )
    params = {
        "id": str(pool.id),
        "name": pool.name,
        "description": pool.description,
    }
    with engine.begin() as conn:
        conn.execute(insert_pool, params)
    return pool
def seed_proxy(storage: PostgresStorage, proxy: Proxy) -> Proxy:
    """Insert ``proxy`` with zero active leases and hand it back unchanged.

    ``storage`` is not used; it is accepted to match the fixture callback
    signature. Enum-typed fields (protocol, status) are stored via ``.value``.
    """
    insert_proxy = text(
        """
        INSERT INTO proxy (
            id, pool_id, host, port, protocol, status, max_concurrency,
            current_leases, country, source, city
        )
        VALUES (
            :id, :pool_id, :host, :port, :protocol, :status,
            :max_concurrency, 0, :country, :source, :city
        )
        """
    )
    params = dict(
        id=str(proxy.id),
        pool_id=str(proxy.pool_id),
        host=proxy.host,
        port=proxy.port,
        protocol=proxy.protocol.value,
        status=proxy.status.value,
        max_concurrency=proxy.max_concurrency,
        country=proxy.country,
        source=proxy.source,
        city=proxy.city,
    )
    with engine.begin() as conn:
        conn.execute(insert_proxy, params)
    return proxy
@pytest.mark.contract
def test_postgres_storage_contract():
    """Run Pharox's bundled storage contract suite against PostgreSQL."""
    storage_contract_suite(
        StorageContractFixtures(
            make_storage=make_storage,
            seed_pool=seed_pool,
            seed_proxy=seed_proxy,
        )
    )
Set PHAROX_TEST_POSTGRES_URL to point at a disposable database (e.g.,
postgresql+psycopg://pharox:pharox@localhost:5439/pharox). The fixtures
truncate the Pharox tables, so never point this at real data. Then run
poetry run pytest tests/test_storage_contract_postgres.py from the Pharox
repository root to see the suite in action.
Spin up PostgreSQL fast
Run docker compose up postgres from the examples/postgres/ directory
(see section 4 below) to boot a development instance with migrations pre-applied.
4. Share as an Example
Pharox already ships pharox.storage.postgres.PostgresStorage plus the
examples/postgres/ toolkit (docker-compose, migrations, and shims). Copy that
directory into your service to kick-start a production implementation, and send
improvements upstream (docs, migrations, tests) so the template keeps getting
better. If you need to publish an internal fork, keep the README up to date so
new teams can bootstrap quickly.