Send status reports from apps manager (#2624)

This commit is contained in:
Jonatan Kłosko 2024-05-29 10:52:59 +02:00 committed by GitHub
parent a344a42c9f
commit 586c86454f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 132 additions and 13 deletions

View file

@ -50,6 +50,23 @@ defmodule Livebook.Apps.Manager do
GenServer.cast({:global, @name}, :sync_permanent_apps)
end
@doc """
Subscribes to manager reports.
The messages are only sent within the node that the manager runs on.
## Messages
* `{:apps_manager_status, status_entries}` - reports which permanent
app specs are running, and which are pending. Note that in some
cases the status may be sent, even if the entries do not change
"""
@spec subscribe() :: :ok | {:error, term()}
def subscribe() do
Phoenix.PubSub.subscribe(Livebook.PubSub, "apps_manager")
end
@impl true
def init({}) do
Apps.subscribe()
@ -132,40 +149,45 @@ defmodule Livebook.Apps.Manager do
defp sync_apps(state) do
permanent_app_specs = Apps.get_permanent_app_specs()
state = deploy_missing_apps(state, permanent_app_specs)
close_leftover_apps(permanent_app_specs)
permanent_apps = Enum.filter(Apps.list_apps(), & &1.permanent)
{state, up_to_date_app_specs} = deploy_missing_apps(state, permanent_app_specs)
close_leftover_apps(permanent_apps, permanent_app_specs)
broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps)
state
end
defp deploy_missing_apps(state, permanent_app_specs) do
for app_spec <- permanent_app_specs,
not Map.has_key?(state.deployments, app_spec.slug),
reduce: state do
state ->
reduce: {state, []} do
{state, up_to_date_app_specs} ->
case fetch_app(app_spec.slug) do
{:ok, _state, app} when app.app_spec.version == app_spec.version ->
state
{state, [app_spec | up_to_date_app_specs]}
{:ok, :reachable, app} ->
ref = redeploy(app, app_spec)
track_deployment(state, app_spec, ref)
state = track_deployment(state, app_spec, ref)
{state, up_to_date_app_specs}
{:ok, :unreachable, _app} ->
state
{state, up_to_date_app_specs}
:error ->
ref = deploy(app_spec)
track_deployment(state, app_spec, ref)
state = track_deployment(state, app_spec, ref)
{state, up_to_date_app_specs}
end
end
end
defp close_leftover_apps(permanent_app_specs) do
defp close_leftover_apps(permanent_apps, permanent_app_specs) do
permanent_slugs = MapSet.new(permanent_app_specs, & &1.slug)
for app <- Apps.list_apps(),
app.permanent,
app.slug not in permanent_slugs do
for app <- permanent_apps, app.slug not in permanent_slugs do
Livebook.App.close_async(app.pid)
end
end
@ -187,6 +209,32 @@ defmodule Livebook.Apps.Manager do
end
end
defp broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps) do
pending_app_specs = permanent_app_specs -- up_to_date_app_specs
running_app_specs = Enum.map(permanent_apps, & &1.app_spec)
# `up_to_date_app_specs` is the list of current permanent app
# specs that are already running. This information is based on
# :global and fetched directly from the processes, therefore it
# is more recent than the tracker and it may include app spec
# versions that the tracker does not know about yet. We combine
# this with information from the tracker (`running_app_specs`).
# Only one app spec may actually be running for the given slug,
# so we deduplicate, prioritizing `up_to_date_app_specs`.
running_app_specs = Enum.uniq_by(up_to_date_app_specs ++ running_app_specs, & &1.slug)
status_entries =
Enum.map(running_app_specs, &%{app_spec: &1, running?: true}) ++
Enum.map(pending_app_specs, &%{app_spec: &1, running?: false})
local_broadcast({:apps_manager_status, status_entries})
end
defp local_broadcast(message) do
Phoenix.PubSub.direct_broadcast!(node(), Livebook.PubSub, "apps_manager", message)
end
defp app_definitely_down?(slug) do
not Apps.exists?(slug) and Livebook.Tracker.fetch_app(slug) == :error
end

View file

@ -21,7 +21,8 @@ defmodule Livebook.Hubs.TeamClient do
file_systems: [],
deployment_groups: [],
app_deployments: [],
agents: []
agents: [],
app_deployment_statuses: nil
]
@type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}}
@ -145,6 +146,8 @@ defmodule Livebook.Hubs.TeamClient do
@impl true
def init(%Hubs.Team{offline: nil} = team) do
Livebook.Apps.Manager.subscribe()
derived_key = Teams.derive_key(team.teams_key)
headers =
@ -269,6 +272,39 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, handle_event(topic, data, state)}
end
def handle_info({:apps_manager_status, status_entries}, state) do
app_deployment_statuses =
for %{
app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec,
running?: running?
} <- status_entries,
app_spec.hub_id == state.hub.id do
status = if(running?, do: :available, else: :processing)
%{version: app_spec.version, status: status}
end
# The manager can send the status list even if it didn't change,
# or it changed for non-teams app spec, so we check to send the
# event only when necessary
if app_deployment_statuses == state.app_deployment_statuses do
{:noreply, state}
else
# TODO: send this status list to Teams and set the statuses in
# the database. Note that app deployments for this deployment
# group (this agent), that are not present in this list, we
# effectively no longer know about, so we may want to reset
# their status.
# TODO: we want :version to be built on Teams server and just
# passed down to Livebook, so that Livebook does not care if
# we upsert app deployments or not. With that, we can also
# freely send the version with status here, and the server will
# recognise it.
{:noreply, %{state | app_deployment_statuses: app_deployment_statuses}}
end
end
@impl true
def handle_cast({:event, topic, data}, state) do
Logger.debug("Received event #{topic} with data: #{inspect(data)}")

View file

@ -104,6 +104,41 @@ defmodule Livebook.Apps.ManagerTest do
assert :global.whereis_name(Apps.Manager) == pid
end
test "sends status events about running app specs" do
slug = Livebook.Utils.random_short_id()
app_settings = %{Notebook.AppSettings.new() | slug: slug}
notebook = %{Notebook.new() | app_settings: app_settings}
app_spec = Apps.NotebookAppSpec.new(notebook)
Apps.Manager.subscribe()
Apps.set_startup_app_specs([app_spec])
Apps.Manager.sync_permanent_apps()
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]}
# Version change
app_spec_v2 = %{app_spec | version: "2"}
Apps.set_startup_app_specs([app_spec_v2])
Apps.Manager.sync_permanent_apps()
assert_receive {:apps_manager_status,
[
%{app_spec: ^app_spec, running?: true},
%{app_spec: ^app_spec_v2, running?: false}
]}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]}
# Restart
{:ok, app} = Apps.fetch_app(app_spec.slug)
Process.exit(app.pid, :kill)
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: false}]}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]}
end
defp path_app_spec(tmp_dir, slug) do
app_path = Path.join(tmp_dir, "app_#{slug}.livemd")