Migrate WebSocket GenServer (Server) to Connection (#1585)

This commit is contained in:
Alexandre de Souza 2022-12-21 11:28:27 -03:00 committed by GitHub
parent 59cbba63b7
commit 3e11023925
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 558 additions and 232 deletions

View file

@ -2,22 +2,19 @@ defmodule Livebook.WebSocket do
@moduledoc false
alias Livebook.WebSocket.Client
alias LivebookProto.{Request, SessionRequest}
defmodule Connection do
defstruct [:conn, :websocket, :ref]
@type t :: %__MODULE__{
conn: Client.conn(),
websocket: Client.websocket(),
ref: Client.ref()
conn: Client.conn() | nil,
websocket: Client.websocket() | nil,
ref: Client.ref() | nil
}
end
@type proto :: SessionRequest.t()
@typep header :: {String.t(), String.t()}
@typep headers :: list(header())
@type proto :: LivebookProto.SessionRequest.t()
@typep headers :: Mint.Types.headers()
@doc """
Connects with the WebSocket server for given URL and headers.
@ -36,9 +33,15 @@ defmodule Livebook.WebSocket do
@doc """
Disconnects the given WebSocket client.
"""
@spec disconnect(Connection.t()) :: :ok
@spec disconnect(Connection.t()) :: {:ok, Connection.t()} | {:error, Connection.t(), any()}
def disconnect(%Connection{} = connection) do
Client.disconnect(connection.conn, connection.websocket, connection.ref)
case Client.disconnect(connection.conn, connection.websocket, connection.ref) do
{:ok, conn, websocket} ->
{:ok, %{connection | conn: conn, websocket: websocket, ref: nil}}
{:error, conn, websocket, reason} ->
{:error, %{connection | conn: conn, websocket: websocket}, reason}
end
end
@doc """
@ -47,29 +50,24 @@ defmodule Livebook.WebSocket do
@spec send_request(Connection.t(), proto()) ::
{:ok, Connection.t()}
| {:error, Connection.t(), Client.ws_error() | Client.mint_error()}
def send_request(%Connection{} = connection, %struct{} = data) do
type = LivebookProto.request_type(struct)
message = Request.new!(type: {type, data})
binary = {:binary, Request.encode(message)}
def send_request(%Connection{} = connection, data) do
frame = LivebookProto.build_request_frame(data)
case Client.send(connection.conn, connection.websocket, connection.ref, binary) do
case Client.send(connection.conn, connection.websocket, connection.ref, frame) do
{:ok, conn, websocket} ->
{:ok, %{connection | conn: conn, websocket: websocket}}
{:error, %Mint.WebSocket{} = websocket, reason} ->
{:error, %{connection | websocket: websocket}, reason}
{:error, conn, reason} ->
{:error, %{connection | conn: conn}, reason}
{:error, conn, websocket, reason} ->
{:error, %{connection | conn: conn, websocket: websocket}, reason}
end
end
@dialyzer {:nowarn_function, receive_response: 1}
@doc """
Receives a response from the given server.
"""
@spec receive_response(Connection.t()) :: Client.receive_fun()
@spec receive_response(Connection.t()) ::
{:ok, Connection.t(), Client.Response.t() | :connect}
| {:error, Connection.t(), Client.Response.t() | term()}
def receive_response(%Connection{conn: conn, websocket: websocket, ref: ref}) do
conn
|> Client.receive(ref, websocket)
@ -85,12 +83,51 @@ defmodule Livebook.WebSocket do
{:ok, %Connection{conn: conn, websocket: websocket, ref: ref}, result}
end
defp handle_receive({:error, conn, %Client.Response{body: nil, status: status}}, ref) do
{:error, %Connection{conn: conn, ref: ref}, Plug.Conn.Status.reason_phrase(status)}
defp handle_receive({:error, conn, websocket, %Client.Response{body: nil, status: status}}, ref) do
{:error, %Connection{conn: conn, websocket: websocket, ref: ref},
Plug.Conn.Status.reason_phrase(status)}
end
defp handle_receive({:error, conn, %Client.Response{body: response}}, ref) do
defp handle_receive({:error, conn, websocket, %Client.Response{body: response}}, ref) do
%{type: {:error, error}} = LivebookProto.Response.decode(response)
{:error, %Connection{conn: conn, ref: ref}, error}
{:error, %Connection{conn: conn, websocket: websocket, ref: ref}, error}
end
defp handle_receive({:error, reason}, ref) do
{:error, %Connection{ref: ref}, reason}
end
@doc """
Subscribe to WebSocket Server events.
## Messages
* `{:ok, pid, random_id, :connected}`
* `{:ok, pid, random_id, %Livebook.WebSocket.Response{}}`
* `{:error, pid, random_id, %Livebook.WebSocket.Response{}}`
* `{:error, pid, random_id, reason}`
"""
@spec subscribe() :: :ok | {:error, {:already_registered, pid()}}
def subscribe do
Phoenix.PubSub.subscribe(Livebook.PubSub, "websocket:clients")
end
@doc """
Unsubscribes from `subscribe/0`.
"""
@spec unsubscribe() :: :ok
def unsubscribe do
Phoenix.PubSub.unsubscribe(Livebook.PubSub, "websocket:clients")
end
@doc """
Notifies interested processes about WebSocket Server messages.
Broadcasts the given message under the `"websocket:clients"` topic.
"""
@spec broadcast_message(any()) :: :ok
def broadcast_message(message) do
Phoenix.PubSub.broadcast(Livebook.PubSub, "websocket:clients", message)
end
end

View file

@ -7,7 +7,7 @@ defmodule Livebook.WebSocket.Client do
@type conn :: Mint.HTTP.t()
@type websocket :: Mint.WebSocket.t()
@type frame :: :close | {:binary, binary()}
@type frame :: Mint.WebSocket.frame() | Mint.WebSocket.shorthand_frame()
@type ref :: Mint.Types.request_ref()
@type ws_error :: Mint.WebSocket.error()
@type mint_error :: Mint.Types.error()
@ -16,9 +16,9 @@ defmodule Livebook.WebSocket.Client do
defstruct [:body, :status, :headers]
@type t :: %__MODULE__{
body: Livebook.WebSocket.Response.t(),
status: Mint.Types.status(),
headers: Mint.Types.headers()
body: Livebook.WebSocket.Response.t() | nil,
status: Mint.Types.status() | nil,
headers: Mint.Types.headers() | nil
}
end
@ -53,15 +53,19 @@ defmodule Livebook.WebSocket.Client do
If there's no WebSocket connection yet, it'll only close the HTTP connection.
"""
@spec disconnect(conn(), websocket(), ref()) :: :ok
@spec disconnect(conn(), websocket() | nil, ref()) ::
{:ok, conn(), websocket() | nil}
| {:error, conn() | websocket(), term()}
def disconnect(conn, nil, _ref) do
{:ok, conn} = Mint.HTTP.close(conn)
{:ok, conn, nil}
end
def disconnect(conn, websocket, ref) do
if websocket do
send(conn, websocket, ref, :close)
with {:ok, conn, websocket} <- send(conn, websocket, ref, :close),
{:ok, conn} <- Mint.HTTP.close(conn) do
{:ok, conn, websocket}
end
Mint.HTTP.close(conn)
:ok
end
@doc """
@ -70,17 +74,24 @@ defmodule Livebook.WebSocket.Client do
If the WebSocket isn't connected yet, it will try to get the connection
response to start a new WebSocket connection.
"""
@spec receive(conn(), ref(), term()) ::
{:ok, conn(), Response.t() | :connect}
| {:error, conn(), Response.t()}
| {:error, conn(), :unknown}
@spec receive(conn() | nil, ref(), websocket() | nil, term()) ::
{:ok, conn(), websocket(), Response.t() | :connected}
| {:error, conn(), websocket(), Response.t()}
| {:error, conn(), websocket(), ws_error() | mint_error()}
| {:error, :not_connected | :unknown}
def receive(conn, ref, websocket \\ nil, message \\ receive(do: (message -> message))) do
do_receive(conn, ref, websocket, message)
end
defp do_receive(nil, _ref, _websocket, _message), do: {:error, :not_connected}
defp do_receive(conn, ref, websocket, message) do
case Mint.WebSocket.stream(conn, message) do
{:ok, conn, responses} ->
handle_responses(conn, ref, websocket, responses)
{:error, conn, reason, []} ->
{:error, conn, reason}
{:error, conn, websocket, reason}
{:error, conn, _reason, responses} ->
handle_responses(conn, ref, websocket, responses)
@ -92,24 +103,6 @@ defmodule Livebook.WebSocket.Client do
@successful_status 100..299
defp handle_responses(conn, ref, nil, responses) do
result =
Enum.reduce(responses, %Response{}, fn
{:status, ^ref, status}, acc -> %{acc | status: status}
{:headers, ^ref, headers}, acc -> %{acc | headers: headers}
{:data, ^ref, body}, acc -> %{acc | body: body}
{:done, ^ref}, acc -> handle_done_response(conn, ref, acc)
end)
case result do
%Response{} = response when response.status not in @successful_status ->
{:error, conn, response}
result ->
result
end
end
defp handle_responses(conn, ref, websocket, [{:data, ref, data}]) do
with {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do
case handle_frames(%Response{}, frames) do
@ -120,8 +113,7 @@ defmodule Livebook.WebSocket.Client do
{:ok, conn, websocket, response}
{:close, result} ->
disconnect(conn, websocket, ref)
{:ok, conn, websocket, result}
handle_disconnect(conn, websocket, ref, result)
{:error, response} ->
{:error, conn, websocket, response}
@ -129,7 +121,25 @@ defmodule Livebook.WebSocket.Client do
end
end
defp handle_done_response(conn, ref, response) do
defp handle_responses(conn, ref, websocket, [_ | _] = responses) do
result =
Enum.reduce(responses, %Response{}, fn
{:status, ^ref, status}, acc -> %{acc | status: status}
{:headers, ^ref, headers}, acc -> %{acc | headers: headers}
{:data, ^ref, body}, acc -> %{acc | body: body}
{:done, ^ref}, acc -> handle_done_response(conn, ref, websocket, acc)
end)
case result do
%Response{} = response when response.status not in @successful_status ->
{:error, conn, websocket, response}
result ->
result
end
end
defp handle_done_response(conn, ref, websocket, response) do
case Mint.WebSocket.new(conn, ref, response.status, response.headers) do
{:ok, conn, websocket} ->
case decode_response(websocket, response) do
@ -137,15 +147,20 @@ defmodule Livebook.WebSocket.Client do
{:ok, conn, websocket, result}
{websocket, {:close, result}} ->
disconnect(conn, websocket, ref)
{:ok, conn, websocket, result}
handle_disconnect(conn, websocket, ref, result)
{websocket, {:error, reason}} ->
{:error, conn, websocket, reason}
end
{:error, conn, %UpgradeFailureError{status_code: status, headers: headers}} ->
{:error, conn, %{response | status: status, headers: headers}}
{:error, conn, websocket, %{response | status: status, headers: headers}}
end
end
defp handle_disconnect(conn, websocket, ref, result) do
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
{:ok, conn, websocket, result}
end
end
@ -173,18 +188,22 @@ defmodule Livebook.WebSocket.Client do
end)
end
@dialyzer {:nowarn_function, send: 4}
@doc """
Sends a message to the given HTTP Connection and WebSocket connection.
"""
@spec send(conn(), websocket(), ref(), frame()) ::
{:ok, conn(), websocket()}
| {:error, conn() | websocket(), term()}
| {:error, conn(), websocket(), term()}
def send(conn, websocket, ref, frame) when is_frame(frame) do
with {:ok, websocket, data} <- Mint.WebSocket.encode(websocket, frame),
{:ok, conn} <- Mint.WebSocket.stream_request_body(conn, ref, data) do
{:ok, conn, websocket}
else
{:error, %Mint.HTTP1{} = conn, reason} ->
{:error, conn, websocket, reason}
{:error, websocket, reason} ->
{:error, conn, websocket, reason}
end
end
end

View file

@ -1,115 +1,196 @@
defmodule Livebook.WebSocket.Server do
@moduledoc false
use GenServer
use Connection
require Logger
import Livebook.WebSocket.Client, only: [is_frame: 1]
alias Livebook.WebSocket
alias Livebook.WebSocket.Client
defstruct [
:conn,
:websocket,
:caller,
:status,
:resp_headers,
:resp_body,
:ref,
closing?: false
]
@timeout 10_000
@backoff 1_490
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
defstruct [:url, :headers, :http_conn, :websocket, :ref, id: 0]
@doc """
Starts a new WebSocket Server connection with given URL and headers.
"""
@spec start_link(String.t(), Mint.Types.headers()) ::
{:ok, pid()} | {:error, {:already_started, pid()}}
def start_link(url, headers \\ []) do
Connection.start_link(__MODULE__, {url, headers})
end
@doc """
Connects the WebSocket client.
Checks if the given WebSocket Server is connected.
"""
def connect(pid, url, headers \\ []) do
GenServer.call(pid, {:connect, url, headers})
@spec connected?(pid()) :: boolean()
def connected?(conn) do
Connection.call(conn, :connected?, @timeout)
end
@doc """
Disconnects the WebSocket client.
Closes the given WebSocket Server connection.
"""
def disconnect(pid) do
GenServer.cast(pid, :close)
@spec close(pid()) :: :ok
def close(conn) do
Connection.call(conn, :close, @timeout)
end
@doc """
Sends a message to the WebSocket server the message request.
Sends a Request to given WebSocket Server.
"""
def send_message(socket, frame) when is_frame(frame) do
GenServer.cast(socket, {:send_message, frame})
@spec send_request(pid(), WebSocket.proto()) :: :ok
def send_request(conn, %_struct{} = data) do
Connection.call(conn, {:request, data}, @timeout)
end
## Connection callbacks
@impl true
def init({url, headers}) do
state = struct!(__MODULE__, url: url, headers: headers)
{:connect, :init, state}
end
@impl true
def connect(_, state) do
case Client.connect(state.url, state.headers) do
{:ok, conn, ref} ->
{:ok, %{state | http_conn: conn, ref: ref}}
{:error, exception} when is_exception(exception) ->
Logger.error("Received exception: #{Exception.message(exception)}")
{:backoff, @backoff, state}
{:error, conn, reason} ->
Logger.error("Received error: #{inspect(reason)}")
{:backoff, @backoff, %{state | http_conn: conn}}
end
end
@dialyzer {:nowarn_function, disconnect: 2}
@impl true
def disconnect({:close, caller}, state) do
case Client.disconnect(state.http_conn, state.websocket, state.ref) do
{:ok, conn, websocket} ->
Connection.reply(caller, :ok)
{:noconnect, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
Connection.reply(caller, {:error, reason})
{:noconnect, %{state | http_conn: conn, websocket: websocket}}
end
end
def disconnect(info, state) do
case info do
{:error, :closed} -> Logger.error("Connection closed")
{:error, reason} -> Logger.error("Connection error: #{inspect(reason)}")
end
case Client.disconnect(state.http_conn, state.websocket, state.ref) do
{:ok, conn, websocket} ->
{:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
Logger.error("Received error: #{inspect(reason)}")
{:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}}
end
end
## GenServer callbacks
@impl true
def init(_) do
{:ok, %__MODULE__{}}
end
@impl true
def handle_call({:connect, url, headers}, from, state) do
case Client.connect(url, headers) do
{:ok, conn, ref} -> {:noreply, %{state | conn: conn, ref: ref, caller: from}}
{:error, _reason} = error -> {:reply, error, state}
{:error, conn, reason} -> {:reply, {:error, reason}, %{state | conn: conn}}
def handle_call(:connected?, _from, state) do
if conn = state.http_conn do
{:reply, conn.state == :open, state}
else
{:reply, false, state}
end
end
@impl true
def handle_cast(:close, state) do
Client.disconnect(state.conn, state.websocket, state.ref)
{:stop, :normal, state}
def handle_call(:close, caller, state) do
{:disconnect, {:close, caller}, state}
end
def handle_cast({:send_message, frame}, state) do
case Client.send(state.conn, state.websocket, state.ref, frame) do
def handle_call({:request, data}, caller, state) do
id = state.id
frame = LivebookProto.build_request_frame(data, id)
Connection.reply(caller, :ok)
case Client.send(state.http_conn, state.websocket, state.ref, frame) do
{:ok, conn, websocket} ->
{:noreply, %{state | conn: conn, websocket: websocket}}
{:noreply, %{state | http_conn: conn, websocket: websocket, id: id + 1}}
{:error, %Mint.WebSocket{} = websocket, _reason} ->
{:noreply, %{state | websocket: websocket}}
{:error, conn, _reason} ->
{:noreply, %{state | conn: conn}}
{:error, conn, websocket, reason} ->
WebSocket.broadcast_message({:error, self(), id, reason})
{:noreply, %{state | http_conn: conn, websocket: websocket}}
end
end
@impl true
def handle_info(message, state) do
case Client.receive(state.conn, state.ref, state.websocket, message) do
{:ok, conn, websocket, response} ->
state = %{state | conn: conn, websocket: websocket}
{:noreply, reply(state, {:ok, response})}
case Client.receive(state.http_conn, state.ref, state.websocket, message) do
{:ok, conn, websocket, :connected} ->
{:ok, :connected}
|> build_response(state)
|> WebSocket.broadcast_message()
{:error, conn, websocket, response} ->
state = %{state | conn: conn, websocket: websocket}
{:noreply, reply(state, {:error, response})}
{:noreply, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, response} ->
state = %{state | conn: conn}
{:noreply, reply(state, {:error, response})}
{:error, conn, websocket, %Mint.TransportError{} = reason} ->
{:error, reason}
|> build_response(state)
|> WebSocket.broadcast_message()
{:connect, :receive, %{state | http_conn: conn, websocket: websocket}}
{term, conn, websocket, data} ->
{term, data}
|> build_response(state)
|> WebSocket.broadcast_message()
{:noreply, %{state | http_conn: conn, websocket: websocket}}
{:error, _} = error ->
error
|> build_response(state)
|> WebSocket.broadcast_message()
{:error, _} ->
{:noreply, state}
end
end
# Private
defp reply(%{caller: nil} = state, response) do
Logger.warning("The caller is nil, so we can't reply the message: #{inspect(response)}")
state
defp build_response({:ok, :connected}, state) do
{:ok, self(), state.id, :connected}
end
defp reply(state, response) do
GenServer.reply(state.caller, response)
defp build_response({:ok, %Client.Response{body: nil, status: nil, headers: nil}}, state) do
{:ok, self(), state.id, :pong}
end
state
defp build_response({:error, %Client.Response{body: nil, status: status} = response}, state) do
response = %{response | body: Plug.Conn.Status.reason_phrase(status)}
{:error, self(), state.id, response}
end
defp build_response({:ok, %Client.Response{body: body} = response}, _state) do
case LivebookProto.Response.decode(body) do
%{id: id, type: {:error, _} = error} -> {:error, self(), id, %{response | body: error}}
%{id: id, type: result} -> {:ok, self(), id, result}
end
end
defp build_response({:error, %Client.Response{body: body} = response}, _state) do
%{id: id, type: {:error, _} = error} = LivebookProto.Response.decode(body)
{:error, self(), id, %{response | body: error}}
end
defp build_response({:error, reason}, state) do
{:error, self(), state.id, %Client.Response{body: reason}}
end
end

View file

@ -106,7 +106,8 @@ defmodule Livebook.MixProject do
{:protobuf, "~> 0.8.0"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
{:floki, ">= 0.27.0", only: :test},
{:bypass, "~> 2.1", only: :test}
{:bypass, "~> 2.1", only: :test},
{:connection, "~> 1.1.0"}
]
end

View file

@ -2,6 +2,7 @@
"aws_signature": {:hex, :aws_signature, "0.3.1", "67f369094cbd55ffa2bbd8cc713ede14b195fcfb45c86665cd7c5ad010276148", [:rebar3], [], "hexpm", "50fc4dc1d1f7c2d0a8c63f455b3c66ecd74c1cf4c915c768a636f9227704a674"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "0.1.18", "deb5b9ab02400561b6f5708f3e7660fc35ca2d51bfc6a940d2f513f89c2975fc", [:mix], [], "hexpm", "61bbaf6452b782ef80b33cdb45701afbcf0a918a45ebe7e73f1130d661e66a06"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},

View file

@ -1,10 +1,19 @@
defmodule LivebookProto do
@moduledoc false
@mapping (for {_id, field_prop} <- LivebookProto.Request.__message_props__().field_props,
alias LivebookProto.Request
@mapping (for {_id, field_prop} <- Request.__message_props__().field_props,
into: %{} do
{field_prop.type, field_prop.name_atom}
end)
def request_type(module), do: Map.fetch!(@mapping, module)
def build_request_frame(%struct{} = data, id \\ -1) do
type = request_type(struct)
message = Request.new!(id: id, type: {type, data})
{:binary, Request.encode(message)}
end
defp request_type(module), do: Map.fetch!(@mapping, module)
end

View file

@ -4,5 +4,6 @@ defmodule LivebookProto.Request do
oneof :type, 0
field :session, 1, type: LivebookProto.SessionRequest, oneof: 0
field :id, 1, type: :int32
field :session, 2, type: LivebookProto.SessionRequest, oneof: 0
end

View file

@ -4,6 +4,7 @@ defmodule LivebookProto.Response do
oneof :type, 0
field :error, 1, type: LivebookProto.Error, oneof: 0
field :session, 2, type: LivebookProto.SessionResponse, oneof: 0
field :id, 1, type: :int32
field :error, 2, type: LivebookProto.Error, oneof: 0
field :session, 3, type: LivebookProto.SessionResponse, oneof: 0
end

View file

@ -19,15 +19,19 @@ message SessionResponse {
}
message Request {
int32 id = 1;
oneof type {
SessionRequest session = 1;
SessionRequest session = 2;
}
}
message Response {
oneof type {
Error error = 1;
int32 id = 1;
SessionResponse session = 2;
oneof type {
Error error = 2;
SessionResponse session = 3;
}
}

View file

@ -1,7 +1,10 @@
defmodule Livebook.WebSocket.ClientTest do
use Livebook.EnterpriseIntegrationCase, async: true
@app_version Mix.Project.config()[:version]
alias Livebook.WebSocket.Client
alias LivebookProto.Request
describe "connect/2" do
test "successfully authenticates the websocket connection", %{url: url, token: token} do
@ -9,7 +12,7 @@ defmodule Livebook.WebSocket.ClientTest do
assert {:ok, conn, ref} = Client.connect(url, headers)
assert {:ok, conn, websocket, :connected} = Client.receive(conn, ref)
assert Client.disconnect(conn, websocket, ref) == :ok
assert {:ok, _conn, _websocket} = Client.disconnect(conn, websocket, ref)
end
test "rejects the websocket with invalid address", %{token: token} do
@ -23,7 +26,7 @@ defmodule Livebook.WebSocket.ClientTest do
headers = [{"X-Auth-Token", "foo"}]
assert {:ok, conn, ref} = Client.connect(url, headers)
assert {:error, _conn, response} = Client.receive(conn, ref)
assert {:error, _conn, nil, response} = Client.receive(conn, ref)
assert response.status == 403
@ -31,7 +34,7 @@ defmodule Livebook.WebSocket.ClientTest do
assert error =~ "the given token is invalid"
assert {:ok, conn, ref} = Client.connect(url)
assert {:error, _conn, response} = Client.receive(conn, ref)
assert {:error, _conn, nil, response} = Client.receive(conn, ref)
assert response.status == 401
@ -39,4 +42,36 @@ defmodule Livebook.WebSocket.ClientTest do
assert error =~ "could not get the token from the connection"
end
end
describe "send/2" do
setup %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
{:ok, conn, ref} = Client.connect(url, headers)
{:ok, conn, websocket, :connected} = Client.receive(conn, ref)
on_exit(fn -> Client.disconnect(conn, websocket, ref) end)
{:ok, conn: conn, websocket: websocket, ref: ref}
end
test "successfully sends a session message", %{
conn: conn,
websocket: websocket,
ref: ref,
user: %{id: id, email: email}
} do
session_request = LivebookProto.SessionRequest.new!(app_version: @app_version)
request = Request.new!(type: {:session, session_request})
frame = {:binary, Request.encode(request)}
assert {:ok, conn, websocket} = Client.send(conn, websocket, ref, frame)
assert {:ok, ^conn, ^websocket, %Client.Response{body: body}} =
Client.receive(conn, ref, websocket)
assert %{type: result} = LivebookProto.Response.decode(body)
assert {:session, %{id: _, user: %{id: ^id, email: ^email}}} = result
end
end
end

View file

@ -1,47 +1,132 @@
defmodule Livebook.WebSocket.ServerTest do
use Livebook.EnterpriseIntegrationCase, async: true
alias Livebook.WebSocket.Server
@app_version Mix.Project.config()[:version]
@moduletag :capture_log
describe "connect/2" do
alias Livebook.WebSocket.Server
alias Livebook.WebSocket.Client.Response
setup do
Livebook.WebSocket.subscribe()
:ok
end
describe "connect" do
test "successfully authenticates the websocket connection", %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
assert {:ok, pid} = Server.start_link()
assert {:ok, :connected} = Server.connect(pid, url, headers)
assert Server.disconnect(pid) == :ok
assert {:ok, conn} = Server.start_link(url, headers)
assert_receive {:ok, ^conn, _id, :connected}
assert Server.connected?(conn)
end
test "rejects the websocket with invalid address", %{token: token} do
headers = [{"X-Auth-Token", token}]
assert {:ok, pid} = Server.start_link()
assert {:error, %Mint.TransportError{reason: :econnrefused}} =
Server.connect(pid, "http://localhost:9999", headers)
assert Server.disconnect(pid) == :ok
assert {:ok, conn} = Server.start_link("http://localhost:9999", headers)
refute Server.connected?(conn)
end
test "rejects the websocket connection with invalid credentials", %{url: url} do
headers = [{"X-Auth-Token", "foo"}]
assert {:ok, pid} = Server.start_link()
assert {:error, response} = Server.connect(pid, url, headers)
assert {:ok, conn} = Server.start_link(url, headers)
assert response.status == 403
assert_receive {:error, ^conn, -1, response}
assert %Response{body: {:error, %{details: error}}, status: 403} = response
assert %{type: {:error, %{details: error}}} = LivebookProto.Response.decode(response.body)
assert error =~ "the given token is invalid"
assert Server.close(conn) == :ok
assert {:error, response} = Server.connect(pid, url)
assert {:ok, conn} = Server.start_link(url)
assert response.status == 401
assert_receive {:error, ^conn, -1, response}
assert %Response{body: {:error, %{details: error}}, status: 401} = response
assert %{type: {:error, %{details: error}}} = LivebookProto.Response.decode(response.body)
assert error =~ "could not get the token from the connection"
assert Server.close(conn) == :ok
end
end
assert Server.disconnect(pid) == :ok
describe "send_request/2" do
setup %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
{:ok, conn} = Server.start_link(url, headers)
assert_receive {:ok, ^conn, _id, :connected}
{:ok, conn: conn}
end
test "successfully sends a session request", %{
conn: conn,
user: %{id: id, email: email}
} do
session_request = LivebookProto.SessionRequest.new!(app_version: @app_version)
assert Server.send_request(conn, session_request) == :ok
assert_receive {:ok, ^conn, 0, {:session, response}}
assert %{id: _, user: %{id: ^id, email: ^email}} = response
end
end
describe "reconnect event" do
setup %{test: name} do
suffix = Ecto.UUID.generate() |> :erlang.phash2() |> to_string()
app_port = Enum.random(1000..9000) |> to_string()
{:ok, _} =
EnterpriseServer.start(name,
env: %{"ENTERPRISE_DB_SUFFIX" => suffix},
app_port: app_port
)
url = EnterpriseServer.url(name)
token = EnterpriseServer.token(name)
headers = [{"X-Auth-Token", token}]
assert {:ok, conn} = Server.start_link(url, headers)
assert_receive {:ok, ^conn, _id, :connected}
assert Server.connected?(conn)
on_exit(fn ->
EnterpriseServer.disconnect(name)
EnterpriseServer.drop_database(name)
end)
{:ok, conn: conn}
end
test "receives the disconnect message from websocket server", %{conn: conn, test: name} do
EnterpriseServer.disconnect(name)
assert_receive {:error, ^conn, 0, %Response{body: reason}}
assert %Mint.TransportError{reason: :closed} = reason
assert Process.alive?(conn)
refute Server.connected?(conn)
end
test "reconnects after websocket server is up", %{conn: conn, test: name} do
EnterpriseServer.disconnect(name)
assert_receive {:error, ^conn, 0, %Response{body: reason}}
assert %Mint.TransportError{reason: :closed} = reason
Process.sleep(1000)
refute Server.connected?(conn)
# Wait until the server is up again
assert EnterpriseServer.reconnect(name) == :ok
assert_receive {:ok, ^conn, 0, :connected}, 5000
assert Server.connected?(conn)
assert Server.close(conn) == :ok
refute Server.connected?(conn)
end
end
end

View file

@ -10,7 +10,7 @@ defmodule Livebook.WebSocketTest do
headers = [{"X-Auth-Token", token}]
assert {:ok, connection, :connected} = WebSocket.connect(url, headers)
assert WebSocket.disconnect(connection) == :ok
assert {:ok, _connection} = WebSocket.disconnect(connection)
end
test "rejects the web socket connection with invalid credentials", %{url: url} do
@ -18,33 +18,36 @@ defmodule Livebook.WebSocketTest do
assert {:error, connection, %{details: error}} = WebSocket.connect(url, headers)
assert error =~ "the given token is invalid"
assert WebSocket.disconnect(connection) == :ok
assert {:ok, _connection} = WebSocket.disconnect(connection)
assert {:error, connection, %{details: error}} = WebSocket.connect(url)
assert error =~ "could not get the token from the connection"
assert WebSocket.disconnect(connection) == :ok
assert {:ok, _connection} = WebSocket.disconnect(connection)
end
end
describe "send_request/2" do
test "receives the session response from server", %{url: url, token: token, user: user} do
setup %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
assert {:ok, %WebSocket.Connection{} = connection, :connected} =
WebSocket.connect(url, headers)
{:ok, %WebSocket.Connection{} = connection, :connected} = WebSocket.connect(url, headers)
on_exit(fn -> WebSocket.disconnect(connection) end)
{:ok, connection: connection}
end
test "successfully sends a session message", %{
connection: connection,
user: %{id: id, email: email}
} do
session_request = LivebookProto.SessionRequest.new!(app_version: @app_version)
assert {:ok, %WebSocket.Connection{} = connection} =
WebSocket.send_request(connection, session_request)
assert {:ok, connection, {:session, session_response}} =
WebSocket.receive_response(connection)
assert WebSocket.disconnect(connection) == :ok
assert session_response.user.id == user.id
assert session_response.user.email == user.email
assert {:ok, ^connection, response} = WebSocket.receive_response(connection)
assert {:session, %{id: _, user: %{id: ^id, email: ^email}}} = response
end
end
end

View file

@ -2,53 +2,100 @@ defmodule Livebook.EnterpriseServer do
@moduledoc false
use GenServer
defstruct [:token, :user, :node, :port]
defstruct [:token, :user, :node, :port, :app_port, :url, :env]
@name __MODULE__
@timeout 10_000
def start do
GenServer.start(__MODULE__, [], name: @name)
def start(name \\ @name, opts \\ []) do
GenServer.start(__MODULE__, opts, name: name)
end
def url do
"http://localhost:#{app_port()}"
def url(name \\ @name) do
GenServer.call(name, :fetch_url, @timeout)
end
def token do
GenServer.call(@name, :fetch_token)
def token(name \\ @name) do
GenServer.call(name, :fetch_token, @timeout)
end
def user do
GenServer.call(@name, :fetch_user)
def user(name \\ @name) do
GenServer.call(name, :fetch_user, @timeout)
end
def drop_database(name \\ @name) do
GenServer.cast(name, :drop_database)
end
def reconnect(name \\ @name) do
GenServer.cast(name, :reconnect)
end
def disconnect(name \\ @name) do
GenServer.cast(name, :disconnect)
end
# GenServer Callbacks
@impl true
def init(_opts) do
state = %__MODULE__{node: enterprise_node()}
{:ok, state, {:continue, :start_enterprise}}
def init(opts) do
state = struct(__MODULE__, opts)
{:ok, %{state | node: enterprise_node()}, {:continue, :start_enterprise}}
end
@impl true
def handle_continue(:start_enterprise, state) do
ensure_app_dir!()
prepare_database(state)
{:noreply, %{state | port: start_enterprise(state)}}
end
@impl true
def handle_call(:fetch_token, _from, state) do
state = if _ = state.token, do: state, else: create_enterprise_token(state)
state = if state.token, do: state, else: create_enterprise_token(state)
{:reply, state.token, state}
end
@impl true
def handle_call(:fetch_user, _from, state) do
state = if _ = state.user, do: state, else: create_enterprise_user(state)
state = if state.user, do: state, else: create_enterprise_user(state)
{:reply, state.user, state}
end
@impl true
def handle_call(:fetch_url, _from, state) do
state = if state.app_port, do: state, else: %{state | app_port: app_port()}
url = state.url || fetch_url(state)
{:reply, url, %{state | url: url}}
end
@impl true
def handle_cast(:drop_database, state) do
:ok = mix(state, ["ecto.drop", "--quiet"])
{:noreply, state}
end
def handle_cast(:reconnect, state) do
if state.port do
{:noreply, state}
else
{:noreply, %{state | port: start_enterprise(state)}}
end
end
def handle_cast(:disconnect, state) do
if state.port do
Port.close(state.port)
end
{:noreply, %{state | port: nil}}
end
# Port Callbacks
@impl true
@ -85,14 +132,10 @@ defmodule Livebook.EnterpriseServer do
end
defp start_enterprise(state) do
ensure_app_dir!()
prepare_database()
env = [
{~c"MIX_ENV", ~c"livebook"},
{~c"LIVEBOOK_ENTERPRISE_PORT", String.to_charlist(app_port())},
{~c"LIVEBOOK_ENTERPRISE_DEBUG", String.to_charlist(debug?())}
]
env =
for {key, value} <- env(state), into: [] do
{String.to_charlist(key), String.to_charlist(value)}
end
args = [
"-e",
@ -118,13 +161,18 @@ defmodule Livebook.EnterpriseServer do
args: args
])
wait_on_start(port)
wait_on_start(state, port)
end
defp prepare_database do
mix(["ecto.drop", "--quiet"])
mix(["ecto.create", "--quiet"])
mix(["ecto.migrate", "--quiet"])
defp fetch_url(state) do
port = state.app_port || app_port()
"http://localhost:#{port}"
end
defp prepare_database(state) do
:ok = mix(state, ["ecto.drop", "--quiet"])
:ok = mix(state, ["ecto.create", "--quiet"])
:ok = mix(state, ["ecto.migrate", "--quiet"])
end
defp ensure_app_dir! do
@ -148,51 +196,52 @@ defmodule Livebook.EnterpriseServer do
System.get_env("ENTERPRISE_PORT", "4043")
end
defp debug? do
defp debug do
System.get_env("ENTERPRISE_DEBUG", "false")
end
defp wait_on_start(port) do
case :httpc.request(:get, {~c"#{url()}/public/health", []}, [], []) do
defp wait_on_start(state, port) do
url = state.url || fetch_url(state)
case :httpc.request(:get, {~c"#{url}/public/health", []}, [], []) do
{:ok, _} ->
port
{:error, _} ->
Process.sleep(10)
wait_on_start(port)
wait_on_start(state, port)
end
end
defp mix(args, opts \\ []) do
env = [
{"MIX_ENV", "livebook"},
{"LIVEBOOK_ENTERPRISE_PORT", app_port()},
{"LIVEBOOK_ENTERPRISE_DEBUG", debug?()}
defp mix(state, args) do
cmd_opts = [
stderr_to_stdout: true,
env: env(state),
cd: app_dir(),
into: IO.stream(:stdio, :line)
]
cmd_opts = [stderr_to_stdout: true, env: env, cd: app_dir()]
args = ["--erl", "-elixir ansi_enabled true", "-S", "mix" | args]
cmd_opts =
if opts[:with_return],
do: cmd_opts,
else: Keyword.put(cmd_opts, :into, IO.stream(:stdio, :line))
case System.cmd(elixir_executable(), args, cmd_opts) do
{_, 0} -> :ok
_ -> :error
end
end
if opts[:with_return] do
case System.cmd(elixir_executable(), args, cmd_opts) do
{result, 0} ->
result
defp env(state) do
app_port = state.app_port || app_port()
{message, status} ->
error("""
env = %{
"MIX_ENV" => "livebook",
"LIVEBOOK_ENTERPRISE_PORT" => to_string(app_port),
"LIVEBOOK_ENTERPRISE_DEBUG" => debug()
}
#{message}\
""")
System.halt(status)
end
if state.env do
Map.merge(env, state.env)
else
{_, 0} = System.cmd(elixir_executable(), args, cmd_opts)
env
end
end