feat: custom gossip implementation (#181)

* .

* rename global to app_state

* refactor event tracker

* gossip

* .

* .
This commit is contained in:
reya
2025-10-10 17:36:38 +07:00
committed by GitHub
parent b7693444e6
commit 68a8ec7a69
34 changed files with 1020 additions and 913 deletions

View File

@@ -1,5 +1,5 @@
[package]
name = "global"
name = "app_state"
version.workspace = true
edition.workspace = true
publish.workspace = true

View File

@@ -0,0 +1,44 @@
use std::sync::OnceLock;
use std::time::Duration;
use nostr_lmdb::NostrLMDB;
use nostr_sdk::prelude::*;
use paths::nostr_file;
use crate::state::AppState;
pub mod constants;
pub mod paths;
pub mod state;
static APP_STATE: OnceLock<AppState> = OnceLock::new();
static NOSTR_CLIENT: OnceLock<Client> = OnceLock::new();
/// Returns the global [`AppState`], lazily creating it on first access.
///
/// Subsequent calls return the same instance (backed by a `OnceLock`).
pub fn app_state() -> &'static AppState {
    APP_STATE.get_or_init(AppState::new)
}
/// Returns the global nostr [`Client`], lazily creating it on first access.
///
/// First access installs the rustls crypto provider, opens the LMDB-backed
/// event database, and builds a client with the SDK's gossip, automatic
/// authentication and subscription verification disabled.
pub fn nostr_client() -> &'static Client {
    NOSTR_CLIENT.get_or_init(|| {
        // rustls uses the `aws_lc_rs` provider by default
        // This only errors if the default provider has already
        // been installed. We can ignore this `Result`.
        rustls::crypto::aws_lc_rs::default_provider()
            .install_default()
            .ok();

        // Panics on failure: the app cannot run without its event store.
        let lmdb = NostrLMDB::open(nostr_file()).expect("Database is NOT initialized");

        let opts = ClientOptions::new()
            // The app ships its own gossip model (see the gossip module),
            // so the SDK's built-in gossip is turned off.
            .gossip(false)
            .automatic_authentication(false)
            .verify_subscriptions(false)
            // Let idle relay connections sleep after 10 minutes.
            .sleep_when_idle(SleepWhenIdle::Enabled {
                timeout: Duration::from_secs(600),
            });

        ClientBuilder::default().database(lmdb).opts(opts).build()
    })
}

View File

@@ -0,0 +1,217 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use anyhow::{anyhow, Error};
use nostr_sdk::prelude::*;
use crate::constants::BOOTSTRAP_RELAYS;
use crate::state::SignalKind;
use crate::{app_state, nostr_client};
/// In-memory cache of per-user relay lists discovered via gossip.
#[derive(Debug, Clone, Default)]
pub struct Gossip {
    /// NIP-17 messaging (DM inbox) relays, keyed by author public key.
    pub nip17: HashMap<PublicKey, HashSet<RelayUrl>>,
    /// NIP-65 relay lists (URL plus optional read/write marker),
    /// keyed by author public key.
    pub nip65: HashMap<PublicKey, HashSet<(RelayUrl, Option<RelayMetadata>)>>,
}
impl Gossip {
pub fn insert(&mut self, event: &Event) {
match event.kind {
Kind::InboxRelays => {
let urls: Vec<RelayUrl> = nip17::extract_relay_list(event).cloned().collect();
if !urls.is_empty() {
self.nip17.entry(event.pubkey).or_default().extend(urls);
}
}
Kind::RelayList => {
let urls: Vec<(RelayUrl, Option<RelayMetadata>)> = nip65::extract_relay_list(event)
.map(|(url, metadata)| (url.to_owned(), metadata.to_owned()))
.collect();
if !urls.is_empty() {
self.nip65.entry(event.pubkey).or_default().extend(urls);
}
}
_ => {}
}
}
/// Returns up to 3 relays the user publishes (writes) to, per NIP-65.
///
/// In NIP-65 an entry with no marker counts as both read and write, so
/// only entries explicitly marked `read` are excluded. Returns an empty
/// list when no relay list is cached for `public_key`.
pub fn write_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
    self.nip65
        .get(public_key)
        .map(|relays| {
            relays
                .iter()
                // BUGFIX: previously filtered out `Write`-marked relays
                // (keeping read-only ones) — exactly inverted. Keep
                // unmarked and `write`-marked entries instead.
                .filter(|(_, metadata)| metadata.as_ref() != Some(&RelayMetadata::Read))
                .map(|(url, _)| url)
                .take(3)
                .collect()
        })
        .unwrap_or_default()
}
/// Returns up to 3 relays the user reads from, per NIP-65.
///
/// In NIP-65 an entry with no marker counts as both read and write, so
/// only entries explicitly marked `write` are excluded. Returns an empty
/// list when no relay list is cached for `public_key`.
pub fn read_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
    self.nip65
        .get(public_key)
        .map(|relays| {
            relays
                .iter()
                // BUGFIX: previously filtered out `Read`-marked relays
                // (keeping write-only ones) — exactly inverted. Keep
                // unmarked and `read`-marked entries instead.
                .filter(|(_, metadata)| metadata.as_ref() != Some(&RelayMetadata::Write))
                .map(|(url, _)| url)
                .take(3)
                .collect()
        })
        .unwrap_or_default()
}
/// Returns the cached NIP-17 messaging (DM inbox) relays for `public_key`,
/// or an empty list when none are cached.
pub fn messaging_relays(&self, public_key: &PublicKey) -> Vec<&RelayUrl> {
    match self.nip17.get(public_key) {
        Some(relays) => relays.iter().collect(),
        None => Vec::new(),
    }
}
/// Requests the user's NIP-65 relay list from the bootstrap relays.
///
/// The subscription auto-closes on EOSE. A detached background task
/// checks the local database after 5 seconds and emits
/// [`SignalKind::GossipRelaysNotFound`] if no relay list event arrived.
pub async fn get_nip65(&mut self, public_key: PublicKey) -> Result<(), Error> {
    let client = nostr_client();
    let timeout = Duration::from_secs(5);
    let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);

    // Only the newest relay list event is needed.
    let filter = Filter::new()
        .kind(Kind::RelayList)
        .author(public_key)
        .limit(1);

    // Subscribe to events from the bootstrapping relays
    client
        .subscribe_to(BOOTSTRAP_RELAYS, filter.clone(), Some(opts))
        .await?;

    // Verify the received data after a timeout
    smol::spawn(async move {
        smol::Timer::after(timeout).await;
        // Received events are persisted by the client, so an empty count
        // here means nothing matching the filter came in.
        if client.database().count(filter).await.unwrap_or(0) < 1 {
            app_state()
                .signal
                .send(SignalKind::GossipRelaysNotFound)
                .await;
        }
    })
    .detach();

    Ok(())
}
/// Requests the user's NIP-17 messaging relay list from their NIP-65
/// write relays (where the user publishes such lists).
///
/// The subscription auto-closes on EOSE. A detached background task
/// checks the local database after 5 seconds and emits
/// [`SignalKind::MessagingRelaysNotFound`] if no inbox relay event arrived.
///
/// # Errors
///
/// Returns an error when no NIP-65 write relays are cached for the key,
/// or when adding/connecting/subscribing to a relay fails.
pub async fn get_nip17(&mut self, public_key: PublicKey) -> Result<(), Error> {
    let client = nostr_client();
    let timeout = Duration::from_secs(5);
    let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);

    // Only the newest inbox relay list event is needed.
    let filter = Filter::new()
        .kind(Kind::InboxRelays)
        .author(public_key)
        .limit(1);

    let urls = self.write_relays(&public_key);

    // Ensure the user has at least one NIP-65 write relay to query.
    if urls.is_empty() {
        return Err(anyhow!("NIP-17 relays are empty"));
    }

    // Ensure connection to relays
    for url in urls.iter().cloned() {
        client.add_relay(url).await?;
        client.connect_relay(url).await?;
    }

    // Subscribe on the user's write relays.
    client
        .subscribe_to(urls, filter.clone(), Some(opts))
        .await?;

    // Verify the received data after a timeout
    smol::spawn(async move {
        smol::Timer::after(timeout).await;
        if client.database().count(filter).await.unwrap_or(0) < 1 {
            app_state()
                .signal
                .send(SignalKind::MessagingRelaysNotFound)
                .await;
        }
    })
    .detach();

    Ok(())
}
/// Subscribes (auto-close on EOSE) to the latest event of `kind` authored
/// by `public_key`, using the user's NIP-65 write relays.
///
/// # Errors
///
/// Returns an error when no write relays are cached for the key, or when
/// adding/connecting/subscribing to a relay fails.
pub async fn subscribe(&mut self, public_key: PublicKey, kind: Kind) -> Result<(), Error> {
    let client = nostr_client();
    let urls = self.write_relays(&public_key);

    // Without at least one write relay there is nowhere to subscribe.
    if urls.is_empty() {
        return Err(anyhow!("NIP-65 relays are empty"));
    }

    // Make sure every relay is in the pool and connected.
    for url in urls.iter().cloned() {
        client.add_relay(url).await?;
        client.connect_relay(url).await?;
    }

    let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
    let filter = Filter::new().author(public_key).kind(kind).limit(1);

    client.subscribe_to(urls, filter, Some(opts)).await?;

    Ok(())
}
/// Subscribes to metadata, contact-list and relay-list events for every
/// key in `public_keys`, via the bootstrap relays (auto-close on EOSE).
///
/// # Errors
///
/// Returns an error when `public_keys` is empty or the subscription fails.
pub async fn bulk_subscribe(&mut self, public_keys: HashSet<PublicKey>) -> Result<(), Error> {
    if public_keys.is_empty() {
        return Err(anyhow!("You need at least one public key"));
    }

    let client = nostr_client();
    let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);

    let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList];
    // Headroom of 20 over the theoretical one-event-per-(key, kind) maximum.
    let limit = public_keys.len() * kinds.len() + 20;
    let filter = Filter::new().authors(public_keys).kinds(kinds).limit(limit);

    // Fan the single batched filter out to the bootstrap relays.
    client
        .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
        .await?;

    Ok(())
}
/// Monitors all gift wrap events in the messaging relays for a given
/// public key.
///
/// The subscription uses the fixed id `"inbox"` (matching
/// `AppState::gift_wrap_sub_id`) and never auto-closes, so new gift wraps
/// keep streaming in.
///
/// # Errors
///
/// Returns an error when no messaging relays are cached for the key, or
/// when adding/connecting/subscribing to a relay fails.
pub async fn monitor_inbox(&mut self, public_key: PublicKey) -> Result<(), Error> {
    let client = nostr_client();
    let id = SubscriptionId::new("inbox");
    let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
    let urls = self.messaging_relays(&public_key);

    // Ensure the user has at least one messaging relay.
    if urls.is_empty() {
        return Err(anyhow!("Messaging relays are empty"));
    }

    // Ensure connection to relays
    for url in urls.iter().cloned() {
        client.add_relay(url).await?;
        client.connect_relay(url).await?;
    }

    // Long-lived subscription (no auto-close options) on the user's
    // messaging relays.
    client.subscribe_with_id_to(urls, id, filter, None).await?;

    Ok(())
}
}

View File

@@ -0,0 +1,516 @@
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use anyhow::{anyhow, Error};
use flume::{Receiver, Sender};
use nostr_sdk::prelude::*;
use smol::lock::RwLock;
use crate::constants::{
BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT, SEARCH_RELAYS,
};
use crate::nostr_client;
use crate::paths::support_dir;
use crate::state::gossip::Gossip;
pub mod gossip;
/// A pending NIP-42 authentication request issued by a relay.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct AuthRequest {
    /// Relay that issued the challenge.
    pub url: RelayUrl,
    /// The challenge string to answer.
    pub challenge: String,
    /// Whether a response is currently being sent.
    pub sending: bool,
}

impl AuthRequest {
    /// Creates a new, not-yet-answered request for `challenge` from `url`.
    pub fn new(challenge: impl Into<String>, url: RelayUrl) -> Self {
        Self {
            url,
            challenge: challenge.into(),
            sending: false,
        }
    }
}
/// Progress of unwrapping gift-wrapped events for the current session.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum UnwrappingStatus {
    /// Nothing has been processed yet.
    #[default]
    Initialized,
    /// Unwrapping is in progress.
    Processing,
    /// Unwrapping has finished.
    Complete,
}
/// Signals sent through the global event channel to notify the UI.
#[derive(Debug)]
pub enum SignalKind {
    /// The client's signer has been set.
    SignerSet(PublicKey),
    /// The client's signer has been unset.
    SignerUnset,
    /// A relay requires authentication (NIP-42).
    Auth(AuthRequest),
    /// The browser proxy service is down.
    ProxyDown,
    /// A new profile (metadata event) has been received.
    NewProfile(Profile),
    /// A new gift wrap event has been received; carries the gift wrap id
    /// and the unwrapped event.
    NewMessage((EventId, Event)),
    /// No messaging (NIP-17) relays were found for the current user.
    MessagingRelaysNotFound,
    /// No gossip (NIP-65) relays were found for the current user.
    GossipRelaysNotFound,
    /// The gift wrap unwrapping status has changed.
    GiftWrapStatus(UnwrappingStatus),
}
/// Bounded channel used to push [`SignalKind`] notifications to the UI.
#[derive(Debug)]
pub struct Signal {
    rx: Receiver<SignalKind>,
    tx: Sender<SignalKind>,
}

impl Default for Signal {
    fn default() -> Self {
        Self::new()
    }
}

impl Signal {
    /// Creates a channel with room for 2048 pending signals.
    pub fn new() -> Self {
        let (sender, receiver) = flume::bounded::<SignalKind>(2048);
        Self {
            rx: receiver,
            tx: sender,
        }
    }

    /// Returns the receiving half of the channel.
    pub fn receiver(&self) -> &Receiver<SignalKind> {
        &self.rx
    }

    /// Sends `kind` to the UI, logging (but otherwise ignoring) failures.
    pub async fn send(&self, kind: SignalKind) {
        if let Err(e) = self.tx.send_async(kind).await {
            log::error!("Failed to send signal: {e}");
        }
    }
}
/// Bounded channel feeding public keys into the metadata batching loop.
#[derive(Debug)]
pub struct Ingester {
    rx: Receiver<PublicKey>,
    tx: Sender<PublicKey>,
}

impl Default for Ingester {
    fn default() -> Self {
        Self::new()
    }
}

impl Ingester {
    /// Creates a channel with room for 1024 pending keys.
    pub fn new() -> Self {
        let (sender, receiver) = flume::bounded::<PublicKey>(1024);
        Self {
            rx: receiver,
            tx: sender,
        }
    }

    /// Returns the receiving half of the channel.
    pub fn receiver(&self) -> &Receiver<PublicKey> {
        &self.rx
    }

    /// Sends `public_key` to the batcher, logging (but otherwise ignoring)
    /// failures.
    pub async fn send(&self, public_key: PublicKey) {
        if let Err(e) = self.tx.send_async(public_key).await {
            log::error!("Failed to send public key: {e}");
        }
    }
}
/// Per-session bookkeeping for events sent, resent and seen on relays.
#[derive(Debug, Clone, Default)]
pub struct EventTracker {
    /// Tracking events that have been resent by Coop in the current session
    pub resent_ids: Vec<Output<EventId>>,
    /// Temporarily store events that need to be resent later
    /// (typically after relay authentication succeeds)
    pub resend_queue: HashMap<EventId, RelayUrl>,
    /// Tracking events sent by Coop in the current session
    pub sent_ids: HashSet<EventId>,
    /// Tracking events seen on which relays in the current session
    pub seen_on_relays: HashMap<EventId, HashSet<RelayUrl>>,
}

impl EventTracker {
    /// Events resent by Coop in this session.
    pub fn resent_ids(&self) -> &Vec<Output<EventId>> {
        &self.resent_ids
    }

    /// Events queued for resending, keyed by event id.
    pub fn resend_queue(&self) -> &HashMap<EventId, RelayUrl> {
        &self.resend_queue
    }

    /// Ids of events sent by Coop in this session.
    pub fn sent_ids(&self) -> &HashSet<EventId> {
        &self.sent_ids
    }

    /// Map of event id to the set of relays it has been seen on.
    pub fn seen_on_relays(&self) -> &HashMap<EventId, HashSet<RelayUrl>> {
        &self.seen_on_relays
    }
}
/// A simple container for all state that is shared across the application.
#[derive(Debug)]
pub struct AppState {
    /// The timestamp when the application was initialized.
    pub initialized_at: Timestamp,
    /// Whether this is the first run of the application.
    pub is_first_run: AtomicBool,
    /// Whether gift wrap processing is in progress.
    pub gift_wrap_processing: AtomicBool,
    /// Subscription ID for listening to gift wrap events from relays.
    pub gift_wrap_sub_id: SubscriptionId,
    /// Auto-close options for relay subscriptions
    pub auto_close_opts: Option<SubscribeAutoCloseOptions>,
    /// NIP-65: https://github.com/nostr-protocol/nips/blob/master/65.md
    pub gossip: RwLock<Gossip>,
    /// Tracks activity related to Nostr events
    pub event_tracker: RwLock<EventTracker>,
    /// Signal channel for communication between Nostr and GPUI
    pub signal: Signal,
    /// Ingester channel for processing public keys
    pub ingester: Ingester,
}
// `Default` simply delegates to the primary constructor.
impl Default for AppState {
    fn default() -> Self {
        Self::new()
    }
}
impl AppState {
/// Builds the application state with fresh channels, empty trackers and
/// the default auto-close subscription options.
pub fn new() -> Self {
    // Auto-close subscriptions as soon as stored events are exhausted.
    let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);

    Self {
        initialized_at: Timestamp::now(),
        signal: Signal::default(),
        ingester: Ingester::default(),
        // Checking the first-run marker also creates it (side effect).
        is_first_run: AtomicBool::new(Self::first_run()),
        gift_wrap_sub_id: SubscriptionId::new("inbox"),
        gift_wrap_processing: AtomicBool::new(false),
        auto_close_opts: Some(opts),
        gossip: RwLock::new(Gossip::default()),
        event_tracker: RwLock::new(EventTracker::default()),
    }
}
/// Connects to the bootstrap and search relays, then runs the main relay
/// notification loop until the notification channel closes.
///
/// Handles incoming events (relay lists, inbox relays, contact lists,
/// metadata, gift wraps), EOSE markers, NIP-42 auth challenges and OK
/// acknowledgements, dispatching UI signals as needed.
///
/// # Errors
///
/// Returns an error when a bootstrap/search relay cannot be added to the
/// relay pool.
pub async fn handle_notifications(&self) -> Result<(), Error> {
    let client = nostr_client();

    // Get all bootstrapping relays
    let mut urls = vec![];
    urls.extend(BOOTSTRAP_RELAYS);
    urls.extend(SEARCH_RELAYS);

    // Add relay to the relay pool
    for url in urls.into_iter() {
        client.add_relay(url).await?;
    }

    // Establish connection to relays
    client.connect().await;

    // De-duplication sets scoped to this loop's lifetime.
    let mut processed_events: HashSet<EventId> = HashSet::new();
    let mut challenges: HashSet<Cow<'_, str>> = HashSet::new();
    let mut notifications = client.notifications();

    while let Ok(notification) = notifications.recv().await {
        // Only raw relay messages are of interest here.
        let RelayPoolNotification::Message { message, relay_url } = notification else {
            continue;
        };

        match message {
            RelayMessage::Event { event, .. } => {
                // Keep track of which relays have seen this event
                // (inner scope drops the write lock before further work).
                {
                    let mut event_tracker = self.event_tracker.write().await;
                    event_tracker
                        .seen_on_relays
                        .entry(event.id)
                        .or_default()
                        .insert(relay_url);
                }

                // Skip events that have already been processed
                if !processed_events.insert(event.id) {
                    continue;
                }

                match event.kind {
                    Kind::RelayList => {
                        let mut gossip = self.gossip.write().await;
                        let is_self_authored = Self::is_self_authored(&event).await;

                        // Update NIP-65 relays for event's public key
                        gossip.insert(&event);

                        // Get events if relay list belongs to current user
                        if is_self_authored {
                            // Fetch user's metadata event
                            gossip.subscribe(event.pubkey, Kind::Metadata).await.ok();
                            // Fetch user's contact list event
                            gossip.subscribe(event.pubkey, Kind::ContactList).await.ok();
                            // Fetch user's messaging relays event
                            gossip.get_nip17(event.pubkey).await.ok();
                        }
                    }
                    Kind::InboxRelays => {
                        let mut gossip = self.gossip.write().await;
                        let is_self_authored = Self::is_self_authored(&event).await;

                        // Update NIP-17 relays for event's public key
                        gossip.insert(&event);

                        // Subscribe to gift wrap events if messaging relays belong to the current user
                        if is_self_authored {
                            if let Err(e) = gossip.monitor_inbox(event.pubkey).await {
                                log::error!("Error: {e}");
                                self.signal.send(SignalKind::MessagingRelaysNotFound).await;
                            }
                        }
                    }
                    Kind::ContactList => {
                        let is_self_authored = Self::is_self_authored(&event).await;

                        // Batch-fetch data for every contact of the current user.
                        if is_self_authored {
                            let mut gossip = self.gossip.write().await;
                            let public_keys: HashSet<PublicKey> =
                                event.tags.public_keys().copied().collect();

                            gossip.bulk_subscribe(public_keys).await.ok();
                        }
                    }
                    Kind::Metadata => {
                        // Malformed metadata falls back to an empty profile.
                        let metadata = Metadata::from_json(&event.content).unwrap_or_default();
                        let profile = Profile::new(event.pubkey, metadata);

                        self.signal.send(SignalKind::NewProfile(profile)).await;
                    }
                    Kind::GiftWrap => {
                        self.extract_rumor(&event).await;
                    }
                    _ => {}
                }
            }
            RelayMessage::EndOfStoredEvents(subscription_id) => {
                // EOSE on the gift wrap subscription: stored messages are
                // now being unwrapped.
                if *subscription_id == self.gift_wrap_sub_id {
                    self.signal
                        .send(SignalKind::GiftWrapStatus(UnwrappingStatus::Processing))
                        .await;
                }
            }
            RelayMessage::Auth { challenge } => {
                // Only react to each distinct challenge once.
                if challenges.insert(challenge.clone()) {
                    // Send a signal to the ingester to handle the auth request
                    self.signal
                        .send(SignalKind::Auth(AuthRequest::new(challenge, relay_url)))
                        .await;
                }
            }
            RelayMessage::Ok {
                event_id, message, ..
            } => {
                let msg = MachineReadablePrefix::parse(&message);
                let mut event_tracker = self.event_tracker.write().await;

                // Keep track of events sent by Coop
                event_tracker.sent_ids.insert(event_id);

                // Keep track of events that need to be resend after auth
                if let Some(MachineReadablePrefix::AuthRequired) = msg {
                    event_tracker.resend_queue.insert(event_id, relay_url);
                }
            }
            _ => {}
        }
    }

    Ok(())
}
/// Collects public keys from the ingester and flushes them to
/// [`Gossip::bulk_subscribe`] in batches.
///
/// A batch is flushed when it reaches `METADATA_BATCH_LIMIT` keys, when
/// `METADATA_BATCH_TIMEOUT` elapses without reaching the limit, or (one
/// final time) when the ingester channel closes. Keys already seen this
/// session are skipped.
pub async fn handle_metadata_batching(&self) {
    let timeout = Duration::from_millis(METADATA_BATCH_TIMEOUT);
    let mut processed_pubkeys: HashSet<PublicKey> = HashSet::new();
    let mut batch: HashSet<PublicKey> = HashSet::new();

    /// Internal events for the metadata batching system
    enum BatchEvent {
        PublicKey(PublicKey),
        Timeout,
        Closed,
    }

    loop {
        // Race the next ingested key against the flush timer.
        let futs = smol::future::or(
            async move {
                if let Ok(public_key) = self.ingester.receiver().recv_async().await {
                    BatchEvent::PublicKey(public_key)
                } else {
                    // Channel closed: no more keys will ever arrive.
                    BatchEvent::Closed
                }
            },
            async move {
                smol::Timer::after(timeout).await;
                BatchEvent::Timeout
            },
        );

        match futs.await {
            BatchEvent::PublicKey(public_key) => {
                // Prevent duplicate keys from being processed
                if processed_pubkeys.insert(public_key) {
                    batch.insert(public_key);
                }
                // Process the batch if it's full
                if batch.len() >= METADATA_BATCH_LIMIT {
                    let mut gossip = self.gossip.write().await;
                    // `mem::take` empties the batch for the next round.
                    gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
                }
            }
            BatchEvent::Timeout => {
                // Flush whatever accumulated; an empty batch is rejected
                // by `bulk_subscribe` and silently ignored via `.ok()`.
                let mut gossip = self.gossip.write().await;
                gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
            }
            BatchEvent::Closed => {
                // Final flush before shutting the loop down.
                let mut gossip = self.gossip.write().await;
                gossip.bulk_subscribe(std::mem::take(&mut batch)).await.ok();
                // Exit the current loop
                break;
            }
        }
    }
}
/// Returns `true` when `event` was authored by the client's current signer.
///
/// Returns `false` when no signer is configured or its public key cannot
/// be obtained.
async fn is_self_authored(event: &Event) -> bool {
    let client = nostr_client();

    match client.signer().await {
        Ok(signer) => match signer.get_public_key().await {
            Ok(public_key) => public_key == event.pubkey,
            Err(_) => false,
        },
        Err(_) => false,
    }
}
/// Stores an unwrapped event in the local database with a reference back
/// to the original gift wrap.
///
/// The reference is an `ApplicationSpecificData` event, tagged with the
/// gift wrap's id as identifier and the unwrapped event's id as an `e`
/// tag, signed with throwaway keys (it never leaves the local database).
async fn set_rumor(&self, id: EventId, rumor: &Event) -> Result<(), Error> {
    let client = nostr_client();

    // Save unwrapped event
    client.database().save_event(rumor).await?;

    // Create a reference event pointing to the unwrapped event
    let event = EventBuilder::new(Kind::ApplicationSpecificData, "")
        .tags(vec![Tag::identifier(id), Tag::event(rumor.id)])
        .sign(&Keys::generate())
        .await?;

    // Save reference event
    client.database().save_event(&event).await?;

    Ok(())
}
/// Retrieves a previously unwrapped event from the local database.
///
/// Looks up the reference event stored by `set_rumor` (keyed by the gift
/// wrap id) and loads the unwrapped event it points to.
///
/// # Errors
///
/// Returns an error when the event was never cached, the reference event
/// carries no `e` tag, or the target event is missing from the database.
async fn get_rumor(&self, id: EventId) -> Result<Event, Error> {
    let client = nostr_client();
    let filter = Filter::new()
        .kind(Kind::ApplicationSpecificData)
        .identifier(id)
        .limit(1);

    let Some(event) = client.database().query(filter).await?.first_owned() else {
        return Err(anyhow!("Event is not cached yet."));
    };

    // BUGFIX: was `event.tags.event_ids().collect::<Vec<_>>()[0]`, which
    // panics when the reference event has no `e` tag. Use a checked
    // lookup instead.
    let Some(target_id) = event.tags.event_ids().next() else {
        return Err(anyhow!("Reference event has no event id."));
    };

    match client.database().event_by_id(target_id).await? {
        Some(event) => Ok(event),
        None => Err(anyhow!("Event not found.")),
    }
}
/// Unwraps a gift-wrapped event and processes its contents.
///
/// Uses the cached unwrapped event when available; otherwise unwraps with
/// the client's signer and caches the result. On success, all tagged
/// public keys are fed to the metadata batcher, and either a
/// `NewMessage` signal is emitted (events newer than app start) or the
/// gift-wrap-processing flag is raised (older events from initial load).
async fn extract_rumor(&self, gift_wrap: &Event) {
    let client = nostr_client();
    let mut rumor: Option<Event> = None;

    if let Ok(event) = self.get_rumor(gift_wrap.id).await {
        // Cache hit: no signer work needed.
        rumor = Some(event);
    } else if let Ok(unwrapped) = client.unwrap_gift_wrap(gift_wrap).await {
        // Sign the unwrapped event with a RANDOM KEYS
        if let Ok(event) = unwrapped.rumor.sign_with_keys(&Keys::generate()) {
            // Save this event to the database for future use.
            if let Err(e) = self.set_rumor(gift_wrap.id, &event).await {
                log::warn!("Failed to cache unwrapped event: {e}")
            }
            rumor = Some(event);
        }
    }

    if let Some(event) = rumor {
        // Send all pubkeys to the metadata batch to sync data
        for public_key in event.tags.public_keys().copied() {
            self.ingester.send(public_key).await;
        }

        match event.created_at >= self.initialized_at {
            // New message: send a signal to notify the UI
            true => {
                self.signal
                    .send(SignalKind::NewMessage((gift_wrap.id, event)))
                    .await;
            }
            // Old message: Coop is probably processing the user's messages during initial load
            false => {
                self.gift_wrap_processing.store(true, Ordering::Release);
            }
        }
    }
}
/// Returns `true` exactly once: when the `.first_run` marker file does
/// not exist yet and could be created successfully.
///
/// Creating the marker is a deliberate side effect, so every subsequent
/// launch returns `false`.
fn first_run() -> bool {
    let flag = support_dir().join(".first_run");
    !flag.exists() && std::fs::write(&flag, "").is_ok()
}
}

