Replace "Connection" with gen_statem (#1865)

This commit is contained in:
Andrea Leopardi 2023-04-15 10:47:33 +02:00 committed by GitHub
parent 7d24982b57
commit 47fe94f560
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 75 deletions

View file

@ -1,6 +1,7 @@
defmodule Livebook.WebSocket.ClientConnection do
@moduledoc false
use Connection
@behaviour :gen_statem
require Logger
@ -9,15 +10,17 @@ defmodule Livebook.WebSocket.ClientConnection do
@timeout 10_000
@backoff 5_000
@no_state :no_state
@loop_ping_delay 5_000
defstruct [:url, :listener, :headers, :http_conn, :websocket, :ref, id: 0, reply: %{}]
@doc """
Starts a new WebSocket connection with given URL and headers.
"""
@spec start_link(pid(), String.t(), Mint.Types.headers()) :: GenServer.on_start()
@spec start_link(pid(), String.t(), Mint.Types.headers()) :: :gen_statem.start_ret()
def start_link(listener, url, headers \\ []) do
Connection.start_link(__MODULE__, {listener, url, headers})
:gen_statem.start_link(__MODULE__, {listener, url, headers}, [])
end
@doc """
@ -25,112 +28,109 @@ defmodule Livebook.WebSocket.ClientConnection do
"""
@spec send_request(pid(), WebSocket.proto()) :: {atom(), term()}
def send_request(conn, %_struct{} = data) do
Connection.call(conn, {:request, data}, @timeout)
:gen_statem.call(conn, {:request, data}, @timeout)
end
## Connection callbacks
## gen_statem callbacks
@impl true
def callback_mode, do: :handle_event_function
@impl true
def init({listener, url, headers}) do
state = %__MODULE__{listener: listener, url: url, headers: headers}
{:connect, :init, state}
data = %__MODULE__{listener: listener, url: url, headers: headers}
{:ok, @no_state, data, {:next_event, :internal, :connect}}
end
@impl true
def connect(_, state) do
case Client.connect(state.url, state.headers) do
def handle_event(event_type, event_data, state, data)
def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do
case Client.connect(data.url, data.headers) do
{:ok, conn, websocket, ref} ->
send(state.listener, {:connect, :ok, :connected})
send(data.listener, {:connect, :ok, :connected})
send(self(), {:loop_ping, ref})
{:ok, %{state | http_conn: conn, ref: ref, websocket: websocket}}
{:ok, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}}
{:transport_error, reason} ->
send(state.listener, {:connect, :error, reason})
{:backoff, @backoff, state}
send(data.listener, {:connect, :error, reason})
{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}
{:server_error, binary} ->
{:response, %{type: {:error, error}}} = decode_response_or_event(binary)
send(state.listener, {:connect, :error, error.details})
{:backoff, @backoff, state}
send(data.listener, {:connect, :error, error.details})
{:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}}
end
end
@dialyzer {:nowarn_function, disconnect: 2}
@impl true
def disconnect(info, state) do
case info do
{:close, from} -> Logger.debug("Received close from: #{inspect(from)}")
{:error, :closed} -> Logger.error("Connection closed")
{:error, reason} -> Logger.error("Connection error: #{inspect(reason)}")
end
{:connect, :reconnect, state}
def handle_event({:timeout, :reconnect}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
## GenServer callbacks
def handle_event({:call, from}, {:request, request}, @no_state, %__MODULE__{id: id} = data) do
frame = LivebookProto.build_request_frame(request, id)
reply = Map.put(data.reply, id, from)
@impl true
def handle_call({:request, data}, caller, state) do
id = state.id
frame = LivebookProto.build_request_frame(data, id)
reply = Map.put(state.reply, id, caller)
case Client.send(state.http_conn, state.websocket, state.ref, frame) do
case Client.send(data.http_conn, data.websocket, data.ref, frame) do
{:ok, conn, websocket} ->
{:noreply, %{state | http_conn: conn, websocket: websocket, id: id + 1, reply: reply}}
data = %__MODULE__{data | http_conn: conn, websocket: websocket, id: id + 1, reply: reply}
{:keep_state, data}
{:error, conn, websocket, reason} ->
{:reply, {:error, reason}, %{state | http_conn: conn, websocket: websocket}}
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data, {:reply, from, {:error, reason}}}
end
end
@loop_ping_delay 5_000
@impl true
def handle_info({:loop_ping, ref}, state) when ref == state.ref and is_reference(ref) do
case Client.send(state.http_conn, state.websocket, state.ref, :ping) do
def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{} = data)
when ref == data.ref and is_reference(ref) do
case Client.send(data.http_conn, data.websocket, data.ref, :ping) do
{:ok, conn, websocket} ->
Process.send_after(self(), {:loop_ping, state.ref}, @loop_ping_delay)
{:noreply, %{state | http_conn: conn, websocket: websocket}}
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, _reason} ->
{:noreply, %{state | http_conn: conn, websocket: websocket}}
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
end
end
def handle_info({:loop_ping, _another_ref}, state), do: {:noreply, state}
def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do
:keep_state_and_data
end
def handle_info({:tcp_closed, _port} = message, state),
do: handle_websocket_message(message, state)
def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_info({:tcp, _port, _data} = message, state),
do: handle_websocket_message(message, state)
def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_info(_message, state), do: {:noreply, state}
def handle_event(:info, _message, @no_state, _data) do
:keep_state_and_data
end
# Private
def handle_websocket_message(message, state) do
case Client.receive(state.http_conn, state.ref, state.websocket, message) do
{:ok, conn, websocket, data} ->
state = %{state | http_conn: conn, websocket: websocket}
{:noreply, send_received(data, state)}
defp handle_websocket_message(message, %__MODULE__{} = data) do
case Client.receive(data.http_conn, data.ref, data.websocket, message) do
{:ok, conn, websocket, binaries} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
data = send_received(binaries, data)
{:keep_state, data}
{:server_error, conn, websocket, reason} ->
send(state.listener, {:connect, :error, reason})
{:connect, :receive, %{state | http_conn: conn, websocket: websocket}}
send(data.listener, {:connect, :error, reason})
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data, {:next_event, :internal, :connect}}
end
end
defp send_received([], state), do: state
defp send_received([], data), do: data
defp send_received([_ | _] = binaries, state) do
for binary <- binaries, reduce: state do
defp send_received([_ | _] = binaries, data) do
for binary <- binaries, reduce: data do
acc ->
case decode_response_or_event(binary) do
{:response, %{id: -1, type: {:error, %{details: reason}}}} ->
@ -158,19 +158,19 @@ defmodule Livebook.WebSocket.ClientConnection do
end
end
defp reply_to_all(message, state) do
for {_id, caller} <- state.reply do
Connection.reply(caller, message)
defp reply_to_all(message, %__MODULE__{} = data) do
for {_id, caller} <- data.reply do
:gen_statem.reply(caller, message)
end
state
data
end
defp reply_to_id(id, message, state) do
{caller, reply} = Map.pop(state.reply, id)
if caller, do: Connection.reply(caller, message)
defp reply_to_id(id, message, %__MODULE__{} = data) do
{caller, reply} = Map.pop(data.reply, id)
if caller, do: :gen_statem.reply(caller, message)
%{state | reply: reply}
%__MODULE__{data | reply: reply}
end
defp decode_response_or_event(data) do

