Censys-Python
The official Censys Python SDK provides programmatic access to Censys data for automated reconnaissance, vulnerability scanning, and security research.
Installation
# Install from PyPI
pip install censys
# For development
git clone https://github.com/censys/censys-python.git
cd censys-python
pip install -e .
# Verify installation
python3 -c "import censys; print(censys.__version__)"
API Credentials Setup
# Get API ID and Secret from https://censys.io/settings/api
export CENSYS_API_ID="your_api_id"
export CENSYS_API_SECRET="your_api_secret"
# Or create ~/.censysrc
cat > ~/.censysrc << 'EOF'
[default]
api_id = your_api_id
api_secret = your_api_secret
EOF
chmod 600 ~/.censysrc
Basic Usage
IPv4 Hosts Search
#!/usr/bin/env python3
from censys.search import CensysHosts
# Initialize client (uses credentials from environment or .censysrc)
h = CensysHosts()
# Search for all web servers
query = "80:*"
for page in h.search(query, per_page=100):
for host in page:
print(f"{host['ip']} - {host['services']}")
# Search with filters
query = "22:* AND location.country_code: US"
results = h.search(query, per_page=50)
for page in results:
for host in page:
print(f"{host['ip']}: SSH open")
# Get specific host details
ip = "1.2.3.4"
details = h.view(ip)
print(f"IP: {details['ip']}")
print(f"Ports: {[s['port'] for s in details['services']]}")
print(f"Autonomous System: {details['autonomous_system']['name']}")
# Get host history (changes over time)
history = h.view_host_history(ip, days=90)
for timestamp, data in history:
print(f"{timestamp}: {data}")
SSL/TLS Certificate Search
#!/usr/bin/env python3
from censys.search import CensysCerts
# Initialize client
c = CensysCerts()
# Search certificates for domain
domain = "example.com"
query = f"dns_names: {domain}"
for page in c.search(query, per_page=100):
for cert in page:
print(f"Fingerprint: {cert['fingerprint_sha256']}")
print(f"Subject: {cert.get('parsed.subject_dn', 'N/A')}")
print(f"Valid From: {cert.get('parsed.validity.start', 'N/A')}")
print(f"Valid To: {cert.get('parsed.validity.end', 'N/A')}")
print()
# Get certificate details
fingerprint = "86e2c7e7f8f3d3e4f5f6f7f8"
cert_details = c.view(fingerprint)
print(cert_details)
# Find hosts serving a certificate
hosts = c.hosts_having_cert(fingerprint)
for host in hosts:
print(f"{host['ip']}:{host['port']}")
Advanced Querying
Complex Search Queries
#!/usr/bin/env python3
from censys.search import CensysHosts
import json
class TargetRecon:
def __init__(self):
self.h = CensysHosts()
def find_org_servers(self, org_name):
"""Find all servers for organization"""
query = f'autonomous_system.organization: "{org_name}"'
results = []
for page in self.h.search(query, per_page=100):
for host in page:
results.append({
'ip': host['ip'],
'ports': [s['port'] for s in host.get('services', [])],
'os': host.get('operating_system', [None])[0]
})
return results
def find_web_servers_by_country(self, country_code, per_page=100):
"""Find web servers in specific country"""
query = f"80:* OR 443:* AND location.country_code: {country_code}"
for page in self.h.search(query, per_page=per_page):
for host in page:
yield host
def find_vulnerable_services(self):
"""Find potentially vulnerable services"""
queries = {
'elasticsearch': '9200:*',
'redis': '6379:*',
'mongodb': '27017:*',
'postgresql': '5432:*',
'mysql': '3306:*'
}
vulnerable = {}
for service, query in queries.items():
vulnerable[service] = []
for page in self.h.search(query, per_page=50):
for host in page:
vulnerable[service].append(host['ip'])
return vulnerable
def asn_enumeration(self, asn):
"""Enumerate all hosts in ASN"""
query = f"autonomous_system.asn: {asn}"
all_hosts = []
for page in self.h.search(query, per_page=100):
for host in page:
all_hosts.append(host)
return all_hosts
# Usage
recon = TargetRecon()
# Find Google servers
google_servers = recon.find_org_servers("Google")
print(f"Found {len(google_servers)} Google servers")
for server in google_servers[:5]:
print(f" {server['ip']}: {server['ports']}")
# Find web servers in US
print("\nWeb servers in US:")
count = 0
for host in recon.find_web_servers_by_country("US"):
if count < 5:
print(f" {host['ip']}")
count += 1
print(f"Total: {count}")
# Find exposed services
print("\nExposed services:")
vuln = recon.find_vulnerable_services()
for service, ips in vuln.items():
print(f" {service}: {len(ips)} hosts")
# Enumerate ASN (example: AWS)
print("\nAWS (ASN 16509) hosts:")
asn_hosts = recon.asn_enumeration(16509)
print(f"Found {len(asn_hosts)} hosts")
Certificate Intelligence
#!/usr/bin/env python3
from censys.search import CensysCerts
from datetime import datetime, timedelta
class CertRecon:
def __init__(self):
self.c = CensysCerts()
def find_certs_by_domain(self, domain):
"""Find all certificates for domain"""
query = f'dns_names: {domain}'
certs = []
for page in self.c.search(query, per_page=100):
for cert in page:
certs.append({
'fingerprint': cert['fingerprint_sha256'],
'subject': cert.get('parsed.subject.common_name', 'N/A'),
'issuer': cert.get('parsed.issuer.organization', ['N/A'])[0],
'valid_from': cert.get('parsed.validity.start', 'N/A'),
'valid_to': cert.get('parsed.validity.end', 'N/A')
})
return certs
def find_expiring_certs(self, days=30):
"""Find certificates expiring soon"""
expiration = datetime.utcnow() + timedelta(days=days)
query = f'parsed.validity.end: [2024-01-01 TO {expiration.strftime("%Y-%m-%d")}]'
certs = []
for page in self.c.search(query, per_page=100):
for cert in page:
certs.append({
'fingerprint': cert['fingerprint_sha256'],
'expires': cert.get('parsed.validity.end', 'N/A')
})
return certs
def find_self_signed_certs(self):
"""Find self-signed certificates"""
query = 'parsed.self_signed: true'
for page in self.c.search(query, per_page=50):
for cert in page:
yield cert
def find_cert_hosts(self, fingerprint):
"""Find all hosts serving a certificate"""
return self.c.hosts_having_cert(fingerprint)
def check_cert_chain(self, fingerprint):
"""Get certificate details and chain info"""
cert = self.c.view(fingerprint)
return {
'subject': cert.get('parsed.subject_dn', 'N/A'),
'issuer': cert.get('parsed.issuer_dn', 'N/A'),
'valid_from': cert.get('parsed.validity.start', 'N/A'),
'valid_to': cert.get('parsed.validity.end', 'N/A'),
'fingerprint_sha256': cert.get('fingerprint_sha256', 'N/A'),
'key_size': cert.get('parsed.subject_key_info.key_algorithm.name', 'N/A')
}
# Usage
cert_recon = CertRecon()
# Find certs for domain
certs = cert_recon.find_certs_by_domain("github.com")
print(f"Found {len(certs)} certificates for github.com")
for cert in certs[:3]:
print(f" {cert['subject']}")
print(f" Issuer: {cert['issuer']}")
print(f" Expires: {cert['valid_to']}")
# Find expiring certificates
print("\nExpiring in 30 days:")
expiring = cert_recon.find_expiring_certs(days=30)
print(f"Found {len(expiring)} expiring certificates")
# Get certificate details
if certs:
fp = certs[0]['fingerprint']
details = cert_recon.check_cert_chain(fp)
print(f"\nCertificate details:")
print(details)
# Find hosts
hosts = cert_recon.find_cert_hosts(fp)
print(f"\nHosts serving this certificate:")
for host in hosts[:5]:
print(f" {host['ip']}:{host['port']}")
Pagination & Error Handling
#!/usr/bin/env python3
from censys.search import CensysHosts
from censys.common.exceptions import CensysException
import time
class SafeRecon:
def __init__(self):
self.h = CensysHosts()
def safe_search(self, query, max_pages=10, delay=2):
"""Search with error handling and rate limiting"""
all_results = []
page_count = 0
try:
for page in self.h.search(query, per_page=100):
try:
for host in page:
all_results.append(host)
page_count += 1
if page_count >= max_pages:
print(f"Reached max pages ({max_pages})")
break
time.sleep(delay) # Rate limiting
except Exception as e:
print(f"Error processing page: {e}")
continue
except CensysException as e:
print(f"Censys API error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
return all_results
def safe_view(self, ip):
"""Get IP details with error handling"""
try:
return self.h.view(ip)
except CensysException as e:
print(f"Error viewing {ip}: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
# Usage
safe_recon = SafeRecon()
# Safe search
results = safe_recon.safe_search("80:*", max_pages=5)
print(f"Retrieved {len(results)} results")
# Safe view
ip_details = safe_recon.safe_view("1.2.3.4")
if ip_details:
print(ip_details)
Bulk Operations & Export
#!/usr/bin/env python3
from censys.search import CensysHosts
import json
import csv
class BulkExport:
def __init__(self):
self.h = CensysHosts()
def export_to_json(self, query, output_file, max_results=1000):
"""Export search results to JSON"""
results = []
count = 0
for page in self.h.search(query, per_page=100):
for host in page:
results.append(host)
count += 1
if count >= max_results:
break
if count >= max_results:
break
with open(output_file, 'w') as f:
json.dump(results, f, indent=2)
return len(results)
def export_to_csv(self, query, output_file, max_results=1000):
"""Export search results to CSV"""
results = []
count = 0
for page in self.h.search(query, per_page=100):
for host in page:
results.append({
'ip': host['ip'],
'ports': ','.join(str(s['port']) for s in host.get('services', [])),
'os': host.get('operating_system', ['N/A'])[0] if host.get('operating_system') else 'N/A',
'asn': host.get('autonomous_system', {}).get('asn', 'N/A')
})
count += 1
if count >= max_results:
break
if count >= max_results:
break
with open(output_file, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['ip', 'ports', 'os', 'asn'])
writer.writeheader()
writer.writerows(results)
return len(results)
# Usage
exporter = BulkExport()
# Export to JSON
count = exporter.export_to_json("80:* OR 443:*", "web_servers.json", max_results=500)
print(f"Exported {count} results to web_servers.json")
# Export to CSV
count = exporter.export_to_csv("22:*", "ssh_servers.csv", max_results=500)
print(f"Exported {count} results to ssh_servers.csv")
Rate Limiting & Quotas
#!/usr/bin/env python3
from censys.search import CensysHosts
import time
class QuotaAwareRecon:
def __init__(self):
self.h = CensysHosts()
self.min_delay = 1 # Minimum seconds between requests
def check_quota(self):
"""Check remaining API quota"""
try:
# Not directly available via SDK, but can track manually
return "Check quota via web interface"
except Exception as e:
print(f"Error checking quota: {e}")
def throttled_search(self, query, per_page=50):
"""Search with automatic throttling"""
last_request = time.time()
for page in self.h.search(query, per_page=per_page):
# Enforce minimum delay
elapsed = time.time() - last_request
if elapsed < self.min_delay:
time.sleep(self.min_delay - elapsed)
last_request = time.time()
yield page
# Usage
quota_recon = QuotaAwareRecon()
for page in quota_recon.throttled_search("80:*", per_page=50):
for host in page:
print(host['ip'])
Resources
Last updated: 2026-03-30