Source code for backtracked.models.message

from .base import Model, OrderedCollection
from .user import Member, User
from .room import Room
from ..client.constants import Endpoints
from .. import utils

# Public API of this module: the names exported by a star-import.
__all__ = ["Message", "MessageCollection", "Conversation", "ConversationCollection"]

def everything_except(iterable, exception):
    """
    Lazily yield every item of ``iterable`` that is not equal to ``exception``.

    Parameters
    ----------
    iterable:
        Any iterable of items to scan.
    exception:
        Value to leave out; items are compared against it with ``!=``.

    Returns
    -------
    filter
        A lazy iterator over the remaining items, in their original order.
    """
    def _differs(item):
        return item != exception

    return filter(_differs, iterable)

class Message(Model):
    """
    Represents a sent message on QueUp.

    Attributes
    ----------
    id: str
        Chat ID of this message.
    content: str
        Text of the message.
    deleted: bool
        True if the message has been deleted, False otherwise.
    created_at: :class:`datetime.datetime`
        Datetime representing the time this message was sent.
    """

    def __init__(self, client, data: dict):
        super().__init__(client)
        self.id = data.get("chatid")
        self.content = data.get("message")
        self.deleted = False
        self.created_at = utils.dt(data.get("time"))
        # "user" / "queue_object" may be present but null (for example when
        # from_conversation_message forwards a missing "_user"), so fall back
        # to an empty dict instead of calling .get() on None.
        self._userid = (data.get("user") or {}).get("_id")
        self._roomid = (data.get("queue_object") or {}).get("roomid")

    @classmethod
    def from_conversation_message(cls, client, data: dict):
        """
        Build a :class:`Message` from a private-conversation message payload.

        Conversation payloads use different key names than room chat
        payloads, so the keys are remapped to the layout ``__init__`` reads.
        The resulting message has no room ID.

        Parameters
        ----------
        client:
            Client instance passed through to the constructor.
        data: dict
            Raw conversation message payload.

        Returns
        -------
        :class:`Message`
            Message built from the remapped payload.
        """
        remapped = {
            "chatid": data.get("_id"),
            "message": data.get("message"),
            "time": data.get("created"),
            "user": data.get("_user"),
            "queue_object": {"roomid": None},
        }
        return cls(client, remapped)

    @property
    def author(self) -> User:
        """
        Get the author of this message.

        Returns
        -------
        :class:`User`
            Author of this message
        """
        return self.client.users.get(self._userid)

    @property
    def room(self) -> Room:
        """
        Get the room this message was sent in.
        If this message was sent in a private conversation, this will be None.

        Returns
        -------
        :class:`Room`
            Room this message was sent to.
        """
        return self.client.rooms.get(self._roomid)

    @property
    def member(self) -> Member:
        """
        Get the author of this message as a member of the room.

        Returns
        -------
        :class:`Member`
            Member who sent this message, or None when the message has no
            room (e.g. it was sent in a private conversation).
        """
        room = self.room
        if room is None:
            # No room means there is no membership to look up; previously
            # this raised AttributeError on ``None.members``.
            return None
        return room.members.from_user_id(self._userid)
# TODO: Conversations can actually have multiple recipients.
class Conversation(Model):
    """
    Represents a private conversation between two users.

    Attributes
    ----------
    id: str
        ID of this conversation.
    created_at: :class:`datetime.datetime`
        Time this conversation was started.
    latest_message_str: str
        Text of the last message to be sent in this conversation.
        May not be up-to-date unless :meth:`fetch` is called.
    """

    def __init__(self, client, data: dict):
        super().__init__(client)
        self.id = data.get("_id")
        self.created_at = utils.dt(data.get("created"))
        self.latest_message_str = data.get("latest_message_str")
        self._latest_message_dt = data.get("latest_message")
        self._read = data.get("users_read")
        self._users = []
        # Each "usersid" entry is handed to the User constructor; only the IDs
        # are kept here, while the User objects go into the client's user
        # collection. A missing/null "usersid" is treated as no participants.
        for user_data in data.get("usersid") or ():
            user = User(self.client, user_data)
            self._users.append(user.id)
            self.client.users.add(user)

    @property
    def _recipients(self):
        """IDs of every participant except the client's own user."""
        return list(everything_except(self._users, self.client.user.id))

    @property
    def recipients(self) -> list:
        """
        Get the recipients of this conversation.

        Returns
        -------
        list[:class:`User`]
            Recipients of this conversation.
        """
        get_user = self.client.users.get
        return [get_user(user_id) for user_id in self._recipients]

    async def fetch(self) -> list:
        """
        Fetch all messages for this conversation.

        Every fetched message is added to the client's message collection.
        ``latest_message_str`` is refreshed from the first returned message
        and left unchanged when the conversation has no messages.

        Returns
        -------
        list[str]
            List of message IDs in this conversation.
        """
        _, messages = await self.client.http.get(Endpoints.conversation(cid=self.id))
        # Guard against an empty conversation: indexing [0] used to raise
        # IndexError here.
        if messages:
            self.latest_message_str = messages[0]["message"]
        for msg_data in messages:
            message = Message.from_conversation_message(self.client, msg_data)
            self.client.messages.add(message)
        return [msg_data["_id"] for msg_data in messages]

    async def send_message(self, text: str):
        """
        Send a message to this conversation.

        Parameters
        ----------
        text: str
            Text to send in the message.
        """
        payload = {
            "message": text,
            "userid": self.client.user.id,
        }
        # TODO: the response carries the message data in yet another format,
        # so it is not turned into a Message yet; deal with this later.
        _, _response = await self.client.http.post(Endpoints.conversation(cid=self.id), json=payload)

    async def mark_as_read(self):
        """
        Marks this conversation as read by the current user.

        The stored read-state is replaced with the ``users_read`` value of
        the response.
        """
        _, resp = await self.client.http.post(Endpoints.conversation_read(cid=self.id))
        self._read = resp.get("users_read")

    def has_read(self, user: User) -> bool:
        """
        Checks if the passed :class:`User` has read this conversation.

        Parameters
        ----------
        user: :class:`User`
            User to check

        Returns
        -------
        bool:
            True if the user has read this conversation, False otherwise.
        """
        # "users_read" may be absent from the payload; treat that as nobody
        # having read the conversation rather than raising TypeError.
        return user.id in (self._read or ())
class MessageCollection(OrderedCollection):
    """Ordered collection of messages; adds nothing to its base class."""


class ConversationCollection(OrderedCollection):
    """Ordered collection of conversations, searchable by recipient IDs."""

    def get_by_recipients(self, *args):
        """
        Find the first conversation whose recipients include every given ID.

        Parameters
        ----------
        *args:
            User IDs that must all appear among a conversation's recipient
            IDs. With no IDs given, every conversation matches.

        Returns
        -------
        :class:`Conversation`
            First matching conversation in iteration order, or None if no
            conversation matches.
        """
        for candidate in self.values():
            # The recipient list is looked up once per requested ID.
            if all(uid in candidate._recipients for uid in args):
                return candidate
        return None