mirror of
https://github.com/laramies/theHarvester.git
synced 2025-02-26 07:23:17 +08:00
508 lines
18 KiB
Python
508 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
shodan.client
|
|
~~~~~~~~~~~~~
|
|
|
|
This module implements the Shodan API.
|
|
|
|
:copyright: (c) 2014- by John Matherly
|
|
"""
|
|
import time
|
|
|
|
import requests
|
|
import json
|
|
|
|
from .exception import APIError
|
|
from .helpers import api_request, create_facet_string
|
|
from .stream import Stream
|
|
|
|
|
|
# Try to disable the SSL warnings in urllib3 since not everybody can install
|
|
# C extensions. If you're able to install C extensions you can try to run:
|
|
#
|
|
# pip install requests[security]
|
|
#
|
|
# Which will download libraries that offer more full-featured SSL classes
|
|
try:
|
|
requests.packages.urllib3.disable_warnings()
|
|
except:
|
|
pass
|
|
|
|
# Define a basestring type if necessary for Python3 compatibility
|
|
try:
|
|
str
|
|
except NameError:
|
|
basestring = str
|
|
|
|
|
|
class Shodan:
|
|
"""Wrapper around the Shodan REST and Streaming APIs
|
|
|
|
:param key: The Shodan API key that can be obtained from your account page (https://account.shodan.io)
|
|
:type key: str
|
|
:ivar exploits: An instance of `shodan.Shodan.Exploits` that provides access to the Exploits REST API.
|
|
:ivar stream: An instance of `shodan.Shodan.Stream` that provides access to the Streaming API.
|
|
"""
|
|
|
|
class Data:
|
|
|
|
def __init__(self, parent):
|
|
self.parent = parent
|
|
|
|
def list_datasets(self):
|
|
"""Returns a list of datasets that the user has permission to download.
|
|
|
|
:returns: A list of objects where every object describes a dataset
|
|
"""
|
|
return self.parent._request('/shodan/data', {})
|
|
|
|
def list_files(self, dataset):
|
|
"""Returns a list of files that belong to the given dataset.
|
|
|
|
:returns: A list of objects where each object contains a 'name', 'size', 'timestamp' and 'url'
|
|
"""
|
|
return self.parent._request('/shodan/data/{}'.format(dataset), {})
|
|
|
|
class Tools:
|
|
|
|
def __init__(self, parent):
|
|
self.parent = parent
|
|
|
|
def myip(self):
|
|
"""Get your current IP address as seen from the Internet.
|
|
|
|
:returns: str -- your IP address
|
|
"""
|
|
return self.parent._request('/tools/myip', {})
|
|
|
|
class Exploits:
|
|
|
|
def __init__(self, parent):
|
|
self.parent = parent
|
|
|
|
def search(self, query, page=1, facets=None):
|
|
"""Search the entire Shodan Exploits archive using the same query syntax
|
|
as the website.
|
|
|
|
:param query: The exploit search query; same syntax as website.
|
|
:type query: str
|
|
:param facets: A list of strings or tuples to get summary information on.
|
|
:type facets: str
|
|
:param page: The page number to access.
|
|
:type page: int
|
|
:returns: dict -- a dictionary containing the results of the search.
|
|
"""
|
|
query_args = {
|
|
'query': query,
|
|
'page': page,
|
|
}
|
|
if facets:
|
|
query_args['facets'] = create_facet_string(facets)
|
|
|
|
return self.parent._request('/api/search', query_args, service='exploits')
|
|
|
|
def count(self, query, facets=None):
|
|
"""Search the entire Shodan Exploits archive but only return the total # of results,
|
|
not the actual exploits.
|
|
|
|
:param query: The exploit search query; same syntax as website.
|
|
:type query: str
|
|
:param facets: A list of strings or tuples to get summary information on.
|
|
:type facets: str
|
|
:returns: dict -- a dictionary containing the results of the search.
|
|
|
|
"""
|
|
query_args = {
|
|
'query': query,
|
|
}
|
|
if facets:
|
|
query_args['facets'] = create_facet_string(facets)
|
|
|
|
return self.parent._request('/api/count', query_args, service='exploits')
|
|
|
|
class Labs:
|
|
|
|
def __init__(self, parent):
|
|
self.parent = parent
|
|
|
|
def honeyscore(self, ip):
|
|
"""Calculate the probability of an IP being an ICS honeypot.
|
|
|
|
:param ip: IP address of the device
|
|
:type ip: str
|
|
|
|
:returns: int -- honeyscore ranging from 0.0 to 1.0
|
|
"""
|
|
return self.parent._request('/labs/honeyscore/{}'.format(ip), {})
|
|
|
|
def __init__(self, key):
|
|
"""Initializes the API object.
|
|
|
|
:param key: The Shodan API key.
|
|
:type key: str
|
|
"""
|
|
self.api_key = key
|
|
self.base_url = 'https://api.shodan.io'
|
|
self.base_exploits_url = 'https://exploits.shodan.io'
|
|
self.data = self.Data(self)
|
|
self.exploits = self.Exploits(self)
|
|
self.labs = self.Labs(self)
|
|
self.tools = self.Tools(self)
|
|
self.stream = Stream(key)
|
|
self._session = requests.Session()
|
|
|
|
def _request(self, function, params, service='shodan', method='get'):
|
|
"""General-purpose function to create web requests to SHODAN.
|
|
|
|
Arguments:
|
|
function -- name of the function you want to execute
|
|
params -- dictionary of parameters for the function
|
|
|
|
Returns
|
|
A dictionary containing the function's results.
|
|
|
|
"""
|
|
# Add the API key parameter automatically
|
|
params['key'] = self.api_key
|
|
|
|
# Determine the base_url based on which service we're interacting with
|
|
base_url = {
|
|
'shodan': self.base_url,
|
|
'exploits': self.base_exploits_url,
|
|
}.get(service, 'shodan')
|
|
|
|
# Send the request
|
|
try:
|
|
if method.lower() == 'post':
|
|
data = self._session.post(base_url + function, params)
|
|
else:
|
|
data = self._session.get(base_url + function, params=params)
|
|
except:
|
|
raise APIError('Unable to connect to Shodan')
|
|
|
|
# Check that the API key wasn't rejected
|
|
if data.status_code == 401:
|
|
try:
|
|
# Return the actual error message if the API returned valid JSON
|
|
error = data.json()['error']
|
|
except Exception as e:
|
|
error = 'Invalid API key'
|
|
|
|
raise APIError(error)
|
|
|
|
# Parse the text into JSON
|
|
try:
|
|
data = data.json()
|
|
except:
|
|
raise APIError('Unable to parse JSON response')
|
|
|
|
# Raise an exception if an error occurred
|
|
if type(data) == dict and 'error' in data:
|
|
raise APIError(data['error'])
|
|
|
|
# Return the data
|
|
return data
|
|
|
|
def count(self, query, facets=None):
|
|
"""Returns the total number of search results for the query.
|
|
|
|
:param query: Search query; identical syntax to the website
|
|
:type query: str
|
|
:param facets: (optional) A list of properties to get summary information on
|
|
:type facets: str
|
|
|
|
:returns: A dictionary with 1 main property: total. If facets have been provided then another property called "facets" will be available at the top-level of the dictionary. Visit the website for more detailed information.
|
|
"""
|
|
query_args = {
|
|
'query': query,
|
|
}
|
|
if facets:
|
|
query_args['facets'] = create_facet_string(facets)
|
|
return self._request('/shodan/host/count', query_args)
|
|
|
|
def host(self, ips, history=False, minify=False):
|
|
"""Get all available information on an IP.
|
|
|
|
:param ip: IP of the computer
|
|
:type ip: str
|
|
:param history: (optional) True if you want to grab the historical (non-current) banners for the host, False otherwise.
|
|
:type history: bool
|
|
:param minify: (optional) True to only return the list of ports and the general host information, no banners, False otherwise.
|
|
:type minify: bool
|
|
"""
|
|
if isinstance(ips, basestring):
|
|
ips = [ips]
|
|
|
|
params = {}
|
|
if history:
|
|
params['history'] = history
|
|
if minify:
|
|
params['minify'] = minify
|
|
return self._request('/shodan/host/%s' % ','.join(ips), params)
|
|
|
|
def info(self):
|
|
"""Returns information about the current API key, such as a list of add-ons
|
|
and other features that are enabled for the current user's API plan.
|
|
"""
|
|
return self._request('/api-info', {})
|
|
|
|
def ports(self):
|
|
"""Get a list of ports that Shodan crawls
|
|
|
|
:returns: An array containing the ports that Shodan crawls for.
|
|
"""
|
|
return self._request('/shodan/ports', {})
|
|
|
|
def protocols(self):
|
|
"""Get a list of protocols that the Shodan on-demand scanning API supports.
|
|
|
|
:returns: A dictionary containing the protocol name and description.
|
|
"""
|
|
return self._request('/shodan/protocols', {})
|
|
|
|
def scan(self, ips, force=False):
|
|
"""Scan a network using Shodan
|
|
|
|
:param ips: A list of IPs or netblocks in CIDR notation or an object structured like:
|
|
{
|
|
"9.9.9.9": [
|
|
(443, "https"),
|
|
(8080, "http")
|
|
],
|
|
"1.1.1.0/24": [
|
|
(503, "modbus")
|
|
]
|
|
}
|
|
:type ips: str or dict
|
|
:param force: Whether or not to force Shodan to re-scan the provided IPs. Only available to enterprise users.
|
|
:type force: bool
|
|
|
|
:returns: A dictionary with a unique ID to check on the scan progress, the number of IPs that will be crawled and how many scan credits are left.
|
|
"""
|
|
if isinstance(ips, basestring):
|
|
ips = [ips]
|
|
|
|
if isinstance(ips, dict):
|
|
networks = json.dumps(ips)
|
|
else:
|
|
networks = ','.join(ips)
|
|
|
|
params = {
|
|
'ips': networks,
|
|
'force': force,
|
|
}
|
|
|
|
return self._request('/shodan/scan', params, method='post')
|
|
|
|
def scan_internet(self, port, protocol):
|
|
"""Scan a network using Shodan
|
|
|
|
:param port: The port that should get scanned.
|
|
:type port: int
|
|
:param port: The name of the protocol as returned by the protocols() method.
|
|
:type port: str
|
|
|
|
:returns: A dictionary with a unique ID to check on the scan progress.
|
|
"""
|
|
params = {
|
|
'port': port,
|
|
'protocol': protocol,
|
|
}
|
|
|
|
return self._request('/shodan/scan/internet', params, method='post')
|
|
|
|
def scan_status(self, scan_id):
|
|
"""Get the status information about a previously submitted scan.
|
|
|
|
:param id: The unique ID for the scan that was submitted
|
|
:type id: str
|
|
|
|
:returns: A dictionary with general information about the scan, including its status in getting processed.
|
|
"""
|
|
return self._request('/shodan/scan/%s' % scan_id, {})
|
|
|
|
def search(self, query, page=1, limit=None, offset=None, facets=None, minify=True):
|
|
"""Search the SHODAN database.
|
|
|
|
:param query: Search query; identical syntax to the website
|
|
:type query: str
|
|
:param page: (optional) Page number of the search results
|
|
:type page: int
|
|
:param limit: (optional) Number of results to return
|
|
:type limit: int
|
|
:param offset: (optional) Search offset to begin getting results from
|
|
:type offset: int
|
|
:param facets: (optional) A list of properties to get summary information on
|
|
:type facets: str
|
|
:param minify: (optional) Whether to minify the banner and only return the important data
|
|
:type minify: bool
|
|
|
|
:returns: A dictionary with 2 main items: matches and total. If facets have been provided then another property called "facets" will be available at the top-level of the dictionary. Visit the website for more detailed information.
|
|
"""
|
|
args = {
|
|
'query': query,
|
|
'minify': minify,
|
|
}
|
|
if limit:
|
|
args['limit'] = limit
|
|
if offset:
|
|
args['offset'] = offset
|
|
else:
|
|
args['page'] = page
|
|
|
|
if facets:
|
|
args['facets'] = create_facet_string(facets)
|
|
|
|
return self._request('/shodan/host/search', args)
|
|
|
|
def search_cursor(self, query, minify=True, retries=5):
|
|
"""Search the SHODAN database.
|
|
|
|
This method returns an iterator that can directly be in a loop. Use it when you want to loop over
|
|
all of the results of a search query. But this method doesn't return a "matches" array or the "total"
|
|
information. And it also can't be used with facets, it's only use is to iterate over results more
|
|
easily.
|
|
|
|
:param query: Search query; identical syntax to the website
|
|
:type query: str
|
|
:param minify: (optional) Whether to minify the banner and only return the important data
|
|
:type minify: bool
|
|
:param retries: (optional) How often to retry the search in case it times out
|
|
:type minify: int
|
|
|
|
:returns: A search cursor that can be used as an iterator/ generator.
|
|
"""
|
|
args = {
|
|
'query': query,
|
|
'minify': minify,
|
|
}
|
|
|
|
page = 1
|
|
tries = 0
|
|
while page == 1 or results['matches']:
|
|
try:
|
|
results = self.search(query, minify=minify, page=page)
|
|
for banner in results['matches']:
|
|
try:
|
|
yield banner
|
|
except GeneratorExit:
|
|
return # exit out of the function
|
|
page += 1
|
|
tries = 0
|
|
except:
|
|
# We've retried several times but it keeps failing, so lets error out
|
|
if tries >= retries:
|
|
break
|
|
|
|
tries += 1
|
|
time.sleep(1.0) # wait 1 second if the search errored out for some reason
|
|
|
|
def search_tokens(self, query):
|
|
"""Returns information about the search query itself (filters used etc.)
|
|
|
|
:param query: Search query; identical syntax to the website
|
|
:type query: str
|
|
|
|
:returns: A dictionary with 4 main properties: filters, errors, attributes and string.
|
|
"""
|
|
query_args = {
|
|
'query': query,
|
|
}
|
|
return self._request('/shodan/host/search/tokens', query_args)
|
|
|
|
def services(self):
|
|
"""Get a list of services that Shodan crawls
|
|
|
|
:returns: A dictionary containing the ports/ services that Shodan crawls for. The key is the port number and the value is the name of the service.
|
|
"""
|
|
return self._request('/shodan/services', {})
|
|
|
|
def queries(self, page=1, sort='timestamp', order='desc'):
|
|
"""List the search queries that have been shared by other users.
|
|
|
|
:param page: Page number to iterate over results; each page contains 10 items
|
|
:type page: int
|
|
:param sort: Sort the list based on a property. Possible values are: votes, timestamp
|
|
:type sort: str
|
|
:param order: Whether to sort the list in ascending or descending order. Possible values are: asc, desc
|
|
:type order: str
|
|
|
|
:returns: A list of saved search queries (dictionaries).
|
|
"""
|
|
args = {
|
|
'page': page,
|
|
'sort': sort,
|
|
'order': order,
|
|
}
|
|
return self._request('/shodan/query', args)
|
|
|
|
def queries_search(self, query, page=1):
|
|
"""Search the directory of saved search queries in Shodan.
|
|
|
|
:param query: The search string to look for in the search query
|
|
:type query: str
|
|
:param page: Page number to iterate over results; each page contains 10 items
|
|
:type page: int
|
|
|
|
:returns: A list of saved search queries (dictionaries).
|
|
"""
|
|
args = {
|
|
'page': page,
|
|
'query': query,
|
|
}
|
|
return self._request('/shodan/query/search', args)
|
|
|
|
def queries_tags(self, size=10):
|
|
"""Search the directory of saved search queries in Shodan.
|
|
|
|
:param query: The number of tags to return
|
|
:type page: int
|
|
|
|
:returns: A list of tags.
|
|
"""
|
|
args = {
|
|
'size': size,
|
|
}
|
|
return self._request('/shodan/query/tags', args)
|
|
|
|
def create_alert(self, name, ip, expires=0):
|
|
"""Search the directory of saved search queries in Shodan.
|
|
|
|
:param query: The number of tags to return
|
|
:type page: int
|
|
|
|
:returns: A list of tags.
|
|
"""
|
|
data = {
|
|
'name': name,
|
|
'filters': {
|
|
'ip': ip,
|
|
},
|
|
'expires': expires,
|
|
}
|
|
|
|
response = api_request(self.api_key, '/shodan/alert', data=data, params={}, method='post')
|
|
|
|
return response
|
|
|
|
def alerts(self, aid=None, include_expired=True):
|
|
"""List all of the active alerts that the user created."""
|
|
if aid:
|
|
func = '/shodan/alert/%s/info' % aid
|
|
else:
|
|
func = '/shodan/alert/info'
|
|
|
|
response = api_request(self.api_key, func, params={
|
|
'include_expired': include_expired,
|
|
})
|
|
|
|
return response
|
|
|
|
def delete_alert(self, aid):
|
|
"""Delete the alert with the given ID."""
|
|
func = '/shodan/alert/%s' % aid
|
|
|
|
response = api_request(self.api_key, func, params={}, method='delete')
|
|
|
|
return response
|
|
|