diff --git a/makeHosts.py b/makeHosts.py index 8fcca7e8d..436cb7234 100644 --- a/makeHosts.py +++ b/makeHosts.py @@ -54,59 +54,89 @@ def update_readme_file(): def main(): - parser = argparse.ArgumentParser(description="Creates custom hosts " - "file from hosts stored in " - "data subfolders.") + parser = argparse.ArgumentParser( + description="Creates custom hosts " + "file from hosts stored in " + "data subfolders." + ) parser.parse_args() - update_hosts_file("-a", "-o", - "alternates/gambling", - "-e", "gambling") - update_hosts_file("-a", "-n", "-o", - "alternates/porn", - "-e", "porn") - update_hosts_file("-a", "-n", "-o", - "alternates/social", - "-e", "social") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews", - "-e", "fakenews") + update_hosts_file("-a", "-o", "alternates/gambling", "-e", "gambling") + update_hosts_file("-a", "-n", "-o", "alternates/porn", "-e", "porn") + update_hosts_file("-a", "-n", "-o", "alternates/social", "-e", "social") + update_hosts_file("-a", "-n", "-o", "alternates/fakenews", "-e", "fakenews") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-gambling", - "-e", "fakenews", "gambling") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-porn", - "-e", "fakenews", "porn") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-social", - "-e", "fakenews", "social") - update_hosts_file("-a", "-n", "-o", - "alternates/gambling-porn", - "-e", "gambling", "porn") - update_hosts_file("-a", "-n", "-o", - "alternates/gambling-social", - "-e", "gambling", "social") - update_hosts_file("-a", "-n", "-o", - "alternates/porn-social", - "-e", "porn", "social") + update_hosts_file( + "-a", "-n", "-o", "alternates/fakenews-gambling", "-e", "fakenews", "gambling" + ) + update_hosts_file( + "-a", "-n", "-o", "alternates/fakenews-porn", "-e", "fakenews", "porn" + ) + update_hosts_file( + "-a", "-n", "-o", "alternates/fakenews-social", "-e", "fakenews", "social" + ) + update_hosts_file( + "-a", "-n", "-o", "alternates/gambling-porn", "-e", "gambling", "porn" + ) + update_hosts_file( + "-a", "-n", "-o", "alternates/gambling-social", "-e", "gambling", "social" + ) + update_hosts_file( + "-a", "-n", "-o", "alternates/porn-social", "-e", "porn", "social" + ) - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-gambling-porn", - "-e", "fakenews", "gambling", "porn") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-gambling-social", - "-e", "fakenews", "gambling", "social") - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-porn-social", - "-e", "fakenews", "porn", "social") - update_hosts_file("-a", "-n", "-o", - "alternates/gambling-porn-social", - "-e", "gambling", "porn", "social") + update_hosts_file( + "-a", + "-n", + "-o", + "alternates/fakenews-gambling-porn", + "-e", + "fakenews", + "gambling", + "porn", + ) + update_hosts_file( + "-a", + "-n", + "-o", + "alternates/fakenews-gambling-social", + "-e", + "fakenews", + "gambling", + "social", + ) + update_hosts_file( + "-a", + "-n", + "-o", + "alternates/fakenews-porn-social", + "-e", + "fakenews", + "porn", + "social", + ) + update_hosts_file( + "-a", + "-n", + "-o", + "alternates/gambling-porn-social", + "-e", + "gambling", + "porn", + "social", + ) - update_hosts_file("-a", "-n", "-o", - "alternates/fakenews-gambling-porn-social", - "-e", "fakenews", "gambling", "porn", "social") + update_hosts_file( + "-a", + "-n", + "-o", + "alternates/fakenews-gambling-porn-social", + "-e", + "fakenews", + "gambling", + "porn", + "social", + ) update_hosts_file("-a", "-n") diff --git a/testUpdateHostsFile.py b/testUpdateHostsFile.py index 6b13b567d..e14139c4b 100644 --- a/testUpdateHostsFile.py +++ b/testUpdateHostsFile.py @@ -17,26 +17,44 @@ import unittest.mock as mock from io import BytesIO, StringIO import updateHostsFile -from updateHostsFile import (Colors, colorize, display_exclusion_options, - domain_to_idna, exclude_domain, flush_dns_cache, - gather_custom_exclusions, get_defaults, - get_file_by_url, is_valid_domain_format, - matches_exclusions, move_hosts_file_into_place, - normalize_rule, path_join_robust, print_failure, - print_success, prompt_for_exclusions, - prompt_for_flush_dns_cache, prompt_for_move, - prompt_for_update, query_yes_no, recursive_glob, - remove_old_hosts_file, strip_rule, supports_color, - update_all_sources, update_readme_data, - update_sources_data, write_data, - write_opening_header) +from updateHostsFile import ( + Colors, + colorize, + display_exclusion_options, + domain_to_idna, + exclude_domain, + flush_dns_cache, + gather_custom_exclusions, + get_defaults, + get_file_by_url, + is_valid_domain_format, + matches_exclusions, + move_hosts_file_into_place, + normalize_rule, + path_join_robust, + print_failure, + print_success, + prompt_for_exclusions, + prompt_for_flush_dns_cache, + prompt_for_move, + prompt_for_update, + query_yes_no, + recursive_glob, + remove_old_hosts_file, + strip_rule, + supports_color, + update_all_sources, + update_readme_data, + update_sources_data, + write_data, + write_opening_header, +) unicode = str # Test Helper Objects class Base(unittest.TestCase): - @staticmethod def mock_property(name): return mock.patch(name, new_callable=mock.PropertyMock) @@ -50,7 +68,6 @@ class Base(unittest.TestCase): class BaseStdout(Base): - def setUp(self): sys.stdout = StringIO() @@ -60,7 +77,6 @@ class BaseStdout(Base): class BaseMockDir(Base): - @property def dir_count(self): return len(os.listdir(self.test_dir)) @@ -70,51 +86,53 @@ class BaseMockDir(Base): def tearDown(self): shutil.rmtree(self.test_dir) + + # End Test Helper Objects # Project Settings class TestGetDefaults(Base): - def test_get_defaults(self): with self.mock_property("updateHostsFile.BASEDIR_PATH"): updateHostsFile.BASEDIR_PATH = "foo" actual = get_defaults() - expected = {"numberofrules": 0, - "datapath": "foo" + self.sep + "data", - "freshen": True, - "replace": False, - "backup": False, - "skipstatichosts": False, - "keepdomaincomments": True, - "extensionspath": "foo" + self.sep + "extensions", - "extensions": [], - "compress": False, - "minimise": False, - "outputsubfolder": "", - "hostfilename": "hosts", - "targetip": "0.0.0.0", - "sourcedatafilename": "update.json", - "sourcesdata": [], - "readmefilename": "readme.md", - "readmetemplate": ("foo" + self.sep + - "readme_template.md"), - "readmedata": {}, - "readmedatafilename": ("foo" + self.sep + - "readmeData.json"), - "exclusionpattern": r"([a-zA-Z\d-]+\.){0,}", - "exclusionregexs": [], - "exclusions": [], - "commonexclusions": ["hulu.com"], - "blacklistfile": "foo" + self.sep + "blacklist", - "whitelistfile": "foo" + self.sep + "whitelist"} + expected = { + "numberofrules": 0, + "datapath": "foo" + self.sep + "data", + "freshen": True, + "replace": False, + "backup": False, + "skipstatichosts": False, + "keepdomaincomments": True, + "extensionspath": "foo" + self.sep + "extensions", + "extensions": [], + "compress": False, + "minimise": False, + "outputsubfolder": "", + "hostfilename": "hosts", + "targetip": "0.0.0.0", + "sourcedatafilename": "update.json", + "sourcesdata": [], + "readmefilename": "readme.md", + "readmetemplate": ("foo" + self.sep + "readme_template.md"), + "readmedata": {}, + "readmedatafilename": ("foo" + self.sep + "readmeData.json"), + "exclusionpattern": r"([a-zA-Z\d-]+\.){0,}", + "exclusionregexs": [], + "exclusions": [], + "commonexclusions": ["hulu.com"], + "blacklistfile": "foo" + self.sep + "blacklist", + "whitelistfile": "foo" + self.sep + "whitelist", + } self.assertDictEqual(actual, expected) + + # End Project Settings # Prompt the User class TestPromptForUpdate(BaseStdout, BaseMockDir): - def setUp(self): BaseStdout.setUp(self) BaseMockDir.setUp(self) @@ -174,8 +192,10 @@ class TestPromptForUpdate(BaseStdout, BaseMockDir): prompt_for_update(freshen=False, update_auto=False) output = sys.stdout.getvalue() - expected = ("ERROR: No 'hosts' file in the folder. " - "Try creating one manually.") + expected = ( + "ERROR: No 'hosts' file in the folder. " + "Try creating one manually." + ) self.assertIn(expected, output) sys.stdout = StringIO() @@ -197,8 +217,7 @@ class TestPromptForUpdate(BaseStdout, BaseMockDir): self.assertFalse(update_sources) output = sys.stdout.getvalue() - expected = ("OK, we'll stick with " - "what we've got locally.") + expected = "OK, we'll stick with " "what we've got locally." self.assertIn(expected, output) sys.stdout = StringIO() @@ -223,8 +242,9 @@ class TestPromptForUpdate(BaseStdout, BaseMockDir): dir_count = self.dir_count for update_auto in (False, True): - update_sources = prompt_for_update(freshen=True, - update_auto=update_auto) + update_sources = prompt_for_update( + freshen=True, update_auto=update_auto + ) self.assertTrue(update_sources) output = sys.stdout.getvalue() @@ -243,7 +263,6 @@ class TestPromptForUpdate(BaseStdout, BaseMockDir): class TestPromptForExclusions(BaseStdout): - @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testSkipPrompt(self, mock_query): gather_exclusions = prompt_for_exclusions(skip_prompt=True) @@ -260,8 +279,7 @@ class TestPromptForExclusions(BaseStdout): self.assertFalse(gather_exclusions) output = sys.stdout.getvalue() - expected = ("OK, we'll only exclude " - "domains in the whitelist.") + expected = "OK, we'll only exclude " "domains in the whitelist." self.assertIn(expected, output) self.assert_called_once(mock_query) @@ -278,13 +296,11 @@ class TestPromptForExclusions(BaseStdout): class TestPromptForFlushDnsCache(Base): - @mock.patch("updateHostsFile.flush_dns_cache", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testFlushCache(self, mock_query, mock_flush): for prompt_flush in (False, True): - prompt_for_flush_dns_cache(flush_cache=True, - prompt_flush=prompt_flush) + prompt_for_flush_dns_cache(flush_cache=True, prompt_flush=prompt_flush) mock_query.assert_not_called() self.assert_called_once(mock_flush) @@ -295,8 +311,7 @@ class TestPromptForFlushDnsCache(Base): @mock.patch("updateHostsFile.flush_dns_cache", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testNoFlushCacheNoPrompt(self, mock_query, mock_flush): - prompt_for_flush_dns_cache(flush_cache=False, - prompt_flush=False) + prompt_for_flush_dns_cache(flush_cache=False, prompt_flush=False) mock_query.assert_not_called() mock_flush.assert_not_called() @@ -304,8 +319,7 @@ class TestPromptForFlushDnsCache(Base): @mock.patch("updateHostsFile.flush_dns_cache", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testNoFlushCachePromptNoFlush(self, mock_query, mock_flush): - prompt_for_flush_dns_cache(flush_cache=False, - prompt_flush=True) + prompt_for_flush_dns_cache(flush_cache=False, prompt_flush=True) self.assert_called_once(mock_query) mock_flush.assert_not_called() @@ -313,15 +327,13 @@ class TestPromptForFlushDnsCache(Base): @mock.patch("updateHostsFile.flush_dns_cache", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=True) def testNoFlushCachePromptFlush(self, mock_query, mock_flush): - prompt_for_flush_dns_cache(flush_cache=False, - prompt_flush=True) + prompt_for_flush_dns_cache(flush_cache=False, prompt_flush=True) self.assert_called_once(mock_query) self.assert_called_once(mock_flush) class TestPromptForMove(Base): - def setUp(self): Base.setUp(self) self.final_file = "final.txt" @@ -334,8 +346,9 @@ class TestPromptForMove(Base): def testSkipStaticHosts(self, mock_query, mock_move): for replace in (False, True): for auto in (False, True): - move_file = self.prompt_for_move(replace=replace, auto=auto, - skipstatichosts=True) + move_file = self.prompt_for_move( + replace=replace, auto=auto, skipstatichosts=True + ) self.assertFalse(move_file) mock_query.assert_not_called() @@ -348,8 +361,9 @@ class TestPromptForMove(Base): @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testReplaceNoSkipStaticHosts(self, mock_query, mock_move): for auto in (False, True): - move_file = self.prompt_for_move(replace=True, auto=auto, - skipstatichosts=False) + move_file = self.prompt_for_move( + replace=True, auto=auto, skipstatichosts=False + ) self.assertTrue(move_file) mock_query.assert_not_called() @@ -362,8 +376,9 @@ class TestPromptForMove(Base): @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testAutoNoSkipStaticHosts(self, mock_query, mock_move): for replace in (False, True): - move_file = self.prompt_for_move(replace=replace, auto=True, - skipstatichosts=True) + move_file = self.prompt_for_move( + replace=replace, auto=True, skipstatichosts=True + ) self.assertFalse(move_file) mock_query.assert_not_called() @@ -375,8 +390,9 @@ class TestPromptForMove(Base): @mock.patch("updateHostsFile.move_hosts_file_into_place", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=False) def testPromptNoMove(self, mock_query, mock_move): - move_file = self.prompt_for_move(replace=False, auto=False, - skipstatichosts=False) + move_file = self.prompt_for_move( + replace=False, auto=False, skipstatichosts=False + ) self.assertFalse(move_file) self.assert_called_once(mock_query) @@ -388,8 +404,9 @@ class TestPromptForMove(Base): @mock.patch("updateHostsFile.move_hosts_file_into_place", return_value=0) @mock.patch("updateHostsFile.query_yes_no", return_value=True) def testPromptMove(self, mock_query, mock_move): - move_file = self.prompt_for_move(replace=False, auto=False, - skipstatichosts=False) + move_file = self.prompt_for_move( + replace=False, auto=False, skipstatichosts=False + ) self.assertTrue(move_file) self.assert_called_once(mock_query) @@ -397,12 +414,13 @@ class TestPromptForMove(Base): mock_query.reset_mock() mock_move.reset_mock() + + # End Prompt the User # Exclusion Logic class TestDisplayExclusionsOptions(Base): - @mock.patch("updateHostsFile.query_yes_no", return_value=0) @mock.patch("updateHostsFile.exclude_domain", return_value=None) @mock.patch("updateHostsFile.gather_custom_exclusions", return_value=None) @@ -422,8 +440,7 @@ class TestDisplayExclusionsOptions(Base): mock_gather.assert_not_called() - exclude_calls = [mock.call("foo", "foo", []), - mock.call("bar", "foo", None)] + exclude_calls = [mock.call("foo", "foo", []), mock.call("bar", "foo", None)] mock_exclude.assert_has_calls(exclude_calls) @mock.patch("updateHostsFile.query_yes_no", side_effect=[0, 0, 1]) @@ -465,21 +482,23 @@ class TestGatherCustomExclusions(BaseStdout): def test_multiple(self, *_): gather_custom_exclusions("foo", []) - expected = ("Do you have more domains you want to enter? [Y/n] " - "Do you have more domains you want to enter? [Y/n]") + expected = ( + "Do you have more domains you want to enter? [Y/n] " + "Do you have more domains you want to enter? [Y/n]" + ) output = sys.stdout.getvalue() self.assertIn(expected, output) class TestExcludeDomain(Base): - def test_invalid_exclude_domain(self): exclusion_regexes = [] exclusion_pattern = "*.com" for domain in ["google.com", "hulu.com", "adaway.org"]: - self.assertRaises(re.error, exclude_domain, domain, - exclusion_pattern, exclusion_regexes) + self.assertRaises( + re.error, exclude_domain, domain, exclusion_pattern, exclusion_regexes + ) self.assertListEqual(exclusion_regexes, []) @@ -492,8 +511,9 @@ class TestExcludeDomain(Base): for domain in ["google.com", "hulu.com", "adaway.org"]: self.assertEqual(len(exclusion_regexes), exp_count) - exclusion_regexes = exclude_domain(domain, exclusion_pattern, - exclusion_regexes) + exclusion_regexes = exclude_domain( + domain, exclusion_pattern, exclusion_regexes + ) expected_regex = re.compile(exclusion_pattern + domain) expected_regexes.append(expected_regex) @@ -504,35 +524,47 @@ class TestExcludeDomain(Base): class TestMatchesExclusions(Base): - def test_no_match_empty_list(self): exclusion_regexes = [] - for domain in ["1.2.3.4 localhost", "5.6.7.8 hulu.com", - "9.1.2.3 yahoo.com", "4.5.6.7 cloudfront.net"]: + for domain in [ + "1.2.3.4 localhost", + "5.6.7.8 hulu.com", + "9.1.2.3 yahoo.com", + "4.5.6.7 cloudfront.net", + ]: self.assertFalse(matches_exclusions(domain, exclusion_regexes)) def test_no_match_list(self): exclusion_regexes = [r".*\.org", r".*\.edu"] exclusion_regexes = [re.compile(regex) for regex in exclusion_regexes] - for domain in ["1.2.3.4 localhost", "5.6.7.8 hulu.com", - "9.1.2.3 yahoo.com", "4.5.6.7 cloudfront.net"]: + for domain in [ + "1.2.3.4 localhost", + "5.6.7.8 hulu.com", + "9.1.2.3 yahoo.com", + "4.5.6.7 cloudfront.net", + ]: self.assertFalse(matches_exclusions(domain, exclusion_regexes)) def test_match_list(self): exclusion_regexes = [r".*\.com", r".*\.org", r".*\.edu"] exclusion_regexes = [re.compile(regex) for regex in exclusion_regexes] - for domain in ["5.6.7.8 hulu.com", "9.1.2.3 yahoo.com", - "4.5.6.7 adaway.org", "8.9.1.2 education.edu"]: + for domain in [ + "5.6.7.8 hulu.com", + "9.1.2.3 yahoo.com", + "4.5.6.7 adaway.org", + "8.9.1.2 education.edu", + ]: self.assertTrue(matches_exclusions(domain, exclusion_regexes)) + + # End Exclusion Logic # Update Logic class TestUpdateSourcesData(Base): - def setUp(self): Base.setUp(self) @@ -540,13 +572,16 @@ class TestUpdateSourcesData(Base): self.extensions_path = "extensions" self.source_data_filename = "update.json" - self.update_kwargs = dict(datapath=self.data_path, - extensionspath=self.extensions_path, - sourcedatafilename=self.source_data_filename) + self.update_kwargs = dict( + datapath=self.data_path, + extensionspath=self.extensions_path, + sourcedatafilename=self.source_data_filename, + ) def update_sources_data(self, sources_data, extensions): - return update_sources_data(sources_data[:], extensions=extensions, - **self.update_kwargs) + return update_sources_data( + sources_data[:], extensions=extensions, **self.update_kwargs + ) @mock.patch("updateHostsFile.recursive_glob", return_value=[]) @mock.patch("updateHostsFile.path_join_robust", return_value="dirpath") @@ -564,13 +599,17 @@ class TestUpdateSourcesData(Base): new_sources_data = self.update_sources_data(sources_data, extensions) self.assertEqual(new_sources_data, sources_data) - join_calls = [mock.call(self.extensions_path, ".json"), - mock.call(self.extensions_path, ".txt")] + join_calls = [ + mock.call(self.extensions_path, ".json"), + mock.call(self.extensions_path, ".txt"), + ] mock_join_robust.assert_has_calls(join_calls) mock_open.assert_not_called() - @mock.patch("updateHostsFile.recursive_glob", - side_effect=[[], ["update1.txt", "update2.txt"]]) + @mock.patch( + "updateHostsFile.recursive_glob", + side_effect=[[], ["update1.txt", "update2.txt"]], + ) @mock.patch("json.load", return_value={"mock_source": "mock_source.ext"}) @mock.patch("builtins.open", return_value=mock.Mock()) @mock.patch("updateHostsFile.path_join_robust", return_value="dirpath") @@ -583,13 +622,19 @@ class TestUpdateSourcesData(Base): self.assertEqual(new_sources_data, expected) self.assert_called_once(mock_join_robust) - @mock.patch("updateHostsFile.recursive_glob", - side_effect=[["update1.txt", "update2.txt"], - ["update3.txt", "update4.txt"]]) - @mock.patch("json.load", side_effect=[{"mock_source": "mock_source.txt"}, - {"mock_source": "mock_source2.txt"}, - {"mock_source": "mock_source3.txt"}, - {"mock_source": "mock_source4.txt"}]) + @mock.patch( + "updateHostsFile.recursive_glob", + side_effect=[["update1.txt", "update2.txt"], ["update3.txt", "update4.txt"]], + ) + @mock.patch( + "json.load", + side_effect=[ + {"mock_source": "mock_source.txt"}, + {"mock_source": "mock_source2.txt"}, + {"mock_source": "mock_source3.txt"}, + {"mock_source": "mock_source4.txt"}, + ], + ) @mock.patch("builtins.open", return_value=mock.Mock()) @mock.patch("updateHostsFile.path_join_robust", return_value="dirpath") def test_update_both_pathways(self, mock_join_robust, *_): @@ -597,16 +642,17 @@ class TestUpdateSourcesData(Base): sources_data = [{"source": "source1.txt"}, {"source": "source2.txt"}] new_sources_data = self.update_sources_data(sources_data, extensions) - expected = sources_data + [{"mock_source": "mock_source.txt"}, - {"mock_source": "mock_source2.txt"}, - {"mock_source": "mock_source3.txt"}, - {"mock_source": "mock_source4.txt"}] + expected = sources_data + [ + {"mock_source": "mock_source.txt"}, + {"mock_source": "mock_source2.txt"}, + {"mock_source": "mock_source3.txt"}, + {"mock_source": "mock_source4.txt"}, + ] self.assertEqual(new_sources_data, expected) self.assert_called_once(mock_join_robust) class TestUpdateAllSources(BaseStdout): - def setUp(self): BaseStdout.setUp(self) @@ -638,26 +684,29 @@ class TestUpdateAllSources(BaseStdout): @mock.patch("json.load", return_value={"url": "example.com"}) @mock.patch("updateHostsFile.recursive_glob", return_value=["foo"]) @mock.patch("updateHostsFile.write_data", return_value=0) - @mock.patch("updateHostsFile.get_file_by_url", - return_value=Exception("fail")) + @mock.patch("updateHostsFile.get_file_by_url", return_value=Exception("fail")) def test_source_fail(self, mock_get, mock_write, *_): update_all_sources(self.source_data_filename, self.host_filename) mock_write.assert_not_called() self.assert_called_once(mock_get) output = sys.stdout.getvalue() - expecteds = ["Updating source from example.com", - "Error in updating source: example.com"] + expecteds = [ + "Updating source from example.com", + "Error in updating source: example.com", + ] for expected in expecteds: self.assertIn(expected, output) @mock.patch("builtins.open", return_value=mock.Mock()) - @mock.patch("json.load", side_effect=[{"url": "example.com"}, - {"url": "example2.com"}]) + @mock.patch( + "json.load", side_effect=[{"url": "example.com"}, {"url": "example2.com"}] + ) @mock.patch("updateHostsFile.recursive_glob", return_value=["foo", "bar"]) @mock.patch("updateHostsFile.write_data", return_value=0) - @mock.patch("updateHostsFile.get_file_by_url", - side_effect=[Exception("fail"), "file_data"]) + @mock.patch( + "updateHostsFile.get_file_by_url", side_effect=[Exception("fail"), "file_data"] + ) def test_sources_fail_succeed(self, mock_get, mock_write, *_): update_all_sources(self.source_data_filename, self.host_filename) self.assert_called_once(mock_write) @@ -666,22 +715,31 @@ class TestUpdateAllSources(BaseStdout): mock_get.assert_has_calls(get_calls) output = sys.stdout.getvalue() - expecteds = ["Updating source from example.com", - "Error in updating source: example.com", - "Updating source from example2.com"] + expecteds = [ + "Updating source from example.com", + "Error in updating source: example.com", + "Updating source from example2.com", + ] for expected in expecteds: self.assertIn(expected, output) + + # End Update Logic # File Logic class TestNormalizeRule(BaseStdout): - def test_no_match(self): kwargs = dict(target_ip="0.0.0.0", keep_domain_comments=False) - for rule in ["foo", "128.0.0.1", "bar.com/usa", "0.0.0 google", - "0.1.2.3.4 foo/bar", "twitter.com"]: + for rule in [ + "foo", + "128.0.0.1", + "bar.com/usa", + "0.0.0 google", + "0.1.2.3.4 foo/bar", + "twitter.com", + ]: self.assertEqual(normalize_rule(rule, **kwargs), (None, None)) output = sys.stdout.getvalue() @@ -695,8 +753,9 @@ class TestNormalizeRule(BaseStdout): rule = "127.0.0.1 1.google.com foo" expected = ("1.google.com", str(target_ip) + " 1.google.com\n") - actual = normalize_rule(rule, target_ip=target_ip, - keep_domain_comments=False) + actual = normalize_rule( + rule, target_ip=target_ip, keep_domain_comments=False + ) self.assertEqual(actual, expected) # Nothing gets printed if there's a match. @@ -709,12 +768,14 @@ class TestNormalizeRule(BaseStdout): for target_ip in ("0.0.0.0", "127.0.0.1", "8.8.8.8"): for comment in ("foo", "bar", "baz"): rule = "127.0.0.1 1.google.co.uk " + comment - expected = ("1.google.co.uk", - (str(target_ip) + " 1.google.co.uk # " + - comment + "\n")) + expected = ( + "1.google.co.uk", + (str(target_ip) + " 1.google.co.uk # " + comment + "\n"), + ) - actual = normalize_rule(rule, target_ip=target_ip, - keep_domain_comments=True) + actual = normalize_rule( + rule, target_ip=target_ip, keep_domain_comments=True + ) self.assertEqual(actual, expected) # Nothing gets printed if there's a match. @@ -728,8 +789,9 @@ class TestNormalizeRule(BaseStdout): rule = "127.0.0.1 11.22.33.44 foo" expected = ("11.22.33.44", str(target_ip) + " 11.22.33.44\n") - actual = normalize_rule(rule, target_ip=target_ip, - keep_domain_comments=False) + actual = normalize_rule( + rule, target_ip=target_ip, keep_domain_comments=False + ) self.assertEqual(actual, expected) # Nothing gets printed if there's a match. @@ -740,47 +802,56 @@ class TestNormalizeRule(BaseStdout): class TestStripRule(Base): - def test_strip_empty(self): for line in ["0.0.0.0", "domain.com", "foo"]: output = strip_rule(line) self.assertEqual(output, "") def test_strip_exactly_two(self): - for line in ["0.0.0.0 twitter.com", "127.0.0.1 facebook.com", - "8.8.8.8 google.com", "1.2.3.4 foo.bar.edu"]: + for line in [ + "0.0.0.0 twitter.com", + "127.0.0.1 facebook.com", + "8.8.8.8 google.com", + "1.2.3.4 foo.bar.edu", + ]: output = strip_rule(line) self.assertEqual(output, line) def test_strip_more_than_two(self): comment = " # comments here galore" - for line in ["0.0.0.0 twitter.com", "127.0.0.1 facebook.com", - "8.8.8.8 google.com", "1.2.3.4 foo.bar.edu"]: + for line in [ + "0.0.0.0 twitter.com", + "127.0.0.1 facebook.com", + "8.8.8.8 google.com", + "1.2.3.4 foo.bar.edu", + ]: output = strip_rule(line + comment) self.assertEqual(output, line + comment) class TestWriteOpeningHeader(BaseMockDir): - def setUp(self): super(TestWriteOpeningHeader, self).setUp() self.final_file = BytesIO() def test_missing_keyword(self): - kwargs = dict(extensions="", outputsubfolder="", - numberofrules=5, skipstatichosts=False) + kwargs = dict( + extensions="", outputsubfolder="", numberofrules=5, skipstatichosts=False + ) for k in kwargs.keys(): bad_kwargs = kwargs.copy() bad_kwargs.pop(k) - self.assertRaises(KeyError, write_opening_header, - self.final_file, **bad_kwargs) + self.assertRaises( + KeyError, write_opening_header, self.final_file, **bad_kwargs + ) def test_basic(self): - kwargs = dict(extensions="", outputsubfolder="", - numberofrules=5, skipstatichosts=True) + kwargs = dict( + extensions="", outputsubfolder="", numberofrules=5, skipstatichosts=True + ) write_opening_header(self.final_file, **kwargs) contents = self.final_file.getvalue() @@ -790,8 +861,7 @@ class TestWriteOpeningHeader(BaseMockDir): for expected in ( "# This hosts file is a merged collection", "# with a dash of crowd sourcing via Github", - "# Number of unique domains: {count}".format( - count=kwargs["numberofrules"]), + "# Number of unique domains: {count}".format(count=kwargs["numberofrules"]), "Fetch the latest version of this file:", "Project home page: https://github.com/StevenBlack/hosts", ): @@ -808,8 +878,9 @@ class TestWriteOpeningHeader(BaseMockDir): self.assertNotIn(expected, contents) def test_basic_include_static_hosts(self): - kwargs = dict(extensions="", outputsubfolder="", - numberofrules=5, skipstatichosts=False) + kwargs = dict( + extensions="", outputsubfolder="", numberofrules=5, skipstatichosts=False + ) with self.mock_property("platform.system") as obj: obj.return_value = "Windows" write_opening_header(self.final_file, **kwargs) @@ -823,24 +894,20 @@ class TestWriteOpeningHeader(BaseMockDir): "127.0.0.1 localhost", "# This hosts file is a merged collection", "# with a dash of crowd sourcing via Github", - "# Number of unique domains: {count}".format( - count=kwargs["numberofrules"]), + "# Number of unique domains: {count}".format(count=kwargs["numberofrules"]), "Fetch the latest version of this file:", "Project home page: https://github.com/StevenBlack/hosts", ): self.assertIn(expected, contents) # Expected non-contents. - for expected in ( - "# Extensions added to this file:", - "127.0.0.53", - "127.0.1.1", - ): + for expected in ("# Extensions added to this file:", "127.0.0.53", "127.0.1.1"): self.assertNotIn(expected, contents) def test_basic_include_static_hosts_linux(self): - kwargs = dict(extensions="", outputsubfolder="", - numberofrules=5, skipstatichosts=False) + kwargs = dict( + extensions="", outputsubfolder="", numberofrules=5, skipstatichosts=False + ) with self.mock_property("platform.system") as system: system.return_value = "Linux" @@ -860,8 +927,7 @@ class TestWriteOpeningHeader(BaseMockDir): "127.0.0.1 localhost", "# This hosts file is a merged collection", "# with a dash of crowd sourcing via Github", - "# Number of unique domains: {count}".format( - count=kwargs["numberofrules"]), + "# Number of unique domains: {count}".format(count=kwargs["numberofrules"]), "Fetch the latest version of this file:", "Project home page: https://github.com/StevenBlack/hosts", ): @@ -872,9 +938,12 @@ class TestWriteOpeningHeader(BaseMockDir): self.assertNotIn(expected, contents) def test_extensions(self): - kwargs = dict(extensions=["epsilon", "gamma", "mu", "phi"], - outputsubfolder="", numberofrules=5, - skipstatichosts=True) + kwargs = dict( + extensions=["epsilon", "gamma", "mu", "phi"], + outputsubfolder="", + numberofrules=5, + skipstatichosts=True, + ) write_opening_header(self.final_file, **kwargs) contents = self.final_file.getvalue() @@ -886,8 +955,7 @@ class TestWriteOpeningHeader(BaseMockDir): "# Extensions added to this file:", "# This hosts file is a merged collection", "# with a dash of crowd sourcing via Github", - "# Number of unique domains: {count}".format( - count=kwargs["numberofrules"]), + "# Number of unique domains: {count}".format(count=kwargs["numberofrules"]), "Fetch the latest version of this file:", "Project home page: https://github.com/StevenBlack/hosts", ): @@ -909,8 +977,9 @@ class TestWriteOpeningHeader(BaseMockDir): with open(hosts_file, "w") as f: f.write("peter-piper-picked-a-pepper") - kwargs = dict(extensions="", outputsubfolder="", - numberofrules=5, skipstatichosts=True) + kwargs = dict( + extensions="", outputsubfolder="", numberofrules=5, skipstatichosts=True + ) with self.mock_property("updateHostsFile.BASEDIR_PATH"): updateHostsFile.BASEDIR_PATH = self.test_dir @@ -924,8 +993,7 @@ class TestWriteOpeningHeader(BaseMockDir): "peter-piper-picked-a-pepper", "# This hosts file is a merged collection", "# with a dash of crowd sourcing via Github", - "# Number of unique domains: {count}".format( - count=kwargs["numberofrules"]), + "# Number of unique domains: {count}".format(count=kwargs["numberofrules"]), "Fetch the latest version of this file:", "Project home page: https://github.com/StevenBlack/hosts", ): @@ -953,28 +1021,30 @@ class TestWriteOpeningHeader(BaseMockDir): class TestUpdateReadmeData(BaseMockDir): - def setUp(self): super(TestUpdateReadmeData, self).setUp() self.readme_file = os.path.join(self.test_dir, "readmeData.json") def test_missing_keyword(self): - kwargs = dict(extensions="", outputsubfolder="", - numberofrules="", sourcesdata="") + kwargs = dict( + extensions="", outputsubfolder="", numberofrules="", sourcesdata="" + ) for k in kwargs.keys(): bad_kwargs = kwargs.copy() bad_kwargs.pop(k) - self.assertRaises(KeyError, update_readme_data, - self.readme_file, **bad_kwargs) + self.assertRaises( + KeyError, update_readme_data, self.readme_file, **bad_kwargs + ) def test_add_fields(self): with open(self.readme_file, "w") as f: json.dump({"foo": "bar"}, f) - kwargs = dict(extensions=None, outputsubfolder="foo", - numberofrules=5, sourcesdata="hosts") + kwargs = dict( + extensions=None, outputsubfolder="foo", numberofrules=5, sourcesdata="hosts" + ) update_readme_data(self.readme_file, **kwargs) expected = { @@ -983,7 +1053,7 @@ class TestUpdateReadmeData(BaseMockDir): "sourcesdata": "hosts", "entries": 5, }, - "foo": "bar" + "foo": "bar", } with open(self.readme_file, "r") as f: @@ -994,16 +1064,13 @@ class TestUpdateReadmeData(BaseMockDir): with open(self.readme_file, "w") as f: json.dump({"base": "soprano"}, f) - kwargs = dict(extensions=None, outputsubfolder="foo", - numberofrules=5, sourcesdata="hosts") + kwargs = dict( + extensions=None, outputsubfolder="foo", numberofrules=5, sourcesdata="hosts" + ) update_readme_data(self.readme_file, **kwargs) expected = { - "base": { - "location": "foo" + self.sep, - "sourcesdata": "hosts", - "entries": 5, - } + "base": {"location": "foo" + self.sep, "sourcesdata": "hosts", "entries": 5} } with open(self.readme_file, "r") as f: @@ -1014,8 +1081,12 @@ class TestUpdateReadmeData(BaseMockDir): with open(self.readme_file, "w") as f: json.dump({}, f) - kwargs = dict(extensions=["com", "org"], outputsubfolder="foo", - numberofrules=5, sourcesdata="hosts") + kwargs = dict( + extensions=["com", "org"], + outputsubfolder="foo", + numberofrules=5, + sourcesdata="hosts", + ) update_readme_data(self.readme_file, **kwargs) expected = { @@ -1032,7 +1103,6 @@ class TestUpdateReadmeData(BaseMockDir): class TestMoveHostsFile(BaseStdout): - @mock.patch("os.path.abspath", side_effect=lambda f: f) def test_move_hosts_no_name(self, _): with self.mock_property("os.name"): @@ -1054,10 +1124,12 @@ class TestMoveHostsFile(BaseStdout): mock_file = mock.Mock(name="foo") move_hosts_file_into_place(mock_file) - expected = ("Automatically moving the hosts " - "file in place is not yet supported.\n" - "Please move the generated file to " - r"%SystemRoot%\system32\drivers\etc\hosts") + expected = ( + "Automatically moving the hosts " + "file in place is not yet supported.\n" + "Please move the generated file to " + r"%SystemRoot%\system32\drivers\etc\hosts" + ) output = sys.stdout.getvalue() self.assertIn(expected, output) @@ -1070,8 +1142,10 @@ class TestMoveHostsFile(BaseStdout): mock_file = mock.Mock(name="foo") move_hosts_file_into_place(mock_file) - expected = ("Moving the file requires administrative " - "privileges. You might need to enter your password.") + expected = ( + "Moving the file requires administrative " + "privileges. You might need to enter your password." + ) output = sys.stdout.getvalue() self.assertIn(expected, output) @@ -1090,17 +1164,18 @@ class TestMoveHostsFile(BaseStdout): class TestFlushDnsCache(BaseStdout): - @mock.patch("subprocess.call", return_value=0) def test_flush_darwin(self, _): with self.mock_property("platform.system") as obj: obj.return_value = "Darwin" flush_dns_cache() - expected = ("Flushing the DNS cache to utilize new hosts " - "file...\nFlushing the DNS cache requires " - "administrative privileges. You might need to " - "enter your password.") + expected = ( + "Flushing the DNS cache to utilize new hosts " + "file...\nFlushing the DNS cache requires " + "administrative privileges. You might need to " + "enter your password." + ) output = sys.stdout.getvalue() self.assertIn(expected, output) @@ -1122,11 +1197,13 @@ class TestFlushDnsCache(BaseStdout): os.name = "nt" flush_dns_cache() - expected = ("Automatically flushing the DNS cache is " - "not yet supported.\nPlease copy and paste " - "the command 'ipconfig /flushdns' in " - "administrator command prompt after running " - "this script.") + expected = ( + "Automatically flushing the DNS cache is " + "not yet supported.\nPlease copy and paste " + "the command 'ipconfig /flushdns' in " + "administrator command prompt after running " + "this script." + ) output = sys.stdout.getvalue() self.assertIn(expected, output) @@ -1153,8 +1230,7 @@ class TestFlushDnsCache(BaseStdout): os.name = "posix" flush_dns_cache() - expected = ("Flushing the DNS cache by " - "restarting nscd succeeded") + expected = "Flushing the DNS cache by " "restarting nscd succeeded" output = sys.stdout.getvalue() self.assertIn(expected, output) @@ -1168,13 +1244,11 @@ class TestFlushDnsCache(BaseStdout): os.name = "posix" flush_dns_cache() - expected = ("Flushing the DNS cache by " - "restarting nscd failed") + expected = "Flushing the DNS cache by " "restarting nscd failed" output = sys.stdout.getvalue() self.assertIn(expected, output) - @mock.patch("os.path.isfile", side_effect=[True, False, False, - True] + [False] * 10) + @mock.patch("os.path.isfile", side_effect=[True, False, False, True] + [False] * 10) @mock.patch("subprocess.call", side_effect=[1, 0]) def test_flush_posix_fail_then_succeed(self, *_): with self.mock_property("platform.system") as obj: @@ -1185,10 +1259,13 @@ class TestFlushDnsCache(BaseStdout): flush_dns_cache() output = sys.stdout.getvalue() - for expected in [("Flushing the DNS cache by " - "restarting nscd failed"), - ("Flushing the DNS cache by restarting " - "NetworkManager.service succeeded")]: + for expected in [ + ("Flushing the DNS cache by " "restarting nscd failed"), + ( + "Flushing the DNS cache by restarting " + "NetworkManager.service succeeded" + ), + ]: self.assertIn(expected, output) @@ -1202,7 +1279,6 @@ def mock_path_join_robust(*args): class TestRemoveOldHostsFile(BaseMockDir): - def setUp(self): super(TestRemoveOldHostsFile, self).setUp() self.hosts_file = os.path.join(self.test_dir, "hosts") @@ -1238,8 +1314,7 @@ class TestRemoveOldHostsFile(BaseMockDir): contents = f.read() self.assertEqual(contents, "") - @mock.patch("updateHostsFile.path_join_robust", - side_effect=mock_path_join_robust) + @mock.patch("updateHostsFile.path_join_robust", side_effect=mock_path_join_robust) def test_remove_hosts_file_backup(self, _): with open(self.hosts_file, "w") as f: f.write("foo") @@ -1262,6 +1337,8 @@ class TestRemoveOldHostsFile(BaseMockDir): with open(new_hosts_file, "r") as f: contents = f.read() self.assertEqual(contents, "foo") + + # End File Logic @@ -1337,12 +1414,11 @@ def mock_url_open_decode_fail(_): class DomainToIDNA(Base): - def __init__(self, *args, **kwargs): super(DomainToIDNA, self).__init__(*args, **kwargs) - self.domains = [b'\xc9\xa2oogle.com', b'www.huala\xc3\xb1e.cl'] - self.expected_domains = ['xn--oogle-wmc.com', 'www.xn--hualae-0wa.cl'] + self.domains = [b"\xc9\xa2oogle.com", b"www.huala\xc3\xb1e.cl"] + self.expected_domains = ["xn--oogle-wmc.com", "www.xn--hualae-0wa.cl"] def test_empty_line(self): data = ["", "\r", "\n"] @@ -1363,7 +1439,7 @@ class DomainToIDNA(Base): def test_simple_line(self): # Test with a space as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0 " + self.domains[i]).decode('utf-8') + data = (b"0.0.0.0 " + self.domains[i]).decode("utf-8") expected = "0.0.0.0 " + self.expected_domains[i] actual = domain_to_idna(data) @@ -1372,7 +1448,7 @@ class DomainToIDNA(Base): # Test with a tabulation as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0\t" + self.domains[i]).decode('utf-8') + data = (b"0.0.0.0\t" + self.domains[i]).decode("utf-8") expected = "0.0.0.0\t" + self.expected_domains[i] actual = domain_to_idna(data) @@ -1382,7 +1458,7 @@ class DomainToIDNA(Base): def test_multiple_space_as_separator(self): # Test with multiple space as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0 " + self.domains[i]).decode('utf-8') + data = (b"0.0.0.0 " + self.domains[i]).decode("utf-8") expected = "0.0.0.0 " + self.expected_domains[i] actual = domain_to_idna(data) @@ -1392,7 +1468,7 @@ class DomainToIDNA(Base): def test_multiple_tabs_as_separator(self): # Test with multiple tabls as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0\t\t\t\t\t\t" + self.domains[i]).decode('utf-8') + data = (b"0.0.0.0\t\t\t\t\t\t" + self.domains[i]).decode("utf-8") expected = "0.0.0.0\t\t\t\t\t\t" + self.expected_domains[i] actual = domain_to_idna(data) @@ -1402,8 +1478,7 @@ class DomainToIDNA(Base): def test_line_with_comment_at_the_end(self): # Test with a space as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0 " + self.domains[i] + b" # Hello World") \ - .decode('utf-8') + data = (b"0.0.0.0 " + self.domains[i] + b" # Hello World").decode("utf-8") expected = "0.0.0.0 " + self.expected_domains[i] + " # Hello World" actual = domain_to_idna(data) @@ -1412,10 +1487,8 @@ class DomainToIDNA(Base): # Test with a tabulation as separator. for i in range(len(self.domains)): - data = (b"0.0.0.0\t" + self.domains[i] + b" # Hello World") \ - .decode('utf-8') - expected = "0.0.0.0\t" + self.expected_domains[i] + \ - " # Hello World" + data = (b"0.0.0.0\t" + self.domains[i] + b" # Hello World").decode("utf-8") + expected = "0.0.0.0\t" + self.expected_domains[i] + " # Hello World" actual = domain_to_idna(data) @@ -1423,10 +1496,10 @@ class DomainToIDNA(Base): # Test with tabulation as separator of domain and comment. for i in range(len(self.domains)): - data = (b"0.0.0.0\t" + self.domains[i] + b"\t # Hello World") \ - .decode('utf-8') - expected = "0.0.0.0\t" + self.expected_domains[i] + \ - "\t # Hello World" + data = (b"0.0.0.0\t" + self.domains[i] + b"\t # Hello World").decode( + "utf-8" + ) + expected = "0.0.0.0\t" + self.expected_domains[i] + "\t # Hello World" actual = domain_to_idna(data) @@ -1435,10 +1508,10 @@ class DomainToIDNA(Base): # Test with space as separator of domain and tabulation as separator # of comments. for i in range(len(self.domains)): - data = (b"0.0.0.0 " + self.domains[i] + b" \t # Hello World") \ - .decode('utf-8') - expected = "0.0.0.0 " + self.expected_domains[i] + \ - " \t # Hello World" + data = (b"0.0.0.0 " + self.domains[i] + b" \t # Hello World").decode( + "utf-8" + ) + expected = "0.0.0.0 " + self.expected_domains[i] + " \t # Hello World" actual = domain_to_idna(data) @@ -1447,10 +1520,10 @@ class DomainToIDNA(Base): # Test with multiple space as seprator of domain and space and # tabulation as separator or comments. for i in range(len(self.domains)): - data = (b"0.0.0.0 " + self.domains[i] + b" \t # Hello World") \ - .decode('utf-8') - expected = "0.0.0.0 " + self.expected_domains[i] + \ - " \t # Hello World" + data = (b"0.0.0.0 " + self.domains[i] + b" \t # Hello World").decode( + "utf-8" + ) + expected = "0.0.0.0 " + self.expected_domains[i] + " \t # Hello World" actual = domain_to_idna(data) @@ -1459,12 +1532,10 @@ class DomainToIDNA(Base): # Test with multiple tabulations as seprator of domain and space and # tabulation as separator or comments. for i in range(len(self.domains)): - data = (b"0.0.0.0\t\t\t" + - self.domains[i] + - b" \t # Hello World") \ - .decode('utf-8') - expected = "0.0.0.0\t\t\t" + self.expected_domains[i] + \ - " \t # Hello World" + data = (b"0.0.0.0\t\t\t" + self.domains[i] + b" \t # Hello World").decode( + "utf-8" + ) + expected = "0.0.0.0\t\t\t" + self.expected_domains[i] + " \t # Hello World" actual = domain_to_idna(data) @@ -1472,7 +1543,7 @@ class DomainToIDNA(Base): def test_line_without_prefix(self): for i in range(len(self.domains)): - data = self.domains[i].decode('utf-8') + data = self.domains[i].decode("utf-8") expected = self.expected_domains[i] actual = domain_to_idna(data) @@ -1481,9 +1552,7 @@ class DomainToIDNA(Base): class GetFileByUrl(BaseStdout): - - @mock.patch("updateHostsFile.urlopen", - side_effect=mock_url_open) + @mock.patch("updateHostsFile.urlopen", side_effect=mock_url_open) def test_read_url(self, _): url = b"www.google.com" @@ -1492,8 +1561,7 @@ class GetFileByUrl(BaseStdout): self.assertEqual(actual, expected) - @mock.patch("updateHostsFile.urlopen", - side_effect=mock_url_open_fail) + @mock.patch("updateHostsFile.urlopen", side_effect=mock_url_open_fail) def test_read_url_fail(self, _): url = b"www.google.com" self.assertIsNone(get_file_by_url(url)) @@ -1503,8 +1571,7 @@ class GetFileByUrl(BaseStdout): self.assertIn(expected, output) - @mock.patch("updateHostsFile.urlopen", - side_effect=mock_url_open_read_fail) + @mock.patch("updateHostsFile.urlopen", side_effect=mock_url_open_read_fail) def test_read_url_read_fail(self, _): url = b"www.google.com" self.assertIsNone(get_file_by_url(url)) @@ -1514,8 +1581,7 @@ class GetFileByUrl(BaseStdout): self.assertIn(expected, output) - @mock.patch("updateHostsFile.urlopen", - side_effect=mock_url_open_decode_fail) + @mock.patch("updateHostsFile.urlopen", side_effect=mock_url_open_decode_fail) def test_read_url_decode_fail(self, _): url = b"www.google.com" self.assertIsNone(get_file_by_url(url)) @@ -1527,7 +1593,6 @@ class GetFileByUrl(BaseStdout): class TestWriteData(Base): - def test_write_basic(self): f = BytesIO() @@ -1552,15 +1617,17 @@ class TestWriteData(Base): class TestQueryYesOrNo(BaseStdout): - def test_invalid_default(self): for invalid_default in ["foo", "bar", "baz", 1, 2, 3]: self.assertRaises(ValueError, query_yes_no, "?", invalid_default) @mock.patch("updateHostsFile.input", side_effect=["yes"] * 3) def test_valid_default(self, _): - for valid_default, expected in [(None, "[y/n]"), ("yes", "[Y/n]"), - ("no", "[y/N]")]: + for valid_default, expected in [ + (None, "[y/n]"), + ("yes", "[Y/n]"), + ("no", "[y/N]"), + ]: self.assertTrue(query_yes_no("?", valid_default)) output = sys.stdout.getvalue() @@ -1571,7 +1638,7 @@ class TestQueryYesOrNo(BaseStdout): @mock.patch("updateHostsFile.input", side_effect=([""] * 2)) def test_use_valid_default(self, _): for valid_default in ["yes", "no"]: - expected = (valid_default == "yes") + expected = valid_default == "yes" actual = query_yes_no("?", valid_default) self.assertEqual(actual, expected) @@ -1580,7 +1647,10 @@ class TestQueryYesOrNo(BaseStdout): def test_valid_no(self, _): self.assertFalse(query_yes_no("?", None)) - @mock.patch("updateHostsFile.input", side_effect=["yes", "YES", "Y", "yeS", "y", "YeS", "yES", "YEs"]) + @mock.patch( + "updateHostsFile.input", + side_effect=["yes", "YES", "Y", "yeS", "y", "YeS", "yES", "YEs"], + ) def test_valid_yes(self, _): self.assertTrue(query_yes_no("?", None)) @@ -1604,7 +1674,6 @@ class TestQueryYesOrNo(BaseStdout): class TestIsValidDomainFormat(BaseStdout): - def test_empty_domain(self): self.assertFalse(is_valid_domain_format("")) @@ -1614,11 +1683,15 @@ class TestIsValidDomainFormat(BaseStdout): self.assertTrue(expected in output) def test_invalid_domain(self): - expected = ("Do not include www.domain.com or " - "http(s)://domain.com. Try again.") + expected = ( + "Do not include www.domain.com or " "http(s)://domain.com. Try again." + ) - for invalid_domain in ["www.subdomain.domain", "https://github.com", - "http://www.google.com"]: + for invalid_domain in [ + "www.subdomain.domain", + "https://github.com", + "http://www.google.com", + ]: self.assertFalse(is_valid_domain_format(invalid_domain)) output = sys.stdout.getvalue() @@ -1644,8 +1717,16 @@ def mock_walk(stem): the provided parameters. """ - files = ["foo.txt", "bar.bat", "baz.py", "foo/foo.c", "foo/bar.doc", - "foo/baz/foo.py", "bar/foo/baz.c", "bar/bar/foo.bat"] + files = [ + "foo.txt", + "bar.bat", + "baz.py", + "foo/foo.c", + "foo/bar.doc", + "foo/baz/foo.py", + "bar/foo/baz.c", + "bar/bar/foo.bat", + ] if stem == ".": stem = "" @@ -1660,7 +1741,6 @@ def mock_walk(stem): class TestRecursiveGlob(Base): - @staticmethod def sorted_recursive_glob(stem, file_pattern): actual = recursive_glob(stem, file_pattern) @@ -1673,10 +1753,16 @@ class TestRecursiveGlob(Base): with self.mock_property("sys.version_info"): sys.version_info = (2, 6) - expected = ["bar.bat", "bar/bar/foo.bat", - "bar/foo/baz.c", "baz.py", - "foo.txt", "foo/bar.doc", - "foo/baz/foo.py", "foo/foo.c"] + expected = [ + "bar.bat", + "bar/bar/foo.bat", + "bar/foo/baz.c", + "baz.py", + "foo.txt", + "foo/bar.doc", + "foo/baz/foo.py", + "foo/foo.c", + ] actual = self.sorted_recursive_glob("*", "*") self.assertListEqual(actual, expected) @@ -1718,7 +1804,6 @@ def mock_path_join(*_): class TestPathJoinRobust(Base): - def test_basic(self): expected = "path1" actual = path_join_robust("path1") @@ -1750,7 +1835,6 @@ class TestPathJoinRobust(Base): # Colors class TestSupportsColor(BaseStdout): - def test_posix(self): with self.mock_property("sys.platform"): sys.platform = "Linux" @@ -1802,11 +1886,9 @@ class TestSupportsColor(BaseStdout): class TestColorize(Base): - def setUp(self): self.text = "house" - self.colors = ["red", "orange", "yellow", - "green", "blue", "purple"] + self.colors = ["red", "orange", "yellow", "green", "blue", "purple"] @mock.patch("updateHostsFile.supports_color", return_value=False) def test_colorize_no_support(self, _): @@ -1826,7 +1908,6 @@ class TestColorize(Base): class TestPrintSuccess(BaseStdout): - def setUp(self): super(TestPrintSuccess, self).setUp() self.text = "house" @@ -1851,7 +1932,6 @@ class TestPrintSuccess(BaseStdout): class TestPrintFailure(BaseStdout): - def setUp(self): super(TestPrintFailure, self).setUp() self.text = "house" @@ -1873,6 +1953,8 @@ class TestPrintFailure(BaseStdout): actual = sys.stdout.getvalue() self.assertEqual(actual, expected) + + # End Helper Functions diff --git a/updateHostsFile.py b/updateHostsFile.py index 06fadb192..dbb244817 100644 --- a/updateHostsFile.py +++ b/updateHostsFile.py @@ -30,7 +30,7 @@ PY3 = sys.version_info >= (3, 0) if PY3: from urllib.request import urlopen else: - raise Exception('We do not support Python 2 anymore.') + raise Exception("We do not support Python 2 anymore.") # Syntactic sugar for "sudo" command in UNIX / Linux if platform.system() == "OpenBSD": @@ -79,58 +79,120 @@ def get_defaults(): "exclusions": [], "commonexclusions": ["hulu.com"], "blacklistfile": path_join_robust(BASEDIR_PATH, "blacklist"), - "whitelistfile": path_join_robust(BASEDIR_PATH, "whitelist")} + "whitelistfile": path_join_robust(BASEDIR_PATH, "whitelist"), + } + + # End Project Settings def main(): - parser = argparse.ArgumentParser(description="Creates a unified hosts " - "file from hosts stored in " - "data subfolders.") - parser.add_argument("--auto", "-a", dest="auto", default=False, - action="store_true", help="Run without prompting.") - parser.add_argument("--backup", "-b", dest="backup", default=False, - action="store_true", help="Backup the hosts " - "files before they " - "are overridden.") - parser.add_argument("--extensions", "-e", dest="extensions", default=[], - nargs="*", help="Host extensions to include " - "in the final hosts file.") - parser.add_argument("--ip", "-i", dest="targetip", default="0.0.0.0", - help="Target IP address. Default is 0.0.0.0.") - parser.add_argument("--keepdomaincomments", "-k", - dest="keepdomaincomments", action="store_false", default=True, - help="Do not keep domain line comments.") - parser.add_argument("--noupdate", "-n", dest="noupdate", default=False, - action="store_true", help="Don't update from " - "host data sources.") - parser.add_argument("--skipstatichosts", "-s", dest="skipstatichosts", - default=False, action="store_true", - help="Skip static localhost entries " - "in the final hosts file.") - parser.add_argument("--output", "-o", dest="outputsubfolder", default="", - help="Output subfolder for generated hosts file.") - parser.add_argument("--replace", "-r", dest="replace", default=False, - action="store_true", help="Replace your active " - "hosts file with this " - "new hosts file.") - parser.add_argument("--flush-dns-cache", "-f", dest="flushdnscache", - default=False, action="store_true", - help="Attempt to flush DNS cache " - "after replacing the hosts file.") - parser.add_argument("--compress", "-c", dest="compress", - default=False, action="store_true", - help="Compress the hosts file " - "ignoring non-necessary lines " - "(empty lines and comments) and " - "putting multiple domains in " - "each line. Improve the " - "performances under Windows.") - parser.add_argument("--minimise", "-m", dest="minimise", - default=False, action="store_true", - help="Minimise the hosts file " - "ignoring non-necessary lines " - "(empty lines and comments).") + parser = argparse.ArgumentParser( + description="Creates a unified hosts " + "file from hosts stored in " + "data subfolders." + ) + parser.add_argument( + "--auto", + "-a", + dest="auto", + default=False, + action="store_true", + help="Run without prompting.", + ) + parser.add_argument( + "--backup", + "-b", + dest="backup", + default=False, + action="store_true", + help="Backup the hosts " "files before they " "are overridden.", + ) + parser.add_argument( + "--extensions", + "-e", + dest="extensions", + default=[], + nargs="*", + help="Host extensions to include " "in the final hosts file.", + ) + parser.add_argument( + "--ip", + "-i", + dest="targetip", + default="0.0.0.0", + help="Target IP address. Default is 0.0.0.0.", + ) + parser.add_argument( + "--keepdomaincomments", + "-k", + dest="keepdomaincomments", + action="store_false", + default=True, + help="Do not keep domain line comments.", + ) + parser.add_argument( + "--noupdate", + "-n", + dest="noupdate", + default=False, + action="store_true", + help="Don't update from " "host data sources.", + ) + parser.add_argument( + "--skipstatichosts", + "-s", + dest="skipstatichosts", + default=False, + action="store_true", + help="Skip static localhost entries " "in the final hosts file.", + ) + parser.add_argument( + "--output", + "-o", + dest="outputsubfolder", + default="", + help="Output subfolder for generated hosts file.", + ) + parser.add_argument( + "--replace", + "-r", + dest="replace", + default=False, + action="store_true", + help="Replace your active " "hosts file with this " "new hosts file.", + ) + parser.add_argument( + "--flush-dns-cache", + "-f", + dest="flushdnscache", + default=False, + action="store_true", + help="Attempt to flush DNS cache " "after replacing the hosts file.", + ) + parser.add_argument( + "--compress", + "-c", + dest="compress", + default=False, + action="store_true", + help="Compress the hosts file " + "ignoring non-necessary lines " + "(empty lines and comments) and " + "putting multiple domains in " + "each line. Improve the " + "performances under Windows.", + ) + parser.add_argument( + "--minimise", + "-m", + dest="minimise", + default=False, + action="store_true", + help="Minimise the hosts file " + "ignoring non-necessary lines " + "(empty lines and comments).", + ) global settings @@ -149,17 +211,19 @@ def main(): settings["extensionsources"] = list_dir_no_hidden(extensions_path) # All our extensions folders... - settings["extensions"] = [os.path.basename(item) for item in list_dir_no_hidden(extensions_path)] + settings["extensions"] = [ + os.path.basename(item) for item in list_dir_no_hidden(extensions_path) + ] # ... intersected with the extensions passed-in as arguments, then sorted. - settings["extensions"] = sorted(list( - set(options["extensions"]).intersection(settings["extensions"]))) + settings["extensions"] = sorted( + list(set(options["extensions"]).intersection(settings["extensions"])) + ) auto = settings["auto"] exclusion_regexes = settings["exclusionregexs"] source_data_filename = settings["sourcedatafilename"] - update_sources = prompt_for_update(freshen=settings["freshen"], - update_auto=auto) + update_sources = prompt_for_update(freshen=settings["freshen"], update_auto=auto) if update_sources: update_all_sources(source_data_filename, settings["hostfilename"]) @@ -171,14 +235,17 @@ def main(): exclusion_regexes = display_exclusion_options( common_exclusions=common_exclusions, exclusion_pattern=exclusion_pattern, - exclusion_regexes=exclusion_regexes) + exclusion_regexes=exclusion_regexes, + ) extensions = settings["extensions"] - sources_data = update_sources_data(settings["sourcesdata"], - datapath=data_path, - extensions=extensions, - extensionspath=extensions_path, - sourcedatafilename=source_data_filename) + sources_data = update_sources_data( + settings["sourcesdata"], + datapath=data_path, + extensions=extensions, + extensionspath=extensions_path, + sourcedatafilename=source_data_filename, + ) merge_file = create_initial_file() remove_old_hosts_file(settings["backup"]) @@ -199,32 +266,44 @@ def main(): output_subfolder = settings["outputsubfolder"] skip_static_hosts = settings["skipstatichosts"] - write_opening_header(final_file, extensions=extensions, - numberofrules=number_of_rules, - outputsubfolder=output_subfolder, - skipstatichosts=skip_static_hosts) + write_opening_header( + final_file, + extensions=extensions, + numberofrules=number_of_rules, + outputsubfolder=output_subfolder, + skipstatichosts=skip_static_hosts, + ) final_file.close() - update_readme_data(settings["readmedatafilename"], - extensions=extensions, - numberofrules=number_of_rules, - outputsubfolder=output_subfolder, - sourcesdata=sources_data) + update_readme_data( + settings["readmedatafilename"], + extensions=extensions, + numberofrules=number_of_rules, + outputsubfolder=output_subfolder, + sourcesdata=sources_data, + ) - print_success("Success! The hosts file has been saved in folder " + - output_subfolder + "\nIt contains " + - "{:,}".format(number_of_rules) + - " unique entries.") + print_success( + "Success! The hosts file has been saved in folder " + + output_subfolder + + "\nIt contains " + + "{:,}".format(number_of_rules) + + " unique entries." + ) - move_file = prompt_for_move(final_file, auto=auto, - replace=settings["replace"], - skipstatichosts=skip_static_hosts) + move_file = prompt_for_move( + final_file, + auto=auto, + replace=settings["replace"], + skipstatichosts=skip_static_hosts, + ) # We only flush the DNS cache if we have # moved a new hosts file into place. if move_file: - prompt_for_flush_dns_cache(flush_cache=settings["flushdnscache"], - prompt_flush=not auto) + prompt_for_flush_dns_cache( + flush_cache=settings["flushdnscache"], prompt_flush=not auto + ) # Prompt the User @@ -259,7 +338,9 @@ def prompt_for_update(freshen, update_auto): # Starting in Python 3.3, IOError is aliased # OSError. However, we have to catch both for # Python 2.x failures. - print_failure("ERROR: No 'hosts' file in the folder. Try creating one manually.") + print_failure( + "ERROR: No 'hosts' file in the folder. Try creating one manually." + ) if not freshen: return @@ -291,9 +372,11 @@ def prompt_for_exclusions(skip_prompt): custom domains beyond those in the whitelist. """ - prompt = ("Do you want to exclude any domains?\n" - "For example, hulu.com video streaming must be able to access " - "its tracking and ad servers in order to play video.") + prompt = ( + "Do you want to exclude any domains?\n" + "For example, hulu.com video streaming must be able to access " + "its tracking and ad servers in order to play video." + ) if not skip_prompt: if query_yes_no(prompt): @@ -361,6 +444,8 @@ def prompt_for_move(final_file, **move_params): move_hosts_file_into_place(final_file) return move_file + + # End Prompt the User @@ -394,15 +479,16 @@ def display_exclusion_options(common_exclusions, exclusion_pattern, exclusion_re prompt = "Do you want to exclude the domain " + exclusion_option + " ?" if query_yes_no(prompt): - exclusion_regexes = exclude_domain(exclusion_option, - exclusion_pattern, - exclusion_regexes) + exclusion_regexes = exclude_domain( + exclusion_option, exclusion_pattern, exclusion_regexes + ) else: continue if query_yes_no("Do you want to exclude any other domains?"): - exclusion_regexes = gather_custom_exclusions(exclusion_pattern, - exclusion_regexes) + exclusion_regexes = gather_custom_exclusions( + exclusion_pattern, exclusion_regexes + ) return exclusion_regexes @@ -432,7 +518,9 @@ def gather_custom_exclusions(exclusion_pattern, exclusion_regexes): user_domain = input(domain_prompt) if is_valid_domain_format(user_domain): - exclusion_regexes = exclude_domain(user_domain, exclusion_pattern, exclusion_regexes) + exclusion_regexes = exclude_domain( + user_domain, exclusion_pattern, exclusion_regexes + ) continue_prompt = "Do you have more domains you want to enter?" if not query_yes_no(continue_prompt): @@ -497,6 +585,8 @@ def matches_exclusions(stripped_rule, exclusion_regexes): return True return False + + # End Exclusion Logic @@ -533,8 +623,7 @@ def update_sources_data(sources_data, **sources_params): update_file.close() for source in sources_params["extensions"]: - source_dir = path_join_robust( - sources_params["extensionspath"], source) + source_dir = path_join_robust(sources_params["extensionspath"], source) for update_file_path in recursive_glob(source_dir, source_data_filename): update_file = open(update_file_path, "r") update_data = json.load(update_file) @@ -579,9 +668,7 @@ def update_all_sources(source_data_filename, host_filename): """ # The transforms we support - transform_methods = { - 'jsonarray': jsonarray - } + transform_methods = {"jsonarray": jsonarray} all_sources = recursive_glob("*", source_data_filename) @@ -606,13 +693,16 @@ def update_all_sources(source_data_filename, host_filename): # get rid of carriage-return symbols updated_file = updated_file.replace("\r", "") - hosts_file = open(path_join_robust(BASEDIR_PATH, - os.path.dirname(source), - host_filename), "wb") + hosts_file = open( + path_join_robust(BASEDIR_PATH, os.path.dirname(source), host_filename), + "wb", + ) write_data(hosts_file, updated_file) hosts_file.close() except Exception: print("Error in updating source: ", update_url) + + # End Update Logic @@ -625,8 +715,7 @@ def create_initial_file(): merge_file = tempfile.NamedTemporaryFile() # spin the sources for the base file - for source in recursive_glob(settings["datapath"], - settings["hostfilename"]): + for source in recursive_glob(settings["datapath"], settings["hostfilename"]): start = "# Start {}\n\n".format(os.path.basename(os.path.dirname(source))) end = "# End {}\n\n".format(os.path.basename(os.path.dirname(source))) @@ -636,8 +725,10 @@ def create_initial_file(): # spin the sources for extensions to the base file for source in settings["extensions"]: - for filename in recursive_glob(path_join_robust( - settings["extensionspath"], source), settings["hostfilename"]): + for filename in recursive_glob( + path_join_robust(settings["extensionspath"], source), + settings["hostfilename"], + ): with open(filename, "r") as curFile: write_data(merge_file, curFile.read()) @@ -668,7 +759,7 @@ def compress_file(input_file, target_ip, output_file): """ input_file.seek(0) # reset file pointer - write_data(output_file, '\n') + write_data(output_file, "\n") target_ip_len = len(target_ip) lines = [target_ip] @@ -677,12 +768,11 @@ def compress_file(input_file, target_ip, output_file): line = line.decode("UTF-8") if line.startswith(target_ip): - if lines[lines_index].count(' ') < 9: - lines[lines_index] += ' ' \ - + line[target_ip_len:line.find('#')].strip() + if lines[lines_index].count(" ") < 9: + lines[lines_index] += " " + line[target_ip_len : line.find("#")].strip() else: - lines[lines_index] += '\n' - lines.append(line[:line.find('#')].strip()) + lines[lines_index] += "\n" + lines.append(line[: line.find("#")].strip()) lines_index += 1 for line in lines: @@ -707,14 +797,14 @@ def minimise_file(input_file, target_ip, output_file): """ input_file.seek(0) # reset file pointer - write_data(output_file, '\n') + write_data(output_file, "\n") lines = [] for line in input_file.readlines(): line = line.decode("UTF-8") if line.startswith(target_ip): - lines.append(line[:line.find('#')].strip() + '\n') + lines.append(line[: line.find("#")].strip() + "\n") for line in lines: write_data(output_file, line) @@ -772,27 +862,28 @@ def remove_dups_and_excl(merge_file, exclusion_regexes, output_file=None): line = line.replace("\t+", " ") # see gh-271: trim trailing whitespace, periods - line = line.rstrip(' .') + line = line.rstrip(" .") # Testing the first character doesn't require startswith - if line[0] == "#" or re.match(r'^\s*$', line[0]): + if line[0] == "#" or re.match(r"^\s*$", line[0]): write_data(final_file, line) continue if "::1" in line: continue stripped_rule = strip_rule(line) # strip comments - if not stripped_rule or matches_exclusions(stripped_rule, - exclusion_regexes): + if not stripped_rule or matches_exclusions(stripped_rule, exclusion_regexes): continue # Normalize rule hostname, normalized_rule = normalize_rule( - stripped_rule, target_ip=settings["targetip"], - keep_domain_comments=settings["keepdomaincomments"]) + stripped_rule, + target_ip=settings["targetip"], + keep_domain_comments=settings["keepdomaincomments"], + ) for exclude in exclusions: - if re.search(r'[\s\.]' + re.escape(exclude) + r'\s', line): + if re.search(r"[\s\.]" + re.escape(exclude) + r"\s", line): write_line = False break @@ -832,7 +923,7 @@ def normalize_rule(rule, target_ip, keep_domain_comments): """ first try: IP followed by domain """ - regex = r'^\s*(\d{1,3}\.){3}\d{1,3}\s+([\w\.-]+[a-zA-Z])(.*)' + regex = r"^\s*(\d{1,3}\.){3}\d{1,3}\s+([\w\.-]+[a-zA-Z])(.*)" result = re.search(regex, rule) if result: @@ -843,7 +934,7 @@ def normalize_rule(rule, target_ip, keep_domain_comments): rule = "%s %s" % (target_ip, hostname) if suffix and keep_domain_comments: - if not suffix.strip().startswith('#'): + if not suffix.strip().startswith("#"): rule += " #%s" % suffix else: rule += " %s" % suffix @@ -853,7 +944,7 @@ def normalize_rule(rule, target_ip, keep_domain_comments): """ next try: IP address followed by host IP address """ - regex = r'^\s*(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*(.*)' + regex = r"^\s*(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\s*(.*)" result = re.search(regex, rule) if result: @@ -863,7 +954,7 @@ def normalize_rule(rule, target_ip, keep_domain_comments): rule = "%s %s" % (target_ip, ip_host) if suffix and keep_domain_comments: - if not suffix.strip().startswith('#'): + if not suffix.strip().startswith("#"): rule += " #%s" % suffix else: rule += " %s" % suffix @@ -922,22 +1013,51 @@ def write_opening_header(final_file, **header_params): file_contents = final_file.read() # Save content. final_file.seek(0) # Write at the top. - write_data(final_file, "# This hosts file is a merged collection " - "of hosts from reputable sources,\n") + write_data( + final_file, + "# This hosts file is a merged collection " + "of hosts from reputable sources,\n", + ) write_data(final_file, "# with a dash of crowd sourcing via Github\n#\n") - write_data(final_file, "# Date: " + time.strftime("%d %B %Y %H:%M:%S (%Z)", time.gmtime()) + "\n") + write_data( + final_file, + "# Date: " + time.strftime("%d %B %Y %H:%M:%S (%Z)", time.gmtime()) + "\n", + ) if header_params["extensions"]: - write_data(final_file, "# Extensions added to this file: " + ", ".join( - header_params["extensions"]) + "\n") + write_data( + final_file, + "# Extensions added to this file: " + + ", ".join(header_params["extensions"]) + + "\n", + ) - write_data(final_file, ("# Number of unique domains: {:,}\n#\n".format(header_params["numberofrules"]))) - write_data(final_file, "# Fetch the latest version of this file: " - "https://raw.githubusercontent.com/StevenBlack/hosts/master/" + - path_join_robust(header_params["outputsubfolder"], "") + "hosts\n") - write_data(final_file, "# Project home page: https://github.com/StevenBlack/hosts\n") - write_data(final_file, "# Project releases: https://github.com/StevenBlack/hosts/releases\n#\n") - write_data(final_file, "# ===============================================================\n") + write_data( + final_file, + ( + "# Number of unique domains: {:,}\n#\n".format( + header_params["numberofrules"] + ) + ), + ) + write_data( + final_file, + "# Fetch the latest version of this file: " + "https://raw.githubusercontent.com/StevenBlack/hosts/master/" + + path_join_robust(header_params["outputsubfolder"], "") + + "hosts\n", + ) + write_data( + final_file, "# Project home page: https://github.com/StevenBlack/hosts\n" + ) + write_data( + final_file, + "# Project releases: https://github.com/StevenBlack/hosts/releases\n#\n", + ) + write_data( + final_file, + "# ===============================================================\n", + ) write_data(final_file, "\n") if not header_params["skipstatichosts"]: @@ -997,9 +1117,11 @@ def update_readme_data(readme_file, **readme_updates): extensions_key = "-".join(extensions) output_folder = readme_updates["outputsubfolder"] - generation_data = {"location": path_join_robust(output_folder, ""), - "entries": readme_updates["numberofrules"], - "sourcesdata": readme_updates["sourcesdata"]} + generation_data = { + "location": path_join_robust(output_folder, ""), + "entries": readme_updates["numberofrules"], + "sourcesdata": readme_updates["sourcesdata"], + } with open(readme_file, "r") as f: readme_data = json.load(f) @@ -1029,12 +1151,16 @@ def move_hosts_file_into_place(final_file): filename = os.path.abspath(final_file.name) if os.name == "posix": - print("Moving the file requires administrative privileges. You might need to enter your password.") + print( + "Moving the file requires administrative privileges. You might need to enter your password." + ) if subprocess.call(SUDO + ["cp", filename, "/etc/hosts"]): print_failure("Moving the file failed.") elif os.name == "nt": print("Automatically moving the hosts file in place is not yet supported.") - print("Please move the generated file to %SystemRoot%\system32\drivers\etc\hosts") # noqa: W605 + print( + "Please move the generated file to %SystemRoot%\system32\drivers\etc\hosts" + ) # noqa: W605 def flush_dns_cache(): @@ -1043,7 +1169,9 @@ def flush_dns_cache(): """ print("Flushing the DNS cache to utilize new hosts file...") - print("Flushing the DNS cache requires administrative privileges. You might need to enter your password.") + print( + "Flushing the DNS cache requires administrative privileges. You might need to enter your password." + ) dns_cache_found = False @@ -1052,8 +1180,10 @@ def flush_dns_cache(): print_failure("Flushing the DNS cache failed.") elif os.name == "nt": print("Automatically flushing the DNS cache is not yet supported.") - print("Please copy and paste the command 'ipconfig /flushdns' in " - "administrator command prompt after running this script.") + print( + "Please copy and paste the command 'ipconfig /flushdns' in " + "administrator command prompt after running this script." + ) else: nscd_prefixes = ["/etc", "/etc/rc.d"] nscd_msg = "Flushing the DNS cache by restarting nscd {result}" @@ -1088,7 +1218,9 @@ def flush_dns_cache(): for service_type in service_types: service = service_type + ".service" service_file = path_join_robust(system_dir, service) - service_msg = ("Flushing the DNS cache by restarting " + service + " {result}") + service_msg = ( + "Flushing the DNS cache by restarting " + service + " {result}" + ) if os.path.isfile(service_file): dns_cache_found = True @@ -1132,8 +1264,9 @@ def remove_old_hosts_file(backup): open(old_file_path, "a").close() if backup: - backup_file_path = path_join_robust(BASEDIR_PATH, "hosts-{}".format( - time.strftime("%Y-%m-%d-%H-%M-%S"))) + backup_file_path = path_join_robust( + BASEDIR_PATH, "hosts-{}".format(time.strftime("%Y-%m-%d-%H-%M-%S")) + ) # Make a backup copy, marking the date in which the list was updated shutil.copy(old_file_path, backup_file_path) @@ -1142,6 +1275,8 @@ def remove_old_hosts_file(backup): # Create new empty hosts file open(old_file_path, "a").close() + + # End File Logic @@ -1174,9 +1309,9 @@ def domain_to_idna(line): - The following also split the trailing comment of a given line. """ - if not line.startswith('#'): - tabs = '\t' - space = ' ' + if not line.startswith("#"): + tabs = "\t" + space = " " tabs_position, space_position = (line.find(tabs), line.find(space)) @@ -1190,7 +1325,7 @@ def domain_to_idna(line): elif not space_position == -1: separator = space else: - separator = '' + separator = "" if separator: splited_line = line.split(separator) @@ -1202,20 +1337,21 @@ def domain_to_idna(line): break index += 1 - if '#' in splited_line[index]: - index_comment = splited_line[index].find('#') + if "#" in splited_line[index]: + index_comment = splited_line[index].find("#") if index_comment > -1: comment = splited_line[index][index_comment:] - splited_line[index] = splited_line[index] \ - .split(comment)[0] \ - .encode("IDNA").decode("UTF-8") + \ - comment + splited_line[index] = ( + splited_line[index] + .split(comment)[0] + .encode("IDNA") + .decode("UTF-8") + + comment + ) - splited_line[index] = splited_line[index] \ - .encode("IDNA") \ - .decode("UTF-8") + splited_line[index] = splited_line[index].encode("IDNA").decode("UTF-8") except IndexError: pass return separator.join(splited_line) @@ -1267,8 +1403,8 @@ def get_file_by_url(url): try: f = urlopen(url) - soup = BeautifulSoup(f.read(), 'lxml').get_text() - return '\n'.join(list(map(domain_to_idna, soup.split('\n')))) + soup = BeautifulSoup(f.read(), "lxml").get_text() + return "\n".join(list(map(domain_to_idna, soup.split("\n")))) except Exception: print("Problem getting file: ", url) @@ -1322,11 +1458,8 @@ def query_yes_no(question, default="yes"): yes : Whether or not the user replied yes to the question. """ - valid = {"yes": "yes", "y": "yes", "ye": "yes", - "no": "no", "n": "no"} - prompt = {None: " [y/n] ", - "yes": " [Y/n] ", - "no": " [y/N] "}.get(default, None) + valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"} + prompt = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}.get(default, None) if not prompt: raise ValueError("invalid default answer: '%s'" % default) @@ -1371,8 +1504,10 @@ def is_valid_domain_format(domain): domain_regex = re.compile(r"www\d{0,3}[.]|https?") if domain_regex.match(domain): - print("The domain " + domain + " is not valid. Do not include " - "www.domain.com or http(s)://domain.com. Try again.") + print( + "The domain " + domain + " is not valid. Do not include " + "www.domain.com or http(s)://domain.com. Try again." + ) return False else: return True @@ -1436,7 +1571,9 @@ def path_join_robust(path, *paths): return os.path.join(path, *paths) except UnicodeDecodeError as e: - raise locale.Error("Unable to construct path. This is likely a LOCALE issue:\n\n" + str(e)) + raise locale.Error( + "Unable to construct path. This is likely a LOCALE issue:\n\n" + str(e) + ) # Colors @@ -1462,7 +1599,9 @@ def supports_color(): """ sys_platform = sys.platform - supported = sys_platform != "Pocket PC" and (sys_platform != "win32" or "ANSICON" in os.environ) + supported = sys_platform != "Pocket PC" and ( + sys_platform != "win32" or "ANSICON" in os.environ + ) atty_connected = hasattr(sys.stdout, "isatty") and sys.stdout.isatty() return supported and atty_connected @@ -1521,6 +1660,8 @@ def print_failure(text): """ print(colorize(text, Colors.FAIL)) + + # End Helper Functions diff --git a/updateReadme.py b/updateReadme.py index d6f4622c8..f99df81be 100644 --- a/updateReadme.py +++ b/updateReadme.py @@ -12,19 +12,21 @@ from string import Template # Project Settings BASEDIR_PATH = os.path.dirname(os.path.realpath(__file__)) -README_TEMPLATE = os.path.join(BASEDIR_PATH, 'readme_template.md') -README_FILENAME = 'readme.md' +README_TEMPLATE = os.path.join(BASEDIR_PATH, "readme_template.md") +README_FILENAME = "readme.md" README_DATA_FILENAME = "readmeData.json" def main(): - s = Template('${description} | [Readme](https://github.com/StevenBlack/' - 'hosts/blob/master/${location}readme.md) | ' - '[link](https://raw.githubusercontent.com/StevenBlack/' - 'hosts/master/${location}hosts) | ' - '${fmtentries} | ' - '[link](http://sbc.io/hosts/${location}hosts)') - with open(README_DATA_FILENAME, 'r') as f: + s = Template( + "${description} | [Readme](https://github.com/StevenBlack/" + "hosts/blob/master/${location}readme.md) | " + "[link](https://raw.githubusercontent.com/StevenBlack/" + "hosts/master/${location}hosts) | " + "${fmtentries} | " + "[link](http://sbc.io/hosts/${location}hosts)" + ) + with open(README_DATA_FILENAME, "r") as f: data = json.load(f) keys = list(data.keys()) @@ -37,10 +39,11 @@ def main(): for key in keys: data[key]["fmtentries"] = "{:,}".format(data[key]["entries"]) if key == "base": - data[key]["description"] = 'Unified hosts = **(adware + malware)**' + data[key]["description"] = "Unified hosts = **(adware + malware)**" else: - data[key]["description"] = ('Unified hosts **+ ' + - key.replace("-", " + ") + '**') + data[key]["description"] = ( + "Unified hosts **+ " + key.replace("-", " + ") + "**" + ) toc_rows += s.substitute(data[key]) + "\n" @@ -52,10 +55,13 @@ def main(): "issues": "", "url": "", "license": "", - "issues": ""} + "issues": "", + } - t = Template('${name} | ${description} |[link](${homeurl})' - ' | [raw](${url}) | ${frequency} | ${license} | [issues](${issues}) ') + t = Template( + "${name} | ${description} |[link](${homeurl})" + " | [raw](${url}) | ${frequency} | ${license} | [issues](${issues}) " + ) for key in keys: extensions = key.replace("-", ", ") @@ -71,16 +77,21 @@ def main(): this_row.update(source) source_rows += t.substitute(this_row) + "\n" - with open(os.path.join(data[key]["location"], - README_FILENAME), "wt") as out: + with open(os.path.join(data[key]["location"], README_FILENAME), "wt") as out: for line in open(README_TEMPLATE): - line = line.replace('@GEN_DATE@', time.strftime("%B %d %Y", time.gmtime())) - line = line.replace('@EXTENSIONS@', extensions_str) - line = line.replace('@EXTENSIONS_HEADER@', extensions_header) - line = line.replace('@NUM_ENTRIES@', "{:,}".format(data[key]["entries"])) - line = line.replace('@SUBFOLDER@', os.path.join(data[key]["location"], '')) - line = line.replace('@TOCROWS@', toc_rows) - line = line.replace('@SOURCEROWS@', source_rows) + line = line.replace( + "@GEN_DATE@", time.strftime("%B %d %Y", time.gmtime()) + ) + line = line.replace("@EXTENSIONS@", extensions_str) + line = line.replace("@EXTENSIONS_HEADER@", extensions_header) + line = line.replace( + "@NUM_ENTRIES@", "{:,}".format(data[key]["entries"]) + ) + line = line.replace( + "@SUBFOLDER@", os.path.join(data[key]["location"], "") + ) + line = line.replace("@TOCROWS@", toc_rows) + line = line.replace("@SOURCEROWS@", source_rows) out.write(line)