mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-08 20:46:16 +08:00
Fix teams websocket connection not reconnecting after a graceful close (#2924)
This commit is contained in:
parent
b1843c08f2
commit
856b46d001
2 changed files with 38 additions and 21 deletions
|
@ -45,20 +45,17 @@ defmodule Livebook.Teams.Connection do
|
||||||
send(data.listener, :connected)
|
send(data.listener, :connected)
|
||||||
send(self(), {:loop_ping, ref})
|
send(self(), {:loop_ping, ref})
|
||||||
Logger.info("Teams WebSocket connection - established")
|
Logger.info("Teams WebSocket connection - established")
|
||||||
|
|
||||||
{:keep_state, %{data | http_conn: conn, ref: ref, websocket: websocket}}
|
{:keep_state, %{data | http_conn: conn, ref: ref, websocket: websocket}}
|
||||||
|
|
||||||
{:transport_error, reason} ->
|
{:transport_error, reason} ->
|
||||||
send(data.listener, {:connection_error, reason})
|
send(data.listener, {:connection_error, reason})
|
||||||
Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}")
|
Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}")
|
||||||
|
|
||||||
{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}
|
{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}
|
||||||
|
|
||||||
{:server_error, error} ->
|
{:server_error, error} ->
|
||||||
reason = LivebookProto.Error.decode(error).details
|
reason = LivebookProto.Error.decode(error).details
|
||||||
send(data.listener, {:server_error, reason})
|
send(data.listener, {:server_error, reason})
|
||||||
Logger.warning("Teams WebSocket connection - server error : #{inspect(reason)}")
|
Logger.warning("Teams WebSocket connection - server error: #{inspect(reason)}")
|
||||||
|
|
||||||
{:keep_state, data}
|
{:keep_state, data}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -73,9 +70,12 @@ defmodule Livebook.Teams.Connection do
|
||||||
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
|
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
|
||||||
{:keep_state, %{data | http_conn: conn, websocket: websocket}}
|
{:keep_state, %{data | http_conn: conn, websocket: websocket}}
|
||||||
|
|
||||||
{:error, conn, websocket, _reason} ->
|
{:error, conn, websocket, reason} ->
|
||||||
Logger.warning("Teams WebSocket connection - ping error")
|
data = %{data | http_conn: conn, websocket: websocket}
|
||||||
{:keep_state, %{data | http_conn: conn, websocket: websocket}}
|
send(data.listener, {:connection_error, reason})
|
||||||
|
Logger.warning("Teams WebSocket connection - ping error: #{inspect(reason)}")
|
||||||
|
ensure_closed(data)
|
||||||
|
{:keep_state, data, {:next_event, :internal, :connect}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -106,7 +106,8 @@ defmodule Livebook.Teams.Connection do
|
||||||
data = %{data | http_conn: conn, websocket: websocket}
|
data = %{data | http_conn: conn, websocket: websocket}
|
||||||
send(data.listener, {:connection_error, reason})
|
send(data.listener, {:connection_error, reason})
|
||||||
:gen_statem.reply(from, {:error, reason})
|
:gen_statem.reply(from, {:error, reason})
|
||||||
|
Logger.warning("Teams WebSocket connection - send error: #{inspect(reason)}")
|
||||||
|
ensure_closed(data)
|
||||||
{:keep_state, data, {:next_event, :internal, :connect}}
|
{:keep_state, data, {:next_event, :internal, :connect}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -117,19 +118,32 @@ defmodule Livebook.Teams.Connection do
|
||||||
case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do
|
case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do
|
||||||
{:ok, conn, websocket, binaries} ->
|
{:ok, conn, websocket, binaries} ->
|
||||||
data = %{data | http_conn: conn, websocket: websocket}
|
data = %{data | http_conn: conn, websocket: websocket}
|
||||||
|
handle_messages(data, binaries)
|
||||||
for binary <- binaries do
|
|
||||||
%{type: {topic, message}} = LivebookProto.Event.decode(binary)
|
|
||||||
send(data.listener, {:event, topic, message})
|
|
||||||
end
|
|
||||||
|
|
||||||
{:keep_state, data}
|
{:keep_state, data}
|
||||||
|
|
||||||
|
{:closed, conn, websocket, binaries} ->
|
||||||
|
handle_messages(data, binaries)
|
||||||
|
data = %{data | http_conn: conn, websocket: websocket}
|
||||||
|
Logger.warning("Teams WebSocket connection - closed")
|
||||||
|
{:keep_state, data, {:next_event, :internal, :connect}}
|
||||||
|
|
||||||
{:error, conn, websocket, reason} ->
|
{:error, conn, websocket, reason} ->
|
||||||
send(data.listener, {:connection_error, reason})
|
send(data.listener, {:connection_error, reason})
|
||||||
data = %{data | http_conn: conn, websocket: websocket}
|
data = %{data | http_conn: conn, websocket: websocket}
|
||||||
|
Logger.warning("Teams WebSocket connection - receive error: #{inspect(reason)}")
|
||||||
|
ensure_closed(data)
|
||||||
{:keep_state, data, {:next_event, :internal, :connect}}
|
{:keep_state, data, {:next_event, :internal, :connect}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp handle_messages(data, binaries) do
|
||||||
|
for binary <- binaries do
|
||||||
|
%{type: {topic, message}} = LivebookProto.Event.decode(binary)
|
||||||
|
send(data.listener, {:event, topic, message})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp ensure_closed(data) do
|
||||||
|
_ = WebSocket.disconnect(data.http_conn, data.websocket, data.ref)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -144,6 +144,7 @@ defmodule Livebook.Teams.WebSocket do
|
||||||
"""
|
"""
|
||||||
@spec receive(conn(), ref(), websocket(), term()) ::
|
@spec receive(conn(), ref(), websocket(), term()) ::
|
||||||
{:ok, conn(), websocket(), list(binary())}
|
{:ok, conn(), websocket(), list(binary())}
|
||||||
|
| {:closed, conn(), websocket(), list(binary())}
|
||||||
| {:error, conn(), websocket(), String.t()}
|
| {:error, conn(), websocket(), String.t()}
|
||||||
def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do
|
def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do
|
||||||
with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message),
|
with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message),
|
||||||
|
@ -152,19 +153,21 @@ defmodule Livebook.Teams.WebSocket do
|
||||||
{:ok, conn, websocket, response}
|
{:ok, conn, websocket, response}
|
||||||
else
|
else
|
||||||
{:close, response} ->
|
{:close, response} ->
|
||||||
handle_disconnect(conn, websocket, ref, response)
|
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
|
||||||
|
{:closed, conn, websocket, response}
|
||||||
|
end
|
||||||
|
|
||||||
{:error, conn, exception} when is_exception(exception) ->
|
{:error, conn, exception} when is_exception(exception) ->
|
||||||
{:error, conn, websocket, Exception.message(exception)}
|
{:error, conn, websocket, Exception.message(exception)}
|
||||||
|
|
||||||
{:error, conn, exception, []} when is_exception(exception) ->
|
{:error, conn, exception, []} when is_exception(exception) ->
|
||||||
{:error, conn, websocket, Exception.message(exception)}
|
{:error, conn, websocket, Exception.message(exception)}
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp handle_disconnect(conn, websocket, ref, response) do
|
:unknown ->
|
||||||
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
|
# Message does not belong to this socket. For example, this
|
||||||
{:ok, conn, websocket, response}
|
# can be a leftover :tcp_close or :ssl_close from a previously
|
||||||
|
# gracefully closed socket.
|
||||||
|
{:ok, conn, websocket, []}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue