[go: up one dir, main page]

File: test_index.py

package info (click to toggle)
distlib 0.3.9-1
  • links: PTS
  • area: main
  • in suites: trixie
  • size: 2,228 kB
  • sloc: python: 12,347; ansic: 820; sh: 106; makefile: 3
file content (359 lines) | stat: -rw-r--r-- 14,405 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 Vinay Sajip.
# Licensed to the Python Software Foundation under a contributor agreement.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
import codecs
import json
import logging
import os
import shutil

try:
    import ssl
except ImportError:
    ssl = None
import subprocess
import sys
import tempfile
try:
    import threading
except ImportError:
    import dummy_threading as threading
import time

from compat import unittest, Request
from support import DistlibTestCase, in_github_workflow
if ssl:
    from support import HTTPSServerThread

from distlib import DistlibException
from distlib.compat import urlopen, HTTPError, URLError
from distlib.index import PackageIndex
from distlib.metadata import Metadata, MetadataMissingError, METADATA_FILENAME
from distlib.util import zip_dir

if ssl:
    from distlib.util import HTTPSHandler

# Module-level logger. The NullHandler means records from this module are
# accepted silently unless the test runner configures logging itself.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Absolute path of the directory containing this test module; test fixtures
# (keys, certificates, the standalone server script) are located relative to it.
HERE = os.path.abspath(os.path.dirname(__file__))

# Path of the user's .pypirc, or None when no HOME is defined in the environment.
PYPIRC = os.path.expandvars('$HOME/.pypirc') if 'HOME' in os.environ else None

# Port for the local test index server; override with TEST_PYPISERVER_PORT.
TEST_SERVER_PORT = os.environ.get('TEST_PYPISERVER_PORT', '8086')

# Result of support.in_github_workflow(), used below to skip some tests.
IN_GITHUB_WORKFLOW = in_github_workflow()


