#!/usr/bin/env python
########################################################################
#
# Project: Metalink Checker
# URL: http://www.nabber.org/projects/
# E-mail: webmaster@nabber.org
#
# Copyright: (C) 2007-2008, Neil McNab
# License: GNU General Public License Version 2
# (http://www.gnu.org/copyleft/gpl.html)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Filename: $URL$
# Last Updated: $Date$
# Version: $Rev$
# Author(s): Neil McNab
#
# Description:
# Command line application and Python library that checks metalink files.
# Requires Python 2.5 or newer.
#
# Library Instructions:
# - Use as expected.
#
# import checker
#
# results = checker.check_metalink("file.metalink")
#
########################################################################
import optparse
import urllib2
import urlparse
import os.path
import xml.dom.minidom
import random
import sys
import re
import socket
import base64
import hashlib
import httplib
import ftplib
import xmlutils
import download
import locale
import gettext
# Upper bound on the number of HTTP(S) redirects URLCheck follows for one URL.
MAX_REDIRECTS = 20
def translate():
    '''
    Setup translation path
    Works out the gettext domain name and the "locale" directory from the
    way this module was loaded, then builds the translation object.
    Returns the translation function to be bound to "_"; when no message
    catalog can be found it hands every message back unchanged.
    '''
    if __name__ == "__main__":
        try:
            # Run as a script: the domain is the script name without its
            # extension, the catalogs live next to the script.
            base = os.path.splitext(os.path.basename(__file__))[0]
            localedir = os.path.join(os.path.dirname(__file__), "locale")
        except NameError:
            # __file__ is not defined (presumably a frozen executable build),
            # so fall back to the interpreter/executable location.  splitext
            # is used instead of a fixed-width slice so that an executable
            # without a ".exe" suffix keeps its full name.
            base = os.path.splitext(os.path.basename(sys.executable))[0]
            localedir = os.path.join(os.path.dirname(sys.executable), "locale")
    else:
        # Imported as a module: the domain is the last component of the dotted
        # module name, the package path becomes the catalog directory.
        temp = __name__.split(".")
        base = temp[-1]
        localedir = os.path.join("/".join(temp[:-1]), "locale")

    # getdefaultlocale() yields None for the language in the C/POSIX locale
    # and raises ValueError for locale names it does not recognise; neither
    # case may reach gettext as a language entry.
    try:
        lang = locale.getdefaultlocale()[0]
    except ValueError:
        lang = None
    languages = []
    if lang:
        languages.append(lang)

    # fallback=True: return a NullTranslations object instead of raising
    # IOError when no catalog matches.
    t = gettext.translation(base, localedir, languages, fallback=True)
    # ugettext only exists on Python 2 translation objects; gettext is the
    # equivalent on interpreters that no longer provide it.
    return getattr(t, "ugettext", t.gettext)
# Module-wide translation function used to wrap every user-visible message.
_ = translate()
def check_metalink(src):
    '''
    Decode a metalink file, can be local or remote
    First parameter, file to download, URL or file path to download from
    Returns the results of the check in a dictionary keyed by file name,
    each value being the result of check_file_node() for that file.
    Returns False if the metalink lists no files.
    Raises whatever the XML parser raises when the document is malformed.
    '''
    src = download.complete_url(src)
    # Sources already loaded, so a chain of "dynamic" origins that points
    # back at an earlier metalink cannot loop forever.
    visited = [src]

    while True:
        datasource = urllib2.urlopen(src)
        try:
            try:
                dom2 = xml.dom.minidom.parse(datasource) # parse an open file
            except Exception:
                print(_("ERROR parsing XML."))
                raise
        finally:
            # release the handle even when parsing fails
            datasource.close()

        # get_subnodes returns a sequence of nodes; attributes are read from
        # the first (root) metalink node.
        metalink_node = xmlutils.get_subnodes(dom2, ["metalink"])
        origin = None
        try:
            root = metalink_node[0]
            if xmlutils.get_attr_from_item(root, "type") == "dynamic":
                origin = xmlutils.get_attr_from_item(root, "origin")
        except Exception:
            # best effort: a document without a usable root/type is simply
            # treated as a static metalink
            origin = None

        if not origin:
            break
        # A dynamic metalink names the location of its current version;
        # check that one instead, unless it has been loaded already.
        origin = download.complete_url(origin)
        if origin in visited:
            break
        visited.append(origin)
        src = origin

    urllist = xmlutils.get_subnodes(dom2, ["metalink", "files", "file"])
    if not urllist:
        print(_("No urls to download file from."))
        return False

    results = {}
    for filenode in urllist:
        try:
            size = xmlutils.get_xml_tag_strings(filenode, ["size"])[0]
        except Exception:
            # size is optional, it is only shown in the heading below
            size = None
        name = xmlutils.get_attr_from_item(filenode, "name")
        print("=" * 79)
        print(_("File") + ": %s " % name + _("Size") + ": %s" % size)
        results[name] = check_file_node(filenode)

    return results
def check_process(headers, filesize):
    '''
    Summarise the outcome of one URL check
    First parameter, header text collected for the URL
    Second parameter, expected file size, or None if it is not known
    Returns tuple of (response code text, size check text); the size check
    is "?" whenever either the reported or the expected size is missing
    '''
    reported = get_header(headers, "Content-Length")
    if reported is None or filesize is None:
        size = "?"
    elif reported == filesize:
        size = _("OK")
    else:
        size = _("FAIL")

    # no "Response" line at all is reported as OK
    status = get_header(headers, "Response")
    if status is None:
        status = _("OK")

    return (status, size)
def get_header(textheaders, name):
    '''
    Look up one header in a block of "Name: value" lines
    First parameter, the headers, any object whose str() gives one header per line
    Second parameter, header name to look for, compared case-insensitively
    Returns the value of the first matching header, or None if there is none
    '''
    wanted = name.lower()
    for line in str(textheaders).split("\n"):
        # strip() also removes the "\r" left over from "\r\n" line endings.
        # Splitting once keeps values that themselves contain ": " intact,
        # and the length test skips lines that have no separator at all.
        parts = line.strip().split(": ", 1)
        if len(parts) == 2 and parts[0].lower() == wanted:
            return parts[1]
    return None
def check_file_node(item):
    '''
    Checks every URL listed for one file of a metalink
    First parameter, file XML node
    Returns dictionary mapping each checked URL, as reported by
    URLCheck.geturl(), to the (response code, size check) tuple from
    check_process().
    Returns False if the file node lists no URLs.
    '''
    try:
        size = xmlutils.get_xml_tag_strings(item, ["size"])[0]
    except Exception:
        # no usable size element: the size check will report "?"
        size = None

    urllist = xmlutils.get_subnodes(item, ["resources", "url"])
    if not urllist:
        print(_("No urls to download file from."))
        return False

    result = {}
    for urlnode in urllist:
        if urlnode.firstChild is None:
            # empty url element, there is nothing to check
            continue
        filename = urlnode.firstChild.nodeValue.strip()

        print("-" * 79)
        print(_("Checking") + ": %s" % filename)

        checker = URLCheck(filename)
        checked_url = checker.geturl()
        result[checked_url] = check_process(checker.info(), size)

        print(_("Response Code") + ": %s\t" % result[checked_url][0] + _("Size Check") + ": %s" % result[checked_url][1])

    return result
