chore: improve gossip implementation (#184)
* add send event function * add set nip17 and set nip65 functions * setup gossip relays * .
This commit is contained in:
@@ -19,17 +19,6 @@ pub const BOOTSTRAP_RELAYS: [&str; 5] = [
|
||||
/// Search Relays.
|
||||
pub const SEARCH_RELAYS: [&str; 1] = ["wss://relay.nostr.band"];
|
||||
|
||||
/// NIP65 Relays. Used for new account
|
||||
pub const NIP65_RELAYS: [&str; 4] = [
|
||||
"wss://relay.damus.io",
|
||||
"wss://relay.primal.net",
|
||||
"wss://relay.nostr.net",
|
||||
"wss://nos.lol",
|
||||
];
|
||||
|
||||
/// Messaging Relays. Used for new account
|
||||
pub const NIP17_RELAYS: [&str; 2] = ["wss://nip17.com", "wss://auth.nostr1.com"];
|
||||
|
||||
/// Default relay for Nostr Connect
|
||||
pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app";
|
||||
|
||||
|
||||
@@ -13,6 +13,8 @@ pub mod state;
|
||||
|
||||
static APP_STATE: OnceLock<AppState> = OnceLock::new();
|
||||
static NOSTR_CLIENT: OnceLock<Client> = OnceLock::new();
|
||||
static NIP65_RELAYS: OnceLock<Vec<(RelayUrl, Option<RelayMetadata>)>> = OnceLock::new();
|
||||
static NIP17_RELAYS: OnceLock<Vec<RelayUrl>> = OnceLock::new();
|
||||
|
||||
/// Initialize the application state.
|
||||
pub fn app_state() -> &'static AppState {
|
||||
@@ -42,3 +44,39 @@ pub fn nostr_client() -> &'static Client {
|
||||
ClientBuilder::default().database(lmdb).opts(opts).build()
|
||||
})
|
||||
}
|
||||
|
||||
/// Default NIP65 Relays. Used for new account
|
||||
pub fn default_nip65_relays() -> &'static Vec<(RelayUrl, Option<RelayMetadata>)> {
|
||||
NIP65_RELAYS.get_or_init(|| {
|
||||
vec![
|
||||
(
|
||||
RelayUrl::parse("wss://nostr.mom").unwrap(),
|
||||
Some(RelayMetadata::Read),
|
||||
),
|
||||
(
|
||||
RelayUrl::parse("wss://nostr.bitcoiner.social").unwrap(),
|
||||
Some(RelayMetadata::Read),
|
||||
),
|
||||
(
|
||||
RelayUrl::parse("wss://nostr.oxtr.dev").unwrap(),
|
||||
Some(RelayMetadata::Write),
|
||||
),
|
||||
(
|
||||
RelayUrl::parse("wss://nostr.fmt.wiz.biz").unwrap(),
|
||||
Some(RelayMetadata::Write),
|
||||
),
|
||||
(RelayUrl::parse("wss://relay.primal.net").unwrap(), None),
|
||||
(RelayUrl::parse("wss://relay.damus.io").unwrap(), None),
|
||||
]
|
||||
})
|
||||
}
|
||||
|
||||
/// Default NIP17 Relays. Used for new account
|
||||
pub fn default_nip17_relays() -> &'static Vec<RelayUrl> {
|
||||
NIP17_RELAYS.get_or_init(|| {
|
||||
vec![
|
||||
RelayUrl::parse("wss://nip17.com").unwrap(),
|
||||
RelayUrl::parse("wss://auth.nostr1.com").unwrap(),
|
||||
]
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,10 +15,12 @@ pub struct Gossip {
|
||||
}
|
||||
|
||||
impl Gossip {
|
||||
/// Parse and insert NIP-65 or NIP-17 relays into the gossip state.
|
||||
pub fn insert(&mut self, event: &Event) {
|
||||
match event.kind {
|
||||
Kind::InboxRelays => {
|
||||
let urls: Vec<RelayUrl> = nip17::extract_relay_list(event).cloned().collect();
|
||||
let urls: Vec<RelayUrl> =
|
||||
nip17::extract_relay_list(event).take(3).cloned().collect();
|
||||
|
||||
if !urls.is_empty() {
|
||||
self.nip17.entry(event.pubkey).or_default().extend(urls);
|
||||
@@ -37,6 +39,7 @@ impl Gossip {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all write relays for a given public key
|
||||
pub fn write_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
|
||||
self.nip65
|
||||
.get(public_key)
|
||||
@@ -51,6 +54,7 @@ impl Gossip {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get all read relays for a given public key
|
||||
pub fn read_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
|
||||
self.nip65
|
||||
.get(public_key)
|
||||
@@ -65,6 +69,7 @@ impl Gossip {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get all messaging relays for a given public key
|
||||
pub fn messaging_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
|
||||
self.nip17
|
||||
.get(public_key)
|
||||
@@ -72,17 +77,30 @@ impl Gossip {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub async fn get_nip65(&mut self, public_key: PublicKey) -> Result<(), Error> {
|
||||
/// Get and verify NIP-65 relays for a given public key
|
||||
///
|
||||
/// Only fetch from the public relays
|
||||
pub async fn get_nip65(&self, public_key: PublicKey) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let timeout = Duration::from_secs(5);
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
|
||||
let filter = Filter::new()
|
||||
let latest_filter = Filter::new()
|
||||
.kind(Kind::RelayList)
|
||||
.author(public_key)
|
||||
.limit(1);
|
||||
|
||||
// Subscribe to events from the bootstrapping relays
|
||||
client
|
||||
.subscribe_to(BOOTSTRAP_RELAYS, latest_filter.clone(), Some(opts))
|
||||
.await?;
|
||||
|
||||
let filter = Filter::new()
|
||||
.kind(Kind::RelayList)
|
||||
.author(public_key)
|
||||
.since(Timestamp::now());
|
||||
|
||||
// Continuously subscribe for new events from the bootstrap relays
|
||||
client
|
||||
.subscribe_to(BOOTSTRAP_RELAYS, filter.clone(), Some(opts))
|
||||
.await?;
|
||||
@@ -91,7 +109,7 @@ impl Gossip {
|
||||
smol::spawn(async move {
|
||||
smol::Timer::after(timeout).await;
|
||||
|
||||
if client.database().count(filter).await.unwrap_or(0) < 1 {
|
||||
if client.database().count(latest_filter).await.unwrap_or(0) < 1 {
|
||||
app_state()
|
||||
.signal
|
||||
.send(SignalKind::GossipRelaysNotFound)
|
||||
@@ -103,16 +121,49 @@ impl Gossip {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_nip17(&mut self, public_key: PublicKey) -> Result<(), Error> {
|
||||
/// Set NIP-65 relays for a current user
|
||||
pub async fn set_nip65(
|
||||
&mut self,
|
||||
relays: &[(RelayUrl, Option<RelayMetadata>)],
|
||||
) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let signer = client.signer().await?;
|
||||
|
||||
let tags: Vec<Tag> = relays
|
||||
.iter()
|
||||
.map(|(url, metadata)| Tag::relay_metadata(url.to_owned(), metadata.to_owned()))
|
||||
.collect();
|
||||
|
||||
let event = EventBuilder::new(Kind::RelayList, "")
|
||||
.tags(tags)
|
||||
.sign(&signer)
|
||||
.await?;
|
||||
|
||||
// Send event to the public relays
|
||||
client.send_event_to(BOOTSTRAP_RELAYS, &event).await?;
|
||||
|
||||
// Update gossip data
|
||||
for relay in relays {
|
||||
self.nip65
|
||||
.entry(event.pubkey)
|
||||
.or_default()
|
||||
.insert(relay.to_owned());
|
||||
}
|
||||
|
||||
// Get NIP-17 relays
|
||||
self.get_nip17(event.pubkey).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get and verify NIP-17 relays for a given public key
|
||||
///
|
||||
/// Only fetch from public key's write relays
|
||||
pub async fn get_nip17(&self, public_key: PublicKey) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let timeout = Duration::from_secs(5);
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
|
||||
let filter = Filter::new()
|
||||
.kind(Kind::InboxRelays)
|
||||
.author(public_key)
|
||||
.limit(1);
|
||||
|
||||
let urls = self.write_relays(&public_key);
|
||||
|
||||
// Ensure user's have at least one write relay
|
||||
@@ -126,7 +177,22 @@ impl Gossip {
|
||||
client.connect_relay(url).await?;
|
||||
}
|
||||
|
||||
let latest_filter = Filter::new()
|
||||
.kind(Kind::InboxRelays)
|
||||
.author(public_key)
|
||||
.limit(1);
|
||||
|
||||
// Subscribe to events from the bootstrapping relays
|
||||
client
|
||||
.subscribe_to(urls.clone(), latest_filter.clone(), Some(opts))
|
||||
.await?;
|
||||
|
||||
let filter = Filter::new()
|
||||
.kind(Kind::InboxRelays)
|
||||
.author(public_key)
|
||||
.since(Timestamp::now());
|
||||
|
||||
// Continuously subscribe for new events from the bootstrap relays
|
||||
client
|
||||
.subscribe_to(urls, filter.clone(), Some(opts))
|
||||
.await?;
|
||||
@@ -135,7 +201,7 @@ impl Gossip {
|
||||
smol::spawn(async move {
|
||||
smol::Timer::after(timeout).await;
|
||||
|
||||
if client.database().count(filter).await.unwrap_or(0) < 1 {
|
||||
if client.database().count(latest_filter).await.unwrap_or(0) < 1 {
|
||||
app_state()
|
||||
.signal
|
||||
.send(SignalKind::MessagingRelaysNotFound)
|
||||
@@ -147,7 +213,51 @@ impl Gossip {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn subscribe(&mut self, public_key: PublicKey, kind: Kind) -> Result<(), Error> {
|
||||
/// Set NIP-17 relays for a current user
|
||||
pub async fn set_nip17(&mut self, relays: &[RelayUrl]) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let signer = client.signer().await?;
|
||||
let public_key = signer.get_public_key().await?;
|
||||
|
||||
let urls = self.write_relays(&public_key);
|
||||
|
||||
// Ensure user's have at least one relay
|
||||
if urls.is_empty() {
|
||||
return Err(anyhow!("Relays are empty"));
|
||||
}
|
||||
|
||||
// Ensure connection to relays
|
||||
for url in urls.iter().cloned() {
|
||||
client.add_relay(url).await?;
|
||||
client.connect_relay(url).await?;
|
||||
}
|
||||
|
||||
let event = EventBuilder::new(Kind::InboxRelays, "")
|
||||
.tags(relays.iter().map(|relay| Tag::relay(relay.to_owned())))
|
||||
.sign(&signer)
|
||||
.await?;
|
||||
|
||||
// Send event to the public relays
|
||||
client.send_event_to(urls, &event).await?;
|
||||
|
||||
// Update gossip data
|
||||
for relay in relays {
|
||||
self.nip17
|
||||
.entry(event.pubkey)
|
||||
.or_default()
|
||||
.insert(relay.to_owned());
|
||||
}
|
||||
|
||||
// Run inbox monitor
|
||||
self.monitor_inbox(event.pubkey).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Subscribe for events that match the given kind for a given author
|
||||
///
|
||||
/// Only fetch from author's write relays
|
||||
pub async fn subscribe(&self, public_key: PublicKey, kind: Kind) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
|
||||
@@ -171,7 +281,10 @@ impl Gossip {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn bulk_subscribe(&mut self, public_keys: HashSet<PublicKey>) -> Result<(), Error> {
|
||||
/// Bulk subscribe to metadata events for a list of public keys
|
||||
///
|
||||
/// Only fetch from the public relays
|
||||
pub async fn bulk_subscribe(&self, public_keys: HashSet<PublicKey>) -> Result<(), Error> {
|
||||
if public_keys.is_empty() {
|
||||
return Err(anyhow!("You need at least one public key"));
|
||||
}
|
||||
@@ -192,7 +305,7 @@ impl Gossip {
|
||||
}
|
||||
|
||||
/// Monitor all gift wrap events in the messaging relays for a given public key
|
||||
pub async fn monitor_inbox(&mut self, public_key: PublicKey) -> Result<(), Error> {
|
||||
pub async fn monitor_inbox(&self, public_key: PublicKey) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let id = SubscriptionId::new("inbox");
|
||||
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
|
||||
@@ -214,4 +327,27 @@ impl Gossip {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send an event to author's write relays
|
||||
pub async fn send_event_to_write_relays(&self, event: &Event) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
let public_key = event.pubkey;
|
||||
let urls = self.write_relays(&public_key);
|
||||
|
||||
// Ensure user's have at least one relay
|
||||
if urls.is_empty() {
|
||||
return Err(anyhow!("Relays are empty"));
|
||||
}
|
||||
|
||||
// Ensure connection to relays
|
||||
for url in urls.iter().cloned() {
|
||||
client.add_relay(url).await?;
|
||||
client.connect_relay(url).await?;
|
||||
}
|
||||
|
||||
// Send event to relays
|
||||
client.send_event(event).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::nostr_client;
|
||||
use crate::paths::support_dir;
|
||||
use crate::state::gossip::Gossip;
|
||||
|
||||
pub mod gossip;
|
||||
mod gossip;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct AuthRequest {
|
||||
@@ -267,14 +267,18 @@ impl AppState {
|
||||
|
||||
match event.kind {
|
||||
Kind::RelayList => {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let is_self_authored = Self::is_self_authored(&event).await;
|
||||
|
||||
// Update NIP-65 relays for event's public key
|
||||
gossip.insert(&event);
|
||||
{
|
||||
let mut gossip = self.gossip.write().await;
|
||||
gossip.insert(&event);
|
||||
}
|
||||
|
||||
let is_self_authored = Self::is_self_authored(&event).await;
|
||||
|
||||
// Get events if relay list belongs to current user
|
||||
if is_self_authored {
|
||||
let gossip = self.gossip.read().await;
|
||||
|
||||
// Fetch user's metadata event
|
||||
gossip.subscribe(event.pubkey, Kind::Metadata).await.ok();
|
||||
|
||||
@@ -286,16 +290,19 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
Kind::InboxRelays => {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let is_self_authored = Self::is_self_authored(&event).await;
|
||||
|
||||
// Update NIP-17 relays for event's public key
|
||||
gossip.insert(&event);
|
||||
{
|
||||
let mut gossip = self.gossip.write().await;
|
||||
gossip.insert(&event);
|
||||
}
|
||||
|
||||
let is_self_authored = Self::is_self_authored(&event).await;
|
||||
|
||||
// Subscribe to gift wrap events if messaging relays belong to the current user
|
||||
if is_self_authored {
|
||||
if let Err(e) = gossip.monitor_inbox(event.pubkey).await {
|
||||
log::error!("Error: {e}");
|
||||
let gossip = self.gossip.read().await;
|
||||
|
||||
if gossip.monitor_inbox(event.pubkey).await.is_err() {
|
||||
self.signal.send(SignalKind::MessagingRelaysNotFound).await;
|
||||
}
|
||||
}
|
||||
@@ -304,11 +311,15 @@ impl AppState {
|
||||
let is_self_authored = Self::is_self_authored(&event).await;
|
||||
|
||||
if is_self_authored {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let public_keys: HashSet<PublicKey> =
|
||||
event.tags.public_keys().copied().collect();
|
||||
|
||||
gossip.bulk_subscribe(public_keys).await.ok();
|
||||
self.gossip
|
||||
.read()
|
||||
.await
|
||||
.bulk_subscribe(public_keys)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
Kind::Metadata => {
|
||||
@@ -395,16 +406,16 @@ impl AppState {
|
||||
|
||||
// Process the batch if it's full
|
||||
if batch.len() >= METADATA_BATCH_LIMIT {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let gossip = self.gossip.read().await;
|
||||
gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
|
||||
}
|
||||
}
|
||||
BatchEvent::Timeout => {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let gossip = self.gossip.read().await;
|
||||
gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
|
||||
}
|
||||
BatchEvent::Closed => {
|
||||
let mut gossip = self.gossip.write().await;
|
||||
let gossip = self.gossip.read().await;
|
||||
gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
|
||||
|
||||
// Exit the current loop
|
||||
|
||||
Reference in New Issue
Block a user