Encapsulate Kubernetes API calls (#2790)

This commit is contained in:
Jonatan Kłosko 2024-09-20 10:30:50 +02:00 committed by GitHub
parent de7552a99b
commit ea6331f324
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 512 additions and 335 deletions

View file

@ -1,65 +0,0 @@
defmodule Livebook.K8s.Auth do
# Implementation of Access Review checks for the authenticated user
# using the `SelfSubjectAccessReview` [1] resource.
#
# [1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec
@doc """
Concurrently reviews access according to a list of `resource_attributes`.
Expects `req` to be prepared for `SelfSubjectAccessReview`.
"""
@spec batch_check(Req.Request.t(), [keyword()]) ::
[:ok | {:error, %Req.Response{}} | {:error, Exception.t()}]
def batch_check(req, resource_attribute_list) do
resource_attribute_list
|> Enum.map(&Task.async(fn -> can_i?(req, &1) end))
|> Task.await_many(:infinity)
end
@doc """
Reviews access according to `resource_attributes`.
Expects `req` to be prepared for `SelfSubjectAccessReview`.
"""
@spec can_i?(Req.Request.t(), keyword()) ::
:ok | {:error, %Req.Response{}} | {:error, Exception.t()}
def can_i?(req, resource_attributes) do
resource_attributes =
resource_attributes
|> Keyword.validate!([
:name,
:namespace,
:path,
:resource,
:subresource,
:verb,
:version,
group: ""
])
|> Enum.into(%{})
access_review = %{
"apiVersion" => "authorization.k8s.io/v1",
"kind" => "SelfSubjectAccessReview",
"spec" => %{
"resourceAttributes" => resource_attributes
}
}
create_self_subject_access_review(req, access_review)
end
defp create_self_subject_access_review(req, access_review) do
case Kubereq.create(req, access_review) do
{:ok, %Req.Response{status: 201, body: %{"status" => %{"allowed" => true}}}} ->
:ok
{:ok, %Req.Response{} = response} ->
{:error, response}
{:error, error} ->
{:error, error}
end
end
end

View file

@ -177,6 +177,38 @@ defmodule Livebook.K8s.Pod do
end
defp access_main_container() do
Kubereq.Access.find(&(&1["name"] == @main_container_name))
access_find(&(&1["name"] == @main_container_name))
end
# TODO: use Access.find/1 once we require Elixir v1.17
defp access_find(predicate) when is_function(predicate, 1) do
fn op, data, next -> find(op, data, predicate, next) end
end
defp find(:get, data, predicate, next) when is_list(data) do
data |> Enum.find(predicate) |> next.()
end
defp find(:get_and_update, data, predicate, next) when is_list(data) do
get_and_update_find(data, [], predicate, next)
end
defp find(_op, data, _predicate, _next) do
raise "Access.find/1 expected a list, got: #{inspect(data)}"
end
defp get_and_update_find([], updates, _predicate, _next) do
{nil, :lists.reverse(updates)}
end
defp get_and_update_find([head | rest], updates, predicate, next) do
if predicate.(head) do
case next.(head) do
{get, update} -> {get, :lists.reverse([update | updates], rest)}
:pop -> {head, :lists.reverse(updates, rest)}
end
else
get_and_update_find(rest, [head | updates], predicate, next)
end
end
end

292
lib/livebook/k8s_api.ex Normal file
View file

@ -0,0 +1,292 @@
defmodule Livebook.K8sAPI do
# Calls to the Kubernetes API.
@type kubeconfig :: Kubereq.Kubeconfig.t()
@type error :: %{message: String.t(), status: pos_integer() | nil}
@doc """
Creates a new Pod.
"""
@spec create_pod(kubeconfig(), map()) :: {:ok, data} | error()
when data: %{name: String.t()}
def create_pod(kubeconfig, manifest) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
case Kubereq.create(req, manifest) do
{:ok, %{status: 201, body: %{"metadata" => %{"name" => name}}}} ->
{:ok, %{name: name}}
other ->
result_to_error(other)
end
end
@doc """
Fetches Pod with the given name.
"""
@spec get_pod(kubeconfig(), String.t(), String.t()) :: {:ok, data} | error()
when data: %{ip: String.t()}
def get_pod(kubeconfig, namespace, name) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
case Kubereq.get(req, namespace, name) do
{:ok, %{status: 200, body: pod}} ->
{:ok, %{ip: pod["status"]["podIP"]}}
other ->
result_to_error(other)
end
end
@doc """
Deletes Pod with the given name.
"""
@spec delete_pod(kubeconfig(), String.t(), String.t()) :: :ok | error()
def delete_pod(kubeconfig, namespace, name) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
case Kubereq.delete(req, namespace, name) do
{:ok, %{status: 200}} -> :ok
other -> result_to_error(other)
end
end
@doc """
Awaits Pod to reach the ready status.
"""
@spec await_pod_ready(kubeconfig(), String.t(), String.t()) :: :ok | error()
def await_pod_ready(kubeconfig, namespace, name) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name")
callback = fn
:deleted ->
{:error, %{message: "the Pod has been deleted", status: nil}}
pod ->
get_in(pod, [
"status",
"conditions",
Access.filter(&(&1["type"] == "Ready")),
"status"
]) == ["True"]
end
# Wait up to 30 minutes
case Kubereq.wait_until(req, namespace, name, callback, 1_800_000) do
{:error, :watch_timeout} ->
{:error, %{message: "timed out waiting for Pod to become ready", status: nil}}
other ->
other
end
end
@doc """
Watches and streams Pod events to the caller.
The emitted events have the following shape:
%{message: String.t()}
"""
@spec watch_pod_events(kubeconfig(), String.t(), String.t()) :: {:ok, Enumerable.t()} | error()
def watch_pod_events(kubeconfig, namespace, name) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/events/:name")
Kubereq.watch(req, namespace,
field_selectors: [
{"involvedObject.kind", "Pod"},
{"involvedObject.name", name}
]
)
|> case do
{:ok, stream} ->
stream =
Stream.map(stream, fn event ->
%{message: event["object"]["message"]}
end)
{:ok, stream}
other ->
result_to_error(other)
end
end
@doc """
Lists all namespaces in the cluster.
"""
@spec list_namespaces(kubeconfig()) :: {:ok, data} | error()
when data: list(%{name: String.t()})
def list_namespaces(kubeconfig) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:name")
case Kubereq.list(req, nil) do
{:ok, %{status: 200, body: %{"items" => items}}} ->
namespaces =
for item <- items do
%{name: item["metadata"]["name"]}
end
{:ok, namespaces}
other ->
result_to_error(other)
end
end
@doc """
Creates a Persistent Volume Claim.
"""
@spec create_pvc(kubeconfig(), map()) :: {:ok, data} | error()
when data: %{name: String.t()}
def create_pvc(kubeconfig, manifest) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
case Kubereq.create(req, manifest) do
{:ok, %{status: 201, body: %{"metadata" => %{"name" => name}}}} ->
{:ok, %{name: name}}
other ->
result_to_error(other)
end
end
@doc """
Lists all Persistent Volume Claims.
"""
@spec list_pvcs(kubeconfig(), String.t()) :: {:ok, data} | error()
when data: list(%{name: String.t(), deleted_at: String.t() | nil})
def list_pvcs(kubeconfig, namespace) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
case Kubereq.list(req, namespace) do
{:ok, %{status: 200, body: %{"items" => items}}} ->
storage_classes =
for item <- items do
%{
name: item["metadata"]["name"],
deleted_at: item["metadata"]["deletionTimestamp"]
}
end
{:ok, storage_classes}
other ->
result_to_error(other)
end
end
@doc """
Deletes Persistent Volume Claim with the given name.
"""
@spec delete_pvc(kubeconfig(), String.t(), String.t()) :: :ok | error()
def delete_pvc(kubeconfig, namespace, name) do
req = Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name")
case Kubereq.delete(req, namespace, name) do
{:ok, %{status: 200}} -> :ok
other -> result_to_error(other)
end
end
@doc """
Lists storage classes available in the cluster.
"""
@spec list_storage_classes(kubeconfig()) :: {:ok, data} | error()
when data: list(%{name: String.t()})
def list_storage_classes(kubeconfig) do
req = Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name")
case Kubereq.list(req, nil) do
{:ok, %{status: 200, body: %{"items" => items}}} ->
storage_classes =
for item <- items do
%{name: item["metadata"]["name"]}
end
{:ok, storage_classes}
other ->
result_to_error(other)
end
end
@doc """
Reviews access according to `resource_attributes`.
Implements Access Review checks for the authenticated user using
the `SelfSubjectAccessReview` [1] resource.
[1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec
"""
@spec create_access_review(kubeconfig(), keyword()) :: {:ok, data} | error()
when data: %{
allowed: boolean(),
resource: String.t(),
verb: String.t(),
namespace: String.t() | nil,
group: String.t(),
version: String.t()
}
def create_access_review(kubeconfig, resource_attributes) do
resource_attributes =
resource_attributes
|> Keyword.validate!([
:name,
:namespace,
:path,
:resource,
:subresource,
:verb,
:version,
group: ""
])
|> Enum.into(%{})
access_review = %{
"apiVersion" => "authorization.k8s.io/v1",
"kind" => "SelfSubjectAccessReview",
"spec" => %{
"resourceAttributes" => resource_attributes
}
}
req = Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews")
case Kubereq.create(req, access_review) do
{:ok, %Req.Response{status: 201, body: body}} ->
resource_attributes = body["spec"]["resourceAttributes"]
{:ok,
%{
allowed: body["status"]["allowed"],
resource: resource_attributes["resource"],
verb: resource_attributes["verb"],
namespace: resource_attributes["namespace"],
group: resource_attributes["group"],
version: resource_attributes["version"]
}}
other ->
result_to_error(other)
end
end
defp result_to_error({:ok, %{status: status, body: body}}) do
message =
case body do
%{"message" => message} when is_binary(message) ->
message
_ ->
"HTTP status #{status}"
end
{:error, %{message: message, status: status}}
end
defp result_to_error({:error, exception}) do
{:error, %{message: "reason: #{Exception.message(exception)}", status: nil}}
end
end

View file

@ -6,7 +6,7 @@ defmodule Livebook.Runtime.K8s do
# proxy a local port to the distribution port of the remote node.
# See `Livebook.Runtime.Fly` for more design details.
defstruct [:config, :node, :req, :server_pid, :lv_pid, :pod_name]
defstruct [:config, :node, :server_pid, :lv_pid, :pod_name]
use GenServer, restart: :temporary
@ -17,7 +17,6 @@ defmodule Livebook.Runtime.K8s do
@type t :: %__MODULE__{
node: node() | nil,
req: Req.Request.t(),
server_pid: pid() | nil,
lv_pid: pid(),
pod_name: String.t() | nil
@ -74,11 +73,10 @@ defmodule Livebook.Runtime.K8s do
{"remote_runtime_#{local_port}", local_port}
end
req =
kubeconfig =
Kubereq.Kubeconfig.Default
|> Kubereq.Kubeconfig.load()
|> Kubereq.Kubeconfig.set_current_context(context)
|> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
runtime_data = RemoteUtils.encode_runtime_data(node_base)
@ -87,17 +85,17 @@ defmodule Livebook.Runtime.K8s do
{:ok, watcher_pid} =
DynamicSupervisor.start_child(
Livebook.RuntimeSupervisor,
{Task, fn -> watcher(parent, req, config) end}
{Task, fn -> watcher(parent, kubeconfig, config) end}
)
with {:ok, pod_name} <-
with_log(caller, "create pod", fn ->
create_pod(req, config, runtime_data)
create_pod(kubeconfig, config, runtime_data)
end),
_ <- send(watcher_pid, {:pod_created, pod_name}),
{:ok, pod_ip} <-
with_pod_events(caller, "waiting for pod", req, namespace, pod_name, fn ->
await_pod_ready(req, namespace, pod_name)
with_pod_events(caller, "waiting for pod", kubeconfig, namespace, pod_name, fn ->
await_pod_ready(kubeconfig, namespace, pod_name)
end),
child_node <- :"#{node_base}@#{pod_ip}",
:ok <-
@ -145,35 +143,18 @@ defmodule Livebook.Runtime.K8s do
{:noreply, state}
end
defp with_pod_events(caller, name, req, namespace, pod_name, fun) do
defp with_pod_events(caller, name, kubeconfig, namespace, pod_name, fun) do
with_log(caller, name, fn ->
runtime_pid = self()
event_watcher_pid =
spawn_link(fn ->
watch_result =
req
|> Req.merge(
resource_path: "api/v1/namespaces/:namespace/events/:name",
resource_list_path: "api/v1/namespaces/:namespace/events"
)
|> Kubereq.watch(namespace,
field_selectors: [
{"involvedObject.kind", "Pod"},
{"involvedObject.name", pod_name}
]
)
case watch_result do
{:ok, stream} ->
Enum.each(stream, fn event ->
message = Livebook.Utils.downcase_first(event["object"]["message"])
send(caller, {:runtime_connect_info, runtime_pid, message})
Logger.debug(~s/[k8s runtime] Pod event: "#{message}"/)
end)
_error ->
:ok
with {:ok, stream} <- Livebook.K8sAPI.watch_pod_events(kubeconfig, namespace, pod_name) do
for event <- stream do
message = Livebook.Utils.downcase_first(event.message)
send(caller, {:runtime_connect_info, runtime_pid, message})
Logger.debug(~s/[k8s runtime] Pod event: "#{message}"/)
end
end
end)
@ -183,9 +164,9 @@ defmodule Livebook.Runtime.K8s do
end)
end
defp watcher(parent, req, config) do
defp watcher(parent, kubeconfig, config) do
ref = Process.monitor(parent)
watcher_loop(%{ref: ref, config: config, req: req, pod_name: nil})
watcher_loop(%{ref: ref, config: config, kubeconfig: kubeconfig, pod_name: nil})
end
defp watcher_loop(state) do
@ -194,8 +175,7 @@ defmodule Livebook.Runtime.K8s do
# If the parent process is killed, we try to eagerly free the
# created resources
if pod_name = state.pod_name do
namespace = state.config.namespace
_ = Kubereq.delete(state.req, namespace, pod_name)
_ = Livebook.K8sAPI.delete_pod(state.kubeconfig, state.config.namespace, pod_name)
end
{:pod_created, pod_name} ->
@ -206,7 +186,7 @@ defmodule Livebook.Runtime.K8s do
end
end
defp create_pod(req, config, runtime_data) do
defp create_pod(kubeconfig, config, runtime_data) do
%{
pod_template: pod_template,
docker_tag: docker_tag,
@ -245,15 +225,12 @@ defmodule Livebook.Runtime.K8s do
manifest
end
case Kubereq.create(req, manifest) do
{:ok, %{status: 201, body: %{"metadata" => %{"name" => pod_name}}}} ->
{:ok, pod_name}
case Livebook.K8sAPI.create_pod(kubeconfig, manifest) do
{:ok, %{name: name}} ->
{:ok, name}
{:ok, %{body: body}} ->
{:error, "could not create Pod, reason: #{body["message"]}"}
{:error, error} ->
{:error, "could not create Pod, reason: #{Exception.message(error)}"}
{:error, %{message: message}} ->
{:error, "could not create Pod, reason: #{message}"}
end
end
@ -324,38 +301,13 @@ defmodule Livebook.Runtime.K8s do
end
end
defp await_pod_ready(req, namespace, pod_name) do
with :ok <-
Kubereq.wait_until(
req,
namespace,
pod_name,
fn
:deleted ->
{:error, "the Pod was deleted before it started running"}
pod ->
get_in(pod, [
"status",
"conditions",
Access.filter(&(&1["type"] == "Ready")),
"status"
]) == ["True"]
end,
# 30 minutes
1_800_000
),
{:ok, %{status: 200, body: pod}} <- Kubereq.get(req, namespace, pod_name) do
{:ok, pod["status"]["podIP"]}
defp await_pod_ready(kubeconfig, namespace, pod_name) do
with :ok <- Livebook.K8sAPI.await_pod_ready(kubeconfig, namespace, pod_name),
{:ok, %{ip: pod_ip}} <- Livebook.K8sAPI.get_pod(kubeconfig, namespace, pod_name) do
{:ok, pod_ip}
else
{:error, :watch_timeout} ->
{:error, "timed out waiting for Pod to start up"}
{:error, error} ->
{:error, error}
_other ->
{:error, "tailed getting the Pod's IP address"}
{:error, %{message: message}} ->
{:error, "failed while waiting for the Pod to start, reason: #{message}"}
end
end

View file

@ -356,7 +356,9 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
<.icon_button
phx-click="delete_volume"
phx-target={@myself}
disabled={@volume_id == nil or (@volume_action != nil and @volume_action.inflight)}
disabled={
@volume_id == nil or (@volume_action != nil and @volume_action.status == :inflight)
}
>
<.remix_icon icon="delete-bin-6-line" />
</.icon_button>
@ -380,7 +382,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
class="text-red-600 font-medium text-sm whitespace-nowrap"
phx-click="confirm_delete_volume"
phx-target={@myself}
disabled={@volume_action.inflight}
disabled={@volume_action.status == :inflight}
>
<.remix_icon icon="delete-bin-6-line" class="align-middle mr-1" /> Delete
</button>
@ -388,7 +390,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
class="text-gray-600 font-medium text-sm"
phx-click="cancel_delete_volume"
phx-target={@myself}
disabled={@volume_action.inflight}
disabled={@volume_action.status == :inflight}
>
Cancel
</button>
@ -415,9 +417,9 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
</div>
<.button
type="submit"
disabled={not @volume_action.changeset.valid? or @volume_action.inflight}
disabled={not @volume_action.changeset.valid? or @volume_action.status == :inflight}
>
<%= if(@volume_action.inflight, do: "Creating...", else: "Create") %>
<%= if(@volume_action.status == :inflight, do: "Creating...", else: "Create") %>
</.button>
<.button
type="button"
@ -425,12 +427,13 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
outlined
phx-click="cancel_new_volume"
phx-target={@myself}
disabled={@volume_action.status == :inflight}
>
Cancel
</.button>
</.form>
<div :if={error = @volume_action[:error]}>
<.message_box kind={:error} message={"Error: " <> error.message} />
<div :if={@volume_action[:status] == :error}>
<.message_box kind={:error} message={"Error: " <> @volume_action.error.message} />
</div>
</div>
</div>
@ -459,7 +462,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
end
def handle_event("delete_volume", %{}, socket) do
volume_action = %{type: :delete, inflight: false, error: nil}
volume_action = %{type: :delete, status: :initial, error: nil}
{:noreply, assign(socket, volume_action: volume_action)}
end
@ -472,7 +475,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
end
def handle_event("new_volume", %{}, socket) do
volume_action = %{type: :new, changeset: volume_changeset(), inflight: false, error: false}
volume_action = %{type: :new, changeset: volume_changeset(), status: :initial, error: nil}
{:noreply, assign(socket, volume_action: volume_action)}
end
@ -592,7 +595,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
assign(socket, volumes: volumes, volume_id: volume.id, volume_action: nil)
{:error, error} ->
assign_nested(socket, :volume_action, error: error, inflight: false)
assign_nested(socket, :volume_action, error: error, status: :error)
end
{:noreply, socket}
@ -608,7 +611,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
assign(socket, volumes: volumes, volume_id: nil, volume_action: nil)
{:error, error} ->
assign_nested(socket, :volume_action, error: error, inflight: false)
assign_nested(socket, :volume_action, error: error, status: :error)
end
{:noreply, socket}
@ -767,7 +770,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
|> start_async(:delete_volume, fn ->
Livebook.FlyAPI.delete_volume(token, app_name, volume_id)
end)
|> assign_nested(:volume_action, inflight: true)
|> assign_nested(:volume_action, status: :inflight)
end
defp create_volume(socket, name, size_gb) do
@ -787,7 +790,7 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do
|> start_async(:create_volume, fn ->
Livebook.FlyAPI.create_volume(token, app_name, name, region, size_gb, compute)
end)
|> assign_nested(:volume_action, inflight: true)
|> assign_nested(:volume_action, status: :inflight)
end
defp build_config(socket) do

View file

@ -4,7 +4,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
import Ecto.Changeset
alias Livebook.{Session, Runtime}
alias Livebook.K8s.{Auth, Pod, PVC}
alias Livebook.K8s.{Pod, PVC}
@kubeconfig_pipeline Application.compile_env(:livebook, :k8s_kubeconfig_pipeline)
@ -23,7 +23,6 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
kubeconfig: kubeconfig,
context_options: context_options,
context: nil,
reqs: nil,
cluster_check: %{status: :initial, error: nil},
namespace: nil,
namespace_options: nil,
@ -118,15 +117,13 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
phx-change="set_context"
phx-nosubmit
phx-target={@myself}
class="mt-1"
class="mt-1 flex flex-col gap-4"
>
<.select_field name="context" value={@context} label="Context" options={@context_options} />
<.loader :if={@cluster_check.status == :inflight} />
<.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} />
</form>
<.loader :if={@cluster_check.status == :inflight} />
<.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} />
<form
:if={@cluster_check.status == :ok}
phx-change="set_namespace"
@ -149,7 +146,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
</div>
</form>
<.message_box :if={@rbac.status === :errors} kind={:error}>
<.message_box :if={@rbac.status == :errors} kind={:error}>
<%= for error <- @rbac.errors do %>
<.rbac_error error={error} />
<% end %>
@ -260,7 +257,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
<div class="mt-4 flex flex-col">
<div class="flex items-start gap-1">
<form phx-change="set_pvc_name" phx-target={@myself} class="grow">
<form phx-change="set_pvc_name" phx-nosubmit phx-target={@myself} class="grow">
<.select_field
:if={@rbac.permissions.list_pvc}
value={@pvc_name}
@ -302,7 +299,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
</div>
</div>
<div
:if={@pvc_action[:type] in [:delete, :delete_inflight]}
:if={@pvc_action[:type] == :delete}
class="px-4 py-3 mt-4 flex space-x-4 items-center border border-gray-200 rounded-lg"
>
<p class="grow text-gray-700 text-sm">
@ -313,7 +310,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
class="text-red-600 font-medium text-sm whitespace-nowrap"
phx-click="confirm_delete_pvc"
phx-target={@myself}
disabled={@pvc_action[:type] == :delete_inflight}
disabled={@pvc_action.status == :inflight}
>
<.remix_icon icon="delete-bin-6-line" class="align-middle mr-1" />
<%= if @pvc_action[:type] == :delete, do: "Delete", else: "Deleting..." %>
@ -322,7 +319,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
class="text-gray-600 font-medium text-sm"
phx-click="cancel_delete_pvc"
phx-target={@myself}
disabled={@pvc_action[:type] == :delete_inflight}
disabled={@pvc_action.status == :inflight}
>
Cancel
</button>
@ -331,7 +328,7 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
<.form
:let={pvcf}
:if={@pvc_action[:type] in [:new, :new_inflight]}
:if={@pvc_action[:type] == :new}
for={@pvc_action.changeset}
as={:pvc}
phx-submit="create_pvc"
@ -354,25 +351,27 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
<.select_field field={pvcf[:storage_class]} options={@pvc_action.storage_classes} />
</div>
<.button
:if={@pvc_action[:type] == :new}
type="submit"
disabled={not @pvc_action.changeset.valid? or @pvc_action[:type] == :new_inflight}
disabled={not @pvc_action.changeset.valid? or @pvc_action.status == :inflight}
>
<%= if @pvc_action[:type] == :new, do: "Create", else: "Creating..." %>
<%= if(@pvc_action.status == :inflight, do: "Creating...", else: "Create") %>
</.button>
<.button
:if={@pvc_action[:type] == :new}
type="button"
color="gray"
outlined
phx-click="cancel_new_pvc"
phx-target={@myself}
disabled={@pvc_action[:type] == :new_inflight}
disabled={@pvc_action.status == :inflight}
>
Cancel
</.button>
</.form>
<.error :if={@pvc_action[:error]}><%= @pvc_action[:error] %></.error>
<.message_box
:if={@pvc_action[:status] == :error}
kind={:error}
message={"Error: " <> @pvc_action.error.message}
/>
</div>
</div>
"""
@ -389,58 +388,47 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp cluster_check_error(%{error: %{status: 401}} = assigns) do
~H"""
<.message_box kind={:error}>
<div class="flex items-center justify-between">
<div>Authentication with cluster failed.</div>
</div>
</.message_box>
"""
end
defp cluster_check_error(%{error: %{reason: :timeout}} = assigns) do
~H"""
<.message_box kind={:error}>
<div class="flex items-center justify-between">
<div>Connection to cluster timed out.</div>
</div>
</.message_box>
<.message_box kind={:error} message="Authentication with cluster failed." />
"""
end
defp cluster_check_error(assigns) do
~H"""
<.message_box kind={:error}>
<div class="flex items-center justify-between">
<div>Connection to cluster failed.</div>
</div>
</.message_box>
<.message_box kind={:error} message={"Connection to cluster failed, reason: " <> @error.message} />
"""
end
defp rbac_error(%{error: %Req.Response{status: 201} = resp} = assigns) do
resourceAttributes = resp.body["spec"]["resourceAttributes"]
verb = resourceAttributes["verb"]
namespace = resourceAttributes["namespace"]
defp rbac_error(%{error: {:ok, access_review}} = assigns) do
namespace = access_review.namespace
verb = access_review.verb
gkv =
path =
String.trim(
"#{resourceAttributes["group"]}/#{resourceAttributes["version"]}/#{resourceAttributes["resource"]}",
"#{access_review.group}/#{access_review.version}/#{access_review.resource}",
"/"
)
assigns = assign(assigns, verb: verb, gkv: gkv, namespace: namespace)
assigns = assign(assigns, verb: verb, path: path, namespace: namespace)
~H"""
<div class="flex items-center justify-between">
<div>
Authenticated user has no permission to <span class="font-semibold"><%= @verb %></span>
<code><%= @gkv %></code>
<code><%= @path %></code>
<span :if={@namespace}> in namespace <code><%= @namespace %></code> (or the namespace doesn't exist)</span>.
</div>
</div>
"""
end
defp rbac_error(%{error: {:error, %{message: message}}} = assigns) do
assigns = assign(assigns, :message, message)
~H"""
<div><%= @message %></div>
"""
end
@impl true
def handle_event("set_context", %{"context" => context}, socket) do
{:noreply, socket |> set_context(context) |> set_namespace(nil)}
@ -467,8 +455,8 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
type: :new,
changeset: PVC.changeset(),
storage_classes: storage_classes(socket.assigns),
inflight: false,
error: false
status: :initial,
error: nil
}
{:noreply, assign(socket, pvc_action: pvc_action)}
@ -501,18 +489,20 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
def handle_event("delete_pvc", %{}, socket) do
pvc_action = %{type: :delete, error: nil}
pvc_action = %{type: :delete, status: :initial, error: nil}
{:noreply, assign(socket, pvc_action: pvc_action)}
end
def handle_event("confirm_delete_pvc", %{}, socket) do
%{namespace: namespace, pvc_name: name} = socket.assigns
req = socket.assigns.reqs.pvc
kubeconfig = socket.assigns.kubeconfig
socket =
socket
|> start_async(:delete_pvc, fn -> Kubereq.delete(req, namespace, name) end)
|> assign_nested(:pvc_action, type: :delete_inflight)
|> start_async(:delete_pvc, fn ->
Livebook.K8sAPI.delete_pvc(kubeconfig, namespace, name)
end)
|> assign_nested(:pvc_action, status: :inflight)
{:noreply, socket}
end
@ -536,26 +526,46 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
@impl true
def handle_async(:rbac_check, {:ok, %{errors: errors, permissions: permissions}}, socket) do
status = if errors === [], do: :ok, else: :errors
status = if errors == [], do: :ok, else: :errors
{:noreply, assign(socket, :rbac, %{status: status, errors: errors, permissions: permissions})}
end
def handle_async(:load_namespace_options, {:ok, [:ok, {:ok, resp}]}, socket) do
def handle_async(:cluster_check, {:ok, results}, socket) do
[access_review_result, namespaces_result] = results
access_review_result =
case access_review_result do
{:ok, %{allowed: true}} -> :ok
{:ok, %{allowed: false}} -> {:error, %{message: "no access", status: nil}}
error -> error
end
namespaces_result =
case namespaces_result do
{:ok, namespaces} ->
namespace_options = Enum.map(namespaces, & &1.name)
{:ok, namespace_options, List.first(namespace_options)}
{:error, %{status: 403}} ->
# No access to list namespaces, we will show an input instead
{:ok, nil, nil}
other ->
other
end
socket =
case resp do
%Req.Response{status: 200, body: %{"items" => resources}} ->
namespace_options = Enum.map(resources, & &1["metadata"]["name"])
socket
|> assign(:namespace_options, namespace_options)
|> set_namespace(List.first(namespace_options))
|> assign(:cluster_check, %{status: :ok, error: nil})
%Req.Response{status: _other} ->
# cannot list namespaces
with :ok <- access_review_result,
{:ok, namespace_options, namespace} <- namespaces_result do
socket
|> assign(:namespace_options, namespace_options)
|> set_namespace(namespace)
|> assign(:cluster_check, %{status: :ok, error: nil})
else
{:error, error} ->
socket
|> assign(:namespace_options, nil)
|> assign(:cluster_check, %{status: :ok, error: nil})
|> assign(:cluster_check, %{status: :error, error: error})
end
{:noreply, socket}
@ -564,13 +574,13 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
def handle_async(:delete_pvc, {:ok, result}, socket) do
socket =
case result do
{:ok, %{status: 200}} ->
:ok ->
socket
|> assign(pvc_name: nil, pvc_action: nil)
|> pvc_options()
{:ok, %{body: %{"message" => message}}} ->
assign_nested(socket, :pvc_action, error: message, type: :delete)
{:error, error} ->
assign_nested(socket, :pvc_action, status: :error, error: error)
end
{:noreply, socket}
@ -579,40 +589,18 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
def handle_async(:create_pvc, {:ok, result}, socket) do
socket =
case result do
{:ok, %{status: 201, body: created_pvc}} ->
{:ok, %{name: name}} ->
socket
|> assign(pvc_name: created_pvc["metadata"]["name"], pvc_action: nil)
|> assign(pvc_name: name, pvc_action: nil)
|> pvc_options()
{:ok, %{body: body}} ->
socket
|> assign_nested(:pvc_action,
error: "Creating the PVC failed: #{body["message"]}",
type: :new
)
{:error, error} when is_exception(error) ->
socket
|> assign_nested(:pvc_action,
error: "Creating the PVC failed: #{Exception.message(error)}",
type: :new
)
{:error, error} ->
assign_nested(socket, :pvc_action, status: :error, error: error)
end
{:noreply, socket}
end
def handle_async(:load_namespace_options, {:ok, results}, socket) do
{:error, error} = List.first(results, &match?({:error, _}, &1))
socket =
socket
|> assign(:namespace_options, nil)
|> assign(:cluster_check, %{status: :error, error: error})
{:noreply, socket}
end
defp label(namespace, runtime, runtime_status) do
reconnecting? = reconnecting?(namespace, runtime)
@ -630,11 +618,11 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp create_pvc(socket, pvc) do
namespace = socket.assigns.namespace
manifest = PVC.manifest(pvc, namespace)
req = socket.assigns.reqs.pvc
kubeconfig = socket.assigns.kubeconfig
socket
|> start_async(:create_pvc, fn -> Kubereq.create(req, manifest) end)
|> assign_nested(:pvc_action, type: :new_inflight)
|> start_async(:create_pvc, fn -> Livebook.K8sAPI.create_pvc(kubeconfig, manifest) end)
|> assign_nested(:pvc_action, status: :inflight)
end
defp set_context(socket, nil), do: assign(socket, :context, nil)
@ -642,26 +630,18 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
defp set_context(socket, context) do
kubeconfig = Kubereq.Kubeconfig.set_current_context(socket.assigns.kubeconfig, context)
reqs = %{
access_reviews:
Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews"),
namespaces: Kubereq.new(kubeconfig, "api/v1/namespaces/:name"),
pvc: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name"),
sc: Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name")
}
socket
|> start_async(:load_namespace_options, fn ->
|> start_async(:cluster_check, fn ->
[
Task.async(fn ->
Livebook.K8s.Auth.can_i?(reqs.access_reviews,
Livebook.K8sAPI.create_access_review(kubeconfig,
verb: "create",
group: "authorization.k8s.io",
version: "v1",
resource: "selfsubjectaccessreviews"
)
end),
Task.async(fn -> Kubereq.list(reqs.namespaces, nil) end)
Task.async(fn -> Livebook.K8sAPI.list_namespaces(kubeconfig) end)
]
|> Task.await_many(:infinity)
end)
@ -670,8 +650,6 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
context: context,
namespace: nil,
namespace_options: nil,
rbac_error: nil,
reqs: reqs,
cluster_check: %{status: :inflight, error: nil}
)
end
@ -681,36 +659,38 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
defp set_namespace(socket, ns) do
reqs = socket.assigns.reqs
kubeconfig = socket.assigns.kubeconfig
socket
|> start_async(:rbac_check, fn ->
resource_attribute_list = [
# Required permissions
[verb: "get", version: "v1", resource: "pods", namespace: ns],
[verb: "list", version: "v1", resource: "pods", namespace: ns],
[verb: "watch", version: "v1", resource: "pods", namespace: ns],
[verb: "create", version: "v1", resource: "pods", namespace: ns],
[verb: "delete", version: "v1", resource: "pods", namespace: ns],
[verb: "create", version: "v1", resource: "pods/portforward", namespace: ns],
# Optional permissions
[verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "list", version: "v1", resource: "storageclasses", namespace: ns]
]
{required_permissions, optional_permissions} =
Auth.batch_check(reqs.access_reviews, [
# required permissions:
[verb: "get", version: "v1", resource: "pods", namespace: ns],
[verb: "list", version: "v1", resource: "pods", namespace: ns],
[verb: "watch", version: "v1", resource: "pods", namespace: ns],
[verb: "create", version: "v1", resource: "pods", namespace: ns],
[verb: "delete", version: "v1", resource: "pods", namespace: ns],
[verb: "create", version: "v1", resource: "pods/portforward", namespace: ns],
# optional permissions:
[verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
[verb: "list", version: "v1", resource: "storageclasses", namespace: ns]
])
resource_attribute_list
|> Enum.map(&Task.async(fn -> Livebook.K8sAPI.create_access_review(kubeconfig, &1) end))
|> Task.await_many(:infinity)
|> Enum.split(6)
errors =
required_permissions
|> Enum.reject(&(&1 === :ok))
|> Enum.map(fn {:error, error} -> error end)
Enum.reject(required_permissions, &match?({:ok, %{allowed: true}}, &1))
permissions =
optional_permissions
|> Enum.map(&(&1 === :ok))
|> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_sc], &1))
|> Enum.map(&match?({:ok, %{allowed: true}}, &1))
|> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_storage_classes], &1))
|> Map.new()
%{errors: errors, permissions: permissions}
@ -751,33 +731,26 @@ defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
end
defp pvc_options(socket) do
%{reqs: %{pvc: req}, namespace: ns} = socket.assigns
%{kubeconfig: kubeconfig, namespace: namespace} = socket.assigns
case Kubereq.list(req, ns) do
{:ok, %Req.Response{status: 200} = resp} ->
pvcs =
resp.body["items"]
|> Enum.reject(& &1["metadata"]["deletionTimestamp"])
|> Enum.map(& &1["metadata"]["name"])
case Livebook.K8sAPI.list_pvcs(kubeconfig, namespace) do
{:ok, pvcs} ->
pvcs = for pvc <- pvcs, pvc.deleted_at == nil, do: pvc.name
assign(socket, :pvcs, pvcs)
socket
|> assign(:pvcs, pvcs)
_ ->
_other ->
assign(socket, :pvcs, [])
end
end
defp storage_classes(%{rbac: %{permissions: %{list_sc: false}}}), do: []
defp storage_classes(%{rbac: %{permissions: %{list_storage_classes: false}}}), do: []
defp storage_classes(assigns) do
%{reqs: %{sc: req}} = assigns
defp storage_classes(%{kubeconfig: kubeconfig}) do
case Livebook.K8sAPI.list_storage_classes(kubeconfig) do
{:ok, storage_classes} ->
Enum.map(storage_classes, & &1.name)
case Kubereq.list(req, nil) do
{:ok, %Req.Response{status: 200} = resp} ->
Enum.map(resp.body["items"], & &1["metadata"]["name"])
_ ->
_other ->
[]
end
end

View file

@ -47,9 +47,8 @@ defmodule Livebook.Runtime.K8sTest do
test "connecting flow" do
config = config()
req = req()
assert [] = list_pods(req)
assert [] = list_pods()
pid = Runtime.K8s.new(config) |> Runtime.connect()
@ -70,7 +69,7 @@ defmodule Livebook.Runtime.K8sTest do
Runtime.take_ownership(runtime)
assert [_] = list_pods(req)
assert [_] = list_pods()
# Verify that we can actually evaluate code on the Kubernetes Pod
Runtime.evaluate_code(runtime, :elixir, ~s/System.fetch_env!("POD_NAME")/, {:c1, :e1}, [])
@ -80,23 +79,10 @@ defmodule Livebook.Runtime.K8sTest do
Runtime.disconnect(runtime)
# Wait for Pod to terminate
assert :ok ==
Kubereq.wait_until(
req,
"default",
runtime.pod_name,
&(&1["status"]["phase"] == "Succeeded")
)
cmd!(~w(kubectl wait --for=jsonpath={.status.phase}=Succeeded pod/#{runtime.pod_name}))
# Finally, delete the Pod object
Kubereq.delete(req, "default", runtime.pod_name)
end
defp req() do
Kubereq.Kubeconfig.Default
|> Kubereq.Kubeconfig.load()
|> Kubereq.Kubeconfig.set_current_context("kind-#{@cluster_name}")
|> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
cmd!(~w(kubectl delete pod #{runtime.pod_name}))
end
defp config(attrs \\ %{}) do
@ -123,21 +109,19 @@ defmodule Livebook.Runtime.K8sTest do
Map.merge(defaults, attrs)
end
defp list_pods(req) do
{:ok, resp} =
Kubereq.list(req, "default",
label_selectors: [{"livebook.dev/runtime", "integration-test"}],
field_selectors: [{"status.phase", "Running"}]
)
resp.body["items"]
defp list_pods() do
cmd!(
~w(kubectl get pod --selector=livebook.dev/runtime=integration-test --field-selector=status.phase==Running --output json)
)
|> Jason.decode!()
|> Map.fetch!("items")
end
defp cmd!([command | args]) do
{output, status} = System.cmd(command, args, stderr_to_stdout: true)
if status != 0 do
raise "command #{inspect(command)} #{inspect(args)} failed"
raise "command #{inspect(command)} #{inspect(args)} failed with output:\n#{output}"
end
output

View file

@ -1205,6 +1205,8 @@ defmodule LivebookWeb.SessionLiveTest do
|> element(~s{form[phx-change="set_namespace"]})
|> render_change(%{namespace: "default"})
render_async(view)
assert view
|> element(~s{select[name="pvc_name"] option[value="foo-pvc"]})
|> has_element?()

View file

@ -12,12 +12,16 @@ defmodule Livebook.K8sClusterStub do
plug :dispatch
post "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" do
resource = conn.body_params["spec"]["resourceAttributes"]["resource"]
resource_attributes = conn.body_params["spec"]["resourceAttributes"]
resource = resource_attributes["resource"]
allowed = conn.host == "default" or resource == "selfsubjectaccessreviews"
conn
|> put_status(201)
|> Req.Test.json(%{"status" => %{"allowed" => allowed}})
|> Req.Test.json(%{
"status" => %{"allowed" => allowed},
"spec" => %{"resourceAttributes" => resource_attributes}
})
end
get "/api/v1/namespaces", host: "default" do