Skip to content

Censys-Python

The official Censys Python SDK provides programmatic access to Censys data for automated reconnaissance, vulnerability scanning, and security research.

Installation

# Install the latest released package from PyPI
pip install censys

# For development: clone the repository and install it in editable mode,
# so local source changes take effect without reinstalling
git clone https://github.com/censys/censys-python.git
cd censys-python
pip install -e .

# Verify installation by importing the package and printing its version
# NOTE(review): assumes the package exposes `censys.__version__` — confirm
python3 -c "import censys; print(censys.__version__)"

API Credentials Setup

# Option 1: export the API ID and Secret as environment variables
# (credentials are issued at https://censys.io/settings/api)
export CENSYS_API_ID="your_api_id"
export CENSYS_API_SECRET="your_api_secret"

# Option 2: write the same credentials to ~/.censysrc
# The quoted 'EOF' delimiter keeps the shell from expanding anything in the body.
# NOTE(review): confirm the SDK reads ~/.censysrc — it may keep its config elsewhere
cat > ~/.censysrc << 'EOF'
[default]
api_id = your_api_id
api_secret = your_api_secret
EOF

# Restrict the file to owner read/write because it holds a secret
chmod 600 ~/.censysrc

Basic Usage

#!/usr/bin/env python3
from censys.search import CensysHosts

# Build the hosts client; credentials come from the environment or .censysrc
h = CensysHosts()

# Hosts with a service on port 80, printed with their raw service list
web_pages = h.search("80:*", per_page=100)
for page in web_pages:
    for host in page:
        print(f"{host['ip']} - {host['services']}")

# Same idea with an extra filter: port 22 restricted to one country
ssh_query = "22:* AND location.country_code: US"
for page in h.search(ssh_query, per_page=50):
    for host in page:
        print(f"{host['ip']}: SSH open")

# Full record for a single address
ip = "1.2.3.4"
details = h.view(ip)
print(f"IP: {details['ip']}")
print(f"Ports: {[s['port'] for s in details['services']]}")
print(f"Autonomous System: {details['autonomous_system']['name']}")

# Host history (changes over time), unpacked as (timestamp, data) pairs
# NOTE(review): confirm `view_host_history` and its `days` argument exist in
# the installed SDK version
for timestamp, data in h.view_host_history(ip, days=90):
    print(f"{timestamp}: {data}")
#!/usr/bin/env python3
from censys.search import CensysCerts

# Build the certificates client
c = CensysCerts()

# Certificates whose DNS names mention the domain
domain = "example.com"
query = f"dns_names: {domain}"
cert_pages = c.search(query, per_page=100)

for page in cert_pages:
    for cert in page:
        # One block of four fields per certificate, followed by a blank line
        print(f"Fingerprint: {cert['fingerprint_sha256']}")
        print(f"Subject: {cert.get('parsed.subject_dn', 'N/A')}")
        print(f"Valid From: {cert.get('parsed.validity.start', 'N/A')}")
        print(f"Valid To: {cert.get('parsed.validity.end', 'N/A')}")
        print()

# Full record for one certificate (the fingerprint below is a placeholder)
fingerprint = "86e2c7e7f8f3d3e4f5f6f7f8"
print(c.view(fingerprint))

# Hosts currently serving that certificate
# NOTE(review): confirm `hosts_having_cert` exists in the installed SDK version
for host in c.hosts_having_cert(fingerprint):
    print(f"{host['ip']}:{host['port']}")

Advanced Querying

Complex Search Queries

#!/usr/bin/env python3
from censys.search import CensysHosts
import json

class TargetRecon:
    """Reconnaissance helpers built on Censys host search.

    All queries go through ``self.h``, a ``CensysHosts`` client created in
    ``__init__``; results are consumed page by page.

    NOTE(review): the SDK's ``search()`` may cap how many pages it returns by
    default, so "all" below means "everything the search yields" — confirm
    against the SDK's ``pages`` argument before relying on completeness.
    """

    def __init__(self):
        self.h = CensysHosts()

    @staticmethod
    def _primary_os(host):
        """Return one operating-system label for *host*, or None if absent.

        Accepts a list (first entry wins), a mapping (its ``product`` entry),
        or a plain value. Missing or empty values give None instead of raising.
        """
        os_info = host.get('operating_system')
        if not os_info:
            return None
        if isinstance(os_info, dict):
            # presumably the API's structured form; verify the field name
            return os_info.get('product')
        if isinstance(os_info, (list, tuple)):
            return os_info[0]
        return os_info

    def find_org_servers(self, org_name):
        """Find servers whose autonomous-system organization is *org_name*.

        Returns:
            A list of dicts with keys ``ip``, ``ports`` (list of port
            numbers) and ``os`` (a label, or None when unknown).
        """
        query = f'autonomous_system.organization: "{org_name}"'
        results = []

        for page in self.h.search(query, per_page=100):
            for host in page:
                results.append({
                    'ip': host['ip'],
                    'ports': [s['port'] for s in host.get('services', [])],
                    'os': self._primary_os(host),
                })

        return results

    def find_web_servers_by_country(self, country_code, per_page=100):
        """Yield hosts with port 80 or 443 open in the given country.

        Args:
            country_code: Value matched against ``location.country_code``.
            per_page: Page size passed to the search call.
        """
        # Parentheses are required: without them the country filter would
        # bind only to the 443 term and every port-80 host would match.
        query = f"(80:* OR 443:*) AND location.country_code: {country_code}"

        for page in self.h.search(query, per_page=per_page):
            for host in page:
                yield host

    def find_vulnerable_services(self):
        """Find hosts exposing commonly misconfigured data services.

        Returns:
            A dict mapping each service name to the list of IPs returned by
            a search on that service's default port.
        """
        queries = {
            'elasticsearch': '9200:*',
            'redis': '6379:*',
            'mongodb': '27017:*',
            'postgresql': '5432:*',
            'mysql': '3306:*',
        }

        vulnerable = {}

        for service, query in queries.items():
            vulnerable[service] = [
                host['ip']
                for page in self.h.search(query, per_page=50)
                for host in page
            ]

        return vulnerable

    def asn_enumeration(self, asn):
        """Return every host record the search yields for *asn*."""
        query = f"autonomous_system.asn: {asn}"
        all_hosts = []

        for page in self.h.search(query, per_page=100):
            all_hosts.extend(page)

        return all_hosts

# Usage
recon = TargetRecon()

# Servers attributed to one organization; show the first five
google_servers = recon.find_org_servers("Google")
print(f"Found {len(google_servers)} Google servers")
for server in google_servers[:5]:
    print(f"  {server['ip']}: {server['ports']}")

# Web servers in one country: print the first five, count them all
print("\nWeb servers in US:")
count = 0
for count, host in enumerate(recon.find_web_servers_by_country("US"), start=1):
    if count <= 5:
        print(f"  {host['ip']}")
print(f"Total: {count}")

# Exposed data services, summarized per service
print("\nExposed services:")
vuln = recon.find_vulnerable_services()
for service, ips in vuln.items():
    print(f"  {service}: {len(ips)} hosts")

# Everything in one autonomous system (example: AWS)
print("\nAWS (ASN 16509) hosts:")
asn_hosts = recon.asn_enumeration(16509)
print(f"Found {len(asn_hosts)} hosts")

Certificate Intelligence

#!/usr/bin/env python3
from censys.search import CensysCerts
from datetime import datetime, timedelta

class CertRecon:
    """Certificate-centric lookups built on Censys certificate search.

    Certificate records are read with flat, dotted keys such as
    ``parsed.validity.end``; a key that is missing yields ``'N/A'``.
    """

    def __init__(self):
        self.c = CensysCerts()

    def find_certs_by_domain(self, domain):
        """Find certificates whose DNS names match *domain*.

        Returns:
            A list of dicts with keys ``fingerprint``, ``subject``,
            ``issuer``, ``valid_from`` and ``valid_to``.
        """
        query = f'dns_names: {domain}'
        certs = []

        for page in self.c.search(query, per_page=100):
            for cert in page:
                # Fall back when the key is missing OR present but empty,
                # so indexing the first organization can never fail.
                issuer_orgs = cert.get('parsed.issuer.organization') or ['N/A']
                certs.append({
                    'fingerprint': cert['fingerprint_sha256'],
                    'subject': cert.get('parsed.subject.common_name', 'N/A'),
                    'issuer': issuer_orgs[0],
                    'valid_from': cert.get('parsed.validity.start', 'N/A'),
                    'valid_to': cert.get('parsed.validity.end', 'N/A'),
                })

        return certs

    def find_expiring_certs(self, days=30):
        """Find certificates that expire between today and *days* from now.

        Args:
            days: Size of the look-ahead window in days.

        Returns:
            A list of dicts with keys ``fingerprint`` and ``expires``.
        """
        from datetime import timezone  # local import: only this method needs it

        # The window starts today (UTC) so already-expired certificates are
        # not reported as "expiring soon".
        now = datetime.now(timezone.utc)
        window_start = now.strftime("%Y-%m-%d")
        window_end = (now + timedelta(days=days)).strftime("%Y-%m-%d")
        query = f'parsed.validity.end: [{window_start} TO {window_end}]'

        certs = []
        for page in self.c.search(query, per_page=100):
            for cert in page:
                certs.append({
                    'fingerprint': cert['fingerprint_sha256'],
                    'expires': cert.get('parsed.validity.end', 'N/A'),
                })

        return certs

    def find_self_signed_certs(self):
        """Yield certificates matching ``parsed.self_signed: true``."""
        query = 'parsed.self_signed: true'

        for page in self.c.search(query, per_page=50):
            for cert in page:
                yield cert

    def find_cert_hosts(self, fingerprint):
        """Return whatever the client reports as hosts serving *fingerprint*."""
        return self.c.hosts_having_cert(fingerprint)

    def check_cert_chain(self, fingerprint):
        """Fetch one certificate and return a summary of its identity fields.

        Returns:
            A dict with ``subject``, ``issuer``, ``valid_from``, ``valid_to``,
            ``fingerprint_sha256`` and ``key_size``. Despite its name,
            ``key_size`` carries the key algorithm name; the key is kept
            unchanged so existing callers continue to work.
        """
        cert = self.c.view(fingerprint)
        return {
            'subject': cert.get('parsed.subject_dn', 'N/A'),
            'issuer': cert.get('parsed.issuer_dn', 'N/A'),
            'valid_from': cert.get('parsed.validity.start', 'N/A'),
            'valid_to': cert.get('parsed.validity.end', 'N/A'),
            'fingerprint_sha256': cert.get('fingerprint_sha256', 'N/A'),
            'key_size': cert.get('parsed.subject_key_info.key_algorithm.name', 'N/A'),
        }

# Usage
cert_recon = CertRecon()

# Certificates for one domain; show the first three
certs = cert_recon.find_certs_by_domain("github.com")
print(f"Found {len(certs)} certificates for github.com")
for cert in certs[:3]:
    print(f"  {cert['subject']}")
    print(f"    Issuer: {cert['issuer']}")
    print(f"    Expires: {cert['valid_to']}")

# Certificates inside the 30-day expiry window
print("\nExpiring in 30 days:")
expiring = cert_recon.find_expiring_certs(days=30)
print(f"Found {len(expiring)} expiring certificates")

# Drill into the first certificate, when the domain search found any
if certs:
    first_cert = certs[0]
    fp = first_cert['fingerprint']

    details = cert_recon.check_cert_chain(fp)
    print(f"\nCertificate details:")
    print(details)

    # Up to five hosts presenting that certificate
    hosts = cert_recon.find_cert_hosts(fp)
    print(f"\nHosts serving this certificate:")
    for host in hosts[:5]:
        print(f"  {host['ip']}:{host['port']}")

Pagination & Error Handling

#!/usr/bin/env python3
from censys.search import CensysHosts
from censys.common.exceptions import CensysException
import time

class SafeRecon:
    """Host search and lookup wrappers that report errors instead of raising."""

    def __init__(self):
        self.h = CensysHosts()

    def safe_search(self, query, max_pages=10, delay=2):
        """Search with error handling and rate limiting.

        Args:
            query: Search query string.
            max_pages: Upper bound on pages to fetch; values below 1 fetch
                nothing and return an empty list.
            delay: Seconds to sleep between pages.

        Returns:
            The hosts collected so far. API and unexpected errors are printed
            and end the search early; they are never raised to the caller.
        """
        all_results = []
        if max_pages < 1:
            return all_results

        try:
            # Ask the SDK for up to max_pages pages; without this the search
            # may stop at its own default page count before the cap is hit.
            pages = self.h.search(query, per_page=100, pages=max_pages)

            for page_count, page in enumerate(pages, start=1):
                try:
                    all_results.extend(page)
                except Exception as e:
                    # Best effort: a bad page is reported and skipped, but it
                    # still counts toward max_pages and is still rate limited.
                    print(f"Error processing page: {e}")

                if page_count >= max_pages:
                    print(f"Reached max pages ({max_pages})")
                    break

                time.sleep(delay)  # Rate limiting

        except CensysException as e:
            print(f"Censys API error: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

        return all_results

    def safe_view(self, ip):
        """Return the host record for *ip*, or None if the lookup fails.

        Failures are printed, with a distinct message for SDK errors.
        """
        try:
            return self.h.view(ip)
        except CensysException as e:
            print(f"Error viewing {ip}: {e}")
            return None
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None

# Usage
safe_recon = SafeRecon()

# Capped search: at most five pages, errors reported rather than raised
results = safe_recon.safe_search("80:*", max_pages=5)
print(f"Retrieved {len(results)} results")

# Single-host lookup; a failed lookup gives None and prints nothing here
ip_details = safe_recon.safe_view("1.2.3.4")
if ip_details:
    print(ip_details)

Bulk Operations & Export

#!/usr/bin/env python3
from censys.search import CensysHosts
import json
import csv

class BulkExport:
    """Export Censys host search results to JSON or CSV files."""

    def __init__(self):
        self.h = CensysHosts()

    def _iter_hosts(self, query, max_results, per_page=100):
        """Yield at most *max_results* hosts matching *query*."""
        if max_results <= 0:
            return

        # Request just enough pages to cover max_results (ceiling division);
        # otherwise the search may stop at its own default page count.
        pages_needed = -(-max_results // per_page)
        remaining = max_results

        for page in self.h.search(query, per_page=per_page, pages=pages_needed):
            for host in page:
                yield host
                remaining -= 1
                if remaining == 0:
                    return

    @staticmethod
    def _os_label(host):
        """Return one operating-system label for *host*, or ``'N/A'``.

        Accepts a list (first entry wins), a mapping (its ``product`` entry),
        or a plain value.
        """
        os_info = host.get('operating_system')
        if not os_info:
            return 'N/A'
        if isinstance(os_info, dict):
            # presumably the API's structured form; verify the field name
            return os_info.get('product') or 'N/A'
        if isinstance(os_info, (list, tuple)):
            return os_info[0]
        return os_info

    def export_to_json(self, query, output_file, max_results=1000):
        """Export search results to JSON.

        Args:
            query: Search query string.
            output_file: Path of the JSON file to write (overwritten).
            max_results: Maximum number of host records to export; zero or
                less writes an empty list.

        Returns:
            The number of records written.
        """
        results = list(self._iter_hosts(query, max_results))

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        return len(results)

    def export_to_csv(self, query, output_file, max_results=1000):
        """Export search results to CSV.

        Each row has the columns ``ip``, ``ports`` (comma-joined), ``os``
        and ``asn``; unknown ``os``/``asn`` values are written as ``N/A``.

        Args:
            query: Search query string.
            output_file: Path of the CSV file to write (overwritten).
            max_results: Maximum number of rows to export; zero or less
                writes only the header.

        Returns:
            The number of rows written, excluding the header.
        """
        results = [
            {
                'ip': host['ip'],
                'ports': ','.join(str(s['port']) for s in host.get('services', [])),
                'os': self._os_label(host),
                'asn': host.get('autonomous_system', {}).get('asn', 'N/A'),
            }
            for host in self._iter_hosts(query, max_results)
        ]

        # newline='' lets the csv module control line endings itself
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=['ip', 'ports', 'os', 'asn'])
            writer.writeheader()
            writer.writerows(results)

        return len(results)

# Usage
exporter = BulkExport()

# Web servers (port 80 or 443) written as JSON, capped at 500 records
count = exporter.export_to_json(
    "80:* OR 443:*",
    "web_servers.json",
    max_results=500,
)
print(f"Exported {count} results to web_servers.json")

# SSH servers written as CSV, capped at 500 rows
count = exporter.export_to_csv(
    "22:*",
    "ssh_servers.csv",
    max_results=500,
)
print(f"Exported {count} results to ssh_servers.csv")

Rate Limiting & Quotas

#!/usr/bin/env python3
from censys.search import CensysHosts
import time

class QuotaAwareRecon:
    """Host search wrapper that spaces out page deliveries."""

    def __init__(self):
        self.h = CensysHosts()
        # Smallest gap, in seconds, allowed between two delivered pages
        self.min_delay = 1

    def check_quota(self):
        """Return a fixed hint on where to check the remaining quota."""
        try:
            # No quota lookup is performed here; usage is tracked manually
            hint = "Check quota via web interface"
            return hint
        except Exception as e:
            print(f"Error checking quota: {e}")

    def throttled_search(self, query, per_page=50):
        """Yield result pages, keeping at least ``min_delay`` seconds apart."""
        previous = time.time()
        pages = self.h.search(query, per_page=per_page)

        for page in pages:
            # Sleep only for the part of the minimum gap not yet used up
            wait = self.min_delay - (time.time() - previous)
            if wait > 0:
                time.sleep(wait)

            previous = time.time()
            yield page

# Usage
quota_recon = QuotaAwareRecon()

# Walk the throttled pages and print each host address
throttled_pages = quota_recon.throttled_search("80:*", per_page=50)
for page in throttled_pages:
    for host in page:
        print(host['ip'])

Resources


Last updated: 2026-03-30