use std::cmp::Reverse; use std::collections::{HashMap, HashSet}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context as AnyhowContext, Error}; use common::EventUtils; use device::DeviceRegistry; use flume::Sender; use fuzzy_matcher::skim::SkimMatcherV2; use fuzzy_matcher::FuzzyMatcher; use gpui::{ App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity, }; use nostr_sdk::prelude::*; use smallvec::{smallvec, SmallVec}; use state::{tracker, NostrRegistry, RelayState, DEVICE_GIFTWRAP, USER_GIFTWRAP}; mod message; mod room; pub use message::*; pub use room::*; pub fn init(cx: &mut App) { ChatRegistry::set_global(cx.new(ChatRegistry::new), cx); } struct GlobalChatRegistry(Entity); impl Global for GlobalChatRegistry {} /// Chat event. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum ChatEvent { /// An event to open a room by its ID OpenRoom(u64), /// An event to close a room by its ID CloseRoom(u64), /// An event to notify UI about a new chat request Ping, } /// Channel signal. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] enum NostrEvent { /// Message received from relay pool Message(NewMessage), /// Unwrapping status Unwrapping(bool), /// Eose received from relay pool Eose, } /// Chat Registry #[derive(Debug)] pub struct ChatRegistry { /// Collection of all chat rooms rooms: Vec>, /// Loading status of the registry loading: bool, /// Channel's sender for communication between nostr and gpui sender: Sender, /// Tracking the status of unwrapping gift wrap events. tracking_flag: Arc, /// Handle tracking asynchronous task tracking: Option>>, /// Handle notifications asynchronous task notifications: Option>, /// Tasks for asynchronous operations tasks: Vec>, /// Subscriptions _subscriptions: SmallVec<[Subscription; 1]>, } impl EventEmitter for ChatRegistry {} impl ChatRegistry { /// Retrieve the global chat registry state pub fn global(cx: &App) -> Entity { cx.global::().0.clone() } /// Set the global chat registry instance fn set_global(state: Entity, cx: &mut App) { cx.set_global(GlobalChatRegistry(state)); } /// Create a new chat registry instance fn new(cx: &mut Context) -> Self { let device = DeviceRegistry::global(cx); let nostr = NostrRegistry::global(cx); let nip17_state = nostr.read(cx).nip17_state(); // A flag to indicate if the registry is loading let tracking_flag = Arc::new(AtomicBool::new(false)); // Channel for communication between nostr and gpui let (tx, rx) = flume::bounded::(2048); let mut tasks = vec![]; let mut subscriptions = smallvec![]; subscriptions.push( // Observe the identity cx.observe(&nip17_state, |this, state, cx| { if state.read(cx) == &RelayState::Configured { // Handle nostr notifications this.handle_notifications(cx); // Track unwrapping progress this.tracking(cx); } // Get chat rooms from the database on every identity change this.get_rooms(cx); }), ); subscriptions.push( // Observe the device signer state cx.observe(&device, |this, state, cx| { if state.read(cx).state().set() { this.handle_notifications(cx); } }), ); tasks.push( // Update GPUI states cx.spawn(async move |this, cx| { while let Ok(message) = rx.recv_async().await { match message { NostrEvent::Message(message) => { this.update(cx, |this, cx| { this.new_message(message, cx); }) .ok(); } NostrEvent::Unwrapping(status) => { this.update(cx, |this, cx| { this.set_loading(status, cx); this.get_rooms(cx); }) .ok(); } NostrEvent::Eose => { this.update(cx, |this, cx| { this.get_rooms(cx); }) .ok(); } }; } }), ); Self { rooms: vec![], loading: false, sender: tx.clone(), tracking_flag, tracking: None, notifications: None, tasks, _subscriptions: subscriptions, } } /// Handle nostr notifications fn handle_notifications(&mut self, cx: &mut Context) { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); let signer = nostr.read(cx).signer(); let status = self.tracking_flag.clone(); let tx = self.sender.clone(); self.notifications = Some(cx.background_spawn(async move { let initialized_at = Timestamp::now(); let sub_id1 = SubscriptionId::new(DEVICE_GIFTWRAP); let sub_id2 = SubscriptionId::new(USER_GIFTWRAP); let device_signer = signer.get_encryption_signer().await; let mut notifications = client.notifications(); let mut processed_events = HashSet::new(); while let Some(notification) = notifications.next().await { let ClientNotification::Message { message, .. } = notification else { // Skip non-message notifications continue; }; match message { RelayMessage::Event { event, .. } => { if !processed_events.insert(event.id) { // Skip if the event has already been processed continue; } if event.kind != Kind::GiftWrap { // Skip non-gift wrap events continue; } // Extract the rumor from the gift wrap event match Self::extract_rumor(&client, &device_signer, event.as_ref()).await { Ok(rumor) => match rumor.created_at >= initialized_at { true => { // Check if the event is sent by coop let sent_by_coop = { let tracker = tracker().read().await; tracker.is_sent_by_coop(&event.id) }; // No need to emit if sent by coop // the event is already emitted if !sent_by_coop { let new_message = NewMessage::new(event.id, rumor); let signal = NostrEvent::Message(new_message); tx.send_async(signal).await.ok(); } } false => { status.store(true, Ordering::Release); } }, Err(e) => { log::warn!("Failed to unwrap the gift wrap event: {e}"); } } } RelayMessage::EndOfStoredEvents(id) => { if id.as_ref() == &sub_id1 || id.as_ref() == &sub_id2 { tx.send_async(NostrEvent::Eose).await.ok(); } } _ => {} } } })); } /// Tracking the status of unwrapping gift wrap events. fn tracking(&mut self, cx: &mut Context) { let status = self.tracking_flag.clone(); let tx = self.sender.clone(); self.tracking = Some(cx.background_spawn(async move { let loop_duration = Duration::from_secs(12); loop { if status.load(Ordering::Acquire) { _ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed); tx.send_async(NostrEvent::Unwrapping(true)).await.ok(); } else { tx.send_async(NostrEvent::Unwrapping(false)).await.ok(); } smol::Timer::after(loop_duration).await; } })); } /// Get the loading status of the chat registry pub fn loading(&self) -> bool { self.loading } /// Set the loading status of the chat registry pub fn set_loading(&mut self, loading: bool, cx: &mut Context) { self.loading = loading; cx.notify(); } /// Get a weak reference to a room by its ID. pub fn room(&self, id: &u64, cx: &App) -> Option> { self.rooms .iter() .find(|this| &this.read(cx).id == id) .map(|this| this.downgrade()) } /// Get all rooms based on the filter. pub fn rooms(&self, filter: &RoomKind, cx: &App) -> Vec> { self.rooms .iter() .filter(|room| &room.read(cx).kind == filter) .cloned() .collect() } /// Count the number of rooms based on the filter. pub fn count(&self, filter: &RoomKind, cx: &App) -> usize { self.rooms .iter() .filter(|room| &room.read(cx).kind == filter) .count() } /// Add a new room to the start of list. pub fn add_room(&mut self, room: I, cx: &mut Context) where I: Into + 'static, { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); self.tasks.push(cx.spawn(async move |this, cx| { if let Some(signer) = client.signer() { if let Ok(public_key) = signer.get_public_key().await { this.update(cx, |this, cx| { this.rooms .insert(0, cx.new(|_| room.into().organize(&public_key))); cx.emit(ChatEvent::Ping); cx.notify(); }) .ok(); } } })); } /// Emit an open room event. /// /// If the room is new, add it to the registry. pub fn emit_room(&mut self, room: &Entity, cx: &mut Context) { // Get the room's ID. let id = room.read(cx).id; // If the room is new, add it to the registry. if !self.rooms.iter().any(|r| r.read(cx).id == id) { self.rooms.insert(0, room.to_owned()); } // Emit the open room event. cx.emit(ChatEvent::OpenRoom(id)); } /// Close a room. pub fn close_room(&mut self, id: u64, cx: &mut Context) { if self.rooms.iter().any(|r| r.read(cx).id == id) { cx.emit(ChatEvent::CloseRoom(id)); } } /// Sort rooms by their created at. pub fn sort(&mut self, cx: &mut Context) { self.rooms.sort_by_key(|ev| Reverse(ev.read(cx).created_at)); cx.notify(); } /// Finding rooms based on a query. pub fn find(&self, query: &str, cx: &App) -> Vec> { let matcher = SkimMatcherV2::default(); if let Ok(public_key) = PublicKey::parse(query) { self.rooms .iter() .filter(|room| room.read(cx).members.contains(&public_key)) .cloned() .collect() } else { self.rooms .iter() .filter(|room| { matcher .fuzzy_match(room.read(cx).display_name(cx).as_ref(), query) .is_some() }) .cloned() .collect() } } /// Reset the registry. pub fn reset(&mut self, cx: &mut Context) { self.rooms.clear(); cx.notify(); } /// Extend the registry with new rooms. fn extend_rooms(&mut self, rooms: HashSet, cx: &mut Context) { let mut room_map: HashMap = self .rooms .iter() .enumerate() .map(|(idx, room)| (room.read(cx).id, idx)) .collect(); for new_room in rooms.into_iter() { // Check if we already have a room with this ID if let Some(&index) = room_map.get(&new_room.id) { self.rooms[index].update(cx, |this, cx| { if new_room.created_at > this.created_at { *this = new_room; cx.notify(); } }); } else { let new_room_id = new_room.id; self.rooms.push(cx.new(|_| new_room)); let new_index = self.rooms.len(); room_map.insert(new_room_id, new_index); } } } /// Load all rooms from the database. pub fn get_rooms(&mut self, cx: &mut Context) { let task = self.get_rooms_from_database(cx); self.tasks.push(cx.spawn(async move |this, cx| { match task.await { Ok(rooms) => { this.update(cx, move |this, cx| { this.extend_rooms(rooms, cx); this.sort(cx); }) .ok(); } Err(e) => { log::error!("Failed to load rooms: {e}") } }; })); } /// Create a task to load rooms from the database fn get_rooms_from_database(&self, cx: &App) -> Task, Error>> { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); cx.background_spawn(async move { let signer = client.signer().context("Signer not found")?; let public_key = signer.get_public_key().await?; // Get contacts let contacts = client.database().contacts_public_keys(public_key).await?; // Construct authored filter let authored_filter = Filter::new() .kind(Kind::ApplicationSpecificData) .custom_tag(SingleLetterTag::lowercase(Alphabet::A), public_key); // Get all authored events let authored = client.database().query(authored_filter).await?; // Construct addressed filter let addressed_filter = Filter::new() .kind(Kind::ApplicationSpecificData) .custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key); // Get all addressed events let addressed = client.database().query(addressed_filter).await?; // Merge authored and addressed events let events = authored.merge(addressed); // Collect results let mut rooms: HashSet = HashSet::new(); let mut grouped: HashMap> = HashMap::new(); // Process each event and group by room hash for raw in events.into_iter() { if let Ok(rumor) = UnsignedEvent::from_json(&raw.content) { if rumor.tags.public_keys().peekable().peek().is_some() { grouped.entry(rumor.uniq_id()).or_default().push(rumor); } } } for (_id, mut messages) in grouped.into_iter() { messages.sort_by_key(|m| Reverse(m.created_at)); // Always use the latest message let Some(latest) = messages.first() else { continue; }; // Construct the room from the latest message. // // Call `.organize` to ensure the current user is at the end of the list. let mut room = Room::from(latest).organize(&public_key); // Check if the user has responded to the room let user_sent = messages.iter().any(|m| m.pubkey == public_key); // Check if public keys are from the user's contacts let is_contact = room.members.iter().any(|k| contacts.contains(k)); // Set the room's kind based on status if user_sent || is_contact { room = room.kind(RoomKind::Ongoing); } rooms.insert(room); } Ok(rooms) }) } /// Parse a nostr event into a message and push it to the belonging room /// /// If the room doesn't exist, it will be created. /// Updates room ordering based on the most recent messages. pub fn new_message(&mut self, message: NewMessage, cx: &mut Context) { match self.rooms.iter().find(|e| e.read(cx).id == message.room) { Some(room) => { room.update(cx, |this, cx| { this.push_message(message, cx); }); } None => { // Push the new room to the front of the list self.add_room(message.rumor, cx); } } } /// Trigger a refresh of the opened chat rooms by their IDs pub fn refresh_rooms(&mut self, ids: Option>, cx: &mut Context) { if let Some(ids) = ids { for room in self.rooms.iter() { if ids.contains(&room.read(cx).id) { room.update(cx, |this, cx| { this.emit_refresh(cx); }); } } } } /// Unwraps a gift-wrapped event and processes its contents. async fn extract_rumor( client: &Client, device_signer: &Option>, gift_wrap: &Event, ) -> Result { // Try to get cached rumor first if let Ok(event) = Self::get_rumor(client, gift_wrap.id).await { return Ok(event); } // Try to unwrap with the available signer let unwrapped = Self::try_unwrap(client, device_signer, gift_wrap).await?; let mut rumor_unsigned = unwrapped.rumor; // Generate event id for the rumor if it doesn't have one rumor_unsigned.ensure_id(); // Cache the rumor Self::set_rumor(client, gift_wrap.id, &rumor_unsigned).await?; Ok(rumor_unsigned) } /// Helper method to try unwrapping with different signers async fn try_unwrap( client: &Client, device_signer: &Option>, gift_wrap: &Event, ) -> Result { // Try with the device signer first if let Some(signer) = device_signer { if let Ok(unwrapped) = Self::try_unwrap_with(gift_wrap, signer).await { return Ok(unwrapped); }; }; // Try with the user's signer let user_signer = client.signer().context("Signer not found")?; let unwrapped = UnwrappedGift::from_gift_wrap(user_signer, gift_wrap).await?; Ok(unwrapped) } /// Attempts to unwrap a gift wrap event with a given signer. async fn try_unwrap_with( gift_wrap: &Event, signer: &Arc, ) -> Result { // Get the sealed event let seal = signer .nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content) .await?; // Verify the sealed event let seal: Event = Event::from_json(seal)?; seal.verify_with_ctx(&SECP256K1)?; // Get the rumor event let rumor = signer.nip44_decrypt(&seal.pubkey, &seal.content).await?; let rumor = UnsignedEvent::from_json(rumor)?; Ok(UnwrappedGift { sender: seal.pubkey, rumor, }) } /// Stores an unwrapped event in local database with reference to original async fn set_rumor(client: &Client, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> { let rumor_id = rumor.id.context("Rumor is missing an event id")?; let author = rumor.pubkey; let conversation = Self::conversation_id(rumor); let mut tags = rumor.tags.clone().to_vec(); // Add a unique identifier tags.push(Tag::identifier(id)); // Add a reference to the rumor's author tags.push(Tag::custom( TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::A)), [author], )); // Add a conversation id tags.push(Tag::custom( TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::C)), [conversation.to_string()], )); // Add a reference to the rumor's id tags.push(Tag::event(rumor_id)); // Add references to the rumor's participants for receiver in rumor.tags.public_keys().copied() { tags.push(Tag::custom( TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::P)), [receiver], )); } // Convert rumor to json let content = rumor.as_json(); // Construct the event let event = EventBuilder::new(Kind::ApplicationSpecificData, content) .tags(tags) .sign(&Keys::generate()) .await?; // Save the event to the database client.database().save_event(&event).await?; Ok(()) } /// Retrieves a previously unwrapped event from local database async fn get_rumor(client: &Client, gift_wrap: EventId) -> Result { let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(gift_wrap) .limit(1); if let Some(event) = client.database().query(filter).await?.first_owned() { UnsignedEvent::from_json(event.content).map_err(|e| anyhow!(e)) } else { Err(anyhow!("Event is not cached yet.")) } } /// Get the conversation ID for a given rumor (message). fn conversation_id(rumor: &UnsignedEvent) -> u64 { let mut hasher = DefaultHasher::new(); let mut pubkeys: Vec = rumor.tags.public_keys().copied().collect(); pubkeys.push(rumor.pubkey); pubkeys.sort(); pubkeys.dedup(); pubkeys.hash(&mut hasher); hasher.finish() } }