merged previous stuffs on master

This commit is contained in:
2026-02-20 19:48:03 +07:00
parent 014757cfc9
commit b88955e62c
176 changed files with 11152 additions and 11212 deletions

View File

@@ -7,16 +7,14 @@ use std::time::Duration;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::EventUtils;
use device::DeviceRegistry;
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity,
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity, Window,
};
use nostr_sdk::prelude::*;
use smallvec::{smallvec, SmallVec};
use state::{tracker, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
use state::{NostrRegistry, RelayState, DEVICE_GIFTWRAP, TIMEOUT, USER_GIFTWRAP};
mod message;
mod room;
@@ -24,8 +22,8 @@ mod room;
pub use message::*;
pub use room::*;
pub fn init(cx: &mut App) {
ChatRegistry::set_global(cx.new(ChatRegistry::new), cx);
pub fn init(window: &mut Window, cx: &mut App) {
ChatRegistry::set_global(cx.new(|cx| ChatRegistry::new(window, cx)), cx);
}
struct GlobalChatRegistry(Entity<ChatRegistry>);
@@ -45,11 +43,9 @@ pub enum ChatEvent {
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum NostrEvent {
enum Signal {
/// Message received from relay pool
Message(NewMessage),
/// Unwrapping status
Unwrapping(bool),
/// Eose received from relay pool
Eose,
}
@@ -57,23 +53,17 @@ enum NostrEvent {
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
/// Relay state for messaging relay list
messaging_relay_list: Entity<RelayState>,
/// Collection of all chat rooms
rooms: Vec<Entity<Room>>,
/// Loading status of the registry
loading: bool,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Channel's sender for communication between nostr and gpui
sender: Sender<NostrEvent>,
/// Handle notifications asynchronous task
notifications: Option<Task<Result<(), Error>>>,
/// Tasks for asynchronous operations
tasks: Vec<Task<()>>,
/// Async tasks
tasks: SmallVec<[Task<Result<(), Error>>; 2]>,
/// Subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
@@ -93,79 +83,52 @@ impl ChatRegistry {
}
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let messaging_relay_list = cx.new(|_| RelayState::default());
let nostr = NostrRegistry::global(cx);
let identity = nostr.read(cx).identity();
let device = DeviceRegistry::global(cx);
let device_signer = device.read(cx).device_signer.clone();
// A flag to indicate if the registry is loading
let tracking_flag = Arc::new(AtomicBool::new(true));
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<NostrEvent>(2048);
let mut tasks = vec![];
let mut subscriptions = smallvec![];
subscriptions.push(
// Observe the identity
cx.observe(&identity, |this, state, cx| {
if state.read(cx).has_public_key() {
// Handle nostr notifications
this.handle_notifications(cx);
// Track unwrapping progress
this.tracking(cx);
// Observe the nip65 state and load chat rooms on every state change
cx.observe(&nostr, |this, state, cx| {
match state.read(cx).relay_list_state() {
RelayState::Idle => {
this.reset(cx);
}
RelayState::Configured => {
this.ensure_messaging_relays(cx);
}
_ => {}
}
}),
);
subscriptions.push(
// Observe the device signer state
cx.observe(&device_signer, |this, state, cx| {
if state.read(cx).is_some() {
this.handle_notifications(cx);
// Observe the nip17 state and load chat rooms on every state change
cx.observe(&messaging_relay_list, |this, state, cx| {
match state.read(cx) {
RelayState::Configured => {
this.get_messages(cx);
}
_ => {
this.get_rooms(cx);
}
}
}),
);
tasks.push(
// Update GPUI states
cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
NostrEvent::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.ok();
}
NostrEvent::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.ok();
}
NostrEvent::Unwrapping(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
})
.ok();
}
};
}
}),
);
// Run at the end of current cycle
cx.defer_in(window, |this, _window, cx| {
this.handle_notifications(cx);
this.tracking(cx);
});
Self {
messaging_relay_list,
rooms: vec![],
loading: true,
tracking_flag,
sender: tx.clone(),
notifications: None,
tasks,
tracking_flag: Arc::new(AtomicBool::new(false)),
tasks: smallvec![],
_subscriptions: subscriptions,
}
}
@@ -174,22 +137,23 @@ impl ChatRegistry {
fn handle_notifications(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let device = DeviceRegistry::global(cx);
let device_signer = device.read(cx).signer(cx);
let signer = nostr.read(cx).signer();
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
let initialized_at = Timestamp::now();
let sub_id1 = SubscriptionId::new(DEVICE_GIFTWRAP);
let sub_id2 = SubscriptionId::new(USER_GIFTWRAP);
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(1024);
self.tasks.push(cx.background_spawn(async move {
let initialized_at = Timestamp::now();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let device_signer = signer.get_encryption_signer().await;
let mut notifications = client.notifications();
let mut processed_events = HashSet::new();
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, .. } = notification else {
while let Some(notification) = notifications.next().await {
let ClientNotification::Message { message, .. } = notification else {
// Skip non-message notifications
continue;
};
@@ -206,99 +170,187 @@ impl ChatRegistry {
continue;
}
log::info!("Received gift wrap event: {:?}", event);
// Extract the rumor from the gift wrap event
match Self::extract_rumor(&client, &device_signer, event.as_ref()).await {
Ok(rumor) => match rumor.created_at >= initialized_at {
true => {
// Check if the event is sent by coop
let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
// No need to emit if sent by coop
// the event is already emitted
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
tx.send_async(signal).await.ok();
}
tx.send_async(signal).await?;
}
false => {
status.store(true, Ordering::Release);
}
},
Err(e) => {
log::warn!("Failed to unwrap: {e}");
log::warn!("Failed to unwrap the gift wrap event: {e}");
}
}
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &subscription_id {
tx.send_async(NostrEvent::Eose).await.ok();
if id.as_ref() == &sub_id1 || id.as_ref() == &sub_id2 {
tx.send_async(Signal::Eose).await?;
}
}
_ => {}
}
}
Ok(())
}));
self.tasks.push(cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})?;
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})?;
}
};
}
Ok(())
}));
}
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
self.notifications = Some(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(12);
let mut is_start_processing = false;
let mut total_loops = 0;
self.tasks.push(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(10);
loop {
if client.has_signer().await {
total_loops += 1;
if status.load(Ordering::Acquire) {
is_start_processing = true;
// Reset gift wrap processing flag
_ = status.compare_exchange(
true,
false,
Ordering::Release,
Ordering::Relaxed,
);
tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
}
if status.load(Ordering::Acquire) {
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
}
smol::Timer::after(loop_duration).await;
}
}));
}
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.loading
fn ensure_messaging_relays(&mut self, cx: &mut Context<Self>) {
let state = self.messaging_relay_list.downgrade();
let task = self.verify_relays(cx);
self.tasks.push(cx.spawn(async move |_this, cx| {
let result = task.await?;
// Update state
state.update(cx, |this, cx| {
*this = result;
cx.notify();
})?;
Ok(())
}));
}
/// Set the loading status of the chat registry
pub fn set_loading(&mut self, loading: bool, cx: &mut Context<Self>) {
self.loading = loading;
cx.notify();
// Verify messaging relay list for current user
fn verify_relays(&mut self, cx: &mut Context<Self>) -> Task<Result<RelayState, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let public_key = signer.public_key().unwrap();
let write_relays = nostr.read(cx).write_relays(&public_key, cx);
cx.background_spawn(async move {
let urls = write_relays.await;
// Construct filter for inbox relays
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
// Construct target for subscription
let target: HashMap<&RelayUrl, Filter> =
urls.iter().map(|relay| (relay, filter.clone())).collect();
// Stream events from user's write relays
let mut stream = client
.stream_events(target)
.timeout(Duration::from_secs(TIMEOUT))
.await?;
while let Some((_url, res)) = stream.next().await {
match res {
Ok(event) => {
log::info!("Received relay list event: {event:?}");
return Ok(RelayState::Configured);
}
Err(e) => {
log::error!("Failed to receive relay list event: {e}");
}
}
}
Ok(RelayState::NotConfigured)
})
}
/// Get all messages for current user
fn get_messages(&mut self, cx: &mut Context<Self>) {
let task = self.subscribe_to_giftwrap_events(cx);
self.tasks.push(cx.spawn(async move |_this, _cx| {
task.await?;
// Update state
Ok(())
}));
}
/// Continuously get gift wrap events for the current user in their messaging relays
fn subscribe_to_giftwrap_events(&mut self, cx: &mut Context<Self>) -> Task<Result<(), Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let public_key = signer.public_key().unwrap();
let messaging_relays = nostr.read(cx).messaging_relays(&public_key, cx);
cx.background_spawn(async move {
let urls = messaging_relays.await;
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
let id = SubscriptionId::new(USER_GIFTWRAP);
// Construct target for subscription
let target: HashMap<&RelayUrl, Filter> =
urls.iter().map(|relay| (relay, filter.clone())).collect();
let output = client.subscribe(target).with_id(id).await?;
log::info!(
"Successfully subscribed to gift-wrap messages on: {:?}",
output.success
);
Ok(())
})
}
/// Get the relay state
pub fn relay_state(&self, cx: &App) -> RelayState {
self.messaging_relay_list.read(cx).clone()
}
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.tracking_flag.load(Ordering::Acquire)
}
/// Get a weak reference to a room by its ID.
@@ -309,47 +361,60 @@ impl ChatRegistry {
.map(|this| this.downgrade())
}
/// Get all ongoing rooms.
pub fn ongoing_rooms(&self, cx: &App) -> Vec<Entity<Room>> {
/// Get all rooms based on the filter.
pub fn rooms(&self, filter: &RoomKind, cx: &App) -> Vec<Entity<Room>> {
self.rooms
.iter()
.filter(|room| room.read(cx).kind == RoomKind::Ongoing)
.filter(|room| &room.read(cx).kind == filter)
.cloned()
.collect()
}
/// Get all request rooms.
pub fn request_rooms(&self, cx: &App) -> Vec<Entity<Room>> {
/// Count the number of rooms based on the filter.
pub fn count(&self, filter: &RoomKind, cx: &App) -> usize {
self.rooms
.iter()
.filter(|room| room.read(cx).kind != RoomKind::Ongoing)
.cloned()
.collect()
.filter(|room| &room.read(cx).kind == filter)
.count()
}
/// Add a new room to the start of list.
pub fn add_room<I>(&mut self, room: I, cx: &mut Context<Self>)
where
I: Into<Room>,
I: Into<Room> + 'static,
{
self.rooms.insert(0, cx.new(|_| room.into()));
cx.notify();
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
cx.spawn(async move |this, cx| {
let signer = client.signer()?;
let public_key = signer.get_public_key().await.ok()?;
let room: Room = room.into().organize(&public_key);
this.update(cx, |this, cx| {
this.rooms.insert(0, cx.new(|_| room));
cx.emit(ChatEvent::Ping);
cx.notify();
})
.ok()
})
.detach();
}
/// Emit an open room event.
///
/// If the room is new, add it to the registry.
pub fn emit_room(&mut self, room: WeakEntity<Room>, cx: &mut Context<Self>) {
if let Some(room) = room.upgrade() {
let id = room.read(cx).id;
pub fn emit_room(&mut self, room: &Entity<Room>, cx: &mut Context<Self>) {
// Get the room's ID.
let id = room.read(cx).id;
// If the room is new, add it to the registry.
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.rooms.insert(0, room);
}
// Emit the open room event.
cx.emit(ChatEvent::OpenRoom(id));
// If the room is new, add it to the registry.
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.rooms.insert(0, room.to_owned());
}
// Emit the open room event.
cx.emit(ChatEvent::OpenRoom(id));
}
/// Close a room.
@@ -365,28 +430,27 @@ impl ChatRegistry {
cx.notify();
}
/// Search rooms by their name.
pub fn search(&self, query: &str, cx: &App) -> Vec<Entity<Room>> {
/// Finding rooms based on a query.
pub fn find(&self, query: &str, cx: &App) -> Vec<Entity<Room>> {
let matcher = SkimMatcherV2::default();
self.rooms
.iter()
.filter(|room| {
matcher
.fuzzy_match(room.read(cx).display_name(cx).as_ref(), query)
.is_some()
})
.cloned()
.collect()
}
/// Search rooms by public keys.
pub fn search_by_public_key(&self, public_key: PublicKey, cx: &App) -> Vec<Entity<Room>> {
self.rooms
.iter()
.filter(|room| room.read(cx).members.contains(&public_key))
.cloned()
.collect()
if let Ok(public_key) = PublicKey::parse(query) {
self.rooms
.iter()
.filter(|room| room.read(cx).members.contains(&public_key))
.cloned()
.collect()
} else {
self.rooms
.iter()
.filter(|room| {
matcher
.fuzzy_match(room.read(cx).display_name(cx).as_ref(), query)
.is_some()
})
.cloned()
.collect()
}
}
/// Reset the registry.
@@ -427,23 +491,16 @@ impl ChatRegistry {
pub fn get_rooms(&mut self, cx: &mut Context<Self>) {
let task = self.get_rooms_from_database(cx);
self.tasks.push(
// Run and finished in the background
cx.spawn(async move |this, cx| {
match task.await {
Ok(rooms) => {
this.update(cx, move |this, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.ok();
}
Err(e) => {
log::error!("Failed to load rooms: {e}")
}
};
}),
);
cx.spawn(async move |this, cx| {
let rooms = task.await.ok()?;
this.update(cx, move |this, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.ok()
})
.detach();
}
/// Create a task to load rooms from the database
@@ -452,10 +509,13 @@ impl ChatRegistry {
let client = nostr.read(cx).client();
cx.background_spawn(async move {
let signer = client.signer().await?;
let signer = client.signer().context("Signer not found")?;
let public_key = signer.get_public_key().await?;
// Get contacts
let contacts = client.database().contacts_public_keys(public_key).await?;
// Construct authored filter
let authored_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::A), public_key);
@@ -463,6 +523,7 @@ impl ChatRegistry {
// Get all authored events
let authored = client.database().query(authored_filter).await?;
// Construct addressed filter
let addressed_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key);
@@ -473,6 +534,7 @@ impl ChatRegistry {
// Merge authored and addressed events
let events = authored.merge(addressed);
// Collect results
let mut rooms: HashSet<Room> = HashSet::new();
let mut grouped: HashMap<u64, Vec<UnsignedEvent>> = HashMap::new();
@@ -488,24 +550,21 @@ impl ChatRegistry {
for (_id, mut messages) in grouped.into_iter() {
messages.sort_by_key(|m| Reverse(m.created_at));
// Always use the latest message
let Some(latest) = messages.first() else {
continue;
};
let mut room = Room::from(latest);
if rooms.iter().any(|r| r.id == room.id) {
continue;
}
let mut public_keys = room.members();
public_keys.retain(|pk| pk != &public_key);
// Construct the room from the latest message.
//
// Call `.organize` to ensure the current user is at the end of the list.
let mut room = Room::from(latest).organize(&public_key);
// Check if the user has responded to the room
let user_sent = messages.iter().any(|m| m.pubkey == public_key);
// Check if public keys are from the user's contacts
let is_contact = public_keys.iter().any(|k| contacts.contains(k));
let is_contact = room.members.iter().any(|k| contacts.contains(k));
// Set the room's kind based on status
if user_sent || is_contact {
@@ -519,6 +578,24 @@ impl ChatRegistry {
})
}
/// Parse a nostr event into a message and push it to the belonging room
///
/// If the room doesn't exist, it will be created.
/// Updates room ordering based on the most recent messages.
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
match self.rooms.iter().find(|e| e.read(cx).id == message.room) {
Some(room) => {
room.update(cx, |this, cx| {
this.push_message(message, cx);
});
}
None => {
// Push the new room to the front of the list
self.add_room(message.rumor, cx);
}
}
}
/// Trigger a refresh of the opened chat rooms by their IDs
pub fn refresh_rooms(&mut self, ids: Option<Vec<u64>>, cx: &mut Context<Self>) {
if let Some(ids) = ids {
@@ -532,54 +609,7 @@ impl ChatRegistry {
}
}
/// Parse a Nostr event into a Coop Message and push it to the belonging room
///
/// If the room doesn't exist, it will be created.
/// Updates room ordering based on the most recent messages.
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
// Get the unique id
let id = message.rumor.uniq_id();
// Get the author
let author = message.rumor.pubkey;
match self.rooms.iter().find(|room| room.read(cx).id == id) {
Some(room) => {
let new_message = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
// Update room
room.update(cx, |this, cx| {
// Update the last timestamp if the new message is newer
if new_message {
this.set_created_at(created_at, cx);
}
// Set this room is ongoing if the new message is from current user
if author == nostr.read(cx).identity().read(cx).public_key() {
this.set_ongoing(cx);
}
// Emit the new message to the room
this.emit_message(message, cx);
});
// Resort all rooms in the registry by their created at (after updated)
if new_message {
self.sort(cx);
}
}
None => {
// Push the new room to the front of the list
self.add_room(&message.rumor, cx);
// Notify the UI about the new room
cx.emit(ChatEvent::Ping);
}
}
}
// Unwraps a gift-wrapped event and processes its contents.
/// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor(
client: &Client,
device_signer: &Option<Arc<dyn NostrSigner>>,
@@ -603,35 +633,50 @@ impl ChatRegistry {
Ok(rumor_unsigned)
}
// Helper method to try unwrapping with different signers
/// Helper method to try unwrapping with different signers
async fn try_unwrap(
client: &Client,
device_signer: &Option<Arc<dyn NostrSigner>>,
gift_wrap: &Event,
) -> Result<UnwrappedGift, Error> {
if let Some(signer) = device_signer.as_ref() {
let seal = signer
.nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content)
.await?;
// Try with the device signer first
if let Some(signer) = device_signer {
if let Ok(unwrapped) = Self::try_unwrap_with(gift_wrap, signer).await {
return Ok(unwrapped);
};
};
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(&SECP256K1)?;
let rumor = signer.nip44_decrypt(&seal.pubkey, &seal.content).await?;
let rumor = UnsignedEvent::from_json(rumor)?;
return Ok(UnwrappedGift {
sender: seal.pubkey,
rumor,
});
}
let signer = client.signer().await?;
let unwrapped = UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?;
// Try with the user's signer
let user_signer = client.signer().context("Signer not found")?;
let unwrapped = UnwrappedGift::from_gift_wrap(user_signer, gift_wrap).await?;
Ok(unwrapped)
}
/// Attempts to unwrap a gift wrap event with a given signer.
async fn try_unwrap_with(
gift_wrap: &Event,
signer: &Arc<dyn NostrSigner>,
) -> Result<UnwrappedGift, Error> {
// Get the sealed event
let seal = signer
.nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content)
.await?;
// Verify the sealed event
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(&SECP256K1)?;
// Get the rumor event
let rumor = signer.nip44_decrypt(&seal.pubkey, &seal.content).await?;
let rumor = UnsignedEvent::from_json(rumor)?;
Ok(UnwrappedGift {
sender: seal.pubkey,
rumor,
})
}
/// Stores an unwrapped event in local database with reference to original
async fn set_rumor(client: &Client, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> {
let rumor_id = rumor.id.context("Rumor is missing an event id")?;

View File

@@ -1,17 +1,25 @@
use std::hash::Hash;
use common::EventUtils;
use nostr_sdk::prelude::*;
/// New message.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct NewMessage {
pub room: u64,
pub gift_wrap: EventId,
pub rumor: UnsignedEvent,
}
impl NewMessage {
pub fn new(gift_wrap: EventId, rumor: UnsignedEvent) -> Self {
Self { gift_wrap, rumor }
let room = rumor.uniq_id();
Self {
room,
gift_wrap,
rumor,
}
}
}

View File

@@ -1,81 +1,66 @@
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::time::Duration;
use anyhow::Error;
use anyhow::{Context as AnyhowContext, Error};
use common::EventUtils;
use gpui::{App, AppContext, Context, EventEmitter, SharedString, Task};
use itertools::Itertools;
use nostr_sdk::prelude::*;
use person::{Person, PersonRegistry};
use state::{tracker, NostrRegistry};
use settings::{RoomConfig, SignerKind};
use state::{NostrRegistry, TIMEOUT};
use crate::NewMessage;
const SEND_RETRY: usize = 10;
use crate::{ChatRegistry, NewMessage};
#[derive(Debug, Clone)]
pub struct SendReport {
pub receiver: PublicKey,
pub status: Option<Output<EventId>>,
pub gift_wrap_id: Option<EventId>,
pub error: Option<SharedString>,
pub on_hold: Option<Event>,
pub encryption: bool,
pub relays_not_found: bool,
pub device_not_found: bool,
pub output: Option<Output<EventId>>,
}
impl SendReport {
pub fn new(receiver: PublicKey) -> Self {
Self {
receiver,
status: None,
gift_wrap_id: None,
error: None,
on_hold: None,
encryption: false,
relays_not_found: false,
device_not_found: false,
output: None,
}
}
pub fn status(mut self, output: Output<EventId>) -> Self {
self.status = Some(output);
/// Set the gift wrap ID.
pub fn gift_wrap_id(mut self, gift_wrap_id: EventId) -> Self {
self.gift_wrap_id = Some(gift_wrap_id);
self
}
pub fn error(mut self, error: impl Into<SharedString>) -> Self {
/// Set the output.
pub fn output(mut self, output: Output<EventId>) -> Self {
self.output = Some(output);
self
}
/// Set the error message.
pub fn error<T>(mut self, error: T) -> Self
where
T: Into<SharedString>,
{
self.error = Some(error.into());
self
}
pub fn on_hold(mut self, event: Event) -> Self {
self.on_hold = Some(event);
self
/// Returns true if the send is pending.
pub fn pending(&self) -> bool {
self.output.is_none() && self.error.is_none()
}
pub fn encryption(mut self) -> Self {
self.encryption = true;
self
}
pub fn relays_not_found(mut self) -> Self {
self.relays_not_found = true;
self
}
pub fn device_not_found(mut self) -> Self {
self.device_not_found = true;
self
}
pub fn is_relay_error(&self) -> bool {
self.error.is_some() || self.relays_not_found
}
pub fn is_sent_success(&self) -> bool {
if let Some(output) = self.status.as_ref() {
!output.success.is_empty()
/// Returns true if the send was successful.
pub fn success(&self) -> bool {
if let Some(output) = self.output.as_ref() {
!output.failed.is_empty()
} else {
false
}
@@ -99,18 +84,25 @@ pub enum RoomKind {
Ongoing,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Room {
/// Conversation ID
pub id: u64,
/// The timestamp of the last message in the room
pub created_at: Timestamp,
/// Subject of the room
pub subject: Option<SharedString>,
/// All members of the room
pub members: Vec<PublicKey>,
pub(super) members: Vec<PublicKey>,
/// Kind
pub kind: RoomKind,
/// Configuration
config: RoomConfig,
}
impl Ord for Room {
@@ -145,11 +137,7 @@ impl From<&UnsignedEvent> for Room {
fn from(val: &UnsignedEvent) -> Self {
let id = val.uniq_id();
let created_at = val.created_at;
// Get the members from the event's tags and event's pubkey
let members = val.extract_public_keys();
// Get subject from tags
let subject = val
.tags
.find(TagKind::Subject)
@@ -161,38 +149,50 @@ impl From<&UnsignedEvent> for Room {
subject,
members,
kind: RoomKind::default(),
config: RoomConfig::default(),
}
}
}
impl From<UnsignedEvent> for Room {
fn from(val: UnsignedEvent) -> Self {
Room::from(&val)
}
}
impl Room {
/// Constructs a new room with the given receiver and tags.
pub fn new(subject: Option<String>, author: PublicKey, receivers: Vec<PublicKey>) -> Self {
// Convert receiver's public keys into tags
let mut tags: Tags = Tags::from_list(
receivers
.iter()
.map(|pubkey| Tag::public_key(pubkey.to_owned()))
.collect(),
);
// Add subject if it is present
if let Some(subject) = subject {
tags.push(Tag::from_standardized_without_cell(TagStandard::Subject(
subject,
)));
}
pub fn new<T>(author: PublicKey, receivers: T) -> Self
where
T: IntoIterator<Item = PublicKey>,
{
// Map receiver public keys to tags
let tags = Tags::from_list(receivers.into_iter().map(Tag::public_key).collect());
// Construct an unsigned event for a direct message
//
// WARNING: never sign this event
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, "")
.tags(tags)
.build(author);
// Generate event ID
// Ensure that the ID is set
event.ensure_id();
Room::from(&event)
}
/// Organizes the members of the room by moving the target member to the end.
///
/// Always call this function to ensure the current user is at the end of the list.
pub fn organize(mut self, target: &PublicKey) -> Self {
if let Some(index) = self.members.iter().position(|member| member == target) {
let member = self.members.remove(index);
self.members.push(member);
}
self
}
/// Sets the kind of the room and returns the modified room
pub fn kind(mut self, kind: RoomKind) -> Self {
self.kind = kind;
@@ -227,28 +227,6 @@ impl Room {
self.members.clone()
}
/// Returns the members of the room with their messaging relays
pub fn members_with_relays(&self, cx: &App) -> Task<Vec<(PublicKey, Vec<RelayUrl>)>> {
let nostr = NostrRegistry::global(cx);
let mut tasks = vec![];
for member in self.members.iter() {
let task = nostr.read(cx).messaging_relays(member, cx);
tasks.push((*member, task));
}
cx.background_spawn(async move {
let mut results = vec![];
for (public_key, task) in tasks.into_iter() {
let urls = task.await;
results.push((public_key, urls));
}
results
})
}
/// Checks if the room has more than two members (group)
pub fn is_group(&self) -> bool {
self.members.len() > 2
@@ -277,17 +255,7 @@ impl Room {
/// Display member is always different from the current user.
pub fn display_member(&self, cx: &App) -> Person {
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
let public_key = nostr.read(cx).identity().read(cx).public_key();
let target_member = self
.members
.iter()
.find(|&member| member != &public_key)
.or_else(|| self.members.first())
.expect("Room should have at least one member");
persons.read(cx).get(target_member, cx)
persons.read(cx).get(&self.members[0], cx)
}
/// Merge the names of the first two members of the room.
@@ -308,7 +276,7 @@ impl Room {
.collect::<Vec<_>>()
.join(", ");
if profiles.len() > 2 {
if profiles.len() > 3 {
name = format!("{}, +{}", name, profiles.len() - 2);
}
@@ -318,9 +286,21 @@ impl Room {
}
}
/// Emits a new message signal to the current room
pub fn emit_message(&self, message: NewMessage, cx: &mut Context<Self>) {
/// Push a new message to the current room
pub fn push_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let created_at = message.rumor.created_at;
let new_message = created_at > self.created_at;
// Emit the incoming message event
cx.emit(RoomEvent::Incoming(message));
if new_message {
self.set_created_at(created_at, cx);
// Sort chats after emitting a new message
ChatRegistry::global(cx).update(cx, |this, cx| {
this.sort(cx);
});
}
}
/// Emits a signal to reload the current room's messages.
@@ -329,32 +309,43 @@ impl Room {
}
/// Get gossip relays for each member
pub fn connect(&self, cx: &App) -> Task<Result<(), Error>> {
pub fn early_connect(&self, cx: &App) -> Task<Result<(), Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let members = self.members();
let id = SubscriptionId::new(format!("room-{}", self.id));
let subscription_id = SubscriptionId::new(format!("room-{}", self.id));
cx.background_spawn(async move {
let signer = client.signer().await?;
let signer = client.signer().context("Signer not found")?;
let public_key = signer.get_public_key().await?;
// Subscription options
let opts = SubscribeAutoCloseOptions::default()
.timeout(Some(Duration::from_secs(2)))
.exit_policy(ReqExitPolicy::ExitOnEOSE);
for member in members.into_iter() {
if member == public_key {
continue;
};
// Construct a filter for gossip relays
let filter = Filter::new().kind(Kind::RelayList).author(member).limit(1);
// Construct a filter for messaging relays
let inbox = Filter::new()
.kind(Kind::InboxRelays)
.author(member)
.limit(1);
// Construct a filter for announcement
let announcement = Filter::new()
.kind(Kind::Custom(10044))
.author(member)
.limit(1);
// Subscribe to get member's gossip relays
client
.subscribe_with_id(id.clone(), filter, Some(opts))
.subscribe(vec![inbox, announcement])
.with_id(subscription_id.clone())
.close_on(
SubscribeAutoCloseOptions::default()
.timeout(Some(Duration::from_secs(TIMEOUT)))
.exit_policy(ReqExitPolicy::ExitOnEOSE),
)
.await?;
}
@@ -386,68 +377,265 @@ impl Room {
})
}
/// Create a new message event (unsigned)
pub fn create_message(&self, content: &str, replies: &[EventId], cx: &App) -> UnsignedEvent {
// Construct a rumor event for direct message
pub fn rumor<S, I>(&self, content: S, replies: I, cx: &App) -> Option<UnsignedEvent>
where
S: Into<String>,
I: IntoIterator<Item = EventId>,
{
let kind = Kind::PrivateDirectMessage;
let content: String = content.into();
let replies: Vec<EventId> = replies.into_iter().collect();
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
// Get current user
let public_key = nostr.read(cx).identity().read(cx).public_key();
// Get current user's public key
let sender = nostr.read(cx).signer().public_key()?;
// Get room's subject
let subject = self.subject.clone();
// Get all members
let members: Vec<Person> = self
.members
.iter()
.filter(|public_key| public_key != &&sender)
.map(|member| persons.read(cx).get(member, cx))
.collect();
// Construct event's tags
let mut tags = vec![];
// Add receivers
//
// NOTE: current user will be removed from the list of receivers
for member in self.members.iter() {
// Get relay hint if available
let relay_url = nostr.read(cx).relay_hint(member, cx);
// Construct a public key tag with relay hint
let tag = TagStandard::PublicKey {
public_key: member.to_owned(),
relay_url,
alias: None,
uppercase: false,
};
tags.push(Tag::from_standardized_without_cell(tag));
}
// Add subject tag if it's present
if let Some(value) = subject {
// Add subject tag if present
if let Some(value) = self.subject.as_ref() {
tags.push(Tag::from_standardized_without_cell(TagStandard::Subject(
value.to_string(),
)));
}
// Add reply/quote tag
if replies.len() == 1 {
tags.push(Tag::event(replies[0]))
} else {
for id in replies {
let tag = TagStandard::Quote {
event_id: id.to_owned(),
relay_url: None,
public_key: None,
};
tags.push(Tag::from_standardized_without_cell(tag))
}
// Add all reply tags
for id in replies.into_iter() {
tags.push(Tag::event(id))
}
// Construct a direct message event
//
// WARNING: never sign and send this event to relays
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, content)
.tags(tags)
.build(public_key);
// Add all receiver tags
for member in members.into_iter() {
// Skip current user
if member.public_key() == sender {
continue;
}
// Ensure the event id has been generated
tags.push(Tag::from_standardized_without_cell(
TagStandard::PublicKey {
public_key: member.public_key(),
relay_url: member.messaging_relay_hint(),
alias: None,
uppercase: false,
},
));
}
// Construct a direct message rumor event
// WARNING: never sign and send this event to relays
let mut event = EventBuilder::new(kind, content).tags(tags).build(sender);
// Ensure that the ID is set
event.ensure_id();
event
Some(event)
}
/// Send rumor event to all members's messaging relays
pub fn send(&self, rumor: UnsignedEvent, cx: &App) -> Option<Task<Vec<SendReport>>> {
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
// Get room's config
let config = self.config.clone();
// Get current user's public key
let sender = nostr.read(cx).signer().public_key()?;
// Get all members (excluding sender)
let members: Vec<Person> = self
.members
.iter()
.filter(|public_key| public_key != &&sender)
.map(|member| persons.read(cx).get(member, cx))
.collect();
Some(cx.background_spawn(async move {
let signer_kind = config.signer_kind();
let user_signer = signer.get().await;
let encryption_signer = signer.get_encryption_signer().await;
let mut reports = Vec::new();
for member in members {
let relays = member.messaging_relays();
let announcement = member.announcement();
// Skip if member has no messaging relays
if relays.is_empty() {
reports.push(SendReport::new(member.public_key()).error("No messaging relays"));
continue;
}
// Ensure relay connections
for url in relays.iter() {
client
.add_relay(url)
.and_connect()
.capabilities(RelayCapabilities::GOSSIP)
.await
.ok();
}
// When forced to use encryption signer, skip if receiver has no announcement
if signer_kind.encryption() && announcement.is_none() {
reports
.push(SendReport::new(member.public_key()).error("Encryption not found"));
continue;
}
// Determine receiver and signer based on signer kind
let (receiver, signer_to_use) = match signer_kind {
SignerKind::Auto => {
if let Some(announcement) = announcement {
if let Some(enc_signer) = encryption_signer.as_ref() {
(announcement.public_key(), enc_signer.clone())
} else {
(member.public_key(), user_signer.clone())
}
} else {
(member.public_key(), user_signer.clone())
}
}
SignerKind::Encryption => {
let Some(encryption_signer) = encryption_signer.as_ref() else {
reports.push(
SendReport::new(member.public_key()).error("Encryption not found"),
);
continue;
};
let Some(announcement) = announcement else {
reports.push(
SendReport::new(member.public_key())
.error("Announcement not found"),
);
continue;
};
(announcement.public_key(), encryption_signer.clone())
}
SignerKind::User => (member.public_key(), user_signer.clone()),
};
// Create and send gift-wrapped event
match EventBuilder::gift_wrap(&signer_to_use, &receiver, rumor.clone(), []).await {
Ok(event) => {
match client
.send_event(&event)
.to(relays)
.ack_policy(AckPolicy::none())
.await
{
Ok(output) => {
reports.push(
SendReport::new(member.public_key())
.gift_wrap_id(event.id)
.output(output),
);
}
Err(e) => {
reports.push(
SendReport::new(member.public_key()).error(e.to_string()),
);
}
}
}
Err(e) => {
reports.push(SendReport::new(member.public_key()).error(e.to_string()));
}
}
}
reports
}))
}
/*
* /// Create a new unsigned message event
pub fn create_message(
&self,
content: &str,
replies: Vec<EventId>,
cx: &App,
) -> Task<Result<UnsignedEvent, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let subject = self.subject.clone();
let content = content.to_string();
let mut member_and_relay_hints = HashMap::new();
// Populate the hashmap with member and relay hint tasks
for member in self.members.iter() {
let hint = nostr.read(cx).relay_hint(member, cx);
member_and_relay_hints.insert(member.to_owned(), hint);
}
cx.background_spawn(async move {
let signer = client.signer().context("Signer not found")?;
let public_key = signer.get_public_key().await?;
// List of event tags for each receiver
let mut tags = vec![];
for (member, task) in member_and_relay_hints.into_iter() {
// Skip current user
if member == public_key {
continue;
}
// Get relay hint if available
let relay_url = task.await;
// Construct a public key tag with relay hint
let tag = TagStandard::PublicKey {
public_key: member,
relay_url,
alias: None,
uppercase: false,
};
tags.push(Tag::from_standardized_without_cell(tag));
}
// Add subject tag if present
if let Some(value) = subject {
tags.push(Tag::from_standardized_without_cell(TagStandard::Subject(
value.to_string(),
)));
}
// Add all reply tags
for id in replies {
tags.push(Tag::event(id))
}
// Construct a direct message event
//
// WARNING: never sign and send this event to relays
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, content)
.tags(tags)
.build(public_key);
// Ensure the event ID has been generated
event.ensure_id();
Ok(event)
})
}
/// Create a task to send a message to all room members
@@ -459,46 +647,27 @@ impl Room {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
// Get current user's public key and relays
let current_user = nostr.read(cx).identity().read(cx).public_key();
let current_user_relays = nostr.read(cx).messaging_relays(&current_user, cx);
let mut members = self.members();
let rumor = rumor.to_owned();
// Get all members and their messaging relays
let task = self.members_with_relays(cx);
cx.background_spawn(async move {
let signer = client.signer().await?;
let current_user_relays = current_user_relays.await;
let mut members = task.await;
let signer = client.signer().context("Signer not found")?;
let current_user = signer.get_public_key().await?;
// Remove the current user's public key from the list of receivers
// the current user will be handled separately
members.retain(|(this, _)| this != &current_user);
members.retain(|this| this != &current_user);
// Collect the send reports
let mut reports: Vec<SendReport> = vec![];
for (receiver, relays) in members.into_iter() {
// Check if there are any relays to send the message to
if relays.is_empty() {
reports.push(SendReport::new(receiver).relays_not_found());
continue;
}
// Ensure relay connection
for url in relays.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
for receiver in members.into_iter() {
// Construct the gift wrap event
let event =
EventBuilder::gift_wrap(&signer, &receiver, rumor.clone(), vec![]).await?;
EventBuilder::gift_wrap(signer, &receiver, rumor.clone(), vec![]).await?;
// Send the gift wrap event to the messaging relays
match client.send_event_to(relays, &event).await {
match client.send_event(&event).to_nip17().await {
Ok(output) => {
let id = output.id().to_owned();
let auth = output.failed.iter().any(|(_, s)| s.starts_with("auth-"));
@@ -536,24 +705,12 @@ impl Room {
// Construct the gift-wrapped event
let event =
EventBuilder::gift_wrap(&signer, &current_user, rumor.clone(), vec![]).await?;
EventBuilder::gift_wrap(signer, &current_user, rumor.clone(), vec![]).await?;
// Only send a backup message to current user if sent successfully to others
if reports.iter().all(|r| r.is_sent_success()) {
// Check if there are any relays to send the event to
if current_user_relays.is_empty() {
reports.push(SendReport::new(current_user).relays_not_found());
return Ok(reports);
}
// Ensure relay connection
for url in current_user_relays.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
// Send the event to the messaging relays
match client.send_event_to(current_user_relays, &event).await {
match client.send_event(&event).to_nip17().await {
Ok(output) => {
reports.push(SendReport::new(current_user).status(output));
}
@@ -591,7 +748,7 @@ impl Room {
if let Some(event) = client.database().event_by_id(id).await? {
for url in urls.into_iter() {
let relay = client.pool().relay(url).await?;
let relay = client.relay(url).await?.context("Relay not found")?;
let id = relay.send_event(&event).await?;
let resent: Output<EventId> = Output {
@@ -622,4 +779,5 @@ impl Room {
Ok(resend_reports)
})
}
*/
}