From cc79f0ed1c8300e44b2879631f6cf267c8992333 Mon Sep 17 00:00:00 2001 From: reya Date: Mon, 15 Sep 2025 09:10:37 +0700 Subject: [PATCH] chore: clean up --- Cargo.lock | 85 ++++++------- crates/client_keys/src/lib.rs | 7 +- crates/coop/src/chatspace.rs | 209 +++++++++++++++---------------- crates/coop/src/main.rs | 5 +- crates/coop/src/views/account.rs | 6 +- crates/global/src/lib.rs | 113 +++++++++++------ 6 files changed, 234 insertions(+), 191 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 816549a..6badb6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,9 +251,9 @@ dependencies = [ [[package]] name = "async-fs" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f7e37c0ed80b2a977691c47dae8625cfb21e205827106c64f7c588766b2e50" +checksum = "8034a681df4aed8b8edbd7fbe472401ecf009251c8b40556b304567052e294c5" dependencies = [ "async-lock", "blocking", @@ -262,11 +262,11 @@ dependencies = [ [[package]] name = "async-io" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19634d6336019ef220f09fd31168ce5c184b295cbf80345437cc36094ef223ca" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" dependencies = [ - "async-lock", + "autocfg", "cfg-if", "concurrent-queue", "futures-io", @@ -275,7 +275,7 @@ dependencies = [ "polling", "rustix 1.1.2", "slab", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -302,9 +302,9 @@ dependencies = [ [[package]] name = "async-process" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65daa13722ad51e6ab1a1b9c01299142bc75135b337923cfa10e79bbbd669f00" +checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" dependencies = [ "async-channel", "async-io", @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "async-signal" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f567af260ef69e1d52c2b560ce0ea230763e6fbb9214a85d768760a920e3e3c1" +checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" dependencies = [ "async-io", "async-lock", @@ -344,7 +344,7 @@ dependencies = [ "rustix 1.1.2", "signal-hook-registry", "slab", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -1131,7 +1131,7 @@ dependencies = [ [[package]] name = "collections" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "indexmap", "rustc-hash 2.1.1", @@ -1572,7 +1572,7 @@ dependencies = [ [[package]] name = "derive_refineable" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "proc-macro2", "quote", @@ -1824,11 +1824,12 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "erased-serde" -version = "0.4.6" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" +checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b" dependencies = [ "serde", + "serde_core", "typeid", ] @@ -2498,7 +2499,7 @@ dependencies = [ [[package]] name = "gpui" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "as-raw-xcb-connection", @@ -2559,7 +2560,6 @@ dependencies = [ "seahash", "semantic_version", "serde", - "serde_derive", "serde_json", "slotmap", "smallvec", @@ -2592,7 +2592,7 @@ dependencies = [ [[package]] name = "gpui_macros" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2604,7 +2604,7 @@ dependencies = [ [[package]] name = "gpui_tokio" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "gpui", @@ -2818,7 +2818,7 @@ dependencies = [ [[package]] name = "http_client" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "bytes", @@ -2838,7 +2838,7 @@ dependencies = [ [[package]] name = "http_client_tls" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "rustls", "rustls-platform-verifier", @@ -3428,9 +3428,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" dependencies = [ "bitflags 2.9.4", "libc", @@ -3629,7 +3629,7 @@ dependencies = [ [[package]] name = "media" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "bindgen 0.71.1", @@ -4594,16 +4594,16 @@ dependencies = [ [[package]] name = "polling" -version = "3.10.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5bd19146350fe804f7cb2669c851c03d69da628803dab0d98018142aaa5d829" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" dependencies = [ "cfg-if", "concurrent-queue", "hermit-abi", "pin-project-lite", "rustix 1.1.2", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -5069,7 +5069,7 @@ dependencies = [ [[package]] name = "refineable" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "derive_refineable", "workspace-hack", @@ -5223,7 +5223,7 @@ dependencies = [ [[package]] name = "reqwest_client" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "bytes", @@ -5758,7 +5758,7 @@ checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749" [[package]] name = "semantic_version" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "serde", @@ -5767,15 +5767,15 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.26" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" [[package]] name = "serde" -version = "1.0.221" +version = "1.0.223" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "341877e04a22458705eb4e131a1508483c877dca2792b3781d4e5d8a6019ec43" +checksum = "a505d71960adde88e293da5cb5eda57093379f64e61cf77bf0e6a63af07a7bac" dependencies = [ "serde_core", "serde_derive", @@ -5783,18 +5783,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.221" +version = "1.0.223" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c459bc0a14c840cb403fc14b148620de1e0778c96ecd6e0c8c3cacb6d8d00fe" +checksum = "20f57cbd357666aa7b3ac84a90b4ea328f1d4ddb6772b430caa5d9e1309bb9e9" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.221" +version = "1.0.223" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6185cf75117e20e62b1ff867b9518577271e58abe0037c40bb4794969355ab0" +checksum = "3d428d07faf17e306e699ec1e91996e5a165ba5d6bce5b5155173e91a8a01a56" dependencies = [ "proc-macro2", "quote", @@ -5823,14 +5823,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56177480b00303e689183f110b4e727bb4211d692c62d4fcd16d02be93077d40" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ "indexmap", "itoa", "memchr", "ryu", + "serde", "serde_core", ] @@ -6209,7 +6210,7 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "sum_tree" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "arrayvec", "log", @@ -7239,7 +7240,7 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "util" version = "0.1.0" -source = "git+https://github.com/zed-industries/zed#c50b561e1c826e707e0d89bd7d82373c27f2fe32" +source = "git+https://github.com/zed-industries/zed#1090c47a90c586b397dc3fdd0cd09b530f37e5a0" dependencies = [ "anyhow", "async-fs", diff --git a/crates/client_keys/src/lib.rs b/crates/client_keys/src/lib.rs index 3ae7905..8c61a43 100644 --- a/crates/client_keys/src/lib.rs +++ b/crates/client_keys/src/lib.rs @@ -1,5 +1,7 @@ +use std::sync::atomic::Ordering; + use global::constants::KEYRING_URL; -use global::first_run; +use global::css; use gpui::{App, AppContext, Context, Entity, Global, Subscription, Window}; use nostr_sdk::prelude::*; use smallvec::{smallvec, SmallVec}; @@ -59,6 +61,7 @@ impl ClientKeys { return; } + let css = css(); let read_client_keys = cx.read_credentials(KEYRING_URL); cx.spawn_in(window, async move |this, cx| { @@ -73,7 +76,7 @@ impl ClientKeys { this.set_keys(Some(keys), false, true, cx); }) .ok(); - } else if *first_run() { + } else if css.is_first_run.load(Ordering::Acquire) { // If this is the first run, generate new keys and use them for the client keys this.update(cx, |this, cx| { this.new_keys(cx); diff --git a/crates/coop/src/chatspace.rs b/crates/coop/src/chatspace.rs index be9525a..98bd42c 100644 --- a/crates/coop/src/chatspace.rs +++ b/crates/coop/src/chatspace.rs @@ -9,12 +9,11 @@ use auto_update::AutoUpdater; use client_keys::ClientKeys; use common::display::ReadableProfile; use common::event::EventUtils; -use flume::{Receiver, Sender}; use global::constants::{ ACCOUNT_IDENTIFIER, BOOTSTRAP_RELAYS, DEFAULT_SIDEBAR_WIDTH, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT, SEARCH_RELAYS, }; -use global::{css, ingester, nostr_client, AuthRequest, Notice, Signal, UnwrappingStatus}; +use global::{css, nostr_client, AuthRequest, Notice, SignalKind, UnwrappingStatus}; use gpui::prelude::FluentBuilder; use gpui::{ div, px, rems, App, AppContext, AsyncWindowContext, Axis, Context, Entity, InteractiveElement, @@ -92,7 +91,6 @@ impl ChatSpace { let title_bar = cx.new(|_| TitleBar::new()); let dock = cx.new(|cx| DockArea::new(window, cx)); - let (pubkey_tx, pubkey_rx) = flume::bounded::(1024); let mut subscriptions = smallvec![]; let mut tasks = smallvec![]; @@ -147,7 +145,7 @@ impl ChatSpace { .await .expect("Failed connect the bootstrap relays. Please restart the application."); - Self::process_nostr_events(&pubkey_tx) + Self::process_nostr_events() .await .expect("Failed to handle nostr events. Please restart the application."); }), @@ -171,7 +169,7 @@ impl ChatSpace { tasks.push( // Listen all metadata requests then batch them into single subscription cx.background_spawn(async move { - Self::process_batching_metadata(&pubkey_rx).await; + Self::process_batching_metadata().await; }), ); @@ -215,7 +213,7 @@ impl ChatSpace { async fn observe_signer() { let client = nostr_client(); - let ingester = ingester(); + let css = css(); let stream_timeout = Duration::from_secs(5); let loop_duration = Duration::from_secs(1); @@ -231,7 +229,7 @@ impl ChatSpace { }; // Notify the app that the signer has been set. - ingester.send(Signal::SignerSet(public_key)).await; + css.signal.send(SignalKind::SignerSet(public_key)).await; // Subscribe to the NIP-65 relays for the public key. let filter = Filter::new() @@ -239,36 +237,49 @@ impl ChatSpace { .author(public_key) .limit(1); + let mut nip65_found = false; + match client .stream_events_from(BOOTSTRAP_RELAYS, filter, stream_timeout) .await { Ok(mut stream) => { - let mut processed_ids = HashSet::new(); - - if let Some(event) = stream.next().await { - if processed_ids.insert(event.id) { - // Fetch user's metadata event - Self::fetch_single_event(Kind::Metadata, event.pubkey).await; - - // Fetch user's contact list event - Self::fetch_single_event(Kind::ContactList, event.pubkey).await; - - // Fetch user's inbox relays event - Self::fetch_nip17_relays(event.pubkey).await; - - break; - } + if stream.next().await.is_some() { + nip65_found = true; } else { - ingester.send(Signal::DmRelayNotFound).await; + // Timeout + css.signal.send(SignalKind::RelaysNotFound).await; } } Err(e) => { - log::error!("Error fetching NIP-17 Relay: {e:?}"); - ingester.send(Signal::DmRelayNotFound).await; + log::error!("Error fetching NIP-65 Relay: {e:?}"); + css.signal.send(SignalKind::RelaysNotFound).await; } }; + if nip65_found { + // Subscribe to the NIP-17 relays for the public key. + let filter = Filter::new() + .kind(Kind::InboxRelays) + .author(public_key) + .limit(1); + + match client.stream_events(filter, stream_timeout).await { + Ok(mut stream) => { + if stream.next().await.is_some() { + break; + } else { + // Timeout + css.signal.send(SignalKind::RelaysNotFound).await; + } + } + Err(e) => { + log::error!("Error fetching NIP-17 Relay: {e:?}"); + css.signal.send(SignalKind::RelaysNotFound).await; + } + }; + } + break; } } @@ -276,7 +287,6 @@ impl ChatSpace { async fn observe_giftwrap() { let client = nostr_client(); let css = css(); - let ingester = ingester(); let loop_duration = Duration::from_secs(20); let mut is_start_processing = false; let mut total_loops = 0; @@ -296,14 +306,14 @@ impl ChatSpace { Ordering::Relaxed, ); - let signal = Signal::GiftWrapProcess(UnwrappingStatus::Processing); - ingester.send(signal).await; + let signal = SignalKind::GiftWrapStatus(UnwrappingStatus::Processing); + css.signal.send(signal).await; } else { // Only run further if we are already processing // Wait until after 2 loops to prevent exiting early while events are still being processed if is_start_processing && total_loops >= 2 { - let signal = Signal::GiftWrapProcess(UnwrappingStatus::Complete); - ingester.send(signal).await; + let signal = SignalKind::GiftWrapStatus(UnwrappingStatus::Complete); + css.signal.send(signal).await; // Reset the counter is_start_processing = false; @@ -316,7 +326,8 @@ impl ChatSpace { } } - async fn process_batching_metadata(rx: &Receiver) { + async fn process_batching_metadata() { + let css = css(); let timeout = Duration::from_millis(METADATA_BATCH_TIMEOUT); let mut processed_pubkeys: HashSet = HashSet::new(); let mut batch: HashSet = HashSet::new(); @@ -331,7 +342,7 @@ impl ChatSpace { loop { let futs = smol::future::or( async move { - if let Ok(public_key) = rx.recv_async().await { + if let Ok(public_key) = css.ingester.receiver().recv_async().await { BatchEvent::PublicKey(public_key) } else { BatchEvent::Closed @@ -366,9 +377,8 @@ impl ChatSpace { } } - async fn process_nostr_events(pubkey_tx: &Sender) -> Result<(), Error> { + async fn process_nostr_events() -> Result<(), Error> { let client = nostr_client(); - let ingester = ingester(); let css = css(); let mut processed_events: HashSet = HashSet::new(); @@ -396,6 +406,36 @@ impl ChatSpace { } match event.kind { + Kind::RelayList => { + if let Ok(true) = Self::is_self_event(&event).await { + // Fetch user's metadata event + Self::fetch_single_event(Kind::Metadata, event.pubkey).await; + + // Fetch user's contact list event + Self::fetch_single_event(Kind::ContactList, event.pubkey).await; + } + } + Kind::InboxRelays => { + let relays = nip17::extract_relay_list(&event).collect_vec(); + + if !relays.is_empty() { + for relay in relays.clone().into_iter() { + if client.add_relay(relay).await.is_err() { + let notice = Notice::RelayFailed(relay.clone()); + css.signal.send(SignalKind::Notice(notice)).await; + } + if client.connect_relay(relay).await.is_err() { + let notice = Notice::RelayFailed(relay.clone()); + css.signal.send(SignalKind::Notice(notice)).await; + } + } + + // Subscribe to gift wrap events only in the current user's NIP-17 relays + Self::fetch_gift_wrap(relays, event.pubkey).await; + } else { + css.signal.send(SignalKind::RelaysNotFound).await; + } + } Kind::ContactList => { if let Ok(true) = Self::is_self_event(&event).await { let public_keys = event.tags.public_keys().copied().collect_vec(); @@ -411,28 +451,28 @@ impl ChatSpace { } } Kind::Metadata => { - if let Ok(metadata) = Metadata::from_json(&event.content) { - let profile = Profile::new(event.pubkey, metadata); - ingester.send(Signal::Metadata(profile)).await; - } + let metadata = Metadata::from_json(&event.content).unwrap_or_default(); + let profile = Profile::new(event.pubkey, metadata); + + css.signal.send(SignalKind::NewProfile(profile)).await; } Kind::GiftWrap => { - Self::unwrap_gift_wrap(&event, pubkey_tx).await; + Self::unwrap_gift_wrap(&event).await; } _ => {} } } RelayMessage::EndOfStoredEvents(subscription_id) => { if *subscription_id == css.gift_wrap_sub_id { - let signal = Signal::GiftWrapProcess(UnwrappingStatus::Processing); - ingester.send(signal).await; + let signal = SignalKind::GiftWrapStatus(UnwrappingStatus::Processing); + css.signal.send(signal).await; } } RelayMessage::Auth { challenge } => { if challenges.insert(challenge.clone()) { let req = AuthRequest::new(challenge, relay_url); // Send a signal to the ingester to handle the auth request - ingester.send(Signal::Auth(req)).await; + css.signal.send(SignalKind::Auth(req)).await; } } RelayMessage::Ok { @@ -458,17 +498,16 @@ impl ChatSpace { } async fn process_nostr_signals(view: WeakEntity, cx: &mut AsyncWindowContext) { - let ingester = ingester(); - let signals = ingester.signals(); + let css = css(); let mut is_open_proxy_modal = false; - while let Ok(signal) = signals.recv_async().await { + while let Ok(signal) = css.signal.receiver().recv_async().await { cx.update(|window, cx| { let registry = Registry::global(cx); let settings = AppSettings::global(cx); match signal { - Signal::SignerSet(public_key) => { + SignalKind::SignerSet(public_key) => { window.close_modal(cx); // Setup the default layout for current workspace @@ -488,7 +527,7 @@ impl ChatSpace { this.load_rooms(window, cx); }); } - Signal::SignerUnset => { + SignalKind::SignerUnset => { // Setup the onboarding layout for current workspace view.update(cx, |this, cx| { this.set_onboarding_layout(window, cx); @@ -500,7 +539,7 @@ impl ChatSpace { this.reset(cx); }); } - Signal::Auth(req) => { + SignalKind::Auth(req) => { let url = &req.url; let auto_auth = AppSettings::get_auto_auth(cx); let is_authenticated = AppSettings::read_global(cx).is_authenticated(url); @@ -518,7 +557,7 @@ impl ChatSpace { }) .ok(); } - Signal::ProxyDown => { + SignalKind::ProxyDown => { if !is_open_proxy_modal { is_open_proxy_modal = true; @@ -528,28 +567,28 @@ impl ChatSpace { .ok(); } } - Signal::GiftWrapProcess(status) => { + SignalKind::GiftWrapStatus(status) => { registry.update(cx, |this, cx| { this.set_unwrapping_status(status, cx); }); } - Signal::Metadata(profile) => { + SignalKind::NewProfile(profile) => { registry.update(cx, |this, cx| { this.insert_or_update_person(profile, cx); }); } - Signal::Message((gift_wrap_id, event)) => { + SignalKind::NewMessage((gift_wrap_id, event)) => { registry.update(cx, |this, cx| { this.event_to_message(gift_wrap_id, event, window, cx); }); } - Signal::DmRelayNotFound => { + SignalKind::RelaysNotFound => { view.update(cx, |this, cx| { this.set_required_relays(cx); }) .ok(); } - Signal::Notice(msg) => { + SignalKind::Notice(msg) => { window.push_notification(msg.as_str(), cx); } }; @@ -578,49 +617,7 @@ impl ChatSpace { } } - pub async fn fetch_nip17_relays(public_key: PublicKey) { - let client = nostr_client(); - let ingester = ingester(); - let filter = Filter::new() - .kind(Kind::InboxRelays) - .author(public_key) - .limit(1); - - match client.stream_events(filter, Duration::from_secs(5)).await { - Ok(mut stream) => { - let mut processed_ids = HashSet::new(); - - if let Some(event) = stream.next().await { - if processed_ids.insert(event.id) { - let relays = nip17::extract_relay_list(&event).collect_vec(); - - if !relays.is_empty() { - for relay in relays.clone().into_iter() { - if client.add_relay(relay).await.is_err() { - let notice = Notice::RelayFailed(relay.clone()); - ingester.send(Signal::Notice(notice)).await; - } - if client.connect_relay(relay).await.is_err() { - let notice = Notice::RelayFailed(relay.clone()); - ingester.send(Signal::Notice(notice)).await; - } - } - - // Subscribe to gift wrap events only in the current user's NIP-17 relays - Self::fetch_gift_wrap(relays, event.pubkey).await; - } - } - } else { - ingester.send(Signal::DmRelayNotFound).await; - } - } - Err(e) => { - log::error!("Error fetching NIP-17 Relay: {e:?}"); - ingester.send(Signal::DmRelayNotFound).await; - } - }; - } - + /// Fetches gift wrap events for a given public key and relays pub async fn fetch_gift_wrap(relays: Vec<&RelayUrl>, public_key: PublicKey) { let client = nostr_client(); let sub_id = css().gift_wrap_sub_id.clone(); @@ -697,9 +694,8 @@ impl ChatSpace { } /// Unwraps a gift-wrapped event and processes its contents. - async fn unwrap_gift_wrap(target: &Event, pubkey_tx: &Sender) { + async fn unwrap_gift_wrap(target: &Event) { let client = nostr_client(); - let ingester = ingester(); let css = css(); let mut message: Option = None; @@ -720,14 +716,17 @@ impl ChatSpace { if let Some(event) = message { // Send all pubkeys to the metadata batch to sync data for public_key in event.all_pubkeys() { - pubkey_tx.send_async(public_key).await.ok(); + css.ingester.send(public_key).await; } match event.created_at >= css.init_at { // New message: send a signal to notify the UI true => { + // A small delay to prevent UI flickering smol::Timer::after(Duration::from_millis(200)).await; - ingester.send(Signal::Message((target.id, event))).await; + css.signal + .send(SignalKind::NewMessage((target.id, event))) + .await; } // Old message: Coop is probably processing the user's messages during initial load false => { @@ -1130,7 +1129,7 @@ impl ChatSpace { fn on_sign_out(&mut self, _e: &Logout, _window: &mut Window, cx: &mut Context) { cx.background_spawn(async move { let client = nostr_client(); - let ingester = ingester(); + let css = css(); let filter = Filter::new() .kind(Kind::ApplicationSpecificData) @@ -1143,7 +1142,7 @@ impl ChatSpace { client.reset().await; // Notify the channel about the signer being unset - ingester.send(Signal::SignerUnset).await; + css.signal.send(SignalKind::SignerUnset).await; }) .detach(); } @@ -1367,7 +1366,7 @@ impl ChatSpace { this._tasks.push(cx.background_spawn(async move { let client = nostr_client(); - let ingester = ingester(); + let css = css(); if proxy.start().await.is_ok() { webbrowser::open(&url).ok(); @@ -1398,7 +1397,7 @@ impl ChatSpace { break; } else { - ingester.send(Signal::ProxyDown).await; + css.signal.send(SignalKind::ProxyDown).await; } smol::Timer::after(Duration::from_secs(1)).await; } diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index a9422fb..5c24d7c 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use assets::Assets; use global::constants::{APP_ID, APP_NAME}; -use global::{css, ingester, nostr_client}; +use global::{css, nostr_client}; use gpui::{ point, px, size, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations, WindowKind, @@ -26,9 +26,6 @@ fn main() { // Initialize the Nostr client let _client = nostr_client(); - // Initialize the ingester - let _ingester = ingester(); - // Initialize the coop simple storage let _css = css(); diff --git a/crates/coop/src/views/account.rs b/crates/coop/src/views/account.rs index cc8b2ae..bb1fe75 100644 --- a/crates/coop/src/views/account.rs +++ b/crates/coop/src/views/account.rs @@ -5,7 +5,7 @@ use client_keys::ClientKeys; use common::display::ReadableProfile; use common::handle_auth::CoopAuthUrlHandler; use global::constants::{ACCOUNT_IDENTIFIER, BUNKER_TIMEOUT}; -use global::{ingester, nostr_client, Signal}; +use global::{css, nostr_client, SignalKind}; use gpui::prelude::FluentBuilder; use gpui::{ div, relative, rems, svg, AnyElement, App, AppContext, Context, Entity, EventEmitter, @@ -248,7 +248,7 @@ impl Account { // Reset the nostr client in the background cx.background_spawn(async move { let client = nostr_client(); - let ingester = ingester(); + let css = css(); let filter = Filter::new() .kind(Kind::ApplicationSpecificData) @@ -261,7 +261,7 @@ impl Account { client.unset_signer().await; // Notify the channel about the signer being unset - ingester.send(Signal::SignerUnset).await; + css.signal.send(SignalKind::SignerUnset).await; }), ); } diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index fa4fa13..5516b89 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -57,7 +57,7 @@ pub enum UnwrappingStatus { /// Signals sent through the global event channel to notify UI #[derive(Debug)] -pub enum Signal { +pub enum SignalKind { /// A signal to notify UI that the client's signer has been set SignerSet(PublicKey), @@ -71,25 +71,54 @@ pub enum Signal { ProxyDown, /// A signal to notify UI that a new profile has been received - Metadata(Profile), + NewProfile(Profile), /// A signal to notify UI that a new gift wrap event has been received - Message((EventId, Event)), + NewMessage((EventId, Event)), - /// A signal to notify UI that gift wrap process status has changed - GiftWrapProcess(UnwrappingStatus), + /// A signal to notify UI that no DM relays for current user was found + RelaysNotFound, - /// A signal to notify UI that no DM relay for current user was found - DmRelayNotFound, + /// A signal to notify UI that gift wrap status has changed + GiftWrapStatus(UnwrappingStatus), /// A signal to notify UI that there are errors or notices occurred Notice(Notice), } +#[derive(Debug)] +pub struct Signal { + rx: Receiver, + tx: Sender, +} + +impl Default for Signal { + fn default() -> Self { + Self::new() + } +} + +impl Signal { + pub fn new() -> Self { + let (tx, rx) = flume::bounded::(2048); + Self { rx, tx } + } + + pub fn receiver(&self) -> &Receiver { + &self.rx + } + + pub async fn send(&self, kind: SignalKind) { + if let Err(e) = self.tx.send_async(kind).await { + log::error!("Failed to send signal: {e}"); + } + } +} + #[derive(Debug)] pub struct Ingester { - rx: Receiver, - tx: Sender, + rx: Receiver, + tx: Sender, } impl Default for Ingester { @@ -100,17 +129,17 @@ impl Default for Ingester { impl Ingester { pub fn new() -> Self { - let (tx, rx) = flume::bounded::(2048); + let (tx, rx) = flume::bounded::(1024); Self { rx, tx } } - pub fn signals(&self) -> &Receiver { + pub fn receiver(&self) -> &Receiver { &self.rx } - pub async fn send(&self, signal: Signal) { - if let Err(e) = self.tx.send_async(signal).await { - log::error!("Failed to send signal: {e}"); + pub async fn send(&self, public_key: PublicKey) { + if let Err(e) = self.tx.send_async(public_key).await { + log::error!("Failed to send public key: {e}"); } } } @@ -119,14 +148,28 @@ impl Ingester { #[derive(Debug)] pub struct CoopSimpleStorage { pub init_at: Timestamp, + pub last_used_at: Option, + + pub is_first_run: AtomicBool, + pub gift_wrap_sub_id: SubscriptionId, + pub gift_wrap_processing: AtomicBool, + pub auto_close_opts: Option, + pub seen_on_relays: RwLock>>, + pub sent_ids: RwLock>, + pub resent_ids: RwLock>>, + pub resend_queue: RwLock>, + + pub signal: Signal, + + pub ingester: Ingester, } impl Default for CoopSimpleStorage { @@ -137,14 +180,22 @@ impl Default for CoopSimpleStorage { impl CoopSimpleStorage { pub fn new() -> Self { + let init_at = Timestamp::now(); + let first_run = first_run(); + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); + + let signal = Signal::default(); + let ingester = Ingester::default(); + Self { - init_at: Timestamp::now(), + init_at, + signal, + ingester, last_used_at: None, + is_first_run: AtomicBool::new(first_run), gift_wrap_sub_id: SubscriptionId::new("inbox"), gift_wrap_processing: AtomicBool::new(false), - auto_close_opts: Some( - SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE), - ), + auto_close_opts: Some(opts), seen_on_relays: RwLock::new(HashMap::new()), sent_ids: RwLock::new(HashSet::new()), resent_ids: RwLock::new(Vec::new()), @@ -154,9 +205,7 @@ impl CoopSimpleStorage { } static NOSTR_CLIENT: OnceLock = OnceLock::new(); -static INGESTER: OnceLock = OnceLock::new(); static COOP_SIMPLE_STORAGE: OnceLock = OnceLock::new(); -static FIRST_RUN: OnceLock = OnceLock::new(); pub fn nostr_client() -> &'static Client { NOSTR_CLIENT.get_or_init(|| { @@ -181,25 +230,19 @@ pub fn nostr_client() -> &'static Client { }) } -pub fn ingester() -> &'static Ingester { - INGESTER.get_or_init(Ingester::new) -} - pub fn css() -> &'static CoopSimpleStorage { COOP_SIMPLE_STORAGE.get_or_init(CoopSimpleStorage::new) } -pub fn first_run() -> &'static bool { - FIRST_RUN.get_or_init(|| { - let flag = support_dir().join(format!(".{}-first_run", env!("CARGO_PKG_VERSION"))); +fn first_run() -> bool { + let flag = support_dir().join(format!(".{}-first_run", env!("CARGO_PKG_VERSION"))); - if !flag.exists() { - if std::fs::write(&flag, "").is_err() { - return false; - } - true // First run - } else { - false // Not first run + if !flag.exists() { + if std::fs::write(&flag, "").is_err() { + return false; } - }) + true // First run + } else { + false // Not first run + } }