From: Alexander Schmidt Date: Sat, 3 Oct 2020 18:58:29 +0000 (+0200) Subject: reworked presence X-Git-Url: http://git.treefish.org/~alex/mtxbot.git/commitdiff_plain/a6f58518069909db7b170ad374b13328986b98da reworked presence --- diff --git a/src/mtxbot.py b/src/mtxbot.py index 0372ffe..d92dcdc 100755 --- a/src/mtxbot.py +++ b/src/mtxbot.py @@ -16,10 +16,10 @@ assert sys.version_info >= (3, 5) from presence import Presence def handleInterrupt(sig, frame): - global stopped + global stop if os.getpid() == mainPid: logging.info( "Got stop signal." ) - stopped = True + stop = True for presence in presences: presence.stop() @@ -52,7 +52,7 @@ logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', mainPid = os.getpid() signal.signal(signal.SIGINT, handleInterrupt) -stopped = False +stop = False if len(sys.argv) != 2: print("Usage: %s " % sys.argv[0]) @@ -61,7 +61,7 @@ if len(sys.argv) != 2: with open(sys.argv[1]) as configFile: config = json.load(configFile) -while not stopped: +while not stop: presences = [] for entry in os.listdir(config['paths']['fifodir']): fullpath = "%s/%s" % (config['paths']['fifodir'], entry) @@ -72,7 +72,7 @@ while not stopped: logging.error("No fifos could be found!") break asyncio.run(main()) - if not stopped: + if not stop: logging.warning("Main loop exited!") logging.info("Restarting after grace period...") time.sleep(3.0) diff --git a/src/presence.py b/src/presence.py index 0ab1ad4..e239bdb 100644 --- a/src/presence.py +++ b/src/presence.py @@ -8,18 +8,21 @@ class Presence: def __init__(self, room_name, fifo_path): self._room_name = room_name self._fifo_path = fifo_path - self._is_member = False + self._joined_room_id = None async def run(self, client): self._log(logging.INFO, "Presence running") self._stop = False - asyncio.create_task( self._do_watch(client) ) - while not self._stop: - room_id = await self._enter_room(client) - try: - await self._process_messages(client, room_id) - except Exception as e: - self._log(logging.ERROR, "Error processing messages: %s", str(e)) + + done, pending = await asyncio.wait( + [asyncio.create_task( self._room_joining_loop(client) ), + asyncio.create_task( self._message_loop(client) )], + return_when = asyncio.FIRST_EXCEPTION + ) + + for task in done: + task.result() + self._log(logging.INFO, "Presence stopped") def stop(self): @@ -31,40 +34,50 @@ class Presence: self._stop = True self._tickle_fifo() - async def _enter_room(self, client): + async def _room_joining_loop(self, client): while not self._stop: await client.sync() - room_id = await self._joined_room_id(client) - if room_id != None: - self._log(logging.INFO, "A room member already") - self._is_member = True - return room_id + prior_joined_room_id = self._joined_room_id + self._joined_room_id = await self._get_joined_room_id(client) - self._log(logging.INFO, "Not yet a room member") - - was_invited = False - for room_id, room in client.invited_rooms.items(): - if room.display_name == self._room_name: - was_invited = True - self._log(logging.INFO, "Joining room") - await client.join(room_id) - break - - if not was_invited: - self._log(logging.INFO, "Got no room invite yet") + if self._joined_room_id != None: + if self._joined_room_id != prior_joined_room_id: + self._log(logging.INFO, "Joined room") + else: + if self._joined_room_id != prior_joined_room_id: + self._log(logging.INFO, "Got kicked out of room?") + else: + self._log(logging.INFO, "Not yet a room member") + was_invited = False + for room_id, room in client.invited_rooms.items(): + if room.display_name == self._room_name: + was_invited = True + self._log(logging.INFO, "Joining room") + await client.join(room_id) + break + if not was_invited: + self._log(logging.INFO, "Got no room invite yet") await asyncio.sleep(3.0) - async def _do_watch(self, client): - while not self._stop: - await client.sync() - if await self._joined_room_id(client) == None: - self._is_member = False - self._tickle_fifo() + async def _message_loop(self, client): + if self._stop: + return + async for line in self._read_fifo(self._fifo_path): + if self._stop: + break + if self._joined_room_id != None: + await client.room_send( + room_id=self._joined_room_id, + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": line.rstrip("\n") + } + ) else: - self._is_member = True - await asyncio.sleep(3.0) + self._log(logging.WARNING, "Dropping message cause no room joined") def _tickle_fifo(self): fifo = -1 @@ -78,22 +91,7 @@ class Presence: if fifo != -1: posix.close(fifo) - async def _process_messages(self, client, room_id): - if self._stop or not self._is_member: - return - async for line in self._read_fifo(self._fifo_path): - if self._stop or not self._is_member: - break - await client.room_send( - room_id=room_id, - message_type="m.room.message", - content={ - "msgtype": "m.text", - "body": line.rstrip("\n") - } - ) - - async def _joined_room_id(self, client): + async def _get_joined_room_id(self, client): joined_room_ids = ( await client.joined_rooms() ).rooms for room_id, room in client.rooms.items(): if room.display_name == self._room_name: