Implement Livebook Teams API (#1904)

This commit is contained in:
Alexandre de Souza 2023-05-16 13:21:49 -03:00 committed by GitHub
parent b4053c7dbb
commit 932e43ac65
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 646 additions and 33 deletions

View file

@ -25,6 +25,7 @@ config :mime, :types, %{
config :plug_cowboy, :log_exceptions_with_status_code, [407..599]
config :livebook,
teams_url: nil,
app_service_name: nil,
app_service_url: nil,
authentication_mode: :token,

View file

@ -122,6 +122,10 @@ defmodule Livebook do
config :livebook, :iframe_url, url
end
if url = Livebook.Config.teams_url!("LIVEBOOK_TEAMS_URL") do
config :livebook, :teams_url, url
end
if Livebook.Config.boolean!("LIVEBOOK_SHUTDOWN_ENABLED", false) do
config :livebook, :shutdown_callback, {System, :stop, []}
end

View file

@ -138,6 +138,14 @@ defmodule Livebook.Config do
Application.get_env(:livebook, :iframe_url)
end
@doc """
Returns the configured URL for the Livebook Teams endpoint.
"""
@spec teams_url() :: String.t() | nil
def teams_url() do
Application.get_env(:livebook, :teams_url)
end
@doc """
Shuts down the system, if possible.
"""
@ -407,6 +415,13 @@ defmodule Livebook.Config do
System.get_env(env)
end
@doc """
Parses teams url from env.
"""
def teams_url!(env) do
System.get_env(env)
end
@doc """
Parses and validates default runtime from env.
"""

View file

@ -55,6 +55,8 @@ defmodule Livebook.Hubs.EnterpriseClient do
@spec get_secrets(String.t()) :: list(Secret.t())
def get_secrets(id) do
GenServer.call(registry_name(id), :get_secrets)
catch
:exit, _ -> []
end
@doc """

71
lib/livebook/teams.ex Normal file
View file

@ -0,0 +1,71 @@
defmodule Livebook.Teams do
@moduledoc false
alias Livebook.Teams.{Client, Org}
@doc """
Creates an Org.
With success, returns the response from Livebook Teams API to continue the org creation flow.
Otherwise, it will return an error tuple with changeset.
"""
@spec create_org(Org.t(), map()) ::
{:ok, map()}
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def create_org(%Org{} = org, attrs) do
changeset = Org.changeset(org, attrs)
with {:ok, %Org{} = org} <- Ecto.Changeset.apply_action(changeset, :insert),
{:ok, response} <- Client.create_org(org) do
{:ok, response}
else
{:error, %Ecto.Changeset{} = changeset} ->
{:error, changeset}
{:error, %{"errors" => errors_map}} ->
errors_map =
if errors = errors_map["key_hash"],
do: Map.put_new(errors_map, "teams_key", errors),
else: errors_map
{:error, add_org_errors(changeset, errors_map)}
any ->
any
end
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking org changes.
"""
@spec change_org(Org.t(), map()) :: Ecto.Changeset.t()
def change_org(%Org{} = org, attrs \\ %{}) do
Org.changeset(org, attrs)
end
@doc """
Send a request to Livebook Teams API to get an org request.
"""
@spec get_org_request_completion_data(Org.t()) ::
{:ok, map() | :awaiting_confirmation}
| {:error, :expired}
| {:transport_error, String.t()}
def get_org_request_completion_data(%Org{id: id}) do
case Client.get_org_request_completion_data(id) do
{:ok, %{"status" => "awaiting_confirmation"}} -> {:ok, :awaiting_confirmation}
{:ok, completion_data} -> {:ok, completion_data}
{:error, %{"status" => "expired"}} -> {:error, :expired}
any -> any
end
end
defp add_org_errors(%Ecto.Changeset{} = changeset, errors_map) do
for {key, errors} <- errors_map,
field <- String.to_atom(key),
field in Org.__schema__(:fields),
error <- errors,
reduce: changeset,
do: (acc -> Ecto.Changeset.add_error(acc, field, error))
end
end

View file

@ -0,0 +1,64 @@
defmodule Livebook.Teams.Client do
@moduledoc false
alias Livebook.Teams.Org
alias Livebook.Utils.HTTP
@doc """
Send a request to Livebook Team API to create a new org.
"""
@spec create_org(Org.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def create_org(org) do
hash = :crypto.hash(:sha256, org.teams_key)
key_hash = Base.url_encode64(hash)
post("/api/org-request", %{name: org.name, key_hash: key_hash})
end
@doc """
Send a request to Livebook Team API to get an org request.
"""
@spec get_org_request_completion_data(pos_integer()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def get_org_request_completion_data(id) do
get("/api/org-request/#{id}")
end
defp post(path, json, header \\ []) do
body = {"application/json", Jason.encode!(json)}
request(:post, path, body: body, header: header)
end
defp get(path, params \\ %{}, header \\ []) do
query_string = URI.encode_query(params)
path = if query_string != "", do: "#{path}?#{query_string}", else: path
request(:get, path, header: header)
end
defp request(method, path, opts) do
endpoint = Livebook.Config.teams_url()
url = endpoint <> path
case HTTP.request(method, url, opts) do
{:ok, status, header, body} when status in 200..299 ->
if json?(header),
do: {:ok, Jason.decode!(body)},
else: {:error, body}
{:ok, status, header, body} when status in [410, 422] ->
if json?(header),
do: {:error, Jason.decode!(body)},
else: {:transport_error, body}
_otherwise ->
{:transport_error,
"Something went wrong, try again later or please file a bug if it persists"}
end
end
defp json?(headers) do
HTTP.fetch_content_type(headers) == {:ok, "application/json"}
end
end

41
lib/livebook/teams/org.ex Normal file
View file

@ -0,0 +1,41 @@
defmodule Livebook.Teams.Org do
@moduledoc false
use Ecto.Schema
import Ecto.Changeset
@type t :: %__MODULE__{
id: pos_integer() | nil,
name: String.t() | nil,
teams_key: String.t() | nil,
user_code: String.t() | nil
}
embedded_schema do
field :name, :string
field :teams_key, :string
field :user_code, :string
end
@fields ~w(id name teams_key user_code)a
@doc """
Generates a new teams key.
"""
@spec teams_key() :: String.t()
def teams_key, do: Base.url_encode64(:crypto.strong_rand_bytes(32), padding: false)
@doc false
def changeset(org, attrs) do
org
|> cast(attrs, @fields)
|> generate_teams_key()
|> validate_required(@fields -- [:id])
end
defp generate_teams_key(changeset) do
if get_field(changeset, :teams_key),
do: changeset,
else: put_change(changeset, :teams_key, teams_key())
end
end

View file

@ -51,7 +51,7 @@ defmodule Livebook.WebSocket.ClientConnection do
send(data.listener, {:connect, :ok, :connected})
send(self(), {:loop_ping, ref})
{:ok, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}}
{:keep_state, %__MODULE__{data | http_conn: conn, ref: ref, websocket: websocket}}
{:transport_error, reason} ->
send(data.listener, {:connect, :error, reason})
@ -60,10 +60,15 @@ defmodule Livebook.WebSocket.ClientConnection do
{:server_error, binary} ->
{:response, %{type: {:error, error}}} = decode_response_or_event(binary)
send(data.listener, {:connect, :error, error.details})
{:keep_state_and_data, {{:timeout, :reconnect}, @backoff, nil}}
end
end
def handle_event({:timeout, :backoff}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end
def handle_event({:timeout, :reconnect}, nil, _state, _data) do
{:keep_state_and_data, {:next_event, :internal, :connect}}
end

View file

@ -0,0 +1,105 @@
defmodule Livebook.TeamsTest do
use Livebook.TeamsIntegrationCase, async: true
alias Livebook.Teams
describe "create_org/1" do
test "returns the device flow data to confirm the org creation" do
org = build(:org)
assert {:ok,
%{
"device_code" => _device_code,
"expires_in" => 300,
"id" => _org_id,
"user_code" => _user_code,
"verification_uri" => _verification_uri
}} = Teams.create_org(org, %{})
end
test "returns changeset errors when data is invalid" do
org = build(:org)
assert {:error, changeset} = Teams.create_org(org, %{name: nil})
assert "can't be blank" in errors_on(changeset).name
end
end
describe "get_org_request_completion_data/1" do
test "returns the org data when it has been confirmed", %{node: node, user: user} do
teams_key = Teams.Org.teams_key()
key_hash = :crypto.hash(:sha256, teams_key)
org_request = :erpc.call(node, Hub.Integration, :create_org_request, [[key_hash: key_hash]])
org_request = :erpc.call(node, Hub.Integration, :confirm_org_request, [org_request, user])
org =
build(:org,
id: org_request.id,
name: org_request.name,
teams_key: teams_key,
user_code: org_request.user_code
)
%{
org: %{id: id, name: name, keys: [%{id: org_key_id}]},
user: %{id: user_id},
sessions: [%{token: token}]
} = org_request.user_org
assert Teams.get_org_request_completion_data(org) ==
{:ok,
%{
"id" => id,
"name" => name,
"org_key_id" => org_key_id,
"session_token" => token,
"user_id" => user_id
}}
end
test "returns the org request awaiting confirmation", %{node: node} do
teams_key = Teams.Org.teams_key()
key_hash = :crypto.hash(:sha256, teams_key)
org_request = :erpc.call(node, Hub.Integration, :create_org_request, [[key_hash: key_hash]])
org =
build(:org,
id: org_request.id,
name: org_request.name,
teams_key: teams_key,
user_code: org_request.user_code
)
assert Teams.get_org_request_completion_data(org) == {:ok, :awaiting_confirmation}
end
test "returns error when org request doesn't exist" do
org = build(:org, id: 0)
assert {:transport_error, _embarrassing} = Teams.get_org_request_completion_data(org)
end
test "returns error when org request expired", %{node: node} do
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
expires_at = NaiveDateTime.add(now, -5000)
teams_key = Teams.Org.teams_key()
key_hash = :crypto.hash(:sha256, teams_key)
org_request =
:erpc.call(node, Hub.Integration, :create_org_request, [
[expires_at: expires_at, key_hash: key_hash]
])
org =
build(:org,
id: org_request.id,
name: org_request.name,
teams_key: teams_key,
user_code: org_request.user_code
)
assert Teams.get_org_request_completion_data(org) == {:error, :expired}
end
end
end

View file

@ -63,8 +63,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
render_submit(form, attrs)
assert_receive {:secret_created, ^secret}
assert render(view) =~ "A new secret has been created on your Livebook Hub"
assert has_element?(view, "#hub-#{enterprise.id}-secret-#{secret.name}-wrapper")
assert has_element?(view, "#hub-#{enterprise.id}-secret-#{secret.name}")
end
test "toggle a secret from Enterprise hub",
@ -197,34 +196,5 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
assert output == "\e[32m\"#{secret.value}\"\e[0m"
end
test "shows secret events from Enterprise hub",
%{conn: conn, session: session, enterprise: enterprise, node: node} do
{:ok, view, _html} = live(conn, ~p"/sessions/#{session.id}/secrets")
secret =
build(:secret,
name: "EVENT_SECRET",
value: "123",
hub_id: enterprise.id,
readonly: true
)
# We need the `Secret` schema from enterprise to execute
# the following functions inside `Enterprise.Integration`
enterprise_secret =
:erpc.call(node, Enterprise.Integration, :create_secret, [secret.name, secret.value])
assert_receive {:secret_created, ^secret}
assert render(view) =~ "A new secret has been created on your Livebook Hub"
:erpc.call(node, Enterprise.Integration, :update_secret, [enterprise_secret, secret.value])
assert_receive {:secret_updated, ^secret}
assert render(view) =~ "An existing secret has been updated on your Livebook Hub"
:erpc.call(node, Enterprise.Integration, :delete_secret, [enterprise_secret])
assert_receive {:secret_deleted, ^secret}
assert render(view) =~ "An existing secret has been deleted on your Livebook Hub"
end
end
end

View file

@ -71,6 +71,15 @@ defmodule Livebook.Factory do
}
end
def build(:org) do
%Livebook.Teams.Org{
id: nil,
name: "Org Name #{System.unique_integer([:positive])}",
teams_key: Livebook.Teams.Org.teams_key(),
user_code: "request"
}
end
def build(factory_name, attrs) do
factory_name |> build() |> struct!(attrs)
end

View file

@ -0,0 +1,293 @@
defmodule Livebook.TeamsServer do
@moduledoc false
use GenServer
defstruct [:node, :token, :user, :org, :teams_key, :port, :app_port, :url, :env]
@name __MODULE__
@timeout 10_000
@default_teams_dir "../hub"
def available?() do
System.get_env("TEAMS_PATH") != nil or File.exists?(@default_teams_dir)
end
def start(opts \\ []) do
GenServer.start(__MODULE__, opts, name: @name)
end
def url() do
GenServer.call(@name, :fetch_url, @timeout)
end
def token() do
GenServer.call(@name, :fetch_token, @timeout)
end
def user() do
GenServer.call(@name, :fetch_user, @timeout)
end
def get_node() do
GenServer.call(@name, :fetch_node, @timeout)
end
def drop_database() do
app_port = GenServer.call(@name, :fetch_port)
state_env = GenServer.call(@name, :fetch_env)
app_port |> env(state_env) |> mix(["ecto.drop", "--quiet"])
end
# GenServer Callbacks
@impl true
def init(opts) do
state = struct!(__MODULE__, opts)
{:ok, %{state | node: app_node()}, {:continue, :start_app}}
end
@impl true
def handle_continue(:start_app, state) do
ensure_app_dir!()
prepare_database(state)
{:noreply, %{state | port: start_app(state)}}
end
@impl true
def handle_call(:fetch_token, _from, state) do
state = if state.token, do: state, else: ensure_session_token(state)
{:reply, state.token, state}
end
@impl true
def handle_call(:fetch_user, _from, state) do
state = if state.user, do: state, else: ensure_user(state)
{:reply, state.user, state}
end
@impl true
def handle_call(:fetch_url, _from, state) do
state = if state.app_port, do: state, else: %{state | app_port: app_port()}
url = state.url || fetch_url(state)
{:reply, url, %{state | url: url}}
end
def handle_call(:fetch_node, _from, state) do
{:reply, state.node, state}
end
def handle_call(:fetch_port, _from, state) do
app_port = state.app_port || app_port()
{:reply, app_port, %{state | app_port: app_port}}
end
def handle_call(:fetch_env, _from, state) do
{:reply, state.env, state}
end
# Port Callbacks
@impl true
def handle_info({_port, {:data, message}}, state) do
info(message)
{:noreply, state}
end
def handle_info({_port, {:exit_status, status}}, _state) do
error("enterprise quit with status #{status}")
System.halt(status)
end
# Private
defp call_erpc_function(node, function, args \\ []) do
:erpc.call(node, Hub.Integration, function, args)
end
defp ensure_session_token(state) do
state =
state
|> ensure_user()
|> ensure_org()
|> ensure_teams_key()
token = call_erpc_function(state.node, :associate_user_with_org, [state.user, state.org])
%{state | token: token}
end
defp ensure_user(state) do
if state.user,
do: state,
else: %{state | user: call_erpc_function(state.node, :create_user)}
end
defp ensure_org(state) do
if state.org,
do: state,
else: %{state | org: call_erpc_function(state.node, :create_org)}
end
defp ensure_teams_key(state) do
if state.teams_key,
do: state,
else: %{
state
| teams_key: call_erpc_function(state.node, :create_org_key, [[org: state.org]]).key_hash
}
end
defp start_app(state) do
env =
for {key, value} <- env(state), into: [] do
{String.to_charlist(key), String.to_charlist(value)}
end
args = [
"-e",
"spawn(fn -> IO.gets([]) && System.halt(0) end)",
"--sname",
to_string(state.node),
"--cookie",
to_string(Node.get_cookie()),
"-S",
"mix",
"phx.server"
]
port =
Port.open({:spawn_executable, elixir_executable()}, [
:exit_status,
:use_stdio,
:stderr_to_stdout,
:binary,
:hide,
env: env,
cd: app_dir(),
args: args
])
wait_on_start(state, port)
end
defp fetch_url(state) do
port = state.app_port || app_port()
"http://localhost:#{port}"
end
defp prepare_database(state) do
:ok = mix(state, ["ecto.drop", "--quiet"])
:ok = mix(state, ["ecto.create", "--quiet"])
:ok = mix(state, ["ecto.migrate", "--quiet"])
end
defp ensure_app_dir! do
dir = app_dir()
unless File.exists?(dir) do
IO.puts(
"Unable to find #{dir}, make sure to clone the hub repository " <>
"into it to run integration tests or set TEAMS_PATH to its location"
)
System.halt(1)
end
end
defp app_dir do
System.get_env("TEAMS_PATH", @default_teams_dir)
end
defp app_port do
System.get_env("TEAMS_PORT", "4043")
end
defp debug do
System.get_env("TEAMS_DEBUG", "false")
end
defp wait_on_start(state, port) do
url = state.url || fetch_url(state)
case :httpc.request(:get, {~c"#{url}/public/health", []}, [], []) do
{:ok, _} ->
port
{:error, _} ->
Process.sleep(10)
wait_on_start(state, port)
end
end
defp mix(state, args) when is_struct(state) do
state |> env() |> mix(args)
end
defp mix(env, args) do
cmd_opts = [
stderr_to_stdout: true,
env: env,
cd: app_dir(),
into: IO.stream(:stdio, :line)
]
args = ["--erl", "-elixir ansi_enabled true", "-S", "mix" | args]
case System.cmd(elixir_executable(), args, cmd_opts) do
{_, 0} -> :ok
_ -> :error
end
end
defp env(state) do
app_port = state.app_port || app_port()
env(app_port, state.env)
end
defp env(app_port, state_env) do
env = %{
"MIX_ENV" => "livebook",
"PORT" => to_string(app_port),
"DEBUG" => debug()
}
if state_env do
Map.merge(env, state_env)
else
env
end
end
defp elixir_executable do
System.find_executable("elixir")
end
defp app_node do
:"teams_#{Livebook.Utils.random_short_id()}@#{hostname()}"
end
defp hostname do
[nodename, hostname] =
node()
|> Atom.to_charlist()
|> :string.split(~c"@")
with {:ok, nodenames} <- :erl_epmd.names(hostname),
true <- List.keymember?(nodenames, nodename, 0) do
hostname
else
_ ->
raise "Error"
end
end
defp info(message), do: log([:blue, message <> "\n"])
defp error(message), do: log([:red, message <> "\n"])
defp log(data), do: data |> IO.ANSI.format() |> IO.write()
end

View file

@ -0,0 +1,32 @@
defmodule Livebook.TeamsIntegrationCase do
use ExUnit.CaseTemplate
alias Livebook.TeamsServer
using do
quote do
use Livebook.DataCase
use LivebookWeb.ConnCase
@moduletag :teams_integration
alias Livebook.TeamsServer
end
end
setup_all do
case TeamsServer.start() do
{:ok, _} -> :ok
{:error, {:already_started, _}} -> :ok
end
token = TeamsServer.token()
url = TeamsServer.url()
user = TeamsServer.user()
node = TeamsServer.get_node()
Application.put_env(:livebook, :teams_url, url, persistent: true)
{:ok, node: node, token: token, user: user}
end
end

View file

@ -55,6 +55,7 @@ ExUnit.start(
assert_receive_timeout: if(windows?, do: 2_500, else: 1_500),
exclude: [
erl_docs: erl_docs_available?,
enterprise_integration: not Livebook.EnterpriseServer.available?()
enterprise_integration: not Livebook.EnterpriseServer.available?(),
teams_integration: not Livebook.TeamsServer.available?()
]
)