chore: refactor event fetching (#148)

* use stream for nip65 and nip17 relays fetching

* .
This commit is contained in:
reya
2025-09-13 07:42:17 +07:00
committed by GitHub
parent 2ea2519e8b
commit b142982ab1
2 changed files with 112 additions and 126 deletions

View File

@@ -216,67 +216,60 @@ impl ChatSpace {
async fn observe_signer() {
let client = nostr_client();
let ingester = ingester();
let stream_timeout = Duration::from_secs(5);
let loop_duration = Duration::from_secs(1);
let mut is_sent_signal = false;
let mut identity: Option<PublicKey> = None;
loop {
if let Some(public_key) = identity {
let nip65 = Filter::new().kind(Kind::RelayList).author(public_key);
let Ok(signer) = client.signer().await else {
smol::Timer::after(loop_duration).await;
continue;
};
if client.database().count(nip65).await.unwrap_or(0) > 0 {
let dm_relays = Filter::new().kind(Kind::InboxRelays).author(public_key);
let Ok(public_key) = signer.get_public_key().await else {
smol::Timer::after(loop_duration).await;
continue;
};
match client.database().query(dm_relays).await {
Ok(events) => {
if let Some(event) = events.first_owned() {
let relay_urls = nip17::extract_relay_list(&event).collect_vec();
// Notify the app that the signer has been set.
ingester.send(Signal::SignerSet(public_key)).await;
if relay_urls.is_empty() {
if !is_sent_signal {
ingester.send(Signal::DmRelayNotFound).await;
is_sent_signal = true;
}
} else {
break;
}
} else if !is_sent_signal {
ingester.send(Signal::DmRelayNotFound).await;
is_sent_signal = true;
} else {
break;
}
}
Err(e) => {
log::error!("Database query error: {e}");
if !is_sent_signal {
ingester.send(Signal::DmRelayNotFound).await;
is_sent_signal = true;
}
}
}
} else {
log::error!("Database error.");
break;
}
} else {
// Wait for signer set
if let Ok(signer) = client.signer().await {
if let Ok(public_key) = signer.get_public_key().await {
identity = Some(public_key);
// Notify the app that the signer has been set.
ingester.send(Signal::SignerSet(public_key)).await;
// Subscribe to the NIP-65 relays for the public key.
if let Err(e) = Self::fetch_nip65_relays(public_key).await {
log::error!("Failed to fetch NIP-65 relays: {e}");
// Subscribe to the NIP-65 relays for the public key.
let filter = Filter::new()
.kind(Kind::RelayList)
.author(public_key)
.limit(1);
match client
.stream_events_from(BOOTSTRAP_RELAYS, filter, stream_timeout)
.await
{
Ok(mut stream) => {
let mut processed_ids = HashSet::new();
if let Some(event) = stream.next().await {
if processed_ids.insert(event.id) {
// Fetch user's metadata event
Self::fetch_single_event(Kind::Metadata, event.pubkey).await;
// Fetch user's contact list event
Self::fetch_single_event(Kind::ContactList, event.pubkey).await;
// Fetch user's inbox relays event
Self::fetch_nip17_relays(event.pubkey).await;
break;
}
} else {
ingester.send(Signal::DmRelayNotFound).await;
}
}
}
Err(e) => {
log::error!("Error fetching NIP-17 Relay: {e:?}");
ingester.send(Signal::DmRelayNotFound).await;
}
};
smol::Timer::after(loop_duration).await;
break;
}
}
@@ -395,39 +388,6 @@ impl ChatSpace {
}
match event.kind {
Kind::RelayList => {
if let Ok(true) = Self::is_self_event(&event).await {
// Fetch user's metadata event
Self::fetch_single_event(Kind::Metadata, event.pubkey).await;
// Fetch user's contact list event
Self::fetch_single_event(Kind::ContactList, event.pubkey).await;
// Fetch user's inbox relays event
Self::fetch_single_event(Kind::InboxRelays, event.pubkey).await;
}
}
Kind::InboxRelays => {
if let Ok(true) = Self::is_self_event(&event).await {
let relays = nip17::extract_relay_list(&event).collect_vec();
if !relays.is_empty() {
for relay in relays.clone().into_iter() {
if client.add_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
ingester.send(Signal::Notice(notice)).await;
}
if client.connect_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
ingester.send(Signal::Notice(notice)).await;
}
}
// Subscribe to gift wrap events only in the current user's NIP-17 relays
Self::fetch_gift_wrap(relays, event.pubkey).await;
}
}
}
Kind::ContactList => {
if let Ok(true) = Self::is_self_event(&event).await {
let public_keys = event.tags.public_keys().copied().collect_vec();
@@ -610,6 +570,49 @@ impl ChatSpace {
}
}
pub async fn fetch_nip17_relays(public_key: PublicKey) {
let client = nostr_client();
let ingester = ingester();
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
match client.stream_events(filter, Duration::from_secs(5)).await {
Ok(mut stream) => {
let mut processed_ids = HashSet::new();
if let Some(event) = stream.next().await {
if processed_ids.insert(event.id) {
let relays = nip17::extract_relay_list(&event).collect_vec();
if !relays.is_empty() {
for relay in relays.clone().into_iter() {
if client.add_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
ingester.send(Signal::Notice(notice)).await;
}
if client.connect_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
ingester.send(Signal::Notice(notice)).await;
}
}
// Subscribe to gift wrap events only in the current user's NIP-17 relays
Self::fetch_gift_wrap(relays, event.pubkey).await;
}
}
} else {
ingester.send(Signal::DmRelayNotFound).await;
}
}
Err(e) => {
log::error!("Error fetching NIP-17 Relay: {e:?}");
ingester.send(Signal::DmRelayNotFound).await;
}
};
}
pub async fn fetch_gift_wrap(relays: Vec<&RelayUrl>, public_key: PublicKey) {
let client = nostr_client();
let sub_id = css().gift_wrap_sub_id.clone();
@@ -624,23 +627,6 @@ impl ChatSpace {
}
}
/// Fetches NIP-65 relay list for a given public key
pub async fn fetch_nip65_relays(public_key: PublicKey) -> Result<(), Error> {
let client = nostr_client();
let css = css();
let filter = Filter::new()
.kind(Kind::RelayList)
.author(public_key)
.limit(1);
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, css.auto_close_opts)
.await?;
Ok(())
}
/// Fetches metadata for a list of public keys
async fn fetch_metadata_for_pubkeys(public_keys: HashSet<PublicKey>) {
if public_keys.is_empty() {