diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index f808030e7..4a2241ea4 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -20,7 +20,8 @@ defmodule Livebook.Hubs.TeamClient do secrets: [], file_systems: [], deployment_groups: [], - app_deployments: [] + app_deployments: [], + agents: [] ] @type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}} @@ -114,6 +115,14 @@ defmodule Livebook.Hubs.TeamClient do GenServer.call(registry_name(id), {:get_app_deployment_download_info, app_deployment_id}) end + @doc """ + Returns a list of cached agents. + """ + @spec get_agents(String.t()) :: list(Teams.Agent.t()) + def get_agents(id) do + GenServer.call(registry_name(id), :get_agents) + end + @doc """ Returns if the Team client is connected. """ @@ -230,6 +239,10 @@ defmodule Livebook.Hubs.TeamClient do {:reply, reply, state} end + def handle_call(:get_agents, _caller, state) do + {:reply, state.agents, state} + end + @impl true def handle_info(:connected, state) do Hubs.Broadcasts.hub_connected(state.hub.id) @@ -400,12 +413,13 @@ defmodule Livebook.Hubs.TeamClient do } end - defp build_app_deployment(%LivebookProto.AppDeployment{} = app_deployment) do + defp build_app_deployment(state, %LivebookProto.AppDeployment{} = app_deployment) do %Teams.AppDeployment{ id: app_deployment.id, slug: app_deployment.slug, sha: app_deployment.sha, title: app_deployment.title, + hub_id: state.hub.id, deployment_group_id: app_deployment.deployment_group_id, file: nil, deployed_by: app_deployment.deployed_by, @@ -413,6 +427,26 @@ defmodule Livebook.Hubs.TeamClient do } end + defp put_agent(state, agent) do + state = remove_agent(state, agent) + + %{state | agents: [agent | state.agents]} + end + + defp remove_agent(state, agent) do + %{state | agents: Enum.reject(state.agents, &(&1.id == agent.id))} + end + + defp build_agent(state, %LivebookProto.Agent{} = agent) do + %Livebook.Teams.Agent{ + id: agent.id, + name: agent.name, + hub_id: state.hub.id, + org_id: agent.org_id, + deployment_group_id: agent.deployment_group_id + } + end + defp handle_event(:secret_created, %Secrets.Secret{} = secret, state) do Hubs.Broadcasts.secret_created(secret) @@ -433,12 +467,14 @@ defmodule Livebook.Hubs.TeamClient do handle_event(:secret_updated, build_secret(state, secret_updated), state) end - defp handle_event(:secret_deleted, secret_deleted, state) do - if secret = Enum.find(state.secrets, &(&1.name == secret_deleted.name)) do - Hubs.Broadcasts.secret_deleted(secret) - remove_secret(state, secret) - else - state + defp handle_event(:secret_deleted, %Secrets.Secret{} = secret, state) do + Hubs.Broadcasts.secret_deleted(secret) + remove_secret(state, secret) + end + + defp handle_event(:secret_deleted, %{name: name}, state) do + with {:ok, secret} <- fetch_secret(name, state) do + handle_event(:secret_deleted, secret, state) end end @@ -464,15 +500,12 @@ defmodule Livebook.Hubs.TeamClient do defp handle_event(:file_system_deleted, %{external_id: _} = file_system, state) do Hubs.Broadcasts.file_system_deleted(file_system) - remove_file_system(state, file_system) end - defp handle_event(:file_system_deleted, %{id: id}, state) do - if file_system = Enum.find(state.file_systems, &(&1.external_id == id)) do + defp handle_event(:file_system_deleted, %{id: external_id}, state) do + with {:ok, file_system} <- fetch_file_system(external_id, state) do handle_event(:file_system_deleted, file_system, state) - else - state end end @@ -503,11 +536,14 @@ defmodule Livebook.Hubs.TeamClient do ) end - defp handle_event(:deployment_group_deleted, deployment_group_deleted, state) do - with {:ok, deployment_group} <- fetch_deployment_group(deployment_group_deleted.id, state) do - Teams.Broadcasts.deployment_group_deleted(deployment_group) + defp handle_event(:deployment_group_deleted, %Teams.DeploymentGroup{} = deployment_group, state) do + Teams.Broadcasts.deployment_group_deleted(deployment_group) + remove_deployment_group(state, deployment_group) + end - remove_deployment_group(state, deployment_group) + defp handle_event(:deployment_group_deleted, %{id: id}, state) do + with {:ok, deployment_group} <- fetch_deployment_group(id, state) do + handle_event(:deployment_group_deleted, deployment_group, state) end end @@ -517,6 +553,7 @@ defmodule Livebook.Hubs.TeamClient do |> dispatch_file_systems(user_connected) |> dispatch_deployment_groups(user_connected) |> dispatch_app_deployments(user_connected) + |> dispatch_agents(user_connected) end defp handle_event(:agent_connected, agent_connected, state) do @@ -526,14 +563,15 @@ defmodule Livebook.Hubs.TeamClient do |> dispatch_file_systems(agent_connected) |> dispatch_deployment_groups(agent_connected) |> dispatch_app_deployments(agent_connected) + |> dispatch_agents(agent_connected) end defp handle_event(:app_deployment_created, %Teams.AppDeployment{} = app_deployment, state) do deployment_group_id = app_deployment.deployment_group_id with {:ok, deployment_group} <- fetch_deployment_group(deployment_group_id, state) do - state = put_app_deployment(state, app_deployment) Teams.Broadcasts.app_deployment_created(app_deployment) + state = put_app_deployment(state, app_deployment) if deployment_group.id == state.deployment_group_id do manager_sync() @@ -546,11 +584,31 @@ defmodule Livebook.Hubs.TeamClient do defp handle_event(:app_deployment_created, app_deployment_created, state) do handle_event( :app_deployment_created, - build_app_deployment(app_deployment_created.app_deployment), + build_app_deployment(state, app_deployment_created.app_deployment), state ) end + defp handle_event(:agent_joined, %Teams.Agent{} = agent, state) do + Teams.Broadcasts.agent_joined(agent) + put_agent(state, agent) + end + + defp handle_event(:agent_joined, agent_joined, state) do + handle_event(:agent_joined, build_agent(state, agent_joined.agent), state) + end + + defp handle_event(:agent_left, %Teams.Agent{} = agent, state) do + Teams.Broadcasts.agent_left(agent) + remove_agent(state, agent) + end + + defp handle_event(:agent_left, %{id: id}, state) do + with {:ok, agent} <- fetch_agent(id, state) do + handle_event(:agent_left, agent, state) + end + end + defp dispatch_secrets(state, %{secrets: secrets}) do decrypted_secrets = Enum.map(secrets, &build_secret(state, &1)) @@ -601,12 +659,19 @@ defmodule Livebook.Hubs.TeamClient do end defp dispatch_app_deployments(state, %{app_deployments: app_deployments}) do - app_deployments = Enum.map(app_deployments, &build_app_deployment/1) + app_deployments = Enum.map(app_deployments, &build_app_deployment(state, &1)) {created, _, _} = diff(state.app_deployments, app_deployments, &(&1.id == &2.id)) dispatch_events(state, app_deployment_created: created) end + defp dispatch_agents(state, %{agents: agents}) do + agents = Enum.map(agents, &build_agent(state, &1)) + {joined, left, _} = diff(state.agents, agents, &(&1.id == &2.id)) + + dispatch_events(state, agent_joined: joined, agent_left: left) + end + defp update_hub(state, %{public_key: org_public_key}) do hub = %{state.hub | org_public_key: org_public_key} @@ -638,9 +703,19 @@ defmodule Livebook.Hubs.TeamClient do defp find_deployment_group(nil, _), do: nil defp find_deployment_group(id, groups), do: Enum.find(groups, &(&1.id == id)) - defp fetch_deployment_group(id, state) do - if deployment_group = find_deployment_group(id, state.deployment_groups) do - {:ok, deployment_group} + defp fetch_deployment_group(id, state), + do: fetch_entry(state.deployment_groups, &(&1.id == id), state) + + defp fetch_secret(name, state), do: fetch_entry(state.secrets, &(&1.name == name), state) + + defp fetch_file_system(external_id, state), + do: fetch_entry(state.file_systems, &(&1.external_id == external_id), state) + + defp fetch_agent(id, state), do: fetch_entry(state.agents, &(&1.id == id), state) + + defp fetch_entry(entries, fun, state) do + if entry = Enum.find(entries, fun) do + {:ok, entry} else state end diff --git a/lib/livebook/teams.ex b/lib/livebook/teams.ex index 4a0300737..95d1c7c4c 100644 --- a/lib/livebook/teams.ex +++ b/lib/livebook/teams.ex @@ -4,7 +4,7 @@ defmodule Livebook.Teams do alias Livebook.Hubs alias Livebook.Hubs.Team alias Livebook.Hubs.TeamClient - alias Livebook.Teams.{AppDeployment, DeploymentGroup, Org, Requests} + alias Livebook.Teams.{Agent, AppDeployment, DeploymentGroup, Org, Requests} import Ecto.Changeset, only: [add_error: 3, apply_action: 2, apply_action!: 2, get_field: 2] @@ -221,4 +221,12 @@ defmodule Livebook.Teams do def get_app_deployments(team) do TeamClient.get_app_deployments(team.id) end + + @doc """ + Gets a list of agents for a given Hub. + """ + @spec get_agents(Team.t()) :: list(Agent.t()) + def get_agents(team) do + TeamClient.get_agents(team.id) + end end diff --git a/lib/livebook/teams/agent.ex b/lib/livebook/teams/agent.ex new file mode 100644 index 000000000..8bc118b53 --- /dev/null +++ b/lib/livebook/teams/agent.ex @@ -0,0 +1,19 @@ +defmodule Livebook.Teams.Agent do + use Ecto.Schema + + @type t :: %__MODULE__{ + id: String.t() | nil, + name: String.t() | nil, + hub_id: String.t() | nil, + org_id: String.t() | nil, + deployment_group_id: String.t() | nil + } + + @primary_key {:id, :string, autogenerate: false} + embedded_schema do + field :name, :string + field :hub_id, :string + field :org_id, :string + field :deployment_group_id, :string + end +end diff --git a/lib/livebook/teams/app_deployment.ex b/lib/livebook/teams/app_deployment.ex index c105bc793..748dff8dd 100644 --- a/lib/livebook/teams/app_deployment.ex +++ b/lib/livebook/teams/app_deployment.ex @@ -9,6 +9,7 @@ defmodule Livebook.Teams.AppDeployment do slug: String.t() | nil, sha: String.t() | nil, title: String.t() | nil, + hub_id: String.t() | nil, deployment_group_id: String.t() | nil, file: binary() | nil, deployed_by: String.t() | nil, @@ -20,6 +21,7 @@ defmodule Livebook.Teams.AppDeployment do field :slug, :string field :sha, :string field :title, :string + field :hub_id, :string field :deployment_group_id, :string field :file, :string field :deployed_by, :string @@ -45,6 +47,7 @@ defmodule Livebook.Teams.AppDeployment do slug: notebook.app_settings.slug, sha: shasum, title: notebook.name, + hub_id: notebook.hub_id, deployment_group_id: notebook.deployment_group_id, file: zip_content }} diff --git a/lib/livebook/teams/broadcasts.ex b/lib/livebook/teams/broadcasts.ex index 56125b9fa..8e34ecce7 100644 --- a/lib/livebook/teams/broadcasts.ex +++ b/lib/livebook/teams/broadcasts.ex @@ -1,10 +1,11 @@ defmodule Livebook.Teams.Broadcasts do - alias Livebook.Teams.{AppDeployment, DeploymentGroup} + alias Livebook.Teams.{Agent, AppDeployment, DeploymentGroup} @type broadcast :: :ok | {:error, term()} @deployment_groups_topic "teams:deployment_groups" @app_deployments_topic "teams:app_deployments" + @agents_topic "teams:agents" @doc """ Subscribes to one or more subtopics in `"teams"`. @@ -21,6 +22,11 @@ defmodule Livebook.Teams.Broadcasts do * `{:app_deployment_created, AppDeployment.t()}` + Topic `#{@agents_topic}`: + + * `{:agent_joined, Agent.t()}` + * `{:agent_left, Agent.t()}` + """ @spec subscribe(atom() | list(atom())) :: :ok | {:error, term()} def subscribe(topics) when is_list(topics) do @@ -79,6 +85,22 @@ defmodule Livebook.Teams.Broadcasts do broadcast(@app_deployments_topic, {:app_deployment_created, app_deployment}) end + @doc """ + Broadcasts under `#{@agents_topic}` topic when hub received a new agent. + """ + @spec agent_joined(Agent.t()) :: broadcast() + def agent_joined(%Agent{} = agent) do + broadcast(@agents_topic, {:agent_joined, agent}) + end + + @doc """ + Broadcasts under `#{@agents_topic}` topic when hub received a deleted agent. + """ + @spec agent_left(Agent.t()) :: broadcast() + def agent_left(%Agent{} = agent) do + broadcast(@agents_topic, {:agent_left, agent}) + end + defp broadcast(topic, message) do Phoenix.PubSub.broadcast(Livebook.PubSub, topic, message) end diff --git a/lib/livebook_web/live/hub/edit/team_component.ex b/lib/livebook_web/live/hub/edit/team_component.ex index 3606d48b1..dd2990100 100644 --- a/lib/livebook_web/live/hub/edit/team_component.ex +++ b/lib/livebook_web/live/hub/edit/team_component.ex @@ -16,6 +16,7 @@ defmodule LivebookWeb.Hub.Edit.TeamComponent do file_systems = Hubs.get_file_systems(assigns.hub, hub_only: true) deployment_groups = Teams.get_deployment_groups(assigns.hub) app_deployments = Teams.get_app_deployments(assigns.hub) + agents = Teams.get_agents(assigns.hub) secret_name = assigns.params["secret_name"] file_system_id = assigns.params["file_system_id"] default? = default_hub?(assigns.hub) @@ -41,6 +42,7 @@ defmodule LivebookWeb.Hub.Edit.TeamComponent do file_systems: file_systems, deployment_groups: Enum.sort_by(deployment_groups, & &1.name), app_deployments: Enum.frequencies_by(app_deployments, & &1.deployment_group_id), + agents: Enum.frequencies_by(agents, & &1.deployment_group_id), show_key: show_key, secret_name: secret_name, secret_value: secret_value, @@ -222,6 +224,7 @@ defmodule LivebookWeb.Hub.Edit.TeamComponent do hub={@hub} deployment_group={deployment_group} app_deployments_count={Map.get(@app_deployments, deployment_group.id, 0)} + agents_count={Map.get(@agents, deployment_group.id, 0)} live_action={@live_action} params={@params} /> diff --git a/lib/livebook_web/live/hub/edit_live.ex b/lib/livebook_web/live/hub/edit_live.ex index b50cb824b..7d1666616 100644 --- a/lib/livebook_web/live/hub/edit_live.ex +++ b/lib/livebook_web/live/hub/edit_live.ex @@ -11,7 +11,7 @@ defmodule LivebookWeb.Hub.EditLive do def mount(_params, _session, socket) do if connected?(socket) do Hubs.Broadcasts.subscribe([:connection]) - Livebook.Teams.Broadcasts.subscribe([:deployment_groups, :app_deployments]) + Livebook.Teams.Broadcasts.subscribe([:deployment_groups, :app_deployments, :agents]) end {:ok, diff --git a/lib/livebook_web/live/hub/teams/deployment_group_component.ex b/lib/livebook_web/live/hub/teams/deployment_group_component.ex index c01977d0f..bf8a429f0 100644 --- a/lib/livebook_web/live/hub/teams/deployment_group_component.ex +++ b/lib/livebook_web/live/hub/teams/deployment_group_component.ex @@ -54,7 +54,9 @@ defmodule LivebookWeb.Hub.Teams.DeploymentGroupComponent do
<.labeled_text class="grow mt-6 lg:border-l lg:pl-4" label="Instances running"> - -1 + + <%= @agents_count %> + <.link patch={~p"/hub/#{@hub.id}/groups/#{@deployment_group.id}/agents/new"} class="pl-2 text-blue-600" diff --git a/proto/lib/livebook_proto/agent_left.pb.ex b/proto/lib/livebook_proto/agent_left.pb.ex index 781ea432f..628236e54 100644 --- a/proto/lib/livebook_proto/agent_left.pb.ex +++ b/proto/lib/livebook_proto/agent_left.pb.ex @@ -1,5 +1,5 @@ defmodule LivebookProto.AgentLeft do use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" - field :agent, 1, type: LivebookProto.Agent + field :id, 1, type: :string end diff --git a/proto/messages.proto b/proto/messages.proto index 6c7ae6be6..05ccb1159 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -140,7 +140,7 @@ message AgentJoined { } message AgentLeft { - Agent agent = 1; + string id = 1; } message Agent { diff --git a/test/livebook_teams/hubs/team_client_test.exs b/test/livebook_teams/hubs/team_client_test.exs index b29efc724..cc5d12596 100644 --- a/test/livebook_teams/hubs/team_client_test.exs +++ b/test/livebook_teams/hubs/team_client_test.exs @@ -5,7 +5,7 @@ defmodule Livebook.Hubs.TeamClientTest do setup do Livebook.Hubs.Broadcasts.subscribe([:connection, :file_systems, :secrets]) - Livebook.Teams.Broadcasts.subscribe([:deployment_groups, :app_deployments]) + Livebook.Teams.Broadcasts.subscribe([:deployment_groups, :app_deployments, :agents]) :ok end @@ -332,7 +332,13 @@ defmodule Livebook.Hubs.TeamClientTest do agent_keys: [] } - app_deployment = build(:app_deployment, file: nil, deployment_group_id: deployment_group.id) + app_deployment = + build(:app_deployment, + hub_id: team.id, + file: nil, + deployment_group_id: deployment_group.id + ) + {seconds, 0} = NaiveDateTime.to_gregorian_seconds(app_deployment.deployed_at) livebook_proto_app_deployment = @@ -357,6 +363,31 @@ defmodule Livebook.Hubs.TeamClientTest do assert_receive {:app_deployment_created, ^app_deployment}, 5000 assert app_deployment in TeamClient.get_app_deployments(team.id) end + + test "dispatches the agents list", %{team: team, user_connected: user_connected} do + pid = connect_to_teams(team) + agent = build(:agent, hub_id: team.id, org_id: to_string(team.org_id)) + + livebook_proto_agent = + %LivebookProto.Agent{ + id: agent.id, + name: agent.name, + org_id: agent.org_id, + deployment_group_id: agent.deployment_group_id + } + + user_connected = %{user_connected | agents: [livebook_proto_agent]} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:agent_joined, ^agent} + assert agent in TeamClient.get_agents(team.id) + + user_connected = %{user_connected | agents: []} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:agent_left, ^agent} + refute agent in TeamClient.get_agents(team.id) + end end describe "handle agent_connected event" do @@ -373,7 +404,8 @@ defmodule Livebook.Hubs.TeamClientTest do secrets: [], file_systems: [], deployment_groups: [], - app_deployments: [] + app_deployments: [], + agents: [] } {:ok, @@ -723,6 +755,36 @@ defmodule Livebook.Hubs.TeamClientTest do Livebook.Hubs.delete_hub(team.id) Livebook.App.close(app_pid) end + + test "dispatches the agents list", %{team: team, agent_connected: agent_connected} do + pid = connect_to_teams(team) + + # Since we're connecting as Agent, we should receive the + # `:agent_joined` event from `:agent_connected` event + assert_receive {:agent_joined, agent} + assert agent in TeamClient.get_agents(team.id) + + assert_receive {:deployment_group_created, deployment_group} + + livebook_proto_deployment_group = + %LivebookProto.DeploymentGroup{ + id: to_string(deployment_group.id), + name: deployment_group.name, + mode: to_string(deployment_group.mode), + secrets: [], + agent_keys: [] + } + + agent_connected = %{ + agent_connected + | deployment_groups: [livebook_proto_deployment_group], + agents: [] + } + + send(pid, {:event, :agent_connected, agent_connected}) + assert_receive {:agent_left, ^agent} + refute agent in TeamClient.get_agents(team.id) + end end defp connect_to_teams(%{id: id} = team) do diff --git a/test/livebook_teams/web/hub/deployment_group_test.exs b/test/livebook_teams/web/hub/deployment_group_test.exs index e22d05902..e670f79ec 100644 --- a/test/livebook_teams/web/hub/deployment_group_test.exs +++ b/test/livebook_teams/web/hub/deployment_group_test.exs @@ -8,7 +8,7 @@ defmodule LivebookWeb.Integration.Hub.DeploymentGroupTest do setup %{user: user, node: node} do Livebook.Hubs.Broadcasts.subscribe([:crud, :connection, :secrets, :file_systems]) - Livebook.Teams.Broadcasts.subscribe([:deployment_groups]) + Livebook.Teams.Broadcasts.subscribe([:deployment_groups, :agents]) hub = create_team_hub(user, node) id = hub.id @@ -212,4 +212,43 @@ defmodule LivebookWeb.Integration.Hub.DeploymentGroupTest do assert render(view) =~ "Secret DEPLOYMENT_GROUP_DELETE_SECRET deleted successfully" refute render(element(view, "#hub-deployment-group-#{id}")) =~ secret.name end + + test "shows the agent count", %{conn: conn, hub: hub} do + name = "TEAMS_EDIT_DEPLOYMENT_GROUP" + %{id: id} = insert_deployment_group(name: name, mode: :online, hub_id: hub.id) + + {:ok, view, _html} = live(conn, ~p"/hub/#{hub.id}") + + assert view + |> element("#hub-deployment-group-#{id} [aria-label=\"instances running\"]") + |> render() + |> Floki.parse_fragment!() + |> Floki.text() + |> String.trim() == "0" + + org_id = to_string(hub.org_id) + + # Simulates the agent join event + pid = Livebook.Hubs.TeamClient.get_pid(hub.id) + agent = build(:agent, hub_id: hub.id, org_id: org_id, deployment_group_id: to_string(id)) + + livebook_proto_agent = + %LivebookProto.Agent{ + id: agent.id, + name: agent.name, + org_id: agent.org_id, + deployment_group_id: agent.deployment_group_id + } + + livebook_proto_agent_joined = %LivebookProto.AgentJoined{agent: livebook_proto_agent} + send(pid, {:event, :agent_joined, livebook_proto_agent_joined}) + assert_receive {:agent_joined, ^agent} + + assert view + |> element("#hub-deployment-group-#{id} [aria-label=\"instances running\"]") + |> render() + |> Floki.parse_fragment!() + |> Floki.text() + |> String.trim() == "1" + end end diff --git a/test/support/factory.ex b/test/support/factory.ex index 80c450250..348952eb6 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -112,12 +112,23 @@ defmodule Livebook.Factory do sha: shasum, slug: slug, file: content, + hub_id: Livebook.Hubs.Personal.id(), deployment_group_id: "1", deployed_by: "Ada Lovelace", deployed_at: NaiveDateTime.truncate(deployed_at, :second) } end + def build(:agent) do + %Livebook.Teams.Agent{ + id: "agent_name-#{Livebook.Utils.random_short_id()}", + name: "agent_name", + hub_id: Livebook.Hubs.Personal.id(), + org_id: "1", + deployment_group_id: "1" + } + end + def build(factory_name, attrs) do factory_name |> build() |> struct!(attrs) end diff --git a/test/support/integration/teams_server.ex b/test/support/integration/teams_server.ex index 167136fcf..00b58fa9f 100644 --- a/test/support/integration/teams_server.ex +++ b/test/support/integration/teams_server.ex @@ -93,7 +93,10 @@ defmodule Livebook.TeamsServer do @impl true def handle_info({_port, {:data, message}}, state) do - info(message) + if Livebook.Config.boolean!("TEAMS_DEBUG", false) do + info(message) + end + {:noreply, state} end