From 1c4806bd92147056798cc2dce2905b075aa33754 Mon Sep 17 00:00:00 2001 From: reya Date: Mon, 24 Feb 2025 16:18:21 +0700 Subject: [PATCH] chore: refactor chat room --- Cargo.lock | 1 + crates/app/Cargo.toml | 1 + crates/app/src/views/chat.rs | 210 ++++++------------------ crates/app/src/views/sidebar/compose.rs | 21 +-- crates/chats/src/registry.rs | 6 +- crates/chats/src/room.rs | 192 ++++++++++++++-------- 6 files changed, 185 insertions(+), 246 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7e106d..8be2cdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1206,6 +1206,7 @@ dependencies = [ "rustls", "serde", "serde_json", + "smallvec", "smol", "state", "tracing-subscriber", diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 348ee39..054f133 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -27,6 +27,7 @@ itertools.workspace = true dirs.workspace = true rust-embed.workspace = true log.workspace = true +smallvec.workspace = true smol.workspace = true oneshot.workspace = true diff --git a/crates/app/src/views/chat.rs b/crates/app/src/views/chat.rs index 6f62353..4d4eb1e 100644 --- a/crates/app/src/views/chat.rs +++ b/crates/app/src/views/chat.rs @@ -1,4 +1,3 @@ -use account::registry::Account; use anyhow::anyhow; use async_utility::task::spawn; use chats::{registry::ChatRegistry, room::Room}; @@ -100,6 +99,7 @@ pub struct Chat { room: WeakEntity, messages: Entity>, list_state: ListState, + #[allow(dead_code)] subscriptions: Vec, // New Message input: Entity, @@ -125,15 +125,27 @@ impl Chat { }); cx.new(|cx| { - let subscriptions = vec![cx.subscribe_in( + let mut subscriptions = Vec::with_capacity(2); + + subscriptions.push(cx.subscribe_in( &input, window, - move |this: &mut Chat, _, input_event, window, cx| { - if let InputEvent::PressEnter = input_event { + move |this: &mut Self, _, event, window, cx| { + if let InputEvent::PressEnter = event { this.send_message(window, cx); } }, - )]; + )); + + if let Some(room) = room.upgrade() { + subscriptions.push(cx.subscribe_in( + &room, + window, + move |this: &mut Self, _, event, window, cx| { + this.push_message(&event.event, window, cx); + }, + )); + } // Initialize list state // [item_count] always equal to 1 at the beginning @@ -147,7 +159,7 @@ impl Chat { } }); - let mut this = Self { + let this = Self { focus_handle: cx.focus_handle(), is_uploading: false, id: id.to_string().into(), @@ -165,9 +177,6 @@ impl Chat { // Load all messages from database this.load_messages(cx); - // Subscribe and load new messages - this.load_new_messages(cx); - this }) } @@ -239,30 +248,10 @@ impl Chat { }; let room = model.read(cx); - let client = get_client(); - let (tx, rx) = oneshot::channel::(); - - let pubkeys = room - .members - .iter() - .map(|m| m.public_key()) - .collect::>(); - - let filter = Filter::new() - .kind(Kind::PrivateDirectMessage) - .authors(pubkeys.iter().copied()) - .pubkeys(pubkeys); - - cx.background_spawn(async move { - let Ok(events) = client.database().query(filter).await else { - return; - }; - _ = tx.send(events); - }) - .detach(); + let task = room.load_messages(cx); cx.spawn(|this, cx| async move { - if let Ok(events) = rx.await { + if let Ok(events) = task.await { _ = cx.update(|cx| { _ = this.update(cx, |this, cx| { this.push_messages(events, cx); @@ -285,29 +274,29 @@ impl Chat { self.list_state.splice(old_len..old_len, 1); } - fn push_message(&self, content: String, window: &mut Window, cx: &mut Context) { - let Some(account) = Account::global(cx) else { + fn push_message(&self, event: &Event, _window: &mut Window, cx: &mut Context) { + let Some(model) = self.room.upgrade() else { return; }; let old_len = self.messages.read(cx).len(); - let profile = account.read(cx).get(); - let message = Message::new(ParsedMessage::new(profile, &content, Timestamp::now())); + let room = model.read(cx); + + let profile = room + .member(&event.pubkey) + .unwrap_or(NostrProfile::new(event.pubkey, Metadata::default())); + + let message = Message::new(ParsedMessage::new( + &profile, + &event.content, + Timestamp::now(), + )); - // Update message list cx.update_entity(&self.messages, |this, cx| { this.extend(vec![message]); cx.notify(); }); - // Reset message input - cx.update_entity(&self.input, |this, cx| { - this.set_loading(false, window, cx); - this.set_disabled(false, window, cx); - this.set_text("", window, cx); - cx.notify(); - }); - self.list_state.splice(old_len..old_len, 1); } @@ -317,12 +306,7 @@ impl Chat { }; let room = model.read(cx); - let pubkeys = room - .members - .iter() - .map(|m| m.public_key()) - .collect::>(); - + let pubkeys = room.public_keys(); let old_len = self.messages.read(cx).len(); let (messages, new_len) = { @@ -360,75 +344,13 @@ impl Chat { self.list_state.splice(old_len..old_len, new_len); } - fn load_new_messages(&mut self, cx: &mut Context) { - let Some(room) = self.room.upgrade() else { - return; - }; - - let subscription = cx.observe(&room, |view, this, cx| { - let room = this.read(cx); - - if room.new_messages.is_empty() { - return; - }; - - let old_messages = view.messages.read(cx); - let old_len = old_messages.len(); - - let items: Vec = this - .read(cx) - .new_messages - .iter() - .filter_map(|event| { - if let Some(profile) = room.member(&event.pubkey) { - let message = Message::new(ParsedMessage::new( - &profile, - &event.content, - event.created_at, - )); - - if !old_messages.iter().any(|old| old == &message) { - Some(message) - } else { - None - } - } else { - None - } - }) - .collect(); - - let total = items.len(); - - cx.update_entity(&view.messages, |this, cx| { - let messages: Vec = items - .into_iter() - .filter_map(|new| { - if !this.iter().any(|old| old == &new) { - Some(new) - } else { - None - } - }) - .collect(); - - this.extend(messages); - cx.notify(); - }); - - view.list_state.splice(old_len..old_len, total); - }); - - self.subscriptions.push(subscription); - } - fn send_message(&mut self, window: &mut Window, cx: &mut Context) { let Some(model) = self.room.upgrade() else { return; }; // Get message - let mut content = self.input.read(cx).text(); + let mut content = self.input.read(cx).text().to_string(); // Get all attaches and merge its with message if let Some(attaches) = self.attaches.read(cx).as_ref() { @@ -438,7 +360,7 @@ impl Chat { .collect::>() .join("\n"); - content = format!("{}\n{}", content, merged).into() + content = format!("{}\n{}", content, merged) } if content.is_empty() { @@ -453,57 +375,25 @@ impl Chat { }); let room = model.read(cx); - // let subject = Tag::from_standardized_without_cell(TagStandard::Subject(room.title.clone())); - let pubkeys = room.public_keys(); - let async_content = content.clone().to_string(); - - let client = get_client(); + let task = room.send_message(content, cx); let window_handle = window.window_handle(); - let (tx, rx) = oneshot::channel::>(); - - // Send message to all pubkeys - cx.background_spawn(async move { - let signer = client.signer().await.unwrap(); - let public_key = signer.get_public_key().await.unwrap(); - - let mut errors = Vec::new(); - - let tags: Vec = pubkeys - .iter() - .filter_map(|pubkey| { - if pubkey != &public_key { - Some(Tag::public_key(*pubkey)) - } else { - None - } - }) - .collect(); - - for pubkey in pubkeys.iter() { - if let Err(e) = client - .send_private_msg(*pubkey, &async_content, tags.clone()) - .await - { - errors.push(e); - } - } - - _ = tx.send(errors); - }) - .detach(); cx.spawn(|this, mut cx| async move { - _ = cx.update_window(window_handle, |_, window, cx| { - _ = this.update(cx, |this, cx| { - this.push_message(content.to_string(), window, cx); - }); - }); - - if let Ok(errors) = rx.await { + if let Ok(msgs) = task.await { _ = cx.update_window(window_handle, |_, window, cx| { - for error in errors.into_iter() { + _ = this.update(cx, |this, cx| { + // Reset message input + cx.update_entity(&this.input, |this, cx| { + this.set_loading(false, window, cx); + this.set_disabled(false, window, cx); + this.set_text("", window, cx); + cx.notify(); + }); + }); + + for item in msgs.into_iter() { window.push_notification( - Notification::error(error.to_string()).title("Message Failed to Send"), + Notification::error(item).title("Message Failed to Send"), cx, ); } diff --git a/crates/app/src/views/sidebar/compose.rs b/crates/app/src/views/sidebar/compose.rs index 71e9bb8..cbf1185 100644 --- a/crates/app/src/views/sidebar/compose.rs +++ b/crates/app/src/views/sidebar/compose.rs @@ -1,4 +1,3 @@ -use async_utility::task::spawn; use chats::{registry::ChatRegistry, room::Room}; use common::{profile::NostrProfile, utils::random_name}; use gpui::{ @@ -286,18 +285,16 @@ impl Compose { let (tx, rx) = oneshot::channel::>(); cx.background_spawn(async move { - spawn(async move { - if let Ok(profile) = nip05::profile(&content, None).await { - let metadata = (client - .fetch_metadata(profile.public_key, Duration::from_secs(2)) - .await) - .unwrap_or_default(); + if let Ok(profile) = nip05::profile(&content, None).await { + let metadata = (client + .fetch_metadata(profile.public_key, Duration::from_secs(2)) + .await) + .unwrap_or_default(); - _ = tx.send(Some(NostrProfile::new(profile.public_key, metadata))); - } else { - _ = tx.send(None); - } - }); + _ = tx.send(Some(NostrProfile::new(profile.public_key, metadata))); + } else { + _ = tx.send(None); + } }) .detach(); diff --git a/crates/chats/src/registry.rs b/crates/chats/src/registry.rs index ddf1771..da70f5d 100644 --- a/crates/chats/src/registry.rs +++ b/crates/chats/src/registry.rs @@ -1,4 +1,4 @@ -use crate::room::Room; +use crate::room::{IncomingEvent, Room}; use anyhow::anyhow; use common::{last_seen::LastSeen, utils::room_hash}; use gpui::{App, AppContext, Context, Entity, Global, WeakEntity}; @@ -175,7 +175,7 @@ impl ChatRegistry { if let Some(last_seen) = Rc::get_mut(&mut this.last_seen) { *last_seen = LastSeen(event.created_at); } - this.new_messages.push(event); + cx.emit(IncomingEvent { event }); cx.notify(); }); @@ -184,8 +184,8 @@ impl ChatRegistry { cx.notify(); } else { - let mut rooms = self.rooms.write().unwrap(); let new_room = Room::new(&event, cx); + let mut rooms = self.rooms.write().unwrap(); rooms.insert(0, new_room); cx.notify(); diff --git a/crates/chats/src/room.rs b/crates/chats/src/room.rs index b914660..ac12aee 100644 --- a/crates/chats/src/room.rs +++ b/crates/chats/src/room.rs @@ -1,27 +1,27 @@ -use common::{ - last_seen::LastSeen, - profile::NostrProfile, - utils::{random_name, room_hash}, -}; -use gpui::{App, AppContext, Entity, SharedString}; +use anyhow::Error; +use common::{last_seen::LastSeen, profile::NostrProfile, utils::room_hash}; +use gpui::{App, AppContext, Entity, EventEmitter, SharedString, Task}; use nostr_sdk::prelude::*; use smallvec::{smallvec, SmallVec}; use state::get_client; use std::{collections::HashSet, rc::Rc}; +#[derive(Debug, Clone)] +pub struct IncomingEvent { + pub event: Event, +} + pub struct Room { pub id: u64, pub last_seen: Rc, - /// Subject of the room (Nostr) - pub title: String, - /// Display name of the room (used for display purposes in Coop) - pub display_name: Option, + /// Subject of the room + pub name: Option, /// All members of the room pub members: SmallVec<[NostrProfile; 2]>, - /// Store all new messages - pub new_messages: Vec, } +impl EventEmitter for Room {} + impl PartialEq for Room { fn eq(&self, other: &Self) -> bool { self.id == other.id @@ -32,78 +32,47 @@ impl Room { pub fn new(event: &Event, cx: &mut App) -> Entity { let id = room_hash(event); let last_seen = Rc::new(LastSeen(event.created_at)); - // Get the subject from the event's tags, or create a random subject if none is found - let title = if let Some(tag) = event.tags.find(TagKind::Subject) { - tag.content() - .map(|s| s.to_owned()) - .unwrap_or(random_name(2)) + + // Get the subject from the event's tags + let name = if let Some(tag) = event.tags.find(TagKind::Subject) { + tag.content().map(|s| s.to_owned().into()) } else { - random_name(2) + None }; + // Create a task for loading metadata + let load_metadata = Self::load_metadata(event, cx); + let room = cx.new(|cx| { let this = Self { id, last_seen, - title, - display_name: None, + name, members: smallvec![], - new_messages: vec![], }; - let mut pubkeys = vec![]; - - // Get all pubkeys from event's tags - pubkeys.extend(event.tags.public_keys().collect::>()); - pubkeys.push(event.pubkey); - - let client = get_client(); - let (tx, rx) = oneshot::channel::>(); - - cx.background_spawn(async move { - let signer = client.signer().await.unwrap(); - let signer_pubkey = signer.get_public_key().await.unwrap(); - let mut profiles = vec![]; - - for public_key in pubkeys.into_iter() { - if let Ok(result) = client.database().metadata(public_key).await { - let metadata = result.unwrap_or_default(); - let profile = NostrProfile::new(public_key, metadata); - - if public_key == signer_pubkey { - profiles.push(profile); - } else { - profiles.insert(0, profile); - } - } - } - - _ = tx.send(profiles); - }) - .detach(); - cx.spawn(|this, cx| async move { - if let Ok(profiles) = rx.await { + if let Ok(profiles) = load_metadata.await { _ = cx.update(|cx| { - let display_name = if profiles.len() > 2 { - let merged = profiles - .iter() - .take(2) - .map(|profile| profile.name().to_string()) - .collect::>() - .join(", "); - - let name: SharedString = - format!("{}, +{}", merged, profiles.len() - 2).into(); - - Some(name) - } else { - None - }; - _ = this.update(cx, |this: &mut Room, cx| { + // Update the room's name if it's not already set + if this.name.is_none() { + let mut name = profiles + .iter() + .take(2) + .map(|profile| profile.name().to_string()) + .collect::>() + .join(", "); + + if profiles.len() > 2 { + name = format!("{}, +{}", name, profiles.len() - 2); + } + + this.name = Some(name.into()) + }; + // Update the room's members this.members.extend(profiles); - this.display_name = display_name; + cx.notify(); }); }); @@ -141,7 +110,7 @@ impl Room { /// Get room's display name pub fn name(&self) -> Option { - self.display_name.clone() + self.name.clone() } /// Determine if room is a group @@ -158,4 +127,85 @@ impl Room { pub fn ago(&self) -> SharedString { self.last_seen.ago() } + + /// Send message to all room's members + pub fn send_message(&self, content: String, cx: &App) -> Task, Error>> { + let client = get_client(); + let pubkeys = self.public_keys(); + + cx.background_spawn(async move { + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; + + let mut msg = Vec::new(); + + let tags: Vec = pubkeys + .iter() + .filter_map(|pubkey| { + if pubkey != &public_key { + Some(Tag::public_key(*pubkey)) + } else { + None + } + }) + .collect(); + + for pubkey in pubkeys.iter() { + if let Err(e) = client + .send_private_msg(*pubkey, &content, tags.clone()) + .await + { + msg.push(e.to_string()); + } + } + + Ok(msg) + }) + } + + /// Load metadata for all members + pub fn load_messages(&self, cx: &App) -> Task> { + let client = get_client(); + let pubkeys = self.public_keys(); + let filter = Filter::new() + .kind(Kind::PrivateDirectMessage) + .authors(pubkeys.iter().copied()) + .pubkeys(pubkeys); + + cx.background_spawn(async move { + let query = client.database().query(filter).await?; + Ok(query) + }) + } + + /// Load metadata for all members + fn load_metadata(event: &Event, cx: &App) -> Task, Error>> { + let client = get_client(); + let mut pubkeys = vec![]; + + // Get all pubkeys from event's tags + pubkeys.extend(event.tags.public_keys().collect::>()); + pubkeys.push(event.pubkey); + + cx.background_spawn(async move { + let signer = client.signer().await?; + let signer_pubkey = signer.get_public_key().await?; + let mut profiles = vec![]; + + for public_key in pubkeys.into_iter() { + if let Ok(result) = client.database().metadata(public_key).await { + let metadata = result.unwrap_or_default(); + let profile = NostrProfile::new(public_key, metadata); + + if public_key == signer_pubkey { + profiles.push(profile); + } else { + profiles.insert(0, profile); + } + } + } + + Ok(profiles) + }) + } }