#!/usr/bin/env python
########################################################################
#
# Project: Metalink Checker
# URL: http://www.nabber.org/projects/
# E-mail: webmaster@nabber.org
#
# Copyright: (C) 2007-2008, Neil McNab
# License: GNU General Public License Version 2
# (http://www.gnu.org/copyleft/gpl.html)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Filename: $URL$
# Last Updated: $Date$
# Version: $Rev$
# Author(s): Neil McNab
#
# Description:
# Command line application and Python library that checks metalink files.
# Requires Python 2.5 or newer.
#
# Library Instructions:
# - Use as expected.
#
# import checker
#
# results = checker.check_metalink("file.metalink")
#
########################################################################
import optparse
import urllib2
import urlparse
import os.path
import random
import sys
import re
import socket
import base64
import hashlib
import httplib
import ftplib
import threading
import xmlutils
import download
import locale
import gettext
# Upper bound on the number of HTTP/HTTPS redirects URLCheck follows for a
# single URL before it reports whatever response it last received.
MAX_REDIRECTS = 20
# Upper bound used by Checker.check_file_node to throttle how many URL-check
# threads are allowed to run at the same time.
MAX_THREADS = 10
def translate():
    '''
    Setup translation path

    Works out the gettext domain and the "locale" directory for this module
    and returns the function used to translate user-visible strings.

    - Run as a script: the domain is the script file name without its
      extension and the catalog directory is "locale" next to the script.
      If __file__ is not defined, sys.executable is used instead
      (presumably for frozen builds -- confirm against the packaging setup).
    - Imported as a module: the domain is the last component of the dotted
      module name and the catalog directory is "<package path>/locale".

    A missing catalog is not an error: gettext falls back to a null
    translation, so the returned function then hands strings back unchanged.

    Returns a callable taking a message string and returning its translation.
    '''
    if __name__ == "__main__":
        try:
            script = __file__
        except NameError:
            script = sys.executable
        # splitext copes with any extension length (.py, .pyc, .exe, none),
        # unlike chopping a fixed number of trailing characters.
        base = os.path.splitext(os.path.basename(script))[0]
        localedir = os.path.join(os.path.dirname(script), "locale")
    else:
        parts = __name__.split(".")
        base = parts[-1]
        localedir = os.path.join("/".join(parts[:-1]), "locale")

    # getdefaultlocale() reports (None, None) for the C/POSIX locale and can
    # raise ValueError for locale names it does not recognise.  Passing
    # [None] on to gettext would crash at import time, so only pass a
    # language when one was actually detected.
    try:
        language = locale.getdefaultlocale()[0]
    except ValueError:
        language = None
    languages = []
    if language is not None:
        languages.append(language)

    # fallback=True: use a null translation when no catalog is found.
    t = gettext.translation(base, localedir, languages, None, fallback=True)
    try:
        return t.ugettext
    except AttributeError:
        # Translation objects without ugettext (Python 3) already return
        # text strings from gettext.
        return t.gettext
