Add new module anubis and a unit test for it. Update Shodan-related code

L1ghtn1ng 2021-06-21 00:19:27 +01:00
parent 0b7b5b02ab
commit d28393b9a2
6 changed files with 185 additions and 67 deletions

View file

@@ -45,6 +45,10 @@ jobs:
mypy --pretty theHarvester/*/*.py
mypy --pretty theHarvester/*/*/*.py
- name: Run theHarvester module Anubis
run: |
python theHarvester.py -d apple.com -b anubis
- name: Run theHarvester module Baidu
run: |
python theHarvester.py -d yale.edu -b baidu

View file

@@ -12,6 +12,8 @@ multiple public data sources that include:
Passive:
--------
* anubis: Anubis-DB - https://github.com/jonluca/anubis
* baidu: Baidu search engine - www.baidu.com
* bing: Microsoft search engine - www.bing.com

View file

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
# coding=utf-8
import requests
from theHarvester.lib.core import *
from theHarvester.discovery import anubis
import pytest
pytestmark = pytest.mark.asyncio
class TestAnubis:
@staticmethod
def domain() -> str:
return 'apple.com'
async def test_api(self):
base_url = f'https://jldc.me/anubis/subdomains/{TestAnubis.domain()}'
headers = {'User-Agent': Core.get_user_agent()}
request = requests.get(base_url, headers=headers)
assert request.status_code == 200
async def test_do_search(self):
search = anubis.SearchAnubis(word=TestAnubis.domain())
await search.do_search()
return await search.get_hostnames()
async def test_process(self):
await self.test_do_search()
assert len(await self.test_do_search()) > 0
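These tests are coroutines, so they rely on the pytest-asyncio plugin (hence the pytestmark line above). A minimal way to run just this file locally, assuming it lands under the repository's tests directory as tests/discovery/test_anubis.py (the path is an assumption):

python -m pip install pytest pytest-asyncio
python -m pytest tests/discovery/test_anubis.py -v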

View file

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
import pprint
from typing import Dict, List
from theHarvester.discovery import *
from theHarvester.discovery import dnssearch, takeover, shodansearch
@@ -34,7 +34,7 @@ async def start(rest_args=None):
parser.add_argument('-n', '--dns-lookup', help='Enable DNS server lookup, default False.', default=False, action='store_true')
parser.add_argument('-c', '--dns-brute', help='Perform a DNS brute force on the domain.', default=False, action='store_true')
parser.add_argument('-f', '--filename', help='Save the results to an XML and JSON file.', default='', type=str)
parser.add_argument('-b', '--source', help='''baidu, bing, binaryedge, bingapi, bufferoverun, censys, certspotter, crtsh,
parser.add_argument('-b', '--source', help='''anubis, baidu, bing, binaryedge, bingapi, bufferoverun, censys, certspotter, crtsh,
dnsdumpster, duckduckgo, exalead, github-code, google,
hackertarget, hunter, intelx, linkedin, linkedin_links,
netcraft, omnisint, otx, pentesttools, projectdiscovery,
@@ -57,7 +57,6 @@ async def start(rest_args=None):
alphabet = string.ascii_letters + string.digits
rest_filename += f"{''.join(secrets.choice(alphabet) for _ in range(32))}_{filename}" \
if len(filename) != 0 else ""
else:
args = parser.parse_args()
filename: str = args.filename
@@ -96,6 +95,13 @@ async def start(rest_args=None):
interesting_urls: list = []
total_asns: list = []
linkedin_people_list_tracker: list = []
linkedin_links_tracker: list = []
twitter_people_list_tracker: list = []
interesting_urls: list = []
total_asns: list = []
async def store(search_engine: Any, source: str, process_param: Any = None, store_host: bool = False,
store_emails: bool = False, store_ip: bool = False, store_people: bool = False,
store_links: bool = False, store_results: bool = False,
@@ -167,7 +173,7 @@ async def store(search_engine: Any, source: str, process_param: Any = None, stor
iurls = await search_engine.get_interestingurls()
interesting_urls.extend(iurls)
if len(iurls) > 0:
await db.store_all(word, iurls, 'interestingurl', engineitem)
await db.store_all(word, iurls, 'interestingurls', engineitem)
if store_asns:
fasns = await search_engine.get_asns()
total_asns.extend(fasns)
@@ -185,7 +191,15 @@ async def store(search_engine: Any, source: str, process_param: Any = None, stor
print(f'\033[94m[*] Target: {word} \n \033[0m')
for engineitem in engines:
if engineitem == 'baidu':
if engineitem == 'anubis':
from theHarvester.discovery import anubis
try:
anubis_search = anubis.SearchAnubis(word)
stor_lst.append(store(anubis_search, engineitem, store_host=True))
except Exception as e:
print(e)
elif engineitem == 'baidu':
from theHarvester.discovery import baidusearch
try:
baidu_search = baidusearch.SearchBaidu(word, limit)
@@ -383,8 +397,8 @@ async def store(search_engine: Any, source: str, process_param: Any = None, stor
elif engineitem == 'rocketreach':
from theHarvester.discovery import rocketreach
try:
rocketreach_search = rocketreach.SearchRocketreach(word)
stor_lst.append(store(rocketreach_search, engineitem, store_emails=True))
rocketreach_search = rocketreach.SearchRocketReach(word, limit)
stor_lst.append(store(rocketreach_search, engineitem, store_links=True))
except Exception as e:
if isinstance(e, MissingKey):
print(e)
@@ -532,6 +546,59 @@ async def handler(lst):
sys.exit(1)
# Results
if len(total_asns) > 0:
print(f'\n[*] ASNS found: {len(total_asns)}')
print('--------------------')
total_asns = list(sorted(set(total_asns)))
for asn in total_asns:
print(asn)
if len(interesting_urls) > 0:
print(f'\n[*] Interesting Urls found: {len(interesting_urls)}')
print('--------------------')
interesting_urls = list(sorted(set(interesting_urls)))
for iurl in interesting_urls:
print(iurl)
if len(twitter_people_list_tracker) == 0 and 'twitter' in engines:
print('\n[*] No Twitter users found.\n\n')
else:
if len(twitter_people_list_tracker) >= 1:
print('\n[*] Twitter Users found: ' + str(len(twitter_people_list_tracker)))
print('---------------------')
twitter_people_list_tracker = list(sorted(set(twitter_people_list_tracker)))
for usr in twitter_people_list_tracker:
print(usr)
if len(linkedin_people_list_tracker) == 0 and 'linkedin' in engines:
print('\n[*] No LinkedIn users found.\n\n')
else:
if len(linkedin_people_list_tracker) >= 1:
print('\n[*] LinkedIn Users found: ' + str(len(linkedin_people_list_tracker)))
print('---------------------')
linkedin_people_list_tracker = list(sorted(set(linkedin_people_list_tracker)))
for usr in linkedin_people_list_tracker:
print(usr)
if len(linkedin_links_tracker) == 0 and ('linkedin' in engines or 'rocketreach' in engines):
print('\n[*] No LinkedIn links found.\n\n')
else:
if len(linkedin_links_tracker) >= 1:
print(f'\n[*] LinkedIn Links found: {len(linkedin_links_tracker)}')
print('---------------------')
linkedin_links_tracker = list(sorted(set(linkedin_links_tracker)))
for link in linkedin_links_tracker:
print(link)
length_urls = len(all_urls)
if length_urls == 0:
if len(engines) >= 1 and 'trello' in engines:
print('\n[*] No Trello URLs found.')
else:
total = length_urls
print('\n[*] Trello URLs found: ' + str(total))
print('--------------------')
all_urls = list(sorted(set(all_urls)))
for url in sorted(all_urls):
print(url)
if len(all_ip) == 0:
print('\n[*] No IPs found.')
else:
@@ -540,13 +607,15 @@ async def handler(lst):
# use netaddr as the list may contain ipv4 and ipv6 addresses
ip_list = sorted([netaddr.IPAddress(ip.strip()) for ip in set(all_ip)])
print('\n'.join(map(str, ip_list)))
ip_list = list(ip_list)
if len(all_emails) == 0:
print('\n[*] No emails found.')
else:
print('\n[*] Emails found: ' + str(len(all_emails)))
print('----------------------')
print(('\n'.join(sorted(list(set(all_emails))))))
all_emails = sorted(list(set(all_emails)))
print(('\n'.join(all_emails)))
if len(all_hosts) == 0:
print('\n[*] No hosts found.\n\n')
@@ -562,16 +631,6 @@ async def handler(lst):
print(host)
host_ip = [netaddr_ip.format() for netaddr_ip in sorted([netaddr.IPAddress(ip) for ip in ips])]
await db.store_all(word, host_ip, 'ip', 'DNS-resolver')
length_urls = len(all_urls)
if length_urls == 0:
if len(engines) >= 1 and 'trello' in engines:
print('\n[*] No Trello URLs found.')
else:
total = length_urls
print('\n[*] Trello URLs found: ' + str(total))
print('--------------------')
for url in sorted(all_urls):
print(url)
# DNS brute force
if dnsbrute and dnsbrute[0] is True:
@@ -708,20 +767,34 @@ async def handler(lst):
if shodan is True:
import texttable
tab = texttable.Texttable()
header = ['IP address', 'Hostname', 'Org', 'Services:Ports', 'Technologies']
header = ['Asn', 'Domains', 'Hostnames', 'IP address',
'Isp', 'Org', 'Ports', 'Product', 'Server',
'Technologies', 'Title']
tab.header(header)
tab.set_cols_align(['c', 'c', 'c', 'c', 'c'])
tab.set_cols_valign(['m', 'm', 'm', 'm', 'm'])
tab.set_cols_align(['c'] * len(header))
tab.set_cols_valign(['m'] * len(header))
tab.set_chars(['-', '|', '+', '#'])
tab.set_cols_width([15, 20, 15, 15, 18])
tab.set_cols_width([20] * len(header))
print('\033[94m[*] Searching Shodan. \033[0m')
try:
for ip in host_ip:
print(('\tSearching for ' + ip))
shodan = shodansearch.SearchShodan()
rowdata = await shodan.search_ip(ip)
shodandict = await shodan.search_ip(ip)
await asyncio.sleep(2)
rowdata = []
for key, value in shodandict[ip].items():
if str(value) == 'Not in Shodan' or \
'Error occurred in the Shodan IP search module' in str(value):
rowdata.extend([value] + [''] * (len(header) - 1))  # pad the row to the full header width
break
if isinstance(value, int):
value = str(value)
if isinstance(value, list):
value = ', '.join(map(str, value))
rowdata.append(value)
tab.add_row(rowdata)
shodanres.append(rowdata)
printedtable = tab.draw()
print(printedtable)
except Exception as e:
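A note on the hunk above: every formatting call is driven by len(header), so the column set can be changed in one place without touching the alignment, valign, or width lists. A minimal self-contained sketch of the same texttable pattern, using an illustrative three-column subset of the header and placeholder row values:

import texttable

header = ['Asn', 'Domains', 'Hostnames']  # illustrative subset of the real 11-column header
tab = texttable.Texttable()
tab.header(header)
tab.set_cols_align(['c'] * len(header))   # center every cell horizontally
tab.set_cols_valign(['m'] * len(header))  # middle-align every cell vertically
tab.set_cols_width([20] * len(header))    # one fixed width per column
tab.set_chars(['-', '|', '+', '#'])       # same border characters as above
tab.add_row(['AS714', 'apple.com', 'www.apple.com'])  # placeholder data
print(tab.draw())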
@@ -743,7 +816,6 @@ async def handler(lst):
else:
pass
# Reporting
if filename != '':
print('\n[*] Reporting started.')
try:
@@ -752,6 +824,7 @@ async def handler(lst):
else:
filename = 'theHarvester/app/static/' + rest_filename.rsplit('.', 1)[0] + '.xml'
# TODO use aiofiles if user is using rest api
# XML REPORT SECTION
with open(filename, 'w+') as file:
file.write('<?xml version="1.0" encoding="UTF-8"?><theHarvester>')
for x in all_emails:
@@ -768,26 +841,7 @@ async def handler(lst):
file.write(f'<vhost><ip>{ip} </ip><hostname>{host}</hostname></vhost>')
else:
file.write(f'<vhost>{host}</vhost>')
if shodanres != []:
shodanalysis = []
for x in shodanres:
res = x.split('SAPO')
file.write('<shodan>')
file.write('<host>' + res[0] + '</host>')
file.write('<port>' + res[2] + '</port>')
file.write('<banner><!--' + res[1] + '--></banner>')
reg_server = re.compile('Server:.*')
temp = reg_server.findall(res[1])
if temp:
shodanalysis.append(res[0] + ':' + temp[0])
file.write('</shodan>')
if shodanalysis:
shodanalysis = sorted(set(shodanalysis))
file.write('<servers>')
for x in shodanalysis:
file.write('<server>' + x + '</server>')
file.write('</servers>')
# TODO add Shodan output into XML report
file.write('</theHarvester>')
print('[*] XML File saved.')
except Exception as error:
@@ -796,38 +850,43 @@ async def handler(lst):
try:
# JSON REPORT SECTION
filename = filename.rsplit('.', 1)[0] + '.json'
# create dict with values for json output
json_dict: Dict = dict()
json_dict["emails"] = [email for email in all_emails]
json_dict["hosts"] = [host for host in full]
json_dict["vhosts"] = [host for host in vhost]
# determine if variable exists
# it should but just a sanity check
if 'ip_list' in locals():
if all_ip and len(all_ip) >= 1 and ip_list and len(ip_list) > 0:
json_dict["ips"] = [str(ip) for ip in ip_list]
if len(all_emails) > 0:
json_dict["emails"] = [email for email in all_emails]
if len(full) > 0:
json_dict["hosts"] = [host for host in full]
if vhost and len(vhost) > 0:
json_dict["vhosts"] = [host for host in vhost]
if len(interesting_urls) > 0:
json_dict["interesting_urls"] = interesting_urls
if len(all_urls) > 0:
json_dict["trello_urls"] = all_urls
if len(total_asns) > 0:
json_dict["asns"] = total_asns
if len(twitter_people_list_tracker) > 0:
json_dict["twitter_people"] = [person for person in list(sorted(set(twitter_people_list_tracker)))]
json_dict["twitter_people"] = twitter_people_list_tracker
if len(linkedin_people_list_tracker) > 0:
json_dict["linkedin_people"] = [person for person in list(sorted(set(linkedin_people_list_tracker)))]
json_dict["linkedin_people"] = linkedin_people_list_tracker
if len(linkedin_links_tracker) > 0:
json_dict["linkedin_links"] = [link for link in list(sorted(set(linkedin_links_tracker)))]
json_dict["linkedin_links"] = linkedin_links_tracker
shodan_dict: Dict = dict()
if shodanres != []:
shodanalysis: List = []
for x in shodanres:
res = x.split('SAPO')
shodan_dict[res[0]] = [res[2], [res[1]]]
reg_server = re.compile('Server:.*')
temp = reg_server.findall(res[1])
if temp:
shodanalysis.append(res[0] + ':' + temp[0])
file.write('</shodan>')
if shodanalysis:
shodanalysis = sorted(set(shodanalysis))
shodan_dict["servers"] = [server for server in shodanalysis]
json_dict["shodan"] = shodan_dict
json_dict["shodan"] = shodanres
with open(filename, 'wb+') as fp:
fp.write(orjson.dumps(json_dict, option=orjson.OPT_SORT_KEYS))
print('[*] JSON File saved.')
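For orientation, json_dict as built above ends up shaped roughly like the sketch below; each key is emitted only when the corresponding collection is non-empty (except shodan, which is always written), and every value here is a placeholder:

json_dict = {
    'asns': ['AS714'],
    'emails': ['jdoe@apple.com'],
    'hosts': ['www.apple.com'],
    'ips': ['17.253.144.10'],
    'shodan': [],  # shodanres: one row list per IP queried
}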

View file

@@ -0,0 +1,22 @@
from typing import Type
from theHarvester.lib.core import *
class SearchAnubis:
def __init__(self, word):
self.word = word
self.totalhosts = list
self.proxy = False
async def do_search(self):
url = f'https://jldc.me/anubis/subdomains/{self.word}'
response = await AsyncFetcher.fetch_all([url], json=True, proxy=self.proxy)
self.totalhosts: list = response[0]
async def get_hostnames(self) -> Type[list]:
return self.totalhosts
async def process(self, proxy=False):
self.proxy = proxy
await self.do_search()
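The new module's whole surface is do_search, get_hostnames, and process. A minimal standalone usage sketch of the class exactly as committed; the target domain is only an example:

import asyncio
from theHarvester.discovery import anubis

async def main():
    search = anubis.SearchAnubis(word='apple.com')
    await search.process()               # proxy defaults to False
    print(await search.get_hostnames())  # list of subdomains returned by Anubis-DB

asyncio.run(main())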

View file

@@ -113,7 +113,8 @@ def banner() -> None:
@staticmethod
def get_supportedengines() -> Set[Union[str, Any]]:
supportedengines = {'baidu',
supportedengines = {'anubis',
'baidu',
'binaryedge',
'bing',
'bingapi',