View file

@ -101,8 +101,7 @@ defmodule Livebook.MixProject do
{:protobuf, "~> 0.8.0"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
{:floki, ">= 0.27.0", only: :test},
{:bypass, "~> 2.1", only: :test},
{:connection, "~> 1.1.0"}
{:bypass, "~> 2.1", only: :test}
]
end

View file

@ -2,7 +2,6 @@
"aws_signature": {:hex, :aws_signature, "0.3.1", "67f369094cbd55ffa2bbd8cc713ede14b195fcfb45c86665cd7c5ad010276148", [:rebar3], [], "hexpm", "50fc4dc1d1f7c2d0a8c63f455b3c66ecd74c1cf4c915c768a636f9227704a674"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "1.0.1", "240b9edb4e9e94f8f56ab39d8d2d0a57f49e46c56aced8f873892df8ff64ff5a", [:mix], [], "hexpm", "b4951de93c224d44fac71614beabd88b71932d0b1dea80d2f80fb9044e01bbb3"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
@ -24,7 +23,6 @@
"phoenix_live_view": {:hex, :phoenix_live_view, "0.18.18", "1f38fbd7c363723f19aad1a04b5490ff3a178e37daaf6999594d5f34796c47fc", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a5810d0472f3189ede6d2a95bda7f31c6113156b91784a3426cb0ab6a6d85214"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"},
"phoenix_template": {:hex, :phoenix_template, "1.0.0", "c57bc5044f25f007dc86ab21895688c098a9f846a8dda6bc40e2d0ddc146e38f", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "1b066f99a26fd22064c12b2600a9a6e56700f591bf7b20b418054ea38b4d4357"},
"phoenix_view": {:hex, :phoenix_view, "2.0.2", "6bd4d2fd595ef80d33b439ede6a19326b78f0f1d8d62b9a318e3d9c1af351098", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}], "hexpm", "a929e7230ea5c7ee0e149ffcf44ce7cf7f4b6d2bfe1752dd7c084cdff152d36f"},
"plug": {:hex, :plug, "1.14.0", "ba4f558468f69cbd9f6b356d25443d0b796fbdc887e03fa89001384a9cac638f", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bf020432c7d4feb7b3af16a0c2701455cbbbb95e5b6866132cb09eb0c29adc14"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.0", "d1cf12ff96a1ca4f52207c5271a6c351a4733f413803488d75b70ccf44aebec2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "073cf20b753ce6682ed72905cd62a2d4bd9bad1bf9f7feb02a1b8e525bd94fa6"},
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},