Reformat and Fix some bugs

This commit is contained in:
SuperSonic 2020-12-19 14:54:29 +08:00
parent e76ef511d8
commit eaa371e5be
No known key found for this signature in database
GPG key ID: E511B80256C9F20D
7 changed files with 141 additions and 108 deletions

View file

@ -9,6 +9,7 @@ file, You can obtain one at http://mozilla.org/MPL/2.0/.
from thrift.protocol import TCompactProtocol
from thrift.transport import THttpClient
from .config import YuukiConfig
from yuuki_core.TalkService import Client, TalkException
# NC HighSpeed Library
@ -19,15 +20,15 @@ except ImportError:
class YuukiConnect:
def __init__(self, configs):
def __init__(self, configs: YuukiConfig):
self.helper = {}
self.host = configs.connectInfo["Host"]
self.com_path = configs.connectInfo["Command_Path"]
self.poll_path = configs.connectInfo["LongPoll_path"]
self.host = configs.connect_info["Host"]
self.com_path = configs.connect_info["Command_Path"]
self.poll_path = configs.connect_info["LongPoll_path"]
self.con_header = configs.connectHeader
self.con_header = configs.connect_header
def connect(self, listen_timeout=600000):
transport = THttpClient.THttpClient(self.host + self.com_path)
@ -49,7 +50,7 @@ class YuukiConnect:
return client, listen
def helperConnect(self, auth_token):
def helper_connect(self, auth_token):
helper_connect_header = self.con_header.copy()
helper_connect_header["X-Line-Access"] = auth_token

View file

@ -69,7 +69,7 @@ class YuukiData:
initHeader = "<title>{} - SYB</title>" \
"<meta charset='utf-8' />"
def __init__(self, threading, mds_port):
def __init__(self, threading: bool, mds_port: int):
self.threading = threading
self.mds_port = mds_port
self.ThreadControl = YuukiThread()
@ -176,11 +176,11 @@ class YuukiData:
over.update(data)
return False
def file(self, Type, Mode, Format):
if Format == "Data":
return open(self.DataPath + self.DataName.format(Type), Mode)
elif Format == "Log":
return open(self.LogPath + self.LogName.format(Type), Mode)
def file(self, type_, mode_, format_):
if format_ == "Data":
return open(self.DataPath + self.DataName.format(type_), mode_)
elif format_ == "Log":
return open(self.LogPath + self.LogName.format(type_), mode_)
def sync_data(self):
if self.threading:

View file

