import asyncio
import errno
import logging
import os
import posix

from inputparser import InputParser

class Presence:
    """Keep a chat client joined to one room and forward FIFO input to it.

    Two cooperating loops are started by :meth:`run`:

    * a room-joining loop that polls the client, tracks which room whose
      display name equals ``room_name`` is currently joined, and accepts a
      matching invite when there is one;
    * a message loop that feeds every line read from the FIFO to an
      ``InputParser`` and sends each decoded content as an
      ``m.room.message`` event to the joined room.

    NOTE(review): the ``client`` API used here (``sync``, ``rooms``,
    ``invited_rooms``, ``join``, ``joined_rooms``, ``room_send``) looks like
    matrix-nio's ``AsyncClient`` -- confirm against the caller.
    """

    def __init__(self, room_name, fifo_path):
        """Remember the target room display name and the FIFO to read from."""
        self._input_parser = InputParser()
        self._room_name = room_name
        self._fifo_path = fifo_path
        # Id of the room currently joined, or None while not a member.
        self._joined_room_id = None
        # Set by _do_stop(); checked by both loops.
        self._stop = False
        # Event loop executing run(); lets stop() be called from any thread.
        self._loop = None

    async def run(self, client):
        """Run the room-joining and message loops until stopped.

        Returns once both loops have finished.  If either loop raises, the
        other one is cancelled and the exception is re-raised to the caller.
        """
        self._log(logging.INFO, "Presence running")
        self._stop = False
        self._loop = asyncio.get_running_loop()

        tasks = [
            asyncio.create_task(self._room_joining_loop(client)),
            asyncio.create_task(self._message_loop(client)),
        ]
        try:
            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_EXCEPTION
            )
            # Re-raise the failure of whichever loop broke first.
            for task in done:
                task.result()
        finally:
            # Never leave a loop running in the background: cancel whatever
            # is still pending (after a failure, or when run() itself is
            # cancelled) and wait until every task has really finished.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            self._loop = None

        self._log(logging.INFO, "Presence stopped")

    def stop(self):
        """Ask :meth:`run` to finish; safe to call from another thread.

        The request is handed over to the event loop that executes
        :meth:`run`.  When :meth:`run` is not active, the loop running in the
        calling thread is used instead, and ``RuntimeError`` is raised if
        there is none.
        """
        loop = self._loop
        if loop is None:
            loop = asyncio.get_running_loop()
        loop.call_soon_threadsafe(self._do_stop)

    def _do_stop(self):
        """Set the stop flag and wake up the FIFO reader so it notices it."""
        self._stop = True
        self._tickle_fifo()

    async def _room_joining_loop(self, client):
        """Poll the client every three seconds and track room membership."""
        while not self._stop:
            await client.sync()

            prior_joined_room_id = self._joined_room_id
            self._joined_room_id = await self._get_joined_room_id(client)
            changed = self._joined_room_id != prior_joined_room_id

            if self._joined_room_id is not None:
                if changed:
                    self._log(logging.INFO, "Joined room")
            elif changed:
                self._log(logging.INFO, "Got kicked out of room?")
            else:
                self._log(logging.INFO, "Not yet a room member")
                await self._join_invited_room(client)

            await asyncio.sleep(3.0)

    async def _join_invited_room(self, client):
        """Accept the first pending invite whose display name matches."""
        for room_id, room in client.invited_rooms.items():
            if room.display_name == self._room_name:
                self._log(logging.INFO, "Joining room")
                await client.join(room_id)
                return
        self._log(logging.INFO, "Got no room invite yet")

    async def _message_loop(self, client):
        """Forward parsed FIFO lines to the joined room until stopped.

        Lines that arrive while no room is joined are dropped with a warning;
        lines the parser rejects are logged and skipped.
        """
        if self._stop:
            return

        async for line in self._read_fifo(self._fifo_path):
            if self._stop:
                break

            # Snapshot the id: the joining loop may reset it while we await.
            room_id = self._joined_room_id
            if room_id is None:
                self._log(
                    logging.WARNING, "Dropping message cause no room joined"
                )
                continue

            try:
                self._input_parser.feed_line(line)
            except Exception as e:
                self._log(logging.WARNING, "Error parsing input: %s", e)
                continue

            for content in self._input_parser.fetch_decoded():
                await client.room_send(
                    room_id=room_id,
                    message_type="m.room.message",
                    content=content,
                )

    def _tickle_fifo(self):
        """Write a newline to the FIFO so a blocked reader wakes up.

        Best effort: a FIFO without a reader (``ENXIO``) is silently ignored,
        any other ``OSError`` is logged instead of being raised.
        """
        try:
            fifo = os.open(self._fifo_path, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as e:
            # ENXIO: nobody has the FIFO open for reading, nothing to wake.
            if e.errno != errno.ENXIO:
                self._log(logging.WARNING, "Cannot open fifo: %s", e)
            return

        try:
            os.write(fifo, b"\n")
        except OSError as e:
            self._log(logging.WARNING, "Cannot write to fifo: %s", e)
        finally:
            os.close(fifo)

    async def _get_joined_room_id(self, client):
        """Return the id of the joined room named ``room_name``, else None.

        A room counts as joined only when it is both known to the client
        under the expected display name and listed by
        ``client.joined_rooms()``.
        """
        joined_room_ids = (await client.joined_rooms()).rooms
        for room_id, room in client.rooms.items():
            if room.display_name == self._room_name and room_id in joined_room_ids:
                return room_id
        return None

    def _log(self, level, msg, *args, **kwargs):
        """Log ``msg`` through the root logger, prefixed with the room name."""
        room_name = str(self._room_name)
        if args:
            # logging applies ``%`` formatting only when args are given; a
            # literal percent sign in the room name must survive that step.
            room_name = room_name.replace("%", "%%")
        logging.log(level, "P{%s}: %s" % (room_name, msg), *args, **kwargs)