#!/usr/bin/env python
########################################################################
#
# Project: Metalink Checker
# URL: http://www.nabber.org/projects/
# E-mail: webmaster@nabber.org
#
# Copyright: (C) 2007-2012, Neil McNab
# License: GNU General Public License Version 2
# (http://www.gnu.org/copyleft/gpl.html)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Filename: $URL$
# Last Updated: $Date$
# Version: $Rev$
# Author(s): Neil McNab
#
# Description:
# Command line application and Python library that checks metalink files.
# Requires Python 2.5 or newer.
#
# Library Instructions:
# - Use as expected.
#
# import checker
#
# results = checker.check_metalink("file.metalink")
#
########################################################################
import urlparse
import os.path
import sys
import socket
import httplib
import ftplib
import threading
import time
import binascii
import download
import proxy
import locale
import gettext
# Application identity used when building the about text.
NAME = "Metalink Checker"
VERSION = "6.0"

# Alternative website value, currently disabled:
# WEBSITE = "http://www.metalinker.org"
WEBSITE = "http://www.nabber.org/projects/metalink/checker/"

# Upper bound on the number of redirects followed for a single URL.
MAX_REDIRECTS = 20

# Threshold of live checker threads used to throttle new thread launches.
MAX_THREADS = 10
def translate():
    '''
    Set up the translation catalog and return the message lookup function.

    The catalog name (gettext domain) and the "locale" directory are derived
    from this file when run as a script, or from the dotted module name when
    imported. A missing catalog is not an error: a pass-through translator
    is used instead, so messages come back unchanged.

    Returns a callable that takes a message string and returns its
    translation.
    '''
    if __name__ == "__main__":
        try:
            program = __file__
        except NameError:
            # __file__ is not defined in this run mode (presumably a frozen
            # build), so locate the catalog next to the executable instead.
            program = sys.executable
        # splitext() copes with any suffix, or none, where fixed-width
        # slicing only worked for ".py" and ".exe".
        base = os.path.splitext(os.path.basename(program))[0]
        localedir = os.path.join(os.path.dirname(program), "locale")
    else:
        parts = __name__.split(".")
        base = parts[-1]
        localedir = os.path.join("/".join(parts[:-1]), "locale")

    try:
        localelang = locale.getdefaultlocale()[0]
    except ValueError:
        # Raised for locale settings the locale module cannot parse; treat
        # it like an unset locale rather than failing at import time.
        localelang = None
    if localelang is None:
        localelang = "LC_ALL"

    # fallback=True: use a pass-through translator when no catalog exists.
    t = gettext.translation(base, localedir, [localelang], fallback=True)
    # Prefer the unicode lookup; use gettext where ugettext does not exist.
    return getattr(t, "ugettext", t.gettext)
# Module-wide translation function used to mark user-visible strings.
_ = translate()

# About text: name, version, website, copyright, license and the warranty
# disclaimer, with the labels passed through the translation function.
ABOUT = "".join([
    NAME, "\n",
    _("Version"), ": ", VERSION, "\n",
    _("Website"), ": ", WEBSITE, "\n\n",
    _("Copyright"), ": 2009-2011 Neil McNab\n",
    _("License"), ": ", _("GNU General Public License, Version 2"), "\n\n",
    NAME,
    _(" comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it under certain conditions, see LICENSE.txt for details."),
])
import HTMLParser
class Webpage(HTMLParser.HTMLParser):
    '''
    HTML parser that collects the targets of <a href="..."> links.

    The collected links are available in the "urls" list. Links that are not
    remote are joined onto the page URL given to set_url(); "mailto:" links
    are skipped.
    '''

    def __init__(self, *args):
        self.urls = []
        self.url = ""
        HTMLParser.HTMLParser.__init__(self, *args)

    def set_url(self, url):
        '''
        First parameter, URL of the page being parsed, used as the base for
        relative links
        '''
        self.url = url

    def handle_starttag(self, tag, attrs):
        '''
        Parser callback, records the href of every anchor start tag
        First parameter, tag name
        Second parameter, list of (attribute name, value) pairs
        '''
        if tag != "a":
            return

        for name, url in attrs:
            # an attribute written without a value is reported as None
            if name != "href" or url is None:
                continue
            # filter mail links before joining, otherwise the join below
            # would turn them into page-relative links
            if url.startswith("mailto:"):
                continue
            # test the link itself, not the (name, value) pair
            if not download.is_remote(url):
                # fix relative links
                url = download.path_join(self.url, url)
            self.urls.append(url)
