Implement TeamClient with Teams WebSocket communication (#1951)

This commit is contained in:
Alexandre de Souza 2023-06-01 12:01:43 -03:00 committed by GitHub
parent 2399395bfd
commit 14dd6d925f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 457 additions and 438 deletions

View file

@ -10,7 +10,7 @@ defmodule Livebook.Hubs.Broadcasts do
@secrets_topic "hubs:secrets"
@doc """
Broadcasts under `hubs:crud` topic when hubs changed.
Broadcasts under `#{@crud_topic}` topic when hubs changed.
"""
@spec hub_changed() :: broadcast()
def hub_changed do
@ -18,7 +18,7 @@ defmodule Livebook.Hubs.Broadcasts do
end
@doc """
Broadcasts under `hubs:connection` topic when hub connected.
Broadcasts under `#{@connection_topic}` topic when hub connected.
"""
@spec hub_connected() :: broadcast()
def hub_connected do
@ -26,15 +26,15 @@ defmodule Livebook.Hubs.Broadcasts do
end
@doc """
Broadcasts under `hubs:connection` topic when hub disconnected.
Broadcasts under `#{@connection_topic}` topic when hub is out-of-date.
"""
@spec hub_disconnected() :: broadcast()
def hub_disconnected do
broadcast(@connection_topic, :hub_disconnected)
@spec hub_server_error(String.t()) :: broadcast()
def hub_server_error(reason) when is_binary(reason) do
broadcast(@connection_topic, {:hub_server_error, reason})
end
@doc """
Broadcasts under `hubs:connection` topic when hub received a connection error.
Broadcasts under `#{@connection_topic}` topic when hub received a connection error.
"""
@spec hub_connection_failed(String.t()) :: broadcast()
def hub_connection_failed(reason) when is_binary(reason) do
@ -42,7 +42,7 @@ defmodule Livebook.Hubs.Broadcasts do
end
@doc """
Broadcasts under `hubs:secrets` topic when hub received a new secret.
Broadcasts under `#{@secrets_topic}` topic when hub received a new secret.
"""
@spec secret_created(Secret.t()) :: broadcast()
def secret_created(%Secret{} = secret) do
@ -50,7 +50,7 @@ defmodule Livebook.Hubs.Broadcasts do
end
@doc """
Broadcasts under `hubs:secrets` topic when hub received an updated secret.
Broadcasts under `#{@secrets_topic}` topic when hub received an updated secret.
"""
@spec secret_updated(Secret.t()) :: broadcast()
def secret_updated(%Secret{} = secret) do
@ -58,7 +58,7 @@ defmodule Livebook.Hubs.Broadcasts do
end
@doc """
Broadcasts under `hubs:secrets` topic when hub received a deleted secret.
Broadcasts under `#{@secrets_topic}` topic when hub received a deleted secret.
"""
@spec secret_deleted(Secret.t()) :: broadcast()
def secret_deleted(%Secret{} = secret) do

View file

@ -60,6 +60,8 @@ defmodule Livebook.Hubs.Team do
end
defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do
alias Livebook.Hubs.TeamClient
def load(team, fields) do
%{
team
@ -80,17 +82,17 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do
name: team.hub_name,
provider: team,
emoji: team.hub_emoji,
connected?: false
connected?: TeamClient.connected?(team.id)
}
end
def type(_team), do: "team"
def connection_spec(_team), do: nil
def connection_spec(team), do: {TeamClient, team}
def disconnect(_team), do: raise("not implemented")
def disconnect(team), do: TeamClient.stop(team.id)
def capabilities(_team), do: []
def capabilities(_team), do: ~w(connect)a
def get_secrets(_team), do: []
@ -100,7 +102,10 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do
def delete_secret(_team, _secret), do: :ok
def connection_error(_team), do: raise("not implemented")
def connection_error(team) do
reason = TeamClient.get_connection_error(team.id)
"Cannot connect to Hub: #{reason}. Will attempt to reconnect automatically..."
end
def notebook_stamp(_hub, _notebook_source, _metadata) do
:skip

View file

@ -0,0 +1,104 @@
defmodule Livebook.Hubs.TeamClient do
@moduledoc false
use GenServer
require Logger
alias Livebook.Hubs.Broadcasts
alias Livebook.Hubs.Team
alias Livebook.Teams.Connection
@registry Livebook.HubsRegistry
@supervisor Livebook.HubsSupervisor
defstruct [:hub, :connection_error, connected?: false, secrets: []]
@type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}}
@doc """
Connects the Team client with WebSocket server.
"""
@spec start_link(Team.t()) :: GenServer.on_start()
def start_link(%Team{} = team) do
GenServer.start_link(__MODULE__, team, name: registry_name(team.id))
end
@doc """
Stops the WebSocket server.
"""
@spec stop(String.t()) :: :ok
def stop(id) do
if pid = GenServer.whereis(registry_name(id)) do
DynamicSupervisor.terminate_child(@supervisor, pid)
end
:ok
end
@doc """
Returns the latest error from connection.
"""
@spec get_connection_error(String.t()) :: String.t() | nil
def get_connection_error(id) do
GenServer.call(registry_name(id), :get_connection_error)
catch
:exit, _ -> "connection refused"
end
@doc """
Returns if the Team client is connected.
"""
@spec connected?(String.t()) :: boolean()
def connected?(id) do
GenServer.call(registry_name(id), :connected?)
catch
:exit, _ -> false
end
## GenServer callbacks
@impl true
def init(%Team{} = team) do
header = [
{"x-user", to_string(team.user_id)},
{"x-org", to_string(team.org_id)},
{"x-org-key", to_string(team.org_key_id)},
{"x-session-token", team.session_token}
]
{:ok, _pid} = Connection.start_link(self(), header)
{:ok, %__MODULE__{hub: team}}
end
@impl true
def handle_call(:get_connection_error, _caller, state) do
{:reply, state.connection_error, state}
end
def handle_call(:connected?, _caller, state) do
{:reply, state.connected?, state}
end
@impl true
def handle_info(:connected, state) do
Broadcasts.hub_connected()
{:noreply, %{state | connected?: true, connection_error: nil}}
end
def handle_info({:connection_error, reason}, state) do
Broadcasts.hub_connection_failed(reason)
{:noreply, %{state | connected?: false, connection_error: reason}}
end
def handle_info({:server_error, reason}, state) do
Broadcasts.hub_server_error("#{state.hub.hub_name}: #{reason}")
:ok = Livebook.Hubs.delete_hub(state.hub.id)
{:noreply, %{state | connected?: false}}
end
# Private
defp registry_name(id) do
{:via, Registry, {@registry, id}}
end
end

