This commit is contained in:
Eugene Pankov 2023-04-20 04:58:55 -07:00
parent 73d8b88cab
commit c2f1b3e1c0
No known key found for this signature in database
GPG key ID: 5896FCBBDD1CF4F4
4 changed files with 23 additions and 20 deletions

View file

@ -3,6 +3,7 @@ mod channel_session;
mod error; mod error;
mod handler; mod handler;
use std::collections::HashMap; use std::collections::HashMap;
use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::Arc; use std::sync::Arc;
@ -141,7 +142,7 @@ pub struct RemoteClientHandles {
} }
impl RemoteClient { impl RemoteClient {
pub fn create(id: SessionId, services: Services) -> RemoteClientHandles { pub fn create(id: SessionId, services: Services) -> io::Result<RemoteClientHandles> {
let (event_tx, event_rx) = unbounded_channel(); let (event_tx, event_rx) = unbounded_channel();
let (command_tx, mut command_rx) = unbounded_channel(); let (command_tx, mut command_rx) = unbounded_channel();
let (abort_tx, abort_rx) = unbounded_channel(); let (abort_tx, abort_rx) = unbounded_channel();
@ -175,13 +176,13 @@ impl RemoteClient {
.instrument(Span::current()), .instrument(Span::current()),
); );
this.start(); this.start()?;
RemoteClientHandles { Ok(RemoteClientHandles {
event_rx, event_rx,
command_tx, command_tx,
abort_tx, abort_tx,
} })
} }
fn set_disconnected(&mut self) { fn set_disconnected(&mut self) {
@ -252,7 +253,7 @@ impl RemoteClient {
Ok(()) Ok(())
} }
pub fn start(mut self) { pub fn start(mut self) -> io::Result<JoinHandle<anyhow::Result<()>>> {
let name = format!("SSH {} client commands", self.id); let name = format!("SSH {} client commands", self.id);
tokio::task::Builder::new().name(&name).spawn( tokio::task::Builder::new().name(&name).spawn(
async move { async move {
@ -285,7 +286,7 @@ impl RemoteClient {
Ok::<(), anyhow::Error>(()) Ok::<(), anyhow::Error>(())
} }
.instrument(Span::current()), .instrument(Span::current()),
); )
} }
async fn handle_event(&mut self, event: InnerEvent) -> Result<bool> { async fn handle_event(&mut self, event: InnerEvent) -> Result<bool> {
@ -306,12 +307,12 @@ impl RemoteClient {
} }
ClientHandlerEvent::ForwardedTcpIp(channel, params) => { ClientHandlerEvent::ForwardedTcpIp(channel, params) => {
info!("New forwarded connection: {params:?}"); info!("New forwarded connection: {params:?}");
let id = self.setup_server_initiated_channel(channel).await; let id = self.setup_server_initiated_channel(channel).await?;
let _ = self.tx.send(RCEvent::ForwardedTcpIp(id, params)); let _ = self.tx.send(RCEvent::ForwardedTcpIp(id, params));
} }
ClientHandlerEvent::X11(channel, originator_address, originator_port) => { ClientHandlerEvent::X11(channel, originator_address, originator_port) => {
info!("New X11 connection from {originator_address}:{originator_port:?}"); info!("New X11 connection from {originator_address}:{originator_port:?}");
let id = self.setup_server_initiated_channel(channel).await; let id = self.setup_server_initiated_channel(channel).await?;
let _ = self let _ = self
.tx .tx
.send(RCEvent::X11(id, originator_address, originator_port)); .send(RCEvent::X11(id, originator_address, originator_port));
@ -328,7 +329,7 @@ impl RemoteClient {
async fn setup_server_initiated_channel( async fn setup_server_initiated_channel(
&mut self, &mut self,
channel: russh::Channel<russh::client::Msg>, channel: russh::Channel<russh::client::Msg>,
) -> Uuid { ) -> Result<Uuid> {
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
@ -339,10 +340,10 @@ impl RemoteClient {
self.child_tasks.push( self.child_tasks.push(
tokio::task::Builder::new() tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, id)) .name(&format!("SSH {} {:?} ops", self.id, id))
.spawn(session_channel.run()).unwrap(), .spawn(session_channel.run())?,
); );
id Ok(id)
} }
async fn handle_command(&mut self, cmd: RCCommand) -> Result<bool, SshClientError> { async fn handle_command(&mut self, cmd: RCCommand) -> Result<bool, SshClientError> {
@ -524,7 +525,8 @@ impl RemoteClient {
self.child_tasks.push( self.child_tasks.push(
tokio::task::Builder::new() tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, channel_id)) .name(&format!("SSH {} {:?} ops", self.id, channel_id))
.spawn(channel.run()).unwrap(), .spawn(channel.run())
.map_err(|e| SshClientError::Other(Box::new(e)))?,
); );
} }
Ok(()) Ok(())
@ -554,7 +556,8 @@ impl RemoteClient {
self.child_tasks.push( self.child_tasks.push(
tokio::task::Builder::new() tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, channel_id)) .name(&format!("SSH {} {:?} ops", self.id, channel_id))
.spawn(channel.run()).unwrap(), .spawn(channel.run())
.map_err(|e| SshClientError::Other(Box::new(e)))?,
); );
} }
Ok(()) Ok(())

View file

@ -51,7 +51,7 @@ impl ProtocolServer for SSHProtocolServer {
return Err(TargetTestError::Misconfigured("Not an SSH target".to_owned())); return Err(TargetTestError::Misconfigured("Not an SSH target".to_owned()));
}; };
let mut handles = RemoteClient::create(Uuid::new_v4(), self.services.clone()); let mut handles = RemoteClient::create(Uuid::new_v4(), self.services.clone())?;
let _ = handles let _ = handles
.command_tx .command_tx

View file

@ -90,11 +90,11 @@ pub async fn run_server(services: Services, address: SocketAddr) -> Result<()> {
tokio::task::Builder::new() tokio::task::Builder::new()
.name(&format!("SSH {id} session")) .name(&format!("SSH {id} session"))
.spawn(session); .spawn(session)?;
tokio::task::Builder::new() tokio::task::Builder::new()
.name(&format!("SSH {id} protocol")) .name(&format!("SSH {id} protocol"))
.spawn(_run_stream(russh_config, socket, handler)); .spawn(_run_stream(russh_config, socket, handler))?;
} }
Ok(()) Ok(())
} }

