import asyncio
import errno
import logging
import posix

from inputparser import InputParser


class Presence:
    """Forward input read from a FIFO into a named room via a chat client.

    Two cooperating coroutines are driven by :meth:`run`: one keeps track of
    (and tries to obtain) membership in the room called ``room_name``, the
    other reads lines from the FIFO at ``fifo_path``, feeds them through an
    ``InputParser`` and sends every decoded content to the joined room.
    """

    # Seconds to wait between two membership checks.
    _POLL_INTERVAL = 3.0

    def __init__(self, room_name, fifo_path):
        """Remember the target room name and the FIFO path; nothing is opened yet."""
        self._input_parser = InputParser()
        self._room_name = room_name
        self._fifo_path = fifo_path
        self._joined_room_id = None
        # Initialised here so the attribute exists even before run() is called.
        self._stop = False
        # Event loop executing run(); lets stop() be called from other threads.
        self._loop = None

    async def run(self, client):
        """Run the room-joining and message loops until stopped or one fails.

        Any exception raised by either loop is re-raised to the caller after
        the other loop has been shut down.
        """
        self._log(logging.INFO, "Presence running")
        self._stop = False
        self._loop = asyncio.get_running_loop()

        tasks = [
            asyncio.create_task(self._room_joining_loop(client)),
            asyncio.create_task(self._message_loop(client)),
        ]
        try:
            done, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_EXCEPTION
            )
        finally:
            self._loop = None
            # With FIRST_EXCEPTION (or if run() itself is cancelled) a task may
            # still be alive here; do not leave it running unattended.
            unfinished = [task for task in tasks if not task.done()]
            if unfinished:
                self._do_stop()
                for task in unfinished:
                    task.cancel()
                await asyncio.gather(*unfinished, return_exceptions=True)

        # Surface the exception of a failed loop, if any.
        for task in done:
            task.result()

        self._log(logging.INFO, "Presence stopped")

    def stop(self):
        """Request a shutdown of :meth:`run`; may be called from any thread.

        Raises:
            RuntimeError: if :meth:`run` is not active and the calling thread
                has no running event loop either.
        """
        loop = self._loop
        if loop is None:
            # run() is not active: fall back to the caller's own loop.
            loop = asyncio.get_running_loop()
        loop.call_soon_threadsafe(self._do_stop)

    def _do_stop(self):
        """Set the stop flag and wake up the FIFO reader so it can notice it."""
        self._stop = True
        self._tickle_fifo()

    async def _room_joining_loop(self, client):
        """Periodically sync, track room membership and accept a pending invite."""
        while not self._stop:
            await client.sync()

            prior_joined_room_id = self._joined_room_id
            self._joined_room_id = await self._get_joined_room_id(client)
            membership_changed = self._joined_room_id != prior_joined_room_id

            if self._joined_room_id is not None:
                if membership_changed:
                    self._log(logging.INFO, "Joined room")
            else:
                if membership_changed:
                    self._log(logging.INFO, "Got kicked out of room?")
                else:
                    self._log(logging.INFO, "Not yet a room member")
                await self._join_if_invited(client)

            await asyncio.sleep(self._POLL_INTERVAL)

    async def _join_if_invited(self, client):
        """Join the first invited room whose display name matches, if any."""
        for room_id, room in client.invited_rooms.items():
            if room.display_name == self._room_name:
                self._log(logging.INFO, "Joining room")
                await client.join(room_id)
                return
        self._log(logging.INFO, "Got no room invite yet")

    async def _message_loop(self, client):
        """Send every decoded FIFO input to the joined room until stopped.

        Lines arriving while no room is joined are dropped with a warning.
        """
        if self._stop:
            return
        # NOTE(review): _read_fifo is not part of the visible excerpt; it must
        # be provided by the remainder of this class.
        async for line in self._read_fifo(self._fifo_path):
            if self._stop:
                break
            if self._joined_room_id is None:
                self._log(logging.WARNING, "Dropping message cause no room joined")
                continue
            try:
                self._input_parser.feed_line(line)
            except Exception as e:
                self._log(logging.WARNING, "Error parsing input: %s" % str(e))
                continue
            for content in self._input_parser.fetch_decoded():
                await client.room_send(
                    room_id=self._joined_room_id,
                    message_type="m.room.message",
                    content=content
                )

    def _tickle_fifo(self):
        """Write a newline to the FIFO without blocking."""
        fifo = -1
        try:
            fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
            posix.write(fifo, "\n".encode())
        except OSError as e:
            # ENXIO: no process has the FIFO open for reading, so there is
            # nobody to wake up. Anything else is unexpected; report it but
            # keep stopping best-effort.
            if e.errno != errno.ENXIO:
                self._log(logging.WARNING, "Could not tickle fifo: %s" % str(e))
        finally:
            if fifo != -1:
                posix.close(fifo)

    async def _get_joined_room_id(self, client):
        """Return the id of the joined room named ``room_name``, or ``None``."""
        # NOTE(review): if joined_rooms() can return an error response without
        # a `rooms` attribute, this raises AttributeError - confirm against the
        # client library.
        joined_room_ids = (await client.joined_rooms()).rooms
        for room_id, room in client.rooms.items():
            if room.display_name == self._room_name and room_id in joined_room_ids:
                return room_id
        return None

    def _log(self, level, msg, *args, **kwargs):
        """Log ``msg`` at ``level``, prefixed with this presence's room name."""
        logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)