View file

@ -3,7 +3,7 @@ defmodule Livebook.Teams do
alias Livebook.Hubs
alias Livebook.Hubs.Team
alias Livebook.Teams.{Client, Org}
alias Livebook.Teams.{HTTP, Org}
import Ecto.Changeset, only: [add_error: 3, apply_action: 2, apply_action!: 2, get_field: 2]
@ -18,7 +18,7 @@ defmodule Livebook.Teams do
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def create_org(%Org{} = org, attrs) do
create_org_request(org, attrs, &Client.create_org/1)
create_org_request(org, attrs, &HTTP.create_org/1)
end
@doc """
@ -32,7 +32,7 @@ defmodule Livebook.Teams do
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def join_org(%Org{} = org, attrs) do
create_org_request(org, attrs, &Client.join_org/1)
create_org_request(org, attrs, &HTTP.join_org/1)
end
defp create_org_request(%Org{} = org, attrs, callback) when is_function(callback, 1) do
@ -74,7 +74,7 @@ defmodule Livebook.Teams do
| {:error, :expired}
| {:transport_error, String.t()}
def get_org_request_completion_data(%Org{id: id}, device_code) do
case Client.get_org_request_completion_data(id, device_code) do
case HTTP.get_org_request_completion_data(id, device_code) do
{:ok, %{"status" => "awaiting_confirmation"}} -> {:ok, :awaiting_confirmation}
{:ok, completion_data} -> {:ok, completion_data}
{:error, %{"status" => "expired"}} -> {:error, :expired}

View file

@ -0,0 +1,110 @@
defmodule Livebook.Teams.Connection do
@moduledoc false
@behaviour :gen_statem
require Logger
alias Livebook.WebSocket
alias Livebook.Teams.WebSocket
@backoff 5_000
@no_state :no_state
@loop_ping_delay 5_000
defstruct [:listener, :headers, :http_conn, :websocket, :ref]
@doc """
Starts a new WebSocket connection with given headers.
"""
@spec start_link(pid(), Mint.Types.headers()) :: :gen_statem.start_ret()
def start_link(listener, headers \\ []) do
:gen_statem.start_link(__MODULE__, {listener, headers}, [])
end
## gen_statem callbacks
@impl true
def callback_mode(), do: :handle_event_function
@impl true
def init({listener, headers}) do
data = %__MODULE__{listener: listener, headers: headers}
{:ok, @no_state, data, {:next_event, :internal, :connect}}
end
@impl true
def handle_event(event_type, event_data, state, data)
def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do
case WebSocket.connect(data.headers) do
{:ok, conn, websocket, ref} ->
send(data.listener, :connected)
send(self(), {:loop_ping, ref})
{:keep_state, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}}
{:transport_error, reason} ->
send(data.listener, {:connection_error, reason})
{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}
{:server_error, error} ->
reason = LivebookProto.Error.decode(error).details
send(data.listener, {:server_error, reason})
{:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}}
end
end
def handle_event({:timeout, :backoff}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
def handle_event({:timeout, :reconnect}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{ref: ref} = data) do
case WebSocket.send(data.http_conn, data.websocket, data.ref, :ping) do
{:ok, conn, websocket} ->
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, _reason} ->
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
end
end
def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do
:keep_state_and_data
end
def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_event(:info, _message, @no_state, _data) do
:keep_state_and_data
end
# Private
defp handle_websocket_message(message, %__MODULE__{} = data) do
case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do
{:ok, conn, websocket, _binaries} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data}
{:server_error, conn, websocket, reason} ->
send(data.listener, {:connection_error, reason})
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data, {:next_event, :internal, :connect}}
end
end
end

View file

@ -1,4 +1,4 @@
defmodule Livebook.Teams.Client do
defmodule Livebook.Teams.HTTP do
@moduledoc false
alias Livebook.Teams.Org

View file

@ -1,9 +1,9 @@
defmodule Livebook.WebSocket.Client do
defmodule Livebook.Teams.WebSocket do
@moduledoc false
alias Mint.WebSocket.UpgradeFailureError
@ws_path "/livebook/websocket"
@ws_path "/user/websocket"
@type conn :: Mint.HTTP.t()
@type websocket :: Mint.WebSocket.t()
@ -13,14 +13,14 @@ defmodule Livebook.WebSocket.Client do
defguard is_frame(value) when value in [:close, :ping] or elem(value, 0) == :binary
@doc """
Connects to the WebSocket server with given url and headers.
Connects to the WebSocket server with given headers.
"""
@spec connect(String.t(), list({String.t(), String.t()})) ::
@spec connect(list({String.t(), String.t()})) ::
{:ok, conn(), websocket(), ref()}
| {:transport_error, String.t()}
| {:server_error, String.t()}
def connect(url, headers \\ []) do
uri = URI.parse(url)
def connect(headers \\ []) do
uri = URI.parse(Livebook.Config.teams_url())
{http_scheme, ws_scheme} = parse_scheme(uri)
state = %{status: nil, headers: [], body: []}

View file

@ -1,187 +0,0 @@
defmodule Livebook.WebSocket.ClientConnection do
@moduledoc false
@behaviour :gen_statem
require Logger
alias Livebook.WebSocket
alias Livebook.WebSocket.Client
@timeout 10_000
@backoff 5_000
@no_state :no_state
@loop_ping_delay 5_000
defstruct [:url, :listener, :headers, :http_conn, :websocket, :ref, id: 0, reply: %{}]
@doc """
Starts a new WebSocket connection with given URL and headers.
"""
@spec start_link(pid(), String.t(), Mint.Types.headers()) :: :gen_statem.start_ret()
def start_link(listener, url, headers \\ []) do
:gen_statem.start_link(__MODULE__, {listener, url, headers}, [])
end
@doc """
Sends a Request to given WebSocket Server.
"""
@spec send_request(pid(), WebSocket.proto()) :: {atom(), term()}
def send_request(conn, %_struct{} = data) do
:gen_statem.call(conn, {:request, data}, @timeout)
end
## gen_statem callbacks
@impl true
def callback_mode, do: :handle_event_function
@impl true
def init({listener, url, headers}) do
data = %__MODULE__{listener: listener, url: url, headers: headers}
{:ok, @no_state, data, {:next_event, :internal, :connect}}
end
@impl true
def handle_event(event_type, event_data, state, data)
def handle_event(:internal, :connect, @no_state, %__MODULE__{} = data) do
case Client.connect(data.url, data.headers) do
{:ok, conn, websocket, ref} ->
send(data.listener, {:connect, :ok, :connected})
send(self(), {:loop_ping, ref})
{:keep_state, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}}
{:transport_error, reason} ->
send(data.listener, {:connect, :error, reason})
{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}
{:server_error, binary} ->
{:response, %{type: {:error, error}}} = decode_response_or_event(binary)
send(data.listener, {:connect, :error, error.details})
{:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}}
end
end
def handle_event({:timeout, :backoff}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
def handle_event({:timeout, :reconnect}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
def handle_event({:call, from}, {:request, request}, @no_state, %__MODULE__{id: id} = data) do
frame = LivebookProto.build_request_frame(request, id)
reply = Map.put(data.reply, id, from)
case Client.send(data.http_conn, data.websocket, data.ref, frame) do
{:ok, conn, websocket} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket, id: id + 1, reply: reply}
{:keep_state, data}
{:error, conn, websocket, reason} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data, {:reply, from, {:error, reason}}}
end
end
def handle_event(:info, {:loop_ping, ref}, @no_state, %__MODULE__{} = data)
when ref == data.ref and is_reference(ref) do
case Client.send(data.http_conn, data.websocket, data.ref, :ping) do
{:ok, conn, websocket} ->
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, _reason} ->
{:keep_state, %__MODULE__{data | http_conn: conn, websocket: websocket}}
end
end
def handle_event(:info, {:loop_ping, _another_ref}, @no_state, _data) do
:keep_state_and_data
end
def handle_event(:info, {:tcp_closed, _port} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_event(:info, {:tcp, _port, _data} = message, @no_state, %__MODULE__{} = data) do
handle_websocket_message(message, data)
end
def handle_event(:info, _message, @no_state, _data) do
:keep_state_and_data
end
# Private
defp handle_websocket_message(message, %__MODULE__{} = data) do
case Client.receive(data.http_conn, data.ref, data.websocket, message) do
{:ok, conn, websocket, binaries} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
data = send_received(binaries, data)
{:keep_state, data}
{:server_error, conn, websocket, reason} ->
send(data.listener, {:connect, :error, reason})
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
{:keep_state, data, {:next_event, :internal, :connect}}
end
end
defp send_received([], data), do: data
defp send_received([_ | _] = binaries, data) do
for binary <- binaries, reduce: data do
acc ->
case decode_response_or_event(binary) do
{:response, %{id: -1, type: {:error, %{details: reason}}}} ->
reply_to_all({:error, reason}, acc)
{:response, %{id: id, type: {:error, %{details: reason}}}} ->
reply_to_id(id, {:error, reason}, acc)
{:response, %{id: id, type: {:changeset, %{errors: field_errors}}}} ->
reply_to_id(id, {:changeset_error, to_changeset_errors(field_errors)}, acc)
{:response, %{id: id, type: result}} ->
reply_to_id(id, result, acc)
{:event, %{type: {name, data}}} ->
send(acc.listener, {:event, name, data})
acc
end
end
end
defp to_changeset_errors(field_errors) do
for %{field: field, details: errors} <- field_errors, into: %{} do
{String.to_atom(field), errors}
end
end
defp reply_to_all(message, %__MODULE__{} = data) do
for {_id, caller} <- data.reply do
:gen_statem.reply(caller, message)
end
data
end
defp reply_to_id(id, message, %__MODULE__{} = data) do
{caller, reply} = Map.pop(data.reply, id)
if caller, do: :gen_statem.reply(caller, message)
%__MODULE__{data | reply: reply}
end
defp decode_response_or_event(data) do
case LivebookProto.Response.decode(data) do
%{type: nil} -> {:event, LivebookProto.Event.decode(data)}
response -> {:response, response}
end
end
end

View file

@ -25,7 +25,7 @@ defmodule LivebookWeb.SidebarHook do
{:halt, put_flash(socket, :info, "Livebook is shutting down. You can close this page.")}
end
@connection_events ~w(hub_connected hub_disconnected hub_changed)a
@connection_events ~w(hub_connected hub_changed)a
defp handle_info(event, socket) when event in @connection_events do
{:cont, assign(socket, saved_hubs: Livebook.Hubs.get_metadatas())}
@ -35,6 +35,13 @@ defmodule LivebookWeb.SidebarHook do
{:cont, assign(socket, saved_hubs: Livebook.Hubs.get_metadatas())}
end
defp handle_info({:hub_server_error, error}, socket) do
{:cont,
socket
|> assign(saved_hubs: Livebook.Hubs.get_metadatas())
|> put_flash(:error, error)}
end
defp handle_info(_event, socket), do: {:cont, socket}
defp handle_event("shutdown", _params, socket) do

View file

@ -185,7 +185,7 @@ defmodule LivebookWeb.LayoutHelpers do
defp sidebar_hub_link_with_tooltip(assigns) do
~H"""
<.link {hub_connection_link_opts(@hub.provider, @to, @current)}>
<.link {hub_connection_link_opts(@hub, @to, @current)}>
<div class="text-lg leading-6 w-[56px] flex justify-center">
<span class="relative">
<%= @hub.emoji %>
@ -238,10 +238,14 @@ defmodule LivebookWeb.LayoutHelpers do
class =
"h-7 flex items-center hover:text-white #{text_color} border-l-4 #{border_color} hover:border-white"
if tooltip = Provider.connection_error(hub) do
[navigate: to, data_tooltip: tooltip, class: "tooltip right " <> class]
else
if hub.connected? do
[navigate: to, class: class]
else
[
navigate: to,
data_tooltip: Provider.connection_error(hub.provider),
class: "tooltip right " <> class
]
end
end

