git.treefish.org Git - mtxbot.git/blobdiff - src/presence.py
Fix presence shutdown
[mtxbot.git] / src / presence.py
index 0ab1ad44a3f2f43e7cb4c0032c199e91eff09051..d9b65294ff68d397a5f5250c3a35fc73c8e39ba1 100644 (file)
@@ -4,22 +4,28 @@ import errno
 import logging
 import posix
 
+from inputparser import InputParser
+
 class Presence:
     def __init__(self, room_name, fifo_path):
+        self._input_parser = InputParser()
         self._room_name = room_name
         self._fifo_path = fifo_path
-        self._is_member = False
+        self._joined_room_id = None
 
     async def run(self, client):
         self._log(logging.INFO, "Presence running")
         self._stop = False
-        asyncio.create_task( self._do_watch(client) )
-        while not self._stop:
-            room_id = await self._enter_room(client)
-            try:
-                await self._process_messages(client, room_id)
-            except Exception as e:
-                self._log(logging.ERROR, "Error processing messages: %s", str(e))
+
+        done, pending = await asyncio.wait(
+            [asyncio.create_task( self._room_joining_loop(client) ),
+             asyncio.create_task( self._message_loop(client) )],
+            return_when = asyncio.FIRST_EXCEPTION
+        )
+
+        for task in done:
+            task.result()
+
         self._log(logging.INFO, "Presence stopped")
 
     def stop(self):
@@ -31,40 +37,53 @@ class Presence:
         self._stop = True
         self._tickle_fifo()
 
-    async def _enter_room(self, client):
+    async def _room_joining_loop(self, client):
         while not self._stop:
             await client.sync()
 
-            room_id = await self._joined_room_id(client)
-            if room_id != None:
-                self._log(logging.INFO, "A room member already")
-                self._is_member = True
-                return room_id
-
-            self._log(logging.INFO, "Not yet a room member")
+            prior_joined_room_id = self._joined_room_id
+            self._joined_room_id = await self._get_joined_room_id(client)
 
-            was_invited = False
-            for room_id, room in client.invited_rooms.items():
-                if room.display_name == self._room_name:
-                    was_invited = True
-                    self._log(logging.INFO, "Joining room")
-                    await client.join(room_id)
-                    break
-
-            if not was_invited:
-                self._log(logging.INFO, "Got no room invite yet")
+            if self._joined_room_id != None:
+                if self._joined_room_id != prior_joined_room_id:
+                    self._log(logging.INFO, "Joined room")
+            else:
+                if self._joined_room_id != prior_joined_room_id:
+                    self._log(logging.INFO, "Got kicked out of room?")
+                else:
+                    self._log(logging.INFO, "Not yet a room member")
+                was_invited = False
+                for room_id, room in client.invited_rooms.items():
+                    if room.display_name == self._room_name:
+                        was_invited = True
+                        self._log(logging.INFO, "Joining room")
+                        await client.join(room_id)
+                        break
+                if not was_invited:
+                    self._log(logging.INFO, "Got no room invite yet")
 
             await asyncio.sleep(3.0)
 
-    async def _do_watch(self, client):
-        while not self._stop:
-            await client.sync()
-            if await self._joined_room_id(client) == None:
-                self._is_member = False
-                self._tickle_fifo()
+    async def _message_loop(self, client):
+        if self._stop:
+            return
+        async for line in self._read_fifo(self._fifo_path):
+            if self._stop:
+                break
+            if self._joined_room_id != None:
+                try:
+                    self._input_parser.feed_line(line)
+                except Exception as e:
+                    self._log(logging.WARNING, "Error parsing input: %s" % str(e))
+                    continue
+                for content in self._input_parser.fetch_decoded():
+                    await client.room_send(
+                        room_id=self._joined_room_id,
+                        message_type="m.room.message",
+                        content=content
+                    )
             else:
-                self._is_member = True
-            await asyncio.sleep(3.0)
+                self._log(logging.WARNING, "Dropping message cause no room joined")
 
     def _tickle_fifo(self):
         fifo = -1
@@ -78,22 +97,7 @@ class Presence:
             if fifo != -1:
                 posix.close(fifo)
 
-    async def _process_messages(self, client, room_id):
-        if self._stop or not self._is_member:
-            return
-        async for line in self._read_fifo(self._fifo_path):
-            if self._stop or not self._is_member:
-                break
-            await client.room_send(
-                room_id=room_id,
-                message_type="m.room.message",
-                content={
-                    "msgtype": "m.text",
-                    "body": line.rstrip("\n")
-                }
-            )
-
-    async def _joined_room_id(self, client):
+    async def _get_joined_room_id(self, client):
         joined_room_ids = ( await client.joined_rooms() ).rooms
         for room_id, room in client.rooms.items():
             if room.display_name == self._room_name: