## Overview

OathNet uses cursor-based pagination for efficient retrieval of large datasets. This approach provides:

- **Consistent results** - no duplicates or missed records when data changes
- **Better performance** - faster than offset-based pagination for large datasets
- **Stable ordering** - results maintain a consistent order across pages
## How It Works

1. Make your initial request.
2. Check for `nextCursorMark` (or `next_cursor`) in the response.
3. Pass the cursor as the `cursor` parameter in your next request.
4. Repeat until the cursor is `null`.
## Basic Example

```python
import requests

def fetch_all_results(query, api_key):
    """Fetch all paginated results for a query."""
    all_results = []
    cursor = None

    while True:
        params = {"q": query}
        if cursor:
            params["cursor"] = cursor

        response = requests.get(
            "https://oathnet.org/api/service/search-breach",
            params=params,
            headers={"x-api-key": api_key},
        ).json()

        if not response["success"]:
            raise Exception(response["message"])

        data = response["data"]
        all_results.extend(data["results"])
        print(f"Fetched {len(data['results'])} results "
              f"({len(all_results)}/{data['results_found']} total)")

        # Check for more pages
        cursor = data.get("nextCursorMark") or data.get("next_cursor")
        if not cursor:
            break

    return all_results

# Usage
results = fetch_all_results("[email protected]", API_KEY)
print(f"Total results: {len(results)}")
```
## Response Structure

### Breach/Stealer Search (Legacy)
```json
{
  "data": {
    "results": [...],
    "results_found": 1500,
    "results_shown": 100,
    "nextCursorMark": "AoJw3ZD...",
    "next_cursor_mark": "AoJw3ZD..."
  }
}
```
| Field | Description |
|---|---|
| `results_found` | Total matching records |
| `results_shown` | Records in this response |
| `nextCursorMark` | Cursor for the next page (`null` if last page) |
| `next_cursor_mark` | Alias for `nextCursorMark` |
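Since the response carries both spellings, a defensive read covers either one; a minimal sketch:

```python
# Sketch: accept either alias for the legacy cursor field.
data = response["data"]
cursor = data.get("nextCursorMark") or data.get("next_cursor_mark")
```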
### V2 Endpoints (Stealer, Victims)
```json
{
  "data": {
    "items": [...],
    "meta": {
      "count": 25,
      "total": 1500,
      "has_more": true,
      "total_pages": 60
    },
    "next_cursor": "eyJsYXN0X2lkIjoiZG9jXzAwMSJ9"
  }
}
```
| Field | Description |
|---|---|
| `meta.count` | Records in this response |
| `meta.total` | Total matching records |
| `meta.has_more` | Whether more pages exist |
| `meta.total_pages` | Estimated total pages |
| `next_cursor` | Cursor for the next page |
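For V2 endpoints the loop keys off `next_cursor` and `meta.has_more` rather than `nextCursorMark`; a minimal sketch, reusing the endpoint and `x-api-key` header from the examples above:

```python
import requests

def fetch_all_v2(query, api_key):
    """Sketch: page through a V2 endpoint until the cursor runs out."""
    items = []
    cursor = None
    while True:
        params = {"q": query}
        if cursor:
            params["cursor"] = cursor
        data = requests.get(
            "https://oathnet.org/api/service/v2/stealer/search",
            params=params,
            headers={"x-api-key": api_key},
        ).json()["data"]
        items.extend(data["items"])
        cursor = data.get("next_cursor")
        if not data["meta"]["has_more"] or not cursor:
            break
    return items
```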
## Controlling Page Size

Use `page_size` to control results per page:
```python
# Get 50 results per page (default is 25)
response = requests.get(
    "https://oathnet.org/api/service/v2/stealer/search",
    params={
        "q": "[email protected]",
        "page_size": 50,
    },
    headers={"x-api-key": API_KEY},
)
```
| Endpoint Type | Default | Maximum |
|---|---|---|
| Breach Search | 100 | 1000 |
| V2 Stealer | 25 | 100 |
| V2 Victims | 25 | 100 |
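If page sizes are computed at runtime, clamping to the documented maximum avoids sending an out-of-range value; a small sketch built from the table above (the dictionary keys are illustrative):

```python
# Documented page-size maximums per endpoint family (see table above).
PAGE_SIZE_MAX = {"breach": 1000, "v2_stealer": 100, "v2_victims": 100}

def clamp_page_size(endpoint_family, requested):
    """Keep a computed page_size within 1..maximum for the endpoint."""
    return max(1, min(requested, PAGE_SIZE_MAX[endpoint_family]))
```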
### Pagination with Filters

Cursors work with all filter parameters:
```python
cursor = None
while True:
    response = requests.get(
        "https://oathnet.org/api/service/v2/stealer/search",
        params={
            "domain[]": "google.com",
            "from": "2024-01-01",
            "to": "2024-06-30",
            "page_size": 50,
            "cursor": cursor,  # requests drops None-valued params on the first page
        },
        headers={"x-api-key": API_KEY},
    ).json()

    # Process results...

    cursor = response["data"].get("next_cursor")
    if not cursor:
        break
```
**Don't change filters between pages.** The cursor is specific to the original query; changing filters mid-pagination causes unexpected results.
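One way to make this hard to get wrong is to build the filter dict once and vary only the cursor between requests; a sketch:

```python
# Freeze the query once; per-page params differ only by the cursor.
base_params = {"domain[]": "google.com", "from": "2024-01-01",
               "to": "2024-06-30", "page_size": 50}

def page_params(cursor):
    """Return params for one page without mutating base_params."""
    params = dict(base_params)
    if cursor:
        params["cursor"] = cursor
    return params
```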
## Best Practices

### Process Results Incrementally

For large datasets, process results as you fetch them instead of loading everything into memory:

```python
import requests

def process_results_stream(query, api_key, processor):
    """Process results incrementally."""
    cursor = None
    total_processed = 0

    while True:
        params = {"q": query}
        if cursor:
            params["cursor"] = cursor

        response = requests.get(
            "https://oathnet.org/api/service/search-breach",
            params=params,
            headers={"x-api-key": api_key},
        ).json()

        for result in response["data"]["results"]:
            processor(result)  # Process each result
            total_processed += 1

        cursor = response["data"].get("nextCursorMark")
        if not cursor:
            break

    return total_processed
```
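For example, a processor that appends each record to a JSONL file keeps memory use flat regardless of result count (the output file name here is illustrative):

```python
import json

with open("results.jsonl", "a") as out:
    count = process_results_stream(
        "[email protected]",
        API_KEY,
        processor=lambda record: out.write(json.dumps(record) + "\n"),
    )
print(f"Processed {count} records")
```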
### Handle Rate Limits Between Pages

Add delays between pagination requests to avoid rate limits:

```python
import time

cursor = None
while True:
    response = fetch_page(query, cursor)
    process_results(response["data"]["results"])

    cursor = response["data"].get("nextCursorMark")
    if not cursor:
        break

    # Small delay between pages
    time.sleep(0.1)
```
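A fixed delay works for small jobs. If the API signals rate limiting with HTTP 429 (an assumption here, not confirmed on this page), an exponential backoff around each page request is more robust; a sketch:

```python
import time
import requests

def fetch_page_with_backoff(url, params, api_key, max_retries=5):
    """Sketch: retry a page request with exponential backoff on HTTP 429."""
    delay = 1.0
    for _ in range(max_retries):
        resp = requests.get(url, params=params, headers={"x-api-key": api_key})
        if resp.status_code != 429:  # assumption: 429 indicates rate limiting
            return resp.json()
        time.sleep(delay)
        delay *= 2  # back off exponentially before retrying
    raise RuntimeError("Rate limited after repeated retries")
```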
### Save Cursors for Resumption

For long-running jobs, save cursors to resume if interrupted:

```python
import json

STATE_FILE = "pagination_state.json"

def save_state(cursor, processed_count):
    with open(STATE_FILE, "w") as f:
        json.dump({
            "cursor": cursor,
            "processed": processed_count,
        }, f)

def load_state():
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return {"cursor": None, "processed": 0}

# Resume from saved state
state = load_state()
cursor = state["cursor"]
processed = state["processed"]
```
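Tying the pieces together, the loop below persists the cursor after every page so an interrupted job can resume where it left off; it reuses the `fetch_page` placeholder from the earlier example and assumes `query` is already defined:

```python
# Sketch: continue from the loaded state, saving progress after every page.
while True:
    response = fetch_page(query, cursor)
    for result in response["data"]["results"]:
        processed += 1  # process each record here
    cursor = response["data"].get("nextCursorMark")
    save_state(cursor, processed)
    if not cursor:
        break
```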
### Limit Total Results

Set a maximum number of results to fetch:

```python
def fetch_limited_results(query, api_key, max_results=1000):
    """Fetch up to max_results records."""
    all_results = []
    cursor = None

    while len(all_results) < max_results:
        response = fetch_page(query, cursor)
        results = response["data"]["results"]

        remaining = max_results - len(all_results)
        all_results.extend(results[:remaining])

        cursor = response["data"].get("nextCursorMark")
        if not cursor:
            break

    return all_results
```
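Usage mirrors the earlier examples:

```python
# Fetch at most 500 records for the query.
results = fetch_limited_results("[email protected]", API_KEY, max_results=500)
```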
## Common Issues

### Cursor Expired
Cursors may expire after a period of inactivity:
```json
{
  "success": false,
  "message": "Cursor expired or invalid",
  "errors": {
    "cursor": "The provided cursor is no longer valid"
  }
}
```
**Solution:** Start pagination from the beginning.
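A thin wrapper can detect this error and transparently restart from page one; a sketch that matches the error message shown above and reuses `fetch_all_results` from the basic example:

```python
def fetch_all_with_restart(query, api_key, max_restarts=2):
    """Sketch: restart pagination from the beginning if the cursor expires."""
    for attempt in range(max_restarts + 1):
        try:
            return fetch_all_results(query, api_key)
        except Exception as exc:
            # "Cursor expired or invalid" surfaces via the raised message.
            if "cursor" not in str(exc).lower() or attempt == max_restarts:
                raise
    return []
```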
### Results Changed

If data is updated between pagination requests, the reported totals may differ slightly from page to page. This is normal and doesn't affect result consistency.
## Next Steps