chore: refactor subscription (#113)

* fix duplicate set signer request

* refactor

* .
This commit is contained in:
reya
2025-08-07 20:53:21 +07:00
committed by GitHub
parent 7b20131e3b
commit 8fca202c05
7 changed files with 177 additions and 219 deletions

View File

@@ -342,7 +342,7 @@ impl ChatSpace {
) -> impl IntoElement {
let proxy = AppSettings::get_proxy_user_avatars(cx);
let need_backup = Identity::read_global(cx).need_backup();
let relay_ready = Identity::read_global(cx).relay_ready();
let has_dm_relays = Identity::read_global(cx).has_dm_relays();
let updating = AutoUpdater::read_global(cx).status.is_updating();
let updated = AutoUpdater::read_global(cx).status.is_updated();
@@ -377,7 +377,7 @@ impl ChatSpace {
}),
)
})
.when_some(relay_ready, |this, status| {
.when_some(has_dm_relays, |this, status| {
this.when(!status, |this| this.child(messaging_relays::relay_button()))
})
.when_some(need_backup, |this, keys| {

View File

@@ -9,7 +9,7 @@ use global::constants::{
APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT,
SEARCH_RELAYS, WAIT_FOR_FINISH,
};
use global::{gift_wrap_sub_id, nostr_client, starting_time, NostrSignal};
use global::{nostr_client, processed_events, starting_time, NostrSignal};
use gpui::{
actions, point, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem,
SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations,
@@ -50,9 +50,7 @@ fn main() {
let (signal_tx, signal_rx) = channel::bounded::<NostrSignal>(2048);
let (mta_tx, mta_rx) = channel::bounded::<PublicKey>(1024);
let (event_tx, event_rx) = channel::bounded::<Event>(2048);
let signal_tx_clone = signal_tx.clone();
let mta_tx_clone = mta_tx.clone();
app.background_executor()
.spawn(async move {
@@ -64,9 +62,7 @@ fn main() {
// Handle Nostr notifications.
//
// Send the redefined signal back to GPUI via channel.
if let Err(e) =
handle_nostr_notifications(client, &signal_tx_clone, &mta_tx_clone, &event_tx).await
{
if let Err(e) = handle_nostr_notifications(&signal_tx_clone, &event_tx).await {
log::error!("Failed to handle Nostr notifications: {e}");
}
})
@@ -75,6 +71,7 @@ fn main() {
app.background_executor()
.spawn(async move {
let duration = Duration::from_millis(METADATA_BATCH_TIMEOUT);
let mut processed_pubkeys: BTreeSet<PublicKey> = BTreeSet::new();
let mut batch: BTreeSet<PublicKey> = BTreeSet::new();
/// Internal events for the metadata batching system
@@ -102,20 +99,23 @@ fn main() {
match smol::future::or(recv(), timeout()).await {
BatchEvent::NewKeys(public_key) => {
batch.insert(public_key);
// Process immediately if batch limit reached
// Prevent duplicate keys from being processed
if processed_pubkeys.insert(public_key) {
batch.insert(public_key);
}
// Process the batch if it's full
if batch.len() >= METADATA_BATCH_LIMIT {
sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await;
sync_data_for_pubkeys(std::mem::take(&mut batch)).await;
}
}
BatchEvent::Timeout => {
if !batch.is_empty() {
sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await;
sync_data_for_pubkeys(std::mem::take(&mut batch)).await;
}
}
BatchEvent::Closed => {
if !batch.is_empty() {
sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await;
sync_data_for_pubkeys(std::mem::take(&mut batch)).await;
}
break;
}
@@ -243,7 +243,7 @@ fn main() {
while let Ok(signal) = signal_rx.recv().await {
cx.update(|window, cx| {
let registry = Registry::global(cx);
let identity = Identity::read_global(cx);
let identity = Identity::global(cx);
match signal {
// Load chat rooms and stop the loading status
@@ -267,15 +267,6 @@ fn main() {
}
});
}
// Load chat rooms without setting as finished
NostrSignal::Eose(subscription_id) => {
// Only load chat rooms if the subscription matches the gift wrap subscription
if gift_wrap_sub_id() == &subscription_id {
registry.update(cx, |this, cx| {
this.load_rooms(window, cx);
});
}
}
// Add the new metadata to the registry or update the existing one
NostrSignal::Metadata(event) => {
registry.update(cx, |this, cx| {
@@ -284,12 +275,17 @@ fn main() {
}
// Convert the gift wrapped message to a message
NostrSignal::GiftWrap(event) => {
if let Some(public_key) = identity.public_key() {
if let Some(public_key) = identity.read(cx).public_key() {
registry.update(cx, |this, cx| {
this.event_to_message(public_key, event, window, cx);
});
}
}
NostrSignal::DmRelaysFound => {
identity.update(cx, |this, cx| {
this.set_has_dm_relays(cx);
});
}
NostrSignal::Notice(_msg) => {
// window.push_notification(msg, cx);
}
@@ -356,67 +352,116 @@ async fn connect(client: &Client) -> Result<(), Error> {
}
async fn handle_nostr_notifications(
client: &Client,
signal_tx: &Sender<NostrSignal>,
mta_tx: &Sender<PublicKey>,
event_tx: &Sender<Event>,
) -> Result<(), Error> {
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let client = nostr_client();
let auto_close = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let mut notifications = client.notifications();
let mut processed_events: BTreeSet<EventId> = BTreeSet::new();
let mut processed_dm_relays: BTreeSet<PublicKey> = BTreeSet::new();
let mut processed_relay_list = false;
let mut processed_inbox_relay = false;
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, .. } = notification else {
continue;
};
match message {
RelayMessage::Event { event, .. } => {
if processed_events.contains(&event.id) {
continue;
}
// Skip events that have already been processed
processed_events.insert(event.id);
let RelayMessage::Event { event, .. } = message else {
continue;
};
match event.kind {
Kind::GiftWrap => {
event_tx.send(event.into_owned()).await.ok();
}
Kind::Metadata => {
signal_tx
.send(NostrSignal::Metadata(event.into_owned()))
.await
.ok();
}
Kind::ContactList => {
if let Ok(true) = check_author(client, &event).await {
for public_key in event.tags.public_keys().copied() {
mta_tx.send(public_key).await.ok();
}
}
}
Kind::RelayList => {
if processed_dm_relays.contains(&event.pubkey) {
continue;
}
// Skip public keys that have already been processed
processed_dm_relays.insert(event.pubkey);
// Skip events that have already been processed
if !processed_events().write().await.insert(event.id) {
continue;
}
let filter = Filter::new()
.author(event.pubkey)
.kind(Kind::InboxRelays)
.limit(1);
client.subscribe(filter, Some(opts)).await.ok();
match event.kind {
Kind::RelayList => {
// Get metadata for event's pubkey that matches the current user's pubkey
if let Ok(true) = is_from_current_user(&event).await {
match processed_relay_list {
true => continue,
false => processed_relay_list = true,
}
_ => {}
let sub_id = SubscriptionId::new("metadata");
let filter = Filter::new()
.kinds(vec![Kind::Metadata, Kind::ContactList, Kind::InboxRelays])
.author(event.pubkey)
.limit(10);
client
.subscribe_with_id(sub_id, filter, Some(auto_close))
.await
.ok();
}
}
RelayMessage::EndOfStoredEvents(subscription_id) => {
Kind::InboxRelays => {
if let Ok(true) = is_from_current_user(&event).await {
match processed_inbox_relay {
true => continue,
false => processed_inbox_relay = true,
}
// Get all inbox relays
let relays = event
.tags
.filter_standardized(TagKind::Relay)
.filter_map(|t| {
if let TagStandard::Relay(url) = t {
Some(url.to_owned())
} else {
None
}
})
.collect_vec();
if !relays.is_empty() {
// Add relays to nostr client
for relay in relays.iter() {
_ = client.add_relay(relay).await;
_ = client.connect_relay(relay).await;
}
log::info!("Connected to messaging relays");
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(event.pubkey);
let sub_id = SubscriptionId::new("gift-wrap");
// Notify the UI that the current user has set up the DM relays
signal_tx.send(NostrSignal::DmRelaysFound).await.ok();
if client
.subscribe_with_id_to(relays.clone(), sub_id, filter, None)
.await
.is_ok()
{
log::info!("Subscribing to gift wrap events in: {relays:?}");
}
}
}
}
Kind::ContactList => {
if let Ok(true) = is_from_current_user(&event).await {
let public_keys: Vec<PublicKey> = event.tags.public_keys().copied().collect();
let kinds = vec![Kind::Metadata, Kind::ContactList];
let lens = public_keys.len() * kinds.len();
let filter = Filter::new().limit(lens).authors(public_keys).kinds(kinds);
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(auto_close))
.await
.ok();
}
}
Kind::Metadata => {
signal_tx
.send(NostrSignal::Eose(subscription_id.into_owned()))
.await?;
.send(NostrSignal::Metadata(event.into_owned()))
.await
.ok();
}
Kind::GiftWrap => {
event_tx.send(event.into_owned()).await.ok();
}
_ => {}
}
@@ -425,28 +470,28 @@ async fn handle_nostr_notifications(
Ok(())
}
async fn check_author(client: &Client, event: &Event) -> Result<bool, Error> {
async fn is_from_current_user(event: &Event) -> Result<bool, Error> {
let client = nostr_client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
Ok(public_key == event.pubkey)
}
async fn sync_data_for_pubkeys(client: &Client, public_keys: BTreeSet<PublicKey>) {
async fn sync_data_for_pubkeys(public_keys: BTreeSet<PublicKey>) {
let client = nostr_client();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList];
let kinds = vec![Kind::Metadata, Kind::ContactList];
let filter = Filter::new()
.limit(public_keys.len() * kinds.len())
.authors(public_keys)
.kinds(kinds);
if let Err(e) = client
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
.await
{
log::error!("Failed to sync metadata: {e}");
}
.ok();
}
/// Stores an unwrapped event in local database with reference to original

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use anyhow::{anyhow, Error};
use global::constants::{GIFT_WRAP_SUB_ID, NIP17_RELAYS};
use global::constants::NIP17_RELAYS;
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
@@ -10,8 +10,6 @@ use gpui::{
TextAlign, UniformList, Window,
};
use i18n::{shared_t, t};
use identity::Identity;
use itertools::Itertools;
use nostr_sdk::prelude::*;
use smallvec::{smallvec, SmallVec};
use theme::ActiveTheme;
@@ -189,15 +187,12 @@ impl MessagingRelays {
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let client = nostr_client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let tags: Vec<Tag> = relays
.iter()
.map(|relay| Tag::relay(relay.clone()))
.collect();
let builder = EventBuilder::new(Kind::InboxRelays, "").tags(
relays
.iter()
.map(|relay| Tag::relay(relay.clone()))
.collect_vec(),
);
let builder = EventBuilder::new(Kind::InboxRelays, "").tags(tags);
// Set messaging relays
client.send_event_builder(builder).await?;
@@ -208,15 +203,6 @@ impl MessagingRelays {
_ = client.connect_relay(&relay).await;
}
let id = SubscriptionId::new(GIFT_WRAP_SUB_ID);
let new_messages = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
// Close old subscriptions
client.unsubscribe(&id).await;
// Subscribe to new messages
client.subscribe_with_id(id, new_messages, None).await?;
Ok(())
});
@@ -224,10 +210,6 @@ impl MessagingRelays {
match task.await {
Ok(_) => {
cx.update(|window, cx| {
Identity::global(cx).update(cx, |this, cx| {
this.verify_dm_relays(window, cx);
});
// Close the current modal
window.close_modal(cx);
})
.ok();

View File

@@ -580,6 +580,7 @@ impl Sidebar {
});
}
#[allow(dead_code)]
fn skeletons(&self, total: i32) -> impl IntoIterator<Item = impl IntoElement> {
(0..total).map(|_| {
div()
@@ -788,16 +789,6 @@ impl Render for Sidebar {
),
),
)
.when(registry.loading, |this| {
this.child(
div()
.flex_1()
.flex()
.flex_col()
.gap_1()
.children(self.skeletons(1)),
)
})
.child(
uniform_list(
"rooms",

View File

@@ -8,10 +8,9 @@ pub const ACCOUNT_D: &str = "coop:account";
pub const SETTINGS_D: &str = "coop:settings";
/// Bootstrap Relays.
pub const BOOTSTRAP_RELAYS: [&str; 5] = [
pub const BOOTSTRAP_RELAYS: [&str; 4] = [
"wss://relay.damus.io",
"wss://relay.primal.net",
"wss://nostr.Wine",
"wss://user.kindpag.es",
"wss://purplepag.es",
];
@@ -36,14 +35,11 @@ pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app";
/// Default timeout (in seconds) for Nostr Connect
pub const NOSTR_CONNECT_TIMEOUT: u64 = 200;
/// Unique ID for all gift wraps subscription.
pub const GIFT_WRAP_SUB_ID: &str = "listen_for_giftwraps";
/// Total metadata requests will be grouped.
pub const METADATA_BATCH_LIMIT: usize = 100;
/// Maximum timeout for grouping metadata requests.
pub const METADATA_BATCH_TIMEOUT: u64 = 400;
/// Maximum timeout for grouping metadata requests. (milliseconds)
pub const METADATA_BATCH_TIMEOUT: u64 = 300;
/// Maximum timeout for waiting for finish (seconds)
pub const WAIT_FOR_FINISH: u64 = 60;

View File

@@ -1,11 +1,12 @@
use std::collections::BTreeSet;
use std::sync::OnceLock;
use std::time::Duration;
use nostr_connect::prelude::*;
use nostr_sdk::prelude::*;
use paths::nostr_file;
use smol::lock::RwLock;
use crate::constants::GIFT_WRAP_SUB_ID;
use crate::paths::support_dir;
pub mod constants;
@@ -26,15 +27,15 @@ pub enum NostrSignal {
/// Partially finished processing all gift wrap events
PartialFinish,
/// Receives EOSE response from relay pool
Eose(SubscriptionId),
/// DM relays have been found
DmRelaysFound,
/// Notice from Relay Pool
Notice(String),
}
static NOSTR_CLIENT: OnceLock<Client> = OnceLock::new();
static GIFT_WRAP_ID: OnceLock<SubscriptionId> = OnceLock::new();
static PROCESSED_EVENTS: OnceLock<RwLock<BTreeSet<EventId>>> = OnceLock::new();
static CURRENT_TIMESTAMP: OnceLock<Timestamp> = OnceLock::new();
static FIRST_RUN: OnceLock<bool> = OnceLock::new();
@@ -50,22 +51,20 @@ pub fn nostr_client() -> &'static Client {
let lmdb = NostrLMDB::open(nostr_file()).expect("Database is NOT initialized");
let opts = ClientOptions::new()
// Coop isn't social client,
// but it needs this option because it needs user's NIP65 Relays to fetch NIP17 Relays.
.gossip(true)
// TODO: Coop should handle authentication by itself
.automatic_authentication(true)
// Sleep after idle for 5 seconds
.verify_subscriptions(false)
// Sleep after idle for 20 seconds
.sleep_when_idle(SleepWhenIdle::Enabled {
timeout: Duration::from_secs(10),
timeout: Duration::from_secs(20),
});
ClientBuilder::default().database(lmdb).opts(opts).build()
})
}
pub fn gift_wrap_sub_id() -> &'static SubscriptionId {
GIFT_WRAP_ID.get_or_init(|| SubscriptionId::new(GIFT_WRAP_SUB_ID))
pub fn processed_events() -> &'static RwLock<BTreeSet<EventId>> {
PROCESSED_EVENTS.get_or_init(|| RwLock::new(BTreeSet::new()))
}
pub fn starting_time() -> &'static Timestamp {

View File

@@ -4,7 +4,7 @@ use anyhow::{anyhow, Error};
use client_keys::ClientKeys;
use common::handle_auth::CoopAuthUrlHandler;
use global::constants::{ACCOUNT_D, NIP17_RELAYS, NIP65_RELAYS, NOSTR_CONNECT_TIMEOUT};
use global::{gift_wrap_sub_id, nostr_client};
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, red, App, AppContext, Context, Entity, Global, ParentElement, SharedString, Styled,
@@ -29,7 +29,7 @@ impl Global for GlobalIdentity {}
pub struct Identity {
public_key: Option<PublicKey>,
logging_in: bool,
relay_ready: Option<bool>,
has_dm_relays: Option<bool>,
need_backup: Option<Keys>,
need_onboarding: bool,
#[allow(dead_code)]
@@ -73,8 +73,8 @@ impl Identity {
Self {
public_key: None,
relay_ready: None,
need_backup: None,
has_dm_relays: None,
need_onboarding: false,
logging_in: false,
subscriptions,
@@ -129,6 +129,7 @@ impl Identity {
// Unset signer
client.unset_signer().await;
// Delete account
client.database().delete(filter).await?;
@@ -345,7 +346,7 @@ impl Identity {
}
/// Sets a new signer for the client and updates user identity
pub fn set_signer<S>(&self, signer: S, window: &mut Window, cx: &mut Context<Self>)
pub fn set_signer<S>(&mut self, signer: S, window: &mut Window, cx: &mut Context<Self>)
where
S: NostrSigner + 'static,
{
@@ -357,7 +358,7 @@ impl Identity {
client.set_signer(signer).await;
// Subscribe for user metadata
Self::subscribe(client, public_key).await?;
get_nip65_relays(public_key).await?;
Ok(public_key)
});
@@ -422,11 +423,12 @@ impl Identity {
// Set user's NIP65 relays
client.send_event_builder(relay_list).await?;
// Set user's NIP17 relays
client.send_event_builder(dm_relay).await?;
// Subscribe for user metadata
Self::subscribe(client, public_key).await?;
// Get user's NIP65 relays
get_nip65_relays(public_key).await?;
Ok(public_key)
});
@@ -547,60 +549,15 @@ impl Identity {
.detach();
}
pub fn verify_dm_relays(&self, window: &mut Window, cx: &mut Context<Self>) {
let Some(public_key) = self.public_key() else {
return;
};
let task: Task<bool> = cx.background_spawn(async move {
let client = nostr_client();
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
let Ok(events) = client.database().query(filter).await else {
return false;
};
let Some(event) = events.first() else {
return false;
};
let relays: Vec<RelayUrl> = event
.tags
.filter(TagKind::Relay)
.filter_map(|tag| RelayUrl::parse(tag.content()?).ok())
.collect();
!relays.is_empty()
});
cx.spawn_in(window, async move |this, cx| {
let result = task.await;
this.update(cx, |this, cx| {
this.relay_ready = Some(result);
cx.notify();
})
.ok();
})
.detach();
}
/// Sets the public key of the identity
pub(crate) fn set_public_key(
&mut self,
public_key: Option<PublicKey>,
window: &mut Window,
_window: &mut Window,
cx: &mut Context<Self>,
) {
self.public_key = public_key;
cx.notify();
// Run verify user's dm relays task
cx.defer_in(window, |this, window, cx| {
this.verify_dm_relays(window, cx);
});
}
/// Returns the current identity's public key
@@ -613,8 +570,9 @@ impl Identity {
self.public_key.is_some()
}
pub fn relay_ready(&self) -> Option<bool> {
self.relay_ready
/// Returns true if the identity has DM Relays
pub fn has_dm_relays(&self) -> Option<bool> {
self.has_dm_relays
}
/// Returns true if the identity is currently logging in
@@ -622,45 +580,32 @@ impl Identity {
self.logging_in
}
/// Sets the DM Relays status of the identity
pub fn set_has_dm_relays(&mut self, cx: &mut Context<Self>) {
self.has_dm_relays = Some(true);
cx.notify();
}
/// Sets the logging in status of the identity
pub(crate) fn set_logging_in(&mut self, status: bool, cx: &mut Context<Self>) {
self.logging_in = status;
cx.notify();
}
pub(crate) async fn subscribe(client: &Client, public_key: PublicKey) -> Result<(), Error> {
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
client
.subscribe_with_id(
gift_wrap_sub_id().to_owned(),
Filter::new().kind(Kind::GiftWrap).pubkey(public_key),
None,
)
.await?;
client
.subscribe(
Filter::new()
.author(public_key)
.kinds(vec![Kind::Metadata, Kind::ContactList, Kind::RelayList])
.since(Timestamp::now()),
None,
)
.await?;
client
.subscribe(
Filter::new()
.kinds(vec![Kind::Metadata, Kind::ContactList, Kind::RelayList])
.author(public_key)
.limit(10),
Some(opts),
)
.await?;
log::info!("Getting all user's metadata and messages...");
Ok(())
}
}
async fn get_nip65_relays(public_key: PublicKey) -> Result<(), Error> {
let client = nostr_client();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let sub_id = SubscriptionId::new("nip65-relays");
let filter = Filter::new()
.kind(Kind::RelayList)
.author(public_key)
.limit(1);
if client.subscription(&sub_id).await.is_empty() {
client.subscribe_with_id(sub_id, filter, Some(opts)).await?;
}
Ok(())
}