OathNet Python SDK Documentation

Installation

pip install oathnet
Requirements: Python 3.9+

Quick Start

from oathnet import OathNetClient

# Initialize client with API key
client = OathNetClient(api_key="your-api-key")

# Search breaches
result = client.search.breach("[email protected]")
print(f"Found {result.data.results_found} results")

for record in result.data.results:
    print(f"{record.email}: {record.password}")

Authentication

import os
from oathnet import OathNetClient

# Explicit API key (required)
client = OathNetClient(api_key="your-api-key")

# From environment variable
client = OathNetClient(api_key=os.environ["OATHNET_API_KEY"])

Search Service

# Basic search
result = client.search.breach("[email protected]")

# With filters
result = client.search.breach(
    "[email protected]",
    dbnames="linkedin_2012,adobe_2013"
)

# Access results
print(f"Total: {result.data.results_found}")
print(f"Shown: {result.data.results_shown}")

for record in result.data.results:
    print(f"Email: {record.email}")
    print(f"Password: {record.password}")
    print(f"Database: {record.dbname}")
    print("---")

# Pagination with cursor
if result.data.cursor:
    next_page = client.search.breach(
        "[email protected]",
        cursor=result.data.cursor
    )

Initialize Session

# Create a search session for quota optimization
result = client.search.init_session("[email protected]")

session_id = result.data.session.id
print(f"Session ID: {session_id}")
print(f"Search Type: {result.data.session.search_type}")
print(f"Expires: {result.data.session.expires_at}")

Stealer Service

# Basic search
result = client.stealer.search("[email protected]")

# Advanced filtering
result = client.stealer.search(
    "[email protected]",
    domains=["google.com", "facebook.com"],
    has_log_id=True,
    page_size=50
)

# Access items
for item in result.data.items:
    print(f"URL: {item.url}")
    print(f"Username: {item.username}")
    print(f"Password: {item.password}")
    print(f"Log ID: {item.log_id}")
    print("---")

# Pagination
if result.data.next_cursor:
    next_page = client.stealer.search(
        "[email protected]",
        cursor=result.data.next_cursor
    )

# Subdomain extraction
result = client.stealer.subdomain("example.com")
print(f"Found {result.data.count} subdomains")
for sub in result.data.subdomains:
    print(f"  {sub}")

Victims

# Search victims
result = client.victims.search("[email protected]")

for victim in result.data.items:
    print(f"Log ID: {victim.log_id}")
    print(f"Users: {victim.device_users}")
    print(f"IPs: {victim.device_ips}")
    print(f"Documents: {victim.total_docs}")
    print("---")

# Get victim manifest (file tree)
manifest = client.victims.get_manifest("log_id_here")
print(f"Total Files: {manifest.data.total_files}")
print(f"Total Size: {manifest.data.total_size} bytes")

for file in manifest.data.files[:10]:
    print(f"  {file.relative_path} ({file.size} bytes)")

# Get file content
file_content = client.victims.get_file("log_id", "file_id")
print(file_content.data.content)

# Download archive
client.victims.download_archive("log_id", "./victim_archive.zip")

File Search

Search within stealer log files using regex or wildcard patterns.

# Create a file search job
result = client.file_search.create(
    expression="password",
    log_ids=["log_id_1", "log_id_2"],
    search_mode="literal"  # literal, regex, or wildcard
)
job_id = result.data.job_id
print(f"Job ID: {job_id}")

# Check job status
status = client.file_search.get_status(job_id)
print(f"Status: {status.data.status}")
if status.data.summary:
    print(f"Files scanned: {status.data.summary.files_scanned}")
    print(f"Matches: {status.data.summary.matches}")

# Wait for completion and get results
result = client.file_search.search(
    expression="password",
    log_ids=["log_id_1", "log_id_2"],
    search_mode="literal",
    timeout=300  # seconds
)

if result.data.matches:
    for match in result.data.matches:
        print(f"File: {match.file_name}")
        print(f"Log ID: {match.log_id}")
        print(f"Match: {match.match_text}")
        print("---")

