From d25080f5e73591430b92eba7db8240aaeb244a0f Mon Sep 17 00:00:00 2001 From: reya Date: Mon, 16 Feb 2026 16:53:06 +0700 Subject: [PATCH] wip --- crates/chat/src/lib.rs | 187 +++++++++++------------------ crates/chat/src/room.rs | 211 +++++++++++++++++++++------------ crates/chat_ui/src/lib.rs | 65 +++++++++- crates/coop/src/main.rs | 6 +- crates/coop/src/sidebar/mod.rs | 13 +- crates/state/src/constants.rs | 2 +- crates/state/src/lib.rs | 21 ++++ 7 files changed, 290 insertions(+), 215 deletions(-) diff --git a/crates/chat/src/lib.rs b/crates/chat/src/lib.rs index 42348b3..49f0da7 100644 --- a/crates/chat/src/lib.rs +++ b/crates/chat/src/lib.rs @@ -7,8 +7,6 @@ use std::time::Duration; use anyhow::{anyhow, Context as AnyhowContext, Error}; use common::EventUtils; -use device::DeviceRegistry; -use flume::Sender; use fuzzy_matcher::skim::SkimMatcherV2; use fuzzy_matcher::FuzzyMatcher; use gpui::{ @@ -45,11 +43,9 @@ pub enum ChatEvent { /// Channel signal. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -enum NostrEvent { +enum Signal { /// Message received from relay pool Message(NewMessage), - /// Unwrapping status - Unwrapping(bool), /// Eose received from relay pool Eose, } @@ -60,23 +56,14 @@ pub struct ChatRegistry { /// Collection of all chat rooms rooms: Vec>, - /// Loading status of the registry - loading: bool, - - /// Channel's sender for communication between nostr and gpui - sender: Sender, - /// Tracking the status of unwrapping gift wrap events. tracking_flag: Arc, /// Handle tracking asynchronous task - tracking: Option>>, + tracking_task: Option>>, - /// Handle notifications asynchronous task - notifications: Option>, - - /// Tasks for asynchronous operations - tasks: Vec>, + /// Handle notification asynchronous task + notification_task: Option>, /// Subscriptions _subscriptions: SmallVec<[Subscription; 1]>, @@ -97,79 +84,30 @@ impl ChatRegistry { /// Create a new chat registry instance fn new(cx: &mut Context) -> Self { - let device = DeviceRegistry::global(cx); let nostr = NostrRegistry::global(cx); - let nip17_state = nostr.read(cx).nip17_state(); + let nip17 = nostr.read(cx).nip17_state(); - // A flag to indicate if the registry is loading - let tracking_flag = Arc::new(AtomicBool::new(false)); - - // Channel for communication between nostr and gpui - let (tx, rx) = flume::bounded::(2048); - - let mut tasks = vec![]; let mut subscriptions = smallvec![]; subscriptions.push( // Observe the identity - cx.observe(&nip17_state, |this, state, cx| { + cx.observe(&nip17, |this, state, cx| { if state.read(cx) == &RelayState::Configured { // Handle nostr notifications this.handle_notifications(cx); // Track unwrapping progress this.tracking(cx); } - // Get chat rooms from the database on every identity change + // Get chat rooms from the database on every state changes this.get_rooms(cx); }), ); - subscriptions.push( - // Observe the device signer state - cx.observe(&device, |this, state, cx| { - if state.read(cx).state().set() { - this.handle_notifications(cx); - } - }), - ); - - tasks.push( - // Update GPUI states - cx.spawn(async move |this, cx| { - while let Ok(message) = rx.recv_async().await { - match message { - NostrEvent::Message(message) => { - this.update(cx, |this, cx| { - this.new_message(message, cx); - }) - .ok(); - } - NostrEvent::Unwrapping(status) => { - this.update(cx, |this, cx| { - this.set_loading(status, cx); - this.get_rooms(cx); - }) - .ok(); - } - NostrEvent::Eose => { - this.update(cx, |this, cx| { - this.get_rooms(cx); - }) - .ok(); - } - }; - } - }), - ); - Self { rooms: vec![], - loading: false, - sender: tx.clone(), - tracking_flag, - tracking: None, - notifications: None, - tasks, + tracking_flag: Arc::new(AtomicBool::new(false)), + tracking_task: None, + notification_task: None, _subscriptions: subscriptions, } } @@ -179,16 +117,17 @@ impl ChatRegistry { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); let signer = nostr.read(cx).signer(); - let status = self.tracking_flag.clone(); - let tx = self.sender.clone(); - self.notifications = Some(cx.background_spawn(async move { - let initialized_at = Timestamp::now(); - let sub_id1 = SubscriptionId::new(DEVICE_GIFTWRAP); - let sub_id2 = SubscriptionId::new(USER_GIFTWRAP); + let initialized_at = Timestamp::now(); + let sub_id1 = SubscriptionId::new(DEVICE_GIFTWRAP); + let sub_id2 = SubscriptionId::new(USER_GIFTWRAP); + + // Channel for communication between nostr and gpui + let (tx, rx) = flume::bounded::(1024); + + cx.background_spawn(async move { let device_signer = signer.get_encryption_signer().await; - let mut notifications = client.notifications(); let mut processed_events = HashSet::new(); @@ -223,7 +162,7 @@ impl ChatRegistry { // the event is already emitted if !sent_by_coop { let new_message = NewMessage::new(event.id, rumor); - let signal = NostrEvent::Message(new_message); + let signal = Signal::Message(new_message); tx.send_async(signal).await.ok(); } @@ -239,29 +178,45 @@ impl ChatRegistry { } RelayMessage::EndOfStoredEvents(id) => { if id.as_ref() == &sub_id1 || id.as_ref() == &sub_id2 { - tx.send_async(NostrEvent::Eose).await.ok(); + tx.send_async(Signal::Eose).await.ok(); } } _ => {} } } + }) + .detach(); + + self.notification_task = Some(cx.spawn(async move |this, cx| { + while let Ok(message) = rx.recv_async().await { + match message { + Signal::Message(message) => { + this.update(cx, |this, cx| { + this.new_message(message, cx); + }) + .ok(); + } + Signal::Eose => { + this.update(cx, |this, cx| { + this.get_rooms(cx); + }) + .ok(); + } + }; + } })); } /// Tracking the status of unwrapping gift wrap events. fn tracking(&mut self, cx: &mut Context) { let status = self.tracking_flag.clone(); - let tx = self.sender.clone(); - self.tracking = Some(cx.background_spawn(async move { - let loop_duration = Duration::from_secs(12); + self.tracking_task = Some(cx.background_spawn(async move { + let loop_duration = Duration::from_secs(10); loop { if status.load(Ordering::Acquire) { _ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed); - tx.send_async(NostrEvent::Unwrapping(true)).await.ok(); - } else { - tx.send_async(NostrEvent::Unwrapping(false)).await.ok(); } smol::Timer::after(loop_duration).await; } @@ -270,13 +225,7 @@ impl ChatRegistry { /// Get the loading status of the chat registry pub fn loading(&self) -> bool { - self.loading - } - - /// Set the loading status of the chat registry - pub fn set_loading(&mut self, loading: bool, cx: &mut Context) { - self.loading = loading; - cx.notify(); + self.tracking_flag.load(Ordering::Acquire) } /// Get a weak reference to a room by its ID. @@ -312,19 +261,19 @@ impl ChatRegistry { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); - self.tasks.push(cx.spawn(async move |this, cx| { - if let Some(signer) = client.signer() { - if let Ok(public_key) = signer.get_public_key().await { - this.update(cx, |this, cx| { - this.rooms - .insert(0, cx.new(|_| room.into().organize(&public_key))); - cx.emit(ChatEvent::Ping); - cx.notify(); - }) - .ok(); - } - } - })); + cx.spawn(async move |this, cx| { + let signer = client.signer()?; + let public_key = signer.get_public_key().await.ok()?; + let room: Room = room.into().organize(&public_key); + + this.update(cx, |this, cx| { + this.rooms.insert(0, cx.new(|_| room)); + cx.emit(ChatEvent::Ping); + cx.notify(); + }) + .ok() + }) + .detach(); } /// Emit an open room event. @@ -417,20 +366,16 @@ impl ChatRegistry { pub fn get_rooms(&mut self, cx: &mut Context) { let task = self.get_rooms_from_database(cx); - self.tasks.push(cx.spawn(async move |this, cx| { - match task.await { - Ok(rooms) => { - this.update(cx, move |this, cx| { - this.extend_rooms(rooms, cx); - this.sort(cx); - }) - .ok(); - } - Err(e) => { - log::error!("Failed to load rooms: {e}") - } - }; - })); + cx.spawn(async move |this, cx| { + let rooms = task.await.ok()?; + + this.update(cx, move |this, cx| { + this.extend_rooms(rooms, cx); + this.sort(cx); + }) + .ok() + }) + .detach(); } /// Create a task to load rooms from the database diff --git a/crates/chat/src/room.rs b/crates/chat/src/room.rs index f5fedc6..1f9dc93 100644 --- a/crates/chat/src/room.rs +++ b/crates/chat/src/room.rs @@ -314,12 +314,21 @@ impl Room { continue; }; - // Construct a filter for gossip relays - let filter = Filter::new().kind(Kind::RelayList).author(member).limit(1); + // Construct a filter for messaging relays + let inbox = Filter::new() + .kind(Kind::InboxRelays) + .author(member) + .limit(1); + + // Construct a filter for announcement + let announcement = Filter::new() + .kind(Kind::Custom(10044)) + .author(member) + .limit(1); // Subscribe to get member's gossip relays client - .subscribe(filter) + .subscribe(vec![inbox, announcement]) .with_id(subscription_id.clone()) .close_on( SubscribeAutoCloseOptions::default() @@ -357,62 +366,22 @@ impl Room { }) } - /// Construct extra tags for a message - fn extra_tags(&self, sender: PublicKey, members: &[Person], replies: &[EventId]) -> Vec { - let mut extra_tags = vec![]; - - // Add subject tag if present - if let Some(value) = self.subject.as_ref() { - extra_tags.push(Tag::from_standardized_without_cell(TagStandard::Subject( - value.to_string(), - ))); - } - - // Add all reply tags - for id in replies { - extra_tags.push(Tag::event(*id)) - } - - // Add all receiver tags - for member in members.iter() { - // Skip current user - if member.public_key() == sender { - continue; - } - - extra_tags.push(Tag::from_standardized_without_cell( - TagStandard::PublicKey { - public_key: member.public_key(), - relay_url: member.messaging_relay_hint(), - alias: None, - uppercase: false, - }, - )); - } - - extra_tags - } - - pub fn send(&self, content: S, replies: I, cx: &App) -> Option>> + // Construct a rumor event for direct message + pub fn rumor(&self, content: S, replies: I, cx: &App) -> Option where S: Into, I: IntoIterator, { - let persons = PersonRegistry::global(cx); - let nostr = NostrRegistry::global(cx); - - let client = nostr.read(cx).client(); - let signer = nostr.read(cx).signer(); - + let kind = Kind::PrivateDirectMessage; let content: String = content.into(); let replies: Vec = replies.into_iter().collect(); + let persons = PersonRegistry::global(cx); + let nostr = NostrRegistry::global(cx); + // Get current user's public key let sender = nostr.read(cx).signer().public_key()?; - // get room's config - let config = self.config.clone(); - // Get all members let members: Vec = self .members @@ -421,40 +390,107 @@ impl Room { .map(|member| persons.read(cx).get(member, cx)) .collect(); - // Get extra tags - let extra_tags = self.extra_tags(sender, &members, &replies); + // Construct event's tags + let mut tags = vec![]; + + // Add subject tag if present + if let Some(value) = self.subject.as_ref() { + tags.push(Tag::from_standardized_without_cell(TagStandard::Subject( + value.to_string(), + ))); + } + + // Add all reply tags + for id in replies.into_iter() { + tags.push(Tag::event(id)) + } + + // Add all receiver tags + for member in members.into_iter() { + // Skip current user + if member.public_key() == sender { + continue; + } + + tags.push(Tag::from_standardized_without_cell( + TagStandard::PublicKey { + public_key: member.public_key(), + relay_url: member.messaging_relay_hint(), + alias: None, + uppercase: false, + }, + )); + } + + // Construct a direct message rumor event + // WARNING: never sign and send this event to relays + let mut event = EventBuilder::new(kind, content).tags(tags).build(sender); + + // Ensure that the ID is set + event.ensure_id(); + + Some(event) + } + + /// Send rumor event to all members's messaging relays + pub fn send(&self, rumor: UnsignedEvent, cx: &App) -> Option>> { + let persons = PersonRegistry::global(cx); + let nostr = NostrRegistry::global(cx); + let client = nostr.read(cx).client(); + let signer = nostr.read(cx).signer(); + + // Get room's config + let config = self.config.clone(); + + // Get current user's public key + let sender = nostr.read(cx).signer().public_key()?; + + // Get all members (excluding sender) + let members: Vec = self + .members + .iter() + .filter(|public_key| public_key != &&sender) + .map(|member| persons.read(cx).get(member, cx)) + .collect(); Some(cx.background_spawn(async move { let signer_kind = config.signer_kind(); - let backup = config.backup(); - - // Get all available signers let user_signer = signer.get().await; let encryption_signer = signer.get_encryption_signer().await; - let mut reports: Vec = vec![]; + let mut reports = Vec::new(); + + for member in members { + let relays = member.messaging_relays(); + let announcement = member.announcement(); - for member in members.into_iter() { // Skip if member has no messaging relays - if member.messaging_relays().is_empty() { - let report = SendReport::new(member.public_key()).error("No messaging relays"); - reports.push(report); - + if relays.is_empty() { + reports.push(SendReport::new(member.public_key()).error("No messaging relays")); continue; } - // When the room is forced to use an encryption signer, - // skip if the receiver has not set up an encryption signer. - if signer_kind.encryption() && member.announcement().is_none() { - let report = SendReport::new(member.public_key()).error("Encryption not found"); - reports.push(report); + // Ensure relay connections + for url in relays.iter() { + client + .add_relay(url) + .and_connect() + .capabilities(RelayCapabilities::GOSSIP) + .await + .ok(); + } + // When forced to use encryption signer, skip if receiver has no announcement + if signer_kind.encryption() && announcement.is_none() { + reports + .push(SendReport::new(member.public_key()).error("Encryption not found")); continue; } - let (receiver, signer) = match signer_kind { + // Determine receiver and signer based on signer kind + let (receiver, signer_to_use) = match signer_kind { SignerKind::Auto => { - if let Some(announcement) = member.announcement() { + if let Some(announcement) = announcement { if let Some(enc_signer) = encryption_signer.as_ref() { (announcement.public_key(), enc_signer.clone()) } else { @@ -466,21 +502,48 @@ impl Room { } SignerKind::Encryption => { let Some(encryption_signer) = encryption_signer.as_ref() else { - let report = - SendReport::new(member.public_key()).error("Encryption not found"); - reports.push(report); + reports.push( + SendReport::new(member.public_key()).error("Encryption not found"), + ); continue; }; - let Some(announcement) = member.announcement() else { - let report = SendReport::new(member.public_key()) - .error("Announcement not found"); - reports.push(report); + let Some(announcement) = announcement else { + reports.push( + SendReport::new(member.public_key()) + .error("Announcement not found"), + ); continue; }; (announcement.public_key(), encryption_signer.clone()) } SignerKind::User => (member.public_key(), user_signer.clone()), }; + + // Create and send gift-wrapped event + match EventBuilder::gift_wrap(&signer_to_use, &receiver, rumor.clone(), vec![]) + .await + { + Ok(event) => { + match client + .send_event(&event) + .to(relays) + .ack_policy(AckPolicy::none()) + .await + { + Ok(output) => { + reports.push(SendReport::new(member.public_key()).output(output)); + } + Err(e) => { + reports.push( + SendReport::new(member.public_key()).error(e.to_string()), + ); + } + } + } + Err(e) => { + reports.push(SendReport::new(member.public_key()).error(e.to_string())); + } + } } reports diff --git a/crates/chat_ui/src/lib.rs b/crates/chat_ui/src/lib.rs index 1c79adc..3fdb087 100644 --- a/crates/chat_ui/src/lib.rs +++ b/crates/chat_ui/src/lib.rs @@ -1,8 +1,8 @@ use std::collections::HashSet; pub use actions::*; -use anyhow::Error; -use chat::{Message, RenderedMessage, Room, RoomEvent, RoomKind, SendReport}; +use anyhow::{Context as AnyhowContext, Error}; +use chat::{Message, RenderedMessage, Room, RoomEvent, SendReport}; use common::{nip96_upload, RenderedTimestamp}; use dock::panel::{Panel, PanelEvent}; use gpui::prelude::FluentBuilder; @@ -60,7 +60,7 @@ pub struct ChatPanel { /// Mapping message ids to their rendered texts rendered_texts_by_id: BTreeMap, - /// Mapping message ids to their reports + /// Mapping message (rumor event) ids to their reports reports_by_id: BTreeMap>, /// Input state @@ -124,6 +124,7 @@ impl ChatPanel { // Define all functions that will run after the current cycle cx.defer_in(window, |this, window, cx| { this.subscribe_room_events(window, cx); + this.connect(window, cx); this.get_messages(window, cx); }); @@ -164,6 +165,15 @@ impl ChatPanel { ); } + /// Get all necessary data for each member + fn connect(&mut self, _window: &mut Window, cx: &mut Context) { + let Ok(connect) = self.room.read_with(cx, |this, cx| this.early_connect(cx)) else { + return; + }; + + self.tasks.push(cx.background_spawn(connect)); + } + /// Load all messages belonging to this room fn get_messages(&mut self, _window: &mut Window, cx: &mut Context) { let Ok(get_messages) = self.room.read_with(cx, |this, cx| this.get_messages(cx)) else { @@ -182,7 +192,7 @@ impl ChatPanel { })); } - /// Get user input content and merged all attachments + /// Get user input content and merged all attachments if available fn get_input_value(&self, cx: &Context) -> String { // Get input's value let mut content = self.input.read(cx).value().trim().to_string(); @@ -222,7 +232,52 @@ impl ChatPanel { /// Send a message to all members of the chat fn send_message(&mut self, value: &str, window: &mut Window, cx: &mut Context) { - // TODO + if value.trim().is_empty() { + window.push_notification("Cannot send an empty message", cx); + return; + } + + // Get room entity + let room = self.room.clone(); + + let replies: Vec = self.replies_to.read(cx).iter().copied().collect(); + let content = value.to_string(); + + self.tasks.push(cx.spawn_in(window, async move |this, cx| { + let room = room.upgrade().context("Room is not available")?; + + this.update_in(cx, |this, window, cx| { + match room.read(cx).rumor(content, replies, cx) { + Some(rumor) => { + this.insert_message(&rumor, true, cx); + this.send_and_wait(rumor, window, cx); + } + None => { + window.push_notification("Failed to create message", cx); + } + } + })?; + + Ok(()) + })); + } + + fn send_and_wait(&mut self, rumor: UnsignedEvent, window: &mut Window, cx: &mut Context) { + let Some(room) = self.room.upgrade() else { + return; + }; + + let Some(task) = room.read(cx).send(rumor, cx) else { + window.push_notification("Failed to send message", cx); + return; + }; + + self.tasks.push(cx.spawn_in(window, async move |this, cx| { + let outputs = task.await; + log::info!("Message sent successfully: {outputs:?}"); + + Ok(()) + })) } fn insert_reports(&mut self, id: EventId, reports: Vec, cx: &mut Context) { diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index afeac78..1a661dc 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -78,6 +78,9 @@ fn main() { // Initialize theme registry theme::init(cx); + // Initialize settings + settings::init(cx); + // Initialize the nostr client state::init(cx); @@ -86,9 +89,6 @@ fn main() { // NIP-4e: https://github.com/nostr-protocol/nips/blob/per-device-keys/4e.md device::init(window, cx); - // Initialize settings - settings::init(cx); - // Initialize relay auth registry relay_auth::init(window, cx); diff --git a/crates/coop/src/sidebar/mod.rs b/crates/coop/src/sidebar/mod.rs index 5050582..e9a8de2 100644 --- a/crates/coop/src/sidebar/mod.rs +++ b/crates/coop/src/sidebar/mod.rs @@ -206,17 +206,6 @@ impl Sidebar { /// Search fn search(&mut self, window: &mut Window, cx: &mut Context) { - // Return if a search is already in progress - if self.finding { - if self.find_task.is_none() { - window.push_notification("There is another search in progress", cx); - return; - } else { - // Cancel the ongoing search request - self.find_task = None; - } - } - // Get query let query = self.find_input.read(cx).value(); @@ -228,12 +217,14 @@ impl Sidebar { // Block the input until the search completes self.set_finding(true, window, cx); + // Create the search task let nostr = NostrRegistry::global(cx); let find_users = nostr.read(cx).search(&query, cx); // Run task in the main thread self.find_task = Some(cx.spawn_in(window, async move |this, cx| { let rooms = find_users.await?; + // Update the UI with the search results this.update_in(cx, |this, window, cx| { this.set_results(rooms, cx); diff --git a/crates/state/src/constants.rs b/crates/state/src/constants.rs index 992a272..c0a1a2d 100644 --- a/crates/state/src/constants.rs +++ b/crates/state/src/constants.rs @@ -37,7 +37,7 @@ pub const USER_GIFTWRAP: &str = "user-gift-wraps"; pub const WOT_RELAYS: [&str; 1] = ["wss://relay.vertexlab.io"]; /// Default search relays -pub const SEARCH_RELAYS: [&str; 1] = ["wss://antiprimal.net"]; +pub const SEARCH_RELAYS: [&str; 2] = ["wss://antiprimal.net", "wss://search.nos.today"]; /// Default bootstrap relays pub const BOOTSTRAP_RELAYS: [&str; 3] = [ diff --git a/crates/state/src/lib.rs b/crates/state/src/lib.rs index 0209498..b967083 100644 --- a/crates/state/src/lib.rs +++ b/crates/state/src/lib.rs @@ -837,9 +837,30 @@ impl NostrRegistry { let client = self.client(); let query = query.to_string(); + // Get the address task if the query is a valid NIP-05 address + let address_task = if let Ok(addr) = Nip05Address::parse(&query) { + Some(self.get_address(addr, cx)) + } else { + None + }; + cx.background_spawn(async move { let mut results: Vec = Vec::with_capacity(FIND_LIMIT); + // Return early if the query is a valid NIP-05 address + if let Some(task) = address_task { + if let Ok(public_key) = task.await { + results.push(public_key); + return Ok(results); + } + } + + // Return early if the query is a valid public key + if let Ok(public_key) = PublicKey::parse(&query) { + results.push(public_key); + return Ok(results); + } + // Construct the filter for the search query let filter = Filter::new() .search(query.to_lowercase())