From e98fa825f63548eaade88a86a274eaf99b427461 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 17 Jun 2024 10:34:31 -0300 Subject: [PATCH] Send the apps manager report to Teams through WebSocket (#2647) --- lib/livebook/hubs/team_client.ex | 47 ++++++++++--------- lib/livebook/teams/connection.ex | 19 ++++++++ .../app_deployment_status_type.pb.ex | 6 +-- proto/messages.proto | 6 +-- test/livebook_teams/hubs/team_client_test.exs | 38 +++++++++++++-- test/livebook_teams/hubs_test.exs | 8 ---- test/livebook_teams/teams_test.exs | 8 ---- test/support/hub_helpers.ex | 18 +++++++ 8 files changed, 100 insertions(+), 50 deletions(-) diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index 7328fb49c..dc06bf5ed 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -2,6 +2,7 @@ defmodule Livebook.Hubs.TeamClient do use GenServer require Logger + alias Livebook.Apps alias Livebook.FileSystem alias Livebook.FileSystems alias Livebook.Hubs @@ -12,6 +13,7 @@ defmodule Livebook.Hubs.TeamClient do @supervisor Livebook.HubsSupervisor defstruct [ + :connection_pid, :hub, :connection_status, :derived_key, @@ -146,7 +148,7 @@ defmodule Livebook.Hubs.TeamClient do @impl true def init(%Hubs.Team{offline: nil} = team) do - Livebook.Apps.Manager.subscribe() + Apps.Manager.subscribe() derived_key = Teams.derive_key(team.teams_key) @@ -169,8 +171,8 @@ defmodule Livebook.Hubs.TeamClient do ] end - {:ok, _pid} = Teams.Connection.start_link(self(), headers) - {:ok, %__MODULE__{hub: team, derived_key: derived_key}} + {:ok, pid} = Teams.Connection.start_link(self(), headers) + {:ok, %__MODULE__{connection_pid: pid, hub: team, derived_key: derived_key}} end def init(%Hubs.Team{} = team) do @@ -272,15 +274,17 @@ defmodule Livebook.Hubs.TeamClient do {:noreply, handle_event(topic, data, state)} end - def handle_info({:apps_manager_status, status_entries}, state) do + def handle_info({:apps_manager_status, entries}, %{hub: %{id: id}} = state) do app_deployment_statuses = - for %{ - app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec, - running?: running? - } <- status_entries, - app_spec.hub_id == state.hub.id do - status = if(running?, do: :available, else: :processing) - %{version: app_spec.version, status: status} + for %{app_spec: %Apps.TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- entries do + status = if running?, do: :available, else: :preparing + + %LivebookProto.AppDeploymentStatus{ + id: app_spec.app_deployment_id, + deployment_group_id: state.deployment_group_id, + version: app_spec.version, + status: status + } end # The manager can send the status list even if it didn't change, @@ -289,17 +293,14 @@ defmodule Livebook.Hubs.TeamClient do if app_deployment_statuses == state.app_deployment_statuses do {:noreply, state} else - # TODO: send this status list to Teams and set the statuses in - # the database. Note that app deployments for this deployment - # group (this agent), that are not present in this list, we - # effectively no longer know about, so we may want to reset - # their status. + report = %LivebookProto.AppDeploymentStatusReport{ + app_deployment_statuses: app_deployment_statuses + } - # TODO: we want :version to be built on Teams server and just - # passed down to Livebook, so that Livebook does not care if - # we upsert app deployments or not. With that, we can also - # freely send the version with status here, and the server will - # recognise it. + Logger.debug("Sending apps manager report to Teams server #{inspect(report)}") + + message = LivebookProto.AppDeploymentStatusReport.encode(report) + :ok = Teams.Connection.send_message(state.connection_pid, message) {:noreply, %{state | app_deployment_statuses: app_deployment_statuses}} end @@ -807,8 +808,8 @@ defmodule Livebook.Hubs.TeamClient do defp manager_sync() do # Each node runs the teams client, but we only need to call sync once - if Livebook.Apps.Manager.local?() do - Livebook.Apps.Manager.sync_permanent_apps() + if Apps.Manager.local?() do + Apps.Manager.sync_permanent_apps() end end end diff --git a/lib/livebook/teams/connection.ex b/lib/livebook/teams/connection.ex index 3c46c87bc..3747e1524 100644 --- a/lib/livebook/teams/connection.ex +++ b/lib/livebook/teams/connection.ex @@ -21,6 +21,10 @@ defmodule Livebook.Teams.Connection do :gen_statem.start_link(__MODULE__, {listener, headers}, []) end + def send_message(conn, message) do + :gen_statem.call(conn, {:message, message}) + end + ## gen_statem callbacks @impl true @@ -83,6 +87,21 @@ defmodule Livebook.Teams.Connection do :keep_state_and_data end + def handle_event({:call, from}, {:message, message}, @no_state, data) do + case WebSocket.send(data.http_conn, data.websocket, data.ref, {:binary, message}) do + {:ok, conn, websocket} -> + :gen_statem.reply(from, :ok) + {:keep_state, %{data | http_conn: conn, websocket: websocket}} + + {:error, conn, websocket, reason} -> + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + send(data.listener, {:connection_error, reason}) + :gen_statem.reply(from, {:error, reason}) + + {:keep_state, data, {:next_event, :internal, :connect}} + end + end + # Private defp handle_websocket_message(message, %__MODULE__{} = data) do diff --git a/proto/lib/livebook_proto/app_deployment_status_type.pb.ex b/proto/lib/livebook_proto/app_deployment_status_type.pb.ex index ea6c36bf9..66f3a40b8 100644 --- a/proto/lib/livebook_proto/app_deployment_status_type.pb.ex +++ b/proto/lib/livebook_proto/app_deployment_status_type.pb.ex @@ -1,8 +1,6 @@ defmodule LivebookProto.AppDeploymentStatusType do use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" - field :connecting, 0 - field :preparing, 1 - field :available, 2 - field :deactivated, 4 + field :preparing, 0 + field :available, 1 end diff --git a/proto/messages.proto b/proto/messages.proto index 454bf9d5f..f2550880b 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -167,10 +167,8 @@ message Agent { * Otherwise, it shouldn't be used. */ enum AppDeploymentStatusType { - connecting = 0; - preparing = 1; - available = 2; - deactivated = 4; + preparing = 0; + available = 1; } message AppDeploymentStatus { diff --git a/test/livebook_teams/hubs/team_client_test.exs b/test/livebook_teams/hubs/team_client_test.exs index c126569ba..6d09b6ad1 100644 --- a/test/livebook_teams/hubs/team_client_test.exs +++ b/test/livebook_teams/hubs/team_client_test.exs @@ -686,10 +686,28 @@ defmodule Livebook.Hubs.TeamClientTest do agent_connected = %{agent_connected | app_deployments: [livebook_proto_app_deployment]} Livebook.Apps.subscribe() + Livebook.Apps.Manager.subscribe() + erpc_call(node, :subscribe, [self(), teams_deployment_group, teams_org]) + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} send(pid, {:event, :agent_connected, agent_connected}) assert_receive {:app_deployment_started, ^app_deployment} - assert_receive {:app_created, %{slug: ^slug}} + + [app_spec] = Livebook.Hubs.Provider.get_app_specs(team) + Livebook.Apps.Manager.sync_permanent_apps() + + assert_receive {:app_created, %{slug: ^slug}}, 3_000 + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} + assert_receive {:teams_broadcast, {:agent_updated, _agent}} + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ + app_spec.version => %{ + id: app_spec.app_deployment_id, + status: :preparing, + deployment_group_id: deployment_group_id + } + } assert_receive {:app_updated, %{ @@ -698,11 +716,22 @@ defmodule Livebook.Hubs.TeamClientTest do sessions: [%{app_status: %{execution: :executed}}] }} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]} assert app_deployment in TeamClient.get_app_deployments(team.id) + assert_receive {:teams_broadcast, {:agent_updated, _agent}} + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ + app_spec.version => %{ + id: app_spec.app_deployment_id, + status: :available, + deployment_group_id: deployment_group_id + } + } + + erpc_call(node, :toggle_app_deployment, [app_deployment.id, teams_org.id]) - agent_connected = %{agent_connected | app_deployments: []} - send(pid, {:event, :agent_connected, agent_connected}) assert_receive {:app_deployment_stopped, ^app_deployment} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} refute app_deployment in TeamClient.get_app_deployments(team.id) assert_receive {:app_closed, @@ -711,6 +740,9 @@ defmodule Livebook.Hubs.TeamClientTest do warnings: [], sessions: [%{app_status: %{execution: :executed}}] }} + + assert_receive {:teams_broadcast, {:agent_updated, _agent}} + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} end test "dispatches the agents list", diff --git a/test/livebook_teams/hubs_test.exs b/test/livebook_teams/hubs_test.exs index 1fd77f51e..049f6190b 100644 --- a/test/livebook_teams/hubs_test.exs +++ b/test/livebook_teams/hubs_test.exs @@ -250,14 +250,6 @@ defmodule Livebook.HubsTest do Hubs.Provider.verify_notebook_stamp(team, notebook_source <> "change\n", stamp) end - defp connect_to_teams(user, node) do - %{id: id} = team = create_team_hub(user, node) - assert_receive {:hub_connected, ^id} - assert_receive {:client_connected, ^id} - - team - end - defp secret_name(%{id: id}) do id |> String.replace("-", "_") diff --git a/test/livebook_teams/teams_test.exs b/test/livebook_teams/teams_test.exs index cb343a63a..c02dae797 100644 --- a/test/livebook_teams/teams_test.exs +++ b/test/livebook_teams/teams_test.exs @@ -258,12 +258,4 @@ defmodule Livebook.TeamsTest do assert_receive {:app_deployment_stopped, ^app_deployment2} end end - - defp connect_to_teams(user, node) do - %{id: id} = team = create_team_hub(user, node) - assert_receive {:hub_connected, ^id}, 3_000 - assert_receive {:client_connected, ^id}, 3_000 - - team - end end diff --git a/test/support/hub_helpers.ex b/test/support/hub_helpers.ex index a4bacc2b4..022cbcdaa 100644 --- a/test/support/hub_helpers.ex +++ b/test/support/hub_helpers.ex @@ -281,6 +281,24 @@ defmodule Livebook.HubHelpers do assert_receive {:agent_joined, ^agent} end + @doc """ + Creates a new Team hub from given user and node, and await the WebSocket to be connected. + + test "my test", %{user: user, node: node} do + team = connect_to_teams(user, node) + assert "team-" <> _ = team.id + end + + """ + @spec connect_to_teams(struct(), node()) :: Livebook.Hubs.Team.t() + def connect_to_teams(user, node) do + %{id: id} = team = create_team_hub(user, node) + assert_receive {:hub_connected, ^id}, 3_000 + assert_receive {:client_connected, ^id}, 3_000 + + team + end + defp hub_pid(hub) do if pid = GenServer.whereis({:via, Registry, {Livebook.HubsRegistry, hub.id}}) do {:ok, pid}