# Chatbot
# You can not select more than 25 topics. Topics must start with a letter or
# number, can include dashes ('-') and can be up to 35 characters long.
#
# 359 lines
# 15 KiB

import asyncio
import nio
from nio import AsyncClient, AsyncClientConfig, MatrixRoom, RoomMessageText, InviteEvent, UploadResponse, RedactionEvent, LoginResponse, Event
from nio import KeyVerificationCancel, KeyVerificationEvent, KeyVerificationKey, KeyVerificationMac, KeyVerificationStart, LocalProtocolError, ToDeviceError
import os, sys
import json
import aiofiles.os
import magic
from PIL import Image
import logging
logger = logging.getLogger(__name__)
class Callbacks(object):
    """Class to pass client to callback methods.

    An instance registers its own handlers on a matrix-nio ``AsyncClient``
    (see :meth:`setup_callbacks`) and forwards text messages and redactions
    to the callables added through :meth:`add_message_callback`.  It also
    joins rooms on invite and answers emoji (SAS) key-verification events.
    """

    # Answer given at the "do the emojis match?" step of SAS verification:
    # "y" confirms, "n" rejects, anything else cancels (case-insensitive).
    # SECURITY NOTE: the default confirms without a human ever comparing the
    # emojis, which removes the protection interactive verification is meant
    # to provide.  Override on the instance/subclass to change the decision.
    sas_answer = "y"
    # Seconds to wait after logging the emojis before answering.
    sas_answer_delay = 5

    def __init__(self):
        # Populated by setup_callbacks(); None until then.
        self.client = None
        self.message_callbacks = []
        self.message_redaction_callbacks = []

    def setup_callbacks(self, client: AsyncClient):
        """Remember *client* and register the room-event handlers on it."""
        self.client = client
        self.client.add_event_callback(self.message_cb, RoomMessageText)
        self.client.add_event_callback(self.invite_cb, InviteEvent)
        self.client.add_event_callback(self.redaction_cb, RedactionEvent)

    def add_message_callback(self, callback, redaction_callback=None):
        """Register a message callback and, optionally, a redaction callback.

        Both are called as ``cb(room, event)``; plain functions and coroutine
        functions are accepted.
        """
        self.message_callbacks.append(callback)
        if redaction_callback:
            self.message_redaction_callbacks.append(redaction_callback)

    @staticmethod
    async def _dispatch(callbacks, room, event):
        """Call every entry of *callbacks* with ``(room, event)`` in order."""
        for cb in callbacks:
            if asyncio.iscoroutinefunction(cb):
                await cb(room, event)
            else:
                cb(room, event)

    async def message_cb(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """Got a message in a room"""
        logger.debug(
            f"Message received in room {room.display_name} | "
            f"{room.user_name(event.sender)}: {event.body}"
        )
        await self._dispatch(self.message_callbacks, room, event)

    async def redaction_cb(self, room: MatrixRoom, event: RedactionEvent) -> None:
        """Message was deleted"""
        logger.debug(f"event redacted in room {room.room_id}. event_id: {event.redacts}")
        await self._dispatch(self.message_redaction_callbacks, room, event)

    async def invite_cb(self, room: MatrixRoom, event: InviteEvent) -> None:
        """Automatically join all rooms we get invited to"""
        result = await self.client.join(room.room_id)
        if isinstance(result, nio.responses.JoinResponse):
            logger.info(f"Invited and joined room: {room.name} {room.room_id}")
        else:
            logger.error(f"Error joining room: {result!s}")

    async def to_device_callback(self, event):
        """Handle events sent to device.

        Dispatches the key-verification event types to one helper per step;
        any other event type is logged and ignored.
        """
        if isinstance(event, KeyVerificationStart):  # first step
            await self._on_verification_start(event)
        elif isinstance(event, KeyVerificationCancel):
            logger.warning(
                f"Verification has been cancelled by {event.sender} "
                f'for reason "{event.reason}".'
            )
        elif isinstance(event, KeyVerificationKey):  # second step
            await self._on_verification_key(event)
        elif isinstance(event, KeyVerificationMac):  # third step
            await self._on_verification_mac(event)
        else:
            logger.warning(
                f"Received unexpected event type {type(event)}. "
                f"Event is {event}. Event will be ignored."
            )

    async def _on_verification_start(self, event):
        """First step: accept an emoji verification request and share our key."""
        if "emoji" not in event.short_authentication_string:
            logger.warning(
                "Other device does not support emoji verification "
                f"{event.short_authentication_string}."
            )
            return
        resp = await self.client.accept_key_verification(event.transaction_id)
        if isinstance(resp, ToDeviceError):
            logger.warning(f"accept_key_verification failed with {resp}")
        sas = self.client.key_verifications[event.transaction_id]
        resp = await self.client.to_device(sas.share_key())
        if isinstance(resp, ToDeviceError):
            logger.warning(f"to_device failed with {resp}")

    async def _on_verification_key(self, event):
        """Second step: log the emojis, wait, then confirm, reject or cancel."""
        sas = self.client.key_verifications[event.transaction_id]
        logger.info(f"{sas.get_emoji()}")
        # No interactive prompt: the decision comes from `sas_answer` after a
        # fixed delay (see the security note on the class attribute).
        await asyncio.sleep(self.sas_answer_delay)
        answer = self.sas_answer.lower()
        if answer == "y":
            action = "confirm_short_auth_string"
            resp = await self.client.confirm_short_auth_string(event.transaction_id)
        else:
            # "n" rejects the verification; anything else is a plain cancel.
            action = "cancel_key_verification"
            resp = await self.client.cancel_key_verification(
                event.transaction_id, reject=(answer == "n")
            )
        if isinstance(resp, ToDeviceError):
            logger.warning(f"{action} failed with {resp}")

    async def _on_verification_mac(self, event):
        """Third step: send our MAC and report the outcome of the verification."""
        sas = self.client.key_verifications[event.transaction_id]
        try:
            todevice_msg = sas.get_mac()
        except LocalProtocolError as e:
            # e.g. it might have been cancelled by ourselves
            logger.warning(
                f"Cancelled or protocol error: Reason: {e}.\n"
                f"Verification with {event.sender} not concluded. "
                "Try again?"
            )
            return
        resp = await self.client.to_device(todevice_msg)
        send_failed = isinstance(resp, ToDeviceError)
        if send_failed:
            logger.warning(f"to_device failed with {resp}")
        logger.info(
            f"sas.we_started_it = {sas.we_started_it}\n"
            f"sas.sas_accepted = {sas.sas_accepted}\n"
            f"sas.canceled = {sas.canceled}\n"
            f"sas.timed_out = {sas.timed_out}\n"
            f"sas.verified = {sas.verified}\n"
            f"sas.verified_devices = {sas.verified_devices}\n"
        )
        # Only claim success when our MAC went out and the SAS object itself
        # reports the verification as done.
        if sas.verified and not send_failed:
            logger.info(
                "Emoji verification was successful!\n"
                "Hit Control-C to stop the program or "
                "initiate another Emoji verification from "
                "another device or room."
            )
        else:
            logger.warning(
                f"Verification with {event.sender} not concluded. "
                "Try again?"
            )
class ChatClient(object):
    """Thin convenience wrapper around a matrix-nio ``AsyncClient``.

    Typical use: construct, optionally call :meth:`persist` to enable
    on-disk credentials and store, ``await login()``, register callbacks via
    ``self.callbacks`` and then run :meth:`sync_forever`.
    """

    def __init__(self, homeserver, user_id, password, device_name="matrix-nio"):
        self.homeserver = homeserver
        self.user_id = user_id
        self.password = password
        self.device_name = device_name
        # Flipped to True by watch_for_sync() once the client reports a sync.
        self.synced = False
        self.callbacks = Callbacks()
        # Strong reference to the watch_for_sync() task created in login().
        self._sync_task = None

    async def persist(self, data_dir):
        """Enable persistence below *data_dir* and create the directories.

        Sets ``config_file`` (stored login details) and ``store_path``
        (passed to the client as its store directory).
        """
        self.config_file = f"{data_dir}/matrix_credentials.json"
        self.store_path = f"{data_dir}/store/"
        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(self.store_path, exist_ok=True)

    async def login(self):
        """Create ``self.client`` and log in; return the client.

        Uses the stored credentials file when :meth:`persist` was called and
        the file exists, otherwise logs in with user id and password (and
        writes the credentials file if persistence is enabled).  A failed
        password login terminates the process via ``sys.exit(1)``.
        """
        client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=False,
        )
        config_file = getattr(self, "config_file", None)
        if config_file is None or not os.path.exists(config_file):
            await self._login_with_password(client_config)
        else:
            self._login_with_stored_credentials(config_file, client_config)
        self.client.add_to_device_callback(self.callbacks.to_device_callback, (KeyVerificationEvent,))
        logger.info(f"Connected as \"{self.user_id}\"")
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._sync_task = asyncio.create_task(self.watch_for_sync())
        return self.client

    async def _login_with_password(self, client_config):
        """Build the client and authenticate with user id and password."""
        logger.info(f"No credentials file. Connecting to \"{self.homeserver}\" with user_id and password")
        # store_path is absent (or None after an earlier login) when
        # persist() was never called; the client then gets store_path=None.
        store_path = getattr(self, "store_path", None)
        if store_path is not None:
            os.makedirs(store_path, exist_ok=True)
        self.store_path = store_path
        # initialize the matrix client
        self.client = AsyncClient(
            self.homeserver,
            self.user_id,
            store_path=self.store_path,
            config=client_config,
        )
        self.callbacks.setup_callbacks(self.client)
        resp = await self.client.login(self.password, device_name=self.device_name)
        # check that we logged in successfully
        if not isinstance(resp, LoginResponse):
            logger.error(f'homeserver = "{self.homeserver}"; user = "{self.user_id}"')
            logger.error(f"Failed to log in: {resp}")
            sys.exit(1)
        config_file = getattr(self, "config_file", None)
        if config_file is not None:
            self.write_details_to_disk(config_file, resp, self.homeserver)

    def _login_with_stored_credentials(self, config_file, client_config):
        """Build the client from the JSON credentials stored in *config_file*."""
        logger.info(f"Logging in to \"{self.homeserver}\" using stored credentials.")
        with open(config_file, "r") as f:
            config = json.load(f)
        self.client = AsyncClient(
            config["homeserver"],
            config["user_id"],
            device_id=config["device_id"],
            store_path=getattr(self, "store_path", None),
            config=client_config,
        )
        self.callbacks.setup_callbacks(self.client)
        # NOTE (from the original author): the load_store() inside
        # restore_login() somehow makes client.rooms empty when encrypted;
        # setting user_id/device_id/access_token directly is a workaround.
        self.client.restore_login(
            user_id=config["user_id"],
            device_id=config["device_id"],
            access_token=config["access_token"],
        )

    async def logout(self):
        """Close the client connection (the access token is not invalidated here)."""
        logger.warning("Disconnected")
        # Stop the sync watcher if it is still waiting for a first sync.
        sync_task = getattr(self, "_sync_task", None)
        if sync_task is not None and not sync_task.done():
            sync_task.cancel()
        await self.client.close()

    async def send_message(self, room_id, message):
        """Send *message* as an ``m.text`` message to *room_id*.

        Returns whatever ``client.room_send`` returns.  If the client raises
        ``OlmUnverifiedDeviceError`` the known devices are logged and the
        exception is re-raised.
        """
        try:
            return await self.client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={
                    "msgtype": "m.text",
                    "body": message,
                },
                ignore_unverified_devices=True,  # ToDo
            )
        except nio.exceptions.OlmUnverifiedDeviceError:
            logger.warning("These are all known devices:")
            for device in self.client.device_store:
                logger.warning(
                    f"\t{device.user_id}\t {device.device_id}\t {device.trust_state}\t {device.display_name}"
                )
            raise

    async def send_image(self, room_id, image):
        """Send image to room

        *image* is a path to a local file.  The file is uploaded first and
        the resulting content URI is then sent as an ``m.image`` message.
        Nothing is sent (and ``None`` is returned) when the file is not an
        image or the upload fails.

        https://matrix-nio.readthedocs.io/en/latest/examples.html#sending-an-image
        """
        mime_type = magic.from_file(image, mime=True)  # e.g. "image/jpeg"
        if not mime_type.startswith("image/"):
            logger.error("Drop message because file does not have an image mime type.")
            return
        # Context manager so the underlying file handle is released promptly.
        with Image.open(image) as im:
            (width, height) = im.size  # im.size returns (width,height) tuple
        # first do an upload of image, then send URI of upload to room
        file_stat = await aiofiles.os.stat(image)
        # The file is only read, so plain "rb" also works for read-only files.
        async with aiofiles.open(image, "rb") as f:
            resp, maybe_keys = await self.client.upload(
                f,
                content_type=mime_type,  # image/jpeg
                filename=os.path.basename(image),
                filesize=file_stat.st_size,
            )
        if not isinstance(resp, UploadResponse):
            # An error response carries no content_uri to reference.
            logger.error(f"Failed to upload image. Failure response: {resp}")
            return
        logger.info("Image was uploaded successfully to server. ")
        content = {
            "body": os.path.basename(image),  # descriptive title
            "info": {
                "size": file_stat.st_size,
                "mimetype": mime_type,
                "thumbnail_info": None,  # TODO
                "w": width,  # width in pixel
                "h": height,  # height in pixel
                "thumbnail_url": None,  # TODO
            },
            "msgtype": "m.image",
            "url": resp.content_uri,
        }
        try:
            await self.client.room_send(room_id, message_type="m.room.message", content=content)
        except Exception:
            # Best effort by design: log (with traceback) instead of raising.
            logger.exception(f"Image send of file {image} failed.")
        else:
            logger.info("Image was sent successfully")

    async def room_typing(self, room_id, is_typing, timeout=15000):
        """Set or clear the typing notification; *timeout* only applies when setting."""
        if is_typing:
            return await self.client.room_typing(room_id, is_typing, timeout)
        else:
            return await self.client.room_typing(room_id, False)

    async def room_read_markers(self, room_id, event1, event2):
        """Forward to ``client.room_read_markers(room_id, event1, event2)``."""
        return await self.client.room_read_markers(room_id, event1, event2)

    def sync_forever(self, timeout=30000, full_state=True):
        """Return the awaitable of ``client.sync_forever``.

        Arguments are passed by keyword: the second positional parameter of
        nio's ``sync_forever`` is ``sync_filter``, not ``full_state``.
        """
        return self.client.sync_forever(timeout=timeout, full_state=full_state)

    async def watch_for_sync(self):
        """Wait for the client's ``synced`` event, then set ``self.synced``."""
        logger.debug("Awaiting sync")
        await self.client.synced.wait()
        logger.info("Client is synced")
        self.synced = True

    def write_details_to_disk(self, config_file: str, resp: LoginResponse, homeserver) -> None:
        """Writes the required login details to disk so we can log in later without
        using a password.

        Arguments:
            config_file -- path of the JSON file to (over)write.
            resp {LoginResponse} -- the successful client login response.
            homeserver -- URL of homeserver, e.g. "https://matrix.example.org"
        """
        # The file holds an access token: create it readable by the owner
        # only (an already existing file keeps its current permissions).
        fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(
                {
                    "homeserver": homeserver,  # e.g. "https://matrix.example.org"
                    "user_id": resp.user_id,  # e.g. "@user:example.org"
                    "device_id": resp.device_id,  # device ID, 10 uppercase letters
                    "access_token": resp.access_token,  # cryptogr. access token
                },
                f,
            )