#!/usr/bin/env python3

# Script by Ben Limmer
# https://github.com/l1m5
#
# This Python script will combine all the host files you provide
# as sources into one, unique host file to keep your internet browsing happy.
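#
# Example invocation of this script (the flags are defined in main() below;
# the extension names are illustrative and depend on the folders actually
# present under the ./extensions directory of your checkout):
#
#   python3 updateHostsFile.py --auto --replace --extensions fakenews gambling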

import argparse
import fnmatch
import ipaddress
import json
import locale
import os
import platform
from pathlib import Path
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import time
from glob import glob
from typing import Optional, Tuple

# Detecting Python 3 for version-dependent implementations
PY3 = sys.version_info >= (3, 0)

if not PY3:
    raise Exception("We do not support Python 2 anymore.")


try:
    import requests
except ImportError:
    raise ImportError(
        "This project's dependencies have changed. The Requests library ("
        "https://docs.python-requests.org/en/latest/) is now required."
    )


# Syntactic sugar for "sudo" command in UNIX / Linux
if platform.system() == "OpenBSD":
    SUDO = ["/usr/bin/doas"]
elif platform.system() == "Windows":
    SUDO = ["powershell", "Start-Process", "powershell", "-Verb", "runAs"]
else:
    SUDO = ["/usr/bin/env", "sudo"]


# Project Settings
BASEDIR_PATH = os.path.dirname(os.path.realpath(__file__))

def get_defaults():
    """
    Helper method for getting the default settings.

    Returns
    -------
    default_settings : dict
        A dictionary of the default settings when updating host information.
    """

    return {
        "numberofrules": 0,
        "datapath": path_join_robust(BASEDIR_PATH, "data"),
        "freshen": True,
        "replace": False,
        "backup": False,
        "skipstatichosts": False,
        "keepdomaincomments": True,
        "extensionspath": path_join_robust(BASEDIR_PATH, "extensions"),
        "extensions": [],
        "nounifiedhosts": False,
        "compress": False,
        "minimise": False,
        "outputsubfolder": "",
        "hostfilename": "hosts",
        "targetip": "0.0.0.0",
        "sourcedatafilename": "update.json",
        "sourcesdata": [],
        "readmefilename": "readme.md",
        "readmetemplate": path_join_robust(BASEDIR_PATH, "readme_template.md"),
        "readmedata": {},
        "readmedatafilename": path_join_robust(BASEDIR_PATH, "readmeData.json"),
        "exclusionpattern": r"([a-zA-Z\d-]+\.){0,}",
        "exclusionregexes": [],
        "exclusions": [],
        "commonexclusions": ["hulu.com"],
        "blacklistfile": path_join_robust(BASEDIR_PATH, "blacklist"),
        "whitelistfile": path_join_robust(BASEDIR_PATH, "whitelist"),
    }


# End Project Settings

def main():
    parser = argparse.ArgumentParser(
        description="Creates a unified hosts "
        "file from hosts stored in the data subfolders."
    )
    parser.add_argument(
        "--auto",
        "-a",
        dest="auto",
        default=False,
        action="store_true",
        help="Run without prompting.",
    )
    parser.add_argument(
        "--backup",
        "-b",
        dest="backup",
        default=False,
        action="store_true",
        help="Backup the hosts files before they are overridden.",
    )
    parser.add_argument(
        "--extensions",
        "-e",
        dest="extensions",
        default=[],
        nargs="*",
        help="Host extensions to include in the final hosts file.",
    )
    parser.add_argument(
        "--nounifiedhosts",
        dest="nounifiedhosts",
        default=False,
        action="store_true",
        help="Do not include the unified hosts file in the final hosts file. Usually used together with `--extensions`.",
    )
    parser.add_argument(
        "--ip",
        "-i",
        dest="targetip",
        default="0.0.0.0",
        help="Target IP address. Default is 0.0.0.0.",
    )
    parser.add_argument(
        "--keepdomaincomments",
        "-k",
        dest="keepdomaincomments",
        action="store_false",
        default=True,
        help="Do not keep domain line comments.",
    )
    parser.add_argument(
        "--noupdate",
        "-n",
        dest="noupdate",
        default=False,
        action="store_true",
        help="Don't update from host data sources.",
    )
    parser.add_argument(
        "--skipstatichosts",
        "-s",
        dest="skipstatichosts",
        default=False,
        action="store_true",
        help="Skip static localhost entries in the final hosts file.",
    )
    parser.add_argument(
        "--nogendata",
        "-g",
        dest="nogendata",
        default=False,
        action="store_true",
        help="Skip generation of readmeData.json.",
    )
    parser.add_argument(
        "--output",
        "-o",
        dest="outputsubfolder",
        default="",
        help="Output subfolder for generated hosts file.",
    )
    parser.add_argument(
        "--replace",
        "-r",
        dest="replace",
        default=False,
        action="store_true",
        help="Replace your active hosts file with this new hosts file.",
    )
    parser.add_argument(
        "--flush-dns-cache",
        "-f",
        dest="flushdnscache",
        default=False,
        action="store_true",
        help="Attempt to flush DNS cache after replacing the hosts file.",
    )
    parser.add_argument(
        "--compress",
        "-c",
        dest="compress",
        default=False,
        action="store_true",
        help="Compress the hosts file by removing non-necessary lines "
        "(empty lines and comments) and putting multiple domains on "
        "each line. Improves performance under Windows.",
    )
    parser.add_argument(
        "--minimise",
        "-m",
        dest="minimise",
        default=False,
        action="store_true",
        help="Minimise the hosts file by removing non-necessary lines "
        "(empty lines and comments).",
    )
    parser.add_argument(
        "--whitelist",
        "-w",
        dest="whitelistfile",
        default=path_join_robust(BASEDIR_PATH, "whitelist"),
        help="Whitelist file to use while generating hosts files.",
    )
    parser.add_argument(
        "--blacklist",
        "-x",
        dest="blacklistfile",
        default=path_join_robust(BASEDIR_PATH, "blacklist"),
        help="Blacklist file to use while generating hosts files.",
    )

    global settings

    options = vars(parser.parse_args())

    options["outputpath"] = path_join_robust(BASEDIR_PATH, options["outputsubfolder"])
    options["freshen"] = not options["noupdate"]

    settings = get_defaults()
    settings.update(options)

    data_path = settings["datapath"]
    extensions_path = settings["extensionspath"]

    settings["sources"] = list_dir_no_hidden(data_path)
    settings["extensionsources"] = list_dir_no_hidden(extensions_path)

    # All our extensions folders...
    settings["extensions"] = [
        os.path.basename(item) for item in list_dir_no_hidden(extensions_path)
    ]
    # ... intersected with the extensions passed-in as arguments, then sorted.
    settings["extensions"] = sorted(
        list(set(options["extensions"]).intersection(settings["extensions"]))
    )

    auto = settings["auto"]
    exclusion_regexes = settings["exclusionregexes"]
    source_data_filename = settings["sourcedatafilename"]
    no_unified_hosts = settings["nounifiedhosts"]

    update_sources = prompt_for_update(freshen=settings["freshen"], update_auto=auto)
    if update_sources:
        update_all_sources(source_data_filename, settings["hostfilename"])

    gather_exclusions = prompt_for_exclusions(skip_prompt=auto)

    if gather_exclusions:
        common_exclusions = settings["commonexclusions"]
        exclusion_pattern = settings["exclusionpattern"]
        exclusion_regexes = display_exclusion_options(
            common_exclusions=common_exclusions,
            exclusion_pattern=exclusion_pattern,
            exclusion_regexes=exclusion_regexes,
        )

    extensions = settings["extensions"]
    sources_data = update_sources_data(
        settings["sourcesdata"],
        datapath=data_path,
        extensions=extensions,
        extensionspath=extensions_path,
        sourcedatafilename=source_data_filename,
        nounifiedhosts=no_unified_hosts,
    )

    merge_file = create_initial_file(
        nounifiedhosts=no_unified_hosts,
    )
    remove_old_hosts_file(settings["outputpath"], "hosts", settings["backup"])
    if settings["compress"]:
        final_file = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
        compressed_file = tempfile.NamedTemporaryFile()
        remove_dups_and_excl(merge_file, exclusion_regexes, compressed_file)
        compress_file(compressed_file, settings["targetip"], final_file)
    elif settings["minimise"]:
        final_file = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
        minimised_file = tempfile.NamedTemporaryFile()
        remove_dups_and_excl(merge_file, exclusion_regexes, minimised_file)
        minimise_file(minimised_file, settings["targetip"], final_file)
    else:
        final_file = remove_dups_and_excl(merge_file, exclusion_regexes)

    number_of_rules = settings["numberofrules"]
    output_subfolder = settings["outputsubfolder"]
    skip_static_hosts = settings["skipstatichosts"]

    write_opening_header(
        final_file,
        extensions=extensions,
        numberofrules=number_of_rules,
        outputsubfolder=output_subfolder,
        skipstatichosts=skip_static_hosts,
        nounifiedhosts=no_unified_hosts,
    )
    final_file.close()

    if not settings["nogendata"]:
        update_readme_data(
            settings["readmedatafilename"],
            extensions=extensions,
            numberofrules=number_of_rules,
            outputsubfolder=output_subfolder,
            sourcesdata=sources_data,
            nounifiedhosts=no_unified_hosts,
        )

    print_success(
        "Success! The hosts file has been saved in folder "
        + output_subfolder
        + "\nIt contains "
        + "{:,}".format(number_of_rules)
        + " unique entries."
    )

    move_file = prompt_for_move(
        final_file,
        auto=auto,
        replace=settings["replace"],
        skipstatichosts=skip_static_hosts,
    )

    # We only flush the DNS cache if we have
    # moved a new hosts file into place.
    if move_file:
        prompt_for_flush_dns_cache(
            flush_cache=settings["flushdnscache"], prompt_flush=not auto
        )


# Prompt the User
def prompt_for_update(freshen, update_auto):
    """
    Prompt the user to update all hosts files.

    If requested, the function will update all data sources after it
    checks that a hosts file does indeed exist.

    Parameters
    ----------
    freshen : bool
        Whether data sources should be updated. This function will return
        if it is requested that data sources not be updated.
    update_auto : bool
        Whether or not to automatically update all data sources.

    Returns
    -------
    update_sources : bool
        Whether or not we should update data sources for exclusion files.
    """

    # Create a hosts file if it doesn't exist.
    hosts_file = path_join_robust(BASEDIR_PATH, "hosts")

    if not os.path.isfile(hosts_file):
        try:
            open(hosts_file, "w+").close()
        except (IOError, OSError):
            # Starting in Python 3.3, IOError is aliased to
            # OSError. However, we have to catch both for
            # Python 2.x failures.
            print_failure(
                "ERROR: No 'hosts' file in the folder. Try creating one manually."
            )

    if not freshen:
        return

    prompt = "Do you want to update all data sources?"

    if update_auto or query_yes_no(prompt):
        return True
    elif not update_auto:
        print("OK, we'll stick with what we've got locally.")

    return False


def prompt_for_exclusions(skip_prompt):
    """
    Prompt the user to exclude any custom domains from being blocked.

    Parameters
    ----------
    skip_prompt : bool
        Whether or not to skip prompting for custom domains to be excluded.
        If true, the function returns immediately.

    Returns
    -------
    gather_exclusions : bool
        Whether or not we should proceed to prompt the user to exclude any
        custom domains beyond those in the whitelist.
    """

    prompt = (
        "Do you want to exclude any domains?\n"
        "For example, hulu.com video streaming must be able to access "
        "its tracking and ad servers in order to play video."
    )

    if not skip_prompt:
        if query_yes_no(prompt):
            return True
        else:
            print("OK, we'll only exclude domains in the whitelist.")

    return False


def prompt_for_flush_dns_cache(flush_cache, prompt_flush):
    """
    Prompt the user to flush the DNS cache.

    Parameters
    ----------
    flush_cache : bool
        Whether to flush the DNS cache without prompting.
    prompt_flush : bool
        If `flush_cache` is False, whether we should prompt for flushing the
        cache. Otherwise, the function returns immediately.
    """

    if flush_cache:
        flush_dns_cache()
    elif prompt_flush:
        if query_yes_no("Attempt to flush the DNS cache?"):
            flush_dns_cache()


def prompt_for_move(final_file, **move_params):
    """
    Prompt the user to move the newly created hosts file to its designated
    location in the OS.

    Parameters
    ----------
    final_file : file
        The file object that contains the newly created hosts data.
    move_params : kwargs
        Dictionary providing additional parameters for moving the hosts file
        into place. Currently, those fields are:

        1) auto
        2) replace
        3) skipstatichosts

    Returns
    -------
    move_file : bool
        Whether or not the final hosts file was moved.
    """

    skip_static_hosts = move_params["skipstatichosts"]

    if move_params["replace"] and not skip_static_hosts:
        move_file = True
    elif move_params["auto"] or skip_static_hosts:
        move_file = False
    else:
        prompt = "Do you want to replace your existing hosts file with the newly generated file?"
        move_file = query_yes_no(prompt)

    if move_file:
        move_file = move_hosts_file_into_place(final_file)

    return move_file


# End Prompt the User

def sort_sources(sources):
    """
    Sorts the sources.

    The idea is that all of Steven Black's lists, files, or entries
    are placed on top, with the rest sorted alphabetically.

    Parameters
    ----------
    sources : list
        The sources to sort.
    """

    result = sorted(
        sources.copy(),
        key=lambda x: x.lower().replace("-", "").replace("_", "").replace(" ", ""),
    )

    # Steven Black's repositories/files/lists should be on top!
    steven_black_positions = [
        x for x, y in enumerate(result) if "stevenblack" in y.lower()
    ]

    for index in steven_black_positions:
        result.insert(0, result.pop(index))

    return result
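# Illustrative behavior of sort_sources() (the paths are hypothetical
# examples; actual ones depend on the data folders in your checkout):
#
#   >>> sort_sources(["data/adaway.org/update.json", "data/StevenBlack/update.json"])
#   ['data/StevenBlack/update.json', 'data/adaway.org/update.json']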

# Exclusion logic
def display_exclusion_options(common_exclusions, exclusion_pattern, exclusion_regexes):
    """
    Display the exclusion options to the user.

    This function checks whether a user wants to exclude particular domains,
    and if so, excludes them.

    Parameters
    ----------
    common_exclusions : list
        A list of common domains that are excluded from being blocked. One
        example is Hulu. This setting is set directly in the script and cannot
        be overwritten by the user.
    exclusion_pattern : str
        The exclusion pattern with which to create the domain regex.
    exclusion_regexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusion_regexes : list
        The original list of regex patterns potentially with additional
        patterns from domains that the user chooses to exclude.
    """

    for exclusion_option in common_exclusions:
        prompt = "Do you want to exclude the domain " + exclusion_option + " ?"

        if query_yes_no(prompt):
            exclusion_regexes = exclude_domain(
                exclusion_option, exclusion_pattern, exclusion_regexes
            )
        else:
            continue

    if query_yes_no("Do you want to exclude any other domains?"):
        exclusion_regexes = gather_custom_exclusions(
            exclusion_pattern, exclusion_regexes
        )

    return exclusion_regexes

def gather_custom_exclusions(exclusion_pattern, exclusion_regexes):
    """
    Gather custom exclusions from the user.

    Parameters
    ----------
    exclusion_pattern : str
        The exclusion pattern with which to create the domain regex.
    exclusion_regexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusion_regexes : list
        The original list of regex patterns potentially with additional
        patterns from domains that the user chooses to exclude.
    """

    # We continue running this while-loop until the user
    # says that they have no more domains to exclude.
    while True:
        domain_prompt = "Enter the domain you want to exclude (e.g. facebook.com): "
        user_domain = input(domain_prompt)

        if is_valid_user_provided_domain_format(user_domain):
            exclusion_regexes = exclude_domain(
                user_domain, exclusion_pattern, exclusion_regexes
            )

        continue_prompt = "Do you have more domains you want to enter?"
        if not query_yes_no(continue_prompt):
            break

    return exclusion_regexes


def exclude_domain(domain, exclusion_pattern, exclusion_regexes):
    """
    Exclude a domain from being blocked.

    This creates the domain regex by which to exclude this domain and appends
    it to a list of already-existing exclusion regexes.

    Parameters
    ----------
    domain : str
        The domain to exclude.
    exclusion_pattern : str
        The exclusion pattern with which to create the domain regex.
    exclusion_regexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusion_regexes : list
        The original list of regex patterns with one additional pattern from
        the `domain` input.
    """

    exclusion_regex = re.compile(exclusion_pattern + domain)
    exclusion_regexes.append(exclusion_regex)

    return exclusion_regexes
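# Illustration of the regex built above: with the default exclusion pattern
# r"([a-zA-Z\d-]+\.){0,}" and the domain "hulu.com", the compiled regex
# matches "hulu.com" itself as well as any subdomain such as "www.hulu.com"
# or "ads.hulu.com".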

def matches_exclusions(stripped_rule, exclusion_regexes):
    """
    Check whether a rule matches an exclusion rule we already provided.

    If this function returns True, that means this rule should be excluded
    from the final hosts file.

    Parameters
    ----------
    stripped_rule : str
        The rule that we are checking.
    exclusion_regexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    matches_exclusion : bool
        Whether or not the rule string matches a provided exclusion.
    """

    try:
        stripped_domain = stripped_rule.split()[1]
    except IndexError:
        # Example: 'example.org' instead of '0.0.0.0 example.org'
        stripped_domain = stripped_rule

    for exclusion_regex in exclusion_regexes:
        if exclusion_regex.search(stripped_domain):
            return True

    return False


# End Exclusion Logic


# Update Logic
def update_sources_data(sources_data, **sources_params):
    """
    Update the sources data and information for each source.

    Parameters
    ----------
    sources_data : list
        The list of sources data that we are to update.
    sources_params : kwargs
        Dictionary providing additional parameters for updating the
        sources data. Currently, those fields are:

        1) datapath
        2) extensions
        3) extensionspath
        4) sourcedatafilename
        5) nounifiedhosts

    Returns
    -------
    update_sources_data : list
        The original source data list with new source data appended.
    """

    source_data_filename = sources_params["sourcedatafilename"]

    if not sources_params["nounifiedhosts"]:
        for source in sort_sources(
            recursive_glob(sources_params["datapath"], source_data_filename)
        ):
            update_file = open(source, "r", encoding="UTF-8")
            try:
                update_data = json.load(update_file)
                sources_data.append(update_data)
            finally:
                update_file.close()

    for source in sources_params["extensions"]:
        source_dir = path_join_robust(sources_params["extensionspath"], source)
        for update_file_path in sort_sources(
            recursive_glob(source_dir, source_data_filename)
        ):
            update_file = open(update_file_path, "r")
            try:
                update_data = json.load(update_file)
                sources_data.append(update_data)
            finally:
                update_file.close()

    return sources_data


def jsonarray(json_array_string):
    """
    Transformer that converts a JSON array string of hosts into one host
    per line, prefixing each line with "127.0.0.1 ".

    Parameters
    ----------
    json_array_string : str
        The JSON array string in the form
        '["example1.com", "example2.com", ...]'
    """

    temp_list = json.loads(json_array_string)
    hostlines = "127.0.0.1 " + "\n127.0.0.1 ".join(temp_list)
    return hostlines
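# Doctest-style illustration of the "jsonarray" transform (hypothetical hosts):
#
#   >>> jsonarray('["ads.example.com", "tracker.example.net"]')
#   '127.0.0.1 ads.example.com\n127.0.0.1 tracker.example.net'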

def update_all_sources(source_data_filename, host_filename):
    """
    Update all host files, regardless of folder depth.

    Parameters
    ----------
    source_data_filename : str
        The name of the file where information regarding updating
        sources for a particular URL is stored. This filename is assumed
        to be the same for all sources.
    host_filename : str
        The name of the file in which the updated source information
        is stored for a particular URL. This filename is assumed to be
        the same for all sources.
    """

    # The transforms we support
    transform_methods = {"jsonarray": jsonarray}

    all_sources = sort_sources(recursive_glob("*", source_data_filename))

    for source in all_sources:
        update_file = open(source, "r", encoding="UTF-8")
        update_data = json.load(update_file)
        update_file.close()

        # We can pause updating any given hosts source.
        # If the update.json "pause" key is missing, don't pause.
        if update_data.get("pause", False):
            continue

        update_url = update_data["url"]
        update_transforms = []
        if update_data.get("transforms"):
            update_transforms = update_data["transforms"]

        print("Updating source " + os.path.dirname(source) + " from " + update_url)

        try:
            updated_file = get_file_by_url(update_url)

            # Spin the transforms as required.
            for transform in update_transforms:
                updated_file = transform_methods[transform](updated_file)

            # Get rid of carriage-return symbols.
            updated_file = updated_file.replace("\r", "")

            hosts_file = open(
                path_join_robust(BASEDIR_PATH, os.path.dirname(source), host_filename),
                "wb",
            )
            write_data(hosts_file, updated_file)
            hosts_file.close()
        except Exception:
            print("Error in updating source: ", update_url)


# End Update Logic


# File Logic
def create_initial_file(**initial_file_params):
    """
    Initialize the file in which we merge all host files for later pruning.

    Parameters
    ----------
    initial_file_params : kwargs
        Dictionary providing additional parameters for populating the initial
        file information. Currently, those fields are:

        1) nounifiedhosts
    """

    merge_file = tempfile.NamedTemporaryFile()

    if not initial_file_params["nounifiedhosts"]:
        # Spin the sources for the base file.
        for source in sort_sources(
            recursive_glob(settings["datapath"], settings["hostfilename"])
        ):
            start = "# Start {}\n\n".format(os.path.basename(os.path.dirname(source)))
            end = "\n# End {}\n\n".format(os.path.basename(os.path.dirname(source)))

            with open(source, "r", encoding="UTF-8") as curFile:
                write_data(merge_file, start + curFile.read() + end)

    # Spin the sources for extensions to the base file.
    for source in settings["extensions"]:
        for filename in sort_sources(
            recursive_glob(
                path_join_robust(settings["extensionspath"], source),
                settings["hostfilename"],
            )
        ):
            with open(filename, "r") as curFile:
                write_data(merge_file, curFile.read())

    maybe_copy_example_file(settings["blacklistfile"])

    if os.path.isfile(settings["blacklistfile"]):
        with open(settings["blacklistfile"], "r") as curFile:
            write_data(merge_file, curFile.read())

    return merge_file
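# Shape of the merge file built above (illustrative; "adaway.org" is a
# hypothetical source folder name): each unified source is wrapped in
# markers derived from its folder name, e.g.
#
#   # Start adaway.org
#   ...host entries...
#   # End adaway.org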

def compress_file(input_file, target_ip, output_file):
    """
    Reduce the file dimension by removing non-necessary lines (empty lines
    and comments) and putting multiple domains in each line.
    Reducing the number of lines makes parsing under Microsoft Windows
    much faster.

    Parameters
    ----------
    input_file : file
        The file object that contains the hostnames that we are reducing.
    target_ip : str
        The target IP address.
    output_file : file
        The file object that will contain the reduced hostnames.
    """

    input_file.seek(0)  # reset file pointer
    write_data(output_file, "\n")

    target_ip_len = len(target_ip)
    lines = [target_ip]
    lines_index = 0
    for line in input_file.readlines():
        line = line.decode("UTF-8")

        if line.startswith(target_ip):
            if lines[lines_index].count(" ") < 9:
                lines[lines_index] += (
                    " " + line[target_ip_len : line.find("#")].strip()  # noqa: E203
                )
            else:
                lines[lines_index] += "\n"
                lines.append(line[: line.find("#")].strip())
                lines_index += 1

    for line in lines:
        write_data(output_file, line)

    input_file.close()
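# Illustrative compressed output (assuming the default target IP "0.0.0.0"):
# up to nine domains are packed after the IP on each line, e.g.
#
#   0.0.0.0 a.example b.example c.example d.example e.example ...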

def minimise_file(input_file, target_ip, output_file):
    """
    Reduce the file dimension by removing non-necessary lines (empty lines
    and comments).

    Parameters
    ----------
    input_file : file
        The file object that contains the hostnames that we are reducing.
    target_ip : str
        The target IP address.
    output_file : file
        The file object that will contain the reduced hostnames.
    """

    input_file.seek(0)  # reset file pointer
    write_data(output_file, "\n")

    lines = []
    for line in input_file.readlines():
        line = line.decode("UTF-8")

        if line.startswith(target_ip):
            lines.append(line[: line.find("#")].strip() + "\n")

    for line in lines:
        write_data(output_file, line)

    input_file.close()
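# Illustrative minimised output: blank lines and comment lines are dropped,
# and trailing "# ..." comments are cut, so a line such as
#
#   0.0.0.0 ads.example.com  # ad server
#
# becomes
#
#   0.0.0.0 ads.example.com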

def remove_dups_and_excl(merge_file, exclusion_regexes, output_file=None):
    """
    Remove duplicates and remove hosts that we are excluding.

    We check for duplicate hostnames as well as remove any hostnames that
    have been explicitly excluded by the user.

    Parameters
    ----------
    merge_file : file
        The file object that contains the hostnames that we are pruning.
    exclusion_regexes : list
        The list of regex patterns used to exclude domains.
    output_file : file
        The file object in which the result is written. If None, the file
        'settings["outputpath"]' will be created.
    """

    number_of_rules = settings["numberofrules"]
    maybe_copy_example_file(settings["whitelistfile"])

    if os.path.isfile(settings["whitelistfile"]):
        with open(settings["whitelistfile"], "r") as ins:
            for line in ins:
                line = line.strip(" \t\n\r")
                if line and not line.startswith("#"):
                    settings["exclusions"].append(line)

    if not os.path.exists(settings["outputpath"]):
        os.makedirs(settings["outputpath"])

    if output_file is None:
        final_file = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
    else:
        final_file = output_file

    merge_file.seek(0)  # reset file pointer
    hostnames = {"localhost", "localhost.localdomain", "local", "broadcasthost"}
    exclusions = settings["exclusions"]

    for line in merge_file.readlines():
        write_line = True

        # Explicit encoding
        line = line.decode("UTF-8")

        # Replace runs of tabs with a single space.
        line = re.sub(r"\t+", " ", line)

        # See gh-271: trim trailing whitespace, periods.
        line = line.rstrip(" .")

        # Testing the first character doesn't require startswith.
        if line[0] == "#" or re.match(r"^\s*$", line[0]):
            write_data(final_file, line)
            continue
        if "::1" in line:
            continue

        stripped_rule = strip_rule(line)  # strip comments
        if not stripped_rule or matches_exclusions(stripped_rule, exclusion_regexes):
            continue

        # Issue #1628
        if "@" in stripped_rule:
            continue

        # Normalize rule.
        hostname, normalized_rule = normalize_rule(
            stripped_rule,
            target_ip=settings["targetip"],
            keep_domain_comments=settings["keepdomaincomments"],
        )

        for exclude in exclusions:
            if re.search(r"(^|[\s\.])" + re.escape(exclude) + r"\s", line):
                write_line = False
                break

        if normalized_rule and (hostname not in hostnames) and write_line:
            write_data(final_file, normalized_rule)
            hostnames.add(hostname)
            number_of_rules += 1

    settings["numberofrules"] = number_of_rules
    merge_file.close()

    if output_file is None:
        return final_file

def normalize_rule(rule, target_ip, keep_domain_comments):
    """
    Standardize and format the rule string provided.

    Parameters
    ----------
    rule : str
        The rule whose spelling and spacing we are standardizing.
    target_ip : str
        The target IP address for the rule.
    keep_domain_comments : bool
        Whether or not to keep comments regarding these domains in
        the normalized rule.

    Returns
    -------
    normalized_rule : tuple
        A tuple of the hostname and the rule string with spelling
        and spacing reformatted.
    """

    def normalize_response(
        extracted_hostname: str, extracted_suffix: Optional[str]
    ) -> Tuple[str, str]:
        """
        Normalizes the responses after the provision of the extracted
        hostname and suffix, if they exist.

        Parameters
        ----------
        extracted_hostname : str
            The extracted hostname to work with.
        extracted_suffix : str
            The extracted suffix to work with.

        Returns
        -------
        normalized_response : tuple
            A tuple of the hostname and the rule string with spelling
            and spacing reformatted.
        """

        rule = "%s %s" % (target_ip, extracted_hostname)

        if keep_domain_comments and extracted_suffix:
            if not extracted_suffix.strip().startswith("#"):
                # Strings are stripped, therefore we need to add the space back.
                rule += " # %s" % extracted_suffix
            else:
                rule += " %s" % extracted_suffix

        return extracted_hostname, rule + "\n"

    def is_ip(dataset: str) -> bool:
        """
        Checks whether the given dataset is an IP.

        Parameters
        ----------
        dataset : str
            The dataset to work with.

        Returns
        -------
        is_ip : bool
            Whether the dataset is an IP.
        """

        try:
            _ = ipaddress.ip_address(dataset)
            return True
        except ValueError:
            return False

    def belch_unwanted(unwanted: str) -> Tuple[None, None]:
        """
        Belches unwanted to screen.

        Parameters
        ----------
        unwanted : str
            The unwanted string to belch.

        Returns
        -------
        belched : tuple
            A tuple of None, None.
        """

        # Finally, if we get here, just belch to screen.
        print("==>%s<==" % unwanted)
        return None, None

    # First try: IP followed by domain.
    static_ip_regex = r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$"
    split_rule = rule.split(maxsplit=1)

    if is_ip(split_rule[0]):
        # Assume that the first item is an IP address and that the rest
        # of the rule follows it.

        if " " in split_rule[-1] or "\t" in split_rule[-1]:
            try:
                # Example: 0.0.0.0 example.org # hello, world!
                hostname, suffix = split_rule[-1].split(maxsplit=1)
            except ValueError:
                # Example: 0.0.0.0 example.org[:space:]
                hostname, suffix = split_rule[-1], None
        else:
            # Example: 0.0.0.0 example.org
            hostname, suffix = split_rule[-1], None

        hostname = hostname.lower()

        if (
            is_ip(hostname)
            or re.search(static_ip_regex, hostname)
            or "." not in hostname
            or "/" in hostname
            or ".." in hostname
            or ":" in hostname
        ):
            # Example: 0.0.0.0 127.0.0.1

            # If the hostname is:
            #   - an IP - or looks like it,
            #   - doesn't contain dots, or
            #   - contains a colon,
            # we don't want to normalize it.
            return belch_unwanted(rule)

        return normalize_response(hostname, suffix)

    if (
        not re.search(static_ip_regex, split_rule[0])
        and ":" not in split_rule[0]
        and ".." not in split_rule[0]
        and "/" not in split_rule[0]
        and "." in split_rule[0]
    ):
        # Accept only entries that don't look like an IP and that contain
        # a dot; deny anything else as invalid.

        try:
            hostname, suffix = split_rule
        except ValueError:
            hostname, suffix = split_rule[0], None

        hostname = hostname.lower()

        return normalize_response(hostname, suffix)

    return belch_unwanted(rule)
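# Hedged doctest-style example of normalize_rule() on a well-formed rule
# (hypothetical domain):
#
#   >>> normalize_rule("127.0.0.1 Example.COM # ads", "0.0.0.0", True)
#   ('example.com', '0.0.0.0 example.com # ads\n')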

def strip_rule(line):
    """
    Sanitize a rule string provided before writing it to the output hosts file.

    Parameters
    ----------
    line : str
        The rule provided for sanitation.

    Returns
    -------
    sanitized_line : str
        The sanitized rule.
    """

    return " ".join(line.split())

def write_opening_header(final_file, **header_params):
    """
    Write the header information into the newly-created hosts file.

    Parameters
    ----------
    final_file : file
        The file object that points to the newly-created hosts file.
    header_params : kwargs
        Dictionary providing additional parameters for populating the header
        information. Currently, those fields are:

        1) extensions
        2) numberofrules
        3) outputsubfolder
        4) skipstatichosts
        5) nounifiedhosts
    """

    final_file.seek(0)  # Reset file pointer.
    file_contents = final_file.read()  # Save content.

    final_file.seek(0)  # Write at the top.

    no_unified_hosts = header_params["nounifiedhosts"]

    if header_params["extensions"]:
        if no_unified_hosts:
            if len(header_params["extensions"]) > 1:
                write_data(
                    final_file,
                    "# Title: StevenBlack/hosts extensions {0} and {1}\n#\n".format(
                        ", ".join(header_params["extensions"][:-1]),
                        header_params["extensions"][-1],
                    ),
                )
            else:
                write_data(
                    final_file,
                    "# Title: StevenBlack/hosts extension {0}\n#\n".format(
                        ", ".join(header_params["extensions"])
                    ),
                )
        else:
            if len(header_params["extensions"]) > 1:
                write_data(
                    final_file,
                    "# Title: StevenBlack/hosts with the {0} and {1} extensions\n#\n".format(
                        ", ".join(header_params["extensions"][:-1]),
                        header_params["extensions"][-1],
                    ),
                )
            else:
                write_data(
                    final_file,
                    "# Title: StevenBlack/hosts with the {0} extension\n#\n".format(
                        ", ".join(header_params["extensions"])
                    ),
                )
    else:
        write_data(final_file, "# Title: StevenBlack/hosts\n#\n")

    write_data(
        final_file,
        "# This hosts file is a merged collection "
        "of hosts from reputable sources,\n",
    )
    write_data(final_file, "# with a dash of crowd sourcing via GitHub\n#\n")
    write_data(
        final_file,
        "# Date: " + time.strftime("%d %B %Y %H:%M:%S (%Z)", time.gmtime()) + "\n",
    )

    if header_params["extensions"]:
        if header_params["nounifiedhosts"]:
            write_data(
                final_file,
                "# The unified hosts file was not used while generating this file.\n"
                "# Extensions used to generate this file: "
                + ", ".join(header_params["extensions"])
                + "\n",
            )
        else:
            write_data(
                final_file,
                "# Extensions added to this file: "
                + ", ".join(header_params["extensions"])
                + "\n",
            )

    write_data(
        final_file,
        (
            "# Number of unique domains: {:,}\n#\n".format(
                header_params["numberofrules"]
            )
        ),
    )
    write_data(
        final_file,
        "# Fetch the latest version of this file: "
        "https://raw.githubusercontent.com/StevenBlack/hosts/master/"
        + path_join_robust(header_params["outputsubfolder"], "").replace("\\", "/")
        + "hosts\n",
    )
    write_data(
        final_file, "# Project home page: https://github.com/StevenBlack/hosts\n"
    )
    write_data(
        final_file,
        "# Project releases: https://github.com/StevenBlack/hosts/releases\n#\n",
    )
    write_data(
        final_file,
        "# ===============================================================\n",
    )
    write_data(final_file, "\n")

    if not header_params["skipstatichosts"]:
        write_data(final_file, "127.0.0.1 localhost\n")
        write_data(final_file, "127.0.0.1 localhost.localdomain\n")
        write_data(final_file, "127.0.0.1 local\n")
        write_data(final_file, "255.255.255.255 broadcasthost\n")
        write_data(final_file, "::1 localhost\n")
        write_data(final_file, "::1 ip6-localhost\n")
        write_data(final_file, "::1 ip6-loopback\n")
        write_data(final_file, "fe80::1%lo0 localhost\n")
        write_data(final_file, "ff00::0 ip6-localnet\n")
        write_data(final_file, "ff00::0 ip6-mcastprefix\n")
        write_data(final_file, "ff02::1 ip6-allnodes\n")
        write_data(final_file, "ff02::2 ip6-allrouters\n")
        write_data(final_file, "ff02::3 ip6-allhosts\n")
        write_data(final_file, "0.0.0.0 0.0.0.0\n")

        if platform.system() == "Linux":
            write_data(final_file, "127.0.1.1 " + socket.gethostname() + "\n")
            write_data(final_file, "127.0.0.53 " + socket.gethostname() + "\n")

        write_data(final_file, "\n")

    preamble = path_join_robust(BASEDIR_PATH, "myhosts")
    maybe_copy_example_file(preamble)

    if os.path.isfile(preamble):
        with open(preamble, "r") as f:
            write_data(final_file, f.read())

    final_file.write(file_contents)

def update_readme_data(readme_file, **readme_updates):
    """
    Update the host and website information provided in the README JSON data.

    Parameters
    ----------
    readme_file : str
        The name of the README file to update.
    readme_updates : kwargs
        Dictionary providing additional JSON fields to update before
        saving the data. Currently, those fields are:

        1) extensions
        2) sourcesdata
        3) numberofrules
        4) outputsubfolder
        5) nounifiedhosts
    """

    extensions_key = "base"
    extensions = readme_updates["extensions"]
    no_unified_hosts = readme_updates["nounifiedhosts"]

    if extensions:
        extensions_key = "-".join(extensions)
        if no_unified_hosts:
            extensions_key = extensions_key + "-only"

    output_folder = readme_updates["outputsubfolder"]
    generation_data = {
        "location": path_join_robust(output_folder, ""),
        "no_unified_hosts": no_unified_hosts,
        "entries": readme_updates["numberofrules"],
        "sourcesdata": readme_updates["sourcesdata"],
    }

    with open(readme_file, "r") as f:
        readme_data = json.load(f)
        readme_data[extensions_key] = generation_data

    for denomination, data in readme_data.copy().items():
        if "location" in data and data["location"] and "\\" in data["location"]:
            # Windows compatibility: #1166
            readme_data[denomination]["location"] = data["location"].replace("\\", "/")

    with open(readme_file, "w") as f:
        json.dump(readme_data, f)
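# Illustrative key layout in readmeData.json: the unified list is stored
# under "base"; extension builds are keyed by the joined extension names,
# e.g. extensions ["fakenews", "gambling"] yield the key "fakenews-gambling",
# or "fakenews-gambling-only" when --nounifiedhosts is set.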

def move_hosts_file_into_place(final_file):
    r"""
    Move the newly-created hosts file into its correct location on the OS.

    For UNIX systems, the hosts file is "/etc/hosts". On Windows, it's
    "C:\Windows\System32\drivers\etc\hosts".

    For this move to work, you must have administrator privileges to do this.
    On UNIX systems, this means having "sudo" access, and on Windows, it
    means being able to run command prompt in administrator mode.

    Parameters
    ----------
    final_file : file object
        The newly-created hosts file to move.
    """

    filename = os.path.abspath(final_file.name)

    try:
        if not Path(filename).exists():
            raise FileNotFoundError
    except Exception:
        print_failure(f"{filename} does not exist.")
        return False

    if platform.system() == "Windows":
        target_file = str(
            Path(os.getenv("SystemRoot")) / "system32" / "drivers" / "etc" / "hosts"
        )
    else:
        target_file = "/etc/hosts"

    if os.getenv("IN_CONTAINER"):
        # It's not allowed to remove/replace a mounted /etc/hosts, so we replace the content.
        # This requires running the container user as root, as is the default.
        print(f"Running in container, so we will replace the content of {target_file}.")
        try:
            with open(target_file, "w") as target_stream:
                with open(filename, "r") as source_stream:
                    source = source_stream.read()
                    target_stream.write(source)
            return True
        except Exception:
            print_failure(f"Replacing content of {target_file} failed.")
            return False
    elif (
        platform.system() == "Linux"
        or platform.system() == "Windows"
        or platform.system() == "Darwin"
    ):
        print(
            f"Replacing {target_file} requires root privileges. You might need to enter your password."
        )
        try:
            subprocess.run(SUDO + ["cp", filename, target_file], check=True)
            return True
        except subprocess.CalledProcessError:
            print_failure(f"Replacing {target_file} failed.")
            return False

def flush_dns_cache():
    """
    Flush the DNS cache.
    """

    print("Flushing the DNS cache to utilize new hosts file...")
    print(
        "Flushing the DNS cache requires administrative privileges. You might need to enter your password."
    )

    dns_cache_found = False

    if platform.system() == "Darwin":
        if subprocess.call(SUDO + ["killall", "-HUP", "mDNSResponder"]):
            print_failure("Flushing the DNS cache failed.")
    elif os.name == "nt":
        print("Automatically flushing the DNS cache is not yet supported.")
        print(
            "Please copy and paste the command 'ipconfig /flushdns' in "
            "administrator command prompt after running this script."
        )
    else:
        nscd_prefixes = ["/etc", "/etc/rc.d"]
        nscd_msg = "Flushing the DNS cache by restarting nscd {result}"

        for nscd_prefix in nscd_prefixes:
            nscd_cache = nscd_prefix + "/init.d/nscd"

            if os.path.isfile(nscd_cache):
                dns_cache_found = True

                if subprocess.call(SUDO + [nscd_cache, "restart"]):
                    print_failure(nscd_msg.format(result="failed"))
                else:
                    print_success(nscd_msg.format(result="succeeded"))

        centos_file = "/etc/init.d/network"
        centos_msg = "Flushing the DNS cache by restarting network {result}"

        if os.path.isfile(centos_file):
            if subprocess.call(SUDO + [centos_file, "restart"]):
                print_failure(centos_msg.format(result="failed"))
            else:
                print_success(centos_msg.format(result="succeeded"))

        system_prefixes = ["/usr", ""]
        service_types = ["NetworkManager", "wicd", "dnsmasq", "networking"]
        restarted_services = []

        for system_prefix in system_prefixes:
            systemctl = system_prefix + "/bin/systemctl"
            system_dir = system_prefix + "/lib/systemd/system"

            for service_type in service_types:
                service = service_type + ".service"
                if service in restarted_services:
                    continue

                service_file = path_join_robust(system_dir, service)
                service_msg = (
                    "Flushing the DNS cache by restarting " + service + " {result}"
                )

                if os.path.isfile(service_file):
                    if 0 != subprocess.call(
                        [systemctl, "status", service], stdout=subprocess.DEVNULL
                    ):
                        continue
                    dns_cache_found = True

                    if subprocess.call(SUDO + [systemctl, "restart", service]):
                        print_failure(service_msg.format(result="failed"))
                    else:
                        print_success(service_msg.format(result="succeeded"))
                    restarted_services.append(service)

        dns_clean_file = "/etc/init.d/dns-clean"
        dns_clean_msg = "Flushing the DNS cache via dns-clean executable {result}"

        if os.path.isfile(dns_clean_file):
            dns_cache_found = True

            if subprocess.call(SUDO + [dns_clean_file, "start"]):
                print_failure(dns_clean_msg.format(result="failed"))
            else:
                print_success(dns_clean_msg.format(result="succeeded"))

        if not dns_cache_found:
            print_failure("Unable to determine DNS management tool.")

def remove_old_hosts_file(path_to_file, file_name, backup):
    """
    Remove the old hosts file.

    This is a hotfix because merging with an already existing hosts file leads
    to artifacts and duplicates.

    Parameters
    ----------
    path_to_file : str
        The path to the folder in which the hosts file lives.
    file_name : str
        The name of the hosts file to remove.
    backup : boolean, default False
        Whether or not to backup the existing hosts file.
    """

    full_file_path = path_join_robust(path_to_file, file_name)

    if os.path.exists(full_file_path):
        if backup:
            backup_file_path = full_file_path + "-{}".format(
                time.strftime("%Y-%m-%d-%H-%M-%S")
            )

            # Make a backup copy, marking the date in which the list was updated
            shutil.copy(full_file_path, backup_file_path)

        os.remove(full_file_path)

    # Create the directory if it doesn't exist.
    if not os.path.exists(path_to_file):
        os.makedirs(path_to_file)

    # Create a new empty hosts file.
    open(full_file_path, "a").close()
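# Illustrative backup name (when --backup is given): the existing file is
# copied to e.g. "hosts-2024-07-04-19-46-02" before a fresh, empty hosts
# file is created.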

# End File Logic


def domain_to_idna(line):
    """
    Encode a domain present in a line into `idna`. This way we
    avoid most encoding issues.

    Parameters
    ----------
    line : str
        The line we have to encode/decode.

    Returns
    -------
    line : str
        The line in a converted format.

    Notes
    -----
    - This function encodes only the domain to `idna` format because in
      most cases, the encoding issue is due to a domain which looks like
      `b'\xc9\xa2oogle.com'.decode('idna')`.
    - About the splitting:
      We split because we only want to encode the domain and not the full
      line, which may cause some issues. Keep in mind that we split, but we
      still concatenate once we encoded the domain.

      - The following splits off the prefix `0.0.0.0` or `127.0.0.1` of a line.
      - The following also splits off the trailing comment of a given line.
    """

    if not line.startswith("#"):
        tabs = "\t"
        space = " "

        tabs_position, space_position = (line.find(tabs), line.find(space))

        if tabs_position > -1 and space_position > -1:
            if space_position < tabs_position:
                separator = space
            else:
                separator = tabs
        elif not tabs_position == -1:
            separator = tabs
        elif not space_position == -1:
            separator = space
        else:
            separator = ""

        if separator:
            splited_line = line.split(separator)

            try:
                index = 1
                while index < len(splited_line):
                    if splited_line[index]:
                        break
                    index += 1

                if "#" in splited_line[index]:
                    index_comment = splited_line[index].find("#")

                    if index_comment > -1:
                        comment = splited_line[index][index_comment:]

                        splited_line[index] = (
                            splited_line[index]
                            .split(comment)[0]
                            .encode("IDNA")
                            .decode("UTF-8")
                            + comment
                        )

                splited_line[index] = splited_line[index].encode("IDNA").decode("UTF-8")
            except IndexError:
                pass
            return separator.join(splited_line)
        return line.encode("IDNA").decode("UTF-8")
    return line.encode("UTF-8").decode("UTF-8")
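# Hedged example using Python's built-in "idna" codec (hypothetical host):
#
#   >>> domain_to_idna("0.0.0.0 bücher.example.com")
#   '0.0.0.0 xn--bcher-kva.example.com'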

# Helper Functions
def maybe_copy_example_file(file_path):
    """
    Given a file path, copy over its ".example" if the path doesn't exist.

    If the path does exist, nothing happens in this function.

    If the path doesn't exist, and the ".example" file doesn't exist, nothing
    happens in this function.

    Parameters
    ----------
    file_path : str
        The full file path to check.
    """

    if not os.path.isfile(file_path):
        example_file_path = file_path + ".example"
        if os.path.isfile(example_file_path):
            shutil.copyfile(example_file_path, file_path)


def get_file_by_url(url, params=None, **kwargs):
    """
    Retrieve the contents of the hosts file at the URL, then pass it through
    domain_to_idna().

    Parameters are passed to the requests.get() function.

    Parameters
    ----------
    url : str or bytes
        URL for the new Request object.
    params :
        Dictionary, list of tuples or bytes to send in the query string for
        the Request.
    kwargs :
        Optional arguments that request takes.

    Returns
    -------
    url_data : str or None
        The data retrieved at that URL from the file. Returns None if the
        attempted retrieval is unsuccessful.
    """

    try:
        req = requests.get(url=url, params=params, **kwargs)
    except requests.exceptions.RequestException:
        print("Error retrieving data from {}".format(url))
        return None

    req.encoding = req.apparent_encoding
    res_text = "\n".join([domain_to_idna(line) for line in req.text.split("\n")])
    return res_text

def write_data(f, data):
    """
    Write data to a file object.

    Parameters
    ----------
    f : file
        The file object at which to write the data.
    data : str
        The data to write to the file.
    """

    f.write(bytes(data, "UTF-8"))


def list_dir_no_hidden(path):
    """
    List all files in a directory, except for hidden files.

    Parameters
    ----------
    path : str
        The path of the directory whose files we wish to list.
    """

    return glob(os.path.join(path, "*"))


def query_yes_no(question, default="yes"):
    """
    Ask a yes/no question via input() and get an answer from the user.

    Inspired by the following implementation:

    https://code.activestate.com/recipes/577058/

    Parameters
    ----------
    question : str
        The question presented to the user.
    default : str, default "yes"
        The presumed answer if the user just hits <Enter>. It must be "yes",
        "no", or None (meaning an answer is required of the user).

    Returns
    -------
    yes : bool
        Whether or not the user replied yes to the question.
    """

    valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"}
    prompt = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}.get(default, None)

    if not prompt:
        raise ValueError("invalid default answer: '%s'" % default)

    reply = None

    while not reply:
        sys.stdout.write(colorize(question, Colors.PROMPT) + prompt)

        choice = input().lower()
        reply = None

        if default and not choice:
            reply = default
        elif choice in valid:
            reply = valid[choice]
        else:
            print_failure("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")

    return reply == "yes"


def is_valid_user_provided_domain_format(domain):
    """
    Check whether a provided domain is valid.

    Parameters
    ----------
    domain : str
        The domain against which to check.

    Returns
    -------
    valid_domain : bool
        Whether or not the domain provided is valid.
    """

    if domain == "":
        print("You didn't enter a domain. Try again.")
        return False

    domain_regex = re.compile(r"www\d{0,3}[.]|https?")

    if domain_regex.match(domain):
        print(
            "The domain " + domain + " is not valid. Do not include "
            "www.domain.com or http(s)://domain.com. Try again."
        )
        return False
    else:
        return True
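# Illustrative checks: "example.com" passes, while "www.example.com" and
# "https://example.com" are rejected (with an explanatory message printed).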

def recursive_glob(stem, file_pattern):
    """
    Recursively match files in a directory according to a pattern.

    Parameters
    ----------
    stem : str
        The directory in which to recurse.
    file_pattern : str
        The filename (glob) pattern to match.

    Returns
    -------
    matches_list : list
        A list of filenames in the directory that match the file pattern.
    """

    if sys.version_info >= (3, 5):
        return glob(stem + "/**/" + file_pattern, recursive=True)
    else:
        # gh-316: this will avoid invalid unicode comparisons in Python 2.x
        if stem == str("*"):
            stem = "."
        matches = []
        for root, dirnames, filenames in os.walk(stem):
            for filename in fnmatch.filter(filenames, file_pattern):
                matches.append(path_join_robust(root, filename))
        return matches
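# Hedged example: recursive_glob("extensions", "update.json") returns every
# update.json at any depth below ./extensions, e.g.
# ["extensions/fakenews/update.json", ...] (actual paths depend on checkout).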

def path_join_robust(path, *paths):
    """
    Wrapper around `os.path.join` with handling for locale issues.

    Parameters
    ----------
    path : str
        The first path to join.
    paths : varargs
        Subsequent path strings to join.

    Returns
    -------
    joined_path : str
        The joined path string of the two path inputs.

    Raises
    ------
    locale.Error : A locale issue was detected that prevents path joining.
    """

    try:
        # gh-316: joining unicode and str can be saddening in Python 2.x
        path = str(path)
        paths = [str(another_path) for another_path in paths]

        return os.path.join(path, *paths)
    except UnicodeDecodeError as e:
        raise locale.Error(
            "Unable to construct path. This is likely a LOCALE issue:\n\n" + str(e)
        )


# Colors
class Colors(object):
    PROMPT = "\033[94m"
    SUCCESS = "\033[92m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"


def supports_color():
    """
    Check whether the running terminal or command prompt supports color.

    Inspired by StackOverflow link (and Django implementation) here:

    https://stackoverflow.com/questions/7445658

    Returns
    -------
    colors_supported : bool
        Whether the running terminal or command prompt supports color.
    """

    sys_platform = sys.platform
    supported = sys_platform != "Pocket PC" and (
        sys_platform != "win32" or "ANSICON" in os.environ
    )

    atty_connected = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
    return supported and atty_connected


def colorize(text, color):
    """
    Wrap a string so that it displays in a particular color.

    This function adds a prefix and suffix to a text string so that it is
    displayed as a particular color, either in command prompt or the terminal.

    If the running terminal or command prompt does not support color, the
    original text is returned without being wrapped.

    Parameters
    ----------
    text : str
        The message to display.
    color : str
        The color string prefix to put before the text.

    Returns
    -------
    wrapped_str : str
        The wrapped string to display in color, if possible.
    """

    if not supports_color():
        return text

    return color + text + Colors.ENDC


def print_success(text):
    """
    Print a success message.

    Parameters
    ----------
    text : str
        The message to display.
    """

    print(colorize(text, Colors.SUCCESS))


def print_failure(text):
    """
    Print a failure message.

    Parameters
    ----------
    text : str
        The message to display.
    """

    print(colorize(text, Colors.FAIL))


# End Helper Functions


if __name__ == "__main__":
    main()