Waterfall enrich HubSpot contacts across Apollo, Clearbit, and PDL using an agent skill

Complexity: Medium
Cost: Usage-based

Prerequisites
  • Claude Code, Cursor, or another AI coding agent that supports skills
  • HubSpot private app token stored as HUBSPOT_TOKEN (scopes: crm.objects.contacts.read, crm.objects.contacts.write)
  • Apollo API key stored as APOLLO_API_KEY
  • Clearbit API key stored as CLEARBIT_API_KEY
  • People Data Labs API key stored as PDL_API_KEY

Overview

Waterfall enrichment queries several data providers in sequence and calls the next one only when required fields are still missing, so you do not pay every provider for every contact. The branching logic is tedious to run by hand. An agent skill wraps the whole waterfall in a single command: run /waterfall-enrich and it cascades through Apollo, Clearbit, and PDL automatically, then writes the results to HubSpot with source attribution.
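
To make the cascade concrete before the full script, here is a minimal sketch of the merge rule on its own. The three stub providers and their return values are hypothetical stand-ins for real API calls; only the fill-the-gaps logic mirrors the script in Step 3.

```python
from typing import Callable, Dict, List, Optional, Tuple

Record = Dict[str, Optional[str]]
Provider = Tuple[str, Callable[[str], Record]]


def merge_waterfall(email: str, providers: List[Provider], wanted: List[str]):
    """Call providers in order; keep the first non-empty value seen per field."""
    merged: Record = {}
    sources: List[str] = []
    for name, lookup in providers:
        # Stop as soon as every wanted field has a value.
        if all(merged.get(f) for f in wanted):
            break
        filled_any = False
        for field, value in lookup(email).items():
            if value and not merged.get(field):
                merged[field] = value
                filled_any = True
        if filled_any:
            sources.append(name)
    return merged, sources


# Hypothetical stubs standing in for Apollo, Clearbit, and PDL.
def first_stub(email: str) -> Record:
    return {"jobtitle": "VP Sales", "phone": None}


def second_stub(email: str) -> Record:
    return {"jobtitle": "Sales VP", "phone": "+15550100"}


def third_stub(email: str) -> Record:
    raise AssertionError("not reached: all wanted fields were already filled")


fields, used = merge_waterfall(
    "jane@example.com",
    [("first", first_stub), ("second", second_stub), ("third", third_stub)],
    ["jobtitle", "phone"],
)
print(fields, used)
```

The first provider's job title wins, the second only fills the missing phone, and the third is never called because nothing is left to fill.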

Step 1: Create the skill directory

mkdir -p .claude/skills/waterfall-enrich/scripts

Step 2: Write the SKILL.md file

Create .claude/skills/waterfall-enrich/SKILL.md:

---
name: waterfall-enrich
description: Waterfall enriches HubSpot contacts missing key fields. Tries Apollo first, then Clearbit for gaps, then People Data Labs as a final fallback. Writes enriched data and source attribution back to HubSpot.
disable-model-invocation: true
allowed-tools: Bash(python *)
---
 
Run waterfall enrichment on HubSpot contacts missing job title:
 
1. Run: `python $SKILL_DIR/scripts/waterfall.py`
2. Review the per-contact enrichment results
3. Check source attribution to see which providers filled which contacts

Step 3: Write the waterfall script

Create .claude/skills/waterfall-enrich/scripts/waterfall.py:

#!/usr/bin/env python3
"""
Waterfall Enrichment: HubSpot → Apollo → Clearbit → PDL → HubSpot
Tries each provider in sequence. Only calls the next provider for missing fields.
"""
import os
import sys
import time
 
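try:
    import requests
except ImportError:
    # Install into the interpreter that is running this script, then retry
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "requests"])
    import requests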
 
# --- Config ---
HUBSPOT_TOKEN = os.environ.get("HUBSPOT_TOKEN")
APOLLO_API_KEY = os.environ.get("APOLLO_API_KEY")
CLEARBIT_API_KEY = os.environ.get("CLEARBIT_API_KEY")
PDL_API_KEY = os.environ.get("PDL_API_KEY")
 
required = {"HUBSPOT_TOKEN": HUBSPOT_TOKEN, "APOLLO_API_KEY": APOLLO_API_KEY}
missing = [k for k, v in required.items() if not v]
if missing:
    print(f"ERROR: Missing required env vars: {', '.join(missing)}")
    sys.exit(1)
 
HS_HEADERS = {"Authorization": f"Bearer {HUBSPOT_TOKEN}", "Content-Type": "application/json"}
REQUIRED_FIELDS = ["jobtitle", "company", "phone", "linkedin_url", "industry"]
 
# --- Provider functions ---
def enrich_apollo(email):
    resp = requests.post(
        "https://api.apollo.io/api/v1/people/match",
        headers={"x-api-key": APOLLO_API_KEY, "Content-Type": "application/json"},
        json={"email": email}
    )
    resp.raise_for_status()
    p = resp.json().get("person")
    if not p:
        return {}
    org = p.get("organization") or {}  # key may be present but null
    return {
        "jobtitle": p.get("title"),
        "company": org.get("name"),
        "phone": (p.get("phone_numbers") or [{}])[0].get("sanitized_number"),
        "linkedin_url": p.get("linkedin_url"),
        "industry": org.get("industry"),
    }
 
def enrich_clearbit(email):
    if not CLEARBIT_API_KEY:
        return {}
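    resp = requests.get(
        "https://person.clearbit.com/v2/people/find",
        params={"email": email},  # let requests URL-encode the address (e.g. "+" aliases)
        headers={"Authorization": f"Bearer {CLEARBIT_API_KEY}"}
    )
    # 404 = no match; Clearbit can also answer 202 while a lookup is still queued
    if resp.status_code in (202, 404):
        return {}
    resp.raise_for_status()
    d = resp.json()
    employment = d.get("employment") or {}  # sub-objects can be null
    handle = (d.get("linkedin") or {}).get("handle")
    # The handle may already carry a path prefix such as "in/"; add one only if missing
    if handle and not handle.startswith(("in/", "pub/")):
        handle = f"in/{handle}"
    return {
        "jobtitle": employment.get("title"),
        "company": employment.get("name"),
        "linkedin_url": f"https://linkedin.com/{handle}" if handle else None,
    }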
 
def enrich_pdl(email):
    if not PDL_API_KEY:
        return {}
    resp = requests.post(
        "https://api.peopledatalabs.com/v5/person/enrich",
        headers={"x-api-key": PDL_API_KEY, "Content-Type": "application/json"},
        json={"email": email}
    )
    if resp.status_code == 404:
        return {}
    resp.raise_for_status()
    body = resp.json()  # parse once
    d = body.get("data") or body  # person record is normally nested under "data"
    phones = d.get("phone_numbers") or []
    return {
        "jobtitle": d.get("job_title"),
        "company": d.get("job_company_name"),
        "phone": phones[0] if phones else None,
        "linkedin_url": d.get("linkedin_url"),
        "industry": d.get("industry"),
    }
 
PROVIDERS = [("apollo", enrich_apollo), ("clearbit", enrich_clearbit), ("pdl", enrich_pdl)]
 
def waterfall(email):
    merged = {}
    sources = []
    for name, fn in PROVIDERS:
        if all(merged.get(f) for f in REQUIRED_FIELDS):
            break
        try:
            result = fn(email)
            filled = False
            for k, v in result.items():
                if v and not merged.get(k):
                    merged[k] = v
                    filled = True
            if filled:
                sources.append(name)
        except Exception as e:
            print(f"  WARN: {name} failed: {e}")
        time.sleep(0.5)
    merged["enrichment_source"] = "+".join(sources) if sources else "none"
    return merged
 
# --- Main ---
print("Searching for unenriched contacts...")
contacts = []
after = 0
while True:
    resp = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts/search",
        headers=HS_HEADERS,
        json={
            "filterGroups": [{"filters": [{
                "propertyName": "jobtitle",
                "operator": "NOT_HAS_PROPERTY"
            }]}],
            "properties": ["email", "jobtitle", "company"],
            "limit": 100,
            "after": after
        }
    )
    resp.raise_for_status()
    data = resp.json()
    contacts.extend(data["results"])
    if data.get("paging", {}).get("next"):
        after = data["paging"]["next"]["after"]
    else:
        break
 
print(f"Found {len(contacts)} contacts to enrich\n")
 
enriched = 0
for contact in contacts:
    email = contact["properties"].get("email")
    if not email:
        continue
 
    domain = email.split("@")[-1].lower()
    if domain in ("gmail.com", "yahoo.com", "hotmail.com", "outlook.com"):
        continue
 
    print(f"  {email}...")
    fields = waterfall(email)
 
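    properties = {k: v for k, v in fields.items() if v}
    filled = [k for k in properties if k != "enrichment_source"]
    # enrichment_source is always set ("none" when no provider matched), so this
    # write also records the attempt. It is not a default HubSpot property:
    # create it (and linkedin_url, if your portal lacks it) before the first run.
    requests.patch(
        f"https://api.hubapi.com/crm/v3/objects/contacts/{contact['id']}",
        headers=HS_HEADERS,
        json={"properties": properties}
    ).raise_for_status()
    if filled:
        enriched += 1  # count only contacts that actually gained a field
    print(f"    -> {fields['enrichment_source']} | {filled}")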
 
print(f"\nDone. Enriched {enriched}/{len(contacts)} contacts.")
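
The script above paces provider calls with a fixed 0.5-second sleep and treats any HTTP error as a provider failure. On larger batches a provider may answer 429 (rate limited). One possible way to handle that is sketched below; `call_with_backoff` and `backoff_delay` are hypothetical helpers, not part of the original script, and the retry limits are assumptions to tune against each provider's actual rate limits.

```python
import time
from typing import Callable, Optional


def backoff_delay(attempt: int, retry_after: Optional[str], base: float = 1.0) -> float:
    """Honor a numeric Retry-After header; otherwise wait base * 2**attempt seconds."""
    if retry_after:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # header was an HTTP date; fall back to exponential delay
    return base * (2 ** attempt)


def call_with_backoff(send: Callable[[], object], max_attempts: int = 4,
                      sleep: Callable[[float], None] = time.sleep):
    """Call send() and retry while it returns HTTP 429, up to max_attempts calls.

    send must return an object with .status_code and .headers,
    such as a requests.Response.
    """
    resp = send()
    for attempt in range(max_attempts - 1):
        if resp.status_code != 429:
            break
        sleep(backoff_delay(attempt, resp.headers.get("Retry-After")))
        resp = send()
    return resp
```

Inside a provider function, the request would be wrapped in a lambda, for example `resp = call_with_backoff(lambda: requests.post(url, headers=headers, json={"email": email}))`, with `raise_for_status()` called on the returned response as before.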

Step 4: Run the skill

# Via Claude Code
/waterfall-enrich
 
# Or directly
python .claude/skills/waterfall-enrich/scripts/waterfall.py

Step 5: Schedule it (optional)

# .github/workflows/waterfall-enrich.yml
name: Waterfall Enrichment
on:
  schedule:
    - cron: '0 */6 * * *'  # Every 6 hours
  workflow_dispatch: {}
jobs:
  enrich:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install requests
      - run: python .claude/skills/waterfall-enrich/scripts/waterfall.py
        env:
          HUBSPOT_TOKEN: ${{ secrets.HUBSPOT_TOKEN }}
          APOLLO_API_KEY: ${{ secrets.APOLLO_API_KEY }}
          CLEARBIT_API_KEY: ${{ secrets.CLEARBIT_API_KEY }}
          PDL_API_KEY: ${{ secrets.PDL_API_KEY }}
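
One thing to watch on a schedule: the search in Step 3 selects every contact without a job title, so contacts that no provider could match are picked up again on each run and spend Apollo credits every time. A possible mitigation, sketched below, is to also require that enrichment_source is empty, since the script writes that property (as "none") even on a miss. `unenriched_search_body` is a hypothetical helper; it assumes enrichment_source exists as a contact property in your portal.

```python
from typing import Optional


def unenriched_search_body(after: Optional[str] = None, limit: int = 100) -> dict:
    """Build a HubSpot contact search body for contacts never attempted before."""
    body = {
        # Filters inside one filter group are combined with AND.
        "filterGroups": [{"filters": [
            {"propertyName": "jobtitle", "operator": "NOT_HAS_PROPERTY"},
            {"propertyName": "enrichment_source", "operator": "NOT_HAS_PROPERTY"},
        ]}],
        "properties": ["email", "jobtitle", "company"],
        "limit": limit,
    }
    if after is not None:
        body["after"] = after  # paging cursor from the previous response
    return body


print(unenriched_search_body())
```

To retry misses later (for example after adding a provider), clear enrichment_source on those contacts or drop the second filter for a one-off run.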

Cost

  • Apollo: 1 credit/enrichment — called for every contact. ($49/mo Basic = 900 credits)
  • Clearbit: Volume-based starting at $99/mo — called for ~30% of contacts.
  • People Data Labs: $0.03-0.10/enrichment — called for ~10-15% of contacts.
  • The waterfall saves money: For 100 contacts, you might use 100 Apollo + 30 Clearbit + 10 PDL credits instead of 100 credits at each provider.
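
The arithmetic behind that last bullet, as a small sketch. The reach shares (100% Apollo, 30% Clearbit, 10% PDL) are the illustrative figures from the list above, not measured values; substitute your own fall-through rates once you have run a batch.

```python
def provider_calls(contacts: int, reach: dict) -> dict:
    """Estimate calls per provider from the share of contacts that reach it."""
    return {name: round(contacts * share) for name, share in reach.items()}


# Share of all contacts that reach each provider (illustrative figures).
reach = {"apollo": 1.0, "clearbit": 0.30, "pdl": 0.10}

calls = provider_calls(100, reach)
waterfall_total = sum(calls.values())   # 100 + 30 + 10 = 140 calls
parallel_total = 100 * len(reach)       # 100 calls at each of 3 providers = 300

print(calls, waterfall_total, parallel_total)
```

Multiply each provider's call count by its own per-call price to turn this into a monthly estimate.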

Optional providers

The script handles missing Clearbit or PDL keys gracefully — it skips those providers. You can start with Apollo-only and add providers later as needed.
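
If you want to confirm which providers a run will actually use before spending credits, a small preflight check along these lines may help. `active_providers` is a hypothetical helper, not part of the script; it only inspects the same environment variable names listed under Prerequisites.

```python
import os
from typing import List, Mapping

# Provider order matches the waterfall: Apollo, then Clearbit, then PDL.
PROVIDER_ENV_VARS = [
    ("apollo", "APOLLO_API_KEY"),
    ("clearbit", "CLEARBIT_API_KEY"),
    ("pdl", "PDL_API_KEY"),
]


def active_providers(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the providers whose API key is set and non-empty, in waterfall order."""
    return [name for name, var in PROVIDER_ENV_VARS if env.get(var)]


print("Providers enabled for this run:", ", ".join(active_providers()) or "none")
```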

When to use this approach

  • You want to test the waterfall pattern before committing to a platform
  • You want full control over provider order and field-merging logic
  • You need to run enrichment ad-hoc ("enrich the contacts we imported today")
  • You want enrichment logic version-controlled alongside your code
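
For the ad-hoc case ("enrich the contacts we imported today"), one option is to add a creation-date filter to the search in Step 3. The sketch below builds such a filter; the helper names are hypothetical, and it assumes HubSpot's standard createdate property and UTC day boundaries, which may not match your portal's time zone.

```python
from datetime import datetime, timezone
from typing import Optional


def start_of_today_utc(now: Optional[datetime] = None) -> datetime:
    """Midnight UTC of the current day (or of the given timestamp)."""
    now = now or datetime.now(timezone.utc)
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


def created_since_filter(since: datetime) -> dict:
    """HubSpot search filter: contacts created at or after `since`."""
    epoch_ms = int(since.timestamp() * 1000)  # date filters take epoch milliseconds
    return {"propertyName": "createdate", "operator": "GTE", "value": str(epoch_ms)}


print(created_since_filter(start_of_today_utc()))
```

Appending this dictionary to the existing `filters` list, next to the jobtitle filter, restricts the run to contacts created today that still lack a job title.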

When to move to a dedicated tool

  • You need real-time enrichment on contact creation (not batch)
  • Multiple team members need to modify provider settings without touching code
  • You want visual monitoring of which providers are being called and their success rates