View file

@ -2,59 +2,31 @@ defmodule LivebookProto do
@moduledoc false
alias LivebookProto.{
CreateSecretRequest,
CreateSecretResponse,
HandshakeRequest,
HandshakeResponse,
Request,
Response
Event,
SecretCreated,
SecretUpdated,
SecretDeleted,
UserSynchronized
}
@request_mapping (for {_id, field_prop} <- Request.__message_props__().field_props,
@event_mapping (for {_id, field_prop} <- Event.__message_props__().field_props,
into: %{} do
{field_prop.type, field_prop.name_atom}
end)
@response_mapping (for {_id, field_prop} <- Response.__message_props__().field_props,
into: %{} do
{field_prop.type, field_prop.name_atom}
end)
@type request_proto :: HandshakeRequest.t() | CreateSecretRequest.t()
@type response_proto :: HandshakeResponse.t() | CreateSecretResponse.t()
@type event_proto ::
SecretCreated.t()
| SecretUpdated.t()
| SecretDeleted.t()
| UserSynchronized.t()
@doc """
Builds a request frame with given data and id.
Builds an event with given data.
"""
@spec build_request_frame(request_proto(), integer()) :: {:binary, iodata()}
def build_request_frame(%struct{} = data, id \\ -1) do
type = request_type(struct)
message = Request.new!(id: id, type: {type, data})
{:binary, Request.encode(message)}
@spec build_event(event_proto()) :: Event.t()
def build_event(%struct{} = data) do
Event.new!(type: {event_type(struct), data})
end
@doc """
Builds a create secret request struct.
"""
@spec build_create_secret_request(keyword()) :: CreateSecretRequest.t()
defdelegate build_create_secret_request(fields), to: CreateSecretRequest, as: :new!
@doc """
Builds a handshake request struct.
"""
@spec build_handshake_request(keyword()) :: HandshakeRequest.t()
defdelegate build_handshake_request(fields), to: HandshakeRequest, as: :new!
@doc """
Builds a response with given data and id.
"""
@spec build_response(response_proto(), integer()) :: Response.t()
def build_response(%struct{} = data, id \\ -1) do
type = response_type(struct)
Response.new!(id: id, type: {type, data})
end
defp request_type(module), do: Map.fetch!(@request_mapping, module)
defp response_type(module), do: Map.fetch!(@response_mapping, module)
defp event_type(module), do: Map.fetch!(@event_mapping, module)
end

View file

@ -1,6 +0,0 @@
defmodule LivebookProto.ChangesetError do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :errors, 1, repeated: true, type: LivebookProto.FieldError
end

View file

@ -1,7 +0,0 @@
defmodule LivebookProto.CreateSecretRequest do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :name, 1, type: :string
field :value, 2, type: :string
end

View file

@ -1,4 +0,0 @@
defmodule LivebookProto.CreateSecretResponse do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
end

View file

@ -4,22 +4,22 @@ defmodule LivebookProto.Event do
oneof :type, 0
field :secret_created, 100,
field :secret_created, 1,
type: LivebookProto.SecretCreated,
json_name: "secretCreated",
oneof: 0
field :secret_updated, 101,
field :secret_updated, 2,
type: LivebookProto.SecretUpdated,
json_name: "secretUpdated",
oneof: 0
field :secret_deleted, 102,
field :secret_deleted, 3,
type: LivebookProto.SecretDeleted,
json_name: "secretDeleted",
oneof: 0
field :user_synchronized, 103,
field :user_synchronized, 4,
type: LivebookProto.UserSynchronized,
json_name: "userSynchronized",
oneof: 0

View file

@ -1,7 +0,0 @@
defmodule LivebookProto.FieldError do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :field, 1, type: :string
field :details, 2, repeated: true, type: :string
end

View file

@ -1,6 +0,0 @@
defmodule LivebookProto.HandshakeRequest do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :app_version, 1, type: :string, json_name: "appVersion"
end

View file

@ -1,8 +0,0 @@
defmodule LivebookProto.HandshakeResponse do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :id, 1, type: :string
field :name, 2, type: :string
field :user, 3, type: LivebookProto.User
end

View file

@ -1,14 +0,0 @@
defmodule LivebookProto.Request do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
oneof :type, 0
field :id, 1, type: :int32
field :handshake, 2, type: LivebookProto.HandshakeRequest, oneof: 0
field :create_secret, 3,
type: LivebookProto.CreateSecretRequest,
json_name: "createSecret",
oneof: 0
end

View file

@ -1,16 +0,0 @@
defmodule LivebookProto.Response do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
oneof :type, 0
field :id, 1, type: :int32
field :error, 2, type: LivebookProto.Error, oneof: 0
field :changeset, 3, type: LivebookProto.ChangesetError, oneof: 0
field :handshake, 4, type: LivebookProto.HandshakeResponse, oneof: 0
field :create_secret, 5,
type: LivebookProto.CreateSecretResponse,
json_name: "createSecret",
oneof: 0
end

View file

@ -1,7 +0,0 @@
defmodule LivebookProto.User do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :id, 1, type: :int32
field :email, 2, type: :string
end

View file

@ -2,7 +2,6 @@ defmodule LivebookProto.UserSynchronized do
@moduledoc false
use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3
field :id, 1, type: :string
field :name, 2, type: :string
field :secrets, 3, repeated: true, type: LivebookProto.Secret
field :name, 1, type: :string
field :secrets, 2, repeated: true, type: LivebookProto.Secret
end

View file

@ -1,26 +1,12 @@
syntax = "proto3";
message User {
int32 id = 1;
string email = 2;
}
message Secret {
string name = 1;
string value = 2;
}
message Error {
string details = 1;
}
message FieldError {
string field = 1;
repeated string details = 2;
}
message ChangesetError {
repeated FieldError errors = 1;
message Secret {
string name = 1;
string value = 2;
}
message SecretCreated {
@ -39,55 +25,15 @@ message SecretDeleted {
}
message UserSynchronized {
string id = 1;
string name = 2;
repeated Secret secrets = 3;
}
message HandshakeRequest {
string app_version = 1;
}
message HandshakeResponse {
string id = 1;
string name = 2;
User user = 3;
}
message CreateSecretRequest {
string name = 1;
string value = 2;
}
message CreateSecretResponse {
}
message Request {
int32 id = 1;
oneof type {
HandshakeRequest handshake = 2;
CreateSecretRequest create_secret = 3;
}
}
message Response {
int32 id = 1;
oneof type {
Error error = 2;
ChangesetError changeset = 3;
HandshakeResponse handshake = 4;
CreateSecretResponse create_secret = 5;
}
repeated Secret secrets = 2;
}
message Event {
oneof type {
SecretCreated secret_created = 100;
SecretUpdated secret_updated = 101;
SecretDeleted secret_deleted = 102;
UserSynchronized user_synchronized = 103;
SecretCreated secret_created = 1;
SecretUpdated secret_updated = 2;
SecretDeleted secret_deleted = 3;
UserSynchronized user_synchronized = 4;
}
}

View file

@ -0,0 +1,54 @@
defmodule Livebook.Hubs.TeamClientTest do
use Livebook.TeamsIntegrationCase, async: true
@moduletag :capture_log
alias Livebook.Hubs.TeamClient
setup do
Livebook.Hubs.subscribe([:connection])
:ok
end
describe "start_link/1" do
test "successfully authenticates the web socket connection", %{user: user, node: node} do
org = :erpc.call(node, Hub.Integration, :create_org, [])
org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]])
token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org])
team =
build(:team,
id: "team-#{org.name}",
hub_name: org.name,
user_id: user.id,
org_id: org.id,
org_key_id: org_key.id,
session_token: token
)
refute TeamClient.connected?(team.id)
TeamClient.start_link(team)
assert_receive :hub_connected
assert TeamClient.connected?(team.id)
end
test "rejects the web socket connection with invalid credentials", %{user: user, token: token} do
team =
build(:team,
user_id: user.id,
org_id: 123_456,
org_key_id: 123_456,
session_token: token
)
TeamClient.start_link(team)
assert_receive {:hub_server_error, error}
assert error ==
"#{team.hub_name}: Your session is out-of-date. Please re-join the organization."
refute Livebook.Hubs.hub_exists?(team.id)
end
end
end

