From 3b2ea3c8ce4286df1096f954929f1789fc17358c Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Tue, 10 Jan 2023 17:12:35 -0300 Subject: [PATCH] Update Connection life-cycle and implement `ChangesetError` message (#1633) --- lib/livebook/application.ex | 1 + lib/livebook/hubs.ex | 98 ++++++++-- lib/livebook/hubs/enterprise.ex | 5 +- lib/livebook/hubs/fly.ex | 4 +- lib/livebook/hubs/local.ex | 4 +- lib/livebook/hubs/provider.ex | 6 + lib/livebook/web_socket/client.ex | 183 +++++++++--------- lib/livebook/web_socket/server.ex | 102 ++++------ lib/livebook_web/live/hooks/sidebar_hook.ex | 33 +--- lib/livebook_web/live/session_live.ex | 12 +- .../live/session_live/secrets_component.ex | 30 +-- .../lib/livebook_proto/changeset_error.pb.ex | 6 + proto/lib/livebook_proto/field_error.pb.ex | 7 + proto/lib/livebook_proto/response.pb.ex | 5 +- proto/messages.proto | 14 +- test/livebook/hubs/enterprise_client_test.exs | 2 +- test/livebook/hubs_test.exs | 12 +- test/livebook/web_socket/server_test.exs | 13 +- .../session_live/secrets_component_test.exs | 16 +- 19 files changed, 299 insertions(+), 254 deletions(-) create mode 100644 proto/lib/livebook_proto/changeset_error.pb.ex create mode 100644 proto/lib/livebook_proto/field_error.pb.ex diff --git a/lib/livebook/application.ex b/lib/livebook/application.ex index 1519a41f3..806f3a373 100644 --- a/lib/livebook/application.ex +++ b/lib/livebook/application.ex @@ -54,6 +54,7 @@ defmodule Livebook.Application do clear_env_vars() display_startup_info() insert_development_hub() + Livebook.Hubs.connect_hubs() result {:error, error} -> diff --git a/lib/livebook/hubs.ex b/lib/livebook/hubs.ex index d2bc52a0a..d3e94e8b3 100644 --- a/lib/livebook/hubs.ex +++ b/lib/livebook/hubs.ex @@ -6,11 +6,17 @@ defmodule Livebook.Hubs do @namespace :hubs + @type connected_hub :: %{ + required(:pid) => pid(), + required(:hub) => Provider.t() + } + @type connected_hubs :: list(connected_hub()) + @doc """ Gets a list of hubs from storage. """ - @spec fetch_hubs() :: list(Provider.t()) - def fetch_hubs do + @spec get_hubs() :: list(Provider.t()) + def get_hubs do for fields <- Storage.all(@namespace) do to_struct(fields) end @@ -19,13 +25,23 @@ defmodule Livebook.Hubs do @doc """ Gets a list of metadatas from storage. """ - @spec fetch_metadatas() :: list(Metadata.t()) - def fetch_metadatas do - for hub <- fetch_hubs() do + @spec get_metadatas() :: list(Metadata.t()) + def get_metadatas do + for hub <- get_hubs() do Provider.normalize(hub) end end + @doc """ + Gets one hub from storage. + """ + @spec get_hub(String.t()) :: {:ok, Provider.t()} | :error + def get_hub(id) do + with {:ok, data} <- Storage.fetch(@namespace, id) do + {:ok, to_struct(data)} + end + end + @doc """ Gets one hub from storage. @@ -54,18 +70,29 @@ defmodule Livebook.Hubs do def save_hub(struct) do attributes = struct |> Map.from_struct() |> Map.to_list() :ok = Storage.insert(@namespace, struct.id, attributes) + :ok = connect_hub(struct) :ok = broadcast_hubs_change() + struct end @doc false def delete_hub(id) do - Storage.delete(@namespace, id) + with {:ok, hub} <- get_hub(id) do + if connected_hub = get_connected_hub(hub) do + GenServer.stop(connected_hub.pid, :shutdown) + end + + :ok = Storage.delete(@namespace, id) + :ok = broadcast_hubs_change() + end + + :ok end @doc false def clean_hubs do - for hub <- fetch_hubs(), do: delete_hub(hub.id) + for hub <- get_hubs(), do: delete_hub(hub.id) :ok end @@ -91,14 +118,10 @@ defmodule Livebook.Hubs do Phoenix.PubSub.unsubscribe(Livebook.PubSub, "hubs") end - @doc """ - Notifies interested processes about hubs data change. - - Broadcasts `{:hubs_metadata_changed, hubs}` message under the `"hubs"` topic. - """ - @spec broadcast_hubs_change() :: :ok - def broadcast_hubs_change do - Phoenix.PubSub.broadcast(Livebook.PubSub, "hubs", {:hubs_metadata_changed, fetch_metadatas()}) + # Notifies interested processes about hubs data change. + # Broadcasts `{:hubs_metadata_changed, hubs}` message under the `"hubs"` topic. + defp broadcast_hubs_change do + Phoenix.PubSub.broadcast(Livebook.PubSub, "hubs", {:hubs_metadata_changed, get_metadatas()}) end defp to_struct(%{id: "fly-" <> _} = fields) do @@ -112,4 +135,49 @@ defmodule Livebook.Hubs do defp to_struct(%{id: "local-" <> _} = fields) do Provider.load(%Local{}, fields) end + + @doc """ + Connects to the all available and connectable hubs. + + ## Example + + iex> connect_hubs() + :ok + + """ + @spec connect_hubs() :: :ok + def connect_hubs do + for hub <- get_hubs(), do: connect_hub(hub) + + :ok + end + + defp connect_hub(hub) do + if child_spec = Provider.connect(hub) do + DynamicSupervisor.start_child(Livebook.HubsSupervisor, child_spec) + end + + :ok + end + + @doc """ + Gets a list of connected hubs. + + ## Example + + iex> get_connected_hubs() + [%{pid: #PID<0.178.0>, hub: %Enterprise{}}, ...] + + """ + @spec get_connected_hubs() :: connected_hubs() + def get_connected_hubs do + for hub <- get_hubs(), connected = get_connected_hub(hub), do: connected + end + + defp get_connected_hub(hub) do + case Registry.lookup(Livebook.HubsRegistry, hub.id) do + [{pid, _}] -> %{pid: pid, hub: hub} + [] -> nil + end + end end diff --git a/lib/livebook/hubs/enterprise.ex b/lib/livebook/hubs/enterprise.ex index d4ace2e51..49358d276 100644 --- a/lib/livebook/hubs/enterprise.ex +++ b/lib/livebook/hubs/enterprise.ex @@ -126,5 +126,8 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Enterprise do } end - def type(_), do: "enterprise" + def type(_enterprise), do: "enterprise" + + def connect(%Livebook.Hubs.Enterprise{} = enterprise), + do: {Livebook.Hubs.EnterpriseClient, enterprise} end diff --git a/lib/livebook/hubs/fly.ex b/lib/livebook/hubs/fly.ex index fb7ff2314..62669d236 100644 --- a/lib/livebook/hubs/fly.ex +++ b/lib/livebook/hubs/fly.ex @@ -133,5 +133,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Fly do } end - def type(_), do: "fly" + def type(_fly), do: "fly" + + def connect(_fly), do: nil end diff --git a/lib/livebook/hubs/local.ex b/lib/livebook/hubs/local.ex index ff05235c8..009f61142 100644 --- a/lib/livebook/hubs/local.ex +++ b/lib/livebook/hubs/local.ex @@ -18,5 +18,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Local do } end - def type(_), do: "local" + def type(_local), do: "local" + + def connect(_local), do: nil end diff --git a/lib/livebook/hubs/provider.ex b/lib/livebook/hubs/provider.ex index 91014828e..3c54b92f7 100644 --- a/lib/livebook/hubs/provider.ex +++ b/lib/livebook/hubs/provider.ex @@ -18,4 +18,10 @@ defprotocol Livebook.Hubs.Provider do """ @spec type(struct()) :: String.t() def type(struct) + + @doc """ + Gets the child spec of the given struct. + """ + @spec connect(struct()) :: Supervisor.child_spec() | module() | {module(), any()} | nil + def connect(struct) end diff --git a/lib/livebook/web_socket/client.ex b/lib/livebook/web_socket/client.ex index 2cc8d739a..3a43f0fad 100644 --- a/lib/livebook/web_socket/client.ex +++ b/lib/livebook/web_socket/client.ex @@ -9,8 +9,6 @@ defmodule Livebook.WebSocket.Client do @type websocket :: Mint.WebSocket.t() @type frame :: Mint.WebSocket.frame() | Mint.WebSocket.shorthand_frame() @type ref :: Mint.Types.request_ref() - @type ws_error :: Mint.WebSocket.error() - @type mint_error :: Mint.Types.error() defmodule Response do defstruct [:status, :headers, body: []] @@ -28,17 +26,20 @@ defmodule Livebook.WebSocket.Client do Connects to the WebSocket server with given url and headers. """ @spec connect(String.t(), list({String.t(), String.t()})) :: - {:ok, conn(), ref()} - | {:error, mint_error()} - | {:error, conn(), ws_error()} + {:ok, conn(), websocket(), ref()} + | {:transport_error, String.t()} + | {:server_error, list(binary())} def connect(url, headers \\ []) do uri = URI.parse(url) http_scheme = parse_http_scheme(uri) ws_scheme = parse_ws_scheme(uri) + state = %{status: nil, headers: [], body: []} with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port), {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, @ws_path, headers) do - {:ok, conn, ref} + receive_upgrade(conn, ref, state) + else + {:error, exception} -> {:transport_error, Exception.message(exception)} end end @@ -48,6 +49,62 @@ defmodule Livebook.WebSocket.Client do defp parse_ws_scheme(uri) when uri.scheme in ["http", "ws"], do: :ws defp parse_ws_scheme(uri) when uri.scheme in ["https", "wss"], do: :wss + defp receive_upgrade(conn, ref, state) do + with {:ok, conn} <- Mint.HTTP.set_mode(conn, :passive), + {:ok, conn, responses} <- Mint.WebSocket.recv(conn, 0, 5_000) do + handle_upgrade_responses(responses, conn, ref, state) + else + {:error, _websocket, exception, []} -> + Mint.HTTP.close(conn) + {:transport_error, Exception.message(exception)} + end + end + + defp handle_upgrade_responses([{:status, ref, status} | responses], conn, ref, state) do + handle_upgrade_responses(responses, conn, ref, %{state | status: status}) + end + + defp handle_upgrade_responses([{:headers, ref, headers} | responses], conn, ref, state) do + handle_upgrade_responses(responses, conn, ref, %{state | headers: headers}) + end + + defp handle_upgrade_responses([{:data, ref, body} | responses], conn, ref, state) do + handle_upgrade_responses(responses, conn, ref, %{state | body: [body | state.body]}) + end + + defp handle_upgrade_responses([{:done, ref} | responses], conn, ref, state) do + case state do + %{status: 101} -> + start_websocket(conn, ref, state) + + %{body: []} -> + handle_upgrade_responses(responses, conn, ref, state) + + %{status: _} -> + Mint.HTTP.close(conn) + {:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()} + end + end + + defp handle_upgrade_responses([], conn, ref, state) do + receive_upgrade(conn, ref, state) + end + + defp start_websocket(conn, ref, state) do + with {:ok, conn, websocket} <- Mint.WebSocket.new(conn, ref, state.status, state.headers), + {:ok, conn} <- Mint.HTTP.set_mode(conn, :active) do + {:ok, conn, websocket, ref} + else + {:error, conn, %UpgradeFailureError{}} -> + Mint.HTTP.close(conn) + {:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()} + + {:error, conn, exception} -> + Mint.HTTP.close(conn) + {:transport_error, Exception.message(exception)} + end + end + @doc """ Disconnects from the given connection, WebSocket and reference. @@ -74,108 +131,42 @@ defmodule Livebook.WebSocket.Client do If the WebSocket isn't connected yet, it will try to get the connection response to start a new WebSocket connection. """ - @spec receive(conn() | nil, ref(), websocket() | nil, term()) :: - {:ok, conn(), websocket(), Response.t() | :connected} - | {:error, conn(), websocket(), Response.t()} - | {:error, conn(), websocket(), ws_error() | mint_error()} - | {:error, :not_connected | :unknown} - def receive(conn, ref, websocket \\ nil, message \\ receive(do: (message -> message))) do - do_receive(conn, ref, websocket, message) - end + @spec receive(conn(), ref(), websocket(), term()) :: + {:ok, conn(), websocket(), list(binary())} + | {:server_error, conn(), websocket(), String.t()} + def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do + with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message), + {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data), + {:ok, response} <- handle_frames(frames) do + {:ok, conn, websocket, response} + else + {:close, response} -> + handle_disconnect(conn, websocket, ref, response) - defp do_receive(nil, _ref, _websocket, _message), do: {:error, :not_connected} + {:error, conn, exception} when is_exception(exception) -> + {:server_error, conn, websocket, Exception.message(exception)} - defp do_receive(conn, ref, websocket, message) do - case Mint.WebSocket.stream(conn, message) do - {:ok, conn, responses} -> - handle_responses(conn, ref, websocket, responses) - - {:error, conn, reason, []} -> - {:error, conn, websocket, reason} - - {:error, conn, _reason, responses} -> - handle_responses(conn, ref, websocket, responses) - - :unknown -> - {:error, :unknown} + {:error, conn, exception, []} when is_exception(exception) -> + {:server_error, conn, websocket, Exception.message(exception)} end end - @successful_status 100..299 - - defp handle_responses(conn, ref, websocket, [{:data, ref, data}]) do - with {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do - case handle_frames(%Response{}, frames) do - {:ok, response} -> {:ok, conn, websocket, response} - {:close, response} -> handle_disconnect(conn, websocket, ref, response) - end - end - end - - defp handle_responses(conn, ref, websocket, [_ | _] = responses) do - Enum.reduce(responses, %Response{}, fn - {:status, ^ref, status}, acc -> %{acc | status: status} - {:headers, ^ref, headers}, acc -> %{acc | headers: headers} - {:data, ^ref, body}, acc -> %{acc | body: body} - {:done, ^ref}, acc -> handle_done_response(conn, ref, websocket, acc) - end) - |> case do - {:error, _conn, _websocket, %Response{body: [_ | _]}} = result -> - result - - {:error, conn, websocket, %Response{} = response} -> - {:error, conn, websocket, %{response | body: [response.body]}} - - %Response{body: [_ | _]} = response when response.status not in @successful_status -> - {:error, conn, websocket, response} - - result -> - result - end - end - - defp handle_done_response(conn, ref, websocket, response) do - case Mint.WebSocket.new(conn, ref, response.status, response.headers) do - {:ok, conn, websocket} -> - case decode_response(websocket, response) do - {websocket, {:ok, response}} -> {:ok, conn, websocket, response} - {websocket, {:close, response}} -> handle_disconnect(conn, websocket, ref, response) - {websocket, {:error, reason}} -> {:error, conn, websocket, reason} - end - - {:error, conn, %UpgradeFailureError{status_code: status, headers: headers}} -> - {:error, conn, websocket, %{response | status: status, headers: headers}} - end - end - - defp handle_disconnect(conn, websocket, ref, result) do + defp handle_disconnect(conn, websocket, ref, response) do with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do - {:ok, conn, websocket, result} + {:ok, conn, websocket, response} end end - defp decode_response(websocket, %Response{status: 101}) do - {websocket, {:ok, :connected}} - end + defp handle_frames(frames), do: handle_frames([], frames) - defp decode_response(websocket, response) do - case Mint.WebSocket.decode(websocket, response.body) do - {:ok, websocket, frames} -> - {websocket, handle_frames(response, frames)} + defp handle_frames(binaries, [{:binary, binary} | rest]), + do: handle_frames([binary | binaries], rest) - {:error, websocket, reason} -> - {websocket, {:error, reason}} - end - end + defp handle_frames(binaries, [{:close, _, _} | _]), + do: {:close, binaries} - defp handle_frames(response, [{:binary, binary} | rest]), - do: handle_frames(%{response | body: [binary | response.body]}, rest) - - defp handle_frames(response, [{:close, _, _} | _]), - do: {:close, response} - - defp handle_frames(response, [_ | rest]), do: handle_frames(response, rest) - defp handle_frames(response, []), do: {:ok, response} + defp handle_frames(binaries, [_ | rest]), do: handle_frames(binaries, rest) + defp handle_frames(binaries, []), do: {:ok, binaries} @doc """ Sends a message to the given HTTP Connection and WebSocket connection. diff --git a/lib/livebook/web_socket/server.ex b/lib/livebook/web_socket/server.ex index e1d4d6fbe..244c20453 100644 --- a/lib/livebook/web_socket/server.ex +++ b/lib/livebook/web_socket/server.ex @@ -40,20 +40,22 @@ defmodule Livebook.WebSocket.Server do @impl true def connect(_, state) do case Client.connect(state.url, state.headers) do - {:ok, conn, ref} -> - {:ok, %{state | http_conn: conn, ref: ref}} + {:ok, conn, websocket, ref} -> + send(state.listener, {:connect, :ok, :connected}) + send(self(), {:loop_ping, ref}) - {:error, exception} when is_exception(exception) -> - Logger.error("Received exception: #{Exception.message(exception)}") - send(state.listener, {:connect, :error, exception}) + {:ok, %{state | http_conn: conn, ref: ref, websocket: websocket}} + + {:transport_error, reason} -> + send(state.listener, {:connect, :error, reason}) {:backoff, @backoff, state} - {:error, conn, reason} -> - Logger.error("Received error: #{inspect(reason)}") - send(state.listener, {:connect, :error, reason}) + {:server_error, binary} -> + {:response, %{type: {:error, error}}} = decode_response_or_event(binary) + send(state.listener, {:connect, :error, error.details}) - {:backoff, @backoff, %{state | http_conn: conn}} + {:backoff, @backoff, state} end end @@ -103,39 +105,32 @@ defmodule Livebook.WebSocket.Server do def handle_info({:loop_ping, _another_ref}, state), do: {:noreply, state} - def handle_info(message, state) do - case Client.receive(state.http_conn, state.ref, state.websocket, message) do - {:ok, conn, websocket, :connected} -> - state = send_received({:ok, :connected}, state) - send(self(), {:loop_ping, state.ref}) + def handle_info({:tcp_closed, _port} = message, state), + do: handle_websocket_message(message, state) - {:noreply, %{state | http_conn: conn, websocket: websocket}} + def handle_info({:tcp, _port, _data} = message, state), + do: handle_websocket_message(message, state) - {:error, conn, websocket, %Mint.TransportError{} = reason} -> - state = send_received({:error, reason}, state) - - {:connect, :receive, %{state | http_conn: conn, websocket: websocket}} - - {term, conn, websocket, data} -> - state = send_received({term, data}, state) - - {:noreply, %{state | http_conn: conn, websocket: websocket}} - - {:error, _} = error -> - {:noreply, send_received(error, state)} - end - end + def handle_info(_message, state), do: {:noreply, state} # Private - defp send_received({:ok, :connected}, state) do - send(state.listener, {:connect, :ok, :connected}) - state + def handle_websocket_message(message, state) do + case Client.receive(state.http_conn, state.ref, state.websocket, message) do + {:ok, conn, websocket, data} -> + state = %{state | http_conn: conn, websocket: websocket} + {:noreply, send_received(data, state)} + + {:server_error, conn, websocket, reason} -> + send(state.listener, {:connect, :error, reason}) + + {:connect, :receive, %{state | http_conn: conn, websocket: websocket}} + end end - defp send_received({:ok, %Client.Response{body: [], status: nil}}, state), do: state + defp send_received([], state), do: state - defp send_received({:ok, %Client.Response{body: binaries}}, state) do + defp send_received([_ | _] = binaries, state) do for binary <- binaries, reduce: state do acc -> case decode_response_or_event(binary) do @@ -145,6 +140,9 @@ defmodule Livebook.WebSocket.Server do {:response, %{id: id, type: {:error, %{details: reason}}}} -> reply_to_id(id, {:error, reason}, acc) + {:response, %{id: id, type: {:changeset, %{errors: field_errors}}}} -> + reply_to_id(id, {:changeset_error, to_changeset_errors(field_errors)}, acc) + {:response, %{id: id, type: result}} -> reply_to_id(id, result, acc) @@ -155,39 +153,9 @@ defmodule Livebook.WebSocket.Server do end end - defp send_received({:error, :unknown}, state), do: state - - defp send_received({:error, %Mint.TransportError{} = reason}, state) do - send(state.listener, {:connect, :error, reason}) - state - end - - defp send_received({:error, %Client.Response{body: binaries, status: status}}, state) - when binaries != [] and status != nil do - for binary <- binaries do - with {:response, body} <- decode_response_or_event(binary), - %{type: {:error, %{details: reason}}} <- body do - send(state.listener, {:connect, :error, reason}) - end - end - - state - end - - defp send_received({:error, %Client.Response{body: [], status: status}}, state) - when status != nil do - reply_to_all({:error, Plug.Conn.Status.reason_phrase(status)}, state) - end - - defp send_received({:error, %Client.Response{body: binaries, status: nil}}, state) do - for binary <- binaries, - {:response, body} <- decode_response_or_event(binary), - reduce: state do - acc -> - case body do - %{id: -1, type: {:error, %{details: reason}}} -> reply_to_all({:error, reason}, acc) - %{id: id, type: {:error, %{details: reason}}} -> reply_to_id(id, {:error, reason}, acc) - end + defp to_changeset_errors(field_errors) do + for %{field: field, details: errors} <- field_errors, into: %{} do + {String.to_atom(field), errors} end end diff --git a/lib/livebook_web/live/hooks/sidebar_hook.ex b/lib/livebook_web/live/hooks/sidebar_hook.ex index 5db3241c1..d8c92212b 100644 --- a/lib/livebook_web/live/hooks/sidebar_hook.ex +++ b/lib/livebook_web/live/hooks/sidebar_hook.ex @@ -4,27 +4,22 @@ defmodule LivebookWeb.SidebarHook do import Phoenix.Component import Phoenix.LiveView - alias Livebook.Hubs.Enterprise - alias Livebook.Hubs.EnterpriseClient - def on_mount(:default, _params, _session, socket) do if connected?(socket) do Livebook.Hubs.subscribe() end - hubs = Livebook.Hubs.fetch_metadatas() - socket = socket - |> assign(saved_hubs: hubs) + |> assign(saved_hubs: Livebook.Hubs.get_metadatas()) |> attach_hook(:hubs, :handle_info, &handle_info/2) |> attach_hook(:shutdown, :handle_event, &handle_event/3) - {:cont, assign(socket, connected_hubs: connect_enterprise_hubs(hubs))} + {:cont, socket} end defp handle_info({:hubs_metadata_changed, hubs}, socket) do - {:halt, assign(socket, saved_hubs: hubs, connected_hubs: connect_enterprise_hubs(hubs))} + {:halt, assign(socket, saved_hubs: hubs)} end defp handle_info(_event, socket), do: {:cont, socket} @@ -42,26 +37,4 @@ defmodule LivebookWeb.SidebarHook do end defp handle_event(_event, _params, socket), do: {:cont, socket} - - # TODO: Move Hub connection life-cycle elsewhere - @supervisor Livebook.HubsSupervisor - @registry Livebook.HubsRegistry - - defp connect_enterprise_hubs(hubs) do - for %{provider: %Enterprise{} = enterprise} <- hubs do - pid = - case Registry.lookup(@registry, enterprise.url) do - [{pid, _}] -> - pid - - [] -> - case DynamicSupervisor.start_child(@supervisor, {EnterpriseClient, enterprise}) do - {:ok, pid} -> pid - {:error, {:already_started, pid}} -> pid - end - end - - %{hub: enterprise, pid: pid} - end - end end diff --git a/lib/livebook_web/live/session_live.ex b/lib/livebook_web/live/session_live.ex index a1df0f652..1d4a3cc06 100644 --- a/lib/livebook_web/live/session_live.ex +++ b/lib/livebook_web/live/session_live.ex @@ -8,6 +8,7 @@ defmodule LivebookWeb.SessionLive do alias Livebook.{Sessions, Session, Delta, Notebook, Runtime, LiveMarkdown, Secrets} alias Livebook.Notebook.{Cell, ContentLoader} alias Livebook.JSInterop + alias Livebook.Hubs alias Livebook.Hubs.EnterpriseClient on_mount LivebookWeb.SidebarHook @@ -63,7 +64,7 @@ defmodule LivebookWeb.SessionLive do autofocus_cell_id: autofocus_cell_id(data.notebook), page_title: get_page_title(data.notebook.name), livebook_secrets: Secrets.fetch_secrets() |> Map.new(&{&1.name, &1.value}), - enterprise_secrets: fetch_enterprise_secrets(socket), + enterprise_secrets: fetch_enterprise_secrets(), select_secret_ref: nil, select_secret_options: nil ) @@ -421,7 +422,6 @@ defmodule LivebookWeb.SessionLive do id="secrets" session={@session} secrets={@data_view.secrets} - enterprise_hubs={@connected_hubs} livebook_secrets={@livebook_secrets} prefill_secret_name={@prefill_secret_name} select_secret_ref={@select_secret_ref} @@ -1450,14 +1450,14 @@ defmodule LivebookWeb.SessionLive do def handle_info({:secret_created, %Secrets.Secret{}}, socket) do {:noreply, socket - |> assign(enterprise_secrets: fetch_enterprise_secrets(socket)) + |> assign(enterprise_secrets: fetch_enterprise_secrets()) |> put_flash(:info, "A new secret has been created on your Livebook Enterprise")} end def handle_info({:secret_updated, %Secrets.Secret{}}, socket) do {:noreply, socket - |> assign(enterprise_secrets: fetch_enterprise_secrets(socket)) + |> assign(enterprise_secrets: fetch_enterprise_secrets()) |> put_flash(:info, "An existing secret has been updated on your Livebook Enterprise")} end @@ -2298,8 +2298,8 @@ defmodule LivebookWeb.SessionLive do secret in secrets end - defp fetch_enterprise_secrets(socket) do - for connected_hub <- socket.assigns.connected_hubs, + defp fetch_enterprise_secrets do + for connected_hub <- Hubs.get_connected_hubs(), secret <- EnterpriseClient.list_cached_secrets(connected_hub.pid), into: %{}, do: {secret.name, secret.value} diff --git a/lib/livebook_web/live/session_live/secrets_component.ex b/lib/livebook_web/live/session_live/secrets_component.ex index e77567325..ff80e5ecc 100644 --- a/lib/livebook_web/live/session_live/secrets_component.ex +++ b/lib/livebook_web/live/session_live/secrets_component.ex @@ -5,7 +5,11 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do @impl true def update(assigns, socket) do - socket = assign(socket, assigns) + socket = + socket + |> assign(assigns) + |> assign(connected_hubs: Livebook.Hubs.get_connected_hubs()) + prefill_form = prefill_secret_name(socket) socket = @@ -119,16 +123,16 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do <% end %> <%= if Livebook.Config.feature_flag_enabled?(:hub) do %> <%= label class: "flex items-center gap-2 text-gray-600" do %> - <%= radio_button(f, :store, "enterprise", - disabled: @enterprise_hubs == [], - checked: @data["store"] == "enterprise" - ) %> in the Enterprise + <%= radio_button(f, :store, "hub", + disabled: @connected_hubs == [], + checked: @data["store"] == "hub" + ) %> in the Hub <% end %> - <%= if @data["store"] == "enterprise" do %> + <%= if @data["store"] == "hub" do %> <%= select( f, - :enterprise_hub, - enterprise_hubs_options(@enterprise_hubs, @data["enterprise_hub"]), + :connected_hub, + connected_hubs_options(@connected_hubs, @data["connected_hub"]), class: "input" ) %> <% end %> @@ -299,10 +303,10 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do Livebook.Session.set_secret(socket.assigns.session.pid, secret) end - defp set_secret(socket, secret, "enterprise") do - selected_hub = socket.assigns.data["enterprise_hub"] + defp set_secret(socket, secret, "hub") do + selected_hub = socket.assigns.data["connected_hub"] - if hub = Enum.find(socket.assigns.enterprise_hubs, &(&1.hub.id == selected_hub)) do + if hub = Enum.find(socket.assigns.connected_hubs, &(&1.hub.id == selected_hub)) do create_secret_request = LivebookProto.CreateSecretRequest.new!( name: secret.name, @@ -314,7 +318,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do {:error, reason} -> {:error, put_flash(socket, :error, reason)} end else - {:error, %{errors: [{"enterprise_hub", {"can't be blank", []}}]}} + {:error, %{errors: [{"connected_hub", {"can't be blank", []}}]}} end end @@ -336,7 +340,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do end # TODO: Livebook.Hubs.fetch_hubs_with_secrets_storage() - defp enterprise_hubs_options(connected_hubs, selected_hub) do + defp connected_hubs_options(connected_hubs, selected_hub) do [[key: "Select one Hub", value: "", selected: true, disabled: true]] ++ for %{hub: %{id: id, hub_name: name}} <- connected_hubs do [key: name, value: id, selected: id == selected_hub] diff --git a/proto/lib/livebook_proto/changeset_error.pb.ex b/proto/lib/livebook_proto/changeset_error.pb.ex new file mode 100644 index 000000000..695db505c --- /dev/null +++ b/proto/lib/livebook_proto/changeset_error.pb.ex @@ -0,0 +1,6 @@ +defmodule LivebookProto.ChangesetError do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field :errors, 1, repeated: true, type: LivebookProto.FieldError +end diff --git a/proto/lib/livebook_proto/field_error.pb.ex b/proto/lib/livebook_proto/field_error.pb.ex new file mode 100644 index 000000000..75228aff2 --- /dev/null +++ b/proto/lib/livebook_proto/field_error.pb.ex @@ -0,0 +1,7 @@ +defmodule LivebookProto.FieldError do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field :field, 1, type: :string + field :details, 2, repeated: true, type: :string +end diff --git a/proto/lib/livebook_proto/response.pb.ex b/proto/lib/livebook_proto/response.pb.ex index 0b4b6fad6..7be637b65 100644 --- a/proto/lib/livebook_proto/response.pb.ex +++ b/proto/lib/livebook_proto/response.pb.ex @@ -6,9 +6,10 @@ defmodule LivebookProto.Response do field :id, 1, type: :int32 field :error, 2, type: LivebookProto.Error, oneof: 0 - field :session, 3, type: LivebookProto.SessionResponse, oneof: 0 + field :changeset, 3, type: LivebookProto.ChangesetError, oneof: 0 + field :session, 4, type: LivebookProto.SessionResponse, oneof: 0 - field :create_secret, 4, + field :create_secret, 5, type: LivebookProto.CreateSecretResponse, json_name: "createSecret", oneof: 0 diff --git a/proto/messages.proto b/proto/messages.proto index 289a5dafd..cb6b68306 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -9,6 +9,15 @@ message Error { string details = 1; } +message FieldError { + string field = 1; + repeated string details = 2; +} + +message ChangesetError { + repeated FieldError errors = 1; +} + message SecretCreated { string name = 1; string value = 2; @@ -50,9 +59,10 @@ message Response { oneof type { Error error = 2; + ChangesetError changeset = 3; - SessionResponse session = 3; - CreateSecretResponse create_secret = 4; + SessionResponse session = 4; + CreateSecretResponse create_secret = 5; } } diff --git a/test/livebook/hubs/enterprise_client_test.exs b/test/livebook/hubs/enterprise_client_test.exs index 4394f6ed8..c801b4d76 100644 --- a/test/livebook/hubs/enterprise_client_test.exs +++ b/test/livebook/hubs/enterprise_client_test.exs @@ -22,7 +22,7 @@ defmodule Livebook.Hubs.EnterpriseClientTest do enterprise = build(:enterprise, url: "http://localhost:9999", token: token) EnterpriseClient.start_link(enterprise) - assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}} + assert_receive {:connect, :error, "connection refused"} end test "rejects the web socket connection with invalid credentials", %{url: url} do diff --git a/test/livebook/hubs_test.exs b/test/livebook/hubs_test.exs index ec39e9a6c..e9aeed9d0 100644 --- a/test/livebook/hubs_test.exs +++ b/test/livebook/hubs_test.exs @@ -9,18 +9,18 @@ defmodule Livebook.HubsTest do :ok end - test "fetch_hubs/0 returns a list of persisted hubs" do + test "get_hubs/0 returns a list of persisted hubs" do fly = insert_hub(:fly, id: "fly-baz") - assert Hubs.fetch_hubs() == [fly] + assert Hubs.get_hubs() == [fly] Hubs.delete_hub("fly-baz") - assert Hubs.fetch_hubs() == [] + assert Hubs.get_hubs() == [] end - test "fetch_metadata/0 returns a list of persisted hubs normalized" do + test "get_metadata/0 returns a list of persisted hubs normalized" do fly = insert_hub(:fly, id: "fly-livebook") - assert Hubs.fetch_metadatas() == [ + assert Hubs.get_metadatas() == [ %Hubs.Metadata{ id: "fly-livebook", color: fly.hub_color, @@ -30,7 +30,7 @@ defmodule Livebook.HubsTest do ] Hubs.delete_hub("fly-livebook") - assert Hubs.fetch_metadatas() == [] + assert Hubs.get_metadatas() == [] end test "fetch_hub!/1 returns one persisted fly" do diff --git a/test/livebook/web_socket/server_test.exs b/test/livebook/web_socket/server_test.exs index 0b48ed60f..84dba480e 100644 --- a/test/livebook/web_socket/server_test.exs +++ b/test/livebook/web_socket/server_test.exs @@ -17,7 +17,7 @@ defmodule Livebook.WebSocket.ServerTest do headers = [{"X-Auth-Token", token}] assert {:ok, _conn} = Server.start_link(self(), "http://localhost:9999", headers) - refute_receive {:connect, :ok, :connected} + assert_receive {:connect, :error, "connection refused"} end test "rejects the websocket connection with invalid credentials", %{url: url} do @@ -71,7 +71,8 @@ defmodule Livebook.WebSocket.ServerTest do value: "" ) - assert Server.send_request(conn, create_secret_request) == {:error, "value: can't be blank"} + assert {:changeset_error, errors} = Server.send_request(conn, create_secret_request) + assert "can't be blank" in errors.value end end @@ -105,8 +106,8 @@ defmodule Livebook.WebSocket.ServerTest do test "receives the disconnect message from websocket server", %{conn: conn, test: name} do EnterpriseServer.disconnect(name) - assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}} - assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}} + assert_receive {:connect, :error, "socket closed"} + assert_receive {:connect, :error, "connection refused"} assert Process.alive?(conn) end @@ -114,8 +115,8 @@ defmodule Livebook.WebSocket.ServerTest do test "reconnects after websocket server is up", %{test: name} do EnterpriseServer.disconnect(name) - assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}} - assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}} + assert_receive {:connect, :error, "socket closed"} + assert_receive {:connect, :error, "connection refused"} Process.sleep(1000) diff --git a/test/livebook_web/live/session_live/secrets_component_test.exs b/test/livebook_web/live/session_live/secrets_component_test.exs index 8b431e4df..90a8adc03 100644 --- a/test/livebook_web/live/session_live/secrets_component_test.exs +++ b/test/livebook_web/live/session_live/secrets_component_test.exs @@ -8,19 +8,21 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do alias Livebook.Sessions describe "enterprise" do - setup %{user: user, url: url, token: token} do - Livebook.Hubs.delete_hub("enterprise-#{user.id}") + setup %{url: url, token: token} do + id = Livebook.Utils.random_id() + Livebook.Hubs.delete_hub("enterprise-#{id}") enterprise = insert_hub(:enterprise, - id: "enterprise-#{user.id}", - external_id: user.id, + id: "enterprise-#{id}", + external_id: id, url: url, token: token ) {:ok, session} = Sessions.create_session(notebook: Livebook.Notebook.new()) Livebook.Hubs.EnterpriseClient.subscribe() + Livebook.Hubs.connect_hubs() on_exit(fn -> Session.close(session.pid) @@ -42,7 +44,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do data: %{ name: "FOO", value: "123", - store: "enterprise" + store: "hub" } }) =~ ~s() end @@ -58,8 +60,8 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do data: %{ name: "FOO", value: "123", - store: "enterprise", - enterprise_hub: enterprise.id + store: "hub", + connected_hub: enterprise.id } }