Refactoring in updateHostsFile.py

Steven Black 2016-12-18 00:34:12 -05:00
parent c596ef948f
commit d3b8cc7339


@@ -21,7 +21,7 @@ import subprocess
import sys
import tempfile
import time
-import glob
+from glob import glob
import argparse
import socket
import json
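A minimal illustration of the import-style change in this hunk (not part of the commit): both forms expose the same function, the refactor simply shortens the call sites to glob(...).

    # Illustration only: the two import styles side by side.
    import os
    import glob as glob_module          # module import: call glob_module.glob(...)
    from glob import glob               # name import: call glob(...) directly

    print(glob_module.glob(os.path.join("data", "*")))
    print(glob(os.path.join("data", "*")))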
@@ -33,542 +33,533 @@ import zlib
# Python 3 works differently with urlopen
try:                 # Python 3
    from urllib.parse import urlparse, urlencode
    from urllib.request import urlopen, Request
    from urllib.error import HTTPError
except ImportError:  # Python 2
    from urlparse import urlparse
    from urllib import urlencode
    from urllib2 import urlopen, Request, HTTPError

try:                 # Python 2
    raw_input
except NameError:    # Python 3
    raw_input = input

# Detecting Python 3 for version-dependent implementations
Python3 = sys.version_info >= (3,0)

# This function handles both Python 2 and Python 3
def getFileByUrl(url):
    try:
        f = urlopen(url)
        return f.read().decode("UTF-8")
    except:
        print ("Problem getting file: ", url)
        # raise

# In Python 3 "print" is a function, braces are added everywhere
# Cross-python writing function
def writeData(f, data):
    if Python3:
        f.write(bytes(data, "UTF-8"))
    else:
        f.write(str(data).encode("UTF-8"))

# This function doesn't list hidden files
def listdir_nohidden(path):
-    return glob.glob(os.path.join(path, "*"))
+    return glob(os.path.join(path, "*"))
# Project Settings
BASEDIR_PATH = os.path.dirname(os.path.realpath(__file__))

defaults = {
    "numberofrules" : 0,
    "datapath" : os.path.join(BASEDIR_PATH, "data"),
    "freshen" : True,
    "replace" : False,
    "backup" : False,
    "skipstatichosts": False,
    "extensionspath" : os.path.join(BASEDIR_PATH, "extensions"),
    "extensions" : [],
    "outputsubfolder" : "",
    "datafilenames" : "hosts",
    "targetip" : "0.0.0.0",
    "ziphosts" : False,
    "sourcedatafilename" : "update.json",
    "sourcesdata": [],
    "readmefilename" : "readme.md",
    "readmetemplate" : os.path.join(BASEDIR_PATH, "readme_template.md"),
    "readmedata" : {},
    "readmedatafilename" : os.path.join(BASEDIR_PATH, "readmeData.json"),
    "exclusionpattern" : "([a-zA-Z\d-]+\.){0,}",
    "exclusionregexs" : [],
    "exclusions" : [],
    "commonexclusions" : ["hulu.com"],
    "blacklistfile" : os.path.join(BASEDIR_PATH, "blacklist"),
    "whitelistfile" : os.path.join(BASEDIR_PATH, "whitelist")}

def main():
    parser = argparse.ArgumentParser(description="Creates a unified hosts file from hosts stored in data subfolders.")
    parser.add_argument("--auto", "-a", dest="auto", default=False, action="store_true", help="Run without prompting.")
    parser.add_argument("--backup", "-b", dest="backup", default=False, action="store_true", help="Backup the hosts files before they are overridden.")
    parser.add_argument("--extensions", "-e", dest="extensions", default=[], nargs="*", help="Host extensions to include in the final hosts file.")
    parser.add_argument("--ip", "-i", dest="targetip", default="0.0.0.0", help="Target IP address. Default is 0.0.0.0.")
    parser.add_argument("--zip", "-z", dest="ziphosts", default=False, action="store_true", help="Additionally create a zip archive of the hosts file.")
    parser.add_argument("--noupdate", "-n", dest="noupdate", default=False, action="store_true", help="Don't update from host data sources.")
    parser.add_argument("--skipstatichosts", "-s", dest="skipstatichosts", default=False, action="store_true", help="Skip static localhost entries in the final hosts file.")
    parser.add_argument("--output", "-o", dest="outputsubfolder", default="", help="Output subfolder for generated hosts file.")
    parser.add_argument("--replace", "-r", dest="replace", default=False, action="store_true", help="Replace your active hosts file with this new hosts file.")
    parser.add_argument("--flush-dns-cache", "-f", dest="flushdnscache", default=False, action="store_true", help="Attempt to flush DNS cache after replacing the hosts file.")

    global settings

    options = vars(parser.parse_args())

    options["outputpath"] = os.path.join(BASEDIR_PATH, options["outputsubfolder"])
    options["freshen"] = not options["noupdate"]

    settings = {}
    settings.update(defaults)
    settings.update(options)

    settings["sources"] = listdir_nohidden(settings["datapath"])
    settings["extensionsources"] = listdir_nohidden(settings["extensionspath"])
    # All our extensions folders...
    settings["extensions"] = [os.path.basename(item) for item in listdir_nohidden(settings["extensionspath"])]
    # ... intersected with the extensions passed-in as arguments, then sorted.
    settings["extensions"] = sorted( list(set(options["extensions"]).intersection(settings["extensions"])) )

    with open(settings["readmedatafilename"], "r") as f:
        settings["readmedata"] = json.load(f)

    promptForUpdate()
    promptForExclusions()

    mergeFile = createInitialFile()
    removeOldHostsFile()
    finalFile = removeDupsAndExcl(mergeFile)
    finalizeFile(finalFile)

    if settings["ziphosts"]:
        zf = zipfile.ZipFile(os.path.join(settings["outputsubfolder"], "hosts.zip"), mode='w')
        zf.write(os.path.join(settings["outputsubfolder"], "hosts"), compress_type=zipfile.ZIP_DEFLATED, arcname='hosts')
        zf.close()

    updateReadmeData()
    printSuccess("Success! The hosts file has been saved in folder " + settings["outputsubfolder"] + "\nIt contains " +
                 "{:,}".format(settings["numberofrules"]) + " unique entries.")

    promptForMove(finalFile)
# Prompt the User
def promptForUpdate():
    # Create hosts file if it doesn't exists
    if not os.path.isfile(os.path.join(BASEDIR_PATH, "hosts")):
        try:
            open(os.path.join(BASEDIR_PATH, "hosts"), "w+").close()
        except:
            printFailure("ERROR: No 'hosts' file in the folder, try creating one manually")

    if not settings["freshen"]:
        return

    response = "yes" if settings["auto"] else query_yes_no("Do you want to update all data sources?")
    if response == "yes":
        updateAllSources()
    else:
        if not settings["auto"]:
            print ("OK, we'll stick with what we've got locally.")

def promptForExclusions():
    response = "no" if settings["auto"] else query_yes_no("Do you want to exclude any domains?\n" +
                "For example, hulu.com video streaming must be able to access " +
                "its tracking and ad servers in order to play video.")
    if response == "yes":
        displayExclusionOptions()
    else:
        if not settings["auto"]:
            print ("OK, we'll only exclude domains in the whitelist.")

def promptForMoreCustomExclusions(question="Do you have more domains you want to enter?"):
    return query_yes_no(question) == "yes"

def promptForFlushDnsCache():
    if settings['auto']:
        if settings['flushdnscache']:
            flushDnsCache()
    else:
        if settings['flushdnscache'] or query_yes_no("Attempt to flush the DNS cache?"):
            flushDnsCache()

def promptForMove(finalFile):
    if settings["replace"] and not settings["skipstatichosts"]:
        response = "yes"
    else:
        response = "no" if settings["auto"] or settings["skipstatichosts"] else query_yes_no("Do you want to replace your existing hosts file " +
                    "with the newly generated file?")
    if response == "yes":
        moveHostsFileIntoPlace(finalFile)
        promptForFlushDnsCache()
    else:
        return False
# End Prompt the User

# Exclusion logic
def displayExclusionOptions():
    for exclusionOption in settings["commonexclusions"]:
        response = query_yes_no("Do you want to exclude the domain " + exclusionOption + " ?")
        if response == "yes":
            excludeDomain(exclusionOption)
        else:
            continue
    response = query_yes_no("Do you want to exclude any other domains?")
    if response == "yes":
        gatherCustomExclusions()

def gatherCustomExclusions():
    while True:
        # Cross-python Input
        domainFromUser = raw_input("Enter the domain you want to exclude (e.g. facebook.com): ")
        if isValidDomainFormat(domainFromUser):
            excludeDomain(domainFromUser)
        if not promptForMoreCustomExclusions():
            return

def excludeDomain(domain):
    settings["exclusionregexs"].append(re.compile(settings["exclusionpattern"] + domain))

def matchesExclusions(strippedRule):
    strippedDomain = strippedRule.split()[1]
    for exclusionRegex in settings["exclusionregexs"]:
        if exclusionRegex.search(strippedDomain):
            return True
    return False
# End Exclusion Logic

# Update Logic
def updateAllSources():
-    allsources = list(set(settings["sources"]) | set(settings["extensionsources"]))
-    for source in allsources:
-        if os.path.isdir(source):
-            for updateURL in getUpdateURLsFromFile(source):
-                print ("Updating source " + os.path.basename(source) + " from " + updateURL)
-                # Cross-python call
-                updatedFile = getFileByUrl(updateURL)
-                try:
-                    updatedFile = updatedFile.replace("\r", "") #get rid of carriage-return symbols
-                    # This is cross-python code
-                    dataFile = open(os.path.join(settings["datapath"], source, settings["datafilenames"]), "wb")
-                    writeData(dataFile, updatedFile)
-                    dataFile.close()
-                except:
-                    print ("Skipping.")
-
-def getUpdateURLsFromFile(source):
-    pathToUpdateFile = os.path.join(settings["datapath"], source, settings["sourcedatafilename"])
-    if os.path.exists(pathToUpdateFile):
-        updateFile = open(pathToUpdateFile, "r")
-        updateData = json.load(updateFile)
-        retURLs = [updateData["url"]]
-        updateFile.close()
-    else:
-        retURLs = None
-        printFailure("Warning: Can't find the update file for source " + source + "\n" +
-                     "Make sure that there's a file at " + pathToUpdateFile)
-    return retURLs
+    # Update all hosts files regardless of folder depth
+    allsources = glob('*/**/' + settings["sourcedatafilename"])
+    for source in allsources:
+        updateFile = open(source, "r")
+        updateData = json.load(updateFile)
+        updateURL = updateData["url"]
+        updateFile.close()
+
+        print ("Updating source " + os.path.dirname(source) + " from " + updateURL)
+        # Cross-python call
+        updatedFile = getFileByUrl(updateURL)
+        try:
+            updatedFile = updatedFile.replace("\r", "") #get rid of carriage-return symbols
+
+            # This is cross-python code
+            dataFile = open(os.path.join(BASEDIR_PATH, os.path.dirname(source), settings["datafilenames"]), "wb")
+            writeData(dataFile, updatedFile)
+            dataFile.close()
+        except:
+            print ("Skipping.")
# End Update Logic
# File Logic
def createInitialFile():
    mergeFile = tempfile.NamedTemporaryFile()

    # spin the sources for the base file
    for source in settings["sources"]:
        filename = os.path.join(settings["datapath"], source, settings["datafilenames"])
        with open(filename, "r") as curFile:
            #Done in a cross-python way
            writeData(mergeFile, curFile.read())

        pathToUpdateFile = os.path.join(settings["datapath"], source, settings["sourcedatafilename"])
        if os.path.exists(pathToUpdateFile):
            updateFile = open(pathToUpdateFile, "r")
            updateData = json.load(updateFile)
            settings["sourcesdata"].append(updateData)
            updateFile.close()

    # spin the sources for extensions to the base file
    for source in settings["extensions"]:
        filename = os.path.join(settings["extensionspath"], source, settings["datafilenames"])
        with open(filename, "r") as curFile:
            #Done in a cross-python way
            writeData(mergeFile, curFile.read())

        pathToUpdateFile = os.path.join(settings["extensionspath"], source, settings["sourcedatafilename"])
        if os.path.exists(pathToUpdateFile):
            updateFile = open(pathToUpdateFile, "r")
            updateData = json.load(updateFile)
            settings["sourcesdata"].append(updateData)
            updateFile.close()

    if os.path.isfile(settings["blacklistfile"]):
        with open(settings["blacklistfile"], "r") as curFile:
            #Done in a cross-python way
            writeData(mergeFile, curFile.read())

    return mergeFile
def removeDupsAndExcl(mergeFile):
    numberOfRules = settings["numberofrules"]
    if os.path.isfile(settings["whitelistfile"]):
        with open(settings["whitelistfile"], "r") as ins:
            for line in ins:
                line = line.strip(" \t\n\r")
                if line and not line.startswith("#"):
                    settings["exclusions"].append(line)

    if not os.path.exists(settings["outputpath"]):
        os.makedirs(settings["outputpath"])

    # Another mode is required to read and write the file in Python 3
    finalFile = open(os.path.join(settings["outputpath"], "hosts"),
                     "w+b" if Python3 else "w+")

    mergeFile.seek(0) # reset file pointer
    hostnames = set(["localhost", "localhost.localdomain", "local", "broadcasthost"])
    exclusions = settings["exclusions"]
    for line in mergeFile.readlines():
        write = "true"
        # Explicit encoding
        line = line.decode("UTF-8")
        # replace tabs with space
        line = line.replace("\t+", " ")
        # Trim trailing whitespace
        line = line.rstrip() + "\n"
        # Testing the first character doesn't require startswith
        if line[0] == "#" or re.match(r'^\s*$', line[0]):
            # Cross-python write
            writeData(finalFile, line)
            continue
        if "::1" in line:
            continue

        strippedRule = stripRule(line) #strip comments
        if not strippedRule or matchesExclusions(strippedRule):
            continue
        hostname, normalizedRule = normalizeRule(strippedRule) # normalize rule
        for exclude in exclusions:
            if exclude in line:
                write = "false"
                break
        if normalizedRule and (hostname not in hostnames) and (write == "true"):
            writeData(finalFile, normalizedRule)
            hostnames.add(hostname)
            numberOfRules += 1

    settings["numberofrules"] = numberOfRules
    mergeFile.close()

    return finalFile
def normalizeRule(rule):
    result = re.search(r'^[ \t]*(\d+\.\d+\.\d+\.\d+)\s+([\w\.-]+)(.*)', rule)
    if result:
        hostname, suffix = result.group(2,3)
        hostname = hostname.lower().strip() # explicitly lowercase and trim the hostname
        if suffix:
            # add suffix as comment only, not as a separate host
            return hostname, "%s %s #%s\n" % (settings["targetip"], hostname, suffix)
        else:
            return hostname, "%s %s\n" % (settings["targetip"], hostname)
    print ("==>%s<==" % rule)
    return None, None

def finalizeFile(finalFile):
    writeOpeningHeader(finalFile)
    finalFile.close()

# Some sources put comments around their rules, for accuracy we need to strip them
# the comments are preserved in the output hosts file
def stripRule(line):
    splitLine = line.split()
    if len(splitLine) < 2 :
        # just return blank
        return ""
    else:
        return splitLine[0] + " " + splitLine[1]
def writeOpeningHeader(finalFile):
    finalFile.seek(0) #reset file pointer
    fileContents = finalFile.read() #save content
    finalFile.seek(0) #write at the top
    writeData(finalFile, "# This hosts file is a merged collection of hosts from reputable sources,\n")
    writeData(finalFile, "# with a dash of crowd sourcing via Github\n#\n")
    writeData(finalFile, "# Date: " + time.strftime("%B %d %Y", time.gmtime()) + "\n")
    if settings["extensions"]:
        writeData(finalFile, "# Extensions added to this file: " + ", ".join(settings["extensions"]) + "\n")
    writeData(finalFile, "# Number of unique domains: " + "{:,}\n#\n".format(settings["numberofrules"]))
    writeData(finalFile, "# Fetch the latest version of this file: https://raw.githubusercontent.com/StevenBlack/hosts/master/" + os.path.join(settings["outputsubfolder"], "") + "hosts\n")
    writeData(finalFile, "# Project home page: https://github.com/StevenBlack/hosts\n#\n")
    writeData(finalFile, "# ===============================================================\n")
    writeData(finalFile, "\n")

    if not settings["skipstatichosts"]:
        writeData(finalFile, "127.0.0.1 localhost\n")
        writeData(finalFile, "127.0.0.1 localhost.localdomain\n")
        writeData(finalFile, "127.0.0.1 local\n")
        writeData(finalFile, "255.255.255.255 broadcasthost\n")
        writeData(finalFile, "::1 localhost\n")
        writeData(finalFile, "fe80::1%lo0 localhost\n")
        if platform.system() == "Linux":
            writeData(finalFile, "127.0.1.1 " + socket.gethostname() + "\n")
        writeData(finalFile, "\n")

    preamble = os.path.join(BASEDIR_PATH, "myhosts")
    if os.path.isfile(preamble):
        with open(preamble, "r") as f:
            writeData(finalFile, f.read())

    finalFile.write(fileContents)
def updateReadmeData():
    extensionsKey = "base"
    hostsLocation = ""
    if settings["extensions"]:
        extensionsKey = "-".join(settings["extensions"])

    generationData = {"location": os.path.join(settings["outputsubfolder"], ""),
                      "entries": settings["numberofrules"],
                      "sourcesdata": settings["sourcesdata"]}
    settings["readmedata"][extensionsKey] = generationData
    with open(settings["readmedatafilename"], "w") as f:
        json.dump(settings["readmedata"], f)

def moveHostsFileIntoPlace(finalFile):
    if os.name == "posix":
        print ("Moving the file requires administrative privileges. " +
               "You might need to enter your password.")
        if subprocess.call(["/usr/bin/sudo", "cp", os.path.abspath(finalFile.name), "/etc/hosts"]):
            printFailure("Moving the file failed.")
    elif os.name == "nt":
        print("Automatically moving the hosts file in place is not yet supported.")
        print("Please move the generated file to %SystemRoot%\system32\drivers\etc\hosts")
def flushDnsCache():
    print("Flushing the DNS cache to utilize new hosts file...")
    print("Flushing the DNS cache requires administrative privileges. " +
          "You might need to enter your password.")
    dnsCacheFound = False
    if platform.system() == "Darwin":
        if subprocess.call(["/usr/bin/sudo", "killall", "-HUP", "mDNSResponder"]):
            printFailure("Flushing the DNS cache failed.")
    else:
        if os.path.isfile("/etc/rc.d/init.d/nscd"):
            dnsCacheFound = True
            if subprocess.call(["/usr/bin/sudo", "/etc/rc.d/init.d/nscd", "restart"]):
                printFailure("Flushing the DNS cache failed.")
            else:
                printSuccess("Flushing DNS by restarting nscd succeeded")
        if os.path.isfile("/usr/lib/systemd/system/NetworkManager.service"):
            dnsCacheFound = True
            if subprocess.call(["/usr/bin/sudo", "/usr/bin/systemctl", "restart", "NetworkManager.service"]):
                printFailure("Flushing the DNS cache failed.")
            else:
                printSuccess("Flushing DNS by restarting NetworkManager succeeded")
        if os.path.isfile("/usr/lib/systemd/system/wicd.service"):
            dnsCacheFound = True
            if subprocess.call(["/usr/bin/sudo", "/usr/bin/systemctl", "restart", "wicd.service"]):
                printFailure("Flushing the DNS cache failed.")
            else:
                printSuccess("Flushing DNS by restarting wicd succeeded")
        if os.path.isfile("/usr/lib/systemd/system/dnsmasq.service"):
            dnsCacheFound = True
            if subprocess.call(["/usr/bin/sudo", "/usr/bin/systemctl", "restart", "dnsmasq.service"]):
                printFailure("Flushing the DNS cache failed.")
            else:
                printSuccess("Flushing DNS by restarting dnsmasq succeeded")
        if os.path.isfile("/usr/lib/systemd/system/networking.service"):
            dnsCacheFound = True
            if subprocess.call(["/usr/bin/sudo", "/usr/bin/systemctl", "restart", "networking.service"]):
                printFailure("Flushing the DNS cache failed.")
            else:
                printSuccess("Flushing DNS by restarting networking.service succeeded")
        if not dnsCacheFound:
            printFailure("Unable to determine DNS management tool.")
def removeOldHostsFile(): # hotfix since merging with an already existing hosts file leads to artefacts and duplicates
    oldFilePath = os.path.join(BASEDIR_PATH, "hosts")
    open(oldFilePath, "a").close() # create if already removed, so remove wont raise an error

    if settings["backup"]:
        backupFilePath = os.path.join(BASEDIR_PATH, "hosts-{}".format(time.strftime("%Y-%m-%d-%H-%M-%S")))
        shutil.copy(oldFilePath, backupFilePath) # make a backup copy, marking the date in which the list was updated

    os.remove(oldFilePath)
    open(oldFilePath, "a").close() # create new empty hostsfile
# End File Logic
# Helper Functions
## {{{ http://code.activestate.com/recipes/577058/ (r2)
def query_yes_no(question, default = "yes"):
    """Ask a yes/no question via raw_input() and return their answer.

    "question" is a string that is presented to the user.
    "default" is the presumed answer if the user just hits <Enter>.
        It must be "yes" (the default), "no" or None (meaning
        an answer is required of the user).

    The "answer" return value is one of "yes" or "no".
    """
    valid = {"yes":"yes", "y":"yes", "ye":"yes",
             "no":"no", "n":"no"}
    prompt = {None: " [y/n] ",
              "yes": " [Y/n] ",
              "no": " [y/N] "}.get(default, None)
    if not prompt:
        raise ValueError("invalid default answer: '%s'" % default)

    while 1:
        sys.stdout.write(colorize(question, colors.PROMPT) + prompt)
        # Changed to be cross-python
        choice = raw_input().lower()
        if default and not choice:
            return default
        elif choice in valid:
            return valid[choice]
        else:
            printFailure(
                "Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
## end of http://code.activestate.com/recipes/577058/ }}}
def isValidDomainFormat(domain):
    if domain == "":
        print ("You didn't enter a domain. Try again.")
        return False
    domainRegex = re.compile("www\d{0,3}[.]|https?")
    if domainRegex.match(domain):
        print ("The domain " + domain + " is not valid. " +
               "Do not include www.domain.com or http(s)://domain.com. Try again.")
        return False
    else:
        return True

# Colors
class colors:
    PROMPT  = "\033[94m"
    SUCCESS = "\033[92m"
    FAIL    = "\033[91m"
    ENDC    = "\033[0m"

def colorize(text, color):
    return color + text + colors.ENDC

def printSuccess(text):
    print (colorize(text, colors.SUCCESS))

def printFailure(text):
    print (colorize(text, colors.FAIL))
# End Helper Functions

if __name__ == "__main__":
    main()
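A brief note on the refactored updateAllSources(): the new glob('*/**/' + settings["sourcedatafilename"]) call discovers update.json files relative to the current working directory, so the script is expected to run from the repository root. Below is a minimal sketch (not part of the commit) of what that pattern matches, assuming the data/ and extensions/ layout implied by the defaults above.

    # Sketch only: what the new discovery pattern matches.
    # Without recursive=True, "**" behaves like "*" in glob, so this finds
    # update.json files exactly two directory levels below the working directory,
    # e.g. data/<source>/update.json and extensions/<extension>/update.json.
    from glob import glob

    for source in glob('*/**/' + "update.json"):
        print(source)

    # For comparison, a Python 3.5+ pattern that matches at any depth would need
    # recursive matching enabled:
    # glob('**/update.json', recursive=True)

In other words, the pattern covers the repository's two-level source layout rather than arbitrary folder depth, which is sufficient for both the data and extensions trees.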