개발

Combine Publisher - Scheduler

한번사는인생~키야 2024. 7. 8. 12:30
728x90

Combine에서 스케줄러는 데이터 스트림 내에서 작업의 타이밍과 스레딩을 관리하는 데 중요한 역할을 합니다. 스케줄러는 게시자가 값을 내보내고 구독자가 값을 수신하는 시점과 스레드를 지시합니다.

Combine의 스케줄러 세부 내용은 다음과 같습니다.

  • 스레드 실행: 스케줄러는 게시자와 구독자 작업이 실행되는 스레드를 제어합니다. 이를 통해 비동기 처리와 시스템 리소스의 효율적인 사용이 가능합니다.
  • 타이밍 제어: 스케줄러는 특정 시점에 지연을 도입하거나 작업을 스케줄할 수 있습니다. 이는 데이터 스트림 내에서 가치 배출과 가입자 반응의 타이밍을 관리하는 데 도움이 됩니다.
  • 스케줄러 유형: Combine은 다양한 내장 스케줄러를 제공합니다.

스케줄러 사용:

  • Publisher Subscription: Operator 를 사용하여 Publisher를 구독할 때 스케줄러를 지정할 수 있습니다 subscribe(on:). 이는 Publisher의 로직이 실행되어 값을 내보내는 스레드를 결정합니다.
  • 구독자 수신: 연산자를 사용하여 게시자로부터 값을 수신할 때 스케줄러를 지정할 수 있습니다 receive(on:). 이는 구독자가 수신된 값에 대해 알림을 받을 스레드를 결정합니다.

예: 메인 스레드 업데이트를 사용한 백그라운드 페치:

import Combine



func fetchDataFromNetwork() -> AnyPublisher<String, Error> {

  return Future { promise in

    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {

      promise(.success("Fetched data from network"))

    }

  }

  .map { $0.uppercased() } // Simulate data processing

  .eraseToAnyPublisher()

}



fetchDataFromNetwork()

  .subscribe(on: .global()) // Fetch data on a background thread

  .receive(on: .main) // Update UI on the main thread

  .sink(receiveCompletion: { completion in

    if case .failure(let error) = completion {

      print("Error fetching data: \(error)")

    }

  }, receiveValue: { value in

    print("Received data: \(value)") // Update UI with received data

  })

 

설명:

  1. 네트워크 데이터 가져오기를 시뮬레이션하는 게시자를 반환하는 함수를 정의합니다 .fetchDataFromNetwork
  2. .을 사용 Future하여 백그라운드 스레드에서 네트워크 가져오기를 수행하는 게시자를 만들었습니다 .global().
  3. 우리는 데이터 처리를 위해 연산자를 체인으로 연결합니다 map.
  4. 우리는 게시자를 구독하고 스케줄러를 지정합니다:

키 포인트:

  • 스케줄러는 게시자 및 구독자 작업의 스레딩과 타이밍을 제어합니다.
  • 스케줄러를 사용하여 비동기 처리를 관리하고, UI 스레드 차단을 방지하고, 특정 시점에 대해 작업을 예약합니다.
  • Combine은 다양한 사용 사례에 맞춰 다양한 스케줄러를 제공합니다.

스케줄러를 이해하면 데이터 흐름을 효과적으로 관리하고 Combine 애플리케이션의 성능과 반응성을 개선할 있습니다.

 

여기에는 Combine에서 스케줄러를 사용하여 다양한 기능을 구현하고 데이터 스트림 내에서 작업의 스레딩 및 타이밍을 관리하는 방법에 대한 몇 가지 예가 있습니다.

 

1. 백그라운드 스레드에서 사용자 입력 디바운싱:

import Combine



class SearchViewModel {

  private let subject = PassthroughSubject<String, Never>()

  var searchTerm: AnyPublisher<String, Never> {

    subject.eraseToAnyPublisher()

  }



  func search(withTerm term: String) {

    subject.send(term) // Emit search term immediately

  }



  func debouncedSearch() -> AnyPublisher<String, Never> {

    return searchTerm

      .debounce(for: 0.5, scheduler: .global()) // Debounce on background thread

      .receive(on: .main) // Receive updates on main thread

  }

}



let viewModel = SearchViewModel()



// Debounced search subscription

let debouncedSubscription = viewModel.debouncedSearch()

  .sink(receiveValue: { term in

    print("Debounced search term: \(term)")

    // Perform actual search with debounced term on main thread

  })



// Trigger some input

viewModel.search(withTerm: "apple")

viewModel.search(withTerm: "banana") // Debounce will only emit "banana"

 

설명:

  1. 검색어를 보관하는 SearchViewModel클래스를 정의합니다 .PassthroughSubject
  2. 게시자 searchTerm가 검색어를 공개합니다.
  3. 해당 search메서드는 입력된 검색어를 내보냅니다.
  4. 이 debouncedSearch방법은 .을 사용하여 debounce백그라운드 스레드( .global()) 에서 입력을 지연하고 필터링하는 데 사용됩니다 DispatchQueueScheduler.
  5. 마지막으로, UI 상호작용을 위해 메인 스레드에서 디바운스된 검색어 업데이트가 수신되는지 확인합니다. receive(on: .main)

 

2. 지연을 통한 네트워크 요청 스케줄링:

import Combine



class NetworkManager {

  private let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!

  private let subject = PassthroughSubject<[Post], Error>()

  var postsPublisher: AnyPublisher<[Post], Error> {

    subject.eraseToAnyPublisher()

  }



  func fetchPosts() -> AnyPublisher<[Post], Error> {

    return URLSession.shared.dataTaskPublisher(for: url)

      .map { $0.data }

      .decode(type: [Post].self, decoder: JSONDecoder())

      .delay(for: 2, scheduler: .global()) // Delay on background thread

      .receive(on: .main) // Receive updates on main thread

      .sink(receiveCompletion: { completion in

        if case .failure(let error) = completion {

          self.subject.send(completion: .failure(error))

        }

      }, receiveValue: { posts in

        self.subject.send(posts) // Emit posts to all subscribers

      })

      .eraseToAnyPublisher()

  }



  func connect() {

    fetchPosts() // Start fetching and emitting posts

  }



  private var cancellables: Set<AnyCancellable> = []

}



let networkManager = NetworkManager()



// Subscriber 1

let subscriber1 = networkManager.postsPublisher

  .sink(receiveCompletion: { completion in

    if case .failure(let error) = completion {

      print("Subscriber 1: Error fetching posts: \(error)")

    }

  }, receiveValue: { posts in

    print("Subscriber 1: Received posts: \(posts)")

  })



// Subscriber 2 (after some delay)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {

  let subscriber2 = networkManager.postsPublisher

    .sink(receiveCompletion: { completion in

      if case .failure(let error) = completion {

        print("Subscriber 2: Error fetching posts: \(error)")

      }

    }, receiveValue: { posts in

      print("Subscriber 2: Received posts: \(posts)")

    })

}



// Trigger the connection (start fetching posts)

networkManager.connect()

 

설명:

  1. 네트워크 요청을 처리하기 위한 클래스를 생성합니다 NetworkManager.
  2. fetchPosts방법은 URL에서 게시물을 가져옵니다.
  3. 네트워크 지연 시간을 시뮬레이션하기 위해 .delay(for: 2, scheduler: .global())백그라운드 스레드( ) 사용하여 2 지연을 도입합니다 .DispatchQueueScheduler
  4. 마지막으로, 가져온 게시물이 UI 업데이트를 위해 메인 스레드에 수신되었는지 확인합니다. .receive(on: .main)