diff --git a/lib/livebook/proxy/adapter.ex b/lib/livebook/proxy/adapter.ex index 30c681a8a..d409e6074 100644 --- a/lib/livebook/proxy/adapter.ex +++ b/lib/livebook/proxy/adapter.ex @@ -1,7 +1,14 @@ defmodule Livebook.Proxy.Adapter do @moduledoc false + + # Implements a Plug adapter for handling `conn` within the runtime. + # + # All actions are forwarded to the parent process (`Livebook.Proxy.Server`), + # which operates within Livebook itself. + @behaviour Plug.Conn.Adapter + @impl true def send_resp({pid, ref}, status, headers, body) do send(pid, {:send_resp, self(), ref, status, headers, body}) @@ -11,6 +18,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def get_peer_data({pid, ref}) do send(pid, {:get_peer_data, self(), ref}) @@ -20,6 +28,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def get_http_protocol({pid, ref}) do send(pid, {:get_http_protocol, self(), ref}) @@ -29,6 +38,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def read_req_body({pid, ref}, opts) do send(pid, {:read_req_body, self(), ref, opts}) @@ -40,6 +50,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def send_chunked({pid, ref}, status, headers) do send(pid, {:send_chunked, self(), ref, status, headers}) @@ -49,6 +60,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def chunk({pid, ref}, chunk) do send(pid, {:chunk, self(), ref, chunk}) @@ -59,6 +71,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def inform({pid, ref}, status, headers) do send(pid, {:inform, self(), ref, status, headers}) @@ -68,6 +81,7 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def send_file({pid, ref}, status, headers, path, offset, length) do %File.Stat{type: :regular, size: size} = File.stat!(path) @@ -90,7 +104,10 @@ defmodule Livebook.Proxy.Adapter do end end + @impl true def upgrade(_payload, _protocol, _opts), do: {:error, :not_supported} + + @impl true def push(_payload, _path, _headers), do: {:error, :not_supported} defp exit_fun(fun, arity, reason) do diff --git a/lib/livebook/proxy/handler.ex b/lib/livebook/proxy/handler.ex index fcef5411e..9475758b4 100644 --- a/lib/livebook/proxy/handler.ex +++ b/lib/livebook/proxy/handler.ex @@ -1,21 +1,58 @@ defmodule Livebook.Proxy.Handler do @moduledoc false + # Handles request `conn` with the configured function. + # + # The handler forwards all actual communication to the parent + # `Livebook.Proxy.Server` via `Livebook.Proxy.Adapter`. + # + # A handler process is started on demand under a task supervisor. + # To avoid bottlenecks, we use a partition supervisor, so that we + # have a group of task supervisors ready. + + @name __MODULE__ + + @doc """ + Returns a child spec to setup the handler supervision tree. + + Expects the `:listen` option to be provided, and be a function with + the signature `Plug.Conn.t() -> Plug.Conn.t()`. + """ + @spec child_spec(keyword()) :: Supervisor.child_spec() def child_spec(opts) do - name = Keyword.fetch!(opts, :name) listen = Keyword.fetch!(opts, :listen) - :persistent_term.put({__MODULE__, name}, listen) - PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: name) + :persistent_term.put({__MODULE__, :listen}, listen) + PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: @name) end - def serve(parent, name, data) when is_pid(parent) and is_atom(name) do - Process.link(parent) - ref = Process.monitor(parent) - conn = struct!(Plug.Conn, %{data | adapter: {Livebook.Proxy.Adapter, {parent, ref}}}) - :persistent_term.get({__MODULE__, name}).(conn) + @doc """ + Handles request with the configured listener function. + + Restores `%Plug.Conn{}` from the given attributes and delegates + all response handling back to the parent `Livebook.Proxy.Server`. + """ + @spec serve(pid(), map()) :: Plug.Conn.t() + def serve(parent_pid, %{} = conn_attrs) when is_pid(parent_pid) do + Process.link(parent_pid) + ref = Process.monitor(parent_pid) + + conn = + struct!(Plug.Conn, %{conn_attrs | adapter: {Livebook.Proxy.Adapter, {parent_pid, ref}}}) + + listen = :persistent_term.get({__MODULE__, :listen}) + listen.(conn) end - def get_pid(name, key) do - GenServer.whereis({:via, PartitionSupervisor, {name, key}}) + @doc """ + Returns a pid of task supervisor to start the handler under. + + In case no supervisor is running, returns `nil`. + """ + @spec get_supervisor_pid() :: pid() | nil + def get_supervisor_pid() do + if Process.whereis(@name) do + key = :rand.uniform() + GenServer.whereis({:via, PartitionSupervisor, {@name, key}}) + end end end diff --git a/lib/livebook/proxy/server.ex b/lib/livebook/proxy/server.ex index 5a0856e96..3cb1a300e 100644 --- a/lib/livebook/proxy/server.ex +++ b/lib/livebook/proxy/server.ex @@ -1,15 +1,39 @@ defmodule Livebook.Proxy.Server do @moduledoc false + + # The entrypoint for delegating `conn` handling to a runtime. + # + # The `Livebook.Proxy` modules are an implementation detail of the + # runtime. `Livebook.Proxy.Server` lives on the Livebook-side and + # it delegates request handling to `Livebook.Proxy.Handler`, which + # lives in the runtime node. The handler uses a custom plug adapter + # that dispatches `%Plug.Conn{}` operations as messages back to the + # server. + # + # Note that the server is not itself a new process, it is whoever + # calls `serve/2`. + import Plug.Conn - def serve(pid, name, %Plug.Conn{} = conn) when is_pid(pid) and is_atom(name) do - args = [self(), name, build_client_conn(conn)] - {:ok, spawn_pid} = Task.Supervisor.start_child(pid, Livebook.Proxy.Handler, :serve, args) - monitor_ref = Process.monitor(spawn_pid) + @doc """ + Handles a request by delegating to a new handler process in the + runtime. + + This function blocks until the request handler is done and it returns + the final `conn`. + """ + @spec serve(pid(), Plug.Conn.t()) :: Plug.Conn.t() + def serve(supervisor_pid, %Plug.Conn{} = conn) when is_pid(supervisor_pid) do + args = [self(), build_handler_conn(conn)] + + {:ok, handler_pid} = + Task.Supervisor.start_child(supervisor_pid, Livebook.Proxy.Handler, :serve, args) + + monitor_ref = Process.monitor(handler_pid) loop(monitor_ref, conn) end - defp build_client_conn(conn) do + defp build_handler_conn(conn) do %{ adapter: nil, host: conn.host, @@ -71,8 +95,8 @@ defmodule Livebook.Proxy.Server do send(pid, {ref, :ok}) loop(monitor_ref, conn) - {:DOWN, ^monitor_ref, :process, _pid, reason} -> - {conn, reason} + {:DOWN, ^monitor_ref, :process, _pid, _reason} -> + conn end end end diff --git a/lib/livebook/runtime.ex b/lib/livebook/runtime.ex index c5430ce3f..4103b9aab 100644 --- a/lib/livebook/runtime.ex +++ b/lib/livebook/runtime.ex @@ -763,6 +763,8 @@ defprotocol Livebook.Runtime do source: atom() } + @type proxy_handler_spec :: {module :: module(), function :: atom(), args :: list()} + @doc """ Returns relevant information about the runtime. @@ -1103,10 +1105,15 @@ defprotocol Livebook.Runtime do def unregister_clients(runtime, client_ids) @doc """ - Fetches the running Proxy Handler's pid from runtime. + Fetches information about a proxy request handler, if available. - TODO: document the communication here. + When the handler is available, this function returns MFA. In order + to handle a connection, the caller should invoke the MFA, appending + `conn` to the argument list, where `conn` is a `%Plug.Conn{}` struct + for the specific request. + + Once done, the handler MFA should return the final `conn`. """ - @spec fetch_proxy_handler(t(), pid()) :: {:ok, pid()} | {:error, :not_found} - def fetch_proxy_handler(runtime, client_pid) + @spec fetch_proxy_handler_spec(t()) :: {:ok, proxy_handler_spec()} | {:error, :not_found} + def fetch_proxy_handler_spec(runtime) end diff --git a/lib/livebook/runtime/attached.ex b/lib/livebook/runtime/attached.ex index c6b409a56..8e79a4d6a 100644 --- a/lib/livebook/runtime/attached.ex +++ b/lib/livebook/runtime/attached.ex @@ -205,7 +205,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do RuntimeServer.unregister_clients(runtime.server_pid, client_ids) end - def fetch_proxy_handler(runtime, client_pid) do - RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid) + def fetch_proxy_handler_spec(runtime) do + RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid) end end diff --git a/lib/livebook/runtime/elixir_standalone.ex b/lib/livebook/runtime/elixir_standalone.ex index 211d7904e..57d93ec13 100644 --- a/lib/livebook/runtime/elixir_standalone.ex +++ b/lib/livebook/runtime/elixir_standalone.ex @@ -324,7 +324,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do RuntimeServer.unregister_clients(runtime.server_pid, client_ids) end - def fetch_proxy_handler(runtime, client_pid) do - RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid) + def fetch_proxy_handler_spec(runtime) do + RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid) end end diff --git a/lib/livebook/runtime/embedded.ex b/lib/livebook/runtime/embedded.ex index 75b5e8ac7..c255cb076 100644 --- a/lib/livebook/runtime/embedded.ex +++ b/lib/livebook/runtime/embedded.ex @@ -171,8 +171,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do RuntimeServer.unregister_clients(runtime.server_pid, client_ids) end - def fetch_proxy_handler(runtime, client_pid) do - RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid) + def fetch_proxy_handler_spec(runtime) do + RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid) end defp config() do diff --git a/lib/livebook/runtime/erl_dist/runtime_server.ex b/lib/livebook/runtime/erl_dist/runtime_server.ex index 13f58a2d9..af8de4209 100644 --- a/lib/livebook/runtime/erl_dist/runtime_server.ex +++ b/lib/livebook/runtime/erl_dist/runtime_server.ex @@ -318,11 +318,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do end @doc """ - Fetches the running Proxy Handler's pid from runtime. + Fetches information about a proxy request handler, if available. """ - @spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found} - def fetch_proxy_handler(pid, client_pid) do - GenServer.call(pid, {:fetch_proxy_handler, client_pid}) + @spec fetch_proxy_handler_spec(pid()) :: + {:ok, {module(), atom(), list()}} | {:error, :not_found} + def fetch_proxy_handler_spec(pid) do + with {:ok, supervisor_pid} <- GenServer.call(pid, :fetch_proxy_handler_supervisor) do + {:ok, {Livebook.Proxy.Server, :serve, [supervisor_pid]}} + end end @doc """ @@ -752,9 +755,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do {:reply, has_dependencies?, state} end - def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do - if pid = Livebook.Proxy.Handler.get_pid(Kino.Proxy, client_pid) do - {:reply, {:ok, pid}, state} + def handle_call(:fetch_proxy_handler_supervisor, _from, state) do + if supervisor_pid = Livebook.Proxy.Handler.get_supervisor_pid() do + {:reply, {:ok, supervisor_pid}, state} else {:reply, {:error, :not_found}, state} end diff --git a/lib/livebook/runtime/evaluator/io_proxy.ex b/lib/livebook/runtime/evaluator/io_proxy.ex index 0c3c667da..ea06cf13a 100644 --- a/lib/livebook/runtime/evaluator/io_proxy.ex +++ b/lib/livebook/runtime/evaluator/io_proxy.ex @@ -383,7 +383,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do end defp io_request({:livebook_get_proxy_handler_child_spec, fun}, state) do - result = {Livebook.Proxy.Handler, name: Kino.Proxy, listen: fun} + result = {Livebook.Proxy.Handler, listen: fun} {result, state} end diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index 20e9e008d..fdb839592 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -625,11 +625,12 @@ defmodule Livebook.Session do end @doc """ - Fetches the running Proxy Handler's pid from runtime. + Fetches information about a proxy request handler, if available. """ - @spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found | :disconnected} - def fetch_proxy_handler(pid, client_pid) do - GenServer.call(pid, {:fetch_proxy_handler, client_pid}) + @spec fetch_proxy_handler_spec(pid()) :: + {:ok, Runtime.proxy_handler_spec()} | {:error, :not_found | :disconnected} + def fetch_proxy_handler_spec(pid) do + GenServer.call(pid, :fetch_proxy_handler_spec) end @doc """ @@ -1081,9 +1082,9 @@ defmodule Livebook.Session do {:noreply, state} end - def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do + def handle_call(:fetch_proxy_handler_spec, _from, state) do if Runtime.connected?(state.data.runtime) do - {:reply, Runtime.fetch_proxy_handler(state.data.runtime, client_pid), state} + {:reply, Runtime.fetch_proxy_handler_spec(state.data.runtime), state} else {:reply, {:error, :disconnected}, state} end diff --git a/lib/livebook_web/plugs/proxy_plug.ex b/lib/livebook_web/plugs/proxy_plug.ex index 152830ace..5f1d266c0 100644 --- a/lib/livebook_web/plugs/proxy_plug.ex +++ b/lib/livebook_web/plugs/proxy_plug.ex @@ -1,5 +1,6 @@ defmodule LivebookWeb.ProxyPlug do @behaviour Plug + import Plug.Conn alias LivebookWeb.NotFoundError @@ -10,10 +11,9 @@ defmodule LivebookWeb.ProxyPlug do @impl true def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do session = fetch_session!(id) - pid = fetch_proxy_handler!(session) + proxy_handler_spec = fetch_proxy_handler_spec!(session) conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"]) - {conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn) - + conn = call_proxy_handler(proxy_handler_spec, conn) halt(conn) end @@ -25,10 +25,9 @@ defmodule LivebookWeb.ProxyPlug do end session = fetch_session!(id) - pid = fetch_proxy_handler!(session) + proxy_handler_spec = fetch_proxy_handler_spec!(session) conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"]) - {conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn) - + conn = call_proxy_handler(proxy_handler_spec, conn) halt(conn) end @@ -50,8 +49,8 @@ defmodule LivebookWeb.ProxyPlug do end end - defp fetch_proxy_handler!(session) do - case Livebook.Session.fetch_proxy_handler(session.pid, self()) do + defp fetch_proxy_handler_spec!(session) do + case Livebook.Session.fetch_proxy_handler_spec(session.pid) do {:ok, pid} -> pid {:error, _} -> raise NotFoundError, "could not find a kino proxy running" end @@ -60,4 +59,9 @@ defmodule LivebookWeb.ProxyPlug do defp prepare_conn(conn, path_info, script_name) do %{conn | path_info: path_info, script_name: conn.script_name ++ script_name} end + + defp call_proxy_handler(proxy_handler_spec, conn) do + {module, function, args} = proxy_handler_spec + apply(module, function, args ++ [conn]) + end end diff --git a/test/support/noop_runtime.ex b/test/support/noop_runtime.ex index 4f3de233a..1c768ecde 100644 --- a/test/support/noop_runtime.ex +++ b/test/support/noop_runtime.ex @@ -73,7 +73,7 @@ defmodule Livebook.Runtime.NoopRuntime do def register_clients(_, _), do: :ok def unregister_clients(_, _), do: :ok - def fetch_proxy_handler(_, _), do: {:error, :not_found} + def fetch_proxy_handler_spec(_), do: {:error, :not_found} defp trace(runtime, fun, args) do if runtime.trace_to do