]> git.treefish.org Git - mtxbot.git/blob - src/presence.py
0ab1ad44a3f2f43e7cb4c0032c199e91eff09051
[mtxbot.git] / src / presence.py
1 import aiofiles
2 import asyncio
3 import errno
4 import logging
5 import posix
6
class Presence:
    """Forward lines read from a FIFO into one room as text messages.

    The instance joins (or waits to be invited to) the room whose display
    name equals ``room_name``, then sends every line read from the FIFO at
    ``fifo_path`` to that room as an ``m.room.message`` / ``m.text`` event.
    A background watcher task polls membership and interrupts the FIFO
    reader when the room is no longer joined.

    NOTE(review): ``client`` is used like a matrix-nio ``AsyncClient``
    (``sync``, ``join``, ``joined_rooms``, ``room_send``, ``rooms``,
    ``invited_rooms``) -- confirm against the caller.
    """

    # Seconds to wait between membership polls and between join attempts.
    _POLL_INTERVAL = 3.0

    def __init__(self, room_name, fifo_path):
        """Remember the target room name and the FIFO to read from.

        Args:
            room_name: Display name of the room to join and post into.
            fifo_path: Path of the FIFO whose lines are forwarded.
        """
        self._room_name = room_name
        self._fifo_path = fifo_path
        self._is_member = False
        self._stop = False
        # Event loop that run() executes on; lets stop() work cross-thread.
        self._loop = None

    async def run(self, client):
        """Join the room and forward FIFO lines until stop() is requested.

        Errors raised while processing messages are logged and the
        enter-room / process cycle is retried. The membership watcher task
        is cancelled when this coroutine exits, whether normally or not.

        Args:
            client: Chat client used for syncing, joining and sending.
        """
        self._log(logging.INFO, "Presence running")
        self._stop = False
        self._loop = asyncio.get_running_loop()
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected.
        watcher = asyncio.create_task(self._do_watch(client))
        try:
            while not self._stop:
                room_id = await self._enter_room(client)
                try:
                    await self._process_messages(client, room_id)
                except Exception as e:
                    self._log(
                        logging.ERROR, "Error processing messages: %s", e
                    )
        finally:
            # Do not leave the watcher polling after run() has ended.
            watcher.cancel()
            await asyncio.gather(watcher, return_exceptions=True)
        self._log(logging.INFO, "Presence stopped")

    def stop(self):
        """Request that run() terminates.

        The request is scheduled on the loop captured by run(), so this may
        be called from another thread once run() has started. Before run()
        has started it falls back to the caller's running loop and raises
        ``RuntimeError`` if there is none.
        """
        loop = self._loop
        if loop is None:
            loop = asyncio.get_running_loop()
        loop.call_soon_threadsafe(self._do_stop)

    def _do_stop(self):
        """Set the stop flag and wake a reader blocked on the FIFO."""
        self._stop = True
        self._tickle_fifo()

    async def _enter_room(self, client):
        """Wait until the client is a member of the target room.

        Each round syncs the client, returns the room id if the room is
        already joined, and otherwise accepts a pending invite to a room
        whose display name matches before sleeping and retrying.

        Returns:
            The joined room's id, or ``None`` if stop was requested first.
        """
        while not self._stop:
            await client.sync()

            room_id = await self._joined_room_id(client)
            if room_id is not None:
                self._log(logging.INFO, "A room member already")
                self._is_member = True
                return room_id

            self._log(logging.INFO, "Not yet a room member")

            was_invited = False
            for invited_id, room in client.invited_rooms.items():
                if room.display_name == self._room_name:
                    was_invited = True
                    self._log(logging.INFO, "Joining room")
                    await client.join(invited_id)
                    break

            if not was_invited:
                self._log(logging.INFO, "Got no room invite yet")

            await asyncio.sleep(self._POLL_INTERVAL)
        return None

    async def _do_watch(self, client):
        """Poll room membership and keep ``_is_member`` up to date.

        When the room is found not to be joined, the FIFO is tickled so a
        blocked reader wakes up and notices the change. Errors during one
        poll are logged and the poll is retried after the usual interval
        instead of silently ending the watcher.
        """
        while not self._stop:
            try:
                await client.sync()
                if await self._joined_room_id(client) is None:
                    self._is_member = False
                    self._tickle_fifo()
                else:
                    self._is_member = True
            except Exception as e:
                self._log(logging.ERROR, "Error watching membership: %s", e)
            await asyncio.sleep(self._POLL_INTERVAL)

    def _tickle_fifo(self):
        """Write a single newline to the FIFO without blocking.

        This gives the FIFO reader a line to consume so that it re-checks
        the stop and membership flags. It is best-effort: ``ENXIO`` (no
        process has the FIFO open for reading) is expected and ignored;
        any other ``OSError`` is logged but never raised.
        """
        fifo = -1
        try:
            fifo = posix.open(
                self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK
            )
            posix.write(fifo, b"\n")
        except OSError as e:
            if e.errno != errno.ENXIO:
                self._log(logging.WARNING, "Could not tickle FIFO: %s", e)
        finally:
            if fifo != -1:
                posix.close(fifo)

    async def _process_messages(self, client, room_id):
        """Send each FIFO line to the room until stopped or not a member.

        The trailing newline of every line is stripped before sending.

        Args:
            client: Chat client used to send the messages.
            room_id: Id of the room to send to.
        """
        if self._stop or not self._is_member:
            return
        lines = self._read_fifo(self._fifo_path)
        try:
            async for line in lines:
                if self._stop or not self._is_member:
                    break
                await client.room_send(
                    room_id=room_id,
                    message_type="m.room.message",
                    content={
                        "msgtype": "m.text",
                        "body": line.rstrip("\n"),
                    },
                )
        finally:
            # Close the generator explicitly so the FIFO is released now
            # rather than whenever the generator happens to be finalized.
            await lines.aclose()

    async def _joined_room_id(self, client):
        """Return the id of the joined room named ``room_name``.

        A room qualifies when its display name matches and its id is in
        the list reported by ``client.joined_rooms()``.

        Returns:
            The room id, or ``None`` if no such room is joined.
        """
        # NOTE(review): assumes the joined_rooms() result always has a
        # ``rooms`` attribute; an error response may not -- confirm.
        joined_room_ids = (await client.joined_rooms()).rooms
        for room_id, room in client.rooms.items():
            if room.display_name == self._room_name and room_id in joined_room_ids:
                return room_id
        return None

    def _log(self, level, msg, *args, **kwargs):
        """Log ``msg`` on the root logger, prefixed with ``P{room_name}: ``.

        The room name is passed as a logging argument rather than being
        merged into the format string, so a ``%`` in the room name cannot
        corrupt formatting. As with ``logging.log``, ``msg`` is only
        %-formatted when ``args`` are supplied.
        """
        if args:
            logging.log(
                level, "P{%s}: " + msg, self._room_name, *args, **kwargs
            )
        else:
            logging.log(level, "P{%s}: %s", self._room_name, msg, **kwargs)

    @staticmethod
    async def _read_fifo(file_path):
        """Yield lines from ``file_path`` forever.

        The file is reopened every time iteration over it ends, so the
        generator itself never finishes; the consumer must break out.
        """
        while True:
            async with aiofiles.open(file_path) as fifo:
                async for line in fifo:
                    yield line