Document request proxy and shift boundaries (#2617)

This commit is contained in:
Jonatan Kłosko 2024-05-24 19:37:41 +02:00 committed by GitHub
parent ef25a2e24b
commit 428d9ff591
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 143 additions and 50 deletions

View file

@ -1,7 +1,14 @@
defmodule Livebook.Proxy.Adapter do
@moduledoc false
# Implements a Plug adapter for handling `conn` within the runtime.
#
# All actions are forwarded to the parent process (`Livebook.Proxy.Server`),
# which operates within Livebook itself.
@behaviour Plug.Conn.Adapter
@impl true
def send_resp({pid, ref}, status, headers, body) do
send(pid, {:send_resp, self(), ref, status, headers, body})
@ -11,6 +18,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def get_peer_data({pid, ref}) do
send(pid, {:get_peer_data, self(), ref})
@ -20,6 +28,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def get_http_protocol({pid, ref}) do
send(pid, {:get_http_protocol, self(), ref})
@ -29,6 +38,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def read_req_body({pid, ref}, opts) do
send(pid, {:read_req_body, self(), ref, opts})
@ -40,6 +50,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def send_chunked({pid, ref}, status, headers) do
send(pid, {:send_chunked, self(), ref, status, headers})
@ -49,6 +60,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def chunk({pid, ref}, chunk) do
send(pid, {:chunk, self(), ref, chunk})
@ -59,6 +71,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def inform({pid, ref}, status, headers) do
send(pid, {:inform, self(), ref, status, headers})
@ -68,6 +81,7 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def send_file({pid, ref}, status, headers, path, offset, length) do
%File.Stat{type: :regular, size: size} = File.stat!(path)
@ -90,7 +104,10 @@ defmodule Livebook.Proxy.Adapter do
end
end
@impl true
def upgrade(_payload, _protocol, _opts), do: {:error, :not_supported}
@impl true
def push(_payload, _path, _headers), do: {:error, :not_supported}
defp exit_fun(fun, arity, reason) do

View file

@ -1,21 +1,58 @@
defmodule Livebook.Proxy.Handler do
@moduledoc false
# Handles request `conn` with the configured function.
#
# The handler forwards all actual communication to the parent
# `Livebook.Proxy.Server` via `Livebook.Proxy.Adapter`.
#
# A handler process is started on demand under a task supervisor.
# To avoid bottlenecks, we use a partition supervisor, so that we
# have a group of task supervisors ready.
@name __MODULE__
@doc """
Returns a child spec to setup the handler supervision tree.
Expects the `:listen` option to be provided, and be a function with
the signature `Plug.Conn.t() -> Plug.Conn.t()`.
"""
@spec child_spec(keyword()) :: Supervisor.child_spec()
def child_spec(opts) do
name = Keyword.fetch!(opts, :name)
listen = Keyword.fetch!(opts, :listen)
:persistent_term.put({__MODULE__, name}, listen)
PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: name)
:persistent_term.put({__MODULE__, :listen}, listen)
PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: @name)
end
def serve(parent, name, data) when is_pid(parent) and is_atom(name) do
Process.link(parent)
ref = Process.monitor(parent)
conn = struct!(Plug.Conn, %{data | adapter: {Livebook.Proxy.Adapter, {parent, ref}}})
:persistent_term.get({__MODULE__, name}).(conn)
@doc """
Handles request with the configured listener function.
Restores `%Plug.Conn{}` from the given attributes and delegates
all response handling back to the parent `Livebook.Proxy.Server`.
"""
@spec serve(pid(), map()) :: Plug.Conn.t()
def serve(parent_pid, %{} = conn_attrs) when is_pid(parent_pid) do
Process.link(parent_pid)
ref = Process.monitor(parent_pid)
conn =
struct!(Plug.Conn, %{conn_attrs | adapter: {Livebook.Proxy.Adapter, {parent_pid, ref}}})
listen = :persistent_term.get({__MODULE__, :listen})
listen.(conn)
end
def get_pid(name, key) do
GenServer.whereis({:via, PartitionSupervisor, {name, key}})
@doc """
Returns a pid of task supervisor to start the handler under.
In case no supervisor is running, returns `nil`.
"""
@spec get_supervisor_pid() :: pid() | nil
def get_supervisor_pid() do
if Process.whereis(@name) do
key = :rand.uniform()
GenServer.whereis({:via, PartitionSupervisor, {@name, key}})
end
end
end

View file

