Monitor clients presence in the runtime (#2530)

This commit is contained in:
Jonatan Kłosko 2024-03-27 15:25:42 +01:00 committed by GitHub
parent 23a172711d
commit f363be949e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 319 additions and 22 deletions

View file

@ -746,6 +746,23 @@ defprotocol Livebook.Runtime do
"""
@type transient_state :: %{atom() => term()}
@typedoc """
An identifier representing a client process.
A client is a connected user that interacts with the runtime outputs.
"""
@type client_id :: String.t()
@typedoc """
User information about a particular client.
"""
@type user_info :: %{
id: String.t(),
name: String.t() | nil,
email: String.t() | nil,
source: atom()
}
@doc """
Returns relevant information about the runtime.
@ -1072,4 +1089,16 @@ defprotocol Livebook.Runtime do
"""
@spec restore_transient_state(t(), transient_state()) :: :ok
def restore_transient_state(runtime, transient_state)
@doc """
Notifies the runtime about connected clients.
"""
@spec register_clients(t(), list({client_id(), user_info()})) :: :ok
def register_clients(runtime, clients)
@doc """
Notifies the runtime about clients leaving.
"""
@spec unregister_clients(t(), list(client_id())) :: :ok
def unregister_clients(runtime, client_ids)
end

View file

@ -196,4 +196,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
def restore_transient_state(runtime, transient_state) do
RuntimeServer.restore_transient_state(runtime.server_pid, transient_state)
end
def register_clients(runtime, clients) do
RuntimeServer.register_clients(runtime.server_pid, clients)
end
def unregister_clients(runtime, client_ids) do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
end

View file

@ -194,4 +194,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
def restore_transient_state(runtime, transient_state) do
RuntimeServer.restore_transient_state(runtime.server_pid, transient_state)
end
def register_clients(runtime, clients) do
RuntimeServer.register_clients(runtime.server_pid, clients)
end
def unregister_clients(runtime, client_ids) do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
end

View file

@ -163,6 +163,14 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
RuntimeServer.restore_transient_state(runtime.server_pid, transient_state)
end
def register_clients(runtime, clients) do
RuntimeServer.register_clients(runtime.server_pid, clients)
end
def unregister_clients(runtime, client_ids) do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end
defp config() do
Application.get_env(:livebook, Livebook.Runtime.Embedded, [])
end

View file

@ -27,6 +27,7 @@ defmodule Livebook.Runtime.ErlDist do
Livebook.Runtime.Evaluator.IOProxy,
Livebook.Runtime.Evaluator.Tracer,
Livebook.Runtime.Evaluator.ObjectTracker,
Livebook.Runtime.Evaluator.ClientTracker,
Livebook.Runtime.Evaluator.Formatter,
Livebook.Runtime.Evaluator.Doctests,
Livebook.Intellisense,

View file

@ -301,6 +301,22 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
GenServer.cast(pid, {:restore_transient_state, transient_state})
end
@doc """
Notifies the runtime about connected clients.
"""
@spec register_clients(pid(), list({Runtime.client_id(), Runtime.user_info()})) :: :ok
def register_clients(pid, clients) do
GenServer.cast(pid, {:register_clients, clients})
end
@doc """
Notifies the runtime about clients leaving.
"""
@spec unregister_clients(pid(), list(Runtime.client_id())) :: :ok
def unregister_clients(pid, client_ids) do
GenServer.cast(pid, {:unregister_clients, client_ids})
end
@doc """
Stops the runtime server.
@ -320,7 +336,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:ok, evaluator_supervisor} = ErlDist.EvaluatorSupervisor.start_link()
{:ok, task_supervisor} = Task.Supervisor.start_link()
{:ok, object_tracker} = Livebook.Runtime.Evaluator.ObjectTracker.start_link()
{:ok, object_tracker} = Evaluator.ObjectTracker.start_link()
{:ok, client_tracker} = Evaluator.ClientTracker.start_link()
{:ok,
%{
@ -330,6 +347,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
evaluator_supervisor: evaluator_supervisor,
task_supervisor: task_supervisor,
object_tracker: object_tracker,
client_tracker: client_tracker,
smart_cell_supervisor: nil,
smart_cell_gl: nil,
smart_cells: %{},
@ -656,6 +674,16 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:noreply, state}
end
def handle_cast({:register_clients, clients}, state) do
Evaluator.ClientTracker.register_clients(state.client_tracker, clients)
{:noreply, state}
end
def handle_cast({:unregister_clients, client_ids}, state) do
Evaluator.ClientTracker.unregister_clients(state.client_tracker, client_ids)
{:noreply, state}
end
def handle_cast({:relabel_file, file_id, new_file_id}, state) do
path = file_path(state, file_id)
new_path = file_path(state, new_file_id)
@ -738,6 +766,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
send_to: state.owner,
runtime_broadcast_to: state.runtime_broadcast_to,
object_tracker: state.object_tracker,
client_tracker: state.client_tracker,
ebin_path: state.ebin_path,
tmp_dir: evaluator_tmp_dir(state),
io_proxy_registry: state.io_proxy_registry

View file

@ -28,6 +28,7 @@ defmodule Livebook.Runtime.Evaluator do
send_to: pid(),
runtime_broadcast_to: pid(),
object_tracker: pid(),
client_tracker: pid(),
contexts: %{ref() => context()},
initial_context: context(),
initial_context_version: nil | (md5 :: binary()),
@ -75,6 +76,9 @@ defmodule Livebook.Runtime.Evaluator do
* `:object_tracker` - a pid of `Livebook.Runtime.Evaluator.ObjectTracker`.
Required
* `:client_tracker` - a pid of `Livebook.Runtime.Evaluator.ClientTracker`.
Required
* `:runtime_broadcast_to` - the process to send runtime broadcast
events to. Defaults to the value of `:send_to`
@ -266,6 +270,7 @@ defmodule Livebook.Runtime.Evaluator do
send_to = Keyword.fetch!(opts, :send_to)
runtime_broadcast_to = Keyword.get(opts, :runtime_broadcast_to, send_to)
object_tracker = Keyword.fetch!(opts, :object_tracker)
client_tracker = Keyword.fetch!(opts, :client_tracker)
ebin_path = Keyword.get(opts, :ebin_path)
tmp_dir = Keyword.get(opts, :tmp_dir)
io_proxy_registry = Keyword.get(opts, :io_proxy_registry)
@ -276,6 +281,7 @@ defmodule Livebook.Runtime.Evaluator do
send_to,
runtime_broadcast_to,
object_tracker,
client_tracker,
ebin_path,
tmp_dir,
io_proxy_registry
@ -309,6 +315,7 @@ defmodule Livebook.Runtime.Evaluator do
send_to: send_to,
runtime_broadcast_to: runtime_broadcast_to,
object_tracker: object_tracker,
client_tracker: client_tracker,
contexts: %{},
initial_context: context,
initial_context_version: nil,

View file

@ -0,0 +1,85 @@
defmodule Livebook.Runtime.Evaluator.ClientTracker do
# Keeps track of connected clients as reported to the runtime by the
# owner.
#
# Sends events to processes monitoring clients presence.
use GenServer
alias Livebook.Runtime
@doc """
Starts a new client tracker.
"""
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(_opts \\ []) do
GenServer.start_link(__MODULE__, {})
end
@doc """
Registeres new connected clients.
"""
@spec register_clients(pid(), list({Runtime.client_id(), Runtime.user_info()})) :: :ok
def register_clients(client_tracker, clients) do
GenServer.cast(client_tracker, {:register_clients, clients})
end
@doc """
Unregisteres connected clients.
"""
@spec unregister_clients(pid(), list(Runtime.client_id())) :: :ok
def unregister_clients(client_tracker, client_ids) do
GenServer.cast(client_tracker, {:unregister_clients, client_ids})
end
@doc """
Subscribes the given process to client presence events.
"""
@spec monitor_clients(pid(), pid) :: :ok
def monitor_clients(client_tracker, pid) do
GenServer.cast(client_tracker, {:monitor_clients, pid})
end
@impl true
def init({}) do
{:ok, %{clients: %{}, subscribers: MapSet.new()}}
end
@impl true
def handle_cast({:register_clients, clients}, state) do
for {client_id, _user_info} <- clients, pid <- state.subscribers do
send(pid, {:client_join, client_id})
end
state = update_in(state.clients, &Enum.into(clients, &1))
{:noreply, state}
end
def handle_cast({:unregister_clients, client_ids}, state) do
for client_id <- client_ids, pid <- state.subscribers do
send(pid, {:client_leave, client_id})
end
state = update_in(state.clients, &Map.drop(&1, client_ids))
{:noreply, state}
end
def handle_cast({:monitor_clients, pid}, state) do
Process.monitor(pid)
state = update_in(state.subscribers, &MapSet.put(&1, pid))
for {client_id, _user_info} <- state.clients do
send(pid, {:client_join, client_id})
end
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
state = update_in(state.subscribers, &MapSet.delete(&1, pid))
{:noreply, state}
end
end

View file

@ -29,6 +29,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
pid(),
pid(),
pid(),
pid(),
String.t() | nil,
String.t() | nil,
atom() | nil
@ -38,13 +39,15 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
send_to,
runtime_broadcast_to,
object_tracker,
client_tracker,
ebin_path,
tmp_dir,
registry
) do
GenServer.start(
__MODULE__,
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, tmp_dir, registry}
{evaluator, send_to, runtime_broadcast_to, object_tracker, client_tracker, ebin_path,
tmp_dir, registry}
)
end
@ -103,7 +106,8 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
@impl true
def init(
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, tmp_dir, registry}
{evaluator, send_to, runtime_broadcast_to, object_tracker, client_tracker, ebin_path,
tmp_dir, registry}
) do
evaluator_monitor = Process.monitor(evaluator)
@ -124,6 +128,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
send_to: send_to,
runtime_broadcast_to: runtime_broadcast_to,
object_tracker: object_tracker,
client_tracker: client_tracker,
ebin_path: ebin_path,
tmp_dir: tmp_dir,
tracer_info: %Evaluator.Tracer{},
@ -392,6 +397,11 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
{result, state}
end
defp io_request({:livebook_monitor_clients, pid}, state) do
Evaluator.ClientTracker.monitor_clients(state.client_tracker, pid)
{:ok, state}
end
defp io_request(_, state) do
{{:error, :request}, state}
end

