From 3c15e74e5652dfdb2d4c38bd85c0fd0cd3c1ae32 Mon Sep 17 00:00:00 2001 From: reya Date: Sun, 26 Jan 2025 08:31:29 +0700 Subject: [PATCH] wip: refactor --- Cargo.lock | 1 + crates/app/Cargo.toml | 1 + crates/app/src/main.rs | 234 +++++++++++++++++++----------------- crates/app/src/views/app.rs | 118 +++++++++++------- crates/registry/src/app.rs | 13 +- crates/registry/src/chat.rs | 123 +++++++++---------- 6 files changed, 266 insertions(+), 224 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3074f89..a018d39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1089,6 +1089,7 @@ dependencies = [ "dirs 5.0.1", "gpui", "itertools 0.13.0", + "log", "nostr-sdk", "registry", "reqwest_client", diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index f54151f..5a4f87f 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -29,3 +29,4 @@ rust-embed.workspace = true smol.workspace = true tracing-subscriber = { version = "0.3.18", features = ["fmt"] } +log = "0.4" diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index 645913c..52f44d8 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -9,6 +9,7 @@ use gpui::{ }; #[cfg(target_os = "linux")] use gpui::{WindowBackgroundAppearance, WindowDecorations}; +use log::warn; use nostr_sdk::prelude::*; use registry::{app::AppRegistry, chat::ChatRegistry, contact::Contact}; use state::{get_client, initialize_client}; @@ -42,7 +43,6 @@ async fn main() { // Get client let client = get_client(); - let mut notifications = client.notifications(); // Add some bootstrap relays _ = client.add_relay("wss://relay.damus.io/").await; @@ -57,120 +57,15 @@ async fn main() { // Signal let (signal_tx, mut signal_rx) = mpsc::channel::(4096); - let (mta_tx, mut mta_rx) = mpsc::channel::(4096); + let (mta_tx, mta_rx) = mpsc::channel::(4096); - // Handle notification from Relays + // Handle notifications from relays // Send notify back to GPUI - tokio::spawn(async move { - let sig = Signature::from_str(FAKE_SIG).unwrap(); - let new_message = SubscriptionId::new(NEW_MESSAGE_SUB_ID); - let all_messages = SubscriptionId::new(ALL_MESSAGES_SUB_ID); - - while let Ok(notification) = notifications.recv().await { - if let RelayPoolNotification::Message { message, .. } = notification { - if let RelayMessage::Event { - event, - subscription_id, - } = message - { - match event.kind { - Kind::GiftWrap => { - match client.unwrap_gift_wrap(&event).await { - Ok(UnwrappedGift { mut rumor, sender }) => { - // Request metadata - if let Err(e) = mta_tx.send(sender).await { - println!("Send error: {}", e) - }; - - // Compute event id if not exist - rumor.ensure_id(); - - if let Some(id) = rumor.id { - let ev = Event::new( - id, - rumor.pubkey, - rumor.created_at, - rumor.kind, - rumor.tags, - rumor.content, - sig, - ); - - // Save rumor to database to further query - if let Err(e) = client.database().save_event(&ev).await { - println!("Save error: {}", e); - } - - // Send event back to channel - if subscription_id == new_message { - if let Err(e) = signal_tx.send(Signal::Event(ev)).await - { - println!("Send error: {}", e) - } - } - } - } - Err(e) => println!("Unwrap error: {}", e), - } - } - Kind::ContactList => { - let public_keys: Vec = - event.tags.public_keys().copied().collect(); - - for public_key in public_keys.into_iter() { - if let Err(e) = mta_tx.send(public_key).await { - println!("Send error: {}", e) - }; - } - } - _ => {} - } - } else if let RelayMessage::EndOfStoredEvents(subscription_id) = message { - if subscription_id == all_messages { - if let Err(e) = signal_tx.send(Signal::Eose).await { - println!("Send error: {}", e) - } - } - } - } - } - }); + tokio::spawn(async move { handle_notifications(client, signal_tx, mta_tx).await }); // Handle metadata request // Merge all requests into single subscription - tokio::spawn(async move { - let queue: Arc>> = Arc::new(Mutex::new(HashSet::new())); - let queue_clone = queue.clone(); - - let (tx, mut rx) = mpsc::channel::(200); - - tokio::spawn(async move { - while let Some(public_key) = mta_rx.recv().await { - queue_clone.lock().await.insert(public_key); - _ = tx.send(public_key).await; - } - }); - - tokio::spawn(async move { - while rx.recv().await.is_some() { - sleep(Duration::from_millis(METADATA_DELAY)).await; - - let authors: Vec = queue.lock().await.drain().collect(); - let total = authors.len(); - - if total > 0 { - let filter = Filter::new() - .authors(authors) - .kind(Kind::Metadata) - .limit(total); - - if let Err(e) = client.sync(filter, &SyncOptions::default()).await { - println!("Error: {}", e); - } - } - } - }); - }); + tokio::spawn(async move { handle_metadata(client, mta_rx).await }); App::new() .with_assets(Assets) @@ -189,14 +84,14 @@ async fn main() { // Spawn a thread to handle Nostr events cx.spawn(|async_cx| async move { - let (tx, rx) = smol::channel::unbounded::(); + let (tx, rx) = smol::channel::bounded::(4096); async_cx .background_executor() .spawn(async move { while let Some(signal) = signal_rx.recv().await { if let Err(e) = tx.send(signal).await { - println!("Send error: {}", e) + warn!("Send error: {}", e) } } }) @@ -206,7 +101,7 @@ async fn main() { match signal { Signal::Eose => { _ = async_cx.update_global::(|chat, cx| { - chat.init(cx); + chat.load(cx); }); } Signal::Event(event) => { @@ -293,6 +188,119 @@ async fn main() { }); } +async fn handle_notifications( + client: &Client, + signal_tx: mpsc::Sender, + mta_tx: mpsc::Sender, +) { + let mut notifications = client.notifications(); + let sig = Signature::from_str(FAKE_SIG).unwrap(); + let new_message = SubscriptionId::new(NEW_MESSAGE_SUB_ID); + let all_messages = SubscriptionId::new(ALL_MESSAGES_SUB_ID); + + while let Ok(notification) = notifications.recv().await { + if let RelayPoolNotification::Message { message, .. } = notification { + if let RelayMessage::Event { + event, + subscription_id, + } = message + { + match event.kind { + Kind::GiftWrap => { + match client.unwrap_gift_wrap(&event).await { + Ok(UnwrappedGift { mut rumor, sender }) => { + // Request metadata + if let Err(e) = mta_tx.send(sender).await { + warn!("Send error: {}", e) + }; + + // Compute event id if not exist + rumor.ensure_id(); + + if let Some(id) = rumor.id { + let ev = Event::new( + id, + rumor.pubkey, + rumor.created_at, + rumor.kind, + rumor.tags, + rumor.content, + sig, + ); + + // Save rumor to database to further query + if let Err(e) = client.database().save_event(&ev).await { + warn!("Save error: {}", e); + } + + // Send event back to channel + if subscription_id == new_message { + if let Err(e) = signal_tx.send(Signal::Event(ev)).await { + warn!("Send error: {}", e) + } + } + } + } + Err(e) => warn!("Unwrap error: {}", e), + } + } + Kind::ContactList => { + let public_keys: Vec = + event.tags.public_keys().copied().collect(); + + for public_key in public_keys.into_iter() { + if let Err(e) = mta_tx.send(public_key).await { + warn!("Send error: {}", e) + }; + } + } + _ => {} + } + } else if let RelayMessage::EndOfStoredEvents(subscription_id) = message { + if subscription_id == all_messages { + if let Err(e) = signal_tx.send(Signal::Eose).await { + warn!("Send error: {}", e) + }; + } + } + } + } +} + +async fn handle_metadata(client: &'static Client, mut mta_rx: mpsc::Receiver) { + let queue: Arc>> = Arc::new(Mutex::new(HashSet::new())); + let queue_clone = Arc::clone(&queue); + + let (tx, mut rx) = mpsc::channel::(200); + + tokio::spawn(async move { + while let Some(public_key) = mta_rx.recv().await { + queue_clone.lock().await.insert(public_key); + _ = tx.send(public_key).await; + } + }); + + tokio::spawn(async move { + while rx.recv().await.is_some() { + sleep(Duration::from_millis(METADATA_DELAY)).await; + + let authors: Vec = queue.lock().await.drain().collect(); + let total = authors.len(); + + if total > 0 { + let filter = Filter::new() + .authors(authors) + .kind(Kind::Metadata) + .limit(total); + + if let Err(e) = client.sync(filter, &SyncOptions::default()).await { + warn!("Error: {}", e); + } + } + } + }); +} + fn quit(_: &Quit, cx: &mut AppContext) { cx.quit(); } diff --git a/crates/app/src/views/app.rs b/crates/app/src/views/app.rs index 39d5cc5..3def19c 100644 --- a/crates/app/src/views/app.rs +++ b/crates/app/src/views/app.rs @@ -1,11 +1,12 @@ use super::{chat::ChatPanel, onboarding::Onboarding, sidebar::Sidebar, welcome::WelcomePanel}; use gpui::{ - actions, div, img, impl_internal_actions, px, svg, Axis, Edges, InteractiveElement, - IntoElement, ObjectFit, ParentElement, Render, Styled, StyledImage, View, ViewContext, - VisualContext, WeakView, WindowContext, + actions, div, img, impl_internal_actions, px, svg, Axis, BorrowAppContext, Edges, + InteractiveElement, IntoElement, ObjectFit, ParentElement, Render, Styled, StyledImage, View, + ViewContext, VisualContext, WeakView, WindowContext, }; use registry::{app::AppRegistry, chat::ChatRegistry, contact::Contact}; use serde::Deserialize; +use state::get_client; use std::sync::Arc; use ui::{ button::{Button, ButtonVariants}, @@ -68,48 +69,6 @@ impl AppView { AppView { onboarding, dock } } - fn on_panel_action(&mut self, action: &AddPanel, cx: &mut ViewContext) { - match &action.panel { - PanelKind::Room(id) => { - if let Some(weak_room) = cx.global::().room(id, cx) { - if let Some(room) = weak_room.upgrade() { - let panel = Arc::new(ChatPanel::new(room, cx)); - - self.dock.update(cx, |dock_area, cx| { - dock_area.add_panel(panel, action.position, cx); - }); - } else { - cx.push_notification(( - NotificationType::Error, - "System error. Cannot open this chat room.", - )); - } - } - } - }; - } - - fn render_account(&self, account: Contact) -> impl IntoElement { - Button::new("account") - .ghost() - .xsmall() - .reverse() - .icon(Icon::new(IconName::ChevronDownSmall)) - .child( - img(account.avatar()) - .size_5() - .rounded_full() - .object_fit(ObjectFit::Cover), - ) - .popup_menu(move |this, _cx| { - this.menu("Profile", Box::new(OpenProfile)) - .menu("Contacts", Box::new(OpenContacts)) - .menu("Settings", Box::new(OpenSettings)) - .separator() - .menu("Change account", Box::new(Logout)) - }) - } - fn render_dock(dock_area: WeakView, cx: &mut WindowContext) { let left = DockItem::panel(Arc::new(Sidebar::new(cx))); let center = DockItem::split_with_sizes( @@ -140,6 +99,70 @@ impl AppView { // TODO: support bottom dock? }); } + + fn render_account(&self, account: Contact) -> impl IntoElement { + Button::new("account") + .ghost() + .xsmall() + .reverse() + .icon(Icon::new(IconName::ChevronDownSmall)) + .child( + img(account.avatar()) + .size_5() + .rounded_full() + .object_fit(ObjectFit::Cover), + ) + .popup_menu(move |this, _cx| { + this.menu("Profile", Box::new(OpenProfile)) + .menu("Contacts", Box::new(OpenContacts)) + .menu("Settings", Box::new(OpenSettings)) + .separator() + .menu("Change account", Box::new(Logout)) + }) + } + + fn on_panel_action(&mut self, action: &AddPanel, cx: &mut ViewContext) { + match &action.panel { + PanelKind::Room(id) => { + if let Some(weak_room) = cx.global::().room(id, cx) { + if let Some(room) = weak_room.upgrade() { + let panel = Arc::new(ChatPanel::new(room, cx)); + + self.dock.update(cx, |dock_area, cx| { + dock_area.add_panel(panel, action.position, cx); + }); + } else { + cx.push_notification(( + NotificationType::Error, + "System error. Cannot open this chat room.", + )); + } + } + } + }; + } + + fn on_profile_action(&mut self, _action: &OpenProfile, cx: &mut ViewContext) { + // TODO + } + + fn on_contacts_action(&mut self, _action: &OpenContacts, cx: &mut ViewContext) { + // TODO + } + + fn on_settings_action(&mut self, _action: &OpenSettings, cx: &mut ViewContext) { + // TODO + } + + fn on_logout_action(&mut self, _action: &Logout, cx: &mut ViewContext) { + cx.update_global::(|this, cx| { + this.logout(cx); + // Reset nostr client + cx.background_executor() + .spawn(async move { get_client().reset().await }) + .detach(); + }); + } } impl Render for AppView { @@ -183,7 +206,12 @@ impl Render for AppView { ), ) .child(self.dock.clone()) + // Listener .on_action(cx.listener(Self::on_panel_action)) + .on_action(cx.listener(Self::on_logout_action)) + .on_action(cx.listener(Self::on_profile_action)) + .on_action(cx.listener(Self::on_contacts_action)) + .on_action(cx.listener(Self::on_settings_action)) } else { this.child(TitleBar::new()).child(self.onboarding.clone()) } diff --git a/crates/registry/src/app.rs b/crates/registry/src/app.rs index 7365634..894d743 100644 --- a/crates/registry/src/app.rs +++ b/crates/registry/src/app.rs @@ -15,8 +15,8 @@ impl Global for AppRegistry {} impl AppRegistry { pub fn set_global(cx: &mut AppContext) { - let is_loading = true; let user: Model> = cx.new_model(|_| None); + let is_loading = true; cx.observe(&user, |this, cx| { if let Some(contact) = this.read(cx).as_ref() { @@ -80,6 +80,10 @@ impl AppRegistry { self.user.downgrade() } + pub fn current_user(&self, cx: &WindowContext) -> Option { + self.user.read(cx).clone() + } + pub fn set_user(&mut self, contact: Contact, cx: &mut AppContext) { self.user.update(cx, |this, cx| { *this = Some(contact); @@ -89,7 +93,10 @@ impl AppRegistry { self.is_loading = false; } - pub fn current_user(&self, cx: &WindowContext) -> Option { - self.user.read(cx).clone() + pub fn logout(&mut self, cx: &mut AppContext) { + self.user.update(cx, |this, cx| { + *this = None; + cx.notify(); + }); } } diff --git a/crates/registry/src/chat.rs b/crates/registry/src/chat.rs index 09d301b..b12787c 100644 --- a/crates/registry/src/chat.rs +++ b/crates/registry/src/chat.rs @@ -1,12 +1,12 @@ +use crate::room::Room; +use anyhow::Error; use common::utils::{compare, room_hash}; -use gpui::{AppContext, Context, Global, Model, WeakModel}; +use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel}; use itertools::Itertools; use nostr_sdk::prelude::*; use state::get_client; use std::cmp::Reverse; -use crate::room::Room; - pub struct Inbox { pub rooms: Vec>, pub is_loading: bool, @@ -19,6 +19,37 @@ impl Inbox { is_loading: true, } } + + pub fn current_rooms(&self, cx: &ModelContext) -> Vec { + self.rooms.iter().map(|room| room.read(cx).id).collect() + } + + pub fn load(&mut self, cx: AsyncAppContext) -> Task, Error>> { + cx.background_executor().spawn(async move { + let client = get_client(); + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; + + let filter = Filter::new() + .kind(Kind::PrivateDirectMessage) + .author(public_key); + + // Get all DM events from database + let events = client.database().query(vec![filter]).await?; + + // Filter result + // - Get unique rooms only + // - Sorted by created_at + let result = events + .into_iter() + .filter(|ev| ev.tags.public_keys().peekable().peek().is_some()) + .unique_by(|ev| room_hash(&ev.tags)) + .sorted_by_key(|ev| Reverse(ev.created_at)) + .collect::>(); + + Ok(result) + }) + } } impl Default for Inbox { @@ -77,72 +108,38 @@ impl ChatRegistry { cx.set_global(Self { inbox }); } - pub fn init(&mut self, cx: &mut AppContext) { - let mut async_cx = cx.to_async(); - let async_inbox = self.inbox.clone(); + pub fn load(&mut self, cx: &mut AppContext) { + self.inbox.update(cx, |this, cx| { + let task = this.load(cx.to_async()); - // Get all current room's id - let hashes: Vec = self - .inbox - .read(cx) - .rooms - .iter() - .map(|room| room.read(cx).id) - .collect(); + cx.spawn(|this, mut async_cx| async move { + if let Some(inbox) = this.upgrade() { + if let Ok(events) = task.await { + _ = async_cx.update_model(&inbox, |this, cx| { + let current_rooms = this.current_rooms(cx); + let items: Vec> = events + .into_iter() + .filter_map(|ev| { + let id = room_hash(&ev.tags); + // Filter all seen events + if !current_rooms.iter().any(|h| h == &id) { + Some(cx.new_model(|_| Room::parse(&ev))) + } else { + None + } + }) + .collect(); - cx.foreground_executor() - .spawn(async move { - let client = get_client(); - let query: anyhow::Result, anyhow::Error> = async_cx - .background_executor() - .spawn(async move { - let signer = client.signer().await?; - let public_key = signer.get_public_key().await?; + this.rooms.extend(items); + this.is_loading = false; - let filter = Filter::new() - .kind(Kind::PrivateDirectMessage) - .author(public_key); - - // Get all DM events from database - let events = client.database().query(vec![filter]).await?; - - // Filter result - // - Only unique rooms - // - Sorted by created_at - let result = events - .into_iter() - .filter(|ev| ev.tags.public_keys().peekable().peek().is_some()) - .unique_by(|ev| room_hash(&ev.tags)) - .sorted_by_key(|ev| Reverse(ev.created_at)) - .collect::>(); - - Ok(result) - }) - .await; - - if let Ok(events) = query { - _ = async_cx.update_model(&async_inbox, |model, cx| { - let items: Vec> = events - .into_iter() - .filter_map(|ev| { - let id = room_hash(&ev.tags); - // Filter all seen events - if !hashes.iter().any(|h| h == &id) { - Some(cx.new_model(|_| Room::parse(&ev))) - } else { - None - } - }) - .collect(); - - model.rooms.extend(items); - model.is_loading = false; - - cx.notify(); - }); + cx.notify(); + }); + } } }) .detach(); + }); } pub fn inbox(&self) -> WeakModel {