Installation
Copy
pip install oathnet
Quick Start
Copy
from oathnet import OathNetClient
# Initialize client with API key
client = OathNetClient(api_key="your-api-key")
# Search breaches
result = client.search.breach("[email protected]")
print(f"Found {result.data.results_found} results")
for record in result.data.results:
print(f"{record.email}: {record.password}")
Authentication
Copy
import os
from oathnet import OathNetClient
# Explicit API key (required)
client = OathNetClient(api_key="your-api-key")
# From environment variable
client = OathNetClient(api_key=os.environ["OATHNET_API_KEY"])
Search Service
Breach Search
Copy
# Basic search
result = client.search.breach("[email protected]")
# With filters
result = client.search.breach(
"[email protected]",
dbnames="linkedin_2012,adobe_2013"
)
# Access results
print(f"Total: {result.data.results_found}")
print(f"Shown: {result.data.results_shown}")
for record in result.data.results:
print(f"Email: {record.email}")
print(f"Password: {record.password}")
print(f"Database: {record.dbname}")
print("---")
# Pagination with cursor
if result.data.cursor:
next_page = client.search.breach(
"[email protected]",
cursor=result.data.cursor
)
Initialize Session
Copy
# Create a search session for quota optimization
result = client.search.init_session("[email protected]")
session_id = result.data.session.id
print(f"Session ID: {session_id}")
print(f"Search Type: {result.data.session.search_type}")
print(f"Expires: {result.data.session.expires_at}")
V2 Stealer Search
Copy
# Basic search
result = client.stealer.search("[email protected]")
# Advanced filtering
result = client.stealer.search(
"[email protected]",
domains=["google.com", "facebook.com"],
has_log_id=True,
page_size=50
)
# Access items
for item in result.data.items:
print(f"URL: {item.url}")
print(f"Username: {item.username}")
print(f"Password: {item.password}")
print(f"Log ID: {item.log_id}")
print("---")
# Pagination
if result.data.next_cursor:
next_page = client.stealer.search(
"[email protected]",
cursor=result.data.next_cursor
)
# Subdomain extraction
result = client.stealer.subdomain("example.com")
print(f"Found {result.data.count} subdomains")
for sub in result.data.subdomains:
print(f" {sub}")
Victims
Copy
# Search victims
result = client.victims.search("[email protected]")
for victim in result.data.items:
print(f"Log ID: {victim.log_id}")
print(f"Users: {victim.device_users}")
print(f"IPs: {victim.device_ips}")
print(f"Documents: {victim.total_docs}")
print("---")
# Get victim manifest (file tree)
manifest = client.victims.get_manifest("log_id_here")
print(f"Total Files: {manifest.data.total_files}")
print(f"Total Size: {manifest.data.total_size} bytes")
for file in manifest.data.files[:10]:
print(f" {file.relative_path} ({file.size} bytes)")
# Get file content
file_content = client.victims.get_file("log_id", "file_id")
print(file_content.data.content)
# Download archive
client.victims.download_archive("log_id", "./victim_archive.zip")
File Search
Search within stealer log files using regex or wildcard patterns.Copy
# Create a file search job
result = client.file_search.create(
expression="password",
log_ids=["log_id_1", "log_id_2"],
search_mode="literal" # literal, regex, or wildcard
)
job_id = result.data.job_id
print(f"Job ID: {job_id}")
# Check job status
status = client.file_search.get_status(job_id)
print(f"Status: {status.data.status}")
if status.data.summary:
print(f"Files scanned: {status.data.summary.files_scanned}")
print(f"Matches: {status.data.summary.matches}")
# Wait for completion and get results
result = client.file_search.search(
expression="password",
log_ids=["log_id_1", "log_id_2"],
search_mode="literal",
timeout=300 # seconds
)
if result.data.matches:
for match in result.data.matches:
print(f"File: {match.file_name}")
print(f"Log ID: {match.log_id}")
print(f"Match: {match.match_text}")
print("---")
Exports
Export large datasets asynchronously.Copy
# Create an export job
result = client.exports.create(
export_type="stealer", # stealer or victims
limit=100, # minimum 100
format="jsonl" # jsonl or csv
)
job_id = result.data.job_id
print(f"Export Job ID: {job_id}")
# Check export status
status = client.exports.get_status(job_id)
print(f"Status: {status.data.status}")
if status.data.progress:
print(f"Progress: {status.data.progress.percent}%")
# Wait for completion and download
result = client.exports.wait_for_completion(job_id, timeout=600)
if result.data.status == "completed":
# Download the export file
client.exports.download(job_id, "./export.jsonl")
OSINT Lookups
Copy
# IP Info
result = client.osint.ip_info("8.8.8.8")
print(f"Location: {result.data.city}, {result.data.country}")
print(f"ISP: {result.data.isp}")
print(f"Proxy: {result.data.proxy}")
# Steam
result = client.osint.steam("76561198012345678")
print(f"Username: {result.data.username}")
print(f"Avatar: {result.data.avatar}")
# Xbox
result = client.osint.xbox("GamerTag123")
print(f"Username: {result.data.username}")
print(f"Avatar: {result.data.avatar}")
# Discord User Info
result = client.osint.discord_userinfo("123456789012345678")
print(f"Username: {result.data.username}")
print(f"Global Name: {result.data.global_name}")
print(f"Created: {result.data.creation_date}")
# Discord Username History
result = client.osint.discord_username_history("123456789012345678")
for entry in result.data.history:
if entry.name and entry.time:
print(f"{entry.name[0]} at {entry.time[0]}")
# Discord to Roblox
result = client.osint.discord_to_roblox("123456789012345678")
if result.data.roblox_id:
print(f"Linked Roblox: {result.data.roblox_id}")
# Roblox User Info
result = client.osint.roblox_userinfo(user_id="123456789")
# Or by username:
result = client.osint.roblox_userinfo(username="PlayerName")
print(f"Username: {result.data.username}")
print(f"Display Name: {result.data.display_name}")
# Holehe - Email account detection
result = client.osint.holehe("[email protected]")
print(f"Found on {len(result.data.domains)} services:")
for domain in result.data.domains:
print(f" {domain}")
# GHunt - Google account lookup
result = client.osint.ghunt("[email protected]")
if result.data.found:
print(f"Name: {result.data.profile.name}")
# Subdomain extraction
result = client.osint.extract_subdomain("example.com", alive_only=True)
for sub in result.data.subdomains:
print(sub)
# Minecraft username history
result = client.osint.minecraft_history("PlayerName")
for entry in result.data.history:
print(f"{entry.username} - {entry.changed_at}")
Utility Service
Copy
# Database name autocomplete
result = client.utility.dbname_autocomplete("link")
for name in result:
print(name) # linkedin_2012, linkedin_2021, etc.
Error Handling
Copy
from oathnet.exceptions import (
OathNetError,
AuthenticationError,
NotFoundError,
ValidationError,
RateLimitError,
QuotaExceededError,
ServiceUnavailableError
)
try:
result = client.search.breach("[email protected]")
except AuthenticationError:
print("Invalid API key")
except QuotaExceededError:
print("Daily quota exceeded")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after} seconds")
except NotFoundError as e:
print(f"Resource not found: {e.message}")
except ValidationError as e:
print(f"Invalid input: {e.message}")
except ServiceUnavailableError:
print("Server error. Try again later.")
except OathNetError as e:
print(f"API error: {e.message}")
Configuration
Copy
from oathnet import OathNetClient
client = OathNetClient(
api_key="your-api-key",
base_url="https://oathnet.org/api", # Custom base URL
timeout=30.0 # Request timeout in seconds
)
Context Manager
Copy
# Use context manager for automatic cleanup
with OathNetClient(api_key="your-api-key") as client:
result = client.search.breach("[email protected]")
print(result.data.results_found)
# Connection automatically closed