[go: up one dir, main page]

File: http.py

package info (click to toggle)
toot 0.34.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 868 kB
  • sloc: python: 4,858; makefile: 3
file content (99 lines) | stat: -rw-r--r-- 2,881 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from requests import Request, Session
from requests.exceptions import RequestException

from toot import __version__
from toot.exceptions import NotFoundError, ApiError
from toot.logging import log_request, log_response


def send_request(request, allow_redirects=True):
    # Set a user agent string
    # Required for accessing instances using Cloudfront DDOS protection.
    request.headers["User-Agent"] = "toot/{}".format(__version__)

    log_request(request)

    try:
        with Session() as session:
            prepared = session.prepare_request(request)
            settings = session.merge_environment_settings(prepared.url, {}, None, None, None)
            response = session.send(prepared, allow_redirects=allow_redirects, **settings)
    except RequestException as ex:
        raise ApiError(f"Request failed: {str(ex)}")

    log_response(response)

    return response


def _get_error_message(response):
    """Attempt to extract an error message from response body"""
    try:
        data = response.json()
        if "error_description" in data:
            return data['error_description']
        if "error" in data:
            return data['error']
    except Exception:
        pass

    return "Unknown error"


def process_response(response):
    if not response.ok:
        error = _get_error_message(response)

        if response.status_code == 404:
            raise NotFoundError(error)

        raise ApiError(error)

    return response


def get(app, user, path, params=None, headers=None):
    url = app.base_url + path

    headers = headers or {}
    headers["Authorization"] = f"Bearer {user.access_token}"

    request = Request('GET', url, headers, params=params)
    response = send_request(request)

    return process_response(response)


def anon_get(url, params=None):
    request = Request('GET', url, None, params=params)
    response = send_request(request)

    return process_response(response)


def post(app, user, path, headers=None, files=None, data=None, json=None, allow_redirects=True):
    url = app.base_url + path

    headers = headers or {}
    headers["Authorization"] = f"Bearer {user.access_token}"

    return anon_post(url, headers=headers, files=files, data=data, json=json, allow_redirects=allow_redirects)


def delete(app, user, path, data=None, headers=None):
    url = app.base_url + path

    headers = headers or {}
    headers["Authorization"] = f"Bearer {user.access_token}"

    request = Request('DELETE', url, headers=headers, json=data)
    response = send_request(request)

    return process_response(response)


def anon_post(url, headers=None, files=None, data=None, json=None, allow_redirects=True):
    request = Request(method="POST", url=url, headers=headers, files=files, data=data, json=json)
    response = send_request(request, allow_redirects)

    return process_response(response)