feat: implement basic notification (#6)

Reviewed-on: #6
This commit was merged in pull request #6.
This commit is contained in:
2026-05-29 06:56:47 +00:00
parent a2a4433a9d
commit e9eb071208
13 changed files with 370 additions and 254 deletions

View File

@@ -6,13 +6,13 @@ import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.request.get
import io.ktor.client.statement.HttpResponse
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import rust.nostr.sdk.AckPolicy
import rust.nostr.sdk.Alphabet
import rust.nostr.sdk.AsyncNostrSigner
@@ -62,9 +62,6 @@ object NostrManager {
}
class Nostr {
private val _isInitialized = MutableStateFlow(false)
val isInitialized: StateFlow<Boolean> = _isInitialized.asStateFlow()
var client: Client? = null
private set
var signer: UniversalSigner = UniversalSigner(Keys.generate())
@@ -76,9 +73,35 @@ class Nostr {
var rumorMap: MutableMap<EventId, EventId> = mutableMapOf()
private set
private val isInitialized = MutableStateFlow(false)
// Add these to the Nostr class
private val _newEvents = MutableSharedFlow<UnsignedEvent>(extraBufferCapacity = 100)
val newEvents = _newEvents.asSharedFlow()
private val _metadataUpdates =
MutableSharedFlow<Pair<PublicKey, Metadata>>(extraBufferCapacity = 100)
val metadataUpdates = _metadataUpdates.asSharedFlow()
private val _contactListUpdates = MutableSharedFlow<List<PublicKey>>(extraBufferCapacity = 100)
val contactListUpdates = _contactListUpdates.asSharedFlow()
private val _subscriptionClosed = MutableSharedFlow<Unit>(extraBufferCapacity = 10)
val subscriptionClosed = _subscriptionClosed.asSharedFlow()
suspend fun emitNewEvent(event: UnsignedEvent) = _newEvents.emit(event)
suspend fun emitSubscriptionClosed() = _subscriptionClosed.emit(Unit)
suspend fun emitMetadataUpdate(pubkey: PublicKey, metadata: Metadata) =
_metadataUpdates.emit(pubkey to metadata)
suspend fun emitContactListUpdate(contacts: List<PublicKey>) =
_contactListUpdates.emit(contacts)
suspend fun init(dbPath: String) {
try {
if (_isInitialized.value) return
if (isInitialized.value) return
// Initialize the logger for nostr client
initLogger(LogLevel.DEBUG)
@@ -108,14 +131,14 @@ class Nostr {
.sleepWhenIdle(SleepWhenIdle.Enabled(idleTimeout))
.build()
_isInitialized.value = true
isInitialized.value = true
} catch (e: Exception) {
throw IllegalStateException("Failed to initialize Nostr client: ${e.message}", e)
}
}
suspend fun waitUntilInitialized() {
_isInitialized.first { it }
isInitialized.first { it }
}
suspend fun connectBootstrapRelays() {
@@ -147,8 +170,6 @@ class Nostr {
suspend fun setSigner(new: AsyncNostrSigner) {
try {
signer.switch(new)
// Fetch metadata for current user
getUserMetadata()
} catch (e: Exception) {
throw IllegalStateException("Failed to set signer: ${e.message}", e)
}
@@ -216,70 +237,15 @@ class Nostr {
}
}
suspend fun handleLiteNotifications(
onNewMessage: (UnsignedEvent) -> Unit,
) {
val now = Timestamp.now()
val processedEvent = mutableSetOf<EventId>()
val notifications = client?.notifications() ?: return
while (true) {
val notification = notifications.next() ?: continue
when (notification) {
is ClientNotification.Message -> {
val relayUrl = notification.relayUrl
when (val message = notification.message.asEnum()) {
is RelayMessageEnum.EventMsg -> {
val event = message.event
val subscriptionId = message.subscriptionId
// Ignore events not from the newest gift wraps subscription
if (subscriptionId != "newest-gift-wraps") continue
// Prevent processing duplicate events
if (processedEvent.contains(event.id())) continue
processedEvent.add(event.id())
if (event.kind().asStd()?.equals(KindStandard.GIFT_WRAP) == true) {
try {
val rumor = extractRumor(event)
// Handle new message
rumor?.createdAt()?.asSecs()?.let {
if (it >= now.asSecs()) {
onNewMessage(rumor)
}
}
} catch (e: Exception) {
println("Failed to extract rumor: $e")
}
}
}
else -> {
/* Ignore other event kinds */
}
}
}
else -> {
/* Ignore other message types */
}
}
}
}
suspend fun handleNotifications(
onMetadataUpdate: (PublicKey, Metadata) -> Unit,
onContactListUpdate: (List<PublicKey>) -> Unit,
onNewMessage: (UnsignedEvent) -> Unit,
onSubscriptionClose: () -> Unit,
) = coroutineScope {
) = supervisorScope {
val now = Timestamp.now()
val processedEvent = mutableSetOf<EventId>()
val notifications = client?.notifications() ?: return@coroutineScope
val notifications = client?.notifications() ?: return@supervisorScope
var eoseTrackerJob: Job? = null
@@ -293,7 +259,6 @@ class Nostr {
when (val message = notification.message.asEnum()) {
is RelayMessageEnum.EventMsg -> {
val event = message.event
val id = message.subscriptionId
// Prevent processing duplicate events
if (processedEvent.contains(event.id())) continue

View File

@@ -39,8 +39,8 @@ class NostrViewModel(
private val nostr: Nostr,
private val secretStore: SecretStorage
) : ViewModel() {
private val _emptySecret = MutableStateFlow<Boolean?>(null)
val emptySecret = _emptySecret.asStateFlow()
private val _signerRequired = MutableStateFlow<Boolean?>(null)
val signerRequired = _signerRequired.asStateFlow()
private val _isCreating = MutableStateFlow(false)
val isCreating = _isCreating.asStateFlow()
@@ -71,11 +71,20 @@ class NostrViewModel(
private val seenPublicKeys = mutableSetOf<PublicKey>()
init {
startNotificationHandler()
startMetadataBatchHandler()
getCacheMetadata()
// Check local stored secret (secret key or bunker)
login()
// Observe the signer state and verify the relay list
observeSignerAndCheckRelays()
// Get all local stored metadata
getCacheMetadata()
// Observe new events from the Nostr client
runObserver()
// Wait and merge metadata requests into a single batch
runMetadataBatching()
}
override fun onCleared() {
@@ -95,35 +104,53 @@ class NostrViewModel(
}
}
private fun startNotificationHandler() {
private fun runObserver() {
viewModelScope.launch {
// Wait until the client is ready
nostr.waitUntilInitialized()
// Observe new messages
launch {
nostr.newEvents.collect { event ->
val roomId = event.roomId()
val existingRoom = _chatRooms.value.firstOrNull { it.id == roomId }
nostr.handleNotifications(
onMetadataUpdate = { pubkey, metadata ->
if (existingRoom == null) {
val currentUser = nostr.signer.currentUser
if (currentUser != null) {
val newRoom = Room.new(event, currentUser)
_chatRooms.update { (it + newRoom).sortedDescending().toSet() }
}
} else {
updateRoomList(roomId, event)
}
_newEvents.emit(event)
}
}
// Observe metadata updates
launch {
nostr.metadataUpdates.collect { (pubkey, metadata) ->
updateMetadata(pubkey, metadata)
},
onContactListUpdate = { contactList ->
_contactList.value = contactList.toSet()
},
onSubscriptionClose = {
getChatRooms()
}
}
if (!_isPartialProcessedGiftWrap.value) {
_isPartialProcessedGiftWrap.value = true
}
},
onNewMessage = { event ->
viewModelScope.launch {
_newEvents.emit(event)
}
},
)
// Observe contact list updates
launch {
nostr.contactListUpdates.collect { contacts ->
_contactList.value = contacts.toSet()
}
}
// Observes subscription close
launch {
nostr.subscriptionClosed.collect {
getChatRooms()
_isPartialProcessedGiftWrap.value = true
}
}
}
}
private fun startMetadataBatchHandler() {
private fun runMetadataBatching() {
viewModelScope.launch {
// Wait until the client is ready
nostr.waitUntilInitialized()
@@ -164,7 +191,9 @@ class NostrViewModel(
val results = nostr.getAllCacheMetadata()
results.forEach { (pubkey, metadata) ->
// Update the metadata state
updateMetadata(pubkey, metadata)
// Update seenPublicKeys to avoid duplicate requests
seenPublicKeys.add(pubkey)
}
}
@@ -172,22 +201,18 @@ class NostrViewModel(
private fun login() {
viewModelScope.launch {
// Wait until the client is ready
nostr.waitUntilInitialized()
// Get user's signer secret
val secret = secretStore.get("user_signer")
// If no secret is found, show onboarding screen
when (secret) {
null -> {
_emptySecret.value = true
return@launch
}
else -> _emptySecret.value = false
if (secret == null) {
_signerRequired.value = true
return@launch
}
// Update the empty secret state
_signerRequired.value = false
// Handle different signer types
if (secret.startsWith("nsec1")) {
val keys = Keys.parse(secret)
@@ -197,8 +222,7 @@ class NostrViewModel(
val appKeys = getOrInitAppKeys()
val bunker = NostrConnectUri.parse(secret)
val timeout = Duration.parse("50s") // 50 seconds timeout
val remote =
NostrConnect(uri = bunker, appKeys = appKeys, timeout = timeout, null)
val remote = NostrConnect(uri = bunker, appKeys, timeout, opts = null)
nostr.setSigner(remote)
} catch (e: Exception) {
showError("Error: ${e.message}")
@@ -215,15 +239,29 @@ class NostrViewModel(
val pubkey = nostr.signer.currentUser
if (pubkey != null) {
// Get chat rooms
val rooms = nostr.getChatRooms() ?: emptySet()
if (rooms.isNotEmpty()) {
_chatRooms.value = rooms
_isPartialProcessedGiftWrap.value = true
}
// Get all metadata for the current user
nostr.getUserMetadata()
// Small delay to ensure all relays are connected
delay(3000)
// Check if the relay list is empty
val relays = nostr.getMsgRelays(pubkey)
if (relays.isEmpty()) {
_isRelayListEmpty.value = true
}
break
}
delay(1000)
delay(500)
}
}
}
@@ -256,7 +294,7 @@ class NostrViewModel(
viewModelScope.launch {
secretStore.clear("user_signer")
nostr.signer.switch(Keys.generate())
_emptySecret.value = true
_signerRequired.value = true
}
}
@@ -325,7 +363,7 @@ class NostrViewModel(
secretStore.set("user_signer", secret)
// Set an empty secret state
_emptySecret.value = false
_signerRequired.value = false
} catch (e: Exception) {
showError("Error: ${e.message}")
}
@@ -358,18 +396,16 @@ class NostrViewModel(
nostr.setSigner(keys)
secretStore.set("user_signer", secret)
// Set an empty secret state
_emptySecret.value = false
_signerRequired.value = false
} else if (secret.startsWith("bunker://")) {
try {
val appKeys = getOrInitAppKeys()
val bunker = NostrConnectUri.parse(secret)
val timeout = Duration.parse("50s") // 50 seconds timeout
val remote =
NostrConnect(uri = bunker, appKeys = appKeys, timeout = timeout, null)
val remote = NostrConnect(uri = bunker, appKeys, timeout, null)
nostr.setSigner(remote)
secretStore.set("user_signer", secret)
// Set an empty secret state
_emptySecret.value = false
_signerRequired.value = false
} catch (e: Exception) {
showError("Error: ${e.message}")
}
@@ -411,11 +447,13 @@ class NostrViewModel(
if (nostr.signer.currentUser == null) throw IllegalStateException("User not signed in")
if (to.isEmpty()) throw IllegalArgumentException("At least one recipient is required")
val currentUser = nostr.signer.currentUser!!
// Construct the rumor event
val rumor = EventBuilder
.privateMsgRumor(to.first(), "")
.tags(to.map { Tag.publicKey(it) })
.build(nostr.signer.currentUser!!)
.build(currentUser)
// Check if the room already exists
val id = rumor.roomId()
@@ -427,7 +465,7 @@ class NostrViewModel(
}
// Create a room from the rumor event
val room = Room.new(rumor, nostr.signer.currentUser!!)
val room = Room.new(rumor, currentUser)
// Update the chat rooms state
_chatRooms.update { currentRooms ->
@@ -522,13 +560,18 @@ class NostrViewModel(
}
private fun updateRoomList(roomId: Long, newMessage: UnsignedEvent) {
_chatRooms.value = _chatRooms.value.map { room ->
if (room.id == roomId) {
room.copy(lastMessage = newMessage.content(), createdAt = newMessage.createdAt())
} else {
room
}
}.toSet()
_chatRooms.update { currentRooms ->
currentRooms.map { room ->
if (room.id == roomId) {
room.copy(
lastMessage = newMessage.content(),
createdAt = newMessage.createdAt()
)
} else {
room
}
}.sortedDescending().toSet()
}
}
suspend fun searchByAddress(query: String): PublicKey? {

View File

@@ -40,10 +40,10 @@ data class Room(
val subject = rumor.tags().find(TagKind.Subject)?.content()
// Collect the author's public key and all public keys from tags
// Also remove the user's public key from the list, current user is always a member
val pubkeys: MutableSet<PublicKey> = mutableSetOf()
pubkeys.add(rumor.author())
pubkeys.addAll(rumor.tags().publicKeys())
// Also remove the user's public key from the list, current user is always a member
pubkeys.remove(userPubkey)
// Create a new Room instance