Add error handling to GitHub code search methods

Refactor GitHub code search to include comprehensive try-except blocks across methods. This enhances robustness by capturing and logging exceptions, ensuring the system fails gracefully and provides meaningful error messages.
This commit is contained in:
L1ghtn1ng 2024-10-26 20:18:04 +01:00
parent 0b0d6516b1
commit e2acaa099e

View file

@@ -27,113 +27,113 @@ class ErrorResult(NamedTuple):
class SearchGithubCode:
    def __init__(self, word, limit) -> None:
        """Prepare a GitHub code-search session for *word*, capped at *limit* results.

        Raises:
            MissingKey: when no GitHub API token is configured. Without a
                personal access token, GitHub narrows search capabilities and
                rate-limits far more severely:
                https://developer.github.com/v3/search/#rate-limit
        """
        self.word = word
        self.total_results = ''
        self.server = 'api.github.com'
        self.limit = limit
        self.counter: int = 0
        self.page: int | None = 1
        self.key = Core.github_key()
        # Let MissingKey propagate untouched; wrapping it in a catch/print/
        # re-raise only obscures the cause for callers.
        if self.key is None:
            raise MissingKey('Github')
        self.proxy = False
        # Build the URL and headers once; do_search() reuses them per page.
        self.base_url = f'https://{self.server}/search/code?q="{self.word}"'
        self.headers = {
            'Host': self.server,
            'User-agent': Core.get_user_agent(),
            'Accept': 'application/vnd.github.v3.text-match+json',
            'Authorization': f'token {self.key}',
        }
@staticmethod
async def fragments_from_response(json_data: dict) -> list[str]:
items: list[dict[str, Any]] = json_data.get('items') or list()
fragments: list[str] = list()
for item in items:
matches = item.get('text_matches') or list()
for match in matches:
fragments.append(match.get('fragment'))
return [fragment for fragment in fragments if fragment is not None]
try:
return [match['fragment'] for item in json_data.get('items', [])
for match in item.get('text_matches', [])
if match.get('fragment') is not None]
except Exception as e:
print(f'Error extracting fragments: {e}')
return []
@staticmethod
async def page_from_response(page: str, links) -> int | None:
page_link = links.get(page)
if page_link:
parsed = urlparse.urlparse(str(page_link.get('url')))
params = urlparse.parse_qs(parsed.query)
pages: list[Any] = params.get('page', [None])
page_number = pages[0] and int(pages[0])
return page_number
else:
try:
if page_link := links.get(page):
parsed = urlparse.urlparse(str(page_link.get('url')))
if page_param := urlparse.parse_qs(parsed.query).get('page', [None])[0]:
return int(page_param)
return None
except Exception as e:
print(f'Error parsing page response: {e}')
return None
async def handle_response(self, response: tuple[str, dict, int, Any]) -> ErrorResult | RetryResult | SuccessResult:
text, json_data, status, links = response
if status == 200:
results = await self.fragments_from_response(json_data)
next_page = await self.page_from_response('next', links)
last_page = await self.page_from_response('last', links)
return SuccessResult(results, next_page, last_page)
elif status == 429 or status == 403:
return RetryResult(60)
else:
try:
return ErrorResult(status, json_data)
except ValueError:
return ErrorResult(status, text)
try:
text, json_data, status, links = response
if status == 200:
results = await self.fragments_from_response(json_data)
next_page = await self.page_from_response('next', links)
last_page = await self.page_from_response('last', links)
return SuccessResult(results, next_page, last_page)
if status in (429, 403):
return RetryResult(60)
return ErrorResult(status, json_data if isinstance(json_data, dict) else text)
except Exception as e:
print(f'Error handling response: {e}')
return ErrorResult(500, str(e))
async def do_search(self, page: int) -> tuple[str, dict, int, Any]:
if page is None:
url = f'https://{self.server}/search/code?q="{self.word}"'
else:
url = f'https://{self.server}/search/code?q="{self.word}"&page={page}'
headers = {
'Host': self.server,
'User-agent': Core.get_user_agent(),
'Accept': 'application/vnd.github.v3.text-match+json',
'Authorization': f'token {self.key}',
}
async with aiohttp.ClientSession(headers=headers) as sess:
if self.proxy:
async with sess.get(url, proxy=random.choice(Core.proxy_list())) as resp:
try:
url = f'{self.base_url}&page={page}' if page else self.base_url
async with aiohttp.ClientSession(headers=self.headers) as sess:
async with sess.get(url, proxy=random.choice(Core.proxy_list()) if self.proxy else None) as resp:
return await resp.text(), await resp.json(), resp.status, resp.links
else:
async with sess.get(url) as resp:
return await resp.text(), await resp.json(), resp.status, resp.links
@staticmethod
async def next_page_or_end(result: SuccessResult) -> int | None:
if result.next_page is not None:
return result.next_page
else:
return result.last_page
except Exception as e:
print(f'Error performing search: {e}')
return '', {}, 500, {}
async def process(self, proxy: bool = False) -> None:
self.proxy = proxy
try:
self.proxy = proxy
while self.counter <= self.limit and self.page is not None:
api_response = await self.do_search(self.page)
result = await self.handle_response(api_response)
if isinstance(result, SuccessResult):
print(f'\tSearching {self.counter} results.')
for fragment in result.fragments:
self.total_results += fragment
self.counter = self.counter + 1
self.page = await self.next_page_or_end(result)
try:
api_response = await self.do_search(self.page)
result = await self.handle_response(api_response)
if isinstance(result, SuccessResult):
print(f'\tSearching {self.counter} results.')
self.total_results += ''.join(result.fragments)
self.counter += len(result.fragments)
self.page = result.next_page or result.last_page
await asyncio.sleep(get_delay())
elif isinstance(result, RetryResult):
sleepy_time = get_delay() + result.time
print(f'\tRetrying page in {sleepy_time} seconds...')
await asyncio.sleep(sleepy_time)
else:
print(f'\tException occurred: status_code: {result.status_code} reason: {result.body}')
except Exception as e:
print(f'Error processing page: {e}')
await asyncio.sleep(get_delay())
elif isinstance(result, RetryResult):
sleepy_time = get_delay() + result.time
print(f'\tRetrying page in {sleepy_time} seconds...')
await asyncio.sleep(sleepy_time)
elif isinstance(result, ErrorResult):
raise Exception(f'\tException occurred: status_code: {result.status_code} reason: {result.body}')
else:
raise Exception('\tUnknown exception occurred')
except Exception as e:
print(f'An exception has occurred: {e}')
print(f'An exception has occurred in githubcode process: {e}')
async def get_emails(self):
rawres = myparser.Parser(self.total_results, self.word)
return await rawres.emails()
try:
rawres = myparser.Parser(self.total_results, self.word)
return await rawres.emails()
except Exception as e:
print(f'Error getting emails: {e}')
return []
async def get_hostnames(self):
rawres = myparser.Parser(self.total_results, self.word)
return await rawres.hostnames()
try:
rawres = myparser.Parser(self.total_results, self.word)
return await rawres.hostnames()
except Exception as e:
print(f'Error getting hostnames: {e}')
return []