class URLCheck:
    '''
    Probes a single URL without downloading it and records the outcome as
    header style text, one "Name: value" entry per "\\r\\n" terminated line.
    http, https and ftp URLs are checked; any other scheme is recorded as "?".
    The text always starts with a "Response" line.  For HTTP(S) the server's
    own headers follow; for FTP a "Content-Length" line follows when the
    size could be determined.
    The names "Response" and "Content-Length" are written untranslated
    because check_process() looks them up by exactly those names.
    '''

    def __init__(self, url):
        '''
        First parameter, URL to check
        All network activity happens here; read the outcome afterwards with
        info() and geturl().
        '''
        self.infostring = ""
        self.url = url
        urlparts = urlparse.urlparse(url)
        self.scheme = urlparts.scheme

        if self.scheme in ("http", "https"):
            self._check_http(url)
        elif self.scheme == "ftp":
            self._check_ftp(url, urlparts)
        else:
            self._add_header("Response", "?")

    def _add_header(self, name, value):
        ''' Appends one "name: value" line to the collected header text '''
        self.infostring += "%s: %s\r\n" % (name, value)

    def _head(self, url):
        '''
        Sends one HEAD request
        First parameter, absolute http or https URL
        Returns tuple of (status, reason, list of (name, value) headers), or
        None after recording a "Response" line saying why the request failed
        '''
        urlparts = urlparse.urlparse(url)
        if urlparts.scheme == "https":
            connclass = download.HTTPSConnection
            port = httplib.HTTPS_PORT
        else:
            connclass = download.HTTPConnection
            port = httplib.HTTP_PORT

        try:
            # reading .port raises ValueError for a non-numeric port
            if urlparts.port is not None:
                port = urlparts.port
        except ValueError:
            self._add_header("Response", _("Bad URL"))
            return None

        conn = connclass(urlparts.hostname, port)
        try:
            try:
                conn.request("HEAD", url)
                resp = conn.getresponse()
            except (socket.error, httplib.HTTPException):
                self._add_header("Response", _("Connection Error"))
                return None
            # copy what is needed so the connection can be closed right away
            return (resp.status, resp.reason, resp.getheaders())
        finally:
            conn.close()

    def _check_http(self, url):
        '''
        Checks an http or https URL, following up to MAX_REDIRECTS redirects
        First parameter, URL to check
        Updates self.url to the last URL that answered
        '''
        redirect_codes = (httplib.MOVED_PERMANENTLY, httplib.FOUND,
                          httplib.SEE_OTHER, httplib.TEMPORARY_REDIRECT)

        answer = self._head(url)
        count = 0
        while answer is not None and answer[0] in redirect_codes and count < MAX_REDIRECTS:
            location = None
            for header in answer[2]:
                if header[0].lower() == "location":
                    location = header[1]
                    break
            if not location:
                # redirect without a target: report the redirect status itself
                break

            # Location may be relative to the URL that issued the redirect
            url = urlparse.urljoin(url, location)
            print(_("Redirected") + ": %s" % url)
            if urlparse.urlparse(url).scheme not in ("http", "https"):
                # cannot follow with a HEAD request; report the redirect status
                break

            answer = self._head(url)
            count += 1
            if answer is not None:
                self.url = url

        if answer is None:
            # _head() has already recorded the failure
            return

        status, reason, headers = answer
        if status == httplib.OK:
            self._add_header("Response", _("OK"))
        else:
            self._add_header("Response", "%s %s" % (status, reason))

        # need to convert list into string
        for header in headers:
            self._add_header(header[0], header[1])

    def _check_ftp(self, url, urlparts):
        '''
        Checks an ftp URL by logging in and asking for the file
        First parameter, URL to check
        Second parameter, the parsed form of that URL
        '''
        try:
            username = urlparts.username
            password = urlparts.password
        except AttributeError:
            # needed for python < 2.5
            username = None

        if username is None:
            username = "anonymous"
            password = "anonymous"

        # the network location may carry "user:password@" in front of the
        # host; only the part after it names the server
        host = urlparts[1].rsplit("@", 1)[-1]

        ftpobj = download.FTP()
        try:
            ftpobj.connect(host)
        except socket.gaierror:
            self._add_header("Response", _("Bad Hostname"))
            return
        except socket.timeout:
            self._add_header("Response", _("timed out"))
            return
        except socket.error:
            self._add_header("Response", _("Connection refused"))
            return

        try:
            ftpobj.login(username, password)
        except ftplib.error_perm:
            # sys.exc_info() instead of "except ... as" keeps Python 2.5 working
            error = sys.exc_info()[1]
            self._add_header("Response", str(error))
            # nothing can be queried without a login
            self._quit_ftp(ftpobj)
            return

        if ftpobj.exist(url):
            self._add_header("Response", _("OK"))
        else:
            self._add_header("Response", _("Not Found"))

        try:
            size = ftpobj.size(url)
        except Exception:
            # size is optional, the check then reports "?" for it
            size = None

        self._quit_ftp(ftpobj)

        if size is not None:
            self._add_header("Content-Length", size)

    def _quit_ftp(self, ftpobj):
        ''' Ends the FTP session, ignoring errors from an already dead connection '''
        try:
            ftpobj.quit()
        except Exception:
            pass

    def geturl(self):
        ''' Returns the URL that was checked, after any redirects followed '''
        return self.url

    def info(self):
        ''' Returns the collected header text, see the class description '''
        # need response and content-length for HTTP
        return self.infostring