View file

@ -1,5 +1,5 @@
defmodule Livebook.HubsTest do
use Livebook.DataCase
use Livebook.TeamsIntegrationCase, async: true
alias Livebook.Hubs

View file

@ -1122,7 +1122,7 @@ defmodule Livebook.LiveMarkdown.ExportTest do
end
test "persists hub id when not default" do
Livebook.Factory.insert_hub(:team, id: "team-persisted-id")
Livebook.Factory.build(:team, id: "team-persisted-id")
notebook = %{
Notebook.new()

View file

@ -0,0 +1,44 @@
defmodule Livebook.Teams.ConnectionTest do
use Livebook.TeamsIntegrationCase, async: true
@moduletag :capture_log
alias Livebook.Teams.Connection
describe "connect" do
test "successfully authenticates the websocket connection", %{user: user, node: node} do
org = :erpc.call(node, Hub.Integration, :create_org, [])
org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]])
token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org])
header = [
{"x-user", to_string(user.id)},
{"x-org", to_string(org.id)},
{"x-org-key", to_string(org_key.id)},
{"x-session-token", token}
]
assert {:ok, _conn} = Connection.start_link(self(), header)
assert_receive :connected
end
test "rejects the websocket connection with invalid credentials", %{user: user} do
header = [
{"x-user", to_string(user.id)},
{"x-org", to_string(user.id)},
{"x-org-key", to_string(user.id)},
{"x-session-token", "foo"}
]
assert {:ok, _conn} = Connection.start_link(self(), header)
assert_receive {:server_error,
"Your session is out-of-date. Please re-join the organization."}
assert {:ok, _conn} = Connection.start_link(self(), [])
assert_receive {:server_error,
"Invalid request. Please re-join the organization and update Livebook if the issue persists."}
end
end
end

