]> git.treefish.org Git - mtxbot.git/blob - src/presence.py
avoid race condition on stop
[mtxbot.git] / src / presence.py
1 import aiofiles
2 import asyncio
3 import errno
4 import logging
5 import posix
6
7 class Presence:
8     def __init__(self, room_name, fifo_path):
9         self._room_name = room_name
10         self._fifo_path = fifo_path
11
12     async def run(self, client):
13         self._log(logging.INFO, "Presence running")
14         self._stop = False
15         while not self._stop:
16             room_id = await self._enter_room(client)
17             try:
18                 await self._process_messages(client, room_id)
19             except Exception as e:
20                 self._log(logging.ERROR, "Error processing messages: %s", str(e))
21         self._log(logging.INFO, "Presence stopped")
22
23     def stop(self):
24         asyncio.get_running_loop().call_soon_threadsafe(
25             Presence._do_stop, self
26         )
27
28     def _do_stop(self):
29         self._stop = True
30         fifo = -1
31         try:
32             fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
33             posix.write(fifo, "\n".encode())
34         except OSError as e:
35             if e.errno == errno.ENXIO:
36                 pass
37         finally:
38             if fifo != -1:
39                 posix.close(fifo)
40
41     async def _enter_room(self, client):
42         while not self._stop:
43             await client.sync()
44
45             for room_id, room in client.rooms.items():
46                 if room.display_name == self._room_name:
47                     self._log(logging.INFO, "A room member already")
48                     return room_id
49
50             self._log(logging.INFO, "Not yet a room member")
51
52             was_invited = False
53             for room_id, room in client.invited_rooms.items():
54                 if room.display_name == self._room_name:
55                     was_invited = True
56                     self._log(logging.INFO, "Joining room")
57                     await client.join(room_id)
58                     break
59
60             if not was_invited:
61                 self._log(logging.INFO, "Got no room invite yet")
62
63             await asyncio.sleep(3.0)
64
65     async def _process_messages(self, client, room_id):
66         if self._stop:
67             return
68         async for line in self._read_fifo(self._fifo_path):
69             if self._stop:
70                 break
71             await client.room_send(
72                 room_id=room_id,
73                 message_type="m.room.message",
74                 content={
75                     "msgtype": "m.text",
76                     "body": line.rstrip("\n")
77                 }
78             )
79
80     def _log(self, level, msg, *args, **kwargs):
81         logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)
82
83     @staticmethod
84     async def _read_fifo(file_path):
85         while True:
86             async with aiofiles.open(file_path) as fifo:
87                 async for line in fifo:
88                     yield(line)