Remote port and X11 forwarding - fixes #11, fixes #12

This commit is contained in:
Eugene Pankov 2022-09-04 12:06:09 +02:00
parent d870fc54cc
commit 5ddc260262
No known key found for this signature in database
GPG key ID: 5896FCBBDD1CF4F4
17 changed files with 420 additions and 128 deletions

2
russh

@ -1 +1 @@
Subproject commit 811680a3bf04c563fd1a4cfa39b05224d08e85c0
Subproject commit e65dd27f2102b2cb7018981f7689525a9824a8a6

View file

@ -1,4 +1,4 @@
FROM alpine:3.14
RUN apk add openssh
RUN apk add openssh curl
RUN passwd -u root
ENTRYPOINT ["/usr/sbin/sshd", "-De"]

View file

@ -1,5 +1,7 @@
import requests
import subprocess
import tempfile
import time
import pytest
from textwrap import dedent
@ -116,22 +118,66 @@ class Test:
self,
processes: ProcessManager,
wg_port,
timeout,
):
local_port = alloc_port()
wait_port(wg_port)
ssh_client = processes.start_ssh_client(
'-p',
str(wg_port),
'-v',
*common_args,
'-L', f'{local_port}:localhost:22',
'sleep', '15',
'-L', f'{local_port}:neverssl.com:80',
'-N',
password='123',
)
data = wait_port(local_port)
assert b'SSH-2.0' in data
wait_port(local_port, recv=False)
for _ in range(15):
time.sleep(1)
try:
response = requests.get(f'http://localhost:{local_port}', timeout=timeout)
except Exception:
continue
if response.status_code == 200:
break
response = requests.get(f'http://localhost:{local_port}', timeout=timeout)
print(response.text)
assert response.status_code == 200
ssh_client.kill()
def test_tcpip_forward(
self,
processes: ProcessManager,
wg_port,
timeout,
):
wait_port(wg_port)
pf_client = processes.start_ssh_client(
'-p',
str(wg_port),
'-v',
*common_args,
'-R', '1234:neverssl.com:80',
'-N',
password='123',
)
time.sleep(5)
ssh_client = processes.start_ssh_client(
'-p',
str(wg_port),
'-v',
*common_args,
'curl', '-v', 'http://localhost:1234',
password='123',
)
output = ssh_client.communicate(timeout=timeout)[0]
print(output)
assert ssh_client.returncode == 0
assert b'<html>' in output
pf_client.kill()
def test_shell(
self,
processes: ProcessManager,

View file

@ -23,7 +23,7 @@ def alloc_port():
return last_port
def wait_port(port, recv=True):
def wait_port(port, recv=True, timeout=60):
logging.debug(f'Waiting for port {port}')
data = b''
@ -49,7 +49,7 @@ def wait_port(port, recv=True):
t = threading.Thread(target=wait, daemon=True)
t.start()
t.join(timeout=5)
t.join(timeout=timeout)
if t.is_alive():
raise Exception(f'Port {port} is not up')
return data

View file

@ -25,6 +25,9 @@ pub enum WarpgateError {
NoHostInUrl,
#[error("Inconsistent state error")]
InconsistentState,
#[error("Session end")]
SessionEnd,
}
impl ResponseError for WarpgateError {

View file

@ -53,11 +53,13 @@ impl AuthStateStore {
protocol: &str,
) -> Result<(Uuid, Arc<Mutex<AuthState>>), WarpgateError> {
let id = Uuid::new_v4();
let Some(policy) = self.config_provider
.lock()
.await
.get_credential_policy(username)
.await? else {
let policy = self
.config_provider
.lock()
.await
.get_credential_policy(username)
.await?;
let Some(policy) = policy else {
return Err(WarpgateError::UserNotFound)
};

View file

@ -91,12 +91,11 @@ impl ConnectionRecorder {
.write(&u32::to_le_bytes(data.len() as u32))
.await?;
self.writer.write(&data).await?;
debug!("connection {:?} data {:?}", self.params, data);
Ok(())
}
pub async fn write_rx(&mut self, data: &[u8]) -> Result<()> {
debug!("connection {:?} data tx {:?}", self.params, data);
debug!("connection {:?} data rx {:?}", self.params, data);
let seq_rx = self.seq_rx;
self.seq_rx = self.seq_rx.wrapping_add(data.len() as u32);
self.write_packet(

View file

@ -229,7 +229,8 @@ impl Api {
auth: Option<Data<&SessionAuthorization>>,
id: Path<Uuid>,
) -> poem::Result<AuthStateResponse> {
let Some(state_arc) = get_auth_state(&id, &services, auth.map(|x|x.0)).await else {
let state_arc = get_auth_state(&id, &services, auth.map(|x| x.0)).await;
let Some(state_arc) = state_arc else {
return Ok(AuthStateResponse::NotFound);
};
serialize_auth_state_inner(state_arc).await

View file

@ -115,7 +115,13 @@ impl Api {
email: email.clone(),
};
let Some(username) = services.config_provider.lock().await.username_for_sso_credential(&cred).await? else {
let username = services
.config_provider
.lock()
.await
.username_for_sso_credential(&cred)
.await?;
let Some(username) = username else {
return make_err_response(&format!("No user matching {email}"));
};

View file

@ -32,7 +32,8 @@ pub async fn catchall_endpoint(
services: Data<&Services>,
server_handle: Option<Data<&Arc<Mutex<WarpgateServerHandle>>>>,
) -> poem::Result<Response> {
let Some((target, options)) = get_target_for_request(req, services.0).await? else {
let target_and_options = get_target_for_request(req, services.0).await?;
let Some((target, options)) = target_and_options else {
return Ok(target_select_redirect());
};

View file

@ -111,6 +111,7 @@ impl SessionChannel {
}
Some(russh::ChannelMsg::Close) => {
self.events_tx.send(RCEvent::Close(self.channel_id)).map_err(|_| SshClientError::MpscError)?;
break;
},
Some(russh::ChannelMsg::Success) => {
self.events_tx.send(RCEvent::Success(self.channel_id)).map_err(|_| SshClientError::MpscError)?;
@ -145,7 +146,7 @@ impl SessionChannel {
Some(msg) => {
warn!("unhandled channel message: {:?}", msg);
}
None => {
None => {
self.events_tx.send(RCEvent::Close(self.channel_id)).map_err(|_| SshClientError::MpscError)?;
break
},

View file

@ -1,7 +1,8 @@
use std::pin::Pin;
use futures::FutureExt;
use russh::client::Session;
use russh::client::{Msg, Session};
use russh::Channel;
use russh_keys::key::PublicKey;
use russh_keys::PublicKeyBase64;
use tokio::sync::mpsc::UnboundedSender;
@ -11,13 +12,14 @@ use warpgate_common::{SessionId, TargetSSHOptions};
use warpgate_core::Services;
use crate::known_hosts::{KnownHostValidationResult, KnownHosts};
use crate::ConnectionError;
use crate::{ConnectionError, ForwardedTcpIpParams};
#[derive(Debug)]
pub enum ClientHandlerEvent {
HostKeyReceived(PublicKey),
HostKeyUnknown(PublicKey, oneshot::Sender<bool>),
// ForwardedTCPIP(ChannelId, DirectTCPIPParams),
ForwardedTcpIp(Channel<Msg>, ForwardedTcpIpParams),
X11(Channel<Msg>, String, u32),
Disconnect,
}
@ -124,6 +126,51 @@ impl russh::client::Handler for ClientHandler {
}
.boxed()
}
fn server_channel_open_forwarded_tcpip(
self,
channel: Channel<Msg>,
connected_address: &str,
connected_port: u32,
originator_address: &str,
originator_port: u32,
session: Session,
) -> Self::FutureUnit {
let connected_address = connected_address.to_string();
let originator_address = originator_address.to_string();
async move {
let _ = self.event_tx.send(ClientHandlerEvent::ForwardedTcpIp(
channel,
ForwardedTcpIpParams {
connected_address,
connected_port,
originator_address,
originator_port,
},
));
Ok((self, session))
}
.boxed()
}
fn server_channel_open_x11(
self,
channel: Channel<Msg>,
originator_address: &str,
originator_port: u32,
session: Session,
) -> Self::FutureUnit {
let originator_address = originator_address.to_string();
async move {
let _ = self.event_tx.send(ClientHandlerEvent::X11(
channel,
originator_address,
originator_port,
));
Ok((self, session))
}
.boxed()
}
}
impl Drop for ClientHandler {

View file

@ -30,6 +30,7 @@ use super::{ChannelOperation, DirectTCPIPParams};
use crate::client::handler::ClientHandlerError;
use crate::helpers::PublicKeyAsOpenSSH;
use crate::keys::load_client_keys;
use crate::ForwardedTcpIpParams;
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
@ -90,6 +91,8 @@ pub enum RCEvent {
Done,
HostKeyReceived(PublicKey),
HostKeyUnknown(PublicKey, oneshot::Sender<bool>),
ForwardedTcpIp(Uuid, ForwardedTcpIpParams),
X11(Uuid, String, u32),
}
pub type RCCommandReply = oneshot::Sender<Result<(), SshClientError>>;
@ -98,8 +101,8 @@ pub type RCCommandReply = oneshot::Sender<Result<(), SshClientError>>;
pub enum RCCommand {
Connect(TargetSSHOptions),
Channel(Uuid, ChannelOperation),
// ForwardTCPIP(String, u32),
// CancelTCPIPForward(String, u32),
ForwardTCPIP(String, u32),
CancelTCPIPForward(String, u32),
Disconnect,
}
@ -123,6 +126,7 @@ pub struct RemoteClient {
session: Option<Arc<Mutex<Handle<ClientHandler>>>>,
channel_pipes: Arc<Mutex<HashMap<Uuid, UnboundedSender<ChannelOperation>>>>,
pending_ops: Vec<(Uuid, ChannelOperation)>,
pending_forwards: Vec<(String, u32)>,
state: RCState,
abort_rx: UnboundedReceiver<()>,
inner_event_rx: UnboundedReceiver<InnerEvent>,
@ -151,6 +155,7 @@ impl RemoteClient {
session: None,
channel_pipes: Arc::new(Mutex::new(HashMap::new())),
pending_ops: vec![],
pending_forwards: vec![],
state: RCState::NotInitialized,
inner_event_rx,
inner_event_tx: inner_event_tx.clone(),
@ -259,28 +264,9 @@ impl RemoteClient {
loop {
tokio::select! {
Some(event) = self.inner_event_rx.recv() => {
match event {
InnerEvent::RCCommand(cmd, reply) => {
let result = self.handle_command(cmd).await;
let brk = matches!(result, Ok(true));
if let Some(reply) = reply {
let _ = reply.send(result.map(|_| ()));
}
if brk {
break
}
}
InnerEvent::ClientHandlerEvent(client_event) => {
debug!("Client handler event: {:?}", client_event);
match client_event {
ClientHandlerEvent::Disconnect => {
self._on_disconnect().await?;
}
event => {
error!(?event, "Unhandled client handler event");
},
}
}
debug!(event=?event, "event");
if self.handle_event(event).await? {
break
}
}
Some(_) = self.abort_rx.recv() => {
@ -299,40 +285,98 @@ impl RemoteClient {
let _ = self.tx.send(RCEvent::Error(error));
err
})?;
debug!("No more commmands");
info!("Client session closed");
Ok::<(), anyhow::Error>(())
}
.instrument(Span::current()),
);
}
async fn handle_command(&mut self, cmd: RCCommand) -> Result<bool, SshClientError> {
match cmd {
RCCommand::Connect(options) => {
match self.connect(options).await {
Ok(_) => {
self.set_state(RCState::Connected)
.map_err(SshClientError::other)?;
let ops = self.pending_ops.drain(..).collect::<Vec<_>>();
for (id, op) in ops {
self.apply_channel_op(id, op).await?;
}
// let forwards = self.pending_forwards.drain(..).collect::<Vec<_>>();
// for (address, port) in forwards {
// self.tcpip_forward(address, port).await?;
// }
async fn handle_event(&mut self, event: InnerEvent) -> Result<bool> {
match event {
InnerEvent::RCCommand(cmd, reply) => {
let result = self.handle_command(cmd).await;
let brk = matches!(result, Ok(true));
if let Some(reply) = reply {
let _ = reply.send(result.map(|_| ()));
}
return Ok(brk);
}
InnerEvent::ClientHandlerEvent(client_event) => {
debug!("Client handler event: {:?}", client_event);
match client_event {
ClientHandlerEvent::Disconnect => {
self._on_disconnect().await?;
}
Err(e) => {
debug!("Connect error: {}", e);
let _ = self.tx.send(RCEvent::ConnectionError(e));
self.set_disconnected();
return Ok(true);
ClientHandlerEvent::ForwardedTcpIp(channel, params) => {
info!("New forwarded connection: {params:?}");
let id = self.setup_server_initiated_channel(channel).await;
let _ = self.tx.send(RCEvent::ForwardedTcpIp(id, params));
}
ClientHandlerEvent::X11(channel, originator_address, originator_port) => {
info!("New X11 connection from {originator_address}:{originator_port:?}");
let id = self.setup_server_initiated_channel(channel).await;
let _ = self
.tx
.send(RCEvent::X11(id, originator_address, originator_port));
}
event => {
error!(?event, "Unhandled client handler event");
}
}
}
}
Ok(false)
}
async fn setup_server_initiated_channel(&mut self, channel: russh::Channel<russh::client::Msg>) -> Uuid {
let id = Uuid::new_v4();
let (tx, rx) = unbounded_channel();
self.channel_pipes.lock().await.insert(id, tx);
let session_channel = SessionChannel::new(channel, id, rx, self.tx.clone(), self.id);
self.child_tasks.push(
tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, id))
.spawn(session_channel.run()),
);
id
}
async fn handle_command(&mut self, cmd: RCCommand) -> Result<bool, SshClientError> {
match cmd {
RCCommand::Connect(options) => match self.connect(options).await {
Ok(_) => {
self.set_state(RCState::Connected)
.map_err(SshClientError::other)?;
let ops = self.pending_ops.drain(..).collect::<Vec<_>>();
for (id, op) in ops {
self.apply_channel_op(id, op).await?;
}
let forwards = self.pending_forwards.drain(..).collect::<Vec<_>>();
for (address, port) in forwards {
self.tcpip_forward(address, port).await?;
}
}
Err(e) => {
debug!("Connect error: {}", e);
let _ = self.tx.send(RCEvent::ConnectionError(e));
self.set_disconnected();
return Ok(true);
}
},
RCCommand::Channel(ch, op) => {
self.apply_channel_op(ch, op).await?;
}
RCCommand::ForwardTCPIP(address, port) => {
self.tcpip_forward(address, port).await?;
}
RCCommand::CancelTCPIPForward(address, port) => {
self.cancel_tcpip_forward(address, port).await?;
}
RCCommand::Disconnect => {
self.disconnect().await;
return Ok(true);
@ -517,6 +561,32 @@ impl RemoteClient {
Ok(())
}
async fn tcpip_forward(&mut self, address: String, port: u32) -> Result<bool, SshClientError> {
if let Some(session) = &self.session {
let mut session = session.lock().await;
Ok(session.tcpip_forward(address, port).await?)
} else {
self.pending_forwards.push((address, port));
Ok(true)
}
}
async fn cancel_tcpip_forward(
&mut self,
address: String,
port: u32,
) -> Result<bool, SshClientError> {
if let Some(session) = &self.session {
let mut session = session.lock().await;
Ok(session.cancel_tcpip_forward(address, port).await?)
} else {
self.pending_forwards
.retain(|x| x.0 != address || x.1 != port);
Ok(true)
}
}
async fn disconnect(&mut self) {
if let Some(session) = &mut self.session {
let _ = session

View file

@ -30,6 +30,14 @@ pub struct DirectTCPIPParams {
pub originator_port: u32,
}
#[derive(Clone, Debug)]
pub struct ForwardedTcpIpParams {
pub connected_address: String,
pub connected_port: u32,
pub originator_address: String,
pub originator_port: u32,
}
#[derive(Clone, Debug)]
pub struct X11Request {
pub single_conection: bool,

View file

@ -46,6 +46,8 @@ pub enum ServerHandlerEvent {
ChannelOpenDirectTcpIp(ServerChannelId, DirectTCPIPParams, oneshot::Sender<bool>),
EnvRequest(ServerChannelId, String, String, oneshot::Sender<()>),
X11Request(ServerChannelId, X11Request, oneshot::Sender<()>),
TcpIpForward(String, u32, oneshot::Sender<bool>),
CancelTcpIpForward(String, u32, oneshot::Sender<bool>),
Disconnect,
}
@ -449,19 +451,47 @@ impl russh::server::Handler for ServerHandler {
.boxed()
}
fn tcpip_forward(self, address: &str, port: u32, mut session: Session) -> Self::FutureBool {
let address = address.to_string();
async move {
let (tx, rx) = oneshot::channel();
self.send_event(ServerHandlerEvent::TcpIpForward(address, port, tx))?;
let allowed = rx.await.unwrap_or(false);
if allowed {
session.request_success()
} else {
session.request_failure()
}
Ok((self, session, allowed))
}
.boxed()
}
fn cancel_tcpip_forward(
self,
address: &str,
port: u32,
mut session: Session,
) -> Self::FutureBool {
let address = address.to_string();
async move {
let (tx, rx) = oneshot::channel();
self.send_event(ServerHandlerEvent::CancelTcpIpForward(address, port, tx))?;
let allowed = rx.await.unwrap_or(false);
if allowed {
session.request_success()
} else {
session.request_failure()
}
Ok((self, session, allowed))
}
.boxed()
}
// -----
// fn auth_none(self, user: &str) -> Self::FutureAuth {
// self.finished_auth(Auth::Reject)
// }
// fn tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
// self.finished_bool(false, session)
// }
// fn cancel_tcpip_forward(self, address: &str, port: u32, session: Session) -> Self::FutureBool {
// self.finished_bool(false, session)
// }
}
impl Drop for ServerHandler {

View file

@ -11,7 +11,7 @@ use ansi_term::Colour;
use anyhow::{Context, Result};
use bimap::BiMap;
use bytes::Bytes;
use futures::Future;
use futures::{Future, FutureExt};
use russh::{CryptoVec, MethodSet, Sig};
use russh_keys::key::PublicKey;
use russh_keys::PublicKeyBase64;
@ -20,7 +20,7 @@ use tokio::sync::{broadcast, oneshot, Mutex};
use tracing::*;
use uuid::Uuid;
use warpgate_common::auth::{AuthCredential, AuthResult, AuthSelector, AuthState, CredentialKind};
use warpgate_common::eventhub::{EventHub, EventSender};
use warpgate_common::eventhub::{EventHub, EventSender, EventSubscription};
use warpgate_common::{
Secret, SessionId, SshHostKeyVerificationMode, Target, TargetOptions, TargetSSHOptions,
WarpgateError,
@ -85,6 +85,7 @@ pub struct ServerSession {
traffic_connection_recorders: HashMap<Uuid, ConnectionRecorder>,
hub: EventHub<Event>,
event_sender: EventSender<Event>,
main_event_subscription: EventSubscription<Event>,
service_output: ServiceOutput,
channel_writer: ChannelWriter,
auth_state: Option<Arc<Mutex<AuthState>>>,
@ -117,7 +118,7 @@ impl ServerSession {
let mut rc_handles = RemoteClient::create(id, services.clone());
let (hub, event_sender) = EventHub::setup();
let mut event_sub = hub
let main_event_subscription = hub
.subscribe(|e| !matches!(e, Event::ConsoleInput(_)))
.await;
@ -141,6 +142,7 @@ impl ServerSession {
traffic_connection_recorders: HashMap::new(),
hub,
event_sender: event_sender.clone(),
main_event_subscription,
service_output: ServiceOutput::new(),
channel_writer: ChannelWriter::new(),
auth_state: None,
@ -204,18 +206,18 @@ impl ServerSession {
});
Ok(async move {
while let Some(event) = event_sub.recv().await {
match event {
Event::Client(RCEvent::Done) => break,
Event::ServerHandler(ServerHandlerEvent::Disconnect) => break,
event => this.handle_event(event).await?,
}
while let Some(event) = this.get_next_event().await {
this.handle_event(event).await?;
}
debug!("No more events");
Ok::<_, anyhow::Error>(())
})
}
async fn get_next_event(&mut self) -> Option<Event> {
self.main_event_subscription.recv().await
}
async fn get_auth_state(&mut self, username: &str) -> Result<Arc<Mutex<AuthState>>> {
#[allow(clippy::unwrap_used)]
if self.auth_state.is_none()
@ -316,36 +318,45 @@ impl ServerSession {
Ok(())
}
async fn handle_event(&mut self, event: Event) -> Result<()> {
match event {
Event::Client(e) => {
debug!(event=?e, "Event");
let span = self.make_logging_span();
if let Err(err) = self.handle_remote_event(e).instrument(span).await {
error!("Event handler error: {:?}", err);
// break;
fn handle_event<'a>(
&'a mut self,
event: Event,
) -> Pin<Box<dyn Future<Output = Result<(), WarpgateError>> + Send + 'a>> {
async move {
match event {
Event::Client(RCEvent::Done) => Err(WarpgateError::SessionEnd)?,
Event::ServerHandler(ServerHandlerEvent::Disconnect) => {
Err(WarpgateError::SessionEnd)?
}
}
Event::ServerHandler(e) => {
let span = self.make_logging_span();
if let Err(err) = self.handle_server_handler_event(e).instrument(span).await {
error!("Event handler error: {:?}", err);
// break;
Event::Client(e) => {
debug!(event=?e, "Event");
let span = self.make_logging_span();
if let Err(err) = self.handle_remote_event(e).instrument(span).await {
error!("Event handler error: {:?}", err);
// break;
}
}
}
Event::Command(command) => {
debug!(?command, "Session control");
if let Err(err) = self.handle_session_control(command).await {
error!("Event handler error: {:?}", err);
// break;
Event::ServerHandler(e) => {
let span = self.make_logging_span();
if let Err(err) = self.handle_server_handler_event(e).instrument(span).await {
error!("Event handler error: {:?}", err);
// break;
}
}
Event::Command(command) => {
debug!(?command, "Session control");
if let Err(err) = self.handle_session_control(command).await {
error!("Event handler error: {:?}", err);
// break;
}
}
Event::ServiceOutput(data) => {
let _ = self.emit_pty_output(&data).await;
}
Event::ConsoleInput(_) => (),
}
Event::ServiceOutput(data) => {
let _ = self.emit_pty_output(&data).await;
}
Event::ConsoleInput(_) => (),
}
Ok(())
Ok(())
}.boxed()
}
async fn handle_server_handler_event(&mut self, event: ServerHandlerEvent) -> Result<()> {
@ -485,6 +496,16 @@ impl ServerSession {
self._channel_x11_request(channel, request).await?;
}
ServerHandlerEvent::TcpIpForward(address, port, reply) => {
self._tcpip_forward(address, port).await?;
let _ = reply.send(true);
}
ServerHandlerEvent::CancelTcpIpForward(address, port, reply) => {
self._cancel_tcpip_forward(address, port).await?;
let _ = reply.send(true);
}
ServerHandlerEvent::Disconnect => (),
}
@ -703,6 +724,36 @@ impl ServerSession {
RCEvent::HostKeyUnknown(key, reply) => {
self.handle_unknown_host_key(key, reply).await?;
}
RCEvent::ForwardedTcpIp(id, params) => {
if let Some(session) = &mut self.session_handle {
let server_channel = session
.channel_open_forwarded_tcpip(
params.connected_address,
params.connected_port,
params.originator_address,
params.originator_port,
)
.await?;
self.channel_map
.insert(ServerChannelId(server_channel.id()), id);
self.all_channels.push(id);
}
}
RCEvent::X11(id, originator_address, originator_port) =>{
if let Some(session) = &mut self.session_handle {
let server_channel = session
.channel_open_x11(
originator_address,
originator_port,
)
.await?;
self.channel_map
.insert(ServerChannelId(server_channel.id()), id);
self.all_channels.push(id);
}
}
}
Ok(())
}
@ -800,6 +851,8 @@ impl ServerSession {
info!(%channel, "Opening direct TCP/IP channel from {}:{} to {}:{}", params.originator_address, params.originator_port, params.host_to_connect, params.port_to_connect);
let _ = self.maybe_connect_remote().await;
match self
.send_command_and_wait(RCCommand::Channel(
uuid,
@ -1001,13 +1054,15 @@ impl ServerSession {
pub async fn _channel_shell_request_begin(
&mut self,
server_channel_id: ServerChannelId,
) -> Result<PendingCommand> {
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
let _ = self.maybe_connect_remote().await;
Ok(self.send_command_and_wait(RCCommand::Channel(
self.send_command_and_wait(RCCommand::Channel(
channel_id,
ChannelOperation::RequestShell,
)))
))
.await
.map_err(anyhow::Error::from)
}
pub async fn _channel_shell_request_finish(
@ -1095,6 +1150,21 @@ impl ServerSession {
Ok(())
}
async fn _tcpip_forward(&mut self, address: String, port: u32) -> Result<()> {
info!(%address, %port, "Remote port forwarding requested");
let _ = self.maybe_connect_remote().await;
self.send_command_and_wait(RCCommand::ForwardTCPIP(address, port))
.await
.map_err(anyhow::Error::from)
}
pub async fn _cancel_tcpip_forward(&mut self, address: String, port: u32) -> Result<()> {
info!(%address, %port, "Remote port forwarding cancelled");
self.send_command_and_wait(RCCommand::CancelTCPIPForward(address, port))
.await
.map_err(anyhow::Error::from)
}
async fn _auth_publickey(
&mut self,
ssh_username: Secret<String>,
@ -1404,16 +1474,6 @@ impl ServerSession {
Ok(())
}
// pub async fn _tcpip_forward(&mut self, address: String, port: u32) {
// info!(%address, %port, "Remote port forwarding requested");
// self.send_command(RCCommand::ForwardTCPIP(address, port));
// }
// pub async fn _cancel_tcpip_forward(&mut self, address: String, port: u32) {
// info!(%address, %port, "Remote port forwarding cancelled");
// self.send_command(RCCommand::CancelTCPIPForward(address, port));
// }
pub async fn _channel_signal(
&mut self,
server_channel_id: ServerChannelId,
@ -1433,11 +1493,27 @@ impl ServerSession {
self.rc_tx.send((command, None)).map_err(|e| e.0 .0)
}
fn send_command_and_wait(&mut self, command: RCCommand) -> PendingCommand {
async fn send_command_and_wait(&mut self, command: RCCommand) -> Result<(), SshClientError> {
let (tx, rx) = oneshot::channel();
match self.rc_tx.send((command, Some(tx))) {
let mut cmd = match self.rc_tx.send((command, Some(tx))) {
Ok(_) => PendingCommand::Waiting(rx),
Err(_) => PendingCommand::Failed,
};
loop {
tokio::select! {
result = &mut cmd => {
return result
}
event = self.get_next_event() => {
match event {
Some(event) => {
self.handle_event(event).await.map_err(SshClientError::from)?
}
None => {Err(SshClientError::MpscError)?}
};
}
}
}
}
@ -1470,13 +1546,15 @@ impl ServerSession {
Ok(())
})
.await;
drop(self.session_handle.take());
self.session_handle = None;
}
}
impl Drop for ServerSession {
fn drop(&mut self) {
info!("Closed connection");
let _ = self.rc_abort_tx.send(());
info!("Closed session");
debug!("Dropped");
}
}

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use config::{Config, Environment, File};
use notify::{RecursiveMode, Watcher, recommended_watcher};
use notify::{recommended_watcher, RecursiveMode, Watcher};
use tokio::sync::{broadcast, mpsc, Mutex};
use tracing::*;
use warpgate_common::helpers::fs::secure_file;