This commit is contained in:
2026-01-07 14:05:24 +07:00
parent 2d9563b888
commit 967355cd4a
12 changed files with 511 additions and 365 deletions

327
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::{anyhow, Context as AnyhowContext, Error}; use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT}; use common::EventUtils;
use flume::Sender; use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2; use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher; use fuzzy_matcher::FuzzyMatcher;
@@ -154,7 +154,6 @@ impl ChatRegistry {
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION); let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let mut notifications = client.notifications(); let mut notifications = client.notifications();
let mut public_keys = HashSet::new();
let mut processed_events = HashSet::new(); let mut processed_events = HashSet::new();
while let Ok(notification) = notifications.recv().await { while let Ok(notification) = notifications.recv().await {
@@ -177,41 +176,26 @@ impl ChatRegistry {
// Extract the rumor from the gift wrap event // Extract the rumor from the gift wrap event
match Self::extract_rumor(client, event.as_ref()).await { match Self::extract_rumor(client, event.as_ref()).await {
Ok(rumor) => { Ok(rumor) => match rumor.created_at >= initialized_at {
// Get all public keys true => {
public_keys.extend(rumor.all_pubkeys()); let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
let limit_reached = public_keys.len() >= METADATA_BATCH_LIMIT; if !sent_by_coop {
let done = !loading.load(Ordering::Acquire) && !public_keys.is_empty(); let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
// Get metadata for all public keys if the limit is reached if let Err(e) = tx.send_async(signal).await {
if limit_reached || done { log::error!("Failed to send signal: {}", e);
let public_keys = std::mem::take(&mut public_keys);
// Get metadata for the public keys
Self::get_metadata(client, public_keys).await.ok();
}
match rumor.created_at >= initialized_at {
true => {
let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
}
} }
} }
false => {
loading.store(true, Ordering::Release);
}
} }
} false => {
loading.store(true, Ordering::Release);
}
},
Err(e) => { Err(e) => {
log::warn!("Failed to unwrap gift wrap event: {}", e); log::warn!("Failed to unwrap gift wrap event: {}", e);
} }
@@ -230,7 +214,7 @@ impl ChatRegistry {
} }
async fn unwrapping_status(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<NostrEvent>) { async fn unwrapping_status(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<NostrEvent>) {
let loop_duration = Duration::from_secs(20); let loop_duration = Duration::from_secs(12);
let mut is_start_processing = false; let mut is_start_processing = false;
let mut total_loops = 0; let mut total_loops = 0;
@@ -240,22 +224,16 @@ impl ChatRegistry {
if status.load(Ordering::Acquire) { if status.load(Ordering::Acquire) {
is_start_processing = true; is_start_processing = true;
// Reset gift wrap processing flag // Reset gift wrap processing flag
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed); _ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
// Send loading signal tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
if let Err(e) = tx.send_async(NostrEvent::Unwrapping(true)).await {
log::error!("Failed to send signal: {}", e);
}
} else { } else {
// Only run further if we are already processing // Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed // Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 { if is_start_processing && total_loops >= 2 {
// Send loading signal tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
if let Err(e) = tx.send_async(NostrEvent::Unwrapping(false)).await {
log::error!("Failed to send signal: {}", e);
}
// Reset the counter // Reset the counter
is_start_processing = false; is_start_processing = false;
total_loops = 0; total_loops = 0;
@@ -654,33 +632,6 @@ impl ChatRegistry {
} }
} }
/// Get metadata for a list of public keys
async fn get_metadata<I>(client: &Client, public_keys: I) -> Result<(), Error>
where
I: IntoIterator<Item = PublicKey>,
{
let authors: Vec<PublicKey> = public_keys.into_iter().collect();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let kinds = vec![Kind::Metadata, Kind::ContactList];
// Return if the list is empty
if authors.is_empty() {
return Err(anyhow!("You need at least one public key".to_string(),));
}
let filter = Filter::new()
.limit(authors.len() * kinds.len())
.authors(authors)
.kinds(kinds);
// Subscribe to filters to the bootstrap relays
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
.await?;
Ok(())
}
/// Get the conversation ID for a given rumor (message). /// Get the conversation ID for a given rumor (message).
fn conversation_id(rumor: &UnsignedEvent) -> u64 { fn conversation_id(rumor: &UnsignedEvent) -> u64 {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();

View File

@@ -147,7 +147,7 @@ impl From<&UnsignedEvent> for Room {
let created_at = val.created_at; let created_at = val.created_at;
// Get the members from the event's tags and event's pubkey // Get the members from the event's tags and event's pubkey
let members = val.all_pubkeys(); let members = val.extract_public_keys();
// Get subject from tags // Get subject from tags
let subject = val let subject = val
@@ -228,16 +228,25 @@ impl Room {
} }
/// Returns the members of the room with their messaging relays /// Returns the members of the room with their messaging relays
pub fn members_with_relays(&self, cx: &App) -> Vec<(PublicKey, Vec<RelayUrl>)> { pub fn members_with_relays(&self, cx: &App) -> Task<Vec<(PublicKey, Vec<RelayUrl>)>> {
let nostr = NostrRegistry::global(cx); let nostr = NostrRegistry::global(cx);
let mut result = vec![]; let mut tasks = vec![];
for member in self.members.iter() { for member in self.members.iter() {
let messaging_relays = nostr.read(cx).messaging_relays(member, cx); let task = nostr.read(cx).messaging_relays(member, cx);
result.push((member.to_owned(), messaging_relays)); tasks.push((*member, task));
} }
result cx.background_spawn(async move {
let mut results = vec![];
for (public_key, task) in tasks.into_iter() {
let urls = task.await;
results.push((public_key, urls));
}
results
})
} }
/// Checks if the room has more than two members (group) /// Checks if the room has more than two members (group)
@@ -457,10 +466,12 @@ impl Room {
let rumor = rumor.to_owned(); let rumor = rumor.to_owned();
// Get all members and their messaging relays // Get all members and their messaging relays
let mut members = self.members_with_relays(cx); let task = self.members_with_relays(cx);
cx.background_spawn(async move { cx.background_spawn(async move {
let signer = client.signer().await?; let signer = client.signer().await?;
let current_user_relays = current_user_relays.await;
let mut members = task.await;
// Remove the current user's public key from the list of receivers // Remove the current user's public key from the list of receivers
// the current user will be handled separately // the current user will be handled separately

View File

@@ -27,8 +27,5 @@ pub const NOSTR_CONNECT_TIMEOUT: u64 = 200;
/// Default timeout (in seconds) for Nostr Connect (Bunker) /// Default timeout (in seconds) for Nostr Connect (Bunker)
pub const BUNKER_TIMEOUT: u64 = 30; pub const BUNKER_TIMEOUT: u64 = 30;
/// Total metadata requests will be grouped.
pub const METADATA_BATCH_LIMIT: usize = 20;
/// Default width of the sidebar. /// Default width of the sidebar.
pub const DEFAULT_SIDEBAR_WIDTH: f32 = 240.; pub const DEFAULT_SIDEBAR_WIDTH: f32 = 240.;

View File

@@ -5,19 +5,19 @@ use nostr_sdk::prelude::*;
pub trait EventUtils { pub trait EventUtils {
fn uniq_id(&self) -> u64; fn uniq_id(&self) -> u64;
fn all_pubkeys(&self) -> Vec<PublicKey>; fn extract_public_keys(&self) -> Vec<PublicKey>;
} }
impl EventUtils for Event { impl EventUtils for Event {
fn uniq_id(&self) -> u64 { fn uniq_id(&self) -> u64 {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
let mut pubkeys: Vec<PublicKey> = self.all_pubkeys(); let mut pubkeys: Vec<PublicKey> = self.extract_public_keys();
pubkeys.sort(); pubkeys.sort();
pubkeys.hash(&mut hasher); pubkeys.hash(&mut hasher);
hasher.finish() hasher.finish()
} }
fn all_pubkeys(&self) -> Vec<PublicKey> { fn extract_public_keys(&self) -> Vec<PublicKey> {
let mut public_keys: Vec<PublicKey> = self.tags.public_keys().copied().collect(); let mut public_keys: Vec<PublicKey> = self.tags.public_keys().copied().collect();
public_keys.push(self.pubkey); public_keys.push(self.pubkey);
@@ -45,7 +45,7 @@ impl EventUtils for UnsignedEvent {
hasher.finish() hasher.finish()
} }
fn all_pubkeys(&self) -> Vec<PublicKey> { fn extract_public_keys(&self) -> Vec<PublicKey> {
let mut public_keys: Vec<PublicKey> = self.tags.public_keys().copied().collect(); let mut public_keys: Vec<PublicKey> = self.tags.public_keys().copied().collect();
public_keys.push(self.pubkey); public_keys.push(self.pubkey);
public_keys.into_iter().unique().sorted().collect() public_keys.into_iter().unique().sorted().collect()

View File

@@ -73,7 +73,6 @@ fn main() {
// Bring the app to the foreground // Bring the app to the foreground
cx.activate(true); cx.activate(true);
// Root Entity
cx.new(|cx| { cx.new(|cx| {
// Initialize the tokio runtime // Initialize the tokio runtime
gpui_tokio::init(cx); gpui_tokio::init(cx);
@@ -90,21 +89,22 @@ fn main() {
// Initialize the nostr client // Initialize the nostr client
state::init(cx); state::init(cx);
// Initialize person registry
person::init(cx);
// Initialize settings // Initialize settings
settings::init(cx); settings::init(cx);
// Initialize app registry
chat::init(cx);
// Initialize relay auth registry // Initialize relay auth registry
relay_auth::init(window, cx); relay_auth::init(window, cx);
// Initialize app registry
chat::init(cx);
// Initialize person registry
person::init(cx);
// Initialize auto update // Initialize auto update
auto_update::init(cx); auto_update::init(cx);
// Root Entity
Root::new(chatspace::init(window, cx).into(), window, cx) Root::new(chatspace::init(window, cx).into(), window, cx)
}) })
}) })

View File

@@ -263,13 +263,14 @@ impl UserProfile {
let write_relays = nostr.read(cx).write_relays(&public_key, cx); let write_relays = nostr.read(cx).write_relays(&public_key, cx);
cx.background_spawn(async move { cx.background_spawn(async move {
let urls = write_relays.await;
let signer = client.signer().await?; let signer = client.signer().await?;
// Sign the new metadata event // Sign the new metadata event
let event = EventBuilder::metadata(&new_metadata).sign(&signer).await?; let event = EventBuilder::metadata(&new_metadata).sign(&signer).await?;
// Send event to user's write relayss // Send event to user's write relayss
client.send_event_to(write_relays, &event).await?; client.send_event_to(urls, &event).await?;
// Return the updated profile // Return the updated profile
let metadata = Metadata::from_json(&event.content).unwrap_or_default(); let metadata = Metadata::from_json(&event.content).unwrap_or_default();

View File

@@ -164,6 +164,7 @@ impl SetupRelay {
let relays = self.relays.clone(); let relays = self.relays.clone();
let task: Task<Result<(), Error>> = cx.background_spawn(async move { let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let urls = write_relays.await;
let signer = client.signer().await?; let signer = client.signer().await?;
let tags: Vec<Tag> = relays let tags: Vec<Tag> = relays
@@ -177,7 +178,7 @@ impl SetupRelay {
.await?; .await?;
// Set messaging relays // Set messaging relays
client.send_event_to(write_relays, &event).await?; client.send_event_to(urls, &event).await?;
// Connect to messaging relays // Connect to messaging relays
for relay in relays.iter() { for relay in relays.iter() {

View File

@@ -1,9 +1,14 @@
use std::cell::RefCell;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use std::time::Duration;
use anyhow::{anyhow, Error};
use common::{EventUtils, BOOTSTRAP_RELAYS};
use gpui::{App, AppContext, Context, Entity, Global, Task}; use gpui::{App, AppContext, Context, Entity, Global, Task};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use state::NostrRegistry; use state::{NostrRegistry, TIMEOUT};
pub fn init(cx: &mut App) { pub fn init(cx: &mut App) {
PersonRegistry::set_global(cx.new(PersonRegistry::new), cx); PersonRegistry::set_global(cx.new(PersonRegistry::new), cx);
@@ -19,8 +24,14 @@ pub struct PersonRegistry {
/// Collection of all persons (user profiles) /// Collection of all persons (user profiles)
persons: HashMap<PublicKey, Entity<Profile>>, persons: HashMap<PublicKey, Entity<Profile>>,
/// Set of public keys that have been seen
seen: Rc<RefCell<HashSet<PublicKey>>>,
/// Sender for requesting metadata
sender: flume::Sender<PublicKey>,
/// Tasks for asynchronous operations /// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 3]>, _tasks: SmallVec<[Task<()>; 4]>,
} }
impl PersonRegistry { impl PersonRegistry {
@@ -41,6 +52,7 @@ impl PersonRegistry {
// Channel for communication between nostr and gpui // Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Profile>(100); let (tx, rx) = flume::bounded::<Profile>(100);
let (mta_tx, mta_rx) = flume::bounded::<PublicKey>(100);
let mut tasks = smallvec![]; let mut tasks = smallvec![];
@@ -49,7 +61,20 @@ impl PersonRegistry {
cx.background_spawn({ cx.background_spawn({
let client = client.clone(); let client = client.clone();
async move { Self::handle_notifications(&client, &tx).await } async move {
Self::handle_notifications(&client, &tx).await;
}
}),
);
tasks.push(
// Handle metadata requests
cx.background_spawn({
let client = client.clone();
async move {
Self::handle_requests(&client, &mta_rx).await;
}
}), }),
); );
@@ -89,6 +114,8 @@ impl PersonRegistry {
Self { Self {
persons: HashMap::new(), persons: HashMap::new(),
seen: Rc::new(RefCell::new(HashSet::new())),
sender: mta_tx,
_tasks: tasks, _tasks: tasks,
} }
} }
@@ -110,17 +137,82 @@ impl PersonRegistry {
continue; continue;
} }
// Only process metadata events match event.kind {
if event.kind == Kind::Metadata { Kind::Metadata => {
let metadata = Metadata::from_json(&event.content).unwrap_or_default(); let metadata = Metadata::from_json(&event.content).unwrap_or_default();
let profile = Profile::new(event.pubkey, metadata); let profile = Profile::new(event.pubkey, metadata);
tx.send_async(profile).await.ok(); tx.send_async(profile).await.ok();
}; }
Kind::ContactList => {
let public_keys = event.extract_public_keys();
Self::get_metadata(client, public_keys).await.ok();
}
_ => {}
}
} }
} }
} }
/// Handle request for metadata
async fn handle_requests(client: &Client, rx: &flume::Receiver<PublicKey>) {
let mut batch: HashSet<PublicKey> = HashSet::new();
loop {
match flume::Selector::new()
.recv(rx, |result| result.ok())
.wait_timeout(Duration::from_secs(2))
{
Ok(Some(public_key)) => {
log::info!("Received public key: {}", public_key);
batch.insert(public_key);
// Process the batch if it's full
if batch.len() >= 20 {
Self::get_metadata(client, std::mem::take(&mut batch))
.await
.ok();
}
}
_ => {
Self::get_metadata(client, std::mem::take(&mut batch))
.await
.ok();
}
}
}
}
/// Get metadata for all public keys in a event
async fn get_metadata<I>(client: &Client, public_keys: I) -> Result<(), Error>
where
I: IntoIterator<Item = PublicKey>,
{
let authors: Vec<PublicKey> = public_keys.into_iter().collect();
let limit = authors.len();
if authors.is_empty() {
return Err(anyhow!("You need at least one public key"));
}
// Construct the subscription option
let opts = SubscribeAutoCloseOptions::default()
.exit_policy(ReqExitPolicy::ExitOnEOSE)
.timeout(Some(Duration::from_secs(TIMEOUT)));
// Construct the filter for metadata
let filter = Filter::new()
.kind(Kind::Metadata)
.authors(authors)
.limit(limit);
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
.await?;
Ok(())
}
/// Load all user profiles from the database /// Load all user profiles from the database
async fn load_persons(client: &Client) -> Result<Vec<Profile>, Error> { async fn load_persons(client: &Client) -> Result<Vec<Profile>, Error> {
let filter = Filter::new().kind(Kind::Metadata).limit(200); let filter = Filter::new().kind(Kind::Metadata).limit(200);
@@ -165,10 +257,26 @@ impl PersonRegistry {
/// Get single person by public key /// Get single person by public key
pub fn get(&self, public_key: &PublicKey, cx: &App) -> Profile { pub fn get(&self, public_key: &PublicKey, cx: &App) -> Profile {
self.persons if let Some(profile) = self.persons.get(public_key) {
.get(public_key) return profile.read(cx).clone();
.map(|e| e.read(cx)) }
.cloned()
.unwrap_or(Profile::new(public_key.to_owned(), Metadata::default())) let public_key = *public_key;
let mut seen = self.seen.borrow_mut();
if seen.insert(public_key) {
let sender = self.sender.clone();
// Spawn background task to request metadata
cx.background_spawn(async move {
if let Err(e) = sender.send_async(public_key).await {
log::warn!("Failed to send public key for metadata request: {}", e);
}
})
.detach();
}
// Return a temporary profile with default metadata
Profile::new(public_key, Metadata::default())
} }
} }

View File

@@ -0,0 +1,34 @@
use gpui::SharedString;
use nostr_sdk::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Announcement {
id: EventId,
public_key: PublicKey,
client_name: Option<String>,
}
impl Announcement {
pub fn new(id: EventId, client_name: Option<String>, public_key: PublicKey) -> Self {
Self {
id,
client_name,
public_key,
}
}
pub fn id(&self) -> EventId {
self.id
}
pub fn public_key(&self) -> PublicKey {
self.public_key
}
pub fn client_name(&self) -> SharedString {
self.client_name
.as_ref()
.map(SharedString::from)
.unwrap_or(SharedString::from("Unknown"))
}
}

View File

@@ -1,5 +1,7 @@
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use crate::Announcement;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RelayState { pub enum RelayState {
#[default] #[default]
@@ -19,6 +21,9 @@ pub struct Identity {
/// The public key of the account /// The public key of the account
public_key: Option<PublicKey>, public_key: Option<PublicKey>,
/// Encryption key announcement
announcement: Option<Announcement>,
/// Status of the current user NIP-65 relays /// Status of the current user NIP-65 relays
relay_list: RelayState, relay_list: RelayState,
@@ -36,6 +41,7 @@ impl Identity {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
public_key: None, public_key: None,
announcement: None,
relay_list: RelayState::default(), relay_list: RelayState::default(),
messaging_relays: RelayState::default(), messaging_relays: RelayState::default(),
} }

View File

@@ -7,10 +7,12 @@ use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task};
use nostr_lmdb::NostrLmdb; use nostr_lmdb::NostrLmdb;
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
mod device;
mod event; mod event;
mod gossip; mod gossip;
mod identity; mod identity;
pub use device::*;
pub use event::*; pub use event::*;
pub use gossip::*; pub use gossip::*;
pub use identity::*; pub use identity::*;
@@ -116,15 +118,14 @@ impl NostrRegistry {
subscriptions.push( subscriptions.push(
// Observe the identity entity // Observe the identity entity
cx.observe(&identity, |this, state, cx| { cx.observe(&identity, |this, state, cx| {
let identity = state.read(cx); if state.read(cx).has_public_key() {
match state.read(cx).relay_list_state() {
if identity.has_public_key() {
match identity.relay_list_state() {
RelayState::Initial => { RelayState::Initial => {
this.get_relay_list(cx); this.get_relay_list(cx);
} }
RelayState::Set => match identity.messaging_relays_state() { RelayState::Set => match state.read(cx).messaging_relays_state() {
RelayState::Initial => { RelayState::Initial => {
this.get_profile(cx);
this.get_messaging_relays(cx); this.get_messaging_relays(cx);
} }
RelayState::Set => { RelayState::Set => {
@@ -138,30 +139,6 @@ impl NostrRegistry {
}), }),
); );
tasks.push(
// Establish connection to the bootstrap relays
cx.background_spawn({
let client = client.clone();
async move {
// Add bootstrap relay to the relay pool
for url in BOOTSTRAP_RELAYS.into_iter() {
client.add_relay(url).await?;
}
// Add search relay to the relay pool
for url in SEARCH_RELAYS.into_iter() {
client.add_relay(url).await?;
}
// Connect to all added relays
client.connect().await;
Ok(())
}
}),
);
tasks.push( tasks.push(
// Handle nostr notifications // Handle nostr notifications
cx.background_spawn({ cx.background_spawn({
@@ -208,6 +185,19 @@ impl NostrRegistry {
// Handle nostr notifications // Handle nostr notifications
async fn handle_notifications(client: &Client, tx: &flume::Sender<Event>) -> Result<(), Error> { async fn handle_notifications(client: &Client, tx: &flume::Sender<Event>) -> Result<(), Error> {
// Add bootstrap relay to the relay pool
for url in BOOTSTRAP_RELAYS.into_iter() {
client.add_relay(url).await?;
}
// Add search relay to the relay pool
for url in SEARCH_RELAYS.into_iter() {
client.add_relay(url).await?;
}
// Connect to all added relays
client.connect().await;
let mut notifications = client.notifications(); let mut notifications = client.notifications();
let mut processed_events = HashSet::new(); let mut processed_events = HashSet::new();
@@ -281,7 +271,7 @@ impl NostrRegistry {
// Ensure relay connections // Ensure relay connections
for relay in write_relays.iter() { for relay in write_relays.iter() {
client.add_write_relay(*relay).await?; client.add_relay(*relay).await?;
client.connect_relay(*relay).await?; client.connect_relay(*relay).await?;
} }
@@ -347,57 +337,51 @@ impl NostrRegistry {
} }
/// Get a list of write relays for a given public key /// Get a list of write relays for a given public key
pub fn write_relays(&self, public_key: &PublicKey, cx: &App) -> Vec<RelayUrl> { pub fn write_relays(&self, public_key: &PublicKey, cx: &App) -> Task<Vec<RelayUrl>> {
let client = self.client(); let client = self.client();
let relays = self.gossip.read(cx).write_relays(public_key); let relays = self.gossip.read(cx).write_relays(public_key);
let async_relays = relays.clone();
// Ensure relay connections
cx.background_spawn(async move { cx.background_spawn(async move {
for url in async_relays.iter() { // Ensure relay connections
client.add_write_relay(url).await.ok(); for url in relays.iter() {
client.connect_relay(url).await.ok();
}
})
.detach();
relays
}
/// Get a list of read relays for a given public key
pub fn read_relays(&self, public_key: &PublicKey, cx: &App) -> Vec<RelayUrl> {
let client = self.client();
let relays = self.gossip.read(cx).read_relays(public_key);
let async_relays = relays.clone();
// Ensure relay connections
cx.background_spawn(async move {
for url in async_relays.iter() {
client.add_read_relay(url).await.ok();
client.connect_relay(url).await.ok();
}
})
.detach();
relays
}
/// Get a list of messaging relays for a given public key
pub fn messaging_relays(&self, public_key: &PublicKey, cx: &App) -> Vec<RelayUrl> {
let client = self.client();
let relays = self.gossip.read(cx).messaging_relays(public_key);
let async_relays = relays.clone();
// Ensure relay connections
cx.background_spawn(async move {
for url in async_relays.iter() {
client.add_relay(url).await.ok(); client.add_relay(url).await.ok();
client.connect_relay(url).await.ok(); client.connect_relay(url).await.ok();
} }
})
.detach();
relays relays
})
}
/// Get a list of read relays for a given public key
pub fn read_relays(&self, public_key: &PublicKey, cx: &App) -> Task<Vec<RelayUrl>> {
let client = self.client();
let relays = self.gossip.read(cx).read_relays(public_key);
cx.background_spawn(async move {
// Ensure relay connections
for url in relays.iter() {
client.add_relay(url).await.ok();
client.connect_relay(url).await.ok();
}
relays
})
}
/// Get a list of messaging relays for a given public key
pub fn messaging_relays(&self, public_key: &PublicKey, cx: &App) -> Task<Vec<RelayUrl>> {
let client = self.client();
let relays = self.gossip.read(cx).messaging_relays(public_key);
cx.background_spawn(async move {
// Ensure relay connections
for url in relays.iter() {
client.add_relay(url).await.ok();
client.connect_relay(url).await.ok();
}
relays
})
} }
/// Set the signer for the nostr client and verify the public key /// Set the signer for the nostr client and verify the public key
@@ -479,9 +463,14 @@ impl NostrRegistry {
.await?; .await?;
while let Some((_url, res)) = stream.next().await { while let Some((_url, res)) = stream.next().await {
if let Ok(event) = res { match res {
log::info!("Received relay list event: {event:?}"); Ok(event) => {
return Ok(RelayState::Set); log::info!("Received relay list event: {event:?}");
return Ok(RelayState::Set);
}
Err(e) => {
log::error!("Failed to receive relay list event: {e}");
}
} }
} }
@@ -507,6 +496,48 @@ impl NostrRegistry {
})); }));
} }
/// Get profile and contact list for current user
fn get_profile(&mut self, cx: &mut Context<Self>) {
let client = self.client();
let public_key = self.identity().read(cx).public_key();
let write_relays = self.write_relays(&public_key, cx);
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let mut urls = vec![];
urls.extend(write_relays.await);
urls.extend(
BOOTSTRAP_RELAYS
.iter()
.filter_map(|url| RelayUrl::parse(url).ok()),
);
// Construct subscription options
let opts = SubscribeAutoCloseOptions::default()
.exit_policy(ReqExitPolicy::ExitOnEOSE)
.timeout(Some(Duration::from_secs(TIMEOUT)));
// Filter for metadata
let metadata = Filter::new()
.kind(Kind::Metadata)
.limit(1)
.author(public_key);
// Filter for contact list
let contact_list = Filter::new()
.kind(Kind::ContactList)
.limit(1)
.author(public_key);
client
.subscribe_to(urls, vec![metadata, contact_list], Some(opts))
.await?;
Ok(())
});
task.detach();
}
/// Get messaging relays for current user /// Get messaging relays for current user
fn get_messaging_relays(&mut self, cx: &mut Context<Self>) { fn get_messaging_relays(&mut self, cx: &mut Context<Self>) {
let client = self.client(); let client = self.client();
@@ -515,19 +546,28 @@ impl NostrRegistry {
let write_relays = self.write_relays(&public_key, cx); let write_relays = self.write_relays(&public_key, cx);
let task: Task<Result<RelayState, Error>> = cx.background_spawn(async move { let task: Task<Result<RelayState, Error>> = cx.background_spawn(async move {
let urls = write_relays.await;
// Construct the filter for inbox relays
let filter = Filter::new() let filter = Filter::new()
.kind(Kind::InboxRelays) .kind(Kind::InboxRelays)
.author(public_key) .author(public_key)
.limit(1); .limit(1);
// Stream events from the write relays
let mut stream = client let mut stream = client
.stream_events_from(write_relays, vec![filter], Duration::from_secs(TIMEOUT)) .stream_events_from(urls, vec![filter], Duration::from_secs(TIMEOUT))
.await?; .await?;
while let Some((_url, res)) = stream.next().await { while let Some((_url, res)) = stream.next().await {
if let Ok(event) = res { match res {
log::info!("Received messaging relays event: {event:?}"); Ok(event) => {
return Ok(RelayState::Set); log::info!("Received messaging relays event: {event:?}");
return Ok(RelayState::Set);
}
Err(e) => {
log::error!("Failed to get messaging relays: {e}");
}
} }
} }
@@ -560,11 +600,12 @@ impl NostrRegistry {
let messaging_relays = self.messaging_relays(&public_key, cx); let messaging_relays = self.messaging_relays(&public_key, cx);
cx.background_spawn(async move { cx.background_spawn(async move {
let urls = messaging_relays.await;
let id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION); let id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key); let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
if let Err(e) = client if let Err(e) = client
.subscribe_with_id_to(messaging_relays, id, vec![filter], None) .subscribe_with_id_to(urls, id, vec![filter], None)
.await .await
{ {
log::error!("Failed to subscribe to gift wrap events: {e}"); log::error!("Failed to subscribe to gift wrap events: {e}");
@@ -573,14 +614,6 @@ impl NostrRegistry {
.detach(); .detach();
} }
/// Publish an event to author's write relays
pub fn publish(&self, event: Event, cx: &App) -> Task<Result<Output<EventId>, Error>> {
let client = self.client();
let write_relays = self.write_relays(&event.pubkey, cx);
cx.background_spawn(async move { Ok(client.send_event_to(&write_relays, &event).await?) })
}
/// Subscribe to event kinds to author's write relays /// Subscribe to event kinds to author's write relays
pub fn subscribe<I>(&self, kinds: I, author: PublicKey, cx: &App) pub fn subscribe<I>(&self, kinds: I, author: PublicKey, cx: &App)
where where
@@ -596,16 +629,15 @@ impl NostrRegistry {
.map(|kind| Filter::new().kind(kind).author(author).limit(1)) .map(|kind| Filter::new().kind(kind).author(author).limit(1))
.collect(); .collect();
// Construct subscription options
let opts = SubscribeAutoCloseOptions::default()
.timeout(Some(Duration::from_secs(TIMEOUT)))
.exit_policy(ReqExitPolicy::ExitOnEOSE);
cx.background_spawn(async move { cx.background_spawn(async move {
if let Err(e) = client let urls = write_relays.await;
.subscribe_to(&write_relays, filters, Some(opts))
.await // Construct subscription options
{ let opts = SubscribeAutoCloseOptions::default()
.timeout(Some(Duration::from_secs(TIMEOUT)))
.exit_policy(ReqExitPolicy::ExitOnEOSE);
if let Err(e) = client.subscribe_to(urls, filters, Some(opts)).await {
log::error!("Failed to create a subscription: {e}"); log::error!("Failed to create a subscription: {e}");
}; };
}) })