From 14dd6d925fa49c0c0ae6e4837542895b68124d98 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Thu, 1 Jun 2023 12:01:43 -0300 Subject: [PATCH] Implement TeamClient with Teams WebSocket communication (#1951) --- lib/livebook/hubs/broadcasts.ex | 20 +- lib/livebook/hubs/team.ex | 15 +- lib/livebook/hubs/team_client.ex | 104 ++++++++++ lib/livebook/teams.ex | 8 +- lib/livebook/teams/connection.ex | 110 +++++++++++ lib/livebook/teams/{client.ex => http.ex} | 2 +- .../client.ex => teams/web_socket.ex} | 12 +- lib/livebook/web_socket/client_connection.ex | 187 ------------------ lib/livebook_web/live/hooks/sidebar_hook.ex | 9 +- lib/livebook_web/live/layout_helpers.ex | 12 +- proto/lib/livebook_proto.ex | 66 ++----- .../lib/livebook_proto/changeset_error.pb.ex | 6 - .../create_secret_request.pb.ex | 7 - .../create_secret_response.pb.ex | 4 - proto/lib/livebook_proto/event.pb.ex | 8 +- proto/lib/livebook_proto/field_error.pb.ex | 7 - .../livebook_proto/handshake_request.pb.ex | 6 - .../livebook_proto/handshake_response.pb.ex | 8 - proto/lib/livebook_proto/request.pb.ex | 14 -- proto/lib/livebook_proto/response.pb.ex | 16 -- proto/lib/livebook_proto/user.pb.ex | 7 - .../livebook_proto/user_synchronized.pb.ex | 5 +- proto/messages.proto | 70 +------ test/livebook/hubs/team_client_test.exs | 54 +++++ test/livebook/hubs_test.exs | 2 +- test/livebook/live_markdown/export_test.exs | 2 +- test/livebook/teams/connection_test.exs | 44 +++++ test/livebook_web/live/home_live_test.exs | 6 +- test/livebook_web/live/hub/edit_live_test.exs | 4 +- .../live/integration/session_live_test.exs | 50 +++++ test/livebook_web/live/session_live_test.exs | 20 -- test/support/factory.ex | 2 +- test/support/integration/teams_server.ex | 8 +- 33 files changed, 457 insertions(+), 438 deletions(-) create mode 100644 lib/livebook/hubs/team_client.ex create mode 100644 lib/livebook/teams/connection.ex rename lib/livebook/teams/{client.ex => http.ex} (98%) rename lib/livebook/{web_socket/client.ex => teams/web_socket.ex} (96%) delete mode 100644 lib/livebook/web_socket/client_connection.ex delete mode 100644 proto/lib/livebook_proto/changeset_error.pb.ex delete mode 100644 proto/lib/livebook_proto/create_secret_request.pb.ex delete mode 100644 proto/lib/livebook_proto/create_secret_response.pb.ex delete mode 100644 proto/lib/livebook_proto/field_error.pb.ex delete mode 100644 proto/lib/livebook_proto/handshake_request.pb.ex delete mode 100644 proto/lib/livebook_proto/handshake_response.pb.ex delete mode 100644 proto/lib/livebook_proto/request.pb.ex delete mode 100644 proto/lib/livebook_proto/response.pb.ex delete mode 100644 proto/lib/livebook_proto/user.pb.ex create mode 100644 test/livebook/hubs/team_client_test.exs create mode 100644 test/livebook/teams/connection_test.exs create mode 100644 test/livebook_web/live/integration/session_live_test.exs diff --git a/lib/livebook/hubs/broadcasts.ex b/lib/livebook/hubs/broadcasts.ex index aab834c04..4dde4c05f 100644 --- a/lib/livebook/hubs/broadcasts.ex +++ b/lib/livebook/hubs/broadcasts.ex @@ -10,7 +10,7 @@ defmodule Livebook.Hubs.Broadcasts do @secrets_topic "hubs:secrets" @doc """ - Broadcasts under `hubs:crud` topic when hubs changed. + Broadcasts under `#{@crud_topic}` topic when hubs changed. """ @spec hub_changed() :: broadcast() def hub_changed do @@ -18,7 +18,7 @@ defmodule Livebook.Hubs.Broadcasts do end @doc """ - Broadcasts under `hubs:connection` topic when hub connected. + Broadcasts under `#{@connection_topic}` topic when hub connected. """ @spec hub_connected() :: broadcast() def hub_connected do @@ -26,15 +26,15 @@ defmodule Livebook.Hubs.Broadcasts do end @doc """ - Broadcasts under `hubs:connection` topic when hub disconnected. + Broadcasts under `#{@connection_topic}` topic when hub is out-of-date. """ - @spec hub_disconnected() :: broadcast() - def hub_disconnected do - broadcast(@connection_topic, :hub_disconnected) + @spec hub_server_error(String.t()) :: broadcast() + def hub_server_error(reason) when is_binary(reason) do + broadcast(@connection_topic, {:hub_server_error, reason}) end @doc """ - Broadcasts under `hubs:connection` topic when hub received a connection error. + Broadcasts under `#{@connection_topic}` topic when hub received a connection error. """ @spec hub_connection_failed(String.t()) :: broadcast() def hub_connection_failed(reason) when is_binary(reason) do @@ -42,7 +42,7 @@ defmodule Livebook.Hubs.Broadcasts do end @doc """ - Broadcasts under `hubs:secrets` topic when hub received a new secret. + Broadcasts under `#{@secrets_topic}` topic when hub received a new secret. """ @spec secret_created(Secret.t()) :: broadcast() def secret_created(%Secret{} = secret) do @@ -50,7 +50,7 @@ defmodule Livebook.Hubs.Broadcasts do end @doc """ - Broadcasts under `hubs:secrets` topic when hub received an updated secret. + Broadcasts under `#{@secrets_topic}` topic when hub received an updated secret. """ @spec secret_updated(Secret.t()) :: broadcast() def secret_updated(%Secret{} = secret) do @@ -58,7 +58,7 @@ defmodule Livebook.Hubs.Broadcasts do end @doc """ - Broadcasts under `hubs:secrets` topic when hub received a deleted secret. + Broadcasts under `#{@secrets_topic}` topic when hub received a deleted secret. """ @spec secret_deleted(Secret.t()) :: broadcast() def secret_deleted(%Secret{} = secret) do diff --git a/lib/livebook/hubs/team.ex b/lib/livebook/hubs/team.ex index 3fa2668ce..4e3a4163d 100644 --- a/lib/livebook/hubs/team.ex +++ b/lib/livebook/hubs/team.ex @@ -60,6 +60,8 @@ defmodule Livebook.Hubs.Team do end defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do + alias Livebook.Hubs.TeamClient + def load(team, fields) do %{ team @@ -80,17 +82,17 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do name: team.hub_name, provider: team, emoji: team.hub_emoji, - connected?: false + connected?: TeamClient.connected?(team.id) } end def type(_team), do: "team" - def connection_spec(_team), do: nil + def connection_spec(team), do: {TeamClient, team} - def disconnect(_team), do: raise("not implemented") + def disconnect(team), do: TeamClient.stop(team.id) - def capabilities(_team), do: [] + def capabilities(_team), do: ~w(connect)a def get_secrets(_team), do: [] @@ -100,7 +102,10 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do def delete_secret(_team, _secret), do: :ok - def connection_error(_team), do: raise("not implemented") + def connection_error(team) do + reason = TeamClient.get_connection_error(team.id) + "Cannot connect to Hub: #{reason}. Will attempt to reconnect automatically..." + end def notebook_stamp(_hub, _notebook_source, _metadata) do :skip diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex new file mode 100644 index 000000000..4d19be2d4 --- /dev/null +++ b/lib/livebook/hubs/team_client.ex @@ -0,0 +1,104 @@ +defmodule Livebook.Hubs.TeamClient do + @moduledoc false + use GenServer + require Logger + + alias Livebook.Hubs.Broadcasts + alias Livebook.Hubs.Team + alias Livebook.Teams.Connection + + @registry Livebook.HubsRegistry + @supervisor Livebook.HubsSupervisor + + defstruct [:hub, :connection_error, connected?: false, secrets: []] + + @type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}} + + @doc """ + Connects the Team client with WebSocket server. + """ + @spec start_link(Team.t()) :: GenServer.on_start() + def start_link(%Team{} = team) do + GenServer.start_link(__MODULE__, team, name: registry_name(team.id)) + end + + @doc """ + Stops the WebSocket server. + """ + @spec stop(String.t()) :: :ok + def stop(id) do + if pid = GenServer.whereis(registry_name(id)) do + DynamicSupervisor.terminate_child(@supervisor, pid) + end + + :ok + end + + @doc """ + Returns the latest error from connection. + """ + @spec get_connection_error(String.t()) :: String.t() | nil + def get_connection_error(id) do + GenServer.call(registry_name(id), :get_connection_error) + catch + :exit, _ -> "connection refused" + end + + @doc """ + Returns if the Team client is connected. + """ + @spec connected?(String.t()) :: boolean() + def connected?(id) do + GenServer.call(registry_name(id), :connected?) + catch + :exit, _ -> false + end + + ## GenServer callbacks + + @impl true + def init(%Team{} = team) do + header = [ + {"x-user", to_string(team.user_id)}, + {"x-org", to_string(team.org_id)}, + {"x-org-key", to_string(team.org_key_id)}, + {"x-session-token", team.session_token} + ] + + {:ok, _pid} = Connection.start_link(self(), header) + {:ok, %__MODULE__{hub: team}} + end + + @impl true + def handle_call(:get_connection_error, _caller, state) do + {:reply, state.connection_error, state} + end + + def handle_call(:connected?, _caller, state) do + {:reply, state.connected?, state} + end + + @impl true + def handle_info(:connected, state) do + Broadcasts.hub_connected() + {:noreply, %{state | connected?: true, connection_error: nil}} + end + + def handle_info({:connection_error, reason}, state) do + Broadcasts.hub_connection_failed(reason) + {:noreply, %{state | connected?: false, connection_error: reason}} + end + + def handle_info({:server_error, reason}, state) do + Broadcasts.hub_server_error("#{state.hub.hub_name}: #{reason}") + :ok = Livebook.Hubs.delete_hub(state.hub.id) + + {:noreply, %{state | connected?: false}} + end + + # Private + + defp registry_name(id) do + {:via, Registry, {@registry, id}} + end +end diff --git a/lib/livebook/teams.ex b/lib/livebook/teams.ex index 0856e4c95..752ba16eb 100644 --- a/lib/livebook/teams.ex +++ b/lib/livebook/teams.ex @@ -3,7 +3,7 @@ defmodule Livebook.Teams do alias Livebook.Hubs alias Livebook.Hubs.Team - alias Livebook.Teams.{Client, Org} + alias Livebook.Teams.{HTTP, Org} import Ecto.Changeset, only: [add_error: 3, apply_action: 2, apply_action!: 2, get_field: 2] @@ -18,7 +18,7 @@ defmodule Livebook.Teams do | {:error, Ecto.Changeset.t()} | {:transport_error, String.t()} def create_org(%Org{} = org, attrs) do - create_org_request(org, attrs, &Client.create_org/1) + create_org_request(org, attrs, &HTTP.create_org/1) end @doc """ @@ -32,7 +32,7 @@ defmodule Livebook.Teams do | {:error, Ecto.Changeset.t()} | {:transport_error, String.t()} def join_org(%Org{} = org, attrs) do - create_org_request(org, attrs, &Client.join_org/1) + create_org_request(org, attrs, &HTTP.join_org/1) end defp create_org_request(%Org{} = org, attrs, callback) when is_function(callback, 1) do @@ -74,7 +74,7 @@ defmodule Livebook.Teams do | {:error, :expired} | {:transport_error, String.t()} def get_org_request_completion_data(%Org{id: id}, device_code) do - case Client.get_org_request_completion_data(id, device_code) do + case HTTP.get_org_request_completion_data(id, device_code) do {:ok, %{"status" => "awaiting_confirmation"}} -> {:ok, :awaiting_confirmation} {:ok, completion_data} -> {:ok, completion_data} {:error, %{"status" => "expired"}} -> {:error, :expired} diff --git a/lib/livebook/teams/connection.ex b/lib/livebook/teams/connection.ex new file mode 100644 index 000000000..cc8200573 --- /dev/null +++ b/lib/livebook/teams/connection.ex @@ -0,0 +1,110 @@ +defmodule Livebook.Teams.Connection do + @moduledoc false + + @behaviour :gen_statem + + require Logger + + alias Livebook.WebSocket + alias Livebook.Teams.WebSocket + + @backoff 5_000 + @no_state :no_state + @loop_ping_delay 5_000 + + defstruct [:listener, :headers, :http_conn, :websocket, :ref] + + @doc """ + Starts a new WebSocket connection with given headers. + """ + @spec start_link(pid(), Mint.Types.headers()) :: :gen_statem.start_ret() + def start_link(listener, headers \\ []) do + :gen_statem.start_link(__MODULE__, {listener, headers}, []) + end + + ## gen_statem callbacks + + @impl true + def callback_mode(), do: :handle_event_function + + @impl true + def init({listener, headers}) do + data = %__MODULE__{listener: listener, headers: headers} + {:ok, @no_state, data, {:next_event, :internal, :connect}} + end + + @impl true + def handle_event(event_type, event_data, state, data) + + def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do + case WebSocket.connect(data.headers) do + {:ok, conn, websocket, ref} -> + send(data.listener, :connected) + send(self(), {:loop_ping, ref}) + + {:keep_state, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}} + + {:transport_error, reason} -> + send(data.listener, {:connection_error, reason}) + {:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}} + + {:server_error, error} -> + reason = LivebookProto.Error.decode(error).details + send(data.listener, {:server_error, reason}) + + {:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}} + end + end + + def handle_event({:timeout, :backoff}, nil, _state, _data) do + {:keep_state_and_data, {:next_event, :internal, :connect}} + end + + def handle_event({:timeout, :reconnect}, nil, _state, _data) do + {:keep_state_and_data, {:next_event, :internal, :connect}} + end + + def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{ref: ref} = data) do + case WebSocket.send(data.http_conn, data.websocket, data.ref, :ping) do + {:ok, conn, websocket} -> + Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay) + {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} + + {:error, conn, websocket, _reason} -> + {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} + end + end + + def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do + :keep_state_and_data + end + + def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do + handle_websocket_message(message, data) + end + + def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do + handle_websocket_message(message, data) + end + + def handle_event(:info, _message, @no_state, _data) do + :keep_state_and_data + end + + # Private + + defp handle_websocket_message(message, %__MODULE__{} = data) do + case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do + {:ok, conn, websocket, _binaries} -> + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + + {:keep_state, data} + + {:server_error, conn, websocket, reason} -> + send(data.listener, {:connection_error, reason}) + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + + {:keep_state, data, {:next_event, :internal, :connect}} + end + end +end diff --git a/lib/livebook/teams/client.ex b/lib/livebook/teams/http.ex similarity index 98% rename from lib/livebook/teams/client.ex rename to lib/livebook/teams/http.ex index 6974bfce1..20e74ba99 100644 --- a/lib/livebook/teams/client.ex +++ b/lib/livebook/teams/http.ex @@ -1,4 +1,4 @@ -defmodule Livebook.Teams.Client do +defmodule Livebook.Teams.HTTP do @moduledoc false alias Livebook.Teams.Org diff --git a/lib/livebook/web_socket/client.ex b/lib/livebook/teams/web_socket.ex similarity index 96% rename from lib/livebook/web_socket/client.ex rename to lib/livebook/teams/web_socket.ex index 4096acefb..87bc462bc 100644 --- a/lib/livebook/web_socket/client.ex +++ b/lib/livebook/teams/web_socket.ex @@ -1,9 +1,9 @@ -defmodule Livebook.WebSocket.Client do +defmodule Livebook.Teams.WebSocket do @moduledoc false alias Mint.WebSocket.UpgradeFailureError - @ws_path "/livebook/websocket" + @ws_path "/user/websocket" @type conn :: Mint.HTTP.t() @type websocket :: Mint.WebSocket.t() @@ -13,14 +13,14 @@ defmodule Livebook.WebSocket.Client do defguard is_frame(value) when value in [:close, :ping] or elem(value, 0) == :binary @doc """ - Connects to the WebSocket server with given url and headers. + Connects to the WebSocket server with given headers. """ - @spec connect(String.t(), list({String.t(), String.t()})) :: + @spec connect(list({String.t(), String.t()})) :: {:ok, conn(), websocket(), ref()} | {:transport_error, String.t()} | {:server_error, String.t()} - def connect(url, headers \\ []) do - uri = URI.parse(url) + def connect(headers \\ []) do + uri = URI.parse(Livebook.Config.teams_url()) {http_scheme, ws_scheme} = parse_scheme(uri) state = %{status: nil, headers: [], body: []} diff --git a/lib/livebook/web_socket/client_connection.ex b/lib/livebook/web_socket/client_connection.ex deleted file mode 100644 index f1d49bccc..000000000 --- a/lib/livebook/web_socket/client_connection.ex +++ /dev/null @@ -1,187 +0,0 @@ -defmodule Livebook.WebSocket.ClientConnection do - @moduledoc false - - @behaviour :gen_statem - - require Logger - - alias Livebook.WebSocket - alias Livebook.WebSocket.Client - - @timeout 10_000 - @backoff 5_000 - @no_state :no_state - @loop_ping_delay 5_000 - - defstruct [:url, :listener, :headers, :http_conn, :websocket, :ref, id: 0, reply: %{}] - - @doc """ - Starts a new WebSocket connection with given URL and headers. - """ - @spec start_link(pid(), String.t(), Mint.Types.headers()) :: :gen_statem.start_ret() - def start_link(listener, url, headers \\ []) do - :gen_statem.start_link(__MODULE__, {listener, url, headers}, []) - end - - @doc """ - Sends a Request to given WebSocket Server. - """ - @spec send_request(pid(), WebSocket.proto()) :: {atom(), term()} - def send_request(conn, %_struct{} = data) do - :gen_statem.call(conn, {:request, data}, @timeout) - end - - ## gen_statem callbacks - - @impl true - def callback_mode, do: :handle_event_function - - @impl true - def init({listener, url, headers}) do - data = %__MODULE__{listener: listener, url: url, headers: headers} - {:ok, @no_state, data, {:next_event, :internal, :connect}} - end - - @impl true - def handle_event(event_type, event_data, state, data) - - def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do - case Client.connect(data.url, data.headers) do - {:ok, conn, websocket, ref} -> - send(data.listener, {:connect, :ok, :connected}) - send(self(), {:loop_ping, ref}) - - {:keep_state, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}} - - {:transport_error, reason} -> - send(data.listener, {:connect, :error, reason}) - {:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}} - - {:server_error, binary} -> - {:response, %{type: {:error, error}}} = decode_response_or_event(binary) - send(data.listener, {:connect, :error, error.details}) - - {:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}} - end - end - - def handle_event({:timeout, :backoff}, nil, _state, _data) do - {:keep_state_and_data, {:next_event, :internal, :connect}} - end - - def handle_event({:timeout, :reconnect}, nil, _state, _data) do - {:keep_state_and_data, {:next_event, :internal, :connect}} - end - - def handle_event({:call, from}, {:request, request}, @no_state, %__MODULE__{id: id} = data) do - frame = LivebookProto.build_request_frame(request, id) - reply = Map.put(data.reply, id, from) - - case Client.send(data.http_conn, data.websocket, data.ref, frame) do - {:ok, conn, websocket} -> - data = %__MODULE__{data | http_conn: conn, websocket: websocket, id: id + 1, reply: reply} - {:keep_state, data} - - {:error, conn, websocket, reason} -> - data = %__MODULE__{data | http_conn: conn, websocket: websocket} - {:keep_state, data, {:reply, from, {:error, reason}}} - end - end - - def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{} = data) - when ref == data.ref and is_reference(ref) do - case Client.send(data.http_conn, data.websocket, data.ref, :ping) do - {:ok, conn, websocket} -> - Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay) - {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} - - {:error, conn, websocket, _reason} -> - {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} - end - end - - def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do - :keep_state_and_data - end - - def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do - handle_websocket_message(message, data) - end - - def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do - handle_websocket_message(message, data) - end - - def handle_event(:info, _message, @no_state, _data) do - :keep_state_and_data - end - - # Private - - defp handle_websocket_message(message, %__MODULE__{} = data) do - case Client.receive(data.http_conn, data.ref, data.websocket, message) do - {:ok, conn, websocket, binaries} -> - data = %__MODULE__{data | http_conn: conn, websocket: websocket} - data = send_received(binaries, data) - {:keep_state, data} - - {:server_error, conn, websocket, reason} -> - send(data.listener, {:connect, :error, reason}) - data = %__MODULE__{data | http_conn: conn, websocket: websocket} - {:keep_state, data, {:next_event, :internal, :connect}} - end - end - - defp send_received([], data), do: data - - defp send_received([_ | _] = binaries, data) do - for binary <- binaries, reduce: data do - acc -> - case decode_response_or_event(binary) do - {:response, %{id: -1, type: {:error, %{details: reason}}}} -> - reply_to_all({:error, reason}, acc) - - {:response, %{id: id, type: {:error, %{details: reason}}}} -> - reply_to_id(id, {:error, reason}, acc) - - {:response, %{id: id, type: {:changeset, %{errors: field_errors}}}} -> - reply_to_id(id, {:changeset_error, to_changeset_errors(field_errors)}, acc) - - {:response, %{id: id, type: result}} -> - reply_to_id(id, result, acc) - - {:event, %{type: {name, data}}} -> - send(acc.listener, {:event, name, data}) - acc - end - end - end - - defp to_changeset_errors(field_errors) do - for %{field: field, details: errors} <- field_errors, into: %{} do - {String.to_atom(field), errors} - end - end - - defp reply_to_all(message, %__MODULE__{} = data) do - for {_id, caller} <- data.reply do - :gen_statem.reply(caller, message) - end - - data - end - - defp reply_to_id(id, message, %__MODULE__{} = data) do - {caller, reply} = Map.pop(data.reply, id) - if caller, do: :gen_statem.reply(caller, message) - - %__MODULE__{data | reply: reply} - end - - defp decode_response_or_event(data) do - case LivebookProto.Response.decode(data) do - %{type: nil} -> {:event, LivebookProto.Event.decode(data)} - response -> {:response, response} - end - end -end diff --git a/lib/livebook_web/live/hooks/sidebar_hook.ex b/lib/livebook_web/live/hooks/sidebar_hook.ex index f217f60b5..9950e7929 100644 --- a/lib/livebook_web/live/hooks/sidebar_hook.ex +++ b/lib/livebook_web/live/hooks/sidebar_hook.ex @@ -25,7 +25,7 @@ defmodule LivebookWeb.SidebarHook do {:halt, put_flash(socket, :info, "Livebook is shutting down. You can close this page.")} end - @connection_events ~w(hub_connected hub_disconnected hub_changed)a + @connection_events ~w(hub_connected hub_changed)a defp handle_info(event, socket) when event in @connection_events do {:cont, assign(socket, saved_hubs: Livebook.Hubs.get_metadatas())} @@ -35,6 +35,13 @@ defmodule LivebookWeb.SidebarHook do {:cont, assign(socket, saved_hubs: Livebook.Hubs.get_metadatas())} end + defp handle_info({:hub_server_error, error}, socket) do + {:cont, + socket + |> assign(saved_hubs: Livebook.Hubs.get_metadatas()) + |> put_flash(:error, error)} + end + defp handle_info(_event, socket), do: {:cont, socket} defp handle_event("shutdown", _params, socket) do diff --git a/lib/livebook_web/live/layout_helpers.ex b/lib/livebook_web/live/layout_helpers.ex index ee611ffd1..a06242cc4 100644 --- a/lib/livebook_web/live/layout_helpers.ex +++ b/lib/livebook_web/live/layout_helpers.ex @@ -185,7 +185,7 @@ defmodule LivebookWeb.LayoutHelpers do defp sidebar_hub_link_with_tooltip(assigns) do ~H""" - <.link {hub_connection_link_opts(@hub.provider, @to, @current)}> + <.link {hub_connection_link_opts(@hub, @to, @current)}>
<%= @hub.emoji %> @@ -238,10 +238,14 @@ defmodule LivebookWeb.LayoutHelpers do class = "h-7 flex items-center hover:text-white #{text_color} border-l-4 #{border_color} hover:border-white" - if tooltip = Provider.connection_error(hub) do - [navigate: to, data_tooltip: tooltip, class: "tooltip right " <> class] - else + if hub.connected? do [navigate: to, class: class] + else + [ + navigate: to, + data_tooltip: Provider.connection_error(hub.provider), + class: "tooltip right " <> class + ] end end diff --git a/proto/lib/livebook_proto.ex b/proto/lib/livebook_proto.ex index 76ef26dcf..55aa145f2 100644 --- a/proto/lib/livebook_proto.ex +++ b/proto/lib/livebook_proto.ex @@ -2,59 +2,31 @@ defmodule LivebookProto do @moduledoc false alias LivebookProto.{ - CreateSecretRequest, - CreateSecretResponse, - HandshakeRequest, - HandshakeResponse, - Request, - Response + Event, + SecretCreated, + SecretUpdated, + SecretDeleted, + UserSynchronized } - @request_mapping (for {_id, field_prop} <- Request.__message_props__().field_props, - into: %{} do - {field_prop.type, field_prop.name_atom} - end) + @event_mapping (for {_id, field_prop} <- Event.__message_props__().field_props, + into: %{} do + {field_prop.type, field_prop.name_atom} + end) - @response_mapping (for {_id, field_prop} <- Response.__message_props__().field_props, - into: %{} do - {field_prop.type, field_prop.name_atom} - end) - - @type request_proto :: HandshakeRequest.t() | CreateSecretRequest.t() - @type response_proto :: HandshakeResponse.t() | CreateSecretResponse.t() + @type event_proto :: + SecretCreated.t() + | SecretUpdated.t() + | SecretDeleted.t() + | UserSynchronized.t() @doc """ - Builds a request frame with given data and id. + Builds an event with given data. """ - @spec build_request_frame(request_proto(), integer()) :: {:binary, iodata()} - def build_request_frame(%struct{} = data, id \\ -1) do - type = request_type(struct) - message = Request.new!(id: id, type: {type, data}) - - {:binary, Request.encode(message)} + @spec build_event(event_proto()) :: Event.t() + def build_event(%struct{} = data) do + Event.new!(type: {event_type(struct), data}) end - @doc """ - Builds a create secret request struct. - """ - @spec build_create_secret_request(keyword()) :: CreateSecretRequest.t() - defdelegate build_create_secret_request(fields), to: CreateSecretRequest, as: :new! - - @doc """ - Builds a handshake request struct. - """ - @spec build_handshake_request(keyword()) :: HandshakeRequest.t() - defdelegate build_handshake_request(fields), to: HandshakeRequest, as: :new! - - @doc """ - Builds a response with given data and id. - """ - @spec build_response(response_proto(), integer()) :: Response.t() - def build_response(%struct{} = data, id \\ -1) do - type = response_type(struct) - Response.new!(id: id, type: {type, data}) - end - - defp request_type(module), do: Map.fetch!(@request_mapping, module) - defp response_type(module), do: Map.fetch!(@response_mapping, module) + defp event_type(module), do: Map.fetch!(@event_mapping, module) end diff --git a/proto/lib/livebook_proto/changeset_error.pb.ex b/proto/lib/livebook_proto/changeset_error.pb.ex deleted file mode 100644 index 695db505c..000000000 --- a/proto/lib/livebook_proto/changeset_error.pb.ex +++ /dev/null @@ -1,6 +0,0 @@ -defmodule LivebookProto.ChangesetError do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :errors, 1, repeated: true, type: LivebookProto.FieldError -end diff --git a/proto/lib/livebook_proto/create_secret_request.pb.ex b/proto/lib/livebook_proto/create_secret_request.pb.ex deleted file mode 100644 index 027c479bb..000000000 --- a/proto/lib/livebook_proto/create_secret_request.pb.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule LivebookProto.CreateSecretRequest do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :name, 1, type: :string - field :value, 2, type: :string -end diff --git a/proto/lib/livebook_proto/create_secret_response.pb.ex b/proto/lib/livebook_proto/create_secret_response.pb.ex deleted file mode 100644 index 446c97f71..000000000 --- a/proto/lib/livebook_proto/create_secret_response.pb.ex +++ /dev/null @@ -1,4 +0,0 @@ -defmodule LivebookProto.CreateSecretResponse do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 -end diff --git a/proto/lib/livebook_proto/event.pb.ex b/proto/lib/livebook_proto/event.pb.ex index 580e5ad1a..23efc2b6f 100644 --- a/proto/lib/livebook_proto/event.pb.ex +++ b/proto/lib/livebook_proto/event.pb.ex @@ -4,22 +4,22 @@ defmodule LivebookProto.Event do oneof :type, 0 - field :secret_created, 100, + field :secret_created, 1, type: LivebookProto.SecretCreated, json_name: "secretCreated", oneof: 0 - field :secret_updated, 101, + field :secret_updated, 2, type: LivebookProto.SecretUpdated, json_name: "secretUpdated", oneof: 0 - field :secret_deleted, 102, + field :secret_deleted, 3, type: LivebookProto.SecretDeleted, json_name: "secretDeleted", oneof: 0 - field :user_synchronized, 103, + field :user_synchronized, 4, type: LivebookProto.UserSynchronized, json_name: "userSynchronized", oneof: 0 diff --git a/proto/lib/livebook_proto/field_error.pb.ex b/proto/lib/livebook_proto/field_error.pb.ex deleted file mode 100644 index 75228aff2..000000000 --- a/proto/lib/livebook_proto/field_error.pb.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule LivebookProto.FieldError do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :field, 1, type: :string - field :details, 2, repeated: true, type: :string -end diff --git a/proto/lib/livebook_proto/handshake_request.pb.ex b/proto/lib/livebook_proto/handshake_request.pb.ex deleted file mode 100644 index 22d9eafc4..000000000 --- a/proto/lib/livebook_proto/handshake_request.pb.ex +++ /dev/null @@ -1,6 +0,0 @@ -defmodule LivebookProto.HandshakeRequest do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :app_version, 1, type: :string, json_name: "appVersion" -end diff --git a/proto/lib/livebook_proto/handshake_response.pb.ex b/proto/lib/livebook_proto/handshake_response.pb.ex deleted file mode 100644 index fc3d5b481..000000000 --- a/proto/lib/livebook_proto/handshake_response.pb.ex +++ /dev/null @@ -1,8 +0,0 @@ -defmodule LivebookProto.HandshakeResponse do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :id, 1, type: :string - field :name, 2, type: :string - field :user, 3, type: LivebookProto.User -end diff --git a/proto/lib/livebook_proto/request.pb.ex b/proto/lib/livebook_proto/request.pb.ex deleted file mode 100644 index edcc3c6c6..000000000 --- a/proto/lib/livebook_proto/request.pb.ex +++ /dev/null @@ -1,14 +0,0 @@ -defmodule LivebookProto.Request do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - oneof :type, 0 - - field :id, 1, type: :int32 - field :handshake, 2, type: LivebookProto.HandshakeRequest, oneof: 0 - - field :create_secret, 3, - type: LivebookProto.CreateSecretRequest, - json_name: "createSecret", - oneof: 0 -end diff --git a/proto/lib/livebook_proto/response.pb.ex b/proto/lib/livebook_proto/response.pb.ex deleted file mode 100644 index e158cd001..000000000 --- a/proto/lib/livebook_proto/response.pb.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule LivebookProto.Response do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - oneof :type, 0 - - field :id, 1, type: :int32 - field :error, 2, type: LivebookProto.Error, oneof: 0 - field :changeset, 3, type: LivebookProto.ChangesetError, oneof: 0 - field :handshake, 4, type: LivebookProto.HandshakeResponse, oneof: 0 - - field :create_secret, 5, - type: LivebookProto.CreateSecretResponse, - json_name: "createSecret", - oneof: 0 -end diff --git a/proto/lib/livebook_proto/user.pb.ex b/proto/lib/livebook_proto/user.pb.ex deleted file mode 100644 index e9177f5bf..000000000 --- a/proto/lib/livebook_proto/user.pb.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule LivebookProto.User do - @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - - field :id, 1, type: :int32 - field :email, 2, type: :string -end diff --git a/proto/lib/livebook_proto/user_synchronized.pb.ex b/proto/lib/livebook_proto/user_synchronized.pb.ex index 04a7073c5..5e8d3e3e0 100644 --- a/proto/lib/livebook_proto/user_synchronized.pb.ex +++ b/proto/lib/livebook_proto/user_synchronized.pb.ex @@ -2,7 +2,6 @@ defmodule LivebookProto.UserSynchronized do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 - field :id, 1, type: :string - field :name, 2, type: :string - field :secrets, 3, repeated: true, type: LivebookProto.Secret + field :name, 1, type: :string + field :secrets, 2, repeated: true, type: LivebookProto.Secret end diff --git a/proto/messages.proto b/proto/messages.proto index 1348710f2..7a1f3c225 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -1,26 +1,12 @@ syntax = "proto3"; -message User { - int32 id = 1; - string email = 2; -} - -message Secret { - string name = 1; - string value = 2; -} - message Error { string details = 1; } -message FieldError { - string field = 1; - repeated string details = 2; -} - -message ChangesetError { - repeated FieldError errors = 1; +message Secret { + string name = 1; + string value = 2; } message SecretCreated { @@ -39,55 +25,15 @@ message SecretDeleted { } message UserSynchronized { - string id = 1; - string name = 2; - repeated Secret secrets = 3; -} - -message HandshakeRequest { - string app_version = 1; -} - -message HandshakeResponse { - string id = 1; - string name = 2; - User user = 3; -} - -message CreateSecretRequest { string name = 1; - string value = 2; -} - -message CreateSecretResponse { -} - -message Request { - int32 id = 1; - - oneof type { - HandshakeRequest handshake = 2; - CreateSecretRequest create_secret = 3; - } -} - -message Response { - int32 id = 1; - - oneof type { - Error error = 2; - ChangesetError changeset = 3; - - HandshakeResponse handshake = 4; - CreateSecretResponse create_secret = 5; - } + repeated Secret secrets = 2; } message Event { oneof type { - SecretCreated secret_created = 100; - SecretUpdated secret_updated = 101; - SecretDeleted secret_deleted = 102; - UserSynchronized user_synchronized = 103; + SecretCreated secret_created = 1; + SecretUpdated secret_updated = 2; + SecretDeleted secret_deleted = 3; + UserSynchronized user_synchronized = 4; } } diff --git a/test/livebook/hubs/team_client_test.exs b/test/livebook/hubs/team_client_test.exs new file mode 100644 index 000000000..72c5c5886 --- /dev/null +++ b/test/livebook/hubs/team_client_test.exs @@ -0,0 +1,54 @@ +defmodule Livebook.Hubs.TeamClientTest do + use Livebook.TeamsIntegrationCase, async: true + @moduletag :capture_log + + alias Livebook.Hubs.TeamClient + + setup do + Livebook.Hubs.subscribe([:connection]) + :ok + end + + describe "start_link/1" do + test "successfully authenticates the web socket connection", %{user: user, node: node} do + org = :erpc.call(node, Hub.Integration, :create_org, []) + org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]]) + token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org]) + + team = + build(:team, + id: "team-#{org.name}", + hub_name: org.name, + user_id: user.id, + org_id: org.id, + org_key_id: org_key.id, + session_token: token + ) + + refute TeamClient.connected?(team.id) + + TeamClient.start_link(team) + assert_receive :hub_connected + assert TeamClient.connected?(team.id) + end + + test "rejects the web socket connection with invalid credentials", %{user: user, token: token} do + team = + build(:team, + user_id: user.id, + org_id: 123_456, + org_key_id: 123_456, + session_token: token + ) + + TeamClient.start_link(team) + + assert_receive {:hub_server_error, error} + + assert error == + "#{team.hub_name}: Your session is out-of-date. Please re-join the organization." + + refute Livebook.Hubs.hub_exists?(team.id) + end + end +end diff --git a/test/livebook/hubs_test.exs b/test/livebook/hubs_test.exs index 0e69bb531..4623cd594 100644 --- a/test/livebook/hubs_test.exs +++ b/test/livebook/hubs_test.exs @@ -1,5 +1,5 @@ defmodule Livebook.HubsTest do - use Livebook.DataCase + use Livebook.TeamsIntegrationCase, async: true alias Livebook.Hubs diff --git a/test/livebook/live_markdown/export_test.exs b/test/livebook/live_markdown/export_test.exs index 5ea1ec4bb..44fdbc68e 100644 --- a/test/livebook/live_markdown/export_test.exs +++ b/test/livebook/live_markdown/export_test.exs @@ -1122,7 +1122,7 @@ defmodule Livebook.LiveMarkdown.ExportTest do end test "persists hub id when not default" do - Livebook.Factory.insert_hub(:team, id: "team-persisted-id") + Livebook.Factory.build(:team, id: "team-persisted-id") notebook = %{ Notebook.new() diff --git a/test/livebook/teams/connection_test.exs b/test/livebook/teams/connection_test.exs new file mode 100644 index 000000000..b313005df --- /dev/null +++ b/test/livebook/teams/connection_test.exs @@ -0,0 +1,44 @@ +defmodule Livebook.Teams.ConnectionTest do + use Livebook.TeamsIntegrationCase, async: true + + @moduletag :capture_log + + alias Livebook.Teams.Connection + + describe "connect" do + test "successfully authenticates the websocket connection", %{user: user, node: node} do + org = :erpc.call(node, Hub.Integration, :create_org, []) + org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]]) + token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org]) + + header = [ + {"x-user", to_string(user.id)}, + {"x-org", to_string(org.id)}, + {"x-org-key", to_string(org_key.id)}, + {"x-session-token", token} + ] + + assert {:ok, _conn} = Connection.start_link(self(), header) + assert_receive :connected + end + + test "rejects the websocket connection with invalid credentials", %{user: user} do + header = [ + {"x-user", to_string(user.id)}, + {"x-org", to_string(user.id)}, + {"x-org-key", to_string(user.id)}, + {"x-session-token", "foo"} + ] + + assert {:ok, _conn} = Connection.start_link(self(), header) + + assert_receive {:server_error, + "Your session is out-of-date. Please re-join the organization."} + + assert {:ok, _conn} = Connection.start_link(self(), []) + + assert_receive {:server_error, + "Invalid request. Please re-join the organization and update Livebook if the issue persists."} + end + end +end diff --git a/test/livebook_web/live/home_live_test.exs b/test/livebook_web/live/home_live_test.exs index 8782d1b6f..621ea3071 100644 --- a/test/livebook_web/live/home_live_test.exs +++ b/test/livebook_web/live/home_live_test.exs @@ -198,14 +198,14 @@ defmodule LivebookWeb.HomeLiveTest do end end - describe "hubs sidebar" do - test "render section", %{conn: conn} do + describe "hubs" do + test "renders sidebar section", %{conn: conn} do {:ok, _view, html} = live(conn, ~p"/") assert html =~ "HUBS" assert html =~ "Add Hub" end - test "render persisted hubs", %{conn: conn} do + test "renders sidebar persisted hubs", %{conn: conn} do team = insert_hub(:team, id: "team-foo-bar-id") {:ok, _view, html} = live(conn, ~p"/") diff --git a/test/livebook_web/live/hub/edit_live_test.exs b/test/livebook_web/live/hub/edit_live_test.exs index d97dbacba..ad58c928d 100644 --- a/test/livebook_web/live/hub/edit_live_test.exs +++ b/test/livebook_web/live/hub/edit_live_test.exs @@ -1,5 +1,5 @@ defmodule LivebookWeb.Hub.EditLiveTest do - use LivebookWeb.ConnCase + use LivebookWeb.ConnCase, async: true import Phoenix.LiveViewTest import Livebook.TestHelpers @@ -22,7 +22,7 @@ defmodule LivebookWeb.Hub.EditLiveTest do |> render_change(%{"personal" => attrs}) refute view - |> element("#enterprise-form .invalid-feedback") + |> element("#personal-form .invalid-feedback") |> has_element?() assert {:ok, view, _html} = diff --git a/test/livebook_web/live/integration/session_live_test.exs b/test/livebook_web/live/integration/session_live_test.exs new file mode 100644 index 000000000..155c33910 --- /dev/null +++ b/test/livebook_web/live/integration/session_live_test.exs @@ -0,0 +1,50 @@ +defmodule LivebookWeb.Integration.SessionLiveTest do + use Livebook.TeamsIntegrationCase, async: true + + import Phoenix.LiveViewTest + + alias Livebook.{Sessions, Session} + + setup do + {:ok, session} = Sessions.create_session(notebook: Livebook.Notebook.new()) + + on_exit(fn -> + Session.close(session.pid) + end) + + %{session: session} + end + + describe "hubs" do + test "selects the notebook hub", %{conn: conn, user: user, node: node, session: session} do + org = :erpc.call(node, Hub.Integration, :create_org, []) + org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]]) + token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org]) + + hub = + insert_hub(:team, + id: "team-#{org.name}", + hub_name: org.name, + user_id: user.id, + org_id: org.id, + org_key_id: org_key.id, + session_token: token + ) + + id = hub.id + personal_id = Livebook.Hubs.Personal.id() + + Session.subscribe(session.id) + {:ok, view, _} = live(conn, ~p"/sessions/#{session.id}") + + assert Session.get_data(session.pid).notebook.hub_id == personal_id + + view + |> element(~s/#select-hub-#{id}/) + |> render_click() + + assert_receive {:operation, {:set_notebook_hub, _, ^id}} + assert Session.get_data(session.pid).notebook.hub_id == hub.id + end + end +end diff --git a/test/livebook_web/live/session_live_test.exs b/test/livebook_web/live/session_live_test.exs index 1fac408c4..d28dc010f 100644 --- a/test/livebook_web/live/session_live_test.exs +++ b/test/livebook_web/live/session_live_test.exs @@ -1481,24 +1481,4 @@ defmodule LivebookWeb.SessionLiveTest do Livebook.App.close(app.pid) end end - - describe "hubs" do - test "selects the notebook hub", %{conn: conn, session: session} do - hub = insert_hub(:team) - id = hub.id - personal_id = Livebook.Hubs.Personal.id() - - Session.subscribe(session.id) - {:ok, view, _} = live(conn, ~p"/sessions/#{session.id}") - - assert Session.get_data(session.pid).notebook.hub_id == personal_id - - view - |> element(~s/#select-hub-#{id}/) - |> render_click() - - assert_receive {:operation, {:set_notebook_hub, _, ^id}} - assert Session.get_data(session.pid).notebook.hub_id == hub.id - end - end end diff --git a/test/support/factory.ex b/test/support/factory.ex index 91169150b..c5109f574 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -24,7 +24,7 @@ defmodule Livebook.Factory do user_id: 1, org_key_id: 1, teams_key: org.teams_key, - session_token: Livebook.Utils.random_cookie() + session_token: Livebook.Utils.random_short_id() } end diff --git a/test/support/integration/teams_server.ex b/test/support/integration/teams_server.ex index ee24354e2..0ee5930a8 100644 --- a/test/support/integration/teams_server.ex +++ b/test/support/integration/teams_server.ex @@ -100,7 +100,7 @@ defmodule Livebook.TeamsServer do end def handle_info({_port, {:exit_status, status}}, _state) do - error("enterprise quit with status #{status}") + error("team quit with status #{status}") System.halt(status) end @@ -212,6 +212,10 @@ defmodule Livebook.TeamsServer do System.get_env("TEAMS_DEBUG", "false") end + defp proto do + System.get_env("TEAMS_LIVEBOOK_PROTO_PATH") + end + defp wait_on_start(state, port) do url = state.url || fetch_url(state) @@ -257,6 +261,8 @@ defmodule Livebook.TeamsServer do "DEBUG" => debug() } + env = if proto(), do: Map.merge(env, %{"LIVEBOOK_PROTO_PATH" => proto()}), else: env + if state_env do Map.merge(env, state_env) else