replace to async method for URLSessionWebSocketTask.receive()

This commit is contained in:
Keita Watanabe 2024-07-25 17:28:51 +09:00
parent 4c1ba2168d
commit c0c6dafd17
3 changed files with 46 additions and 50 deletions

View file

@ -71,7 +71,9 @@ extension IceCubesApp {
.onChange(of: appAccountsManager.currentClient) { _, newValue in
setNewClientsInEnv(client: newValue)
if newValue.isAuth {
watcher.watch(streams: [.user, .direct])
Task {
await watcher.watch(streams: [.user, .direct])
}
}
}
}

View file

@ -44,8 +44,8 @@ struct IceCubesApp: App {
userPreferences.setClient(client: client)
Task {
await currentInstance.fetchCurrentInstance()
watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi)
watcher.watch(streams: [.user, .direct])
await watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi)
await watcher.watch(streams: [.user, .direct])
}
}
@ -54,10 +54,10 @@ struct IceCubesApp: App {
case .background:
watcher.stopWatching()
case .active:
watcher.watch(streams: [.user, .direct])
UNUserNotificationCenter.current().setBadgeCount(0)
userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken))
Task {
await watcher.watch(streams: [.user, .direct])
try? await UNUserNotificationCenter.current().setBadgeCount(0)
userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken))
await userPreferences.refreshServerPreferences()
}
default:

View file

@ -36,16 +36,16 @@ import OSLog
decoder.keyDecodingStrategy = .convertFromSnakeCase
}
public func setClient(client: Client, instanceStreamingURL: URL?) {
public func setClient(client: Client, instanceStreamingURL: URL?) async {
if self.client != nil {
stopWatching()
}
self.client = client
self.instanceStreamingURL = instanceStreamingURL
connect()
await connect()
}
private func connect() {
private func connect() async {
guard let task = try? client?.makeWebSocketTask(
endpoint: Streaming.streaming,
instanceStreamingURL: instanceStreamingURL
@ -54,15 +54,15 @@ import OSLog
}
self.task = task
self.task?.resume()
receiveMessage()
await receiveMessage()
}
public func watch(streams: [Stream]) {
public func watch(streams: [Stream]) async {
if client?.isAuth == false {
return
}
if task == nil {
connect()
await connect()
}
watchedStreams = streams
for stream in streams {
@ -83,11 +83,10 @@ import OSLog
}
}
private func receiveMessage() {
task?.receive(completionHandler: { [weak self] result in
guard let self else { return }
switch result {
case let .success(message):
private func receiveMessage() async {
do {
guard let message = try await task?.receive() else { return }
switch message {
case let .string(string):
do {
@ -98,12 +97,10 @@ import OSLog
let rawEvent = try decoder.decode(RawStreamEvent.self, from: data)
logger.info("Stream update: \(rawEvent.event)")
if let event = rawEventToEvent(rawEvent: rawEvent) {
Task { @MainActor in
self.events.append(event)
self.latestEvent = event
events.append(event)
latestEvent = event
if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct {
self.unreadNotificationsCount += 1
}
unreadNotificationsCount += 1
}
}
} catch {
@ -114,18 +111,15 @@ import OSLog
break
}
receiveMessage()
case .failure:
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(retryDelay)) {
self.retryDelay += 30
self.stopWatching()
self.connect()
self.watch(streams: self.watchedStreams)
await receiveMessage()
} catch {
try? await Task.sleep(nanoseconds: UInt64(retryDelay * 1000 * 1000 * 1000))
retryDelay += 30
stopWatching()
await connect()
await watch(streams: watchedStreams)
}
}
})
}
private func rawEventToEvent(rawEvent: RawStreamEvent) -> (any StreamEvent)? {
guard let payloadData = rawEvent.payload.data(using: .utf8) else {