Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7c02bda
MessageSender to become dependency injected
SessionHero01 Nov 3, 2025
ebe8ab4
Message receiver to be dependency injected
SessionHero01 Nov 3, 2025
12e0406
Changes to use kotlinx deserialization for some snode API calls
SessionHero01 Nov 3, 2025
ba0ea5e
Integrating WIP
SessionHero01 Nov 3, 2025
aa7cd90
Reverted changes to batch receiver
SessionHero01 Nov 5, 2025
7cc59c5
Deprecate classes
SessionHero01 Nov 5, 2025
abbea74
Deprecate classes
SessionHero01 Nov 5, 2025
30d38c4
Added new ReceivedMessageHashDatabase to store hashes
SessionHero01 Nov 5, 2025
22d54f4
Message timestamp is optional
SessionHero01 Nov 5, 2025
c1de259
Timestamp property on message
SessionHero01 Nov 5, 2025
f2c3d73
Message receiving WIP
SessionHero01 Nov 5, 2025
2ce99c2
Create new path for message processing
SessionHero01 Nov 6, 2025
15e1418
Merge remote-tracking branch 'origin/dev' into integrate-session-pro
SessionHero01 Nov 6, 2025
fbf08a8
Poll handling
SessionHero01 Nov 6, 2025
6718492
Optimize message processing
SessionHero01 Nov 6, 2025
82b89c5
Community message handling
SessionHero01 Nov 7, 2025
ba39390
Updated community processor
SessionHero01 Nov 7, 2025
124395c
Added comment
SessionHero01 Nov 7, 2025
3fdfc20
PR feedback
SessionHero01 Nov 10, 2025
0f9a632
Merge remote-tracking branch 'origin/dev' into integrate-session-pro
SessionHero01 Nov 10, 2025
841f783
Remove duplication check if push notification doesn't include hash
SessionHero01 Nov 10, 2025
7bc8523
Merge remote-tracking branch 'origin/dev' into integrate-session-pro
SessionHero01 Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package org.session.libsession.messaging.jobs

import android.content.Context
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import dagger.hilt.android.qualifiers.ApplicationContext
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import org.session.libsession.database.MessageDataProvider
Expand All @@ -16,7 +14,6 @@ import org.session.libsession.messaging.sending_receiving.attachments.Attachment
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.DecodedAudio
import org.session.libsession.utilities.InputStreamMediaDataSource
Expand Down Expand Up @@ -252,21 +249,18 @@ class AttachmentDownloadJob @AssistedInject constructor(
return KEY
}

