+import aiofiles
+import asyncio
+import errno
+import logging
+import posix
+
+class Presence:
+ def __init__(self, room_name, fifo_path):
+ self._room_name = room_name
+ self._fifo_path = fifo_path
+
+ async def run(self, client):
+ self._log(logging.INFO, "Presence running")
+ self._stop = False
+ while not self._stop:
+ room_id = await self._enter_room(client)
+ try:
+ await self._process_messages(client, room_id)
+ except Exception as e:
+ self._log(logging.ERROR, "Error processing messages: %s", str(e))
+ self._log(logging.INFO, "Presence stopped")
+
+ def stop(self):
+ self._stop = True
+ fifo = -1
+ try:
+ fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
+ posix.write(fifo, "\n".encode())
+ except OSError as e:
+ if e.errno == errno.ENXIO:
+ pass
+ finally:
+ if fifo != -1:
+ posix.close(fifo)
+
+ async def _enter_room(self, client):
+ while not self._stop:
+ await client.sync()
+
+ for room_id, room in client.rooms.items():
+ if room.display_name == self._room_name:
+ self._log(logging.INFO, "A room member already")
+ return room_id
+
+ self._log(logging.INFO, "Not yet a room member")
+
+ was_invited = False
+ for room_id, room in client.invited_rooms.items():
+ if room.display_name == self._room_name:
+ was_invited = True
+ self._log(logging.INFO, "Joining room")
+ await client.join(room_id)
+ break
+
+ if not was_invited:
+ self._log(logging.INFO, "Got no room invite yet")
+
+ await asyncio.sleep(3.0)
+
+ async def _process_messages(self, client, room_id):
+ if self._stop:
+ return
+ async for line in self._read_fifo(self._fifo_path):
+ if self._stop:
+ break
+ await client.room_send(
+ room_id=room_id,
+ message_type="m.room.message",
+ content={
+ "msgtype": "m.text",
+ "body": line.rstrip("\n")
+ }
+ )
+
+ def _log(self, level, msg, *args, **kwargs):
+ logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)
+
+ @staticmethod
+ async def _read_fifo(file_path):
+ while True:
+ async with aiofiles.open(file_path) as fifo:
+ async for line in fifo:
+ yield(line)