# Module-wide translation function: user-visible strings in this file are
# wrapped as _("text") so they go through the gettext lookup set up above.
_ = translate()
class Checker:
    '''
    Checks the download URLs listed in a metalink file.

    Every URL is probed by a URLCheck object in its own worker thread.  The
    outcome is stored in self.results, a dictionary laid out as

        results[file name][checked url] = (response_code, size_check)

    and a short report is printed for each URL as its thread finishes.
    Use get_results() to wait for the workers and fetch the dictionary.
    '''

    # Seconds to wait in a single Thread.join() call.  A bounded join avoids
    # burning CPU in a spin loop while still re-checking regularly (an
    # untimed join can also block Ctrl-C on Python 2).
    _JOIN_TIMEOUT = 0.1

    def __init__(self):
        self.threadlist = []
        self.clear_results()

    def check_metalink(self, src):
        '''
        Decode a metalink file, can be local or remote, and start checking
        every file it lists.
        First parameter, URL or file path of the metalink file
        Returns False if the metalink lists no files, otherwise None (or
        whatever the check of a dynamic metalink's origin returned).  The
        check results themselves are collected with get_results().
        Exceptions raised while parsing the XML are re-raised.
        '''
        src = download.complete_url(src)
        datasource = urllib2.urlopen(src)
        try:
            metalink = xmlutils.Metalink()
            metalink.parsehandle(datasource)
        except Exception:
            print(_("ERROR parsing XML."))
            raise
        finally:
            # close the handle even when parsing fails
            datasource.close()

        if metalink.type == "dynamic":
            origin = metalink.origin
            if origin != src:
                # a dynamic metalink names its authoritative location;
                # prefer that copy and only fall back to this one on failure
                try:
                    return self.check_metalink(origin)
                except Exception:
                    print("Error downloading from origin %s, not using." % origin)

        urllist = metalink.files
        if len(urllist) == 0:
            print(_("No urls to download file from."))
            return False

        for filenode in urllist:
            print("=" * 79)
            print(_("File") + ": %s " % filenode.filename + _("Size") + ": %s" % filenode.size)
            self.check_file_node(filenode)

    def isAlive(self):
        '''
        Returns True while at least one worker thread is still running.
        '''
        for threadobj in self.threadlist:
            if threadobj.isAlive():
                return True
        return False

    def activeCount(self):
        '''
        Returns the number of worker threads that are still running.
        '''
        count = 0
        for threadobj in self.threadlist:
            if threadobj.isAlive():
                count += 1
        return count

    def get_results(self, block=True):
        '''
        Returns the results dictionary.
        First parameter, when True (default) wait for all worker threads to
        finish first; when False return immediately with whatever has been
        collected so far.
        '''
        if block:
            self._wait_for_all()
        return self.results

    def clear_results(self):
        '''
        Wait for any running worker threads, then forget all threads and
        results.
        '''
        self._wait_for_all()
        self.threadlist = []
        self.results = {}

    def _wait_for_all(self):
        '''
        Block until no worker thread is running any more.
        '''
        while self.isAlive():
            for threadobj in list(self.threadlist):
                threadobj.join(self._JOIN_TIMEOUT)

    def _wait_for_slot(self):
        '''
        Block until fewer than MAX_THREADS worker threads are running.
        '''
        while self.activeCount() >= MAX_THREADS:
            for threadobj in self.threadlist:
                if threadobj.isAlive():
                    threadobj.join(self._JOIN_TIMEOUT)
                    break

    def _check_process(self, headers, filesize):
        '''
        Evaluate the header text collected for one URL.
        First parameter, header text as produced by URLCheck.info()
        Second parameter, expected file size, or None if unknown
        Returns a tuple (response_code, size): response_code is the value of
        the "Response" header (translated "OK" if there is none); size is
        translated "OK"/"FAIL" when both sizes are known and comparable,
        otherwise "?".
        '''
        size = "?"
        sizeheader = self._get_header(headers, "Content-Length")
        if sizeheader is not None and filesize is not None:
            try:
                sizes_match = int(sizeheader) == int(filesize)
            except (ValueError, TypeError):
                # a size that is not a number cannot be compared; leave "?"
                pass
            else:
                if sizes_match:
                    size = _("OK")
                else:
                    size = _("FAIL")

        response_code = _("OK")
        temp_code = self._get_header(headers, "Response")
        if temp_code is not None:
            response_code = temp_code

        return (response_code, size)

    def _get_header(self, textheaders, name):
        '''
        Look up one header in "Name: value" formatted text.
        First parameter, header text, one header per line
        Second parameter, header name, compared case-insensitively
        Returns the value of the LAST matching line, or None if no line
        matches.  Lines without a ": " separator are ignored.
        '''
        if not hasattr(textheaders, "split"):
            # only convert non-string objects; calling str() on text that is
            # already a (possibly non-ASCII unicode) string can fail
            textheaders = str(textheaders)
        wanted = name.lower()

        # search backwards so that the most recently added header wins
        for line in reversed(textheaders.split("\n")):
            # split on the first separator only, so values that themselves
            # contain ": " are returned whole
            key, separator, value = line.strip().partition(": ")
            if separator and key.lower() == wanted:
                return value

        return None

    def check_file_node(self, item):
        '''
        Start one checking thread per URL of a single file.
        First parameter, file object
        Returns False if the file has no URLs, otherwise None.  Outcomes are
        stored in self.results[item.name], keyed by the checked URL as
        reported by URLCheck.geturl(); this method does not wait for the
        threads to finish.
        '''
        # NOTE(review): check_metalink reads the file name as
        # filenode.filename while this method uses item.name -- confirm the
        # file object provides both attributes.
        self.results[item.name] = {}
        size = item.size
        urllist = item.resources
        if len(urllist) == 0:
            print(_("No urls to download file from."))
            return False

        def thread(filename):
            checker = URLCheck(filename)
            headers = checker.info()
            outcome = self._check_process(headers, size)
            self.results[item.name][checker.geturl()] = outcome
            redir = self._get_header(headers, "Redirected")

            print("-" * 79)
            print(_("Checked") + ": %s" % filename)
            if redir is not None:
                print(_("Redirected") + ": %s" % redir)
            print(_("Response Code") + ": %s\t" % outcome[0] + _("Size Check") + ": %s" % outcome[1])

        for resource in urllist:
            # don't start too many threads at once
            self._wait_for_slot()
            mythread = threading.Thread(target=thread, args=[resource.url])
            mythread.start()
            self.threadlist.append(mythread)
