For large datasets, process results as you fetch them instead of loading everything into memory:
import requests

def process_results_stream(query, api_key, processor):
    """Process results incrementally instead of collecting them in memory."""
    cursor = None
    total_processed = 0

    while True:
        params = {"q": query}
        if cursor:
            params["cursor"] = cursor

        response = requests.get(
            "https://oathnet.org/api/service/v2/breach/search",
            params=params,
            headers={"x-api-key": api_key}
        ).json()

        for result in response["data"]["items"]:
            processor(result)  # Process each result as it arrives
            total_processed += 1

        cursor = response["data"].get("next_cursor") or response["data"].get("nextCursorMark")
        if not cursor:
            break

    return total_processed
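The processor can be any callable that accepts a single record. As a minimal usage sketch (the query string, API key, and output file name below are placeholders), you might append each record to a JSON Lines file rather than holding it in memory:

import json

def write_record(record):
    # Hypothetical processor: append each record as one JSON line.
    with open("results.jsonl", "a") as f:
        f.write(json.dumps(record) + "\n")

total = process_results_stream("example.com", "YOUR_API_KEY", write_record)
print(f"Processed {total} results")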
Handle Rate Limits Between Pages
Add delays between pagination requests to avoid rate limits:
import time

cursor = None
while True:
    response = fetch_page(query, cursor)
    process_results(response["data"]["results"])

    cursor = response["data"].get("nextCursorMark")
    if not cursor:
        break

    # Small delay between pages
    time.sleep(0.1)
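A fixed delay works for modest jobs, but a long run may still receive an HTTP 429 response. The examples on this page call a fetch_page helper; the sketch below shows one possible shape for it, assuming the same endpoint and headers as the streaming example and retrying after a 429. The helper's signature, the retry loop, and the one-second fallback are assumptions for illustration, not part of the documented client:

import time
import requests

def fetch_page(query, cursor=None, api_key="YOUR_API_KEY"):
    """Hypothetical helper: fetch one page, retrying if the API returns 429."""
    params = {"q": query}
    if cursor:
        params["cursor"] = cursor

    while True:
        response = requests.get(
            "https://oathnet.org/api/service/v2/breach/search",
            params=params,
            headers={"x-api-key": api_key},
        )
        if response.status_code != 429:
            response.raise_for_status()
            return response.json()
        # Assumed behaviour: wait for Retry-After if sent, otherwise one second.
        time.sleep(float(response.headers.get("Retry-After", 1)))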
Save Cursors for Resumption
For long-running jobs, save cursors to resume if interrupted:
import json

STATE_FILE = "pagination_state.json"

def save_state(cursor, processed_count):
    with open(STATE_FILE, "w") as f:
        json.dump({
            "cursor": cursor,
            "processed": processed_count
        }, f)

def load_state():
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return {"cursor": None, "processed": 0}

# Resume from saved state
state = load_state()
cursor = state["cursor"]
processed = state["processed"]
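With the state helpers in place, a resumable loop only needs to persist the cursor after each page. The sketch below is one way to wire it together, reusing fetch_page and process_results from the earlier examples and assuming query is already defined:

while True:
    response = fetch_page(query, cursor)
    results = response["data"]["results"]
    process_results(results)
    processed += len(results)

    # Persist progress after every page so an interrupted run can pick up here.
    cursor = response["data"].get("nextCursorMark")
    save_state(cursor, processed)
    if not cursor:
        break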
Limit Total Results
Set a maximum number of results to fetch:
def fetch_limited_results(query, api_key, max_results=1000):
    """Fetch up to max_results records."""
    all_results = []
    cursor = None

    while len(all_results) < max_results:
        response = fetch_page(query, cursor)
        results = response["data"]["results"]

        remaining = max_results - len(all_results)
        all_results.extend(results[:remaining])

        cursor = response["data"].get("nextCursorMark")
        if not cursor:
            break

    return all_results
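Slicing with results[:remaining] keeps the final page from pushing the total past the cap. A usage sketch (the query string and API key are placeholders):

results = fetch_limited_results("example.com", "YOUR_API_KEY", max_results=500)
print(f"Fetched {len(results)} results")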