This commit is contained in:
2026-01-09 16:43:02 +07:00
parent c7c10941be
commit 362b43e265

View File

@@ -63,11 +63,17 @@ pub struct ChatRegistry {
/// Loading status of the registry /// Loading status of the registry
loading: bool, loading: bool,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Channel's sender for communication between nostr and gpui
sender: Sender<NostrEvent>,
/// Handle notifications asynchronous task /// Handle notifications asynchronous task
notifications: Option<Task<Result<(), Error>>>, notifications: Option<Task<Result<(), Error>>>,
/// Tasks for asynchronous operations /// Tasks for asynchronous operations
tasks: SmallVec<[Task<()>; 3]>, tasks: Vec<Task<()>>,
/// Subscriptions /// Subscriptions
_subscriptions: SmallVec<[Subscription; 1]>, _subscriptions: SmallVec<[Subscription; 1]>,
@@ -89,68 +95,41 @@ impl ChatRegistry {
/// Create a new chat registry instance /// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self { fn new(cx: &mut Context<Self>) -> Self {
let nostr = NostrRegistry::global(cx); let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client(); let identity = nostr.read(cx).identity();
let device_signer = nostr.read(cx).device_signer(); let device_signer = nostr.read(cx).device_signer();
// A flag to indicate if the registry is loading // A flag to indicate if the registry is loading
let status = Arc::new(AtomicBool::new(true)); let tracking_flag = Arc::new(AtomicBool::new(true));
// Channel for communication between nostr and gpui // Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<NostrEvent>(2048); let (tx, rx) = flume::bounded::<NostrEvent>(2048);
let mut tasks = smallvec![]; let mut tasks = vec![];
let mut subscriptions = smallvec![]; let mut subscriptions = smallvec![];
let notifications = subscriptions.push(
Some( // Observe the identity
cx.background_spawn({ cx.observe(&identity, |this, state, cx| {
let client = client.clone(); if state.read(cx).has_public_key() {
let device_signer = device_signer.read(cx).clone(); // Handle nostr notifications
this.handle_notifications(cx);
let loading = Arc::clone(&status); // Track unwrapping progress
let tx = tx.clone(); this.tracking(cx);
}
async move { }),
Self::handle_notifications(&client, &device_signer, &loading, &tx).await );
}
}),
);
subscriptions.push( subscriptions.push(
// Observe the device signer state // Observe the device signer state
cx.observe(&device_signer, { cx.observe(&device_signer, |this, state, cx| {
let loading = Arc::clone(&status); if state.read(cx).is_some() {
let tx = tx.clone(); this.handle_notifications(cx);
move |this, state, cx| {
if state.read(cx).is_some() {
this.notifications = Some(cx.background_spawn({
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let device_signer = state.read(cx).clone();
let loading = Arc::clone(&loading);
let tx = tx.clone();
async move {
Self::handle_notifications(&client, &device_signer, &loading, &tx)
.await
}
}))
}
} }
}), }),
); );
tasks.push( tasks.push(
// Handle unwrapping progress // Update GPUI states
cx.background_spawn(
async move { Self::unwrapping_status(&client, &status, &tx).await },
),
);
tasks.push(
// Handle new messages
cx.spawn(async move |this, cx| { cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await { while let Ok(message) = rx.recv_async().await {
match message { match message {
@@ -181,110 +160,130 @@ impl ChatRegistry {
Self { Self {
rooms: vec![], rooms: vec![],
loading: true, loading: true,
notifications, tracking_flag,
sender: tx.clone(),
notifications: None,
tasks, tasks,
_subscriptions: subscriptions, _subscriptions: subscriptions,
} }
} }
async fn handle_notifications( /// Handle nostr notifications
client: &Client, fn handle_notifications(&mut self, cx: &mut Context<Self>) {
device_signer: &Option<Arc<dyn NostrSigner>>, let nostr = NostrRegistry::global(cx);
loading: &Arc<AtomicBool>, let client = nostr.read(cx).client();
tx: &Sender<NostrEvent>, let device_signer = nostr.read(cx).device_signer().read(cx).clone();
) -> Result<(), Error> {
let initialized_at = Timestamp::now();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let mut notifications = client.notifications(); let status = self.tracking_flag.clone();
let mut processed_events = HashSet::new(); let tx = self.sender.clone();
while let Ok(notification) = notifications.recv().await { self.tasks.push(cx.background_spawn(async move {
let RelayPoolNotification::Message { message, .. } = notification else { let initialized_at = Timestamp::now();
// Skip non-message notifications let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
continue;
};
match message { let mut notifications = client.notifications();
RelayMessage::Event { event, .. } => { let mut processed_events = HashSet::new();
if !processed_events.insert(event.id) {
// Skip if the event has already been processed
continue;
}
if event.kind != Kind::GiftWrap { while let Ok(notification) = notifications.recv().await {
// Skip non-gift wrap events let RelayPoolNotification::Message { message, .. } = notification else {
continue; // Skip non-message notifications
} continue;
};
// Extract the rumor from the gift wrap event match message {
match Self::extract_rumor(client, device_signer, event.as_ref()).await { RelayMessage::Event { event, .. } => {
Ok(rumor) => match rumor.created_at >= initialized_at { if !processed_events.insert(event.id) {
true => { // Skip if the event has already been processed
// Check if the event is sent by coop continue;
let sent_by_coop = { }
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
// No need to emit if sent by coop
// the event is already emitted
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
tx.send_async(signal).await.ok(); if event.kind != Kind::GiftWrap {
// Skip non-gift wrap events
continue;
}
// Extract the rumor from the gift wrap event
match Self::extract_rumor(&client, &device_signer, event.as_ref()).await {
Ok(rumor) => match rumor.created_at >= initialized_at {
true => {
// Check if the event is sent by coop
let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
// No need to emit if sent by coop
// the event is already emitted
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
tx.send_async(signal).await.ok();
}
} }
false => {
status.store(true, Ordering::Release);
}
},
Err(e) => {
log::warn!("Failed to unwrap: {e}");
} }
false => { }
loading.store(true, Ordering::Release); }
} RelayMessage::EndOfStoredEvents(id) => {
}, if id.as_ref() == &subscription_id {
Err(e) => { tx.send_async(NostrEvent::Eose).await.ok();
log::warn!("Failed to unwrap: {e}"); }
}
_ => {}
}
}
}));
}
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
self.notifications = Some(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(12);
let mut is_start_processing = false;
let mut total_loops = 0;
loop {
if client.has_signer().await {
total_loops += 1;
if status.load(Ordering::Acquire) {
is_start_processing = true;
// Reset gift wrap processing flag
_ = status.compare_exchange(
true,
false,
Ordering::Release,
Ordering::Relaxed,
);
tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
// Reset the counter
is_start_processing = false;
total_loops = 0;
} }
} }
} }
RelayMessage::EndOfStoredEvents(id) => { smol::Timer::after(loop_duration).await;
if id.as_ref() == &subscription_id {
tx.send_async(NostrEvent::Eose).await.ok();
}
}
_ => {}
} }
} }));
Ok(())
}
async fn unwrapping_status(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<NostrEvent>) {
let loop_duration = Duration::from_secs(12);
let mut is_start_processing = false;
let mut total_loops = 0;
loop {
if client.has_signer().await {
total_loops += 1;
if status.load(Ordering::Acquire) {
is_start_processing = true;
// Reset gift wrap processing flag
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
}
}
smol::Timer::after(loop_duration).await;
}
} }
/// Get the loading status of the chat registry /// Get the loading status of the chat registry