Send the apps manager report to Teams through WebSocket (#2647)

This commit is contained in:
Alexandre de Souza 2024-06-17 10:34:31 -03:00 committed by GitHub
parent 80da045f1d
commit e98fa825f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 100 additions and 50 deletions

View file

@ -2,6 +2,7 @@ defmodule Livebook.Hubs.TeamClient do
use GenServer
require Logger
alias Livebook.Apps
alias Livebook.FileSystem
alias Livebook.FileSystems
alias Livebook.Hubs
@ -12,6 +13,7 @@ defmodule Livebook.Hubs.TeamClient do
@supervisor Livebook.HubsSupervisor
defstruct [
:connection_pid,
:hub,
:connection_status,
:derived_key,
@ -146,7 +148,7 @@ defmodule Livebook.Hubs.TeamClient do
@impl true
def init(%Hubs.Team{offline: nil} = team) do
Livebook.Apps.Manager.subscribe()
Apps.Manager.subscribe()
derived_key = Teams.derive_key(team.teams_key)
@ -169,8 +171,8 @@ defmodule Livebook.Hubs.TeamClient do
]
end
{:ok, _pid} = Teams.Connection.start_link(self(), headers)
{:ok, %__MODULE__{hub: team, derived_key: derived_key}}
{:ok, pid} = Teams.Connection.start_link(self(), headers)
{:ok, %__MODULE__{connection_pid: pid, hub: team, derived_key: derived_key}}
end
def init(%Hubs.Team{} = team) do
@ -272,15 +274,17 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, handle_event(topic, data, state)}
end
def handle_info({:apps_manager_status, status_entries}, state) do
def handle_info({:apps_manager_status, entries}, %{hub: %{id: id}} = state) do
app_deployment_statuses =
for %{
app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec,
running?: running?
} <- status_entries,
app_spec.hub_id == state.hub.id do
status = if(running?, do: :available, else: :processing)
%{version: app_spec.version, status: status}
for %{app_spec: %Apps.TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- entries do
status = if running?, do: :available, else: :preparing
%LivebookProto.AppDeploymentStatus{
id: app_spec.app_deployment_id,
deployment_group_id: state.deployment_group_id,
version: app_spec.version,
status: status
}
end
# The manager can send the status list even if it didn't change,
@ -289,17 +293,14 @@ defmodule Livebook.Hubs.TeamClient do
if app_deployment_statuses == state.app_deployment_statuses do
{:noreply, state}
else
# TODO: send this status list to Teams and set the statuses in
# the database. Note that app deployments for this deployment
# group (this agent), that are not present in this list, we
# effectively no longer know about, so we may want to reset
# their status.
report = %LivebookProto.AppDeploymentStatusReport{
app_deployment_statuses: app_deployment_statuses
}
# TODO: we want :version to be built on Teams server and just
# passed down to Livebook, so that Livebook does not care if
# we upsert app deployments or not. With that, we can also
# freely send the version with status here, and the server will
# recognise it.
Logger.debug("Sending apps manager report to Teams server #{inspect(report)}")
message = LivebookProto.AppDeploymentStatusReport.encode(report)
:ok = Teams.Connection.send_message(state.connection_pid, message)
{:noreply, %{state | app_deployment_statuses: app_deployment_statuses}}
end
@ -807,8 +808,8 @@ defmodule Livebook.Hubs.TeamClient do
defp manager_sync() do
# Each node runs the teams client, but we only need to call sync once
if Livebook.Apps.Manager.local?() do
Livebook.Apps.Manager.sync_permanent_apps()
if Apps.Manager.local?() do
Apps.Manager.sync_permanent_apps()
end
end
end

View file

@ -21,6 +21,10 @@ defmodule Livebook.Teams.Connection do
:gen_statem.start_link(__MODULE__, {listener, headers}, [])
end
def send_message(conn, message) do
:gen_statem.call(conn, {:message, message})
end
## gen_statem callbacks
@impl true
@ -83,6 +87,21 @@ defmodule Livebook.Teams.Connection do
:keep_state_and_data
end
def handle_event({:call, from}, {:message, message}, @no_state, data) do
case WebSocket.send(data.http_conn, data.websocket, data.ref, {:binary, message}) do
{:ok, conn, websocket} ->
:gen_statem.reply(from, :ok)
{:keep_state, %{data | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
send(data.listener, {:connection_error, reason})
:gen_statem.reply(from, {:error, reason})
{:keep_state, data, {:next_event, :internal, :connect}}
end
end
# Private
defp handle_websocket_message(message, %__MODULE__{} = data) do

View file

@ -1,8 +1,6 @@
defmodule LivebookProto.AppDeploymentStatusType do
use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"
field :connecting, 0
field :preparing, 1
field :available, 2
field :deactivated, 4
field :preparing, 0
field :available, 1
end

View file

@ -167,10 +167,8 @@ message Agent {
* Otherwise, it shouldn't be used.
*/
enum AppDeploymentStatusType {
connecting = 0;
preparing = 1;
available = 2;
deactivated = 4;
preparing = 0;
available = 1;
}
message AppDeploymentStatus {

View file

@ -686,10 +686,28 @@ defmodule Livebook.Hubs.TeamClientTest do
agent_connected = %{agent_connected | app_deployments: [livebook_proto_app_deployment]}
Livebook.Apps.subscribe()
Livebook.Apps.Manager.subscribe()
erpc_call(node, :subscribe, [self(), teams_deployment_group, teams_org])
assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{}
send(pid, {:event, :agent_connected, agent_connected})
assert_receive {:app_deployment_started, ^app_deployment}
assert_receive {:app_created, %{slug: ^slug}}
[app_spec] = Livebook.Hubs.Provider.get_app_specs(team)
Livebook.Apps.Manager.sync_permanent_apps()
assert_receive {:app_created, %{slug: ^slug}}, 3_000
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}
assert_receive {:teams_broadcast, {:agent_updated, _agent}}
assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{
app_spec.version => %{
id: app_spec.app_deployment_id,
status: :preparing,
deployment_group_id: deployment_group_id
}
}
assert_receive {:app_updated,
%{
@ -698,11 +716,22 @@ defmodule Livebook.Hubs.TeamClientTest do
sessions: [%{app_status: %{execution: :executed}}]
}}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]}
assert app_deployment in TeamClient.get_app_deployments(team.id)
assert_receive {:teams_broadcast, {:agent_updated, _agent}}
assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{
app_spec.version => %{
id: app_spec.app_deployment_id,
status: :available,
deployment_group_id: deployment_group_id
}
}
erpc_call(node, :toggle_app_deployment, [app_deployment.id, teams_org.id])
agent_connected = %{agent_connected | app_deployments: []}
send(pid, {:event, :agent_connected, agent_connected})
assert_receive {:app_deployment_stopped, ^app_deployment}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}
refute app_deployment in TeamClient.get_app_deployments(team.id)
assert_receive {:app_closed,
@ -711,6 +740,9 @@ defmodule Livebook.Hubs.TeamClientTest do
warnings: [],
sessions: [%{app_status: %{execution: :executed}}]
}}
assert_receive {:teams_broadcast, {:agent_updated, _agent}}
assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{}
end
test "dispatches the agents list",

View file

@ -250,14 +250,6 @@ defmodule Livebook.HubsTest do
Hubs.Provider.verify_notebook_stamp(team, notebook_source <> "change\n", stamp)
end
defp connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}
assert_receive {:client_connected, ^id}
team
end
defp secret_name(%{id: id}) do
id
|> String.replace("-", "_")

View file

@ -258,12 +258,4 @@ defmodule Livebook.TeamsTest do
assert_receive {:app_deployment_stopped, ^app_deployment2}
end
end
defp connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}, 3_000
assert_receive {:client_connected, ^id}, 3_000
team
end
end

View file

@ -281,6 +281,24 @@ defmodule Livebook.HubHelpers do
assert_receive {:agent_joined, ^agent}
end
@doc """
Creates a new Team hub from given user and node, and await the WebSocket to be connected.
test "my test", %{user: user, node: node} do
team = connect_to_teams(user, node)
assert "team-" <> _ = team.id
end
"""
@spec connect_to_teams(struct(), node()) :: Livebook.Hubs.Team.t()
def connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}, 3_000
assert_receive {:client_connected, ^id}, 3_000
team
end
defp hub_pid(hub) do
if pid = GenServer.whereis({:via, Registry, {Livebook.HubsRegistry, hub.id}}) do
{:ok, pid}