@ -1,15 +1,39 @@
defmodule Livebook.Proxy.Server do
@moduledoc false
# The entrypoint for delegating `conn` handling to a runtime.
#
# The `Livebook.Proxy` modules are an implementation detail of the
# runtime. `Livebook.Proxy.Server` lives on the Livebook-side and
# it delegates request handling to `Livebook.Proxy.Handler`, which
# lives in the runtime node. The handler uses a custom plug adapter
# that dispatches `%Plug.Conn{}` operations as messages back to the
# server.
#
# Note that the server is not itself a new process, it is whoever
# calls `serve/2`.
import Plug.Conn
def serve(pid, name, %Plug.Conn{} = conn) when is_pid(pid) and is_atom(name) do
args = [self(), name, build_client_conn(conn)]
{:ok, spawn_pid} = Task.Supervisor.start_child(pid, Livebook.Proxy.Handler, :serve, args)
monitor_ref = Process.monitor(spawn_pid)
@doc """
Handles a request by delegating to a new handler process in the
runtime.
This function blocks until the request handler is done and it returns
the final `conn`.
"""
@spec serve(pid(), Plug.Conn.t()) :: Plug.Conn.t()
def serve(supervisor_pid, %Plug.Conn{} = conn) when is_pid(supervisor_pid) do
args = [self(), build_handler_conn(conn)]
{:ok, handler_pid} =
Task.Supervisor.start_child(supervisor_pid, Livebook.Proxy.Handler, :serve, args)
monitor_ref = Process.monitor(handler_pid)
loop(monitor_ref, conn)
end
defp build_client_conn(conn) do
defp build_handler_conn(conn) do
%{
adapter: nil,
host: conn.host,
@ -71,8 +95,8 @@ defmodule Livebook.Proxy.Server do
send(pid, {ref, :ok})
loop(monitor_ref, conn)
{:DOWN, ^monitor_ref, :process, _pid, reason} ->
{conn, reason}
{:DOWN, ^monitor_ref, :process, _pid, _reason} ->
conn
end
end
end

View file

@ -763,6 +763,8 @@ defprotocol Livebook.Runtime do
source: atom()
}
@type proxy_handler_spec :: {module :: module(), function :: atom(), args :: list()}
@doc """
Returns relevant information about the runtime.
@ -1103,10 +1105,15 @@ defprotocol Livebook.Runtime do
def unregister_clients(runtime, client_ids)
@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
TODO: document the communication here.
When the handler is available, this function returns MFA. In order
to handle a connection, the caller should invoke the MFA, appending
`conn` to the argument list, where `conn` is a `%Plug.Conn{}` struct
for the specific request.
Once done, the handler MFA should return the final `conn`.
"""
@spec fetch_proxy_handler(t(), pid()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(runtime, client_pid)
@spec fetch_proxy_handler_spec(t()) :: {:ok, proxy_handler_spec()} | {:error, :not_found}
def fetch_proxy_handler_spec(runtime)
end

View file

@ -205,7 +205,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end
end

View file

@ -324,7 +324,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end
end

View file

@ -171,8 +171,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end
defp config() do

View file

@ -318,11 +318,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end
@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
"""
@spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(pid, client_pid) do
GenServer.call(pid, {:fetch_proxy_handler, client_pid})
@spec fetch_proxy_handler_spec(pid()) ::
{:ok, {module(), atom(), list()}} | {:error, :not_found}
def fetch_proxy_handler_spec(pid) do
with {:ok, supervisor_pid} <- GenServer.call(pid, :fetch_proxy_handler_supervisor) do
{:ok, {Livebook.Proxy.Server, :serve, [supervisor_pid]}}
end
end
@doc """
@ -752,9 +755,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:reply, has_dependencies?, state}
end
def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do
if pid = Livebook.Proxy.Handler.get_pid(Kino.Proxy, client_pid) do
{:reply, {:ok, pid}, state}
def handle_call(:fetch_proxy_handler_supervisor, _from, state) do
if supervisor_pid = Livebook.Proxy.Handler.get_supervisor_pid() do
{:reply, {:ok, supervisor_pid}, state}
else
{:reply, {:error, :not_found}, state}
end

View file

@ -383,7 +383,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
end
defp io_request({:livebook_get_proxy_handler_child_spec, fun}, state) do
result = {Livebook.Proxy.Handler, name: Kino.Proxy, listen: fun}
result = {Livebook.Proxy.Handler, listen: fun}
{result, state}
end

View file

@ -625,11 +625,12 @@ defmodule Livebook.Session do
end
@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
"""
@spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found | :disconnected}
def fetch_proxy_handler(pid, client_pid) do
GenServer.call(pid, {:fetch_proxy_handler, client_pid})
@spec fetch_proxy_handler_spec(pid()) ::
{:ok, Runtime.proxy_handler_spec()} | {:error, :not_found | :disconnected}
def fetch_proxy_handler_spec(pid) do
GenServer.call(pid, :fetch_proxy_handler_spec)
end
@doc """
@ -1081,9 +1082,9 @@ defmodule Livebook.Session do
{:noreply, state}
end
def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do
def handle_call(:fetch_proxy_handler_spec, _from, state) do
if Runtime.connected?(state.data.runtime) do
{:reply, Runtime.fetch_proxy_handler(state.data.runtime, client_pid), state}
{:reply, Runtime.fetch_proxy_handler_spec(state.data.runtime), state}
else
{:reply, {:error, :disconnected}, state}
end

View file

@ -1,5 +1,6 @@
defmodule LivebookWeb.ProxyPlug do
@behaviour Plug
import Plug.Conn
alias LivebookWeb.NotFoundError
@ -10,10 +11,9 @@ defmodule LivebookWeb.ProxyPlug do
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
{conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn)
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
end
@ -25,10 +25,9 @@ defmodule LivebookWeb.ProxyPlug do
end
session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
{conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn)
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
end
@ -50,8 +49,8 @@ defmodule LivebookWeb.ProxyPlug do
end
end
defp fetch_proxy_handler!(session) do
case Livebook.Session.fetch_proxy_handler(session.pid, self()) do
defp fetch_proxy_handler_spec!(session) do
case Livebook.Session.fetch_proxy_handler_spec(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
end
@ -60,4 +59,9 @@ defmodule LivebookWeb.ProxyPlug do
defp prepare_conn(conn, path_info, script_name) do
%{conn | path_info: path_info, script_name: conn.script_name ++ script_name}
end
defp call_proxy_handler(proxy_handler_spec, conn) do
{module, function, args} = proxy_handler_spec
apply(module, function, args ++ [conn])
end
end

View file

@ -73,7 +73,7 @@ defmodule Livebook.Runtime.NoopRuntime do
def register_clients(_, _), do: :ok
def unregister_clients(_, _), do: :ok
def fetch_proxy_handler(_, _), do: {:error, :not_found}
def fetch_proxy_handler_spec(_), do: {:error, :not_found}
defp trace(runtime, fun, args) do
if runtime.trace_to do