yuuki/libs/yuuki.py
SuperSonic b3823a667f Update
2019-08-25 13:21:49 +08:00

555 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# coding=UTF-8
import json
import ntpath
import os
import sys
import time
import traceback

import requests

from .core.TalkService import *
from .connection import Yuuki_Connect
from .data import Yuuki_Data
from .i18n import Yuuki_LangSetting
class Yuuki_Settings:
    """ Values an operator edits to customise the bot """

    # The Yuuki class looks these entries up by name, so the key
    # spellings (including "GroupMebers_Demand") must stay as they are.
    config = dict(
        # Passed as the first argument of the client calls.
        Seq=0,
        # User ids allowed to run the admin-only commands.
        Admin=[],
        # Initial state of the security switch.
        SecurityService=False,
        # Kick / cancel quota given to each account by limitReset.
        Hour_KickLimit=10,
        Hour_CancelLimit=10,
        # Language handed to Yuuki_LangSetting.
        Default_Language="en",
        # Member count a group needs for the bot to stay after joining.
        GroupMebers_Demand=100,
        # One helper connection is opened per key.
        helper_LINE_ACCESS_KEYs=[],
    )
class Yuuki:
def __init__(self, Yuuki_Settings, Yuuki_Connection):
    """
        Read the settings, open storage and i18n, then connect the main
        account and every configured helper account.

        Yuuki_Settings:   object whose `config` mapping holds the settings.
        Yuuki_Connection: connection description handed to Yuuki_Connect;
                          its `connectHeader` is kept for media uploads.
    """
    global _

    settings = Yuuki_Settings.config
    self.YuukiConfigs = settings

    # Plain values copied out of the settings.
    self.Seq = settings["Seq"]
    self.Admin = settings["Admin"]
    self.KickLimit = settings["Hour_KickLimit"]
    self.CancelLimit = settings["Hour_CancelLimit"]

    self.data = Yuuki_Data()
    self.i18n = Yuuki_LangSetting(settings["Default_Language"])

    self.LINE_Media_server = "https://obs.line-apps.com"

    # Main account first, then one helper per access key.
    self.Connect = Yuuki_Connect(Yuuki_Connection)
    self.client, self.listen = self.Connect.connect()
    self.connectHeader = Yuuki_Connection.connectHeader
    for helper_key in settings["helper_LINE_ACCESS_KEYs"]:
        self.Connect.helperConnect(helper_key)

    self.SecurityService = settings["SecurityService"]
    self.MyMID = self.client.getProfile().mid

    # Re-create the limit table unless it holds exactly two entries.
    limit_info = self.data.getData("LimitInfo")
    if len(limit_info) != 2:
        self.data.updateData(self.data.Data, "LimitInfo", self.data.LimitType)

    # Module-wide translation function used by the other methods.
    _ = self.i18n._
# Basic Func
def exit(self, restart=False):
    """
        Stop the bot process, optionally running a fresh one first.

        restart: when true, ./main.py is run with the current interpreter
                 and this process waits for it to finish before exiting.

        Always ends by raising SystemExit with status 0.
    """
    if restart:
        import subprocess

        # An argument list needs no throw-away shell script and keeps
        # interpreter paths containing spaces or quotes intact.
        subprocess.call([sys.executable, "./main.py"])
    sys.exit(0)
def sybGetGroupCreator(self, group):
    """
        Return the contact treated as the owner of `group`.

        group.creator is used when it is set; otherwise the first entry
        of group.members stands in for it.
    """
    # Identity test: `== None` would be routed through the object's __eq__.
    if group.creator is None:
        return group.members[0]
    return group.creator
def readCommandLine(self, msgs):
    """
        Glue the words of a command back into one string.

        Every word is prefixed with a single space, so the result starts
        with a space whenever msgs is not empty; an empty msgs gives "".
    """
    return "".join(" " + msg for msg in msgs)