Exports

Export large datasets asynchronously.

# Create an export job
result = client.exports.create(
    export_type="stealer",  # stealer or victims
    limit=100,              # minimum 100
    format="jsonl"          # jsonl or csv
)
job_id = result.data.job_id
print(f"Export Job ID: {job_id}")

# Check export status
status = client.exports.get_status(job_id)
print(f"Status: {status.data.status}")
if status.data.progress:
    print(f"Progress: {status.data.progress.percent}%")

# Wait for completion and download
result = client.exports.wait_for_completion(job_id, timeout=600)
if result.data.status == "completed":
    # Download the export file
    client.exports.download(job_id, "./export.jsonl")

OSINT Lookups

# IP Info
result = client.osint.ip_info("8.8.8.8")
print(f"Location: {result.data.city}, {result.data.country}")
print(f"ISP: {result.data.isp}")
print(f"Proxy: {result.data.proxy}")

# Steam
result = client.osint.steam("76561198012345678")
print(f"Username: {result.data.username}")
print(f"Avatar: {result.data.avatar}")

# Xbox
result = client.osint.xbox("GamerTag123")
print(f"Username: {result.data.username}")
print(f"Avatar: {result.data.avatar}")

# Discord User Info
result = client.osint.discord_userinfo("123456789012345678")
print(f"Username: {result.data.username}")
print(f"Global Name: {result.data.global_name}")
print(f"Created: {result.data.creation_date}")

# Discord Username History
result = client.osint.discord_username_history("123456789012345678")
for entry in result.data.history:
    if entry.name and entry.time:
        print(f"{entry.name[0]} at {entry.time[0]}")

# Discord to Roblox
result = client.osint.discord_to_roblox("123456789012345678")
if result.data.roblox_id:
    print(f"Linked Roblox: {result.data.roblox_id}")

# Roblox User Info
result = client.osint.roblox_userinfo(user_id="123456789")
# Or by username:
result = client.osint.roblox_userinfo(username="PlayerName")
print(f"Username: {result.data.username}")
print(f"Display Name: {result.data.display_name}")

# Holehe - Email account detection
result = client.osint.holehe("[email protected]")
print(f"Found on {len(result.data.domains)} services:")
for domain in result.data.domains:
    print(f"  {domain}")

# GHunt - Google account lookup
result = client.osint.ghunt("[email protected]")
if result.data.found:
    print(f"Name: {result.data.profile.name}")

# Subdomain extraction
result = client.osint.extract_subdomain("example.com", alive_only=True)
for sub in result.data.subdomains:
    print(sub)

# Minecraft username history
result = client.osint.minecraft_history("PlayerName")
for entry in result.data.history:
    print(f"{entry.username} - {entry.changed_at}")

Utility Service

# Database name autocomplete
result = client.utility.dbname_autocomplete("link")
for name in result:
    print(name)  # linkedin_2012, linkedin_2021, etc.

Error Handling

from oathnet.exceptions import (
    OathNetError,
    AuthenticationError,
    NotFoundError,
    ValidationError,
    RateLimitError,
    QuotaExceededError,
    ServiceUnavailableError
)

try:
    result = client.search.breach("[email protected]")

except AuthenticationError:
    print("Invalid API key")

except QuotaExceededError:
    print("Daily quota exceeded")

except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")

except NotFoundError as e:
    print(f"Resource not found: {e.message}")

except ValidationError as e:
    print(f"Invalid input: {e.message}")

except ServiceUnavailableError:
    print("Server error. Try again later.")

except OathNetError as e:
    print(f"API error: {e.message}")

Configuration

from oathnet import OathNetClient

client = OathNetClient(
    api_key="your-api-key",
    base_url="https://oathnet.org/api",  # Custom base URL
    timeout=30.0                          # Request timeout in seconds
)

Context Manager

# Use context manager for automatic cleanup
with OathNetClient(api_key="your-api-key") as client:
    result = client.search.breach("[email protected]")
    print(result.data.results_found)
# Connection automatically closed