From: Alexander Schmidt
Date: Sat, 3 Oct 2020 00:50:28 +0000 (+0200)
Subject: initial commit
X-Git-Url: http://git.treefish.org/~alex/mtxbot.git/commitdiff_plain/b07be8cb62cc873d0c069c6410fdb0275620f66e?ds=inline

initial commit
---

b07be8cb62cc873d0c069c6410fdb0275620f66e
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f6a9ff7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/run
+*~
diff --git a/doc/config.json b/doc/config.json
new file mode 100644
index 0000000..ad3d114
--- /dev/null
+++ b/doc/config.json
@@ -0,0 +1,10 @@
+{
+  "server": {
+    "url": "https://matrix.example.com",
+    "user": "@user:matrix.example.com",
+    "password": "WHATEVER"
+  },
+  "paths": {
+    "fifodir": "/path/to/fifo/dir"
+  }
+}
diff --git a/src/mtxbot.py b/src/mtxbot.py
new file mode 100755
index 0000000..0843f7d
--- /dev/null
+++ b/src/mtxbot.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python3
+"""Relay lines written to named pipes (FIFOs) into Matrix rooms.
+
+Every FIFO found in the configured directory gets one Presence, which posts
+each line read from that FIFO to the Matrix room named after the FIFO.
+"""
+
+import asyncio
+import aiofiles
+import json
+import logging
+import nio
+import os
+import signal
+import stat
+import time
+
+import sys
+# asyncio.run() and asyncio.create_task() both require Python 3.7.
+assert sys.version_info >= (3, 7)
+
+from presence import Presence
+
+def handleInterrupt(sig, frame):
+    """SIGINT handler: flag the shutdown and stop every presence."""
+    global stopped
+    # Only the process that installed the handler reacts to the signal.
+    if os.getpid() == mainPid:
+        logging.info( "Got stop signal." )
+        stopped = True
+        for presence in presences:
+            presence.stop()
+
+async def main():
+    """Log in and run all presences until they finish or one of them fails.
+
+    Exceptions raised by finished presence tasks are logged; the client is
+    closed on the way out in every case.
+    """
+    client = nio.AsyncClient(config['server']['url'], config['server']['user'])
+
+    try:
+        await client.login(config['server']['password'])
+
+        presence_tasks = []
+        for presence in presences:
+            presence_tasks.append( asyncio.create_task(presence.run(client)) )
+
+        # Returns when all tasks are done or as soon as one of them raises.
+        done, pending = await asyncio.wait( presence_tasks,
+                                            return_when = asyncio.FIRST_EXCEPTION )
+
+        for task in done:
+            try:
+                task.result()
+            except Exception as e:
+                logging.error( "Error running task: %s" % str(e) )
+
+    finally:
+        await client.close()
+
+logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s',
+                    level=logging.INFO,
+                    datefmt='%m/%d/%Y %H:%M:%S')
+
+# Set up everything the signal handler touches before installing it, so an
+# early SIGINT cannot hit undefined globals.
+mainPid = os.getpid()
+stopped = False
+presences = []
+signal.signal(signal.SIGINT, handleInterrupt)
+
+if len(sys.argv) != 2:
+    print("Usage: %s CONFIG_FILE" % sys.argv[0])
+    sys.exit(1)
+
+with open(sys.argv[1]) as configFile:
+    config = json.load(configFile)
+
+while not stopped:
+    presences = []
+    for entry in os.listdir(config['paths']['fifodir']):
+        # os.listdir() yields bare names; stat the path inside fifodir, not
+        # a same-named entry in the current working directory.
+        fifo_path = "%s/%s" % (config['paths']['fifodir'], entry)
+        if stat.S_ISFIFO(os.stat(fifo_path).st_mode):
+            logging.info("Creating presence for %s..." % entry)
+            presences.append( Presence(entry, fifo_path) )
+    if len(presences) == 0:
+        logging.error("No fifos could be found!")
+        break
+    asyncio.run(main())
+    if not stopped:
+        logging.warning("Main loop exited!")
+        logging.info("Restarting after grace period...")
+        time.sleep(3.0)
diff --git a/src/presence.py b/src/presence.py
new file mode 100644
index 0000000..0a7b1a1
--- /dev/null
+++ b/src/presence.py
@@ -0,0 +1,110 @@
+"""Presence: forwards lines from one FIFO into one Matrix room."""
+
+import aiofiles
+import asyncio
+import errno
+import logging
+import posix
+
+class Presence:
+    """Bridge between a named pipe and the Matrix room of the same name.
+
+    room_name is compared against the display names of joined and invited
+    rooms; fifo_path is the named pipe whose lines are posted to that room.
+    """
+
+    def __init__(self, room_name, fifo_path):
+        self._room_name = room_name
+        self._fifo_path = fifo_path
+
+    async def run(self, client):
+        """Enter the room and forward FIFO lines until stop() is called."""
+        self._log(logging.INFO, "Presence running")
+        self._stop = False
+        while not self._stop:
+            room_id = await self._enter_room(client)
+            try:
+                await self._process_messages(client, room_id)
+            except Exception as e:
+                # Log and retry: the loop re-enters the room and resumes.
+                self._log(logging.ERROR, "Error processing messages: %s", str(e))
+        self._log(logging.INFO, "Presence stopped")
+
+    def stop(self):
+        """Flag the presence as stopped and wake up a blocked FIFO reader.
+
+        A newline is written to the FIFO in non-blocking mode so that the
+        reading loop receives a line and gets to check the stop flag.
+        """
+        self._stop = True
+        fifo = -1
+        try:
+            fifo = posix.open(self._fifo_path, posix.O_WRONLY | posix.O_NONBLOCK)
+            posix.write(fifo, "\n".encode())
+        except OSError as e:
+            # ENXIO: nobody has the FIFO open for reading, so there is no
+            # reader to wake up. Any other error is unexpected and gets
+            # reported instead of being dropped silently.
+            if e.errno != errno.ENXIO:
+                self._log(logging.WARNING, "Error waking up fifo reader: %s", str(e))
+        finally:
+            if fifo != -1:
+                posix.close(fifo)
+
+    async def _enter_room(self, client):
+        """Return the id of the room whose display name is the room name.
+
+        Syncs in a loop, joining the room when an invite for it is pending,
+        until the room is among the joined rooms. Returns None if the
+        presence gets stopped first.
+        """
+        while not self._stop:
+            await client.sync()
+
+            for room_id, room in client.rooms.items():
+                if room.display_name == self._room_name:
+                    self._log(logging.INFO, "A room member already")
+                    return room_id
+
+            self._log(logging.INFO, "Not yet a room member")
+
+            was_invited = False
+            for room_id, room in client.invited_rooms.items():
+                if room.display_name == self._room_name:
+                    was_invited = True
+                    self._log(logging.INFO, "Joining room")
+                    await client.join(room_id)
+                    break
+
+            if not was_invited:
+                self._log(logging.INFO, "Got no room invite yet")
+
+            await asyncio.sleep(3.0)
+
+    async def _process_messages(self, client, room_id):
+        """Send each line read from the FIFO to room_id as a text message."""
+        if self._stop:
+            return
+        async for line in self._read_fifo(self._fifo_path):
+            if self._stop:
+                break
+            await client.room_send(
+                room_id=room_id,
+                message_type="m.room.message",
+                content={
+                    "msgtype": "m.text",
+                    "body": line.rstrip("\n")
+                }
+            )
+
+    def _log(self, level, msg, *args, **kwargs):
+        """Log msg at level, prefixed with the room name."""
+        logging.log(level, "P{%s}: %s" % (self._room_name, msg), *args, **kwargs)
+
+    @staticmethod
+    async def _read_fifo(file_path):
+        """Yield the lines of file_path, reopening it whenever it hits EOF."""
+        while True:
+            async with aiofiles.open(file_path) as fifo:
+                async for line in fifo:
+                    yield(line)