1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use http::{self, HeaderValue, Response, StatusCode};
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;
use crate::{client::Body, Error, Result};
#[derive(Debug)]
pub enum StreamProtocol {
/// Binary subprotocol v4. See `Client::connect`.
V4,
/// Binary subprotocol v5. See `Client::connect`.
/// v5 supports CLOSE signals.
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
V5,
}
impl StreamProtocol {
pub fn as_str(&self) -> &'static str {
match self {
Self::V4 => "v4.channel.k8s.io",
Self::V5 => "v5.channel.k8s.io",
}
}
fn as_bytes(&self) -> &'static [u8] {
self.as_str().as_bytes()
}
pub fn supports_stream_close(&self) -> bool {
match self {
Self::V4 => false,
Self::V5 => true,
}
}
/// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
// Protocols we support in our preferred order.
let supported_protocols = [
// v5 supports CLOSE signals.
Self::V5.as_str(),
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
Self::V4.as_str(),
];
let header_value_string = supported_protocols.join(", ");
// Note: Multiple headers does not work. Only a single CSV works.
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
);
Ok(())
}
/// Return the subprotocol of an HTTP response.
fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
let headers = res.headers();
match headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h.as_bytes())
{
Some(protocol) => {
if protocol == Self::V4.as_bytes() {
Some(Self::V4)
} else if protocol == Self::V5.as_bytes() {
Some(Self::V5)
} else {
None
}
}
_ => None,
}
}
}
/// Possible errors from upgrading to a WebSocket connection
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
#[derive(Debug, Error)]
pub enum UpgradeConnectionError {
/// The server did not respond with [`SWITCHING_PROTOCOLS`] status when upgrading the
/// connection.
///
/// [`SWITCHING_PROTOCOLS`]: http::status::StatusCode::SWITCHING_PROTOCOLS
#[error("failed to switch protocol: {0}")]
ProtocolSwitch(http::status::StatusCode),
/// `Upgrade` header was not set to `websocket` (case insensitive)
#[error("upgrade header was not set to websocket")]
MissingUpgradeWebSocketHeader,
/// `Connection` header was not set to `Upgrade` (case insensitive)
#[error("connection header was not set to Upgrade")]
MissingConnectionUpgradeHeader,
/// `Sec-WebSocket-Accept` key mismatched.
#[error("Sec-WebSocket-Accept key mismatched")]
SecWebSocketAcceptKeyMismatch,
/// `Sec-WebSocket-Protocol` mismatched.
#[error("Sec-WebSocket-Protocol mismatched")]
SecWebSocketProtocolMismatch,
/// Failed to get pending HTTP upgrade.
#[error("failed to get pending HTTP upgrade: {0}")]
GetPendingUpgrade(#[source] hyper::Error),
}
// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
let headers = res.headers();
if !headers
.get(http::header::UPGRADE)
.and_then(|h| h.to_str().ok())
.map(|h| h.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
{
return Err(UpgradeConnectionError::MissingUpgradeWebSocketHeader);
}
if !headers
.get(http::header::CONNECTION)
.and_then(|h| h.to_str().ok())
.map(|h| h.eq_ignore_ascii_case("Upgrade"))
.unwrap_or(false)
{
return Err(UpgradeConnectionError::MissingConnectionUpgradeHeader);
}
let accept_key = ws::handshake::derive_accept_key(key.as_ref());
if !headers
.get(http::header::SEC_WEBSOCKET_ACCEPT)
.map(|h| h == &accept_key)
.unwrap_or(false)
{
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
}
// Make sure that the server returned an expected subprotocol.
let protocol = match StreamProtocol::get_from_response(res) {
Some(p) => p,
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch),
};
Ok(protocol)
}