Files
SideStore/Pods/Nuke/Sources/ImagePipeline.swift

912 lines
36 KiB
Swift
Raw Normal View History

// The MIT License (MIT)
//
// Copyright (c) 2015-2019 Alexander Grebenyuk (github.com/kean).
import Foundation
// MARK: - ImageTask
/// A task performed by the `ImagePipeline`. The pipeline maintains a strong
/// reference to the task until the request finishes or fails; you do not need
/// to maintain a reference to the task unless it is useful to do so for your
/// apps internal bookkeeping purposes.
public /* final */ class ImageTask: Hashable {
/// An identifier uniquely identifies the task within a given pipeline. Only
/// unique within this pipeline.
public let taskId: Int
fileprivate weak var delegate: ImageTaskDelegate?
2019-09-14 11:00:58 -07:00
/// The original request with which the task was created.
public let request: ImageRequest
fileprivate var priority: ImageRequest.Priority
/// The number of bytes that the task has received.
public fileprivate(set) var completedUnitCount: Int64 = 0
/// A best-guess upper bound on the number of bytes the client expects to send.
public fileprivate(set) var totalUnitCount: Int64 = 0
/// Returns a progress object for the task. The object is created lazily.
public var progress: Progress {
if _progress == nil { _progress = Progress() }
return _progress!
}
fileprivate private(set) var _progress: Progress?
/// A completion handler to be called when task finishes or fails.
public typealias Completion = (_ response: ImageResponse?, _ error: ImagePipeline.Error?) -> Void
/// A progress handler to be called periodically during the lifetime of a task.
public typealias ProgressHandler = (_ response: ImageResponse?, _ completed: Int64, _ total: Int64) -> Void
// internal stuff associated with a task
fileprivate var metrics: ImageTaskMetrics
fileprivate weak var session: ImageLoadingSession?
internal init(taskId: Int, request: ImageRequest) {
self.taskId = taskId
self.request = request
self.metrics = ImageTaskMetrics(taskId: taskId, startDate: Date())
2019-09-14 11:00:58 -07:00
self.priority = request.priority
}
// MARK: - Priority
/// Update s priority of the task even if the task is already running.
public func setPriority(_ priority: ImageRequest.Priority) {
2019-09-14 11:00:58 -07:00
delegate?.imageTask(self, didUpdatePriority: priority)
}
// MARK: - Cancellation
fileprivate var isCancelled: Bool {
return _isCancelled.value
}
private var _isCancelled = Atomic(false)
/// Marks task as being cancelled.
///
/// The pipeline will immediately cancel any work associated with a task
/// unless there is an equivalent outstanding task running (see
/// `ImagePipeline.Configuration.isDeduplicationEnabled` for more info).
public func cancel() {
// Make sure that we ignore if `cancel` being called more than once.
if _isCancelled.swap(to: true, ifEqual: false) {
delegate?.imageTaskWasCancelled(self)
}
}
// MARK: - Hashable
public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
}
public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}
}
protocol ImageTaskDelegate: class {
func imageTaskWasCancelled(_ task: ImageTask)
2019-09-14 11:00:58 -07:00
func imageTask(_ task: ImageTask, didUpdatePriority: ImageRequest.Priority)
}
// MARK: - ImageResponse
/// Represents an image response.
public final class ImageResponse {
public let image: Image
public let urlResponse: URLResponse?
// the response is only nil when new disk cache is enabled (it only stores
// data for now, but this might change in the future).
public init(image: Image, urlResponse: URLResponse?) {
self.image = image; self.urlResponse = urlResponse
}
}
// MARK: - ImagePipeline
/// `ImagePipeline` will load and decode image data, process loaded images and
/// store them in caches.
///
/// See [Nuke's README](https://github.com/kean/Nuke) for a detailed overview of
/// the image pipeline and all of the related classes.
///
/// `ImagePipeline` is created with a configuration (`Configuration`).
///
/// `ImagePipeline` is thread-safe.
public /* final */ class ImagePipeline: ImageTaskDelegate {
public let configuration: Configuration
// This is a queue on which we access the sessions.
private let queue = DispatchQueue(label: "com.github.kean.Nuke.ImagePipeline")
// Image loading sessions. One or more tasks can be handled by the same session.
private var sessions = [AnyHashable: ImageLoadingSession]()
private var nextTaskId = Atomic<Int>(0)
// Unlike `nextTaskId` doesn't need to be atomic because it's accessed only on a queue
private var nextSessionId: Int = 0
private let rateLimiter: RateLimiter
/// Shared image pipeline.
public static var shared = ImagePipeline()
/// The closure that gets called each time the task is completed (or cancelled).
/// Guaranteed to be called on the main thread.
public var didFinishCollectingMetrics: ((ImageTask, ImageTaskMetrics) -> Void)?
public struct Configuration {
/// Image cache used by the pipeline.
public var imageCache: ImageCaching?
/// Data loader used by the pipeline.
public var dataLoader: DataLoading
/// Data loading queue. Default maximum concurrent task count is 6.
public var dataLoadingQueue = OperationQueue()
/// Data cache used by the pipeline.
public var dataCache: DataCaching?
/// Data caching queue. Default maximum concurrent task count is 2.
public var dataCachingQueue = OperationQueue()
/// Default implementation uses shared `ImageDecoderRegistry` to create
/// a decoder that matches the context.
internal var imageDecoder: (ImageDecodingContext) -> ImageDecoding = {
return ImageDecoderRegistry.shared.decoder(for: $0)
}
/// Image decoding queue. Default maximum concurrent task count is 1.
public var imageDecodingQueue = OperationQueue()
/// This is here just for backward compatibility with `Loader`.
internal var imageProcessor: (Image, ImageRequest) -> AnyImageProcessor? = { $1.processor }
/// Image processing queue. Default maximum concurrent task count is 2.
public var imageProcessingQueue = OperationQueue()
/// `true` by default. If `true` the pipeline will combine the requests
/// with the same `loadKey` into a single request. The request only gets
/// cancelled when all the registered requests are.
public var isDeduplicationEnabled = true
/// `true` by default. It `true` the pipeline will rate limits the requests
/// to prevent trashing of the underlying systems (e.g. `URLSession`).
/// The rate limiter only comes into play when the requests are started
/// and cancelled at a high rate (e.g. scrolling through a collection view).
public var isRateLimiterEnabled = true
/// `false` by default. If `true` the pipeline will try to produce a new
/// image each time it receives a new portion of data from data loader.
/// The decoder used by the image loading session determines whether
/// to produce a partial image or not.
public var isProgressiveDecodingEnabled = false
/// If the data task is terminated (either because of a failure or a
/// cancellation) and the image was partially loaded, the next load will
/// resume where it was left off. Supports both validators (`ETag`,
/// `Last-Modified`). The resumable downloads are enabled by default.
public var isResumableDataEnabled = true
/// If `true` pipeline will detects GIFs and set `animatedImageData`
/// (`UIImage` property). It will also disable processing of such images,
/// and alter the way cache cost is calculated. However, this will not
/// enable actual animated image rendering. To do that take a look at
/// satellite projects (FLAnimatedImage and Gifu plugins for Nuke).
/// `false` by default (to preserve resources).
public static var isAnimatedImageDataEnabled = false
/// Creates default configuration.
/// - parameter dataLoader: `DataLoader()` by default.
/// - parameter imageCache: `Cache.shared` by default.
public init(dataLoader: DataLoading = DataLoader(), imageCache: ImageCaching? = ImageCache.shared) {
self.dataLoader = dataLoader
self.imageCache = imageCache
self.dataLoadingQueue.maxConcurrentOperationCount = 6
self.dataCachingQueue.maxConcurrentOperationCount = 2
self.imageDecodingQueue.maxConcurrentOperationCount = 1
self.imageProcessingQueue.maxConcurrentOperationCount = 2
}
}
/// Initializes `ImagePipeline` instance with the given configuration.
/// - parameter configuration: `Configuration()` by default.
public init(configuration: Configuration = Configuration()) {
self.configuration = configuration
self.rateLimiter = RateLimiter(queue: queue)
}
public convenience init(_ configure: (inout ImagePipeline.Configuration) -> Void) {
var configuration = ImagePipeline.Configuration()
configure(&configuration)
self.init(configuration: configuration)
}
// MARK: Loading Images
/// Loads an image with the given url.
@discardableResult
public func loadImage(with url: URL, progress: ImageTask.ProgressHandler? = nil, completion: ImageTask.Completion? = nil) -> ImageTask {
return loadImage(with: ImageRequest(url: url), progress: progress, completion: completion)
}
/// Loads an image for the given request using image loading pipeline.
@discardableResult
public func loadImage(with request: ImageRequest, progress: ImageTask.ProgressHandler? = nil, completion: ImageTask.Completion? = nil) -> ImageTask {
let task = ImageTask(taskId: getNextTaskId(), request: request)
task.delegate = self
queue.async {
// Fast memory cache lookup. We do this asynchronously because we
// expect users to check memory cache synchronously if needed.
if task.request.memoryCacheOptions.isReadAllowed,
let response = self.configuration.imageCache?.cachedResponse(for: task.request) {
task.metrics.isMemoryCacheHit = true
self._didCompleteTask(task, response: response, error: nil, completion: completion)
return
}
// Memory cache lookup failed -> start loading.
self._startLoadingImage(
for: task,
handlers: ImageLoadingSession.Handlers(progress: progress, completion: completion)
)
}
return task
}
private func getNextTaskId() -> Int {
return nextTaskId.increment()
}
private func getNextSessionId() -> Int {
nextSessionId += 1
return nextSessionId
}
private func _startLoadingImage(for task: ImageTask, handlers: ImageLoadingSession.Handlers) {
// Create a new image loading session or register with an existing one.
let session = _createSession(with: task.request)
task.session = session
task.metrics.session = session.metrics
task.metrics.wasSubscibedToExistingSession = !session.tasks.isEmpty
// Register handler with a session.
session.tasks[task] = handlers
session.updatePriority()
// Already loaded and decoded the final image and started processing
// for previously registered tasks (if any).
if let image = session.decodedFinalImage {
_session(session, processImage: image, for: task)
}
}
// MARK: ImageTaskDelegate
func imageTaskWasCancelled(_ task: ImageTask) {
queue.async {
self._didCancelTask(task)
}
}
2019-09-14 11:00:58 -07:00
func imageTask(_ task: ImageTask, didUpdatePriority priority: ImageRequest.Priority) {
queue.async {
guard let session = task.session else { return }
2019-09-14 11:00:58 -07:00
task.priority = priority
session.updatePriority()
session.processingSessions[task]?.updatePriority()
}
}
// MARK: ImageLoadingSession (Managing)
private func _createSession(with request: ImageRequest) -> ImageLoadingSession {
// Check if session for the given key already exists.
//
// This part is more clever than I would like. The reason why we need a
// key even when deduplication is disabled is to have a way to retain
// a session by storing it in `sessions` dictionary.
let key: AnyHashable = configuration.isDeduplicationEnabled ? ImageRequest.LoadKey(request: request) : UUID()
if let session = sessions[key] {
return session
}
let session = ImageLoadingSession(sessionId: getNextSessionId(), request: request, key: key)
sessions[key] = session
_loadImage(for: session) // Start the pipeline
return session
}
private func _cancelSession(for task: ImageTask) {
guard let session = task.session else { return }
session.tasks[task] = nil
// When all registered tasks are cancelled, the session is deallocated
// and the underlying operation is cancelled automatically.
let processingSession = session.processingSessions.removeValue(forKey: task)
processingSession?.tasks.remove(task)
// Cancel the session when there are no remaining tasks.
if session.tasks.isEmpty {
_tryToSaveResumableData(for: session)
session.cts.cancel()
session.metrics.wasCancelled = true
_didFinishSession(session)
} else {
// We're not cancelling the task session yet because there are
// still tasks registered to it, but we need to update the priority.
session.updatePriority()
processingSession?.updatePriority()
}
}
// MARK: Pipeline (Loading Data)
private func _loadImage(for session: ImageLoadingSession) {
// Use rate limiter to prevent trashing of the underlying systems
if configuration.isRateLimiterEnabled {
// Rate limiter is synchronized on pipeline's queue. Delayed work is
// executed asynchronously also on this same queue.
rateLimiter.execute(token: session.cts.token) { [weak self, weak session] in
guard let session = session else { return }
self?._checkDiskCache(for: session)
}
} else { // Start loading immediately.
_checkDiskCache(for: session)
}
}
private func _checkDiskCache(for session: ImageLoadingSession) {
guard let cache = configuration.dataCache, let key = session.request.urlString else {
_loadData(for: session) // Skip disk cache lookup, load data
return
}
session.metrics.checkDiskCacheStartDate = Date()
let operation = BlockOperation { [weak self, weak session] in
guard let session = session else { return }
let data = cache.cachedData(for: key)
session.metrics.checkDiskCacheEndDate = Date()
self?.queue.async {
if let data = data {
self?._decodeFinalImage(for: session, data: data)
} else {
self?._loadData(for: session)
}
}
}
configuration.dataCachingQueue.enqueue(operation, for: session)
}
private func _loadData(for session: ImageLoadingSession) {
guard !session.token.isCancelling else { return } // Preflight check
// Wrap data request in an operation to limit maximum number of
// concurrent data tasks.
let operation = Operation(starter: { [weak self, weak session] finish in
guard let session = session else { finish(); return }
self?.queue.async {
self?._actuallyLoadData(for: session, finish: finish)
}
})
configuration.dataLoadingQueue.enqueue(operation, for: session)
}
// This methods gets called inside data loading operation (Operation).
private func _actuallyLoadData(for session: ImageLoadingSession, finish: @escaping () -> Void) {
session.metrics.loadDataStartDate = Date()
var urlRequest = session.request.urlRequest
// Read and remove resumable data from cache (we're going to insert it
// back in the cache if the request fails to complete again).
if configuration.isResumableDataEnabled,
let resumableData = ResumableData.removeResumableData(for: urlRequest) {
// Update headers to add "Range" and "If-Range" headers
resumableData.resume(request: &urlRequest)
// Save resumable data so that we could use it later (we need to
// verify that server returns "206 Partial Content" before using it.
session.resumableData = resumableData
// Collect metrics
session.metrics.wasResumed = true
session.metrics.resumedDataCount = resumableData.data.count
}
let task = configuration.dataLoader.loadData(
with: urlRequest,
didReceiveData: { [weak self, weak session] (data, response) in
self?.queue.async {
guard let session = session else { return }
self?._session(session, didReceiveData: data, response: response)
}
},
completion: { [weak self, weak session] (error) in
finish() // Important! Mark Operation as finished.
self?.queue.async {
guard let session = session else { return }
self?._session(session, didFinishLoadingDataWithError: error)
}
})
session.token.register {
task.cancel()
finish() // Make sure we always finish the operation.
}
}
private func _session(_ session: ImageLoadingSession, didReceiveData chunk: Data, response: URLResponse) {
// Check if this is the first response.
if session.urlResponse == nil {
// See if the server confirmed that we can use the resumable data.
if let resumableData = session.resumableData {
if ResumableData.isResumedResponse(response) {
session.data = resumableData.data
session.resumedDataCount = Int64(resumableData.data.count)
session.metrics.serverConfirmedResume = true
}
session.resumableData = nil // Get rid of resumable data
}
}
// Append data and save response
session.data.append(chunk)
session.urlResponse = response
// Collect metrics
session.metrics.downloadedDataCount = ((session.metrics.downloadedDataCount ?? 0) + chunk.count)
// Update tasks' progress and call progress closures if any
let (completed, total) = (Int64(session.data.count), response.expectedContentLength + session.resumedDataCount)
let tasks = session.tasks
DispatchQueue.main.async {
for (task, handlers) in tasks where !task.isCancelled {
(task.completedUnitCount, task.totalUnitCount) = (completed, total)
handlers.progress?(nil, completed, total)
task._progress?.completedUnitCount = completed
task._progress?.totalUnitCount = total
}
}
// Check if progressive decoding is enabled (disabled by default)
if configuration.isProgressiveDecodingEnabled {
// Check if we haven't loaded an entire image yet. We give decoder
// an opportunity to decide whether to decode this chunk or not.
// In case `expectedContentLength` is undetermined (e.g. 0) we
// don't allow progressive decoding.
guard session.data.count < response.expectedContentLength else { return }
_setNeedsDecodePartialImage(for: session)
}
}
private func _session(_ session: ImageLoadingSession, didFinishLoadingDataWithError error: Swift.Error?) {
session.metrics.loadDataEndDate = Date()
if let error = error {
_tryToSaveResumableData(for: session)
_session(session, didFailWithError: .dataLoadingFailed(error))
return
}
let data = session.data
session.data.removeAll() // We no longer need the data stored in session.
_decodeFinalImage(for: session, data: data)
}
// MARK: Pipeline (Decoding)
private func _setNeedsDecodePartialImage(for session: ImageLoadingSession) {
guard session.decodingOperation == nil else {
return // Already enqueued an operation.
}
let operation = BlockOperation { [weak self, weak session] in
guard let session = session else { return }
self?._actuallyDecodePartialImage(for: session)
}
_enqueueDecodingOperation(operation, for: session)
}
private func _actuallyDecodePartialImage(for session: ImageLoadingSession) {
// As soon as we get a chance to execute, grab the latest available
// data, create a decoder (if necessary) and decode the data.
let (data, decoder): (Data, ImageDecoding?) = queue.sync {
let data = session.data
let decoder = _decoder(for: session, data: data)
return (data, decoder)
}
// Produce partial image
if let image = decoder?.decode(data: data, isFinal: false) {
let scanNumber: Int? = (decoder as? ImageDecoder)?.numberOfScans
queue.async {
let container = ImageContainer(image: image, isFinal: false, scanNumber: scanNumber)
for task in session.tasks.keys {
self._session(session, processImage: container, for: task)
}
}
}
}
private func _decodeFinalImage(for session: ImageLoadingSession, data: Data) {
// Basic sanity checks, should never happen in practice.
guard !data.isEmpty, let decoder = _decoder(for: session, data: data) else {
_session(session, didFailWithError: .decodingFailed)
return
}
let metrics = session.metrics
let operation = BlockOperation { [weak self, weak session] in
guard let session = session else { return }
metrics.decodeStartDate = Date()
let image = autoreleasepool {
decoder.decode(data: data, isFinal: true) // Produce final image
}
metrics.decodeEndDate = Date()
self?.queue.async {
let container = image.map {
ImageContainer(image: $0, isFinal: true, scanNumber: nil)
}
self?._session(session, didDecodeFinalImage: container, data: data)
}
}
_enqueueDecodingOperation(operation, for: session)
}
private func _enqueueDecodingOperation(_ operation: Foundation.Operation, for session: ImageLoadingSession) {
configuration.imageDecodingQueue.enqueue(operation, for: session)
session.decodingOperation?.cancel()
session.decodingOperation = operation
}
// Lazily creates a decoder if necessary.
private func _decoder(for session: ImageLoadingSession, data: Data) -> ImageDecoding? {
guard !session.isDecodingDisabled else {
return nil
}
// Return the existing processor in case it has already been created.
if let decoder = session.decoder {
return decoder
}
// Basic sanity checks.
guard !data.isEmpty else {
return nil
}
let context = ImageDecodingContext(request: session.request, urlResponse: session.urlResponse, data: data)
let decoder = configuration.imageDecoder(context)
session.decoder = decoder
return decoder
}
private func _tryToSaveResumableData(for session: ImageLoadingSession) {
// Try to save resumable data in case the task was cancelled
// (`URLError.cancelled`) or failed to complete with other error.
if configuration.isResumableDataEnabled,
let response = session.urlResponse, !session.data.isEmpty,
let resumableData = ResumableData(response: response, data: session.data) {
ResumableData.storeResumableData(resumableData, for: session.request.urlRequest)
}
}
private func _session(_ session: ImageLoadingSession, didDecodeFinalImage image: ImageContainer?, data: Data) {
session.decoder = nil // Decoding session completed, no longer need decoder.
session.decodedFinalImage = image
guard let image = image else {
_session(session, didFailWithError: .decodingFailed)
return
}
// Store data in data cache (in case it's enabled))
if !data.isEmpty, let dataCache = configuration.dataCache, let key = session.request.urlString {
dataCache.storeData(data, for: key)
}
for task in session.tasks.keys {
_session(session, processImage: image, for: task)
}
}
// MARK: Pipeline (Processing)
/// Processes the input image for each of the given tasks. The image is processed
/// only once for the equivalent processors.
/// - parameter completion: Will get called synchronously if processing is not
/// required. If it is will get called on `self.queue` when processing is finished.
private func _session(_ session: ImageLoadingSession, processImage image: ImageContainer, for task: ImageTask) {
let isFinal = image.isFinal
guard let processor = _processor(for: image.image, request: task.request) else {
_session(session, didProcessImage: image.image, isFinal: isFinal, metrics: TaskMetrics(), for: task)
return // No processing needed.
}
if !image.isFinal && session.processingSessions[task] != nil {
return // Back pressure - we'are already busy processing another partial image
}
// Find existing session or create a new one.
let processingSession = _processingSession(for: image, processor: processor, session: session, task: task)
// Register task with a processing session.
processingSession.tasks.insert(task)
session.processingSessions[task] = processingSession
processingSession.updatePriority()
}
private func _processingSession(for image: ImageContainer, processor: AnyImageProcessor, session: ImageLoadingSession, task: ImageTask) -> ImageProcessingSession {
func findExistingSession() -> ImageProcessingSession? {
return session.processingSessions.values.first {
$0.processor == processor && $0.image.image === image.image
}
}
if let processingSession = findExistingSession() {
return processingSession
}
let processingSession = ImageProcessingSession(processor: processor, image: image)
let isFinal = image.isFinal
let operation = BlockOperation { [weak self, weak session, weak processingSession] in
var metrics = TaskMetrics.started()
let output: Image? = autoreleasepool {
processor.process(image: image, request: task.request)
}
metrics.end()
self?.queue.async {
guard let session = session else { return }
for task in (processingSession?.tasks ?? []) {
if session.processingSessions[task] === processingSession {
session.processingSessions[task] = nil
}
self?._session(session, didProcessImage: output, isFinal: isFinal, metrics: metrics, for: task)
}
}
}
operation.queuePriority = task.request.priority.queuePriority
session.priority.observe { [weak operation] in
operation?.queuePriority = $0.queuePriority
}
configuration.imageProcessingQueue.addOperation(operation)
processingSession.operation = operation
return processingSession
}
private func _processor(for image: Image, request: ImageRequest) -> AnyImageProcessor? {
if Configuration.isAnimatedImageDataEnabled && image.animatedImageData != nil {
return nil // Don't process animated images.
}
return configuration.imageProcessor(image, request)
}
private func _session(_ session: ImageLoadingSession, didProcessImage image: Image?, isFinal: Bool, metrics: TaskMetrics, for task: ImageTask) {
if isFinal {
task.metrics.processStartDate = metrics.startDate
task.metrics.processEndDate = metrics.endDate
let error: Error? = (image == nil ? .processingFailed : nil)
_session(session, didCompleteTask: task, image: image, error: error)
} else {
guard let image = image else { return }
_session(session, didProducePartialImage: image, for: task)
}
}
// MARK: ImageLoadingSession (Callbacks)
private func _session(_ session: ImageLoadingSession, didProducePartialImage image: Image, for task: ImageTask) {
// Check if we haven't completed the session yet by producing a final image
// or cancelling the task.
guard sessions[session.key] === session else { return }
let response = ImageResponse(image: image, urlResponse: session.urlResponse)
if let handler = session.tasks[task], let progress = handler.progress {
DispatchQueue.main.async {
guard !task.isCancelled else { return }
progress(response, task.completedUnitCount, task.totalUnitCount)
}
}
}
private func _session(_ session: ImageLoadingSession, didCompleteTask task: ImageTask, image: Image?, error: Error?) {
let response = image.map {
ImageResponse(image: $0, urlResponse: session.urlResponse)
}
// Store response in memory cache if allowed.
if let response = response, task.request.memoryCacheOptions.isWriteAllowed {
configuration.imageCache?.storeResponse(response, for: task.request)
}
if let handlers = session.tasks.removeValue(forKey: task) {
_didCompleteTask(task, response: response, error: error, completion: handlers.completion)
}
if session.tasks.isEmpty {
_didFinishSession(session)
}
}
private func _session(_ session: ImageLoadingSession, didFailWithError error: Error) {
for task in session.tasks.keys {
_session(session, didCompleteTask: task, image: nil, error: error)
}
}
private func _didFinishSession(_ session: ImageLoadingSession) {
// Check if session is still registered.
guard sessions[session.key] === session else { return }
session.metrics.endDate = Date()
sessions[session.key] = nil
}
// Cancel the session in case all handlers were removed.
private func _didCancelTask(_ task: ImageTask) {
task.metrics.wasCancelled = true
task.metrics.endDate = Date()
_cancelSession(for: task)
guard let didCollectMetrics = didFinishCollectingMetrics else { return }
DispatchQueue.main.async {
didCollectMetrics(task, task.metrics)
}
}
private func _didCompleteTask(_ task: ImageTask, response: ImageResponse?, error: Error?, completion: ImageTask.Completion?) {
task.metrics.endDate = Date()
DispatchQueue.main.async {
guard !task.isCancelled else { return }
completion?(response, error)
self.didFinishCollectingMetrics?(task, task.metrics)
}
}
// MARK: Errors
/// Represents all possible image pipeline errors.
public enum Error: Swift.Error, CustomDebugStringConvertible {
/// Data loader failed to load image data with a wrapped error.
case dataLoadingFailed(Swift.Error)
/// Decoder failed to produce a final image.
case decodingFailed
/// Processor failed to produce a final image.
case processingFailed
public var debugDescription: String {
switch self {
case let .dataLoadingFailed(error): return "Failed to load image data: \(error)"
case .decodingFailed: return "Failed to create an image from the image data"
case .processingFailed: return "Failed to process the image"
}
}
}
}
// MARK: - ImageLoadingSession
/// A image loading session. During a lifetime of a session handlers can
/// subscribe to and unsubscribe from it.
private final class ImageLoadingSession {
let sessionId: Int
/// The original request with which the session was created.
let request: ImageRequest
let key: AnyHashable // loading key
let cts = CancellationTokenSource()
var token: CancellationToken { return cts.token }
// Registered image tasks.
var tasks = [ImageTask: Handlers]()
struct Handlers {
let progress: ImageTask.ProgressHandler?
let completion: ImageTask.Completion?
}
// Data loading session.
var urlResponse: URLResponse?
var resumableData: ResumableData?
var resumedDataCount: Int64 = 0
lazy var data = Data()
// Decoding session.
var decoder: ImageDecoding?
var decodedFinalImage: ImageContainer? // Decoding result
weak var decodingOperation: Foundation.Operation?
// Processing sessions.
var processingSessions = [ImageTask: ImageProcessingSession]()
// Metrics that we collect during the lifetime of a session.
let metrics: ImageTaskMetrics.SessionMetrics
let priority: Property<ImageRequest.Priority>
deinit {
decodingOperation?.cancel()
}
init(sessionId: Int, request: ImageRequest, key: AnyHashable) {
self.sessionId = sessionId
self.request = request
self.key = key
self.metrics = ImageTaskMetrics.SessionMetrics(sessionId: sessionId)
self.priority = Property(value: request.priority)
}
func updatePriority() {
priority.update(with: tasks.keys)
}
var isDecodingDisabled: Bool {
return !tasks.keys.contains {
!$0.request.isDecodingDisabled
}
}
}
private final class ImageProcessingSession {
let processor: AnyImageProcessor
let image: ImageContainer
var tasks = Set<ImageTask>()
weak var operation: Foundation.Operation?
let priority = Property<ImageRequest.Priority>(value: .normal)
deinit {
operation?.cancel()
}
init(processor: AnyImageProcessor, image: ImageContainer) {
self.processor = processor; self.image = image
}
// Update priority for processing operations (those are per image task,
// not per image session).
func updatePriority() {
priority.update(with: tasks)
}
}
struct ImageContainer {
let image: Image
let isFinal: Bool
let scanNumber: Int?
}
// MARK: - Extensions
private extension Property where T == ImageRequest.Priority {
func update<Tasks: Sequence>(with tasks: Tasks) where Tasks.Element == ImageTask {
2019-09-14 11:00:58 -07:00
if let newPriority = tasks.map({ $0.priority }).max(), self.value != newPriority {
self.value = newPriority
}
}
}
private extension Foundation.OperationQueue {
func enqueue(_ operation: Foundation.Operation, for session: ImageLoadingSession) {
operation.queuePriority = session.priority.value.queuePriority
session.priority.observe { [weak operation] in
operation?.queuePriority = $0.queuePriority
}
session.token.register { [weak operation] in
operation?.cancel()
}
addOperation(operation)
}
}