class Checker:
    '''
    Checks the URLs listed in a metalink, one thread per URL.

    Results are stored as
    results[file name][url] = [response, size check, checksum check, redirect]
    and are read with get_results() or get_new_results().
    '''

    def __init__(self, only_testable=False):
        '''
        First parameter, when True only http, https and ftp URLs are checked
        '''
        self.threadlist = []
        self.running = False
        self.clear_results()
        self.cancel = False
        self.only_testable = only_testable

    def check_metalink(self, src):
        '''
        Decode a metalink file, can be local or remote, and start a check of
        every URL of every file it lists.
        First parameter, URL or file path of the metalink
        Returns False when nothing could be checked, otherwise None. The
        results are read with get_results().
        Errors raised while parsing the metalink are passed on to the caller.
        '''
        self.running = True
        try:
            src = download.complete_url(src)
            try:
                metalinkobj = download.parse_metalink(src, nocheck=False)
            except:
                print(_("ERROR parsing XML."))
                raise

            if not metalinkobj:
                return False

            if metalinkobj.type == "dynamic":
                origin = metalinkobj.origin
                if origin != src and origin != "":
                    try:
                        return self.check_metalink(origin)
                    except Exception:
                        # fall back to the copy that was already parsed
                        print("Error downloading from origin %s, not using." % origin)

            urllist = metalinkobj.files
            if len(urllist) == 0:
                print(_("No urls to download file from."))
                return False

            # the referer only depends on src, so build it once
            myheaders = {}
            if download.is_remote(src):
                myheaders = {'referer': src}

            for filenode in urllist:
                if self.cancel:
                    break
                self.check_file_node(filenode, myheaders)
        finally:
            # always clear the flag, otherwise isAlive() stays True forever
            # and get_results() never returns
            self.running = False

    def isAlive(self):
        '''
        Returns True while URLs are being queued or any check thread is
        still running
        '''
        if self.running:
            return True
        return self.activeCount() > 0

    def activeCount(self):
        '''
        Returns the number of check threads that are still running
        '''
        count = 0
        for threadobj in self.threadlist:
            if threadobj.isAlive():
                count += 1
        return count

    def _add_result(self, key1, key2, value):
        '''
        Store value under [key1][key2] in both the complete results and the
        not yet collected results
        '''
        self.results.setdefault(key1, {})[key2] = value
        self.new_results.setdefault(key1, {})[key2] = value

    def get_results(self, block=True):
        '''
        First parameter, when True wait until all checks are finished
        Returns the dictionary of all results collected so far
        '''
        while block and self.isAlive():
            time.sleep(0.1)
        return self.results

    def get_new_results(self):
        '''
        Returns the results added since the previous call
        '''
        temp = self.new_results
        self.new_results = {}
        return temp

    def stop(self):
        '''
        Stop starting new checks and wait for the running ones to finish
        '''
        self.cancel = True
        while self.isAlive():
            time.sleep(0.1)
        # everything is idle again, allow the object to be reused
        self.cancel = False

    def clear_results(self):
        '''
        Stop any running check and forget all threads and results
        '''
        self.stop()
        self.threadlist = []
        self.results = {}
        self.new_results = {}

    def _digest_status(self, encoded, checksums, typestr):
        '''
        Compare a base64 encoded digest with the hex checksum of that type.
        Returns the OK or FAIL text, or None when no comparison is possible
        (no checksum of that type, or the digest is not valid base64).
        '''
        try:
            expected = checksums[typestr]
            found = binascii.hexlify(binascii.a2b_base64(encoded))
        except (KeyError, ValueError, binascii.Error):
            return None
        if found.decode("ascii").lower() == expected.lower():
            return _("OK")
        return _("FAIL")

    def _check_process(self, headers, filesize, checksums=None):
        '''
        Evaluate the header text produced for one URL.
        First parameter, header text
        Second parameter, expected file size, None or 0 when unknown
        Third parameter, dictionary of expected hex checksums by type
        Returns the list [response, size check, checksum check], a check
        that could not be made is reported as "?"
        '''
        if checksums is None:
            checksums = {}
        size = "?"
        checksum = "?"

        sizeheader = self._get_header(headers, "Content-Length")

        # digest code, untested since no servers seem to support this
        digest = self._get_header(headers, "Content-MD5")
        if digest is not None:
            status = self._digest_status(digest, checksums, "md5")
            if status is not None:
                checksum = status

        digest = self._get_header(headers, "Digest")
        if digest is not None:
            for entry in digest.split(","):
                if "=" not in entry:
                    continue
                # split once only, base64 values end in "=" padding
                (name, value) = entry.split("=", 1)
                name = name.strip().lower()
                typestr = ""
                if name == "sha":
                    typestr = "sha1"
                elif name == "md5":
                    typestr = "md5"
                # the first digest that can be compared decides
                if typestr != "" and checksum == "?":
                    status = self._digest_status(value.strip(), checksums, typestr)
                    if status is not None:
                        checksum = status

        if sizeheader is not None and filesize is not None:
            try:
                actual = int(sizeheader)
                expected = int(filesize)
            except (TypeError, ValueError):
                # not numeric, leave the size check undecided
                pass
            else:
                if actual == expected:
                    size = _("OK")
                elif expected != 0:
                    size = _("FAIL")

        # NOTE(review): URLCheck writes this line name through _(), a
        # catalog that translates "Response" would hide the status from
        # this untranslated lookup - confirm
        response_code = _("OK")
        temp_code = self._get_header(headers, "Response")
        if temp_code is not None:
            response_code = temp_code

        return [response_code, size, checksum]

    def _get_header(self, textheaders, name):
        '''
        First parameter, text with one "Name: value" entry per line
        Second parameter, entry name, compared case insensitively
        Returns the value of the last entry with that name, None if missing
        '''
        lines = str(textheaders).split("\n")
        lines.reverse()
        wanted = name.lower()
        for line in lines:
            # split at the first colon only so the value is kept whole
            (key, sep, value) = line.strip().partition(":")
            if sep and key.strip().lower() == wanted:
                return value.strip()
        return None

    def _check_url(self, item, url, myheaders):
        '''
        Thread body, checks one URL of a file and stores the result
        '''
        checker = URLCheck(url, myheaders)
        headers = checker.info()
        redir = self._get_header(headers, "Redirected")
        result = self._check_process(headers, item.size, item.get_checksums())
        result.append(redir)
        self._add_result(item.filename, url, result)

    def check_file_node(self, item, myheaders=None):
        '''
        Start a check thread for every URL of one file.
        First parameter, file object
        Second parameter, extra request headers
        Returns False when the file has no URLs, otherwise None. The
        results are read with get_results().
        '''
        if myheaders is None:
            myheaders = {}
        # restore the previous state on exit so a surrounding
        # check_metalink() stays marked as running between files
        was_running = self.running
        self.running = True
        try:
            urllist = item.resources
            if len(urllist) == 0:
                print(_("No urls to download file from."))
                return False

            for resource in urllist:
                url = resource.url
                urltype = str(url.split("://", 1)[0])
                if self.only_testable and urltype not in ("http", "https", "ftp"):
                    continue

                # don't start too many threads at once
                while self.activeCount() >= MAX_THREADS and not self.cancel:
                    time.sleep(0.1)
                if self.cancel:
                    break

                mythread = threading.Thread(target=self._check_url,
                                            args=[item, url, myheaders],
                                            name=url)
                mythread.start()
                self.threadlist.append(mythread)
        finally:
            self.running = was_running
class URLCheck:
    '''
    Checks a single http, https or ftp URL and records the outcome as
    header style text, one "Name: value" line per entry.

    The text is returned by info() and the URL that was finally checked,
    after redirects, by geturl().
    '''

    # HTTP status codes that are followed as redirects
    REDIRECT_CODES = (httplib.MOVED_PERMANENTLY, httplib.FOUND,
                      httplib.SEE_OTHER, httplib.TEMPORARY_REDIRECT)

    def __init__(self, url, myheaders=None):
        '''
        First parameter, URL to check
        Second parameter, dictionary of extra request headers
        '''
        self.infostring = ""
        self.url = url
        urlparts = urlparse.urlparse(url)
        self.scheme = urlparts.scheme

        headers = {"Want-Digest": "MD5;q=0.3, SHA;q=1"}
        if myheaders:
            headers.update(myheaders)

        if self.scheme in ("http", "https"):
            self._check_http(url, headers)
        elif self.scheme == "ftp":
            self._check_ftp(url, urlparts)
        else:
            self._add_response("?")

    def _add_response(self, text):
        '''
        Append a "Response" line with the given text
        '''
        self.infostring += _("Response") + ": " + text + "\r\n"

    def _head(self, url, headers):
        '''
        Send one HEAD request for url.
        Returns the tuple (connection, response)
        Raises ValueError for a bad port, and passes on socket and httplib
        errors after closing the connection
        '''
        urlparts = urlparse.urlparse(url)
        # choose the connection type per request so that a redirect from
        # http to https (or back) uses the right one
        if urlparts.scheme == "https":
            factory = download.HTTPSConnection
            port = httplib.HTTPS_PORT
        else:
            factory = proxy.HTTPConnection
            port = httplib.HTTP_PORT
        # reading the port raises ValueError when it is not a number
        if urlparts.port is not None:
            port = urlparts.port

        conn = factory(urlparts.hostname, port)
        try:
            conn.request("HEAD", url, headers=headers)
            return (conn, conn.getresponse())
        except Exception:
            conn.close()
            raise

    def _check_http(self, url, headers):
        '''
        Check an http or https URL, following redirects up to MAX_REDIRECTS
        '''
        try:
            (conn, resp) = self._head(url, headers)
        except ValueError:
            self._add_response(_("Bad URL"))
            return
        except socket.timeout:
            self._add_response(_("Timeout"))
            return
        except (socket.error, httplib.HTTPException):
            self._add_response(_("Connection Error"))
            return

        # handle redirects here and set self.url
        count = 0
        while (resp is not None and resp.status in self.REDIRECT_CODES
               and count < MAX_REDIRECTS):
            location = resp.getheader("location")
            if not location:
                # nothing to follow, report the redirect response itself
                break
            conn.close()
            conn = None
            # the Location header may be relative to the current URL
            url = urlparse.urljoin(url, location)
            self.infostring += _("Redirected") + ": %s\r\n" % url
            try:
                (conn, resp) = self._head(url, headers)
            except (ValueError, socket.error, httplib.HTTPException):
                resp = None
            count += 1

        self.url = url
        if resp is None:
            self.infostring += _("Response") + ": socket error\r\n"
        elif resp.status == httplib.OK:
            self._add_response(_("OK"))
        else:
            self._add_response("%s %s" % (resp.status, resp.reason))

        # need to convert list into string
        if resp is not None:
            for header in resp.getheaders():
                self.infostring += header[0] + ": " + header[1] + "\r\n"

        if conn is not None:
            conn.close()

    def _check_ftp(self, url, urlparts):
        '''
        Check an ftp URL: connect, log in, test that the file exists and
        record its size when the server reports one
        '''
        try:
            username = urlparts.username
            password = urlparts.password
        except AttributeError:
            # needed for python < 2.5
            username = None

        if username is None:
            username = "anonymous"
            password = "anonymous"

        ftpobj = proxy.FTP()
        try:
            # NOTE(review): this passes the whole network location, which
            # includes any user:password@ and :port parts - confirm that
            # proxy.FTP.connect() expects that
            ftpobj.connect(urlparts[1])
        except socket.gaierror:
            self._add_response(_("Bad Hostname"))
            return
        except socket.timeout:
            self._add_response(_("timed out"))
            return
        except socket.error:
            self._add_response(_("Connection refused"))
            return
        except (ftplib.error_perm, ftplib.error_temp):
            # sys.exc_info() keeps this valid on every Python version
            self._add_response(str(sys.exc_info()[1]))
            return

        try:
            ftpobj.login(username, password)
        except (ftplib.error_perm, ftplib.error_temp):
            # without a login the file cannot be tested, report and stop
            self._add_response(str(sys.exc_info()[1]))
            self._quit_ftp(ftpobj)
            return

        if ftpobj.exist(url):
            self._add_response(_("OK"))
        else:
            self._add_response(_("Not Found"))

        try:
            size = ftpobj.size(url)
        except Exception:
            size = None

        self._quit_ftp(ftpobj)

        if size is not None:
            # written with the literal HTTP header name so that the size
            # check, which looks up "Content-Length", can find it
            self.infostring += "Content-Length: %s\r\n" % size

    def _quit_ftp(self, ftpobj):
        '''
        Close the FTP session, errors while closing are ignored on purpose
        '''
        try:
            ftpobj.quit()
        except Exception:
            pass

    def geturl(self):
        '''
        Returns the URL that was checked, after following redirects
        '''
        return self.url

    def info(self):
        '''
        Returns the collected header style text
        '''
        # need response and content-length for HTTP
        return self.infostring