chore: improve nip4e implementation (#204)

* patch

* update ui

* add load response

* fix

* .

* wip: rewrite gossip

* new gossip implementation

* clean up

* .

* debug

* .

* .

* update

* .

* fix

* fix
This commit is contained in:
reya
2025-11-15 08:30:45 +07:00
committed by GitHub
parent d87bcfbd65
commit 122299f548
18 changed files with 847 additions and 579 deletions

View File

@@ -18,8 +18,7 @@ use nostr_sdk::prelude::*;
pub use room::*;
use settings::AppSettings;
use smallvec::{smallvec, SmallVec};
use smol::lock::RwLock;
use state::{initialized_at, EventTracker, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
use state::{initialized_at, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
mod message;
mod room;
@@ -41,6 +40,9 @@ pub struct ChatRegistry {
/// Loading status of the registry
pub loading: bool,
/// Async task for handling notifications
handle_notifications: Task<()>,
/// Event subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
@@ -82,11 +84,19 @@ impl ChatRegistry {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let tracker = nostr.read(cx).tracker();
let status = Arc::new(AtomicBool::new(true));
let (tx, rx) = flume::bounded::<Signal>(2048);
let handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer: Option<Arc<dyn NostrSigner>> = None;
async move { Self::handle_notifications(&client, &signer, &tx, &status).await }
});
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
@@ -98,29 +108,27 @@ impl ChatRegistry {
move |this, state, cx| {
if let Some(signer) = state.read(cx).clone() {
this.retry_failed_events(&signer, &tx, &status, cx);
this.handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer = Some(signer);
async move {
Self::handle_notifications(&client, &signer, &tx, &status).await
}
});
cx.notify();
}
}
}),
);
tasks.push(
// Handle notifications
cx.background_spawn({
let client = Arc::clone(&client);
let status = Arc::clone(&status);
let tx = tx.clone();
async move { Self::handle_notifications(&client, &tracker, &tx, &status).await }
}),
);
tasks.push(
// Handle unwrapping status
cx.background_spawn({
let client = Arc::clone(&client);
async move { Self::handle_unwrapping(&client, &status, &tx).await }
}),
cx.background_spawn(
async move { Self::handle_unwrapping(&client, &status, &tx).await },
),
);
tasks.push(
@@ -155,23 +163,24 @@ impl ChatRegistry {
Self {
rooms: vec![],
loading: true,
handle_notifications,
_subscriptions: subscriptions,
_tasks: tasks,
}
}
async fn handle_notifications(
async fn handle_notifications<T>(
client: &Client,
tracker: &Arc<RwLock<EventTracker>>,
signer: &Option<T>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
) {
let mut notifications = client.notifications();
log::info!("Listening for notifications");
) where
T: NostrSigner,
{
let initialized_at = initialized_at();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let mut notifications = client.notifications();
let mut public_keys = HashSet::new();
let mut processed_events = HashSet::new();
@@ -194,7 +203,7 @@ impl ChatRegistry {
}
// Extract the rumor from the gift wrap event
match Self::extract_rumor(client, event.as_ref()).await {
match Self::extract_rumor(client, signer, event.as_ref()).await {
Ok(rumor) => {
// Get all public keys
public_keys.extend(rumor.all_pubkeys());
@@ -209,7 +218,7 @@ impl ChatRegistry {
Self::get_metadata(client, public_keys).await.ok();
}
match &event.created_at >= initialized_at {
match &rumor.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
@@ -223,11 +232,8 @@ impl ChatRegistry {
}
}
}
Err(_e) => {
let mut tracker = tracker.write().await;
tracker.failed_unwrap_events.push(event.as_ref().clone());
drop(tracker);
Err(e) => {
log::warn!("Failed to unwrap gift wrap event: {}", e);
}
}
}
@@ -280,46 +286,6 @@ impl ChatRegistry {
}
}
fn retry_failed_events(
&mut self,
signer: &Arc<dyn NostrSigner>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
cx: &mut Context<Self>,
) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let tracker = nostr.read(cx).tracker();
let signer = Arc::clone(signer);
let status = Arc::clone(status);
let tx = tx.clone();
let initialized_at = initialized_at();
self._tasks.push(cx.background_spawn(async move {
let tracker = tracker.read().await;
for event in tracker.failed_unwrap_events.iter() {
if let Ok(rumor) = Self::try_unwrap_custom(&client, &signer, event).await {
match &event.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
}
}
false => {
status.store(true, Ordering::Release);
}
}
}
}
}));
}
/// Set the loading status of the chat registry
pub fn set_loading(&mut self, loading: bool, cx: &mut Context<Self>) {
self.loading = loading;
@@ -600,14 +566,21 @@ impl ChatRegistry {
}
// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor(client: &Client, gift_wrap: &Event) -> Result<UnsignedEvent, Error> {
async fn extract_rumor<T>(
client: &Client,
signer: &Option<T>,
gift_wrap: &Event,
) -> Result<UnsignedEvent, Error>
where
T: NostrSigner,
{
// Try to get cached rumor first
if let Ok(event) = Self::get_rumor(client, gift_wrap.id).await {
return Ok(event);
}
// Try to unwrap with the available signer
let unwrapped = Self::try_unwrap(client, gift_wrap).await?;
let unwrapped = Self::try_unwrap(client, signer, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
@@ -620,34 +593,45 @@ impl ChatRegistry {
}
// Helper method to try unwrapping with different signers
async fn try_unwrap(client: &Client, gift_wrap: &Event) -> Result<UnwrappedGift, Error> {
async fn try_unwrap<T>(
client: &Client,
signer: &Option<T>,
gift_wrap: &Event,
) -> Result<UnwrappedGift, Error>
where
T: NostrSigner,
{
if let Some(custom_signer) = signer.as_ref() {
if let Ok(seal) = custom_signer
.nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content)
.await
{
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(SECP256K1)?;
// Decrypt the rumor
// TODO: verify the sender
let rumor = custom_signer
.nip44_decrypt(&seal.pubkey, &seal.content)
.await?;
// Construct the unsigned event
let rumor = UnsignedEvent::from_json(rumor)?;
// Return the unwrapped gift
return Ok(UnwrappedGift {
sender: rumor.pubkey,
rumor,
});
}
}
let signer = client.signer().await?;
let unwrapped = UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?;
Ok(unwrapped)
}
/// Helper method to try unwrapping with a custom signer
async fn try_unwrap_custom<T>(
client: &Client,
signer: &T,
gift_wrap: &Event,
) -> Result<UnsignedEvent, Error>
where
T: NostrSigner,
{
let unwrapped = UnwrappedGift::from_gift_wrap(signer, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
rumor_unsigned.ensure_id();
// Cache the rumor
Self::set_rumor(client, gift_wrap.id, &rumor_unsigned).await?;
Ok(rumor_unsigned)
}
/// Stores an unwrapped event in local database with reference to original
async fn set_rumor(client: &Client, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> {
let rumor_id = rumor.id.context("Rumor is missing an event id")?;
@@ -718,7 +702,7 @@ impl ChatRegistry {
{
let authors: Vec<PublicKey> = public_keys.into_iter().collect();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList];
let kinds = vec![Kind::Metadata, Kind::ContactList];
// Return if the list is empty
if authors.is_empty() {
@@ -726,7 +710,7 @@ impl ChatRegistry {
}
let filter = Filter::new()
.limit(authors.len() * kinds.len() + 10)
.limit(authors.len() * kinds.len())
.authors(authors)
.kinds(kinds);

View File

@@ -8,6 +8,7 @@ use anyhow::{anyhow, Error};
use common::{EventUtils, RenderedProfile};
use encryption::{Encryption, SignerKind};
use gpui::{App, AppContext, Context, EventEmitter, SharedString, Task};
use itertools::Itertools;
use nostr_sdk::prelude::*;
use person::PersonRegistry;
use state::NostrRegistry;
@@ -326,7 +327,7 @@ impl Room {
cx.emit(RoomSignal::Refresh);
}
/// Get messaging relays and encryption keys announcement for each member
/// Get gossip relays for each member
pub fn connect(&self, cx: &App) -> Task<Result<(), Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
@@ -342,22 +343,10 @@ impl Room {
continue;
};
// Construct a filter for messaging relays
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(member)
.limit(1);
// Construct a filter for gossip relays
let filter = Filter::new().kind(Kind::RelayList).author(member).limit(1);
// Subscribe to get members messaging relays
client.subscribe(filter, Some(opts)).await?;
// Construct a filter for encryption keys announcement
let filter = Filter::new()
.kind(Kind::Custom(10044))
.author(member)
.limit(1);
// Subscribe to get members encryption keys announcement
// Subscribe to get member's gossip relays
client.subscribe(filter, Some(opts)).await?;
}
@@ -376,15 +365,15 @@ impl Room {
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::C), conversation_id);
let stored = client.database().query(filter).await?;
let mut messages: Vec<UnsignedEvent> = stored
let messages = client
.database()
.query(filter)
.await?
.into_iter()
.filter_map(|event| UnsignedEvent::from_json(&event.content).ok())
.sorted_by_key(|message| message.created_at)
.collect();
messages.sort_by_key(|message| message.created_at);
Ok(messages)
})
}
@@ -392,8 +381,8 @@ impl Room {
/// Create a new message event (unsigned)
pub fn create_message(&self, content: &str, replies: &[EventId], cx: &App) -> UnsignedEvent {
let nostr = NostrRegistry::global(cx);
let cache = nostr.read(cx).cache_manager();
let cache = cache.read_blocking();
let gossip = nostr.read(cx).gossip();
let read_gossip = gossip.read_blocking();
// Get current user
let account = Account::global(cx);
@@ -409,9 +398,7 @@ impl Room {
// NOTE: current user will be removed from the list of receivers
for member in self.members.iter() {
// Get relay hint if available
let relay_url = cache
.relay(member)
.and_then(|urls| urls.iter().nth(0).cloned());
let relay_url = read_gossip.messaging_relays(member).first().cloned();
// Construct a public key tag with relay hint
let tag = TagStandard::PublicKey {
@@ -470,7 +457,7 @@ impl Room {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let cache = nostr.read(cx).cache_manager();
let gossip = nostr.read(cx).gossip();
let tracker = nostr.read(cx).tracker();
let rumor = rumor.to_owned();
@@ -481,8 +468,11 @@ impl Room {
cx.background_spawn(async move {
let signer_kind = opts.signer_kind;
let cache = cache.read().await;
let tracker = tracker.read().await;
let gossip = gossip.read().await;
// Get current user's signer and public key
let user_signer = client.signer().await?;
let user_pubkey = user_signer.get_public_key().await?;
// Get the encryption public key
let encryption_pubkey = if let Some(signer) = encryption_key.as_ref() {
@@ -491,9 +481,6 @@ impl Room {
None
};
let user_signer = client.signer().await?;
let user_pubkey = user_signer.get_public_key().await?;
// Remove the current user's public key from the list of receivers
// the current user will be handled separately
members.retain(|&pk| pk != user_pubkey);
@@ -506,7 +493,9 @@ impl Room {
for member in members.into_iter() {
// Get user's messaging relays
let urls = cache.relay(&member).cloned().unwrap_or_default();
let urls = gossip.messaging_relays(&member);
// Get user's encryption public key if available
let encryption = gossip.announcement(&member).map(|a| a.public_key());
// Check if there are any relays to send the message to
if urls.is_empty() {
@@ -514,35 +503,26 @@ impl Room {
continue;
}
// Ensure connection to all messaging relays
for url in urls.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
// Get user's encryption public key if available
let encryption = cache
.announcement(&member)
.and_then(|a| a.to_owned().map(|a| a.public_key()));
// Skip sending if using encryption signer but receiver's encryption keys not found
if encryption.is_none() && matches!(signer_kind, SignerKind::Encryption) {
reports.push(SendReport::new(member).device_not_found());
continue;
}
let receiver = Self::select_receiver(&signer_kind, member, encryption)?;
let rumor = rumor.clone();
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Construct the sealed event
let seal = EventBuilder::seal(&signer, &receiver, rumor.clone())
.await?
.build(member)
.sign(&signer)
.await?;
// Determine the receiver based on the signer kind
let receiver = Self::select_receiver(&signer_kind, member, encryption)?;
// Construct the gift wrap event
let event = EventBuilder::gift_wrap_from_seal(&member, &seal, vec![])?;
let event = EventBuilder::gift_wrap(
&signer,
&receiver,
rumor.clone(),
vec![Tag::public_key(member)],
)
.await?;
// Send the gift wrap event to the messaging relays
match client.send_event_to(urls, &event).await {
@@ -554,6 +534,7 @@ impl Room {
if auth {
// Wait for authenticated and resent event successfully
for attempt in 0..=SEND_RETRY {
let tracker = tracker.read().await;
let ids = tracker.resent_ids();
// Check if event was successfully resent
@@ -581,22 +562,34 @@ impl Room {
}
}
let receiver = Self::select_receiver(&signer_kind, user_pubkey, encryption_pubkey)?;
let rumor = rumor.clone();
// Return early if the user disabled backup.
//
// Coop will not send a gift wrap event to the current user.
if !opts.backup() {
return Ok(reports);
}
// Construct the sealed event
let seal = EventBuilder::seal(&signer, &receiver, rumor.clone())
.await?
.build(user_pubkey)
.sign(&signer)
.await?;
// Skip sending if using encryption signer but receiver's encryption keys not found
if encryption_pubkey.is_none() && matches!(signer_kind, SignerKind::Encryption) {
reports.push(SendReport::new(user_pubkey).device_not_found());
return Ok(reports);
}
// Determine the receiver based on the signer kind
let receiver = Self::select_receiver(&signer_kind, user_pubkey, encryption_pubkey)?;
// Construct the gift-wrapped event
let event = EventBuilder::gift_wrap_from_seal(&receiver, &seal, vec![])?;
let event = EventBuilder::gift_wrap(
&signer,
&receiver,
rumor.clone(),
vec![Tag::public_key(user_pubkey)],
)
.await?;
// Only send a backup message to current user if sent successfully to others
if opts.backup() && reports.iter().all(|r| r.is_sent_success()) {
let urls = cache.relay(&user_pubkey).cloned().unwrap_or_default();
if reports.iter().all(|r| r.is_sent_success()) {
let urls = gossip.messaging_relays(&user_pubkey);
// Check if there are any relays to send the event to
if urls.is_empty() {
@@ -604,11 +597,8 @@ impl Room {
return Ok(reports);
}
// Ensure connection to all messaging relays
for url in urls.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Send the event to the messaging relays
match client.send_event_to(urls, &event).await {
@@ -635,10 +625,10 @@ impl Room {
) -> Task<Result<Vec<SendReport>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let cache_manager = nostr.read(cx).cache_manager();
let gossip = nostr.read(cx).gossip();
cx.background_spawn(async move {
let cache = cache_manager.read().await;
let gossip = gossip.read().await;
let mut resend_reports = vec![];
for report in reports.into_iter() {
@@ -667,12 +657,15 @@ impl Room {
// Process the on hold event if it exists
if let Some(event) = report.on_hold {
let urls = cache.relay(&receiver).cloned().unwrap_or_default();
let urls = gossip.messaging_relays(&receiver);
// Check if there are any relays to send the event to
if urls.is_empty() {
resend_reports.push(SendReport::new(receiver).relays_not_found());
} else {
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Send the event to the messaging relays
match client.send_event_to(urls, &event).await {
Ok(output) => {
@@ -705,15 +698,15 @@ impl Room {
fn select_receiver(
kind: &SignerKind,
members: PublicKey,
member: PublicKey,
encryption: Option<PublicKey>,
) -> Result<PublicKey, Error> {
match kind {
SignerKind::Encryption => {
Ok(encryption.ok_or_else(|| anyhow!("Receiver's encryption key not found"))?)
}
SignerKind::User => Ok(members),
SignerKind::Auto => Ok(encryption.unwrap_or(members)),
SignerKind::User => Ok(member),
SignerKind::Auto => Ok(encryption.unwrap_or(member)),
}
}
}