From 282ffeba54761c30a33d4bdfd9753523407ee070 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Wed, 18 Sep 2024 13:56:17 +0200 Subject: [PATCH] Add K8s runtime (#2756) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jonatan KÅ‚osko --- config/config.exs | 3 +- config/test.exs | 8 +- lib/livebook.ex | 3 +- lib/livebook/k8s/auth.ex | 65 ++ lib/livebook/k8s/pod.ex | 182 ++++ lib/livebook/k8s/pvc.ex | 58 + lib/livebook/runtime/k8s.ex | 555 ++++++++++ .../session_live/k8s_runtime_component.ex | 992 ++++++++++++++++++ .../live/session_live/runtime_component.ex | 11 + mix.exs | 2 + mix.lock | 4 + rel/server/overlays/bin/start_runtime.exs | 20 +- test/livebook/runtime/k8s_test.exs | 145 +++ test/livebook_web/live/session_live_test.exs | 195 ++++ test/support/k8s_cluster_stub.ex | 66 ++ test/test_helper.exs | 5 +- 16 files changed, 2305 insertions(+), 9 deletions(-) create mode 100644 lib/livebook/k8s/auth.ex create mode 100644 lib/livebook/k8s/pod.ex create mode 100644 lib/livebook/k8s/pvc.ex create mode 100644 lib/livebook/runtime/k8s.ex create mode 100644 lib/livebook_web/live/session_live/k8s_runtime_component.ex create mode 100644 test/livebook/runtime/k8s_test.exs create mode 100644 test/support/k8s_cluster_stub.ex diff --git a/config/config.exs b/config/config.exs index 529ea8e65..bfcf6ebfa 100644 --- a/config/config.exs +++ b/config/config.exs @@ -40,7 +40,8 @@ config :livebook, teams_url: "https://teams.livebook.dev", github_release_info: %{repo: "livebook-dev/livebook", version: Mix.Project.config()[:version]}, update_instructions_url: nil, - within_iframe: false + within_iframe: false, + k8s_kubeconfig_pipeline: Kubereq.Kubeconfig.Default config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 5_000 diff --git a/config/test.exs b/config/test.exs index 3e9471b2d..316b41d3e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -24,6 +24,12 @@ end config :livebook, data_path: data_path, - agent_name: "chonky-cat" + agent_name: "chonky-cat", + k8s_kubeconfig_pipeline: + {Kubereq.Kubeconfig.Stub, + plugs: %{ + "default" => {Req.Test, :k8s_cluster}, + "no-permission" => {Req.Test, :k8s_cluster} + }} config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 0 diff --git a/lib/livebook.ex b/lib/livebook.ex index 5455ce503..3cb1fe1fb 100644 --- a/lib/livebook.ex +++ b/lib/livebook.ex @@ -166,7 +166,8 @@ defmodule Livebook do [ Livebook.Runtime.Standalone, Livebook.Runtime.Attached, - Livebook.Runtime.Fly + Livebook.Runtime.Fly, + Livebook.Runtime.K8s ] if home = Livebook.Config.writable_dir!("LIVEBOOK_HOME") do diff --git a/lib/livebook/k8s/auth.ex b/lib/livebook/k8s/auth.ex new file mode 100644 index 000000000..207fb0c68 --- /dev/null +++ b/lib/livebook/k8s/auth.ex @@ -0,0 +1,65 @@ +defmodule Livebook.K8s.Auth do + # Implementation of Access Review checks for the authenticated user + # using the `SelfSubjectAccessReview` [1] resource. + # + # [1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec + + @doc """ + Concurrently reviews access according to a list of `resource_attributes`. + + Expects `req` to be prepared for `SelfSubjectAccessReview`. 
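+
+ A minimal sketch of a call (illustrative values; assumes `req` was
+ built for the `selfsubjectaccessreviews` resource, e.g. via
+ `Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews")`):
+
+     batch_check(req, [
+       [verb: "get", version: "v1", resource: "pods", namespace: "default"],
+       [verb: "create", version: "v1", resource: "pods", namespace: "default"]
+     ])
+     #=> [:ok, :ok]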
+ """ + @spec batch_check(Req.Request.t(), [keyword()]) :: + [:ok | {:error, %Req.Response{}} | {:error, Exception.t()}] + def batch_check(req, resource_attribute_list) do + resource_attribute_list + |> Enum.map(&Task.async(fn -> can_i?(req, &1) end)) + |> Task.await_many(:infinity) + end + + @doc """ + Reviews access according to `resource_attributes`. + + Expects `req` to be prepared for `SelfSubjectAccessReview`. + """ + @spec can_i?(Req.Request.t(), keyword()) :: + :ok | {:error, %Req.Response{}} | {:error, Exception.t()} + def can_i?(req, resource_attributes) do + resource_attributes = + resource_attributes + |> Keyword.validate!([ + :name, + :namespace, + :path, + :resource, + :subresource, + :verb, + :version, + group: "" + ]) + |> Enum.into(%{}) + + access_review = %{ + "apiVersion" => "authorization.k8s.io/v1", + "kind" => "SelfSubjectAccessReview", + "spec" => %{ + "resourceAttributes" => resource_attributes + } + } + + create_self_subject_access_review(req, access_review) + end + + defp create_self_subject_access_review(req, access_review) do + case Kubereq.create(req, access_review) do + {:ok, %Req.Response{status: 201, body: %{"status" => %{"allowed" => true}}}} -> + :ok + + {:ok, %Req.Response{} = response} -> + {:error, response} + + {:error, error} -> + {:error, error} + end + end +end diff --git a/lib/livebook/k8s/pod.ex b/lib/livebook/k8s/pod.ex new file mode 100644 index 000000000..0337416e6 --- /dev/null +++ b/lib/livebook/k8s/pod.ex @@ -0,0 +1,182 @@ +defmodule Livebook.K8s.Pod do + @main_container_name "livebook-runtime" + @home_pvc_volume_name "livebook-home" + + @default_pod_template """ + apiVersion: v1 + kind: Pod + metadata: + generateName: livebook-runtime- + spec: + containers: + - name: livebook-runtime + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi\ + """ + + @doc """ + Returns the default pod template. + """ + @spec default_pod_template() :: String.t() + def default_pod_template(), do: @default_pod_template + + @doc """ + Set the namespace on the given manifest. + """ + @spec set_namespace(map(), String.t()) :: map() + def set_namespace(manifest, namespace) do + put_in(manifest, ["metadata", "namespace"], namespace) + end + + @doc """ + Adds "volume" and "volumeMount" configurations to `manifest` in order + to mount `home_pvc` under /home/livebook on the pod. + """ + @spec set_home_pvc(map(), String.t()) :: map() + def set_home_pvc(manifest, home_pvc) do + manifest + |> update_in(["spec", Access.key("volumes", [])], fn volumes -> + volume = %{ + "name" => @home_pvc_volume_name, + "persistentVolumeClaim" => %{"claimName" => home_pvc} + } + + [volume | volumes] + end) + |> update_in( + ["spec", "containers", access_main_container(), Access.key("volumeMounts", [])], + fn volume_mounts -> + [%{"name" => @home_pvc_volume_name, "mountPath" => "/home/livebook"} | volume_mounts] + end + ) + end + + @doc """ + Adds the list of `env_vars` to the main container of the given `manifest`. + """ + @spec add_env_vars(map(), list()) :: map() + def add_env_vars(manifest, env_vars) do + update_in( + manifest, + ["spec", "containers", access_main_container(), Access.key("env", [])], + fn existing_vars -> env_vars ++ existing_vars end + ) + end + + @doc """ + Sets the tag of the main container's image. 
+ """ + @spec set_docker_tag(map(), String.t()) :: map() + def set_docker_tag(manifest, docker_tag) do + image = "ghcr.io/livebook-dev/livebook:#{docker_tag}" + put_in(manifest, ["spec", "containers", access_main_container(), "image"], image) + end + + @doc """ + Adds the `port` to the main container and adds a readiness probe. + """ + @spec add_container_port(map(), non_neg_integer()) :: map() + def add_container_port(manifest, port) do + readiness_probe = %{ + "tcpSocket" => %{"port" => port}, + "initialDelaySeconds" => 1, + "periodSeconds" => 1 + } + + manifest + |> update_in( + ["spec", "containers", access_main_container(), Access.key("ports", [])], + &[%{"containerPort" => port} | &1] + ) + |> put_in(["spec", "containers", access_main_container(), "readinessProbe"], readiness_probe) + end + + @doc """ + Turns the given `pod_template` into a Pod manifest. + """ + @spec pod_from_template(String.t()) :: map() + def pod_from_template(pod_template) do + pod_template + |> YamlElixir.read_from_string!() + |> do_pod_from_template() + end + + defp do_pod_from_template(pod) do + pod + |> Map.merge(%{"apiVersion" => "v1", "kind" => "Pod"}) + |> put_in(["spec", "restartPolicy"], "Never") + end + + @doc """ + Validates the given Pod manifest. + """ + @spec validate_pod_template(map(), String.t()) :: :ok | {:error, String.t()} + def validate_pod_template(pod, namespace) + + def validate_pod_template(%{"apiVersion" => "v1", "kind" => "Pod"} = pod, namespace) do + with :ok <- validate_basics(pod), + :ok <- validate_main_container(pod), + :ok <- validate_namespace(pod, namespace) do + validate_container_image(pod) + end + end + + def validate_pod_template(_other_input, _namespace) do + {:error, ~s/Make sure to define a valid resource of apiVersion "v1" and kind "Pod"./} + end + + defp validate_basics(pod) do + cond do + not match?(%{"metadata" => %{}}, pod) -> + {:error, ".metadata is missing in your pod template."} + + not match?(%{"spec" => %{"containers" => containers}} when is_list(containers), pod) -> + {:error, ".spec.containers is missing in your pod template."} + + pod["metadata"]["name"] in [nil, ""] and pod["metadata"]["generateName"] in [nil, ""] -> + {:error, + "Make sure to define .metadata.name or .metadata.generateName in your pod template."} + + true -> + :ok + end + end + + defp validate_main_container(pod) do + if get_in(pod, ["spec", "containers", access_main_container()]) do + :ok + else + {:error, + ~s/Main container is missing. The main container should be named "#{@main_container_name}"./} + end + end + + defp validate_container_image(pod) do + if get_in(pod, ["spec", "containers", access_main_container(), "image"]) do + {:error, + "You can't set the container image of the main container. 
It's going to be overridden."} + else + :ok + end + end + + defp validate_namespace(pod, namespace) do + template_ns = get_in(pod, ["metadata", "namespace"]) + + if template_ns == nil or template_ns == namespace do + :ok + else + {:error, + "The field .template.metadata.namespace has to be omitted or set to the namespace you selected."} + end + end + + defp access_main_container() do + Kubereq.Access.find(&(&1["name"] == @main_container_name)) + end +end diff --git a/lib/livebook/k8s/pvc.ex b/lib/livebook/k8s/pvc.ex new file mode 100644 index 000000000..85a3df054 --- /dev/null +++ b/lib/livebook/k8s/pvc.ex @@ -0,0 +1,58 @@ +defmodule Livebook.K8s.PVC do + use Ecto.Schema + + import Ecto.Changeset + + @type t :: %__MODULE__{ + name: String.t(), + size_gb: integer(), + access_mode: String.t(), + storage_class: String.t() + } + + @primary_key false + embedded_schema do + field :name, :string + field :size_gb, :integer + field :access_mode, :string, default: "ReadWriteOnce" + field :storage_class, :string, default: nil + end + + @fields ~w(name size_gb access_mode storage_class)a + @required ~w(name size_gb access_mode)a + + @doc """ + Build a PVC changeset for the given `attrs`. + """ + @spec changeset(map()) :: Ecto.Changeset.t() + def changeset(attrs \\ %{}) do + %__MODULE__{} + |> cast(attrs, @fields) + |> validate_required(@required) + end + + @doc """ + Build PVC manifest for the given `pvc` and `namespace` to be applied to a + cluster. + """ + @spec manifest(pvc :: t(), namespace: String.t()) :: manifest :: map() + def manifest(pvc, namespace) do + %{ + "apiVersion" => "v1", + "kind" => "PersistentVolumeClaim", + "metadata" => %{ + "name" => pvc.name, + "namespace" => namespace + }, + "spec" => %{ + "storageClassName" => pvc.storage_class, + "accessModes" => [pvc.access_mode], + "resources" => %{ + "requests" => %{ + "storage" => "#{pvc.size_gb}Gi" + } + } + } + } + end +end diff --git a/lib/livebook/runtime/k8s.ex b/lib/livebook/runtime/k8s.ex new file mode 100644 index 000000000..65765af64 --- /dev/null +++ b/lib/livebook/runtime/k8s.ex @@ -0,0 +1,555 @@ +defmodule Livebook.Runtime.K8s do + # A runtime backed by a Kubernetes Pod managed by Livebook. + # + # This runtime uses the same concepts as the Fly runtime. In this + # case, we start a Pod in a Kubernetes cluster and use kubectl to + # proxy a local port to the distribution port of the remote node. + # See `Livebook.Runtime.Fly` for more design details. + + defstruct [:config, :node, :req, :server_pid, :lv_pid, :pod_name] + + @type config :: %{ + context: String.t(), + namespace: String.t(), + home_pvc: String.t() | nil, + docker_tag: String.t(), + pod_template: String.t() + } + + @type t :: %__MODULE__{ + node: node() | nil, + req: Req.Request.t(), + server_pid: pid() | nil, + lv_pid: pid(), + pod_name: String.t() | nil + } + + use GenServer, restart: :temporary + + require Logger + + alias Livebook.K8s.Pod + + @doc """ + Returns a new runtime instance. 
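+
+ An illustrative `config` (keys as in `t:config/0`):
+
+     %{
+       context: "my-context",
+       namespace: "default",
+       home_pvc: nil,
+       docker_tag: "nightly",
+       pod_template: Livebook.K8s.Pod.default_pod_template()
+     }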
+ """ + @spec new(config :: map(), req :: Req.Request.t()) :: t() + def new(config, req) do + %__MODULE__{config: config, req: req, lv_pid: self()} + end + + def __connect__(runtime) do + {:ok, pid} = + DynamicSupervisor.start_child(Livebook.RuntimeSupervisor, {__MODULE__, {runtime, self()}}) + + pid + end + + @doc false + def start_link({runtime, caller}) do + GenServer.start_link(__MODULE__, {runtime, caller}) + end + + @impl true + def init({runtime, caller}) do + state = %{primary_ref: nil} + {:ok, state, {:continue, {:init, runtime, caller}}} + end + + @impl true + def handle_continue({:init, runtime, caller}, state) do + config = runtime.config + %{namespace: namespace, context: context} = config + req = runtime.req + + kubeconfig = + if System.get_env("KUBERNETES_SERVICE_HOST") do + nil + else + System.get_env("KUBECONFIG") || Path.join(System.user_home!(), ".kube/config") + end + + cluster_data = get_cluster_data(kubeconfig) + + runtime_data = + %{ + node_base: cluster_data.node_base, + cookie: Node.get_cookie(), + dist_port: cluster_data.remote_port + } + |> :erlang.term_to_binary() + |> Base.encode64() + + parent = self() + + {:ok, watcher_pid} = + DynamicSupervisor.start_child( + Livebook.RuntimeSupervisor, + {Task, fn -> watcher(parent, req, config) end} + ) + + with {:ok, pod_name} <- + with_log(caller, "create pod", fn -> + create_pod(req, config, runtime_data, cluster_data.remote_port) + end), + _ <- send(watcher_pid, {:pod_created, pod_name}), + {:ok, pod_ip} <- + with_pod_events(caller, "waiting for pod", req, namespace, pod_name, fn -> + await_pod_ready(req, namespace, pod_name) + end), + child_node <- :"#{cluster_data.node_base}@#{pod_ip}", + :ok <- + with_log(caller, "start proxy", fn -> + k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace) + end), + :ok <- + with_log(caller, "connect to node", fn -> + connect_loop(child_node, 40, 250) + end), + {:ok, primary_pid} <- fetch_runtime_info(child_node) do + primary_ref = Process.monitor(primary_pid) + + server_pid = + with_log(caller, "initialize node", fn -> + initialize_node(child_node) + end) + + send(primary_pid, :node_initialized) + + send(watcher_pid, :done) + + runtime = %{runtime | node: child_node, server_pid: server_pid, pod_name: pod_name} + send(caller, {:runtime_connect_done, self(), {:ok, runtime}}) + + {:noreply, %{state | primary_ref: primary_ref}} + else + {:error, error} -> + send(caller, {:runtime_connect_done, self(), {:error, error}}) + + {:stop, :shutdown, state} + end + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) when ref == state.primary_ref do + {:stop, :shutdown, state} + end + + def handle_info({port, _message}, state) when is_port(port) do + {:noreply, state} + end + + defp get_free_port!() do + {:ok, socket} = :gen_tcp.listen(0, active: false, reuseaddr: true) + {:ok, port} = :inet.port(socket) + :gen_tcp.close(socket) + port + end + + defp with_log(caller, name, fun) do + send(caller, {:runtime_connect_info, self(), name}) + + {microseconds, result} = :timer.tc(fun) + milliseconds = div(microseconds, 1000) + + case result do + {:error, error} -> + Logger.debug("[K8s runtime] #{name} FAILED in #{milliseconds}ms, error: #{error}") + + _ -> + Logger.debug("[K8s runtime] #{name} finished in #{milliseconds}ms") + end + + result + end + + defp with_pod_events(caller, name, req, namespace, pod_name, fun) do + with_log(caller, name, fn -> + runtime_pid = self() + + event_watcher_pid = + spawn_link(fn -> + watch_result = + req + |> Req.merge( + 
resource_path: "api/v1/namespaces/:namespace/events/:name", + resource_list_path: "api/v1/namespaces/:namespace/events" + ) + |> Kubereq.watch(namespace, + field_selectors: [ + {"involvedObject.kind", "Pod"}, + {"involvedObject.name", pod_name} + ] + ) + + case watch_result do + {:ok, stream} -> + Enum.each(stream, fn event -> + message = Livebook.Utils.downcase_first(event["object"]["message"]) + Logger.debug(~s'[K8s runtime] Pod event: "#{message}"') + send(caller, {:runtime_connect_info, runtime_pid, message}) + end) + + _error -> + :ok + end + end) + + result = fun.() + Process.exit(event_watcher_pid, :normal) + result + end) + end + + defp watcher(parent, req, config) do + ref = Process.monitor(parent) + watcher_loop(%{ref: ref, config: config, req: req, pod_name: nil}) + end + + defp watcher_loop(state) do + receive do + {:DOWN, ref, :process, _pid, _reason} when ref == state.ref -> + # If the parent process is killed, we try to eagerly free the + # created resources + if pod_name = state.pod_name do + namespace = state.config.namespace + _ = Kubereq.delete(state.req, namespace, pod_name) + end + + {:pod_created, pod_name} -> + watcher_loop(%{state | pod_name: pod_name}) + + :done -> + :ok + end + end + + defp create_pod(req, config, runtime_data, remote_port) do + %{ + pod_template: pod_template, + docker_tag: docker_tag, + home_pvc: home_pvc, + namespace: namespace + } = config + + manifest = + pod_template + |> Pod.pod_from_template() + |> Pod.add_env_vars([ + %{"name" => "LIVEBOOK_RUNTIME", "value" => runtime_data}, + %{ + "name" => "POD_IP", + "valueFrom" => %{"fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "status.podIP"}} + }, + %{ + "name" => "POD_NAMESPACE", + "valueFrom" => %{ + "fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "metadata.namespace"} + } + }, + %{ + "name" => "POD_NAME", + "valueFrom" => %{"fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "metadata.name"}} + } + ]) + |> Pod.set_docker_tag(docker_tag) + |> Pod.set_namespace(namespace) + |> Pod.add_container_port(remote_port) + + manifest = + if home_pvc do + Pod.set_home_pvc(manifest, home_pvc) + else + manifest + end + + case Kubereq.create(req, manifest) do + {:ok, %{status: 201, body: %{"metadata" => %{"name" => pod_name}}}} -> + {:ok, pod_name} + + {:ok, %{body: body}} -> + {:error, "could not create Pod, reason: #{body["message"]}"} + + {:error, error} -> + {:error, "could not create Pod, reason: #{Exception.message(error)}"} + end + end + + defp get_cluster_data(_kubeconfig = nil) do + # When already running within Kubernetes we don't need the proxy, + # the node is reachable directly + %{node_base: "k8s_runtime", remote_port: 44444} + end + + defp get_cluster_data(_kubeconfig) do + local_port = get_free_port!() + %{node_base: "remote_runtime_#{local_port}", remote_port: 44444, local_port: local_port} + end + + defp k8s_forward_port(_kubeconfig = nil, _, _, _, _), do: :ok + + defp k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace) do + %{local_port: local_port, remote_port: remote_port} = cluster_data + + with {:ok, kubectl_path} <- find_kubectl_executable() do + ports = "#{local_port}:#{remote_port}" + + # We want the proxy to accept the same protocol that we are + # going to use for distribution + bind_addr = + if Livebook.Utils.proto_dist() == :inet6_tcp do + "[::1]" + else + "127.0.0.1" + end + + args = + [ + "port-forward", + "--kubeconfig", + Path.expand(kubeconfig), + "--context", + context, + "-n", + namespace, + pod_name, + ports, + "--address", + bind_addr + ] 
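+
+ # For reference, the args above amount to running something like
+ # (pod name and local port are illustrative):
+ #
+ #   kubectl port-forward --kubeconfig ~/.kube/config --context <context> \
+ #     -n <namespace> livebook-runtime-abcde <local-port>:44444 --address 127.0.0.1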
+ + port = + Port.open( + {:spawn_executable, kubectl_path}, + [:binary, :hide, :stderr_to_stdout, args: args, env: []] + ) + + port_ref = Port.monitor(port) + + result = + receive do + {^port, {:data, "Forwarding from " <> _}} -> + :ok + + {^port, {:data, "Error " <> _ = message}} -> + {:error, "failed to port-forward. #{String.trim(message)}"} + + {:DOWN, ^port_ref, :port, _object, reason} -> + {:error, "failed to port-forward. Process terminated, reason: #{inspect(reason)}"} + after + 30_000 -> + {:error, "failed to port-forward. Timed out after 30s"} + end + + Port.demonitor(port_ref, [:flush]) + + result + end + end + + defp find_kubectl_executable() do + if path = System.find_executable("kubectl") do + {:ok, path} + else + {:error, "no kubectl executable found in PATH."} + end + end + + defp await_pod_ready(req, namespace, pod_name) do + with :ok <- + Kubereq.wait_until( + req, + namespace, + pod_name, + fn + :deleted -> + {:error, "The Pod was deleted before it started running."} + + pod -> + get_in(pod, [ + "status", + "conditions", + Access.filter(&(&1["type"] == "Ready")), + "status" + ]) == ["True"] + end, + # 30 minutes + 1_800_000 + ), + {:ok, %{status: 200, body: pod}} <- Kubereq.get(req, namespace, pod_name) do + {:ok, pod["status"]["podIP"]} + else + {:error, :watch_timeout} -> + {:error, "Timed out waiting for Pod to start up."} + + {:error, error} -> + {:error, error} + + _other -> + {:error, "Failed getting the Pod's IP address."} + end + end + + defp connect_loop(_node, 0, _interval) do + {:error, "could not establish connection with the node"} + end + + defp connect_loop(node, attempts, interval) do + if Node.connect(node) do + :ok + else + Process.sleep(interval) + connect_loop(node, attempts - 1, interval) + end + end + + defp fetch_runtime_info(child_node) do + # Note: it is Livebook that starts the runtime node, so we know + # that the node runs Livebook release of the exact same version + # + # Also, the remote node already has all the runtime modules in + # the code path, compiled for its Elixir version, so we don't + # need to check for matching Elixir version. 
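+ #
+ # The boot script on the pod stores this entry in :persistent_term
+ # (see the start_runtime.exs changes below), so a single :erpc call
+ # gives us the pid of the primary process.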
+ + %{pid: pid} = :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info]) + + {:ok, pid} + end + + defp initialize_node(child_node) do + init_opts = [ + runtime_server_opts: [ + extra_smart_cell_definitions: Livebook.Runtime.Definitions.smart_cell_definitions() + ] + ] + + Livebook.Runtime.ErlDist.initialize(child_node, init_opts) + end +end + +defimpl Livebook.Runtime, for: Livebook.Runtime.K8s do + alias Livebook.Runtime.ErlDist.RuntimeServer + + def describe(runtime) do + [{"Type", "K8s Pod"}] ++ + if runtime.node do + [{"Pod name", runtime.pod_name}, {"Node name", Atom.to_string(runtime.node)}] + else + [] + end + end + + def connect(runtime) do + Livebook.Runtime.K8s.__connect__(runtime) + end + + def take_ownership(runtime, opts \\ []) do + RuntimeServer.attach(runtime.server_pid, self(), opts) + Process.monitor(runtime.server_pid) + end + + def disconnect(runtime) do + :ok = RuntimeServer.stop(runtime.server_pid) + end + + def duplicate(runtime) do + Livebook.Runtime.K8s.new(runtime.config, runtime.req) + end + + def evaluate_code(runtime, language, code, locator, parent_locators, opts \\ []) do + RuntimeServer.evaluate_code( + runtime.server_pid, + language, + code, + locator, + parent_locators, + opts + ) + end + + def forget_evaluation(runtime, locator) do + RuntimeServer.forget_evaluation(runtime.server_pid, locator) + end + + def drop_container(runtime, container_ref) do + RuntimeServer.drop_container(runtime.server_pid, container_ref) + end + + def handle_intellisense(runtime, send_to, request, parent_locators, node) do + RuntimeServer.handle_intellisense(runtime.server_pid, send_to, request, parent_locators, node) + end + + def read_file(runtime, path) do + RuntimeServer.read_file(runtime.server_pid, path) + end + + def transfer_file(runtime, path, file_id, callback) do + RuntimeServer.transfer_file(runtime.server_pid, path, file_id, callback) + end + + def relabel_file(runtime, file_id, new_file_id) do + RuntimeServer.relabel_file(runtime.server_pid, file_id, new_file_id) + end + + def revoke_file(runtime, file_id) do + RuntimeServer.revoke_file(runtime.server_pid, file_id) + end + + def start_smart_cell(runtime, kind, ref, attrs, parent_locators) do + RuntimeServer.start_smart_cell(runtime.server_pid, kind, ref, attrs, parent_locators) + end + + def set_smart_cell_parent_locators(runtime, ref, parent_locators) do + RuntimeServer.set_smart_cell_parent_locators(runtime.server_pid, ref, parent_locators) + end + + def stop_smart_cell(runtime, ref) do + RuntimeServer.stop_smart_cell(runtime.server_pid, ref) + end + + def fixed_dependencies?(_runtime), do: false + + def add_dependencies(_runtime, code, dependencies) do + Livebook.Runtime.Dependencies.add_dependencies(code, dependencies) + end + + def has_dependencies?(runtime, dependencies) do + RuntimeServer.has_dependencies?(runtime.server_pid, dependencies) + end + + def snippet_definitions(_runtime) do + Livebook.Runtime.Definitions.snippet_definitions() + end + + def search_packages(_runtime, send_to, search) do + Livebook.Runtime.Dependencies.search_packages_on_hex(send_to, search) + end + + def put_system_envs(runtime, envs) do + RuntimeServer.put_system_envs(runtime.server_pid, envs) + end + + def delete_system_envs(runtime, names) do + RuntimeServer.delete_system_envs(runtime.server_pid, names) + end + + def restore_transient_state(runtime, transient_state) do + RuntimeServer.restore_transient_state(runtime.server_pid, transient_state) + end + + def register_clients(runtime, clients) do + 
RuntimeServer.register_clients(runtime.server_pid, clients) + end + + def unregister_clients(runtime, client_ids) do + RuntimeServer.unregister_clients(runtime.server_pid, client_ids) + end + + def fetch_proxy_handler_spec(runtime) do + RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid) + end + + def disconnect_node(runtime, node) do + RuntimeServer.disconnect_node(runtime.server_pid, node) + end +end diff --git a/lib/livebook_web/live/session_live/k8s_runtime_component.ex b/lib/livebook_web/live/session_live/k8s_runtime_component.ex new file mode 100644 index 000000000..2f9c3ec5b --- /dev/null +++ b/lib/livebook_web/live/session_live/k8s_runtime_component.ex @@ -0,0 +1,992 @@ +defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do + use LivebookWeb, :live_component + + import Ecto.Changeset + + alias Livebook.{Session, Runtime} + alias Livebook.K8s.{Auth, Pod, PVC} + + @config_secret_prefix "K8S_RUNTIME_" + @kubeconfig_pipeline Application.compile_env(:livebook, :k8s_kubeconfig_pipeline) + + @impl true + def mount(socket) do + unless Livebook.Config.runtime_enabled?(Livebook.Runtime.K8s) do + raise "runtime module not allowed" + end + + kubeconfig = Kubereq.Kubeconfig.load(@kubeconfig_pipeline) + context_options = Enum.map(kubeconfig.contexts, & &1["name"]) + + {:ok, + socket + |> assign( + kubeconfig: kubeconfig, + context_options: context_options, + context: nil, + reqs: nil, + cluster_check: %{status: :initial, error: nil}, + namespace: nil, + namespace_options: nil, + rbac: %{status: :inflight, errors: [], permissions: []}, + save_config: nil, + pvcs: nil, + pvc_action: nil, + home_pvc: nil, + docker_tag: hd(Livebook.Config.docker_images()).tag, + pod_template: %{template: Pod.default_pod_template(), status: :valid, message: nil} + )} + end + + @impl true + @spec update(maybe_improper_list() | map(), any()) :: {:ok, any()} + def update(assigns, socket) do + socket = assign(socket, assigns) + + socket = + cond do + is_map_key(socket.assigns, :config_defaults) -> + socket + + is_struct(assigns.runtime, Runtime.K8s) -> + %{config: config} = assigns.runtime + + config_defaults = + Map.new(config, fn {key, value} -> + {Atom.to_string(key), value} + end) + + socket + |> assign(config_defaults: config_defaults) + |> load_config_defaults() + + true -> + socket + |> assign(config_defaults: nil) + |> set_context(socket.assigns.kubeconfig.current_context) + end + + {:ok, socket} + end + + @impl true + def render(assigns) do + ~H""" +
+

+ Start a temporary Kubernetes Pod with an Elixir node to evaluate code. + The Pod is automatically deleted once you disconnect the runtime. +

+ + <.save_config_form :if={@save_config} save_config={@save_config} hub={@hub} myself={@myself} /> + +
+ <.config_actions hub_secrets={@hub_secrets} myself={@myself} /> + + <.message_box :if={@kubeconfig.current_cluster == nil} kind={:error}> + In order to use the Kubernetes context, you need to set the KUBECONFIG + environment variable to a path pointing to a Kubernetes configuration YAML file (e.g. to "~/.kube/config"). + + +
+ <.select_field name="context" value={@context} label="Context" options={@context_options} /> +
+ + <.loader :if={@cluster_check.status == :inflight} /> + + <.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} /> + +
+ <.select_field + :if={@namespace_options != nil} + name="namespace" + value={@namespace} + label="Namespace" + options={@namespace_options} + /> +
+ <.text_field name="namespace" value={@namespace} label="Namespace" phx-debounce="600" /> +
+ The authenticated user has no permission to list namespaces, but you can enter the name of an existing namespace. +
+
+
+ + <.message_box :if={@rbac.status === :errors} kind={:error}> + <%= for error <- @rbac.errors do %> + <.rbac_error error={error} /> + <% end %> + + +
+
+ Pod +
+
+ You can fully customize the runtime pod by editing the pod template. +
+
+ <.radio_field + :if={@rbac.status == :ok} + name="docker_tag" + value={@docker_tag} + label="Base Docker image" + options={LivebookWeb.AppComponents.docker_tag_options()} + /> +
+
+ <.textarea_field + name="pod_template" + label="Template" + value={@pod_template.template} + phx-debounce={500} + monospace={true} + phx-hook="TextareaAutosize" + /> + + <.message_box :if={@pod_template.status != :valid} kind={@pod_template.status}> +
+ <%= @pod_template.message %> +
+ +
+
+ + <.storage_config + :if={@rbac.status == :ok} + myself={@myself} + home_pvc={@home_pvc} + pvcs={@pvcs} + pvc_action={@pvc_action} + rbac={@rbac} + /> + +
+
+ <.button phx-click="init" phx-target={@myself} disabled={@runtime_status == :connecting}> + <%= label(@namespace, @runtime, @runtime_status) %> + + <.button + :if={@runtime_status == :connecting} + color="red" + outlined + phx-click="disconnect" + phx-target={@myself} + > + Disconnect + +
+
+ <.message_box kind={:info}> +
+ <.spinner /> + Step: <%= @runtime_connect_info %> +
+ +
+
+
+
+ """ + end + + defp storage_config(assigns) do + ~H""" +
+
+ Storage +
+
+ Every time you connect to the runtime, a fresh Pod is created. + In order to persist data and caches, you can optionally mount a + volume at /home/livebook. Setting a Persistent Volume + Claim will automatically add a .template.spec.volumes[] + entry and a .template.spec.containers[name="livebook-runtime"].volumeMounts[] + entry to the pod template. +
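+ <%!-- Illustration: for a PVC named "data", Pod.set_home_pvc/2 injects
+ volumes:
+ - name: livebook-home
+   persistentVolumeClaim:
+     claimName: data
+ and a matching "livebook-home" volumeMount at /home/livebook. --%>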
+ +
+
+
+ <.select_field + :if={@rbac.permissions.list_pvc} + value={@home_pvc} + name="home_pvc" + label="Persistent Volume Claim" + options={[{"None", nil} | @pvcs]} + /> +
+ <.text_field value={@home_pvc} name="home_pvc" label="Persistent Volume Claim" /> +
+ The authenticated user has no permission to list PVCs, but you can enter the name of an existing PVC to attach. +
+
+
+ +
+ + <.icon_button + phx-click="delete_pvc" + phx-target={@myself} + disabled={@home_pvc == nil or @pvc_action != nil} + > + <.remix_icon icon="delete-bin-6-line" /> + + + + <.icon_button phx-click="new_pvc" phx-target={@myself}> + <.remix_icon icon="add-line" /> + + +
+
+
+

+ Are you sure you want to irreversibly delete Persistent Volume Claim <%= @home_pvc %>? +

+
+ + +
+
+ + <.form + :let={pvcf} + :if={@pvc_action[:type] in [:new, :new_inflight]} + for={@pvc_action.changeset} + as={:pvc} + phx-submit="create_pvc" + phx-change="validate_pvc" + phx-target={@myself} + class="flex gap-2 mt-4 items-center" + autocomplete="off" + spellcheck="false" + > +
+ <.remix_icon icon="corner-down-right-line" class="text-gray-400 text-lg" /> +
+
+ <.text_field field={pvcf[:name]} placeholder="Name" /> + <.text_field field={pvcf[:size_gb]} placeholder="Size (Gi)" type="number" min="1" /> + <.select_field + field={pvcf[:access_mode]} + options={["ReadWriteOnce", "ReadWriteMany", "ReadWriteOncePod"]} + /> + <.select_field field={pvcf[:storage_class]} options={@pvc_action.storage_classes} /> +
+ <.button + :if={@pvc_action[:type] == :new} + type="submit" + disabled={not @pvc_action.changeset.valid? or @pvc_action[:type] == :new_inflight} + > + <%= if @pvc_action[:type] == :new, do: "Create", else: "Creating..." %> + + <.button + :if={@pvc_action[:type] == :new} + type="button" + color="gray" + outlined + phx-click="cancel_new_pvc" + phx-target={@myself} + disabled={@pvc_action[:type] == :new_inflight} + > + Cancel + + + <.error :if={@pvc_action[:error]}><%= @pvc_action[:error] %> +
+
+ """ + end + + defp save_config_form(assigns) do + ~H""" + <.form + :let={f} + for={@save_config.changeset} + as={:secret} + class="mt-4 flex flex-col" + phx-change="validate_save_config" + phx-submit="save_config" + phx-target={@myself} + autocomplete="off" + spellcheck="false" + > +
+ Save config +
+
+ Store the config in a secret in the <.workspace hub={@hub} /> workspace to reuse it later. +
+
+ <.message_box kind={:error} message={error} /> +
+
+ <.text_field field={f[:name]} label="Secret name" class="uppercase" autofocus /> +
+
+ <.button type="submit" disabled={not @save_config.changeset.valid? or @save_config.inflight}> + <%= if(@save_config.inflight, do: "Saving...", else: "Save") %> + + <.button + color="gray" + outlined + type="button" + phx-click="cancel_save_config" + phx-target={@myself} + > + Cancel + +
+ + """ + end + + defp workspace(assigns) do + ~H""" + + <%= @hub.hub_emoji %> + <%= @hub.hub_name %> + + """ + end + + defp config_actions(assigns) do + ~H""" +
+ <.button + color="gray" + outlined + small + type="button" + phx-click="open_save_config" + phx-target={@myself} + > + Save config + + <.menu id="config-secret-menu"> + <:toggle> + <.button color="gray" outlined small type="button"> + Load config + <.remix_icon icon="arrow-down-s-line" class="text-base leading-none" /> + + +
+ No configs saved yet +
+ <.menu_item :for={name <- config_secret_names(@hub_secrets)}> + + + +
+ """ + end + + defp loader(assigns) do + ~H""" +
+ Loading + <.spinner /> +
+ """ + end + + defp cluster_check_error(%{error: %{status: 401}} = assigns) do + ~H""" + <.message_box kind={:error}> +
+
Authentication with the cluster failed.
+
+ + """ + end + + defp cluster_check_error(%{error: %{reason: :timeout}} = assigns) do + ~H""" + <.message_box kind={:error}> +
+
Connection to the cluster timed out.
+
+ + """ + end + + defp cluster_check_error(assigns) do + ~H""" + <.message_box kind={:error}> +
+
Connection to the cluster failed.
+
+ + """ + end + + defp rbac_error(%{error: %Req.Response{status: 201} = resp} = assigns) do + resourceAttributes = resp.body["spec"]["resourceAttributes"] + verb = resourceAttributes["verb"] + namespace = resourceAttributes["namespace"] + + gkv = + String.trim( + "#{resourceAttributes["group"]}/#{resourceAttributes["version"]}/#{resourceAttributes["resource"]}", + "/" + ) + + assigns = assign(assigns, verb: verb, gkv: gkv, namespace: namespace) + + ~H""" +
+
+ The authenticated user has no permission to <%= @verb %> + <%= @gkv %> + in namespace <%= @namespace %> (or the namespace doesn't exist). +
+
+ """ + end + + @impl true + def handle_event("set_context", %{"context" => context}, socket) do + {:noreply, socket |> set_context(context) |> set_namespace(nil)} + end + + def handle_event("set_namespace", %{"namespace" => namespace}, socket) do + {:noreply, set_namespace(socket, namespace)} + end + + def handle_event("set_docker_tag", %{"docker_tag" => docker_tag}, socket) do + {:noreply, assign(socket, :docker_tag, docker_tag)} + end + + def handle_event("set_pod_template", %{"pod_template" => pod_template}, socket) do + {:noreply, set_pod_template(socket, pod_template)} + end + + def handle_event("set_home_pvc", %{"home_pvc" => home_pvc}, socket) do + {:noreply, assign(socket, :home_pvc, home_pvc)} + end + + def handle_event("disconnect", %{}, socket) do + Session.disconnect_runtime(socket.assigns.session.pid) + {:noreply, socket} + end + + def handle_event("new_pvc", %{}, socket) do + pvc_action = %{ + type: :new, + changeset: PVC.changeset(), + storage_classes: storage_classes(socket.assigns), + inflight: false, + error: false + } + + {:noreply, assign(socket, pvc_action: pvc_action)} + end + + def handle_event("validate_pvc", %{"pvc" => pvc}, socket) do + changeset = + pvc + |> PVC.changeset() + |> Map.replace!(:action, :validate) + + {:noreply, assign_nested(socket, :pvc_action, changeset: changeset)} + end + + def handle_event("cancel_new_pvc", %{}, socket) do + {:noreply, assign(socket, pvc_action: nil)} + end + + def handle_event("create_pvc", %{"pvc" => pvc}, socket) do + pvc + |> PVC.changeset() + |> apply_action(:insert) + |> case do + {:ok, applied_pvc} -> + {:noreply, create_pvc(socket, applied_pvc)} + + {:error, changeset} -> + {:noreply, assign_nested(socket, :pvc_action, changeset: changeset)} + end + end + + def handle_event("delete_pvc", %{}, socket) do + pvc_action = %{type: :delete, error: nil} + {:noreply, assign(socket, pvc_action: pvc_action)} + end + + def handle_event("confirm_delete_pvc", %{}, socket) do + %{namespace: namespace, home_pvc: name} = socket.assigns + req = socket.assigns.reqs.pvc + + socket = + socket + |> start_async(:delete_pvc, fn -> Kubereq.delete(req, namespace, name) end) + |> assign_nested(:pvc_action, type: :delete_inflight) + + {:noreply, socket} + end + + def handle_event("cancel_delete_pvc", %{}, socket) do + {:noreply, assign(socket, pvc_action: nil)} + end + + def handle_event("init", %{}, socket) do + config = build_config(socket) + runtime = Runtime.K8s.new(config, socket.assigns.reqs.pod) + Session.set_runtime(socket.assigns.session.pid, runtime) + Session.connect_runtime(socket.assigns.session.pid) + {:noreply, socket} + end + + def handle_event("open_save_config", %{}, socket) do + changeset = config_secret_changeset(socket, %{name: @config_secret_prefix}) + save_config = %{changeset: changeset, inflight: false, error: false} + {:noreply, assign(socket, save_config: save_config)} + end + + def handle_event("cancel_save_config", %{}, socket) do + {:noreply, assign(socket, save_config: nil)} + end + + def handle_event("validate_save_config", %{"secret" => secret}, socket) do + changeset = + socket + |> config_secret_changeset(secret) + |> Map.replace!(:action, :validate) + + {:noreply, assign_nested(socket, :save_config, changeset: changeset)} + end + + def handle_event("save_config", %{"secret" => secret}, socket) do + changeset = config_secret_changeset(socket, secret) + + case Ecto.Changeset.apply_action(changeset, :insert) do + {:ok, secret} -> + {:noreply, save_config_secret(socket, secret, changeset)} + + {:error, changeset} 
-> + {:noreply, assign_nested(socket, :save_config, changeset: changeset)} + end + end + + def handle_event("load_config", %{"name" => name}, socket) do + secret = Enum.find(socket.assigns.hub_secrets, &(&1.name == name)) + + case Jason.decode(secret.value) do + {:ok, config_defaults} -> + {:noreply, + socket + |> assign(config_defaults: config_defaults) + |> load_config_defaults()} + + {:error, _} -> + {:noreply, socket} + end + end + + @impl true + def handle_async(:rbac_check, {:ok, %{errors: errors, permissions: permissions}}, socket) do + status = if errors === [], do: :ok, else: :errors + {:noreply, assign(socket, :rbac, %{status: status, errors: errors, permissions: permissions})} + end + + def handle_async(:load_namespace_options, {:ok, [:ok, {:ok, resp}]}, socket) do + socket = + case resp do + %Req.Response{status: 200, body: %{"items" => resources}} -> + namespace_options = Enum.map(resources, & &1["metadata"]["name"]) + + socket + |> assign(:namespace_options, namespace_options) + |> set_namespace(List.first(namespace_options)) + |> assign(:cluster_check, %{status: :ok, error: nil}) + + %Req.Response{status: _other} -> + # cannot list namespaces + socket + |> assign(:namespace_options, nil) + |> assign(:cluster_check, %{status: :ok, error: nil}) + end + + {:noreply, socket} + end + + def handle_async(:delete_pvc, {:ok, result}, socket) do + socket = + case result do + {:ok, %{status: 200}} -> + socket + |> assign(home_pvc: nil, pvc_action: nil) + |> pvc_options() + + {:ok, %{body: %{"message" => message}}} -> + assign_nested(socket, :pvc_action, error: message, type: :delete) + end + + {:noreply, socket} + end + + def handle_async(:create_pvc, {:ok, result}, socket) do + socket = + case result do + {:ok, %{status: 201, body: created_pvc}} -> + socket + |> assign(home_pvc: created_pvc["metadata"]["name"], pvc_action: nil) + |> pvc_options() + + {:ok, %{body: body}} -> + socket + |> assign_nested(:pvc_action, + error: "Creating the PVC failed: #{body["message"]}", + type: :new + ) + + {:error, error} when is_exception(error) -> + socket + |> assign_nested(:pvc_action, + error: "Creating the PVC failed: #{Exception.message(error)}", + type: :new + ) + end + + {:noreply, socket} + end + + def handle_async(:load_namespace_options, {:ok, results}, socket) do + {:error, error} = List.first(results, &match?({:error, _}, &1)) + + socket = + socket + |> assign(:namespace_options, nil) + |> assign(:cluster_check, %{status: :error, error: error}) + + {:noreply, socket} + end + + def handle_async(:save_config, {:ok, result}, socket) do + socket = + case result do + :ok -> + assign(socket, save_config: nil) + + {:error, %Ecto.Changeset{} = changeset} -> + assign_nested(socket, :save_config, changeset: changeset, inflight: false) + + {:transport_error, error} -> + assign_nested(socket, :save_config, error: error, inflight: false) + end + + {:noreply, socket} + end + + defp label(namespace, runtime, runtime_status) do + reconnecting? = reconnecting?(namespace, runtime) + + case {reconnecting?, runtime_status} do + {true, :connected} -> "Reconnect" + {true, :connecting} -> "Connecting..." 
+ _ -> "Connect" + end + end + + defp reconnecting?(namespace, runtime) do + match?(%Runtime.K8s{config: %{namespace: ^namespace}}, runtime) + end + + defp create_pvc(socket, pvc) do + namespace = socket.assigns.namespace + manifest = PVC.manifest(pvc, namespace) + req = socket.assigns.reqs.pvc + + socket + |> start_async(:create_pvc, fn -> Kubereq.create(req, manifest) end) + |> assign_nested(:pvc_action, type: :new_inflight) + end + + defp set_context(socket, nil), do: assign(socket, :context, nil) + + defp set_context(socket, context) do + kubeconfig = Kubereq.Kubeconfig.set_current_context(socket.assigns.kubeconfig, context) + + reqs = %{ + access_reviews: + Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews"), + namespaces: Kubereq.new(kubeconfig, "api/v1/namespaces/:name"), + pod: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name"), + pvc: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name"), + sc: Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name") + } + + socket + |> start_async(:load_namespace_options, fn -> + [ + Task.async(fn -> + Livebook.K8s.Auth.can_i?(reqs.access_reviews, + verb: "create", + group: "authorization.k8s.io", + version: "v1", + resource: "selfsubjectaccessreviews" + ) + end), + Task.async(fn -> Kubereq.list(reqs.namespaces, nil) end) + ] + |> Task.await_many(:infinity) + end) + |> assign( + kubeconfig: kubeconfig, + context: context, + namespace: nil, + namespace_options: nil, + rbac_error: nil, + reqs: reqs, + cluster_check: %{status: :inflight, error: nil} + ) + end + + defp set_namespace(socket, nil) do + assign(socket, namespace: nil, rbac: %{status: :inflight, errors: [], permissions: []}) + end + + defp set_namespace(socket, ns) do + reqs = socket.assigns.reqs + + socket + |> start_async(:rbac_check, fn -> + {required_permissions, optional_permissions} = + Auth.batch_check(reqs.access_reviews, [ + # required permissions: + [verb: "get", version: "v1", resource: "pods", namespace: ns], + [verb: "list", version: "v1", resource: "pods", namespace: ns], + [verb: "watch", version: "v1", resource: "pods", namespace: ns], + [verb: "create", version: "v1", resource: "pods", namespace: ns], + [verb: "delete", version: "v1", resource: "pods", namespace: ns], + [verb: "create", version: "v1", resource: "pods/portforward", namespace: ns], + # optional permissions: + [verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns], + [verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns], + [verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns], + [verb: "list", version: "v1", resource: "storageclasses", namespace: ns] + ]) + |> Enum.split(6) + + errors = + required_permissions + |> Enum.reject(&(&1 === :ok)) + |> Enum.map(fn {:error, error} -> error end) + + permissions = + optional_permissions + |> Enum.map(&(&1 === :ok)) + |> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_sc], &1)) + |> Map.new() + + %{errors: errors, permissions: permissions} + end) + |> assign( + namespace: ns, + rbac: %{status: :inflight, errors: :inflight, permissions: :inflight} + ) + |> pvc_options() + end + + def set_pod_template(socket, pod_template_yaml) do + namespace = socket.assigns.namespace + + with {:parse, {:ok, pod_template}} <- + {:parse, YamlElixir.read_from_string(pod_template_yaml)}, + {:validate, :ok} <- {:validate, Pod.validate_pod_template(pod_template, namespace)} do + assign(socket, :pod_template, 
%{template: pod_template_yaml, status: :valid, message: nil}) + else + {:parse, {:error, error}} -> + assign(socket, :pod_template, %{ + template: pod_template_yaml, + status: :error, + message: Exception.message(error) + }) + + {:validate, {:error, message}} -> + assign(socket, :pod_template, %{ + template: pod_template_yaml, + status: :error, + message: message + }) + end + end + + defp pvc_options(%{assigns: %{rbac: %{permissions: %{list_pvc: false}}}} = socket) do + assign(socket, :pvcs, []) + end + + defp pvc_options(socket) do + %{reqs: %{pvc: req}, namespace: ns} = socket.assigns + + case Kubereq.list(req, ns) do + {:ok, %Req.Response{status: 200} = resp} -> + pvcs = + resp.body["items"] + |> Enum.reject(& &1["metadata"]["deletionTimestamp"]) + |> Enum.map(& &1["metadata"]["name"]) + + socket + |> assign(:pvcs, pvcs) + + _ -> + assign(socket, :pvcs, []) + end + end + + defp storage_classes(%{rbac: %{permissions: %{list_sc: false}}}), do: [] + + defp storage_classes(assigns) do + %{reqs: %{sc: req}} = assigns + + case Kubereq.list(req, nil) do + {:ok, %Req.Response{status: 200} = resp} -> + Enum.map(resp.body["items"], & &1["metadata"]["name"]) + + _ -> + [] + end + end + + defp config_secret_names(hub_secrets) do + names = + for %{name: name} <- hub_secrets, + String.starts_with?(name, @config_secret_prefix), + do: name + + Enum.sort(names) + end + + defp load_config_defaults(socket) do + config_defaults = socket.assigns.config_defaults + + socket + |> assign( + home_pvc: config_defaults["home_pvc"], + docker_tag: config_defaults["docker_tag"] + ) + |> set_context(config_defaults["context"]) + |> set_namespace(config_defaults["namespace"]) + |> set_pod_template(config_defaults["pod_template"]) + end + + defp config_secret_changeset(socket, attrs) do + hub = socket.assigns.hub + value = socket |> build_config() |> Jason.encode!() + secret = %Livebook.Secrets.Secret{hub_id: hub.id, name: nil, value: value} + + secret + |> Livebook.Secrets.change_secret(attrs) + |> validate_format(:name, ~r/^#{@config_secret_prefix}\w+$/, + message: "must be in the format #{@config_secret_prefix}*" + ) + end + + defp save_config_secret(socket, secret, changeset) do + hub = socket.assigns.hub + exists? = Enum.any?(socket.assigns.hub_secrets, &(&1.name == secret.name)) + + socket + |> start_async(:save_config, fn -> + result = + if exists? 
do + Livebook.Hubs.update_secret(hub, secret) + else + Livebook.Hubs.create_secret(hub, secret) + end + + with {:error, errors} <- result do + {:error, + changeset + |> Livebook.Utils.put_changeset_errors(errors) + |> Map.replace!(:action, :validate)} + end + end) + |> assign_nested(:save_config, inflight: true) + end + + defp assign_nested(socket, key, keyword) do + update(socket, key, fn map -> + Enum.reduce(keyword, map, fn {key, value}, map -> Map.replace!(map, key, value) end) + end) + end + + defp build_config(socket) do + %{ + context: socket.assigns.context, + namespace: socket.assigns.namespace, + home_pvc: socket.assigns.home_pvc, + docker_tag: socket.assigns.docker_tag, + pod_template: socket.assigns.pod_template.template + } + end +end diff --git a/lib/livebook_web/live/session_live/runtime_component.ex b/lib/livebook_web/live/session_live/runtime_component.ex index 0e3c561e5..4a339621d 100644 --- a/lib/livebook_web/live/session_live/runtime_component.ex +++ b/lib/livebook_web/live/session_live/runtime_component.ex @@ -73,6 +73,15 @@ defmodule LivebookWeb.SessionLive.RuntimeComponent do > Fly.io machine + <.choice_button + :if={Livebook.Config.runtime_enabled?(Livebook.Runtime.K8s)} + active={@type == "k8s"} + phx-click="set_runtime_type" + phx-value-type="k8s" + phx-target={@myself} + > + Kubernetes Pod +
type}, socket) do diff --git a/mix.exs b/mix.exs index 11cc56834..08db7dd8b 100644 --- a/mix.exs +++ b/mix.exs @@ -117,6 +117,8 @@ defmodule Livebook.MixProject do {:mint_web_socket, "~> 1.0.0"}, {:protobuf, "~> 0.12.0"}, {:dns_cluster, "~> 0.1.2"}, + {:kubereq, "~> 0.1.8"}, + {:yaml_elixir, "~> 2.11"}, {:phoenix_live_reload, "~> 1.2", only: :dev}, {:floki, ">= 0.27.0", only: :test}, {:bypass, "~> 2.1", only: :test}, diff --git a/mix.lock b/mix.lock index 4158e446b..ecde2eab6 100644 --- a/mix.lock +++ b/mix.lock @@ -21,6 +21,7 @@ "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"}, "jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"}, + "kubereq": {:hex, :kubereq, "0.1.8", "d84b2a9cb0a5ae9e74243f0ff2d44d91db8e80c3b09498bdb7b1b562335416de", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:pluggable, "~> 1.0", [hex: :pluggable, repo: "hexpm", optional: false]}, {:req, "~> 0.5.0", [hex: :req, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.0", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "de02c60caa2f76a8d72dad329fb8c019f88cb2dd3e2ac8241927e0dabe3b90ad"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, @@ -42,6 +43,7 @@ "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", 
"f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, + "pluggable": {:hex, :pluggable, "1.1.0", "7eba3bc70c0caf4d9056c63c882df8862f7534f0145da7ab3a47ca73e4adb1e4", [:mix], [], "hexpm", "d12eb00ea47b21e92cd2700d6fbe3737f04b64e71b63aad1c0accde87c751637"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "req": {:hex, :req, "0.5.2", "70b4976e5fbefe84e5a57fd3eea49d4e9aa0ac015301275490eafeaec380f97f", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0c63539ab4c2d6ced6114d2684276cef18ac185ee00674ee9af4b1febba1f986"}, @@ -51,4 +53,6 @@ "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, + "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, + "yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"}, } diff --git a/rel/server/overlays/bin/start_runtime.exs b/rel/server/overlays/bin/start_runtime.exs index e6c966a57..cfab9c41f 100644 --- a/rel/server/overlays/bin/start_runtime.exs +++ b/rel/server/overlays/bin/start_runtime.exs @@ -6,10 +6,22 @@ File.cd!(System.user_home!()) dist_port: dist_port } = System.fetch_env!("LIVEBOOK_RUNTIME") |> Base.decode64!() |> :erlang.binary_to_term() -# This is the only Fly-specific part of starting Livebook as runtime -app = System.fetch_env!("FLY_APP_NAME") -machine_id = System.fetch_env!("FLY_MACHINE_ID") -node = 
:"#{node_base}@#{machine_id}.vm.#{app}.internal" +node = + cond do + System.get_env("FLY_APP_NAME") -> + # This is the only Fly-specific part of starting Livebook as runtime + app = System.fetch_env!("FLY_APP_NAME") + machine_id = System.fetch_env!("FLY_MACHINE_ID") + :"#{node_base}@#{machine_id}.vm.#{app}.internal" + + System.get_env("POD_IP") -> + # This is the only K8s-specific part of starting Livebook as runtime + hostname = System.fetch_env!("POD_IP") + :"#{node_base}@#{hostname}" + + true -> + raise "expected either POD_IP (for k8s) or FLY_APP_NAME (for Fly.io) to be set" + end # We persist the information before the node is reachable :persistent_term.put(:livebook_runtime_info, %{ diff --git a/test/livebook/runtime/k8s_test.exs b/test/livebook/runtime/k8s_test.exs new file mode 100644 index 000000000..f65552571 --- /dev/null +++ b/test/livebook/runtime/k8s_test.exs @@ -0,0 +1,145 @@ +defmodule Livebook.Runtime.K8sTest do + alias Livebook.Runtime + use ExUnit.Case, async: true + + # To run these tests, install [Kind](https://kind.sigs.k8s.io/) on your machine. + @moduletag :k8s + + @assert_receive_timeout 10_000 + @cluster_name "livebook-runtime-test" + @kubeconfig_path "tmp/k8s_runtime/kubeconfig.yaml" + + @default_pod_template """ + apiVersion: v1 + kind: Pod + metadata: + generateName: livebook-runtime- + labels: + livebook.dev/runtime: integration-test + spec: + containers: + - image: ghcr.io/livebook-dev/livebook:nightly + name: livebook-runtime + env: + - name: TEST_VAR + value: present + + """ + setup_all do + unless System.find_executable("kind") do + raise "kind is not installed" + end + + clusters = cmd!(~w(kind get clusters)) |> String.split("\n", trim: true) + + if @cluster_name not in clusters do + cmd!(~w(kind create cluster --name #{@cluster_name})) + end + + # Export kubeconfig file + cmd!(~w(kind export kubeconfig --name #{@cluster_name} --kubeconfig #{@kubeconfig_path})) + + # In most cases we can use the existing image, but when making + # changes to the remote runtime code, we need to build a new image + if System.get_env("TEST_K8S_BUILD_IMAGE") in ~w(true 1) do + {_, versions} = Code.eval_file("versions") + + cmd!(~w(docker build + --build-arg BASE_IMAGE=hexpm/elixir:#{versions[:elixir]}-erlang-#{versions[:otp]}-ubuntu-#{versions[:ubuntu]} + --build-arg VARIANT=default + -t ghcr.io/livebook-dev/livebook:nightly .)) + else + cmd!(~w(docker image pull ghcr.io/livebook-dev/livebook:nightly)) + end + + # Load container image into Kind cluster + cmd!(~w(kind load docker-image --name #{@cluster_name} ghcr.io/livebook-dev/livebook:nightly)) + + :ok + end + + test "connecting flow" do + config = config() + req = req() + + assert [] = list_pods(req) + + pid = Runtime.K8s.new(config, req) |> Runtime.connect() + + assert_receive {:runtime_connect_info, ^pid, "create pod"}, @assert_receive_timeout + + assert_receive {:runtime_connect_info, ^pid, "waiting for pod"}, @assert_receive_timeout + + assert_receive {:runtime_connect_info, ^pid, "created container livebook-runtime"}, + @assert_receive_timeout + + assert_receive {:runtime_connect_info, ^pid, "started container livebook-runtime"}, + @assert_receive_timeout + + assert_receive {:runtime_connect_info, ^pid, "start proxy"}, @assert_receive_timeout + assert_receive {:runtime_connect_info, ^pid, "connect to node"}, @assert_receive_timeout + assert_receive {:runtime_connect_info, ^pid, "initialize node"}, @assert_receive_timeout + assert_receive {:runtime_connect_done, ^pid, {:ok, runtime}}, @assert_receive_timeout + + 
+
+  test "connecting flow" do
+    config = config()
+    req = req()
+
+    assert [] = list_pods(req)
+
+    pid = Runtime.K8s.new(config, req) |> Runtime.connect()
+
+    assert_receive {:runtime_connect_info, ^pid, "create pod"}, @assert_receive_timeout
+    assert_receive {:runtime_connect_info, ^pid, "waiting for pod"}, @assert_receive_timeout
+
+    assert_receive {:runtime_connect_info, ^pid, "created container livebook-runtime"},
+                   @assert_receive_timeout
+
+    assert_receive {:runtime_connect_info, ^pid, "started container livebook-runtime"},
+                   @assert_receive_timeout
+
+    assert_receive {:runtime_connect_info, ^pid, "start proxy"}, @assert_receive_timeout
+    assert_receive {:runtime_connect_info, ^pid, "connect to node"}, @assert_receive_timeout
+    assert_receive {:runtime_connect_info, ^pid, "initialize node"}, @assert_receive_timeout
+    assert_receive {:runtime_connect_done, ^pid, {:ok, runtime}}, @assert_receive_timeout
+
+    Runtime.take_ownership(runtime)
+
+    assert [_] = list_pods(req)
+
+    # Verify that we can actually evaluate code on the Kubernetes Pod
+    Runtime.evaluate_code(runtime, :elixir, ~s/System.fetch_env!("TEST_VAR")/, {:c1, :e1}, [])
+    assert_receive {:runtime_evaluation_response, :e1, %{type: :terminal_text, text: text}, _meta}
+    assert text =~ "present"
+
+    Runtime.disconnect(runtime)
+
+    # Wait for the Pod to terminate
+    assert :ok ==
+             Kubereq.wait_until(
+               req,
+               "default",
+               runtime.pod_name,
+               &(&1["status"]["phase"] == "Succeeded")
+             )
+
+    # Finally, delete the Pod object
+    Kubereq.delete(req, "default", runtime.pod_name)
+  end
+
+  defp req() do
+    [Kubereq.Kubeconfig.ENV, {Kubereq.Kubeconfig.File, path: @kubeconfig_path}]
+    |> Kubereq.Kubeconfig.load()
+    |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
+  end
+
+  defp config(attrs \\ %{}) do
+    defaults = %{
+      context: "kind-#{@cluster_name}",
+      namespace: "default",
+      home_pvc: nil,
+      docker_tag: "nightly",
+      pod_template: @default_pod_template
+    }
+
+    Map.merge(defaults, attrs)
+  end
+
+  defp list_pods(req) do
+    {:ok, resp} =
+      Kubereq.list(req, "default",
+        label_selectors: [{"livebook.dev/runtime", "integration-test"}],
+        field_selectors: [{"status.phase", "Running"}]
+      )
+
+    resp.body["items"]
+  end
+
+  defp cmd!([command | args]) do
+    {output, status} = System.cmd(command, args, stderr_to_stdout: true)
+
+    if status != 0 do
+      raise "command #{inspect(command)} #{inspect(args)} failed"
+    end
+
+    output
+  end
+end
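
The `req/0` helper above pins the suite to the kubeconfig exported from the Kind cluster. Pointing the same client at another cluster only means swapping the kubeconfig pipeline; a minimal sketch (the `KUBECONFIG` env var here is an assumption, and the Kind-specific image loading in `setup_all` would not apply):

    req =
      {Kubereq.Kubeconfig.File, path: System.fetch_env!("KUBECONFIG")}
      |> Kubereq.Kubeconfig.load()
      |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
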
select[name="pvc[storage_class]"] option[value="first-storage-class"]} + ) + |> has_element?() + + assert view + |> element( + ~s{form[phx-submit="create_pvc"] select[name="pvc[storage_class]"] option[value="second-storage-class"]} + ) + |> has_element?() + + assert view + |> element(~s{form[phx-submit="create_pvc"] button[type="submit"][disabled]}) + |> has_element?() + + view + |> element(~s{form[phx-submit="create_pvc"]}) + |> render_change(%{pvc: %{name: "new-pvc", size_gb: 1}}) + + assert view + |> element(~s{form[phx-submit="create_pvc"] button[type="submit"]:not([disabled])}) + |> has_element?() + + Req.Test.expect(:k8s_cluster, Livebook.K8sClusterStub) + + view + |> element(~s{form[phx-submit="create_pvc"]}) + |> render_submit(%{pvc: %{name: "new-pvc", size_gb: 1}}) + + Req.Test.verify!() + + # Delete a PVC + + view + |> element(~s{button[phx-click="delete_pvc"]}) + |> render_click() + + assert render_async(view) =~ + "Are you sure you want to irreversibly delete Persistent Volume Claim" + + Req.Test.expect(:k8s_cluster, Livebook.K8sClusterStub) + + view + |> element(~s{button[phx-click="confirm_delete_pvc"]}) + |> render_click() + + Req.Test.verify!() + + # Pod Template Validation + + refute render_async(view) =~ ~s/Make sure to define a valid resource of apiVersion / + + view + |> element(~s{form[phx-change="set_pod_template"]}) + |> render_change(%{pod_template: ""}) + + assert render_async(view) =~ ~s/Make sure to define a valid resource of apiVersion / + + view + |> element(~s{form[phx-change="set_pod_template"]}) + |> render_change(%{ + pod_template: """ + apiVersion: v1 + kind: Pod + metadata: + generateName: livebook-runtime- + spec: + containers: + - name: other-name + """ + }) + + assert render_async(view) =~ ~s/Main container is missing./ + + # We do not actually connect the runtime. 
+
+  test "populates k8s runtime config from existing runtime", %{conn: conn, session: session} do
+    pod_template = """
+    apiVersion: v1
+    kind: Pod
+    metadata:
+      generateName: livebook-runtime-
+      labels:
+        livebook.dev/component: test
+    spec:
+      containers:
+        - name: livebook-runtime\
+    """
+
+    runtime =
+      Runtime.K8s.new(
+        %{
+          context: "default",
+          namespace: "default",
+          home_pvc: "foo-pvc",
+          docker_tag: "nightly",
+          pod_template: pod_template
+        },
+        nil
+      )
+
+    Req.Test.stub(:k8s_cluster, Livebook.K8sClusterStub)
+
+    Session.set_runtime(session.pid, runtime)
+
+    {:ok, view, _} = live(conn, ~p"/sessions/#{session.id}/settings/runtime")
+
+    assert render_async(view) =~ "You can fully customize"
+
+    assert view
+           |> element(~s{select[name="home_pvc"] option[value="foo-pvc"][selected]})
+           |> has_element?()
+
+    assert view
+           |> element(~s{select[name="home_pvc"] option[value="new-pvc"]})
+           |> has_element?()
+
+    assert view
+           |> element(~s{button[phx-click="init"]:not([disabled])})
+           |> has_element?()
+  end
+
   test "saving and loading config from secret", %{conn: conn, session: session} do
     runtime =
       Runtime.Fly.new(%{
diff --git a/test/support/k8s_cluster_stub.ex b/test/support/k8s_cluster_stub.ex
new file mode 100644
index 000000000..57e13a72b
--- /dev/null
+++ b/test/support/k8s_cluster_stub.ex
@@ -0,0 +1,66 @@
+defmodule Livebook.K8sClusterStub do
+  use Plug.Router
+
+  require Logger
+
+  plug :match
+
+  plug Plug.Parsers,
+    parsers: [:urlencoded, :json],
+    json_decoder: Jason
+
+  plug :dispatch
+
+  post "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" do
+    resource = conn.body_params["spec"]["resourceAttributes"]["resource"]
+    allowed = conn.host == "default" or resource == "selfsubjectaccessreviews"
+
+    conn
+    |> put_status(201)
+    |> Req.Test.json(%{"status" => %{"allowed" => allowed}})
+  end
+
+  get "/api/v1/namespaces", host: "default" do
+    Req.Test.json(conn, %{"items" => [%{"metadata" => %{"name" => "default"}}]})
+  end
+
+  get "/api/v1/namespaces", host: "no-permission" do
+    send_resp(conn, 403, "")
+  end
+
+  get "/apis/storage.k8s.io/v1/storageclasses", host: "default" do
+    Req.Test.json(conn, %{
+      "items" => [
+        %{"metadata" => %{"name" => "first-storage-class"}},
+        %{"metadata" => %{"name" => "second-storage-class"}}
+      ]
+    })
+  end
+
+  get "/api/v1/namespaces/default/persistentvolumeclaims", host: "default" do
+    Req.Test.json(conn, %{
+      "items" => [
+        %{"metadata" => %{"name" => "foo-pvc"}},
+        %{"metadata" => %{"name" => "new-pvc"}}
+      ]
+    })
+  end
+
+  delete "/api/v1/namespaces/default/persistentvolumeclaims/:name", host: "default" do
+    send_resp(conn, 200, "")
+  end
+
+  post "/api/v1/namespaces/default/persistentvolumeclaims", host: "default" do
+    conn
+    |> put_status(201)
+    |> Req.Test.json(%{"metadata" => %{"name" => "new-pvc"}})
+  end
+
+  match _ do
+    Logger.error("Unimplemented #{conn.method} stub request to #{conn.request_path}")
+
+    conn
+    |> put_status(500)
+    |> Req.Test.text("Endpoint not implemented")
+  end
+end
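
A note on the `host:` guards above: the stubbed kubeconfig resolves each context to a server URL whose host is the context name, which is how a single Req.Test stub can impersonate both clusters. A sketch of how the "no-permission" side gets exercised (assuming the Kubereq.Kubeconfig.Stub pipeline used in the test configuration):

    req =
      {Kubereq.Kubeconfig.Stub, plugs: %{"no-permission" => {Req.Test, :k8s_cluster}}}
      |> Kubereq.Kubeconfig.load()
      |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")

    # Requests issued through this client reach Livebook.K8sClusterStub
    # with conn.host == "no-permission", matching the 403 route above.
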
diff --git a/test/test_helper.exs b/test/test_helper.exs
index f822fe85c..d1e0ca55d 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -15,7 +15,8 @@ Application.put_env(:livebook, :runtime_modules, [
   Livebook.Runtime.Standalone,
   Livebook.Runtime.Attached,
   Livebook.Runtime.Embedded,
-  Livebook.Runtime.Fly
+  Livebook.Runtime.Fly,
+  Livebook.Runtime.K8s
 ])
 
 defmodule Livebook.Runtime.Embedded.Packages do
@@ -75,5 +76,5 @@ fly_exclude = if System.get_env("TEST_FLY_API_TOKEN"), do: [], else: [:fly]
 
 ExUnit.start(
   assert_receive_timeout: if(windows?, do: 5_000, else: 1_500),
-  exclude: erl_docs_exclude ++ windows_exclude ++ teams_exclude ++ fly_exclude
+  exclude: erl_docs_exclude ++ windows_exclude ++ teams_exclude ++ fly_exclude ++ [:k8s]
 )
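
Note that, unlike `fly_exclude`, the `:k8s` tag is excluded unconditionally: there is no environment variable that re-enables it, so the Kind-backed integration suite only runs when requested explicitly, e.g. via `mix test --only k8s`.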