Allow Livebook to proxy requests to the runtime (#2608)

This commit is contained in:
Alexandre de Souza 2024-05-20 17:04:04 -03:00 committed by GitHub
parent 1150dc288a
commit 16bd46b54f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 252 additions and 0 deletions

View file

@ -1101,4 +1101,12 @@ defprotocol Livebook.Runtime do
""" """
@spec unregister_clients(t(), list(client_id())) :: :ok @spec unregister_clients(t(), list(client_id())) :: :ok
def unregister_clients(runtime, client_ids) def unregister_clients(runtime, client_ids)
@doc """
Fetches the running Proxy Handler's pid from runtime.
TODO: document the communication here.
"""
@spec fetch_proxy_handler(t()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(runtime)
end end

View file

@ -204,4 +204,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
def unregister_clients(runtime, client_ids) do def unregister_clients(runtime, client_ids) do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids) RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end end
def fetch_proxy_handler(runtime) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid)
end
end end

View file

@ -323,4 +323,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
def unregister_clients(runtime, client_ids) do def unregister_clients(runtime, client_ids) do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids) RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end end
def fetch_proxy_handler(runtime) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid)
end
end end

View file

@ -171,6 +171,10 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids) RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end end
def fetch_proxy_handler(runtime) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid)
end
defp config() do defp config() do
Application.get_env(:livebook, Livebook.Runtime.Embedded, []) Application.get_env(:livebook, Livebook.Runtime.Embedded, [])
end end

View file

@ -317,6 +317,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
GenServer.cast(pid, {:unregister_clients, client_ids}) GenServer.cast(pid, {:unregister_clients, client_ids})
end end
@doc """
Fetches the running Proxy Handler's pid from runtime.
"""
@spec fetch_proxy_handler(pid()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(pid) do
GenServer.call(pid, :fetch_proxy_handler)
end
@doc """ @doc """
Stops the runtime server. Stops the runtime server.
@ -744,6 +752,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:reply, has_dependencies?, state} {:reply, has_dependencies?, state}
end end
def handle_call(:fetch_proxy_handler, _from, state) do
if pid = GenServer.whereis(Kino.Proxy) do
{:reply, {:ok, pid}, state}
else
{:reply, {:error, :not_found}, state}
end
end
defp file_path(state, file_id) do defp file_path(state, file_id) do
if tmp_dir = state.tmp_dir do if tmp_dir = state.tmp_dir do
Path.join([tmp_dir, "files", file_id]) Path.join([tmp_dir, "files", file_id])

View file

@ -624,6 +624,14 @@ defmodule Livebook.Session do
GenServer.cast(pid, {:set_notebook_deployment_group, self(), id}) GenServer.cast(pid, {:set_notebook_deployment_group, self(), id})
end end
@doc """
Fetches the running Proxy Handler's pid from runtime.
"""
@spec fetch_proxy_handler(pid()) :: {:ok, pid()} | {:error, :not_found | :disconnected}
def fetch_proxy_handler(pid) do
GenServer.call(pid, :fetch_proxy_handler)
end
@doc """ @doc """
Sends a file entries addition request to the server. Sends a file entries addition request to the server.
@ -1073,6 +1081,14 @@ defmodule Livebook.Session do
{:noreply, state} {:noreply, state}
end end
def handle_call(:fetch_proxy_handler, _from, state) do
if Runtime.connected?(state.data.runtime) do
{:reply, Runtime.fetch_proxy_handler(state.data.runtime), state}
else
{:reply, {:error, :disconnected}, state}
end
end
@impl true @impl true
def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
client_id = client_id(state, client_pid) client_id = client_id(state, client_pid)

View file

@ -0,0 +1,8 @@
defmodule LivebookWeb.ErrorJSON do
# By default, Phoenix returns the status message from
# the template name. For example, "404.json" becomes
# "Not Found".
def render(template, _assigns) do
%{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
end
end

View file

@ -69,6 +69,7 @@ defmodule LivebookWeb.Endpoint do
plug Plug.RequestId plug Plug.RequestId
plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint] plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
plug LivebookWeb.ProxyPlug
plug Plug.Parsers, plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json], parsers: [:urlencoded, :multipart, :json],

