mirror of
https://github.com/livebook-dev/livebook.git
synced 2024-09-20 10:05:57 +08:00
Update Connection life-cycle and implement ChangesetError
message (#1633)
This commit is contained in:
parent
b6f928cc0d
commit
3b2ea3c8ce
|
@ -54,6 +54,7 @@ defmodule Livebook.Application do
|
|||
clear_env_vars()
|
||||
display_startup_info()
|
||||
insert_development_hub()
|
||||
Livebook.Hubs.connect_hubs()
|
||||
result
|
||||
|
||||
{:error, error} ->
|
||||
|
|
|
@ -6,11 +6,17 @@ defmodule Livebook.Hubs do
|
|||
|
||||
@namespace :hubs
|
||||
|
||||
@type connected_hub :: %{
|
||||
required(:pid) => pid(),
|
||||
required(:hub) => Provider.t()
|
||||
}
|
||||
@type connected_hubs :: list(connected_hub())
|
||||
|
||||
@doc """
|
||||
Gets a list of hubs from storage.
|
||||
"""
|
||||
@spec fetch_hubs() :: list(Provider.t())
|
||||
def fetch_hubs do
|
||||
@spec get_hubs() :: list(Provider.t())
|
||||
def get_hubs do
|
||||
for fields <- Storage.all(@namespace) do
|
||||
to_struct(fields)
|
||||
end
|
||||
|
@ -19,13 +25,23 @@ defmodule Livebook.Hubs do
|
|||
@doc """
|
||||
Gets a list of metadatas from storage.
|
||||
"""
|
||||
@spec fetch_metadatas() :: list(Metadata.t())
|
||||
def fetch_metadatas do
|
||||
for hub <- fetch_hubs() do
|
||||
@spec get_metadatas() :: list(Metadata.t())
|
||||
def get_metadatas do
|
||||
for hub <- get_hubs() do
|
||||
Provider.normalize(hub)
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets one hub from storage.
|
||||
"""
|
||||
@spec get_hub(String.t()) :: {:ok, Provider.t()} | :error
|
||||
def get_hub(id) do
|
||||
with {:ok, data} <- Storage.fetch(@namespace, id) do
|
||||
{:ok, to_struct(data)}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets one hub from storage.
|
||||
|
||||
|
@ -54,18 +70,29 @@ defmodule Livebook.Hubs do
|
|||
def save_hub(struct) do
|
||||
attributes = struct |> Map.from_struct() |> Map.to_list()
|
||||
:ok = Storage.insert(@namespace, struct.id, attributes)
|
||||
:ok = connect_hub(struct)
|
||||
:ok = broadcast_hubs_change()
|
||||
|
||||
struct
|
||||
end
|
||||
|
||||
@doc false
|
||||
def delete_hub(id) do
|
||||
Storage.delete(@namespace, id)
|
||||
with {:ok, hub} <- get_hub(id) do
|
||||
if connected_hub = get_connected_hub(hub) do
|
||||
GenServer.stop(connected_hub.pid, :shutdown)
|
||||
end
|
||||
|
||||
:ok = Storage.delete(@namespace, id)
|
||||
:ok = broadcast_hubs_change()
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc false
|
||||
def clean_hubs do
|
||||
for hub <- fetch_hubs(), do: delete_hub(hub.id)
|
||||
for hub <- get_hubs(), do: delete_hub(hub.id)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
@ -91,14 +118,10 @@ defmodule Livebook.Hubs do
|
|||
Phoenix.PubSub.unsubscribe(Livebook.PubSub, "hubs")
|
||||
end
|
||||
|
||||
@doc """
|
||||
Notifies interested processes about hubs data change.
|
||||
|
||||
Broadcasts `{:hubs_metadata_changed, hubs}` message under the `"hubs"` topic.
|
||||
"""
|
||||
@spec broadcast_hubs_change() :: :ok
|
||||
def broadcast_hubs_change do
|
||||
Phoenix.PubSub.broadcast(Livebook.PubSub, "hubs", {:hubs_metadata_changed, fetch_metadatas()})
|
||||
# Notifies interested processes about hubs data change.
|
||||
# Broadcasts `{:hubs_metadata_changed, hubs}` message under the `"hubs"` topic.
|
||||
defp broadcast_hubs_change do
|
||||
Phoenix.PubSub.broadcast(Livebook.PubSub, "hubs", {:hubs_metadata_changed, get_metadatas()})
|
||||
end
|
||||
|
||||
defp to_struct(%{id: "fly-" <> _} = fields) do
|
||||
|
@ -112,4 +135,49 @@ defmodule Livebook.Hubs do
|
|||
defp to_struct(%{id: "local-" <> _} = fields) do
|
||||
Provider.load(%Local{}, fields)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Connects to the all available and connectable hubs.
|
||||
|
||||
## Example
|
||||
|
||||
iex> connect_hubs()
|
||||
:ok
|
||||
|
||||
"""
|
||||
@spec connect_hubs() :: :ok
|
||||
def connect_hubs do
|
||||
for hub <- get_hubs(), do: connect_hub(hub)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
defp connect_hub(hub) do
|
||||
if child_spec = Provider.connect(hub) do
|
||||
DynamicSupervisor.start_child(Livebook.HubsSupervisor, child_spec)
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a list of connected hubs.
|
||||
|
||||
## Example
|
||||
|
||||
iex> get_connected_hubs()
|
||||
[%{pid: #PID<0.178.0>, hub: %Enterprise{}}, ...]
|
||||
|
||||
"""
|
||||
@spec get_connected_hubs() :: connected_hubs()
|
||||
def get_connected_hubs do
|
||||
for hub <- get_hubs(), connected = get_connected_hub(hub), do: connected
|
||||
end
|
||||
|
||||
defp get_connected_hub(hub) do
|
||||
case Registry.lookup(Livebook.HubsRegistry, hub.id) do
|
||||
[{pid, _}] -> %{pid: pid, hub: hub}
|
||||
[] -> nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -126,5 +126,8 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Enterprise do
|
|||
}
|
||||
end
|
||||
|
||||
def type(_), do: "enterprise"
|
||||
def type(_enterprise), do: "enterprise"
|
||||
|
||||
def connect(%Livebook.Hubs.Enterprise{} = enterprise),
|
||||
do: {Livebook.Hubs.EnterpriseClient, enterprise}
|
||||
end
|
||||
|
|
|
@ -133,5 +133,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Fly do
|
|||
}
|
||||
end
|
||||
|
||||
def type(_), do: "fly"
|
||||
def type(_fly), do: "fly"
|
||||
|
||||
def connect(_fly), do: nil
|
||||
end
|
||||
|
|
|
@ -18,5 +18,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Local do
|
|||
}
|
||||
end
|
||||
|
||||
def type(_), do: "local"
|
||||
def type(_local), do: "local"
|
||||
|
||||
def connect(_local), do: nil
|
||||
end
|
||||
|
|
|
@ -18,4 +18,10 @@ defprotocol Livebook.Hubs.Provider do
|
|||
"""
|
||||
@spec type(struct()) :: String.t()
|
||||
def type(struct)
|
||||
|
||||
@doc """
|
||||
Gets the child spec of the given struct.
|
||||
"""
|
||||
@spec connect(struct()) :: Supervisor.child_spec() | module() | {module(), any()} | nil
|
||||
def connect(struct)
|
||||
end
|
||||
|
|
|
@ -9,8 +9,6 @@ defmodule Livebook.WebSocket.Client do
|
|||
@type websocket :: Mint.WebSocket.t()
|
||||
@type frame :: Mint.WebSocket.frame() | Mint.WebSocket.shorthand_frame()
|
||||
@type ref :: Mint.Types.request_ref()
|
||||
@type ws_error :: Mint.WebSocket.error()
|
||||
@type mint_error :: Mint.Types.error()
|
||||
|
||||
defmodule Response do
|
||||
defstruct [:status, :headers, body: []]
|
||||
|
@ -28,17 +26,20 @@ defmodule Livebook.WebSocket.Client do
|
|||
Connects to the WebSocket server with given url and headers.
|
||||
"""
|
||||
@spec connect(String.t(), list({String.t(), String.t()})) ::
|
||||
{:ok, conn(), ref()}
|
||||
| {:error, mint_error()}
|
||||
| {:error, conn(), ws_error()}
|
||||
{:ok, conn(), websocket(), ref()}
|
||||
| {:transport_error, String.t()}
|
||||
| {:server_error, list(binary())}
|
||||
def connect(url, headers \\ []) do
|
||||
uri = URI.parse(url)
|
||||
http_scheme = parse_http_scheme(uri)
|
||||
ws_scheme = parse_ws_scheme(uri)
|
||||
state = %{status: nil, headers: [], body: []}
|
||||
|
||||
with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port),
|
||||
{:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, @ws_path, headers) do
|
||||
{:ok, conn, ref}
|
||||
receive_upgrade(conn, ref, state)
|
||||
else
|
||||
{:error, exception} -> {:transport_error, Exception.message(exception)}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -48,6 +49,62 @@ defmodule Livebook.WebSocket.Client do
|
|||
defp parse_ws_scheme(uri) when uri.scheme in ["http", "ws"], do: :ws
|
||||
defp parse_ws_scheme(uri) when uri.scheme in ["https", "wss"], do: :wss
|
||||
|
||||
defp receive_upgrade(conn, ref, state) do
|
||||
with {:ok, conn} <- Mint.HTTP.set_mode(conn, :passive),
|
||||
{:ok, conn, responses} <- Mint.WebSocket.recv(conn, 0, 5_000) do
|
||||
handle_upgrade_responses(responses, conn, ref, state)
|
||||
else
|
||||
{:error, _websocket, exception, []} ->
|
||||
Mint.HTTP.close(conn)
|
||||
{:transport_error, Exception.message(exception)}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_upgrade_responses([{:status, ref, status} | responses], conn, ref, state) do
|
||||
handle_upgrade_responses(responses, conn, ref, %{state | status: status})
|
||||
end
|
||||
|
||||
defp handle_upgrade_responses([{:headers, ref, headers} | responses], conn, ref, state) do
|
||||
handle_upgrade_responses(responses, conn, ref, %{state | headers: headers})
|
||||
end
|
||||
|
||||
defp handle_upgrade_responses([{:data, ref, body} | responses], conn, ref, state) do
|
||||
handle_upgrade_responses(responses, conn, ref, %{state | body: [body | state.body]})
|
||||
end
|
||||
|
||||
defp handle_upgrade_responses([{:done, ref} | responses], conn, ref, state) do
|
||||
case state do
|
||||
%{status: 101} ->
|
||||
start_websocket(conn, ref, state)
|
||||
|
||||
%{body: []} ->
|
||||
handle_upgrade_responses(responses, conn, ref, state)
|
||||
|
||||
%{status: _} ->
|
||||
Mint.HTTP.close(conn)
|
||||
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_upgrade_responses([], conn, ref, state) do
|
||||
receive_upgrade(conn, ref, state)
|
||||
end
|
||||
|
||||
defp start_websocket(conn, ref, state) do
|
||||
with {:ok, conn, websocket} <- Mint.WebSocket.new(conn, ref, state.status, state.headers),
|
||||
{:ok, conn} <- Mint.HTTP.set_mode(conn, :active) do
|
||||
{:ok, conn, websocket, ref}
|
||||
else
|
||||
{:error, conn, %UpgradeFailureError{}} ->
|
||||
Mint.HTTP.close(conn)
|
||||
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
|
||||
|
||||
{:error, conn, exception} ->
|
||||
Mint.HTTP.close(conn)
|
||||
{:transport_error, Exception.message(exception)}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Disconnects from the given connection, WebSocket and reference.
|
||||
|
||||
|
@ -74,108 +131,42 @@ defmodule Livebook.WebSocket.Client do
|
|||
If the WebSocket isn't connected yet, it will try to get the connection
|
||||
response to start a new WebSocket connection.
|
||||
"""
|
||||
@spec receive(conn() | nil, ref(), websocket() | nil, term()) ::
|
||||
{:ok, conn(), websocket(), Response.t() | :connected}
|
||||
| {:error, conn(), websocket(), Response.t()}
|
||||
| {:error, conn(), websocket(), ws_error() | mint_error()}
|
||||
| {:error, :not_connected | :unknown}
|
||||
def receive(conn, ref, websocket \\ nil, message \\ receive(do: (message -> message))) do
|
||||
do_receive(conn, ref, websocket, message)
|
||||
end
|
||||
@spec receive(conn(), ref(), websocket(), term()) ::
|
||||
{:ok, conn(), websocket(), list(binary())}
|
||||
| {:server_error, conn(), websocket(), String.t()}
|
||||
def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do
|
||||
with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message),
|
||||
{:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data),
|
||||
{:ok, response} <- handle_frames(frames) do
|
||||
{:ok, conn, websocket, response}
|
||||
else
|
||||
{:close, response} ->
|
||||
handle_disconnect(conn, websocket, ref, response)
|
||||
|
||||
defp do_receive(nil, _ref, _websocket, _message), do: {:error, :not_connected}
|
||||
{:error, conn, exception} when is_exception(exception) ->
|
||||
{:server_error, conn, websocket, Exception.message(exception)}
|
||||
|
||||
defp do_receive(conn, ref, websocket, message) do
|
||||
case Mint.WebSocket.stream(conn, message) do
|
||||
{:ok, conn, responses} ->
|
||||
handle_responses(conn, ref, websocket, responses)
|
||||
|
||||
{:error, conn, reason, []} ->
|
||||
{:error, conn, websocket, reason}
|
||||
|
||||
{:error, conn, _reason, responses} ->
|
||||
handle_responses(conn, ref, websocket, responses)
|
||||
|
||||
:unknown ->
|
||||
{:error, :unknown}
|
||||
{:error, conn, exception, []} when is_exception(exception) ->
|
||||
{:server_error, conn, websocket, Exception.message(exception)}
|
||||
end
|
||||
end
|
||||
|
||||
@successful_status 100..299
|
||||
|
||||
defp handle_responses(conn, ref, websocket, [{:data, ref, data}]) do
|
||||
with {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do
|
||||
case handle_frames(%Response{}, frames) do
|
||||
{:ok, response} -> {:ok, conn, websocket, response}
|
||||
{:close, response} -> handle_disconnect(conn, websocket, ref, response)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_responses(conn, ref, websocket, [_ | _] = responses) do
|
||||
Enum.reduce(responses, %Response{}, fn
|
||||
{:status, ^ref, status}, acc -> %{acc | status: status}
|
||||
{:headers, ^ref, headers}, acc -> %{acc | headers: headers}
|
||||
{:data, ^ref, body}, acc -> %{acc | body: body}
|
||||
{:done, ^ref}, acc -> handle_done_response(conn, ref, websocket, acc)
|
||||
end)
|
||||
|> case do
|
||||
{:error, _conn, _websocket, %Response{body: [_ | _]}} = result ->
|
||||
result
|
||||
|
||||
{:error, conn, websocket, %Response{} = response} ->
|
||||
{:error, conn, websocket, %{response | body: [response.body]}}
|
||||
|
||||
%Response{body: [_ | _]} = response when response.status not in @successful_status ->
|
||||
{:error, conn, websocket, response}
|
||||
|
||||
result ->
|
||||
result
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_done_response(conn, ref, websocket, response) do
|
||||
case Mint.WebSocket.new(conn, ref, response.status, response.headers) do
|
||||
{:ok, conn, websocket} ->
|
||||
case decode_response(websocket, response) do
|
||||
{websocket, {:ok, response}} -> {:ok, conn, websocket, response}
|
||||
{websocket, {:close, response}} -> handle_disconnect(conn, websocket, ref, response)
|
||||
{websocket, {:error, reason}} -> {:error, conn, websocket, reason}
|
||||
end
|
||||
|
||||
{:error, conn, %UpgradeFailureError{status_code: status, headers: headers}} ->
|
||||
{:error, conn, websocket, %{response | status: status, headers: headers}}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_disconnect(conn, websocket, ref, result) do
|
||||
defp handle_disconnect(conn, websocket, ref, response) do
|
||||
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
|
||||
{:ok, conn, websocket, result}
|
||||
{:ok, conn, websocket, response}
|
||||
end
|
||||
end
|
||||
|
||||
defp decode_response(websocket, %Response{status: 101}) do
|
||||
{websocket, {:ok, :connected}}
|
||||
end
|
||||
defp handle_frames(frames), do: handle_frames([], frames)
|
||||
|
||||
defp decode_response(websocket, response) do
|
||||
case Mint.WebSocket.decode(websocket, response.body) do
|
||||
{:ok, websocket, frames} ->
|
||||
{websocket, handle_frames(response, frames)}
|
||||
defp handle_frames(binaries, [{:binary, binary} | rest]),
|
||||
do: handle_frames([binary | binaries], rest)
|
||||
|
||||
{:error, websocket, reason} ->
|
||||
{websocket, {:error, reason}}
|
||||
end
|
||||
end
|
||||
defp handle_frames(binaries, [{:close, _, _} | _]),
|
||||
do: {:close, binaries}
|
||||
|
||||
defp handle_frames(response, [{:binary, binary} | rest]),
|
||||
do: handle_frames(%{response | body: [binary | response.body]}, rest)
|
||||
|
||||
defp handle_frames(response, [{:close, _, _} | _]),
|
||||
do: {:close, response}
|
||||
|
||||
defp handle_frames(response, [_ | rest]), do: handle_frames(response, rest)
|
||||
defp handle_frames(response, []), do: {:ok, response}
|
||||
defp handle_frames(binaries, [_ | rest]), do: handle_frames(binaries, rest)
|
||||
defp handle_frames(binaries, []), do: {:ok, binaries}
|
||||
|
||||
@doc """
|
||||
Sends a message to the given HTTP Connection and WebSocket connection.
|
||||
|
|
|
@ -40,20 +40,22 @@ defmodule Livebook.WebSocket.Server do
|
|||
@impl true
|
||||
def connect(_, state) do
|
||||
case Client.connect(state.url, state.headers) do
|
||||
{:ok, conn, ref} ->
|
||||
{:ok, %{state | http_conn: conn, ref: ref}}
|
||||
{:ok, conn, websocket, ref} ->
|
||||
send(state.listener, {:connect, :ok, :connected})
|
||||
send(self(), {:loop_ping, ref})
|
||||
|
||||
{:error, exception} when is_exception(exception) ->
|
||||
Logger.error("Received exception: #{Exception.message(exception)}")
|
||||
send(state.listener, {:connect, :error, exception})
|
||||
{:ok, %{state | http_conn: conn, ref: ref, websocket: websocket}}
|
||||
|
||||
{:transport_error, reason} ->
|
||||
send(state.listener, {:connect, :error, reason})
|
||||
|
||||
{:backoff, @backoff, state}
|
||||
|
||||
{:error, conn, reason} ->
|
||||
Logger.error("Received error: #{inspect(reason)}")
|
||||
send(state.listener, {:connect, :error, reason})
|
||||
{:server_error, binary} ->
|
||||
{:response, %{type: {:error, error}}} = decode_response_or_event(binary)
|
||||
send(state.listener, {:connect, :error, error.details})
|
||||
|
||||
{:backoff, @backoff, %{state | http_conn: conn}}
|
||||
{:backoff, @backoff, state}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -103,39 +105,32 @@ defmodule Livebook.WebSocket.Server do
|
|||
|
||||
def handle_info({:loop_ping, _another_ref}, state), do: {:noreply, state}
|
||||
|
||||
def handle_info(message, state) do
|
||||
case Client.receive(state.http_conn, state.ref, state.websocket, message) do
|
||||
{:ok, conn, websocket, :connected} ->
|
||||
state = send_received({:ok, :connected}, state)
|
||||
send(self(), {:loop_ping, state.ref})
|
||||
def handle_info({:tcp_closed, _port} = message, state),
|
||||
do: handle_websocket_message(message, state)
|
||||
|
||||
{:noreply, %{state | http_conn: conn, websocket: websocket}}
|
||||
def handle_info({:tcp, _port, _data} = message, state),
|
||||
do: handle_websocket_message(message, state)
|
||||
|
||||
{:error, conn, websocket, %Mint.TransportError{} = reason} ->
|
||||
state = send_received({:error, reason}, state)
|
||||
|
||||
{:connect, :receive, %{state | http_conn: conn, websocket: websocket}}
|
||||
|
||||
{term, conn, websocket, data} ->
|
||||
state = send_received({term, data}, state)
|
||||
|
||||
{:noreply, %{state | http_conn: conn, websocket: websocket}}
|
||||
|
||||
{:error, _} = error ->
|
||||
{:noreply, send_received(error, state)}
|
||||
end
|
||||
end
|
||||
def handle_info(_message, state), do: {:noreply, state}
|
||||
|
||||
# Private
|
||||
|
||||
defp send_received({:ok, :connected}, state) do
|
||||
send(state.listener, {:connect, :ok, :connected})
|
||||
state
|
||||
def handle_websocket_message(message, state) do
|
||||
case Client.receive(state.http_conn, state.ref, state.websocket, message) do
|
||||
{:ok, conn, websocket, data} ->
|
||||
state = %{state | http_conn: conn, websocket: websocket}
|
||||
{:noreply, send_received(data, state)}
|
||||
|
||||
{:server_error, conn, websocket, reason} ->
|
||||
send(state.listener, {:connect, :error, reason})
|
||||
|
||||
{:connect, :receive, %{state | http_conn: conn, websocket: websocket}}
|
||||
end
|
||||
end
|
||||
|
||||
defp send_received({:ok, %Client.Response{body: [], status: nil}}, state), do: state
|
||||
defp send_received([], state), do: state
|
||||
|
||||
defp send_received({:ok, %Client.Response{body: binaries}}, state) do
|
||||
defp send_received([_ | _] = binaries, state) do
|
||||
for binary <- binaries, reduce: state do
|
||||
acc ->
|
||||
case decode_response_or_event(binary) do
|
||||
|
@ -145,6 +140,9 @@ defmodule Livebook.WebSocket.Server do
|
|||
{:response, %{id: id, type: {:error, %{details: reason}}}} ->
|
||||
reply_to_id(id, {:error, reason}, acc)
|
||||
|
||||
{:response, %{id: id, type: {:changeset, %{errors: field_errors}}}} ->
|
||||
reply_to_id(id, {:changeset_error, to_changeset_errors(field_errors)}, acc)
|
||||
|
||||
{:response, %{id: id, type: result}} ->
|
||||
reply_to_id(id, result, acc)
|
||||
|
||||
|
@ -155,39 +153,9 @@ defmodule Livebook.WebSocket.Server do
|
|||
end
|
||||
end
|
||||
|
||||
defp send_received({:error, :unknown}, state), do: state
|
||||
|
||||
defp send_received({:error, %Mint.TransportError{} = reason}, state) do
|
||||
send(state.listener, {:connect, :error, reason})
|
||||
state
|
||||
end
|
||||
|
||||
defp send_received({:error, %Client.Response{body: binaries, status: status}}, state)
|
||||
when binaries != [] and status != nil do
|
||||
for binary <- binaries do
|
||||
with {:response, body} <- decode_response_or_event(binary),
|
||||
%{type: {:error, %{details: reason}}} <- body do
|
||||
send(state.listener, {:connect, :error, reason})
|
||||
end
|
||||
end
|
||||
|
||||
state
|
||||
end
|
||||
|
||||
defp send_received({:error, %Client.Response{body: [], status: status}}, state)
|
||||
when status != nil do
|
||||
reply_to_all({:error, Plug.Conn.Status.reason_phrase(status)}, state)
|
||||
end
|
||||
|
||||
defp send_received({:error, %Client.Response{body: binaries, status: nil}}, state) do
|
||||
for binary <- binaries,
|
||||
{:response, body} <- decode_response_or_event(binary),
|
||||
reduce: state do
|
||||
acc ->
|
||||
case body do
|
||||
%{id: -1, type: {:error, %{details: reason}}} -> reply_to_all({:error, reason}, acc)
|
||||
%{id: id, type: {:error, %{details: reason}}} -> reply_to_id(id, {:error, reason}, acc)
|
||||
end
|
||||
defp to_changeset_errors(field_errors) do
|
||||
for %{field: field, details: errors} <- field_errors, into: %{} do
|
||||
{String.to_atom(field), errors}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -4,27 +4,22 @@ defmodule LivebookWeb.SidebarHook do
|
|||
import Phoenix.Component
|
||||
import Phoenix.LiveView
|
||||
|
||||
alias Livebook.Hubs.Enterprise
|
||||
alias Livebook.Hubs.EnterpriseClient
|
||||
|
||||
def on_mount(:default, _params, _session, socket) do
|
||||
if connected?(socket) do
|
||||
Livebook.Hubs.subscribe()
|
||||
end
|
||||
|
||||
hubs = Livebook.Hubs.fetch_metadatas()
|
||||
|
||||
socket =
|
||||
socket
|
||||
|> assign(saved_hubs: hubs)
|
||||
|> assign(saved_hubs: Livebook.Hubs.get_metadatas())
|
||||
|> attach_hook(:hubs, :handle_info, &handle_info/2)
|
||||
|> attach_hook(:shutdown, :handle_event, &handle_event/3)
|
||||
|
||||
{:cont, assign(socket, connected_hubs: connect_enterprise_hubs(hubs))}
|
||||
{:cont, socket}
|
||||
end
|
||||
|
||||
defp handle_info({:hubs_metadata_changed, hubs}, socket) do
|
||||
{:halt, assign(socket, saved_hubs: hubs, connected_hubs: connect_enterprise_hubs(hubs))}
|
||||
{:halt, assign(socket, saved_hubs: hubs)}
|
||||
end
|
||||
|
||||
defp handle_info(_event, socket), do: {:cont, socket}
|
||||
|
@ -42,26 +37,4 @@ defmodule LivebookWeb.SidebarHook do
|
|||
end
|
||||
|
||||
defp handle_event(_event, _params, socket), do: {:cont, socket}
|
||||
|
||||
# TODO: Move Hub connection life-cycle elsewhere
|
||||
@supervisor Livebook.HubsSupervisor
|
||||
@registry Livebook.HubsRegistry
|
||||
|
||||
defp connect_enterprise_hubs(hubs) do
|
||||
for %{provider: %Enterprise{} = enterprise} <- hubs do
|
||||
pid =
|
||||
case Registry.lookup(@registry, enterprise.url) do
|
||||
[{pid, _}] ->
|
||||
pid
|
||||
|
||||
[] ->
|
||||
case DynamicSupervisor.start_child(@supervisor, {EnterpriseClient, enterprise}) do
|
||||
{:ok, pid} -> pid
|
||||
{:error, {:already_started, pid}} -> pid
|
||||
end
|
||||
end
|
||||
|
||||
%{hub: enterprise, pid: pid}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,6 +8,7 @@ defmodule LivebookWeb.SessionLive do
|
|||
alias Livebook.{Sessions, Session, Delta, Notebook, Runtime, LiveMarkdown, Secrets}
|
||||
alias Livebook.Notebook.{Cell, ContentLoader}
|
||||
alias Livebook.JSInterop
|
||||
alias Livebook.Hubs
|
||||
alias Livebook.Hubs.EnterpriseClient
|
||||
|
||||
on_mount LivebookWeb.SidebarHook
|
||||
|
@ -63,7 +64,7 @@ defmodule LivebookWeb.SessionLive do
|
|||
autofocus_cell_id: autofocus_cell_id(data.notebook),
|
||||
page_title: get_page_title(data.notebook.name),
|
||||
livebook_secrets: Secrets.fetch_secrets() |> Map.new(&{&1.name, &1.value}),
|
||||
enterprise_secrets: fetch_enterprise_secrets(socket),
|
||||
enterprise_secrets: fetch_enterprise_secrets(),
|
||||
select_secret_ref: nil,
|
||||
select_secret_options: nil
|
||||
)
|
||||
|
@ -421,7 +422,6 @@ defmodule LivebookWeb.SessionLive do
|
|||
id="secrets"
|
||||
session={@session}
|
||||
secrets={@data_view.secrets}
|
||||
enterprise_hubs={@connected_hubs}
|
||||
livebook_secrets={@livebook_secrets}
|
||||
prefill_secret_name={@prefill_secret_name}
|
||||
select_secret_ref={@select_secret_ref}
|
||||
|
@ -1450,14 +1450,14 @@ defmodule LivebookWeb.SessionLive do
|
|||
def handle_info({:secret_created, %Secrets.Secret{}}, socket) do
|
||||
{:noreply,
|
||||
socket
|
||||
|> assign(enterprise_secrets: fetch_enterprise_secrets(socket))
|
||||
|> assign(enterprise_secrets: fetch_enterprise_secrets())
|
||||
|> put_flash(:info, "A new secret has been created on your Livebook Enterprise")}
|
||||
end
|
||||
|
||||
def handle_info({:secret_updated, %Secrets.Secret{}}, socket) do
|
||||
{:noreply,
|
||||
socket
|
||||
|> assign(enterprise_secrets: fetch_enterprise_secrets(socket))
|
||||
|> assign(enterprise_secrets: fetch_enterprise_secrets())
|
||||
|> put_flash(:info, "An existing secret has been updated on your Livebook Enterprise")}
|
||||
end
|
||||
|
||||
|
@ -2298,8 +2298,8 @@ defmodule LivebookWeb.SessionLive do
|
|||
secret in secrets
|
||||
end
|
||||
|
||||
defp fetch_enterprise_secrets(socket) do
|
||||
for connected_hub <- socket.assigns.connected_hubs,
|
||||
defp fetch_enterprise_secrets do
|
||||
for connected_hub <- Hubs.get_connected_hubs(),
|
||||
secret <- EnterpriseClient.list_cached_secrets(connected_hub.pid),
|
||||
into: %{},
|
||||
do: {secret.name, secret.value}
|
||||
|
|
|
@ -5,7 +5,11 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
|
||||
@impl true
|
||||
def update(assigns, socket) do
|
||||
socket = assign(socket, assigns)
|
||||
socket =
|
||||
socket
|
||||
|> assign(assigns)
|
||||
|> assign(connected_hubs: Livebook.Hubs.get_connected_hubs())
|
||||
|
||||
prefill_form = prefill_secret_name(socket)
|
||||
|
||||
socket =
|
||||
|
@ -119,16 +123,16 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
<% end %>
|
||||
<%= if Livebook.Config.feature_flag_enabled?(:hub) do %>
|
||||
<%= label class: "flex items-center gap-2 text-gray-600" do %>
|
||||
<%= radio_button(f, :store, "enterprise",
|
||||
disabled: @enterprise_hubs == [],
|
||||
checked: @data["store"] == "enterprise"
|
||||
) %> in the Enterprise
|
||||
<%= radio_button(f, :store, "hub",
|
||||
disabled: @connected_hubs == [],
|
||||
checked: @data["store"] == "hub"
|
||||
) %> in the Hub
|
||||
<% end %>
|
||||
<%= if @data["store"] == "enterprise" do %>
|
||||
<%= if @data["store"] == "hub" do %>
|
||||
<%= select(
|
||||
f,
|
||||
:enterprise_hub,
|
||||
enterprise_hubs_options(@enterprise_hubs, @data["enterprise_hub"]),
|
||||
:connected_hub,
|
||||
connected_hubs_options(@connected_hubs, @data["connected_hub"]),
|
||||
class: "input"
|
||||
) %>
|
||||
<% end %>
|
||||
|
@ -299,10 +303,10 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
Livebook.Session.set_secret(socket.assigns.session.pid, secret)
|
||||
end
|
||||
|
||||
defp set_secret(socket, secret, "enterprise") do
|
||||
selected_hub = socket.assigns.data["enterprise_hub"]
|
||||
defp set_secret(socket, secret, "hub") do
|
||||
selected_hub = socket.assigns.data["connected_hub"]
|
||||
|
||||
if hub = Enum.find(socket.assigns.enterprise_hubs, &(&1.hub.id == selected_hub)) do
|
||||
if hub = Enum.find(socket.assigns.connected_hubs, &(&1.hub.id == selected_hub)) do
|
||||
create_secret_request =
|
||||
LivebookProto.CreateSecretRequest.new!(
|
||||
name: secret.name,
|
||||
|
@ -314,7 +318,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
{:error, reason} -> {:error, put_flash(socket, :error, reason)}
|
||||
end
|
||||
else
|
||||
{:error, %{errors: [{"enterprise_hub", {"can't be blank", []}}]}}
|
||||
{:error, %{errors: [{"connected_hub", {"can't be blank", []}}]}}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -336,7 +340,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
end
|
||||
|
||||
# TODO: Livebook.Hubs.fetch_hubs_with_secrets_storage()
|
||||
defp enterprise_hubs_options(connected_hubs, selected_hub) do
|
||||
defp connected_hubs_options(connected_hubs, selected_hub) do
|
||||
[[key: "Select one Hub", value: "", selected: true, disabled: true]] ++
|
||||
for %{hub: %{id: id, hub_name: name}} <- connected_hubs do
|
||||
[key: name, value: id, selected: id == selected_hub]
|
||||
|
|
6
proto/lib/livebook_proto/changeset_error.pb.ex
Normal file
6
proto/lib/livebook_proto/changeset_error.pb.ex
Normal file
|
@ -0,0 +1,6 @@
|
|||
defmodule LivebookProto.ChangesetError do
|
||||
@moduledoc false
|
||||
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
|
||||
|
||||
field :errors, 1, repeated: true, type: LivebookProto.FieldError
|
||||
end
|
7
proto/lib/livebook_proto/field_error.pb.ex
Normal file
7
proto/lib/livebook_proto/field_error.pb.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule LivebookProto.FieldError do
|
||||
@moduledoc false
|
||||
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
|
||||
|
||||
field :field, 1, type: :string
|
||||
field :details, 2, repeated: true, type: :string
|
||||
end
|
|
@ -6,9 +6,10 @@ defmodule LivebookProto.Response do
|
|||
|
||||
field :id, 1, type: :int32
|
||||
field :error, 2, type: LivebookProto.Error, oneof: 0
|
||||
field :session, 3, type: LivebookProto.SessionResponse, oneof: 0
|
||||
field :changeset, 3, type: LivebookProto.ChangesetError, oneof: 0
|
||||
field :session, 4, type: LivebookProto.SessionResponse, oneof: 0
|
||||
|
||||
field :create_secret, 4,
|
||||
field :create_secret, 5,
|
||||
type: LivebookProto.CreateSecretResponse,
|
||||
json_name: "createSecret",
|
||||
oneof: 0
|
||||
|
|
|
@ -9,6 +9,15 @@ message Error {
|
|||
string details = 1;
|
||||
}
|
||||
|
||||
message FieldError {
|
||||
string field = 1;
|
||||
repeated string details = 2;
|
||||
}
|
||||
|
||||
message ChangesetError {
|
||||
repeated FieldError errors = 1;
|
||||
}
|
||||
|
||||
message SecretCreated {
|
||||
string name = 1;
|
||||
string value = 2;
|
||||
|
@ -50,9 +59,10 @@ message Response {
|
|||
|
||||
oneof type {
|
||||
Error error = 2;
|
||||
ChangesetError changeset = 3;
|
||||
|
||||
SessionResponse session = 3;
|
||||
CreateSecretResponse create_secret = 4;
|
||||
SessionResponse session = 4;
|
||||
CreateSecretResponse create_secret = 5;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ defmodule Livebook.Hubs.EnterpriseClientTest do
|
|||
enterprise = build(:enterprise, url: "http://localhost:9999", token: token)
|
||||
|
||||
EnterpriseClient.start_link(enterprise)
|
||||
assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}}
|
||||
assert_receive {:connect, :error, "connection refused"}
|
||||
end
|
||||
|
||||
test "rejects the web socket connection with invalid credentials", %{url: url} do
|
||||
|
|
|
@ -9,18 +9,18 @@ defmodule Livebook.HubsTest do
|
|||
:ok
|
||||
end
|
||||
|
||||
test "fetch_hubs/0 returns a list of persisted hubs" do
|
||||
test "get_hubs/0 returns a list of persisted hubs" do
|
||||
fly = insert_hub(:fly, id: "fly-baz")
|
||||
assert Hubs.fetch_hubs() == [fly]
|
||||
assert Hubs.get_hubs() == [fly]
|
||||
|
||||
Hubs.delete_hub("fly-baz")
|
||||
assert Hubs.fetch_hubs() == []
|
||||
assert Hubs.get_hubs() == []
|
||||
end
|
||||
|
||||
test "fetch_metadata/0 returns a list of persisted hubs normalized" do
|
||||
test "get_metadata/0 returns a list of persisted hubs normalized" do
|
||||
fly = insert_hub(:fly, id: "fly-livebook")
|
||||
|
||||
assert Hubs.fetch_metadatas() == [
|
||||
assert Hubs.get_metadatas() == [
|
||||
%Hubs.Metadata{
|
||||
id: "fly-livebook",
|
||||
color: fly.hub_color,
|
||||
|
@ -30,7 +30,7 @@ defmodule Livebook.HubsTest do
|
|||
]
|
||||
|
||||
Hubs.delete_hub("fly-livebook")
|
||||
assert Hubs.fetch_metadatas() == []
|
||||
assert Hubs.get_metadatas() == []
|
||||
end
|
||||
|
||||
test "fetch_hub!/1 returns one persisted fly" do
|
||||
|
|
|
@ -17,7 +17,7 @@ defmodule Livebook.WebSocket.ServerTest do
|
|||
headers = [{"X-Auth-Token", token}]
|
||||
|
||||
assert {:ok, _conn} = Server.start_link(self(), "http://localhost:9999", headers)
|
||||
refute_receive {:connect, :ok, :connected}
|
||||
assert_receive {:connect, :error, "connection refused"}
|
||||
end
|
||||
|
||||
test "rejects the websocket connection with invalid credentials", %{url: url} do
|
||||
|
@ -71,7 +71,8 @@ defmodule Livebook.WebSocket.ServerTest do
|
|||
value: ""
|
||||
)
|
||||
|
||||
assert Server.send_request(conn, create_secret_request) == {:error, "value: can't be blank"}
|
||||
assert {:changeset_error, errors} = Server.send_request(conn, create_secret_request)
|
||||
assert "can't be blank" in errors.value
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -105,8 +106,8 @@ defmodule Livebook.WebSocket.ServerTest do
|
|||
test "receives the disconnect message from websocket server", %{conn: conn, test: name} do
|
||||
EnterpriseServer.disconnect(name)
|
||||
|
||||
assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}}
|
||||
assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}}
|
||||
assert_receive {:connect, :error, "socket closed"}
|
||||
assert_receive {:connect, :error, "connection refused"}
|
||||
|
||||
assert Process.alive?(conn)
|
||||
end
|
||||
|
@ -114,8 +115,8 @@ defmodule Livebook.WebSocket.ServerTest do
|
|||
test "reconnects after websocket server is up", %{test: name} do
|
||||
EnterpriseServer.disconnect(name)
|
||||
|
||||
assert_receive {:connect, :error, %Mint.TransportError{reason: :closed}}
|
||||
assert_receive {:connect, :error, %Mint.TransportError{reason: :econnrefused}}
|
||||
assert_receive {:connect, :error, "socket closed"}
|
||||
assert_receive {:connect, :error, "connection refused"}
|
||||
|
||||
Process.sleep(1000)
|
||||
|
||||
|
|
|
@ -8,19 +8,21 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
|
|||
alias Livebook.Sessions
|
||||
|
||||
describe "enterprise" do
|
||||
setup %{user: user, url: url, token: token} do
|
||||
Livebook.Hubs.delete_hub("enterprise-#{user.id}")
|
||||
setup %{url: url, token: token} do
|
||||
id = Livebook.Utils.random_id()
|
||||
Livebook.Hubs.delete_hub("enterprise-#{id}")
|
||||
|
||||
enterprise =
|
||||
insert_hub(:enterprise,
|
||||
id: "enterprise-#{user.id}",
|
||||
external_id: user.id,
|
||||
id: "enterprise-#{id}",
|
||||
external_id: id,
|
||||
url: url,
|
||||
token: token
|
||||
)
|
||||
|
||||
{:ok, session} = Sessions.create_session(notebook: Livebook.Notebook.new())
|
||||
Livebook.Hubs.EnterpriseClient.subscribe()
|
||||
Livebook.Hubs.connect_hubs()
|
||||
|
||||
on_exit(fn ->
|
||||
Session.close(session.pid)
|
||||
|
@ -42,7 +44,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
|
|||
data: %{
|
||||
name: "FOO",
|
||||
value: "123",
|
||||
store: "enterprise"
|
||||
store: "hub"
|
||||
}
|
||||
}) =~ ~s(<option value="#{enterprise.id}">#{enterprise.hub_name}</option>)
|
||||
end
|
||||
|
@ -58,8 +60,8 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
|
|||
data: %{
|
||||
name: "FOO",
|
||||
value: "123",
|
||||
store: "enterprise",
|
||||
enterprise_hub: enterprise.id
|
||||
store: "hub",
|
||||
connected_hub: enterprise.id
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue