From 47fe94f5604d828127dcb4609864641f8a9037ad Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Sat, 15 Apr 2023 10:47:33 +0200 Subject: [PATCH] Replace "Connection" with gen_statem (#1865) --- lib/livebook/web_socket/client_connection.ex | 142 +++++++++---------- mix.exs | 3 +- mix.lock | 2 - 3 files changed, 72 insertions(+), 75 deletions(-) diff --git a/lib/livebook/web_socket/client_connection.ex b/lib/livebook/web_socket/client_connection.ex index 6565be5b6..891723fc9 100644 --- a/lib/livebook/web_socket/client_connection.ex +++ b/lib/livebook/web_socket/client_connection.ex @@ -1,6 +1,7 @@ defmodule Livebook.WebSocket.ClientConnection do @moduledoc false - use Connection + + @behaviour :gen_statem require Logger @@ -9,15 +10,17 @@ defmodule Livebook.WebSocket.ClientConnection do @timeout 10_000 @backoff 5_000 + @no_state :no_state + @loop_ping_delay 5_000 defstruct [:url, :listener, :headers, :http_conn, :websocket, :ref, id: 0, reply: %{}] @doc """ Starts a new WebSocket connection with given URL and headers. """ - @spec start_link(pid(), String.t(), Mint.Types.headers()) :: GenServer.on_start() + @spec start_link(pid(), String.t(), Mint.Types.headers()) :: :gen_statem.start_ret() def start_link(listener, url, headers \\ []) do - Connection.start_link(__MODULE__, {listener, url, headers}) + :gen_statem.start_link(__MODULE__, {listener, url, headers}, []) end @doc """ @@ -25,112 +28,109 @@ defmodule Livebook.WebSocket.ClientConnection do """ @spec send_request(pid(), WebSocket.proto()) :: {atom(), term()} def send_request(conn, %_struct{} = data) do - Connection.call(conn, {:request, data}, @timeout) + :gen_statem.call(conn, {:request, data}, @timeout) end - ## Connection callbacks + ## gen_statem callbacks + + @impl true + def callback_mode, do: :handle_event_function @impl true def init({listener, url, headers}) do - state = %__MODULE__{listener: listener, url: url, headers: headers} - {:connect, :init, state} + data = %__MODULE__{listener: listener, url: url, headers: headers} + {:ok, @no_state, data, {:next_event, :internal, :connect}} end @impl true - def connect(_, state) do - case Client.connect(state.url, state.headers) do + def handle_event(event_type, event_data, state, data) + + def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do + case Client.connect(data.url, data.headers) do {:ok, conn, websocket, ref} -> - send(state.listener, {:connect, :ok, :connected}) + send(data.listener, {:connect, :ok, :connected}) send(self(), {:loop_ping, ref}) - {:ok, %{state | http_conn: conn, ref: ref, websocket: websocket}} + {:ok, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}} {:transport_error, reason} -> - send(state.listener, {:connect, :error, reason}) - - {:backoff, @backoff, state} + send(data.listener, {:connect, :error, reason}) + {:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}} {:server_error, binary} -> {:response, %{type: {:error, error}}} = decode_response_or_event(binary) - send(state.listener, {:connect, :error, error.details}) - - {:backoff, @backoff, state} + send(data.listener, {:connect, :error, error.details}) + {:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}} end end - @dialyzer {:nowarn_function, disconnect: 2} - - @impl true - def disconnect(info, state) do - case info do - {:close, from} -> Logger.debug("Received close from: #{inspect(from)}") - {:error, :closed} -> Logger.error("Connection closed") - {:error, reason} -> Logger.error("Connection error: #{inspect(reason)}") - end - - {:connect, :reconnect, state} + def handle_event({:timeout, :reconnect}, nil, _state, _data) do + {:keep_state_and_data, {:next_event, :internal, :connect}} end - ## GenServer callbacks + def handle_event({:call, from}, {:request, request}, @no_state, %__MODULE__{id: id} = data) do + frame = LivebookProto.build_request_frame(request, id) + reply = Map.put(data.reply, id, from) - @impl true - def handle_call({:request, data}, caller, state) do - id = state.id - frame = LivebookProto.build_request_frame(data, id) - reply = Map.put(state.reply, id, caller) - - case Client.send(state.http_conn, state.websocket, state.ref, frame) do + case Client.send(data.http_conn, data.websocket, data.ref, frame) do {:ok, conn, websocket} -> - {:noreply, %{state | http_conn: conn, websocket: websocket, id: id + 1, reply: reply}} + data = %__MODULE__{data | http_conn: conn, websocket: websocket, id: id + 1, reply: reply} + {:keep_state, data} {:error, conn, websocket, reason} -> - {:reply, {:error, reason}, %{state | http_conn: conn, websocket: websocket}} + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + {:keep_state, data, {:reply, from, {:error, reason}}} end end - @loop_ping_delay 5_000 - - @impl true - def handle_info({:loop_ping, ref}, state) when ref == state.ref and is_reference(ref) do - case Client.send(state.http_conn, state.websocket, state.ref, :ping) do + def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{} = data) + when ref == data.ref and is_reference(ref) do + case Client.send(data.http_conn, data.websocket, data.ref, :ping) do {:ok, conn, websocket} -> - Process.send_after(self(), {:loop_ping, state.ref}, @loop_ping_delay) - {:noreply, %{state | http_conn: conn, websocket: websocket}} + Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay) + {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} {:error, conn, websocket, _reason} -> - {:noreply, %{state | http_conn: conn, websocket: websocket}} + {:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}} end end - def handle_info({:loop_ping, _another_ref}, state), do: {:noreply, state} + def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do + :keep_state_and_data + end - def handle_info({:tcp_closed, _port} = message, state), - do: handle_websocket_message(message, state) + def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do + handle_websocket_message(message, data) + end - def handle_info({:tcp, _port, _data} = message, state), - do: handle_websocket_message(message, state) + def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do + handle_websocket_message(message, data) + end - def handle_info(_message, state), do: {:noreply, state} + def handle_event(:info, _message, @no_state, _data) do + :keep_state_and_data + end # Private - def handle_websocket_message(message, state) do - case Client.receive(state.http_conn, state.ref, state.websocket, message) do - {:ok, conn, websocket, data} -> - state = %{state | http_conn: conn, websocket: websocket} - {:noreply, send_received(data, state)} + defp handle_websocket_message(message, %__MODULE__{} = data) do + case Client.receive(data.http_conn, data.ref, data.websocket, message) do + {:ok, conn, websocket, binaries} -> + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + data = send_received(binaries, data) + {:keep_state, data} {:server_error, conn, websocket, reason} -> - send(state.listener, {:connect, :error, reason}) - - {:connect, :receive, %{state | http_conn: conn, websocket: websocket}} + send(data.listener, {:connect, :error, reason}) + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + {:keep_state, data, {:next_event, :internal, :connect}} end end - defp send_received([], state), do: state + defp send_received([], data), do: data - defp send_received([_ | _] = binaries, state) do - for binary <- binaries, reduce: state do + defp send_received([_ | _] = binaries, data) do + for binary <- binaries, reduce: data do acc -> case decode_response_or_event(binary) do {:response, %{id: -1, type: {:error, %{details: reason}}}} -> @@ -158,19 +158,19 @@ defmodule Livebook.WebSocket.ClientConnection do end end - defp reply_to_all(message, state) do - for {_id, caller} <- state.reply do - Connection.reply(caller, message) + defp reply_to_all(message, %__MODULE__{} = data) do + for {_id, caller} <- data.reply do + :gen_statem.reply(caller, message) end - state + data end - defp reply_to_id(id, message, state) do - {caller, reply} = Map.pop(state.reply, id) - if caller, do: Connection.reply(caller, message) + defp reply_to_id(id, message, %__MODULE__{} = data) do + {caller, reply} = Map.pop(data.reply, id) + if caller, do: :gen_statem.reply(caller, message) - %{state | reply: reply} + %__MODULE__{data | reply: reply} end defp decode_response_or_event(data) do diff --git a/mix.exs b/mix.exs index ee3c4a198..d57418b5a 100644 --- a/mix.exs +++ b/mix.exs @@ -101,8 +101,7 @@ defmodule Livebook.MixProject do {:protobuf, "~> 0.8.0"}, {:phoenix_live_reload, "~> 1.2", only: :dev}, {:floki, ">= 0.27.0", only: :test}, - {:bypass, "~> 2.1", only: :test}, - {:connection, "~> 1.1.0"} + {:bypass, "~> 2.1", only: :test} ] end diff --git a/mix.lock b/mix.lock index 06b3da475..6a64d9e28 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,6 @@ "aws_signature": {:hex, :aws_signature, "0.3.1", "67f369094cbd55ffa2bbd8cc713ede14b195fcfb45c86665cd7c5ad010276148", [:rebar3], [], "hexpm", "50fc4dc1d1f7c2d0a8c63f455b3c66ecd74c1cf4c915c768a636f9227704a674"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "castore": {:hex, :castore, "1.0.1", "240b9edb4e9e94f8f56ab39d8d2d0a57f49e46c56aced8f873892df8ff64ff5a", [:mix], [], "hexpm", "b4951de93c224d44fac71614beabd88b71932d0b1dea80d2f80fb9044e01bbb3"}, - "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, @@ -24,7 +23,6 @@ "phoenix_live_view": {:hex, :phoenix_live_view, "0.18.18", "1f38fbd7c363723f19aad1a04b5490ff3a178e37daaf6999594d5f34796c47fc", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a5810d0472f3189ede6d2a95bda7f31c6113156b91784a3426cb0ab6a6d85214"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"}, "phoenix_template": {:hex, :phoenix_template, "1.0.0", "c57bc5044f25f007dc86ab21895688c098a9f846a8dda6bc40e2d0ddc146e38f", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "1b066f99a26fd22064c12b2600a9a6e56700f591bf7b20b418054ea38b4d4357"}, - "phoenix_view": {:hex, :phoenix_view, "2.0.2", "6bd4d2fd595ef80d33b439ede6a19326b78f0f1d8d62b9a318e3d9c1af351098", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}], "hexpm", "a929e7230ea5c7ee0e149ffcf44ce7cf7f4b6d2bfe1752dd7c084cdff152d36f"}, "plug": {:hex, :plug, "1.14.0", "ba4f558468f69cbd9f6b356d25443d0b796fbdc887e03fa89001384a9cac638f", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bf020432c7d4feb7b3af16a0c2701455cbbbb95e5b6866132cb09eb0c29adc14"}, "plug_cowboy": {:hex, :plug_cowboy, "2.6.0", "d1cf12ff96a1ca4f52207c5271a6c351a4733f413803488d75b70ccf44aebec2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "073cf20b753ce6682ed72905cd62a2d4bd9bad1bf9f7feb02a1b8e525bd94fa6"}, "plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},