From 7fb9eb993022aac23876ddee919876fd0ffc3e51 Mon Sep 17 00:00:00 2001 From: reya Date: Tue, 6 Jan 2026 10:17:25 +0700 Subject: [PATCH] wip --- crates/chat/src/lib.rs | 173 ++++++++------- crates/chat_ui/src/lib.rs | 356 +++++++++++-------------------- crates/coop/src/chatspace.rs | 11 +- crates/coop/src/login/mod.rs | 23 +- crates/coop/src/sidebar/mod.rs | 147 ++++++------- crates/coop/src/views/compose.rs | 3 +- 6 files changed, 315 insertions(+), 398 deletions(-) diff --git a/crates/chat/src/lib.rs b/crates/chat/src/lib.rs index 5e96710..560a2aa 100644 --- a/crates/chat/src/lib.rs +++ b/crates/chat/src/lib.rs @@ -10,7 +10,7 @@ use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT}; use flume::Sender; use fuzzy_matcher::skim::SkimMatcherV2; use fuzzy_matcher::FuzzyMatcher; -use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task}; +use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task, WeakEntity}; use nostr_sdk::prelude::*; use settings::AppSettings; use smallvec::{smallvec, SmallVec}; @@ -30,6 +30,28 @@ struct GlobalChatRegistry(Entity); impl Global for GlobalChatRegistry {} +/// Chat event. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum ChatEvent { + /// An event to open a room by its ID + OpenRoom(u64), + /// An event to close a room by its ID + CloseRoom(u64), + /// An event to notify UI about a new chat request + Ping, +} + +/// Channel signal. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +enum NostrEvent { + /// Message received from relay pool + Message(NewMessage), + /// Unwrapping status + Unwrapping(bool), + /// Eose received from relay pool + Eose, +} + /// Chat Registry #[derive(Debug)] pub struct ChatRegistry { @@ -40,26 +62,7 @@ pub struct ChatRegistry { loading: bool, /// Tasks for asynchronous operations - _tasks: SmallVec<[Task<()>; 4]>, -} - -/// Chat event. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub enum ChatEvent { - OpenRoom(u64), - CloseRoom(u64), - NewRequest(RoomKind), -} - -/// Channel signal. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -enum Signal { - /// Message received from relay pool - Message(NewMessage), - /// Loading status of the registry - Loading(bool), - /// Eose received from relay pool - Eose, + _tasks: SmallVec<[Task<()>; 3]>, } impl EventEmitter for ChatRegistry {} @@ -84,7 +87,7 @@ impl ChatRegistry { let status = Arc::new(AtomicBool::new(true)); // Channel for communication between nostr and gpui - let (tx, rx) = flume::bounded::(2048); + let (tx, rx) = flume::bounded::(2048); let mut tasks = smallvec![]; @@ -102,7 +105,7 @@ impl ChatRegistry { tasks.push( // Handle unwrapping progress cx.background_spawn( - async move { Self::handle_unwrapping(&client, &status, &tx).await }, + async move { Self::unwrapping_status(&client, &status, &tx).await }, ), ); @@ -111,19 +114,19 @@ impl ChatRegistry { cx.spawn(async move |this, cx| { while let Ok(message) = rx.recv_async().await { match message { - Signal::Message(message) => { + NostrEvent::Message(message) => { this.update(cx, |this, cx| { this.new_message(message, cx); }) .ok(); } - Signal::Eose => { + NostrEvent::Eose => { this.update(cx, |this, cx| { this.get_rooms(cx); }) .ok(); } - Signal::Loading(status) => { + NostrEvent::Unwrapping(status) => { this.update(cx, |this, cx| { this.set_loading(status, cx); this.get_rooms(cx); @@ -142,7 +145,11 @@ impl ChatRegistry { } } - async fn handle_notifications(client: &Client, loading: &Arc, tx: &Sender) { + async fn handle_notifications( + client: &Client, + loading: &Arc, + tx: &Sender, + ) { let initialized_at = Timestamp::now(); let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION); @@ -193,7 +200,7 @@ impl ChatRegistry { if !sent_by_coop { let new_message = NewMessage::new(event.id, rumor); - let signal = Signal::Message(new_message); + let signal = NostrEvent::Message(new_message); if let Err(e) = tx.send_async(signal).await { log::error!("Failed to send signal: {}", e); @@ -212,7 +219,7 @@ impl ChatRegistry { } RelayMessage::EndOfStoredEvents(id) => { if id.as_ref() == &subscription_id { - if let Err(e) = tx.send_async(Signal::Eose).await { + if let Err(e) = tx.send_async(NostrEvent::Eose).await { log::error!("Failed to send signal: {}", e); } } @@ -222,7 +229,7 @@ impl ChatRegistry { } } - async fn handle_unwrapping(client: &Client, status: &Arc, tx: &Sender) { + async fn unwrapping_status(client: &Client, status: &Arc, tx: &Sender) { let loop_duration = Duration::from_secs(20); let mut is_start_processing = false; let mut total_loops = 0; @@ -238,7 +245,7 @@ impl ChatRegistry { _ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed); // Send loading signal - if let Err(e) = tx.send_async(Signal::Loading(true)).await { + if let Err(e) = tx.send_async(NostrEvent::Unwrapping(true)).await { log::error!("Failed to send signal: {}", e); } } else { @@ -246,7 +253,7 @@ impl ChatRegistry { // Wait until after 2 loops to prevent exiting early while events are still being processed if is_start_processing && total_loops >= 2 { // Send loading signal - if let Err(e) = tx.send_async(Signal::Loading(false)).await { + if let Err(e) = tx.send_async(NostrEvent::Unwrapping(false)).await { log::error!("Failed to send signal: {}", e); } // Reset the counter @@ -270,12 +277,12 @@ impl ChatRegistry { cx.notify(); } - /// Get a room by its ID. - pub fn room(&self, id: &u64, cx: &App) -> Option> { + /// Get a weak reference to a room by its ID. + pub fn room(&self, id: &u64, cx: &App) -> Option> { self.rooms .iter() - .find(|model| model.read(cx).id == *id) - .cloned() + .find(|this| &this.read(cx).id == id) + .map(|this| this.downgrade()) } /// Get all ongoing rooms. @@ -297,11 +304,30 @@ impl ChatRegistry { } /// Add a new room to the start of list. - pub fn add_room(&mut self, room: Entity, cx: &mut Context) { - self.rooms.insert(0, room); + pub fn add_room(&mut self, room: I, cx: &mut Context) + where + I: Into, + { + self.rooms.insert(0, cx.new(|_| room.into())); cx.notify(); } + /// Emit an open room event. + /// If the room is new, add it to the registry. + pub fn emit_room(&mut self, room: WeakEntity, cx: &mut Context) { + if let Some(room) = room.upgrade() { + let id = room.read(cx).id; + + // If the room is new, add it to the registry. + if !self.rooms.iter().any(|r| r.read(cx).id == id) { + self.rooms.insert(0, room); + } + + // Emit the open room event. + cx.emit(ChatEvent::OpenRoom(id)); + } + } + /// Close a room. pub fn close_room(&mut self, id: u64, cx: &mut Context) { if self.rooms.iter().any(|r| r.read(cx).id == id) { @@ -345,17 +371,6 @@ impl ChatRegistry { cx.notify(); } - /// Push a new room to the chat registry - pub fn push_room(&mut self, room: Entity, cx: &mut Context) { - let id = room.read(cx).id; - - if !self.rooms.iter().any(|r| r.read(cx).id == id) { - self.add_room(room, cx); - } - - cx.emit(ChatEvent::OpenRoom(id)); - } - /// Extend the registry with new rooms. fn extend_rooms(&mut self, rooms: HashSet, cx: &mut Context) { let mut room_map: HashMap = self @@ -506,39 +521,45 @@ impl ChatRegistry { /// If the room doesn't exist, it will be created. /// Updates room ordering based on the most recent messages. pub fn new_message(&mut self, message: NewMessage, cx: &mut Context) { - let id = message.rumor.uniq_id(); - let author = message.rumor.pubkey; let nostr = NostrRegistry::global(cx); + // Get the unique id + let id = message.rumor.uniq_id(); + // Get the author + let author = message.rumor.pubkey; - if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) { - let is_new_event = message.rumor.created_at > room.read(cx).created_at; - let created_at = message.rumor.created_at; + match self.rooms.iter().find(|room| room.read(cx).id == id) { + Some(room) => { + let new_message = message.rumor.created_at > room.read(cx).created_at; + let created_at = message.rumor.created_at; - // Update room - room.update(cx, |this, cx| { - if is_new_event { - this.set_created_at(created_at, cx); + // Update room + room.update(cx, |this, cx| { + // Update the last timestamp if the new message is newer + if new_message { + this.set_created_at(created_at, cx); + } + + // Set this room is ongoing if the new message is from current user + if author == nostr.read(cx).identity().read(cx).public_key() { + this.set_ongoing(cx); + } + + // Emit the new message to the room + this.emit_message(message, cx); + }); + + // Resort all rooms in the registry by their created at (after updated) + if new_message { + self.sort(cx); } - - // Set this room is ongoing if the new message is from current user - if author == nostr.read(cx).identity().read(cx).public_key() { - this.set_ongoing(cx); - } - - // Emit the new message to the room - this.emit_message(message, cx); - }); - - // Resort all rooms in the registry by their created at (after updated) - if is_new_event { - self.sort(cx); } - } else { - // Push the new room to the front of the list - self.add_room(cx.new(|_| Room::from(&message.rumor)), cx); + None => { + // Push the new room to the front of the list + self.add_room(&message.rumor, cx); - // Notify the UI about the new room - cx.emit(ChatEvent::NewRequest(RoomKind::default())); + // Notify the UI about the new room + cx.emit(ChatEvent::Ping); + } } } diff --git a/crates/chat_ui/src/lib.rs b/crates/chat_ui/src/lib.rs index 36f6609..96058c3 100644 --- a/crates/chat_ui/src/lib.rs +++ b/crates/chat_ui/src/lib.rs @@ -7,10 +7,10 @@ use common::{nip96_upload, RenderedProfile, RenderedTimestamp}; use gpui::prelude::FluentBuilder; use gpui::{ div, img, list, px, red, relative, rems, svg, white, AnyElement, App, AppContext, - ClipboardItem, Context, Element, Entity, EventEmitter, Flatten, FocusHandle, Focusable, + ClipboardItem, Context, Entity, EventEmitter, Flatten, FocusHandle, Focusable, InteractiveElement, IntoElement, ListAlignment, ListOffset, ListState, MouseButton, ObjectFit, ParentElement, PathPromptOptions, Render, RetainAllImageCache, SharedString, - StatefulInteractiveElement, Styled, StyledImage, Subscription, Task, Window, + StatefulInteractiveElement, Styled, StyledImage, Subscription, Task, WeakEntity, Window, }; use gpui_tokio::Tokio; use indexset::{BTreeMap, BTreeSet}; @@ -27,7 +27,6 @@ use ui::button::{Button, ButtonVariants}; use ui::context_menu::ContextMenuExt; use ui::dock_area::panel::{Panel, PanelEvent}; use ui::input::{InputEvent, InputState, TextInput}; -use ui::modal::ModalButtonProps; use ui::notification::Notification; use ui::popup_menu::PopupMenuExt; use ui::{ @@ -43,39 +42,52 @@ mod emoji; mod subject; mod text; -pub fn init(room: Entity, window: &mut Window, cx: &mut App) -> Entity { +pub fn init(room: WeakEntity, window: &mut Window, cx: &mut App) -> Entity { cx.new(|cx| ChatPanel::new(room, window, cx)) } +/// Chat Panel pub struct ChatPanel { - // Chat Room - room: Entity, - - // Messages - list_state: ListState, - messages: BTreeSet, - rendered_texts_by_id: BTreeMap, - reports_by_id: BTreeMap>, - - // New Message - input: Entity, - replies_to: Entity>, - - // Media Attachment - attachments: Entity>, - uploading: bool, - - // Panel id: SharedString, focus_handle: FocusHandle, image_cache: Entity, - _subscriptions: SmallVec<[Subscription; 3]>, - _tasks: SmallVec<[Task<()>; 2]>, + /// Chat Room + room: WeakEntity, + + /// Message list state + list_state: ListState, + + /// All messages + messages: BTreeSet, + + /// Mapping message ids to their rendered texts + rendered_texts_by_id: BTreeMap, + + /// Mapping message ids to their reports + reports_by_id: BTreeMap>, + + /// Input state + input: Entity, + + /// Replies to + replies_to: Entity>, + + /// Media Attachment + attachments: Entity>, + + /// Upload state + uploading: bool, + + /// Async operations + tasks: SmallVec<[Task<()>; 2]>, + + /// Event subscriptions + _subscriptions: SmallVec<[Subscription; 2]>, } impl ChatPanel { - pub fn new(room: Entity, window: &mut Window, cx: &mut Context) -> Self { + pub fn new(room: WeakEntity, window: &mut Window, cx: &mut Context) -> Self { let input = cx.new(|cx| { InputState::new(window, cx) .placeholder("Message...") @@ -87,43 +99,63 @@ impl ChatPanel { let attachments = cx.new(|_| vec![]); let replies_to = cx.new(|_| HashSet::new()); - let id = room.read(cx).id.to_string().into(); let messages = BTreeSet::from([Message::system()]); let list_state = ListState::new(messages.len(), ListAlignment::Bottom, px(1024.)); - let connect = room.read(cx).connect(cx); - let get_messages = room.read(cx).get_messages(cx); + let id: SharedString = room + .read_with(cx, |this, _cx| this.id.to_string().into()) + .unwrap_or("Unknown".into()); let mut subscriptions = smallvec![]; let mut tasks = smallvec![]; - tasks.push( - // Get messaging relays and encryption keys announcement for each member - cx.background_spawn(async move { - if let Err(e) = connect.await { - log::error!("Failed to initialize room: {}", e); - } - }), - ); + if let Ok(connect) = room.read_with(cx, |this, cx| this.connect(cx)) { + tasks.push( + // Get messaging relays and encryption keys announcement for each member + cx.background_spawn(async move { + if let Err(e) = connect.await { + log::error!("Failed to initialize room: {}", e); + } + }), + ); + }; - tasks.push( - // Load all messages belonging to this room - cx.spawn_in(window, async move |this, cx| { - let result = get_messages.await; + if let Ok(get_messages) = room.read_with(cx, |this, cx| this.get_messages(cx)) { + tasks.push( + // Load all messages belonging to this room + cx.spawn_in(window, async move |this, cx| { + let result = get_messages.await; - this.update_in(cx, |this, window, cx| { - match result { - Ok(events) => { - this.insert_messages(&events, cx); + this.update_in(cx, |this, window, cx| { + match result { + Ok(events) => { + this.insert_messages(&events, cx); + } + Err(e) => { + window.push_notification(e.to_string(), cx); + } + }; + }) + .ok(); + }), + ); + } + + if let Some(room) = room.upgrade() { + subscriptions.push( + // Subscribe to room events + cx.subscribe_in(&room, window, move |this, _room, event, window, cx| { + match event { + RoomEvent::Incoming(message) => { + this.insert_message(message, false, cx); } - Err(e) => { - window.push_notification(e.to_string(), cx); + RoomEvent::Reload => { + this.load_messages(window, cx); } }; - }) - .ok(); - }), - ); + }), + ); + } subscriptions.push( // Subscribe to input events @@ -138,32 +170,6 @@ impl ChatPanel { ), ); - subscriptions.push( - // Subscribe to room events - cx.subscribe_in(&room, window, move |this, _, signal, window, cx| { - match signal { - RoomEvent::Incoming(message) => { - this.insert_message(message, false, cx); - } - RoomEvent::Reload => { - this.load_messages(window, cx); - } - }; - }), - ); - - subscriptions.push( - // Observe when user close chat panel - cx.on_release_in(window, move |this, window, cx| { - this.messages.clear(); - this.rendered_texts_by_id.clear(); - this.reports_by_id.clear(); - this.image_cache.update(cx, |this, cx| { - this.clear(window, cx); - }); - }), - ); - Self { id, messages, @@ -178,17 +184,14 @@ impl ChatPanel { image_cache: RetainAllImageCache::new(cx), focus_handle: cx.focus_handle(), _subscriptions: subscriptions, - _tasks: tasks, + tasks, } } /// Load all messages belonging to this room fn load_messages(&mut self, window: &mut Window, cx: &mut Context) { - let get_messages = self.room.read(cx).get_messages(cx); - - self._tasks.push( - // Run the task in the background - cx.spawn_in(window, async move |this, cx| { + if let Ok(get_messages) = self.room.read_with(cx, |this, cx| this.get_messages(cx)) { + self.tasks.push(cx.spawn_in(window, async move |this, cx| { let result = get_messages.await; this.update_in(cx, |this, window, cx| { @@ -202,12 +205,13 @@ impl ChatPanel { }; }) .ok(); - }), - ); + })); + } } /// Get user input content and merged all attachments fn input_content(&self, cx: &Context) -> String { + // Get input's value let mut content = self.input.read(cx).value().trim().to_string(); // Get all attaches and merge its with message @@ -241,19 +245,14 @@ impl ChatPanel { return; } - // Temporary disable the message input - self.input.update(cx, |this, cx| { - this.set_loading(false, cx); - this.set_disabled(false, cx); - this.set_value("", window, cx); - }); + // Get the current room entity + let Some(room) = self.room.upgrade().map(|this| this.read(cx)) else { + return; + }; // Get replies_to if it's present let replies: Vec = self.replies_to.read(cx).iter().copied().collect(); - // Get the current room entity - let room = self.room.read(cx); - // Create a temporary message for optimistic update let rumor = room.create_message(&content, replies.as_ref(), cx); let rumor_id = rumor.id.unwrap(); @@ -272,12 +271,14 @@ impl ChatPanel { this.update_in(cx, |this, window, cx| { this.remove_all_replies(cx); this.remove_all_attachments(cx); + // Reset the input to its default state this.input.update(cx, |this, cx| { this.set_loading(false, cx); this.set_disabled(false, cx); this.set_value("", window, cx); }); + // Update the message list this.insert_message(&rumor, true, cx); }) @@ -285,16 +286,15 @@ impl ChatPanel { }) .detach(); - self._tasks.push( - // Continue sending the message in the background - cx.spawn_in(window, async move |this, cx| { - let result = send_message.await; + self.tasks.push(cx.spawn_in(window, async move |this, cx| { + let result = send_message.await; - this.update_in(cx, |this, window, cx| { - match result { - Ok(reports) => { - // Update room's status - this.room.update(cx, |this, cx| { + this.update_in(cx, |this, window, cx| { + match result { + Ok(reports) => { + // Update room's status + this.room + .update(cx, |this, cx| { if this.kind != RoomKind::Ongoing { // Update the room kind to ongoing, // but keep the room kind if send failed @@ -303,50 +303,21 @@ impl ChatPanel { cx.notify(); } } - }); + }) + .ok(); - // Insert the sent reports - this.reports_by_id.insert(rumor_id, reports); + // Insert the sent reports + this.reports_by_id.insert(rumor_id, reports); - cx.notify(); - } - Err(e) => { - window.push_notification(e.to_string(), cx); - } + cx.notify(); } - }) - .ok(); - }), - ); - } - - /// Resend a failed message - #[allow(dead_code)] - fn resend_message(&mut self, id: &EventId, window: &mut Window, cx: &mut Context) { - if let Some(reports) = self.reports_by_id.get(id).cloned() { - let id_clone = id.to_owned(); - let resend = self.room.read(cx).resend_message(reports, cx); - - cx.spawn_in(window, async move |this, cx| { - let result = resend.await; - - this.update_in(cx, |this, window, cx| { - match result { - Ok(reports) => { - this.reports_by_id.entry(id_clone).and_modify(|this| { - *this = reports; - }); - cx.notify(); - } - Err(e) => { - window.push_notification(Notification::error(e.to_string()), cx); - } - }; - }) - .ok(); + Err(e) => { + window.push_notification(e.to_string(), cx); + } + } }) - .detach(); - } + .ok(); + })); } /// Insert a message into the chat panel @@ -379,13 +350,6 @@ impl ChatPanel { } } - /// Insert a warning message into the chat panel - #[allow(dead_code)] - fn insert_warning(&mut self, content: impl Into, cx: &mut Context) { - let m = Message::warning(content.into()); - self.insert_message(m, true, cx); - } - /// Check if a message failed to send by its ID fn is_sent_failed(&self, id: &EventId) -> bool { self.reports_by_id @@ -417,11 +381,6 @@ impl ChatPanel { }) } - fn profile(&self, public_key: &PublicKey, cx: &Context) -> Profile { - let persons = PersonRegistry::global(cx); - persons.read(cx).get_person(public_key, cx) - } - fn scroll_to(&self, id: EventId) { if let Some(ix) = self.messages.iter().position(|m| { if let Message::User(msg) = m { @@ -547,6 +506,11 @@ impl ChatPanel { }); } + fn profile(&self, public_key: &PublicKey, cx: &Context) -> Profile { + let persons = PersonRegistry::global(cx); + persons.read(cx).get_person(public_key, cx) + } + fn render_announcement(&self, ix: usize, cx: &Context) -> AnyElement { v_flex() .id(ix) @@ -1158,61 +1122,6 @@ impl ChatPanel { items } - - fn subject_button(&self, cx: &App) -> Button { - let room = self.room.downgrade(); - let subject = self - .room - .read(cx) - .subject - .as_ref() - .map(|subject| subject.to_string()); - - Button::new("subject") - .icon(IconName::Edit) - .tooltip("Change the subject of the conversation") - .on_click(move |_, window, cx| { - let view = subject::init(subject.clone(), window, cx); - let room = room.clone(); - let weak_view = view.downgrade(); - - window.open_modal(cx, move |this, _window, _cx| { - let room = room.clone(); - let weak_view = weak_view.clone(); - - this.confirm() - .title("Change the subject of the conversation") - .child(view.clone()) - .button_props(ModalButtonProps::default().ok_text("Change")) - .on_ok(move |_, _window, cx| { - if let Ok(subject) = - weak_view.read_with(cx, |this, cx| this.new_subject(cx)) - { - room.update(cx, |this, cx| { - this.set_subject(subject, cx); - }) - .ok(); - } - // true to close the modal - true - }) - }); - }) - } - - fn reload_button(&self, _cx: &App) -> Button { - let room = self.room.downgrade(); - - Button::new("reload") - .icon(IconName::Refresh) - .tooltip("Reload") - .on_click(move |_ev, window, cx| { - _ = room.update(cx, |this, cx| { - this.emit_refresh(cx); - window.push_notification("Reloaded", cx); - }); - }) - } } impl Panel for ChatPanel { @@ -1221,24 +1130,19 @@ impl Panel for ChatPanel { } fn title(&self, cx: &App) -> AnyElement { - self.room.read_with(cx, |this, cx| { - let proxy = AppSettings::get_proxy_user_avatars(cx); - let label = this.display_name(cx); - let url = this.display_image(proxy, cx); + self.room + .read_with(cx, |this, cx| { + let proxy = AppSettings::get_proxy_user_avatars(cx); + let label = this.display_name(cx); + let url = this.display_image(proxy, cx); - h_flex() - .gap_1p5() - .child(Avatar::new(url).size(rems(1.25))) - .child(label) - .into_any() - }) - } - - fn toolbar_buttons(&self, _window: &Window, cx: &App) -> Vec