View file

@ -198,14 +198,14 @@ defmodule LivebookWeb.HomeLiveTest do
end
end
describe "hubs sidebar" do
test "render section", %{conn: conn} do
describe "hubs" do
test "renders sidebar section", %{conn: conn} do
{:ok, _view, html} = live(conn, ~p"/")
assert html =~ "HUBS"
assert html =~ "Add Hub"
end
test "render persisted hubs", %{conn: conn} do
test "renders sidebar persisted hubs", %{conn: conn} do
team = insert_hub(:team, id: "team-foo-bar-id")
{:ok, _view, html} = live(conn, ~p"/")

View file

@ -1,5 +1,5 @@
defmodule LivebookWeb.Hub.EditLiveTest do
use LivebookWeb.ConnCase
use LivebookWeb.ConnCase, async: true
import Phoenix.LiveViewTest
import Livebook.TestHelpers
@ -22,7 +22,7 @@ defmodule LivebookWeb.Hub.EditLiveTest do
|> render_change(%{"personal" => attrs})
refute view
|> element("#enterprise-form .invalid-feedback")
|> element("#personal-form .invalid-feedback")
|> has_element?()
assert {:ok, view, _html} =

View file

@ -0,0 +1,50 @@
defmodule LivebookWeb.Integration.SessionLiveTest do
use Livebook.TeamsIntegrationCase, async: true
import Phoenix.LiveViewTest
alias Livebook.{Sessions, Session}
setup do
{:ok, session} = Sessions.create_session(notebook: Livebook.Notebook.new())
on_exit(fn ->
Session.close(session.pid)
end)
%{session: session}
end
describe "hubs" do
test "selects the notebook hub", %{conn: conn, user: user, node: node, session: session} do
org = :erpc.call(node, Hub.Integration, :create_org, [])
org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]])
token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org])
hub =
insert_hub(:team,
id: "team-#{org.name}",
hub_name: org.name,
user_id: user.id,
org_id: org.id,
org_key_id: org_key.id,
session_token: token
)
id = hub.id
personal_id = Livebook.Hubs.Personal.id()
Session.subscribe(session.id)
{:ok, view, _} = live(conn, ~p"/sessions/#{session.id}")
assert Session.get_data(session.pid).notebook.hub_id == personal_id
view
|> element(~s/#select-hub-#{id}/)
|> render_click()
assert_receive {:operation, {:set_notebook_hub, _, ^id}}
assert Session.get_data(session.pid).notebook.hub_id == hub.id
end
end
end

View file

@ -1481,24 +1481,4 @@ defmodule LivebookWeb.SessionLiveTest do
Livebook.App.close(app.pid)
end
end
describe "hubs" do
test "selects the notebook hub", %{conn: conn, session: session} do
hub = insert_hub(:team)
id = hub.id
personal_id = Livebook.Hubs.Personal.id()
Session.subscribe(session.id)
{:ok, view, _} = live(conn, ~p"/sessions/#{session.id}")
assert Session.get_data(session.pid).notebook.hub_id == personal_id
view
|> element(~s/#select-hub-#{id}/)
|> render_click()
assert_receive {:operation, {:set_notebook_hub, _, ^id}}
assert Session.get_data(session.pid).notebook.hub_id == hub.id
end
end
end

View file

@ -24,7 +24,7 @@ defmodule Livebook.Factory do
user_id: 1,
org_key_id: 1,
teams_key: org.teams_key,
session_token: Livebook.Utils.random_cookie()
session_token: Livebook.Utils.random_short_id()
}
end

View file

@ -100,7 +100,7 @@ defmodule Livebook.TeamsServer do
end
def handle_info({_port, {:exit_status, status}}, _state) do
error("enterprise quit with status #{status}")
error("team quit with status #{status}")
System.halt(status)
end
@ -212,6 +212,10 @@ defmodule Livebook.TeamsServer do
System.get_env("TEAMS_DEBUG", "false")
end
defp proto do
System.get_env("TEAMS_LIVEBOOK_PROTO_PATH")
end
defp wait_on_start(state, port) do
url = state.url || fetch_url(state)
@ -257,6 +261,8 @@ defmodule Livebook.TeamsServer do
"DEBUG" => debug()
}
env = if proto(), do: Map.merge(env, %{"LIVEBOOK_PROTO_PATH" => proto()}), else: env
if state_env do
Map.merge(env, state_env)
else