diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index 229148907..1df36ce42 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -88,6 +88,14 @@ defmodule Livebook.Hubs.TeamClient do :exit, _ -> false end + @doc """ + Enqueues the synchronous event to be handled by the Team client. + """ + @spec handle_event(String.t(), {atom(), LivebookProto.event_proto()}) :: :ok + def handle_event(id, {topic, data}) do + GenServer.cast(registry_name(id), {:event, topic, data}) + end + ## GenServer callbacks @impl true @@ -142,11 +150,13 @@ defmodule Livebook.Hubs.TeamClient do @impl true def handle_info(:connected, state) do Hubs.Broadcasts.hub_connected(state.hub.id) + {:noreply, %{state | connected?: true, connection_error: nil}} end def handle_info({:connection_error, reason}, state) do Hubs.Broadcasts.hub_connection_failed(state.hub.id, reason) + {:noreply, %{state | connected?: false, connection_error: reason}} end @@ -163,6 +173,13 @@ defmodule Livebook.Hubs.TeamClient do {:noreply, handle_event(topic, data, state)} end + @impl true + def handle_cast({:event, topic, data}, state) do + Logger.debug("Received event #{topic} with data: #{inspect(data)}") + + {:noreply, handle_event(topic, data, state)} + end + # Private defp registry_name(id) do @@ -171,6 +188,7 @@ defmodule Livebook.Hubs.TeamClient do defp put_secret(state, secret) do state = remove_secret(state, secret) + %{state | secrets: [secret | state.secrets]} end @@ -190,6 +208,7 @@ defmodule Livebook.Hubs.TeamClient do defp put_file_system(state, file_system) do state = remove_file_system(state, file_system) + %{state | file_systems: [file_system | state.file_systems]} end diff --git a/lib/livebook/teams/requests.ex b/lib/livebook/teams/requests.ex index 4898ff5eb..1a55dfdb8 100644 --- a/lib/livebook/teams/requests.ex +++ b/lib/livebook/teams/requests.ex @@ -41,8 +41,7 @@ defmodule Livebook.Teams.Requests do @spec org_sign(Team.t(), String.t()) :: {:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()} def org_sign(team, payload) do - headers = auth_headers(team) - post("/api/v1/org/sign", %{payload: payload}, headers) + post("/api/v1/org/sign", %{payload: payload}, team) end @doc """ @@ -54,10 +53,7 @@ defmodule Livebook.Teams.Requests do secret_key = Teams.derive_key(team.teams_key) secret_value = Teams.encrypt(secret.value, secret_key) - headers = auth_headers(team) - params = %{name: secret.name, value: secret_value} - - post("/api/v1/org/secrets", params, headers) + post("/api/v1/org/secrets", %{name: secret.name, value: secret_value}, team) end @doc """ @@ -69,10 +65,7 @@ defmodule Livebook.Teams.Requests do secret_key = Teams.derive_key(team.teams_key) secret_value = Teams.encrypt(secret.value, secret_key) - headers = auth_headers(team) - params = %{name: secret.name, value: secret_value} - - put("/api/v1/org/secrets", params, headers) + put("/api/v1/org/secrets", %{name: secret.name, value: secret_value}, team) end @doc """ @@ -81,10 +74,7 @@ defmodule Livebook.Teams.Requests do @spec delete_secret(Team.t(), Secret.t()) :: {:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()} def delete_secret(team, secret) do - headers = auth_headers(team) - params = %{name: secret.name} - - delete("/api/v1/org/secrets", params, headers) + delete("/api/v1/org/secrets", %{name: secret.name}, team) end @doc """ @@ -94,7 +84,6 @@ defmodule Livebook.Teams.Requests do {:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()} def create_file_system(team, file_system) do secret_key = Teams.derive_key(team.teams_key) - headers = auth_headers(team) type = FileSystems.type(file_system) %{name: name} = FileSystem.external_metadata(file_system) @@ -107,7 +96,7 @@ defmodule Livebook.Teams.Requests do value: Teams.encrypt(json, secret_key) } - post("/api/v1/org/file-systems", params, headers) + post("/api/v1/org/file-systems", params, team) end @doc """ @@ -117,7 +106,6 @@ defmodule Livebook.Teams.Requests do {:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()} def update_file_system(team, file_system) do secret_key = Teams.derive_key(team.teams_key) - headers = auth_headers(team) type = FileSystems.type(file_system) %{name: name} = FileSystem.external_metadata(file_system) @@ -131,7 +119,7 @@ defmodule Livebook.Teams.Requests do value: Teams.encrypt(json, secret_key) } - put("/api/v1/org/file-systems", params, headers) + put("/api/v1/org/file-systems", params, team) end @doc """ @@ -140,10 +128,7 @@ defmodule Livebook.Teams.Requests do @spec delete_file_system(Team.t(), FileSystem.t()) :: {:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()} def delete_file_system(team, file_system) do - headers = auth_headers(team) - params = %{id: file_system.external_id} - - delete("/api/v1/org/file-systems", params, headers) + delete("/api/v1/org/file-systems", %{id: file_system.external_id}, team) end @doc """ @@ -152,10 +137,8 @@ defmodule Livebook.Teams.Requests do @spec create_deployment_group(Team.t(), DeploymentGroup.t()) :: {:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()} def create_deployment_group(team, deployment_group) do - headers = auth_headers(team) params = %{name: deployment_group.name, mode: deployment_group.mode} - - post("/api/v1/org/deployment-groups", params, headers) + post("/api/v1/org/deployment-groups", params, team) end @doc """ @@ -164,10 +147,8 @@ defmodule Livebook.Teams.Requests do @spec update_deployment_group(Team.t(), DeploymentGroup.t()) :: {:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()} def update_deployment_group(team, deployment_group) do - headers = auth_headers(team) params = %{id: deployment_group.id, name: deployment_group.name, mode: deployment_group.mode} - - put("/api/v1/org/deployment-groups", params, headers) + put("/api/v1/org/deployment-groups", params, team) end @doc """ @@ -176,10 +157,7 @@ defmodule Livebook.Teams.Requests do @spec delete_deployment_group(Team.t(), DeploymentGroup.t()) :: {:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()} def delete_deployment_group(team, deployment_group) do - headers = auth_headers(team) - params = %{id: deployment_group.id} - - delete("/api/v1/org/deployment-groups", params, headers) + delete("/api/v1/org/deployment-groups", %{id: deployment_group.id}, team) end @doc """ @@ -210,26 +188,32 @@ defmodule Livebook.Teams.Requests do ] end - defp post(path, json, headers \\ []) do + defp post(path, json, team \\ nil) do body = {"application/json", Jason.encode!(json)} - request(:post, path, body: body, headers: headers) + + request(:post, path, body: body, headers: auth_headers(team)) + |> dispatch_messages(team) end - defp put(path, json, headers) do + defp put(path, json, team) do body = {"application/json", Jason.encode!(json)} - request(:put, path, body: body, headers: headers) + + request(:put, path, body: body, headers: auth_headers(team)) + |> dispatch_messages(team) end - defp delete(path, json, headers) do + defp delete(path, json, team) do body = {"application/json", Jason.encode!(json)} - request(:delete, path, body: body, headers: headers) + + request(:delete, path, body: body, headers: auth_headers(team)) + |> dispatch_messages(team) end - defp get(path, params \\ %{}, headers \\ []) do + defp get(path, params \\ %{}) do query_string = URI.encode_query(params) path = if query_string != "", do: "#{path}?#{query_string}", else: path - request(:get, path, headers: headers) + request(:get, path, headers: []) end defp request(method, path, opts) do @@ -256,6 +240,23 @@ defmodule Livebook.Teams.Requests do end end + defp dispatch_messages({:ok, %{"messages" => _} = body}, team) do + {messages, body} = Map.pop!(body, "messages") + + for message <- messages do + %{type: event} = + message + |> Base.url_decode64!(padding: false) + |> LivebookProto.Event.decode() + + Livebook.Hubs.TeamClient.handle_event(team.id, event) + end + + {:ok, body} + end + + defp dispatch_messages(result, _), do: result + defp json?(headers) do HTTP.fetch_content_type(headers) == {:ok, "application/json"} end