chore: update deps and refactor the event loop

This commit is contained in:
2025-09-06 20:55:11 +07:00
parent ede41c41c3
commit 60bca49200
10 changed files with 259 additions and 221 deletions

View File

@@ -59,6 +59,7 @@ smallvec.workspace = true
smol.workspace = true
futures.workspace = true
oneshot.workspace = true
flume.workspace = true
webbrowser.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }

View File

@@ -9,11 +9,12 @@ use auto_update::AutoUpdater;
use client_keys::ClientKeys;
use common::display::ReadableProfile;
use common::event::EventUtils;
use flume::{Receiver, Sender};
use global::constants::{
ACCOUNT_IDENTIFIER, BOOTSTRAP_RELAYS, DEFAULT_SIDEBAR_WIDTH, METADATA_BATCH_LIMIT,
METADATA_BATCH_TIMEOUT, SEARCH_RELAYS,
};
use global::{css, ingester, nostr_client, AuthRequest, Notice, Signal};
use global::{css, ingester, nostr_client, AuthRequest, Notice, Signal, UnwrappingStatus};
use gpui::prelude::FluentBuilder;
use gpui::{
div, px, rems, App, AppContext, AsyncWindowContext, Axis, Context, Entity, InteractiveElement,
@@ -28,7 +29,6 @@ use registry::{Registry, RegistryEvent};
use settings::AppSettings;
use signer_proxy::{BrowserSignerProxy, BrowserSignerProxyOptions};
use smallvec::{smallvec, SmallVec};
use smol::channel::{Receiver, Sender};
use theme::{ActiveTheme, Theme, ThemeMode};
use title_bar::TitleBar;
use ui::actions::OpenProfile;
@@ -40,7 +40,6 @@ use ui::dock_area::{ClosePanel, DockArea, DockItem};
use ui::modal::ModalButtonProps;
use ui::notification::Notification;
use ui::popup_menu::PopupMenuExt;
use ui::tooltip::Tooltip;
use ui::{h_flex, v_flex, ContextModal, IconName, Root, Sizable, StyledExt};
use crate::actions::{DarkMode, Logout, Settings};
@@ -69,20 +68,20 @@ pub struct ChatSpace {
dock: Entity<DockArea>,
auth_requests: Vec<(String, RelayUrl)>,
has_nip17_relays: bool,
_subscriptions: SmallVec<[Subscription; 2]>,
_tasks: SmallVec<[Task<()>; 3]>,
_subscriptions: SmallVec<[Subscription; 3]>,
_tasks: SmallVec<[Task<()>; 5]>,
}
impl ChatSpace {
pub fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let registry = Registry::global(cx);
let client_keys = ClientKeys::global(cx);
let registry = Registry::global(cx);
let status = registry.read(cx).unwrapping_status.clone();
let title_bar = cx.new(|_| TitleBar::new());
let dock = cx.new(|cx| DockArea::new(window, cx));
let (pubkey_tx, pubkey_rx) = smol::channel::bounded::<PublicKey>(1024);
let (pubkey_tx, pubkey_rx) = flume::bounded::<PublicKey>(1024);
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
@@ -97,6 +96,31 @@ impl ChatSpace {
}),
);
subscriptions.push(
// Observe the global registry
cx.observe_in(&status, window, move |this, status, window, cx| {
let registry = Registry::global(cx);
let status = status.read(cx);
let all_panels = this.get_all_panel_ids(cx);
match status {
UnwrappingStatus::Processing => {
registry.update(cx, |this, cx| {
this.load_rooms(window, cx);
this.refresh_rooms(all_panels, cx);
});
}
UnwrappingStatus::Complete => {
registry.update(cx, |this, cx| {
this.load_rooms(window, cx);
this.refresh_rooms(all_panels, cx);
});
}
_ => {}
};
}),
);
subscriptions.push(
// Subscribe to open chat room requests
cx.subscribe_in(&registry, window, move |this, _, event, window, cx| {
@@ -181,7 +205,7 @@ impl ChatSpace {
async fn observe_signer() {
let client = nostr_client();
let ingester = ingester();
let loop_duration = Duration::from_millis(500);
let loop_duration = Duration::from_secs(1);
let mut is_sent_signal = false;
let mut identity: Option<PublicKey> = None;
@@ -212,17 +236,16 @@ impl ChatSpace {
break;
}
}
Err(_) => {
Err(e) => {
log::error!("Database query error: {e}");
if !is_sent_signal {
ingester.send(Signal::DmRelayNotFound).await;
is_sent_signal = true;
}
}
}
} else if !is_sent_signal {
ingester.send(Signal::DmRelayNotFound).await;
is_sent_signal = true;
} else {
log::error!("Database error.");
break;
}
} else {
@@ -250,13 +273,15 @@ impl ChatSpace {
let client = nostr_client();
let css = css();
let ingester = ingester();
let loop_duration = Duration::from_secs(10);
let mut total_notify = 0;
let loop_duration = Duration::from_secs(20);
let mut is_start_processing = false;
let mut total_loops = 0;
loop {
if client.has_signer().await {
if css.gift_wrap_processing.load(Ordering::Acquire) {
ingester.send(Signal::EventProcessing).await;
is_start_processing = true;
total_loops += 1;
// Reset gift wrap processing flag
let _ = css.gift_wrap_processing.compare_exchange(
@@ -265,11 +290,19 @@ impl ChatSpace {
Ordering::Release,
Ordering::Relaxed,
);
let signal = Signal::GiftWrapProcess(UnwrappingStatus::Processing);
ingester.send(signal).await;
} else {
// Only send signal to ingester a maximum of three times
if total_notify <= 3 {
ingester.send(Signal::EventProcessed(true)).await;
total_notify += 1;
// Only run further if we are already processing
// Wait until after 3 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 3 {
let signal = Signal::GiftWrapProcess(UnwrappingStatus::Complete);
ingester.send(signal).await;
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
}
}
@@ -293,7 +326,7 @@ impl ChatSpace {
loop {
let futs = smol::future::or(
async move {
if let Ok(public_key) = rx.recv().await {
if let Ok(public_key) = rx.recv_async().await {
BatchEvent::PublicKey(public_key)
} else {
BatchEvent::Closed
@@ -336,7 +369,6 @@ impl ChatSpace {
let auto_close =
SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let mut event_counter = 0;
let mut processed_events: HashSet<EventId> = HashSet::new();
let mut challenges: HashSet<Cow<'_, str>> = HashSet::new();
let mut notifications = client.notifications();
@@ -406,25 +438,15 @@ impl ChatSpace {
ingester.send(Signal::Metadata(event.into_owned())).await;
}
Kind::GiftWrap => {
// Mark gift wrap event as currently being processed
css.gift_wrap_processing.store(true, Ordering::Release);
// Process the gift wrap event
Self::unwrap_gift_wrap_event(&event, pubkey_tx).await;
// Trigger a partial UI reload if at least 50 events have been processed
if event_counter >= 20 {
ingester.send(Signal::EventProcessed(false)).await;
event_counter = 0;
}
event_counter += 1;
Self::unwrap_gift_wrap(&event, pubkey_tx).await;
}
_ => {}
}
}
RelayMessage::EndOfStoredEvents(subscription_id) => {
if *subscription_id == css.gift_wrap_sub_id {
ingester.send(Signal::EventProcessed(false)).await;
let signal = Signal::GiftWrapProcess(UnwrappingStatus::Processing);
ingester.send(signal).await;
}
}
RelayMessage::Auth { challenge } => {
@@ -443,9 +465,7 @@ impl ChatSpace {
// Keep track of events that need to be resent
match MachineReadablePrefix::parse(&message) {
Some(MachineReadablePrefix::AuthRequired) => {
if client.has_signer().await {
css.resend_queue.write().await.insert(event_id, relay_url);
};
css.resend_queue.write().await.insert(event_id, relay_url);
}
Some(_) => {}
None => {}
@@ -463,7 +483,7 @@ impl ChatSpace {
let signals = ingester.signals();
let mut is_open_proxy_modal = false;
while let Ok(signal) = signals.recv().await {
while let Ok(signal) = signals.recv_async().await {
cx.update(|window, cx| {
let registry = Registry::global(cx);
let settings = AppSettings::global(cx);
@@ -486,7 +506,7 @@ impl ChatSpace {
// Load all chat rooms
registry.update(cx, |this, cx| {
this.set_identity(public_key, cx);
this.load_rooms(false, window, cx);
this.load_rooms(window, cx);
});
}
Signal::SignerUnset => {
@@ -531,36 +551,21 @@ impl ChatSpace {
.ok();
}
}
// Notify the user that the gift wrap still processing
Signal::EventProcessing => {
Signal::GiftWrapProcess(status) => {
registry.update(cx, |this, cx| {
this.set_loading(true, cx);
this.set_unwrapping_status(status, cx);
});
}
Signal::EventProcessed(finish) => {
registry.update(cx, |this, cx| {
// Load all chat rooms in the database
this.load_rooms(finish, window, cx);
// Send a signal to refresh all opened rooms' messages
if let Some(ids) = ChatSpace::all_panels(window, cx) {
this.refresh_rooms(ids, cx);
}
});
}
// Add the new metadata to the registry or update the existing one
Signal::Metadata(event) => {
registry.update(cx, |this, cx| {
this.insert_or_update_person(event, cx);
});
}
// Convert the gift wrapped message to a message
Signal::Message((gift_wrap_id, event)) => {
registry.update(cx, |this, cx| {
this.event_to_message(gift_wrap_id, event, window, cx);
});
}
// Notify the user that the DM relay is not set
Signal::DmRelayNotFound => {
view.update(cx, |this, cx| {
this.set_no_nip17_relays(cx);
@@ -702,52 +707,46 @@ impl ChatSpace {
}
/// Unwraps a gift-wrapped event and processes its contents.
async fn unwrap_gift_wrap_event(gift: &Event, pubkey_tx: &Sender<PublicKey>) -> bool {
async fn unwrap_gift_wrap(target: &Event, pubkey_tx: &Sender<PublicKey>) {
let client = nostr_client();
let ingester = ingester();
let css = css();
let mut is_cached = false;
let mut message: Option<Event> = None;
let event = match Self::get_unwrapped_event(gift.id).await {
Ok(event) => {
is_cached = true;
event
if let Ok(event) = Self::get_unwrapped_event(target.id).await {
message = Some(event);
} else if let Ok(unwrapped) = client.unwrap_gift_wrap(target).await {
// Sign the unwrapped event with a RANDOM KEYS
if let Ok(event) = unwrapped.rumor.sign_with_keys(&Keys::generate()) {
// Save this event to the database for future use.
if let Err(e) = Self::set_unwrapped_event(target.id, &event).await {
log::warn!("Failed to cache unwrapped event: {e}")
}
message = Some(event);
}
Err(_) => {
match client.unwrap_gift_wrap(gift).await {
Ok(unwrap) => {
// Sign the unwrapped event with a RANDOM KEYS
let Ok(unwrapped) = unwrap.rumor.sign_with_keys(&Keys::generate()) else {
log::error!("Failed to sign event");
return false;
};
}
// Save this event to the database for future use.
if let Err(e) = Self::set_unwrapped_event(gift.id, &unwrapped).await {
log::warn!("Failed to cache unwrapped event: {e}")
}
if let Some(event) = message {
// Send all pubkeys to the metadata batch to sync data
for public_key in event.all_pubkeys() {
pubkey_tx.send_async(public_key).await.ok();
}
unwrapped
}
Err(e) => {
log::error!("Failed to unwrap event: {e}");
return false;
match event.created_at >= css.init_at {
// New message: send a signal to notify the UI
true => {
// Prevent notification if the event was sent by Coop
if !css.sent_ids.read().await.contains(&target.id) {
ingester.send(Signal::Message((target.id, event))).await;
}
}
// Old message: Coop is probably processing the user's messages during initial load
false => {
css.gift_wrap_processing.store(true, Ordering::Release);
}
}
};
// Send all pubkeys to the metadata batch to sync data
for public_key in event.all_pubkeys() {
pubkey_tx.send(public_key).await.ok();
}
// Send a notify to GPUI if this is a new message
if event.created_at >= css.init_at {
ingester.send(Signal::Message((gift.id, event))).await;
}
is_cached
}
fn process_registry_event(
@@ -1202,26 +1201,23 @@ impl ChatSpace {
cx: &Context<Self>,
) -> impl IntoElement {
let registry = Registry::read_global(cx);
let loading = registry.loading;
let status = registry.unwrapping_status.read(cx);
h_flex()
.gap_2()
.h_6()
.w_full()
.child(compose_button())
.when(loading, |this| {
.when(status != &UnwrappingStatus::Complete, |this| {
this.child(
h_flex()
.id("downloading")
.px_2()
.h_6()
.gap_1()
.text_xs()
.rounded_full()
.bg(cx.theme().surface_background)
.child(shared_t!("loading.label"))
.tooltip(|window, cx| {
Tooltip::new(t!("loading.tooltip"), window, cx).into()
}),
.child(shared_t!("loading.label")),
)
})
}
@@ -1366,17 +1362,8 @@ impl ChatSpace {
});
}
pub(crate) fn all_panels(window: &mut Window, cx: &mut App) -> Option<Vec<u64>> {
let Some(Some(root)) = window.root::<Root>() else {
return None;
};
let Ok(chatspace) = root.read(cx).view().clone().downcast::<ChatSpace>() else {
return None;
};
let ids: Vec<u64> = chatspace
.read(cx)
fn get_all_panel_ids(&self, cx: &App) -> Option<Vec<u64>> {
let ids: Vec<u64> = self
.dock
.read(cx)
.items

View File

@@ -148,7 +148,8 @@ impl Onboarding {
client.set_signer(signer).await;
}
Err(e) => {
cx.update(|window, cx| {
log::warn!("Nostr Connect instance (QR Code) is timeout. TODO: fix this");
this.update_in(cx, |_, window, cx| {
window.push_notification(
Notification::error(e.to_string()).title("Nostr Connect"),
cx,

View File

@@ -6,7 +6,7 @@ use anyhow::{anyhow, Error};
use common::debounced_delay::DebouncedDelay;
use common::display::{ReadableTimestamp, TextUtils};
use global::constants::{BOOTSTRAP_RELAYS, SEARCH_RELAYS};
use global::nostr_client;
use global::{nostr_client, UnwrappingStatus};
use gpui::prelude::FluentBuilder;
use gpui::{
div, uniform_list, AnyElement, App, AppContext, Context, Entity, EventEmitter, FocusHandle,
@@ -33,7 +33,6 @@ mod list_item;
const FIND_DELAY: u64 = 600;
const FIND_LIMIT: usize = 10;
const TOTAL_SKELETONS: usize = 3;
pub fn init(window: &mut Window, cx: &mut App) -> Entity<Sidebar> {
Sidebar::new(window, cx)
@@ -587,7 +586,6 @@ impl Focusable for Sidebar {
impl Render for Sidebar {
fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
let registry = Registry::read_global(cx);
let loading = registry.loading;
// Get rooms from either search results or the chat registry
let rooms = if let Some(results) = self.local_result.read(cx).as_ref() {
@@ -606,10 +604,9 @@ impl Render for Sidebar {
// Get total rooms count
let mut total_rooms = rooms.len();
// If loading in progress
// Add 3 skeletons to the room list
if loading {
total_rooms += TOTAL_SKELETONS;
// Add 3 dummy rooms to display as skeletons
if registry.unwrapping_status.read(cx) != &UnwrappingStatus::Complete {
total_rooms += 3
}
v_flex()

View File

@@ -5,11 +5,10 @@ edition.workspace = true
publish.workspace = true
[dependencies]
nostr-connect.workspace = true
nostr-sdk.workspace = true
dirs.workspace = true
smol.workspace = true
futures.workspace = true
flume.workspace = true
log.workspace = true
anyhow.workspace = true

View File

@@ -51,9 +51,6 @@ pub const METADATA_BATCH_LIMIT: usize = 100;
/// Maximum timeout for grouping metadata requests. (milliseconds)
pub const METADATA_BATCH_TIMEOUT: u64 = 300;
/// Maximum timeout for waiting for finish (seconds)
pub const WAIT_FOR_FINISH: u64 = 60;
/// Default width of the sidebar.
pub const DEFAULT_SIDEBAR_WIDTH: f32 = 240.;

View File

@@ -3,10 +3,9 @@ use std::sync::atomic::AtomicBool;
use std::sync::OnceLock;
use std::time::Duration;
use nostr_connect::prelude::*;
use flume::{Receiver, Sender};
use nostr_sdk::prelude::*;
use paths::nostr_file;
use smol::channel::{Receiver, Sender};
use smol::lock::RwLock;
use crate::paths::support_dir;
@@ -46,6 +45,14 @@ impl Notice {
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum UnwrappingStatus {
#[default]
Initialized,
Processing,
Complete,
}
/// Signals sent through the global event channel to notify UI
#[derive(Debug)]
pub enum Signal {
@@ -67,11 +74,8 @@ pub enum Signal {
/// A signal to notify UI that a new gift wrap event has been received
Message((EventId, Event)),
/// A signal to notify UI that gift wrap events still processing
EventProcessing,
/// A signal to notify UI that gift wrap events have been processed
EventProcessed(bool),
/// A signal to notify UI that gift wrap process status has changed
GiftWrapProcess(UnwrappingStatus),
/// A signal to notify UI that no DM relay for current user was found
DmRelayNotFound,
@@ -94,7 +98,7 @@ impl Default for Ingester {
impl Ingester {
pub fn new() -> Self {
let (tx, rx) = smol::channel::bounded::<Signal>(2048);
let (tx, rx) = flume::bounded::<Signal>(2048);
Self { rx, tx }
}
@@ -103,7 +107,7 @@ impl Ingester {
}
pub async fn send(&self, signal: Signal) {
if let Err(e) = self.tx.send(signal).await {
if let Err(e) = self.tx.send_async(signal).await {
log::error!("Failed to send signal: {e}");
}
}
@@ -131,7 +135,7 @@ impl CoopSimpleStorage {
Self {
init_at: Timestamp::now(),
gift_wrap_sub_id: SubscriptionId::new("inbox"),
gift_wrap_processing: AtomicBool::new(true),
gift_wrap_processing: AtomicBool::new(false),
sent_ids: RwLock::new(HashSet::new()),
resent_ids: RwLock::new(Vec::new()),
resend_queue: RwLock::new(HashMap::new()),

View File

@@ -5,7 +5,7 @@ use anyhow::Error;
use common::event::EventUtils;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use global::nostr_client;
use global::{nostr_client, UnwrappingStatus};
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task, WeakEntity, Window};
use itertools::Itertools;
use nostr_sdk::prelude::*;
@@ -41,10 +41,8 @@ pub struct Registry {
/// Collection of all persons (user profiles)
pub persons: HashMap<PublicKey, Entity<Profile>>,
/// Indicates if rooms are currently being loaded
///
/// Always equal to `true` when the app starts
pub loading: bool,
/// Status of the unwrapping process
pub unwrapping_status: Entity<UnwrappingStatus>,
/// Public Key of the current user
pub identity: Option<PublicKey>,
@@ -73,6 +71,7 @@ impl Registry {
/// Create a new Registry instance
pub(crate) fn new(cx: &mut Context<Self>) -> Self {
let unwrapping_status = cx.new(|_| UnwrappingStatus::default());
let mut tasks = smallvec![];
let load_local_persons: Task<Result<Vec<Profile>, Error>> =
@@ -104,10 +103,10 @@ impl Registry {
);
Self {
unwrapping_status,
rooms: vec![],
persons: HashMap::new(),
identity: None,
loading: true,
_tasks: tasks,
}
}
@@ -244,15 +243,17 @@ impl Registry {
}
/// Set the loading status of the registry.
pub fn set_loading(&mut self, status: bool, cx: &mut Context<Self>) {
self.loading = status;
cx.notify();
pub fn set_unwrapping_status(&mut self, status: UnwrappingStatus, cx: &mut Context<Self>) {
self.unwrapping_status.update(cx, |this, cx| {
*this = status;
cx.notify();
});
}
/// Reset the registry.
pub fn reset(&mut self, cx: &mut Context<Self>) {
// Reset the loading status (default: true)
self.loading = true;
// Reset the unwrapping status
self.set_unwrapping_status(UnwrappingStatus::default(), cx);
// Clear the current identity
self.identity = None;
@@ -264,7 +265,7 @@ impl Registry {
}
/// Load all rooms from the database.
pub fn load_rooms(&mut self, finish: bool, window: &mut Window, cx: &mut Context<Self>) {
pub fn load_rooms(&mut self, window: &mut Window, cx: &mut Context<Self>) {
log::info!("Starting to load chat rooms...");
// Get the contact bypass setting
@@ -342,9 +343,6 @@ impl Registry {
Ok(rooms) => {
this.update_in(cx, move |_, window, cx| {
cx.defer_in(window, move |this, _window, cx| {
if finish {
this.set_loading(false, cx);
}
this.extend_rooms(rooms, cx);
this.sort(cx);
});
@@ -405,12 +403,14 @@ impl Registry {
}
/// Refresh messages for a room in the global registry
pub fn refresh_rooms(&mut self, ids: Vec<u64>, cx: &mut Context<Self>) {
for room in self.rooms.iter() {
if ids.contains(&room.read(cx).id) {
room.update(cx, |this, cx| {
this.emit_refresh(cx);
});
pub fn refresh_rooms(&mut self, ids: Option<Vec<u64>>, cx: &mut Context<Self>) {
if let Some(ids) = ids {
for room in self.rooms.iter() {
if ids.contains(&room.read(cx).id) {
room.update(cx, |this, cx| {
this.emit_refresh(cx);
});
}
}
}
}