Internal log store, log viewer & forwarding (#105)

This commit is contained in:
Eugene 2022-05-30 00:32:45 -07:00 committed by GitHub
parent 677c08e747
commit a922a3fb49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 1189 additions and 287 deletions

4
Cargo.lock generated
View file

@ -4061,6 +4061,7 @@ dependencies = [
"data-encoding",
"humantime-serde",
"lazy_static",
"once_cell",
"packet",
"password-hash 0.4.1",
"poem-openapi",
@ -4074,6 +4075,8 @@ dependencies = [
"tokio",
"totp-rs",
"tracing",
"tracing-core",
"tracing-subscriber",
"url",
"uuid",
"warpgate-db-entities",
@ -4088,6 +4091,7 @@ dependencies = [
"poem-openapi",
"sea-orm",
"serde",
"serde_json",
"uuid",
]

View file

@ -15,6 +15,9 @@ clippy *ARGS:
yarn *ARGS:
cd warpgate-admin/app/ && yarn {{ARGS}}
migrate *ARGS:
cargo run -p warpgate-db-migrations -- {{ARGS}}
svelte-check:
cd warpgate-admin/app/ && yarn run check
@ -24,4 +27,4 @@ openapi-all:
openapi:
cd warpgate-admin/app/ && yarn openapi-client
cleanup: (fix "--allow-dirty") (clippy "--fix" "--allow-dirty") fmt
cleanup: (fix "--allow-dirty") (clippy "--fix" "--allow-dirty") fmt svelte-check

View file

@ -134,6 +134,8 @@ rules:
overrides:
- files: '*.svelte'
processor: svelte3/svelte3
rules:
'@typescript-eslint/init-declarations': off
ignorePatterns:
- svelte.config.js

View file

@ -2,7 +2,7 @@
"openapi": "3.0.0",
"info": {
"title": "Warpgate",
"version": "0.1.0"
"version": "0.1.1"
},
"servers": [
{
@ -394,6 +394,36 @@
},
"operationId": "get_ssh_own_keys"
}
},
"/logs": {
"post": {
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/GetLogsRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"$ref": "#/components/schemas/LogEntry"
}
}
}
}
}
},
"operationId": "get_logs"
}
}
},
"components": {
@ -413,6 +443,33 @@
}
}
},
"GetLogsRequest": {
"type": "object",
"properties": {
"before": {
"type": "string",
"format": "date-time"
},
"after": {
"type": "string",
"format": "date-time"
},
"limit": {
"type": "integer",
"format": "uint64"
},
"session_id": {
"type": "string",
"format": "uuid"
},
"username": {
"type": "string"
},
"search": {
"type": "string"
}
}
},
"Info": {
"type": "object",
"required": [
@ -427,6 +484,35 @@
}
}
},
"LogEntry": {
"type": "object",
"required": [
"id",
"text",
"timestamp",
"session_id"
],
"properties": {
"id": {
"type": "string",
"format": "uuid"
},
"text": {
"type": "string"
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"session_id": {
"type": "string",
"format": "uuid"
},
"username": {
"type": "string"
}
}
},
"LoginRequest": {
"type": "object",
"required": [

View file

@ -11,7 +11,7 @@
"check": "svelte-check --tsconfig ./tsconfig.json",
"lint": "eslint src && svelte-check",
"postinstall": "yarn run openapi-client",
"openapi-schema": "curl http://localhost:8888/api/openapi.json > openapi-schema.json",
"openapi-schema": "curl -k https://localhost:8888/api/openapi.json > openapi-schema.json",
"openapi-client": "openapi-generator-cli generate -g typescript-fetch -i openapi-schema.json -o api-client -p npmName=warpgate-api-client -p useSingleRequestParameter=true && cd api-client && npm i && npm run build",
"openapi": "yarn run openapi-schema && yarn run openapi-client"
},
@ -38,6 +38,7 @@
"svelte": "^3.48.0",
"svelte-check": "^2.7.1",
"svelte-fa": "^2.4.0",
"svelte-intersection-observer": "^0.10.0",
"svelte-preprocess": "^4.10.6",
"svelte-spa-router": "^3.2.0",
"sveltestrap": "^5.9.0",

View file

@ -54,6 +54,9 @@ const routes = {
'/ssh': wrap({
asyncComponent: () => import('./SSH.svelte')
}),
'/log': wrap({
asyncComponent: () => import('./Log.svelte')
}),
}
</script>
@ -67,6 +70,7 @@ const routes = {
<a use:link use:active href="/targets">Targets</a>
<a use:link use:active href="/tickets">Tickets</a>
<a use:link use:active href="/ssh">SSH</a>
<a use:link use:active href="/log">Log</a>
{/if}
{#if $authenticatedUsername}
<div class="username">

View file

@ -0,0 +1,9 @@
<script lang="ts">
import LogViewer from 'LogViewer.svelte'
</script>
<div class="page-summary-bar">
<h1>Log</h1>
</div>
<LogViewer filters={{}} />

View file

@ -0,0 +1,231 @@
<script lang="ts">
import { api, LogEntry } from 'lib/api'
import { Alert, FormGroup } from 'sveltestrap'
import { firstBy } from 'thenby';
import IntersectionObserver from 'svelte-intersection-observer'
import { link } from 'svelte-spa-router'
import { onDestroy, onMount } from 'svelte'
export let filters: {
sessionId?: string,
} | undefined
let error: Error|undefined
let items: LogEntry[]|undefined
let visibleItems: LogEntry[]|undefined
let loading = true
let endReached = false
let loadOlderButton: HTMLButtonElement|undefined
let reloadInterval: number|undefined
let lastUpdate = new Date()
let isLive = true
let searchQuery = ''
const PAGE_SIZE = 1000
function addItems (newItems: LogEntry[]) {
lastUpdate = new Date()
let existingIds = new Set(items?.map(i => i.id) ?? [])
newItems = newItems.filter(i => !existingIds.has(i.id))
newItems.sort(firstBy('timestamp', -1))
if (!newItems.length) {
return
}
items ??= []
if (items?.[0]?.timestamp > newItems[0].timestamp) {
items = items.concat(newItems)
} else {
items = [
...newItems,
...items,
]
}
}
async function loadNewer () {
loading = true
try {
const newItems = await api.getLogs({
getLogsRequest: {
...filters ?? {},
after: items?.at(0)?.timestamp,
limit: PAGE_SIZE,
search: searchQuery,
},
})
addItems(newItems)
visibleItems = items
} finally {
loading = false
}
}
async function loadOlder (search = false) {
loading = true
try {
const newItems = await api.getLogs({
getLogsRequest: {
...filters ?? {},
before: search ? undefined : items?.at(-1)?.timestamp,
limit: PAGE_SIZE,
search: searchQuery,
},
})
if (search) {
endReached = false
items = []
}
addItems(newItems)
visibleItems = items
if (!newItems.length) {
endReached = true
}
} finally {
loading = false
}
}
function search () {
loadOlder(true)
}
function stringifyDate (date: Date) {
return date.toLocaleString()
}
loadOlder().catch(e => {
error = e
})
onMount(() => {
reloadInterval = setInterval(() => {
isLive = Date.now() - lastUpdate.valueOf() < 3000
if (!loading) {
loadNewer()
}
}, 1000)
})
onDestroy(() => {
clearInterval(reloadInterval)
})
</script>
{#if error}
<Alert color="danger">{error}</Alert>
{/if}
<input
placeholder="Search..."
type="text"
class="form-control form-control-sm mb-2"
bind:value={searchQuery}
on:keyup={() => search()} />
{#if visibleItems}
<div class="table-wrapper">
<table class="w-100">
<tr>
<th>Time</th>
{#if !filters?.sessionId}
<th>User</th>
<th>Session</th>
{/if}
<th class="d-flex">
<div class="me-auto">Message</div>
{#if isLive}
<span class="badge bg-danger">Live</span>
{:else}
<small><em>Last update: {stringifyDate(lastUpdate)}</em></small>
{/if}
</th>
</tr>
{#each visibleItems as item}
<tr>
<td class="timestamp pe-4">
{stringifyDate(item.timestamp)}
</td>
{#if !filters?.sessionId}
<td class="username pe-4">
{#if item.username}
{item.username}
{/if}
</td>
<td class="session pe-4">
{#if item.sessionId}
<a href="/sessions/{item.sessionId}" use:link>
{item.sessionId}
</a>
{/if}
</td>
{/if}
<td class="text">
{item.text}
</td>
</tr>
{/each}
{#if !endReached}
{#if !loading}
<tr>
<td colspan="3">
<IntersectionObserver element={loadOlderButton} on:observe={event => {
if (!loading && event.detail.isIntersecting) {
loadOlder()
}
}}>
<button
bind:this={loadOlderButton}
class="btn btn-light"
on:click={() => loadOlder()}
disabled={loading}
>
Load older
</button>
</IntersectionObserver>
</td>
</tr>
{/if}
{:else}
<tr>
<td></td>
{#if !filters?.sessionId}
<td></td>
<td></td>
{/if}
<td class="text">End of the log</td>
</tr>
{/if}
</table>
</div>
{/if}
<style lang="scss">
@import "./vars";
.table-wrapper {
max-width: 100%;
overflow-x: auto;
}
tr {
td {
font-family: $font-family-monospace;
font-size: 0.75rem;
white-space: nowrap;
}
.timestamp {
opacity: .75;
}
:not(:last-child) {
padding-right: 15px;
}
}
.badge {
line-height: 1.3;
}
</style>

View file

@ -2,10 +2,11 @@
import { api, SessionSnapshot, Recording } from 'lib/api'
import { timeAgo } from 'lib/time'
import moment from 'moment'
import RelativeDate from 'RelativeDate.svelte';
import { onDestroy } from 'svelte';
import { link } from 'svelte-spa-router'
import { Alert, Button, FormGroup, Spinner } from 'sveltestrap'
import LogViewer from 'LogViewer.svelte'
import RelativeDate from 'RelativeDate.svelte'
export let params = { id: '' }
@ -51,9 +52,20 @@ onDestroy(() => clearInterval(interval))
<div class="page-summary-bar">
<div>
<h1>Session</h1>
<div class="text-muted">
<div>
{#if session.ended}
{moment.duration(moment(session.ended).diff(session.started)).humanize()} long, <RelativeDate date={session.started} />
<strong class="me-2">
{#if session.username}
{session.username}
{:else}
Logging in
{/if}
{getTargetDescription()}
</strong>
<span class="text-muted">
{moment.duration(moment(session.ended).diff(session.started)).humanize()} long, <RelativeDate date={session.started} />
</span>
{:else}
{moment.duration(moment().diff(session.started)).humanize()}
{/if}
@ -66,25 +78,8 @@ onDestroy(() => clearInterval(interval))
{/if}
</div>
<div class="row mb-4">
<div class="col-12 col-md-6">
<FormGroup floating label="User">
{#if session.username}
<input type="text" class="form-control" readonly value={session.username} />
{:else}
<input type="text" class="form-control" readonly value="Not logged in" />
{/if}
</FormGroup>
</div>
<div class="col-12 col-md-6">
<FormGroup floating label="Target">
<input type="text" class="form-control" readonly value={getTargetDescription()} />
</FormGroup>
</div>
</div>
{#if recordings?.length }
<h3>Recordings</h3>
<h3 class="mt-4">Recordings</h3>
<div class="list-group list-group-flush">
{#each recordings as recording}
<a
@ -103,6 +98,12 @@ onDestroy(() => clearInterval(interval))
{/each}
</div>
{/if}
<h3 class="mt-4">Log</h3>
<LogViewer filters={{
sessionId: session.id,
}} />
{/if}
<style lang="scss">

View file

@ -35,7 +35,7 @@
// @import "bootstrap/scss/accordion";
// @import "bootstrap/scss/breadcrumb";
// @import "bootstrap/scss/pagination";
// @import "bootstrap/scss/badge";
@import "bootstrap/scss/badge";
@import "bootstrap/scss/alert";
// @import "bootstrap/scss/progress";
@import "bootstrap/scss/list-group";

View file

@ -9,3 +9,5 @@ $alert-border-radius: 0;
$alert-border-scale: -30%;
@import "../node_modules/bootstrap/scss/variables";
$text-muted: $gray-500;

View file

@ -15,6 +15,7 @@
* Note that setting allowJs false does not prevent the use
* of JS in `.svelte` files.
*/
"types": [],
"allowJs": true,
"checkJs": true,
"paths": {
@ -29,6 +30,9 @@
"src/**/*.js",
"src/**/*.svelte"
],
"exclude": [
"node_modules/@types/node/**",
],
"references": [
{
"path": "./tsconfig.node.json"

View file

@ -2303,6 +2303,11 @@ svelte-hmr@^0.14.11:
resolved "https://registry.yarnpkg.com/svelte-hmr/-/svelte-hmr-0.14.11.tgz#63d532dc9c2c849ab708592f034765fa2502e568"
integrity sha512-R9CVfX6DXxW1Kn45Jtmx+yUe+sPhrbYSUp7TkzbW0jI5fVPn6lsNG9NEs5dFg5qRhFNAoVdRw5qQDLALNKhwbQ==
svelte-intersection-observer@^0.10.0:
version "0.10.0"
resolved "https://registry.yarnpkg.com/svelte-intersection-observer/-/svelte-intersection-observer-0.10.0.tgz#6f9ff6c235ee80b761c169406e389ff15d1e8f3a"
integrity sha512-GdOTMSrRpoBciMe+NbocsHOBvqKJ5OiL5H8Jz4mBoDGHba9VOI4oGeXAmMOmh8mi5gq95+0b0DVI+6alcQ7TCA==
svelte-preprocess@^4.0.0, svelte-preprocess@^4.10.6:
version "4.10.6"
resolved "https://registry.yarnpkg.com/svelte-preprocess/-/svelte-preprocess-4.10.6.tgz#5f9a53e7ed3b85fc7e0841120c725b76ac5a1ba8"

View file

@ -0,0 +1,82 @@
use crate::helpers::{authorized, ApiResult};
use chrono::{DateTime, Utc};
use poem::session::Session;
use poem::web::Data;
use poem_openapi::payload::Json;
use poem_openapi::{ApiResponse, Object, OpenApi};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use warpgate_db_entities::LogEntry;
pub struct Api;
#[derive(ApiResponse)]
enum GetLogsResponse {
#[oai(status = 200)]
Ok(Json<Vec<LogEntry::Model>>),
}
#[derive(Object)]
struct GetLogsRequest {
before: Option<DateTime<Utc>>,
after: Option<DateTime<Utc>>,
limit: Option<u64>,
session_id: Option<Uuid>,
username: Option<String>,
search: Option<String>,
}
#[OpenApi]
impl Api {
#[oai(path = "/logs", method = "post", operation_id = "get_logs")]
async fn api_get_all_logs(
&self,
db: Data<&Arc<Mutex<DatabaseConnection>>>,
body: Json<GetLogsRequest>,
session: &Session,
) -> ApiResult<GetLogsResponse> {
authorized(session, || async move {
use warpgate_db_entities::LogEntry;
let db = db.lock().await;
let mut q = LogEntry::Entity::find()
.order_by_desc(LogEntry::Column::Timestamp)
.limit(body.limit.unwrap_or(100));
if let Some(before) = body.before {
q = q.filter(LogEntry::Column::Timestamp.lt(before));
}
if let Some(after) = body.after {
q = q
.filter(LogEntry::Column::Timestamp.gt(after))
.order_by_asc(LogEntry::Column::Timestamp);
}
if let Some(ref session_id) = body.session_id {
q = q.filter(LogEntry::Column::SessionId.eq(*session_id));
}
if let Some(ref username) = body.username {
q = q.filter(LogEntry::Column::SessionId.eq(username.clone()));
}
if let Some(ref search) = body.search {
q = q.filter(
LogEntry::Column::Text
.contains(search)
.or(LogEntry::Column::Username.contains(search)),
);
}
let logs = q
.all(&*db)
.await
.map_err(poem::error::InternalServerError)?;
let logs = logs
.into_iter()
.map(Into::into)
.collect::<Vec<LogEntry::Model>>();
Ok(GetLogsResponse::Ok(Json(logs)))
})
.await
}
}

View file

@ -2,6 +2,7 @@ pub mod auth;
pub mod info;
pub mod known_hosts_detail;
pub mod known_hosts_list;
pub mod logs;
pub mod recordings_detail;
pub mod sessions_detail;
pub mod sessions_list;

View file

@ -44,6 +44,7 @@ impl AdminServer {
crate::api::info::Api,
crate::api::auth::Api,
crate::api::ssh_keys::Api,
crate::api::logs::Api,
),
"Warpgate",
env!("CARGO_PKG_VERSION"),

View file

@ -13,6 +13,7 @@ chrono = {version = "0.4", features = ["serde"]}
data-encoding = "2.3"
humantime-serde = "1.1"
lazy_static = "1.4"
once_cell = "1.10"
packet = "0.1"
password-hash = "0.4"
poem-openapi = {version = "^1.3.30", features = ["swagger-ui", "chrono", "uuid", "static-files"]}
@ -26,6 +27,8 @@ thiserror = "1.0"
tokio = {version = "1.18", features = ["tracing"]}
totp-rs = "1.4"
tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = "0.3"
url = "2.2"
uuid = {version = "0.8", features = ["v4", "serde"]}
warpgate-db-entities = {version = "*", path = "../warpgate-db-entities"}

View file

@ -189,6 +189,24 @@ impl Default for RecordingsConfig {
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LogConfig {
#[serde(default = "_default_retention", with = "humantime_serde")]
pub retention: Duration,
#[serde(default)]
pub send_to: Option<String>,
}
impl Default for LogConfig {
fn default() -> Self {
Self {
retention: _default_retention(),
send_to: None,
}
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WarpgateConfigStore {
pub targets: Vec<Target>,
@ -207,8 +225,8 @@ pub struct WarpgateConfigStore {
#[serde(default)]
pub ssh: SSHConfig,
#[serde(default = "_default_retention", with = "humantime_serde")]
pub retention: Duration,
#[serde(default)]
pub log: LogConfig,
}
impl Default for WarpgateConfigStore {
@ -221,7 +239,7 @@ impl Default for WarpgateConfigStore {
web_admin: WebAdminConfig::default(),
database_url: _default_database_url(),
ssh: SSHConfig::default(),
retention: _default_retention(),
log: LogConfig::default(),
}
}
}

View file

@ -104,7 +104,7 @@ impl ConfigProvider for FileConfigProvider {
base64_bytes.pop();
let client_key = format!("{} {}", kind, base64_bytes);
debug!(username=%user.username, "Client key: {}", client_key);
debug!(username = &user.username[..], "Client key: {}", client_key);
for credential in user.credentials.iter() {
if let UserAuthCredential::PublicKey { key: ref user_key } = credential {
@ -131,7 +131,10 @@ impl ConfigProvider for FileConfigProvider {
}
Ok(false) => continue,
Err(e) => {
error!(username=%user.username, "Error verifying password hash: {}", e);
error!(
username = &user.username[..],
"Error verifying password hash: {}", e
);
continue;
}
}
@ -155,7 +158,10 @@ impl ConfigProvider for FileConfigProvider {
}
if valid_credentials.is_empty() {
warn!(username=%user.username, "Client credentials did not match");
warn!(
username = &user.username[..],
"Client credentials did not match"
);
}
match user.require {

View file

@ -1,14 +1,14 @@
use crate::helpers::fs::secure_file;
use crate::WarpgateConfig;
use anyhow::Result;
use sea_orm::sea_query::Expr;
use sea_orm::{
ConnectOptions, Database, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait,
};
use std::time::Duration;
use warpgate_db_entities::LogEntry;
use warpgate_db_migrations::{Migrator, MigratorTrait};
use crate::helpers::fs::secure_file;
use crate::WarpgateConfig;
pub async fn connect_to_db(config: &WarpgateConfig) -> Result<DatabaseConnection> {
let mut url = url::Url::parse(&config.store.database_url.expose_secret()[..])?;
if url.scheme() == "sqlite" {
@ -80,6 +80,11 @@ pub async fn cleanup_db(db: &mut DatabaseConnection, retention: &Duration) -> Re
use warpgate_db_entities::{Recording, Session};
let cutoff = chrono::Utc::now() - chrono::Duration::from_std(*retention)?;
LogEntry::Entity::delete_many()
.filter(Expr::col(LogEntry::Column::Timestamp).lt(cutoff))
.exec(db)
.await?;
Recording::Entity::delete_many()
.filter(Expr::col(Session::Column::Ended).is_not_null())
.filter(Expr::col(Session::Column::Ended).lt(cutoff))

View file

@ -7,6 +7,7 @@ mod data;
pub mod db;
pub mod eventhub;
pub mod helpers;
pub mod logging;
mod protocols;
pub mod recordings;
mod services;

View file

@ -0,0 +1,71 @@
use super::layer::ValuesLogLayer;
use super::values::SerializedRecordValues;
use once_cell::sync::OnceCell;
use sea_orm::query::JsonValue;
use sea_orm::{ActiveModelTrait, DatabaseConnection};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
use uuid::Uuid;
use warpgate_db_entities::LogEntry;
static LOG_SENDER: OnceCell<tokio::sync::broadcast::Sender<LogEntry::ActiveModel>> =
OnceCell::new();
pub fn make_database_logger_layer<S>() -> impl Layer<S>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let _ = LOG_SENDER.set(tokio::sync::broadcast::channel(1024).0);
ValuesLogLayer::new(|values| {
if let Some(sender) = LOG_SENDER.get() {
if let Some(entry) = values_to_log_entry_data(values) {
let _ = sender.send(entry);
}
}
})
}
pub fn install_database_logger(database: Arc<Mutex<DatabaseConnection>>) {
tokio::spawn(async move {
let mut receiver = LOG_SENDER.get().unwrap().subscribe();
loop {
match receiver.recv().await {
Err(_) => break,
Ok(log_entry) => {
let database = database.lock().await;
if let Err(error) = log_entry.insert(&*database).await {
error!(?error, "Failed to store log entry");
}
}
}
}
});
}
fn values_to_log_entry_data(mut values: SerializedRecordValues) -> Option<LogEntry::ActiveModel> {
let session_id = (*values).remove("session");
let username = (*values).remove("session_username");
let message = (*values).remove("message").unwrap_or("".to_string());
use sea_orm::ActiveValue::Set;
let session_id = session_id.and_then(|x| Uuid::parse_str(&x).ok());
let Some(session_id) = session_id else {
return None
};
Some(LogEntry::ActiveModel {
id: Set(Uuid::new_v4()),
text: Set(message),
values: Set(values
.into_values()
.into_iter()
.map(|(k, v)| (k, JsonValue::from(v)))
.collect()),
session_id: Set(session_id),
username: Set(username),
timestamp: Set(chrono::Utc::now()),
})
}

View file

@ -0,0 +1,73 @@
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use super::values::{RecordVisitor, SerializedRecordValues};
pub struct ValuesLogLayer<C>
where
C: Fn(SerializedRecordValues),
{
callback: C,
}
impl<C> ValuesLogLayer<C>
where
C: Fn(SerializedRecordValues),
{
pub fn new(callback: C) -> Self {
Self { callback }
}
}
impl<C, S> tracing_subscriber::Layer<S> for ValuesLogLayer<C>
where
S: Subscriber + for<'a> LookupSpan<'a>,
C: Fn(SerializedRecordValues),
Self: 'static,
{
fn on_new_span(
&self,
attrs: &tracing_core::span::Attributes<'_>,
id: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
let Some(span) = ctx.span(id) else {
return
};
if !span.metadata().target().starts_with("warpgate") {
return;
}
let mut values = SerializedRecordValues::new();
attrs.record(&mut RecordVisitor::new(&mut values));
span.extensions_mut().replace(values);
}
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
if !event.metadata().target().starts_with("warpgate") {
return;
}
if event.metadata().level() > &Level::INFO {
return;
}
let mut values = SerializedRecordValues::new();
let current = ctx.current_span();
let parent_id = event.parent().or_else(|| current.id());
if let Some(parent_id) = parent_id {
if let Some(span) = ctx.span(parent_id) {
for span in span.scope().from_root() {
if let Some(other_values) = span.extensions().get::<SerializedRecordValues>() {
values.extend((*other_values).clone().into_iter());
}
}
}
}
event.record(&mut RecordVisitor::new(&mut values));
(self.callback)(values);
}
}

View file

@ -0,0 +1,7 @@
mod database;
mod layer;
mod socket;
mod values;
pub use database::{install_database_logger, make_database_logger_layer};
pub use socket::make_socket_logger_layer;

View file

@ -0,0 +1,61 @@
use super::layer::ValuesLogLayer;
use crate::WarpgateConfig;
use bytes::BytesMut;
use chrono::Local;
use tokio::net::UnixDatagram;
use tracing::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
static SKIP_KEY: &str = "is_socket_logging_error";
pub async fn make_socket_logger_layer<S>(config: &WarpgateConfig) -> impl Layer<S>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let mut socket = None;
let socket_address = config.store.log.send_to.clone();
if socket_address.is_some() {
socket = UnixDatagram::unbound()
.map_err(|error| {
println!("Failed to create the log forwarding UDP socket: {}", error);
})
.ok();
}
let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
let got_socket = socket.is_some();
let layer = ValuesLogLayer::new(move |mut values| {
if !got_socket || values.contains_key(&SKIP_KEY) {
return;
}
values.insert("timestamp", Local::now().to_rfc3339());
let _ = tx.try_send(values);
});
if !got_socket {
return layer;
}
tokio::spawn(async move {
while let Some(values) = rx.recv().await {
let Some(ref socket) = socket else {
return
};
let Some(ref socket_address) = socket_address else {
return
};
let buffer = BytesMut::from(
&serde_json::to_vec(&values).expect("Cannot serialize log entry, this is a bug")[..],
);
if let Err(error) = socket.send_to(buffer.as_ref(), socket_address).await {
error!(%error, is_socket_logging_error=true, "Failed to forward log entry");
}
}
});
layer
}

View file

@ -0,0 +1,55 @@
use serde::Serialize;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::DerefMut;
use tracing::field::Visit;
use tracing_core::Field;
pub type SerializedRecordValuesInner = HashMap<&'static str, String>;
#[derive(Serialize)]
pub struct SerializedRecordValues(SerializedRecordValuesInner);
impl SerializedRecordValues {
pub fn new() -> Self {
Self(HashMap::new())
}
pub fn into_values(self) -> SerializedRecordValuesInner {
self.0
}
}
impl std::ops::Deref for SerializedRecordValues {
type Target = SerializedRecordValuesInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SerializedRecordValues {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub struct RecordVisitor<'a> {
values: &'a mut SerializedRecordValues,
}
impl<'a> RecordVisitor<'a> {
pub fn new(values: &'a mut SerializedRecordValues) -> Self {
Self { values }
}
}
impl<'a> Visit for RecordVisitor<'a> {
fn record_str(&mut self, field: &Field, value: &str) {
self.values.insert(field.name(), value.to_string());
}
fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
self.values.insert(field.name(), format!("{:?}", value));
}
}

View file

@ -9,4 +9,5 @@ chrono = {version = "0.4", features = ["serde"]}
poem-openapi = {version = "^1.3.30", features = ["chrono", "uuid"]}
sea-orm = {version = "^0.7", features = ["macros", "with-chrono", "with-uuid"], default-features = false}
serde = "1.0"
serde_json = "1.0"
uuid = {version = "0.8", features = ["v4", "serde"]}

View file

@ -0,0 +1,24 @@
use chrono::{DateTime, Utc};
use poem_openapi::Object;
use sea_orm::entity::prelude::*;
use sea_orm::query::JsonValue;
use serde::Serialize;
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Object)]
#[sea_orm(table_name = "log")]
#[oai(rename = "LogEntry")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
pub text: String,
pub values: JsonValue,
pub timestamp: DateTime<Utc>,
pub session_id: Uuid,
pub username: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -1,6 +1,7 @@
#![allow(non_snake_case)]
pub mod KnownHost;
pub mod LogEntry;
pub mod Recording;
pub mod Session;
pub mod Ticket;

View file

@ -4,6 +4,8 @@ mod m00001_create_ticket;
mod m00002_create_session;
mod m00003_create_recording;
mod m00004_create_known_host;
mod m00005_create_log_entry;
pub struct Migrator;
#[async_trait::async_trait]
@ -14,6 +16,7 @@ impl MigratorTrait for Migrator {
Box::new(m00002_create_session::Migration),
Box::new(m00003_create_recording::Migration),
Box::new(m00004_create_known_host::Migration),
Box::new(m00005_create_log_entry::Migration),
]
}
}

View file

@ -84,13 +84,6 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.name("recording__unique__session_id__name")
.to_owned(),
)
.await?;
manager
.drop_table(Table::drop().table(recording::Entity).to_owned())
.await

View file

@ -0,0 +1,80 @@
use sea_schema::migration::sea_orm::Schema;
use sea_schema::migration::sea_query::*;
use sea_schema::migration::*;
pub mod log_entry {
use chrono::{DateTime, Utc};
use sea_orm::entity::prelude::*;
use sea_orm::query::JsonValue;
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "log")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
pub text: String,
pub values: JsonValue,
pub timestamp: DateTime<Utc>,
pub session_id: Uuid,
pub username: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
}
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m00005_create_log_entry"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let builder = manager.get_database_backend();
let schema = Schema::new(builder);
manager
.create_table(schema.create_table_from_entity(log_entry::Entity))
.await?;
manager
.create_index(
Index::create()
.table(log_entry::Entity)
.name("log_entry__timestamp_session_id")
.col(log_entry::Column::Timestamp)
.col(log_entry::Column::SessionId)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.table(log_entry::Entity)
.name("log_entry__session_id")
.col(log_entry::Column::SessionId)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.table(log_entry::Entity)
.name("log_entry__username")
.col(log_entry::Column::Username)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(log_entry::Entity).to_owned())
.await
}
}

View file

@ -4,6 +4,7 @@ use russh::client::Channel;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::*;
use uuid::Uuid;
use warpgate_common::SessionId;
use crate::{ChannelOperation, RCEvent};
@ -12,7 +13,7 @@ pub struct DirectTCPIPChannel {
channel_id: Uuid,
ops_rx: UnboundedReceiver<ChannelOperation>,
events_tx: UnboundedSender<RCEvent>,
session_tag: String,
session_id: SessionId,
}
impl DirectTCPIPChannel {
@ -21,14 +22,14 @@ impl DirectTCPIPChannel {
channel_id: Uuid,
ops_rx: UnboundedReceiver<ChannelOperation>,
events_tx: UnboundedSender<RCEvent>,
session_tag: String,
session_id: SessionId,
) -> Self {
DirectTCPIPChannel {
client_channel,
channel_id,
ops_rx,
events_tx,
session_tag,
session_id,
}
}
@ -46,7 +47,7 @@ impl DirectTCPIPChannel {
Some(ChannelOperation::Close) => break,
None => break,
Some(operation) => {
warn!(client_channel=%self.channel_id, ?operation, session=%self.session_tag, "unexpected client_channel operation");
warn!(client_channel=%self.channel_id, ?operation, session=%self.session_id, "unexpected client_channel operation");
}
}
}
@ -73,7 +74,7 @@ impl DirectTCPIPChannel {
break
},
Some(operation) => {
warn!(client_channel=%self.channel_id, ?operation, session=%self.session_tag, "unexpected client_channel operation");
warn!(client_channel=%self.channel_id, ?operation, session=%self.session_id, "unexpected client_channel operation");
}
}
}
@ -85,6 +86,6 @@ impl DirectTCPIPChannel {
impl Drop for DirectTCPIPChannel {
fn drop(&mut self) {
info!(client_channel=%self.channel_id, session=%self.session_tag, "Closed");
info!(client_channel=%self.channel_id, session=%self.session_id, "Closed");
}
}

View file

@ -4,6 +4,7 @@ use russh::client::Channel;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::*;
use uuid::Uuid;
use warpgate_common::SessionId;
use crate::{ChannelOperation, RCEvent};
@ -12,7 +13,7 @@ pub struct SessionChannel {
channel_id: Uuid,
ops_rx: UnboundedReceiver<ChannelOperation>,
events_tx: UnboundedSender<RCEvent>,
session_tag: String,
session_id: SessionId,
}
impl SessionChannel {
@ -21,14 +22,14 @@ impl SessionChannel {
channel_id: Uuid,
ops_rx: UnboundedReceiver<ChannelOperation>,
events_tx: UnboundedSender<RCEvent>,
session_tag: String,
session_id: SessionId,
) -> Self {
SessionChannel {
client_channel,
channel_id,
ops_rx,
events_tx,
session_tag,
session_id,
}
}
@ -149,6 +150,6 @@ impl SessionChannel {
impl Drop for SessionChannel {
fn drop(&mut self) {
info!(channel=%self.channel_id, session=%self.session_tag, "Closed");
info!(channel=%self.channel_id, session=%self.session_id, "Closed");
}
}

View file

@ -1,16 +1,14 @@
use std::pin::Pin;
use crate::known_hosts::{KnownHostValidationResult, KnownHosts};
use crate::ConnectionError;
use futures::FutureExt;
use russh::client::Session;
use russh_keys::key::PublicKey;
use russh_keys::PublicKeyBase64;
use std::pin::Pin;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tracing::*;
use warpgate_common::{Services, TargetSSHOptions};
use warpgate_common::{Services, SessionId, TargetSSHOptions};
#[derive(Debug)]
pub enum ClientHandlerEvent {
@ -24,7 +22,7 @@ pub struct ClientHandler {
pub ssh_options: TargetSSHOptions,
pub event_tx: UnboundedSender<ClientHandlerEvent>,
pub services: Services,
pub session_tag: String,
pub session_id: SessionId,
}
#[derive(Debug, thiserror::Error)]
@ -78,7 +76,7 @@ impl russh::client::Handler for ClientHandler {
key_type,
key_base64,
}) => {
warn!(session=%self.session_tag, "Host key is invalid!");
warn!(session=%self.session_id, "Host key is invalid!");
return Err(ClientHandlerError::ConnectionError(
ConnectionError::HostKeyMismatch {
received_key_type: server_public_key.name().to_owned(),
@ -89,7 +87,7 @@ impl russh::client::Handler for ClientHandler {
));
}
Ok(KnownHostValidationResult::Unknown) => {
warn!(session=%self.session_tag, "Host key is unknown");
warn!(session=%self.session_id, "Host key is unknown");
let (tx, rx) = oneshot::channel();
self.event_tx
@ -108,7 +106,7 @@ impl russh::client::Handler for ClientHandler {
)
.await
{
error!(?error, session=%self.session_tag, "Failed to save host key");
error!(?error, session=%self.session_id, "Failed to save host key");
}
Ok((self, true))
} else {
@ -116,7 +114,7 @@ impl russh::client::Handler for ClientHandler {
}
}
Err(error) => {
error!(?error, session=%self.session_tag, "Failed to verify the host key");
error!(?error, session=%self.session_id, "Failed to verify the host key");
Err(ClientHandlerError::Internal)
}
}
@ -128,6 +126,6 @@ impl russh::client::Handler for ClientHandler {
impl Drop for ClientHandler {
fn drop(&mut self) {
let _ = self.event_tx.send(ClientHandlerEvent::Disconnect);
debug!(session=%self.session_tag, "Dropped");
debug!(session=%self.session_id, "Dropped");
}
}

View file

@ -120,7 +120,6 @@ pub struct RemoteClient {
inner_event_tx: UnboundedSender<InnerEvent>,
child_tasks: Vec<JoinHandle<Result<()>>>,
services: Services,
session_tag: String,
}
pub struct RemoteClientHandles {
@ -130,7 +129,7 @@ pub struct RemoteClientHandles {
}
impl RemoteClient {
pub fn create(id: SessionId, session_tag: String, services: Services) -> RemoteClientHandles {
pub fn create(id: SessionId, services: Services) -> RemoteClientHandles {
let (event_tx, event_rx) = unbounded_channel();
let (command_tx, mut command_rx) = unbounded_channel();
let (abort_tx, abort_rx) = unbounded_channel();
@ -147,19 +146,21 @@ impl RemoteClient {
inner_event_rx,
inner_event_tx: inner_event_tx.clone(),
child_tasks: vec![],
session_tag,
services,
abort_rx,
};
tokio::spawn({
async move {
while let Some(e) = command_rx.recv().await {
inner_event_tx.send(InnerEvent::RCCommand(e))?
tokio::spawn(
{
async move {
while let Some(e) = command_rx.recv().await {
inner_event_tx.send(InnerEvent::RCCommand(e))?
}
Ok::<(), anyhow::Error>(())
}
Ok::<(), anyhow::Error>(())
}
});
.instrument(Span::current()),
);
this.start();
@ -231,7 +232,7 @@ impl RemoteClient {
}
},
None => {
debug!(channel=%channel_id, session=%self.session_tag, "operation for unknown channel")
debug!(channel=%channel_id, "operation for unknown channel")
}
}
}
@ -262,7 +263,7 @@ impl RemoteClient {
// }
}
Err(e) => {
debug!(session=%self.session_tag, "Connect error: {}", e);
debug!("Connect error: {}", e);
let _ = self.tx.send(RCEvent::ConnectionError(e));
self.set_disconnected();
break
@ -278,20 +279,20 @@ impl RemoteClient {
}
}
InnerEvent::ClientHandlerEvent(client_event) => {
debug!(session=%self.session_tag, "Client handler event: {:?}", client_event);
debug!("Client handler event: {:?}", client_event);
match client_event {
ClientHandlerEvent::Disconnect => {
self._on_disconnect().await?;
}
event => {
error!(session=%self.session_tag, ?event, "Unhandled client handler event");
error!(?event, "Unhandled client handler event");
},
}
}
}
}
Some(_) = self.abort_rx.recv() => {
debug!(session=%self.session_tag, "Abort requested");
debug!("Abort requested");
self.disconnect().await?;
break
}
@ -301,12 +302,12 @@ impl RemoteClient {
}
.await
.map_err(|error| {
error!(?error, session=%self.session_tag, "error in command loop");
error!(?error, "error in command loop");
anyhow::anyhow!("Error in command loop: {error}")
})?;
debug!(session=%self.session_tag, "No more commmands");
debug!("No more commmands");
Ok::<(), anyhow::Error>(())
});
}.instrument(Span::current()));
}
async fn connect(&mut self, ssh_options: TargetSSHOptions) -> Result<(), ConnectionError> {
@ -324,7 +325,7 @@ impl RemoteClient {
}
};
info!(?address, username=?ssh_options.username, session=%self.session_tag, "Connecting");
info!(?address, username = &ssh_options.username[..], "Connecting");
let config = russh::client::Config {
..Default::default()
};
@ -335,7 +336,7 @@ impl RemoteClient {
ssh_options: ssh_options.clone(),
event_tx,
services: self.services.clone(),
session_tag: self.session_tag.clone(),
session_id: self.id,
};
let fut_connect = russh::client::connect(config, address, handler);
@ -355,7 +356,7 @@ impl RemoteClient {
}
}
Some(_) = self.abort_rx.recv() => {
info!(session=%self.session_tag, "Abort requested");
info!("Abort requested");
self.set_disconnected();
return Err(ConnectionError::Aborted)
}
@ -366,7 +367,7 @@ impl RemoteClient {
ClientHandlerError::Ssh(e) => ConnectionError::SSH(e),
ClientHandlerError::Internal => ConnectionError::Internal,
};
error!(error=?connection_error, session=%self.session_tag, "Connection error");
error!(error=?connection_error, "Connection error");
return Err(connection_error);
}
@ -377,10 +378,10 @@ impl RemoteClient {
match ssh_options.auth {
SSHTargetAuth::Password { password } => {
auth_result = session
.authenticate_password(ssh_options.username, password.expose_secret())
.authenticate_password(ssh_options.username.clone(), password.expose_secret())
.await?;
if auth_result {
debug!(session=%self.session_tag, "Authenticated with password");
debug!(username=&ssh_options.username[..], "Authenticated with password");
}
}
SSHTargetAuth::PublicKey => {
@ -391,7 +392,7 @@ impl RemoteClient {
.authenticate_publickey(ssh_options.username.clone(), Arc::new(key))
.await?;
if auth_result {
debug!(session=%self.session_tag, key=%key_str, "Authenticated with key");
debug!(username=&ssh_options.username[..], key=%key_str, "Authenticated with key");
break;
}
}
@ -399,7 +400,7 @@ impl RemoteClient {
}
if !auth_result {
error!(session=%self.session_tag, "Auth rejected");
error!("Auth rejected");
let _ = session
.disconnect(russh::Disconnect::ByApplication, "", "")
.await;
@ -408,7 +409,7 @@ impl RemoteClient {
self.session = Some(Arc::new(Mutex::new(session)));
info!(?address, session=%self.session_tag, "Connected");
info!(?address, "Connected");
tokio::spawn({
let inner_event_tx = self.inner_event_tx.clone();
@ -419,7 +420,7 @@ impl RemoteClient {
}
Ok::<(), anyhow::Error>(())
}
});
}.instrument(Span::current()));
return Ok(())
}
@ -435,13 +436,7 @@ impl RemoteClient {
let (tx, rx) = unbounded_channel();
self.channel_pipes.lock().await.insert(channel_id, tx);
let channel = SessionChannel::new(
channel,
channel_id,
rx,
self.tx.clone(),
self.session_tag.clone(),
);
let channel = SessionChannel::new(channel, channel_id, rx, self.tx.clone(), self.id);
self.child_tasks.push(
tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, channel_id))
@ -470,13 +465,8 @@ impl RemoteClient {
let (tx, rx) = unbounded_channel();
self.channel_pipes.lock().await.insert(channel_id, tx);
let channel = DirectTCPIPChannel::new(
channel,
channel_id,
rx,
self.tx.clone(),
self.session_tag.clone(),
);
let channel =
DirectTCPIPChannel::new(channel, channel_id, rx, self.tx.clone(), self.id);
self.child_tasks.push(
tokio::task::Builder::new()
.name(&format!("SSH {} {:?} ops", self.id, channel_id))
@ -509,7 +499,7 @@ impl Drop for RemoteClient {
for task in self.child_tasks.drain(..) {
let _ = task.abort();
}
info!(session=%self.session_tag, "Closed connection");
debug!(session=%self.session_tag, "Dropped");
info!("Closed connection");
debug!("Dropped");
}
}

View file

@ -47,8 +47,7 @@ impl ProtocolServer for SSHProtocolServer {
return Err(TargetTestError::Misconfigured("Not an SSH target".to_owned()));
};
let mut handles =
RemoteClient::create(Uuid::new_v4(), "test".to_owned(), self.services.clone());
let mut handles = RemoteClient::create(Uuid::new_v4(), self.services.clone());
let _ = handles.command_tx.send(RCCommand::Connect(ssh_options));

View file

@ -42,11 +42,14 @@ impl russh::server::Handler for ServerHandler {
fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._channel_open_session(ServerChannelId(channel), &mut session)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_open_session(ServerChannelId(channel), &mut session)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -60,11 +63,14 @@ impl russh::server::Handler for ServerHandler {
) -> Self::FutureUnit {
let name = name.to_string();
async move {
self.session
.lock()
.await
._channel_subsystem_request(ServerChannelId(channel), name)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_subsystem_request(ServerChannelId(channel), name)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -84,21 +90,24 @@ impl russh::server::Handler for ServerHandler {
let term = term.to_string();
let modes = modes.to_vec();
async move {
self.session
.lock()
.await
._channel_pty_request(
ServerChannelId(channel),
PtyRequest {
term,
col_width,
row_height,
pix_width,
pix_height,
modes,
},
)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_pty_request(
ServerChannelId(channel),
PtyRequest {
term,
col_width,
row_height,
pix_width,
pix_height,
modes,
},
)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -106,11 +115,14 @@ impl russh::server::Handler for ServerHandler {
fn shell_request(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._channel_shell_request(ServerChannelId(channel))
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_shell_request(ServerChannelId(channel))
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -120,7 +132,14 @@ impl russh::server::Handler for ServerHandler {
let user = user.to_string();
let key = key.clone();
async move {
let result = self.session.lock().await._auth_publickey(user, &key).await;
let result = {
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._auth_publickey(user, &key)
.instrument(span)
.await
};
Ok((self, result))
}
.boxed()
@ -130,12 +149,14 @@ impl russh::server::Handler for ServerHandler {
let user = user.to_string();
let password = password.to_string();
async move {
let result = self
.session
.lock()
.await
._auth_password(Secret::new(user), Secret::new(password))
.await;
let result = {
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._auth_password(Secret::new(user), Secret::new(password))
.instrument(span)
.await
};
Ok((self, result))
}
.boxed()
@ -152,12 +173,14 @@ impl russh::server::Handler for ServerHandler {
.and_then(|mut r| r.next())
.and_then(|b| String::from_utf8(b.to_vec()).ok());
async move {
let result = self
.session
.lock()
.await
._auth_keyboard_interactive(Secret::new(user), response.map(Secret::new))
.await;
let result = {
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._auth_keyboard_interactive(Secret::new(user), response.map(Secret::new))
.instrument(span)
.await
};
Ok((self, result))
}
.boxed()
@ -166,11 +189,14 @@ impl russh::server::Handler for ServerHandler {
fn data(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
let data = BytesMut::from(data).freeze();
async move {
self.session
.lock()
.await
._data(ServerChannelId(channel), data)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._data(ServerChannelId(channel), data)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -185,11 +211,14 @@ impl russh::server::Handler for ServerHandler {
) -> Self::FutureUnit {
let data = BytesMut::from(data);
async move {
self.session
.lock()
.await
._extended_data(ServerChannelId(channel), code, data)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._extended_data(ServerChannelId(channel), code, data)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -197,11 +226,14 @@ impl russh::server::Handler for ServerHandler {
fn channel_close(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._channel_close(ServerChannelId(channel))
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_close(ServerChannelId(channel))
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -217,21 +249,24 @@ impl russh::server::Handler for ServerHandler {
session: Session,
) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._window_change_request(
ServerChannelId(channel),
PtyRequest {
term: "".to_string(),
col_width,
row_height,
pix_width,
pix_height,
modes: vec![],
},
)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._window_change_request(
ServerChannelId(channel),
PtyRequest {
term: "".to_string(),
col_width,
row_height,
pix_width,
pix_height,
modes: vec![],
},
)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -239,11 +274,14 @@ impl russh::server::Handler for ServerHandler {
fn channel_eof(self, channel: ChannelId, session: Session) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._channel_eof(ServerChannelId(channel))
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_eof(ServerChannelId(channel))
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -256,11 +294,14 @@ impl russh::server::Handler for ServerHandler {
session: Session,
) -> Self::FutureUnit {
async move {
self.session
.lock()
.await
._channel_signal(ServerChannelId(channel), signal_name)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_signal(ServerChannelId(channel), signal_name)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -269,11 +310,14 @@ impl russh::server::Handler for ServerHandler {
fn exec_request(self, channel: ChannelId, data: &[u8], session: Session) -> Self::FutureUnit {
let data = BytesMut::from(data);
async move {
self.session
.lock()
.await
._channel_exec_request(ServerChannelId(channel), data.freeze())
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_exec_request(ServerChannelId(channel), data.freeze())
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -289,11 +333,14 @@ impl russh::server::Handler for ServerHandler {
let variable_name = variable_name.to_string();
let variable_value = variable_value.to_string();
async move {
self.session.lock().await._channel_env_request(
ServerChannelId(channel),
variable_name,
variable_value,
)?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_env_request(ServerChannelId(channel), variable_name, variable_value)
.instrument(span)
.await?
};
Ok((self, session))
}
.boxed()
@ -311,20 +358,23 @@ impl russh::server::Handler for ServerHandler {
let host_to_connect = host_to_connect.to_string();
let originator_address = originator_address.to_string();
async move {
self.session
.lock()
.await
._channel_open_direct_tcpip(
ServerChannelId(channel),
DirectTCPIPParams {
host_to_connect,
port_to_connect,
originator_address,
originator_port,
},
&mut session,
)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_open_direct_tcpip(
ServerChannelId(channel),
DirectTCPIPParams {
host_to_connect,
port_to_connect,
originator_address,
originator_port,
},
&mut session,
)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()
@ -342,19 +392,22 @@ impl russh::server::Handler for ServerHandler {
let x11_auth_protocol = x11_auth_protocol.to_string();
let x11_auth_cookie = x11_auth_cookie.to_string();
async move {
self.session
.lock()
.await
._channel_x11_request(
ServerChannelId(channel),
X11Request {
single_conection,
x11_auth_protocol,
x11_auth_cookie,
x11_screen_number,
},
)
.await?;
{
let mut this_session = self.session.lock().await;
let span = this_session.make_logging_span();
this_session
._channel_x11_request(
ServerChannelId(channel),
X11Request {
single_conection,
x11_auth_protocol,
x11_auth_cookie,
x11_screen_number,
},
)
.instrument(span)
.await?;
}
Ok((self, session))
}
.boxed()

View file

@ -51,6 +51,7 @@ enum Event {
pub struct ServerSession {
pub id: SessionId,
username: Option<String>,
session_handle: Option<russh::server::Handle>,
pty_channels: Vec<Uuid>,
all_channels: Vec<Uuid>,
@ -90,11 +91,11 @@ impl ServerSession {
mut session_handle_rx: UnboundedReceiver<SessionHandleCommand>,
) -> Result<Arc<Mutex<Self>>> {
let id = server_handle.id();
let mut rc_handles = RemoteClient::create(
id,
session_debug_tag(&id, &remote_address),
services.clone(),
);
let _span = info_span!("SSH", session=%id);
let _enter = _span.enter();
let mut rc_handles = RemoteClient::create(id, services.clone());
let (hub, event_sender) = EventHub::setup();
let mut event_sub = hub.subscribe(|_| true).await;
@ -115,6 +116,7 @@ impl ServerSession {
let this = Self {
id: server_handle.id(),
username: None,
session_handle: None,
pty_channels: vec![],
all_channels: vec![],
@ -138,9 +140,6 @@ impl ServerSession {
})),
};
info!(session=?this, "New connection");
let session_debug_tag = format!("{:?}", this);
let this = Arc::new(Mutex::new(this));
let name = format!("SSH {} session control", id);
@ -173,28 +172,26 @@ impl ServerSession {
async move {
loop {
match event_sub.recv().await {
Some(Event::Client(RCEvent::Done)) => {
break
}
Some(Event::Client(RCEvent::Done)) => break,
Some(Event::Client(e)) => {
debug!(session=%session_debug_tag, event=?e, "Event");
debug!(event=?e, "Event");
let Some(this) = this.upgrade() else {
break;
};
let this = &mut this.lock().await;
if let Err(err) = this.handle_remote_event(e).await {
error!(session=%session_debug_tag, "Event handler error: {:?}", err);
error!("Event handler error: {:?}", err);
break;
}
}
Some(Event::Command(command)) => {
debug!(session=%session_debug_tag, ?command, "Session control");
debug!(?command, "Session control");
let Some(this) = this.upgrade() else {
break;
};
let this = &mut this.lock().await;
if let Err(err) = this.handle_session_control(command).await {
error!(session=%session_debug_tag, "Event handler error: {:?}", err);
error!("Event handler error: {:?}", err);
break;
}
}
@ -209,13 +206,20 @@ impl ServerSession {
None => break,
}
}
debug!(session=%session_debug_tag, "No more events");
debug!("No more events");
}
});
Ok(this)
}
pub fn make_logging_span(&self) -> tracing::Span {
match self.username {
Some(ref username) => info_span!("SSH", session=%self.id, session_username=%username),
None => info_span!("SSH", session=%self.id),
}
}
fn map_channel(&self, ch: &ServerChannelId) -> Result<Uuid> {
self.channel_map
.get_by_left(ch)
@ -231,7 +235,7 @@ impl ServerSession {
}
pub async fn emit_service_message(&mut self, msg: &str) -> Result<()> {
debug!(session=?self, "Service message: {}", msg);
debug!("Service message: {}", msg);
self.emit_pty_output(
format!(
@ -288,7 +292,7 @@ impl ServerSession {
match command {
SessionHandleCommand::Close => {
let _ = self.emit_service_message("Session closed by admin").await;
info!(session=?self, "Session closed by admin");
info!("Session closed by admin");
let _ = self.request_disconnect().await;
self.disconnect_server().await;
}
@ -370,14 +374,14 @@ impl ServerSession {
RCEvent::Output(channel, data) => {
if let Some(recorder) = self.channel_recorders.get_mut(&channel) {
if let Err(error) = recorder.write(&data).await {
error!(session=?self, %channel, ?error, "Failed to record terminal data");
error!(%channel, ?error, "Failed to record terminal data");
self.channel_recorders.remove(&channel);
}
}
if let Some(recorder) = self.traffic_connection_recorders.get_mut(&channel) {
if let Err(error) = recorder.write_rx(&data).await {
error!(session=?self, %channel, ?error, "Failed to record traffic data");
error!(%channel, ?error, "Failed to record traffic data");
self.traffic_connection_recorders.remove(&channel);
}
}
@ -459,7 +463,7 @@ impl ServerSession {
RCEvent::ExtendedData { channel, data, ext } => {
if let Some(recorder) = self.channel_recorders.get_mut(&channel) {
if let Err(error) = recorder.write(&data).await {
error!(session=?self, %channel, ?error, "Failed to record session data");
error!(%channel, ?error, "Failed to record session data");
self.channel_recorders.remove(&channel);
}
}
@ -549,7 +553,7 @@ impl ServerSession {
let channel = Uuid::new_v4();
self.channel_map.insert(server_channel_id, channel);
info!(session=?self, %channel, "Opening session channel");
info!(%channel, "Opening session channel");
self.all_channels.push(channel);
self.session_handle = Some(session.handle());
self.rc_tx
@ -566,7 +570,7 @@ impl ServerSession {
let uuid = Uuid::new_v4();
self.channel_map.insert(channel, uuid);
info!(session=?self, %channel, "Opening direct TCP/IP channel from {}:{} to {}:{}", params.originator_address, params.originator_port, params.host_to_connect, params.port_to_connect);
info!(%channel, "Opening direct TCP/IP channel from {}:{} to {}:{}", params.originator_address, params.originator_port, params.host_to_connect, params.port_to_connect);
let recorder = self
.traffic_recorder_for(&params.host_to_connect, params.port_to_connect)
@ -579,7 +583,7 @@ impl ServerSession {
src_port: params.originator_port as u16,
});
if let Err(error) = recorder.write_connection_setup().await {
error!(session=?self, %channel, ?error, "Failed to record connection setup");
error!(%channel, ?error, "Failed to record connection setup");
}
self.traffic_connection_recorders.insert(uuid, recorder);
}
@ -606,7 +610,7 @@ impl ServerSession {
.write_pty_resize(request.col_width, request.row_height)
.await
{
error!(session=?self, %channel_id, ?error, "Failed to record terminal data");
error!(%channel_id, ?error, "Failed to record terminal data");
self.channel_recorders.remove(&channel_id);
}
}
@ -637,7 +641,7 @@ impl ServerSession {
.write_pty_resize(request.col_width, request.row_height)
.await
{
error!(session=?self, %channel_id, ?error, "Failed to record terminal data");
error!(%channel_id, ?error, "Failed to record terminal data");
self.channel_recorders.remove(&channel_id);
}
}
@ -656,11 +660,11 @@ impl ServerSession {
let channel_id = self.map_channel(&server_channel_id)?;
match std::str::from_utf8(&data) {
Err(e) => {
error!(session=?self, channel=%channel_id, ?data, "Requested exec - invalid UTF-8");
error!(channel=%channel_id, ?data, "Requested exec - invalid UTF-8");
anyhow::bail!(e)
}
Ok::<&str, _>(command) => {
debug!(session=?self, channel=%channel_id, %command, "Requested exec");
debug!(channel=%channel_id, %command, "Requested exec");
let _ = self.maybe_connect_remote().await;
self.send_command(RCCommand::Channel(
channel_id,
@ -677,7 +681,7 @@ impl ServerSession {
request: X11Request,
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%channel_id, "Requested X11");
debug!(channel=%channel_id, "Requested X11");
let _ = self.maybe_connect_remote().await;
self.send_command(RCCommand::Channel(
channel_id,
@ -686,14 +690,14 @@ impl ServerSession {
Ok(())
}
pub fn _channel_env_request(
pub async fn _channel_env_request(
&mut self,
server_channel_id: ServerChannelId,
name: String,
value: String,
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%channel_id, %name, %value, "Environment");
debug!(channel=%channel_id, %name, %value, "Environment");
self.send_command(RCCommand::Channel(
channel_id,
ChannelOperation::RequestEnv(name, value),
@ -719,7 +723,7 @@ impl ServerSession {
e.insert(recorder);
}
Err(error) => {
error!(session=?self, %host, %port, ?error, "Failed to start recording");
error!(%host, %port, ?error, "Failed to start recording");
}
}
}
@ -760,11 +764,11 @@ impl ServerSession {
self.channel_recorders.insert(channel_id, recorder);
}
Err(error) => {
error!(session=?self, channel=%channel_id, ?error, "Failed to start recording");
error!(channel=%channel_id, ?error, "Failed to start recording");
}
}
info!(session=?self, %channel_id, "Opening shell");
info!(%channel_id, "Opening shell");
let _ = self
.session_handle
.as_mut()
@ -781,7 +785,7 @@ impl ServerSession {
name: String,
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
info!(session=?self, channel=%channel_id, "Requesting subsystem {}", &name);
info!(channel=%channel_id, "Requesting subsystem {}", &name);
self.send_command(RCCommand::Channel(
channel_id,
ChannelOperation::RequestSubsystem(name),
@ -791,16 +795,16 @@ impl ServerSession {
pub async fn _data(&mut self, server_channel_id: ServerChannelId, data: Bytes) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%server_channel_id.0, ?data, "Data");
debug!(channel=%server_channel_id.0, ?data, "Data");
if self.rc_state == RCState::Connecting && data.get(0) == Some(&3) {
info!(session=?self, channel=%channel_id, "User requested connection abort (Ctrl-C)");
info!(channel=%channel_id, "User requested connection abort (Ctrl-C)");
self.request_disconnect().await;
return Ok(());
}
if let Some(recorder) = self.traffic_connection_recorders.get_mut(&channel_id) {
if let Err(error) = recorder.write_tx(&data).await {
error!(session=?self, channel=%channel_id, ?error, "Failed to record traffic data");
error!(channel=%channel_id, ?error, "Failed to record traffic data");
self.traffic_connection_recorders.remove(&channel_id);
}
}
@ -823,7 +827,7 @@ impl ServerSession {
data: BytesMut,
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%server_channel_id.0, ?data, "Data");
debug!(channel=%server_channel_id.0, ?data, "Data");
self.send_command(RCCommand::Channel(
channel_id,
ChannelOperation::ExtendedData {
@ -841,7 +845,11 @@ impl ServerSession {
) -> russh::server::Auth {
let selector: AuthSelector = (&ssh_username).into();
info!(session=?self, "Public key auth as {:?} with key FP {}", selector, key.fingerprint());
info!(
"Public key auth as {:?} with key FP {}",
selector,
key.fingerprint()
);
self.credentials.push(AuthCredential::PublicKey {
kind: key.name().to_string(),
@ -853,7 +861,7 @@ impl ServerSession {
Ok(AuthResult::Rejected) => russh::server::Auth::Reject,
Ok(AuthResult::OTPNeeded) => russh::server::Auth::Reject,
Err(error) => {
error!(session=?self, ?error, "Failed to verify credentials");
error!(?error, "Failed to verify credentials");
russh::server::Auth::Reject
}
}
@ -865,7 +873,7 @@ impl ServerSession {
password: Secret<String>,
) -> russh::server::Auth {
let selector: AuthSelector = ssh_username.expose_secret().into();
info!(session=?self, "Password key auth as {:?}", selector);
info!("Password key auth as {:?}", selector);
self.credentials.push(AuthCredential::Password(password));
@ -874,7 +882,7 @@ impl ServerSession {
Ok(AuthResult::Rejected) => russh::server::Auth::Reject,
Ok(AuthResult::OTPNeeded) => russh::server::Auth::Reject,
Err(error) => {
error!(session=?self, ?error, "Failed to verify credentials");
error!(?error, "Failed to verify credentials");
russh::server::Auth::Reject
}
}
@ -886,7 +894,7 @@ impl ServerSession {
response: Option<Secret<String>>,
) -> russh::server::Auth {
let selector: AuthSelector = ssh_username.expose_secret().into();
info!(session=?self, "Keyboard-interactive auth as {:?}", selector);
info!("Keyboard-interactive auth as {:?}", selector);
if let Some(otp) = response {
self.credentials.push(AuthCredential::OTP(otp));
@ -901,7 +909,7 @@ impl ServerSession {
prompts: Cow::Owned(vec![(Cow::Borrowed("One-time password: "), true)]),
},
Err(error) => {
error!(session=?self, ?error, "Failed to verify credentials");
error!(?error, "Failed to verify credentials");
russh::server::Auth::Reject
}
}
@ -949,7 +957,7 @@ impl ServerSession {
AuthSelector::Ticket { secret } => {
match authorize_ticket(&self.services.db, secret).await? {
Some(ticket) => {
info!(session=?self, "Authorized for {} with a ticket", ticket.target);
info!("Authorized for {} with a ticket", ticket.target);
self.services
.config_provider
.lock()
@ -968,9 +976,10 @@ impl ServerSession {
}
async fn _auth_accept(&mut self, username: &str, target_name: &str) {
info!(session=?self, "Authenticated");
info!(username = username, "Authenticated");
let _ = self.server_handle.set_username(username.to_string()).await;
self.username = Some(username.to_string());
let target = {
self.services
@ -987,7 +996,7 @@ impl ServerSession {
let Some((target, ssh_options)) = target else {
self.target = TargetSelection::NotFound(target_name.to_string());
info!(session=?self, "Selected target not found");
info!("Selected target not found");
return;
};
@ -997,25 +1006,25 @@ impl ServerSession {
pub async fn _channel_close(&mut self, server_channel_id: ServerChannelId) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%channel_id, "Closing channel");
debug!(channel=%channel_id, "Closing channel");
self.send_command(RCCommand::Channel(channel_id, ChannelOperation::Close));
Ok(())
}
pub async fn _channel_eof(&mut self, server_channel_id: ServerChannelId) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%channel_id, "EOF");
debug!(channel=%channel_id, "EOF");
self.send_command(RCCommand::Channel(channel_id, ChannelOperation::Eof));
Ok(())
}
// pub async fn _tcpip_forward(&mut self, address: String, port: u32) {
// info!(session=?self, %address, %port, "Remote port forwarding requested");
// info!(%address, %port, "Remote port forwarding requested");
// self.send_command(RCCommand::ForwardTCPIP(address, port));
// }
// pub async fn _cancel_tcpip_forward(&mut self, address: String, port: u32) {
// info!(session=?self, %address, %port, "Remote port forwarding cancelled");
// info!(%address, %port, "Remote port forwarding cancelled");
// self.send_command(RCCommand::CancelTCPIPForward(address, port));
// }
@ -1025,7 +1034,7 @@ impl ServerSession {
signal: Sig,
) -> Result<()> {
let channel_id = self.map_channel(&server_channel_id)?;
debug!(session=?self, channel=%channel_id, ?signal, "Signal");
debug!(channel=%channel_id, ?signal, "Signal");
self.send_command(RCCommand::Channel(
channel_id,
ChannelOperation::Signal(signal),
@ -1038,12 +1047,12 @@ impl ServerSession {
}
pub async fn _disconnect(&mut self) {
debug!(session=?self, "Client disconnect requested");
debug!("Client disconnect requested");
self.request_disconnect().await;
}
async fn request_disconnect(&mut self) {
debug!(session=?self, "Disconnecting");
debug!("Disconnecting");
let _ = self.rc_abort_tx.send(());
if self.rc_state != RCState::NotInitialized && self.rc_state != RCState::Disconnected {
self.send_command(RCCommand::Disconnect);
@ -1073,7 +1082,7 @@ impl ServerSession {
impl Drop for ServerSession {
fn drop(&mut self) {
info!(session=?self, "Closed connection");
info!("Closed connection");
debug!("Dropped");
}
}

View file

@ -4,6 +4,7 @@ use futures::StreamExt;
use std::net::ToSocketAddrs;
use tracing::*;
use warpgate_common::db::cleanup_db;
use warpgate_common::logging::install_database_logger;
use warpgate_common::{ProtocolServer, Services};
use warpgate_protocol_ssh::SSHProtocolServer;
@ -17,6 +18,8 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
let config = load_config(&cli.config, true)?;
let services = Services::new(config.clone()).await?;
install_database_logger(services.db.clone());
let mut other_futures = futures::stream::FuturesUnordered::new();
let mut protocol_futures = futures::stream::FuturesUnordered::new();
@ -52,7 +55,7 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
let services = services.clone();
async move {
loop {
let retention = { services.config.lock().await.store.retention };
let retention = { services.config.lock().await.store.log.retention };
let interval = retention / 10;
match cleanup_db(&mut *services.db.lock().await, &retention).await {
Err(error) => error!(?error, "Failed to cleanup the database"),

View file

@ -5,8 +5,10 @@ use tracing_subscriber::fmt::time::OffsetTime;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
use warpgate_common::logging::{make_database_logger_layer, make_socket_logger_layer};
use warpgate_common::WarpgateConfig;
pub fn init_logging() {
pub async fn init_logging(config: Option<&WarpgateConfig>) {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "warpgate=info")
}
@ -24,6 +26,11 @@ pub fn init_logging() {
#[cfg(all(debug_assertions, feature = "console-subscriber"))]
let registry = registry.with(console_layer);
let socket_layer = match config {
Some(config) => Some(make_socket_logger_layer(config).await),
None => None,
};
let registry = registry
.with((!console::user_attended()).then({
let env_filter = env_filter.clone();
@ -54,7 +61,9 @@ pub fn init_logging() {
env_filter.enabled(m, c.clone())
}))
}
}));
}))
.with(make_database_logger_layer())
.with(socket_layer);
registry.init();
}

View file

@ -2,6 +2,7 @@
mod commands;
mod config;
mod logging;
use crate::config::load_config;
use anyhow::Result;
use clap::StructOpt;
use logging::init_logging;
@ -44,6 +45,8 @@ enum Commands {
async fn _main() -> Result<()> {
let cli = Cli::parse();
init_logging(load_config(&cli.config, false).ok().as_ref()).await;
match &cli.command {
Commands::Run => crate::commands::run::command(&cli).await,
Commands::Hash => crate::commands::hash::command().await,
@ -62,8 +65,6 @@ async fn main() {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();
init_logging();
if let Err(error) = _main().await {
error!(?error, "Fatal error");
std::process::exit(1);