mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-09-04 20:14:57 +08:00
Use Req instead of :aws_signature (#2886)
This commit is contained in:
parent
6f55b0e5b2
commit
36bea7006a
3 changed files with 41 additions and 73 deletions
|
@ -9,9 +9,9 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
def list_objects(file_system, opts) do
|
||||
prefix = opts[:prefix]
|
||||
delimiter = opts[:delimiter]
|
||||
query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}
|
||||
params = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}
|
||||
|
||||
case request(file_system, "/", query: query) do
|
||||
case request(file_system, "/", params: params) do
|
||||
{:ok, %{status: 200, body: %{"ListBucketResult" => result}}} ->
|
||||
file_keys = xml_get_list(result, "Contents", "Key")
|
||||
prefix_keys = xml_get_list(result, "CommonPrefixes", "Prefix")
|
||||
|
@ -36,7 +36,7 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
# name, but the listing endpoint does, so we just list keys
|
||||
# with an upper limit of 0 and retrieve the bucket name.
|
||||
|
||||
case request(file_system, "/", query: %{"list-type" => "2", "max-keys" => "0"}) do
|
||||
case request(file_system, "/", params: %{"list-type" => "2", "max-keys" => "0"}) do
|
||||
{:ok, %{status: 200, body: %{"ListBucketResult" => result}}} -> {:ok, result["Name"]}
|
||||
other -> request_response_to_error(other)
|
||||
end
|
||||
|
@ -134,9 +134,9 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
|
||||
body_md5 = :crypto.hash(:md5, body)
|
||||
headers = [{"Content-MD5", Base.encode64(body_md5)}]
|
||||
query = %{"delete" => ""}
|
||||
params = %{"delete" => ""}
|
||||
|
||||
case request(file_system, "/", query: query, method: :post, headers: headers, body: body) do
|
||||
case request(file_system, "/", params: params, method: :post, headers: headers, body: body) do
|
||||
{:ok, %{status: 200, body: %{"Error" => _}}} = result -> request_response_to_error(result)
|
||||
{:ok, %{status: 200}} -> :ok
|
||||
other -> request_response_to_error(other)
|
||||
|
@ -167,9 +167,9 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
"""
|
||||
@spec create_multipart_upload(S3.t(), String.t()) :: {:ok, String.t()} | {:error, String.t()}
|
||||
def create_multipart_upload(file_system, key) do
|
||||
query = %{"uploads" => ""}
|
||||
params = %{"uploads" => ""}
|
||||
|
||||
case request(file_system, "/" <> encode_key(key), method: :post, query: query, body: "") do
|
||||
case request(file_system, "/" <> encode_key(key), method: :post, params: params, body: "") do
|
||||
{:ok, %{status: 200, body: %{"InitiateMultipartUploadResult" => result}}} ->
|
||||
{:ok, result["UploadId"]}
|
||||
|
||||
|
@ -184,8 +184,8 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
@spec upload_part(S3.t(), String.t(), String.t(), pos_integer(), String.t()) ::
|
||||
{:ok, map()} | {:error, String.t()}
|
||||
def upload_part(file_system, key, upload_id, part_number, content) do
|
||||
query = %{"uploadId" => upload_id, "partNumber" => part_number}
|
||||
opts = [method: :put, query: query, body: content, long: true]
|
||||
params = %{"uploadId" => upload_id, "partNumber" => part_number}
|
||||
opts = [method: :put, params: params, body: content, long: true]
|
||||
|
||||
with {:ok, %{status: 200, headers: headers}} <-
|
||||
request(file_system, "/" <> encode_key(key), opts),
|
||||
|
@ -202,7 +202,7 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
@spec complete_multipart_upload(S3.t(), String.t(), String.t(), list(String.t())) ::
|
||||
:ok | {:error, String.t()}
|
||||
def complete_multipart_upload(file_system, key, upload_id, etags) do
|
||||
query = %{"uploadId" => upload_id}
|
||||
params = %{"uploadId" => upload_id}
|
||||
parts = for {etag, n} <- Enum.with_index(etags, 1), do: %{"PartNumber" => n, "ETag" => etag}
|
||||
|
||||
body =
|
||||
|
@ -210,7 +210,7 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
|> S3.XML.encode_to_iodata!()
|
||||
|> IO.iodata_to_binary()
|
||||
|
||||
case request(file_system, "/" <> encode_key(key), method: :post, query: query, body: body) do
|
||||
case request(file_system, "/" <> encode_key(key), method: :post, params: params, body: body) do
|
||||
{:ok, %{status: 200}} -> :ok
|
||||
other -> request_response_to_error(other)
|
||||
end
|
||||
|
@ -221,23 +221,21 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
"""
|
||||
@spec abort_multipart_upload(S3.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
|
||||
def abort_multipart_upload(file_system, key, upload_id) do
|
||||
query = %{"uploadId" => upload_id}
|
||||
params = %{"uploadId" => upload_id}
|
||||
|
||||
case request(file_system, "/" <> encode_key(key), method: :delete, query: query) do
|
||||
case request(file_system, "/" <> encode_key(key), method: :delete, params: params) do
|
||||
{:ok, %{status: 204}} -> :ok
|
||||
other -> request_response_to_error(other)
|
||||
end
|
||||
end
|
||||
|
||||
defp download(file_system, path, collectable, opts \\ []) do
|
||||
query = opts[:query] || %{}
|
||||
headers = opts[:headers] || []
|
||||
url = build_url(file_system, path, query)
|
||||
headers = sign_headers(file_system, :get, url, headers)
|
||||
defp download(file_system, path, collectable) do
|
||||
req =
|
||||
Req.new(base_url: file_system.bucket_url)
|
||||
|> Livebook.Utils.req_attach_defaults()
|
||||
|> with_aws_sigv4(file_system)
|
||||
|
||||
req = Req.new() |> Livebook.Utils.req_attach_defaults()
|
||||
|
||||
case Req.get(req, url: url, headers: headers, into: collectable) do
|
||||
case Req.get(req, url: path, into: collectable) do
|
||||
{:ok, %{status: 200, body: collected}} ->
|
||||
{:ok, collected}
|
||||
|
||||
|
@ -249,67 +247,39 @@ defmodule Livebook.FileSystem.S3.Client do
|
|||
end
|
||||
end
|
||||
|
||||
defp with_aws_sigv4(req, file_system) do
|
||||
credentials = S3.credentials(file_system)
|
||||
|
||||
Req.merge(req,
|
||||
aws_sigv4: [
|
||||
access_key_id: credentials.access_key_id,
|
||||
secret_access_key: credentials.secret_access_key,
|
||||
token: credentials.token,
|
||||
region: file_system.region,
|
||||
service: :s3
|
||||
]
|
||||
)
|
||||
end
|
||||
|
||||
defp encode_key(key) do
|
||||
key
|
||||
|> String.split("/")
|
||||
|> Enum.map_join("/", fn segment -> URI.encode(segment, &URI.char_unreserved?/1) end)
|
||||
end
|
||||
|
||||
defp build_url(file_system, path, query) do
|
||||
query_string = URI.encode_query(query, :rfc3986)
|
||||
query_string = if query_string != "", do: "?#{query_string}", else: ""
|
||||
|
||||
file_system.bucket_url <> path <> query_string
|
||||
end
|
||||
|
||||
defp sign_headers(file_system, method, url, headers, body \\ nil) do
|
||||
credentials = S3.credentials(file_system)
|
||||
now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
|
||||
%{host: host} = URI.parse(file_system.bucket_url)
|
||||
headers = [{"Host", host} | headers]
|
||||
|
||||
headers =
|
||||
if credentials.token,
|
||||
do: [{"X-Amz-Security-Token", credentials.token} | headers],
|
||||
else: headers
|
||||
|
||||
:aws_signature.sign_v4(
|
||||
credentials.access_key_id,
|
||||
credentials.secret_access_key,
|
||||
file_system.region,
|
||||
"s3",
|
||||
now,
|
||||
Atom.to_string(method),
|
||||
url,
|
||||
headers,
|
||||
body || "",
|
||||
uri_encode_path: false
|
||||
)
|
||||
end
|
||||
|
||||
defp request(file_system, path, opts) do
|
||||
long = Keyword.get(opts, :long, false)
|
||||
decode? = Keyword.get(opts, :decode, true)
|
||||
{long, opts} = Keyword.pop(opts, :long, false)
|
||||
{decode?, opts} = Keyword.pop(opts, :decode, true)
|
||||
|
||||
method = opts[:method] || :get
|
||||
query = opts[:query] || %{}
|
||||
headers = opts[:headers] || []
|
||||
body = opts[:body]
|
||||
timeout = if long, do: 60_000, else: 30_000
|
||||
|
||||
url = build_url(file_system, path, query)
|
||||
headers = sign_headers(file_system, method, url, headers, body)
|
||||
req =
|
||||
Req.new(base_url: file_system.bucket_url)
|
||||
|> Req.merge(opts)
|
||||
|> Livebook.Utils.req_attach_defaults()
|
||||
|> with_aws_sigv4(file_system)
|
||||
|
||||
req = Req.new() |> Livebook.Utils.req_attach_defaults()
|
||||
|
||||
result =
|
||||
Req.request(req,
|
||||
method: method,
|
||||
url: url,
|
||||
headers: headers,
|
||||
body: body,
|
||||
receive_timeout: timeout
|
||||
)
|
||||
result = Req.request(req, url: path, receive_timeout: timeout)
|
||||
|
||||
if decode?, do: decode(result), else: result
|
||||
end
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -117,7 +117,6 @@ defmodule Livebook.MixProject do
|
|||
{:ecto, "~> 3.10"},
|
||||
{:phoenix_ecto, "~> 4.4"},
|
||||
{:aws_credentials, "~> 0.3.0", runtime: false},
|
||||
{:aws_signature, "~> 0.3.0"},
|
||||
{:mint_web_socket, "~> 1.0.0"},
|
||||
{:protobuf, "~> 0.13.0"},
|
||||
{:dns_cluster, "~> 0.1.2"},
|
||||
|
|
1
mix.lock
1
mix.lock
|
@ -1,6 +1,5 @@
|
|||
%{
|
||||
"aws_credentials": {:hex, :aws_credentials, "0.3.2", "ba2ccee4ec6dcb5426cf71830b7afd73795b1f19655f401d4401015b468fec6f", [:rebar3], [{:eini, "~> 2.2.4", [hex: :eini_beam, repo: "hexpm", optional: false]}, {:iso8601, "~> 1.3.4", [hex: :iso8601, repo: "hexpm", optional: false]}, {:jsx, "~> 3.1.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "2e748626a935a7a544647fb79d7054f38db8bf378978542c962ccbeab387387b"},
|
||||
"aws_signature": {:hex, :aws_signature, "0.3.2", "adf33bc4af00b2089b7708bf20e3246f09c639a905a619b3689f0a0a22c3ef8f", [:rebar3], [], "hexpm", "b0daf61feb4250a8ab0adea60db3e336af732ff71dd3fb22e45ae3dcbd071e44"},
|
||||
"bandit": {:hex, :bandit, "1.6.0", "9cb6c67c27cecab2d0c93968cb957fa8decccb7275193c8bf33f97397b3ac25d", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "fd2491e564a7c5e11ff8496ebf530c342c742452c59de17ac0fb1f814a0ab01a"},
|
||||
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
|
||||
"castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"},
|
||||
|
|
Loading…
Add table
Reference in a new issue