Timeline: wrap datasource in an actor for safety and perforamances

This commit is contained in:
Thomas Ricouard 2023-02-18 18:04:46 +01:00
parent b1424aadd0
commit 7112e6515b
2 changed files with 128 additions and 63 deletions

View file

@ -0,0 +1,54 @@
import Foundation
import Models
actor TimelineDatasource {
private var statuses: [Status] = []
var isEmpty: Bool {
statuses.isEmpty
}
func get() -> [Status] {
statuses
}
func reset() {
statuses = []
}
func indexOf(statusId: String) -> Int? {
statuses.firstIndex(where: { $0.id == statusId })
}
func contains(statusId: String) -> Bool {
statuses.contains(where: { $0.id == statusId })
}
func set(_ statuses: [Status]) {
self.statuses = statuses
}
func append(_ status: Status) {
statuses.append(status)
}
func append(contentOf: [Status]) {
statuses.append(contentsOf: contentOf)
}
func insert(_ status: Status, at: Int) {
statuses.insert(status, at: at)
}
func insert(contentOf: [Status], at: Int) {
statuses.insert(contentsOf: contentOf, at: at)
}
func replace(_ status: Status, at: Int) {
statuses[at] = status
}
func remove(_ statusId: String) {
statuses.removeAll(where: { $0.id == statusId })
}
}

View file

@ -17,7 +17,7 @@ class TimelineViewModel: ObservableObject {
timeline = .home
}
if oldValue != timeline {
statuses = []
await datasource.reset()
pendingStatusesObserver.pendingStatuses = []
tag = nil
}
@ -40,7 +40,7 @@ class TimelineViewModel: ObservableObject {
@Published var tag: Tag?
// Internal source of truth for a timeline.
private var statuses: [Status] = []
private var datasource = TimelineDatasource()
private let cache: TimelineCache = .shared
private var visibileStatusesIds = Set<String>()
private var canStreamEvents: Bool = true
@ -52,7 +52,9 @@ class TimelineViewModel: ObservableObject {
var client: Client? {
didSet {
if oldValue != client {
statuses = []
Task {
await datasource.reset()
}
}
}
}
@ -92,41 +94,39 @@ class TimelineViewModel: ObservableObject {
}
func handleEvent(event: any StreamEvent, currentAccount _: CurrentAccount) {
if let event = event as? StreamEventUpdate,
canStreamEvents,
isTimelineVisible,
pendingStatusesEnabled,
!statuses.contains(where: { $0.id == event.status.id })
{
pendingStatusesObserver.pendingStatuses.insert(event.status.id, at: 0)
let newStatus = event.status
if let accountId {
if newStatus.mentions.first(where: { $0.id == accountId }) != nil {
newStatus.userMentioned = true
Task {
if let event = event as? StreamEventUpdate,
canStreamEvents,
isTimelineVisible,
pendingStatusesEnabled,
await !datasource.contains(statusId: event.status.id)
{
pendingStatusesObserver.pendingStatuses.insert(event.status.id, at: 0)
let newStatus = event.status
if let accountId {
if newStatus.mentions.first(where: { $0.id == accountId }) != nil {
newStatus.userMentioned = true
}
}
}
statuses.insert(newStatus, at: 0)
Task {
await datasource.insert(newStatus, at: 0)
await cacheHome()
}
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
} else if let event = event as? StreamEventDelete {
withAnimation {
statuses.removeAll(where: { $0.id == event.status })
Task {
await cacheHome()
let statuses = await datasource.get()
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
} else if let event = event as? StreamEventStatusUpdate {
if let originalIndex = statuses.firstIndex(where: { $0.id == event.status.id }) {
statuses[originalIndex] = event.status
Task {
await cacheHome()
} else if let event = event as? StreamEventDelete {
await datasource.remove(event.status)
await cacheHome()
let statuses = await datasource.get()
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
} else if let event = event as? StreamEventStatusUpdate {
if let originalIndex = await datasource.indexOf(statusId: event.status.id) {
await datasource.replace(event.status, at: originalIndex)
await cacheHome()
statusesState = await .display(statuses: datasource.get(), nextPageState: .hasNextPage)
}
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
}
}
@ -137,7 +137,7 @@ class TimelineViewModel: ObservableObject {
extension TimelineViewModel {
private func cacheHome() async {
if let client, timeline == .home {
await cache.set(statuses: statuses, client: client)
await cache.set(statuses: datasource.get(), client: client)
}
}
@ -155,12 +155,12 @@ extension TimelineViewModel: StatusesFetcher {
func fetchStatuses() async {
guard let client else { return }
do {
if statuses.isEmpty || timeline == .trending {
if !statuses.isEmpty && timeline == .trending {
if await datasource.isEmpty || timeline == .trending {
if await !datasource.isEmpty && timeline == .trending {
return
}
try await fetchFirstPage(client: client)
} else if let latest = statuses.first {
} else if let latest = await datasource.get().first {
try await fetchNewPagesFrom(latestStatus: latest, client: client)
}
} catch {
@ -174,7 +174,7 @@ extension TimelineViewModel: StatusesFetcher {
private func fetchFirstPage(client: Client) async throws {
pendingStatusesObserver.pendingStatuses = []
if statuses.isEmpty {
if await datasource.isEmpty {
statusesState = .loading
}
@ -184,17 +184,18 @@ extension TimelineViewModel: StatusesFetcher {
!cachedStatuses.isEmpty,
timeline == .home
{
statuses = cachedStatuses
await datasource.set(cachedStatuses)
if let latestSeenId = await cache.getLatestSeenStatus(for: client)?.last,
let index = statuses.firstIndex(where: { $0.id == latestSeenId }),
let index = await datasource.indexOf(statusId: latestSeenId),
index > 0
{
// Restore cache and scroll to latest seen status.
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
statusesState = await .display(statuses: datasource.get(), nextPageState: .hasNextPage)
scrollToIndexAnimated = false
scrollToIndex = index + 1
} else {
// Restore cache and scroll to top.
let statuses = await datasource.get()
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: .hasNextPage)
}
@ -202,14 +203,15 @@ extension TimelineViewModel: StatusesFetcher {
// And then we fetch statuses again toget newest statuses from there.
await fetchStatuses()
} else {
statuses = try await client.get(endpoint: timeline.endpoint(sinceId: nil,
maxId: nil,
minId: nil,
offset: 0))
var statuses: [Status] = try await client.get(endpoint: timeline.endpoint(sinceId: nil,
maxId: nil,
minId: nil,
offset: 0))
updateMentionsToBeHighlighted(&statuses)
ReblogCache.shared.removeDuplicateReblogs(&statuses)
await datasource.set(statuses)
await cacheHome()
withAnimation {
@ -225,8 +227,9 @@ extension TimelineViewModel: StatusesFetcher {
var newStatuses: [Status] = await fetchNewPages(minId: latestStatus.id, maxPages: 10)
// Dedup statuses, a status with the same id could have been streamed in.
let ids = await datasource.get().map{ $0.id }
newStatuses = newStatuses.filter { status in
!statuses.contains(where: { $0.id == status.id })
!ids.contains(where: { $0 == status.id })
}
ReblogCache.shared.removeDuplicateReblogs(&newStatuses)
@ -256,10 +259,10 @@ extension TimelineViewModel: StatusesFetcher {
}
// Keep track of the top most status, so we can scroll back to it after view update.
let topStatusId = statuses.first?.id
let topStatusId = await datasource.get().first?.id
// Insert new statuses in internal datasource.
statuses.insert(contentsOf: newStatuses, at: 0)
await datasource.insert(contentOf: newStatuses, at: 0)
// Cache statuses for home timeline.
await cacheHome()
@ -267,8 +270,10 @@ extension TimelineViewModel: StatusesFetcher {
// If pending statuses are not enabled, we simply load status on the top regardless of the current position.
if !pendingStatusesEnabled {
pendingStatusesObserver.pendingStatuses = []
let statuses = await datasource.get()
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: statuses.count < 20 ? .none : .hasNextPage)
statusesState = .display(statuses: statuses,
nextPageState: statuses.count < 20 ? .none : .hasNextPage)
canStreamEvents = true
}
} else {
@ -279,7 +284,9 @@ extension TimelineViewModel: StatusesFetcher {
// We need to update the statuses state, and then scroll to the previous top most status.
if let topStatusId, visibileStatusesIds.contains(topStatusId), scrollToTopVisible {
pendingStatusesObserver.disableUpdate = true
statusesState = .display(statuses: statuses, nextPageState: statuses.count < 20 ? .none : .hasNextPage)
let statuses = await datasource.get()
statusesState = .display(statuses: statuses,
nextPageState: statuses.count < 20 ? .none : .hasNextPage)
scrollToIndexAnimated = false
scrollToIndex = newStatuses.count + 1
DispatchQueue.main.async {
@ -288,15 +295,17 @@ extension TimelineViewModel: StatusesFetcher {
}
} else {
// This will keep the scroll position (if the list is scrolled) and prepend statuses on the top.
let statuses = await datasource.get()
withAnimation {
statusesState = .display(statuses: statuses, nextPageState: statuses.count < 20 ? .none : .hasNextPage)
statusesState = .display(statuses: statuses,
nextPageState: statuses.count < 20 ? .none : .hasNextPage)
canStreamEvents = true
}
}
// We trigger a new fetch so we can get the next new statuses if any.
// If none, it'll stop there.
if !Task.isCancelled, let latest = statuses.first, let client {
if !Task.isCancelled, let latest = await datasource.get().first, let client {
try await fetchNewPagesFrom(latestStatus: latest, client: client)
}
}
@ -308,10 +317,11 @@ extension TimelineViewModel: StatusesFetcher {
var allStatuses: [Status] = []
var latestMinId = minId
do {
while var newStatuses: [Status] = try await client.get(endpoint: timeline.endpoint(sinceId: nil,
maxId: nil,
minId: latestMinId,
offset: statuses.count)),
while var newStatuses: [Status] =
try await client.get(endpoint: timeline.endpoint(sinceId: nil,
maxId: nil,
minId: latestMinId,
offset: datasource.get().count)),
!newStatuses.isEmpty,
pagesLoaded < maxPages
{
@ -332,19 +342,20 @@ extension TimelineViewModel: StatusesFetcher {
func fetchNextPage() async {
guard let client else { return }
do {
guard let lastId = statuses.last?.id else { return }
statusesState = .display(statuses: statuses, nextPageState: .loadingNextPage)
guard let lastId = await datasource.get().last?.id else { return }
statusesState = await .display(statuses: datasource.get(), nextPageState: .loadingNextPage)
var newStatuses: [Status] = try await client.get(endpoint: timeline.endpoint(sinceId: nil,
maxId: lastId,
minId: nil,
offset: statuses.count))
offset: datasource.get().count))
updateMentionsToBeHighlighted(&newStatuses)
ReblogCache.shared.removeDuplicateReblogs(&newStatuses)
statuses.append(contentsOf: newStatuses)
await datasource.append(contentOf: newStatuses)
statusesState = .display(statuses: statuses, nextPageState: newStatuses.count < 20 ? .none : .hasNextPage)
statusesState = await .display(statuses: datasource.get(),
nextPageState: newStatuses.count < 20 ? .none : .hasNextPage)
} catch {
statusesState = .error(error: error)
}