From 9ff84f204ffacff4e805cb240989415577fa173f Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 26 Dec 2022 21:16:47 -0300 Subject: [PATCH] Implement event handling from WebSocket (#1608) --- lib/livebook/hubs/enterprise_client.ex | 33 ++-- lib/livebook/hubs/provider.ex | 12 +- lib/livebook/web_socket/server.ex | 185 ++++++------------ .../live/hub/new/enterprise_component.ex | 4 +- proto/lib/livebook_proto/event.pb.ex | 16 ++ proto/lib/livebook_proto/secret_created.pb.ex | 7 + proto/lib/livebook_proto/secret_updated.pb.ex | 7 + proto/messages.proto | 17 ++ test/livebook/hubs/enterprise_client_test.exs | 36 +++- test/livebook/web_socket/server_test.exs | 59 ++++-- test/support/integration/enterprise_server.ex | 40 +++- 11 files changed, 237 insertions(+), 179 deletions(-) create mode 100644 proto/lib/livebook_proto/event.pb.ex create mode 100644 proto/lib/livebook_proto/secret_created.pb.ex create mode 100644 proto/lib/livebook_proto/secret_updated.pb.ex diff --git a/lib/livebook/hubs/enterprise_client.ex b/lib/livebook/hubs/enterprise_client.ex index 37680dc0c..88c47cb82 100644 --- a/lib/livebook/hubs/enterprise_client.ex +++ b/lib/livebook/hubs/enterprise_client.ex @@ -3,6 +3,7 @@ defmodule Livebook.Hubs.EnterpriseClient do use GenServer alias Livebook.Hubs.Enterprise + alias Livebook.Secrets.Secret alias Livebook.WebSocket.Server @pubsub_topic "enterprise" @@ -25,24 +26,15 @@ defmodule Livebook.Hubs.EnterpriseClient do Server.send_request(GenServer.call(pid, :get_server), data) end - @doc """ - Disconnects the Enterprise client with WebSocket server. - """ - @spec disconnect(pid()) :: :ok - def disconnect(pid) do - GenServer.cast(pid, :disconnect) - end - @doc """ Subscribe to WebSocket Server events. ## Messages - * `{:unknown, :error, reason}` - * `{:connect, :ok, :waiting_upgrade | :connected}` + * `{:connect, :ok, :connected}` * `{:connect, :error, reason}` - * `{:disconnect, :ok, :disconnected}` - * `{:disconnect, :error, reason}` + * `{:secret_created, %Secret{}}` + * `{:secret_updated, %Secret{}}` """ @spec subscribe() :: :ok | {:error, {:already_registered, pid()}} @@ -73,13 +65,6 @@ defmodule Livebook.Hubs.EnterpriseClient do {:reply, state.server, state} end - @impl true - def handle_cast(:disconnect, state) do - with :ok <- Server.close(state.server) do - {:noreply, state} - end - end - @impl true def handle_info({:connect, _, _} = message, state) do broadcast_message(message) @@ -91,6 +76,16 @@ defmodule Livebook.Hubs.EnterpriseClient do {:noreply, state} end + def handle_info({:event, :secret_created, %{name: name, value: value}}, state) do + broadcast_message({:secret_created, %Secret{name: name, value: value}}) + {:noreply, state} + end + + def handle_info({:event, :secret_updated, %{name: name, value: value}}, state) do + broadcast_message({:secret_updated, %Secret{name: name, value: value}}) + {:noreply, state} + end + def handle_info({:disconnect, :ok, :disconnected}, state) do {:stop, :normal, state} end diff --git a/lib/livebook/hubs/provider.ex b/lib/livebook/hubs/provider.ex index 79c1914bf..91014828e 100644 --- a/lib/livebook/hubs/provider.ex +++ b/lib/livebook/hubs/provider.ex @@ -1,27 +1,21 @@ defprotocol Livebook.Hubs.Provider do @moduledoc false - @type t :: %{ - required(:__struct__) => module(), - required(:id) => String.t(), - optional(any()) => any() - } - @doc """ Normalize given struct to `Livebook.Hubs.Metadata` struct. """ - @spec normalize(t()) :: Livebook.Hubs.Metadata.t() + @spec normalize(struct()) :: Livebook.Hubs.Metadata.t() def normalize(struct) @doc """ Loads fields into given struct. """ - @spec load(t(), map() | keyword()) :: t() + @spec load(struct(), map() | keyword()) :: struct() def load(struct, fields) @doc """ Gets the type from struct. """ - @spec type(t()) :: String.t() + @spec type(struct()) :: String.t() def type(struct) end diff --git a/lib/livebook/web_socket/server.ex b/lib/livebook/web_socket/server.ex index 7e7b25717..0da871977 100644 --- a/lib/livebook/web_socket/server.ex +++ b/lib/livebook/web_socket/server.ex @@ -21,22 +21,6 @@ defmodule Livebook.WebSocket.Server do Connection.start_link(__MODULE__, {listener, url, headers}) end - @doc """ - Checks if the given WebSocket Server is connected. - """ - @spec connected?(pid()) :: boolean() - def connected?(conn) do - Connection.call(conn, :connected?, @timeout) - end - - @doc """ - Closes the given WebSocket Server connection. - """ - @spec close(pid()) :: :ok - def close(conn) do - Connection.call(conn, :close, @timeout) - end - @doc """ Sends a Request to given WebSocket Server. """ @@ -57,7 +41,6 @@ defmodule Livebook.WebSocket.Server do def connect(_, state) do case Client.connect(state.url, state.headers) do {:ok, conn, ref} -> - send(state.listener, {:connect, :ok, :waiting_upgrade}) {:ok, %{state | http_conn: conn, ref: ref}} {:error, exception} when is_exception(exception) -> @@ -77,55 +60,19 @@ defmodule Livebook.WebSocket.Server do @dialyzer {:nowarn_function, disconnect: 2} @impl true - def disconnect({:close, caller}, state) do - Connection.reply(caller, :ok) - - case Client.disconnect(state.http_conn, state.websocket, state.ref) do - {:ok, conn, websocket} -> - send(state.listener, {:disconnect, :ok, :disconnected}) - {:noconnect, %{state | http_conn: conn, websocket: websocket}} - - {:error, conn, websocket, reason} -> - send(state.listener, {:disconnect, :error, reason}) - {:noconnect, %{state | http_conn: conn, websocket: websocket}} - end - end - def disconnect(info, state) do case info do + {:close, from} -> Logger.debug("Received close from: #{inspect(from)}") {:error, :closed} -> Logger.error("Connection closed") {:error, reason} -> Logger.error("Connection error: #{inspect(reason)}") end - case Client.disconnect(state.http_conn, state.websocket, state.ref) do - {:ok, conn, websocket} -> - send(state.listener, {:disconnect, :ok, :disconnected}) - - {:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}} - - {:error, conn, websocket, reason} -> - Logger.error("Received error: #{inspect(reason)}") - send(state.listener, {:disconnect, :error, reason}) - - {:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}} - end + {:connect, :reconnect, state} end ## GenServer callbacks @impl true - def handle_call(:connected?, _from, state) do - if conn = state.http_conn do - {:reply, conn.state == :open, state} - else - {:reply, false, state} - end - end - - def handle_call(:close, caller, state) do - {:disconnect, {:close, caller}, state} - end - def handle_call({:request, data}, caller, state) do id = state.id frame = LivebookProto.build_request_frame(data, id) @@ -144,100 +91,98 @@ defmodule Livebook.WebSocket.Server do def handle_info(message, state) do case Client.receive(state.http_conn, state.ref, state.websocket, message) do {:ok, conn, websocket, :connected} -> - send(state.listener, build_message({:ok, :connected})) + state = send_received({:ok, :connected}, state) {:noreply, %{state | http_conn: conn, websocket: websocket}} {:error, conn, websocket, %Mint.TransportError{} = reason} -> - send(state.listener, build_message({:error, reason})) + state = send_received({:error, reason}, state) {:connect, :receive, %{state | http_conn: conn, websocket: websocket}} {term, conn, websocket, data} -> - state = - {term, data} - |> build_message() - |> send_reply(state) + state = send_received({term, data}, state) {:noreply, %{state | http_conn: conn, websocket: websocket}} {:error, _} = error -> - send(state.listener, build_message(error)) - - {:noreply, state} + {:noreply, send_received(error, state)} end end # Private - defp send_reply({:response, :error, reason}, state) do + defp send_received({:ok, :connected}, state) do + send(state.listener, {:connect, :ok, :connected}) + state + end + + defp send_received({:ok, %Client.Response{body: nil, status: nil}}, state), do: state + + defp send_received({:ok, %Client.Response{body: body}}, state) do + case decode_response_or_event(body) do + {:response, %{id: -1, type: {:error, %{details: reason}}}} -> + reply_to_all({:error, reason}, state) + + {:response, %{id: id, type: {:error, %{details: reason}}}} -> + reply_to_id(id, {:error, reason}, state) + + {:response, %{id: id, type: result}} -> + reply_to_id(id, result, state) + + {:event, %{type: {name, data}}} -> + send(state.listener, {:event, name, data}) + state + end + end + + defp send_received({:error, :unknown}, state), do: state + + defp send_received({:error, %Mint.TransportError{} = reason}, state) do + send(state.listener, {:connect, :error, reason}) + state + end + + defp send_received({:error, %Client.Response{body: body, status: status}}, state) + when body != nil and status != nil do + %{type: {:error, %{details: reason}}} = LivebookProto.Response.decode(body) + send(state.listener, {:connect, :error, reason}) + + state + end + + defp send_received({:error, %Client.Response{body: nil, status: status}}, state) + when status != nil do + reply_to_all({:error, Plug.Conn.Status.reason_phrase(status)}, state) + end + + defp send_received({:error, %Client.Response{body: body, status: nil}}, state) do + case LivebookProto.Response.decode(body) do + %{id: -1, type: {:error, %{details: reason}}} -> reply_to_all({:error, reason}, state) + %{id: id, type: {:error, %{details: reason}}} -> reply_to_id(id, {:error, reason}, state) + end + end + + defp reply_to_all(message, state) do for {id, caller} <- state.reply, reduce: state do acc -> - Connection.reply(caller, {:error, reason}) + Connection.reply(caller, message) %{acc | reply: Map.delete(acc.reply, id)} end end - defp send_reply({:response, id, result}, state) do + defp reply_to_id(id, message, state) do if caller = state.reply[id] do - Connection.reply(caller, result) + Connection.reply(caller, message) end %{state | reply: Map.delete(state.reply, id)} end - defp send_reply(message, state) do - send(state.listener, message) - - state - end - - defp build_message({:ok, :connected}) do - {:connect, :ok, :connected} - end - - defp build_message({:ok, %Client.Response{body: nil, status: nil}}) do - :pong - end - - defp build_message({:error, %Client.Response{body: nil, status: status}}) do - {:response, :error, Plug.Conn.Status.reason_phrase(status)} - end - - defp build_message({:ok, %Client.Response{body: body}}) do - case LivebookProto.Response.decode(body) do - %{id: -1, type: {:error, %{details: reason}}} -> - {:response, :error, reason} - - %{id: id, type: {:error, %{details: reason}}} -> - {:response, id, {:error, reason}} - - %{id: id, type: result} -> - {:response, id, result} + defp decode_response_or_event(data) do + case LivebookProto.Response.decode(data) do + %{type: nil} -> {:event, LivebookProto.Event.decode(data)} + response -> {:response, response} end end - - defp build_message({:error, %Client.Response{body: body} = response}) - when response.status != nil do - %{type: {:error, %{details: reason}}} = LivebookProto.Response.decode(body) - {:connect, :error, reason} - end - - defp build_message({:error, %Client.Response{body: body}}) do - case LivebookProto.Response.decode(body) do - %{id: -1, type: {:error, %{details: reason}}} -> - {:response, :error, reason} - - %{id: id, type: {:error, %{details: reason}}} -> - {:response, id, {:error, reason}} - end - end - - defp build_message({:error, %Mint.TransportError{} = reason}) do - {:connect, :error, reason} - end - - defp build_message({:error, reason}) do - {:unknown, :error, reason} - end end diff --git a/lib/livebook_web/live/hub/new/enterprise_component.ex b/lib/livebook_web/live/hub/new/enterprise_component.ex index 91708e4ab..2638a4bbb 100644 --- a/lib/livebook_web/live/hub/new/enterprise_component.ex +++ b/lib/livebook_web/live/hub/new/enterprise_component.ex @@ -119,7 +119,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do receive do {:connect, :error, reason} -> - EnterpriseClient.disconnect(pid) + GenServer.stop(pid) handle_error(reason, socket) {:connect, :ok, :connected} -> @@ -134,7 +134,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do {:noreply, assign(socket, connected: true, changeset: changeset, base: base)} {:error, reason} -> - EnterpriseClient.disconnect(pid) + GenServer.stop(pid) handle_error(reason, socket) end end diff --git a/proto/lib/livebook_proto/event.pb.ex b/proto/lib/livebook_proto/event.pb.ex new file mode 100644 index 000000000..5f71e048d --- /dev/null +++ b/proto/lib/livebook_proto/event.pb.ex @@ -0,0 +1,16 @@ +defmodule LivebookProto.Event do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + oneof :type, 0 + + field :secret_created, 100, + type: LivebookProto.SecretCreated, + json_name: "secretCreated", + oneof: 0 + + field :secret_updated, 101, + type: LivebookProto.SecretUpdated, + json_name: "secretUpdated", + oneof: 0 +end diff --git a/proto/lib/livebook_proto/secret_created.pb.ex b/proto/lib/livebook_proto/secret_created.pb.ex new file mode 100644 index 000000000..e1aa2348e --- /dev/null +++ b/proto/lib/livebook_proto/secret_created.pb.ex @@ -0,0 +1,7 @@ +defmodule LivebookProto.SecretCreated do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field :name, 1, type: :string + field :value, 2, type: :string +end diff --git a/proto/lib/livebook_proto/secret_updated.pb.ex b/proto/lib/livebook_proto/secret_updated.pb.ex new file mode 100644 index 000000000..3994ab9e6 --- /dev/null +++ b/proto/lib/livebook_proto/secret_updated.pb.ex @@ -0,0 +1,7 @@ +defmodule LivebookProto.SecretUpdated do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field :name, 1, type: :string + field :value, 2, type: :string +end diff --git a/proto/messages.proto b/proto/messages.proto index ea59a3045..fba747db4 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -9,6 +9,16 @@ message Error { string details = 1; } +message SecretCreated { + string name = 1; + string value = 2; +} + +message SecretUpdated { + string name = 1; + string value = 2; +} + message SessionRequest { string app_version = 1; } @@ -35,3 +45,10 @@ message Response { SessionResponse session = 3; } } + +message Event { + oneof type { + SecretCreated secret_created = 100; + SecretUpdated secret_updated = 101; + } +} diff --git a/test/livebook/hubs/enterprise_client_test.exs b/test/livebook/hubs/enterprise_client_test.exs index 7686b8c8c..1e8a43d38 100644 --- a/test/livebook/hubs/enterprise_client_test.exs +++ b/test/livebook/hubs/enterprise_client_test.exs @@ -3,6 +3,7 @@ defmodule Livebook.Hubs.EnterpriseClientTest do @moduletag :capture_log alias Livebook.Hubs.EnterpriseClient + alias Livebook.Secrets.Secret setup do EnterpriseClient.subscribe() @@ -14,7 +15,6 @@ defmodule Livebook.Hubs.EnterpriseClientTest do enterprise = build(:enterprise, url: url, token: token) assert {:ok, _pid} = EnterpriseClient.start_link(enterprise) - assert_receive {:connect, :ok, :waiting_upgrade} assert_receive {:connect, :ok, :connected} end @@ -33,4 +33,38 @@ defmodule Livebook.Hubs.EnterpriseClientTest do assert reason =~ "the given token is invalid" end end + + describe "handle events" do + setup %{url: url, token: token} do + enterprise = build(:enterprise, url: url, token: token) + assert {:ok, _pid} = EnterpriseClient.start_link(enterprise) + + assert_receive {:connect, :ok, :connected} + + :ok + end + + test "receives a secret_created event" do + name = "API_TOKEN_ID" + value = Livebook.Utils.random_id() + node = EnterpriseServer.get_node() + :erpc.call(node, Enterprise.Integration, :create_secret, [name, value]) + + assert_receive {:secret_created, %Secret{name: ^name, value: ^value}} + end + + test "receives a secret_updated event" do + name = "SUPER_SUDO_USER" + value = "JakePeralta" + node = EnterpriseServer.get_node() + secret = :erpc.call(node, Enterprise.Integration, :create_secret, [name, value]) + + assert_receive {:secret_created, %Secret{name: ^name, value: ^value}} + + new_value = "ChonkyCat" + :erpc.call(node, Enterprise.Integration, :update_secret, [secret, new_value]) + + assert_receive {:secret_updated, %Secret{name: ^name, value: ^new_value}} + end + end end diff --git a/test/livebook/web_socket/server_test.exs b/test/livebook/web_socket/server_test.exs index e6e1c1cad..5531d07a0 100644 --- a/test/livebook/web_socket/server_test.exs +++ b/test/livebook/web_socket/server_test.exs @@ -9,33 +9,29 @@ defmodule Livebook.WebSocket.ServerTest do test "successfully authenticates the websocket connection", %{url: url, token: token} do headers = [{"X-Auth-Token", token}] - assert {:ok, conn} = Server.start_link(self(), url, headers) - assert_receive {:connect, :ok, :waiting_upgrade} + assert {:ok, _conn} = Server.start_link(self(), url, headers) assert_receive {:connect, :ok, :connected} - assert Server.connected?(conn) end test "rejects the websocket with invalid address", %{token: token} do headers = [{"X-Auth-Token", token}] - assert {:ok, conn} = Server.start_link(self(), "http://localhost:9999", headers) - refute Server.connected?(conn) + assert {:ok, _conn} = Server.start_link(self(), "http://localhost:9999", headers) + refute_receive {:connect, :ok, :connected} end test "rejects the websocket connection with invalid credentials", %{url: url} do headers = [{"X-Auth-Token", "foo"}] - assert {:ok, conn} = Server.start_link(self(), url, headers) + assert {:ok, _conn} = Server.start_link(self(), url, headers) assert_receive {:connect, :error, reason} assert reason =~ "the given token is invalid" - assert Server.close(conn) == :ok - assert {:ok, conn} = Server.start_link(self(), url) + assert {:ok, _conn} = Server.start_link(self(), url) assert_receive {:connect, :error, reason} assert reason =~ "could not get the token from the connection" - assert Server.close(conn) == :ok end end @@ -45,7 +41,6 @@ defmodule Livebook.WebSocket.ServerTest do {:ok, conn} = Server.start_link(self(), url, headers) - assert_receive {:connect, :ok, :waiting_upgrade} assert_receive {:connect, :ok, :connected} {:ok, conn: conn} @@ -80,9 +75,7 @@ defmodule Livebook.WebSocket.ServerTest do assert {:ok, conn} = Server.start_link(self(), url, headers) - assert_receive {:connect, :ok, :waiting_upgrade} assert_receive {:connect, :ok, :connected} - assert Server.connected?(conn) on_exit(fn -> EnterpriseServer.disconnect(name) @@ -99,27 +92,55 @@ defmodule Livebook.WebSocket.ServerTest do assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}} assert Process.alive?(conn) - refute Server.connected?(conn) end - test "reconnects after websocket server is up", %{conn: conn, test: name} do + test "reconnects after websocket server is up", %{test: name} do EnterpriseServer.disconnect(name) assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}} assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}} Process.sleep(1000) - refute Server.connected?(conn) # Wait until the server is up again assert EnterpriseServer.reconnect(name) == :ok - assert_receive {:connect, :ok, :waiting_upgrade}, 3000 assert_receive {:connect, :ok, :connected}, 3000 + end + end - assert Server.connected?(conn) - assert Server.close(conn) == :ok - refute Server.connected?(conn) + describe "handle events from server" do + setup %{url: url, token: token} do + headers = [{"X-Auth-Token", token}] + + {:ok, _conn} = Server.start_link(self(), url, headers) + + assert_receive {:connect, :ok, :connected} + + :ok + end + + test "receives a secret_created event" do + name = "MY_SECRET_ID" + value = Livebook.Utils.random_id() + node = EnterpriseServer.get_node() + :erpc.call(node, Enterprise.Integration, :create_secret, [name, value]) + + assert_receive {:event, :secret_created, %{name: ^name, value: ^value}} + end + + test "receives a secret_updated event" do + name = "API_USERNAME" + value = "JakePeralta" + node = EnterpriseServer.get_node() + secret = :erpc.call(node, Enterprise.Integration, :create_secret, [name, value]) + + assert_receive {:event, :secret_created, %{name: ^name, value: ^value}} + + new_value = "ChonkyCat" + :erpc.call(node, Enterprise.Integration, :update_secret, [secret, new_value]) + + assert_receive {:event, :secret_updated, %{name: ^name, value: ^new_value}} end end end diff --git a/test/support/integration/enterprise_server.ex b/test/support/integration/enterprise_server.ex index 9f9c4e62c..68dc533c6 100644 --- a/test/support/integration/enterprise_server.ex +++ b/test/support/integration/enterprise_server.ex @@ -23,8 +23,15 @@ defmodule Livebook.EnterpriseServer do GenServer.call(name, :fetch_user, @timeout) end + def get_node(name \\ @name) do + GenServer.call(name, :fetch_node) + end + def drop_database(name \\ @name) do - GenServer.cast(name, :drop_database) + app_port = GenServer.call(name, :fetch_port) + state_env = GenServer.call(name, :fetch_env) + + app_port |> env(state_env) |> mix(["ecto.drop", "--quiet"]) end def reconnect(name \\ @name) do @@ -74,12 +81,20 @@ defmodule Livebook.EnterpriseServer do {:reply, url, %{state | url: url}} end - @impl true - def handle_cast(:drop_database, state) do - :ok = mix(state, ["ecto.drop", "--quiet"]) - {:noreply, state} + def handle_call(:fetch_node, _from, state) do + {:reply, state.node, state} end + def handle_call(:fetch_port, _from, state) do + port = state.app_port || app_port() + {:reply, port, state} + end + + def handle_call(:fetch_env, _from, state) do + {:reply, state.env, state} + end + + @impl true def handle_cast(:reconnect, state) do if state.port do {:noreply, state} @@ -213,10 +228,14 @@ defmodule Livebook.EnterpriseServer do end end - defp mix(state, args) do + defp mix(state, args) when is_struct(state) do + state |> env() |> mix(args) + end + + defp mix(env, args) do cmd_opts = [ stderr_to_stdout: true, - env: env(state), + env: env, cd: app_dir(), into: IO.stream(:stdio, :line) ] @@ -231,15 +250,18 @@ defmodule Livebook.EnterpriseServer do defp env(state) do app_port = state.app_port || app_port() + env(app_port, state.env) + end + defp env(app_port, state_env) do env = %{ "MIX_ENV" => "livebook", "LIVEBOOK_ENTERPRISE_PORT" => to_string(app_port), "LIVEBOOK_ENTERPRISE_DEBUG" => debug() } - if state.env do - Map.merge(env, state.env) + if state_env do + Map.merge(env, state_env) else env end