From 71c755ef04739050e3c6cd8ecf3613670ec79762 Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Wed, 5 Mar 2025 16:10:56 +0000 Subject: [PATCH 1/4] extract notifications from syncresponse --- .../notification/NotificationService.kt | 123 ++++++++++++------ .../trixnity/client/room/RoomService.kt | 53 +++++--- .../trixnity/client/mocks/RoomServiceMock.kt | 8 +- .../notification/NotificationServiceTest.kt | 114 ++++++++++------ 4 files changed, 200 insertions(+), 98 deletions(-) diff --git a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt index b8ed2002c..8e9b8bebe 100644 --- a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt +++ b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt @@ -1,7 +1,7 @@ package net.folivo.trixnity.client.notification import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.* @@ -11,7 +11,8 @@ import net.folivo.trixnity.client.room.RoomService import net.folivo.trixnity.client.store.* import net.folivo.trixnity.clientserverapi.client.MatrixClientServerApiClient import net.folivo.trixnity.clientserverapi.client.SyncState -import net.folivo.trixnity.core.ClientEventEmitter +import net.folivo.trixnity.clientserverapi.model.sync.Sync +import net.folivo.trixnity.core.ClientEventEmitter.Priority import net.folivo.trixnity.core.UserInfo import net.folivo.trixnity.core.model.events.* import net.folivo.trixnity.core.model.events.ClientEvent.RoomEvent @@ -39,6 +40,11 @@ interface NotificationService { decryptionTimeout: Duration = 5.seconds, syncResponseBufferSize: Int = 4 ): Flow + + fun getNotificationsOnce( + response: Sync.Response, + decryptionTimeout: Duration = 5.seconds, + ): Flow } class NotificationServiceImpl( @@ -55,49 +61,27 @@ class NotificationServiceImpl( private val roomSizePattern = Regex("\\s*(==|<|>|<=|>=)\\s*([0-9]+)") + override fun getNotificationsOnce( + response: Sync.Response, + decryptionTimeout: Duration, + ) : Flow = evaluateDefaultPushRules( + extractClientEvents(response, decryptionTimeout) + ) + @OptIn(ExperimentalCoroutinesApi::class) override fun getNotifications( decryptionTimeout: Duration, syncResponseBufferSize: Int, ): Flow = channelFlow { currentSyncState.first { it == SyncState.STARTED || it == SyncState.RUNNING } - val syncResponseFlow = - api.sync.subscribeAsFlow(ClientEventEmitter.Priority.AFTER_DEFAULT).map { it.syncResponse } - val pushRules = - globalAccountDataStore.get().map { event -> - event?.content?.global?.let { globalRuleSet -> - log.trace { "global rule set: $globalRuleSet" } - globalRuleSet.override.orEmpty() + - globalRuleSet.content.orEmpty() + - globalRuleSet.room.orEmpty() + - globalRuleSet.sender.orEmpty() + - globalRuleSet.underride.orEmpty() - } ?: listOf() - }.stateIn(this) - val inviteEventsFlow = syncResponseFlow - .map { syncResponse -> - syncResponse.room?.invite?.values?.flatMap { inviteRoom -> - inviteRoom.inviteState?.events.orEmpty() - }?.asFlow() - }.filterNotNull() - .flattenConcat() - - val timelineEventsFlow = - room.getTimelineEventsFromNowOn(decryptionTimeout, syncResponseBufferSize) - .map { extractDecryptedEvent(it) } - .filterNotNull() - .filter { - it.sender != userInfo.userId - } - merge(inviteEventsFlow, timelineEventsFlow) - .map { - evaluatePushRules( - event = it, - allRules = pushRules.value - ) - }.filterNotNull() - .collect { send(it) } + val clientEvents = api.sync.subscribeAsFlow(Priority.AFTER_DEFAULT) + .buffer(syncResponseBufferSize) + .flatMapConcat { extractClientEvents(it.syncResponse, decryptionTimeout) } + + evaluateDefaultPushRules(clientEvents).collect { + send(it) + } }.buffer(0) private fun extractDecryptedEvent(timelineEvent: TimelineEvent): RoomEvent<*>? { @@ -296,4 +280,67 @@ class NotificationServiceImpl( }.toRegex() } + private fun pushRulesFlow() = globalAccountDataStore.get() + .map { event -> + event?.content?.global?.let { globalRuleSet -> + log.trace { "global rule set: $globalRuleSet" } + ( + globalRuleSet.override.orEmpty() + + globalRuleSet.content.orEmpty() + + globalRuleSet.room.orEmpty() + + globalRuleSet.sender.orEmpty() + + globalRuleSet.underride.orEmpty() + ) + } ?: emptyList() + } + + private fun extractInviteEventsFromSyncResponse( + response: Sync.Response, + ): Flow> = + response.room?.invite?.values + ?.flatMap { inviteRoom -> + inviteRoom.inviteState?.events.orEmpty() + } + ?.asFlow() + ?: emptyFlow() + + private fun extractTimelineEventsFromSyncResponse( + response: Sync.Response, + decryptionTimeout: Duration, + ): Flow> = + room.getTimelineEventsOnce(response, decryptionTimeout) + .map { extractDecryptedEvent(it) } + .filterNotNull() + .filter { + it.sender != userInfo.userId + } + + private fun extractClientEvents( + response: Sync.Response, + decryptionTimeout: Duration, + ): Flow> = merge( + extractInviteEventsFromSyncResponse(response), + extractTimelineEventsFromSyncResponse(response, decryptionTimeout) + ) + + private fun evaluateDefaultPushRules( + clientEvents: Flow> + ): Flow = flow { + coroutineScope { + val allRules = pushRulesFlow().stateIn(this) + + clientEvents.map { event -> + + evaluatePushRules( + event = event, + allRules = allRules.value + ) + }.filterNotNull().collect { + emit(it) + } + + coroutineContext.cancelChildren() + } + } + } \ No newline at end of file diff --git a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt index 1d0b00e00..3146d62b0 100644 --- a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt +++ b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt @@ -16,6 +16,7 @@ import net.folivo.trixnity.clientserverapi.client.SyncState.RUNNING import net.folivo.trixnity.clientserverapi.model.rooms.GetEvents.Direction import net.folivo.trixnity.clientserverapi.model.rooms.GetEvents.Direction.BACKWARDS import net.folivo.trixnity.clientserverapi.model.rooms.GetEvents.Direction.FORWARDS +import net.folivo.trixnity.clientserverapi.model.sync.Sync import net.folivo.trixnity.core.ClientEventEmitter.Priority import net.folivo.trixnity.core.UserInfo import net.folivo.trixnity.core.model.EventId @@ -123,6 +124,11 @@ interface RoomService { syncResponseBufferSize: Int = 4, ): Flow + fun getTimelineEventsOnce( + response: Sync.Response, + decryptionTimeout: Duration = 30.seconds, + ): Flow + /** * Returns a [Timeline] for a room. */ @@ -560,27 +566,38 @@ class RoomServiceImpl( syncResponseBufferSize: Int, ): Flow = api.sync.subscribeAsFlow(Priority.AFTER_DEFAULT).map { it.syncResponse } - .buffer(syncResponseBufferSize).flatMapConcat { syncResponse -> - coroutineScope { - val timelineEvents = - syncResponse.room?.join?.values?.flatMap { it.timeline?.events.orEmpty() }.orEmpty() + - syncResponse.room?.leave?.values?.flatMap { it.timeline?.events.orEmpty() }.orEmpty() - timelineEvents.map { - async { - getTimelineEvent(it.roomId, it.id) { - this.decryptionTimeout = decryptionTimeout - } - } - }.asFlow() - .map { timelineEventFlow -> - // we must wait until TimelineEvent is saved into store - val notNullTimelineEvent = timelineEventFlow.await().filterNotNull().first() - withTimeoutOrNull(decryptionTimeout) { - timelineEventFlow.await().filterNotNull().first { it.content != null } - } ?: notNullTimelineEvent + .buffer(syncResponseBufferSize).flatMapConcat { getTimelineEventsOnce(it, decryptionTimeout) } + + override fun getTimelineEventsOnce( + response: Sync.Response, + decryptionTimeout: Duration, + ): Flow { + val timelineEvents = + response.room?.join?.values?.flatMap { it.timeline?.events.orEmpty() }.orEmpty() + + response.room?.leave?.values?.flatMap { it.timeline?.events.orEmpty() }.orEmpty() + + return flow { + coroutineScope { + val deferredTimelineEvents = timelineEvents.map { + async { + getTimelineEvent(it.roomId, it.id) { + this.decryptionTimeout = decryptionTimeout } + } + } + + deferredTimelineEvents.forEach { deferredTimelineEvent -> + // we must wait until TimelineEvent is saved into store + val notNullTimelineEvent = deferredTimelineEvent.await().filterNotNull().first() + val timelineEvent = withTimeoutOrNull(decryptionTimeout) { + deferredTimelineEvent.await().filterNotNull().first { it.content != null } + } ?: notNullTimelineEvent + + emit(timelineEvent) } } + } + } override fun getTimeline( roomId: RoomId, diff --git a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt index 2e5e91cad..e5dc24cff 100644 --- a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt +++ b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt @@ -9,6 +9,7 @@ import net.folivo.trixnity.client.store.RoomOutboxMessage import net.folivo.trixnity.client.store.TimelineEvent import net.folivo.trixnity.client.store.TimelineEventRelation import net.folivo.trixnity.clientserverapi.model.rooms.GetEvents +import net.folivo.trixnity.clientserverapi.model.sync.Sync import net.folivo.trixnity.core.model.EventId import net.folivo.trixnity.core.model.RoomId import net.folivo.trixnity.core.model.UserId @@ -89,6 +90,11 @@ class RoomServiceMock : RoomService { throw NotImplementedError() } + var returnGetTimelineEventsOnce: Flow = flowOf() + override fun getTimelineEventsOnce(response: Sync.Response, decryptionTimeout: Duration): Flow { + return returnGetTimelineEventsOnce + } + override fun getTimelineEventRelations( roomId: RoomId, eventId: EventId, @@ -177,4 +183,4 @@ class RoomServiceMock : RoomService { ): Flow?>>> { throw NotImplementedError() } -} \ No newline at end of file +} diff --git a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt index 6cf3ffc97..52484e285 100644 --- a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt +++ b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt @@ -42,6 +42,7 @@ import net.folivo.trixnity.core.model.push.PushRuleSet import net.folivo.trixnity.core.serialization.createMatrixEventJson import net.folivo.trixnity.testutils.PortableMockEngineConfig import net.folivo.trixnity.testutils.matrixJsonEndpoint +import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds @@ -216,18 +217,35 @@ private val body: ShouldSpec.() -> Unit = { } suspend fun checkNoNotification() = coroutineScope { - val notifications = async { cut.getNotifications(0.seconds).first() } - api.sync.startOnce( + api.sync.start( getBatchToken = { null }, setBatchToken = {}, - ).getOrThrow() + scope = this + ) + + val notifications = async { cut.getNotifications(0.seconds).first() } continually(50.milliseconds) { notifications.isCompleted shouldBe false } notifications.cancel() + + api.sync.cancel(true) } + suspend fun checkNotifications(block: suspend (Flow) -> Unit) = coroutineScope { + api.sync.start( + getBatchToken = { null }, + setBatchToken = {}, + scope = this, + ) + + block(cut.getNotifications(0.seconds)) + + api.sync.cancel(true) + } + + context(NotificationServiceImpl::getNotifications.name) { should("wait for sync to be started or running") { currentSyncState.value = SyncState.INITIAL_SYNC @@ -265,17 +283,10 @@ private val body: ShouldSpec.() -> Unit = { ) } } - val notification = async { cut.getNotifications(0.seconds).first() } - delay(50) - api.sync.startOnce( - getBatchToken = { null }, - setBatchToken = {}, - ).getOrThrow() - continually(50.milliseconds) { - notification.isCompleted shouldBe false - } - notification.cancel() + + checkNoNotification() } + should("do nothing when no events") { apiConfig.endpoints { matrixJsonEndpoint(Sync(timeout = 0)) { @@ -321,13 +332,10 @@ private val body: ShouldSpec.() -> Unit = { ) } } - val notification = async { cut.getNotifications(0.seconds).first() } - delay(50) - api.sync.startOnce( - getBatchToken = { null }, - setBatchToken = {}, - ).getOrThrow() - notification.await() shouldBe Notification(invitation, setOf(Notify)) + + checkNotifications { + it.first() shouldBe Notification(invitation, setOf(Notify)) + } } context("new timeline events") { val timelineEvent = messageEventWithContent( @@ -343,10 +351,12 @@ private val body: ShouldSpec.() -> Unit = { Sync.Response("next") } } - room.returnGetTimelineEventsFromNowOn = flowOf(timelineEvent) + room.returnGetTimelineEventsOnce = flowOf(timelineEvent) } should("check push rules and notify") { - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + checkNotifications { + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + } } should("have correct order") { val timelineEvents = (0..99).map { @@ -356,9 +366,10 @@ private val body: ShouldSpec.() -> Unit = { ) ) } - room.returnGetTimelineEventsFromNowOn = timelineEvents.asFlow() - cut.getNotifications(0.seconds).take(100).toList() shouldBe timelineEvents.map { - Notification(it.event, setOf(Notify)) + room.returnGetTimelineEventsOnce = timelineEvents.asFlow() + + checkNotifications { + it.take(100).toList() shouldBe timelineEvents.map { Notification(it.event, setOf(Notify)) } } } should("not notify on own messages") { @@ -370,13 +381,16 @@ private val body: ShouldSpec.() -> Unit = { sender = if (it == 0 || it == 9) user1 else otherUser ) } - room.returnGetTimelineEventsFromNowOn = timelineEvents.asFlow() - cut.getNotifications(0.seconds).take(8).toList() shouldBe - timelineEvents.drop(1).dropLast(1).map { - Notification(it.event, setOf(Notify)) - } + room.returnGetTimelineEventsOnce = timelineEvents.asFlow() + + checkNotifications { + it.take(8).toList() shouldBe timelineEvents.drop(1).dropLast(1).map { + Notification(it.event, setOf(Notify)) + } + } } } + context("new decrypted timeline events") { val timelineEvent = messageEventWithContent( roomId, MegolmEncryptedMessageEventContent( @@ -391,15 +405,17 @@ private val body: ShouldSpec.() -> Unit = { Sync.Response("next") } } - room.returnGetTimelineEventsFromNowOn = flowOf(timelineEvent) + room.returnGetTimelineEventsOnce = flowOf(timelineEvent) } should("check push rules and notify") { setUser1DisplayName(roomId) globalAccountDataStore.save(GlobalAccountDataEvent(pushRules(listOf(pushRuleDisplayName())))) - assertSoftly(cut.getNotifications(0.seconds).first()) { - event.idOrNull shouldBe timelineEvent.eventId - event.content shouldBe timelineEvent.content?.getOrThrow() + checkNotifications { + assertSoftly(it.first()) { + event.idOrNull shouldBe timelineEvent.eventId + event.content shouldBe timelineEvent.content?.getOrThrow() + } } } } @@ -415,7 +431,7 @@ private val body: ShouldSpec.() -> Unit = { Sync.Response("next") } } - room.returnGetTimelineEventsFromNowOn = flowOf(timelineEvent) + room.returnGetTimelineEventsOnce = flowOf(timelineEvent) } context("room member count") { beforeTest { @@ -427,7 +443,10 @@ private val body: ShouldSpec.() -> Unit = { roomStore.update(roomId) { Room(roomId, name = RoomDisplayName(summary = RoomSummary(joinedMemberCount = 2))) } - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + + checkNotifications { + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + } } should("not notify when not met") { roomStore.update(roomId) { @@ -456,7 +475,10 @@ private val body: ShouldSpec.() -> Unit = { stateKey = "", ) ) - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + + checkNotifications { + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + } } should("not notify when not met") { roomStateStore.save( @@ -491,7 +513,10 @@ private val body: ShouldSpec.() -> Unit = { ) ) setUser1DisplayName(roomId) - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + + checkNotifications { + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + } } should("not notify when one condition matches") { globalAccountDataStore.save( @@ -503,12 +528,17 @@ private val body: ShouldSpec.() -> Unit = { } } should("always notify when no conditions") { + println("executing mythings") globalAccountDataStore.save( GlobalAccountDataEvent( pushRules(listOf(pushRuleNoCondition())) ) ) - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + checkNotifications { + println("checking notifications") + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + println("done") + } } should("override") { globalAccountDataStore.save( @@ -579,7 +609,9 @@ private val body: ShouldSpec.() -> Unit = { ) ) - cut.getNotifications(0.seconds).first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + checkNotifications { + it.first() shouldBe Notification(timelineEvent.event, setOf(Notify)) + } } } } @@ -595,7 +627,7 @@ private val body: ShouldSpec.() -> Unit = { Sync.Response("next") } } - room.returnGetTimelineEventsFromNowOn = flowOf(timelineEvent) + room.returnGetTimelineEventsOnce = flowOf(timelineEvent) } should("not notify when action says it") { globalAccountDataStore.save( -- GitLab From d82704cc47d31aef277d00dea61a7261e72dd2fd Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Thu, 6 Mar 2025 13:35:10 +0000 Subject: [PATCH 2/4] run syncOnce lambda before transaction --- .../clientserverapi/client/SyncApiClient.kt | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/trixnity-clientserverapi/trixnity-clientserverapi-client/src/commonMain/kotlin/net/folivo/trixnity/clientserverapi/client/SyncApiClient.kt b/trixnity-clientserverapi/trixnity-clientserverapi-client/src/commonMain/kotlin/net/folivo/trixnity/clientserverapi/client/SyncApiClient.kt index 626827b89..341dce9fb 100644 --- a/trixnity-clientserverapi/trixnity-clientserverapi-client/src/commonMain/kotlin/net/folivo/trixnity/clientserverapi/client/SyncApiClient.kt +++ b/trixnity-clientserverapi/trixnity-clientserverapi-client/src/commonMain/kotlin/net/folivo/trixnity/clientserverapi/client/SyncApiClient.kt @@ -196,7 +196,8 @@ class SyncApiClientImpl( timeout = if (_currentSyncState.value == STARTED) ZERO else timeout, withTransaction = withTransaction, allowStoppingRequest = true, - asUserId = asUserId + asUserId = asUserId, + runOnce = { it } ) delay(syncLoopDelay) } catch (error: Throwable) { @@ -249,8 +250,7 @@ class SyncApiClientImpl( val isInitialSync = getBatchToken() == null log.info { "started single sync (initial=$isInitialSync)" } _currentSyncState.value = if (isInitialSync) INITIAL_SYNC else STARTED - val syncResponse = - syncAndResponse( + syncAndResponse( getBatchToken = getBatchToken, setBatchToken = setBatchToken, filter = filter, @@ -258,9 +258,9 @@ class SyncApiClientImpl( timeout = timeout, withTransaction = withTransaction, allowStoppingRequest = false, - asUserId = asUserId + asUserId = asUserId, + runOnce = runOnce ) - runOnce(syncResponse) } }.onSuccess { log.info { "stopped single sync with success" } @@ -270,7 +270,7 @@ class SyncApiClientImpl( _currentSyncState.value = STOPPED } - private suspend fun syncAndResponse( + private suspend fun syncAndResponse( getBatchToken: suspend () -> String?, setBatchToken: suspend (String) -> Unit, filter: String?, @@ -279,7 +279,8 @@ class SyncApiClientImpl( withTransaction: suspend (block: suspend () -> Unit) -> Unit, allowStoppingRequest: Boolean, asUserId: UserId?, - ): Sync.Response { + runOnce: suspend (Sync.Response) -> T + ): T { val batchToken = getBatchToken() val (response, measuredSyncDuration) = measureTime { coroutineScope { @@ -305,6 +306,8 @@ class SyncApiClientImpl( } log.info { "received sync response after about $measuredSyncDuration with token $batchToken" } + val result = runOnce(response) + withTransaction { val measuredProcessDuration = measureTime { processSyncResponse(response) @@ -314,7 +317,7 @@ class SyncApiClientImpl( setBatchToken(response.nextBatch) } _currentSyncState.value = RUNNING - return response + return result } private suspend fun processSyncResponse(response: Sync.Response) { -- GitLab From 90a915c14389ba6aea559b143b24ba90779a8ca2 Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Thu, 6 Mar 2025 14:15:29 +0000 Subject: [PATCH 3/4] remove suffix and use overloads --- .../trixnity/client/notification/NotificationService.kt | 6 +++--- .../kotlin/net/folivo/trixnity/client/room/RoomService.kt | 6 +++--- .../net/folivo/trixnity/client/mocks/RoomServiceMock.kt | 2 +- .../trixnity/client/notification/NotificationServiceTest.kt | 3 +-- .../trixnity/client/room/RoomServiceTimelineUtilsTest.kt | 2 +- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt index 8e9b8bebe..1f2ad1168 100644 --- a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt +++ b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/notification/NotificationService.kt @@ -41,7 +41,7 @@ interface NotificationService { syncResponseBufferSize: Int = 4 ): Flow - fun getNotificationsOnce( + fun getNotifications( response: Sync.Response, decryptionTimeout: Duration = 5.seconds, ): Flow @@ -61,7 +61,7 @@ class NotificationServiceImpl( private val roomSizePattern = Regex("\\s*(==|<|>|<=|>=)\\s*([0-9]+)") - override fun getNotificationsOnce( + override fun getNotifications( response: Sync.Response, decryptionTimeout: Duration, ) : Flow = evaluateDefaultPushRules( @@ -308,7 +308,7 @@ class NotificationServiceImpl( response: Sync.Response, decryptionTimeout: Duration, ): Flow> = - room.getTimelineEventsOnce(response, decryptionTimeout) + room.getTimelineEvents(response, decryptionTimeout) .map { extractDecryptedEvent(it) } .filterNotNull() .filter { diff --git a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt index 3146d62b0..f55b631b4 100644 --- a/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt +++ b/trixnity-client/src/commonMain/kotlin/net/folivo/trixnity/client/room/RoomService.kt @@ -124,7 +124,7 @@ interface RoomService { syncResponseBufferSize: Int = 4, ): Flow - fun getTimelineEventsOnce( + fun getTimelineEvents( response: Sync.Response, decryptionTimeout: Duration = 30.seconds, ): Flow @@ -566,9 +566,9 @@ class RoomServiceImpl( syncResponseBufferSize: Int, ): Flow = api.sync.subscribeAsFlow(Priority.AFTER_DEFAULT).map { it.syncResponse } - .buffer(syncResponseBufferSize).flatMapConcat { getTimelineEventsOnce(it, decryptionTimeout) } + .buffer(syncResponseBufferSize).flatMapConcat { getTimelineEvents(it, decryptionTimeout) } - override fun getTimelineEventsOnce( + override fun getTimelineEvents( response: Sync.Response, decryptionTimeout: Duration, ): Flow { diff --git a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt index e5dc24cff..43d4d5bf2 100644 --- a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt +++ b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/mocks/RoomServiceMock.kt @@ -91,7 +91,7 @@ class RoomServiceMock : RoomService { } var returnGetTimelineEventsOnce: Flow = flowOf() - override fun getTimelineEventsOnce(response: Sync.Response, decryptionTimeout: Duration): Flow { + override fun getTimelineEvents(response: Sync.Response, decryptionTimeout: Duration): Flow { return returnGetTimelineEventsOnce } diff --git a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt index 52484e285..f9c967117 100644 --- a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt +++ b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/notification/NotificationServiceTest.kt @@ -245,8 +245,7 @@ private val body: ShouldSpec.() -> Unit = { api.sync.cancel(true) } - - context(NotificationServiceImpl::getNotifications.name) { + context("getNotifications") { should("wait for sync to be started or running") { currentSyncState.value = SyncState.INITIAL_SYNC globalAccountDataStore.save( diff --git a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/room/RoomServiceTimelineUtilsTest.kt b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/room/RoomServiceTimelineUtilsTest.kt index 7c6e49b4d..1693f0d55 100644 --- a/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/room/RoomServiceTimelineUtilsTest.kt +++ b/trixnity-client/src/commonTest/kotlin/net/folivo/trixnity/client/room/RoomServiceTimelineUtilsTest.kt @@ -136,7 +136,7 @@ class RoomServiceTimelineUtilsTest : ShouldSpec({ nextEventId = null, gap = TimelineEvent.Gap.GapAfter("3") ) - context(RoomServiceImpl::getTimelineEvents.name) { + context("getTimelineEvents") { context("all requested events in store") { beforeTest { roomTimelineStore.addAll(listOf(timelineEvent1, timelineEvent2, timelineEvent3)) -- GitLab From 9c423db0c4d187721cb9fcfbac81c1c0942e3842 Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Thu, 6 Mar 2025 14:21:51 +0000 Subject: [PATCH 4/4] chore: changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f05343772..c748d42f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `RoomMessageEventContent` should have `type` +- Extract notifications from a single sync ### Changed -- GitLab