mirror of
https://github.com/warp-tech/warpgate.git
synced 2025-09-04 21:55:22 +08:00
correctly bind to both ipv4 and ipv6 when [::] is set as listen endpoint (#1193)
This commit is contained in:
parent
4e56ace5b8
commit
6ade841378
24 changed files with 165 additions and 70 deletions
9
Cargo.lock
generated
9
Cargo.lock
generated
|
@ -2250,7 +2250,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"windows-targets 0.48.5",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5166,9 +5166,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.16"
|
||||
version = "0.1.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
|
||||
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
|
@ -5704,6 +5704,7 @@ dependencies = [
|
|||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"totp-rs",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
|
@ -5838,6 +5839,7 @@ dependencies = [
|
|||
"async-trait",
|
||||
"bytes",
|
||||
"flate2",
|
||||
"futures",
|
||||
"mysql_common",
|
||||
"once_cell",
|
||||
"password-hash 0.2.1",
|
||||
|
@ -5864,6 +5866,7 @@ dependencies = [
|
|||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures",
|
||||
"pgwire",
|
||||
"rsasl",
|
||||
"rustls 0.23.12",
|
||||
|
|
|
@ -27,6 +27,8 @@ serde_json = "1.0"
|
|||
russh = { version = "0.49.0" }
|
||||
russh-keys = { version = "0.49.0" }
|
||||
tracing = "0.1"
|
||||
futures = "0.3"
|
||||
tokio-stream = { version = "0.1.17", features = ["net"] }
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
|
|
|
@ -167,7 +167,7 @@ class Test:
|
|||
"-v",
|
||||
*common_args,
|
||||
"-L",
|
||||
f"{local_port}:neverssl.com:80",
|
||||
f"{local_port}:github.com:443",
|
||||
"-N",
|
||||
password="123",
|
||||
)
|
||||
|
@ -178,8 +178,8 @@ class Test:
|
|||
|
||||
s = requests.Session()
|
||||
retries = requests.adapters.Retry(total=5, backoff_factor=1)
|
||||
s.mount("http://", requests.adapters.HTTPAdapter(max_retries=retries))
|
||||
response = s.get(f"http://localhost:{local_port}", timeout=timeout)
|
||||
s.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
|
||||
response = s.get(f"https://localhost:{local_port}", timeout=timeout, verify=False)
|
||||
assert response.status_code == 200
|
||||
ssh_client.kill()
|
||||
|
||||
|
@ -193,6 +193,7 @@ class Test:
|
|||
user, ssh_target = setup_user_and_target(
|
||||
processes, shared_wg, wg_c_ed25519_pubkey
|
||||
)
|
||||
fw_port = alloc_port()
|
||||
pf_client = processes.start_ssh_client(
|
||||
f"{user.username}:{ssh_target.name}@localhost",
|
||||
"-p",
|
||||
|
@ -200,11 +201,11 @@ class Test:
|
|||
"-v",
|
||||
*common_args,
|
||||
"-R",
|
||||
"1234:neverssl.com:80",
|
||||
f"{fw_port}:www.google.com:443",
|
||||
"-N",
|
||||
password="123",
|
||||
)
|
||||
time.sleep(5)
|
||||
# time.sleep(5)
|
||||
ssh_client = processes.start_ssh_client(
|
||||
f"{user.username}:{ssh_target.name}@localhost",
|
||||
"-p",
|
||||
|
@ -212,13 +213,15 @@ class Test:
|
|||
"-v",
|
||||
*common_args,
|
||||
"curl",
|
||||
"-v",
|
||||
"http://localhost:1234",
|
||||
"-vk",
|
||||
"--http1.1",
|
||||
"-H", "Host: www.google.com",
|
||||
f"https://localhost:{fw_port}",
|
||||
password="123",
|
||||
)
|
||||
output = ssh_client.communicate(timeout=timeout)[0]
|
||||
assert ssh_client.returncode == 0
|
||||
assert b"<html>" in output
|
||||
assert b"</html>" in output
|
||||
pf_client.kill()
|
||||
|
||||
def test_shell(
|
||||
|
|
|
@ -9,7 +9,7 @@ anyhow = { version = "1.0", features = ["std"] }
|
|||
async-trait = "0.1"
|
||||
bytes.workspace = true
|
||||
chrono = { version = "0.4", default-features = false }
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
hex = "0.4"
|
||||
mime_guess = { version = "2.0", default-features = false }
|
||||
poem = { version = "3.1", features = [
|
||||
|
|
|
@ -13,7 +13,7 @@ chrono = { version = "0.4", default-features = false, features = ["serde"] }
|
|||
data-encoding.workspace = true
|
||||
delegate = "0.6"
|
||||
humantime-serde = "1.1"
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
once_cell = "1.17"
|
||||
password-hash = "0.4"
|
||||
poem = { version = "3.1", features = ["rustls"] }
|
||||
|
@ -47,3 +47,4 @@ rustls = "0.23"
|
|||
rustls-pemfile = "1.0"
|
||||
webpki = "0.22"
|
||||
aho-corasick = "1.1.3"
|
||||
tokio-stream.workspace = true
|
||||
|
|
|
@ -41,17 +41,17 @@ pub(crate) fn _default_database_url() -> Secret<String> {
|
|||
|
||||
#[inline]
|
||||
pub(crate) fn _default_http_listen() -> ListenEndpoint {
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 8888))
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 8888))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn _default_mysql_listen() -> ListenEndpoint {
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 33306))
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 33306))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn _default_postgres_listen() -> ListenEndpoint {
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 55432))
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 55432))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -75,7 +75,7 @@ pub(crate) fn _default_empty_vec<T>() -> Vec<T> {
|
|||
}
|
||||
|
||||
pub(crate) fn _default_ssh_listen() -> ListenEndpoint {
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 2222))
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 2222))
|
||||
}
|
||||
|
||||
pub(crate) fn _default_ssh_keys_path() -> String {
|
||||
|
|
|
@ -37,6 +37,8 @@ pub enum WarpgateError {
|
|||
Sso(#[from] SsoError),
|
||||
#[error(transparent)]
|
||||
RusshKeys(#[from] russh_keys::Error),
|
||||
#[error("I/O: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("Session end")]
|
||||
SessionEnd,
|
||||
|
|
|
@ -1,16 +1,85 @@
|
|||
use std::fmt::Debug;
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::ops::Deref;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
|
||||
|
||||
use futures::stream::{iter, FuturesUnordered};
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use poem::listener::Listener;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
|
||||
use crate::WarpgateError;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ListenEndpoint(pub SocketAddr);
|
||||
pub struct ListenEndpoint(SocketAddr);
|
||||
|
||||
impl Deref for ListenEndpoint {
|
||||
type Target = SocketAddr;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
impl ListenEndpoint {
|
||||
pub fn addresses_to_listen_on(&self) -> Result<Vec<SocketAddr>, WarpgateError> {
|
||||
// For [::], explicitly return both addresses so that we are not affected
|
||||
// by the state of the ipv6only sysctl.
|
||||
if self.0.ip() == Ipv6Addr::UNSPECIFIED {
|
||||
let addr6 = SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), self.0.port());
|
||||
let addr4 = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), self.0.port());
|
||||
let listener6 = std::net::TcpListener::bind(addr6)?;
|
||||
let listener4 = std::net::TcpListener::bind(addr4);
|
||||
let result = match listener4 {
|
||||
Ok(_) => vec![addr4, addr6],
|
||||
Err(e) if e.kind() == ErrorKind::AddrInUse => vec![addr6],
|
||||
Err(e) => return Err(WarpgateError::Io(e)),
|
||||
};
|
||||
drop(listener6);
|
||||
Ok(result)
|
||||
} else {
|
||||
Ok(vec![self.0])
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tcp_listeners(&self) -> Result<Vec<TcpListener>, WarpgateError> {
|
||||
Ok(self
|
||||
.addresses_to_listen_on()?
|
||||
.into_iter()
|
||||
.map(TcpListener::bind)
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.try_collect()
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn poem_listener(&self) -> Result<poem::listener::BoxListener, WarpgateError> {
|
||||
let addrs = self.addresses_to_listen_on()?;
|
||||
#[allow(clippy::unwrap_used)] // length known >=1
|
||||
let (first, rest) = addrs.split_first().unwrap();
|
||||
let mut listener: poem::listener::BoxListener =
|
||||
poem::listener::TcpListener::bind(first.to_string()).boxed();
|
||||
for addr in rest {
|
||||
listener = listener
|
||||
.combine(poem::listener::TcpListener::bind(addr.to_string()))
|
||||
.boxed();
|
||||
}
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
pub async fn tcp_accept_stream(
|
||||
&self,
|
||||
) -> Result<impl Stream<Item = std::io::Result<TcpStream>>, WarpgateError> {
|
||||
Ok(iter(
|
||||
self.tcp_listeners()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(TcpListenerStream::new),
|
||||
)
|
||||
.flatten_unordered(None))
|
||||
}
|
||||
|
||||
pub fn port(&self) -> u16 {
|
||||
self.0.port()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SocketAddr> for ListenEndpoint {
|
||||
fn from(addr: SocketAddr) -> Self {
|
||||
Self(addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ bytes.workspace = true
|
|||
chrono = { version = "0.4", default-features = false, features = ["serde"] }
|
||||
data-encoding.workspace = true
|
||||
humantime-serde = "1.1"
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
once_cell = "1.17"
|
||||
packet = "0.1"
|
||||
password-hash = "0.4"
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
mod handle;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
pub use handle::{SessionHandle, WarpgateServerHandle};
|
||||
use warpgate_common::Target;
|
||||
use warpgate_common::{ListenEndpoint, Target};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TargetTestError {
|
||||
|
@ -22,6 +21,6 @@ pub enum TargetTestError {
|
|||
|
||||
#[async_trait]
|
||||
pub trait ProtocolServer {
|
||||
async fn run(self, address: SocketAddr) -> Result<()>;
|
||||
async fn run(self, address: ListenEndpoint) -> Result<()>;
|
||||
async fn test_target(&self, target: Target) -> Result<(), TargetTestError>;
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ chrono = { version = "0.4", default-features = false, features = ["serde"] }
|
|||
cookie = "0.17"
|
||||
data-encoding.workspace = true
|
||||
delegate = "0.6"
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
http = "1.0"
|
||||
once_cell = "1.17"
|
||||
poem = { version = "3.1", features = [
|
||||
|
|
|
@ -10,7 +10,6 @@ mod session;
|
|||
mod session_handle;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -21,7 +20,7 @@ pub use common::{SsoLoginState, PROTOCOL_NAME};
|
|||
use http::HeaderValue;
|
||||
use logging::{get_client_ip, log_request_error, log_request_result, span_for_request};
|
||||
use poem::endpoint::{EmbeddedFileEndpoint, EmbeddedFilesEndpoint};
|
||||
use poem::listener::{Listener, RustlsConfig, TcpListener};
|
||||
use poem::listener::{Listener, RustlsConfig};
|
||||
use poem::middleware::SetHeader;
|
||||
use poem::session::{CookieConfig, MemoryStorage, ServerSession, Session};
|
||||
use poem::web::Data;
|
||||
|
@ -31,7 +30,8 @@ use tokio::sync::Mutex;
|
|||
use tracing::*;
|
||||
use warpgate_admin::admin_api_app;
|
||||
use warpgate_common::{
|
||||
Target, TargetOptions, TlsCertificateAndPrivateKey, TlsCertificateBundle, TlsPrivateKey,
|
||||
ListenEndpoint, Target, TargetOptions, TlsCertificateAndPrivateKey, TlsCertificateBundle,
|
||||
TlsPrivateKey,
|
||||
};
|
||||
use warpgate_core::{ProtocolServer, Services, TargetTestError};
|
||||
use warpgate_web::Assets;
|
||||
|
@ -59,7 +59,7 @@ fn make_session_storage() -> SharedSessionStorage {
|
|||
|
||||
#[async_trait]
|
||||
impl ProtocolServer for HTTPProtocolServer {
|
||||
async fn run(self, address: SocketAddr) -> Result<()> {
|
||||
async fn run(self, address: ListenEndpoint) -> Result<()> {
|
||||
let admin_api_app = admin_api_app(&self.services).into_endpoint();
|
||||
let api_service = OpenApiService::new(
|
||||
crate::api::get(),
|
||||
|
@ -205,7 +205,9 @@ impl ProtocolServer for HTTPProtocolServer {
|
|||
|
||||
info!(?address, "Listening");
|
||||
Server::new(
|
||||
TcpListener::bind(address)
|
||||
address
|
||||
.poem_listener()
|
||||
.await?
|
||||
.rustls(RustlsConfig::new().fallback(certificate_and_key.into())),
|
||||
)
|
||||
.run(app)
|
||||
|
|
|
@ -11,6 +11,7 @@ warpgate-db-entities = { version = "*", path = "../warpgate-db-entities" }
|
|||
warpgate-database-protocols = { version = "*", path = "../warpgate-database-protocols" }
|
||||
anyhow = { version = "1.0", features = ["std"] }
|
||||
async-trait = "0.1"
|
||||
futures.workspace = true
|
||||
tokio = { version = "1.20", features = ["tracing", "signal"] }
|
||||
tracing.workspace = true
|
||||
uuid = { version = "1.3", features = ["v4"] }
|
||||
|
|
|
@ -6,19 +6,18 @@ mod session;
|
|||
mod session_handle;
|
||||
mod stream;
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use client::{ConnectionOptions, MySqlClient};
|
||||
use futures::TryStreamExt;
|
||||
use rustls::server::NoClientAuth;
|
||||
use rustls::ServerConfig;
|
||||
use tokio::net::TcpListener;
|
||||
use tracing::*;
|
||||
use warpgate_common::{
|
||||
ResolveServerCert, Target, TargetOptions, TlsCertificateAndPrivateKey, TlsCertificateBundle,
|
||||
TlsPrivateKey,
|
||||
ListenEndpoint, ResolveServerCert, Target, TargetOptions, TlsCertificateAndPrivateKey,
|
||||
TlsCertificateBundle, TlsPrivateKey,
|
||||
};
|
||||
use warpgate_core::{ProtocolServer, Services, SessionStateInit, TargetTestError};
|
||||
|
||||
|
@ -39,7 +38,7 @@ impl MySQLProtocolServer {
|
|||
|
||||
#[async_trait]
|
||||
impl ProtocolServer for MySQLProtocolServer {
|
||||
async fn run(self, address: SocketAddr) -> Result<()> {
|
||||
async fn run(self, address: ListenEndpoint) -> Result<()> {
|
||||
let certificate_and_key = {
|
||||
let config = self.services.config.lock().await;
|
||||
let certificate_path = config
|
||||
|
@ -72,9 +71,15 @@ impl ProtocolServer for MySQLProtocolServer {
|
|||
))));
|
||||
|
||||
info!(?address, "Listening");
|
||||
let listener = TcpListener::bind(address).await?;
|
||||
|
||||
let mut listener = address.tcp_accept_stream().await?;
|
||||
|
||||
loop {
|
||||
let (stream, remote_address) = listener.accept().await?;
|
||||
let Some(stream) = listener.try_next().await? else {
|
||||
return Ok(());
|
||||
};
|
||||
let remote_address = stream.peer_addr()?;
|
||||
|
||||
let tls_config = tls_config.clone();
|
||||
let services = self.services.clone();
|
||||
tokio::spawn(async move {
|
||||
|
|
|
@ -20,3 +20,4 @@ thiserror = "1.0"
|
|||
rustls-native-certs = "0.6"
|
||||
pgwire = { version = "0.25" }
|
||||
rsasl = { version = "2.1.0", default-features = false, features = ["config_builder", "scram-sha-2", "std", "plain", "provider"] }
|
||||
futures.workspace = true
|
||||
|
|
|
@ -7,21 +7,20 @@ mod session_handle;
|
|||
mod stream;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use client::{ConnectionOptions, PostgresClient};
|
||||
use futures::TryStreamExt;
|
||||
use rustls::server::NoClientAuth;
|
||||
use rustls::ServerConfig;
|
||||
use session::PostgresSession;
|
||||
use session_handle::PostgresSessionHandle;
|
||||
use tokio::net::TcpListener;
|
||||
use tracing::*;
|
||||
use warpgate_common::{
|
||||
ResolveServerCert, Target, TargetOptions, TlsCertificateAndPrivateKey, TlsCertificateBundle,
|
||||
TlsPrivateKey,
|
||||
ListenEndpoint, ResolveServerCert, Target, TargetOptions, TlsCertificateAndPrivateKey,
|
||||
TlsCertificateBundle, TlsPrivateKey,
|
||||
};
|
||||
use warpgate_core::{ProtocolServer, Services, SessionStateInit, TargetTestError};
|
||||
|
||||
|
@ -39,7 +38,7 @@ impl PostgresProtocolServer {
|
|||
|
||||
#[async_trait]
|
||||
impl ProtocolServer for PostgresProtocolServer {
|
||||
async fn run(self, address: SocketAddr) -> Result<()> {
|
||||
async fn run(self, address: ListenEndpoint) -> Result<()> {
|
||||
let certificate_and_key = {
|
||||
let config = self.services.config.lock().await;
|
||||
let certificate_path = config
|
||||
|
@ -72,9 +71,14 @@ impl ProtocolServer for PostgresProtocolServer {
|
|||
))));
|
||||
|
||||
info!(?address, "Listening");
|
||||
let listener = TcpListener::bind(address).await?;
|
||||
let mut listener = address.tcp_accept_stream().await?;
|
||||
loop {
|
||||
let (stream, remote_address) = listener.accept().await?;
|
||||
let Some(stream) = listener.try_next().await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let remote_address = stream.peer_addr()?;
|
||||
|
||||
let tls_config = tls_config.clone();
|
||||
let services = self.services.clone();
|
||||
tokio::spawn(async move {
|
||||
|
|
|
@ -13,7 +13,7 @@ bytes.workspace = true
|
|||
dialoguer = "0.10"
|
||||
curve25519-dalek = "4.0.0" # pin due to build fail on x86
|
||||
ed25519-dalek = "2.0.0" # pin due to build fail on x86 in 2.1
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
russh.workspace = true
|
||||
sea-orm = { version = "0.12", features = [
|
||||
"runtime-tokio-rustls",
|
||||
|
|
|
@ -6,7 +6,6 @@ mod keys;
|
|||
mod known_hosts;
|
||||
mod server;
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
|
@ -15,7 +14,9 @@ pub use common::*;
|
|||
pub use keys::*;
|
||||
pub use server::run_server;
|
||||
use uuid::Uuid;
|
||||
use warpgate_common::{ProtocolName, SshHostKeyVerificationMode, Target, TargetOptions};
|
||||
use warpgate_common::{
|
||||
ListenEndpoint, ProtocolName, SshHostKeyVerificationMode, Target, TargetOptions,
|
||||
};
|
||||
use warpgate_core::{ProtocolServer, Services, TargetTestError};
|
||||
|
||||
pub static PROTOCOL_NAME: ProtocolName = "SSH";
|
||||
|
@ -38,7 +39,7 @@ impl SSHProtocolServer {
|
|||
|
||||
#[async_trait]
|
||||
impl ProtocolServer for SSHProtocolServer {
|
||||
async fn run(self, address: SocketAddr) -> Result<()> {
|
||||
async fn run(self, address: ListenEndpoint) -> Result<()> {
|
||||
run_server(self.services, address).await
|
||||
}
|
||||
|
||||
|
|
|
@ -5,25 +5,25 @@ mod session;
|
|||
mod session_handle;
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use futures::TryStreamExt;
|
||||
use russh::keys::{Algorithm, HashAlg};
|
||||
use russh::{MethodSet, Preferred};
|
||||
pub use russh_handler::ServerHandler;
|
||||
pub use session::ServerSession;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
use tracing::*;
|
||||
use warpgate_common::ListenEndpoint;
|
||||
use warpgate_core::{Services, SessionStateInit};
|
||||
|
||||
use crate::keys::load_host_keys;
|
||||
use crate::server::session_handle::SSHSessionHandle;
|
||||
|
||||
pub async fn run_server(services: Services, address: SocketAddr) -> Result<()> {
|
||||
pub async fn run_server(services: Services, address: ListenEndpoint) -> Result<()> {
|
||||
let russh_config = {
|
||||
let config = services.config.lock().await;
|
||||
russh::server::Config {
|
||||
|
@ -53,9 +53,11 @@ pub async fn run_server(services: Services, address: SocketAddr) -> Result<()> {
|
|||
|
||||
let russh_config = Arc::new(russh_config);
|
||||
|
||||
let socket = TcpListener::bind(&address).await?;
|
||||
let mut listener = address.tcp_accept_stream().await?;
|
||||
|
||||
info!(?address, "Listening");
|
||||
while let Ok((socket, remote_address)) = socket.accept().await {
|
||||
while let Some(stream) = listener.try_next().await? {
|
||||
let remote_address = stream.peer_addr()?;
|
||||
let russh_config = russh_config.clone();
|
||||
|
||||
let (session_handle, session_handle_rx) = SSHSessionHandle::new();
|
||||
|
@ -101,7 +103,7 @@ pub async fn run_server(services: Services, address: SocketAddr) -> Result<()> {
|
|||
|
||||
tokio::task::Builder::new()
|
||||
.name(&format!("SSH {id} protocol"))
|
||||
.spawn(_run_stream(russh_config, socket, handler))?;
|
||||
.spawn(_run_stream(russh_config, stream, handler))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -15,4 +15,4 @@ serde_json.workspace = true
|
|||
once_cell = "1.17"
|
||||
jsonwebtoken = "8"
|
||||
data-encoding.workspace = true
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
|
|
|
@ -166,7 +166,7 @@
|
|||
{#if target.options.auth.kind === 'PublicKey'}
|
||||
<a
|
||||
class="btn btn-link mb-3 d-flex align-items-center"
|
||||
href="/@warpgate/admin#/ssh"
|
||||
href="/@warpgate/admin#/config/ssh"
|
||||
target="_blank">
|
||||
<Fa fw icon={faExternalLink} />
|
||||
</a>
|
||||
|
|
|
@ -16,7 +16,7 @@ console = { version = "0.15", default-features = false }
|
|||
console-subscriber = { version = "0.1", optional = true }
|
||||
data-encoding.workspace = true
|
||||
dialoguer = "0.10"
|
||||
futures = "0.3"
|
||||
futures.workspace = true
|
||||
notify = "5.1"
|
||||
rcgen = { version = "0.10", features = ["zeroize"] }
|
||||
serde_json.workspace = true
|
||||
|
|
|
@ -45,7 +45,7 @@ pub(crate) async fn command(cli: &crate::Cli, enable_admin_token: bool) -> Resul
|
|||
protocol_futures.push(
|
||||
SSHProtocolServer::new(&services)
|
||||
.await?
|
||||
.run(*config.store.ssh.listen),
|
||||
.run(config.store.ssh.listen.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ pub(crate) async fn command(cli: &crate::Cli, enable_admin_token: bool) -> Resul
|
|||
protocol_futures.push(
|
||||
HTTPProtocolServer::new(&services)
|
||||
.await?
|
||||
.run(*config.store.http.listen),
|
||||
.run(config.store.http.listen.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ pub(crate) async fn command(cli: &crate::Cli, enable_admin_token: bool) -> Resul
|
|||
protocol_futures.push(
|
||||
MySQLProtocolServer::new(&services)
|
||||
.await?
|
||||
.run(*config.store.mysql.listen),
|
||||
.run(config.store.mysql.listen.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ pub(crate) async fn command(cli: &crate::Cli, enable_admin_token: bool) -> Resul
|
|||
protocol_futures.push(
|
||||
PostgresProtocolServer::new(&services)
|
||||
.await?
|
||||
.run(*config.store.postgres.listen),
|
||||
.run(config.store.postgres.listen.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ fn prompt_endpoint(prompt: &str, default: ListenEndpoint) -> ListenEndpoint {
|
|||
.and_then(|v| v.to_socket_addrs());
|
||||
match v {
|
||||
Ok(mut addr) => match addr.next() {
|
||||
Some(addr) => return ListenEndpoint(addr),
|
||||
Some(addr) => return ListenEndpoint::from(addr),
|
||||
None => {
|
||||
error!("No endpoints resolved");
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
|
|||
store.http.enable = true;
|
||||
if let Commands::UnattendedSetup { http_port, .. } = &cli.command {
|
||||
store.http.listen =
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *http_port));
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *http_port));
|
||||
} else {
|
||||
if !is_docker() {
|
||||
store.http.listen = prompt_endpoint(
|
||||
|
@ -152,7 +152,7 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
|
|||
if let Some(ssh_port) = ssh_port {
|
||||
store.ssh.enable = true;
|
||||
store.ssh.listen =
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *ssh_port));
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *ssh_port));
|
||||
}
|
||||
} else {
|
||||
if is_docker() {
|
||||
|
@ -181,7 +181,7 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
|
|||
if let Some(mysql_port) = mysql_port {
|
||||
store.mysql.enable = true;
|
||||
store.mysql.listen =
|
||||
ListenEndpoint(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *mysql_port));
|
||||
ListenEndpoint::from(SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), *mysql_port));
|
||||
}
|
||||
} else {
|
||||
if is_docker() {
|
||||
|
@ -203,7 +203,7 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
|
|||
if let Commands::UnattendedSetup { postgres_port, .. } = &cli.command {
|
||||
if let Some(postgres_port) = postgres_port {
|
||||
store.postgres.enable = true;
|
||||
store.postgres.listen = ListenEndpoint(SocketAddr::new(
|
||||
store.postgres.listen = ListenEndpoint::from(SocketAddr::new(
|
||||
Ipv6Addr::UNSPECIFIED.into(),
|
||||
*postgres_port,
|
||||
));
|
||||
|
|
Loading…
Add table
Reference in a new issue