diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index eb0ea20..38bb4c2 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -4,14 +4,18 @@ use dirs::config_dir; use gpui::*; use nostr_sdk::prelude::*; use std::{ + collections::HashSet, fs, str::FromStr, sync::{Arc, OnceLock}, time::Duration, }; -use tokio::sync::mpsc; +use tokio::{ + sync::{broadcast, mpsc}, + time::sleep, +}; -use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, NEW_MESSAGE_SUB_ID}; +use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, METADATA_DELAY, NEW_MESSAGE_SUB_ID}; use states::{account::AccountRegistry, chat::ChatRegistry, signal::SignalRegistry}; use views::app::AppView; @@ -77,8 +81,12 @@ async fn main() { let (new_chat_tx, mut new_chat_rx) = mpsc::channel::(1000); // TODO: adjust? // Channel for all chats + // When receive EOSE from relay(s). Reload UI let (all_chats_tx, mut all_chats_rx) = mpsc::channel::(1); + // Channel for metadata request queue + let (queue_tx, mut queue_rx) = broadcast::channel::(100); + tokio::spawn(async move { let sig = Signature::from_str(FAKE_SIG).unwrap(); let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID); @@ -115,9 +123,11 @@ async fn main() { // Save rumor to database to further query _ = client.database().save_event(&ev).await; - // Send event to channel + // Send event back to channel if subscription_id == new_message_sub_id { - _ = new_chat_tx.send(ev).await; + if let Err(e) = new_chat_tx.send(ev).await { + println!("Error: {}", e) + } } } } @@ -133,13 +143,47 @@ async fn main() { } }); + tokio::spawn(async move { + let mut queue: HashSet = HashSet::new(); + + while let Ok(public_key) = queue_rx.recv().await { + queue.insert(public_key); + + // Wait for METADATA_DELAY + sleep(Duration::from_millis(METADATA_DELAY)).await; + + if !queue.is_empty() { + let authors: Vec = queue.iter().copied().collect(); + let total = authors.len(); + + let filter = Filter::new() + .authors(authors) + .kind(Kind::Metadata) + .limit(total); + + let opts = SubscribeAutoCloseOptions::default() + .filter(FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2))); + + // Clear queue + queue.clear(); + + if let Err(e) = client.subscribe(vec![filter], Some(opts)).await { + println!("Error: {}", e); + } + } + } + }); + App::new() .with_assets(Assets) .with_http_client(Arc::new(reqwest_client::ReqwestClient::new())) .run(move |cx| { + // Account state AccountRegistry::set_global(cx); + // Chat state ChatRegistry::set_global(cx); - SignalRegistry::set_global(cx); + // Hold all metadata requests and merged it + SignalRegistry::set_global(cx, Arc::new(queue_tx)); // Initialize components coop_ui::init(cx); diff --git a/crates/app/src/states/signal.rs b/crates/app/src/states/signal.rs index b5714af..ced90fc 100644 --- a/crates/app/src/states/signal.rs +++ b/crates/app/src/states/signal.rs @@ -1,15 +1,18 @@ use gpui::*; use nostr_sdk::prelude::*; +use std::sync::Arc; +use tokio::sync::broadcast::Sender; pub struct SignalRegistry { public_keys: Vec, + pub queue: Arc>, } impl Global for SignalRegistry {} impl SignalRegistry { - pub fn set_global(cx: &mut AppContext) { - cx.set_global(Self::new()); + pub fn set_global(cx: &mut AppContext, queue: Arc>) { + cx.set_global(Self::new(queue)); } pub fn contains(&self, public_key: PublicKey) -> bool { @@ -20,9 +23,16 @@ impl SignalRegistry { self.public_keys.push(public_key); } - fn new() -> Self { + pub fn add_to_queue(&mut self, public_key: PublicKey) { + if let Err(e) = self.queue.send(public_key) { + println!("Dropped: {}", e) + } + } + + fn new(queue: Arc>) -> Self { Self { public_keys: Vec::new(), + queue, } } } diff --git a/crates/app/src/views/dock/inbox/chat.rs b/crates/app/src/views/dock/inbox/chat.rs index c599932..5b293d0 100644 --- a/crates/app/src/views/dock/inbox/chat.rs +++ b/crates/app/src/views/dock/inbox/chat.rs @@ -170,6 +170,10 @@ impl Chat { }) .detach(); + cx.update_global::(|state, _cx| { + state.add_to_queue(public_key); + }); + cx.observe_global::(|chat, cx| { chat.load_profile(cx); }) diff --git a/crates/app/src/views/dock/inbox/mod.rs b/crates/app/src/views/dock/inbox/mod.rs index dffefe1..2c61f44 100644 --- a/crates/app/src/views/dock/inbox/mod.rs +++ b/crates/app/src/views/dock/inbox/mod.rs @@ -1,6 +1,7 @@ use chat::Chat; use coop_ui::{theme::ActiveTheme, v_flex, Collapsible, Icon, IconName, StyledExt}; use gpui::*; + use prelude::FluentBuilder; use crate::states::chat::ChatRegistry; @@ -10,6 +11,7 @@ pub mod chat; pub struct Inbox { label: SharedString, chats: Model>>>, + is_loading: bool, is_collapsed: bool, } @@ -17,19 +19,25 @@ impl Inbox { pub fn new(cx: &mut ViewContext<'_, Self>) -> Self { let chats = cx.new_model(|_| None); + // Reload UI if global state changes cx.observe_global::(|inbox, cx| { - inbox.add_chats(cx); + inbox.load(cx); }) .detach(); Self { chats, label: "Inbox".into(), + is_loading: true, is_collapsed: false, } } - fn add_chats(&self, cx: &mut ViewContext) { + fn load(&mut self, cx: &mut ViewContext) { + // Stop loading indicator; + self.set_loading(cx); + + // Read global chat registry let events = cx.global::().get(cx); if let Some(events) = events { @@ -44,6 +52,11 @@ impl Inbox { }); } } + + fn set_loading(&mut self, cx: &mut ViewContext) { + self.is_loading = false; + cx.notify(); + } } impl Collapsible for Inbox { @@ -63,6 +76,11 @@ impl Render for Inbox { if let Some(chats) = self.chats.read(cx).as_ref() { content = content.children(chats.clone()) + } else { + match self.is_loading { + true => content = content.child("Loading..."), + false => content = content.child("Empty"), + } } v_flex() diff --git a/crates/app/src/views/onboarding.rs b/crates/app/src/views/onboarding.rs index 643fc79..321f3e1 100644 --- a/crates/app/src/views/onboarding.rs +++ b/crates/app/src/views/onboarding.rs @@ -49,8 +49,8 @@ impl Onboarding { get_client().set_signer(keys).await; }); - // Update view - cx.update_global(|state: &mut AccountRegistry, cx| { + // Update globals state + cx.update_global::(|state, cx| { state.set_user(Some(public_key)); cx.notify(); });