diff --git a/bookmarks/services/website_loader.py b/bookmarks/services/website_loader.py
index ef4cefa..fb3fc40 100644
--- a/bookmarks/services/website_loader.py
+++ b/bookmarks/services/website_loader.py
@@ -1,9 +1,12 @@
+import logging
 from dataclasses import dataclass
 
 import requests
 from bs4 import BeautifulSoup
 from charset_normalizer import from_bytes
 
+logger = logging.getLogger(__name__)
+
 
 @dataclass
 class WebsiteMetadata:
@@ -33,15 +36,37 @@ def load_website_metadata(url: str):
     return WebsiteMetadata(url=url, title=title, description=description)
 
 
+CHUNK_SIZE = 50 * 1024
+MAX_CONTENT_LIMIT = 5000 * 1024
+
+
 def load_page(url: str):
     headers = fake_request_headers()
-    r = requests.get(url, timeout=10, headers=headers)
+    size = 0
+    content = None
+    # Use with to ensure the request gets closed even if it is only read partially
+    with requests.get(url, timeout=10, headers=headers, stream=True) as r:
+        for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
+            size += len(chunk)
+            if content is None:
+                content = chunk
+            else:
+                content = content + chunk
+
+            # Stop reading once the closing head tag has been parsed
+            if '</head>'.encode('utf-8') in content:
+                logger.debug(f'Found closing head tag after {size} bytes')
+                break
+            # Stop reading if we exceed the size limit
+            if size > MAX_CONTENT_LIMIT:
+                logger.debug(f'Cancel reading document after {size} bytes')
+                break
 
     # Use charset_normalizer to determine the encoding that best matches the response content
     # Several sites seem to specify the response encoding incorrectly, so we ignore it and use custom logic instead
     # This is different from Response.text, which does respect the encoding specified in the response first,
     # before trying to determine one
-    results = from_bytes(r.content)
+    results = from_bytes(content or b'')
     return str(results.best())
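A note on the `from_bytes(...)` call above: charset_normalizer inspects the raw bytes themselves and picks the encoding that decodes them most plausibly, which is why `load_page` can ignore the charset the server declares. A minimal sketch of that behaviour (the sample text and the mislabelled encoding are made up for illustration):

```python
from charset_normalizer import from_bytes

# Bytes a site might serve as latin-1 while its headers claim UTF-8
raw = 'Grüße aus Berlin'.encode('latin-1')

results = from_bytes(raw)    # analyze the payload, ignoring any declared charset
best_match = results.best()  # highest-confidence match, or None for undecodable input
print(str(best_match))       # the decoded text, which is what load_page returns
                             # via str(results.best())
```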
diff --git a/bookmarks/tests/test_website_loader.py b/bookmarks/tests/test_website_loader.py
new file mode 100644
index 0000000..bc3bf9c
--- /dev/null
+++ b/bookmarks/tests/test_website_loader.py
@@ -0,0 +1,53 @@
+from unittest import mock
+from bookmarks.services import website_loader
+
+from django.test import TestCase
+
+
+class MockStreamingResponse:
+    def __init__(self, num_chunks, chunk_size, insert_head_after_chunk=None):
+        self.chunks = []
+        for index in range(num_chunks):
+            chunk = ''.zfill(chunk_size)
+            self.chunks.append(chunk.encode('utf-8'))
+
+            if index == insert_head_after_chunk:
+                self.chunks.append('</head>'.encode('utf-8'))
+
+    def iter_content(self, **kwargs):
+        return self.chunks
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+
+class WebsiteLoaderTestCase(TestCase):
+    def test_load_page_returns_content(self):
+        with mock.patch('requests.get') as mock_get:
+            mock_get.return_value = MockStreamingResponse(num_chunks=10, chunk_size=1024)
+            content = website_loader.load_page('https://example.com')
+
+            expected_content_size = 10 * 1024
+            self.assertEqual(expected_content_size, len(content))
+
+    def test_load_page_limits_large_documents(self):
+        with mock.patch('requests.get') as mock_get:
+            mock_get.return_value = MockStreamingResponse(num_chunks=10, chunk_size=1024 * 1000)
+            content = website_loader.load_page('https://example.com')
+
+            # Should have read six chunks, after which content exceeds the max of 5 MB
+            expected_content_size = 6 * 1024 * 1000
+            self.assertEqual(expected_content_size, len(content))
+
+    def test_load_page_stops_reading_at_closing_head_tag(self):
+        with mock.patch('requests.get') as mock_get:
+            mock_get.return_value = MockStreamingResponse(num_chunks=10, chunk_size=1024 * 1000,
+                                                          insert_head_after_chunk=0)
+            content = website_loader.load_page('https://example.com')
+
+            # Should have read the first chunk, plus the second chunk containing the closing head tag
+            expected_content_size = 1 * 1024 * 1000 + len('</head>')
+            self.assertEqual(expected_content_size, len(content))
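A quick sanity check on the expected size in `test_load_page_limits_large_documents`: the loader only stops once `size > MAX_CONTENT_LIMIT`, and five of the mocked 1,024,000-byte chunks land exactly on the 5,120,000-byte limit, so a sixth chunk is read before the strict comparison trips:

```python
CHUNK = 1024 * 1000            # size of each mocked chunk
LIMIT = 5000 * 1024            # MAX_CONTENT_LIMIT from website_loader

assert 5 * CHUNK == LIMIT      # five chunks hit the limit exactly (5,120,000 bytes)
assert not 5 * CHUNK > LIMIT   # ...so the strict > check does not break yet
assert 6 * CHUNK > LIMIT       # the sixth chunk pushes past it and ends the read
```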