View file

@ -2085,6 +2085,14 @@ defmodule Livebook.Session do
Runtime.restore_transient_state(runtime, state.data.runtime_transient_state)
end
clients =
for {client_id, user_id} <- state.data.clients_map do
user = Map.fetch!(state.data.users_map, user_id)
{client_id, user_info(user)}
end
Runtime.register_clients(runtime, clients)
%{state | runtime_monitor_ref: runtime_monitor_ref}
end
@ -2198,6 +2206,10 @@ defmodule Livebook.Session do
state = put_in(state.client_id_with_assets[client_id], %{})
if Runtime.connected?(state.data.runtime) do
Runtime.register_clients(state.data.runtime, [{client_id, user_info(user)}])
end
app_report_client_count_change(state)
schedule_auto_shutdown(state)
@ -2213,6 +2225,10 @@ defmodule Livebook.Session do
state = delete_client_files(state, client_id)
{_, state} = pop_in(state.client_id_with_assets[client_id])
if Runtime.connected?(state.data.runtime) do
Runtime.unregister_clients(state.data.runtime, [client_id])
end
app_report_client_count_change(state)
schedule_auto_shutdown(state)
@ -2836,13 +2852,7 @@ defmodule Livebook.Session do
info = %{type: :multi_session}
if user = state.started_by do
{type, _module, _key} = Livebook.Config.identity_provider()
started_by =
user
|> Map.take([:id, :name, :email])
|> Map.put(:source, type)
started_by = user_info(user)
Map.put(info, :started_by, started_by)
else
info
@ -2996,6 +3006,14 @@ defmodule Livebook.Session do
end
end
defp user_info(user) do
{type, _module, _key} = Livebook.Config.identity_provider()
user
|> Map.take([:id, :name, :email])
|> Map.put(:source, type)
end
# Normalizes output to match the most recent specification.
#
# Rewrites legacy output formats and adds defaults for newly introduced