def checkInInvitationList(self, ncMessage, userId=None):
    """
        Tell whether a user is among the invitees of an invitation.

        ncMessage: operation whose param3 holds the invited user id, or
                   several ids joined with the "\\x1e" separator.
        userId:    id to look for; defaults to this account (self.MyMID).

        Returns True when the id is invited, False otherwise (also when
        param3 is None).
    """
    if userId is None:
        userId = self.MyMID
    invited = ncMessage.param3
    if invited is None:
        return False
    # A lone id splits into a one-element list, so a single membership
    # test covers both forms. The requested userId is the one looked up,
    # not always this account's own id.
    return userId in invited.split("\x1e")
def changeGroupUrlStatus(self, group, stat):
    """
        Update a group's preventJoinByTicket flag through the main client.

        group: group object to send back; its members and invitee
               attributes are blanked first.
        stat:  a value equal to True clears preventJoinByTicket, any
               other value sets it.
    """
    group.members = None
    group.invitee = None
    group.preventJoinByTicket = stat != True
    self.client.updateGroup(self.Seq, group)
def _changeSecurityStatus(self, groupId, status, enabled):
    """
        Set the switches selected by `status` to `enabled` for one group.

        status codes: 0 NOTIFIED_UPDATE_GROUP, 1 NOTIFIED_INVITE_INTO_GROUP,
        2 NOTIFIED_ACCEPT_GROUP_INVITATION, 3 NOTIFIED_KICKOUT_FROM_GROUP.
    """
    import copy

    switches = (
        (0, OpType.NOTIFIED_UPDATE_GROUP),
        (1, OpType.NOTIFIED_INVITE_INTO_GROUP),
        (2, OpType.NOTIFIED_ACCEPT_GROUP_INVITATION),
        (3, OpType.NOTIFIED_KICKOUT_FROM_GROUP),
    )
    # Start from what the group already has, falling back to the default
    # table for a group that was never configured (getSEGroup gives None).
    # NOTE(review): assumes getSEGroup returns a mapping keyed like
    # self.data.SEGrouptype - confirm against Yuuki_Data.
    current = self.data.getSEGroup(groupId)
    base = self.data.SEGrouptype if current is None else current
    # Work on a copy: writing into self.data.SEGrouptype itself would
    # carry one group's switches over into every later call.
    group_status = copy.copy(base)
    for code, op_type in switches:
        if code in status:
            group_status[op_type] = enabled
    self.data.updateData(self.data.getGroup(groupId), "SEGroup", group_status)

def enableSecurityStatus(self, groupId, status):
    """
        Turn on the security switches listed in `status` for a group.

        groupId: group whose "SEGroup" entry is rewritten.
        status:  collection of switch codes (0-3); other values are ignored.
    """
    self._changeSecurityStatus(groupId, status, True)

def disableSecurityStatus(self, groupId, status):
    """
        Turn off the security switches listed in `status` for a group.

        groupId: group whose "SEGroup" entry is rewritten.
        status:  collection of switch codes (0-3); other values are ignored.
    """
    self._changeSecurityStatus(groupId, status, False)
def cleanMyGroupInvitations(self):
    """ Clear every pending group invitation by accepting it and leaving again """
    pending = self.client.getGroupIdsInvited()
    for group_id in pending:
        self.client.acceptGroupInvitation(self.Seq, group_id)
        self.client.leaveGroup(self.Seq, group_id)
def getContact(self, userId):
    """
        Look a contact up through the main client.

        userId: id to look up.

        Returns the contact object, or False when userId does not look
        like a user id (same length as self.MyMID and a leading "u") or
        when the lookup raises.
    """
    if len(userId) != len(self.MyMID) or userId[0] != "u":
        return False
    try:
        # Ask the client; calling self.getContact here would only recurse
        # until the interpreter gives up, and then report False.
        return self.client.getContact(userId)
    except Exception:
        return False
