def __init__(self, room_name, fifo_path):
    """Store the room to serve and the FIFO that feeds it.

    Args:
        room_name: Name matched against each room's ``display_name`` when
            looking for the room to join or post into.
        fifo_path: Filesystem path of the FIFO that is opened for reading
            messages and for the wake-up write.
    """
    # Membership is established later; start out as "not a member".
    self._is_member = False
    self._room_name, self._fifo_path = room_name, fifo_path
    # NOTE(review): other methods read self._stop, which is not set here —
    # confirm it is defined elsewhere (e.g. at class level).
# Main coroutine of the presence: logs start-up, launches the membership
# watcher as a background task, obtains the room id and relays messages.
# NOTE(review): this view has lost indentation and some lines (the `try:`
# paired with the `except` below, and any enclosing loop) — confirm the
# exact control flow against the full file.
13 async def run(self, client):
14 self._log(logging.INFO, "Presence running")
# The Task returned by create_task() is not stored anywhere.
# NOTE(review): asyncio's docs advise keeping a reference so the task is
# not garbage-collected mid-execution — consider saving it.
16 asyncio.create_task( self._do_watch(client) )
# The awaited result of _enter_room() is the room id handed to
# _process_messages().
18 room_id = await self._enter_room(client)
20 await self._process_messages(client, room_id)
# Exceptions from the (not visible) try body are logged at ERROR level
# with their string form.
21 except Exception as e:
22 self._log(logging.ERROR, "Error processing messages: %s", str(e))
23 self._log(logging.INFO, "Presence stopped")
# NOTE(review): the `def` line owning this call and the closing parenthesis
# are not visible in this view.
# Schedules Presence._do_stop(self) on the currently running event loop via
# call_soon_threadsafe().
# NOTE(review): get_running_loop() only works in a thread that has a running
# loop — confirm which thread invokes this.
26 asyncio.get_running_loop().call_soon_threadsafe(
27 Presence._do_stop, self
# Become a member of the room named self._room_name: first check whether it
# is already joined, otherwise look for a matching invite and join it.
# NOTE(review): branching, loop header and return lines are missing from
# this view — the exact flow and return value must be confirmed against the
# full file (run() uses the awaited result as the room id).
34 async def _enter_room(self, client):
# Ask _joined_room_id() whether the room is already joined.
38 room_id = await self._joined_room_id(client)
40 self._log(logging.INFO, "A room member already")
41 self._is_member = True
44 self._log(logging.INFO, "Not yet a room member")
# Scan pending invites for a room whose display name matches.
47 for room_id, room in client.invited_rooms.items():
48 if room.display_name == self._room_name:
50 self._log(logging.INFO, "Joining room")
51 await client.join(room_id)
55 self._log(logging.INFO, "Got no room invite yet")
# 3-second pause — presumably before checking for an invite again (the loop
# header is not visible; confirm).
57 await asyncio.sleep(3.0)
# Background watcher (started from run() via create_task) that updates
# self._is_member according to whether the room is currently joined.
# NOTE(review): the loop header and the else-branch are not visible in this
# view — confirm the structure against the full file.
59 async def _do_watch(self, client):
61 self._log(logging.INFO, "WATCHING...")
# A None result from _joined_room_id() marks the presence as not a member.
65 if await self._joined_room_id(client) == None:
66 self._is_member = False
69 self._is_member = True
# 3-second pause — presumably the polling interval between checks (confirm).
71 await asyncio.sleep(3.0)
# Write a single newline byte to the FIFO at self._fifo_path.
# NOTE(review): the try/except lines around these calls and the body of the
# ENXIO branch are not visible here; whether the descriptor gets closed
# cannot be confirmed from this view.
73 def _tickle_fifo(self):
# Write-only, non-blocking open: per POSIX, opening a FIFO this way fails
# with ENXIO when no process has it open for reading, instead of blocking.
76 fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
77 posix.write(fifo, "\n".encode())
# ENXIO is singled out from other OS errors (handling not visible).
79 if e.errno == errno.ENXIO:
# Relay lines read from the FIFO at self._fifo_path to the room as
# "m.room.message" events through client.room_send().
# NOTE(review): the bodies of both guards and part of the room_send()
# arguments (including how room_id is passed) are missing from this view.
85 async def _process_messages(self, client, room_id):
# Guard: stop requested or not a room member (action taken is not visible).
86 if self._stop or not self._is_member:
88 async for line in self._read_fifo(self._fifo_path):
# The same guard is re-evaluated for every line read.
89 if self._stop or not self._is_member:
91 await client.room_send(
93 message_type="m.room.message",
# Trailing newline characters are stripped from the line before sending.
96 "body": line.rstrip("\n")
# Find the room whose display name equals self._room_name among the rooms
# known to the client and check that its id is in the joined-rooms list.
# NOTE(review): the return statements are not visible; callers compare the
# result with None, so presumably this returns the room id or None — confirm.
100 async def _joined_room_id(self, client):
# Awaited call; `.rooms` of its response is used as the joined room ids.
101 joined_room_ids = ( await client.joined_rooms() ).rooms
102 for room_id, room in client.rooms.items():
103 if room.display_name == self._room_name:
# Only counts when the id also appears in the joined list fetched above.
104 if room_id in joined_room_ids:
def _log(self, level, msg, *args, **kwargs):
    """Log *msg* via the root logger, prefixed with ``P{<room name>}: ``.

    Args:
        level: A ``logging`` level such as ``logging.INFO``.
        msg: Message text, optionally containing %-style placeholders.
        *args: Values for the placeholders in *msg*.
        **kwargs: Passed through unchanged to ``logging.log``.
    """
    prefix = "P{%s}: " % (self._room_name,)
    if args:
        # logging applies `msg % args` only when args are supplied. In that
        # case a literal "%" in the room name must be escaped, otherwise it
        # is parsed as a conversion specifier and formatting fails.
        prefix = prefix.replace("%", "%%")
    logging.log(level, "%s%s" % (prefix, msg), *args, **kwargs)
112 async def _read_fifo(file_path):
114 async with aiofiles.open(file_path) as fifo:
115 async for line in fifo: