Documentation Index: fetch the complete documentation index at https://docs.oathnet.org/llms.txt
Use this file to discover all available pages before exploring further.
API Integration Best Practices
Follow these guidelines to build robust, efficient integrations with the OathNet API.
1. Authentication
Never hardcode API keys. Use environment variables or secret managers: # Good: Environment variable
import os
API_KEY = os.environ.get( "OATHNET_API_KEY" )
# Good: AWS Secrets Manager
import boto3
client = boto3.client( 'secretsmanager' )
API_KEY = client.get_secret_value( SecretId = 'oathnet-api-key' )
# Bad: Hardcoded
API_KEY = "oathnet_live_abc123" # Never do this!
Rotate API keys periodically: # Track key age
KEY_CREATED = datetime( 2024 , 1 , 1 )
KEY_MAX_AGE = timedelta( days = 90 )
if datetime.now() - KEY_CREATED > KEY_MAX_AGE :
alert_key_rotation_needed()
Use Separate Keys per Environment
Create distinct keys for dev, staging, and production:

| Environment | Key Name               |
|-------------|------------------------|
| Development | dev-local-john         |
| CI/CD       | staging-github-actions |
| Production  | prod-api-server-1      |
2. Request Optimization
Always initialize a session for related queries: # Initialize session once
session = init_session( "user@example.com" )
session_id = session[ "data" ][ "session" ][ "id" ]
# Reuse for all related queries
breach = search_breach(query, session_id)
stealer = search_stealer(query, session_id)
discord = lookup_discord(discord_id, session_id)
Request Only What You Need
Use field selection to reduce response size: # Only get specific fields
response = requests.get(
"https://oathnet.org/api/service/v2/stealer/search" ,
params = {
"q" : "user@example.com" ,
"fields[]" : [ "email" , "password" , "domain" ]
},
headers = { "x-api-key" : API_KEY }
)
Cache responses to avoid redundant requests: import hashlib
from functools import lru_cache
@lru_cache ( maxsize = 10000 )
def cached_search ( query_hash : str ):
# Actual API call
return api_call(query)
def search ( query : str ):
query_hash = hashlib.md5(query.encode()).hexdigest()
return cached_search(query_hash)
3. Error Handling
def call_with_error_handling():
    """Call the API, handling each documented error type explicitly.

    (The original snippet used a bare module-level ``return None``,
    which is invalid Python; wrapping it in a function makes the
    example runnable as written.)
    """
    try:
        response = api_call()
    except AuthenticationError:
        # Invalid API key — rotate to a fresh key
        refresh_api_key()
    except RateLimitError as e:
        # Too many requests — honour the server-provided backoff
        time.sleep(e.retry_after)
        retry()
    except NotFoundError:
        # Resource doesn't exist — treat as "no result", not a failure
        return None
    except ServerError:
        # Server-side issue — queue and retry later
        queue_for_retry()
    else:
        return response
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=60),
)
def reliable_api_call(query):
    """Call the API with up to 3 attempts and exponential backoff (1-60s)."""
    return api_call(query)
import logging

logger = logging.getLogger("oathnet")

def logged_call(query):
    """Call the API, logging success at DEBUG and failures at ERROR."""
    try:
        result = api_call(query)
    except OathNetError as e:
        # Structured `extra` context makes failures searchable in log
        # aggregators; lazy %-style args avoid formatting when disabled.
        logger.error("API Error: %s", e.message, extra={
            "query": query,
            "status": e.status_code,
            "errors": e.errors,
        })
        raise
    else:
        logger.debug("Success: %s", query)
        return result
4. Rate Limit Management
class QuotaMonitor:
    """Watches API response envelopes and alerts when daily quota runs low.

    The original example read ``self.daily_limit`` without ever setting
    it (an AttributeError on first use) and called an undefined
    ``send_alert``; both are fixed here.
    """

    def __init__(self, warning_threshold=0.2, daily_limit=None):
        # Alert when fewer than warning_threshold * daily_limit lookups remain.
        # daily_limit=None disables the check (backward compatible default).
        self.warning_threshold = warning_threshold
        self.daily_limit = daily_limit

    def check(self, response):
        """Inspect a response's _meta.lookups envelope and alert if low."""
        lookups = response.get("_meta", {}).get("lookups", {})
        lookups_left = lookups.get("left_today")
        if lookups_left is None or self.daily_limit is None:
            return  # Not enough information to evaluate the quota
        if lookups_left < self.daily_limit * self.warning_threshold:
            self.send_alert(f"Low quota: {lookups_left} remaining")

    def send_alert(self, message):
        """Default alert sink: log a warning. Override to page/Slack/etc."""
        import logging
        logging.getLogger("oathnet").warning(message)
# Check quota after every call
monitor = QuotaMonitor()
response = api_call()
monitor.check(response)
Implement Request Throttling
import time
from threading import Lock
class Throttler:
    """Enforces a minimum interval between successive requests.

    Thread-safe: the lock serializes the elapsed-time check and the
    timestamp update so concurrent callers cannot both skip the wait.
    """

    def __init__(self, requests_per_second):
        # Minimum spacing between requests, in seconds
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0
        self.lock = Lock()

    def throttle(self):
        """Block until at least min_interval has passed since the last call."""
        with self.lock:
            now = time.time()
            elapsed = now - self.last_request
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_request = time.time()
throttler = Throttler(requests_per_second=5)

for query in queries:
    throttler.throttle()  # never exceed 5 requests/second
    api_call(query)
5. Data Handling
Stream results instead of loading all into memory: def stream_results ( query ):
cursor = None
while True :
response = api_call(query, cursor)
for result in response[ "data" ][ "results" ]:
yield result # Process one at a time
cursor = response[ "data" ].get( "nextCursorMark" )
if not cursor:
break
# Process results as they come
for result in stream_results( "@company.com" ):
process(result)
from pydantic import BaseModel, ValidationError

class BreachResult(BaseModel):
    # Schema of one breach record; password/source may be absent
    email: str
    password: str | None
    source: str | None

def parse_results(response):
    """Validate raw API items into BreachResult models, skipping bad rows.

    Invalid rows are logged at WARNING level rather than aborting the
    whole batch.
    """
    results = []
    for item in response["data"]["results"]:
        try:
            results.append(BreachResult(**item))
        except ValidationError as e:
            logger.warning("Invalid result: %s", e)
    return results
# Use .get() with defaults for fields that may be missing
email = result.get("email", "unknown")
password = result.get("password")  # None if missing

# Or merge the record over a dict of defaults
defaults = {"source": "unknown", "date": None}
full_result = {**defaults, **result}
6. Security
def redact_sensitive(result):
    """Return a copy of *result* with sensitive fields redacted for logging.

    The input dict is not modified; only the "password" field (if
    present) is replaced in the copy.
    """
    redacted = result.copy()
    if "password" in redacted:
        redacted["password"] = "***REDACTED***"
    return redacted
logger.info("Result: %s", redact_sensitive(result))

# Always use HTTPS
BASE_URL = "https://oathnet.org/api"

# Verify SSL certificates (requests' default — never pass verify=False)
response = requests.get(url, verify=True)
7. Monitoring
import time
from datadog import statsd

def tracked_api_call(query):
    """Call the API, emitting success/error counters and a latency histogram."""
    start = time.time()
    try:
        result = api_call(query)
        statsd.increment("oathnet.api.success")
        return result
    except OathNetError as e:
        # Tag errors with the HTTP status so dashboards can slice by code
        statsd.increment("oathnet.api.error", tags=[f"code:{e.status_code}"])
        raise
    finally:
        # Duration is recorded on both the success and failure paths
        statsd.histogram("oathnet.api.duration", time.time() - start)
# Alert on low quota. Read left_today once: the original indexed
# lookups['left_today'] after a .get(..., 0) guard, which raised
# KeyError whenever the key was missing.
lookups = response.get("_meta", {}).get("lookups", {})
left_today = lookups.get("left_today")
if left_today is not None and left_today < 100:
    send_slack_alert(f"Low OathNet quota: {left_today}")

# Alert on high error rate
if error_rate > 0.1:
    send_pagerduty_alert("OathNet API error rate elevated")
import uuid  # required for request IDs; missing from the original example

def api_call_with_trace(query):
    """Tag each API call with a UUID so request/response/error logs correlate."""
    request_id = str(uuid.uuid4())
    logger.info("API Request: %s", request_id, extra={"query": query})
    try:
        result = api_call(query)
    except Exception as e:
        logger.error("API Error: %s", request_id, extra={"error": str(e)})
        raise
    else:
        logger.info("API Response: %s", request_id, extra={
            "success": True,
            "results_found": result["data"].get("meta", {}).get("total"),
        })
        return result
Quick Reference
| Do                                     | Don't                                                 |
|----------------------------------------|-------------------------------------------------------|
| Use environment variables for API keys | Hardcode API keys in source code                      |
| Initialize search sessions             | Make isolated queries                                 |
| Implement exponential backoff          | Retry immediately on failure                          |
| Cache responses                        | Make redundant requests                               |
| Handle all error types                 | Ignore error responses                                |
| Monitor quota usage                    | Ignore _meta.lookups.left_today on envelope responses |
| Validate input before requests         | Send unsanitized user input                           |
Next Steps
API Reference Explore all available endpoints
SDKs Use official client libraries