mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-11-09 21:51:42 +08:00
Remove storage abstraction (#1777)
This commit is contained in:
parent
deacb1a4a4
commit
42bc98a42d
7 changed files with 313 additions and 351 deletions
|
|
@ -31,7 +31,6 @@ config :livebook,
|
||||||
learn_notebooks: [],
|
learn_notebooks: [],
|
||||||
plugs: [],
|
plugs: [],
|
||||||
shutdown_callback: nil,
|
shutdown_callback: nil,
|
||||||
storage: Livebook.Storage.Ets,
|
|
||||||
update_instructions_url: nil,
|
update_instructions_url: nil,
|
||||||
within_iframe: false,
|
within_iframe: false,
|
||||||
allowed_uri_schemes: []
|
allowed_uri_schemes: []
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ defmodule Livebook.Application do
|
||||||
# Start a supervisor for Livebook tasks
|
# Start a supervisor for Livebook tasks
|
||||||
{Task.Supervisor, name: Livebook.TaskSupervisor},
|
{Task.Supervisor, name: Livebook.TaskSupervisor},
|
||||||
# Start the storage module
|
# Start the storage module
|
||||||
Livebook.Storage.current(),
|
Livebook.Storage,
|
||||||
# Start the periodic version check
|
# Start the periodic version check
|
||||||
Livebook.UpdateCheck,
|
Livebook.UpdateCheck,
|
||||||
# Periodic measurement of system resources
|
# Periodic measurement of system resources
|
||||||
|
|
|
||||||
|
|
@ -82,16 +82,12 @@ defmodule Livebook.Settings do
|
||||||
@spec remove_file_system(file_system_id()) :: :ok
|
@spec remove_file_system(file_system_id()) :: :ok
|
||||||
def remove_file_system(file_system_id) do
|
def remove_file_system(file_system_id) do
|
||||||
if default_file_system_id() == file_system_id do
|
if default_file_system_id() == file_system_id do
|
||||||
storage().delete_key(:settings, "global", :default_file_system_id)
|
Livebook.Storage.delete_key(:settings, "global", :default_file_system_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
Livebook.NotebookManager.remove_file_system(file_system_id)
|
Livebook.NotebookManager.remove_file_system(file_system_id)
|
||||||
|
|
||||||
storage().delete(:filesystem, file_system_id)
|
Livebook.Storage.delete(:filesystem, file_system_id)
|
||||||
end
|
|
||||||
|
|
||||||
defp storage() do
|
|
||||||
Livebook.Storage.current()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp storage_to_fs(%{type: "s3"} = config) do
|
defp storage_to_fs(%{type: "s3"} = config) do
|
||||||
|
|
@ -230,7 +226,7 @@ defmodule Livebook.Settings do
|
||||||
"""
|
"""
|
||||||
@spec set_default_file_system(file_system_id()) :: :ok
|
@spec set_default_file_system(file_system_id()) :: :ok
|
||||||
def set_default_file_system(file_system_id) do
|
def set_default_file_system(file_system_id) do
|
||||||
storage().insert(:settings, "global", default_file_system_id: file_system_id)
|
Livebook.Storage.insert(:settings, "global", default_file_system_id: file_system_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
@ -238,7 +234,7 @@ defmodule Livebook.Settings do
|
||||||
"""
|
"""
|
||||||
@spec default_file_system() :: Filesystem.t()
|
@spec default_file_system() :: Filesystem.t()
|
||||||
def default_file_system() do
|
def default_file_system() do
|
||||||
case storage().fetch(:filesystem, default_file_system_id()) do
|
case Livebook.Storage.fetch(:filesystem, default_file_system_id()) do
|
||||||
{:ok, file} -> storage_to_fs(file)
|
{:ok, file} -> storage_to_fs(file)
|
||||||
:error -> Livebook.Config.local_file_system()
|
:error -> Livebook.Config.local_file_system()
|
||||||
end
|
end
|
||||||
|
|
@ -249,7 +245,7 @@ defmodule Livebook.Settings do
|
||||||
"""
|
"""
|
||||||
@spec default_file_system_id() :: file_system_id()
|
@spec default_file_system_id() :: file_system_id()
|
||||||
def default_file_system_id() do
|
def default_file_system_id() do
|
||||||
case storage().fetch_key(:settings, "global", :default_file_system_id) do
|
case Livebook.Storage.fetch_key(:settings, "global", :default_file_system_id) do
|
||||||
{:ok, default_file_system_id} -> default_file_system_id
|
{:ok, default_file_system_id} -> default_file_system_id
|
||||||
:error -> "local"
|
:error -> "local"
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,18 @@
|
||||||
defmodule Livebook.Storage do
|
defmodule Livebook.Storage do
|
||||||
@moduledoc """
|
@moduledoc false
|
||||||
Behaviour defining an interface for storing arbitrary data in
|
|
||||||
[Entity-Attribute-Value](https://en.wikipedia.org/wiki/Entity%E2%80%93attribute%E2%80%93value_model) fashion.
|
# Storage for arbitrary data in [Entity-Attribute-Value](https://en.wikipedia.org/wiki/Entity%E2%80%93attribute%E2%80%93value_model)
|
||||||
"""
|
# fashion.
|
||||||
|
#
|
||||||
|
# The storage uses an ETS table and synchronizes all the changes to
|
||||||
|
# a file, so they are restored when the application is started again.
|
||||||
|
#
|
||||||
|
# `insert` and `delete` operations are supposed to be called using a GenServer
|
||||||
|
# while all the lookups can be performed by directly accessing the named table.
|
||||||
|
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
@type namespace :: atom()
|
@type namespace :: atom()
|
||||||
@type entity_id :: binary()
|
@type entity_id :: binary()
|
||||||
|
|
@ -22,38 +32,69 @@ defmodule Livebook.Storage do
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Returns all values in namespace.
|
Starts the storage process.
|
||||||
|
|
||||||
all(:filesystem)
|
|
||||||
[%{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}]
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@callback all(namespace()) :: [entity()]
|
@spec start_link(keyword()) :: GenServer.on_start()
|
||||||
|
def start_link(opts) do
|
||||||
|
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Delegate for `c:all/1`.
|
Returns all values in namespace.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
Livebook.Storage.all(:filesystem)
|
||||||
|
#=> [%{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}]
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def all(namespace), do: current().all(namespace)
|
@spec all(namespace()) :: [entity()]
|
||||||
|
def all(namespace) do
|
||||||
|
table_name()
|
||||||
|
|> :ets.match({{namespace, :"$1"}, :"$2", :"$3", :_})
|
||||||
|
|> Enum.group_by(
|
||||||
|
fn [entity_id, _attr, _val] -> entity_id end,
|
||||||
|
fn [_id, attr, val] -> {attr, val} end
|
||||||
|
)
|
||||||
|
|> Enum.map(fn {entity_id, attributes} ->
|
||||||
|
attributes
|
||||||
|
|> Map.new()
|
||||||
|
|> Map.put(:id, entity_id)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Returns a map identified by `entity_id` in `namespace`.
|
Returns a map identified by `entity_id` in `namespace`.
|
||||||
|
|
||||||
fetch(:filesystem, "rand-id")
|
## Examples
|
||||||
|
|
||||||
|
Livebook.Storage.fetch(:filesystem, "rand-id")
|
||||||
#=> {:ok, %{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}}
|
#=> {:ok, %{id: "rand-id", type: "s3", bucket_url: "/...", secret: "abc", access_key: "xyz"}}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@callback fetch(namespace(), entity_id()) :: {:ok, entity()} | :error
|
@spec fetch(namespace(), entity_id()) :: {:ok, entity()} | :error
|
||||||
|
def fetch(namespace, entity_id) do
|
||||||
|
table_name()
|
||||||
|
|> :ets.lookup({namespace, entity_id})
|
||||||
|
|> case do
|
||||||
|
[] ->
|
||||||
|
:error
|
||||||
|
|
||||||
@doc """
|
entries ->
|
||||||
Delegate for `c:fetch/2`.
|
entries
|
||||||
"""
|
|> Enum.map(fn {_key, attr, val, _timestamp} -> {attr, val} end)
|
||||||
def fetch(namespace, id), do: current().fetch(namespace, id)
|
|> Map.new()
|
||||||
|
|> Map.put(:id, entity_id)
|
||||||
|
|> then(&{:ok, &1})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Raising delegate for `c:fetch/2`.
|
Raising delegate for `c:fetch/2`.
|
||||||
"""
|
"""
|
||||||
|
@spec fetch!(namespace(), entity_id()) :: entity()
|
||||||
def fetch!(namespace, id) do
|
def fetch!(namespace, id) do
|
||||||
case current().fetch(namespace, id) do
|
case fetch(namespace, id) do
|
||||||
{:ok, entity} -> entity
|
{:ok, entity} -> entity
|
||||||
:error -> raise NotFoundError, namespace: namespace, id: id
|
:error -> raise NotFoundError, namespace: namespace, id: id
|
||||||
end
|
end
|
||||||
|
|
@ -62,47 +103,144 @@ defmodule Livebook.Storage do
|
||||||
@doc """
|
@doc """
|
||||||
Returns the value for a given `namespace`-`entity_id`-`attribute`.
|
Returns the value for a given `namespace`-`entity_id`-`attribute`.
|
||||||
|
|
||||||
fetch_key(:filesystem, "rand-id", :type)
|
## Examples
|
||||||
|
|
||||||
|
Livebook.Storage.fetch_key(:filesystem, "rand-id", :type)
|
||||||
#=> {:ok, "s3"}
|
#=> {:ok, "s3"}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@callback fetch_key(namespace(), entity_id(), attribute()) :: {:ok, value()} | :error
|
@spec fetch_key(namespace(), entity_id(), attribute()) :: {:ok, value()} | :error
|
||||||
|
def fetch_key(namespace, entity_id, key) do
|
||||||
|
table_name()
|
||||||
|
|> :ets.match({{namespace, entity_id}, key, :"$1", :_})
|
||||||
|
|> case do
|
||||||
|
[[value]] -> {:ok, value}
|
||||||
|
[] -> :error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Delegate for `c:fetch_key/3`.
|
Inserts given list of attribute-value pairs to a entity belonging to
|
||||||
|
specified namespace.
|
||||||
"""
|
"""
|
||||||
def fetch_key(namespace, id, attribute), do: current().fetch_key(namespace, id, attribute)
|
@spec insert(namespace(), entity_id(), [{attribute(), value()}]) :: :ok
|
||||||
|
def insert(namespace, entity_id, attributes) do
|
||||||
@doc """
|
GenServer.call(__MODULE__, {:insert, namespace, entity_id, attributes})
|
||||||
Inserts given list of attribute-value paris to a entity belonging to specified namespace.
|
end
|
||||||
"""
|
|
||||||
@callback insert(namespace(), entity_id(), [{attribute(), value()}]) :: :ok
|
|
||||||
|
|
||||||
@doc """
|
|
||||||
Delegate for `c:insert/3`.
|
|
||||||
"""
|
|
||||||
def insert(namespace, id, attributes), do: current().insert(namespace, id, attributes)
|
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Deletes an entity of given id from given namespace.
|
Deletes an entity of given id from given namespace.
|
||||||
"""
|
"""
|
||||||
@callback delete(namespace(), entity_id()) :: :ok
|
@spec delete(namespace(), entity_id()) :: :ok
|
||||||
|
def delete(namespace, entity_id) do
|
||||||
@doc """
|
GenServer.call(__MODULE__, {:delete, namespace, entity_id})
|
||||||
Delegate for `c:delete/2`.
|
end
|
||||||
"""
|
|
||||||
def delete(namespace, id), do: current().delete(namespace, id)
|
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Deletes an attribute from given entity.
|
Deletes an attribute from given entity.
|
||||||
"""
|
"""
|
||||||
@callback delete_key(namespace(), entity_id(), attribute()) :: :ok
|
@spec delete_key(namespace(), entity_id(), attribute()) :: :ok
|
||||||
|
def delete_key(namespace, entity_id, key) do
|
||||||
|
GenServer.call(__MODULE__, {:delete_key, namespace, entity_id, key})
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Delegate for `c:delete_key/3`.
|
Returns file path where the data is persisted.
|
||||||
"""
|
"""
|
||||||
def delete_key(namespace, id, attribute), do: current().delete_key(namespace, id, attribute)
|
@spec config_file_path() :: Path.t()
|
||||||
|
def config_file_path() do
|
||||||
|
Path.join([Livebook.Config.data_path(), "livebook_config.ets"])
|
||||||
|
end
|
||||||
|
|
||||||
@spec current() :: module()
|
@doc """
|
||||||
def current(), do: Application.fetch_env!(:livebook, :storage)
|
Synchronously awaits for all prior changes to be processed.
|
||||||
|
"""
|
||||||
|
@spec sync() :: :ok
|
||||||
|
def sync() do
|
||||||
|
GenServer.call(__MODULE__, :sync)
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def init(_opts) do
|
||||||
|
# Make sure that this process does not terminate abruptly
|
||||||
|
# in case it is persisting to disk. terminate/2 is still a no-op.
|
||||||
|
Process.flag(:trap_exit, true)
|
||||||
|
|
||||||
|
table = load_or_create_table()
|
||||||
|
:persistent_term.put(__MODULE__, table)
|
||||||
|
|
||||||
|
{:ok, %{table: table}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call(:sync, _from, state) do
|
||||||
|
{:reply, :ok, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call({:insert, namespace, entity_id, attributes}, _from, %{table: table} = state) do
|
||||||
|
keys_to_delete = Enum.map(attributes, fn {key, _val} -> key end)
|
||||||
|
|
||||||
|
delete_keys(table, namespace, entity_id, keys_to_delete)
|
||||||
|
|
||||||
|
timestamp = System.os_time(:millisecond)
|
||||||
|
|
||||||
|
attributes =
|
||||||
|
Enum.map(attributes, fn {attr, val} ->
|
||||||
|
{{namespace, entity_id}, attr, val, timestamp}
|
||||||
|
end)
|
||||||
|
|
||||||
|
:ets.insert(table, attributes)
|
||||||
|
{:reply, :ok, state, {:continue, :save_to_file}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call({:delete, namespace, entity_id}, _from, %{table: table} = state) do
|
||||||
|
:ets.delete(table, {namespace, entity_id})
|
||||||
|
{:reply, :ok, state, {:continue, :save_to_file}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call({:delete_key, namespace, entity_id, key}, _from, %{table: table} = state) do
|
||||||
|
delete_keys(table, namespace, entity_id, [key])
|
||||||
|
{:reply, :ok, state, {:continue, :save_to_file}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_continue(:save_to_file, %{table: table} = state) do
|
||||||
|
file_path = String.to_charlist(config_file_path())
|
||||||
|
:ok = :ets.tab2file(table, file_path)
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp table_name(), do: :persistent_term.get(__MODULE__)
|
||||||
|
|
||||||
|
defp load_or_create_table() do
|
||||||
|
path = config_file_path()
|
||||||
|
|
||||||
|
path
|
||||||
|
|> String.to_charlist()
|
||||||
|
|> :ets.file2tab()
|
||||||
|
|> case do
|
||||||
|
{:ok, tab} ->
|
||||||
|
Logger.info("Reading storage from #{path}")
|
||||||
|
tab
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
case reason do
|
||||||
|
{:read_error, {:file_error, _, :enoent}} -> :ok
|
||||||
|
_ -> Logger.warning("Could not open up #{config_file_path()}: #{inspect(reason)}")
|
||||||
|
end
|
||||||
|
|
||||||
|
:ets.new(__MODULE__, [:protected, :duplicate_bag])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp delete_keys(table, namespace, entity_id, keys) do
|
||||||
|
match_head = {{namespace, entity_id}, :"$1", :_, :_}
|
||||||
|
|
||||||
|
guards = Enum.map(keys, &{:==, :"$1", &1})
|
||||||
|
|
||||||
|
:ets.select_delete(table, [{match_head, guards, [true]}])
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,171 +0,0 @@
|
||||||
defmodule Livebook.Storage.Ets do
|
|
||||||
@moduledoc """
|
|
||||||
Ets implementation of `Livebook.Storage` behaviour.
|
|
||||||
|
|
||||||
The module is supposed to be started just once as it
|
|
||||||
is responsible for managing a named ets table.
|
|
||||||
|
|
||||||
`insert` and `delete` operations are supposed to be called using a GenServer
|
|
||||||
while all the lookups can be performed by directly accessing the named table.
|
|
||||||
"""
|
|
||||||
@behaviour Livebook.Storage
|
|
||||||
|
|
||||||
require Logger
|
|
||||||
use GenServer
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def all(namespace) do
|
|
||||||
table_name()
|
|
||||||
|> :ets.match({{namespace, :"$1"}, :"$2", :"$3", :_})
|
|
||||||
|> Enum.group_by(
|
|
||||||
fn [entity_id, _attr, _val] -> entity_id end,
|
|
||||||
fn [_id, attr, val] -> {attr, val} end
|
|
||||||
)
|
|
||||||
|> Enum.map(fn {entity_id, attributes} ->
|
|
||||||
attributes
|
|
||||||
|> Map.new()
|
|
||||||
|> Map.put(:id, entity_id)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def fetch(namespace, entity_id) do
|
|
||||||
table_name()
|
|
||||||
|> :ets.lookup({namespace, entity_id})
|
|
||||||
|> case do
|
|
||||||
[] ->
|
|
||||||
:error
|
|
||||||
|
|
||||||
entries ->
|
|
||||||
entries
|
|
||||||
|> Enum.map(fn {_key, attr, val, _timestamp} -> {attr, val} end)
|
|
||||||
|> Map.new()
|
|
||||||
|> Map.put(:id, entity_id)
|
|
||||||
|> then(&{:ok, &1})
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def fetch_key(namespace, entity_id, key) do
|
|
||||||
table_name()
|
|
||||||
|> :ets.match({{namespace, entity_id}, key, :"$1", :_})
|
|
||||||
|> case do
|
|
||||||
[[value]] -> {:ok, value}
|
|
||||||
[] -> :error
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec config_file_path() :: Path.t()
|
|
||||||
def config_file_path() do
|
|
||||||
Path.join([Livebook.Config.data_path(), "livebook_config.ets"])
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec sync() :: :ok
|
|
||||||
def sync() do
|
|
||||||
GenServer.call(__MODULE__, :sync)
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def insert(namespace, entity_id, attributes) do
|
|
||||||
GenServer.call(__MODULE__, {:insert, namespace, entity_id, attributes})
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def delete(namespace, entity_id) do
|
|
||||||
GenServer.call(__MODULE__, {:delete, namespace, entity_id})
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Livebook.Storage
|
|
||||||
def delete_key(namespace, entity_id, key) do
|
|
||||||
GenServer.call(__MODULE__, {:delete_key, namespace, entity_id, key})
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec start_link(keyword()) :: GenServer.on_start()
|
|
||||||
def start_link(opts) do
|
|
||||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def init(_opts) do
|
|
||||||
# Make sure that this process does not terminate abruptly
|
|
||||||
# in case it is persisting to disk. terminate/2 is still a no-op.
|
|
||||||
Process.flag(:trap_exit, true)
|
|
||||||
|
|
||||||
table = load_or_create_table()
|
|
||||||
:persistent_term.put(__MODULE__, table)
|
|
||||||
|
|
||||||
{:ok, %{table: table}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def handle_call(:sync, _from, state) do
|
|
||||||
{:reply, :ok, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def handle_call({:insert, namespace, entity_id, attributes}, _from, %{table: table} = state) do
|
|
||||||
keys_to_delete = Enum.map(attributes, fn {key, _val} -> key end)
|
|
||||||
|
|
||||||
delete_keys(table, namespace, entity_id, keys_to_delete)
|
|
||||||
|
|
||||||
timestamp = System.os_time(:millisecond)
|
|
||||||
|
|
||||||
attributes =
|
|
||||||
Enum.map(attributes, fn {attr, val} ->
|
|
||||||
{{namespace, entity_id}, attr, val, timestamp}
|
|
||||||
end)
|
|
||||||
|
|
||||||
:ets.insert(table, attributes)
|
|
||||||
{:reply, :ok, state, {:continue, :save_to_file}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def handle_call({:delete, namespace, entity_id}, _from, %{table: table} = state) do
|
|
||||||
:ets.delete(table, {namespace, entity_id})
|
|
||||||
{:reply, :ok, state, {:continue, :save_to_file}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def handle_call({:delete_key, namespace, entity_id, key}, _from, %{table: table} = state) do
|
|
||||||
delete_keys(table, namespace, entity_id, [key])
|
|
||||||
{:reply, :ok, state, {:continue, :save_to_file}}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl GenServer
|
|
||||||
def handle_continue(:save_to_file, %{table: table} = state) do
|
|
||||||
file_path = String.to_charlist(config_file_path())
|
|
||||||
:ok = :ets.tab2file(table, file_path)
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp table_name(), do: :persistent_term.get(__MODULE__)
|
|
||||||
|
|
||||||
defp load_or_create_table() do
|
|
||||||
path = config_file_path()
|
|
||||||
|
|
||||||
path
|
|
||||||
|> String.to_charlist()
|
|
||||||
|> :ets.file2tab()
|
|
||||||
|> case do
|
|
||||||
{:ok, tab} ->
|
|
||||||
Logger.info("Reading storage from #{path}")
|
|
||||||
tab
|
|
||||||
|
|
||||||
{:error, reason} ->
|
|
||||||
case reason do
|
|
||||||
{:read_error, {:file_error, _, :enoent}} -> :ok
|
|
||||||
_ -> Logger.warning("Could not open up #{config_file_path()}: #{inspect(reason)}")
|
|
||||||
end
|
|
||||||
|
|
||||||
:ets.new(__MODULE__, [:protected, :duplicate_bag])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp delete_keys(table, namespace, entity_id, keys) do
|
|
||||||
match_head = {{namespace, entity_id}, :"$1", :_, :_}
|
|
||||||
|
|
||||||
guards = Enum.map(keys, &{:==, :"$1", &1})
|
|
||||||
|
|
||||||
:ets.select_delete(table, [{match_head, guards, [true]}])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
@ -1,125 +0,0 @@
|
||||||
defmodule Livebook.Storage.EtsTest do
|
|
||||||
use ExUnit.Case, async: true
|
|
||||||
|
|
||||||
alias Livebook.Storage.Ets
|
|
||||||
|
|
||||||
describe "insert/3 and fetch/2" do
|
|
||||||
test "properly inserts a new key-value attribute" do
|
|
||||||
assert :ok = Ets.insert(:insert, "test", key1: "val1", key2: "val2")
|
|
||||||
|
|
||||||
assert {:ok,
|
|
||||||
%{
|
|
||||||
id: "test",
|
|
||||||
key1: "val1",
|
|
||||||
key2: "val2"
|
|
||||||
}} = Ets.fetch(:insert, "test")
|
|
||||||
end
|
|
||||||
|
|
||||||
test "replaces already existing attributes with new values" do
|
|
||||||
assert :ok = Ets.insert(:insert, "replace", key1: "val1", key2: "val2")
|
|
||||||
|
|
||||||
assert {:ok,
|
|
||||||
%{
|
|
||||||
key1: "val1",
|
|
||||||
key2: "val2"
|
|
||||||
}} = Ets.fetch(:insert, "replace")
|
|
||||||
|
|
||||||
assert :ok =
|
|
||||||
Ets.insert(:insert, "replace", key1: "updated_val1", key2: "val2", key3: "val3")
|
|
||||||
|
|
||||||
assert {:ok,
|
|
||||||
%{
|
|
||||||
key1: "updated_val1",
|
|
||||||
key2: "val2",
|
|
||||||
key3: "val3"
|
|
||||||
}} = Ets.fetch(:insert, "replace")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "fetch_key/3" do
|
|
||||||
test "reads a given key" do
|
|
||||||
:ok = Ets.insert(:fetch_key, "test", key1: "val1")
|
|
||||||
assert Ets.fetch_key(:fetch_key, "test", :key1) == {:ok, "val1"}
|
|
||||||
assert Ets.fetch_key(:fetch_key, "test", :key2) == :error
|
|
||||||
end
|
|
||||||
|
|
||||||
test "handles nil accordingly" do
|
|
||||||
assert Ets.fetch_key(:fetch_key, "test_nil", :key1) == :error
|
|
||||||
:ok = Ets.insert(:fetch_key, "test_nil", key1: nil)
|
|
||||||
assert Ets.fetch_key(:fetch_key, "test_nil", :key1) == {:ok, nil}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
test "fetch/2" do
|
|
||||||
:ok = Ets.insert(:fetch, "test", key1: "val1")
|
|
||||||
|
|
||||||
assert {:ok,
|
|
||||||
%{
|
|
||||||
id: "test",
|
|
||||||
key1: "val1"
|
|
||||||
}} = Ets.fetch(:fetch, "test")
|
|
||||||
|
|
||||||
assert :error = Ets.fetch(:fetch, "unknown")
|
|
||||||
end
|
|
||||||
|
|
||||||
test "delete/2" do
|
|
||||||
:ok = Ets.insert(:delete, "test", key1: "val1")
|
|
||||||
|
|
||||||
assert {:ok, _entity} = Ets.fetch(:delete, "test")
|
|
||||||
|
|
||||||
assert :ok = Ets.delete(:delete, "test")
|
|
||||||
|
|
||||||
assert :error = Ets.fetch(:delete, "test")
|
|
||||||
end
|
|
||||||
|
|
||||||
test "delete_key/3" do
|
|
||||||
:ok = Ets.insert(:delete_key, "test", key1: "val1", key2: "val2")
|
|
||||||
|
|
||||||
assert :ok = Ets.delete_key(:delete_key, "test", :key2)
|
|
||||||
|
|
||||||
assert {:ok, "val1"} = Ets.fetch_key(:delete_key, "test", :key1)
|
|
||||||
assert :error = Ets.fetch_key(:delete_key, "test", :key2)
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "all/1" do
|
|
||||||
test "returns all inserted entities for given namespace" do
|
|
||||||
:ok = Ets.insert(:all, "test1", key1: "val1")
|
|
||||||
:ok = Ets.insert(:all, "test2", key1: "val1")
|
|
||||||
|
|
||||||
{:ok, entity1} = Ets.fetch(:all, "test1")
|
|
||||||
{:ok, entity2} = Ets.fetch(:all, "test2")
|
|
||||||
|
|
||||||
assert [^entity1, ^entity2] = Enum.sort(Ets.all(:all))
|
|
||||||
end
|
|
||||||
|
|
||||||
test "returns an empty list if no entities exist for given namespace" do
|
|
||||||
assert [] = Ets.all(:unknown_namespace)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "persistence" do
|
|
||||||
defp read_table_and_lookup(entity) do
|
|
||||||
:ok = Ets.sync()
|
|
||||||
|
|
||||||
{:ok, tab} =
|
|
||||||
Ets.config_file_path()
|
|
||||||
|> String.to_charlist()
|
|
||||||
|> :ets.file2tab()
|
|
||||||
|
|
||||||
:ets.lookup(tab, {:persistence, entity})
|
|
||||||
end
|
|
||||||
|
|
||||||
test "insert triggers saving to file" do
|
|
||||||
:ok = Ets.insert(:persistence, "insert", key: "val")
|
|
||||||
|
|
||||||
assert [_test] = read_table_and_lookup("insert")
|
|
||||||
end
|
|
||||||
|
|
||||||
test "delete triggers saving to file" do
|
|
||||||
:ok = Ets.insert(:persistence, "delete", key: "val")
|
|
||||||
:ok = Ets.delete(:persistence, "delete")
|
|
||||||
|
|
||||||
assert [] = read_table_and_lookup("delete")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
125
test/livebook/storage_test.exs
Normal file
125
test/livebook/storage_test.exs
Normal file
|
|
@ -0,0 +1,125 @@
|
||||||
|
defmodule Livebook.StorageTest do
|
||||||
|
use ExUnit.Case, async: true
|
||||||
|
|
||||||
|
alias Livebook.Storage
|
||||||
|
|
||||||
|
describe "insert/3 and fetch/2" do
|
||||||
|
test "properly inserts a new key-value attribute" do
|
||||||
|
assert :ok = Storage.insert(:insert, "test", key1: "val1", key2: "val2")
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
id: "test",
|
||||||
|
key1: "val1",
|
||||||
|
key2: "val2"
|
||||||
|
}} = Storage.fetch(:insert, "test")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "replaces already existing attributes with new values" do
|
||||||
|
assert :ok = Storage.insert(:insert, "replace", key1: "val1", key2: "val2")
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
key1: "val1",
|
||||||
|
key2: "val2"
|
||||||
|
}} = Storage.fetch(:insert, "replace")
|
||||||
|
|
||||||
|
assert :ok =
|
||||||
|
Storage.insert(:insert, "replace", key1: "updated_val1", key2: "val2", key3: "val3")
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
key1: "updated_val1",
|
||||||
|
key2: "val2",
|
||||||
|
key3: "val3"
|
||||||
|
}} = Storage.fetch(:insert, "replace")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "fetch_key/3" do
|
||||||
|
test "reads a given key" do
|
||||||
|
:ok = Storage.insert(:fetch_key, "test", key1: "val1")
|
||||||
|
assert Storage.fetch_key(:fetch_key, "test", :key1) == {:ok, "val1"}
|
||||||
|
assert Storage.fetch_key(:fetch_key, "test", :key2) == :error
|
||||||
|
end
|
||||||
|
|
||||||
|
test "handles nil accordingly" do
|
||||||
|
assert Storage.fetch_key(:fetch_key, "test_nil", :key1) == :error
|
||||||
|
:ok = Storage.insert(:fetch_key, "test_nil", key1: nil)
|
||||||
|
assert Storage.fetch_key(:fetch_key, "test_nil", :key1) == {:ok, nil}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetch/2" do
|
||||||
|
:ok = Storage.insert(:fetch, "test", key1: "val1")
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
id: "test",
|
||||||
|
key1: "val1"
|
||||||
|
}} = Storage.fetch(:fetch, "test")
|
||||||
|
|
||||||
|
assert :error = Storage.fetch(:fetch, "unknown")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "delete/2" do
|
||||||
|
:ok = Storage.insert(:delete, "test", key1: "val1")
|
||||||
|
|
||||||
|
assert {:ok, _entity} = Storage.fetch(:delete, "test")
|
||||||
|
|
||||||
|
assert :ok = Storage.delete(:delete, "test")
|
||||||
|
|
||||||
|
assert :error = Storage.fetch(:delete, "test")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "delete_key/3" do
|
||||||
|
:ok = Storage.insert(:delete_key, "test", key1: "val1", key2: "val2")
|
||||||
|
|
||||||
|
assert :ok = Storage.delete_key(:delete_key, "test", :key2)
|
||||||
|
|
||||||
|
assert {:ok, "val1"} = Storage.fetch_key(:delete_key, "test", :key1)
|
||||||
|
assert :error = Storage.fetch_key(:delete_key, "test", :key2)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "all/1" do
|
||||||
|
test "returns all inserted entities for given namespace" do
|
||||||
|
:ok = Storage.insert(:all, "test1", key1: "val1")
|
||||||
|
:ok = Storage.insert(:all, "test2", key1: "val1")
|
||||||
|
|
||||||
|
{:ok, entity1} = Storage.fetch(:all, "test1")
|
||||||
|
{:ok, entity2} = Storage.fetch(:all, "test2")
|
||||||
|
|
||||||
|
assert [^entity1, ^entity2] = Enum.sort(Storage.all(:all))
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns an empty list if no entities exist for given namespace" do
|
||||||
|
assert [] = Storage.all(:unknown_namespace)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "persistence" do
|
||||||
|
defp read_table_and_lookup(entity) do
|
||||||
|
:ok = Storage.sync()
|
||||||
|
|
||||||
|
{:ok, tab} =
|
||||||
|
Storage.config_file_path()
|
||||||
|
|> String.to_charlist()
|
||||||
|
|> :ets.file2tab()
|
||||||
|
|
||||||
|
:ets.lookup(tab, {:persistence, entity})
|
||||||
|
end
|
||||||
|
|
||||||
|
test "insert triggers saving to file" do
|
||||||
|
:ok = Storage.insert(:persistence, "insert", key: "val")
|
||||||
|
|
||||||
|
assert [_test] = read_table_and_lookup("insert")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "delete triggers saving to file" do
|
||||||
|
:ok = Storage.insert(:persistence, "delete", key: "val")
|
||||||
|
:ok = Storage.delete(:persistence, "delete")
|
||||||
|
|
||||||
|
assert [] = read_table_and_lookup("delete")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Add table
Reference in a new issue