mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-12-17 21:50:25 +08:00
Resolve user info requests coming from the runtime (#2540)
This commit is contained in:
parent
2ae8149568
commit
61ca2cd063
10 changed files with 128 additions and 56 deletions
|
|
@ -42,7 +42,7 @@ defmodule Livebook.Notebook.AppSettings do
|
||||||
slug: nil,
|
slug: nil,
|
||||||
multi_session: false,
|
multi_session: false,
|
||||||
zero_downtime: false,
|
zero_downtime: false,
|
||||||
show_existing_sessions: true,
|
show_existing_sessions: false,
|
||||||
auto_shutdown_ms: nil,
|
auto_shutdown_ms: nil,
|
||||||
access_type: :protected,
|
access_type: :protected,
|
||||||
password: generate_password(),
|
password: generate_password(),
|
||||||
|
|
|
||||||
|
|
@ -1093,7 +1093,7 @@ defprotocol Livebook.Runtime do
|
||||||
@doc """
|
@doc """
|
||||||
Notifies the runtime about connected clients.
|
Notifies the runtime about connected clients.
|
||||||
"""
|
"""
|
||||||
@spec register_clients(t(), list({client_id(), user_info()})) :: :ok
|
@spec register_clients(t(), list(client_id())) :: :ok
|
||||||
def register_clients(runtime, clients)
|
def register_clients(runtime, clients)
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,9 @@ defmodule Livebook.Runtime.Evaluator.ClientTracker do
|
||||||
@doc """
|
@doc """
|
||||||
Registeres new connected clients.
|
Registeres new connected clients.
|
||||||
"""
|
"""
|
||||||
@spec register_clients(pid(), list({Runtime.client_id(), Runtime.user_info()})) :: :ok
|
@spec register_clients(pid(), list(Runtime.client_id())) :: :ok
|
||||||
def register_clients(client_tracker, clients) do
|
def register_clients(client_tracker, client_ids) do
|
||||||
GenServer.cast(client_tracker, {:register_clients, clients})
|
GenServer.cast(client_tracker, {:register_clients, client_ids})
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
@ -34,24 +34,26 @@ defmodule Livebook.Runtime.Evaluator.ClientTracker do
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Subscribes the given process to client presence events.
|
Subscribes the given process to client presence events.
|
||||||
|
|
||||||
|
Returns the list of currently connected clients.
|
||||||
"""
|
"""
|
||||||
@spec monitor_clients(pid(), pid) :: :ok
|
@spec monitor_clients(pid(), pid) :: list(Runtime.client_id())
|
||||||
def monitor_clients(client_tracker, pid) do
|
def monitor_clients(client_tracker, pid) do
|
||||||
GenServer.cast(client_tracker, {:monitor_clients, pid})
|
GenServer.call(client_tracker, {:monitor_clients, pid})
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def init({}) do
|
def init({}) do
|
||||||
{:ok, %{clients: %{}, subscribers: MapSet.new()}}
|
{:ok, %{client_ids: MapSet.new(), subscribers: MapSet.new()}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_cast({:register_clients, clients}, state) do
|
def handle_cast({:register_clients, client_ids}, state) do
|
||||||
for {client_id, _user_info} <- clients, pid <- state.subscribers do
|
for client_id <- client_ids, pid <- state.subscribers do
|
||||||
send(pid, {:client_join, client_id})
|
send(pid, {:client_join, client_id})
|
||||||
end
|
end
|
||||||
|
|
||||||
state = update_in(state.clients, &Enum.into(clients, &1))
|
state = update_in(state.client_ids, &Enum.into(client_ids, &1))
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
@ -61,20 +63,20 @@ defmodule Livebook.Runtime.Evaluator.ClientTracker do
|
||||||
send(pid, {:client_leave, client_id})
|
send(pid, {:client_leave, client_id})
|
||||||
end
|
end
|
||||||
|
|
||||||
state = update_in(state.clients, &Map.drop(&1, client_ids))
|
state =
|
||||||
|
update_in(state.client_ids, fn ids ->
|
||||||
|
Enum.reduce(client_ids, ids, &MapSet.delete(&2, &1))
|
||||||
|
end)
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_cast({:monitor_clients, pid}, state) do
|
@impl true
|
||||||
|
def handle_call({:monitor_clients, pid}, _from, state) do
|
||||||
Process.monitor(pid)
|
Process.monitor(pid)
|
||||||
state = update_in(state.subscribers, &MapSet.put(&1, pid))
|
state = update_in(state.subscribers, &MapSet.put(&1, pid))
|
||||||
|
client_ids = MapSet.to_list(state.client_ids)
|
||||||
for {client_id, _user_info} <- state.clients do
|
{:reply, client_ids, state}
|
||||||
send(pid, {:client_join, client_id})
|
|
||||||
end
|
|
||||||
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
|
|
||||||
|
|
@ -373,8 +373,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp io_request({:livebook_monitor_clients, pid}, state) do
|
defp io_request({:livebook_monitor_clients, pid}, state) do
|
||||||
Evaluator.ClientTracker.monitor_clients(state.client_tracker, pid)
|
client_ids = Evaluator.ClientTracker.monitor_clients(state.client_tracker, pid)
|
||||||
{:ok, state}
|
{{:ok, client_ids}, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp io_request({:livebook_get_user_info, client_id}, state) do
|
||||||
|
result = request_user_info(client_id, state)
|
||||||
|
{result, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp io_request(_, state) do
|
defp io_request(_, state) do
|
||||||
|
|
@ -444,6 +449,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
||||||
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp request_user_info(client_id, state) do
|
||||||
|
request = {:runtime_user_info_request, self(), client_id}
|
||||||
|
reply_tag = :runtime_user_info_reply
|
||||||
|
|
||||||
|
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
||||||
|
end
|
||||||
|
|
||||||
defp runtime_request(state, request, reply_tag) do
|
defp runtime_request(state, request, reply_tag) do
|
||||||
send(state.send_to, request)
|
send(state.send_to, request)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1645,6 +1645,24 @@ defmodule Livebook.Session do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_info({:runtime_user_info_request, reply_to, client_id}, state) do
|
||||||
|
reply =
|
||||||
|
cond do
|
||||||
|
not state.data.notebook.teams_enabled ->
|
||||||
|
{:error, :not_available}
|
||||||
|
|
||||||
|
user_id = state.data.clients_map[client_id] ->
|
||||||
|
user = Map.fetch!(state.data.users_map, user_id)
|
||||||
|
{:ok, user_info(user)}
|
||||||
|
|
||||||
|
true ->
|
||||||
|
{:error, :not_found}
|
||||||
|
end
|
||||||
|
|
||||||
|
send(reply_to, {:runtime_user_info_reply, reply})
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
def handle_info({:runtime_container_down, container_ref, message}, state) do
|
def handle_info({:runtime_container_down, container_ref, message}, state) do
|
||||||
broadcast_error(state.session_id, "evaluation process terminated - #{message}")
|
broadcast_error(state.session_id, "evaluation process terminated - #{message}")
|
||||||
|
|
||||||
|
|
@ -2085,13 +2103,8 @@ defmodule Livebook.Session do
|
||||||
Runtime.restore_transient_state(runtime, state.data.runtime_transient_state)
|
Runtime.restore_transient_state(runtime, state.data.runtime_transient_state)
|
||||||
end
|
end
|
||||||
|
|
||||||
clients =
|
client_ids = Map.keys(state.data.clients_map)
|
||||||
for {client_id, user_id} <- state.data.clients_map do
|
Runtime.register_clients(runtime, client_ids)
|
||||||
user = Map.fetch!(state.data.users_map, user_id)
|
|
||||||
{client_id, user_info(user)}
|
|
||||||
end
|
|
||||||
|
|
||||||
Runtime.register_clients(runtime, clients)
|
|
||||||
|
|
||||||
%{state | runtime_monitor_ref: runtime_monitor_ref}
|
%{state | runtime_monitor_ref: runtime_monitor_ref}
|
||||||
end
|
end
|
||||||
|
|
@ -2207,7 +2220,7 @@ defmodule Livebook.Session do
|
||||||
state = put_in(state.client_id_with_assets[client_id], %{})
|
state = put_in(state.client_id_with_assets[client_id], %{})
|
||||||
|
|
||||||
if Runtime.connected?(state.data.runtime) do
|
if Runtime.connected?(state.data.runtime) do
|
||||||
Runtime.register_clients(state.data.runtime, [{client_id, user_info(user)}])
|
Runtime.register_clients(state.data.runtime, [client_id])
|
||||||
end
|
end
|
||||||
|
|
||||||
app_report_client_count_change(state)
|
app_report_client_count_change(state)
|
||||||
|
|
|
||||||
|
|
@ -1274,7 +1274,7 @@ defmodule Livebook.LiveMarkdown.ExportTest do
|
||||||
| slug: "app",
|
| slug: "app",
|
||||||
multi_session: true,
|
multi_session: true,
|
||||||
zero_downtime: false,
|
zero_downtime: false,
|
||||||
show_existing_sessions: false,
|
show_existing_sessions: true,
|
||||||
auto_shutdown_ms: 5_000,
|
auto_shutdown_ms: 5_000,
|
||||||
access_type: :public,
|
access_type: :public,
|
||||||
show_source: true,
|
show_source: true,
|
||||||
|
|
@ -1283,7 +1283,7 @@ defmodule Livebook.LiveMarkdown.ExportTest do
|
||||||
}
|
}
|
||||||
|
|
||||||
expected_document = """
|
expected_document = """
|
||||||
<!-- livebook:{"app_settings":{"access_type":"public","auto_shutdown_ms":5000,"multi_session":true,"output_type":"rich","show_existing_sessions":false,"show_source":true,"slug":"app"}} -->
|
<!-- livebook:{"app_settings":{"access_type":"public","auto_shutdown_ms":5000,"multi_session":true,"output_type":"rich","show_existing_sessions":true,"show_source":true,"slug":"app"}} -->
|
||||||
|
|
||||||
# My Notebook
|
# My Notebook
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -353,16 +353,17 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
|
||||||
send(Process.group_leader(), {:io_request, self(), ref, {:livebook_monitor_clients, pid}})
|
send(Process.group_leader(), {:io_request, self(), ref, {:livebook_monitor_clients, pid}})
|
||||||
|
|
||||||
receive do
|
receive do
|
||||||
{:io_reply, ^ref, :ok} -> :ok
|
{:io_reply, ^ref, {:ok, ["c1"]}} -> :ok
|
||||||
end
|
end
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
RuntimeServer.register_clients(pid, ["c1"])
|
||||||
|
|
||||||
RuntimeServer.evaluate_code(pid, :elixir, code, {:c1, :e1}, [])
|
RuntimeServer.evaluate_code(pid, :elixir, code, {:c1, :e1}, [])
|
||||||
assert_receive {:runtime_evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
|
assert_receive {:runtime_evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
|
||||||
|
|
||||||
clients = [{"c1", %{id: "u1", name: "Jake Peralta", email: nil, source: :session}}]
|
RuntimeServer.register_clients(pid, ["c2"])
|
||||||
RuntimeServer.register_clients(pid, clients)
|
assert_receive {:client_join, "c2"}
|
||||||
assert_receive {:client_join, "c1"}
|
|
||||||
|
|
||||||
RuntimeServer.unregister_clients(pid, ["c1"])
|
RuntimeServer.unregister_clients(pid, ["c1"])
|
||||||
assert_receive {:client_leave, "c1"}
|
assert_receive {:client_leave, "c1"}
|
||||||
|
|
|
||||||
|
|
@ -11,8 +11,7 @@ defmodule Livebook.Runtime.Evaluator.ClientTrackerTest do
|
||||||
test "sends client joins to monitoring processes", %{client_tracker: client_tracker} do
|
test "sends client joins to monitoring processes", %{client_tracker: client_tracker} do
|
||||||
ClientTracker.monitor_clients(client_tracker, self())
|
ClientTracker.monitor_clients(client_tracker, self())
|
||||||
|
|
||||||
clients = [{"c1", user_info()}, {"c2", user_info()}]
|
ClientTracker.register_clients(client_tracker, ["c1", "c2"])
|
||||||
ClientTracker.register_clients(client_tracker, clients)
|
|
||||||
|
|
||||||
assert_receive {:client_join, "c1"}
|
assert_receive {:client_join, "c1"}
|
||||||
assert_receive {:client_join, "c2"}
|
assert_receive {:client_join, "c2"}
|
||||||
|
|
@ -21,8 +20,7 @@ defmodule Livebook.Runtime.Evaluator.ClientTrackerTest do
|
||||||
test "sends client leaves to monitoring processes", %{client_tracker: client_tracker} do
|
test "sends client leaves to monitoring processes", %{client_tracker: client_tracker} do
|
||||||
ClientTracker.monitor_clients(client_tracker, self())
|
ClientTracker.monitor_clients(client_tracker, self())
|
||||||
|
|
||||||
clients = [{"c1", user_info()}, {"c2", user_info()}]
|
ClientTracker.register_clients(client_tracker, ["c1", "c2"])
|
||||||
ClientTracker.register_clients(client_tracker, clients)
|
|
||||||
|
|
||||||
ClientTracker.unregister_clients(client_tracker, ["c1"])
|
ClientTracker.unregister_clients(client_tracker, ["c1"])
|
||||||
assert_receive {:client_leave, "c1"}
|
assert_receive {:client_leave, "c1"}
|
||||||
|
|
@ -31,22 +29,10 @@ defmodule Livebook.Runtime.Evaluator.ClientTrackerTest do
|
||||||
assert_receive {:client_leave, "c2"}
|
assert_receive {:client_leave, "c2"}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "sends existing client joins when monitoring starts", %{client_tracker: client_tracker} do
|
test "returns existing client joins when monitoring starts", %{client_tracker: client_tracker} do
|
||||||
clients = [{"c1", user_info()}, {"c2", user_info()}]
|
ClientTracker.register_clients(client_tracker, ["c1", "c2"])
|
||||||
ClientTracker.register_clients(client_tracker, clients)
|
ClientTracker.unregister_clients(client_tracker, ["c1"])
|
||||||
|
|
||||||
ClientTracker.monitor_clients(client_tracker, self())
|
assert ClientTracker.monitor_clients(client_tracker, self()) == ["c2"]
|
||||||
|
|
||||||
assert_receive {:client_join, "c1"}
|
|
||||||
assert_receive {:client_join, "c2"}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp user_info() do
|
|
||||||
%{
|
|
||||||
id: "u1",
|
|
||||||
name: "Jake Peralta",
|
|
||||||
email: nil,
|
|
||||||
source: :session
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1901,6 +1901,57 @@ defmodule Livebook.SessionTest do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "accessing client's user info" do
|
||||||
|
test "replies with error when the session does not use teams hub" do
|
||||||
|
session = start_session()
|
||||||
|
|
||||||
|
runtime = connected_noop_runtime(self())
|
||||||
|
Session.set_runtime(session.pid, runtime)
|
||||||
|
send(session.pid, {:runtime_user_info_request, self(), "c1"})
|
||||||
|
|
||||||
|
assert_receive {:runtime_user_info_reply, {:error, :not_available}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "replies with error when the client does not exist" do
|
||||||
|
notebook = %{Notebook.new() | teams_enabled: true}
|
||||||
|
session = start_session(notebook: notebook)
|
||||||
|
|
||||||
|
runtime = connected_noop_runtime(self())
|
||||||
|
Session.set_runtime(session.pid, runtime)
|
||||||
|
send(session.pid, {:runtime_user_info_request, self(), "c1"})
|
||||||
|
|
||||||
|
assert_receive {:runtime_user_info_reply, {:error, :not_found}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "replies with user info when the client exists" do
|
||||||
|
notebook = %{Notebook.new() | teams_enabled: true}
|
||||||
|
session = start_session(notebook: notebook)
|
||||||
|
|
||||||
|
user = %{
|
||||||
|
Livebook.Users.User.new()
|
||||||
|
| id: "1234",
|
||||||
|
name: "Jake Peralta",
|
||||||
|
email: "jperalta@example.com"
|
||||||
|
}
|
||||||
|
|
||||||
|
{_, client_id} = Session.register_client(session.pid, self(), user)
|
||||||
|
|
||||||
|
runtime = connected_noop_runtime(self())
|
||||||
|
Session.set_runtime(session.pid, runtime)
|
||||||
|
send(session.pid, {:runtime_user_info_request, self(), client_id})
|
||||||
|
|
||||||
|
assert_receive {:runtime_user_info_reply, {:ok, user_info}}
|
||||||
|
|
||||||
|
assert user_info == %{
|
||||||
|
source: :session,
|
||||||
|
id: "1234",
|
||||||
|
name: "Jake Peralta",
|
||||||
|
email: "jperalta@example.com",
|
||||||
|
payload: nil
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
test "supports legacy text outputs" do
|
test "supports legacy text outputs" do
|
||||||
session = start_session()
|
session = start_session()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,14 @@ defmodule LivebookWeb.AppLiveTest do
|
||||||
describe "multi-session app" do
|
describe "multi-session app" do
|
||||||
test "renders a list of active app sessions", %{conn: conn} do
|
test "renders a list of active app sessions", %{conn: conn} do
|
||||||
slug = Utils.random_short_id()
|
slug = Utils.random_short_id()
|
||||||
app_settings = %{Notebook.AppSettings.new() | slug: slug, multi_session: true}
|
|
||||||
|
app_settings = %{
|
||||||
|
Notebook.AppSettings.new()
|
||||||
|
| slug: slug,
|
||||||
|
multi_session: true,
|
||||||
|
show_existing_sessions: true
|
||||||
|
}
|
||||||
|
|
||||||
notebook = %{Notebook.new() | app_settings: app_settings}
|
notebook = %{Notebook.new() | app_settings: app_settings}
|
||||||
|
|
||||||
Apps.subscribe()
|
Apps.subscribe()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue