Record SSH exec channels - fixes #6

This commit is contained in:
Eugene Pankov 2022-05-30 14:08:32 +02:00
parent ae06c02123
commit 172e5b63b7
No known key found for this signature in database
GPG key ID: 5896FCBBDD1CF4F4
5 changed files with 98 additions and 41 deletions

View file

@ -149,6 +149,7 @@ overrides:
'@typescript-eslint/no-unnecessary-condition': off '@typescript-eslint/no-unnecessary-condition': off
# False positives for FontAwesome # False positives for FontAwesome
import/no-named-as-default: off import/no-named-as-default: off
import/no-named-as-default-member: off
ignorePatterns: ignorePatterns:
- svelte.config.js - svelte.config.js

View file

@ -23,6 +23,7 @@
let sessionIsLive: boolean|null = null let sessionIsLive: boolean|null = null
let socket: WebSocket|null = null let socket: WebSocket|null = null
let isStreaming = false let isStreaming = false
let ptyMode = false
$: isStreaming = timestamp === duration && playing $: isStreaming = timestamp === duration && playing
@ -73,7 +74,7 @@
} }
function isAsciiCastData (data: AsciiCastItem): data is AsciiCastData { function isAsciiCastData (data: AsciiCastItem): data is AsciiCastData {
return data[1] === 'o' return data[1] === 'o' || data[1] === 'e'
} }
interface SizeEvent { time: number, cols: number, rows: number } interface SizeEvent { time: number, cols: number, rows: number }
@ -96,7 +97,7 @@
term.open(containerElement) term.open(containerElement)
term.options.theme = theme term.options.theme = theme
term.options.scrollback = 0 term.options.scrollback = 100
fitSize() fitSize()
resizeObserver = new ResizeObserver(fitSize) resizeObserver = new ResizeObserver(fitSize)
@ -133,8 +134,18 @@
loading = false loading = false
}) })
async function writeToTerminal (data: string) {
if (!ptyMode) {
data = data.replace(/\n/g, '\r\n')
}
await new Promise<void>(r => term.write(data, r))
}
function addData (data: AsciiCastItem) { function addData (data: AsciiCastItem) {
if (isAsciiCastHeader(data)) { if (isAsciiCastHeader(data)) {
if (data.width) {
ptyMode = true
}
events.push({ events.push({
time: data.time, time: data.time,
cols: data.width, cols: data.width,
@ -153,7 +164,7 @@
} }
events.push(dataEvent) events.push(dataEvent)
if (isStreaming) { if (isStreaming) {
term.write(dataEvent.data) writeToTerminal(dataEvent.data)
timestamp = dataEvent.time timestamp = dataEvent.time
} }
duration = Math.max(duration, dataEvent.time) duration = Math.max(duration, dataEvent.time)
@ -212,9 +223,7 @@
let output = '' let output = ''
async function flush () { async function flush () {
await new Promise<void>(r => { await writeToTerminal(output)
term.write(output, r)
})
output = '' output = ''
} }
@ -258,7 +267,9 @@
if (term.cols === cols && term.rows === rows) { if (term.cols === cols && term.rows === rows) {
return return
} }
term.resize(cols, rows) if (cols && rows) {
term.resize(cols, rows)
}
fitSize() fitSize()
} }

View file

@ -92,7 +92,7 @@ pub async fn api_get_recording_cast(
let mut response = vec![]; //String::new(); let mut response = vec![]; //String::new();
let mut last_size = (80, 25); let mut last_size = (0, 0);
let file = File::open(&path).await.map_err(InternalServerError)?; let file = File::open(&path).await.map_err(InternalServerError)?;
let reader = BufReader::new(file); let reader = BufReader::new(file);
let mut lines = reader.lines(); let mut lines = reader.lines();

View file

@ -20,11 +20,26 @@ pub enum AsciiCast {
Output(f32, String, String), Output(f32, String, String),
} }
#[derive(Serialize, Deserialize, Debug)]
pub enum TerminalRecordingStreamId {
Input,
Output,
Error,
}
impl Default for TerminalRecordingStreamId {
fn default() -> Self {
TerminalRecordingStreamId::Output
}
}
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)] #[serde(untagged)]
pub enum TerminalRecordingItem { pub enum TerminalRecordingItem {
Data { Data {
time: f32, time: f32,
#[serde(default)]
stream: TerminalRecordingStreamId,
#[serde(with = "crate::helpers::serde_base64")] #[serde(with = "crate::helpers::serde_base64")]
data: Bytes, data: Bytes,
}, },
@ -38,9 +53,13 @@ pub enum TerminalRecordingItem {
impl From<TerminalRecordingItem> for AsciiCast { impl From<TerminalRecordingItem> for AsciiCast {
fn from(item: TerminalRecordingItem) -> Self { fn from(item: TerminalRecordingItem) -> Self {
match item { match item {
TerminalRecordingItem::Data { time, data } => AsciiCast::Output( TerminalRecordingItem::Data { time, stream, data } => AsciiCast::Output(
time, time,
"o".to_string(), match stream {
TerminalRecordingStreamId::Input => "i".to_string(),
TerminalRecordingStreamId::Output => "o".to_string(),
TerminalRecordingStreamId::Error => "e".to_string(),
},
String::from_utf8_lossy(&data[..]).to_string(), String::from_utf8_lossy(&data[..]).to_string(),
), ),
TerminalRecordingItem::PtyResize { time, cols, rows } => AsciiCast::Header { TerminalRecordingItem::PtyResize { time, cols, rows } => AsciiCast::Header {
@ -71,9 +90,10 @@ impl TerminalRecorder {
Ok(()) Ok(())
} }
pub async fn write(&mut self, data: &[u8]) -> Result<()> { pub async fn write(&mut self, stream: TerminalRecordingStreamId, data: &[u8]) -> Result<()> {
self.write_item(&TerminalRecordingItem::Data { self.write_item(&TerminalRecordingItem::Data {
time: self.get_time(), time: self.get_time(),
stream,
data: BytesMut::from(data).freeze(), data: BytesMut::from(data).freeze(),
}) })
.await .await

View file

@ -27,7 +27,8 @@ use uuid::Uuid;
use warpgate_common::auth::AuthSelector; use warpgate_common::auth::AuthSelector;
use warpgate_common::eventhub::{EventHub, EventSender}; use warpgate_common::eventhub::{EventHub, EventSender};
use warpgate_common::recordings::{ use warpgate_common::recordings::{
ConnectionRecorder, TerminalRecorder, TrafficConnectionParams, TrafficRecorder, ConnectionRecorder, TerminalRecorder, TerminalRecordingStreamId, TrafficConnectionParams,
TrafficRecorder,
}; };
use warpgate_common::{ use warpgate_common::{
authorize_ticket, AuthCredential, AuthResult, Secret, Services, SessionId, Target, authorize_ticket, AuthCredential, AuthResult, Secret, Services, SessionId, Target,
@ -373,7 +374,10 @@ impl ServerSession {
} }
RCEvent::Output(channel, data) => { RCEvent::Output(channel, data) => {
if let Some(recorder) = self.channel_recorders.get_mut(&channel) { if let Some(recorder) = self.channel_recorders.get_mut(&channel) {
if let Err(error) = recorder.write(&data).await { if let Err(error) = recorder
.write(TerminalRecordingStreamId::Output, &data)
.await
{
error!(%channel, ?error, "Failed to record terminal data"); error!(%channel, ?error, "Failed to record terminal data");
self.channel_recorders.remove(&channel); self.channel_recorders.remove(&channel);
} }
@ -462,7 +466,10 @@ impl ServerSession {
RCEvent::Done => {} RCEvent::Done => {}
RCEvent::ExtendedData { channel, data, ext } => { RCEvent::ExtendedData { channel, data, ext } => {
if let Some(recorder) = self.channel_recorders.get_mut(&channel) { if let Some(recorder) = self.channel_recorders.get_mut(&channel) {
if let Err(error) = recorder.write(&data).await { if let Err(error) = recorder
.write(TerminalRecordingStreamId::Error, &data)
.await
{
error!(%channel, ?error, "Failed to record session data"); error!(%channel, ?error, "Failed to record session data");
self.channel_recorders.remove(&channel); self.channel_recorders.remove(&channel);
} }
@ -670,11 +677,44 @@ impl ServerSession {
channel_id, channel_id,
ChannelOperation::RequestExec(command.to_string()), ChannelOperation::RequestExec(command.to_string()),
)); ));
self.start_terminal_recording(
channel_id,
format!("exec-channel-{}", server_channel_id.0),
)
.await;
} }
} }
Ok(()) Ok(())
} }
async fn start_terminal_recording(&mut self, channel_id: Uuid, name: String) {
match async {
let mut recorder = self
.services
.recordings
.lock()
.await
.start::<TerminalRecorder>(&self.id, name)
.await?;
if let Some(request) = self.channel_pty_size_map.get(&channel_id) {
recorder
.write_pty_resize(request.col_width, request.row_height)
.await?;
}
Ok::<_, anyhow::Error>(recorder)
}
.await
{
Ok(recorder) => {
self.channel_recorders.insert(channel_id, recorder);
}
Err(error) => {
error!(channel=%channel_id, ?error, "Failed to start recording");
}
}
}
pub async fn _channel_x11_request( pub async fn _channel_x11_request(
&mut self, &mut self,
server_channel_id: ServerChannelId, server_channel_id: ServerChannelId,
@ -740,33 +780,8 @@ impl ServerSession {
ChannelOperation::RequestShell, ChannelOperation::RequestShell,
))?; ))?;
match async { self.start_terminal_recording(channel_id, format!("shell-channel-{}", server_channel_id.0))
let mut recorder = self .await;
.services
.recordings
.lock()
.await
.start::<TerminalRecorder>(
&self.id,
format!("shell-channel-{}", server_channel_id.0),
)
.await?;
if let Some(request) = self.channel_pty_size_map.get(&channel_id) {
recorder
.write_pty_resize(request.col_width, request.row_height)
.await?;
}
Ok::<_, anyhow::Error>(recorder)
}
.await
{
Ok(recorder) => {
self.channel_recorders.insert(channel_id, recorder);
}
Err(error) => {
error!(channel=%channel_id, ?error, "Failed to start recording");
}
}
info!(%channel_id, "Opening shell"); info!(%channel_id, "Opening shell");
let _ = self let _ = self
@ -802,6 +817,16 @@ impl ServerSession {
return Ok(()); return Ok(());
} }
if let Some(recorder) = self.channel_recorders.get_mut(&channel_id) {
if let Err(error) = recorder
.write(TerminalRecordingStreamId::Input, &data)
.await
{
error!(channel=%channel_id, ?error, "Failed to record terminal data");
self.channel_recorders.remove(&channel_id);
}
}
if let Some(recorder) = self.traffic_connection_recorders.get_mut(&channel_id) { if let Some(recorder) = self.traffic_connection_recorders.get_mut(&channel_id) {
if let Err(error) = recorder.write_tx(&data).await { if let Err(error) = recorder.write_tx(&data).await {
error!(channel=%channel_id, ?error, "Failed to record traffic data"); error!(channel=%channel_id, ?error, "Failed to record traffic data");