diff --git a/crates/coop/src/views/chat/mod.rs b/crates/coop/src/views/chat/mod.rs index 350c163..d538cbb 100644 --- a/crates/coop/src/views/chat/mod.rs +++ b/crates/coop/src/views/chat/mod.rs @@ -166,7 +166,7 @@ impl Chat { match signal { RoomSignal::NewMessage((gift_wrap_id, event)) => { let gift_wrap_id = gift_wrap_id.to_owned(); - let message = Message::user(event); + let message = Message::user(event.clone()); cx.spawn_in(window, async move |this, cx| { let states = app_state(); @@ -423,8 +423,8 @@ impl Chat { } /// Convert and insert a vector of nostr events into the chat panel - fn insert_messages(&mut self, events: Vec, cx: &mut Context) { - for event in events.into_iter() { + fn insert_messages(&mut self, events: Vec, cx: &mut Context) { + for event in events { let m = Message::user(event); // Bulk inserting messages, so no need to scroll to the latest message self.insert_message(m, false, cx); diff --git a/crates/registry/src/lib.rs b/crates/registry/src/lib.rs index 70bf62f..dd688ea 100644 --- a/crates/registry/src/lib.rs +++ b/crates/registry/src/lib.rs @@ -9,7 +9,6 @@ use fuzzy_matcher::FuzzyMatcher; use gpui::{ App, AppContext, AsyncApp, Context, Entity, EventEmitter, Global, Task, WeakEntity, Window, }; -use itertools::Itertools; use nostr_sdk::prelude::*; use room::RoomKind; use settings::AppSettings; @@ -338,63 +337,64 @@ impl Registry { let public_key = signer.get_public_key().await?; let contacts = client.database().contacts_public_keys(public_key).await?; - // Get messages sent by the user - let sent = Filter::new() - .kind(Kind::PrivateDirectMessage) - .author(public_key); + let authored_filter = Filter::new() + .kind(Kind::ApplicationSpecificData) + .custom_tag( + SingleLetterTag::lowercase(Alphabet::A), + public_key.to_hex(), + ); - // Get messages received by the user - let recv = Filter::new() - .kind(Kind::PrivateDirectMessage) - .pubkey(public_key); + let addressed_filter = Filter::new() + .kind(Kind::ApplicationSpecificData) + .custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key); - let sent_events = client.database().query(sent).await?; - let recv_events = client.database().query(recv).await?; - let events = sent_events.merge(recv_events); + let authored = client.database().query(authored_filter).await?; + let addressed = client.database().query(addressed_filter).await?; + let events = authored.merge(addressed); let mut rooms: HashSet = HashSet::new(); + let mut grouped: HashMap> = HashMap::new(); // Process each event and group by room hash - for event in events - .into_iter() - .sorted_by_key(|event| Reverse(event.created_at)) - .filter(|ev| ev.tags.public_keys().peekable().peek().is_some()) - { - // Parse the room from the nostr event - let room = Room::from(&event); + for raw in events.into_iter() { + match UnsignedEvent::from_json(&raw.content) { + Ok(rumor) => { + if rumor.tags.public_keys().peekable().peek().is_some() { + grouped.entry(rumor.uniq_id()).or_default().push(rumor); + } + } + Err(e) => log::warn!("Failed to parse stored rumor: {e}"), + } + } + + for (_room_id, mut messages) in grouped.into_iter() { + messages.sort_by_key(|m| Reverse(m.created_at)); + + let Some(latest) = messages.first() else { + continue; + }; + + let mut room = Room::from(latest); - // Skip if the room is already in the set if rooms.iter().any(|r| r.id == room.id) { continue; } - // Get all public keys from the event's tags let mut public_keys: Vec = room.members().to_vec(); public_keys.retain(|pk| pk != &public_key); - // Bypass screening flag - let mut bypassed = false; + let user_sent = messages.iter().any(|m| m.pubkey == public_key); - // If the user has enabled bypass screening in settings, - // check if any of the room's members are contacts of the current user + let mut bypassed = false; if bypass_setting { bypassed = public_keys.iter().any(|k| contacts.contains(k)); } - // Check if the current user has sent at least one message to this room - let filter = Filter::new() - .kind(Kind::PrivateDirectMessage) - .author(public_key) - .pubkeys(public_keys); - - // If current user has sent a message at least once, mark as ongoing - let is_ongoing = client.database().count(filter).await.unwrap_or(1) >= 1; - - if is_ongoing || bypassed { - rooms.insert(room.kind(RoomKind::Ongoing)); - } else { - rooms.insert(room); + if user_sent || bypassed { + room = room.kind(RoomKind::Ongoing); } + + rooms.insert(room); } Ok(rooms) @@ -482,7 +482,7 @@ impl Registry { pub fn event_to_message( &mut self, gift_wrap: EventId, - event: Event, + event: UnsignedEvent, window: &mut Window, cx: &mut Context, ) { @@ -495,11 +495,13 @@ impl Registry { if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) { let is_new_event = event.created_at > room.read(cx).created_at; + let created_at = event.created_at; + let event_for_emit = event.clone(); // Update room room.update(cx, |this, cx| { if is_new_event { - this.set_created_at(event.created_at, cx); + this.set_created_at(created_at, cx); } // Set this room is ongoing if the new message is from current user @@ -508,8 +510,9 @@ impl Registry { } // Emit the new message to the room + let event_to_emit = event_for_emit.clone(); cx.defer_in(window, move |this, _window, cx| { - this.emit_message(gift_wrap, event, cx); + this.emit_message(gift_wrap, event_to_emit, cx); }); }); diff --git a/crates/registry/src/room.rs b/crates/registry/src/room.rs index 4c5c33d..3a3dab5 100644 --- a/crates/registry/src/room.rs +++ b/crates/registry/src/room.rs @@ -71,7 +71,7 @@ impl SendReport { #[derive(Debug, Clone)] pub enum RoomSignal { - NewMessage((EventId, Box)), + NewMessage((EventId, UnsignedEvent)), Refresh, } @@ -359,65 +359,37 @@ impl Room { } /// Loads all messages for this room from the database - pub fn load_messages(&self, cx: &App) -> Task, Error>> { - let members = self.members.clone(); + pub fn load_messages(&self, cx: &App) -> Task, Error>> { + let conversation_id = self.id.to_string(); cx.background_spawn(async move { let client = app_state().client(); - let signer = client.signer().await?; - let public_key = signer.get_public_key().await?; - let sent_ids: Vec = app_state() - .tracker() - .read() - .await - .sent_ids() - .iter() - .copied() - .collect(); - - // Get seen events from database let filter = Filter::new() .kind(Kind::ApplicationSpecificData) - .identifiers(sent_ids); + .custom_tag( + SingleLetterTag::lowercase(Alphabet::C), + conversation_id.as_str(), + ); - let seen_events = client.database().query(filter).await?; + let stored = client.database().query(filter).await?; + let mut messages = Vec::with_capacity(stored.len()); - // Extract seen event IDs - let seen_ids: Vec = seen_events - .into_iter() - .filter_map(|event| event.tags.event_ids().next().copied()) - .collect(); + for event in stored { + match UnsignedEvent::from_json(&event.content) { + Ok(rumor) => messages.push(rumor), + Err(e) => log::warn!("Failed to parse stored rumor: {e}"), + } + } - // Get events that sent by current user - let filter = Filter::new() - .kind(Kind::PrivateDirectMessage) - .author(public_key) - .pubkeys(members.clone()); + messages.sort_by_key(|message| message.created_at); - let sent_events = client.database().query(filter).await?; - - // Get events that received by current user - let filter = Filter::new() - .kind(Kind::PrivateDirectMessage) - .authors(members) - .pubkey(public_key); - - let recv_events = client.database().query(filter).await?; - - // Merge events - let events: Vec = sent_events - .merge(recv_events) - .into_iter() - .filter(|event| !seen_ids.contains(&event.id)) - .collect(); - - Ok(events) + Ok(messages) }) } /// Emits a new message signal to the current room - pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context) { - cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event)))); + pub fn emit_message(&self, gift_wrap_id: EventId, event: UnsignedEvent, cx: &mut Context) { + cx.emit(RoomSignal::NewMessage((gift_wrap_id, event))); } /// Emits a signal to refresh the current room's messages. diff --git a/crates/states/src/state.rs b/crates/states/src/state.rs index 1f1f8d2..b80aaee 100644 --- a/crates/states/src/state.rs +++ b/crates/states/src/state.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map::DefaultHasher, HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; @@ -57,7 +58,7 @@ pub enum SignalKind { NewProfile(Profile), /// A signal to notify UI that a new gift wrap event has been received - NewMessage((EventId, Event)), + NewMessage((EventId, UnsignedEvent)), /// A signal to notify UI that no messaging relays for current user was found MessagingRelaysNotFound, @@ -685,37 +686,53 @@ impl AppState { } /// Stores an unwrapped event in local database with reference to original - async fn set_rumor(&self, id: EventId, rumor: &Event) -> Result<(), Error> { - // Save unwrapped event - self.client.database().save_event(rumor).await?; + async fn set_rumor(&self, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> { + let rumor_id = rumor + .id + .ok_or_else(|| anyhow!("Rumor is missing an event id"))?; + let author_hex = rumor.pubkey.to_hex(); + let conversation = Self::conversation_id(rumor).to_string(); - // Create a reference event pointing to the unwrapped event - let event = EventBuilder::new(Kind::ApplicationSpecificData, "") - .tags(vec![Tag::identifier(id), Tag::event(rumor.id)]) + let mut tags = rumor.tags.clone().to_vec(); + tags.push(Tag::identifier(id)); + tags.push(Tag::custom( + TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::A)), + [author_hex], + )); + tags.push(Tag::custom( + TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::C)), + [conversation], + )); + tags.push(Tag::event(rumor_id)); + + for receiver in rumor.tags.public_keys().copied() { + tags.push(Tag::custom( + TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::P)), + [receiver.to_hex()], + )); + } + + let content = rumor.as_json(); + + let event = EventBuilder::new(Kind::ApplicationSpecificData, content) + .tags(tags) .sign(&Keys::generate()) .await?; - // Save reference event self.client.database().save_event(&event).await?; Ok(()) } /// Retrieves a previously unwrapped event from local database - async fn get_rumor(&self, id: EventId) -> Result { + async fn get_rumor(&self, id: EventId) -> Result { let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(id) .limit(1); if let Some(event) = self.client.database().query(filter).await?.first_owned() { - let target_id = event.tags.event_ids().collect::>()[0]; - - if let Some(event) = self.client.database().event_by_id(target_id).await? { - Ok(event) - } else { - Err(anyhow!("Event not found.")) - } + UnsignedEvent::from_json(event.content).map_err(|e| anyhow!(e)) } else { Err(anyhow!("Event is not cached yet.")) } @@ -723,13 +740,13 @@ impl AppState { // Unwraps a gift-wrapped event and processes its contents. async fn extract_rumor(&self, gift_wrap: &Event) { - let mut rumor: Option = None; + let mut rumor: Option = None; if let Ok(event) = self.get_rumor(gift_wrap.id).await { rumor = Some(event); } else if let Ok(unwrapped) = self.client.unwrap_gift_wrap(gift_wrap).await { let sender = unwrapped.sender; - let rumor_unsigned = unwrapped.rumor; + let mut rumor_unsigned = unwrapped.rumor; if !Self::verify_rumor_sender(sender, &rumor_unsigned) { log::warn!( @@ -738,13 +755,14 @@ impl AppState { sender, rumor_unsigned.pubkey ); - } else if let Ok(event) = rumor_unsigned.clone().sign_with_keys(&Keys::generate()) { - // Save this event to the database for future use. - if let Err(e) = self.set_rumor(gift_wrap.id, &event).await { - log::warn!("Failed to cache unwrapped event: {e}") - } + } else { + rumor_unsigned.ensure_id(); - rumor = Some(event); + if let Err(e) = self.set_rumor(gift_wrap.id, &rumor_unsigned).await { + log::warn!("Failed to cache unwrapped event: {e}") + } else { + rumor = Some(rumor_unsigned); + } } } @@ -769,6 +787,16 @@ impl AppState { } } + fn conversation_id(rumor: &UnsignedEvent) -> u64 { + let mut hasher = DefaultHasher::new(); + let mut pubkeys: Vec = rumor.tags.public_keys().copied().collect(); + pubkeys.push(rumor.pubkey); + pubkeys.sort(); + pubkeys.dedup(); + pubkeys.hash(&mut hasher); + hasher.finish() + } + fn verify_rumor_sender(sender: PublicKey, rumor: &UnsignedEvent) -> bool { rumor.pubkey == sender }