View file

@ -340,4 +340,31 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
RuntimeServer.evaluate_code(pid, :elixir, "1 + 1", {:c1, :e1}, [])
end
end
test "clients monitoring", %{pid: pid, test: test} do
# Pretend we are the subscriber
Process.register(self(), test)
code =
"""
pid = Process.whereis(#{inspect(test)})
ref = make_ref()
send(Process.group_leader(), {:io_request, self(), ref, {:livebook_monitor_clients, pid}})
receive do
{:io_reply, ^ref, :ok} -> :ok
end
"""
RuntimeServer.evaluate_code(pid, :elixir, code, {:c1, :e1}, [])
assert_receive {:runtime_evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
clients = [{"c1", %{id: "u1", name: "Jake Peralta", email: nil, source: :session}}]
RuntimeServer.register_clients(pid, clients)
assert_receive {:client_join, "c1"}
RuntimeServer.unregister_clients(pid, ["c1"])
assert_receive {:client_leave, "c1"}
end
end

View file

@ -0,0 +1,52 @@
defmodule Livebook.Runtime.Evaluator.ClientTrackerTest do
use ExUnit.Case, async: true
alias Livebook.Runtime.Evaluator.ClientTracker
setup do
{:ok, client_tracker} = start_supervised(ClientTracker)
%{client_tracker: client_tracker}
end
test "sends client joins to monitoring processes", %{client_tracker: client_tracker} do
ClientTracker.monitor_clients(client_tracker, self())
clients = [{"c1", user_info()}, {"c2", user_info()}]
ClientTracker.register_clients(client_tracker, clients)
assert_receive {:client_join, "c1"}
assert_receive {:client_join, "c2"}
end
test "sends client leaves to monitoring processes", %{client_tracker: client_tracker} do
ClientTracker.monitor_clients(client_tracker, self())
clients = [{"c1", user_info()}, {"c2", user_info()}]
ClientTracker.register_clients(client_tracker, clients)
ClientTracker.unregister_clients(client_tracker, ["c1"])
assert_receive {:client_leave, "c1"}
ClientTracker.unregister_clients(client_tracker, ["c2"])
assert_receive {:client_leave, "c2"}
end
test "sends existing client joins when monitoring starts", %{client_tracker: client_tracker} do
clients = [{"c1", user_info()}, {"c2", user_info()}]
ClientTracker.register_clients(client_tracker, clients)
ClientTracker.monitor_clients(client_tracker, self())
assert_receive {:client_join, "c1"}
assert_receive {:client_join, "c2"}
end
defp user_info() do
%{
id: "u1",
name: "Jake Peralta",
email: nil,
source: :session
}
end
end

View file

@ -8,9 +8,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
setup do
{:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker)
{:ok, client_tracker} = start_supervised(Evaluator.ClientTracker)
{:ok, _pid, evaluator} =
start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]})
start_supervised(
{Evaluator,
[send_to: self(), object_tracker: object_tracker, client_tracker: client_tracker]}
)
io = Process.info(evaluator.pid)[:group_leader]
IOProxy.before_evaluation(io, :ref, "cell")