View File

@@ -6,7 +6,7 @@ publish.workspace = true
[dependencies]
common = { path = "../common" }
global = { path = "../global" }
app_state = { path = "../app_state" }
gpui.workspace = true
nostr-sdk.workspace = true

View File

@@ -1,7 +1,7 @@
use anyhow::Error;
use app_state::constants::{APP_PUBKEY, APP_UPDATER_ENDPOINT};
use cargo_packager_updater::semver::Version;
use cargo_packager_updater::{check_update, Config, Update};
use global::constants::{APP_PUBKEY, APP_UPDATER_ENDPOINT};
use gpui::http_client::Url;
use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task, Window};
use smallvec::{smallvec, SmallVec};

View File

@@ -5,7 +5,7 @@ edition.workspace = true
publish.workspace = true
[dependencies]
global = { path = "../global" }
app_state = { path = "../app_state" }
nostr-sdk.workspace = true
gpui.workspace = true

View File

@@ -1,7 +1,7 @@
use std::sync::atomic::Ordering;
use global::app_state;
use global::constants::KEYRING_URL;
use app_state::app_state;
use app_state::constants::KEYRING_URL;
use gpui::{App, AppContext, Context, Entity, Global, Subscription, Window};
use nostr_sdk::prelude::*;
use smallvec::{smallvec, SmallVec};

View File

@@ -5,7 +5,7 @@ edition.workspace = true
publish.workspace = true
[dependencies]
global = { path = "../global" }
app_state = { path = "../app_state" }
gpui.workspace = true
nostr-connect.workspace = true

View File

@@ -1,8 +1,8 @@
use std::sync::Arc;
use anyhow::{anyhow, Error};
use app_state::constants::IMAGE_RESIZE_SERVICE;
use chrono::{Local, TimeZone};
use global::constants::IMAGE_RESIZE_SERVICE;
use gpui::{Image, ImageFormat, SharedString, SharedUri};
use nostr_sdk::prelude::*;
use qrcode::render::svg;

View File

@@ -32,7 +32,7 @@ ui = { path = "../ui" }
title_bar = { path = "../title_bar" }
theme = { path = "../theme" }
common = { path = "../common" }
global = { path = "../global" }
app_state = { path = "../app_state" }
registry = { path = "../registry" }
settings = { path = "../settings" }
client_keys = { path = "../client_keys" }

View File