def securityForWhere(self, Message):
    """
        Pick the two leading parameters out of a security operation.

        Returns (Message.param1, Message.param2) for the four operation
        types listed below and None for every other type. Security
        unpacks the pair as (group id, acting user id).
    """
    watched = (
        OpType.NOTIFIED_UPDATE_GROUP,
        OpType.NOTIFIED_INVITE_INTO_GROUP,
        OpType.NOTIFIED_ACCEPT_GROUP_INVITATION,
        OpType.NOTIFIED_KICKOUT_FROM_GROUP,
    )
    if Message.type in watched:
        return Message.param1, Message.param2
    return None
def getClientByMid(self, userId):
    """
        Map an account id to the client connected for it.

        The main client pairs with self.MyMID; each helper client pairs
        with the helper id at the same position. Returns None when the id
        belongs to none of them.
    """
    clients = [self.client] + self.Connect.helper
    owner_ids = [self.MyMID] + self.Connect.helper_ids
    if userId in owner_ids:
        return clients[owner_ids.index(userId)]
    return None
def limitReset(self, reconnect=False):
    """
        Give the main account and every helper their kick/cancel quota.

        reconnect: when true, only accounts that have no stored quota yet
                   receive one; otherwise every quota is overwritten.
    """
    quotas = (
        ("Kick", "KickLimit", self.KickLimit),
        ("Cancel", "CancelLimit", self.CancelLimit),
    )
    for userId in [self.MyMID] + self.Connect.helper_ids:
        for kind, table, quota in quotas:
            if reconnect and userId in self.data.getLimit(kind):
                continue
            self.data.updateData(self.data.getData("LimitInfo")[table], userId, quota)
def cancelSomeone(self, groupId, userId, exceptUserId=None):
    """
        Cancel a pending invitation with an account that has quota left.

        groupId:      group holding the invitation.
        userId:       invited user whose invitation is cancelled.
        exceptUserId: account that must not be picked to do the work.

        With helpers connected, the account with the largest remaining
        cancel quota is picked; otherwise the main account is used.

        Returns the id of the picked account (also when its quota was
        used up and only the "Cancel Limit." notice was sent), or the
        string "None" when the main account is the only one available and
        it is excluded - the same convention kickSomeone uses.
    """
    limits = self.data.getLimit("Cancel")
    if len(self.Connect.helper) >= 1:
        # Rank on a copy, so that excluding an account cannot overwrite
        # its stored quota with -1 should getLimit hand back the stored
        # mapping itself.
        candidates = dict(limits)
        if exceptUserId:
            candidates[exceptUserId] = -1
        helper = max(candidates, key=candidates.get)
        Limit = candidates[helper]
    else:
        if exceptUserId == self.MyMID:
            return "None"
        helper = self.MyMID
        Limit = limits[helper]
    if Limit > 0:
        self.getClientByMid(helper).cancelGroupInvitation(self.Seq, groupId, [userId])
        self.data.updateData(self.data.getData("LimitInfo")["CancelLimit"], helper, Limit - 1)
    else:
        self.sendText(groupId, _("Cancel Limit."))
    return helper
def kickSomeone(self, groupId, userId, exceptUserId=None):
    """
        Kick a user out of a group with an account that has quota left.

        groupId:      group to kick from.
        userId:       user to kick.
        exceptUserId: account that must not be picked to do the work.

        With helpers connected, the account with the largest remaining
        kick quota is picked; otherwise the main account is used.

        Returns the id of the picked account (also when its quota was
        used up and only the "Kick Limit." notice was sent), or the string
        "None" when the main account is the only one available and it is
        excluded.
    """
    limits = self.data.getLimit("Kick")
    if len(self.Connect.helper) >= 1:
        # Rank on a copy, so that excluding an account cannot overwrite
        # its stored quota with -1 should getLimit hand back the stored
        # mapping itself.
        candidates = dict(limits)
        if exceptUserId:
            candidates[exceptUserId] = -1
        helper = max(candidates, key=candidates.get)
        Limit = candidates[helper]
    else:
        if exceptUserId == self.MyMID:
            return "None"
        helper = self.MyMID
        Limit = limits[helper]
    if Limit > 0:
        self.getClientByMid(helper).kickoutFromGroup(self.Seq, groupId, [userId])
        self.data.updateData(self.data.getData("LimitInfo")["KickLimit"], helper, Limit - 1)
    else:
        self.sendText(groupId, _("Kick Limit."))
    return helper
def sendToWho(self, Message):
    """
        Work out where the reply to a received message should go.

        A message addressed to a user is answered to its sender; a message
        addressed to a room or a group is answered to that room or group.
        Returns None for any other receiver type.
    """
    received = Message.message
    if received.toType == MIDType.USER:
        return received.from_
    if received.toType in (MIDType.ROOM, MIDType.GROUP):
        return received.to
    return None
def sendText(self, toid, msg):
    """
        Send a text message from the main account.

        toid: receiver id.
        msg:  text to send. Commands also passes a bool and a list here,
              so any value that is neither a string nor None is converted
              with str() first.
              NOTE(review): presumably Message.text has to be a string
              for the message to be sent - confirm against TalkService.
    """
    if msg is not None and not isinstance(msg, str):
        msg = str(msg)
    message = Message(to=toid, text=msg)
    self.client.sendMessage(self.Seq, message)
def sendUser(self, toid, userId):
    """
        Send a CONTACT message about `userId` from the main account.

        toid:   receiver id.
        userId: id placed in the message metadata as 'mid'.
    """
    card = {
        'mid': userId,
        'displayName': 'LINE User',
    }
    message = Message(contentType=ContentType.CONTACT, text='', contentMetadata=card, to=toid)
    self.client.sendMessage(self.Seq, message)
def sendMedia(self, toid, type, path):
    """
        Send a local file as a media message and upload its content.

        toid: receiver id.
        type: ContentType value of the message.
        path: local file to send; nothing happens when it does not exist.

        The message is sent first, then the file is posted to the media
        server under the id of that message. A text message "Error!" goes
        to toid when the upload does not answer with HTTP status 201.
    """
    if not os.path.exists(path):
        return
    file_name = ntpath.basename(path)
    # Ask the file system for the size instead of reading the whole file
    # into memory through a handle that is never closed.
    file_size = os.path.getsize(path)
    message = Message(to=toid, text=None)
    message.contentType = type
    message.contentPreview = None
    message.contentMetadata = {
        'FILE_NAME': str(file_name),
        'FILE_SIZE': str(file_size),
    }
    # Only plain files keep their own name on the media server.
    media_name = file_name if type == ContentType.FILE else 'media'
    message_id = self.client.sendMessage(self.Seq, message).id
    params = {
        'name': media_name,
        'oid': message_id,
        'size': file_size,
        'type': ContentType._VALUES_TO_NAMES[type].lower(),
        'ver': '1.0',
    }
    data = {
        'params': json.dumps(params)
    }
    url = self.LINE_Media_server + '/talk/m/upload.nhn'
    # Keep the handle open only for the duration of the upload.
    with open(path, 'rb') as media:
        r = requests.post(url, headers=self.connectHeader, data=data, files={'file': media})
    if r.status_code != 201:
        self.sendText(toid, "Error!")