@ -41,17 +41,22 @@ class YuukiCommand:
)
def _version(self, operation):
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.configs["version"])
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.configs["version"]
)
def _user_id(self, operation):
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("LINE System UserID:\n") + operation.message.from_)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("LINE System UserID:\n") + operation.message.from_
)
def _get_all_helper(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid,
@ -59,9 +64,8 @@ class YuukiCommand:
]
if operation.message.from_ in group_privilege:
for user_id in self.Yuuki.Connect.helper:
self.YuukiDynamicTools.send_user(
YuukiStaticTools.send_to_who(operation),
user_id)
self.YuukiDynamicTools \
.send_user(YuukiStaticTools.send_to_who(operation), user_id)
def _speed(self, operation):
timer_start = time.time()
@ -82,20 +86,26 @@ class YuukiCommand:
if message_separated[1].isdigit() and 1 >= int(message_separated[1]) >= 0:
self.Yuuki.data.update_data(
["Global", "SecurityService"], bool(message_separated[1]))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Okay")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Enable(True): 1\nDisable(False): 0"))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Enable(True): 1\nDisable(False): 0")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
str(bool(
self.Yuuki.data.get_data(["Global", "SecurityService"]))))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
str(bool(self.Yuuki.data.get_data(["Global", "SecurityService"])))
)
def _switch(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid,
@ -113,7 +123,7 @@ class YuukiCommand:
message_separated = operation.message.text.split(" ")
status = []
unknown_msg = []
unknown_msgtext = ""
unknown_msg_text = ""
for count, code in enumerate(message_separated):
if code.isdigit() and 3 >= int(code) >= 0:
status.append(int(code))
@ -122,24 +132,24 @@ class YuukiCommand:
self.YuukiDynamicTools.config_security_status(
operation.message.to, status)
if unknown_msg:
unknown_msgtext = ", ".join(unknown_msg)
unknown_msg_text = ", ".join(unknown_msg)
if status:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
if unknown_msgtext != "":
if unknown_msg_text != "":
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"Notice: Unknown command line argument(s)") + "\n({})".format(unknown_msgtext)
self.Yuuki.get_text("Notice: Unknown command line argument(s)") + f"\n({unknown_msg_text})"
)
def _disable_all(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid,
@ -151,16 +161,18 @@ class YuukiCommand:
self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.configs["name"],)
)
elif operation.message.from_ in group_privilege:
self.YuukiDynamicTools.config_security_status(
operation.message.to, [])
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
self.YuukiDynamicTools.config_security_status(operation.message.to, [])
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Okay")
)
def _ext_admin(self, operation):
message_separated = operation.message.text.split(" ")
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid
@ -182,22 +194,24 @@ class YuukiCommand:
self.Yuuki.get_text("Added")
)
elif message_separated[2] not in self.Yuuki.data.get_data(["BlackList"]):
origin = self.Yuuki.data.get_data(
["Group", group.id, "Ext_Admin"])
origin = self.Yuuki.data.get_data(["Group", group.id, "Ext_Admin"])
ext_admin_list = origin.copy()
ext_admin_list.append(message_separated[2])
self.Yuuki.data.update_data(
["Group", group.id, "Ext_Admin"], ext_admin_list)
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
self.Yuuki.data.update_data(["Group", group.id, "Ext_Admin"], ext_admin_list)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Okay")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"The User(s) was in our blacklist database."))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("The User(s) was in our blacklist database.")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"Wrong UserID or the guy is not in Group"))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Wrong UserID or the guy is not in Group")
)
def _ext_admin_delete(self, operation, message_separated, group):
if message_separated[2] in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
@ -205,14 +219,18 @@ class YuukiCommand:
ext_admin_list = origin.copy()
ext_admin_list.remove(message_separated[2])
self.Yuuki.data.update_data(
["Group", group.id, "Ext_Admin"], ext_admin_list)
["Group", group.id, "Ext_Admin"],
ext_admin_list
)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Okay")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Not Found")
)
def _ext_admin_query(self, operation, group):
if self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
@ -220,7 +238,7 @@ class YuukiCommand:
status_added = []
for member in group.members:
if member.mid in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
status += "{}\n".format(member.displayName)
status += f"{member.displayName}\n"
status_added.append(member.mid)
for user_id in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
if user_id not in status_added:
@ -233,29 +251,28 @@ class YuukiCommand:
status + self.Yuuki.get_text("\nExtend Administrator(s)")
)
else:
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Not Found")
)
def _status(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_status = self.Yuuki.data.get_se_group(
operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_status = self.Yuuki.data.get_se_group(operation.message.to)
if not self.Yuuki.data.get_data(["Global", "SecurityService"]):
status = self.Yuuki.get_text("SecurityService of %s was disable") % (
self.Yuuki.configs["name"],
)
status = self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.configs["name"],)
elif group_status is None:
status = self.Yuuki.get_text("Default without Initialize\nMain Admin of the Group:\n%s") % (
YuukiStaticTools.get_group_creator(
group).displayName,
)
status = self.Yuuki.get_text("Default without Initialize\nMain Admin of the Group:\n%s") % \
(YuukiStaticTools.get_group_creator(group).displayName,)
else:
status = self.Yuuki.get_text(
"SecurityService is Listening on\n"
"\nURL:%s\nInvite:%s\nJoin:%s\nMembers:%s\n"
"\nMain Admin of the Group:\n%s") % (
"\nMain Admin of the Group:\n%s") % \
(
group_status[OpType.NOTIFIED_UPDATE_GROUP],
group_status[OpType.NOTIFIED_INVITE_INTO_GROUP],
group_status[OpType.NOTIFIED_ACCEPT_GROUP_INVITATION],
@ -264,23 +281,25 @@ class YuukiCommand:
group).displayName,
)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation), status)
YuukiStaticTools.send_to_who(operation),
status
)
def _group_backup(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if operation.message.from_ in group_privilege:
group_members = [User.mid for User in group.members]
group_members = [user.mid for user in group.members]
group_invitations = None
if group.invitee:
group_invitations = [
User.mid for User in group.invitee]
group_invitations = [user.mid for user in group.invitee]
output_info = {
"OriginID": group.id,
"Members": group_members,
@ -295,9 +314,9 @@ class YuukiCommand:
def _quit(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID
).getGroup(operation.message.to)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
YuukiStaticTools.get_group_creator(group).mid
@ -322,7 +341,7 @@ class YuukiCommand:
reporting_message = str(eval(command_message))
except:
(err1, err2, err3, error_info) = YuukiStaticTools.report_error()
reporting_message = "Star Yuuki BOT - Eval Error:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, error_info)
reporting_message = f"Star Yuuki BOT - Eval Error:\n{err1}\n{err2}\n{err3}\n\n{error_info}"
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation), reporting_message)
def _text(self, operation):
@ -360,12 +379,19 @@ class YuukiCommand:
if not contact_info:
msg = self.Yuuki.get_text("Not Found")
elif contact_info.mid in self.Yuuki.data.get_data(["BlackList"]):
msg = "{}\n{}".format(self.Yuuki.get_text(
"The User(s) was in our blacklist database."), contact_info.mid)
msg = "{}\n{}".format(
self.Yuuki.get_text("The User(s) was in our blacklist database."),
contact_info.mid
)
else:
msg = self.Yuuki.get_text("Name:%s\nPicture URL:%s/%s\nStatusMessage:\n%s\nLINE System UserID:%s") % \
(contact_info.displayName, self.Yuuki.LINE_Media_server, contact_info.pictureStatus,
contact_info.statusMessage, contact_info.mid)
(
contact_info.displayName,
self.Yuuki.LINE_Media_server,
contact_info.pictureStatus,
contact_info.statusMessage,
contact_info.mid
)
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation), msg)
def action(self, operation):
@ -376,11 +402,9 @@ class YuukiCommand:
pass
elif operation.message.toType == MIDType.ROOM:
self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID
).leaveRoom(
self.Yuuki.Seq, operation.message.to
)
self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.leaveRoom(self.Yuuki.Seq, operation.message.to)
elif operation.message.contentType == ContentType.NONE:
self._text(operation)