View file

@ -0,0 +1,63 @@
defmodule LivebookWeb.ProxyPlug do
@behaviour Plug
import Plug.Conn
alias LivebookWeb.NotFoundError
@impl true
def init(opts), do: opts
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
{conn, _} = Kino.Proxy.serve(pid, conn)
halt(conn)
end
def call(%{path_info: ["apps", slug, id, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)
unless Enum.any?(app.sessions, &(&1.id == id)) do
raise NotFoundError, "could not find an app session matching #{inspect(id)}"
end
session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
{conn, _} = Kino.Proxy.serve(pid, conn)
halt(conn)
end
def call(conn, _opts) do
conn
end
defp fetch_app!(slug) do
case Livebook.Apps.fetch_app(slug) do
{:ok, app} -> app
:error -> raise NotFoundError, "could not find an app matching #{inspect(slug)}"
end
end
defp fetch_session!(id) do
case Livebook.Sessions.fetch_session(id) do
{:ok, session} -> session
{:error, _} -> raise NotFoundError, "could not find a session matching #{id}"
end
end
defp fetch_proxy_handler!(session) do
case Livebook.Session.fetch_proxy_handler(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
end
end
defp prepare_conn(conn, path_info, script_name) do
%{conn | path_info: path_info, script_name: conn.script_name ++ script_name}
end
end

View file

@ -116,6 +116,7 @@ defmodule Livebook.MixProject do
{:mint_web_socket, "~> 1.0.0"}, {:mint_web_socket, "~> 1.0.0"},
{:protobuf, "~> 0.12.0"}, {:protobuf, "~> 0.12.0"},
{:dns_cluster, "~> 0.1.2"}, {:dns_cluster, "~> 0.1.2"},
{:kino_proxy, kino_proxy_opts()},
{:phoenix_live_reload, "~> 1.2", only: :dev}, {:phoenix_live_reload, "~> 1.2", only: :dev},
{:floki, ">= 0.27.0", only: :test}, {:floki, ">= 0.27.0", only: :test},
{:bypass, "~> 2.1", only: :test}, {:bypass, "~> 2.1", only: :test},
@ -127,6 +128,14 @@ defmodule Livebook.MixProject do
] ]
end end
defp kino_proxy_opts do
if path = System.get_env("KINO_PROXY_PATH") do
[path: path]
else
[github: "livebook-dev/kino_proxy"]
end
end
defp target_deps(:app), do: [{:elixirkit, path: "elixirkit"}] defp target_deps(:app), do: [{:elixirkit, path: "elixirkit"}]
defp target_deps(_), do: [] defp target_deps(_), do: []

View file

@ -21,6 +21,7 @@
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"}, "jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"},
"jsx": {:hex, :jsx, "3.0.0", "20a170abd4335fc6db24d5fad1e5d677c55dadf83d1b20a8a33b5fe159892a39", [:rebar3], [], "hexpm", "37beca0435f5ca8a2f45f76a46211e76418fbef80c36f0361c249fc75059dc6d"}, "jsx": {:hex, :jsx, "3.0.0", "20a170abd4335fc6db24d5fad1e5d677c55dadf83d1b20a8a33b5fe159892a39", [:rebar3], [], "hexpm", "37beca0435f5ca8a2f45f76a46211e76418fbef80c36f0361c249fc75059dc6d"},
"kino_proxy": {:git, "https://github.com/livebook-dev/kino_proxy.git", "42c434450d97bf2d2035c42296317c4955873bac", []},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},

View file