# Task
def JoinGroup(self, ncMessage):
    """
        ToDo Type:
            NOTIFIED_INVITE_INTO_GROUP (13)

        When the main account is invited, it joins the group and stays
        only if the group has at least "GroupMebers_Demand" members;
        otherwise it apologises and leaves. Every invited helper account
        accepts its invitation as well. The operation is then handed to
        Security.
    """
    if self.checkInInvitationList(ncMessage):
        GroupID = ncMessage.param1
        Inviter = ncMessage.param2
        GroupInfo = self.client.getGroup(GroupID)
        # Check the member list before counting it: iterating it first
        # fails when the group comes back without members.
        if GroupInfo.members:
            self.client.acceptGroupInvitation(self.Seq, GroupID)
            demand = self.YuukiConfigs["GroupMebers_Demand"]
            if len(GroupInfo.members) >= demand:
                self.sendText(GroupID, _("Helllo^^\nMy name is Yuuki ><\nNice to meet you OwO"))
                self.sendText(GroupID, _("Admin of the Group\n%s") %
                              (self.sybGetGroupCreator(GroupInfo).displayName,))
                # Log
                self.data.updateLog("JoinGroup", (self.data.getTime(), GroupInfo.name, GroupID, Inviter))
            else:
                self.sendText(GroupID, _("Sorry...\nThe number of members is not satisfied (%s needed)") %
                              (demand,))
                self.client.leaveGroup(self.Seq, GroupID)
                # Log
                self.data.updateLog("JoinGroup", (self.data.getTime(), GroupID, "Not Join", Inviter))
    for userId in self.Connect.helper_ids:
        if self.checkInInvitationList(ncMessage, userId):
            self.getClientByMid(userId).acceptGroupInvitation(self.Seq, ncMessage.param1)
            # Log
            self.data.updateLog("JoinGroup", (self.data.getTime(), ncMessage.param1, userId, ncMessage.param2))
    self.Security(ncMessage)