View file

@ -43,7 +43,9 @@ class YuukiJoinGroup:
self.YuukiDynamicTools.get_group_ticket(group_id, self.Yuuki.MyMID, True)
# Log
self.Yuuki.data.update_log(
"JoinGroup", (self.Yuuki.data.get_time(), group.name, group_id, inviter))
"JoinGroup",
(self.Yuuki.data.get_time(), group.name, group_id, inviter)
)
def _reject(self, group_id, inviter):
self.YuukiDynamicTools.send_text(
@ -54,7 +56,9 @@ class YuukiJoinGroup:
self.YuukiDynamicTools.get_client(self.Yuuki.MyMID).leave_group(self.Yuuki.Seq, group_id)
# Log
self.Yuuki.data.update_log(
"JoinGroup", (self.Yuuki.data.get_time(), group_id, "Not Join", inviter))
"JoinGroup",
(self.Yuuki.data.get_time(), group_id, "Not Join", inviter)
)
def _check_helper(self, operation, group_invitations, blocked_user):
if operation.param1 in self.Yuuki.data.get_data(["Global", "GroupJoined"]) and not blocked_user:
@ -76,8 +80,9 @@ class YuukiJoinGroup:
if self.YuukiDynamicTools.check_invitation(operation) and not blocked_user:
group_id = operation.param1
inviter = operation.param2
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(group_id)
group = self.YuukiDynamicTools \
.get_client(self.Yuuki.MyMID) \
.getGroup(group_id)
group_member = [user.mid for user in group.members] if group.members else []
group_invitations = [user.mid for user in group.invitee] if group.invitee else []
self.YuukiDynamicTools.get_client(self.Yuuki.MyMID).acceptGroupInvitation(self.Yuuki.Seq, group_id)

View file

@ -9,6 +9,7 @@ file, You can obtain one at http://mozilla.org/MPL/2.0/.
import socket
import time
from .yuuki import Yuuki
from yuuki_core.ttypes import Operation
from .tools import YuukiStaticTools, YuukiDynamicTools
@ -23,9 +24,9 @@ class YuukiPoll:
fetchNum = 50
cacheOperations = []
def __init__(self, Yuuki):
self.Yuuki = Yuuki
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
def __init__(self, handler: Yuuki):
self.Yuuki = handler
self.YuukiDynamicTools = YuukiDynamicTools(handler)
def _action(self):
operation = Operation()

View file

@ -14,6 +14,7 @@ import sys
import traceback
import requests
from .yuuki import Yuuki
from yuuki_core.ttypes import OpType, MIDType, ContentType, Group, Message
@ -106,8 +107,8 @@ class YuukiStaticTools:
class YuukiDynamicTools:
def __init__(self, Yuuki):
self.Yuuki = Yuuki
def __init__(self, handler: Yuuki):
self.Yuuki = handler
def get_client(self, user_id):
"""
@ -135,12 +136,12 @@ class YuukiDynamicTools:
return True
return False
def switch_group_url_status(self, group, status, handlerId=None):
def switch_group_url_status(self, group, status, handler_id=None):
"""
Change LINE Group URL Status
:param group: Line Group
:param status: boolean
:param handlerId: string
:param handler_id: string
:return: None
"""
result = Group()
@ -148,7 +149,7 @@ class YuukiDynamicTools:
if key != "members" or key != "invitee":
result.__dict__[key] = group.__dict__[key]
result.preventJoinByTicket = not status
handler = self.Yuuki.MyMID if handlerId is None else handlerId
handler = self.Yuuki.MyMID if handler_id is None else handler_id
self.get_client(handler).updateGroup(self.Yuuki.Seq, result)
def config_security_status(self, group_id, status):
@ -352,6 +353,6 @@ class YuukiDynamicTools:
'params': json.dumps(params)
}
url = self.Yuuki.LINE_Media_server + '/talk/m/upload.nhn'
r = requests.post(url, headers=self.Yuuki.connectHeader, data=data, files=files)
r = requests.post(url, headers=self.Yuuki.Connect.con_header, data=data, files=files)
if r.status_code != 201:
self.send_text(send_to, "Error!")

View file

@ -15,6 +15,7 @@ import time
from git import Repo
from yuuki_core.ttypes import OpType
from .config import YuukiConfig
from .connection import YuukiConnect
from .data import YuukiData
from .events import YuukiCommand, YuukiJoinGroup, YuukiSecurity, YuukiCallback
@ -25,10 +26,10 @@ from .webadmin.server import YuukiWebAdmin
class Yuuki:
def __init__(self, yuuki_config):
def __init__(self, yuuki_config: YuukiConfig):
self.Connect = YuukiConnect(yuuki_config)
self.configs = yuuki_config.systemConfig
self.configs = yuuki_config.system_config
# Static_Variable
self.Thread_Control = YuukiMultiprocess()
@ -82,7 +83,7 @@ class Yuuki:
if self.configs.get("helper_LINE_ACCESS_KEYs"):
for access in self.configs["helper_LINE_ACCESS_KEYs"]:
self.Connect.helperConnect(access)
self.Connect.helper_connect(access)
# Dynamic Variable
self.get_text = self.i18n.gettext