chore: rewrite the backend (not tested) (#203)

* wip: refactor

* refactor

* clean up

* .

* rename

* add relay auth

* .

* .

* optimize

* .

* clean up

* add encryption crate

* .

* .

* .

* .

* .

* add encryption crate

* .

* refactor nip4e

* .

* fix endless loop

* fix metadata fetching
This commit is contained in:
reya
2025-11-11 09:09:33 +07:00
committed by GitHub
parent a1a0a7ecd4
commit 512834b640
68 changed files with 3503 additions and 3194 deletions

View File

@@ -6,20 +6,22 @@ publish.workspace = true
[dependencies]
common = { path = "../common" }
states = { path = "../states" }
state = { path = "../state" }
account = { path = "../account" }
encryption = { path = "../encryption" }
person = { path = "../person" }
settings = { path = "../settings" }
gpui.workspace = true
nostr.workspace = true
nostr-sdk.workspace = true
anyhow.workspace = true
itertools.workspace = true
smallvec.workspace = true
smol.workspace = true
log.workspace = true
futures.workspace = true
flume.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -1,27 +1,53 @@
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use account::Account;
use anyhow::Error;
use common::event::EventUtils;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT};
use encryption::Encryption;
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task, Window};
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task};
pub use message::*;
use nostr_sdk::prelude::*;
use room::RoomKind;
pub use room::*;
use settings::AppSettings;
use smallvec::{smallvec, SmallVec};
use states::{app_state, NewMessage};
use smol::lock::RwLock;
use state::{initialized_at, EventTracker, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
use crate::room::Room;
pub mod message;
pub mod room;
mod message;
mod room;
pub fn init(cx: &mut App) {
ChatRegistry::set_global(cx.new(ChatRegistry::new), cx);
}
struct GlobalChatRegistry(Entity<ChatRegistry>);
impl Global for GlobalChatRegistry {}
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
/// Collection of all chat rooms
pub rooms: Vec<Entity<Room>>,
/// Loading status of the registry
pub loading: bool,
/// Event subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
/// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 4]>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChatEvent {
OpenRoom(u64),
@@ -29,43 +55,271 @@ pub enum ChatEvent {
NewChatRequest(RoomKind),
}
struct GlobalChatRegistry(Entity<ChatRegistry>);
impl Global for GlobalChatRegistry {}
pub struct ChatRegistry {
/// Collection of all chat rooms
pub rooms: Vec<Entity<Room>>,
/// Loading status of the registry
pub loading: bool,
/// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 2]>,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Signal {
Loading(bool),
Message(NewMessage),
Eose,
}
impl EventEmitter<ChatEvent> for ChatRegistry {}
impl ChatRegistry {
/// Retrieve the global registry state
/// Retrieve the global chat registry state
pub fn global(cx: &App) -> Entity<Self> {
cx.global::<GlobalChatRegistry>().0.clone()
}
/// Set the global registry instance
pub(crate) fn set_global(state: Entity<Self>, cx: &mut App) {
/// Set the global chat registry instance
fn set_global(state: Entity<Self>, cx: &mut App) {
cx.set_global(GlobalChatRegistry(state));
}
/// Create a new registry instance
pub(crate) fn new(_cx: &mut Context<Self>) -> Self {
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
let encryption = Encryption::global(cx);
let encryption_key = encryption.read(cx).encryption.clone();
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let tracker = nostr.read(cx).tracker();
let status = Arc::new(AtomicBool::new(true));
let (tx, rx) = flume::bounded::<Signal>(2048);
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
subscriptions.push(
// Observe the encryption global state
cx.observe(&encryption_key, {
let status = Arc::clone(&status);
let tx = tx.clone();
move |this, state, cx| {
if let Some(signer) = state.read(cx).clone() {
this.retry_failed_events(&signer, &tx, &status, cx);
}
}
}),
);
tasks.push(
// Handle notifications
cx.background_spawn({
let client = Arc::clone(&client);
let status = Arc::clone(&status);
let tx = tx.clone();
async move { Self::handle_notifications(&client, &tracker, &tx, &status).await }
}),
);
tasks.push(
// Handle unwrapping status
cx.background_spawn({
let client = Arc::clone(&client);
async move { Self::handle_unwrapping(&client, &status, &tx).await }
}),
);
tasks.push(
// Handle new messages
cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.expect("Entity has been released");
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.expect("Entity has been released");
}
Signal::Loading(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
})
.expect("Entity has been released");
}
};
}
}),
);
Self {
rooms: vec![],
loading: true,
_tasks: smallvec![],
_subscriptions: subscriptions,
_tasks: tasks,
}
}
async fn handle_notifications(
client: &Client,
tracker: &Arc<RwLock<EventTracker>>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
) {
let mut notifications = client.notifications();
log::info!("Listening for notifications");
let initialized_at = initialized_at();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
let mut public_keys = HashSet::new();
let mut processed_events = HashSet::new();
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, .. } = notification else {
// Skip non-message notifications
continue;
};
match message {
RelayMessage::Event { event, .. } => {
if !processed_events.insert(event.id) {
// Skip if the event has already been processed
continue;
}
if event.kind != Kind::GiftWrap {
// Skip non-gift wrap events
continue;
}
// Extract the rumor from the gift wrap event
match Self::extract_rumor(client, event.as_ref()).await {
Ok(rumor) => {
// Get all public keys
public_keys.extend(rumor.all_pubkeys());
let limit_reached = public_keys.len() >= METADATA_BATCH_LIMIT;
let done = !status.load(Ordering::Acquire) && !public_keys.is_empty();
// Get metadata for all public keys if the limit is reached
if limit_reached || done {
let public_keys = std::mem::take(&mut public_keys);
// Get metadata for the public keys
Self::get_metadata(client, public_keys).await.ok();
}
match &event.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
}
}
false => {
status.store(true, Ordering::Release);
}
}
}
Err(_e) => {
let mut tracker = tracker.write().await;
tracker.failed_unwrap_events.push(event.as_ref().clone());
drop(tracker);
}
}
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &subscription_id {
if let Err(e) = tx.send_async(Signal::Eose).await {
log::error!("Failed to send signal: {}", e);
}
}
}
_ => {}
}
}
}
async fn handle_unwrapping(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<Signal>) {
let loop_duration = Duration::from_secs(20);
let mut is_start_processing = false;
let mut total_loops = 0;
loop {
if client.has_signer().await {
total_loops += 1;
if status.load(Ordering::Acquire) {
is_start_processing = true;
// Reset gift wrap processing flag
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(true)).await {
log::error!("Failed to send signal: {}", e);
}
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(false)).await {
log::error!("Failed to send signal: {}", e);
}
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
}
}
smol::Timer::after(loop_duration).await;
}
}
fn retry_failed_events(
&mut self,
signer: &Arc<dyn NostrSigner>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
cx: &mut Context<Self>,
) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let tracker = nostr.read(cx).tracker();
let signer = Arc::clone(signer);
let status = Arc::clone(status);
let tx = tx.clone();
let initialized_at = initialized_at();
self._tasks.push(cx.background_spawn(async move {
let tracker = tracker.read().await;
for event in tracker.failed_unwrap_events.iter() {
if let Ok(rumor) = Self::try_unwrap_custom(&client, &signer, event).await {
match &event.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
}
}
false => {
status.store(true, Ordering::Release);
}
}
}
}
}));
}
/// Set the loading status of the chat registry
pub fn set_loading(&mut self, loading: bool, cx: &mut Context<Self>) {
self.loading = loading;
@@ -147,97 +401,19 @@ impl ChatRegistry {
cx.notify();
}
/// Load all rooms from the database.
pub fn load_rooms(&mut self, window: &mut Window, cx: &mut Context<Self>) {
log::info!("Starting to load chat rooms...");
/// Push a new room to the chat registry
pub fn push_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
let id = room.read(cx).id;
// Get the contact bypass setting
let bypass_setting = AppSettings::get_contact_bypass(cx);
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.add_room(room, cx);
}
let task: Task<Result<HashSet<Room>, Error>> = cx.background_spawn(async move {
let client = app_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let contacts = client.database().contacts_public_keys(public_key).await?;
let authored_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::A), public_key);
let addressed_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key);
let authored = client.database().query(authored_filter).await?;
let addressed = client.database().query(addressed_filter).await?;
let events = authored.merge(addressed);
let mut rooms: HashSet<Room> = HashSet::new();
let mut grouped: HashMap<u64, Vec<UnsignedEvent>> = HashMap::new();
// Process each event and group by room hash
for raw in events.into_iter() {
match UnsignedEvent::from_json(&raw.content) {
Ok(rumor) => {
if rumor.tags.public_keys().peekable().peek().is_some() {
grouped.entry(rumor.uniq_id()).or_default().push(rumor);
}
}
Err(e) => log::warn!("Failed to parse stored rumor: {e}"),
}
}
for (_room_id, mut messages) in grouped.into_iter() {
messages.sort_by_key(|m| Reverse(m.created_at));
let Some(latest) = messages.first() else {
continue;
};
let mut room = Room::from(latest);
if rooms.iter().any(|r| r.id == room.id) {
continue;
}
let mut public_keys: Vec<PublicKey> = room.members().to_vec();
public_keys.retain(|pk| pk != &public_key);
let user_sent = messages.iter().any(|m| m.pubkey == public_key);
let mut bypassed = false;
if bypass_setting {
bypassed = public_keys.iter().any(|k| contacts.contains(k));
}
if user_sent || bypassed {
room = room.kind(RoomKind::Ongoing);
}
rooms.insert(room);
}
Ok(rooms)
});
cx.spawn_in(window, async move |this, cx| {
match task.await {
Ok(rooms) => {
this.update_in(cx, move |this, _window, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.ok();
}
Err(e) => {
log::error!("Failed to load rooms: {e}")
}
};
})
.detach();
cx.emit(ChatEvent::OpenRoom(id));
}
pub(crate) fn extend_rooms(&mut self, rooms: HashSet<Room>, cx: &mut Context<Self>) {
/// Extend the registry with new rooms.
fn extend_rooms(&mut self, rooms: HashSet<Room>, cx: &mut Context<Self>) {
let mut room_map: HashMap<u64, usize> = self
.rooms
.iter()
@@ -264,18 +440,111 @@ impl ChatRegistry {
}
}
/// Push a new room to the chat registry
pub fn push_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
let id = room.read(cx).id;
/// Load all rooms from the database.
pub fn get_rooms(&mut self, cx: &mut Context<Self>) {
let task = self.create_get_rooms_task(cx);
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.add_room(room, cx);
}
cx.emit(ChatEvent::OpenRoom(id));
self._tasks.push(
// Run and finished in the background
cx.spawn(async move |this, cx| {
match task.await {
Ok(rooms) => {
this.update(cx, move |this, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.expect("Entity has been released");
}
Err(e) => {
log::error!("Failed to load rooms: {e}")
}
};
}),
);
}
/// Refresh messages for a room in the global registry
/// Create a task to load rooms from the database
fn create_get_rooms_task(&self, cx: &App) -> Task<Result<HashSet<Room>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
// Get the contact bypass setting
let bypass_setting = AppSettings::get_contact_bypass(cx);
cx.background_spawn(async move {
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let contacts = client.database().contacts_public_keys(public_key).await?;
let authored_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::A), public_key);
// Get all authored events
let authored = client.database().query(authored_filter).await?;
let addressed_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key);
// Get all addressed events
let addressed = client.database().query(addressed_filter).await?;
// Merge authored and addressed events
let events = authored.merge(addressed);
let mut rooms: HashSet<Room> = HashSet::new();
let mut grouped: HashMap<u64, Vec<UnsignedEvent>> = HashMap::new();
// Process each event and group by room hash
for raw in events.into_iter() {
if let Ok(rumor) = UnsignedEvent::from_json(&raw.content) {
if rumor.tags.public_keys().peekable().peek().is_some() {
grouped.entry(rumor.uniq_id()).or_default().push(rumor);
}
}
}
for (_id, mut messages) in grouped.into_iter() {
messages.sort_by_key(|m| Reverse(m.created_at));
let Some(latest) = messages.first() else {
continue;
};
let mut room = Room::from(latest);
if rooms.iter().any(|r| r.id == room.id) {
continue;
}
let mut public_keys = room.members();
public_keys.retain(|pk| pk != &public_key);
// Check if the user has responded to the room
let user_sent = messages.iter().any(|m| m.pubkey == public_key);
// Determine if the room is ongoing or not
let mut bypassed = false;
// Check if public keys are from the user's contacts
if bypass_setting {
bypassed = public_keys.iter().any(|k| contacts.contains(k));
}
// Set the room's kind based on status
if user_sent || bypassed {
room = room.kind(RoomKind::Ongoing);
}
rooms.insert(room);
}
Ok(rooms)
})
}
/// Trigger a refresh of the opened chat rooms by their IDs
pub fn refresh_rooms(&mut self, ids: Option<Vec<u64>>, cx: &mut Context<Self>) {
if let Some(ids) = ids {
for room in self.rooms.iter() {
@@ -292,15 +561,15 @@ impl ChatRegistry {
///
/// If the room doesn't exist, it will be created.
/// Updates room ordering based on the most recent messages.
pub fn new_message(&mut self, msg: NewMessage, window: &mut Window, cx: &mut Context<Self>) {
let id = msg.rumor.uniq_id();
let author = msg.rumor.pubkey;
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let id = message.rumor.uniq_id();
let author = message.rumor.pubkey;
let account = Account::global(cx);
if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) {
let is_new_event = msg.rumor.created_at > room.read(cx).created_at;
let created_at = msg.rumor.created_at;
let event_for_emit = msg.rumor.clone();
let is_new_event = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
let event_for_emit = message.rumor.clone();
// Update room
room.update(cx, |this, cx| {
@@ -314,26 +583,170 @@ impl ChatRegistry {
}
// Emit the new message to the room
let event_to_emit = event_for_emit.clone();
cx.defer_in(window, move |this, _window, cx| {
this.emit_message(msg.gift_wrap, event_to_emit, cx);
});
this.emit_message(message.gift_wrap, event_for_emit.clone(), cx);
});
// Resort all rooms in the registry by their created at (after updated)
if is_new_event {
cx.defer_in(window, |this, _window, cx| {
this.sort(cx);
});
self.sort(cx);
}
} else {
// Push the new room to the front of the list
self.add_room(cx.new(|_| Room::from(&msg.rumor)), cx);
self.add_room(cx.new(|_| Room::from(&message.rumor)), cx);
// Notify the UI about the new room
cx.defer_in(window, move |_this, _window, cx| {
cx.emit(ChatEvent::NewChatRequest(RoomKind::default()));
});
cx.emit(ChatEvent::NewChatRequest(RoomKind::default()));
}
}
// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor(client: &Client, gift_wrap: &Event) -> Result<UnsignedEvent, Error> {
// Try to get cached rumor first
if let Ok(event) = Self::get_rumor(client, gift_wrap.id).await {
return Ok(event);
}
// Try to unwrap with the available signer
let unwrapped = Self::try_unwrap(client, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
rumor_unsigned.ensure_id();
// Cache the rumor
Self::set_rumor(client, gift_wrap.id, &rumor_unsigned).await?;
Ok(rumor_unsigned)
}
// Helper method to try unwrapping with different signers
async fn try_unwrap(client: &Client, gift_wrap: &Event) -> Result<UnwrappedGift, Error> {
let signer = client.signer().await?;
let unwrapped = UnwrappedGift::from_gift_wrap(&signer, gift_wrap).await?;
Ok(unwrapped)
}
/// Helper method to try unwrapping with a custom signer
async fn try_unwrap_custom<T>(
client: &Client,
signer: &T,
gift_wrap: &Event,
) -> Result<UnsignedEvent, Error>
where
T: NostrSigner,
{
let unwrapped = UnwrappedGift::from_gift_wrap(signer, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
rumor_unsigned.ensure_id();
// Cache the rumor
Self::set_rumor(client, gift_wrap.id, &rumor_unsigned).await?;
Ok(rumor_unsigned)
}
/// Stores an unwrapped event in local database with reference to original
async fn set_rumor(client: &Client, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> {
let rumor_id = rumor.id.context("Rumor is missing an event id")?;
let author = rumor.pubkey;
let conversation = Self::conversation_id(rumor);
let mut tags = rumor.tags.clone().to_vec();
// Add a unique identifier
tags.push(Tag::identifier(id));
// Add a reference to the rumor's author
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::A)),
[author],
));
// Add a conversation id
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::C)),
[conversation.to_string()],
));
// Add a reference to the rumor's id
tags.push(Tag::event(rumor_id));
// Add references to the rumor's participants
for receiver in rumor.tags.public_keys().copied() {
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::P)),
[receiver],
));
}
// Convert rumor to json
let content = rumor.as_json();
// Construct the event
let event = EventBuilder::new(Kind::ApplicationSpecificData, content)
.tags(tags)
.sign(&Keys::generate())
.await?;
// Save the event to the database
client.database().save_event(&event).await?;
Ok(())
}
/// Retrieves a previously unwrapped event from local database
async fn get_rumor(client: &Client, gift_wrap: EventId) -> Result<UnsignedEvent, Error> {
let filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.identifier(gift_wrap)
.limit(1);
if let Some(event) = client.database().query(filter).await?.first_owned() {
UnsignedEvent::from_json(event.content).map_err(|e| anyhow!(e))
} else {
Err(anyhow!("Event is not cached yet."))
}
}
/// Get metadata for a list of public keys
async fn get_metadata<I>(client: &Client, public_keys: I) -> Result<(), Error>
where
I: IntoIterator<Item = PublicKey>,
{
let authors: Vec<PublicKey> = public_keys.into_iter().collect();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList];
// Return if the list is empty
if authors.is_empty() {
return Err(anyhow!("You need at least one public key".to_string(),));
}
let filter = Filter::new()
.limit(authors.len() * kinds.len() + 10)
.authors(authors)
.kinds(kinds);
// Subscribe to filters to the bootstrap relays
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
.await?;
Ok(())
}
/// Get the conversation ID for a given rumor (message).
fn conversation_id(rumor: &UnsignedEvent) -> u64 {
let mut hasher = DefaultHasher::new();
let mut pubkeys: Vec<PublicKey> = rumor.tags.public_keys().copied().collect();
pubkeys.push(rumor.pubkey);
pubkeys.sort();
pubkeys.dedup();
pubkeys.hash(&mut hasher);
hasher.finish()
}
}