def Commands(self, ncMessage):
    """
        ToDo Type:
            RECEIVE_MESSAGE (26)

        Answer the "Yuuki/..." text commands.

        Messages carrying 'BOT_CHECK' metadata are ignored, any message
        received in a room makes the main account leave that room, and
        only messages of content type NONE with a text are read as
        commands:

            Yuuki/UserID                    reply with the sender's id
            Yuuki/Speed                     time one sendText call
            Yuuki/SecurityMode [0|1]        (Admin) show / set the switch
            Yuuki/Enable <codes>            (group privilege) switches on
            Yuuki/Disable <codes>           (group privilege) switches off
            Yuuki/ExtAdmin add|delete <id>  (Admin or creator) edit the
                                            extra admins; any other form
                                            replies with the current list
            Yuuki/Status                    show the group's switches
            Yuuki/Quit                      (group privilege) leave group
            Yuuki/Exit                      (Admin) stop the bot
            Yuuki/Com <expression>          (Admin) evaluate and reply
    """
    message = ncMessage.message
    if 'BOT_CHECK' in (message.contentMetadata or {}):
        return
    if message.toType == MIDType.ROOM:
        self.client.leaveRoom(self.Seq, message.to)
        return
    if message.contentType != ContentType.NONE or not message.text:
        return

    text = message.text
    msgSep = text.split(" ")
    command = msgSep[0]
    sender = message.from_
    replyTo = self.sendToWho(ncMessage)
    inGroup = message.toType == MIDType.GROUP

    if text == 'Yuuki/UserID':
        self.sendText(replyTo, _("LINE System UserID\n") + sender)

    elif text == 'Yuuki/Speed':
        Time1 = time.time()
        self.sendText(replyTo, _("Testing..."))
        Time2 = time.time()
        self.sendText(replyTo, _("Speed:\n%ss") % (Time2 - Time1,))

    elif command == 'Yuuki/SecurityMode':
        if sender in self.Admin:
            if len(msgSep) == 2:
                try:
                    self.SecurityService = bool(int(msgSep[1]))
                except ValueError:
                    # Not a number: leave the switch as it is.
                    pass
            else:
                # The text of a message has to be a string.
                self.sendText(replyTo, str(bool(self.SecurityService)))

    elif command in ('Yuuki/Enable', 'Yuuki/Disable'):
        if inGroup:
            GroupInfo = self.client.getGroup(message.to)
            GroupPrivilege = self.Admin + [self.sybGetGroupCreator(GroupInfo).mid] + self.data.getGroup(GroupInfo.id)["Ext_Admin"]
            if sender in GroupPrivilege:
                status = []
                for code in msgSep[1:]:
                    try:
                        status.append(int(code))
                    except ValueError:
                        # Words that are not numbers are skipped.
                        pass
                if command == 'Yuuki/Enable':
                    self.enableSecurityStatus(message.to, status)
                else:
                    self.disableSecurityStatus(message.to, status)
                self.sendText(replyTo, _("Okay"))

    elif command == 'Yuuki/ExtAdmin':
        if inGroup:
            GroupInfo = self.client.getGroup(message.to)
            GroupPrivilege = self.Admin + [self.sybGetGroupCreator(GroupInfo).mid]
            ExtAdmin = self.data.getGroup(GroupInfo.id)["Ext_Admin"]
            # Reading msgSep[1] / msgSep[2] is only safe with three words.
            action = msgSep[1] if len(msgSep) == 3 else None
            if sender in GroupPrivilege and action in ("add", "delete"):
                target = msgSep[2]
                groupData = self.data.getData("Group")[GroupInfo.id]
                if action == "add":
                    if target in [Member.mid for Member in GroupInfo.members]:
                        # Store a list with the new id appended: storing
                        # the bare id would replace the whole list and
                        # break the privilege checks that concatenate it.
                        if target not in ExtAdmin:
                            self.data.updateData(groupData, "Ext_Admin", ExtAdmin + [target])
                        self.sendText(replyTo, _("Okay"))
                    else:
                        self.sendText(replyTo, _("Wrong UserID or the guy is not in Group"))
                else:
                    remaining = [mid for mid in ExtAdmin if mid != target]
                    self.data.updateData(groupData, "Ext_Admin", remaining)
                    self.sendText(replyTo, _("Okay"))
            else:
                self.sendText(replyTo, str(ExtAdmin))

    elif text == 'Yuuki/Status':
        if inGroup:
            GroupInfo = self.client.getGroup(message.to)
            group_status = self.data.getSEGroup(message.to)
            if group_status is None:
                status = _("Disable without Initialize\nAdmin of the Group\n%s") % (
                    self.sybGetGroupCreator(GroupInfo).displayName,
                )
            else:
                status = _("URL:%s\nInvite:%s\nJoin:%s\nMembers:%s\n\nAdmin of the Group\n%s") % (
                    group_status[OpType.NOTIFIED_UPDATE_GROUP],
                    group_status[OpType.NOTIFIED_INVITE_INTO_GROUP],
                    group_status[OpType.NOTIFIED_ACCEPT_GROUP_INVITATION],
                    group_status[OpType.NOTIFIED_KICKOUT_FROM_GROUP],
                    self.sybGetGroupCreator(GroupInfo).displayName,
                )
            self.sendText(replyTo, status)

    elif text == 'Yuuki/Quit':
        if inGroup:
            GroupInfo = self.client.getGroup(message.to)
            GroupPrivilege = self.Admin + [self.sybGetGroupCreator(GroupInfo).mid] + self.data.getGroup(GroupInfo.id)["Ext_Admin"]
            if sender in GroupPrivilege:
                self.sendText(replyTo, _("Bye Bye"))
                self.client.leaveGroup(self.Seq, GroupInfo.id)

    elif text == 'Yuuki/Exit':
        if sender in self.Admin:
            self.sendText(replyTo, _("Exit."))
            self.exit()

    elif command == 'Yuuki/Com' and len(msgSep) != 1:
        if sender in self.Admin:
            ComMsg = self.readCommandLine(msgSep[1:])
            # SECURITY: eval() runs arbitrary Python received over chat;
            # the Admin check above is the only protection.
            self.sendText(replyTo, str(eval(ComMsg)))
