mirror of
https://github.com/TuringSoftware/CrystalFetch.git
synced 2025-12-20 20:38:26 +08:00
172 lines
7.2 KiB
Swift
172 lines
7.2 KiB
Swift
//
|
|
// Copyright © 2023 Turing Software, LLC. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
import Foundation
|
|
|
|
typealias DownloadItem = (task: URLSessionDownloadTask, destinationUrl: URL, retry: Int)
|
|
typealias ProgressCallback = (_ bytesWritten: Int64, _ bytesTotal: Int64) -> Void
|
|
|
|
actor Downloader {
|
|
private class Delegate: NSObject, URLSessionDelegate, URLSessionDownloadDelegate {
|
|
let downloader: Downloader
|
|
|
|
init(for downloader: Downloader) {
|
|
self.downloader = downloader
|
|
}
|
|
|
|
func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didFinishDownloadingTo location: URL) {
|
|
let tempUrl = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
|
|
do {
|
|
try FileManager.default.moveItem(at: location, to: tempUrl)
|
|
Task {
|
|
await downloader.urlSession(session, downloadTask: downloadTask, didFinishDownloadingTo: tempUrl)
|
|
}
|
|
} catch {
|
|
Task {
|
|
await downloader.urlSession(session, task: downloadTask, didCompleteWithError: error)
|
|
}
|
|
}
|
|
}
|
|
|
|
func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didWriteData bytesWritten: Int64, totalBytesWritten: Int64, totalBytesExpectedToWrite: Int64) {
|
|
Task {
|
|
await downloader.urlSession(session, downloadTask: downloadTask, didWriteData: bytesWritten, totalBytesWritten: totalBytesWritten, totalBytesExpectedToWrite: totalBytesExpectedToWrite)
|
|
}
|
|
}
|
|
|
|
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
|
|
Task {
|
|
await downloader.urlSession(session, task: task, didCompleteWithError: error)
|
|
}
|
|
}
|
|
}
|
|
|
|
private let kMaxConcurrentDownloads = 5
|
|
private let kMaxRetries = 5
|
|
|
|
private var downloads: [URLSessionDownloadTask: CheckedContinuation<URL, Error>] = [:]
|
|
private var queue: [DownloadItem] = []
|
|
private var totalExpectedSize: Int64 = 0
|
|
private var totalDownloadedSize: Int64 = 0
|
|
private var progressCallback: ProgressCallback?
|
|
|
|
private lazy var coordinator: Delegate = {
|
|
Delegate(for: self)
|
|
}()
|
|
|
|
private lazy var session: URLSession = {
|
|
URLSession(configuration: .default, delegate: coordinator, delegateQueue: nil)
|
|
}()
|
|
|
|
private func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didFinishDownloadingTo location: URL) {
|
|
if let continuation = downloads.removeValue(forKey: downloadTask) {
|
|
continuation.resume(returning: location)
|
|
} else {
|
|
try? FileManager.default.removeItem(at: location)
|
|
}
|
|
}
|
|
|
|
private func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didWriteData bytesWritten: Int64, totalBytesWritten: Int64, totalBytesExpectedToWrite: Int64) {
|
|
totalDownloadedSize += bytesWritten
|
|
let expectedSize = totalExpectedSize > 0 ? totalExpectedSize : totalBytesExpectedToWrite
|
|
progressCallback?(totalDownloadedSize, expectedSize)
|
|
}
|
|
|
|
private func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
|
|
if let continuation = downloads.removeValue(forKey: task as! URLSessionDownloadTask) {
|
|
continuation.resume(throwing: error!)
|
|
}
|
|
}
|
|
|
|
private func register(_ task: URLSessionDownloadTask, continuation: CheckedContinuation<URL, Error>) {
|
|
downloads[task] = continuation
|
|
}
|
|
|
|
/// Add an item to the download queue
|
|
/// - Parameters:
|
|
/// - downloadUrl: What to download
|
|
/// - destinationUrl: Where to put it
|
|
/// - size: Estimated size for progress updates
|
|
func enqueue(downloadUrl: URL, to destinationUrl: URL, size: Int64 = 0) {
|
|
let task = session.downloadTask(with: downloadUrl)
|
|
queue.append((task: task, destinationUrl: destinationUrl, retry: kMaxRetries))
|
|
totalExpectedSize += size
|
|
}
|
|
|
|
/// Start downloading a single item from the queue and retry if the download is interrupted
|
|
private func dequeue() async throws {
|
|
guard !queue.isEmpty else {
|
|
return
|
|
}
|
|
let (task, destinationUrl, retry) = queue.removeFirst()
|
|
let debugIdentifier = task.originalRequest?.url?.absoluteString ?? "(unknown request)"
|
|
NSLog("Downloading %@ to %@ (retries left: %d)", debugIdentifier, destinationUrl.path, retry)
|
|
do {
|
|
let resultUrl = try await withTaskCancellationHandler {
|
|
try await withCheckedThrowingContinuation { continuation in
|
|
register(task, continuation: continuation)
|
|
task.resume()
|
|
}
|
|
} onCancel: {
|
|
task.cancel()
|
|
}
|
|
try FileManager.default.moveItem(at: resultUrl, to: destinationUrl)
|
|
} catch {
|
|
let error = error as NSError
|
|
NSLog("Downloading %@ failed: ", debugIdentifier, error.localizedDescription)
|
|
if retry > 0 {
|
|
let newTask: URLSessionDownloadTask
|
|
if let resumeData = error.userInfo[NSURLSessionDownloadTaskResumeData] as? Data {
|
|
newTask = session.downloadTask(withResumeData: resumeData)
|
|
} else {
|
|
newTask = session.downloadTask(with: task.originalRequest!)
|
|
totalDownloadedSize -= task.countOfBytesReceived
|
|
}
|
|
queue.insert((task: newTask, destinationUrl: destinationUrl, retry: retry - 1), at: 0)
|
|
return
|
|
}
|
|
if error.code == NSURLErrorCancelled {
|
|
throw CancellationError()
|
|
} else {
|
|
throw error
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Starts downloading all enqueued items
|
|
/// - Parameter onProgressUpdated: Optional callback for download progress
|
|
func start(_ onProgressUpdated: ProgressCallback? = nil) async throws {
|
|
progressCallback = onProgressUpdated
|
|
defer {
|
|
progressCallback = nil
|
|
}
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
for _ in 0..<kMaxConcurrentDownloads {
|
|
group.addTask {
|
|
try await self.dequeue()
|
|
}
|
|
}
|
|
for try await _ in group {
|
|
if !queue.isEmpty {
|
|
try Task.checkCancellation()
|
|
group.addTask {
|
|
try await self.dequeue()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|