1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
|
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 Vinay Sajip.
# Licensed to the Python Software Foundation under a contributor agreement.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
import codecs
import json
import logging
import os
import shutil
try:
import ssl
except ImportError:
ssl = None
import subprocess
import sys
import tempfile
try:
import threading
except ImportError:
import dummy_threading as threading
import time
from compat import unittest, Request
from support import DistlibTestCase, in_github_workflow
if ssl:
from support import HTTPSServerThread
from distlib import DistlibException
from distlib.compat import urlopen, HTTPError, URLError
from distlib.index import PackageIndex
from distlib.metadata import Metadata, MetadataMissingError, METADATA_FILENAME
from distlib.util import zip_dir
if ssl:
from distlib.util import HTTPSHandler
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
HERE = os.path.abspath(os.path.dirname(__file__))
if 'HOME' in os.environ:
PYPIRC = os.path.expandvars('$HOME/.pypirc')
else:
PYPIRC = None
TEST_SERVER_PORT = os.environ.get('TEST_PYPISERVER_PORT', '8086')
IN_GITHUB_WORKFLOW = in_github_workflow()
class PackageIndexTestCase(DistlibTestCase):
run_test_server = True
test_server_url = 'http://localhost:%s/' % TEST_SERVER_PORT
@classmethod
def setUpClass(cls):
if cls.run_test_server:
cls.server = None
server_script = os.path.join(HERE, 'pypi-server-standalone.py')
if not os.path.exists(server_script):
logger.debug('test server not available - some tests '
'will be skipped.')
return
pwdfn = os.path.join(HERE, 'passwords')
if not os.path.exists(pwdfn): # pragma: no cover
with open(pwdfn, 'w') as f:
f.write('test:secret\n')
pkgdir = os.path.join(HERE, 'packages')
if not os.path.isdir(pkgdir): # pragma: no cover
os.mkdir(pkgdir)
fd, cls.sinkfile = tempfile.mkstemp(suffix='.log', prefix='distlib-pypi-')
os.close(fd)
cls.sink = sink = open(cls.sinkfile, 'w')
cmd = [
sys.executable, 'pypi-server-standalone.py', '--interface', '127.0.0.1', '--port', TEST_SERVER_PORT,
'-P', 'passwords', 'packages'
]
cls.server = subprocess.Popen(cmd, stdout=sink, stderr=sink, cwd=HERE)
# wait for the server to start up
response = None
tries = 20
timeout = 0.5
count = 0
while response is None:
try:
count += 1
time.sleep(timeout)
response = urlopen(cls.test_server_url)
if response.getcode() != 200:
response = None
break
# In case some other server is listening on the same port ...
# need to check the actual response for pypiserver-specific content
data = response.read()
if b'Welcome to pypiserver!' not in data:
response = None
break
except URLError:
if count < tries:
pass
else:
break
if response is None or cls.server.poll() is not None:
logger.warning('PyPI test server could not be run')
cls.server = None
cls.sink.close()
os.remove(cls.sinkfile)
@classmethod
def tearDownClass(cls):
if cls.run_test_server:
if cls.server and cls.server.returncode is None:
cls.server.kill()
cls.server.wait()
cls.sink.close()
try:
os.remove(cls.sinkfile)
except Exception:
logger.warning('Unable to remove test file %s', cls.sinkfile)
def setUp(self):
if not self.run_test_server:
self.index = PackageIndex()
else:
self.index = PackageIndex(self.test_server_url)
self.index.username = 'test'
self.index.password = 'secret'
def load_package_metadata(self, path):
result = None
for bn in (METADATA_FILENAME, 'package.json'):
fn = os.path.join(path, bn)
if os.path.exists(fn):
with codecs.open(fn, 'r', 'utf-8') as jf:
result = json.load(jf)
break
if not result:
raise ValueError('neither %s nor package.json '
'found in %s' % (METADATA_FILENAME, fn))
if bn == 'package.json':
result = result.get('index-metadata', {})
if result.get('metadata_version') != '2.0':
raise ValueError('Not a valid file: %s' % fn)
return result
def check_pypi_server_available(self):
if self.run_test_server and not self.server: # pragma: no cover
raise unittest.SkipTest('test server not available')
def check_testdist_available(self):
self.index.check_credentials()
self.username = self.index.username.replace('-', '_')
self.dist_project = '%s_testdist' % self.username
self.dist_version = '0.1'
self.testdir = '%s-%s' % (self.dist_project, self.dist_version)
destdir = os.path.join(HERE, self.testdir)
if not os.path.isdir(destdir): # pragma: no cover
srcdir = os.path.join(HERE, 'testdist-0.1')
shutil.copytree(srcdir, destdir)
for fn in os.listdir(destdir):
fn = os.path.join(destdir, fn)
if os.path.isfile(fn):
with codecs.open(fn, 'r', 'utf-8') as f:
data = f.read()
data = data.format(username=self.username)
with codecs.open(fn, 'w', 'utf-8') as f:
f.write(data)
zip_data = zip_dir(destdir).getvalue()
zip_name = destdir + '.zip'
with open(zip_name, 'wb') as f:
f.write(zip_data)
@unittest.skip('The PyPI API changed, so this test is temporarily skipped')
def test_register(self): # pragma: no cover
"Test registration"
self.check_pypi_server_available()
self.check_testdist_available()
d = os.path.join(HERE, self.testdir)
data = self.load_package_metadata(d)
md = Metadata()
self.assertRaises(MetadataMissingError, self.index.register, md)
md.name = self.dist_project
self.assertRaises(MetadataMissingError, self.index.register, md)
md.version = data['version']
md.summary = data['summary']
response = self.index.register(md)
self.assertEqual(response.code, 200)
def remove_package(self, name, version):
"""
Remove package. Only works with test server; PyPI would require
some scraping to get CSRF tokens into the request.
"""
d = {
':action': 'remove_pkg',
'name': name,
'version': version,
'submit_remove': 'Remove',
'submit_ok': 'OK',
}
self.index.check_credentials()
request = self.index.encode_request(d.items(), [])
try:
self.index.send_request(request)
except HTTPError as e:
if e.getcode() != 404:
raise
@unittest.skip('The PyPI API changed, so this test is temporarily skipped')
def test_upload(self):
"Test upload"
self.check_pypi_server_available()
self.check_testdist_available()
if self.run_test_server:
self.remove_package(self.dist_project, self.dist_version)
d = os.path.join(HERE, self.testdir)
data = self.load_package_metadata(d)
md = Metadata(mapping=data)
self.index.gpg_home = os.path.join(HERE, 'keys')
try:
zip_name = os.path.join(HERE, '%s.zip' % self.testdir)
self.assertRaises(DistlibException, self.index.upload_file, md, 'random-' + zip_name, 'Test User', 'tuser')
response = self.index.upload_file(md, zip_name, 'Test User', 'tuser')
self.assertEqual(response.code, 200)
if self.run_test_server:
fn = os.path.join(HERE, 'packages', os.path.basename(zip_name))
self.assertTrue(os.path.exists(fn))
except HTTPError as e:
# Treat as success if it already exists
if e.getcode() != 400 or 'already exists' not in e.msg:
raise
def test_upload_documentation(self):
"Test upload of documentation"
raise unittest.SkipTest('Skipped, as pythonhosted.org is being '
'de-emphasised and this functionality may '
'no longer be available')
self.check_pypi_server_available()
self.check_testdist_available()
d = os.path.join(HERE, self.testdir)
data = self.load_package_metadata(d)
md = Metadata(mapping=data)
d = os.path.join(d, 'doc')
# Non-existent directory
self.assertRaises(DistlibException, self.index.upload_documentation, md, d + '-random')
# Directory with no index.html
self.assertRaises(DistlibException, self.index.upload_documentation, md, HERE)
response = self.index.upload_documentation(md, d)
self.assertEqual(response.code, 200)
if not self.run_test_server:
url = 'http://packages.python.org/%s/' % self.dist_project
response = urlopen(url)
self.assertEqual(response.code, 200)
data = response.read()
expected = b'This is dummy documentation'
self.assertIn(expected, data)
@unittest.skipIf(IN_GITHUB_WORKFLOW, 'This test is end-of-line dependent')
def test_verify_signature(self): # pragma: no cover
if not self.index.gpg: # pragma: no cover
raise unittest.SkipTest('gpg not available')
sig_file = os.path.join(HERE, 'good.bin.asc')
good_file = os.path.join(HERE, 'good.bin')
bad_file = os.path.join(HERE, 'bad.bin')
gpg = self.index.gpg
self.index.gpg = None
self.assertRaises(DistlibException, self.index.verify_signature, sig_file, good_file)
self.index.gpg = gpg
# Not pointing to keycd tests
self.assertRaises(DistlibException, self.index.verify_signature, sig_file, good_file)
self.index.gpg_home = os.path.join(HERE, 'keys')
self.assertTrue(self.index.verify_signature(sig_file, good_file))
self.assertFalse(self.index.verify_signature(sig_file, bad_file))
def test_invalid(self):
self.assertRaises(DistlibException, PackageIndex, 'ftp://ftp.python.org/')
self.index.username = None
self.assertRaises(DistlibException, self.index.check_credentials)
@unittest.skipIf(PYPIRC is None or os.path.exists(PYPIRC), 'because $HOME/.pypirc is unavailable for use')
def test_save_configuration(self):
try:
self.index.save_configuration()
self.assertTrue(os.path.exists(PYPIRC))
finally:
os.remove(PYPIRC)
if ssl:
def make_https_server(self, certfile):
server = HTTPSServerThread(certfile)
flag = threading.Event()
server.start(flag)
flag.wait()
def cleanup():
server.stop()
server.join()
self.addCleanup(cleanup)
return server
@unittest.skipIf(sys.version_info[:2] > (3, 11), 'Temporary skip')
def test_ssl_verification(self):
certfile = os.path.join(HERE, 'keycert.pem')
server = self.make_https_server(certfile)
url = 'https://localhost:%d/' % server.port
req = Request(url)
self.index.ssl_verifier = HTTPSHandler(certfile)
response = self.index.send_request(req)
self.assertEqual(response.code, 200)
@unittest.skipIf(IN_GITHUB_WORKFLOW, 'This test is end-of-line dependent')
@unittest.skipIf(sys.version_info[:2] > (3, 11), 'Temporary skip')
def test_download(self): # pragma: no cover
digest = '913093474942c5a564c011f232868517' # for testsrc/README.txt
certfile = os.path.join(HERE, 'keycert.pem')
server = self.make_https_server(certfile)
url = 'https://localhost:%d/README.txt' % server.port
fd, fn = tempfile.mkstemp()
os.close(fd)
self.addCleanup(os.remove, fn)
with open(os.path.join(HERE, 'testsrc', 'README.txt'), 'rb') as f:
data = f.read()
self.index.ssl_verifier = HTTPSHandler(certfile)
self.index.download_file(url, fn) # no digest
with open(fn, 'rb') as f:
self.assertEqual(data, f.read())
self.index.download_file(url, fn, digest)
with open(fn, 'rb') as f:
self.assertEqual(data, f.read())
reporthook = lambda *args: None
self.index.download_file(url, fn, ('md5', digest), reporthook)
with open(fn, 'rb') as f:
self.assertEqual(data, f.read())
# bad digest
self.assertRaises(DistlibException, self.index.download_file, url, fn, digest[:-1] + '8')
@unittest.skipIf('SKIP_ONLINE' in os.environ, 'Skipping online test')
@unittest.skipUnless(ssl, 'SSL required for this test.')
@unittest.skipIf(True, 'skipping due to temporary changes in PyPI')
def test_search(self): # pragma: no cover
self.index = PackageIndex()
result = self.index.search({'name': 'tatterdemalion'})
self.assertEqual(len(result), 1)
result = self.index.search({'name': 'ragamuff'})
if result:
msg = 'got an unexpected result: %s' % result
else:
msg = None
self.assertEqual(len(result), 0, msg)
if __name__ == '__main__': # pragma: no cover
unittest.main()
|