def Security(self, ncMessage):
    """
        ToDo Type:
            NOTIFIED_UPDATE_GROUP (11)
            NOTIFIED_INVITE_INTO_GROUP (13)
            NOTIFIED_ACCEPT_GROUP_INVITATION (17)
            NOTIFIED_KICKOUT_FROM_GROUP (19)

        React to a group event according to the group's switches.

        Nothing is done when the acting user is privileged (Admin, group
        creator or extra admin), when the group has no switches stored,
        or when self.SecurityService is off. A blacklisted user who joins
        is kicked whatever the switch for joining says; a blacklisted
        user who is invited is cancelled when the invite switch is off.
    """
    (GroupID, Action) = self.securityForWhere(ncMessage)
    SEGroup = self.data.getSEGroup(GroupID)
    GroupInfo = self.client.getGroup(GroupID)
    GroupPrivilege = self.Admin + [self.sybGetGroupCreator(GroupInfo).mid] + self.data.getGroup(GroupInfo.id)["Ext_Admin"]
    if Action in GroupPrivilege or SEGroup is None:
        return
    if not self.SecurityService:
        return

    botIds = [self.MyMID] + self.Connect.helper_ids
    guarded = SEGroup[ncMessage.type]

    if ncMessage.type == OpType.NOTIFIED_ACCEPT_GROUP_INVITATION:
        # Handled the same way whether or not the switch is on.
        for userId in self.data.getData("BlackList"):
            if userId == ncMessage.param2:
                Kicker = self.kickSomeone(GroupID, userId)
                # Log
                self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Kicker, Kicker, Action, ncMessage.type))

    elif not guarded:
        # Switch off: only blacklisted invitees are cancelled.
        if ncMessage.type == OpType.NOTIFIED_INVITE_INTO_GROUP:
            for userId in self.data.getData("BlackList"):
                if self.checkInInvitationList(ncMessage, userId):
                    Canceler = self.cancelSomeone(GroupID, userId)
                    # Log
                    self.data.updateLog("CancelEvent", (self.data.getTime(), GroupInfo.name, GroupID, Canceler, Action, ncMessage.param3))

    elif ncMessage.type == OpType.NOTIFIED_UPDATE_GROUP:
        if ncMessage.param3 == '4':
            if not GroupInfo.preventJoinByTicket:
                self.changeGroupUrlStatus(GroupInfo, False)
                Kicker = self.kickSomeone(GroupID, ncMessage.param2)
                # Log
                self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Kicker, Action, ncMessage.param3, ncMessage.type))

    elif ncMessage.type == OpType.NOTIFIED_INVITE_INTO_GROUP:
        protected = botIds + GroupPrivilege
        if "\x1e" in ncMessage.param3:
            Canceler = "None"
            for userId in ncMessage.param3.split("\x1e"):
                if userId not in protected:
                    Canceler = self.cancelSomeone(GroupID, userId)
            # Log (one entry for the whole invitation)
            self.data.updateLog("CancelEvent", (self.data.getTime(), GroupInfo.name, GroupID, Canceler, Action, ncMessage.param3))
        elif ncMessage.param3 not in protected:
            Canceler = self.cancelSomeone(GroupID, ncMessage.param3)
            # Log
            self.data.updateLog("CancelEvent", (self.data.getTime(), GroupInfo.name, GroupID, Canceler, Action, ncMessage.param3))

    elif ncMessage.type == OpType.NOTIFIED_KICKOUT_FROM_GROUP:
        if ncMessage.param2 in self.Connect.helper_ids:
            # The kick was done by one of the helpers: only record it.
            # Log
            self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Action, Action, ncMessage.param3, ncMessage.type*10+1))
        elif ncMessage.param3 in botIds:
            # One of the bot accounts was removed: strike back with
            # another account, never with the one that was removed.
            Kicker = "None"
            try:
                Kicker = self.kickSomeone(GroupID, ncMessage.param2, ncMessage.param3)
                # Log
                self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Kicker, Action, ncMessage.param3, ncMessage.type*10+2))
            except Exception:
                # Log
                self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Kicker, Action, ncMessage.param3, ncMessage.type*10+3))
            # The acting user is blacklisted whether or not the kick worked.
            self.data.updateData(self.data.getData("BlackList"), True, ncMessage.param2)
            # Log
            self.data.updateLog("BlackList", (self.data.getTime(), Action, GroupID))
        else:
            self.sendText(GroupID, _("Bye Bye"))
            Kicker = self.kickSomeone(GroupID, ncMessage.param2)
            # Log
            self.data.updateLog("KickEvent", (self.data.getTime(), GroupInfo.name, GroupID, Kicker, Action, ncMessage.param3, ncMessage.type))
