Implement event handling from WebSocket (#1608)

This commit is contained in:
Alexandre de Souza 2022-12-26 21:16:47 -03:00 committed by GitHub
parent 46163fef62
commit 9ff84f204f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 237 additions and 179 deletions

View file

@ -3,6 +3,7 @@ defmodule Livebook.Hubs.EnterpriseClient do
use GenServer
alias Livebook.Hubs.Enterprise
alias Livebook.Secrets.Secret
alias Livebook.WebSocket.Server
@pubsub_topic "enterprise"
@ -25,24 +26,15 @@ defmodule Livebook.Hubs.EnterpriseClient do
Server.send_request(GenServer.call(pid, :get_server), data)
end
@doc """
Disconnects the Enterprise client with WebSocket server.
"""
@spec disconnect(pid()) :: :ok
def disconnect(pid) do
GenServer.cast(pid, :disconnect)
end
@doc """
Subscribe to WebSocket Server events.
## Messages
* `{:unknown, :error, reason}`
* `{:connect, :ok, :waiting_upgrade | :connected}`
* `{:connect, :ok, :connected}`
* `{:connect, :error, reason}`
* `{:disconnect, :ok, :disconnected}`
* `{:disconnect, :error, reason}`
* `{:secret_created, %Secret{}}`
* `{:secret_updated, %Secret{}}`
"""
@spec subscribe() :: :ok | {:error, {:already_registered, pid()}}
@ -73,13 +65,6 @@ defmodule Livebook.Hubs.EnterpriseClient do
{:reply, state.server, state}
end
@impl true
def handle_cast(:disconnect, state) do
with :ok <- Server.close(state.server) do
{:noreply, state}
end
end
@impl true
def handle_info({:connect, _, _} = message, state) do
broadcast_message(message)
@ -91,6 +76,16 @@ defmodule Livebook.Hubs.EnterpriseClient do
{:noreply, state}
end
def handle_info({:event, :secret_created, %{name: name, value: value}}, state) do
broadcast_message({:secret_created, %Secret{name: name, value: value}})
{:noreply, state}
end
def handle_info({:event, :secret_updated, %{name: name, value: value}}, state) do
broadcast_message({:secret_updated, %Secret{name: name, value: value}})
{:noreply, state}
end
def handle_info({:disconnect, :ok, :disconnected}, state) do
{:stop, :normal, state}
end

View file

@ -1,27 +1,21 @@
defprotocol Livebook.Hubs.Provider do
@moduledoc false
@type t :: %{
required(:__struct__) => module(),
required(:id) => String.t(),
optional(any()) => any()
}
@doc """
Normalize given struct to `Livebook.Hubs.Metadata` struct.
"""
@spec normalize(t()) :: Livebook.Hubs.Metadata.t()
@spec normalize(struct()) :: Livebook.Hubs.Metadata.t()
def normalize(struct)
@doc """
Loads fields into given struct.
"""
@spec load(t(), map() | keyword()) :: t()
@spec load(struct(), map() | keyword()) :: struct()
def load(struct, fields)
@doc """
Gets the type from struct.
"""
@spec type(t()) :: String.t()
@spec type(struct()) :: String.t()
def type(struct)
end

View file

@ -21,22 +21,6 @@ defmodule Livebook.WebSocket.Server do
Connection.start_link(__MODULE__, {listener, url, headers})
end
@doc """
Checks if the given WebSocket Server is connected.
"""
@spec connected?(pid()) :: boolean()
def connected?(conn) do
Connection.call(conn, :connected?, @timeout)
end
@doc """
Closes the given WebSocket Server connection.
"""
@spec close(pid()) :: :ok
def close(conn) do
Connection.call(conn, :close, @timeout)
end
@doc """
Sends a Request to given WebSocket Server.
"""
@ -57,7 +41,6 @@ defmodule Livebook.WebSocket.Server do
def connect(_, state) do
case Client.connect(state.url, state.headers) do
{:ok, conn, ref} ->
send(state.listener, {:connect, :ok, :waiting_upgrade})
{:ok, %{state | http_conn: conn, ref: ref}}
{:error, exception} when is_exception(exception) ->
@ -77,55 +60,19 @@ defmodule Livebook.WebSocket.Server do
@dialyzer {:nowarn_function, disconnect: 2}
@impl true
def disconnect({:close, caller}, state) do
Connection.reply(caller, :ok)
case Client.disconnect(state.http_conn, state.websocket, state.ref) do
{:ok, conn, websocket} ->
send(state.listener, {:disconnect, :ok, :disconnected})
{:noconnect, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
send(state.listener, {:disconnect, :error, reason})
{:noconnect, %{state | http_conn: conn, websocket: websocket}}
end
end
def disconnect(info, state) do
case info do
{:close, from} -> Logger.debug("Received close from: #{inspect(from)}")
{:error, :closed} -> Logger.error("Connection closed")
{:error, reason} -> Logger.error("Connection error: #{inspect(reason)}")
end
case Client.disconnect(state.http_conn, state.websocket, state.ref) do
{:ok, conn, websocket} ->
send(state.listener, {:disconnect, :ok, :disconnected})
{:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
Logger.error("Received error: #{inspect(reason)}")
send(state.listener, {:disconnect, :error, reason})
{:connect, :reconnect, %{state | http_conn: conn, websocket: websocket}}
end
{:connect, :reconnect, state}
end
## GenServer callbacks
@impl true
def handle_call(:connected?, _from, state) do
if conn = state.http_conn do
{:reply, conn.state == :open, state}
else
{:reply, false, state}
end
end
def handle_call(:close, caller, state) do
{:disconnect, {:close, caller}, state}
end
def handle_call({:request, data}, caller, state) do
id = state.id
frame = LivebookProto.build_request_frame(data, id)
@ -144,100 +91,98 @@ defmodule Livebook.WebSocket.Server do
def handle_info(message, state) do
case Client.receive(state.http_conn, state.ref, state.websocket, message) do
{:ok, conn, websocket, :connected} ->
send(state.listener, build_message({:ok, :connected}))
state = send_received({:ok, :connected}, state)
{:noreply, %{state | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, %Mint.TransportError{} = reason} ->
send(state.listener, build_message({:error, reason}))
state = send_received({:error, reason}, state)
{:connect, :receive, %{state | http_conn: conn, websocket: websocket}}
{term, conn, websocket, data} ->
state =
{term, data}
|> build_message()
|> send_reply(state)
state = send_received({term, data}, state)
{:noreply, %{state | http_conn: conn, websocket: websocket}}
{:error, _} = error ->
send(state.listener, build_message(error))
{:noreply, state}
{:noreply, send_received(error, state)}
end
end
# Private
defp send_reply({:response, :error, reason}, state) do
defp send_received({:ok, :connected}, state) do
send(state.listener, {:connect, :ok, :connected})
state
end
defp send_received({:ok, %Client.Response{body: nil, status: nil}}, state), do: state
defp send_received({:ok, %Client.Response{body: body}}, state) do
case decode_response_or_event(body) do
{:response, %{id: -1, type: {:error, %{details: reason}}}} ->
reply_to_all({:error, reason}, state)
{:response, %{id: id, type: {:error, %{details: reason}}}} ->
reply_to_id(id, {:error, reason}, state)
{:response, %{id: id, type: result}} ->
reply_to_id(id, result, state)
{:event, %{type: {name, data}}} ->
send(state.listener, {:event, name, data})
state
end
end
defp send_received({:error, :unknown}, state), do: state
defp send_received({:error, %Mint.TransportError{} = reason}, state) do
send(state.listener, {:connect, :error, reason})
state
end
defp send_received({:error, %Client.Response{body: body, status: status}}, state)
when body != nil and status != nil do
%{type: {:error, %{details: reason}}} = LivebookProto.Response.decode(body)
send(state.listener, {:connect, :error, reason})
state
end
defp send_received({:error, %Client.Response{body: nil, status: status}}, state)
when status != nil do
reply_to_all({:error, Plug.Conn.Status.reason_phrase(status)}, state)
end
defp send_received({:error, %Client.Response{body: body, status: nil}}, state) do
case LivebookProto.Response.decode(body) do
%{id: -1, type: {:error, %{details: reason}}} -> reply_to_all({:error, reason}, state)
%{id: id, type: {:error, %{details: reason}}} -> reply_to_id(id, {:error, reason}, state)
end
end
defp reply_to_all(message, state) do
for {id, caller} <- state.reply, reduce: state do
acc ->
Connection.reply(caller, {:error, reason})
Connection.reply(caller, message)
%{acc | reply: Map.delete(acc.reply, id)}
end
end
defp send_reply({:response, id, result}, state) do
defp reply_to_id(id, message, state) do
if caller = state.reply[id] do
Connection.reply(caller, result)
Connection.reply(caller, message)
end
%{state | reply: Map.delete(state.reply, id)}
end
defp send_reply(message, state) do
send(state.listener, message)
state
end
defp build_message({:ok, :connected}) do
{:connect, :ok, :connected}
end
defp build_message({:ok, %Client.Response{body: nil, status: nil}}) do
:pong
end
defp build_message({:error, %Client.Response{body: nil, status: status}}) do
{:response, :error, Plug.Conn.Status.reason_phrase(status)}
end
defp build_message({:ok, %Client.Response{body: body}}) do
case LivebookProto.Response.decode(body) do
%{id: -1, type: {:error, %{details: reason}}} ->
{:response, :error, reason}
%{id: id, type: {:error, %{details: reason}}} ->
{:response, id, {:error, reason}}
%{id: id, type: result} ->
{:response, id, result}
defp decode_response_or_event(data) do
case LivebookProto.Response.decode(data) do
%{type: nil} -> {:event, LivebookProto.Event.decode(data)}
response -> {:response, response}
end
end
defp build_message({:error, %Client.Response{body: body} = response})
when response.status != nil do
%{type: {:error, %{details: reason}}} = LivebookProto.Response.decode(body)
{:connect, :error, reason}
end
defp build_message({:error, %Client.Response{body: body}}) do
case LivebookProto.Response.decode(body) do
%{id: -1, type: {:error, %{details: reason}}} ->
{:response, :error, reason}
%{id: id, type: {:error, %{details: reason}}} ->
{:response, id, {:error, reason}}
end
end
defp build_message({:error, %Mint.TransportError{} = reason}) do
{:connect, :error, reason}
end
defp build_message({:error, reason}) do
{:unknown, :error, reason}
end
end

View file

@ -119,7 +119,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do
receive do
{:connect, :error, reason} ->
EnterpriseClient.disconnect(pid)
GenServer.stop(pid)
handle_error(reason, socket)
{:connect, :ok, :connected} ->
@ -134,7 +134,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do
{:noreply, assign(socket, connected: true, changeset: changeset, base: base)}
{:error, reason} ->
EnterpriseClient.disconnect(pid)
GenServer.stop(pid)
handle_error(reason, socket)
end
end

View file

@ -0,0 +1,16 @@
defmodule LivebookProto.Event do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
oneof :type, 0
field :secret_created, 100,
type: LivebookProto.SecretCreated,
json_name: "secretCreated",
oneof: 0
field :secret_updated, 101,
type: LivebookProto.SecretUpdated,
json_name: "secretUpdated",
oneof: 0
end

View file

@ -0,0 +1,7 @@
defmodule LivebookProto.SecretCreated do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :name, 1, type: :string
field :value, 2, type: :string
end

View file

@ -0,0 +1,7 @@
defmodule LivebookProto.SecretUpdated do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :name, 1, type: :string
field :value, 2, type: :string
end

View file

@ -9,6 +9,16 @@ message Error {
string details = 1;
}
message SecretCreated {
string name = 1;
string value = 2;
}
message SecretUpdated {
string name = 1;
string value = 2;
}
message SessionRequest {
string app_version = 1;
}
@ -35,3 +45,10 @@ message Response {
SessionResponse session = 3;
}
}
message Event {
oneof type {
SecretCreated secret_created = 100;
SecretUpdated secret_updated = 101;
}
}

View file

@ -3,6 +3,7 @@ defmodule Livebook.Hubs.EnterpriseClientTest do
@moduletag :capture_log
alias Livebook.Hubs.EnterpriseClient
alias Livebook.Secrets.Secret
setup do
EnterpriseClient.subscribe()
@ -14,7 +15,6 @@ defmodule Livebook.Hubs.EnterpriseClientTest do
enterprise = build(:enterprise, url: url, token: token)
assert {:ok, _pid} = EnterpriseClient.start_link(enterprise)
assert_receive {:connect, :ok, :waiting_upgrade}
assert_receive {:connect, :ok, :connected}
end
@ -33,4 +33,38 @@ defmodule Livebook.Hubs.EnterpriseClientTest do
assert reason =~ "the given token is invalid"
end
end
describe "handle events" do
setup %{url: url, token: token} do
enterprise = build(:enterprise, url: url, token: token)
assert {:ok, _pid} = EnterpriseClient.start_link(enterprise)
assert_receive {:connect, :ok, :connected}
:ok
end
test "receives a secret_created event" do
name = "API_TOKEN_ID"
value = Livebook.Utils.random_id()
node = EnterpriseServer.get_node()
:erpc.call(node, Enterprise.Integration, :create_secret, [name, value])
assert_receive {:secret_created, %Secret{name: ^name, value: ^value}}
end
test "receives a secret_updated event" do
name = "SUPER_SUDO_USER"
value = "JakePeralta"
node = EnterpriseServer.get_node()
secret = :erpc.call(node, Enterprise.Integration, :create_secret, [name, value])
assert_receive {:secret_created, %Secret{name: ^name, value: ^value}}
new_value = "ChonkyCat"
:erpc.call(node, Enterprise.Integration, :update_secret, [secret, new_value])
assert_receive {:secret_updated, %Secret{name: ^name, value: ^new_value}}
end
end
end

View file

@ -9,33 +9,29 @@ defmodule Livebook.WebSocket.ServerTest do
test "successfully authenticates the websocket connection", %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
assert {:ok, conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :ok, :waiting_upgrade}
assert {:ok, _conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :ok, :connected}
assert Server.connected?(conn)
end
test "rejects the websocket with invalid address", %{token: token} do
headers = [{"X-Auth-Token", token}]
assert {:ok, conn} = Server.start_link(self(), "http://localhost:9999", headers)
refute Server.connected?(conn)
assert {:ok, _conn} = Server.start_link(self(), "http://localhost:9999", headers)
refute_receive {:connect, :ok, :connected}
end
test "rejects the websocket connection with invalid credentials", %{url: url} do
headers = [{"X-Auth-Token", "foo"}]
assert {:ok, conn} = Server.start_link(self(), url, headers)
assert {:ok, _conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :error, reason}
assert reason =~ "the given token is invalid"
assert Server.close(conn) == :ok
assert {:ok, conn} = Server.start_link(self(), url)
assert {:ok, _conn} = Server.start_link(self(), url)
assert_receive {:connect, :error, reason}
assert reason =~ "could not get the token from the connection"
assert Server.close(conn) == :ok
end
end
@ -45,7 +41,6 @@ defmodule Livebook.WebSocket.ServerTest do
{:ok, conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :ok, :waiting_upgrade}
assert_receive {:connect, :ok, :connected}
{:ok, conn: conn}
@ -80,9 +75,7 @@ defmodule Livebook.WebSocket.ServerTest do
assert {:ok, conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :ok, :waiting_upgrade}
assert_receive {:connect, :ok, :connected}
assert Server.connected?(conn)
on_exit(fn ->
EnterpriseServer.disconnect(name)
@ -99,27 +92,55 @@ defmodule Livebook.WebSocket.ServerTest do
assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}}
assert Process.alive?(conn)
refute Server.connected?(conn)
end
test "reconnects after websocket server is up", %{conn: conn, test: name} do
test "reconnects after websocket server is up", %{test: name} do
EnterpriseServer.disconnect(name)
assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}}
assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}}
Process.sleep(1000)
refute Server.connected?(conn)
# Wait until the server is up again
assert EnterpriseServer.reconnect(name) == :ok
assert_receive {:connect, :ok, :waiting_upgrade}, 3000
assert_receive {:connect, :ok, :connected}, 3000
end
end
assert Server.connected?(conn)
assert Server.close(conn) == :ok
refute Server.connected?(conn)
describe "handle events from server" do
setup %{url: url, token: token} do
headers = [{"X-Auth-Token", token}]
{:ok, _conn} = Server.start_link(self(), url, headers)
assert_receive {:connect, :ok, :connected}
:ok
end
test "receives a secret_created event" do
name = "MY_SECRET_ID"
value = Livebook.Utils.random_id()
node = EnterpriseServer.get_node()
:erpc.call(node, Enterprise.Integration, :create_secret, [name, value])
assert_receive {:event, :secret_created, %{name: ^name, value: ^value}}
end
test "receives a secret_updated event" do
name = "API_USERNAME"
value = "JakePeralta"
node = EnterpriseServer.get_node()
secret = :erpc.call(node, Enterprise.Integration, :create_secret, [name, value])
assert_receive {:event, :secret_created, %{name: ^name, value: ^value}}
new_value = "ChonkyCat"
:erpc.call(node, Enterprise.Integration, :update_secret, [secret, new_value])
assert_receive {:event, :secret_updated, %{name: ^name, value: ^new_value}}
end
end
end