class DeserializeFactory(private val factory: Factory) : Job.DeserializeFactory<AttachmentDownloadJob> {
@AssistedFactory
abstract class Factory : Job.DeserializeFactory<AttachmentDownloadJob> {
abstract fun create(
@Assisted("attachmentID") attachmentID: Long,
mmsMessageId: Long
): AttachmentDownloadJob

override fun create(data: Data): AttachmentDownloadJob {
return factory.create(
return create(
attachmentID = data.getLong(ATTACHMENT_ID_KEY),
mmsMessageId = data.getLong(TS_INCOMING_MESSAGE_ID_KEY)
)
}
}

@AssistedFactory
interface Factory {
fun create(
@Assisted("attachmentID") attachmentID: Long,
mmsMessageId: Long
): AttachmentDownloadJob
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.DecodedAudio
import org.session.libsession.utilities.InputStreamMediaDataSource
Expand All @@ -38,6 +37,7 @@ class AttachmentUploadJob @AssistedInject constructor(
private val attachmentProcessor: AttachmentProcessor,
private val preferences: TextSecurePreferences,
private val fileServerApi: FileServerApi,
private val messageSender: MessageSender,
) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
Expand Down Expand Up @@ -219,7 +219,7 @@ class AttachmentUploadJob @AssistedInject constructor(

private fun failAssociatedMessageSendJob(e: Exception) {
val messageSendJob = storage.getMessageSendJob(messageSendJobID)
MessageSender.handleFailedMessageSend(this.message, e)
messageSender.handleFailedMessageSend(this.message, e)
if (messageSendJob != null) {
storage.markJobAsFailedPermanently(messageSendJobID)
}
Expand All @@ -244,7 +244,14 @@ class AttachmentUploadJob @AssistedInject constructor(
return KEY
}

class DeserializeFactory(private val factory: Factory): Job.DeserializeFactory<AttachmentUploadJob> {
@AssistedFactory
abstract class Factory : Job.DeserializeFactory<AttachmentUploadJob> {
abstract fun create(
attachmentID: Long,
@Assisted("threadID") threadID: String,
message: Message,
messageSendJobID: String
): AttachmentUploadJob

override fun create(data: Data): AttachmentUploadJob? {
val serializedMessage = data.getByteArray(MESSAGE_KEY)
Expand All @@ -259,22 +266,12 @@ class AttachmentUploadJob @AssistedInject constructor(
return null
}
input.close()
return factory.create(
return create(
attachmentID = data.getLong(ATTACHMENT_ID_KEY),
threadID = data.getString(THREAD_ID_KEY)!!,
message = message,
messageSendJobID = data.getString(MESSAGE_SEND_JOB_ID_KEY)!!
)
}
}

@AssistedFactory
interface Factory {
fun create(
attachmentID: Long,
@Assisted("threadID") threadID: String,
message: Message,
messageSendJobID: String
): AttachmentUploadJob
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ data class MessageReceiveParameters(
val closedGroup: Destination.ClosedGroup? = null
)

@Deprecated("BatchMessageReceiveJob is now only here so that existing persisted jobs can be processed.")
class BatchMessageReceiveJob @AssistedInject constructor(
@Assisted private val messages: List<MessageReceiveParameters>,
@Assisted val fromCommunity: Address.Community?, // The community the messages are received in, if any
Expand All @@ -62,6 +63,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
private val messageNotifier: MessageNotifier,
private val threadDatabase: ThreadDatabase,
private val recipientRepository: RecipientRepository,
private val messageReceiver: MessageReceiver,
) : Job {

override var delegate: JobDelegate? = null
Expand Down Expand Up @@ -105,6 +107,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
fromCommunity = fromCommunity,
threadDatabase = threadDatabase,
recipientRepository = recipientRepository,
messageReceiver = messageReceiver,
)
}

Expand Down Expand Up @@ -157,7 +160,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
messages.forEach { messageParameters ->
val (data, serverHash, openGroupMessageServerID) = messageParameters
try {
val (message, proto) = MessageReceiver.parse(
val (message, proto) = messageReceiver.parse(
data,
openGroupMessageServerID,
openGroupPublicKey = serverPublicKey,
Expand Down Expand Up @@ -358,7 +361,8 @@ class BatchMessageReceiveJob @AssistedInject constructor(

@AssistedFactory
abstract class Factory : Job.DeserializeFactory<BatchMessageReceiveJob> {
abstract fun create(
@Deprecated("New code should try to handle message directly instead of creating this job")
protected abstract fun create(
messages: List<MessageReceiveParameters>,
fromCommunity: Address.Community?,
): BatchMessageReceiveJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package org.session.libsession.messaging.jobs

import android.widget.Toast
import com.google.protobuf.ByteString
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
Expand All @@ -16,13 +19,19 @@ import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.messaging.utilities.MessageAuthentication.buildGroupInviteSignature
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.getGroup
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateInviteMessage
import org.session.libsignal.protos.SignalServiceProtos.DataMessage.GroupUpdateMessage
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log

class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<String>) : Job {
class InviteContactsJob @AssistedInject constructor(
@Assisted val groupSessionId: String,
@Assisted val memberSessionIds: Array<String>,
private val configFactory: ConfigFactoryProtocol,
private val messageSender: MessageSender,
) : Job {

companion object {
const val KEY = "InviteContactJob"
Expand All @@ -37,8 +46,7 @@ class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<
override val maxFailureCount: Int = 1

override suspend fun execute(dispatcherName: String) {
val configs = MessagingModuleConfiguration.shared.configFactory
val group = requireNotNull(configs.getGroup(AccountId(groupSessionId))) {
val group = requireNotNull(configFactory.getGroup(AccountId(groupSessionId))) {
"Group must exist to invite"
}

Expand All @@ -54,7 +62,7 @@ class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<
runCatching {
// Make the request for this member
val memberId = AccountId(memberSessionId)
val (groupName, subAccount) = configs.withMutableGroupConfigs(sessionId) { configs ->
val (groupName, subAccount) = configFactory.withMutableGroupConfigs(sessionId) { configs ->
configs.groupInfo.getName() to configs.groupKeys.makeSubAccount(memberSessionId)
}

Expand All @@ -76,14 +84,14 @@ class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<
sentTimestamp = timestamp
}

MessageSender.sendNonDurably(update, Destination.Contact(memberSessionId), false)
messageSender.sendNonDurably(update, Destination.Contact(memberSessionId), false)
}
}
}

val results = memberSessionIds.zip(requests.awaitAll())

configs.withMutableGroupConfigs(sessionId) { configs ->
configFactory.withMutableGroupConfigs(sessionId) { configs ->
results.forEach { (memberSessionId, result) ->
configs.groupMembers.get(memberSessionId)?.let { member ->
if (result.isFailure) {
Expand All @@ -96,8 +104,8 @@ class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<
}
}

val groupName = configs.withGroupConfigs(sessionId) { it.groupInfo.getName() }
?: configs.getGroup(sessionId)?.name
val groupName = configFactory.withGroupConfigs(sessionId) { it.groupInfo.getName() }
?: configFactory.getGroup(sessionId)?.name

// Gather all the exceptions, while keeping track of the invitee account IDs
val failures = results.mapNotNull { (id, result) ->
Expand Down Expand Up @@ -140,4 +148,20 @@ class InviteContactsJob(val groupSessionId: String, val memberSessionIds: Array<

override fun getFactoryKey(): String = KEY

@AssistedFactory
abstract class Factory : Job.DeserializeFactory<InviteContactsJob> {
abstract fun create(
groupSessionId: String,
memberSessionIds: Array<String>,
): InviteContactsJob

override fun create(data: Data): InviteContactsJob? {
val groupSessionId = data.getString(GROUP) ?: return null
val memberSessionIds = data.getStringArray(MEMBER) ?: return null
return create(
groupSessionId = groupSessionId,
memberSessionIds = memberSessionIds,
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MessageSendJob @AssistedInject constructor(
private val messageDataProvider: MessageDataProvider,
private val storage: StorageProtocol,
private val configFactory: ConfigFactoryProtocol,
private val messageSender: MessageSender,
) : Job {

object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.")
Expand Down Expand Up @@ -97,7 +98,7 @@ class MessageSendJob @AssistedInject constructor(
}
}

MessageSender.sendNonDurably(this@MessageSendJob.message, destination, isSync)
messageSender.sendNonDurably(this@MessageSendJob.message, destination, isSync)

this.handleSuccess(dispatcherName)
statusCallback?.trySend(Result.success(Unit))
Expand Down Expand Up @@ -173,7 +174,14 @@ class MessageSendJob @AssistedInject constructor(
return KEY
}

class DeserializeFactory(private val factory: Factory) : Job.DeserializeFactory<MessageSendJob> {

@AssistedFactory
abstract class Factory : Job.DeserializeFactory<MessageSendJob> {
abstract fun create(
message: Message,
destination: Destination,
statusCallback: SendChannel<Result<Unit>>? = null
): MessageSendJob

override fun create(data: Data): MessageSendJob? {
val serializedMessage = data.getByteArray(MESSAGE_KEY)
Expand Down Expand Up @@ -201,20 +209,11 @@ class MessageSendJob @AssistedInject constructor(
}
destinationInput.close()
// Return
return factory.create(
return create(
message = message,
destination = destination,
statusCallback = null
)
}
}

@AssistedFactory
interface Factory {
fun create(
message: Message,
destination: Destination,
statusCallback: SendChannel<Result<Unit>>? = null
): MessageSendJob
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ class SessionJobManagerFactories @Inject constructor(
private val batchFactory: BatchMessageReceiveJob.Factory,
private val trimThreadFactory: TrimThreadJob.Factory,
private val messageSendJobFactory: MessageSendJob.Factory,
private val deleteJobFactory: OpenGroupDeleteJob.Factory
private val deleteJobFactory: OpenGroupDeleteJob.Factory,
private val inviteContactsJobFactory: InviteContactsJob.Factory,
) {

fun getSessionJobFactories(): Map<String, Job.DeserializeFactory<out Job>> {
return mapOf(
AttachmentDownloadJob.KEY to AttachmentDownloadJob.DeserializeFactory(attachmentDownloadJobFactory),
AttachmentUploadJob.KEY to AttachmentUploadJob.DeserializeFactory(attachmentUploadJobFactory),
MessageSendJob.KEY to MessageSendJob.DeserializeFactory(messageSendJobFactory),
AttachmentDownloadJob.KEY to attachmentDownloadJobFactory,
AttachmentUploadJob.KEY to attachmentUploadJobFactory,
MessageSendJob.KEY to messageSendJobFactory,
NotifyPNServerJob.KEY to NotifyPNServerJob.DeserializeFactory(),
TrimThreadJob.KEY to trimThreadFactory,
BatchMessageReceiveJob.KEY to batchFactory,
OpenGroupDeleteJob.KEY to deleteJobFactory,
InviteContactsJob.KEY to inviteContactsJobFactory,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ sealed class Destination {
data class Contact(var publicKey: String) : Destination() {
internal constructor(): this("")
}
data class LegacyClosedGroup(var groupPublicKey: String) : Destination() {
internal constructor(): this("")
}
data class LegacyOpenGroup(var roomToken: String, var server: String) : Destination() {
internal constructor(): this("", "")
}
data class ClosedGroup(var publicKey: String): Destination() {
internal constructor(): this("")
}
Expand All @@ -39,9 +33,6 @@ sealed class Destination {
is Address.Standard -> {
Contact(address.address)
}
is Address.LegacyGroup -> {
LegacyClosedGroup(address.groupPublicKeyHex)
}
is Address.Community -> {
OpenGroup(roomToken = address.room, server = address.serverUrl, fileIds = fileIds)
}
Expand All @@ -63,9 +54,10 @@ sealed class Destination {
is Address.Group -> {
ClosedGroup(address.accountId.hexString)
}
else -> {
throw Exception("TODO: Handle legacy closed groups.")
}

is Address.Blinded,
is Address.LegacyGroup,
is Address.Unknown -> error("Unsupported address as destination: $address")
}
}
}
Expand Down
Loading