diff --git a/lib/livebook/k8s/auth.ex b/lib/livebook/k8s/auth.ex
deleted file mode 100644
index 207fb0c68..000000000
--- a/lib/livebook/k8s/auth.ex
+++ /dev/null
@@ -1,65 +0,0 @@
-defmodule Livebook.K8s.Auth do
- # Implementation of Access Review checks for the authenticated user
- # using the `SelfSubjectAccessReview` [1] resource.
- #
- # [1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec
-
- @doc """
- Concurrently reviews access according to a list of `resource_attributes`.
-
- Expects `req` to be prepared for `SelfSubjectAccessReview`.
- """
- @spec batch_check(Req.Request.t(), [keyword()]) ::
- [:ok | {:error, %Req.Response{}} | {:error, Exception.t()}]
- def batch_check(req, resource_attribute_list) do
- resource_attribute_list
- |> Enum.map(&Task.async(fn -> can_i?(req, &1) end))
- |> Task.await_many(:infinity)
- end
-
- @doc """
- Reviews access according to `resource_attributes`.
-
- Expects `req` to be prepared for `SelfSubjectAccessReview`.
- """
- @spec can_i?(Req.Request.t(), keyword()) ::
- :ok | {:error, %Req.Response{}} | {:error, Exception.t()}
- def can_i?(req, resource_attributes) do
- resource_attributes =
- resource_attributes
- |> Keyword.validate!([
- :name,
- :namespace,
- :path,
- :resource,
- :subresource,
- :verb,
- :version,
- group: ""
- ])
- |> Enum.into(%{})
-
- access_review = %{
- "apiVersion" => "authorization.k8s.io/v1",
- "kind" => "SelfSubjectAccessReview",
- "spec" => %{
- "resourceAttributes" => resource_attributes
- }
- }
-
- create_self_subject_access_review(req, access_review)
- end
-
- defp create_self_subject_access_review(req, access_review) do
- case Kubereq.create(req, access_review) do
- {:ok, %Req.Response{status: 201, body: %{"status" => %{"allowed" => true}}}} ->
- :ok
-
- {:ok, %Req.Response{} = response} ->
- {:error, response}
-
- {:error, error} ->
- {:error, error}
- end
- end
-end
diff --git a/lib/livebook/k8s/pod.ex b/lib/livebook/k8s/pod.ex
index 74d28e311..9e4f7eac4 100644
--- a/lib/livebook/k8s/pod.ex
+++ b/lib/livebook/k8s/pod.ex
@@ -177,6 +177,38 @@ defmodule Livebook.K8s.Pod do
end
defp access_main_container() do
- Kubereq.Access.find(&(&1["name"] == @main_container_name))
+ access_find(&(&1["name"] == @main_container_name))
+ end
+
+ # TODO: use Access.find/1 once we require Elixir v1.17
+ defp access_find(predicate) when is_function(predicate, 1) do
+ fn op, data, next -> find(op, data, predicate, next) end
+ end
+
+ defp find(:get, data, predicate, next) when is_list(data) do
+ data |> Enum.find(predicate) |> next.()
+ end
+
+ defp find(:get_and_update, data, predicate, next) when is_list(data) do
+ get_and_update_find(data, [], predicate, next)
+ end
+
+ defp find(_op, data, _predicate, _next) do
+ raise "Access.find/1 expected a list, got: #{inspect(data)}"
+ end
+
+ defp get_and_update_find([], updates, _predicate, _next) do
+ {nil, :lists.reverse(updates)}
+ end
+
+ defp get_and_update_find([head | rest], updates, predicate, next) do
+ if predicate.(head) do
+ case next.(head) do
+ {get, update} -> {get, :lists.reverse([update | updates], rest)}
+ :pop -> {head, :lists.reverse(updates, rest)}
+ end
+ else
+ get_and_update_find(rest, [head | updates], predicate, next)
+ end
end
end
diff --git a/lib/livebook/k8s_api.ex b/lib/livebook/k8s_api.ex
new file mode 100644
index 000000000..aba4d7032
--- /dev/null
+++ b/lib/livebook/k8s_api.ex
@@ -0,0 +1,292 @@
+defmodule Livebook.K8sAPI do
+ # Calls to the Kubernetes API.
+
+ @type kubeconfig :: Kubereq.Kubeconfig.t()
+ @type error :: %{message: String.t(), status: pos_integer() | nil}
+
+ @doc """
+ Creates a new Pod.
+ """
+ @spec create_pod(kubeconfig(), map()) :: {:ok, data} | error()
+ when data: %{name: String.t()}
+ def create_pod(kubeconfig, manifest) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
+
+ case Kubereq.create(req, manifest) do
+ {:ok, %{status: 201, body: %{"metadata" => %{"name" => name}}}} ->
+ {:ok, %{name: name}}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Fetches Pod with the given name.
+ """
+ @spec get_pod(kubeconfig(), String.t(), String.t()) :: {:ok, data} | error()
+ when data: %{ip: String.t()}
+ def get_pod(kubeconfig, namespace, name) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
+
+ case Kubereq.get(req, namespace, name) do
+ {:ok, %{status: 200, body: pod}} ->
+ {:ok, %{ip: pod["status"]["podIP"]}}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Deletes Pod with the given name.
+ """
+ @spec delete_pod(kubeconfig(), String.t(), String.t()) :: :ok | error()
+ def delete_pod(kubeconfig, namespace, name) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
+
+ case Kubereq.delete(req, namespace, name) do
+ {:ok, %{status: 200}} -> :ok
+ other -> result_to_error(other)
+ end
+ end
+
+ @doc """
+ Awaits Pod to reach the ready status.
+ """
+ @spec await_pod_ready(kubeconfig(), String.t(), String.t()) :: :ok | error()
+ def await_pod_ready(kubeconfig, namespace, name) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
+
+ callback = fn
+ :deleted ->
+ {:error, %{message: "the Pod has been deleted", status: nil}}
+
+ pod ->
+ get_in(pod, [
+ "status",
+ "conditions",
+ Access.filter(&(&1["type"] == "Ready")),
+ "status"
+ ]) == ["True"]
+ end
+
+ # Wait up to 30 minutes
+ case Kubereq.wait_until(req, namespace, name, callback, 1_800_000) do
+ {:error, :watch_timeout} ->
+ {:error, %{message: "timed out waiting for Pod to become ready", status: nil}}
+
+ other ->
+ other
+ end
+ end
+
+ @doc """
+ Watches and streams Pod events to the caller.
+
+ The emitted events have the following shape:
+
+ %{message: String.t()}
+
+ """
+ @spec watch_pod_events(kubeconfig(), String.t(), String.t()) :: {:ok, Enumerable.t()} | error()
+ def watch_pod_events(kubeconfig, namespace, name) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/events/:name")
+
+ Kubereq.watch(req, namespace,
+ field_selectors: [
+ {"involvedObject.kind", "Pod"},
+ {"involvedObject.name", name}
+ ]
+ )
+ |> case do
+ {:ok, stream} ->
+ stream =
+ Stream.map(stream, fn event ->
+ %{message: event["object"]["message"]}
+ end)
+
+ {:ok, stream}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Lists all namespaces in the cluster.
+ """
+ @spec list_namespaces(kubeconfig()) :: {:ok, data} | error()
+ when data: list(%{name: String.t()})
+ def list_namespaces(kubeconfig) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:name")
+
+ case Kubereq.list(req, nil) do
+ {:ok, %{status: 200, body: %{"items" => items}}} ->
+ namespaces =
+ for item <- items do
+ %{name: item["metadata"]["name"]}
+ end
+
+ {:ok, namespaces}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Creates a Persistent Volume Claim.
+ """
+ @spec create_pvc(kubeconfig(), map()) :: {:ok, data} | error()
+ when data: %{name: String.t()}
+ def create_pvc(kubeconfig, manifest) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
+
+ case Kubereq.create(req, manifest) do
+ {:ok, %{status: 201, body: %{"metadata" => %{"name" => name}}}} ->
+ {:ok, %{name: name}}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Lists all Persistent Volume Claims.
+ """
+ @spec list_pvcs(kubeconfig(), String.t()) :: {:ok, data} | error()
+ when data: list(%{name: String.t(), deleted_at: String.t() | nil})
+ def list_pvcs(kubeconfig, namespace) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
+
+ case Kubereq.list(req, namespace) do
+ {:ok, %{status: 200, body: %{"items" => items}}} ->
+ storage_classes =
+ for item <- items do
+ %{
+ name: item["metadata"]["name"],
+ deleted_at: item["metadata"]["deletionTimestamp"]
+ }
+ end
+
+ {:ok, storage_classes}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Deletes Persistent Volume Claim with the given name.
+ """
+ @spec delete_pvc(kubeconfig(), String.t(), String.t()) :: :ok | error()
+ def delete_pvc(kubeconfig, namespace, name) do
+ req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
+
+ case Kubereq.delete(req, namespace, name) do
+ {:ok, %{status: 200}} -> :ok
+ other -> result_to_error(other)
+ end
+ end
+
+ @doc """
+ Lists storage classes available in the cluster.
+ """
+ @spec list_storage_classes(kubeconfig()) :: {:ok, data} | error()
+ when data: list(%{name: String.t()})
+ def list_storage_classes(kubeconfig) do
+ req = Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name")
+
+ case Kubereq.list(req, nil) do
+ {:ok, %{status: 200, body: %{"items" => items}}} ->
+ storage_classes =
+ for item <- items do
+ %{name: item["metadata"]["name"]}
+ end
+
+ {:ok, storage_classes}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ @doc """
+ Reviews access according to `resource_attributes`.
+
+ Implements Access Review checks for the authenticated user using
+ the `SelfSubjectAccessReview` [1] resource.
+
+ [1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec
+ """
+ @spec create_access_review(kubeconfig(), keyword()) :: {:ok, data} | error()
+ when data: %{
+ allowed: boolean(),
+ resource: String.t(),
+ verb: String.t(),
+ namespace: String.t() | nil,
+ group: String.t(),
+ version: String.t()
+ }
+ def create_access_review(kubeconfig, resource_attributes) do
+ resource_attributes =
+ resource_attributes
+ |> Keyword.validate!([
+ :name,
+ :namespace,
+ :path,
+ :resource,
+ :subresource,
+ :verb,
+ :version,
+ group: ""
+ ])
+ |> Enum.into(%{})
+
+ access_review = %{
+ "apiVersion" => "authorization.k8s.io/v1",
+ "kind" => "SelfSubjectAccessReview",
+ "spec" => %{
+ "resourceAttributes" => resource_attributes
+ }
+ }
+
+ req = Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews")
+
+ case Kubereq.create(req, access_review) do
+ {:ok, %Req.Response{status: 201, body: body}} ->
+ resource_attributes = body["spec"]["resourceAttributes"]
+
+ {:ok,
+ %{
+ allowed: body["status"]["allowed"],
+ resource: resource_attributes["resource"],
+ verb: resource_attributes["verb"],
+ namespace: resource_attributes["namespace"],
+ group: resource_attributes["group"],
+ version: resource_attributes["version"]
+ }}
+
+ other ->
+ result_to_error(other)
+ end
+ end
+
+ defp result_to_error({:ok, %{status: status, body: body}}) do
+ message =
+ case body do
+ %{"message" => message} when is_binary(message) ->
+ message
+
+ _ ->
+ "HTTP status #{status}"
+ end
+
+ {:error, %{message: message, status: status}}
+ end
+
+ defp result_to_error({:error, exception}) do
+ {:error, %{message: "reason: #{Exception.message(exception)}", status: nil}}
+ end
+end
diff --git a/lib/livebook/runtime/k8s.ex b/lib/livebook/runtime/k8s.ex
index bcacf7bd1..f7054a9eb 100644
--- a/lib/livebook/runtime/k8s.ex
+++ b/lib/livebook/runtime/k8s.ex
@@ -6,7 +6,7 @@ defmodule Livebook.Runtime.K8s do
# proxy a local port to the distribution port of the remote node.
# See `Livebook.Runtime.Fly` for more design details.
- defstruct [:config, :node, :req, :server_pid, :lv_pid, :pod_name]
+ defstruct [:config, :node, :server_pid, :lv_pid, :pod_name]
use GenServer, restart: :temporary
@@ -17,7 +17,6 @@ defmodule Livebook.Runtime.K8s do
@type t :: %__MODULE__{
node: node() | nil,
- req: Req.Request.t(),
server_pid: pid() | nil,
lv_pid: pid(),
pod_name: String.t() | nil
@@ -74,11 +73,10 @@ defmodule Livebook.Runtime.K8s do
{"remote_runtime_#{local_port}", local_port}
end
- req =
+ kubeconfig =
Kubereq.Kubeconfig.Default
|> Kubereq.Kubeconfig.load()
|> Kubereq.Kubeconfig.set_current_context(context)
- |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
runtime_data = RemoteUtils.encode_runtime_data(node_base)
@@ -87,17 +85,17 @@ defmodule Livebook.Runtime.K8s do
{:ok, watcher_pid} =
DynamicSupervisor.start_child(
Livebook.RuntimeSupervisor,
- {Task, fn -> watcher(parent, req, config) end}
+ {Task, fn -> watcher(parent, kubeconfig, config) end}
)
with {:ok, pod_name} <-
with_log(caller, "create pod", fn ->
- create_pod(req, config, runtime_data)
+ create_pod(kubeconfig, config, runtime_data)
end),
_ <- send(watcher_pid, {:pod_created, pod_name}),
{:ok, pod_ip} <-
- with_pod_events(caller, "waiting for pod", req, namespace, pod_name, fn ->
- await_pod_ready(req, namespace, pod_name)
+ with_pod_events(caller, "waiting for pod", kubeconfig, namespace, pod_name, fn ->
+ await_pod_ready(kubeconfig, namespace, pod_name)
end),
child_node <- :"#{node_base}@#{pod_ip}",
:ok <-
@@ -145,35 +143,18 @@ defmodule Livebook.Runtime.K8s do
{:noreply, state}
end
- defp with_pod_events(caller, name, req, namespace, pod_name, fun) do
+ defp with_pod_events(caller, name, kubeconfig, namespace, pod_name, fun) do
with_log(caller, name, fn ->
runtime_pid = self()
event_watcher_pid =
spawn_link(fn ->
- watch_result =
- req
- |> Req.merge(
- resource_path: "api/v1/namespaces/:namespace/events/:name",
- resource_list_path: "api/v1/namespaces/:namespace/events"
- )
- |> Kubereq.watch(namespace,
- field_selectors: [
- {"involvedObject.kind", "Pod"},
- {"involvedObject.name", pod_name}
- ]
- )
-
- case watch_result do
- {:ok, stream} ->
- Enum.each(stream, fn event ->
- message = Livebook.Utils.downcase_first(event["object"]["message"])
- send(caller, {:runtime_connect_info, runtime_pid, message})
- Logger.debug(~s/[k8s runtime] Pod event: "#{message}"/)
- end)
-
- _error ->
- :ok
+ with {:ok, stream} <- Livebook.K8sAPI.watch_pod_events(kubeconfig, namespace, pod_name) do
+ for event <- stream do
+ message = Livebook.Utils.downcase_first(event.message)
+ send(caller, {:runtime_connect_info, runtime_pid, message})
+ Logger.debug(~s/[k8s runtime] Pod event: "#{message}"/)
+ end
end
end)
@@ -183,9 +164,9 @@ defmodule Livebook.Runtime.K8s do
end)
end
- defp watcher(parent, req, config) do
+ defp watcher(parent, kubeconfig, config) do
ref = Process.monitor(parent)
- watcher_loop(%{ref: ref, config: config, req: req, pod_name: nil})
+ watcher_loop(%{ref: ref, config: config, kubeconfig: kubeconfig, pod_name: nil})
end
defp watcher_loop(state) do
@@ -194,8 +175,7 @@ defmodule Livebook.Runtime.K8s do
# If the parent process is killed, we try to eagerly free the
# created resources
if pod_name = state.pod_name do
- namespace = state.config.namespace
- _ = Kubereq.delete(state.req, namespace, pod_name)
+ _ = Livebook.K8sAPI.delete_pod(state.kubeconfig, state.config.namespace, pod_name)
end
{:pod_created, pod_name} ->
@@ -206,7 +186,7 @@ defmodule Livebook.Runtime.K8s do
end
end
- defp create_pod(req, config, runtime_data) do
+ defp create_pod(kubeconfig, config, runtime_data) do
%{
pod_template: pod_template,
docker_tag: docker_tag,
@@ -245,15 +225,12 @@ defmodule Livebook.Runtime.K8s do
manifest
end
- case Kubereq.create(req, manifest) do
- {:ok, %{status: 201, body: %{"metadata" => %{"name" => pod_name}}}} ->
- {:ok, pod_name}
+ case Livebook.K8sAPI.create_pod(kubeconfig, manifest) do
+ {:ok, %{name: name}} ->
+ {:ok, name}
- {:ok, %{body: body}} ->
- {:error, "could not create Pod, reason: #{body["message"]}"}
-
- {:error, error} ->
- {:error, "could not create Pod, reason: #{Exception.message(error)}"}
+ {:error, %{message: message}} ->
+ {:error, "could not create Pod, reason: #{message}"}
end
end
@@ -324,38 +301,13 @@ defmodule Livebook.Runtime.K8s do
end
end
- defp await_pod_ready(req, namespace, pod_name) do
- with :ok <-
- Kubereq.wait_until(
- req,
- namespace,
- pod_name,
- fn
- :deleted ->
- {:error, "the Pod was deleted before it started running"}
-
- pod ->
- get_in(pod, [
- "status",
- "conditions",
- Access.filter(&(&1["type"] == "Ready")),
- "status"
- ]) == ["True"]
- end,
- # 30 minutes
- 1_800_000
- ),
- {:ok, %{status: 200, body: pod}} <- Kubereq.get(req, namespace, pod_name) do
- {:ok, pod["status"]["podIP"]}
+ defp await_pod_ready(kubeconfig, namespace, pod_name) do
+ with :ok <- Livebook.K8sAPI.await_pod_ready(kubeconfig, namespace, pod_name),
+ {:ok, %{ip: pod_ip}} <- Livebook.K8sAPI.get_pod(kubeconfig, namespace, pod_name) do
+ {:ok, pod_ip}
else
- {:error, :watch_timeout} ->
- {:error, "timed out waiting for Pod to start up"}
-
- {:error, error} ->
- {:error, error}
-
- _other ->
- {:error, "tailed getting the Pod's IP address"}
+ {:error, %{message: message}} ->
+ {:error, "failed while waiting for the Pod to start, reason: #{message}"}
end
end
diff --git a/lib/livebook_web/live/session_live/fly_runtime_component.ex b/lib/livebook_web/live/session_live/fly_runtime_component.ex
index 29844a153..12c98ffe0 100644
--- a/lib/livebook_web/live/session_live/fly_runtime_component.ex
+++ b/lib/livebook_web/live/session_live/fly_runtime_component.ex
@@ -356,7 +356,9 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
<.icon_button
phx-click="delete_volume"
phx-target={@myself}
- disabled={@volume_id == nil or (@volume_action != nil and @volume_action.inflight)}
+ disabled={
+ @volume_id == nil or (@volume_action != nil and @volume_action.status == :inflight)
+ }
>
<.remix_icon icon="delete-bin-6-line" />
@@ -380,7 +382,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
class="text-red-600 font-medium text-sm whitespace-nowrap"
phx-click="confirm_delete_volume"
phx-target={@myself}
- disabled={@volume_action.inflight}
+ disabled={@volume_action.status == :inflight}
>
<.remix_icon icon="delete-bin-6-line" class="align-middle mr-1" /> Delete
@@ -388,7 +390,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
class="text-gray-600 font-medium text-sm"
phx-click="cancel_delete_volume"
phx-target={@myself}
- disabled={@volume_action.inflight}
+ disabled={@volume_action.status == :inflight}
>
Cancel
@@ -415,9 +417,9 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
<.button
type="submit"
- disabled={not @volume_action.changeset.valid? or @volume_action.inflight}
+ disabled={not @volume_action.changeset.valid? or @volume_action.status == :inflight}
>
- <%= if(@volume_action.inflight, do: "Creating...", else: "Create") %>
+ <%= if(@volume_action.status == :inflight, do: "Creating...", else: "Create") %>
<.button
type="button"
@@ -425,12 +427,13 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
outlined
phx-click="cancel_new_volume"
phx-target={@myself}
+ disabled={@volume_action.status == :inflight}
>
Cancel
-
- <.message_box kind={:error} message={"Error: " <> error.message} />
+
+ <.message_box kind={:error} message={"Error: " <> @volume_action.error.message} />
@@ -459,7 +462,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
end
def handle_event("delete_volume", %{}, socket) do
- volume_action = %{type: :delete, inflight: false, error: nil}
+ volume_action = %{type: :delete, status: :initial, error: nil}
{:noreply, assign(socket, volume_action: volume_action)}
end
@@ -472,7 +475,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
end
def handle_event("new_volume", %{}, socket) do
- volume_action = %{type: :new, changeset: volume_changeset(), inflight: false, error: false}
+ volume_action = %{type: :new, changeset: volume_changeset(), status: :initial, error: nil}
{:noreply, assign(socket, volume_action: volume_action)}
end
@@ -592,7 +595,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
assign(socket, volumes: volumes, volume_id: volume.id, volume_action: nil)
{:error, error} ->
- assign_nested(socket, :volume_action, error: error, inflight: false)
+ assign_nested(socket, :volume_action, error: error, status: :error)
end
{:noreply, socket}
@@ -608,7 +611,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
assign(socket, volumes: volumes, volume_id: nil, volume_action: nil)
{:error, error} ->
- assign_nested(socket, :volume_action, error: error, inflight: false)
+ assign_nested(socket, :volume_action, error: error, status: :error)
end
{:noreply, socket}
@@ -767,7 +770,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
|> start_async(:delete_volume, fn ->
Livebook.FlyAPI.delete_volume(token, app_name, volume_id)
end)
- |> assign_nested(:volume_action, inflight: true)
+ |> assign_nested(:volume_action, status: :inflight)
end
defp create_volume(socket, name, size_gb) do
@@ -787,7 +790,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
|> start_async(:create_volume, fn ->
Livebook.FlyAPI.create_volume(token, app_name, name, region, size_gb, compute)
end)
- |> assign_nested(:volume_action, inflight: true)
+ |> assign_nested(:volume_action, status: :inflight)
end
defp build_config(socket) do
diff --git a/lib/livebook_web/live/session_live/k8s_runtime_component.ex b/lib/livebook_web/live/session_live/k8s_runtime_component.ex
index 97b3d9660..2f445230a 100644
--- a/lib/livebook_web/live/session_live/k8s_runtime_component.ex
+++ b/lib/livebook_web/live/session_live/k8s_runtime_component.ex
@@ -4,7 +4,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
import Ecto.Changeset
alias Livebook.{Session, Runtime}
- alias Livebook.K8s.{Auth, Pod, PVC}
+ alias Livebook.K8s.{Pod, PVC}
@kubeconfig_pipeline Application.compile_env(:livebook, :k8s_kubeconfig_pipeline)
@@ -23,7 +23,6 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
kubeconfig: kubeconfig,
context_options: context_options,
context: nil,
- reqs: nil,
cluster_check: %{status: :initial, error: nil},
namespace: nil,
namespace_options: nil,
@@ -118,15 +117,13 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
phx-change="set_context"
phx-nosubmit
phx-target={@myself}
- class="mt-1"
+ class="mt-1 flex flex-col gap-4"
>
<.select_field name="context" value={@context} label="Context" options={@context_options} />
+ <.loader :if={@cluster_check.status == :inflight} />
+ <.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} />
- <.loader :if={@cluster_check.status == :inflight} />
-
- <.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} />
-
- <.message_box :if={@rbac.status === :errors} kind={:error}>
+ <.message_box :if={@rbac.status == :errors} kind={:error}>
<%= for error <- @rbac.errors do %>
<.rbac_error error={error} />
<% end %>
@@ -260,7 +257,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
@@ -313,7 +310,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
class="text-red-600 font-medium text-sm whitespace-nowrap"
phx-click="confirm_delete_pvc"
phx-target={@myself}
- disabled={@pvc_action[:type] == :delete_inflight}
+ disabled={@pvc_action.status == :inflight}
>
<.remix_icon icon="delete-bin-6-line" class="align-middle mr-1" />
<%= if @pvc_action[:type] == :delete, do: "Delete", else: "Deleting..." %>
@@ -322,7 +319,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
class="text-gray-600 font-medium text-sm"
phx-click="cancel_delete_pvc"
phx-target={@myself}
- disabled={@pvc_action[:type] == :delete_inflight}
+ disabled={@pvc_action.status == :inflight}
>
Cancel
@@ -331,7 +328,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
<.form
:let={pvcf}
- :if={@pvc_action[:type] in [:new, :new_inflight]}
+ :if={@pvc_action[:type] == :new}
for={@pvc_action.changeset}
as={:pvc}
phx-submit="create_pvc"
@@ -354,25 +351,27 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
<.select_field field={pvcf[:storage_class]} options={@pvc_action.storage_classes} />
<.button
- :if={@pvc_action[:type] == :new}
type="submit"
- disabled={not @pvc_action.changeset.valid? or @pvc_action[:type] == :new_inflight}
+ disabled={not @pvc_action.changeset.valid? or @pvc_action.status == :inflight}
>
- <%= if @pvc_action[:type] == :new, do: "Create", else: "Creating..." %>
+ <%= if(@pvc_action.status == :inflight, do: "Creating...", else: "Create") %>
<.button
- :if={@pvc_action[:type] == :new}
type="button"
color="gray"
outlined
phx-click="cancel_new_pvc"
phx-target={@myself}
- disabled={@pvc_action[:type] == :new_inflight}
+ disabled={@pvc_action.status == :inflight}
>
Cancel
- <.error :if={@pvc_action[:error]}><%= @pvc_action[:error] %>
+ <.message_box
+ :if={@pvc_action[:status] == :error}
+ kind={:error}
+ message={"Error: " <> @pvc_action.error.message}
+ />
"""
@@ -389,58 +388,47 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp cluster_check_error(%{error: %{status: 401}} = assigns) do
~H"""
- <.message_box kind={:error}>
-
-
Authentication with cluster failed.
-
-
- """
- end
-
- defp cluster_check_error(%{error: %{reason: :timeout}} = assigns) do
- ~H"""
- <.message_box kind={:error}>
-
-
Connection to cluster timed out.
-
-
+ <.message_box kind={:error} message="Authentication with cluster failed." />
"""
end
defp cluster_check_error(assigns) do
~H"""
- <.message_box kind={:error}>
-
-
Connection to cluster failed.
-
-
+ <.message_box kind={:error} message={"Connection to cluster failed, reason: " <> @error.message} />
"""
end
- defp rbac_error(%{error: %Req.Response{status: 201} = resp} = assigns) do
- resourceAttributes = resp.body["spec"]["resourceAttributes"]
- verb = resourceAttributes["verb"]
- namespace = resourceAttributes["namespace"]
+ defp rbac_error(%{error: {:ok, access_review}} = assigns) do
+ namespace = access_review.namespace
+ verb = access_review.verb
- gkv =
+ path =
String.trim(
- "#{resourceAttributes["group"]}/#{resourceAttributes["version"]}/#{resourceAttributes["resource"]}",
+ "#{access_review.group}/#{access_review.version}/#{access_review.resource}",
"/"
)
- assigns = assign(assigns, verb: verb, gkv: gkv, namespace: namespace)
+ assigns = assign(assigns, verb: verb, path: path, namespace: namespace)
~H"""
Authenticated user has no permission to <%= @verb %>
- <%= @gkv %>
+ <%= @path %>
in namespace <%= @namespace %>
(or the namespace doesn't exist).
"""
end
+ defp rbac_error(%{error: {:error, %{message: message}}} = assigns) do
+ assigns = assign(assigns, :message, message)
+
+ ~H"""
+ <%= @message %>
+ """
+ end
+
@impl true
def handle_event("set_context", %{"context" => context}, socket) do
{:noreply, socket |> set_context(context) |> set_namespace(nil)}
@@ -467,8 +455,8 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
type: :new,
changeset: PVC.changeset(),
storage_classes: storage_classes(socket.assigns),
- inflight: false,
- error: false
+ status: :initial,
+ error: nil
}
{:noreply, assign(socket, pvc_action: pvc_action)}
@@ -501,18 +489,20 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
def handle_event("delete_pvc", %{}, socket) do
- pvc_action = %{type: :delete, error: nil}
+ pvc_action = %{type: :delete, status: :initial, error: nil}
{:noreply, assign(socket, pvc_action: pvc_action)}
end
def handle_event("confirm_delete_pvc", %{}, socket) do
%{namespace: namespace, pvc_name: name} = socket.assigns
- req = socket.assigns.reqs.pvc
+ kubeconfig = socket.assigns.kubeconfig
socket =
socket
- |> start_async(:delete_pvc, fn -> Kubereq.delete(req, namespace, name) end)
- |> assign_nested(:pvc_action, type: :delete_inflight)
+ |> start_async(:delete_pvc, fn ->
+ Livebook.K8sAPI.delete_pvc(kubeconfig, namespace, name)
+ end)
+ |> assign_nested(:pvc_action, status: :inflight)
{:noreply, socket}
end
@@ -536,26 +526,46 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
@impl true
def handle_async(:rbac_check, {:ok, %{errors: errors, permissions: permissions}}, socket) do
- status = if errors === [], do: :ok, else: :errors
+ status = if errors == [], do: :ok, else: :errors
{:noreply, assign(socket, :rbac, %{status: status, errors: errors, permissions: permissions})}
end
- def handle_async(:load_namespace_options, {:ok, [:ok, {:ok, resp}]}, socket) do
+ def handle_async(:cluster_check, {:ok, results}, socket) do
+ [access_review_result, namespaces_result] = results
+
+ access_review_result =
+ case access_review_result do
+ {:ok, %{allowed: true}} -> :ok
+ {:ok, %{allowed: false}} -> {:error, %{message: "no access", status: nil}}
+ error -> error
+ end
+
+ namespaces_result =
+ case namespaces_result do
+ {:ok, namespaces} ->
+ namespace_options = Enum.map(namespaces, & &1.name)
+ {:ok, namespace_options, List.first(namespace_options)}
+
+ {:error, %{status: 403}} ->
+ # No access to list namespaces, we will show an input instead
+ {:ok, nil, nil}
+
+ other ->
+ other
+ end
+
socket =
- case resp do
- %Req.Response{status: 200, body: %{"items" => resources}} ->
- namespace_options = Enum.map(resources, & &1["metadata"]["name"])
-
- socket
- |> assign(:namespace_options, namespace_options)
- |> set_namespace(List.first(namespace_options))
- |> assign(:cluster_check, %{status: :ok, error: nil})
-
- %Req.Response{status: _other} ->
- # cannot list namespaces
+ with :ok <- access_review_result,
+ {:ok, namespace_options, namespace} <- namespaces_result do
+ socket
+ |> assign(:namespace_options, namespace_options)
+ |> set_namespace(namespace)
+ |> assign(:cluster_check, %{status: :ok, error: nil})
+ else
+ {:error, error} ->
socket
|> assign(:namespace_options, nil)
- |> assign(:cluster_check, %{status: :ok, error: nil})
+ |> assign(:cluster_check, %{status: :error, error: error})
end
{:noreply, socket}
@@ -564,13 +574,13 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
def handle_async(:delete_pvc, {:ok, result}, socket) do
socket =
case result do
- {:ok, %{status: 200}} ->
+ :ok ->
socket
|> assign(pvc_name: nil, pvc_action: nil)
|> pvc_options()
- {:ok, %{body: %{"message" => message}}} ->
- assign_nested(socket, :pvc_action, error: message, type: :delete)
+ {:error, error} ->
+ assign_nested(socket, :pvc_action, status: :error, error: error)
end
{:noreply, socket}
@@ -579,40 +589,18 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
def handle_async(:create_pvc, {:ok, result}, socket) do
socket =
case result do
- {:ok, %{status: 201, body: created_pvc}} ->
+ {:ok, %{name: name}} ->
socket
- |> assign(pvc_name: created_pvc["metadata"]["name"], pvc_action: nil)
+ |> assign(pvc_name: name, pvc_action: nil)
|> pvc_options()
- {:ok, %{body: body}} ->
- socket
- |> assign_nested(:pvc_action,
- error: "Creating the PVC failed: #{body["message"]}",
- type: :new
- )
-
- {:error, error} when is_exception(error) ->
- socket
- |> assign_nested(:pvc_action,
- error: "Creating the PVC failed: #{Exception.message(error)}",
- type: :new
- )
+ {:error, error} ->
+ assign_nested(socket, :pvc_action, status: :error, error: error)
end
{:noreply, socket}
end
- def handle_async(:load_namespace_options, {:ok, results}, socket) do
- {:error, error} = List.first(results, &match?({:error, _}, &1))
-
- socket =
- socket
- |> assign(:namespace_options, nil)
- |> assign(:cluster_check, %{status: :error, error: error})
-
- {:noreply, socket}
- end
-
defp label(namespace, runtime, runtime_status) do
reconnecting? = reconnecting?(namespace, runtime)
@@ -630,11 +618,11 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp create_pvc(socket, pvc) do
namespace = socket.assigns.namespace
manifest = PVC.manifest(pvc, namespace)
- req = socket.assigns.reqs.pvc
+ kubeconfig = socket.assigns.kubeconfig
socket
- |> start_async(:create_pvc, fn -> Kubereq.create(req, manifest) end)
- |> assign_nested(:pvc_action, type: :new_inflight)
+ |> start_async(:create_pvc, fn -> Livebook.K8sAPI.create_pvc(kubeconfig, manifest) end)
+ |> assign_nested(:pvc_action, status: :inflight)
end
defp set_context(socket, nil), do: assign(socket, :context, nil)
@@ -642,26 +630,18 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp set_context(socket, context) do
kubeconfig = Kubereq.Kubeconfig.set_current_context(socket.assigns.kubeconfig, context)
- reqs = %{
- access_reviews:
- Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews"),
- namespaces: Kubereq.new(kubeconfig, "api/v1/namespaces/:name"),
- pvc: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name"),
- sc: Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name")
- }
-
socket
- |> start_async(:load_namespace_options, fn ->
+ |> start_async(:cluster_check, fn ->
[
Task.async(fn ->
- Livebook.K8s.Auth.can_i?(reqs.access_reviews,
+ Livebook.K8sAPI.create_access_review(kubeconfig,
verb: "create",
group: "authorization.k8s.io",
version: "v1",
resource: "selfsubjectaccessreviews"
)
end),
- Task.async(fn -> Kubereq.list(reqs.namespaces, nil) end)
+ Task.async(fn -> Livebook.K8sAPI.list_namespaces(kubeconfig) end)
]
|> Task.await_many(:infinity)
end)
@@ -670,8 +650,6 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
context: context,
namespace: nil,
namespace_options: nil,
- rbac_error: nil,
- reqs: reqs,
cluster_check: %{status: :inflight, error: nil}
)
end
@@ -681,36 +659,38 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
defp set_namespace(socket, ns) do
- reqs = socket.assigns.reqs
+ kubeconfig = socket.assigns.kubeconfig
socket
|> start_async(:rbac_check, fn ->
+ resource_attribute_list = [
+ # Required permissions
+ [verb: "get", version: "v1", resource: "pods", namespace: ns],
+ [verb: "list", version: "v1", resource: "pods", namespace: ns],
+ [verb: "watch", version: "v1", resource: "pods", namespace: ns],
+ [verb: "create", version: "v1", resource: "pods", namespace: ns],
+ [verb: "delete", version: "v1", resource: "pods", namespace: ns],
+ [verb: "create", version: "v1", resource: "pods/portforward", namespace: ns],
+ # Optional permissions
+ [verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "list", version: "v1", resource: "storageclasses", namespace: ns]
+ ]
+
{required_permissions, optional_permissions} =
- Auth.batch_check(reqs.access_reviews, [
- # required permissions:
- [verb: "get", version: "v1", resource: "pods", namespace: ns],
- [verb: "list", version: "v1", resource: "pods", namespace: ns],
- [verb: "watch", version: "v1", resource: "pods", namespace: ns],
- [verb: "create", version: "v1", resource: "pods", namespace: ns],
- [verb: "delete", version: "v1", resource: "pods", namespace: ns],
- [verb: "create", version: "v1", resource: "pods/portforward", namespace: ns],
- # optional permissions:
- [verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
- [verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
- [verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
- [verb: "list", version: "v1", resource: "storageclasses", namespace: ns]
- ])
+ resource_attribute_list
+ |> Enum.map(&Task.async(fn -> Livebook.K8sAPI.create_access_review(kubeconfig, &1) end))
+ |> Task.await_many(:infinity)
|> Enum.split(6)
errors =
- required_permissions
- |> Enum.reject(&(&1 === :ok))
- |> Enum.map(fn {:error, error} -> error end)
+ Enum.reject(required_permissions, &match?({:ok, %{allowed: true}}, &1))
permissions =
optional_permissions
- |> Enum.map(&(&1 === :ok))
- |> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_sc], &1))
+ |> Enum.map(&match?({:ok, %{allowed: true}}, &1))
+ |> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_storage_classes], &1))
|> Map.new()
%{errors: errors, permissions: permissions}
@@ -751,33 +731,26 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
defp pvc_options(socket) do
- %{reqs: %{pvc: req}, namespace: ns} = socket.assigns
+ %{kubeconfig: kubeconfig, namespace: namespace} = socket.assigns
- case Kubereq.list(req, ns) do
- {:ok, %Req.Response{status: 200} = resp} ->
- pvcs =
- resp.body["items"]
- |> Enum.reject(& &1["metadata"]["deletionTimestamp"])
- |> Enum.map(& &1["metadata"]["name"])
+ case Livebook.K8sAPI.list_pvcs(kubeconfig, namespace) do
+ {:ok, pvcs} ->
+ pvcs = for pvc <- pvcs, pvc.deleted_at == nil, do: pvc.name
+ assign(socket, :pvcs, pvcs)
- socket
- |> assign(:pvcs, pvcs)
-
- _ ->
+ _other ->
assign(socket, :pvcs, [])
end
end
- defp storage_classes(%{rbac: %{permissions: %{list_sc: false}}}), do: []
+ defp storage_classes(%{rbac: %{permissions: %{list_storage_classes: false}}}), do: []
- defp storage_classes(assigns) do
- %{reqs: %{sc: req}} = assigns
+ defp storage_classes(%{kubeconfig: kubeconfig}) do
+ case Livebook.K8sAPI.list_storage_classes(kubeconfig) do
+ {:ok, storage_classes} ->
+ Enum.map(storage_classes, & &1.name)
- case Kubereq.list(req, nil) do
- {:ok, %Req.Response{status: 200} = resp} ->
- Enum.map(resp.body["items"], & &1["metadata"]["name"])
-
- _ ->
+ _other ->
[]
end
end
diff --git a/test/livebook/runtime/k8s_test.exs b/test/livebook/runtime/k8s_test.exs
index 1b6139506..2a4241d29 100644
--- a/test/livebook/runtime/k8s_test.exs
+++ b/test/livebook/runtime/k8s_test.exs
@@ -47,9 +47,8 @@ defmodule Livebook.Runtime.K8sTest do
test "connecting flow" do
config = config()
- req = req()
- assert [] = list_pods(req)
+ assert [] = list_pods()
pid = Runtime.K8s.new(config) |> Runtime.connect()
@@ -70,7 +69,7 @@ defmodule Livebook.Runtime.K8sTest do
Runtime.take_ownership(runtime)
- assert [_] = list_pods(req)
+ assert [_] = list_pods()
# Verify that we can actually evaluate code on the Kubernetes Pod
Runtime.evaluate_code(runtime, :elixir, ~s/System.fetch_env!("POD_NAME")/, {:c1, :e1}, [])
@@ -80,23 +79,10 @@ defmodule Livebook.Runtime.K8sTest do
Runtime.disconnect(runtime)
# Wait for Pod to terminate
- assert :ok ==
- Kubereq.wait_until(
- req,
- "default",
- runtime.pod_name,
- &(&1["status"]["phase"] == "Succeeded")
- )
+ cmd!(~w(kubectl wait --for=jsonpath={.status.phase}=Succeeded pod/#{runtime.pod_name}))
# Finally, delete the Pod object
- Kubereq.delete(req, "default", runtime.pod_name)
- end
-
- defp req() do
- Kubereq.Kubeconfig.Default
- |> Kubereq.Kubeconfig.load()
- |> Kubereq.Kubeconfig.set_current_context("kind-#{@cluster_name}")
- |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
+ cmd!(~w(kubectl delete pod #{runtime.pod_name}))
end
defp config(attrs \\ %{}) do
@@ -123,21 +109,19 @@ defmodule Livebook.Runtime.K8sTest do
Map.merge(defaults, attrs)
end
- defp list_pods(req) do
- {:ok, resp} =
- Kubereq.list(req, "default",
- label_selectors: [{"livebook.dev/runtime", "integration-test"}],
- field_selectors: [{"status.phase", "Running"}]
- )
-
- resp.body["items"]
+ defp list_pods() do
+ cmd!(
+ ~w(kubectl get pod --selector=livebook.dev/runtime=integration-test --field-selector=status.phase==Running --output json)
+ )
+ |> Jason.decode!()
+ |> Map.fetch!("items")
end
defp cmd!([command | args]) do
{output, status} = System.cmd(command, args, stderr_to_stdout: true)
if status != 0 do
- raise "command #{inspect(command)} #{inspect(args)} failed"
+ raise "command #{inspect(command)} #{inspect(args)} failed with output:\n#{output}"
end
output
diff --git a/test/livebook_web/live/session_live_test.exs b/test/livebook_web/live/session_live_test.exs
index b175f9d89..e1026d392 100644
--- a/test/livebook_web/live/session_live_test.exs
+++ b/test/livebook_web/live/session_live_test.exs
@@ -1205,6 +1205,8 @@ defmodule LivebookWeb.SessionLiveTest do
|> element(~s{form[phx-change="set_namespace"]})
|> render_change(%{namespace: "default"})
+ render_async(view)
+
assert view
|> element(~s{select[name="pvc_name"] option[value="foo-pvc"]})
|> has_element?()
diff --git a/test/support/k8s_cluster_stub.ex b/test/support/k8s_cluster_stub.ex
index 57e13a72b..c6c8a6645 100644
--- a/test/support/k8s_cluster_stub.ex
+++ b/test/support/k8s_cluster_stub.ex
@@ -12,12 +12,16 @@ defmodule Livebook.K8sClusterStub do
plug :dispatch
post "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" do
- resource = conn.body_params["spec"]["resourceAttributes"]["resource"]
+ resource_attributes = conn.body_params["spec"]["resourceAttributes"]
+ resource = resource_attributes["resource"]
allowed = conn.host == "default" or resource == "selfsubjectaccessreviews"
conn
|> put_status(201)
- |> Req.Test.json(%{"status" => %{"allowed" => allowed}})
+ |> Req.Test.json(%{
+ "status" => %{"allowed" => allowed},
+ "spec" => %{"resourceAttributes" => resource_attributes}
+ })
end
get "/api/v1/namespaces", host: "default" do