View file

@ -23,8 +23,15 @@ defmodule Livebook.EnterpriseServer do
GenServer.call(name, :fetch_user, @timeout)
end
def get_node(name \\ @name) do
GenServer.call(name, :fetch_node)
end
def drop_database(name \\ @name) do
GenServer.cast(name, :drop_database)
app_port = GenServer.call(name, :fetch_port)
state_env = GenServer.call(name, :fetch_env)
app_port |> env(state_env) |> mix(["ecto.drop", "--quiet"])
end
def reconnect(name \\ @name) do
@ -74,12 +81,20 @@ defmodule Livebook.EnterpriseServer do
{:reply, url, %{state | url: url}}
end
@impl true
def handle_cast(:drop_database, state) do
:ok = mix(state, ["ecto.drop", "--quiet"])
{:noreply, state}
def handle_call(:fetch_node, _from, state) do
{:reply, state.node, state}
end
def handle_call(:fetch_port, _from, state) do
port = state.app_port || app_port()
{:reply, port, state}
end
def handle_call(:fetch_env, _from, state) do
{:reply, state.env, state}
end
@impl true
def handle_cast(:reconnect, state) do
if state.port do
{:noreply, state}
@ -213,10 +228,14 @@ defmodule Livebook.EnterpriseServer do
end
end
defp mix(state, args) do
defp mix(state, args) when is_struct(state) do
state |> env() |> mix(args)
end
defp mix(env, args) do
cmd_opts = [
stderr_to_stdout: true,
env: env(state),
env: env,
cd: app_dir(),
into: IO.stream(:stdio, :line)
]
@ -231,15 +250,18 @@ defmodule Livebook.EnterpriseServer do
defp env(state) do
app_port = state.app_port || app_port()
env(app_port, state.env)
end
defp env(app_port, state_env) do
env = %{
"MIX_ENV" => "livebook",
"LIVEBOOK_ENTERPRISE_PORT" => to_string(app_port),
"LIVEBOOK_ENTERPRISE_DEBUG" => debug()
}
if state.env do
Map.merge(env, state.env)
if state_env do
Map.merge(env, state_env)
else
env
end