Made the takeover module asynchronous and added a flag for users who wish to perform subdomain takeover checks.

NotoriousRebel 2020-01-12 17:41:46 -05:00
parent 93e1f00e0d
commit babb2bd206
3 changed files with 80 additions and 38 deletions

theHarvester/__main__.py

@@ -36,6 +36,8 @@ async def start():
     parser.add_argument('-t', '--dns-tld', help='perform a DNS TLD expansion discovery, default False', default=False)
     parser.add_argument('-n', '--dns-lookup', help='enable DNS server lookup, default False', default=False,
                         action='store_true')
+    parser.add_argument('-r', '--take-over', help='Check for takeovers', default=False,
+                        action='store_true')
     parser.add_argument('-c', '--dns-brute', help='perform a DNS brute force on the domain', default=False,
                         action='store_true')
     parser.add_argument('-f', '--filename', help='save the results to an HTML and/or XML file', default='', type=str)
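For reference, `action='store_true'` is what turns the new `-r`/`--take-over` option into a boolean switch: the attribute defaults to False and flips to True when the flag is present. A minimal standalone sketch (this stripped-down parser is an illustration, not the one from the commit):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--take-over', help='Check for takeovers', default=False,
                        action='store_true')

    print(parser.parse_args(['-r']).take_over)  # True: flag present
    print(parser.parse_args([]).take_over)      # False: falls back to the default

argparse maps `--take-over` to the attribute `take_over`, which is why the next hunk reads `args.take_over`.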
@@ -73,6 +75,7 @@ async def start():
     vhost: list = []
     virtual = args.virtual_host
     word: str = args.domain
+    takeover_status = args.take_over

     async def store(search_engine: Any, source: str, process_param: Any = None, store_host: bool = False,
                     store_emails: bool = False, store_ip: bool = False, store_people: bool = False,
@@ -464,8 +467,7 @@ async def handler(lst):
     if ports_scanning:
         print('\n\n[*] Scanning ports (active).\n')
         for x in full:
-            host = x.split(':')[1]
-            domain = x.split(':')[0]
+            domain, host = x.split(':')
             if host != 'empty':
                 print(('[*] Scanning ' + host))
                 ports = [21, 22, 80, 443, 8080]
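Collapsing the two indexed `split(':')` calls into one tuple unpacking is shorter and also stricter: unpacking raises ValueError when the entry does not contain exactly one ':', whereas indexing would silently cope with extra colons. A quick illustration (the host string is made up):

    domain, host = 'example.com:93.184.216.34'.split(':')
    print(domain, host)  # example.com 93.184.216.34
    # 'a:b:c'.split(':') yields three items, so the same unpacking would raise ValueError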
@@ -476,10 +478,14 @@ async def handler(lst):
                     print(('\t[*] Detected open ports: ' + ','.join(str(e) for e in openports)))
                     takeover_check = 'True'
                     if takeover_check == 'True' and len(openports) > 0:
-                        search_take = takeover.TakeOver(domain)
-                        search_take.process()
+                        search_take = takeover.TakeOver([domain])
+                        await search_take.process()
         except Exception as e:
             print(e)
+    if takeover_status:
+        print('Performing takeover check')
+        search_take = takeover.TakeOver(all_hosts)
+        await search_take.process()

     # DNS reverse lookup
     dnsrev = []
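The new block at the end of this hunk is what the `--take-over` flag enables: one TakeOver instance is built over all discovered hosts and awaited once, instead of the per-domain instance created inside the port-scanning loop. A minimal sketch of the same fan-in pattern, with a stub standing in for `takeover.TakeOver` and a made-up host list:

    import asyncio

    class StubTakeOver:
        # Stand-in for takeover.TakeOver, which now accepts a list of hosts
        def __init__(self, hosts):
            self.hosts = hosts

        async def process(self):
            for host in self.hosts:
                print(f'checking {host}')

    async def handler():
        takeover_status = True  # would come from args.take_over
        all_hosts = ['shop.example.com', 'blog.example.com']
        if takeover_status:
            print('Performing takeover check')
            await StubTakeOver(all_hosts).process()

    asyncio.run(handler())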

theHarvester/discovery/takeover.py

@@ -1,44 +1,61 @@
+from theHarvester.lib.core import *
 import re
-import requests


 class TakeOver:

-    def __init__(self, host):
-        self.host = host
+    def __init__(self, hosts):
+        self.hosts = hosts
         self.results = ""
         self.totalresults = ""
-        self.fingerprints = ["<title>Squarespace - Domain Not Claimed</title>",
-                             'www.herokucdn.com/error-pages/no-such-app.html',
-                             '<title>Squarespace - No Such Account</title>',
-                             "<p> If you're trying to publish one, <a href=\"https://help.github.com/pages/\">read the full documentation</a> to learn how to set up <strong>GitHub Pages</strong> for your repository, organization, or user account. </p>",
-                             "<p> If you\'re trying to publish one, <a href=\"https://help.github.com/pages/\">read the full documentation</a> to learn how to set up <strong>GitHub Pages</strong> for your repository, organization, or user account. </p>",
-                             "<span class=\"title\">Bummer. It looks like the help center that you are trying to reach no longer exists.</span>",
-                             "<head> <title>The page you\'re looking for could not be found (404)</title> <style> body { color: #666; text-align: center; font-family: \"Helvetica Neue\", Helvetica, Arial, sans-serif; margin: 0; width: 800px; margin: auto; font-size: 14px; } h1 { font-size: 56px; line-height: 100px; font-weight: normal; color: #456; } h2 { font-size: 24px; color: #666; line-height: 1.5em; } h3 { color: #456; font-size: 20px; font-weight: normal; line-height: 28px; } hr { margin: 18px 0; border: 0; border-top: 1px solid #EEE; border-bottom: 1px solid white; } </style> </head>",
-                             'The specified bucket does not exist',
-                             'Bad Request: ERROR: The request could not be satisfied',
-                             'Fastly error: unknown domain:',
-                             "There isn't a Github Pages site here.",
-                             'No such app',
-                             'Unrecognized domain',
-                             'Sorry, this shop is currently unavailable.',
-                             "Whatever you were looking for doesn't currently exist at this address",
-                             'The requested URL was not found on this server.',
-                             'This UserVoice subdomain is currently available!',
-                             'Do you want to register *.wordpress.com?',
-                             'Help Center Closed']
-
-    def do_take(self):
+        # Thank you to https://github.com/EdOverflow/can-i-take-over-xyz for these fingerprints
+        self.fingerprints = {"'Trying to access your account?'": 'Campaign Monitor',
+                             '404 Not Found': 'Fly.io',
+                             '404 error unknown site!': 'Pantheon',
+                             'Do you want to register *.wordpress.com?': 'Wordpress',
+                             'Domain uses DO name serves with no records in DO.': 'Digital Ocean',
+                             "It looks like you may have taken a wrong turn somewhere. Don't worry...it happens to all of us.": 'LaunchRock',
+                             'No Site For Domain': 'Kinsta',
+                             'No settings were found for this company:': 'Help Scout',
+                             'Project doesnt exist... yet!': 'Readme.io',
+                             'Repository not found': 'Bitbucket',
+                             'The feed has not been found.': 'Feedpress',
+                             'No such app': 'Heroku',
+                             'The specified bucket does not exist': 'AWS/S3',
+                             'The thing you were looking for is no longer here, or never was': 'Ghost',
+                             "There isn't a Github Pages site here.": 'Github',
+                             'This UserVoice subdomain is currently available!': 'UserVoice',
+                             "Uh oh. That page doesn't exist.": 'Intercom',
+                             "We could not find what you're looking for.": 'Help Juice',
+                             "Whatever you were looking for doesn't currently exist at this address": 'Tumblr',
+                             'is not a registered InCloud YouTrack': 'JetBrains',
+                             'page not found': 'Uptimerobot',
+                             'project not found': 'Surge.sh'}
+
+    async def check(self, url, resp):
+        # Takes a response body and checks whether any fingerprints exist in it;
+        # if one does, figure out which service it belongs to and print it
+        regex = re.compile("(?=(" + "|".join(map(re.escape, list(self.fingerprints.keys()))) + "))")
+        # re.escape sanitizes the fingerprints so they are matched literally
+        matches = re.findall(regex, resp)
+        for match in matches:
+            print(f'\t\033[91m Takeover detected: {url}\033[1;32;40m')
+            if match in self.fingerprints.keys():
+                # Sanity check so an unexpected match does not error out
+                print(f'\t\033[91m Type of takeover is: {self.fingerprints[match]}\033[1;32;40m')
+
+    async def do_take(self):
         try:
-            print(f'\t Searching takeovers for {self.host}')
-            r = requests.get(f'https://{self.host}', verify=False)
-            for x in self.fingerprints:
-                take_reg = re.compile(x)
-                self.temp = take_reg.findall(r.text)
-                if self.temp != []:
-                    print(f'\t\033[91m Takeover detected! - {self.host}\033[1;32;40m')
+            tup_resps: list = await AsyncFetcher.fetch_all(self.hosts, takeover=True)
+            # fetch_all returns a list of tuples in the form (url, response)
+            tup_resps = [tup for tup in tup_resps if tup[1] != '']
+            # Filter out tuples whose response is an empty string (indicates the request errored)
+            for url, resp in tup_resps:
+                await self.check(url, resp)
         except Exception as e:
             print(e)

-    def process(self):
-        self.do_take()
+    async def process(self):
+        await self.do_take()
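The `(?=(...))` pattern in `check` is a zero-width lookahead wrapped around a capturing group: `re.findall` then returns every fingerprint that occurs in the response, including overlapping hits, while `re.escape` keeps metacharacters inside the fingerprints (dots, asterisks, quotes) from being interpreted as regex syntax. A standalone sketch using two entries from the table above:

    import re

    fingerprints = {'No such app': 'Heroku',
                    'project not found': 'Surge.sh'}
    body = '<html><body>No such app</body></html>'

    regex = re.compile('(?=(' + '|'.join(map(re.escape, fingerprints)) + '))')
    for match in regex.findall(body):
        print(f'{match!r} -> {fingerprints[match]}')  # 'No such app' -> 'Heroku'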

theHarvester/lib/core.py

@@ -1,7 +1,7 @@
 # coding=utf-8
 import random
-from typing import Set, Union, Any
+from typing import Set, Union, Any, Tuple
 import yaml
 import asyncio
 import aiohttp
@@ -415,12 +415,31 @@ async def fetch(session, url, params='', json=False) -> Union[str, dict, list]:
             return ''

     @staticmethod
-    async def fetch_all(urls, headers='', params='', json=False) -> list:
+    async def takeover_fetch(session, url) -> Union[Tuple[Any, Any], str]:
+        # This fetch method solely focuses on GET requests
+        # TODO determine if a method for POST requests is necessary
+        try:
+            # Wrap in try/except because binary bodies (e.g. the 0x89 magic byte of png/jpg files) can fail to decode
+            url = f'http://{url}' if str(url).startswith(('http:', 'https:')) is False else url
+            # Give bare hostnames a proper scheme
+            async with session.get(url) as response:
+                await asyncio.sleep(2)
+                return url, await response.text()
+        except Exception:
+            return url, ''
+
+    @staticmethod
+    async def fetch_all(urls, headers='', params='', json=False, takeover=False) -> list:
         # By default the timeout is 5 minutes; 30 seconds should suffice
         timeout = aiohttp.ClientTimeout(total=30)
         if len(headers) == 0:
             headers = {'User-Agent': Core.get_user_agent()}
+        if takeover:
+            async with aiohttp.ClientSession(headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as session:
+                tuples = await asyncio.gather(*[AsyncFetcher.takeover_fetch(session, url) for url in urls])
+                return tuples
         if len(params) == 0:
             async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                 texts = await asyncio.gather(*[AsyncFetcher.fetch(session, url, json=json) for url in urls])
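Taken together, the takeover path in `fetch_all` fans one GET request out per host and gathers `(url, body)` tuples, with an empty body marking a failed request, which is exactly what `TakeOver.do_take` filters on. A self-contained sketch of that pattern, assuming aiohttp is installed (the hosts are placeholders):

    import asyncio
    import aiohttp

    async def takeover_fetch(session, url):
        # Give bare hostnames a scheme, as the diff does
        url = url if url.startswith(('http:', 'https:')) else f'http://{url}'
        try:
            async with session.get(url) as response:
                return url, await response.text()
        except Exception:
            return url, ''  # empty body signals an errored host

    async def fetch_all(urls):
        timeout = aiohttp.ClientTimeout(total=15)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            return await asyncio.gather(*[takeover_fetch(session, url) for url in urls])

    results = asyncio.run(fetch_all(['example.com', 'example.org']))
    print([url for url, body in results if body])

Sharing one ClientSession across all requests keeps connection pooling effective, and the per-session timeout bounds the whole takeover sweep rather than each request individually.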