--- /dev/null
+#!/usr/bin/env python3
+
+import asyncio
+import aiofiles
+import json
+import logging
+import nio
+import os
+import signal
+import stat
+import time
+
+import sys
+assert sys.version_info >= (3, 5)
+
+from presence import Presence
+
+def handleInterrupt(sig, frame):
+ global stopped
+ if os.getpid() == mainPid:
+ logging.info( "Got stop signal." )
+ stopped = True
+ for presence in presences:
+ presence.stop()
+
+async def main():
+ client = nio.AsyncClient(config['server']['url'], config['server']['user'])
+
+ try:
+ await client.login(config['server']['password'])
+
+ presence_tasks = []
+ for presence in presences:
+ presence_tasks.append( asyncio.create_task(presence.run(client)) )
+
+ done, pending = await asyncio.wait( presence_tasks,
+ return_when = asyncio.FIRST_EXCEPTION )
+
+ for task in done:
+ try:
+ task.result()
+ except Exception as e:
+ logging.error( "Error running task: %s" % str(e) )
+
+ finally:
+ await client.close()
+
+logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s',
+ level=logging.INFO,
+ datefmt='%m/%d/%Y %H:%M:%S')
+
+mainPid = os.getpid()
+signal.signal(signal.SIGINT, handleInterrupt)
+
+stopped = False
+
+if len(sys.argv) != 2:
+ print("Usage: %s <config json>" % sys.argv[0])
+ sys.exit(1)
+
+with open(sys.argv[1]) as configFile:
+ config = json.load(configFile)
+
+while not stopped:
+ presences = []
+ for entry in os.listdir(config['paths']['fifodir']):
+ if stat.S_ISFIFO(os.stat(entry).st_mode):
+ logging.info("Creating presence for %s..." % entry)
+ presences.append( Presence( entry, "%s/%s" %
+ (config['paths']['fifodir'], entry) ) )
+ if len(presences) == 0:
+ logging.error("No fifos could be found!")
+ break
+ asyncio.run(main())
+ if not stopped:
+ logging.warning("Main loop exited!")
+ logging.info("Restarting after grace period...")
+ time.sleep(3.0)
--- /dev/null
+import aiofiles
+import asyncio
+import errno
+import logging
+import posix
+
+class Presence:
+ def __init__(self, room_name, fifo_path):
+ self._room_name = room_name
+ self._fifo_path = fifo_path
+
+ async def run(self, client):
+ self._log(logging.INFO, "Presence running")
+ self._stop = False
+ while not self._stop:
+ room_id = await self._enter_room(client)
+ try:
+ await self._process_messages(client, room_id)
+ except Exception as e:
+ self._log(logging.ERROR, "Error processing messages: %s", str(e))
+ self._log(logging.INFO, "Presence stopped")
+
+ def stop(self):
+ self._stop = True
+ fifo = -1
+ try:
+ fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
+ posix.write(fifo, "\n".encode())
+ except OSError as e:
+ if e.errno == errno.ENXIO:
+ pass
+ finally:
+ if fifo != -1:
+ posix.close(fifo)
+
+ async def _enter_room(self, client):
+ while not self._stop:
+ await client.sync()
+
+ for room_id, room in client.rooms.items():
+ if room.display_name == self._room_name:
+ self._log(logging.INFO, "A room member already")
+ return room_id
+
+ self._log(logging.INFO, "Not yet a room member")
+
+ was_invited = False
+ for room_id, room in client.invited_rooms.items():
+ if room.display_name == self._room_name:
+ was_invited = True
+ self._log(logging.INFO, "Joining room")
+ await client.join(room_id)
+ break
+
+ if not was_invited:
+ self._log(logging.INFO, "Got no room invite yet")
+
+ await asyncio.sleep(3.0)
+
+ async def _process_messages(self, client, room_id):
+ if self._stop:
+ return
+ async for line in self._read_fifo(self._fifo_path):
+ if self._stop:
+ break
+ await client.room_send(
+ room_id=room_id,
+ message_type="m.room.message",
+ content={
+ "msgtype": "m.text",
+ "body": line.rstrip("\n")
+ }
+ )
+
+ def _log(self, level, msg, *args, **kwargs):
+ logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)
+
+ @staticmethod
+ async def _read_fifo(file_path):
+ while True:
+ async with aiofiles.open(file_path) as fifo:
+ async for line in fifo:
+ yield(line)