Live session streaming - fixes #4

This commit is contained in:
Eugene Pankov 2022-04-23 20:08:47 +02:00
parent 168572a4f8
commit dc2dff6553
No known key found for this signature in database
GPG key ID: 5896FCBBDD1CF4F4
10 changed files with 295 additions and 84 deletions

39
Cargo.lock generated
View file

@ -2237,6 +2237,7 @@ checksum = "2e69bd4623d873b286d4416e59c68db09c154c7776045c8308ad9fa27d5bd2dc"
dependencies = [
"anyhow",
"async-trait",
"base64",
"bytes",
"chrono",
"cookie",
@ -2266,6 +2267,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tokio-stream",
"tokio-tungstenite",
"tokio-util 0.7.0",
"tracing",
]
@ -3489,6 +3491,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.6.9"
@ -3693,6 +3707,25 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.15.0"
@ -3781,6 +3814,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8-ranges"
version = "1.0.4"

View file

@ -12,7 +12,7 @@ chrono = "0.4"
futures = "0.3"
hex = "0.4"
mime_guess = {version = "2.0", default_features = false}
poem = {version = "^1.3.24", features = ["cookie", "session", "anyhow", "rustls"]}
poem = {version = "^1.3.24", features = ["cookie", "session", "anyhow", "rustls", "websocket"]}
poem-openapi = {version = "^1.3.24", features = ["swagger-ui", "chrono", "uuid", "static-files"]}
russh-keys = {version = "0.22.0-beta.1", features = ["openssl"]}
rust-embed = "6.3"

View file

@ -2,18 +2,16 @@
import { api, Recording, RecordingKind } from 'lib/api'
import { Alert, Spinner } from 'sveltestrap'
import TerminalRecordingPlayer from 'player/TerminalRecordingPlayer.svelte'
import { onDestroy } from 'svelte';
export let params = { id: '' }
let error: Error|null = null
let recording: Recording|null = null
let terminalRecordingURL: string|null = null
async function load () {
recording = await api.getRecording(params)
if (recording.kind === 'Terminal') {
terminalRecordingURL = `/api/recordings/${params.id}/cast`
}
}
function getTCPDumpURL () {
@ -42,6 +40,6 @@ load().catch(e => {
{#if recording?.kind === 'Traffic'}
<a href={getTCPDumpURL()}>Download tcpdump file</a>
{/if}
{#if recording?.kind === 'Terminal' && terminalRecordingURL}
<TerminalRecordingPlayer url={terminalRecordingURL} />
{#if recording?.kind === 'Terminal'}
<TerminalRecordingPlayer recording={recording} />
{/if}

View file

@ -6,9 +6,11 @@
import { faPlay, faPause, faExpand } from '@fortawesome/free-solid-svg-icons'
import { Spinner } from 'sveltestrap'
import formatDuration from 'format-duration'
import type { Recording } from 'lib/api'
export let url: string
export let recording: Recording
let url: string
let containerElement: HTMLDivElement
let rootElement: HTMLDivElement
let timestamp = 0
@ -18,6 +20,11 @@
let events: (SizeEvent | DataEvent | SnapshotEvent)[] = []
let playing = false
let loading = true
let sessionIsLive: boolean|null = null
let socket: WebSocket|null = null
let isStreaming = false
$: isStreaming = timestamp === duration && playing
const COLOR_NAMES = [
'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white',
@ -51,6 +58,23 @@
theme[COLOR_NAMES[i]] = colors[i]
}
interface AsciiCastHeader {
time: number
version: number
width: number
height: number
}
type AsciiCastData = [number, 'o', string]
type AsciiCastItem = AsciiCastHeader | AsciiCastData
function isAsciiCastHeader(data: AsciiCastItem): data is AsciiCastHeader {
return 'version' in data
}
function isAsciiCastData(data: AsciiCastItem): data is AsciiCastData {
return data[1] === 'o'
}
interface SizeEvent { time: number, cols: number, rows: number }
interface DataEvent { time: number, data: string }
interface SnapshotEvent { time: number, snapshot: string }
@ -58,7 +82,15 @@
const term = new Terminal()
const serializeAddon = new SerializeAddon()
onDestroy(() => socket?.close())
onMount(async () => {
if (recording.kind !== 'Terminal') {
throw new Error('Invalid recording type')
}
url = `/api/recordings/${recording.id}/cast`
term.loadAddon(serializeAddon)
term.open(containerElement)
@ -75,25 +107,55 @@
}
await seek(duration)
await seek(0)
socket = new WebSocket(`wss://${location.host}/api/recordings/${recording.id}/stream`)
socket.addEventListener('message', function (event) {
let message = JSON.parse(event.data)
if ('data' in message) {
let item: AsciiCastItem = message.data
addData(item)
} if ('start' in message) {
sessionIsLive = message.live
if (!sessionIsLive) {
seek(0)
} else {
playing = true
}
} if ('end' in message) {
sessionIsLive = false
} else {
console.log('Message from server ', message)
}
})
socket.addEventListener('close', () => console.info('Live stream closed'))
loading = false
})
function addData (data) {
if (data.version) {
duration = Math.max(duration, data.time)
function addData (data: AsciiCastItem) {
if (isAsciiCastHeader(data)) {
events.push({
time: data.time,
cols: data.width,
rows: data.height,
})
if (isStreaming) {
resize(data.width, data.height)
timestamp = data.time
}
duration = Math.max(duration, data.time)
}
if (data instanceof Array) {
duration = Math.max(duration, data[0])
events.push({
if (isAsciiCastData(data)) {
let dataEvent = {
time: data[0],
data: data[2],
})
}
events.push(dataEvent)
if (isStreaming) {
term.write(dataEvent.data)
timestamp = dataEvent.time
}
duration = Math.max(duration, dataEvent.time)
}
}
@ -129,7 +191,11 @@
let index = nearestSnapshot ? events.indexOf(nearestSnapshot) : 0
if (time >= timestamp) {
index = Math.max(index, events.findIndex(e => e.time > timestamp))
const nextEventIndex = events.findIndex(e => e.time > timestamp)
if (nextEventIndex === -1) {
return
}
index = Math.max(index, nextEventIndex)
}
let lastSize = { cols: term.cols, rows: term.rows }
@ -205,7 +271,7 @@
return
}
if (playing) {
await seek(timestamp + 0.1)
await seek(Math.min(duration, timestamp + 0.1))
}
setTimeout(step, 100)
}
@ -237,17 +303,26 @@
{/if}
<div
class="container"
class:invisible={loading}
on:click={togglePlaying}
bind:this={containerElement}
class="container"
class:invisible={loading}
on:click={togglePlaying}
bind:this={containerElement}
></div>
<div class="toolbar" class:invisible={loading}>
<button class="btn btn-link" on:click={togglePlaying}>
<Fa icon={playing ? faPause : faPlay} fw />
</button>
<pre class="timestamp">{ formatDuration(timestamp * 1000, { leading: true }) }</pre>
<pre
class="timestamp"
>{ formatDuration(timestamp * 1000, { leading: true }) }</pre>
{#if sessionIsLive === true}
<button
class="btn live-btn"
class:active={isStreaming}
on:click={() => seek(duration)}
>LIVE</button>
{/if}
<input
class="w-100"
type="range"
@ -346,8 +421,22 @@
flex: none;
overflow: visible;
color: #eeeeee;
font-size: 0.75rem;
margin: 0;
font-size: 0.75rem;
align-self: center;
}
.live-btn {
font-size: 0.75rem;
align-self: center;
color: red;
flex: none;
&.active {
background: red;
color: white;
padding: 0.1rem 0.25rem;
margin: 0 0.5rem;
}
}
</style>

View file

@ -1,20 +1,23 @@
use crate::helpers::{authorized, ApiResult};
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use poem::error::{InternalServerError, NotFoundError};
use poem::handler;
use poem::session::Session;
use poem::web::websocket::{Message, WebSocket};
use poem::web::Data;
use poem::{handler, IntoResponse};
use poem_openapi::param::Path;
use poem_openapi::payload::Json;
use poem_openapi::{ApiResponse, OpenApi};
use sea_orm::{DatabaseConnection, EntityTrait};
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::Mutex;
use tracing::*;
use uuid::Uuid;
use warpgate_common::recordings::{SessionRecordings, TerminalRecordingItem};
use warpgate_common::recordings::{AsciiCast, SessionRecordings, TerminalRecordingItem};
use warpgate_db_entities::Recording::{self, RecordingKind};
pub struct Api;
@ -96,36 +99,16 @@ pub async fn api_get_recording_cast(
while let Some(line) = lines.next_line().await.map_err(InternalServerError)? {
let entry: TerminalRecordingItem =
serde_json::from_str(&line[..]).map_err(InternalServerError)?;
match entry {
TerminalRecordingItem::Data { time, data } => {
response.push(
serde_json::to_string(&Cast::Output(
time,
"o".to_string(),
String::from_utf8_lossy(&data[..]).to_string(),
))
.map_err(InternalServerError)?,
);
}
TerminalRecordingItem::PtyResize { time, cols, rows } => {
last_size = (cols, rows);
response.push(
serde_json::to_string(&Cast::Header {
time,
version: 2,
width: cols,
height: rows,
title: recording.name.clone(),
})
.map_err(InternalServerError)?,
);
}
let asciicast: AsciiCast = entry.into();
response.push(serde_json::to_string(&asciicast).map_err(InternalServerError)?);
if let AsciiCast::Header { width, height, .. } = asciicast {
last_size = (width, height);
}
}
response.insert(
0,
serde_json::to_string(&Cast::Header {
serde_json::to_string(&AsciiCast::Header {
time: 0.0,
version: 2,
width: last_size.0,
@ -177,27 +160,49 @@ pub async fn api_get_recording_tcpdump(
.await
}
#[derive(Serialize)]
#[serde(untagged)]
enum Cast {
Header {
time: f32,
version: u32,
width: u32,
height: u32,
title: String,
},
Output(f32, String, String),
#[handler]
pub async fn api_get_recording_stream(
ws: WebSocket,
recordings: Data<&Arc<Mutex<SessionRecordings>>>,
id: poem::web::Path<Uuid>,
) -> impl IntoResponse {
let recordings = recordings.lock().await;
let receiver = recordings.subscribe_live(&id).await;
ws.on_upgrade(|socket| async move {
let (mut sink, _) = socket.split();
sink.send(Message::Text(serde_json::to_string(&json!({
"start": true,
"live": receiver.is_some(),
}))?))
.await?;
if let Some(mut receiver) = receiver {
tokio::spawn(async move {
if let Err(error) = async {
loop {
let Ok(data) = receiver.recv().await else {
break;
};
let content: TerminalRecordingItem = serde_json::from_slice(&data)?;
let cast: AsciiCast = content.into();
let msg = serde_json::to_string(&json!({ "data": cast }))?;
sink.send(Message::Text(msg)).await?;
}
sink.send(Message::Text(serde_json::to_string(&json!({
"end": true,
}))?))
.await?;
Ok::<(), anyhow::Error>(())
}
.await
{
error!(%error, "Livestream error:");
}
});
}
Ok::<(), anyhow::Error>(())
})
}
// #[handler]
// pub async fn api_get_recording_stream(
// ws: WebSocket,
// db: Data<&Arc<Mutex<DatabaseConnection>>>,
// state: Data<&Arc<Mutex<State>>>,
// id: poem::web::Path<Uuid>,
// ) -> impl IntoResponse {
// ws.on_upgrade(|socket| async move {
// })
// }

View file

@ -67,6 +67,10 @@ impl AdminServer {
"/api/recordings/:id/cast",
crate::api::recordings_detail::api_get_recording_cast,
)
.at(
"/api/recordings/:id/stream",
crate::api::recordings_detail::api_get_recording_stream,
)
.at(
"/api/recordings/:id/tcpdump",
crate::api::recordings_detail::api_get_recording_tcpdump,

View file

@ -1,7 +1,9 @@
use bytes::Bytes;
use sea_orm::{ActiveModelTrait, DatabaseConnection};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{broadcast, Mutex};
use tracing::*;
use uuid::Uuid;
use warpgate_db_entities::Recording::{self, RecordingKind};
@ -40,6 +42,7 @@ pub struct SessionRecordings {
db: Arc<Mutex<DatabaseConnection>>,
path: PathBuf,
config: RecordingsConfig,
live: Arc<Mutex<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
}
impl SessionRecordings {
@ -54,10 +57,11 @@ impl SessionRecordings {
db,
config: config.store.recordings.clone(),
path,
live: Arc::new(Mutex::new(HashMap::new())),
})
}
pub async fn start<T>(&self, id: &SessionId, name: String) -> Result<T>
pub async fn start<T>(&mut self, id: &SessionId, name: String) -> Result<T>
where
T: Recorder,
{
@ -84,10 +88,15 @@ impl SessionRecordings {
values.insert(&*db).await.map_err(Error::Database)?
};
let writer = RecordingWriter::new(path, model, self.db.clone()).await?;
let writer = RecordingWriter::new(path, model, self.db.clone(), self.live.clone()).await?;
Ok(T::new(writer))
}
pub async fn subscribe_live(&self, id: &Uuid) -> Option<broadcast::Receiver<Bytes>> {
let live = self.live.lock().await;
live.get(id).map(|sender| sender.subscribe())
}
pub fn path_for(&self, session_id: &SessionId, name: &dyn AsRef<std::path::Path>) -> PathBuf {
self.path.join(session_id.to_string()).join(&name)
}

View file

@ -7,6 +7,19 @@ use warpgate_db_entities::Recording::RecordingKind;
use super::writer::RecordingWriter;
use super::Recorder;
#[derive(Serialize)]
#[serde(untagged)]
pub enum AsciiCast {
Header {
time: f32,
version: u32,
width: u32,
height: u32,
title: String,
},
Output(f32, String, String),
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TerminalRecordingItem {
@ -22,6 +35,25 @@ pub enum TerminalRecordingItem {
},
}
impl From<TerminalRecordingItem> for AsciiCast {
fn from(item: TerminalRecordingItem) -> Self {
match item {
TerminalRecordingItem::Data { time, data } => AsciiCast::Output(
time,
"o".to_string(),
String::from_utf8_lossy(&data[..]).to_string(),
),
TerminalRecordingItem::PtyResize { time, cols, rows } => AsciiCast::Header {
time,
version: 2,
width: cols,
height: rows,
title: "".to_string(),
},
}
}
}
pub struct TerminalRecorder {
writer: RecordingWriter,
started_at: Instant,
@ -33,9 +65,9 @@ impl TerminalRecorder {
}
async fn write_item(&mut self, item: &TerminalRecordingItem) -> Result<()> {
let serialized_item = serde_json::to_vec(&item)?;
let mut serialized_item = serde_json::to_vec(&item)?;
serialized_item.push(b'\n');
self.writer.write(&serialized_item).await?;
self.writer.write(b"\n").await?;
Ok(())
}

View file

@ -3,18 +3,22 @@ use crate::helpers::fs::secure_file;
use super::{Error, Result};
use bytes::{Bytes, BytesMut};
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{broadcast, mpsc, Mutex};
use tracing::*;
use uuid::Uuid;
use warpgate_db_entities::Recording;
#[derive(Clone)]
pub struct RecordingWriter {
sender: mpsc::Sender<Bytes>,
live_sender: broadcast::Sender<Bytes>,
drop_signal: mpsc::Sender<()>,
}
impl RecordingWriter {
@ -22,11 +26,30 @@ impl RecordingWriter {
path: PathBuf,
model: Recording::Model,
db: Arc<Mutex<DatabaseConnection>>,
live: Arc<Mutex<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
) -> Result<Self> {
let file = File::create(&path).await?;
secure_file(&path)?;
let mut writer = BufWriter::new(file);
let (sender, mut receiver) = mpsc::channel::<Bytes>(1024);
let (drop_signal, mut drop_receiver) = mpsc::channel(1);
let live_sender = broadcast::channel(128).0;
{
let mut live = live.lock().await;
live.insert(model.id, live_sender.clone());
}
tokio::spawn({
let live = live.clone();
let id = model.id;
async move {
let _ = drop_receiver.recv().await;
let mut live = live.lock().await;
live.remove(&id);
}
});
tokio::spawn(async move {
if let Err(error) = async {
let mut last_flush = Instant::now();
@ -73,14 +96,26 @@ impl RecordingWriter {
}
});
Ok(RecordingWriter { sender })
Ok(RecordingWriter {
sender,
live_sender,
drop_signal,
})
}
pub async fn write(&mut self, data: &[u8]) -> Result<()> {
let data = BytesMut::from(data).freeze();
self.sender
.send(BytesMut::from(data).freeze())
.send(data.clone())
.await
.map_err(|_| Error::Closed)?;
let _ = self.live_sender.send(data);
Ok(())
}
}
impl Drop for RecordingWriter {
fn drop(&mut self) {
let _ = self.drop_signal.send(());
}
}

View file

@ -6,8 +6,8 @@ use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use tracing::*;
use warpgate_common::helpers::hash::hash_password;
use warpgate_common::helpers::fs::{secure_directory, secure_file};
use warpgate_common::helpers::hash::hash_password;
use warpgate_common::{
Role, SSHConfig, Secret, Services, Target, TargetWebAdminOptions, User, UserAuthCredential,
WarpgateConfigStore, WebAdminConfig,