Support proxy requests on generic app URL and await session execution (#2618)

This commit is contained in:
Jonatan Kłosko 2024-05-24 20:34:11 +02:00 committed by GitHub
parent 55274f8117
commit 197f4868e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 157 additions and 25 deletions

View file

@ -222,6 +222,16 @@ defmodule Livebook.Session do
GenServer.call(pid, {:register_client, client_pid, user}, @timeout)
end
@doc """
Resets the auto shutdown timer, if ticking.
When the session has connected clients, nothing changes.
"""
@spec reset_auto_shutdown(pid()) :: :ok
def reset_auto_shutdown(pid) do
GenServer.cast(pid, :reset_auto_shutdown)
end
@doc """
Returns data of the given session.
"""
@ -959,25 +969,31 @@ defmodule Livebook.Session do
defp schedule_auto_shutdown(state) do
client_count = map_size(state.data.clients_map)
case {client_count, state.auto_shutdown_timer_ref} do
{0, nil} when state.auto_shutdown_ms != nil ->
cond do
client_count == 0 and state.auto_shutdown_timer_ref == nil and state.auto_shutdown_ms != nil ->
timer_ref = Process.send_after(self(), :close, state.auto_shutdown_ms)
%{state | auto_shutdown_timer_ref: timer_ref}
{client_count, timer_ref} when client_count > 0 and timer_ref != nil ->
if Process.cancel_timer(timer_ref) == false do
receive do
:close -> :ok
end
end
client_count > 0 ->
cancel_auto_shutdown_timer(state)
%{state | auto_shutdown_timer_ref: nil}
_ ->
true ->
state
end
end
defp cancel_auto_shutdown_timer(%{auto_shutdown_timer_ref: nil} = state), do: state
defp cancel_auto_shutdown_timer(state) do
if Process.cancel_timer(state.auto_shutdown_timer_ref) == false do
receive do
:close -> :ok
end
end
%{state | auto_shutdown_timer_ref: nil}
end
@impl true
def handle_continue(:app_init, state) do
cell_ids = Data.cell_ids_for_full_evaluation(state.data, [])
@ -1091,6 +1107,13 @@ defmodule Livebook.Session do
end
@impl true
def handle_cast(:reset_auto_shutdown, state) do
{:noreply,
state
|> cancel_auto_shutdown_timer()
|> schedule_auto_shutdown()}
end
def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
client_id = client_id(state, client_pid)
operation = {:set_notebook_attributes, client_id, attrs}
@ -2466,7 +2489,7 @@ defmodule Livebook.Session do
status = state.data.app_data.status
send(state.app_pid, {:app_status_changed, state.session_id, status})
notify_update(state)
state
end
defp handle_action(state, :app_recover) do

View file

@ -0,0 +1,7 @@
defmodule LivebookWeb.NotFoundError do
defexception [:message, plug_status: 404]
end
defmodule LivebookWeb.BadRequestError do
defexception [:message, plug_status: 400]
end

View file

@ -1,3 +0,0 @@
defmodule LivebookWeb.NotFoundError do
defexception [:message, plug_status: 404]
end

View file

@ -11,24 +11,43 @@ defmodule LivebookWeb.ProxyPlug do
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end
def call(%{path_info: ["apps", slug, id, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)
unless Enum.any?(app.sessions, &(&1.id == id)) do
raise NotFoundError, "could not find an app session matching #{inspect(id)}"
raise NotFoundError, "could not find an app session with id #{inspect(id)}"
end
session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, id, "proxy"])
call_proxy_handler(proxy_handler_spec, conn)
end
def call(%{path_info: ["apps", slug, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)
if app.multi_session do
raise LivebookWeb.BadRequestError,
"the requested app is multi-session. In order to send requests to this app," <>
" you need to start a session and use its specific URL"
end
session_id = Livebook.App.get_session_id(app.pid)
{:ok, session} = Livebook.Sessions.fetch_session(session_id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end
def call(conn, _opts) do
@ -38,21 +57,27 @@ defmodule LivebookWeb.ProxyPlug do
defp fetch_app!(slug) do
case Livebook.Apps.fetch_app(slug) do
{:ok, app} -> app
:error -> raise NotFoundError, "could not find an app matching #{inspect(slug)}"
:error -> raise NotFoundError, "could not find an app with slug #{inspect(slug)}"
end
end
defp fetch_session!(id) do
case Livebook.Sessions.fetch_session(id) do
{:ok, session} -> session
{:error, _} -> raise NotFoundError, "could not find a session matching #{id}"
{:error, _} -> raise NotFoundError, "could not find a session with id #{id}"
end
end
defp fetch_proxy_handler_spec!(session) do
case Livebook.Session.fetch_proxy_handler_spec(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
{:ok, pid} ->
pid
{:error, _} ->
raise NotFoundError,
"the session does not listen to proxied requests." <>
" See the Kino.Proxy documentation to learn about defining" <>
" custom request handlers"
end
end
@ -62,6 +87,34 @@ defmodule LivebookWeb.ProxyPlug do
defp call_proxy_handler(proxy_handler_spec, conn) do
{module, function, args} = proxy_handler_spec
apply(module, function, args ++ [conn])
conn = apply(module, function, args ++ [conn])
halt(conn)
end
defp await_app_session_ready(app, session_id) do
unless session_ready?(app, session_id) do
Livebook.App.subscribe(app.slug)
# We fetch the app again, in case it had changed before we subscribed
app = Livebook.App.get_by_pid(app.pid)
await_session_execution_loop(app, session_id)
Livebook.App.unsubscribe(app.slug)
end
end
defp await_session_execution_loop(%{slug: slug} = app, session_id) do
unless session_ready?(app, session_id) do
receive do
{:app_updated, %{slug: ^slug} = app} ->
await_session_execution_loop(app, session_id)
end
end
end
defp session_ready?(app, session_id) do
if session = Enum.find(app.sessions, &(&1.id == session_id)) do
session.app_status.execution != :executing
else
false
end
end
end

View file

@ -78,6 +78,58 @@ defmodule Livebook.ProxyTest do
assert text_response(put(conn, url), 200) == "used PUT method"
assert text_response(patch(conn, url), 200) == "used PATCH method"
assert text_response(delete(conn, url), 200) == "used DELETE method"
# Generic path also works for single-session apps
url = "/apps/#{slug}/proxy/"
assert text_response(get(conn, url), 200) == "used GET method"
end
test "waits for the session to be executed before attempting the request", %{conn: conn} do
slug = Livebook.Utils.random_short_id()
app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
auto_shutdown_ms: 5_000
}
notebook = %{proxy_notebook() | app_settings: app_settings}
Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)
assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}
# The app is configured with auto shutdown, so the session will
# start only once requested. We should wait until it executes
# and then proxy the request as usual
url = "/apps/#{slug}/proxy/"
assert text_response(get(conn, url), 200) == "used GET method"
end
test "returns error when requesting generic path for multi-session app", %{conn: conn} do
slug = Livebook.Utils.random_short_id()
app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
multi_session: true
}
notebook = %{proxy_notebook() | app_settings: app_settings}
Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)
assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}
assert_error_sent 400, fn ->
get(conn, "/apps/#{slug}/proxy/foo/bar")
end
end
end