class URLCheck:
    '''
    Checks a single URL and records the outcome as header-style text.

    The check runs in the constructor.  http and https URLs are probed with
    a HEAD request (following redirects), ftp URLs by logging in and asking
    whether the file exists and how large it is.  Any other scheme is
    reported as "?".

    The outcome is a string of "Name: value" lines separated by "\\r\\n",
    available from info().  It holds a "Response" line, a "Redirected" line
    per redirect seen, the response headers for http/https, and a
    "Content-Length" line for ftp when the size is known.  geturl() returns
    the URL finally checked, i.e. the last successfully requested redirect
    target.

    NOTE(review): the "Response" and "Redirected" names are passed through
    the translation function while Checker looks them up by their English
    names -- confirm no catalog translates them.
    '''
    def __init__(self, url):
        self.infostring = ""
        self.url = url
        urlparts = urlparse.urlparse(url)
        self.scheme = urlparts.scheme

        if self.scheme in ("http", "https"):
            self._check_http(url)
        elif self.scheme == "ftp":
            self._check_ftp(url, urlparts)
        else:
            self._add_response("?")

    def _add_response(self, text):
        '''
        Append a "Response" line with the given text to the outcome.
        '''
        self.infostring += _("Response") + ": " + text + "\r\n"

    def _head(self, url):
        '''
        Send one HEAD request for url.
        Returns a tuple (connection, response) on success.  On failure a
        "Response" line describing the problem is recorded and None is
        returned.
        '''
        urlparts = urlparse.urlparse(url)
        # pick connection type and default port from the URL being requested,
        # so a redirect from http to https (or back) is handled correctly
        if urlparts.scheme == "https":
            connect = download.HTTPSConnection
            port = httplib.HTTPS_PORT
        else:
            connect = download.HTTPConnection
            port = httplib.HTTP_PORT
        try:
            if urlparts.port is not None:
                port = urlparts.port
        except ValueError:
            # the port in the URL is not a number
            self._add_response(_("Bad URL"))
            return None

        conn = connect(urlparts.hostname, port)
        try:
            conn.request("HEAD", url)
        except socket.error:
            conn.close()
            self._add_response(_("Connection Error"))
            return None

        try:
            resp = conn.getresponse()
        except socket.timeout:
            conn.close()
            self._add_response(_("Timeout"))
            return None
        except socket.error:
            conn.close()
            self._add_response(_("Connection Error"))
            return None

        return (conn, resp)

    def _check_http(self, url):
        '''
        Check an http or https URL, following up to MAX_REDIRECTS redirects,
        and record the final response status and headers.
        '''
        redirect_codes = (httplib.MOVED_PERMANENTLY, httplib.FOUND,
                          httplib.SEE_OTHER, httplib.TEMPORARY_REDIRECT)

        result = self._head(url)
        if result is None:
            return
        conn, resp = result

        # handle redirects here and set self.url
        count = 0
        while resp.status in redirect_codes and count < MAX_REDIRECTS:
            location = resp.getheader("location")
            if location is None:
                # redirect without a target: report the redirect itself
                break
            # the Location header may be relative to the current URL
            target = urlparse.urljoin(url, location)
            self.infostring += _("Redirected") + ": %s\r\n" % target
            if urlparse.urlparse(target).scheme not in ("http", "https"):
                # cannot follow this with a HEAD request; report the redirect
                break

            conn.close()
            result = self._head(target)
            if result is None:
                return
            conn, resp = result
            url = target
            self.url = url
            count += 1

        if resp.status == httplib.OK:
            self._add_response(_("OK"))
        else:
            self._add_response("%s %s" % (resp.status, resp.reason))

        # need to convert list into string
        for header in resp.getheaders():
            self.infostring += header[0] + ": " + header[1] + "\r\n"

        conn.close()

    def _check_ftp(self, url, urlparts):
        '''
        Check an ftp URL: connect, log in, then record whether the file
        exists and, if the server reports it, its size.
        '''
        try:
            username = urlparts.username
            password = urlparts.password
        except AttributeError:
            # needed for python < 2.5
            username = None
            password = None

        if username is None:
            username = "anonymous"
            password = "anonymous"

        try:
            port = urlparts.port
        except ValueError:
            # the port in the URL is not a number
            self._add_response(_("Bad URL"))
            return

        # Connect to the bare host name: the raw network location may also
        # carry "user:password@" and ":port", which is not a host name.
        host = urlparts.hostname or urlparts[1]

        ftpobj = download.FTP()
        try:
            if port is None:
                ftpobj.connect(host)
            else:
                # NOTE(review): assumes download.FTP.connect accepts
                # (host, port) like ftplib.FTP.connect -- confirm.
                ftpobj.connect(host, port)
        except socket.gaierror:
            self._add_response(_("Bad Hostname"))
            return
        except socket.timeout:
            self._add_response(_("timed out"))
            return
        except socket.error:
            self._add_response(_("Connection refused"))
            return

        try:
            ftpobj.login(username, password)
        except ftplib.error_perm:
            # sys.exc_info() instead of "except ..., error" keeps this
            # parseable by every Python version
            self._add_response(str(sys.exc_info()[1]))
            # without a login the file cannot be examined; stop here so the
            # login error stays the reported response
            self._quit(ftpobj)
            return

        if ftpobj.exist(url):
            self._add_response(_("OK"))
        else:
            self._add_response(_("Not Found"))

        try:
            size = ftpobj.size(url)
        except Exception:
            # the size is optional information
            size = None

        self._quit(ftpobj)

        if size is not None:
            # must be spelled "Content-Length": that is the header name the
            # size check in Checker._check_process looks up
            self.infostring += "Content-Length: %s\r\n" % size

    def _quit(self, ftpobj):
        '''
        End the ftp session; failures while saying goodbye are irrelevant to
        the check result and are deliberately ignored.
        '''
        try:
            ftpobj.quit()
        except Exception:
            pass

    def geturl(self):
        '''
        Returns the URL that was finally checked (after redirects).
        '''
        return self.url

    def info(self):
        '''
        Returns the recorded outcome as "Name: value" lines.
        '''
        # need response and content-length for HTTP
        return self.infostring