import asyncio
import nio
from nio import AsyncClient , AsyncClientConfig , MatrixRoom , RoomMessageText , InviteEvent , UploadResponse , RedactionEvent , LoginResponse , Event
from nio import KeyVerificationCancel , KeyVerificationEvent , KeyVerificationKey , KeyVerificationMac , KeyVerificationStart , LocalProtocolError , ToDeviceError
import os , sys
import json
import aiofiles . os
import magic
from PIL import Image
import logging
logger = logging . getLogger ( __name__ )
class Callbacks ( object ) :
""" Class to pass client to callback methods. """
def __init__ ( self , client : AsyncClient ) :
self . client = client
self . client . add_event_callback ( self . message_cb , RoomMessageText )
self . client . add_event_callback ( self . invite_cb , InviteEvent )
self . client . add_event_callback ( self . redaction_cb , RedactionEvent )
self . message_callbacks = [ ]
self . message_redaction_callbacks = [ ]
def add_message_callback ( self , callback , redaction_callback = None ) :
self . message_callbacks . append ( callback )
if redaction_callback :
self . message_redaction_callbacks . append ( redaction_callback )
async def message_cb ( self , room : MatrixRoom , event : RoomMessageText ) - > None :
""" Got a message in a room """
logger . debug (
f " Message received in room { room . display_name } | "
f " { room . user_name ( event . sender ) } : { event . body } "
)
for cb in self . message_callbacks :
if asyncio . iscoroutinefunction ( cb ) :
await cb ( room , event )
else :
cb ( room , event )
async def redaction_cb ( self , room : MatrixRoom , event : RedactionEvent ) - > None :
""" Message was deleted """
logger . debug ( f " event redacted in room { room . room_id } . event_id: { event . redacts } " )
for cb in self . message_redaction_callbacks :
if asyncio . iscoroutinefunction ( cb ) :
await cb ( room , event )
else :
cb ( room , event )
async def invite_cb ( self , room : MatrixRoom , event : InviteEvent ) - > None :
""" Automatically join all rooms we get invited to """
result = await self . client . join ( room . room_id )
if isinstance ( result , nio . responses . JoinResponse ) :
logger . info ( ' Invited and joined room: {} {} ' . format ( room . name , room . room_id ) )
else :
logger . error ( " Error joining room: {} " . format ( str ( result ) ) )
async def to_device_callback ( self , event ) :
""" Handle events sent to device. """
if isinstance ( event , KeyVerificationStart ) : # first step
if " emoji " not in event . short_authentication_string :
logger . warning (
" Other device does not support emoji verification "
f " { event . short_authentication_string } . "
)
return
resp = await self . client . accept_key_verification ( event . transaction_id )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " accept_key_verification failed with { resp } " )
sas = self . client . key_verifications [ event . transaction_id ]
todevice_msg = sas . share_key ( )
resp = await self . client . to_device ( todevice_msg )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " to_device failed with { resp } " )
elif isinstance ( event , KeyVerificationCancel ) :
logger . warning (
f " Verification has been cancelled by { event . sender } "
f ' for reason " { event . reason } " . '
)
elif isinstance ( event , KeyVerificationKey ) : # second step
sas = self . client . key_verifications [ event . transaction_id ]
logger . info ( f " { sas . get_emoji ( ) } " )
#yn = input("Do the emojis match? (Y/N) (C for Cancel) ")
await asyncio . sleep ( 5 )
yn = ' y '
if yn . lower ( ) == " y " :
#print(
# "Match! The verification for this " "device will be accepted."
#)
resp = await self . client . confirm_short_auth_string ( event . transaction_id )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " confirm_short_auth_string failed with { resp } " )
elif yn . lower ( ) == " n " : # no, don't match, reject
#print(
# "No match! Device will NOT be verified "
# "by rejecting verification."
#)
resp = await self . client . cancel_key_verification ( event . transaction_id , reject = True )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " cancel_key_verification failed with { resp } " )
else : # C or anything for cancel
#print("Cancelled by user! Verification will be " "cancelled.")
resp = await self . client . cancel_key_verification ( event . transaction_id , reject = False )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " cancel_key_verification failed with { resp } " )
elif isinstance ( event , KeyVerificationMac ) : # third step
sas = self . client . key_verifications [ event . transaction_id ]
try :
todevice_msg = sas . get_mac ( )
except LocalProtocolError as e :
# e.g. it might have been cancelled by ourselves
logger . warning (
f " Cancelled or protocol error: Reason: { e } . \n "
f " Verification with { event . sender } not concluded. "
" Try again? "
)
else :
resp = await self . client . to_device ( todevice_msg )
if isinstance ( resp , ToDeviceError ) :
logger . warning ( f " to_device failed with { resp } " )
logger . info (
f " sas.we_started_it = { sas . we_started_it } \n "
f " sas.sas_accepted = { sas . sas_accepted } \n "
f " sas.canceled = { sas . canceled } \n "
f " sas.timed_out = { sas . timed_out } \n "
f " sas.verified = { sas . verified } \n "
f " sas.verified_devices = { sas . verified_devices } \n "
)
logger . info (
" Emoji verification was successful! \n "
" Hit Control-C to stop the program or "
" initiate another Emoji verification from "
" another device or room. "
)
else :
logger . warning (
f " Received unexpected event type { type ( event ) } . "
f " Event is { event } . Event will be ignored. "
)
class ChatClient ( object ) :
def __init__ ( self , homeserver , user_id , password , device_name = " matrix-nio " ) :
self . homeserver = homeserver
self . user_id = user_id
self . password = password
self . device_name = device_name
self . synced = False
async def persist ( self , data_dir ) :
#self.data_dir = data_dir
self . config_file = f " { data_dir } /matrix_credentials.json "
self . store_path = f " { data_dir } /store/ "
os . makedirs ( data_dir , exist_ok = True )
os . makedirs ( self . store_path , exist_ok = True )
async def login ( self ) :
client_config = AsyncClientConfig (
max_limit_exceeded = 0 ,
max_timeouts = 0 ,
store_sync_tokens = True ,
encryption_enabled = False ,
)
if not hasattr ( self , ' config_file ' ) or not os . path . exists ( self . config_file ) :
logger . info ( f " No credentials file. Connecting to \" { self . homeserver } \" with user_id and password " )
if hasattr ( self , ' store_path ' ) :
if not os . path . exists ( self . store_path ) :
os . makedirs ( self . store_path )
else :
self . store_path = None
# initialize the matrix client
self . client = AsyncClient (
self . homeserver ,
self . user_id ,
store_path = self . store_path ,
config = client_config ,
)
self . callbacks = Callbacks ( self . client )
resp = await self . client . login ( self . password , device_name = self . device_name )
# check that we logged in succesfully
if isinstance ( resp , LoginResponse ) :
if hasattr ( self , ' config_file ' ) :
self . write_details_to_disk ( self . config_file , resp , self . homeserver )
else :
logger . error ( f ' homeserver = " { self . homeserver } " ; user = " { self . user_id } " ' )
logger . error ( f " Failed to log in: { resp } " )
sys . exit ( 1 )
else :
logger . info ( f " Logging in to \" { self . homeserver } \" using stored credentials. " )
with open ( self . config_file , " r " ) as f :
config = json . load ( f )
self . client = AsyncClient (
config [ " homeserver " ] ,
config [ " user_id " ] ,
device_id = config [ " device_id " ] ,
store_path = self . store_path ,
config = client_config ,
)
self . callbacks = Callbacks ( self . client )
# self.client.user_id=config["user_id"],
# self.client.device_id=config["device_id"],
# self.client.access_token=config["access_token"]
self . client . restore_login ( # the load_store() inside somehow makes the client.rooms empty when encrypted. you can just set the access_token. see commented code before
user_id = config [ " user_id " ] ,
device_id = config [ " device_id " ] ,
access_token = config [ " access_token " ]
)
# if os.path.exists(self.store_path + "megolm_keys"):
# await self.client.import_keys(self.store_path + "megolm_keys", "pass")
# self.client.load_store()
# if self.client.should_upload_keys:
# await self.client.keys_upload()
self . client . add_to_device_callback ( self . callbacks . to_device_callback , ( KeyVerificationEvent , ) )
logger . info ( f " Connected as \" { self . user_id } \" " )
sync_task = asyncio . create_task ( self . watch_for_sync ( ) )
return self . client
async def logout ( self ) :
logger . warning ( " Disconnected " )
await self . client . close ( )
async def send_message ( self , room_id , message ) :
try :
return await self . client . room_send (
room_id = room_id ,
message_type = " m.room.message " ,
content = {
" msgtype " : " m.text " ,
" body " : message
} ,
ignore_unverified_devices = True , # ToDo
)
except nio . exceptions . OlmUnverifiedDeviceError as err :
print ( " These are all known devices: " )
device_store : crypto . DeviceStore = device_store
[
print (
f " \t { device . user_id } \t { device . device_id } \t { device . trust_state } \t { device . display_name } "
)
for device in self . client . device_store
]
raise
async def send_image ( self , room_id , image ) :
""" Send image to room
https : / / matrix - nio . readthedocs . io / en / latest / examples . html #sending-an-image
"""
mime_type = magic . from_file ( image , mime = True ) # e.g. "image/jpeg"
if not mime_type . startswith ( " image/ " ) :
logger . error ( " Drop message because file does not have an image mime type. " )
return
im = Image . open ( image )
( width , height ) = im . size # im.size returns (width,height) tuple
# first do an upload of image, then send URI of upload to room
file_stat = await aiofiles . os . stat ( image )
async with aiofiles . open ( image , " r+b " ) as f :
resp , maybe_keys = await self . client . upload (
f ,
content_type = mime_type , # image/jpeg
filename = os . path . basename ( image ) ,
filesize = file_stat . st_size ,
)
if isinstance ( resp , UploadResponse ) :
logger . info ( " Image was uploaded successfully to server. " )
else :
logger . error ( f " Failed to upload image. Failure response: { resp } " )
content = {
" body " : os . path . basename ( image ) , # descriptive title
" info " : {
" size " : file_stat . st_size ,
" mimetype " : mime_type ,
" thumbnail_info " : None , # TODO
" w " : width , # width in pixel
" h " : height , # height in pixel
" thumbnail_url " : None , # TODO
} ,
" msgtype " : " m.image " ,
" url " : resp . content_uri ,
}
try :
await self . client . room_send ( room_id , message_type = " m.room.message " , content = content )
logger . info ( " Image was sent successfully " )
except Exception :
logger . error ( f " Image send of file { image } failed. " )
async def room_typing ( self , room_id , is_typing , timeout = 15000 ) :
if is_typing :
return await self . client . room_typing ( room_id , is_typing , timeout )
else :
return await self . client . room_typing ( room_id , False )
async def room_read_markers ( self , room_id , event1 , event2 ) :
return await self . client . room_read_markers ( room_id , event1 , event2 )
def sync_forever ( self , timeout = 30000 , full_state = True ) :
return self . client . sync_forever ( timeout , full_state )
async def watch_for_sync ( self ) :
logger . debug ( " Awaiting sync " )
await self . client . synced . wait ( )
logger . info ( " Client is synced " )
self . synced = True
logger . info ( f " { self . client . user_id } , { self . client . rooms } , { self . client . invited_rooms } , { self . client . encrypted_rooms } " )
# if os.path.exists(self.store_path + "megolm_keys"):
# os.remove(self.store_path + "megolm_keys", "pass")
# await self.client.export_keys(self.store_path + "megolm_keys", "pass")
def write_details_to_disk ( self , config_file : str , resp : LoginResponse , homeserver ) - > None :
""" Writes the required login details to disk so we can log in later without
using a password .
Arguments :
resp { LoginResponse } - - the successful client login response .
homeserver - - URL of homeserver , e . g . " https://matrix.example.org "
"""
# open the config file in write-mode
with open ( config_file , " w " ) as f :
# write the login details to disk
json . dump (
{
" homeserver " : homeserver , # e.g. "https://matrix.example.org"
" user_id " : resp . user_id , # e.g. "@user:example.org"
" device_id " : resp . device_id , # device ID, 10 uppercase letters
" access_token " : resp . access_token , # cryptogr. access token
} ,
f ,
)