livebook/lib/livebook/sessions.ex
Jonatan Kłosko 4ff1ff0d5a
Use Phoenix.Tracker to keep track of sessions within the cluster (#540)
* Use Phoenix.Tracker to keep track of sessions within the cluster

* Apply review comments

* Cleanup topics and updates

* Update lib/livebook_web/live/session_live.ex
2021-09-04 19:16:01 +02:00

83 lines
2.4 KiB
Elixir

defmodule Livebook.Sessions do
@moduledoc false
# This module is responsible for starting and discovering sessions.
#
# Every session has a server process and is described by a `%Session{}`
# info struct. Information about all sessions in the cluster is
# propagated using `Livebook.Tracker`, which serves as an ephemeral
# distributed database for the `%Session{}` structs.
alias Livebook.{Session, Utils}
@doc """
Spawns a new `Session` process with the given options.
Makes the session globally visible within the session tracker.
`{:session_created, session}` gets broadcasted under the "tracker_sessions" topic.
"""
@spec create_session(keyword()) :: {:ok, Session.t()} | {:error, any()}
def create_session(opts \\ []) do
id = Utils.random_node_aware_id()
opts = Keyword.put(opts, :id, id)
case DynamicSupervisor.start_child(Livebook.SessionSupervisor, {Session, opts}) do
{:ok, pid} ->
session = Session.get_by_pid(pid)
case Livebook.Tracker.track_session(session) do
:ok ->
{:ok, session}
{:error, reason} ->
Session.close(pid)
{:error, reason}
end
{:error, reason} ->
{:error, reason}
end
end
@doc """
Returns all the running sessions.
"""
@spec list_sessions() :: list(Session.t())
def list_sessions() do
Livebook.Tracker.list_sessions()
end
@doc """
Returns tracked session with the given id.
"""
@spec fetch_session(Session.id()) :: {:ok, Session.t()} | :error
def fetch_session(id) do
case Livebook.Tracker.fetch_session(id) do
{:ok, session} ->
{:ok, session}
:error ->
# The local tracker server doesn't know about this session,
# but it may not have propagated yet, so we extract the session
# node from id and ask the corresponding tracker directly
with {:ok, other_node} when other_node != node() <- Utils.node_from_node_aware_id(id),
{:ok, session} <- :rpc.call(other_node, Livebook.Tracker, :fetch_session, [id]) do
{:ok, session}
else
_ -> :error
end
end
end
@doc """
Updates the given session info across the cluster.
`{:session_updated, session}` gets broadcasted under the "tracker_sessions" topic.
"""
@spec update_session(Session.t()) :: :ok | {:error, any()}
def update_session(session) do
Livebook.Tracker.update_session(session)
end
end