Merge pull request #85 from NotoriousRebel/master

Fixed some github tests and other small misc. changes
This commit is contained in:
J.Townsend 2020-02-15 20:58:59 +00:00 committed by GitHub
commit 7c4eca3ca8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 15 additions and 32 deletions

View file

@ -1,5 +1,4 @@
from theHarvester.discovery import githubcode
from theHarvester.discovery.githubcode import RetryResult, ErrorResult, SuccessResult
from theHarvester.discovery.constants import MissingKey
from theHarvester.lib.core import Core
from unittest.mock import MagicMock
@ -74,44 +73,27 @@ async def test_missing_key(self):
async def test_fragments_from_response(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = await test_class_instance.fragments_from_response(self.OkResponse.response)
test_result = await test_class_instance.fragments_from_response(self.OkResponse.response.json())
print('test_result: ', test_result)
assert test_result == ["test1", "test2"]
async def test_invalid_fragments_from_response(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = test_class_instance.fragments_from_response(self.MalformedResponse.response)
test_result = await test_class_instance.fragments_from_response(self.MalformedResponse.response.json())
assert test_result == []
async def test_handle_response_ok(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = test_class_instance.handle_response(self.OkResponse.response)
assert isinstance(test_result, SuccessResult)
async def test_handle_response_retry(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = test_class_instance.handle_response(self.RetryResponse.response)
assert isinstance(test_result, RetryResult)
async def test_handle_response_fail(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = test_class_instance.handle_response(self.FailureResponse.response)
assert isinstance(test_result, ErrorResult)
async def test_next_page(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = githubcode.SuccessResult(list(), next_page=2, last_page=4)
assert(2 == test_class_instance.next_page_or_end(test_result))
assert(2 == await test_class_instance.next_page_or_end(test_result))
async def test_last_page(self):
Core.github_key = MagicMock(return_value="lol")
test_class_instance = githubcode.SearchGithubCode(word="test", limit=500)
test_result = githubcode.SuccessResult(list(), None, None)
assert(None is test_class_instance.next_page_or_end(test_result))
assert(None is await test_class_instance.next_page_or_end(test_result))
if __name__ == '__main__':
pytest.main()

View file

@ -16,11 +16,13 @@ async def do_search(self):
dct = responses
self.totalhosts: set = {
host.split(',')[0].replace('www','') if ',' in host and self.word.replace('www.', '') in host.split(',')[0] in host else
host.split(',')[0].replace('www.', '') if ',' in host and self.word.replace('www.', '') in host.split(',')[
0] in host else
host.split(',')[1] for host in dct['FDNS_A']}
self.totalips: set = {ip.split(',')[0]for ip in dct['FDNS_A'] if
self.totalips: set = {ip.split(',')[0] for ip in dct['FDNS_A'] if
re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip.split(',')[0])}
async def get_hostnames(self) -> set:
return self.totalhosts

View file

@ -1,5 +1,4 @@
from theHarvester.lib.core import *
import aiohttp
class SearchCertspoter:
@ -11,11 +10,9 @@ def __init__(self, word):
async def do_search(self) -> None:
base_url = f'https://api.certspotter.com/v1/issuances?domain={self.word}&expand=dns_names'
headers = {'User-Agent': Core.get_user_agent()}
try:
client = aiohttp.ClientSession(headers=headers, timeout=aiohttp.ClientTimeout(total=30))
response = await AsyncFetcher.fetch(client, base_url, json=True, proxy=self.proxy)
await client.close()
response = await AsyncFetcher.fetch_all([base_url], json=True, proxy=self.proxy)
response = response[0]
if isinstance(response, list):
for dct in response:
for key, value in dct.items():

View file

@ -1,5 +1,6 @@
from theHarvester.lib.core import *
import aiohttp
from typing import Set
class SearchCrtsh:
@ -18,7 +19,8 @@ async def do_search(self) -> Set:
response = await AsyncFetcher.fetch(client, url, json=True, proxy=self.proxy)
await client.close()
data = set(
[dct['name_value'][2:] if '*.' == dct['name_value'][:2] else dct['name_value'] for dct in response])
[dict(dct)['name_value'][2:] if '*.' == dict(dct)['name_value'][:2] else dict(dct)['name_value']
for dct in response])
except Exception as e:
print(e)
return data

View file

@ -95,7 +95,7 @@ async def do_search(self, page: Optional[int]) -> Tuple[str, dict, int, Any]:
async with sess.get(url, proxy=random.choice(Core.proxy_list())) as resp:
return await resp.text(), await resp.json(), resp.status, resp.links
else:
async with sess.get(url, ) as resp:
async with sess.get(url) as resp:
return await resp.text(), await resp.json(), resp.status, resp.links
@staticmethod