]> git.treefish.org Git - mtxbot.git/blob - src/presence.py
be more tolerant on fifo writing
[mtxbot.git] / src / presence.py
1 import aiofiles
2 import asyncio
3 import errno
4 import logging
5 import posix
6
7 from inputparser import InputParser
8
9 class Presence:
10     def __init__(self, room_name, fifo_path):
11         self._input_parser = InputParser()
12         self._room_name = room_name
13         self._fifo_path = fifo_path
14         self._joined_room_id = None
15
16     async def run(self, client):
17         self._log(logging.INFO, "Presence running")
18         self._stop = False
19
20         done, pending = await asyncio.wait(
21             [asyncio.create_task( self._room_joining_loop(client) ),
22              asyncio.create_task( self._message_loop(client) )],
23             return_when = asyncio.FIRST_EXCEPTION
24         )
25
26         for task in done:
27             task.result()
28
29         self._log(logging.INFO, "Presence stopped")
30
31     def stop(self):
32         asyncio.get_running_loop().call_soon_threadsafe(
33             Presence._do_stop, self
34         )
35
36     def _do_stop(self):
37         self._stop = True
38         self._tickle_fifo()
39
40     async def _room_joining_loop(self, client):
41         while not self._stop:
42             await client.sync()
43
44             prior_joined_room_id = self._joined_room_id
45             self._joined_room_id = await self._get_joined_room_id(client)
46
47             if self._joined_room_id != None:
48                 if self._joined_room_id != prior_joined_room_id:
49                     self._log(logging.INFO, "Joined room")
50             else:
51                 if self._joined_room_id != prior_joined_room_id:
52                     self._log(logging.INFO, "Got kicked out of room?")
53                 else:
54                     self._log(logging.INFO, "Not yet a room member")
55                 was_invited = False
56                 for room_id, room in client.invited_rooms.items():
57                     if room.display_name == self._room_name:
58                         was_invited = True
59                         self._log(logging.INFO, "Joining room")
60                         await client.join(room_id)
61                         break
62                 if not was_invited:
63                     self._log(logging.INFO, "Got no room invite yet")
64
65             await asyncio.sleep(3.0)
66
67     async def _message_loop(self, client):
68         if self._stop:
69             return
70         async for line in self._read_fifo(self._fifo_path):
71             if self._stop:
72                 break
73             if self._joined_room_id != None:
74                 try:
75                     self._input_parser.feed_line(line)
76                 except Exception as e:
77                     self._log(logging.WARNING, "Error parsing input: %s" % str(e))
78                     continue
79                 for content in self._input_parser.fetch_decoded():
80                     await client.room_send(
81                         room_id=self._joined_room_id,
82                         message_type="m.room.message",
83                         content=content
84                     )
85             else:
86                 self._log(logging.WARNING, "Dropping message cause no room joined")
87
88     def _tickle_fifo(self):
89         fifo = -1
90         try:
91             fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
92             posix.write(fifo, "\n".encode())
93         except OSError as e:
94             if e.errno == errno.ENXIO:
95                 pass
96         finally:
97             if fifo != -1:
98                 posix.close(fifo)
99
100     async def _get_joined_room_id(self, client):
101         joined_room_ids = ( await client.joined_rooms() ).rooms
102         for room_id, room in client.rooms.items():
103             if room.display_name == self._room_name:
104                 if room_id in joined_room_ids:
105                     return room_id
106         return None
107
108     def _log(self, level, msg, *args, **kwargs):
109         logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)
110
111     @staticmethod
112     async def _read_fifo(file_path):
113         while True:
114             async with aiofiles.open(file_path) as fifo:
115                 async for line in fifo:
116                     yield(line)