use crate::{
credentials,
fetch::{refs, Action, Arguments, Command, Delegate, Error, LsRefsAction, Response},
};
use git_features::{progress, progress::Progress};
use git_transport::{
client,
client::{SetServiceResponse, TransportV2Ext},
Service,
};
use maybe_async::maybe_async;
use std::io;
/// Drive a complete fetch interaction over `transport`, letting `delegate` make all decisions.
///
/// The steps performed, in order:
///
/// 1. Handshake for `Service::UploadPack`, passing the extra parameters supplied by
///    `delegate.handshake_extra_parameters()`. If the handshake fails with an IO error of kind
///    `PermissionDenied`, `authenticate` is asked to fill in credentials for the transport URL, the
///    identity is set on the transport and the handshake is retried exactly once. The credentials
///    are approved if the retry succeeds and rejected if it fails with `PermissionDenied` again.
/// 2. If the transport reports a non-empty list of supported protocol versions and the negotiated
///    version is not in it, `Error::TransportProtocolPolicyViolation` is returned.
/// 3. Refs are taken from the handshake response if it carried any; otherwise an `ls-refs` command
///    is invoked unless `delegate.prepare_ls_refs()` answers with `LsRefsAction::Skip`, in which case
///    the ref list is empty.
/// 4. `delegate.prepare_fetch()` may cancel the operation or let it continue.
/// 5. Negotiation rounds run until the response contains a pack, which is handed to
///    `delegate.receive_pack()`, or until the delegate cancels.
///
/// * `authenticate` is only called when the first handshake is denied.
/// * `progress` receives a step and a name for each phase; it is moved into
///   `delegate.receive_pack()` once a pack arrives.
///
/// # Errors
///
/// Any error from the transport, the credentials helper, ref/response parsing or the delegate is
/// propagated. When `prepare_ls_refs()` or `prepare_fetch()` fail, the end of the interaction is
/// signalled to the server before the error is returned.
///
/// # Panics
///
/// * If `authenticate` returns no outcome for a `Fill` action.
/// * If refs were part of the handshake but the protocol is not V1, or no refs were part of the
///   handshake and the protocol is not V2.
/// * Presumably if the delegate adds arguments or features the server does not advertise, judging
///   by the name of `validate_argument_prefixes_or_panic()` - its body is not visible here.
#[maybe_async]
pub async fn fetch<F, D, T>(
mut transport: T,
mut delegate: D,
mut authenticate: F,
mut progress: impl Progress,
) -> Result<(), Error>
where
F: FnMut(credentials::Action<'_>) -> credentials::Result,
D: Delegate,
T: client::Transport,
{
// Phase 1: handshake, optional authentication, and parsing of refs sent along with the handshake.
let (protocol_version, parsed_refs, capabilities) = {
progress.init(None, progress::steps());
progress.set_name("handshake");
progress.step();
let extra_parameters = delegate.handshake_extra_parameters();
// Re-borrow the owned key/value pairs as `(&str, Option<&str>)` as passed to `handshake()`.
let extra_parameters: Vec<_> = extra_parameters
.iter()
.map(|(k, v)| (k.as_str(), v.as_ref().map(|s| s.as_str())))
.collect();
// Copied into an owned `Vec` up front so it can be consulted after the handshake.
let supported_versions: Vec<_> = transport.supported_protocol_versions().into();
let result = transport.handshake(Service::UploadPack, &extra_parameters).await;
let SetServiceResponse {
actual_protocol,
capabilities,
refs,
} = match result {
Ok(v) => Ok(v),
// Only a permission problem triggers the credentials flow; all other errors pass through.
Err(client::Error::Io { ref err }) if err.kind() == io::ErrorKind::PermissionDenied => {
// NOTE(review): the explicit drop looks like a borrow/async-state workaround - confirm before removing.
drop(result); let url = transport.to_url();
progress.set_name("authentication");
let credentials::Outcome { identity, next } =
authenticate(credentials::Action::Fill(&url))?.expect("FILL provides an identity");
transport.set_identity(identity)?;
progress.step();
progress.set_name("handshake (authenticated)");
match transport.handshake(Service::UploadPack, &extra_parameters).await {
Ok(v) => {
// The identity worked, report that back through `authenticate`.
authenticate(next.approve())?;
Ok(v)
}
// Denied again with the new identity: reject the credentials and surface the error.
Err(client::Error::Io { err }) if err.kind() == io::ErrorKind::PermissionDenied => {
authenticate(next.reject())?;
Err(client::Error::Io { err })
}
// Any other failure neither approves nor rejects the credentials.
Err(err) => Err(err),
}
}
Err(err) => Err(err),
}?;
// An empty list of supported versions places no restriction on the negotiated protocol.
if !supported_versions.is_empty() && !supported_versions.contains(&actual_protocol) {
return Err(Error::TransportProtocolPolicyViolation {
actual_version: actual_protocol,
});
}
let parsed_refs = match refs {
Some(mut refs) => {
assert_eq!(
actual_protocol,
git_transport::Protocol::V1,
"Only V1 auto-responds with refs"
);
Some(
refs::from_v1_refs_received_as_part_of_handshake_and_capabilities(&mut refs, capabilities.iter())
.await?,
)
}
None => None,
};
(actual_protocol, parsed_refs, capabilities)
};
// Phase 2: obtain the refs with a dedicated `ls-refs` request if the handshake did not provide them.
let parsed_refs = match parsed_refs {
Some(refs) => refs,
None => {
assert_eq!(
protocol_version,
git_transport::Protocol::V2,
"Only V2 needs a separate request to get specific refs"
);
let ls_refs = Command::LsRefs;
let mut ls_features = ls_refs.default_features(protocol_version, &capabilities);
let mut ls_args = ls_refs.initial_arguments(&ls_features);
// The delegate may adjust arguments and features, or skip listing refs altogether.
match delegate.prepare_ls_refs(&capabilities, &mut ls_args, &mut ls_features) {
Ok(LsRefsAction::Skip) => Vec::new(),
Ok(LsRefsAction::Continue) => {
ls_refs.validate_argument_prefixes_or_panic(
protocol_version,
&capabilities,
&ls_args,
&ls_features,
);
progress.step();
progress.set_name("list refs");
let mut remote_refs = transport
.invoke(
ls_refs.as_str(),
ls_features.into_iter(),
// No arguments are passed as `None` rather than as an empty iterator.
if ls_args.is_empty() {
None
} else {
Some(ls_args.into_iter())
},
)
.await?;
refs::from_v2_refs(&mut remote_refs).await?
}
Err(err) => {
// Signal the end of the interaction before reporting the delegate's error.
indicate_end_of_interaction(transport).await?;
return Err(err.into());
}
}
}
};
// Phase 3: let the delegate decide whether to fetch at all, and with which features.
let fetch = Command::Fetch;
let mut fetch_features = fetch.default_features(protocol_version, &capabilities);
match delegate.prepare_fetch(protocol_version, &capabilities, &mut fetch_features, &parsed_refs) {
Ok(Action::Cancel) => {
// On cancel, V1 always signals the end of the interaction; otherwise only if the delegate asks for it.
return if matches!(protocol_version, git_transport::Protocol::V1)
|| delegate.indicate_client_done_when_fetch_completes()
{
indicate_end_of_interaction(transport).await
} else {
Ok(())
};
}
Ok(Action::Continue) => {
fetch.validate_argument_prefixes_or_panic(protocol_version, &capabilities, &[], &fetch_features);
}
Err(err) => {
indicate_end_of_interaction(transport).await?;
return Err(err.into());
}
}
Response::check_required_features(protocol_version, &fetch_features)?;
// Decides below whether remote progress is hooked up before parsing the response or only for the pack.
let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
let mut arguments = Arguments::new(protocol_version, fetch_features);
let mut previous_response = None::<Response>;
let mut round = 1;
// Phase 4: negotiate until the server sends a pack or the delegate cancels.
'negotiation: loop {
progress.step();
progress.set_name(format!("negotiate (round {})", round));
round += 1;
let action = delegate.negotiate(&parsed_refs, &mut arguments, previous_response.as_ref())?;
// NOTE(review): the boolean presumably tells `send()` that this is the final round - confirm against `Arguments::send()`.
let mut reader = arguments.send(&mut transport, action == Action::Cancel).await?;
if sideband_all {
setup_remote_progress(&mut progress, &mut reader);
}
let response = Response::from_line_reader(protocol_version, &mut reader).await?;
previous_response = if response.has_pack() {
progress.step();
progress.set_name("receiving pack");
if !sideband_all {
setup_remote_progress(&mut progress, &mut reader);
}
// `reader` and `progress` are handed over by value, the loop is left right afterwards.
delegate.receive_pack(reader, progress, &parsed_refs, &response).await?;
break 'negotiation;
} else {
match action {
Action::Cancel => break 'negotiation,
// Keep the response so the delegate can inspect it in the next round.
Action::Continue => Some(response),
}
}
}
// After the negotiation, only V2 signals the end of the interaction, and only if the delegate asks for it.
if matches!(protocol_version, git_transport::Protocol::V2) && delegate.indicate_client_done_when_fetch_completes() {
indicate_end_of_interaction(transport).await?;
}
Ok(())
}
/// Send a flush message over `transport` and switch to reading the reply, but only if the
/// transport reports itself as stateful. For all other transports this does nothing.
///
/// The transport is consumed, so no further requests can be made with it afterwards.
///
/// # Errors
///
/// Fails if the request cannot be created or if switching the request into read mode fails.
#[maybe_async]
async fn indicate_end_of_interaction(mut transport: impl client::Transport) -> Result<(), Error> {
    // Nothing to signal unless the transport is stateful.
    if !transport.is_stateful() {
        return Ok(());
    }

    let flush_request = transport.request(client::WriteMode::Binary, client::MessageKind::Flush)?;
    // The returned reader is not needed and is dropped right away.
    flush_request.into_read().await?;
    Ok(())
}
/// Install a progress handler on `reader` that forwards everything it receives to a new child
/// of `progress` named "remote".
///
/// Each `(is_err, data)` pair the reader hands to the handler is passed on to
/// `crate::RemoteProgress::translate_to_progress()` along with that child.
fn setup_remote_progress(
    progress: &mut impl Progress,
    reader: &mut Box<dyn git_transport::client::ExtendedBufRead + Unpin + '_>,
) {
    // The child is moved into the handler, which therefore owns it for as long as it is installed.
    let mut remote_progress = progress.add_child("remote");
    let handler: git_transport::client::HandleProgress = Box::new(move |is_err: bool, data: &[u8]| {
        crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress)
    });
    reader.set_progress_handler(Some(handler));
}