View File

@@ -2,6 +2,18 @@ use std::hash::Hash;
use nostr_sdk::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct NewMessage {
pub gift_wrap: EventId,
pub rumor: UnsignedEvent,
}
impl NewMessage {
pub fn new(gift_wrap: EventId, rumor: UnsignedEvent) -> Self {
Self { gift_wrap, rumor }
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum Message {
User(RenderedMessage),

View File

@@ -5,12 +5,14 @@ use std::time::Duration;
use account::Account;
use anyhow::{anyhow, Error};
use common::display::RenderedProfile;
use common::event::EventUtils;
use gpui::{App, AppContext, Context, EventEmitter, SharedString, SharedUri, Task};
use common::{EventUtils, RenderedProfile};
use encryption::{Encryption, SignerKind};
use gpui::{App, AppContext, Context, EventEmitter, SharedString, Task};
use nostr_sdk::prelude::*;
use person::PersonRegistry;
use states::{app_state, SignerKind, SEND_RETRY};
use state::NostrRegistry;
const SEND_RETRY: usize = 10;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SendOptions {
@@ -159,30 +161,6 @@ impl Eq for Room {}
impl EventEmitter<RoomSignal> for Room {}
impl From<&Event> for Room {
fn from(val: &Event) -> Self {
let id = val.uniq_id();
let created_at = val.created_at;
// Get the members from the event's tags and event's pubkey
let members = val.all_pubkeys();
// Get subject from tags
let subject = val
.tags
.find(TagKind::Subject)
.and_then(|tag| tag.content().map(|s| s.to_owned().into()));
Room {
id,
created_at,
subject,
members,
kind: RoomKind::default(),
}
}
}
impl From<&UnsignedEvent> for Room {
fn from(val: &UnsignedEvent) -> Self {
let id = val.uniq_id();
@@ -209,15 +187,7 @@ impl From<&UnsignedEvent> for Room {
impl Room {
/// Constructs a new room with the given receiver and tags.
pub async fn new(subject: Option<String>, receivers: Vec<PublicKey>) -> Result<Self, Error> {
let client = app_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
if receivers.is_empty() {
return Err(anyhow!("You need to add at least one receiver"));
};
pub fn new(subject: Option<String>, author: PublicKey, receivers: Vec<PublicKey>) -> Self {
// Convert receiver's public keys into tags
let mut tags: Tags = Tags::from_list(
receivers
@@ -235,12 +205,12 @@ impl Room {
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, "")
.tags(tags)
.build(public_key);
.build(author);
// Generate event ID
event.ensure_id();
Ok(Room::from(&event))
Room::from(&event)
}
/// Sets the kind of the room and returns the modified room
@@ -292,11 +262,11 @@ impl Room {
}
/// Gets the display image for the room
pub fn display_image(&self, proxy: bool, cx: &App) -> SharedUri {
pub fn display_image(&self, proxy: bool, cx: &App) -> SharedString {
if !self.is_group() {
self.display_member(cx).avatar(proxy)
} else {
SharedUri::from("brand/group.png")
SharedString::from("brand/group.png")
}
}
@@ -358,10 +328,11 @@ impl Room {
/// Get messaging relays and encryption keys announcement for each member
pub fn connect(&self, cx: &App) -> Task<Result<(), Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let members = self.members();
cx.background_spawn(async move {
let client = app_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
@@ -396,10 +367,11 @@ impl Room {
/// Get all messages belonging to the room
pub fn get_messages(&self, cx: &App) -> Task<Result<Vec<UnsignedEvent>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let conversation_id = self.id.to_string();
cx.background_spawn(async move {
let client = app_state().client();
let filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::C), conversation_id);
@@ -419,10 +391,6 @@ impl Room {
/// Create a new message event (unsigned)
pub fn create_message(&self, content: &str, replies: &[EventId], cx: &App) -> UnsignedEvent {
// Get the app state
let state = app_state();
let relay_cache = state.relay_cache.read_blocking();
// Get current user
let account = Account::global(cx);
let public_key = account.read(cx).public_key();
@@ -437,9 +405,7 @@ impl Room {
// NOTE: current user will be removed from the list of receivers
for member in self.members.iter() {
// Get relay hint if available
let relay_url = relay_cache
.get(member)
.and_then(|urls| urls.iter().nth(0).cloned());
let relay_url = None;
// Construct a public key tag with relay hint
let tag = TagStandard::PublicKey {
@@ -475,12 +441,12 @@ impl Room {
// Construct a direct message event
//
// WARNING: never send this event to relays
// WARNING: never sign and send this event to relays
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, content)
.tags(tags)
.build(public_key);
// Generate event ID
// Ensure the event id has been generated
event.ensure_id();
event
@@ -493,6 +459,14 @@ impl Room {
opts: &SendOptions,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
let encryption = Encryption::global(cx);
let encryption_key = encryption.read(cx).encryption_key(cx);
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let cache_manager = nostr.read(cx).cache_manager();
let tracker = nostr.read(cx).tracker();
let rumor = rumor.to_owned();
let opts = opts.to_owned();
@@ -500,16 +474,12 @@ impl Room {
let mut members = self.members();
cx.background_spawn(async move {
let state = app_state();
let client = state.client();
let signer_kind = opts.signer_kind;
let cache = cache_manager.read().await;
let tracker = tracker.read().await;
let relay_cache = state.relay_cache.read().await;
let announcement_cache = state.announcement_cache.read().await;
let encryption = state.device.read().await.encryption.clone();
// Get the encryption public key
let encryption_pubkey = if let Some(signer) = encryption.as_ref() {
let encryption_pubkey = if let Some(signer) = encryption_key.as_ref() {
signer.get_public_key().await.ok()
} else {
None
@@ -523,14 +493,14 @@ impl Room {
members.retain(|&pk| pk != user_pubkey);
// Determine the signer will be used based on the provided options
let signer = Self::select_signer(&opts.signer_kind, user_signer, encryption)?;
let signer = Self::select_signer(&opts.signer_kind, user_signer, encryption_key)?;
// Collect the send reports
let mut reports: Vec<SendReport> = vec![];
for member in members.into_iter() {
// Get user's messaging relays
let urls = relay_cache.get(&member).cloned().unwrap_or_default();
let urls = cache.relay(&member).cloned().unwrap_or_default();
// Check if there are any relays to send the message to
if urls.is_empty() {
@@ -539,8 +509,8 @@ impl Room {
}
// Get user's encryption public key if available
let encryption = announcement_cache
.get(&member)
let encryption = cache
.announcement(&member)
.and_then(|a| a.to_owned().map(|a| a.public_key()));
// Skip sending if using encryption signer but receiver's encryption keys not found
@@ -551,10 +521,16 @@ impl Room {
let receiver = Self::select_receiver(&signer_kind, member, encryption)?;
let rumor = rumor.clone();
let tags = vec![Tag::public_key(member)];
// Construct the sealed event
let seal = EventBuilder::seal(&signer, &receiver, rumor.clone())
.await?
.build(member)
.sign(&signer)
.await?;
// Construct the gift wrap event
let event = EventBuilder::gift_wrap(&signer, &receiver, rumor, tags).await?;
let event = EventBuilder::gift_wrap_from_seal(&member, &seal, vec![])?;
// Send the gift wrap event to the messaging relays
match client.send_event_to(urls, &event).await {
@@ -566,8 +542,7 @@ impl Room {
if auth {
// Wait for authenticated and resent event successfully
for attempt in 0..=SEND_RETRY {
let retry_manager = state.tracker().read().await;
let ids = retry_manager.resent_ids();
let ids = tracker.resent_ids();
// Check if event was successfully resent
if let Some(output) = ids.iter().find(|e| e.id() == &id).cloned() {
@@ -596,14 +571,20 @@ impl Room {
let receiver = Self::select_receiver(&signer_kind, user_pubkey, encryption_pubkey)?;
let rumor = rumor.clone();
let tags = vec![Tag::public_key(user_pubkey)];
// Construct the sealed event
let seal = EventBuilder::seal(&signer, &receiver, rumor.clone())
.await?
.build(user_pubkey)
.sign(&signer)
.await?;
// Construct the gift-wrapped event
let event = EventBuilder::gift_wrap(&signer, &receiver, rumor, tags).await?;
let event = EventBuilder::gift_wrap_from_seal(&receiver, &seal, vec![])?;
// Only send a backup message to current user if sent successfully to others
if opts.backup() && reports.iter().all(|r| r.is_sent_success()) {
let urls = relay_cache.get(&user_pubkey).cloned().unwrap_or_default();
let urls = cache.relay(&user_pubkey).cloned().unwrap_or_default();
// Check if there are any relays to send the event to
if urls.is_empty() {
@@ -633,9 +614,12 @@ impl Room {
reports: Vec<SendReport>,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let cache_manager = nostr.read(cx).cache_manager();
cx.background_spawn(async move {
let states = app_state();
let client = states.client();
let cache = cache_manager.read().await;
let mut resend_reports = vec![];
for report in reports.into_iter() {
@@ -664,7 +648,7 @@ impl Room {
// Process the on hold event if it exists
if let Some(event) = report.on_hold {
let urls = states.messaging_relays(receiver).await;
let urls = cache.relay(&receiver).cloned().unwrap_or_default();
// Check if there are any relays to send the event to
if urls.is_empty() {
@@ -702,15 +686,15 @@ impl Room {
fn select_receiver(
kind: &SignerKind,
user: PublicKey,
members: PublicKey,
encryption: Option<PublicKey>,
) -> Result<PublicKey, Error> {
match kind {
SignerKind::Encryption => {
Ok(encryption.ok_or_else(|| anyhow!("Receiver's encryption key not found"))?)
}
SignerKind::User => Ok(user),
SignerKind::Auto => Ok(encryption.unwrap_or(user)),
SignerKind::User => Ok(members),
SignerKind::Auto => Ok(encryption.unwrap_or(members)),
}
}
}