개발

Combine Publisher - ConnectablePublisher

한번사는인생~키야 2024. 7. 8. 10:36
728x90

Combine에서 퍼블리셔가 값을 방출하기 시작할 때 명시적으로 제어할 수 있는 프로토콜입니다. 구독 시 방출을 시작하는 일반 퍼블리셔와 달리 연결 및 게시 프로세스를 별도로 관리할 수 있습니다. 

다음은 세부 내용입니다 ConnectablePublisher.

  • 지연된 방출: 메서드 를 명시적으로 호출할 때까지 값의 방출을 지연합니다 . 이는 데이터 스트림의 시작을 제어합니다. ConnectablePublisherconnect()
  • 수동 연결: 가입자는 스스로 데이터 스트림 시작을 트리거할 수 없습니다. 호출 을 기다려야 합니다 connect().
  • 여러 구독: 단일 ConnectablePublisher인스턴스는 여러 구독을 지원할 수 있습니다. 연결되면 모든 구독자는 방출된 값을 수신합니다.
  • 사용 사례:

ConnectablePublisher 사용:

  1. 연결 가능한 퍼블리셔 만들기: Combine과 같은 프레임워크는 내장된 연결 가능한 퍼블리셔를 제공합니다(예 : , ). 또는 기존 퍼블리셔를 .로 래핑할 수 있습니다 . PassthroughSubject CurrentValueSubjectmakeConnectable()
  2. 연결 관리: 연결 가능한 게시자에서 메서드를 호출하여 connect()데이터 스트림을 시작하고 구독된 모든 수신자에게 값을 내보내기 시작합니다.

예: 사용자 인증 확인:

import Combine



class AuthenticationManager {

  private let subject = PassthroughSubject<Bool, Never>()

  var isUserLoggedIn: AnyPublisher<Bool, Never> {

    subject.eraseToAnyPublisher()

  }



  func checkAuthentication() {

    // Simulate authentication check (replace with actual logic)

    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {

      self.subject.send(true) // Emit login status after delay

    }

  }



  func connect() {

    checkAuthentication() // Start authentication check and emit value

  }

}



let authManager = AuthenticationManager()



// Subscribe to the connectable publisher

let loginStatusSubscription = authManager.isUserLoggedIn

  .sink(receiveValue: { isLoggedIn in

    print("User is logged in: \(isLoggedIn)")

  })



// Trigger the connection (authentication check)

authManager.connect()

 

설명:

  1. 로그인 상태를 보관하기 위해 (연결 가능한 게시자 유형)을 AuthenticationManager갖는 클래스를 정의합니다 .PassthroughSubject
  2. 로그인 상태 업데이트에 액세스할 수 있는 게시자를 공개합니다 .isUserLoggedIn
  3. 이 checkAuthentication방법은 인증 검사를 시뮬레이션하고 지연 후 로그인 상태를 내보냅니다.
  4. 해당 connect메서드는 .을 호출하여 인증 확인을 트리거합니다 checkAuthentication.
  5. 로그인 상태 업데이트를 받기 위해 게시자 에 대한 구독을 생성합니다 .isUserLoggedIn
  6. 마지막으로, 인증 검사 및 데이터 스트림을 시작하도록 요청합니다 connect.AuthenticationManager

키 포인트:

  • ConnectablePublisher데이터 방출 시작에 대한 제어를 제공합니다.
  • 데이터 흐름을 조정하고 리소스를 관리하는 데 유용합니다.
  • 데이터 스트림의 시작을 지연시키거나 여러 구독을 관리해야 할 때 사용을 고려해보세요 .

이해하면 ConnectablePublisherCombine 애플리케이션에서 보다 유연하고 제어된 데이터 스트림을 생성할 있습니다.

 

 Combine을 사용하여 다양한 기능을 구현하고 애플리케이션에서 데이터 흐름을 관리하는 방법에 대한 몇 가지 예는 다음과 같습니다 

1. 사용자 입력 디바운싱:

import Combine



class SearchViewModel {

  private let subject = PassthroughSubject<String, Never>()

  var searchTerm: AnyPublisher<String, Never> {

    subject.eraseToAnyPublisher()

  }



  func search(withTerm term: String) {

    subject.send(term) // Emit search term immediately

  }



  func debouncedSearch() -> AnyPublisher<String, Never> {

    return searchTerm

      .debounce(for: 0.5, scheduler: .main) // Debounce input

      .connect() // Connect after debouncing

  }

}



let viewModel = SearchViewModel()



// Debounced search subscription

let debouncedSubscription = viewModel.debouncedSearch()

  .sink(receiveValue: { term in

    print("Debounced search term: \(term)")

    // Perform actual search with debounced term

  })



// Trigger some input

viewModel.search(withTerm: "apple")

viewModel.search(withTerm: "banana") // Debounce will only emit "banana"

 

2. 여러 가입자와 네트워크 데이터 공유:

import Combine



class NetworkManager {

  private let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!

  private let subject = PassthroughSubject<[Post], Error>()

  var postsPublisher: AnyPublisher<[Post], Error> {

    subject.eraseToAnyPublisher()

  }



  func fetchPosts() {

    URLSession.shared.dataTaskPublisher(for: url)

      .map { $0.data }

      .decode(type: [Post].self, decoder: JSONDecoder())

      .sink(receiveCompletion: { completion in

        if case .failure(let error) = completion {

          self.subject.send(completion: .failure(error))

        }

      }, receiveValue: { posts in

        self.subject.send(posts) // Emit posts to all subscribers

      })

      .store(in: &cancellables) // Manage subscriptions

  }



  func connect() {

    fetchPosts() // Start fetching and emitting posts

  }



  private var cancellables: Set<AnyCancellable> = []

}



let networkManager = NetworkManager()



// Subscriber 1

let subscriber1 = networkManager.postsPublisher

  .sink(receiveCompletion: { completion in

    if case .failure(let error) = completion {

      print("Subscriber 1: Error fetching posts: \(error)")

    }

  }, receiveValue: { posts in

    print("Subscriber 1: Received posts: \(posts)")

  })



// Subscriber 2 (after some delay)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {

  let subscriber2 = networkManager.postsPublisher

    .sink(receiveCompletion: { completion in

      if case .failure(let error) = completion {

        print("Subscriber 2: Error fetching posts: \(error)")

      }

    }, receiveValue: { posts in

      print("Subscriber 2: Received posts: \(posts)")

    })

}



// Trigger the connection (start fetching posts)

networkManager.connect()

 

3. 테스트를 위한 데이터 스트림 재생:

import Combine



class DataStreamProvider {

  private let subject = PassthroughSubject<[Int], Never>()

  var dataStream: AnyPublisher<[Int], Never> {

    subject.eraseToAnyPublisher()

  }



  func generateData() {

    for i in 1...10 {

      subject.send([i]) // Emit data points

      DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) {

        // Simulate some delay

      }

    }

  }



  func connect() {

    generateData() // Start generating and emitting data

  }

}



let dataProvider = DataStreamProvider()



// Create a reusable publisher with replay

let replayingPublisher = dataProvider.dataStream

  .replay(1) // Replay the last 1 emitted value

  .autoconnect() // Auto-connect on subscription



// Subscriber 1

let subscription1 = replayingPublisher

  .sink(receiveValue: { values in

    print("Subscriber 1: Received values: \(values)")

  })



// Subscriber 2 (after some delay)

DispatchQueue.main.asyncAfter(deadline: .now()