This commit is contained in:
2026-01-03 20:39:18 +07:00
parent bb455871e5
commit 067f88dfa6
26 changed files with 1212 additions and 2117 deletions

View File

@@ -7,8 +7,6 @@ publish.workspace = true
[dependencies]
common = { path = "../common" }
state = { path = "../state" }
account = { path = "../account" }
encryption = { path = "../encryption" }
person = { path = "../person" }
settings = { path = "../settings" }

View File

@@ -5,20 +5,18 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use account::Account;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT};
use encryption::Encryption;
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task};
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task};
pub use message::*;
use nostr_sdk::prelude::*;
pub use room::*;
use settings::AppSettings;
use smallvec::{smallvec, SmallVec};
use state::{initialized_at, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
use state::{tracker, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
mod message;
mod room;
@@ -35,16 +33,10 @@ impl Global for GlobalChatRegistry {}
#[derive(Debug)]
pub struct ChatRegistry {
/// Collection of all chat rooms
pub rooms: Vec<Entity<Room>>,
rooms: Vec<Entity<Room>>,
/// Loading status of the registry
pub loading: bool,
/// Async task for handling notifications
handle_notifications: Task<()>,
/// Event subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
loading: bool,
/// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 4]>,
@@ -79,53 +71,30 @@ impl ChatRegistry {
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
let encryption = Encryption::global(cx);
let encryption_key = encryption.read(cx).encryption.clone();
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
// A flag to indicate if the registry is loading
let status = Arc::new(AtomicBool::new(true));
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(2048);
let handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer: Option<Arc<dyn NostrSigner>> = None;
async move { Self::handle_notifications(&client, &signer, &tx, &status).await }
});
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
subscriptions.push(
// Observe the encryption global state
cx.observe(&encryption_key, {
tasks.push(
// Handle nostr notifications
cx.background_spawn({
let client = client.clone();
let status = Arc::clone(&status);
let tx = tx.clone();
move |this, state, cx| {
if let Some(signer) = state.read(cx).clone() {
this.handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer = Some(signer);
async move {
Self::handle_notifications(&client, &signer, &tx, &status).await
}
});
cx.notify();
}
}
async move { Self::handle_notifications(&client, &status, &tx).await }
}),
);
tasks.push(
// Handle unwrapping status
// Handle unwrapping progress
cx.background_spawn(
async move { Self::handle_unwrapping(&client, &status, &tx).await },
),
@@ -140,20 +109,20 @@ impl ChatRegistry {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.expect("Entity has been released");
.ok();
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.expect("Entity has been released");
.ok();
}
Signal::Loading(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
})
.expect("Entity has been released");
.ok();
}
};
}
@@ -163,21 +132,12 @@ impl ChatRegistry {
Self {
rooms: vec![],
loading: true,
handle_notifications,
_subscriptions: subscriptions,
_tasks: tasks,
}
}
async fn handle_notifications<T>(
client: &Client,
signer: &Option<T>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
) where
T: NostrSigner,
{
let initialized_at = initialized_at();
async fn handle_notifications(client: &Client, loading: &Arc<AtomicBool>, tx: &Sender<Signal>) {
let initialized_at = Timestamp::now();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let mut notifications = client.notifications();
@@ -203,13 +163,13 @@ impl ChatRegistry {
}
// Extract the rumor from the gift wrap event
match Self::extract_rumor(client, signer, event.as_ref()).await {
match Self::extract_rumor(client, event.as_ref()).await {
Ok(rumor) => {
// Get all public keys
public_keys.extend(rumor.all_pubkeys());
let limit_reached = public_keys.len() >= METADATA_BATCH_LIMIT;
let done = !status.load(Ordering::Acquire) && !public_keys.is_empty();
let done = !loading.load(Ordering::Acquire) && !public_keys.is_empty();
// Get metadata for all public keys if the limit is reached
if limit_reached || done {
@@ -218,17 +178,24 @@ impl ChatRegistry {
Self::get_metadata(client, public_keys).await.ok();
}
match &rumor.created_at >= initialized_at {
match rumor.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
}
}
}
false => {
status.store(true, Ordering::Release);
loading.store(true, Ordering::Release);
}
}
}
@@ -530,7 +497,7 @@ impl ChatRegistry {
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let id = message.rumor.uniq_id();
let author = message.rumor.pubkey;
let account = Account::global(cx);
let nostr = NostrRegistry::global(cx);
if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) {
let is_new_event = message.rumor.created_at > room.read(cx).created_at;
@@ -544,7 +511,7 @@ impl ChatRegistry {
}
// Set this room is ongoing if the new message is from current user
if author == account.read(cx).public_key() {
if author == nostr.read(cx).identity(cx).public_key() {
this.set_ongoing(cx);
}
@@ -566,21 +533,14 @@ impl ChatRegistry {
}
// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor<T>(
client: &Client,
signer: &Option<T>,
gift_wrap: &Event,
) -> Result<UnsignedEvent, Error>
where
T: NostrSigner,
{
async fn extract_rumor(client: &Client, gift_wrap: &Event) -> Result<UnsignedEvent, Error> {
// Try to get cached rumor first
if let Ok(event) = Self::get_rumor(client, gift_wrap.id).await {
return Ok(event);
}
// Try to unwrap with the available signer
let unwrapped = Self::try_unwrap(client, signer, gift_wrap).await?;
let unwrapped = Self::try_unwrap(client, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
@@ -593,39 +553,7 @@ impl ChatRegistry {
}
// Helper method to try unwrapping with different signers
async fn try_unwrap<T>(
client: &Client,
signer: &Option<T>,
gift_wrap: &Event,
) -> Result<UnwrappedGift, Error>
where
T: NostrSigner,
{
if let Some(custom_signer) = signer.as_ref() {
if let Ok(seal) = custom_signer
.nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content)
.await
{
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(&SECP256K1)?;
// Decrypt the rumor
// TODO: verify the sender
let rumor = custom_signer
.nip44_decrypt(&seal.pubkey, &seal.content)
.await?;
// Construct the unsigned event
let rumor = UnsignedEvent::from_json(rumor)?;
// Return the unwrapped gift
return Ok(UnwrappedGift {
sender: rumor.pubkey,
rumor,
});
}
}
async fn try_unwrap(client: &Client, gift_wrap: &Event) -> Result<UnwrappedGift, Error> {
let signer = client.signer().await?;
let unwrapped = UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?;

View File

@@ -3,43 +3,16 @@ use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::time::Duration;
use account::Account;
use anyhow::{anyhow, Error};
use anyhow::Error;
use common::{EventUtils, RenderedProfile};
use encryption::{Encryption, SignerKind};
use gpui::{App, AppContext, Context, EventEmitter, SharedString, Task};
use itertools::Itertools;
use nostr_sdk::prelude::*;
use person::PersonRegistry;
use state::NostrRegistry;
use state::{tracker, NostrRegistry};
const SEND_RETRY: usize = 10;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SendOptions {
pub backup: bool,
pub signer_kind: SignerKind,
}
impl SendOptions {
pub fn new() -> Self {
Self {
backup: true,
signer_kind: SignerKind::default(),
}
}
pub fn backup(&self) -> bool {
self.backup
}
}
impl Default for SendOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct SendReport {
pub receiver: PublicKey,
@@ -248,6 +221,19 @@ impl Room {
self.members.clone()
}
/// Returns the members of the room with their messaging relays
pub fn members_with_relays(&self, cx: &App) -> Vec<(PublicKey, Vec<RelayUrl>)> {
let nostr = NostrRegistry::global(cx);
let mut result = vec![];
for member in self.members.iter() {
let messaging_relays = nostr.read(cx).messaging_relays(member, cx);
result.push((member.to_owned(), messaging_relays));
}
result
}
/// Checks if the room has more than two members (group)
pub fn is_group(&self) -> bool {
self.members.len() > 2
@@ -276,8 +262,8 @@ impl Room {
/// Display member is always different from the current user.
pub fn display_member(&self, cx: &App) -> Profile {
let persons = PersonRegistry::global(cx);
let account = Account::global(cx);
let public_key = account.read(cx).public_key();
let nostr = NostrRegistry::global(cx);
let public_key = nostr.read(cx).identity(cx).public_key();
let target_member = self
.members
@@ -381,12 +367,9 @@ impl Room {
/// Create a new message event (unsigned)
pub fn create_message(&self, content: &str, replies: &[EventId], cx: &App) -> UnsignedEvent {
let nostr = NostrRegistry::global(cx);
let gossip = nostr.read(cx).gossip();
let read_gossip = gossip.read_blocking();
// Get current user
let account = Account::global(cx);
let public_key = account.read(cx).public_key();
let public_key = nostr.read(cx).identity(cx).public_key();
// Get room's subject
let subject = self.subject.clone();
@@ -398,7 +381,7 @@ impl Room {
// NOTE: current user will be removed from the list of receivers
for member in self.members.iter() {
// Get relay hint if available
let relay_url = read_gossip.messaging_relays(member).first().cloned();
let relay_url = nostr.read(cx).relay_hint(member, cx);
// Construct a public key tag with relay hint
let tag = TagStandard::PublicKey {
@@ -449,98 +432,63 @@ impl Room {
pub fn send_message(
&self,
rumor: &UnsignedEvent,
opts: &SendOptions,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
let encryption = Encryption::global(cx);
let encryption_key = encryption.read(cx).encryption_key(cx);
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let gossip = nostr.read(cx).gossip();
let tracker = nostr.read(cx).tracker();
// Get current user's public key and relays
let current_user = nostr.read(cx).identity(cx).public_key();
let current_user_relays = nostr.read(cx).messaging_relays(&current_user, cx);
let rumor = rumor.to_owned();
let opts = opts.to_owned();
// Get all members
let mut members = self.members();
// Get all members and their messaging relays
let mut members = self.members_with_relays(cx);
cx.background_spawn(async move {
let signer_kind = opts.signer_kind;
let gossip = gossip.read().await;
// Get current user's signer and public key
let user_signer = client.signer().await?;
let user_pubkey = user_signer.get_public_key().await?;
// Get the encryption public key
let encryption_pubkey = if let Some(signer) = encryption_key.as_ref() {
signer.get_public_key().await.ok()
} else {
None
};
let signer = client.signer().await?;
// Remove the current user's public key from the list of receivers
// the current user will be handled separately
members.retain(|&pk| pk != user_pubkey);
// Determine the signer will be used based on the provided options
let signer = Self::select_signer(&opts.signer_kind, user_signer, encryption_key)?;
members.retain(|(this, _)| this != &current_user);
// Collect the send reports
let mut reports: Vec<SendReport> = vec![];
for member in members.into_iter() {
// Get user's messaging relays
let urls = gossip.messaging_relays(&member);
// Get user's encryption public key if available
let encryption = gossip.announcement(&member).map(|a| a.public_key());
for (receiver, relays) in members.into_iter() {
// Check if there are any relays to send the message to
if urls.is_empty() {
reports.push(SendReport::new(member).relays_not_found());
if relays.is_empty() {
reports.push(SendReport::new(receiver).relays_not_found());
continue;
}
// Skip sending if using encryption signer but receiver's encryption keys not found
if encryption.is_none() && matches!(signer_kind, SignerKind::Encryption) {
reports.push(SendReport::new(member).device_not_found());
continue;
// Ensure relay connection
for url in relays.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Determine the receiver based on the signer kind
let receiver = Self::select_receiver(&signer_kind, member, encryption)?;
// Construct the gift wrap event
let event = EventBuilder::gift_wrap(
&signer,
&receiver,
rumor.clone(),
vec![Tag::public_key(member)],
)
.await?;
let event =
EventBuilder::gift_wrap(&signer, &receiver, rumor.clone(), vec![]).await?;
// Send the gift wrap event to the messaging relays
match client.send_event_to(urls, &event).await {
match client.send_event_to(relays, &event).await {
Ok(output) => {
let id = output.id().to_owned();
let auth = output.failed.iter().any(|(_, s)| s.starts_with("auth-"));
let report = SendReport::new(receiver).status(output);
let tracker = tracker().read().await;
if auth {
// Wait for authenticated and resent event successfully
for attempt in 0..=SEND_RETRY {
let tracker = tracker.read().await;
let ids = tracker.resent_ids();
// Check if event was successfully resent
if let Some(output) = ids.iter().find(|e| e.id() == &id).cloned() {
let output = SendReport::new(receiver).status(output);
reports.push(output);
if tracker.is_sent_by_coop(&id) {
let output = Output::new(id);
let report = SendReport::new(receiver).status(output);
reports.push(report);
break;
}
@@ -562,55 +510,35 @@ impl Room {
}
}
// Return early if the user disabled backup.
//
// Coop will not send a gift wrap event to the current user.
if !opts.backup() {
return Ok(reports);
}
// Skip sending if using encryption signer but receiver's encryption keys not found
if encryption_pubkey.is_none() && matches!(signer_kind, SignerKind::Encryption) {
reports.push(SendReport::new(user_pubkey).device_not_found());
return Ok(reports);
}
// Determine the receiver based on the signer kind
let receiver = Self::select_receiver(&signer_kind, user_pubkey, encryption_pubkey)?;
// Construct the gift-wrapped event
let event = EventBuilder::gift_wrap(
&signer,
&receiver,
rumor.clone(),
vec![Tag::public_key(user_pubkey)],
)
.await?;
let event =
EventBuilder::gift_wrap(&signer, &current_user, rumor.clone(), vec![]).await?;
// Only send a backup message to current user if sent successfully to others
if reports.iter().all(|r| r.is_sent_success()) {
let urls = gossip.messaging_relays(&user_pubkey);
// Check if there are any relays to send the event to
if urls.is_empty() {
reports.push(SendReport::new(user_pubkey).relays_not_found());
if current_user_relays.is_empty() {
reports.push(SendReport::new(current_user).relays_not_found());
return Ok(reports);
}
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Ensure relay connection
for url in current_user_relays.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
// Send the event to the messaging relays
match client.send_event_to(urls, &event).await {
match client.send_event_to(current_user_relays, &event).await {
Ok(output) => {
reports.push(SendReport::new(user_pubkey).status(output));
reports.push(SendReport::new(current_user).status(output));
}
Err(e) => {
reports.push(SendReport::new(user_pubkey).error(e.to_string()));
reports.push(SendReport::new(current_user).error(e.to_string()));
}
}
} else {
reports.push(SendReport::new(user_pubkey).on_hold(event));
reports.push(SendReport::new(current_user).on_hold(event));
}
Ok(reports)
@@ -625,10 +553,8 @@ impl Room {
) -> Task<Result<Vec<SendReport>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let gossip = nostr.read(cx).gossip();
cx.background_spawn(async move {
let gossip = gossip.read().await;
let mut resend_reports = vec![];
for report in reports.into_iter() {
@@ -657,23 +583,13 @@ impl Room {
// Process the on hold event if it exists
if let Some(event) = report.on_hold {
let urls = gossip.messaging_relays(&receiver);
// Check if there are any relays to send the event to
if urls.is_empty() {
resend_reports.push(SendReport::new(receiver).relays_not_found());
} else {
// Ensure connections to the relays
gossip.ensure_connections(&client, &urls).await;
// Send the event to the messaging relays
match client.send_event_to(urls, &event).await {
Ok(output) => {
resend_reports.push(SendReport::new(receiver).status(output));
}
Err(e) => {
resend_reports.push(SendReport::new(receiver).error(e.to_string()));
}
// Send the event to the messaging relays
match client.send_event(&event).await {
Ok(output) => {
resend_reports.push(SendReport::new(receiver).status(output));
}
Err(e) => {
resend_reports.push(SendReport::new(receiver).error(e.to_string()));
}
}
}
@@ -682,31 +598,4 @@ impl Room {
Ok(resend_reports)
})
}
fn select_signer<T>(kind: &SignerKind, user: T, encryption: Option<T>) -> Result<T, Error>
where
T: NostrSigner,
{
match kind {
SignerKind::Encryption => {
Ok(encryption.ok_or_else(|| anyhow!("No encryption key found"))?)
}
SignerKind::User => Ok(user),
SignerKind::Auto => Ok(encryption.unwrap_or(user)),
}
}
fn select_receiver(
kind: &SignerKind,
member: PublicKey,
encryption: Option<PublicKey>,
) -> Result<PublicKey, Error> {
match kind {
SignerKind::Encryption => {
Ok(encryption.ok_or_else(|| anyhow!("Receiver's encryption key not found"))?)
}
SignerKind::User => Ok(member),
SignerKind::Auto => Ok(encryption.unwrap_or(member)),
}
}
}