mirror of
https://github.com/warp-tech/warpgate.git
synced 2025-09-07 23:25:13 +08:00
fix SCP hangups
This commit is contained in:
parent
f90c529b07
commit
4d5ebe42eb
2 changed files with 40 additions and 18 deletions
|
@ -2,23 +2,44 @@ use russh::server::Handle;
|
|||
use russh::{ChannelId, CryptoVec};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ChannelWriteOperation {
|
||||
Data(Handle, ChannelId, CryptoVec),
|
||||
ExtendedData(Handle, ChannelId, u32, CryptoVec),
|
||||
}
|
||||
|
||||
/// Sequences data writes and runs them in background to avoid lockups
|
||||
pub struct ChannelWriter {
|
||||
tx: mpsc::UnboundedSender<(Handle, ChannelId, CryptoVec)>,
|
||||
tx: mpsc::UnboundedSender<ChannelWriteOperation>,
|
||||
}
|
||||
|
||||
impl ChannelWriter {
|
||||
pub fn new() -> Self {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<(Handle, ChannelId, CryptoVec)>();
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<ChannelWriteOperation>();
|
||||
tokio::spawn(async move {
|
||||
while let Some((handle, channel, data)) = rx.recv().await {
|
||||
let _ = handle.data(channel, data).await;
|
||||
while let Some(operation) = rx.recv().await {
|
||||
match operation {
|
||||
ChannelWriteOperation::Data(handle, channel, data) => {
|
||||
let _ = handle.data(channel, data).await;
|
||||
}
|
||||
ChannelWriteOperation::ExtendedData(handle, channel, ext, data) => {
|
||||
let _ = handle.extended_data(channel, ext, data).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
ChannelWriter { tx }
|
||||
}
|
||||
|
||||
pub fn write(&self, handle: Handle, channel: ChannelId, data: CryptoVec) {
|
||||
let _ = self.tx.send((handle, channel, data));
|
||||
let _ = self
|
||||
.tx
|
||||
.send(ChannelWriteOperation::Data(handle, channel, data));
|
||||
}
|
||||
|
||||
pub fn write_extended(&self, handle: Handle, channel: ChannelId, ext: u32, data: CryptoVec) {
|
||||
let _ = self.tx.send(ChannelWriteOperation::ExtendedData(
|
||||
handle, channel, ext, data,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -695,10 +695,12 @@ impl ServerSession {
|
|||
}
|
||||
|
||||
let server_channel_id = self.map_channel_reverse(&channel)?;
|
||||
if let Some(session) = self.session_handle.as_mut() {
|
||||
let _ = session
|
||||
.data(server_channel_id.0, CryptoVec::from_slice(&data))
|
||||
.await;
|
||||
if let Some(session) = self.session_handle.clone() {
|
||||
self.channel_writer.write(
|
||||
session,
|
||||
server_channel_id.0,
|
||||
CryptoVec::from_slice(&data),
|
||||
);
|
||||
}
|
||||
}
|
||||
RCEvent::Success(channel) => {
|
||||
|
@ -787,15 +789,14 @@ impl ServerSession {
|
|||
}
|
||||
}
|
||||
let server_channel_id = self.map_channel_reverse(&channel)?;
|
||||
self.maybe_with_session(|handle| async move {
|
||||
handle
|
||||
.extended_data(server_channel_id.0, ext, CryptoVec::from_slice(&data))
|
||||
.await
|
||||
.map_err(|_| ())
|
||||
.context("failed to send extended data")?;
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
if let Some(session) = self.session_handle.clone() {
|
||||
self.channel_writer.write_extended(
|
||||
session,
|
||||
server_channel_id.0,
|
||||
ext,
|
||||
CryptoVec::from_slice(&data),
|
||||
);
|
||||
}
|
||||
}
|
||||
RCEvent::HostKeyReceived(key) => {
|
||||
self.emit_service_message(&format!(
|
||||
|
|
Loading…
Add table
Reference in a new issue