View file

@ -115,7 +115,7 @@ impl ServerSession {
let _span = info_span!("SSH", session=%id); let _span = info_span!("SSH", session=%id);
let _enter = _span.enter(); let _enter = _span.enter();
let mut rc_handles = RemoteClient::create(id, services.clone()); let mut rc_handles = RemoteClient::create(id, services.clone())?;
let (hub, event_sender) = EventHub::setup(); let (hub, event_sender) = EventHub::setup();
let main_event_subscription = hub let main_event_subscription = hub
@ -179,7 +179,7 @@ impl ServerSession {
} }
} }
} }
}); })?;
let name = format!("SSH {id} client events"); let name = format!("SSH {id} client events");
tokio::task::Builder::new().name(&name).spawn({ tokio::task::Builder::new().name(&name).spawn({
@ -191,7 +191,7 @@ impl ServerSession {
} }
} }
} }
}); })?;
let name = format!("SSH {id} server handler events"); let name = format!("SSH {id} server handler events");
tokio::task::Builder::new().name(&name).spawn({ tokio::task::Builder::new().name(&name).spawn({
@ -203,7 +203,7 @@ impl ServerSession {
} }
} }
} }
}); })?;
Ok(async move { Ok(async move {
while let Some(event) = this.get_next_event().await { while let Some(event) = this.get_next_event().await {