diff --git a/crates/chat/src/lib.rs b/crates/chat/src/lib.rs index faec2fe..8d6d031 100644 --- a/crates/chat/src/lib.rs +++ b/crates/chat/src/lib.rs @@ -63,11 +63,17 @@ pub struct ChatRegistry { /// Loading status of the registry loading: bool, + /// Tracking the status of unwrapping gift wrap events. + tracking_flag: Arc, + + /// Channel's sender for communication between nostr and gpui + sender: Sender, + /// Handle notifications asynchronous task notifications: Option>>, /// Tasks for asynchronous operations - tasks: SmallVec<[Task<()>; 3]>, + tasks: Vec>, /// Subscriptions _subscriptions: SmallVec<[Subscription; 1]>, @@ -89,68 +95,41 @@ impl ChatRegistry { /// Create a new chat registry instance fn new(cx: &mut Context) -> Self { let nostr = NostrRegistry::global(cx); - let client = nostr.read(cx).client(); + let identity = nostr.read(cx).identity(); let device_signer = nostr.read(cx).device_signer(); // A flag to indicate if the registry is loading - let status = Arc::new(AtomicBool::new(true)); + let tracking_flag = Arc::new(AtomicBool::new(true)); // Channel for communication between nostr and gpui let (tx, rx) = flume::bounded::(2048); - let mut tasks = smallvec![]; + let mut tasks = vec![]; let mut subscriptions = smallvec![]; - let notifications = - Some( - cx.background_spawn({ - let client = client.clone(); - let device_signer = device_signer.read(cx).clone(); - - let loading = Arc::clone(&status); - let tx = tx.clone(); - - async move { - Self::handle_notifications(&client, &device_signer, &loading, &tx).await - } - }), - ); + subscriptions.push( + // Observe the identity + cx.observe(&identity, |this, state, cx| { + if state.read(cx).has_public_key() { + // Handle nostr notifications + this.handle_notifications(cx); + // Track unwrapping progress + this.tracking(cx); + } + }), + ); subscriptions.push( // Observe the device signer state - cx.observe(&device_signer, { - let loading = Arc::clone(&status); - let tx = tx.clone(); - - move |this, state, cx| { - if state.read(cx).is_some() { - this.notifications = Some(cx.background_spawn({ - let nostr = NostrRegistry::global(cx); - let client = nostr.read(cx).client(); - let device_signer = state.read(cx).clone(); - - let loading = Arc::clone(&loading); - let tx = tx.clone(); - - async move { - Self::handle_notifications(&client, &device_signer, &loading, &tx) - .await - } - })) - } + cx.observe(&device_signer, |this, state, cx| { + if state.read(cx).is_some() { + this.handle_notifications(cx); } }), ); tasks.push( - // Handle unwrapping progress - cx.background_spawn( - async move { Self::unwrapping_status(&client, &status, &tx).await }, - ), - ); - - tasks.push( - // Handle new messages + // Update GPUI states cx.spawn(async move |this, cx| { while let Ok(message) = rx.recv_async().await { match message { @@ -181,110 +160,130 @@ impl ChatRegistry { Self { rooms: vec![], loading: true, - notifications, + tracking_flag, + sender: tx.clone(), + notifications: None, tasks, _subscriptions: subscriptions, } } - async fn handle_notifications( - client: &Client, - device_signer: &Option>, - loading: &Arc, - tx: &Sender, - ) -> Result<(), Error> { - let initialized_at = Timestamp::now(); - let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION); + /// Handle nostr notifications + fn handle_notifications(&mut self, cx: &mut Context) { + let nostr = NostrRegistry::global(cx); + let client = nostr.read(cx).client(); + let device_signer = nostr.read(cx).device_signer().read(cx).clone(); - let mut notifications = client.notifications(); - let mut processed_events = HashSet::new(); + let status = self.tracking_flag.clone(); + let tx = self.sender.clone(); - while let Ok(notification) = notifications.recv().await { - let RelayPoolNotification::Message { message, .. } = notification else { - // Skip non-message notifications - continue; - }; + self.tasks.push(cx.background_spawn(async move { + let initialized_at = Timestamp::now(); + let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION); - match message { - RelayMessage::Event { event, .. } => { - if !processed_events.insert(event.id) { - // Skip if the event has already been processed - continue; - } + let mut notifications = client.notifications(); + let mut processed_events = HashSet::new(); - if event.kind != Kind::GiftWrap { - // Skip non-gift wrap events - continue; - } + while let Ok(notification) = notifications.recv().await { + let RelayPoolNotification::Message { message, .. } = notification else { + // Skip non-message notifications + continue; + }; - // Extract the rumor from the gift wrap event - match Self::extract_rumor(client, device_signer, event.as_ref()).await { - Ok(rumor) => match rumor.created_at >= initialized_at { - true => { - // Check if the event is sent by coop - let sent_by_coop = { - let tracker = tracker().read().await; - tracker.is_sent_by_coop(&event.id) - }; - // No need to emit if sent by coop - // the event is already emitted - if !sent_by_coop { - let new_message = NewMessage::new(event.id, rumor); - let signal = NostrEvent::Message(new_message); + match message { + RelayMessage::Event { event, .. } => { + if !processed_events.insert(event.id) { + // Skip if the event has already been processed + continue; + } - tx.send_async(signal).await.ok(); + if event.kind != Kind::GiftWrap { + // Skip non-gift wrap events + continue; + } + + // Extract the rumor from the gift wrap event + match Self::extract_rumor(&client, &device_signer, event.as_ref()).await { + Ok(rumor) => match rumor.created_at >= initialized_at { + true => { + // Check if the event is sent by coop + let sent_by_coop = { + let tracker = tracker().read().await; + tracker.is_sent_by_coop(&event.id) + }; + // No need to emit if sent by coop + // the event is already emitted + if !sent_by_coop { + let new_message = NewMessage::new(event.id, rumor); + let signal = NostrEvent::Message(new_message); + + tx.send_async(signal).await.ok(); + } } + false => { + status.store(true, Ordering::Release); + } + }, + Err(e) => { + log::warn!("Failed to unwrap: {e}"); } - false => { - loading.store(true, Ordering::Release); - } - }, - Err(e) => { - log::warn!("Failed to unwrap: {e}"); + } + } + RelayMessage::EndOfStoredEvents(id) => { + if id.as_ref() == &subscription_id { + tx.send_async(NostrEvent::Eose).await.ok(); + } + } + _ => {} + } + } + })); + } + + /// Tracking the status of unwrapping gift wrap events. + fn tracking(&mut self, cx: &mut Context) { + let nostr = NostrRegistry::global(cx); + let client = nostr.read(cx).client(); + + let status = self.tracking_flag.clone(); + let tx = self.sender.clone(); + + self.notifications = Some(cx.background_spawn(async move { + let loop_duration = Duration::from_secs(12); + + let mut is_start_processing = false; + let mut total_loops = 0; + + loop { + if client.has_signer().await { + total_loops += 1; + + if status.load(Ordering::Acquire) { + is_start_processing = true; + // Reset gift wrap processing flag + _ = status.compare_exchange( + true, + false, + Ordering::Release, + Ordering::Relaxed, + ); + + tx.send_async(NostrEvent::Unwrapping(true)).await.ok(); + } else { + // Only run further if we are already processing + // Wait until after 2 loops to prevent exiting early while events are still being processed + if is_start_processing && total_loops >= 2 { + tx.send_async(NostrEvent::Unwrapping(false)).await.ok(); + + // Reset the counter + is_start_processing = false; + total_loops = 0; } } } - RelayMessage::EndOfStoredEvents(id) => { - if id.as_ref() == &subscription_id { - tx.send_async(NostrEvent::Eose).await.ok(); - } - } - _ => {} + smol::Timer::after(loop_duration).await; } - } - - Ok(()) - } - - async fn unwrapping_status(client: &Client, status: &Arc, tx: &Sender) { - let loop_duration = Duration::from_secs(12); - let mut is_start_processing = false; - let mut total_loops = 0; - - loop { - if client.has_signer().await { - total_loops += 1; - - if status.load(Ordering::Acquire) { - is_start_processing = true; - // Reset gift wrap processing flag - _ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed); - - tx.send_async(NostrEvent::Unwrapping(true)).await.ok(); - } else { - // Only run further if we are already processing - // Wait until after 2 loops to prevent exiting early while events are still being processed - if is_start_processing && total_loops >= 2 { - tx.send_async(NostrEvent::Unwrapping(false)).await.ok(); - - // Reset the counter - is_start_processing = false; - total_loops = 0; - } - } - } - smol::Timer::after(loop_duration).await; - } + })); } /// Get the loading status of the chat registry