class PackageIndexTestCase(DistlibTestCase):
    run_test_server = True
    test_server_url = 'http://localhost:%s/' % TEST_SERVER_PORT

    @classmethod
    def setUpClass(cls):
        """
        Start a local pypiserver instance for the tests in this class.

        Does nothing when ``run_test_server`` is false. Otherwise
        ``cls.server`` is set to the running ``subprocess.Popen`` object, or
        to ``None`` if the server script is missing or the server could not
        be confirmed as running. Server output is redirected to a temporary
        log file (``cls.sinkfile``), which is removed again on failure.
        """
        if not cls.run_test_server:
            return
        cls.server = None
        server_script = os.path.join(HERE, 'pypi-server-standalone.py')
        if not os.path.exists(server_script):
            logger.debug('test server not available - some tests '
                         'will be skipped.')
            return
        pwdfn = os.path.join(HERE, 'passwords')
        if not os.path.exists(pwdfn):  # pragma: no cover
            with open(pwdfn, 'w') as f:
                f.write('test:secret\n')
        pkgdir = os.path.join(HERE, 'packages')
        if not os.path.isdir(pkgdir):  # pragma: no cover
            os.mkdir(pkgdir)
        fd, cls.sinkfile = tempfile.mkstemp(suffix='.log', prefix='distlib-pypi-')
        os.close(fd)
        cls.sink = sink = open(cls.sinkfile, 'w')
        cmd = [
            sys.executable, 'pypi-server-standalone.py', '--interface', '127.0.0.1', '--port', TEST_SERVER_PORT,
            '-P', 'passwords', 'packages'
        ]
        cls.server = subprocess.Popen(cmd, stdout=sink, stderr=sink, cwd=HERE)
        # Wait for the server to start up: poll it up to `tries` times,
        # sleeping `timeout` seconds before each attempt.
        started = False
        tries = 20
        timeout = 0.5
        for _ in range(tries):
            time.sleep(timeout)
            if cls.server.poll() is not None:
                # The server process has already exited, so further polling
                # of the URL cannot succeed for *our* server.
                break
            try:
                response = urlopen(cls.test_server_url)
                try:
                    # In case some other server is listening on the same
                    # port, check the actual response for pypiserver-specific
                    # content as well as the status code.
                    started = (response.getcode() == 200 and
                               b'Welcome to pypiserver!' in response.read())
                finally:
                    # Always release the connection, whatever the outcome.
                    response.close()
                break
            except URLError:
                # Not accepting connections yet - try again.
                continue
        if not started or cls.server.poll() is not None:
            logger.warning('PyPI test server could not be run')
            if cls.server.poll() is None:
                # Don't leave an orphaned server process behind once the
                # reference to it is dropped below.
                cls.server.kill()
                cls.server.wait()
            cls.server = None
            cls.sink.close()
            os.remove(cls.sinkfile)

    @classmethod
    def tearDownClass(cls):
        """
        Stop the test server started by setUpClass and remove its log file.

        Does nothing if no server is in use. A failure to delete the log
        file is logged as a warning rather than raised.
        """
        if not cls.run_test_server or not cls.server:
            return
        if cls.server.poll() is None:
            # Still running: terminate it and reap the process.
            cls.server.kill()
            cls.server.wait()
        # Release the log file even if the server had already exited by
        # itself, so the handle and the temporary file are never leaked.
        cls.sink.close()
        try:
            os.remove(cls.sinkfile)
        except OSError:
            logger.warning('Unable to remove test file %s', cls.sinkfile)
    def setUp(self):
        """
        Create the PackageIndex used by each test as ``self.index``.

        When the local test server is in use, the index points at it and is
        given the test credentials; otherwise a default index is created.
        """
        if self.run_test_server:
            index = PackageIndex(self.test_server_url)
            index.username = 'test'
            index.password = 'secret'
        else:
            index = PackageIndex()
        self.index = index

    def load_package_metadata(self, path):
        """
        Load the index metadata for the distribution in directory *path*.

        METADATA_FILENAME is tried first, then ``package.json``. For
        ``package.json`` the metadata is taken from its ``index-metadata``
        entry.

        :param path: Directory expected to contain one of the metadata files.
        :return: The metadata as a dictionary.
        :raises ValueError: If neither file exists in *path*, or if the file
                            found does not hold a dictionary whose
                            ``metadata_version`` is ``'2.0'``.
        """
        for bn in (METADATA_FILENAME, 'package.json'):
            fn = os.path.join(path, bn)
            if os.path.exists(fn):
                with codecs.open(fn, 'r', 'utf-8') as jf:
                    result = json.load(jf)
                break
        else:
            # Report the directory that was searched, not the last file
            # name that happened to be tried.
            raise ValueError('neither %s nor package.json '
                             'found in %s' % (METADATA_FILENAME, path))
        if bn == 'package.json' and isinstance(result, dict):
            result = result.get('index-metadata', {})
        # Anything that isn't a version 2.0 metadata dictionary (including an
        # empty or non-dictionary JSON document) is reported as invalid.
        if not isinstance(result, dict) or result.get('metadata_version') != '2.0':
            raise ValueError('Not a valid file: %s' % fn)
        return result

    def check_pypi_server_available(self):
        """
        Skip the calling test if the local test server is wanted but absent.

        :raises unittest.SkipTest: If ``run_test_server`` is set but no
                                   server is available.
        """
        if not self.run_test_server:
            return
        if not self.server:  # pragma: no cover
            raise unittest.SkipTest('test server not available')

    def check_testdist_available(self):
        """
        Make sure the per-user test distribution exists next to this module.

        Sets ``self.username``, ``self.dist_project``, ``self.dist_version``
        and ``self.testdir``. If the distribution directory is missing, it is
        created from the ``testdist-0.1`` template (substituting the user
        name into each top-level file) and zipped alongside as
        ``<testdir>.zip``.
        """
        index = self.index
        index.check_credentials()
        self.username = index.username.replace('-', '_')
        self.dist_project = '%s_testdist' % self.username
        self.dist_version = '0.1'
        self.testdir = '%s-%s' % (self.dist_project, self.dist_version)
        destdir = os.path.join(HERE, self.testdir)
        if not os.path.isdir(destdir):  # pragma: no cover
            shutil.copytree(os.path.join(HERE, 'testdist-0.1'), destdir)
            for name in os.listdir(destdir):
                candidate = os.path.join(destdir, name)
                if not os.path.isfile(candidate):
                    continue
                with codecs.open(candidate, 'r', 'utf-8') as f:
                    template = f.read()
                # Render before reopening for writing, so that a formatting
                # failure cannot leave a truncated file behind.
                rendered = template.format(username=self.username)
                with codecs.open(candidate, 'w', 'utf-8') as f:
                    f.write(rendered)
            archive = zip_dir(destdir).getvalue()
            with open(destdir + '.zip', 'wb') as f:
                f.write(archive)

    @unittest.skip('The PyPI API changed, so this test is temporarily skipped')
    def test_register(self):  # pragma: no cover
        "Test registration"
        self.check_pypi_server_available()
        self.check_testdist_available()
        data = self.load_package_metadata(os.path.join(HERE, self.testdir))
        md = Metadata()
        # Registration must be refused while required metadata is missing:
        # first with nothing set at all ...
        with self.assertRaises(MetadataMissingError):
            self.index.register(md)
        # ... and then with only the name set.
        md.name = self.dist_project
        with self.assertRaises(MetadataMissingError):
            self.index.register(md)
        # With version and summary filled in, registration should succeed.
        md.version = data['version']
        md.summary = data['summary']
        response = self.index.register(md)
        self.assertEqual(response.code, 200)

    def remove_package(self, name, version):
        """
        Ask the index to remove release *version* of project *name*.

        Only works with the test server; PyPI would require some scraping
        to get CSRF tokens into the request. A 404 response is ignored;
        any other HTTP error is propagated.
        """
        fields = {
            ':action': 'remove_pkg',
            'name': name,
            'version': version,
            'submit_remove': 'Remove',
            'submit_ok': 'OK',
        }
        self.index.check_credentials()
        request = self.index.encode_request(fields.items(), [])
        try:
            self.index.send_request(request)
        except HTTPError as e:
            if e.getcode() == 404:
                return
            raise

    @unittest.skip('The PyPI API changed, so this test is temporarily skipped')
    def test_upload(self):
        "Test upload"
        self.check_pypi_server_available()
        self.check_testdist_available()
        if self.run_test_server:
            # Start from a clean slate on the local server.
            self.remove_package(self.dist_project, self.dist_version)
        data = self.load_package_metadata(os.path.join(HERE, self.testdir))
        md = Metadata(mapping=data)
        self.index.gpg_home = os.path.join(HERE, 'keys')
        try:
            zip_name = os.path.join(HERE, '%s.zip' % self.testdir)
            # A file name that doesn't exist must be rejected.
            with self.assertRaises(DistlibException):
                self.index.upload_file(md, 'random-' + zip_name, 'Test User', 'tuser')
            response = self.index.upload_file(md, zip_name, 'Test User', 'tuser')
            self.assertEqual(response.code, 200)
            if self.run_test_server:
                uploaded = os.path.join(HERE, 'packages', os.path.basename(zip_name))
                self.assertTrue(os.path.exists(uploaded))
        except HTTPError as e:
            # Treat as success if it already exists
            already_exists = e.getcode() == 400 and 'already exists' in e.msg
            if not already_exists:
                raise

    # Skipped via the decorator (as the other disabled tests in this class
    # are) rather than by raising SkipTest on the first line, which left the
    # whole body below as unreachable code.
    @unittest.skip('Skipped, as pythonhosted.org is being '
                   'de-emphasised and this functionality may '
                   'no longer be available')
    def test_upload_documentation(self):
        "Test upload of documentation"
        self.check_pypi_server_available()
        self.check_testdist_available()
        d = os.path.join(HERE, self.testdir)
        data = self.load_package_metadata(d)
        md = Metadata(mapping=data)
        d = os.path.join(d, 'doc')
        # Non-existent directory
        self.assertRaises(DistlibException, self.index.upload_documentation, md, d + '-random')
        # Directory with no index.html
        self.assertRaises(DistlibException, self.index.upload_documentation, md, HERE)
        response = self.index.upload_documentation(md, d)
        self.assertEqual(response.code, 200)
        if not self.run_test_server:
            # Against the real index, also check the published page content.
            url = 'http://packages.python.org/%s/' % self.dist_project
            response = urlopen(url)
            self.assertEqual(response.code, 200)
            data = response.read()
            expected = b'This is dummy documentation'
            self.assertIn(expected, data)

    @unittest.skipIf(IN_GITHUB_WORKFLOW, 'This test is end-of-line dependent')
    def test_verify_signature(self):  # pragma: no cover
        index = self.index
        if not index.gpg:  # pragma: no cover
            raise unittest.SkipTest('gpg not available')
        sig_file, good_file, bad_file = [
            os.path.join(HERE, name) for name in ('good.bin.asc', 'good.bin', 'bad.bin')
        ]
        # With no gpg program configured, verification must be refused.
        saved_gpg = index.gpg
        index.gpg = None
        with self.assertRaises(DistlibException):
            index.verify_signature(sig_file, good_file)
        index.gpg = saved_gpg
        # gpg is restored, but gpg_home has not been pointed at the test
        # keys yet, so verification is still expected to fail.
        with self.assertRaises(DistlibException):
            index.verify_signature(sig_file, good_file)
        index.gpg_home = os.path.join(HERE, 'keys')
        self.assertTrue(index.verify_signature(sig_file, good_file))
        self.assertFalse(index.verify_signature(sig_file, bad_file))

    def test_invalid(self):
        # An index URL with the ftp scheme is rejected at construction time.
        with self.assertRaises(DistlibException):
            PackageIndex('ftp://ftp.python.org/')
        # Credentials without a user name fail the credentials check.
        self.index.username = None
        with self.assertRaises(DistlibException):
            self.index.check_credentials()

    @unittest.skipIf(PYPIRC is None or os.path.exists(PYPIRC), 'because $HOME/.pypirc is unavailable for use')
    def test_save_configuration(self):
        # Only runs when there is a HOME and no pre-existing .pypirc, so the
        # file checked (and deleted) here is the one this test created.
        try:
            self.index.save_configuration()
            self.assertTrue(os.path.exists(PYPIRC))
        finally:
            # Clean up only if the file was actually written; otherwise an
            # error from os.remove() would mask the real test failure.
            if os.path.exists(PYPIRC):
                os.remove(PYPIRC)

    if ssl:

        def make_https_server(self, certfile):
            """
            Start an HTTPSServerThread using *certfile* and return it.

            Blocks until the event handed to the server's ``start()`` has
            been set (presumably by the server thread once it is ready -
            see support.HTTPSServerThread). A cleanup is registered which
            stops the server and joins its thread when the test finishes.
            """
            ready = threading.Event()
            server = HTTPSServerThread(certfile)
            server.start(ready)
            ready.wait()

            def shutdown():
                server.stop()
                server.join()

            self.addCleanup(shutdown)
            return server

        @unittest.skipIf(sys.version_info[:2] > (3, 11), 'Temporary skip')
        def test_ssl_verification(self):
            # Serve HTTPS locally with the test certificate, then check that
            # a request verified against that same certificate succeeds.
            certfile = os.path.join(HERE, 'keycert.pem')
            server = self.make_https_server(certfile)
            self.index.ssl_verifier = HTTPSHandler(certfile)
            request = Request('https://localhost:%d/' % server.port)
            response = self.index.send_request(request)
            self.assertEqual(response.code, 200)

        @unittest.skipIf(IN_GITHUB_WORKFLOW, 'This test is end-of-line dependent')
        @unittest.skipIf(sys.version_info[:2] > (3, 11), 'Temporary skip')
        def test_download(self):  # pragma: no cover
            digest = '913093474942c5a564c011f232868517'  # for testsrc/README.txt
            certfile = os.path.join(HERE, 'keycert.pem')
            server = self.make_https_server(certfile)
            url = 'https://localhost:%d/README.txt' % server.port
            # Scratch file to download into; removed when the test finishes.
            fd, fn = tempfile.mkstemp()
            os.close(fd)
            self.addCleanup(os.remove, fn)
            with open(os.path.join(HERE, 'testsrc', 'README.txt'), 'rb') as f:
                expected = f.read()
            self.index.ssl_verifier = HTTPSHandler(certfile)

            def ignore_progress(*args):
                return None

            # Download three times with different trailing arguments: no
            # digest, a bare digest, and a (hasher, digest) pair together
            # with a report hook. Each time the content must match.
            variants = (
                (),
                (digest,),
                (('md5', digest), ignore_progress),
            )
            for extra_args in variants:
                self.index.download_file(url, fn, *extra_args)
                with open(fn, 'rb') as f:
                    self.assertEqual(expected, f.read())
            # bad digest
            with self.assertRaises(DistlibException):
                self.index.download_file(url, fn, digest[:-1] + '8')

    @unittest.skipIf('SKIP_ONLINE' in os.environ, 'Skipping online test')
    @unittest.skipUnless(ssl, 'SSL required for this test.')
    @unittest.skipIf(True, 'skipping due to temporary changes in PyPI')
    def test_search(self):  # pragma: no cover
        # This test uses a default PackageIndex rather than the local server.
        self.index = PackageIndex()
        # This name is expected to give exactly one hit ...
        found = self.index.search({'name': 'tatterdemalion'})
        self.assertEqual(len(found), 1)
        # ... and this one none; include any unexpected hits in the message.
        found = self.index.search({'name': 'ragamuff'})
        msg = ('got an unexpected result: %s' % found) if found else None
        self.assertEqual(len(found), 0, msg)

# Allow this test module to be run directly as a script.
if __name__ == '__main__':  # pragma: no cover
    unittest.main()