@@ -5,15 +5,13 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Error};
use app_state::constants::{ACCOUNT_IDENTIFIER, BOOTSTRAP_RELAYS, DEFAULT_SIDEBAR_WIDTH};
use app_state::state::{AuthRequest, SignalKind, UnwrappingStatus};
use app_state::{app_state, nostr_client};
use auto_update::AutoUpdater;
use client_keys::ClientKeys;
use common::display::RenderedProfile;
use common::event::EventUtils;
use global::constants::{
ACCOUNT_IDENTIFIER, BOOTSTRAP_RELAYS, DEFAULT_SIDEBAR_WIDTH, METADATA_BATCH_LIMIT,
METADATA_BATCH_TIMEOUT, SEARCH_RELAYS,
};
use global::{app_state, nostr_client, AuthRequest, Notice, SignalKind, UnwrappingStatus};
use gpui::prelude::FluentBuilder;
use gpui::{
deferred, div, px, rems, App, AppContext, AsyncWindowContext, Axis, ClipboardItem, Context,
@@ -139,23 +137,43 @@ impl ChatSpace {
);
subscriptions.push(
// Subscribe to open chat room requests
cx.subscribe_in(&registry, window, move |this, _, event, window, cx| {
this.process_registry_event(event, window, cx);
// Handle registry events
cx.subscribe_in(&registry, window, move |this, _, ev, window, cx| {
match ev {
RegistryEvent::Open(room) => {
if let Some(room) = room.upgrade() {
this.dock.update(cx, |this, cx| {
let panel = chat::init(room, window, cx);
this.add_panel(Arc::new(panel), DockPlacement::Center, window, cx);
});
}
}
RegistryEvent::Close(..) => {
this.dock.update(cx, |this, cx| {
this.focus_tab_panel(window, cx);
cx.defer_in(window, |_, window, cx| {
window.dispatch_action(Box::new(ClosePanel), cx);
window.close_all_modals(cx);
});
});
}
_ => {}
};
}),
);
tasks.push(
// Connect to the bootstrap relays
// Then handle nostr events in the background
// Handle nostr events in the background
cx.background_spawn(async move {
Self::connect()
.await
.expect("Failed connect the bootstrap relays. Please restart the application.");
app_state().handle_notifications().await.ok();
}),
);
Self::process_nostr_events()
.await
.expect("Failed to handle nostr events. Please restart the application.");
tasks.push(
// Listen all metadata requests then batch them into single subscription
cx.background_spawn(async move {
app_state().handle_metadata_batching().await;
}),
);
@@ -174,17 +192,10 @@ impl ChatSpace {
}),
);
tasks.push(
// Listen all metadata requests then batch them into single subscription
cx.background_spawn(async move {
Self::process_batching_metadata().await;
}),
);
tasks.push(
// Continuously handle signals from the Nostr channel
cx.spawn_in(window, async move |this, cx| {
Self::process_nostr_signals(this, cx).await
Self::handle_signals(this, cx).await
}),
);
@@ -198,100 +209,26 @@ impl ChatSpace {
}
}
async fn connect() -> Result<(), Error> {
let client = nostr_client();
for relay in BOOTSTRAP_RELAYS.into_iter() {
client.add_relay(relay).await?;
}
log::info!("Connected to bootstrap relays");
for relay in SEARCH_RELAYS.into_iter() {
client.add_relay(relay).await?;
}
log::info!("Connected to search relays");
// Establish connection to relays
client.connect().await;
Ok(())
}
async fn observe_signer() {
let client = nostr_client();
let app_state = app_state();
let stream_timeout = Duration::from_secs(5);
let loop_duration = Duration::from_secs(1);
let loop_duration = Duration::from_millis(800);
loop {
let Ok(signer) = client.signer().await else {
smol::Timer::after(loop_duration).await;
continue;
};
if let Ok(signer) = client.signer().await {
if let Ok(pk) = signer.get_public_key().await {
// Notify the app that the signer has been set
app_state.signal.send(SignalKind::SignerSet(pk)).await;
let Ok(public_key) = signer.get_public_key().await else {
smol::Timer::after(loop_duration).await;
continue;
};
// Get user's gossip relays
app_state.gossip.write().await.get_nip65(pk).await.ok();
// Notify the app that the signer has been set.
app_state
.signal
.send(SignalKind::SignerSet(public_key))
.await;
// Subscribe to the NIP-65 relays for the public key.
let filter = Filter::new()
.kind(Kind::RelayList)
.author(public_key)
.limit(1);
let mut nip65_found = false;
match client
.stream_events_from(BOOTSTRAP_RELAYS, filter, stream_timeout)
.await
{
Ok(mut stream) => {
if stream.next().await.is_some() {
nip65_found = true;
} else {
// Timeout
app_state.signal.send(SignalKind::RelaysNotFound).await;
}
// Exit the current loop
break;
}
Err(e) => {
log::error!("Error fetching NIP-65 Relay: {e:?}");
app_state.signal.send(SignalKind::RelaysNotFound).await;
}
};
if nip65_found {
// Subscribe to the NIP-17 relays for the public key.
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
match client.stream_events(filter, stream_timeout).await {
Ok(mut stream) => {
if stream.next().await.is_some() {
break;
} else {
// Timeout
app_state.signal.send(SignalKind::RelaysNotFound).await;
}
}
Err(e) => {
log::error!("Error fetching NIP-17 Relay: {e:?}");
app_state.signal.send(SignalKind::RelaysNotFound).await;
}
};
}
break;
smol::Timer::after(loop_duration).await;
}
}
@@ -337,189 +274,7 @@ impl ChatSpace {
}
}
async fn process_batching_metadata() {
let app_state = app_state();
let timeout = Duration::from_millis(METADATA_BATCH_TIMEOUT);
let mut processed_pubkeys: HashSet<PublicKey> = HashSet::new();
let mut batch: HashSet<PublicKey> = HashSet::new();
/// Internal events for the metadata batching system
enum BatchEvent {
PublicKey(PublicKey),
Timeout,
Closed,
}
loop {
let futs = smol::future::or(
async move {
if let Ok(public_key) = app_state.ingester.receiver().recv_async().await {
BatchEvent::PublicKey(public_key)
} else {
BatchEvent::Closed
}
},
async move {
smol::Timer::after(timeout).await;
BatchEvent::Timeout
},
);
match futs.await {
BatchEvent::PublicKey(public_key) => {
// Prevent duplicate keys from being processed
if processed_pubkeys.insert(public_key) {
batch.insert(public_key);
}
// Process the batch if it's full
if batch.len() >= METADATA_BATCH_LIMIT {
Self::fetch_metadata_for_pubkeys(std::mem::take(&mut batch)).await;
}
}
BatchEvent::Timeout => {
Self::fetch_metadata_for_pubkeys(std::mem::take(&mut batch)).await;
}
BatchEvent::Closed => {
Self::fetch_metadata_for_pubkeys(std::mem::take(&mut batch)).await;
break;
}
}
}
}
async fn process_nostr_events() -> Result<(), Error> {
let client = nostr_client();
let app_state = app_state();
let mut processed_events: HashSet<EventId> = HashSet::new();
let mut challenges: HashSet<Cow<'_, str>> = HashSet::new();
let mut notifications = client.notifications();
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, relay_url } = notification else {
continue;
};
match message {
RelayMessage::Event { event, .. } => {
// Keep track of which relays have seen this event
app_state
.seen_on_relays
.write()
.await
.entry(event.id)
.or_insert_with(HashSet::new)
.insert(relay_url);
// Skip events that have already been processed
if !processed_events.insert(event.id) {
continue;
}
match event.kind {
Kind::RelayList => {
if let Ok(true) = Self::is_self_event(&event).await {
// Fetch user's metadata event
Self::fetch_single_event(Kind::Metadata, event.pubkey).await;
// Fetch user's contact list event
Self::fetch_single_event(Kind::ContactList, event.pubkey).await;
}
}
Kind::InboxRelays => {
if let Ok(true) = Self::is_self_event(&event).await {
let relays = nip17::extract_relay_list(&event).collect_vec();
if !relays.is_empty() {
for relay in relays.clone().into_iter() {
if client.add_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
app_state.signal.send(SignalKind::Notice(notice)).await;
}
if client.connect_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
app_state.signal.send(SignalKind::Notice(notice)).await;
}
}
// Subscribe to gift wrap events only in the current user's NIP-17 relays
Self::fetch_gift_wrap(relays, event.pubkey).await;
} else {
app_state.signal.send(SignalKind::RelaysNotFound).await;
}
}
}
Kind::ContactList => {
if let Ok(true) = Self::is_self_event(&event).await {
let public_keys = event.tags.public_keys().copied().collect_vec();
let kinds = vec![Kind::Metadata, Kind::ContactList];
let limit = public_keys.len() * kinds.len();
let filter =
Filter::new().limit(limit).authors(public_keys).kinds(kinds);
client
.subscribe_to(
BOOTSTRAP_RELAYS,
filter,
app_state.auto_close_opts,
)
.await
.ok();
}
}
Kind::Metadata => {
let metadata = Metadata::from_json(&event.content).unwrap_or_default();
let profile = Profile::new(event.pubkey, metadata);
app_state.signal.send(SignalKind::NewProfile(profile)).await;
}
Kind::GiftWrap => {
Self::unwrap_gift_wrap(&event).await;
}
_ => {}
}
}
RelayMessage::EndOfStoredEvents(subscription_id) => {
if *subscription_id == app_state.gift_wrap_sub_id {
let signal = SignalKind::GiftWrapStatus(UnwrappingStatus::Processing);
app_state.signal.send(signal).await;
}
}
RelayMessage::Auth { challenge } => {
if challenges.insert(challenge.clone()) {
let req = AuthRequest::new(challenge, relay_url);
// Send a signal to the ingester to handle the auth request
app_state.signal.send(SignalKind::Auth(req)).await;
}
}
RelayMessage::Ok {
event_id, message, ..
} => {
// Keep track of events sent by Coop
app_state.sent_ids.write().await.insert(event_id);
// Keep track of events that need to be resent
match MachineReadablePrefix::parse(&message) {
Some(MachineReadablePrefix::AuthRequired) => {
app_state
.resend_queue
.write()
.await
.insert(event_id, relay_url);
}
Some(_) => {}
None => {}
}
}
_ => {}
}
}
Ok(())
}
async fn process_nostr_signals(view: WeakEntity<ChatSpace>, cx: &mut AsyncWindowContext) {
async fn handle_signals(view: WeakEntity<ChatSpace>, cx: &mut AsyncWindowContext) {
let app_state = app_state();
let mut is_open_proxy_modal = false;
@@ -604,14 +359,17 @@ impl ChatSpace {
this.event_to_message(gift_wrap_id, event, window, cx);
});
}
SignalKind::RelaysNotFound => {
SignalKind::GossipRelaysNotFound => {
view.update(cx, |this, cx| {
this.set_required_relays(cx);
})
.ok();
}
SignalKind::Notice(msg) => {
window.push_notification(msg.as_str(), cx);
SignalKind::MessagingRelaysNotFound => {
view.update(cx, |this, cx| {
this.set_required_relays(cx);
})
.ok();
}
};
})
@@ -619,177 +377,6 @@ impl ChatSpace {
}
}
/// Checks if an event is belong to the current user
async fn is_self_event(event: &Event) -> Result<bool, Error> {
let client = nostr_client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
Ok(public_key == event.pubkey)
}
/// Fetches a single event by kind and public key
pub async fn fetch_single_event(kind: Kind, public_key: PublicKey) {
let client = nostr_client();
let app_state = app_state();
let filter = Filter::new().kind(kind).author(public_key).limit(1);
if let Err(e) = client.subscribe(filter, app_state.auto_close_opts).await {
log::info!("Failed to subscribe: {e}");
}
}
/// Fetches gift wrap events for a given public key and relays
pub async fn fetch_gift_wrap(relays: Vec<&RelayUrl>, public_key: PublicKey) {
let client = nostr_client();
let id = app_state().gift_wrap_sub_id.clone();
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
if client
.subscribe_with_id_to(relays.clone(), id, filter, None)
.await
.is_ok()
{
log::info!("Subscribed to messages in: {relays:?}");
}
}
/// Fetches metadata for a list of public keys
async fn fetch_metadata_for_pubkeys(public_keys: HashSet<PublicKey>) {
if public_keys.is_empty() {
return;
}
let client = nostr_client();
let app_state = app_state();
let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList];
let limit = public_keys.len() * kinds.len() + 20;
// A filter to fetch metadata
let filter = Filter::new().authors(public_keys).kinds(kinds).limit(limit);
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, app_state.auto_close_opts)
.await
.ok();
}
/// Stores an unwrapped event in local database with reference to original
async fn set_unwrapped_event(gift_wrap: EventId, unwrapped: &Event) -> Result<(), Error> {
let client = nostr_client();
// Save unwrapped event
client.database().save_event(unwrapped).await?;
// Create a reference event pointing to the unwrapped event
let event = EventBuilder::new(Kind::ApplicationSpecificData, "")
.tags(vec![Tag::identifier(gift_wrap), Tag::event(unwrapped.id)])
.sign(&Keys::generate())
.await?;
// Save reference event
client.database().save_event(&event).await?;
Ok(())
}
/// Retrieves a previously unwrapped event from local database
async fn get_unwrapped_event(root: EventId) -> Result<Event, Error> {
let client = nostr_client();
let filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.identifier(root)
.limit(1);
if let Some(event) = client.database().query(filter).await?.first_owned() {
let target_id = event.tags.event_ids().collect_vec()[0];
if let Some(event) = client.database().event_by_id(target_id).await? {
Ok(event)
} else {
Err(anyhow!("Event not found."))
}
} else {
Err(anyhow!("Event is not cached yet."))
}
}
/// Unwraps a gift-wrapped event and processes its contents.
async fn unwrap_gift_wrap(target: &Event) {
let client = nostr_client();
let app_state = app_state();
let mut message: Option<Event> = None;
if let Ok(event) = Self::get_unwrapped_event(target.id).await {
message = Some(event);
} else if let Ok(unwrapped) = client.unwrap_gift_wrap(target).await {
// Sign the unwrapped event with a RANDOM KEYS
if let Ok(event) = unwrapped.rumor.sign_with_keys(&Keys::generate()) {
// Save this event to the database for future use.
if let Err(e) = Self::set_unwrapped_event(target.id, &event).await {
log::warn!("Failed to cache unwrapped event: {e}")
}
message = Some(event);
}
}
if let Some(event) = message {
// Send all pubkeys to the metadata batch to sync data
for public_key in event.all_pubkeys() {
app_state.ingester.send(public_key).await;
}
match event.created_at >= app_state.init_at {
// New message: send a signal to notify the UI
true => {
app_state
.signal
.send(SignalKind::NewMessage((target.id, event)))
.await;
}
// Old message: Coop is probably processing the user's messages during initial load
false => {
app_state
.gift_wrap_processing
.store(true, Ordering::Release);
}
}
}
}
fn process_registry_event(
&mut self,
event: &RegistryEvent,
window: &mut Window,
cx: &mut Context<Self>,
) {
match event {
RegistryEvent::Open(room) => {
if let Some(room) = room.upgrade() {
self.dock.update(cx, |this, cx| {
let panel = chat::init(room, window, cx);
this.add_panel(Arc::new(panel), DockPlacement::Center, window, cx);
});
} else {
window.push_notification(t!("common.room_error"), cx);
}
}
RegistryEvent::Close(..) => {
self.dock.update(cx, |this, cx| {
this.focus_tab_panel(window, cx);
cx.defer_in(window, |_, window, cx| {
window.dispatch_action(Box::new(ClosePanel), cx);
window.close_all_modals(cx);
});
});
}
_ => {}
};
}
fn auth(&mut self, req: AuthRequest, window: &mut Window, cx: &mut Context<Self>) {
let settings = AppSettings::global(cx);
@@ -835,16 +422,17 @@ impl ChatSpace {
relay.resubscribe().await?;
// Get all failed events that need to be resent
let mut queue = app_state.resend_queue.write().await;
let mut event_tracker = app_state.event_tracker.write().await;
let ids: Vec<EventId> = queue
let ids: Vec<EventId> = event_tracker
.resend_queue
.iter()
.filter(|(_, url)| relay_url == *url)
.map(|(id, _)| *id)
.collect();
for id in ids.into_iter() {
if let Some(relay_url) = queue.remove(&id) {
if let Some(relay_url) = event_tracker.resend_queue.remove(&id) {
if let Some(event) = client.database().event_by_id(&id).await? {
let event_id = relay.send_event(&event).await?;
@@ -854,8 +442,8 @@ impl ChatSpace {
success: HashSet::from([relay_url]),
};
app_state.sent_ids.write().await.insert(event_id);
app_state.resent_ids.write().await.push(output);
event_tracker.sent_ids.insert(event_id);
event_tracker.resent_ids.push(output);
}
}
}

View File

@@ -1,8 +1,8 @@
use std::sync::Arc;
use app_state::constants::{APP_ID, APP_NAME};
use app_state::{app_state, nostr_client};
use assets::Assets;
use global::constants::{APP_ID, APP_NAME};
use global::{app_state, nostr_client};
use gpui::{
point, px, size, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, SharedString,
TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations, WindowKind,

View File

@@ -1,10 +1,11 @@
use std::time::Duration;
use anyhow::Error;
use app_state::constants::{ACCOUNT_IDENTIFIER, BUNKER_TIMEOUT};
use app_state::state::SignalKind;
use app_state::{app_state, nostr_client};
use client_keys::ClientKeys;
use common::display::RenderedProfile;
use global::constants::{ACCOUNT_IDENTIFIER, BUNKER_TIMEOUT};
use global::{app_state, nostr_client, SignalKind};
use gpui::prelude::FluentBuilder;
use gpui::{
div, relative, rems, svg, AnyElement, App, AppContext, Context, Entity, EventEmitter,

View File

@@ -1,9 +1,9 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use app_state::{app_state, nostr_client};
use common::display::{RenderedProfile, RenderedTimestamp};
use common::nip96::nip96_upload;
use global::{app_state, nostr_client};
use gpui::prelude::FluentBuilder;
use gpui::{
div, img, list, px, red, relative, rems, svg, white, Action, AnyElement, App, AppContext,
@@ -170,7 +170,8 @@ impl Chat {
cx.spawn_in(window, async move |this, cx| {
let app_state = app_state();
let sent_ids = app_state.sent_ids.read().await;
let event_tracker = app_state.event_tracker.read().await;
let sent_ids = event_tracker.sent_ids();
this.update_in(cx, |this, _window, cx| {
if !sent_ids.contains(&gift_wrap_id) {
@@ -1240,6 +1241,7 @@ impl Chat {
let task: Task<Result<Vec<RelayUrl>, Error>> = cx.background_spawn(async move {
let client = nostr_client();
let app_state = app_state();
let event_tracker = app_state.event_tracker.read().await;
let mut relays: Vec<RelayUrl> = vec![];
let filter = Filter::new()
@@ -1249,7 +1251,7 @@ impl Chat {
if let Some(event) = client.database().query(filter).await?.first_owned() {
if let Some(Ok(id)) = event.tags.identifier().map(EventId::parse) {
if let Some(urls) = app_state.seen_on_relays.read().await.get(&id).cloned() {
if let Some(urls) = event_tracker.seen_on_relays.get(&id).cloned() {
relays.extend(urls);
}
}

View File

@@ -2,10 +2,10 @@ use std::ops::Range;
use std::time::Duration;
use anyhow::{anyhow, Error};
use app_state::constants::BOOTSTRAP_RELAYS;
use app_state::{app_state, nostr_client};
use common::display::{RenderedProfile, TextUtils};
use common::nip05::nip05_profile;
use global::constants::BOOTSTRAP_RELAYS;
use global::{app_state, nostr_client};
use gpui::prelude::FluentBuilder;
use gpui::{
div, px, relative, rems, uniform_list, App, AppContext, Context, Entity, InteractiveElement,

View File

@@ -1,8 +1,8 @@
use std::str::FromStr;
use std::time::Duration;
use app_state::nostr_client;
use common::nip96::nip96_upload;
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, img, App, AppContext, Context, Entity, Flatten, IntoElement, ParentElement,

View File

@@ -1,8 +1,8 @@
use std::time::Duration;
use app_state::constants::{ACCOUNT_IDENTIFIER, BUNKER_TIMEOUT};
use app_state::nostr_client;
use client_keys::ClientKeys;
use global::constants::{ACCOUNT_IDENTIFIER, BUNKER_TIMEOUT};
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, relative, AnyElement, App, AppContext, Context, Entity, EventEmitter, FocusHandle,

View File

@@ -1,7 +1,7 @@
use anyhow::anyhow;
use app_state::constants::{ACCOUNT_IDENTIFIER, NIP17_RELAYS, NIP65_RELAYS};
use app_state::nostr_client;
use common::nip96::nip96_upload;
use global::constants::{ACCOUNT_IDENTIFIER, NIP17_RELAYS, NIP65_RELAYS};
use global::nostr_client;
use gpui::{
div, relative, rems, AnyElement, App, AppContext, AsyncWindowContext, Context, Entity,
EventEmitter, Flatten, FocusHandle, Focusable, IntoElement, ParentElement, PathPromptOptions,

View File

@@ -1,10 +1,12 @@
use std::sync::Arc;
use std::time::Duration;
use app_state::constants::{
ACCOUNT_IDENTIFIER, APP_NAME, NOSTR_CONNECT_RELAY, NOSTR_CONNECT_TIMEOUT,
};
use app_state::nostr_client;
use client_keys::ClientKeys;
use common::display::TextUtils;
use global::constants::{ACCOUNT_IDENTIFIER, APP_NAME, NOSTR_CONNECT_RELAY, NOSTR_CONNECT_TIMEOUT};
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, img, px, relative, svg, AnyElement, App, AppContext, ClipboardItem, Context, Entity,

View File

@@ -1,9 +1,9 @@
use std::time::Duration;
use app_state::constants::BOOTSTRAP_RELAYS;
use app_state::nostr_client;
use common::display::{shorten_pubkey, RenderedProfile, RenderedTimestamp};
use common::nip05::nip05_verify;
use global::constants::BOOTSTRAP_RELAYS;
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, px, relative, rems, uniform_list, App, AppContext, Context, Div, Entity,

View File

@@ -1,8 +1,8 @@
use std::time::Duration;
use anyhow::{anyhow, Error};
use global::constants::NIP17_RELAYS;
use global::{app_state, nostr_client};
use app_state::constants::NIP17_RELAYS;
use app_state::{app_state, nostr_client};
use gpui::prelude::FluentBuilder;
use gpui::{
div, px, uniform_list, App, AppContext, Context, Entity, InteractiveElement, IntoElement,

View File

@@ -3,10 +3,11 @@ use std::ops::Range;
use std::time::Duration;
use anyhow::{anyhow, Error};
use app_state::constants::{BOOTSTRAP_RELAYS, SEARCH_RELAYS};
use app_state::state::UnwrappingStatus;
use app_state::{app_state, nostr_client};
use common::debounced_delay::DebouncedDelay;
use common::display::{RenderedTimestamp, TextUtils};
use global::constants::{BOOTSTRAP_RELAYS, SEARCH_RELAYS};
use global::{app_state, nostr_client, UnwrappingStatus};
use gpui::prelude::FluentBuilder;
use gpui::{
deferred, div, relative, uniform_list, AnyElement, App, AppContext, Context, Entity,

View File

@@ -1,8 +1,8 @@
use std::time::Duration;
use app_state::nostr_client;
use common::display::RenderedProfile;
use common::nip05::nip05_verify;
use global::nostr_client;
use gpui::prelude::FluentBuilder;
use gpui::{
div, relative, rems, App, AppContext, ClipboardItem, Context, Entity, IntoElement,

View File

@@ -1,261 +0,0 @@
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::OnceLock;
use std::time::Duration;
use flume::{Receiver, Sender};
use nostr_lmdb::NostrLMDB;
use nostr_sdk::prelude::*;
use paths::nostr_file;
use smol::lock::RwLock;
use crate::paths::support_dir;
pub mod constants;
pub mod paths;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct AuthRequest {
pub url: RelayUrl,
pub challenge: String,
pub sending: bool,
}
impl AuthRequest {
pub fn new(challenge: impl Into<String>, url: RelayUrl) -> Self {
Self {
challenge: challenge.into(),
sending: false,
url,
}
}
}
#[derive(Debug, Clone)]
pub enum Notice {
RelayFailed(RelayUrl),
AuthFailed(RelayUrl),
Custom(String),
}
impl Notice {
pub fn as_str(&self) -> String {
match self {
Notice::AuthFailed(url) => format!("Authenticate failed for relay {url}"),
Notice::RelayFailed(url) => format!("Failed to connect the relay {url}"),
Notice::Custom(msg) => msg.into(),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum UnwrappingStatus {
#[default]
Initialized,
Processing,
Complete,
}
/// Signals sent through the global event channel to notify UI
#[derive(Debug)]
pub enum SignalKind {
/// A signal to notify UI that the client's signer has been set
SignerSet(PublicKey),
/// A signal to notify UI that the client's signer has been unset
SignerUnset,
/// A signal to notify UI that the relay requires authentication
Auth(AuthRequest),
/// A signal to notify UI that the browser proxy service is down
ProxyDown,
/// A signal to notify UI that a new profile has been received
NewProfile(Profile),
/// A signal to notify UI that a new gift wrap event has been received
NewMessage((EventId, Event)),
/// A signal to notify UI that no DM relays for current user was found
RelaysNotFound,
/// A signal to notify UI that gift wrap status has changed
GiftWrapStatus(UnwrappingStatus),
/// A signal to notify UI that there are errors or notices occurred
Notice(Notice),
}
#[derive(Debug)]
pub struct Signal {
rx: Receiver<SignalKind>,
tx: Sender<SignalKind>,
}
impl Default for Signal {
fn default() -> Self {
Self::new()
}
}
impl Signal {
pub fn new() -> Self {
let (tx, rx) = flume::bounded::<SignalKind>(2048);
Self { rx, tx }
}
pub fn receiver(&self) -> &Receiver<SignalKind> {
&self.rx
}
pub async fn send(&self, kind: SignalKind) {
if let Err(e) = self.tx.send_async(kind).await {
log::error!("Failed to send signal: {e}");
}
}
}
#[derive(Debug)]
pub struct Ingester {
rx: Receiver<PublicKey>,
tx: Sender<PublicKey>,
}
impl Default for Ingester {
fn default() -> Self {
Self::new()
}
}
impl Ingester {
pub fn new() -> Self {
let (tx, rx) = flume::bounded::<PublicKey>(1024);
Self { rx, tx }
}
pub fn receiver(&self) -> &Receiver<PublicKey> {
&self.rx
}
pub async fn send(&self, public_key: PublicKey) {
if let Err(e) = self.tx.send_async(public_key).await {
log::error!("Failed to send public key: {e}");
}
}
}
/// A simple storage to store all states that using across the application.
#[derive(Debug)]
pub struct AppState {
/// The timestamp when the application was initialized.
pub init_at: Timestamp,
/// The timestamp when the application was last used.
pub last_used_at: Option<Timestamp>,
/// Whether this is the first run of the application.
pub is_first_run: AtomicBool,
/// Subscription ID for listening to gift wrap events from relays.
pub gift_wrap_sub_id: SubscriptionId,
/// Auto-close options for relay subscriptions
pub auto_close_opts: Option<SubscribeAutoCloseOptions>,
/// Whether gift wrap processing is in progress.
pub gift_wrap_processing: AtomicBool,
/// Tracking events sent by Coop in the current session
pub sent_ids: RwLock<HashSet<EventId>>,
/// Tracking events seen on which relays in the current session
pub seen_on_relays: RwLock<HashMap<EventId, HashSet<RelayUrl>>>,
/// Tracking events that have been resent by Coop in the current session
pub resent_ids: RwLock<Vec<Output<EventId>>>,
/// Temporarily store events that need to be resent later
pub resend_queue: RwLock<HashMap<EventId, RelayUrl>>,
/// Signal channel for communication between Nostr and GPUI
pub signal: Signal,
/// Ingester channel for processing public keys
pub ingester: Ingester,
}
impl Default for AppState {
fn default() -> Self {
Self::new()
}
}
impl AppState {
pub fn new() -> Self {
let init_at = Timestamp::now();
let first_run = first_run();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let signal = Signal::default();
let ingester = Ingester::default();
Self {
init_at,
signal,
ingester,
last_used_at: None,
is_first_run: AtomicBool::new(first_run),
gift_wrap_sub_id: SubscriptionId::new("inbox"),
gift_wrap_processing: AtomicBool::new(false),
auto_close_opts: Some(opts),
sent_ids: RwLock::new(HashSet::new()),
seen_on_relays: RwLock::new(HashMap::new()),
resent_ids: RwLock::new(Vec::new()),
resend_queue: RwLock::new(HashMap::new()),
}
}
}
static NOSTR_CLIENT: OnceLock<Client> = OnceLock::new();
static APP_STATE: OnceLock<AppState> = OnceLock::new();
pub fn nostr_client() -> &'static Client {
NOSTR_CLIENT.get_or_init(|| {
// rustls uses the `aws_lc_rs` provider by default
// This only errors if the default provider has already
// been installed. We can ignore this `Result`.
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.ok();
let lmdb = NostrLMDB::open(nostr_file()).expect("Database is NOT initialized");
let opts = ClientOptions::new()
.gossip(true)
.automatic_authentication(false)
.verify_subscriptions(false)
.sleep_when_idle(SleepWhenIdle::Enabled {
timeout: Duration::from_secs(600),
});
ClientBuilder::default().database(lmdb).opts(opts).build()
})
}
pub fn app_state() -> &'static AppState {
APP_STATE.get_or_init(AppState::new)
}
fn first_run() -> bool {
let flag = support_dir().join(format!(".{}-first_run", env!("CARGO_PKG_VERSION")));
if !flag.exists() {
if std::fs::write(&flag, "").is_err() {
return false;
}
true // First run
} else {
false // Not first run
}
}

View File

@@ -6,7 +6,7 @@ publish.workspace = true
[dependencies]
common = { path = "../common" }
global = { path = "../global" }
app_state = { path = "../app_state" }
settings = { path = "../settings" }
gpui.workspace = true

View File

@@ -2,10 +2,11 @@ use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use anyhow::Error;
use app_state::nostr_client;
use app_state::state::UnwrappingStatus;
use common::event::EventUtils;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use global::{nostr_client, UnwrappingStatus};
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task, WeakEntity, Window};
use itertools::Itertools;
use nostr_sdk::prelude::*;

View File

@@ -4,12 +4,11 @@ use std::hash::{Hash, Hasher};
use std::time::Duration;
use anyhow::{anyhow, Error};
use app_state::constants::SEND_RETRY;
use app_state::{app_state, nostr_client};
use common::display::RenderedProfile;
use common::event::EventUtils;
use global::constants::SEND_RETRY;
use global::{app_state, nostr_client};
use gpui::{App, AppContext, Context, EventEmitter, SharedString, SharedUri, Task};
use itertools::Itertools;
use nostr_sdk::prelude::*;
use crate::Registry;
@@ -358,18 +357,6 @@ impl Room {
})
}
pub fn disconnect(&self, relays: Vec<RelayUrl>, cx: &App) -> Task<Result<(), Error>> {
cx.background_spawn(async move {
let client = nostr_client();
for relay in relays.into_iter() {
client.disconnect_relay(relay).await?;
}
Ok(())
})
}
/// Loads all messages for this room from the database
pub fn load_messages(&self, cx: &App) -> Task<Result<Vec<Event>, Error>> {
let members = self.members.clone();
@@ -378,13 +365,14 @@ impl Room {
let client = nostr_client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let sent_ids = app_state()
.sent_ids
let sent_ids: Vec<EventId> = app_state()
.event_tracker
.read()
.await
.sent_ids()
.iter()
.copied()
.collect_vec();
.collect();
// Get seen events from database
let filter = Filter::new()
@@ -508,12 +496,17 @@ impl Room {
let rumor = rumor.clone();
let event = EventBuilder::gift_wrap(&signer, &receiver, rumor, vec![]).await?;
let Ok(relay_urls) = Self::messaging_relays(receiver).await else {
let gossip = app_state.gossip.read().await;
let urls = gossip.messaging_relays(&receiver);
// Check if there are any relays to send the event to
if urls.is_empty() {
reports.push(SendReport::new(receiver).not_found());
continue;
};
}
match client.send_event_to(relay_urls, &event).await {
// Send the event to the relays
match client.send_event_to(urls, &event).await {
Ok(output) => {
let id = output.id().to_owned();
let auth_required = output.failed.iter().any(|m| m.1.starts_with("auth-"));
@@ -522,7 +515,8 @@ impl Room {
if auth_required {
// Wait for authenticated and resent event successfully
for attempt in 0..=SEND_RETRY {
let ids = app_state.resent_ids.read().await;
let retry_manager = app_state.event_tracker.read().await;
let ids = retry_manager.resent_ids();
// Check if event was successfully resent
if let Some(output) = ids.iter().find(|e| e.id() == &id).cloned() {
@@ -555,8 +549,15 @@ impl Room {
// Only send a backup message to current user if sent successfully to others
if reports.iter().all(|r| r.is_sent_success()) && backup {
if let Ok(relay_urls) = Self::messaging_relays(public_key).await {
match client.send_event_to(relay_urls, &event).await {
let gossip = app_state.gossip.read().await;
let urls = gossip.messaging_relays(&public_key);
// Check if there are any relays to send the event to
if urls.is_empty() {
reports.push(SendReport::new(public_key).not_found());
} else {
// Send the event to the relays
match client.send_event_to(urls, &event).await {
Ok(output) => {
reports.push(SendReport::new(public_key).status(output));
}
@@ -564,8 +565,6 @@ impl Room {
reports.push(SendReport::new(public_key).error(e.to_string()));
}
}
} else {
reports.push(SendReport::new(public_key).not_found());
}
} else {
reports.push(SendReport::new(public_key).on_hold(event));
@@ -583,6 +582,8 @@ impl Room {
) -> Task<Result<Vec<SendReport>, Error>> {
cx.background_spawn(async move {
let client = nostr_client();
let app_state = app_state();
let mut resend_reports = vec![];
for report in reports.into_iter() {
@@ -611,8 +612,15 @@ impl Room {
// Process the on hold event if it exists
if let Some(event) = report.on_hold {
if let Ok(relay_urls) = Self::messaging_relays(receiver).await {
match client.send_event_to(relay_urls, &event).await {
let gossip = app_state.gossip.read().await;
let urls = gossip.messaging_relays(&receiver);
// Check if there are any relays to send the event to
if urls.is_empty() {
resend_reports.push(SendReport::new(receiver).not_found());
} else {
// Send the event to the relays
match client.send_event_to(urls, &event).await {
Ok(output) => {
resend_reports.push(SendReport::new(receiver).status(output));
}
@@ -620,8 +628,6 @@ impl Room {
resend_reports.push(SendReport::new(receiver).error(e.to_string()));
}
}
} else {
resend_reports.push(SendReport::new(receiver).not_found());
}
}
}
@@ -629,36 +635,4 @@ impl Room {
Ok(resend_reports)
})
}
/// Gets messaging relays for public key
async fn messaging_relays(public_key: PublicKey) -> Result<Vec<RelayUrl>, Error> {
let client = nostr_client();
let mut relay_urls = vec![];
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
if let Some(event) = client.database().query(filter).await?.first_owned() {
let urls: Vec<RelayUrl> = nip17::extract_owned_relay_list(event).collect();
// Check if at least one URL exists
if urls.is_empty() {
return Err(anyhow!("Not found"));
}
// Connect to relays
for url in urls.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
relay_urls.extend(urls.into_iter().take(3).unique());
} else {
return Err(anyhow!("Not found"));
}
Ok(relay_urls)
}
}

View File

@@ -5,7 +5,7 @@ edition.workspace = true
publish.workspace = true
[dependencies]
global = { path = "../global" }
app_state = { path = "../app_state" }
nostr-sdk.workspace = true
gpui.workspace = true

View File

@@ -1,6 +1,6 @@
use anyhow::anyhow;
use global::constants::SETTINGS_IDENTIFIER;
use global::nostr_client;
use app_state::constants::SETTINGS_IDENTIFIER;
use app_state::nostr_client;
use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task};
use nostr_sdk::prelude::*;
use serde::{Deserialize, Serialize};

View File

@@ -5,7 +5,7 @@ edition.workspace = true
publish.workspace = true
[dependencies]
global = { path = "../global" }
app_state = { path = "../app_state" }
nostr.workspace = true
smol.workspace = true