# Main
def Main(self):
    """
        Run the bot: fetch operations forever and dispatch each one.

        Invitations go to JoinGroup, the three other group notifications
        to Security and received messages to Commands. The kick/cancel
        quotas are reset whenever the local hour differs from the stored
        "LastResetLimitTime".

        An unexpected error is reported to every Admin and the failing
        operation is skipped; if the report itself fails, the error is
        printed and the bot exits. KeyboardInterrupt is not intercepted.
    """
    NoWork = 0
    catchedNews = []
    ncMessage = Operation()
    securityTypes = (
        OpType.NOTIFIED_KICKOUT_FROM_GROUP,
        OpType.NOTIFIED_ACCEPT_GROUP_INVITATION,
        OpType.NOTIFIED_UPDATE_GROUP,
    )

    Revision = self.client.getLastOpRevision()

    if "LastResetLimitTime" not in self.data.getData("Global"):
        self.data.getData("Global")["LastResetLimitTime"] = None

    # Started again within the same hour: keep the quotas already used
    # and only fill in accounts that have none.
    if time.localtime().tm_hour == self.data.getData("Global")["LastResetLimitTime"]:
        self.limitReset(True)

    while True:
        try:
            if time.localtime().tm_hour != self.data.getData("Global")["LastResetLimitTime"]:
                self.limitReset()
                self.data.updateData(self.data.getData("Global"), "LastResetLimitTime", time.localtime().tm_hour)

            # NOTE(review): NoWork keeps growing past 300 while idle, so
            # this resync fires only once per idle stretch - confirm that
            # this is intended.
            if NoWork == 300:
                Revision = self.client.getLastOpRevision()

            catchedNews = self.listen.fetchOperations(Revision, 50)
            if catchedNews:
                NoWork = 0
                for ncMessage in catchedNews:
                    if ncMessage.type == OpType.NOTIFIED_INVITE_INTO_GROUP:
                        self.JoinGroup(ncMessage)
                    elif ncMessage.type in securityTypes:
                        self.Security(ncMessage)
                    elif ncMessage.type == OpType.RECEIVE_MESSAGE:
                        self.Commands(ncMessage)
                    if ncMessage.reqSeq != -1:
                        Revision = max(Revision, ncMessage.revision)
            else:
                NoWork = NoWork + 1

        except SystemExit:
            self.exit()

        except EOFError:
            pass

        except Exception:
            # Exception rather than a bare except, so that Ctrl-C still
            # stops the loop.
            err1, err2, err3 = sys.exc_info()
            traceback.print_tb(err3)
            tb_info = traceback.extract_tb(err3)
            filename, line, func, text = tb_info[-1]
            ErrorInfo = "occurred in\n{}\n\non line {}\nin statement {}".format(filename, line, text)
            try:
                if catchedNews and ncMessage:
                    # Continue from the operation that failed when it is
                    # part of the last batch, otherwise from the newest
                    # revision.
                    failed = ncMessage.revision
                    if any(Catched.revision == failed for Catched in catchedNews):
                        Revision = failed
                    else:
                        Revision = self.client.getLastOpRevision()
                for Root in self.Admin:
                    self.sendText(Root, "Star Yuuki BOT - Something was wrong...\nError:\n%s\n%s\n%s\n\n%s" %
                                  (err1, err2, err3, ErrorInfo))
            except Exception:
                print("Star Yuuki BOT - Damage!\nError:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, ErrorInfo))
                self.exit()