@ -0,0 +1,117 @@
defmodule LivebookWeb.ProxyPlugTest do
use LivebookWeb.ConnCase, async: true
require Phoenix.LiveViewTest
import Livebook.AppHelpers
alias Livebook.{Notebook, Runtime, Session, Sessions}
describe "session" do
test "returns error when session doesn't exist", %{conn: conn} do
session_id = Livebook.Utils.random_long_id()
assert_error_sent 404, fn ->
get(conn, "/sessions/#{session_id}/proxy/foo/bar")
end
end
test "returns error when runtime is disconnected", %{conn: conn} do
{:ok, session} = Sessions.create_session()
assert_error_sent 404, fn ->
get(conn, "/sessions/#{session.id}/proxy/foo/bar")
end
end
test "returns the proxied response defined in notebook", %{conn: conn} do
cell = %{
Notebook.Cell.new(:code)
| source: """
Kino.Proxy.listen(fn conn ->
conn
|> Plug.Conn.put_resp_header("content-type", "application/text;charset=utf-8")
|> Plug.Conn.send_resp(200, "used " <> conn.method <> " method")
end)
"""
}
cell_id = cell.id
section = %{Notebook.Section.new() | cells: [cell]}
notebook = %{Notebook.new() | sections: [section]}
{:ok, session} = Sessions.create_session(notebook: notebook)
{:ok, runtime} = Runtime.Embedded.new() |> Runtime.connect()
Session.set_runtime(session.pid, runtime)
Session.subscribe(session.id)
Session.queue_cell_evaluation(session.pid, cell_id)
assert_receive {:operation, {:add_cell_evaluation_response, _, ^cell_id, _, _}}
url = "/sessions/#{session.id}/proxy/"
assert text_response(get(conn, url), 200) == "used GET method"
assert text_response(post(conn, url), 200) == "used POST method"
assert text_response(put(conn, url), 200) == "used PUT method"
assert text_response(patch(conn, url), 200) == "used PATCH method"
assert text_response(delete(conn, url), 200) == "used DELETE method"
Session.close(session.pid)
end
end
describe "app" do
test "returns error when app doesn't exist", %{conn: conn} do
slug = Livebook.Utils.random_short_id()
session_id = Livebook.Utils.random_long_id()
assert_error_sent 404, fn ->
get(conn, "/apps/#{slug}/#{session_id}/proxy/foo/bar")
end
end
test "returns the proxied response defined in notebook", %{conn: conn} do
slug = Livebook.Utils.random_short_id()
cell = %{
Notebook.Cell.new(:code)
| source: """
Kino.Proxy.listen(fn conn ->
conn
|> Plug.Conn.put_resp_header("content-type", "application/text;charset=utf-8")
|> Plug.Conn.send_resp(200, "used " <> conn.method <> " method")
end)
"""
}
app_settings = %{Notebook.AppSettings.new() | slug: slug, access_type: :public}
section = %{Notebook.Section.new() | cells: [cell]}
notebook = %{Notebook.new() | app_settings: app_settings, sections: [section]}
Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)
assert_receive {:app_created, %{pid: ^pid, slug: ^slug}}
assert_receive {:app_updated,
%{
pid: ^pid,
slug: ^slug,
sessions: [
%{id: id, app_status: %{lifecycle: :active, execution: :executed}}
]
}}
url = "/apps/#{slug}/#{id}/proxy/"
assert text_response(get(conn, url), 200) == "used GET method"
assert text_response(post(conn, url), 200) == "used POST method"
assert text_response(put(conn, url), 200) == "used PUT method"
assert text_response(patch(conn, url), 200) == "used PATCH method"
assert text_response(delete(conn, url), 200) == "used DELETE method"
Livebook.App.close(pid)
end
end
end

View file

@ -73,6 +73,7 @@ defmodule Livebook.Runtime.NoopRuntime do
def register_clients(_, _), do: :ok def register_clients(_, _), do: :ok
def unregister_clients(_, _), do: :ok def unregister_clients(_, _), do: :ok
def fetch_proxy_handler(_), do: {:error, :not_found}
defp trace(runtime, fun, args) do defp trace(runtime, fun, args) do
if runtime.trace_to do if runtime.trace_to do