View file

@ -1,4 +1,4 @@
defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do
defmodule Livebook.Runtime.Evaluator.ObjectTrackerTest do
use ExUnit.Case, async: true
alias Livebook.Runtime.Evaluator.ObjectTracker

View file

@ -17,13 +17,23 @@ defmodule Livebook.Runtime.EvaluatorTest do
end
{:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker)
{:ok, client_tracker} = start_supervised(Evaluator.ClientTracker)
{:ok, _pid, evaluator} =
start_supervised(
{Evaluator, [send_to: self(), object_tracker: object_tracker, ebin_path: ebin_path]}
)
opts = [
send_to: self(),
object_tracker: object_tracker,
client_tracker: client_tracker,
ebin_path: ebin_path
]
%{evaluator: evaluator, object_tracker: object_tracker, ebin_path: ebin_path}
{:ok, _pid, evaluator} = start_supervised({Evaluator, opts})
%{
evaluator: evaluator,
object_tracker: object_tracker,
client_tracker: client_tracker,
ebin_path: ebin_path
}
end
defmacrop metadata do
@ -1188,11 +1198,9 @@ defmodule Livebook.Runtime.EvaluatorTest do
end
describe "initialize_from/3" do
setup %{object_tracker: object_tracker} do
{:ok, _pid, parent_evaluator} =
start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]},
id: :parent_evaluator
)
setup %{object_tracker: object_tracker, client_tracker: client_tracker} do
opts = [send_to: self(), object_tracker: object_tracker, client_tracker: client_tracker]
{:ok, _pid, parent_evaluator} = start_supervised({Evaluator, opts}, id: :parent_evaluator)
%{parent_evaluator: parent_evaluator}
end

View file

@ -71,6 +71,9 @@ defmodule Livebook.Runtime.NoopRuntime do
:ok
end
def register_clients(_, _), do: :ok
def unregister_clients(_, _), do: :ok
defp trace(runtime, fun, args) do
if runtime.trace_to do
send(runtime.trace_to, {:runtime_trace, fun, args})