Try to reformat with following PEP8

This commit is contained in:
SuperSonic 2020-12-19 03:31:48 +08:00
parent d7c01ca323
commit 78404d42e1
No known key found for this signature in database
GPG key ID: E511B80256C9F20D
18 changed files with 878 additions and 875 deletions

View file

@ -6,7 +6,7 @@ This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
from .config import Yuuki_Config
from .config import YuukiConfig
from .yuuki import Yuuki
__all__ = ['connection', 'data', 'data_mds', 'thread_control', 'Yuuki', 'Yuuki_Config']
__all__ = ['connection', 'data', 'data_mds', 'thread_control', 'Yuuki', 'YuukiConfig']

View file

@ -11,7 +11,7 @@ import os
import yaml
class Yuuki_Config:
class YuukiConfig:
"""
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ -23,19 +23,19 @@ class Yuuki_Config:
"""
connectInfo = {
connect_info = {
"Host": "",
"Command_Path": "",
"LongPoll_path": "",
}
connectHeader = {
connect_header = {
"X-Line-Application": "",
"X-Line-Access": "",
"User-Agent": ""
}
systemConfig = {
system_config = {
"name": "Yuuki",
"version": "v6.5.3-beta_RC0",
"version_check": True,
@ -54,7 +54,7 @@ class Yuuki_Config:
"Hour_KickLimit": 10,
"Hour_CancelLimit": 10,
"Default_Language": "en",
"GroupMebers_Demand": 100,
"GroupMembers_Demand": 100,
"helper_LINE_ACCESS_KEYs": [],
}
@ -62,30 +62,30 @@ class Yuuki_Config:
assert os.path.isfile(config_path), "The config file, `config.yaml` was not found."
with open(config_path, "r") as configfile:
self.config = yaml.load(configfile, Loader=yaml.FullLoader)
self._yuuki_config()
self._config_yuuki()
def _yuuki_config(self):
def _config_yuuki(self):
assert self.config is not None, "Invalid config file"
if "Yuuki" in self.config:
for key in self.config["Yuuki"]:
if key in self.systemConfig:
self.systemConfig[key] = self.config["Yuuki"][key]
return self._server_config()
if key in self.system_config:
self.system_config[key] = self.config["Yuuki"][key]
return self._config_server()
def _server_config(self):
def _config_server(self):
if "Server" in self.config.get("LINE"):
for key in self.config["LINE"]["Server"]:
if key in self.connectInfo:
self.connectInfo[key] = self.config["LINE"]["Server"][key]
return self._account_config()
if key in self.connect_info:
self.connect_info[key] = self.config["LINE"]["Server"][key]
return self._config_account()
def _account_config(self):
def _config_account(self):
if "Account" in self.config.get("LINE"):
for key in self.config["LINE"]["Account"]:
if key in ["X-Line-Application", "User-Agent"]:
self.config["LINE"]["Account"][key] = self.config["LINE"]["Account"][key].replace("\\t", "\t")
if key in self.connectHeader:
self.connectHeader[key] = self.config["LINE"]["Account"][key]
if key in self.connect_header:
self.connect_header[key] = self.config["LINE"]["Account"][key]
if "helper_LINE_ACCESS_KEYs" in self.config.get("LINE"):
self.systemConfig["helper_LINE_ACCESS_KEYs"] = self.config["LINE"]["helper_LINE_ACCESS_KEYs"]
self.system_config["helper_LINE_ACCESS_KEYs"] = self.config["LINE"]["helper_LINE_ACCESS_KEYs"]

View file

@ -18,16 +18,16 @@ except ImportError:
print("[No fast_binary using]")
class Yuuki_Connect:
def __init__(self, Yuuki_Configs):
class YuukiConnect:
def __init__(self, configs):
self.helper = {}
self.host = Yuuki_Configs.connectInfo["Host"]
self.com_path = Yuuki_Configs.connectInfo["Command_Path"]
self.poll_path = Yuuki_Configs.connectInfo["LongPoll_path"]
self.host = configs.connectInfo["Host"]
self.com_path = configs.connectInfo["Command_Path"]
self.poll_path = configs.connectInfo["LongPoll_path"]
self.con_header = Yuuki_Configs.connectHeader
self.con_header = configs.connectHeader
def connect(self, listen_timeout=600000):
transport = THttpClient.THttpClient(self.host + self.com_path)

View file

@ -15,11 +15,11 @@ from tornado.httpclient import HTTPClient, HTTPRequest
from yuuki_core.ttypes import OpType
from .data_mds import PythonMDS
from .thread_control import Yuuki_Multiprocess
from .thread_control import Yuuki_Thread
from .thread_control import YuukiMultiprocess
from .thread_control import YuukiThread
class Yuuki_Data:
class YuukiData:
# Data Struct Define
Data = {}
@ -72,11 +72,11 @@ class Yuuki_Data:
def __init__(self, threading, mds_port):
self.threading = threading
self.mds_port = mds_port
self.ThreadControl = Yuuki_Thread()
self.MdsThreadControl = Yuuki_Multiprocess()
self._Data_Initialize()
self.ThreadControl = YuukiThread()
self.MdsThreadControl = YuukiMultiprocess()
self._data_initialize()
def _Data_Initialize(self):
def _data_initialize(self):
if not os.path.isdir(self.DataPath):
os.mkdir(self.DataPath)
@ -94,9 +94,9 @@ class Yuuki_Data:
except ValueError:
assert "{}\nJson Test Error".format(name)
return self._MDS_Initialize()
return self._mds_initialize()
def _MDS_Initialize(self):
def _mds_initialize(self):
if self.threading:
mds = PythonMDS(self.mds_port)
self.mdsHost = "http://localhost:{}/".format(self.mds_port)
@ -106,10 +106,10 @@ class Yuuki_Data:
# MDS Sync
time.sleep(1)
self.mdsShake("SYC", self.Data)
return self._Log_Initialize()
self.mds_shake("SYC", self.Data)
return self._log_initialize()
def _Log_Initialize(self):
def _log_initialize(self):
if not os.path.isdir(self.LogPath):
os.mkdir(self.LogPath)
@ -119,15 +119,15 @@ class Yuuki_Data:
with open(name, "w") as f:
f.write(self.initHeader.format(Type))
def ThreadExec(self, Function, args):
def thread_append(self, function_, args):
if self.threading:
self.ThreadControl.lock.acquire()
self.ThreadControl.add(Function, args)
self.ThreadControl.add(function_, args)
self.ThreadControl.lock.release()
else:
Function(*args)
function_(*args)
def mdsShake(self, do, path, data=None):
def mds_shake(self, do, path, data=None):
status = 0
if self.threading:
http_client = HTTPClient()
@ -182,81 +182,81 @@ class Yuuki_Data:
elif Format == "Log":
return open(self.LogPath + self.LogName.format(Type), Mode)
def syncData(self):
def sync_data(self):
if self.threading:
self.Data = self.getData([])
self.Data = self.get_data([])
for Type in self.DataType:
with self.file(Type, "w", "Data") as f:
json.dump(self.Data[Type], f)
return self.getData(["Global", "Power"])
return self.get_data(["Global", "Power"])
def updateData(self, path, data):
def update_data(self, path, data):
if self.threading:
self.ThreadExec(self._updateData, (path, data))
self.thread_append(self._update_data, (path, data))
else:
self._updateData(path, data)
self._update_data(path, data)
def _updateData(self, path, data):
assert path and type(path) is list, "Empty path - updateData"
def _update_data(self, path, data):
assert path and type(path) is list, "Empty path - update_data"
if len(path) == 1:
origin_data = self.getData([])
assert type(origin_data) is dict, "Error origin data type (1) - updateData"
origin_data = self.get_data([])
assert type(origin_data) is dict, "Error origin data type (1) - update_data"
origin = origin_data.copy()
origin[path[0]] = data
path = []
else:
origin_data = self.getData(path[:-1])
assert type(origin_data) is dict, "Error origin data type (2) - updateData"
origin_data = self.get_data(path[:-1])
assert type(origin_data) is dict, "Error origin data type (2) - update_data"
origin = origin_data.copy()
origin[path[-1]] = data
path = path[:-1]
assert type(origin) is dict, "Error request data type - updateData"
assert type(origin) is dict, "Error request data type - update_data"
if self.threading:
self.mdsShake("UPT", path, origin)
self.mds_shake("UPT", path, origin)
else:
self._local_update(path, origin)
def updateLog(self, Type, Data):
def update_log(self, type_, data):
if self.threading:
self.ThreadExec(self._updateLog, (Type, Data))
self.thread_append(self._update_log, (type_, data))
else:
self._updateLog(Type, Data)
self._update_log(type_, data)
def _updateLog(self, Type, Data):
with self.file(Type, "a", "Log") as f:
f.write(self.LogType[Type] % Data)
def _update_log(self, type_, data):
with self.file(type_, "a", "Log") as f:
f.write(self.LogType[type_] % data)
@staticmethod
def getTime(time_format="%b %d %Y %H:%M:%S %Z"):
Time = time.localtime(time.time())
return time.strftime(time_format, Time)
def get_time(time_format="%b %d %Y %H:%M:%S %Z"):
time_ = time.localtime(time.time())
return time.strftime(time_format, time_)
def getData(self, path):
def get_data(self, path):
if self.threading:
return self.mdsShake("GET", path).get("data")
return self.mds_shake("GET", path).get("data")
else:
return self._local_query(path)
def getGroup(self, GroupID):
Groups = self.getData(["Group"])
if GroupID not in Groups:
self.updateData(["Group", GroupID], self.GroupType)
def get_group(self, group_id):
group_ids = self.get_data(["Group"])
if group_id not in group_ids:
self.update_data(["Group", group_ids], self.GroupType)
return self.GroupType
return Groups.get(GroupID)
return group_ids.get(group_ids)
def getSEGroup(self, GroupID):
GroupData = self.getGroup(GroupID)
SEMode = GroupData.get("SEGroup")
if SEMode is None:
def get_se_group(self, group_id):
group = self.get_group(group_id)
mode = group.get("SEGroup")
if mode is None:
return None
SEMode_ = {}
for Mode in SEMode:
Num_Mode = int(Mode)
SEMode_[Num_Mode] = SEMode[Mode]
return SEMode_
mode_ = {}
for mode__ in mode:
mode_num = int(mode__)
mode_[mode_num] = mode[mode__]
return mode_
def limitDecrease(self, limit_type, userId):
def trigger_limit(self, limit_type, user_id):
if self.threading:
self.mdsShake("YLD", limit_type, userId)
self.mds_shake("YLD", limit_type, user_id)
else:
self.Data["LimitInfo"][limit_type][userId] -= 1
self.Data["LimitInfo"][limit_type][user_id] -= 1

View file

@ -64,7 +64,7 @@ class PythonMDS:
_work["DEL"] = self._delete
_work["GET"] = self._query
_work["SYC"] = self._sync
_work["YLD"] = self._yuuki_limit_decrease
_work["YLD"] = self._yuuki_limit_trigger
_work["EXT"] = self._shutdown
self.port = port
@ -105,7 +105,7 @@ class PythonMDS:
self.switch_data = data["path"]
return {"status": 200}
def _yuuki_limit_decrease(self, data):
def _yuuki_limit_trigger(self, data):
self.switch_data["LimitInfo"][data["path"]][data["data"]] -= 1
return {"status": 200}

View file

@ -7,9 +7,9 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
from .callback import Yuuki_Callback
from .command import Yuuki_Command
from .join_group import Yuuki_JoinGroup
from .security import Yuuki_Security
from .callback import YuukiCallback
from .command import YuukiCommand
from .join_group import YuukiJoinGroup
from .security import YuukiSecurity
__all__ = ["Yuuki_Command", "Yuuki_JoinGroup", "Yuuki_Security", "Yuuki_Callback"]
__all__ = ["YuukiCommand", "YuukiJoinGroup", "YuukiSecurity", "YuukiCallback"]

View file

@ -10,36 +10,39 @@ import time
from yuuki_core.ttypes import ContentType
from ..tools import Yuuki_DynamicTools
from ..tools import YuukiDynamicTools
from ..yuuki import Yuuki
class Yuuki_Callback:
def __init__(self, Yuuki):
class YuukiCallback:
def __init__(self, handler: Yuuki):
"""
Event Type:
SEND_MESSAGE(25)
"""
self.Yuuki = Yuuki
self.Yuuki_DynamicTools = Yuuki_DynamicTools(self.Yuuki)
self.Yuuki = handler
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
def _shutdown(self, ncMessage):
self.Yuuki.Thread_Control.add(self._shutdown_reply, (ncMessage,))
def _shutdown(self, operation):
self.Yuuki.Thread_Control.add(self._shutdown_reply, (operation,))
self.Yuuki.exit()
def _shutdown_reply(self, ncMessage):
def _shutdown_reply(self, operation):
time.sleep(1)
self.Yuuki_DynamicTools.sendText(
ncMessage.message.to,
self.YuukiDynamicTools.send_text(
operation.message.to,
self.Yuuki.get_text("Exit.")
)
def _text(self, ncMessage):
def _text(self, operation):
actions = {
'[Yuuki] Remote Shutdown': self._shutdown,
}
if ncMessage.message.text in actions:
actions[ncMessage.message.text](ncMessage)
if operation.message.text in actions:
function_ = actions[operation.message.text]
if callable(function_):
function_(operation)
def action(self, ncMessage):
if ncMessage.message.contentType == ContentType.NONE:
self._text(ncMessage)
def action(self, operation):
if operation.message.contentType == ContentType.NONE:
self._text(operation)

View file

@ -12,239 +12,244 @@ import time
from yuuki_core.ttypes import MIDType, ContentType, OpType
from ..tools import Yuuki_StaticTools, Yuuki_DynamicTools
from ..tools import YuukiStaticTools, YuukiDynamicTools
from ..yuuki import Yuuki
class Yuuki_Command:
def __init__(self, Yuuki):
class YuukiCommand:
def __init__(self, handler: Yuuki):
"""
Event Type:
RECEIVE_MESSAGE (26)
"""
self.Yuuki = Yuuki
self.Yuuki_DynamicTools = Yuuki_DynamicTools(self.Yuuki)
self.Yuuki = handler
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
def _Help(self, ncMessage):
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
def _help(self, operation):
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"%s\n\t%s\n\nCommands Info:\n%s\n\nPrivacy:\n%s\n\nMore Information:\n%s\n\n%s") %
(
self.Yuuki.YuukiConfigs["name"],
self.Yuuki.YuukiConfigs["version"],
self.Yuuki.YuukiConfigs["man_page"],
self.Yuuki.YuukiConfigs["privacy_page"],
self.Yuuki.YuukiConfigs["project_url"],
self.Yuuki.YuukiConfigs["copyright"],
self.Yuuki.configs["name"],
self.Yuuki.configs["version"],
self.Yuuki.configs["man_page"],
self.Yuuki.configs["privacy_page"],
self.Yuuki.configs["project_url"],
self.Yuuki.configs["copyright"],
)
)
def _Version(self, ncMessage):
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.YuukiConfigs["version"])
def _version(self, operation):
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.configs["version"])
def _UserID(self, ncMessage):
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("LINE System UserID:\n") + ncMessage.message.from_)
def _user_id(self, operation):
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("LINE System UserID:\n") + operation.message.from_)
def _GetAllHelper(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
GroupPrivilege = [
def _get_all_helper(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid,
*self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if ncMessage.message.from_ in GroupPrivilege:
for userId in self.Yuuki.Connect.helper:
self.Yuuki_DynamicTools.sendUser(
Yuuki_StaticTools.sendToWho(ncMessage),
userId)
if operation.message.from_ in group_privilege:
for user_id in self.Yuuki.Connect.helper:
self.YuukiDynamicTools.send_user(
YuukiStaticTools.send_to_who(operation),
user_id)
def _Speed(self, ncMessage):
Time1 = time.time()
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Testing..."))
Time2 = time.time()
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Speed:\n %s com/s") % (Time2 - Time1,))
def _speed(self, operation):
timer_start = time.time()
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Testing...")
)
timer_end = time.time()
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Speed:\n %s com/s") % (timer_end - timer_start,)
)
def _SecurityMode(self, ncMessage):
msgSep = ncMessage.message.text.split(" ")
if ncMessage.message.from_ in self.Yuuki.Admin:
if len(msgSep) == 2:
if msgSep[1].isdigit() and 1 >= int(msgSep[1]) >= 0:
self.Yuuki.data.updateData(
["Global", "SecurityService"], bool(msgSep[1]))
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Okay"))
def _security_mode(self, operation):
message_separated = operation.message.text.split(" ")
if operation.message.from_ in self.Yuuki.Admin:
if len(message_separated) == 2:
if message_separated[1].isdigit() and 1 >= int(message_separated[1]) >= 0:
self.Yuuki.data.update_data(
["Global", "SecurityService"], bool(message_separated[1]))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Enable(True): 1\nDisable(False): 0"))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Enable(True): 1\nDisable(False): 0"))
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(ncMessage),
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
str(bool(
self.Yuuki.data.getData(["Global", "SecurityService"]))))
self.Yuuki.data.get_data(["Global", "SecurityService"]))))
def _Switch(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
GroupPrivilege = [
def _switch(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid,
*self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if not self.Yuuki.data.getData(["Global", "SecurityService"]):
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.YuukiConfigs["name"],)
if not self.Yuuki.data.get_data(["Global", "SecurityService"]):
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.configs["name"],)
)
elif ncMessage.message.from_ in GroupPrivilege:
self._Switch_action(ncMessage)
elif operation.message.from_ in group_privilege:
self._switch_action(operation)
def _Switch_action(self, ncMessage):
msgSep = ncMessage.message.text.split(" ")
def _switch_action(self, operation):
message_separated = operation.message.text.split(" ")
status = []
unknown_msg = []
unknown_msgtext = ""
for count, code in enumerate(msgSep):
for count, code in enumerate(message_separated):
if code.isdigit() and 3 >= int(code) >= 0:
status.append(int(code))
elif count != 0:
unknown_msg.append(code.strip())
self.Yuuki_DynamicTools.configSecurityStatus(
ncMessage.message.to, status)
self.YuukiDynamicTools.config_security_status(
operation.message.to, status)
if unknown_msg:
unknown_msgtext = ", ".join(unknown_msg)
if status:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Okay"))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Not Found"))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
if unknown_msgtext != "":
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"Notice: Unknown command line argument(s)") + "\n({})".format(unknown_msgtext)
)
def _DisableAll(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
GroupPrivilege = [
def _disable_all(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid,
*self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if not self.Yuuki.data.getData(["Global", "SecurityService"]):
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.YuukiConfigs["name"],)
if not self.Yuuki.data.get_data(["Global", "SecurityService"]):
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("SecurityService of %s was disable") % (self.Yuuki.configs["name"],)
)
elif ncMessage.message.from_ in GroupPrivilege:
self.Yuuki_DynamicTools.configSecurityStatus(
ncMessage.message.to, [])
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Okay"))
elif operation.message.from_ in group_privilege:
self.YuukiDynamicTools.config_security_status(
operation.message.to, [])
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
def _ExtAdmin(self, ncMessage):
msgSep = ncMessage.message.text.split(" ")
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
GroupPrivilege = [
def _ext_admin(self, operation):
message_separated = operation.message.text.split(" ")
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid
YuukiStaticTools.get_group_creator(group).mid
]
if len(msgSep) == 3:
if ncMessage.message.from_ in GroupPrivilege:
if msgSep[1] == "add":
self._ExtAdmin_Add(ncMessage, msgSep, GroupInfo)
elif msgSep[1] == "delete":
self._ExtAdmin_Delete(ncMessage, msgSep, GroupInfo)
if len(message_separated) == 3:
if operation.message.from_ in group_privilege:
if message_separated[1] == "add":
self._ext_admin_add(operation, message_separated, group)
elif message_separated[1] == "delete":
self._ext_admin_delete(operation, message_separated, group)
else:
self._ExtAdmin_Query(ncMessage, GroupInfo)
self._ext_admin_query(operation, group)
def _ExtAdmin_Add(self, ncMessage, msgSep, GroupInfo):
if msgSep[2] in [Member.mid for Member in GroupInfo.members]:
if msgSep[2] in self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]:
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
def _ext_admin_add(self, operation, message_separated, group):
if message_separated[2] in [Member.mid for Member in group.members]:
if message_separated[2] in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Added")
)
elif msgSep[2] not in self.Yuuki.data.getData(["BlackList"]):
origin = self.Yuuki.data.getData(
["Group", GroupInfo.id, "Ext_Admin"])
elif message_separated[2] not in self.Yuuki.data.get_data(["BlackList"]):
origin = self.Yuuki.data.get_data(
["Group", group.id, "Ext_Admin"])
ext_admin_list = origin.copy()
ext_admin_list.append(msgSep[2])
self.Yuuki.data.updateData(
["Group", GroupInfo.id, "Ext_Admin"], ext_admin_list)
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Okay"))
ext_admin_list.append(message_separated[2])
self.Yuuki.data.update_data(
["Group", group.id, "Ext_Admin"], ext_admin_list)
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Okay"))
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(ncMessage),
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"The User(s) was in our blacklist database."))
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(ncMessage),
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text(
"Wrong UserID or the guy is not in Group"))
def _ExtAdmin_Delete(self, ncMessage, msgSep, GroupInfo):
if msgSep[2] in self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]:
origin = self.Yuuki.data.getData(["Group", GroupInfo.id, "Ext_Admin"])
def _ext_admin_delete(self, operation, message_separated, group):
if message_separated[2] in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
origin = self.Yuuki.data.get_data(["Group", group.id, "Ext_Admin"])
ext_admin_list = origin.copy()
ext_admin_list.remove(msgSep[2])
self.Yuuki.data.updateData(
["Group", GroupInfo.id, "Ext_Admin"], ext_admin_list)
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
ext_admin_list.remove(message_separated[2])
self.Yuuki.data.update_data(
["Group", group.id, "Ext_Admin"], ext_admin_list)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Okay")
)
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Not Found"))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
def _ExtAdmin_Query(self, ncMessage, GroupInfo):
if self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]:
def _ext_admin_query(self, operation, group):
if self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
status = ""
status_added = []
for member in GroupInfo.members:
if member.mid in self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]:
for member in group.members:
if member.mid in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
status += "{}\n".format(member.displayName)
status_added.append(member.mid)
for userId in self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]:
if userId not in status_added:
for user_id in self.Yuuki.data.get_group(group.id)["Ext_Admin"]:
if user_id not in status_added:
status += "{}: {}\n".format(
self.Yuuki.get_text("Unknown"),
userId
user_id
)
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
status + self.Yuuki.get_text("\nExtend Administrator(s)")
)
else:
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(
ncMessage), self.Yuuki.get_text("Not Found"))
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(
operation), self.Yuuki.get_text("Not Found"))
def _Status(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
group_status = self.Yuuki.data.getSEGroup(
ncMessage.message.to)
if not self.Yuuki.data.getData(["Global", "SecurityService"]):
def _status(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_status = self.Yuuki.data.get_se_group(
operation.message.to)
if not self.Yuuki.data.get_data(["Global", "SecurityService"]):
status = self.Yuuki.get_text("SecurityService of %s was disable") % (
self.Yuuki.YuukiConfigs["name"],
self.Yuuki.configs["name"],
)
elif group_status is None:
status = self.Yuuki.get_text("Default without Initialize\nMain Admin of the Group:\n%s") % (
Yuuki_StaticTools.sybGetGroupCreator(
GroupInfo).displayName,
YuukiStaticTools.get_group_creator(
group).displayName,
)
else:
status = self.Yuuki.get_text(
@ -255,128 +260,130 @@ class Yuuki_Command:
group_status[OpType.NOTIFIED_INVITE_INTO_GROUP],
group_status[OpType.NOTIFIED_ACCEPT_GROUP_INVITATION],
group_status[OpType.NOTIFIED_KICKOUT_FROM_GROUP],
Yuuki_StaticTools.sybGetGroupCreator(
GroupInfo).displayName,
YuukiStaticTools.get_group_creator(
group).displayName,
)
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage), status)
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation), status)
def _GroupBackup(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(ncMessage.message.to)
GroupPrivilege = [
def _group_backup(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid,
*self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if ncMessage.message.from_ in GroupPrivilege:
GroupMembers = [User.mid for User in GroupInfo.members]
GroupInvites = None
if GroupInfo.invitee:
GroupInvites = [
User.mid for User in GroupInfo.invitee]
LayoutInfo = {
"OriginID": GroupInfo.id,
"Members": GroupMembers,
"Invites": GroupInvites
if operation.message.from_ in group_privilege:
group_members = [User.mid for User in group.members]
group_invitations = None
if group.invitee:
group_invitations = [
User.mid for User in group.invitee]
output_info = {
"OriginID": group.id,
"Members": group_members,
"Invites": group_invitations
}
self.Yuuki_DynamicTools.sendText(
ncMessage.message.from_, GroupInfo.name)
self.Yuuki_DynamicTools.sendText(
ncMessage.message.from_, json.dumps(LayoutInfo))
self.Yuuki_DynamicTools.sendText(
ncMessage.message.to, self.Yuuki.get_text("Okay"))
self.YuukiDynamicTools.send_text(
operation.message.from_, group.name)
self.YuukiDynamicTools.send_text(
operation.message.from_, json.dumps(output_info))
self.YuukiDynamicTools.send_text(
operation.message.to, self.Yuuki.get_text("Okay"))
def _Quit(self, ncMessage):
if ncMessage.message.toType == MIDType.GROUP:
GroupInfo = self.Yuuki_DynamicTools.getClient(
def _quit(self, operation):
if operation.message.toType == MIDType.GROUP:
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID
).getGroup(ncMessage.message.to)
GroupPrivilege = [
).getGroup(operation.message.to)
group_privilege = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid
YuukiStaticTools.get_group_creator(group).mid
]
if ncMessage.message.from_ in GroupPrivilege:
self.Yuuki_DynamicTools.leaveGroup(GroupInfo)
if operation.message.from_ in group_privilege:
self.YuukiDynamicTools.leave_group(group)
def _Exit(self, ncMessage):
if ncMessage.message.from_ in self.Yuuki.Admin:
self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
def _exit(self, operation):
if operation.message.from_ in self.Yuuki.Admin:
self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Exit.")
)
self.Yuuki.exit()
def _SystemCall(self, ncMessage):
msgSep = ncMessage.message.text.split(" ")
if ncMessage.message.from_ in self.Yuuki.Admin:
def _system_call(self, operation):
message_separated = operation.message.text.split(" ")
if operation.message.from_ in self.Yuuki.Admin:
# noinspection PyBroadException
try:
ComMsg = Yuuki_StaticTools.readCommandLine(msgSep[1:len(msgSep)])
Report = str(eval(ComMsg))
command_message = YuukiStaticTools.read_commands(message_separated[1:len(message_separated)])
reporting_message = str(eval(command_message))
except:
(err1, err2, err3, ErrorInfo) = Yuuki_StaticTools.errorReport()
Report = "Star Yuuki BOT - Eval Error:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, ErrorInfo)
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(ncMessage), Report)
(err1, err2, err3, error_info) = YuukiStaticTools.report_error()
reporting_message = "Star Yuuki BOT - Eval Error:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, error_info)
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation), reporting_message)
def _text(self, ncMessage):
Yuuki_Name = self.Yuuki.YuukiConfigs["name"]
msgSep = ncMessage.message.text.split(" ")[0].split("/")
def _text(self, operation):
yuuki_name = self.Yuuki.configs["name"]
message_separated = operation.message.text.split(" ")[0].split("/")
actions = {
'Help': self._Help,
'Version': self._Version,
'UserID': self._UserID,
'GetAllHelper': self._GetAllHelper,
'Speed': self._Speed,
'SecurityMode': self._SecurityMode,
'Switch': self._Switch,
'DisableAll': self._DisableAll,
'ExtAdmin': self._ExtAdmin,
'Status': self._Status,
'GroupBackup': self._GroupBackup,
'Quit': self._Quit,
'Exit': self._Exit,
'SystemCall': self._SystemCall,
'Help': self._help,
'Version': self._version,
'UserID': self._user_id,
'GetAllHelper': self._get_all_helper,
'Speed': self._speed,
'SecurityMode': self._security_mode,
'Switch': self._switch,
'DisableAll': self._disable_all,
'ExtAdmin': self._ext_admin,
'Status': self._status,
'GroupBackup': self._group_backup,
'Quit': self._quit,
'Exit': self._exit,
'SystemCall': self._system_call,
}
if Yuuki_Name == msgSep[0]:
if len(msgSep) > 1 and msgSep[1] in actions:
return actions[msgSep[1]](ncMessage)
return self.Yuuki_DynamicTools.sendText(
Yuuki_StaticTools.sendToWho(ncMessage),
self.Yuuki.get_text("Helllo^^\nMy name is %s ><\nNice to meet you OwO") % (Yuuki_Name,)
if yuuki_name == message_separated[0]:
if len(message_separated) > 1 and message_separated[1] in actions:
function_ = actions[message_separated[1]]
if callable(function_):
return function_(operation)
return self.YuukiDynamicTools.send_text(
YuukiStaticTools.send_to_who(operation),
self.Yuuki.get_text("Helllo^^\nMy name is %s ><\nNice to meet you OwO") % (yuuki_name,)
)
def _contact(self, ncMessage):
cache = ncMessage.message.contentMetadata["mid"]
contactInfo = self.Yuuki_DynamicTools.getContact(cache)
if not contactInfo:
def _contact(self, operation):
cache = operation.message.contentMetadata["mid"]
contact_info = self.YuukiDynamicTools.get_contact(cache)
if not contact_info:
msg = self.Yuuki.get_text("Not Found")
elif contactInfo.mid in self.Yuuki.data.getData(["BlackList"]):
elif contact_info.mid in self.Yuuki.data.get_data(["BlackList"]):
msg = "{}\n{}".format(self.Yuuki.get_text(
"The User(s) was in our blacklist database."), contactInfo.mid)
"The User(s) was in our blacklist database."), contact_info.mid)
else:
msg = self.Yuuki.get_text("Name:%s\nPicture URL:%s/%s\nStatusMessage:\n%s\nLINE System UserID:%s") % \
(contactInfo.displayName, self.Yuuki.LINE_Media_server, contactInfo.pictureStatus,
contactInfo.statusMessage, contactInfo.mid)
self.Yuuki_DynamicTools.sendText(Yuuki_StaticTools.sendToWho(ncMessage), msg)
(contact_info.displayName, self.Yuuki.LINE_Media_server, contact_info.pictureStatus,
contact_info.statusMessage, contact_info.mid)
self.YuukiDynamicTools.send_text(YuukiStaticTools.send_to_who(operation), msg)
def action(self, ncMessage):
BlockedIgnore = (ncMessage.message.to in self.Yuuki.data.getData(["BlackList"])) or \
(ncMessage.message.from_ in self.Yuuki.data.getData(["BlackList"]))
def action(self, operation):
blocked_user = (operation.message.to in self.Yuuki.data.get_data(["BlackList"])) or \
(operation.message.from_ in self.Yuuki.data.get_data(["BlackList"]))
if ('BOT_CHECK' in ncMessage.message.contentMetadata) or BlockedIgnore:
if ('BOT_CHECK' in operation.message.contentMetadata) or blocked_user:
pass
elif ncMessage.message.toType == MIDType.ROOM:
self.Yuuki_DynamicTools.getClient(
elif operation.message.toType == MIDType.ROOM:
self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID
).leaveRoom(
self.Yuuki.Seq, ncMessage.message.to
self.Yuuki.Seq, operation.message.to
)
elif ncMessage.message.contentType == ContentType.NONE:
self._text(ncMessage)
elif operation.message.contentType == ContentType.NONE:
self._text(operation)
elif ncMessage.message.contentType == ContentType.CONTACT:
self._contact(ncMessage)
elif operation.message.contentType == ContentType.CONTACT:
self._contact(operation)

View file

@ -7,85 +7,83 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
from ..tools import Yuuki_StaticTools, Yuuki_DynamicTools
from ..tools import YuukiStaticTools, YuukiDynamicTools
from ..yuuki import Yuuki
class Yuuki_JoinGroup:
def __init__(self, Yuuki):
class YuukiJoinGroup:
def __init__(self, handler: Yuuki):
"""
Event Type:
NOTIFIED_INVITE_INTO_GROUP (13)
"""
self.Yuuki = Yuuki
self.Yuuki_DynamicTools = Yuuki_DynamicTools(self.Yuuki)
self.Yuuki = handler
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
def _accept(self, GroupID, GroupInfo, Inviter):
GroupList = self.Yuuki.data.getData(["Global", "GroupJoined"])
NewGroupList = GroupList.copy()
NewGroupList.append(GroupID)
self.Yuuki.data.updateData(
["Global", "GroupJoined"], NewGroupList)
self.Yuuki_DynamicTools.sendText(
GroupID,
def _accept(self, group_id, group, inviter):
group_list = self.Yuuki.data.get_data(["Global", "GroupJoined"])
new_group_list = group_list.copy()
new_group_list.append(group_id)
self.Yuuki.data.update_data(
["Global", "GroupJoined"], new_group_list)
self.YuukiDynamicTools.send_text(
group_id,
self.Yuuki.get_text("Helllo^^\nMy name is %s ><\nNice to meet you OwO") %
(self.Yuuki.YuukiConfigs["name"],)
(self.Yuuki.configs["name"],)
)
self.Yuuki_DynamicTools.sendText(
GroupID,
self.YuukiDynamicTools.send_text(
group_id,
self.Yuuki.get_text("Type:\n\t%s/Help\nto get more information\n\nMain Admin of the Group:\n%s") %
(
self.Yuuki.YuukiConfigs["name"],
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).displayName,
self.Yuuki.configs["name"],
YuukiStaticTools.get_group_creator(group).displayName,
)
)
self.Yuuki_DynamicTools.getGroupTicket(GroupID, self.Yuuki.MyMID, True)
self.YuukiDynamicTools.get_group_ticket(group_id, self.Yuuki.MyMID, True)
# Log
self.Yuuki.data.updateLog(
"JoinGroup", (self.Yuuki.data.getTime(), GroupInfo.name, GroupID, Inviter))
self.Yuuki.data.update_log(
"JoinGroup", (self.Yuuki.data.get_time(), group.name, group_id, inviter))
def _reject(self, GroupID, Inviter):
self.Yuuki_DynamicTools.sendText(
GroupID,
def _reject(self, group_id, inviter):
self.YuukiDynamicTools.send_text(
group_id,
self.Yuuki.get_text("Sorry...\nThe number of members is not satisfied (%s needed)") %
(self.Yuuki.YuukiConfigs["GroupMebers_Demand"],)
(self.Yuuki.configs["GroupMembers_Demand"],)
)
self.Yuuki_DynamicTools.getClient(self.Yuuki.MyMID).leaveGroup(self.Yuuki.Seq, GroupID)
self.YuukiDynamicTools.get_client(self.Yuuki.MyMID).leave_group(self.Yuuki.Seq, group_id)
# Log
self.Yuuki.data.updateLog(
"JoinGroup", (self.Yuuki.data.getTime(), GroupID, "Not Join", Inviter))
self.Yuuki.data.update_log(
"JoinGroup", (self.Yuuki.data.get_time(), group_id, "Not Join", inviter))
def _helper_check(self, ncMessage, GroupInvite, BlockedIgnore):
if ncMessage.param1 in self.Yuuki.data.getData(["Global", "GroupJoined"]) and not BlockedIgnore:
for userId in self.Yuuki.Connect.helper:
if self.Yuuki_DynamicTools.checkInInvitationList(ncMessage, userId) or userId in GroupInvite:
self.Yuuki_DynamicTools.getClient(userId).acceptGroupInvitation(self.Yuuki.Seq, ncMessage.param1)
self.Yuuki_DynamicTools.getGroupTicket(ncMessage.param1, userId, True)
def _check_helper(self, operation, group_invitations, blocked_user):
if operation.param1 in self.Yuuki.data.get_data(["Global", "GroupJoined"]) and not blocked_user:
for user_id in self.Yuuki.Connect.helper:
if self.YuukiDynamicTools.check_invitation(operation, user_id) or user_id in group_invitations:
self.YuukiDynamicTools.get_client(user_id).acceptGroupInvitation(self.Yuuki.Seq, operation.param1)
self.YuukiDynamicTools.get_group_ticket(operation.param1, user_id, True)
# Log
self.Yuuki.data.updateLog("JoinGroup", (
self.Yuuki.data.getTime(),
ncMessage.param1,
userId,
ncMessage.param2
self.Yuuki.data.update_log("JoinGroup", (
self.Yuuki.data.get_time(),
operation.param1,
user_id,
operation.param2
))
def action(self, ncMessage):
GroupInvite = []
BlockedIgnore = ncMessage.param2 in self.Yuuki.data.getData(["BlackList"])
if self.Yuuki_DynamicTools.checkInInvitationList(ncMessage) and not BlockedIgnore:
GroupID = ncMessage.param1
Inviter = ncMessage.param2
GroupInfo = self.Yuuki_DynamicTools.getClient(
self.Yuuki.MyMID).getGroup(GroupID)
if GroupInfo.members:
GroupMember = [Catched.mid for Catched in GroupInfo.members]
GroupInfo.invitee = []
if GroupInfo.invitee:
GroupInvite = [
Catched.mid for Catched in GroupInfo.invitee]
self.Yuuki_DynamicTools.getClient(self.Yuuki.MyMID).acceptGroupInvitation(self.Yuuki.Seq, GroupID)
if len(GroupMember) >= self.Yuuki.YuukiConfigs["GroupMebers_Demand"]:
self._accept(GroupID, GroupInfo, Inviter)
else:
self._reject(GroupID, Inviter)
self._helper_check(ncMessage, GroupInvite, BlockedIgnore)
self.Yuuki.Security(ncMessage)
def action(self, operation):
group_invitations = []
blocked_user = operation.param2 in self.Yuuki.data.get_data(["BlackList"])
if self.YuukiDynamicTools.check_invitation(operation) and not blocked_user:
group_id = operation.param1
inviter = operation.param2
group = self.YuukiDynamicTools.get_client(
self.Yuuki.MyMID).getGroup(group_id)
group_member = [user.mid for user in group.members] if group.members else []
group_invitations = [user.mid for user in group.invitee] if group.invitee else []
self.YuukiDynamicTools.get_client(self.Yuuki.MyMID).acceptGroupInvitation(self.Yuuki.Seq, group_id)
if len(group_member) >= self.Yuuki.configs["GroupMembers_Demand"]:
self._accept(group_id, group, inviter)
else:
self._reject(group_id, inviter)
self._check_helper(operation, group_invitations, blocked_user)
self.Yuuki.Security(operation)

View file

@ -9,7 +9,8 @@ file, You can obtain one at http://mozilla.org/MPL/2.0/.
from yuuki_core.ttypes import OpType
from ..tools import Yuuki_StaticTools, Yuuki_DynamicTools
from ..tools import YuukiStaticTools, YuukiDynamicTools
from ..yuuki import Yuuki
def security_access_checker(function):
@ -21,8 +22,8 @@ def security_access_checker(function):
return wrapper
class Yuuki_Security:
def __init__(self, Yuuki):
class YuukiSecurity:
def __init__(self, handler: Yuuki):
"""
Event Type:
NOTIFIED_UPDATE_GROUP (11)
@ -30,279 +31,278 @@ class Yuuki_Security:
NOTIFIED_ACCEPT_GROUP_INVITATION (17)
NOTIFIED_KICKOUT_FROM_GROUP (19)
"""
self.Yuuki = Yuuki
self.Yuuki_DynamicTools = Yuuki_DynamicTools(self.Yuuki)
self.Yuuki = handler
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
@security_access_checker
def _NOTIFIED_UPDATE_GROUP(self, GroupInfo, SecurityInfo, ncMessage):
if SecurityInfo["Another"] == '4':
if not GroupInfo.preventJoinByTicket and SecurityInfo["Action"] not in self.Yuuki.Connect.helper:
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.changeGroupUrlStatus,
(GroupInfo, False)
def _notified_update_group(self, group, security_info, operation):
if security_info["Another"] == '4':
if not group.preventJoinByTicket and security_info["Action"] not in self.Yuuki.Connect.helper:
self.Yuuki.thread_append(
self.YuukiDynamicTools.switch_group_url_status,
(group, False)
)
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.sendText,
(SecurityInfo["GroupID"], self.Yuuki.get_text("DO NOT ENABLE THE GROUP URL STATUS, see you..."))
self.Yuuki.thread_append(
self.YuukiDynamicTools.send_text,
(security_info["GroupID"], self.Yuuki.get_text("DO NOT ENABLE THE GROUP URL STATUS, see you..."))
)
Kicker = self.Yuuki_DynamicTools.modifyGroupMemberList(1, GroupInfo, SecurityInfo["Action"])
kicker = self.YuukiDynamicTools.modify_group_members(1, group, security_info["Action"])
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Kicker,
SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
kicker,
security_info["Action"],
security_info["Another"],
operation.type
))
@security_access_checker
def _NOTIFIED_INVITE_INTO_GROUP(self, GroupInfo, SecurityInfo, ncMessage):
Canceler = "None"
if "\x1e" in SecurityInfo["Another"]:
Canceler = self._NOTIFIED_INVITE_INTO_GROUP_list(GroupInfo, SecurityInfo, ncMessage, Canceler)
elif SecurityInfo["Another"] not in self.Yuuki.AllAccountIds + SecurityInfo["GroupPrivilege"]:
Canceler = self._NOTIFIED_INVITE_INTO_GROUP_single(GroupInfo, SecurityInfo, ncMessage)
if Canceler != "None":
self.Yuuki_DynamicTools.sendText(
SecurityInfo["GroupID"],
def _notified_invite_into_group(self, group, security_info, operation):
canceler = "None"
if "\x1e" in security_info["Another"]:
canceler = self._notified_invite_into_group_list(group, security_info, operation, canceler)
elif security_info["Another"] not in self.Yuuki.AllAccountIds + security_info["GroupPrivilege"]:
canceler = self._notified_invite_into_group_single(group, security_info, operation)
if canceler != "None":
self.YuukiDynamicTools.send_text(
security_info["GroupID"],
self.Yuuki.get_text("Do not invite anyone...thanks")
)
def _NOTIFIED_INVITE_INTO_GROUP_list(self, GroupInfo, SecurityInfo, ncMessage, Canceler):
for userId in SecurityInfo["Another"].split("\x1e"):
if userId not in self.Yuuki.AllAccountIds + SecurityInfo["GroupPrivilege"]:
if GroupInfo.invitee and userId in [user.mid for user in GroupInfo.invitee]:
Canceler = self.Yuuki_DynamicTools.modifyGroupMemberList(2, GroupInfo, userId)
def _notified_invite_into_group_list(self, group, security_info, operation, canceler):
for user_id in security_info["Another"].split("\x1e"):
if user_id not in self.Yuuki.AllAccountIds + security_info["GroupPrivilege"]:
if group.invitee and user_id in [user.mid for user in group.invitee]:
canceler = self.YuukiDynamicTools.modify_group_members(2, group, user_id)
else:
Canceler = self.Yuuki_DynamicTools.modifyGroupMemberList(1, GroupInfo, userId)
canceler = self.YuukiDynamicTools.modify_group_members(1, group, user_id)
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Canceler,
SecurityInfo["Action"],
userId,
ncMessage.type * 10
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
canceler,
security_info["Action"],
user_id,
operation.type * 10
))
# Log
self.Yuuki.data.updateLog("CancelEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Canceler,
SecurityInfo["Action"],
SecurityInfo["Another"].replace("\x1e", ",")
self.Yuuki.data.update_log("CancelEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
canceler,
security_info["Action"],
security_info["Another"].replace("\x1e", ",")
))
return Canceler
return canceler
def _NOTIFIED_INVITE_INTO_GROUP_single(self, GroupInfo, SecurityInfo, ncMessage):
if GroupInfo.invitee and SecurityInfo["Another"] in [user.mid for user in GroupInfo.invitee]:
Canceler = self.Yuuki_DynamicTools.modifyGroupMemberList(2, GroupInfo, SecurityInfo["Another"])
def _notified_invite_into_group_single(self, group, security_info, operation):
if group.invitee and security_info["Another"] in [user.mid for user in group.invitee]:
canceler = self.YuukiDynamicTools.modify_group_members(2, group, security_info["Another"])
else:
Canceler = self.Yuuki_DynamicTools.modifyGroupMemberList(1, GroupInfo, SecurityInfo["Another"])
canceler = self.YuukiDynamicTools.modify_group_members(1, group, security_info["Another"])
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name, SecurityInfo["GroupID"],
Canceler, SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type * 10
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name, security_info["GroupID"],
canceler, security_info["Action"],
security_info["Another"],
operation.type * 10
))
# Log
self.Yuuki.data.updateLog("CancelEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Canceler,
SecurityInfo["Action"],
SecurityInfo["Another"]
self.Yuuki.data.update_log("CancelEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
canceler,
security_info["Action"],
security_info["Another"]
))
return Canceler
return canceler
@security_access_checker
def _NOTIFIED_ACCEPT_GROUP_INVITATION(self, GroupInfo, SecurityInfo, ncMessage):
for userId in self.Yuuki.data.getData(["BlackList"]):
if userId == SecurityInfo["Action"]:
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.sendText,
(SecurityInfo["GroupID"], self.Yuuki.get_text("You are our blacklist. Bye~"))
def _notified_accept_group_invitation(self, group, security_info, operation):
for user_id in self.Yuuki.data.get_data(["BlackList"]):
if user_id == security_info["Action"]:
self.Yuuki.thread_append(
self.YuukiDynamicTools.send_text,
(security_info["GroupID"], self.Yuuki.get_text("You are our blacklist. Bye~"))
)
Kicker = self.Yuuki_DynamicTools.modifyGroupMemberList(1, GroupInfo, SecurityInfo["Action"])
kicker = self.YuukiDynamicTools.modify_group_members(1, group, security_info["Action"])
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Kicker,
Kicker,
SecurityInfo["Action"],
ncMessage.type
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
kicker,
kicker,
security_info["Action"],
operation.type
))
def _NOTIFIED_KICKOUT_FROM_GROUP(self, GroupInfo, SecurityInfo, ncMessage):
if SecurityInfo["Action"] in self.Yuuki.Connect.helper:
def _notified_kickout_from_group(self, group, security_info, operation):
if security_info["Action"] in self.Yuuki.Connect.helper:
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
SecurityInfo["Action"],
SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type * 10 + 1
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
security_info["Action"],
security_info["Action"],
security_info["Another"],
operation.type * 10 + 1
))
elif SecurityInfo["Another"] in self.Yuuki.AllAccountIds:
Kicker = "None"
elif security_info["Another"] in self.Yuuki.AllAccountIds:
kicker = "None"
try:
Kicker = self.Yuuki_DynamicTools.modifyGroupMemberList(
kicker = self.YuukiDynamicTools.modify_group_members(
1,
GroupInfo,
SecurityInfo["Action"],
SecurityInfo["Another"]
group,
security_info["Action"],
security_info["Another"]
)
self._NOTIFIED_KICKOUT_FROM_GROUP_rescue(GroupInfo, SecurityInfo, ncMessage, Kicker)
self._notified_kickout_from_group_rescue(group, security_info, operation, kicker)
except:
self._NOTIFIED_KICKOUT_FROM_GROUP_rescue_failure(GroupInfo, SecurityInfo, ncMessage, Kicker)
BlackList = self.Yuuki.data.getData(["BlackList"])
if SecurityInfo["Action"] not in BlackList:
NewBlackList = BlackList.copy()
NewBlackList.append(SecurityInfo["Action"])
self.Yuuki.data.updateData(["BlackList"], NewBlackList)
self._notified_kickout_from_group_rescue_failure(group, security_info, operation, kicker)
black_list = self.Yuuki.data.get_data(["BlackList"])
if security_info["Action"] not in black_list:
new_black_list = black_list.copy()
new_black_list.append(security_info["Action"])
self.Yuuki.data.update_data(["BlackList"], new_black_list)
# Log
self.Yuuki.data.updateLog("BlackList", (
self.Yuuki.data.getTime(),
SecurityInfo["Action"],
SecurityInfo["GroupID"]
self.Yuuki.data.update_log("BlackList", (
self.Yuuki.data.get_time(),
security_info["Action"],
security_info["GroupID"]
))
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.sendText,
self.Yuuki.thread_append(
self.YuukiDynamicTools.send_text,
(
SecurityInfo["Action"],
security_info["Action"],
self.Yuuki.get_text("You had been blocked by our database.")
)
)
elif SecurityInfo["Security_Access"]:
self._NOTIFIED_KICKOUT_FROM_GROUP_normal(GroupInfo, SecurityInfo, ncMessage)
elif security_info["Security_Access"]:
self._notified_kickout_from_group_normal(group, security_info, operation)
def _NOTIFIED_KICKOUT_FROM_GROUP_rescue(self, GroupInfo, SecurityInfo, ncMessage, Kicker):
def _notified_kickout_from_group_rescue(self, group, security_info, operation, kicker):
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Kicker,
SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type * 10 + 2
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
kicker,
security_info["Action"],
security_info["Another"],
operation.type * 10 + 2
))
assert Kicker != "None", "No Helper Found"
if GroupInfo.preventJoinByTicket:
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.changeGroupUrlStatus,
(GroupInfo, True, Kicker)
assert kicker != "None", "No Helper Found"
if group.preventJoinByTicket:
self.Yuuki.thread_append(
self.YuukiDynamicTools.switch_group_url_status,
(group, True, kicker)
)
GroupTicket = self.Yuuki_DynamicTools.getGroupTicket(
SecurityInfo["GroupID"], Kicker)
group_ticket = self.YuukiDynamicTools.get_group_ticket(
security_info["GroupID"], kicker)
try:
self.Yuuki_DynamicTools.getClient(SecurityInfo["Another"]).acceptGroupInvitationByTicket(
self.YuukiDynamicTools.get_client(security_info["Another"]).acceptGroupInvitationByTicket(
self.Yuuki.Seq,
SecurityInfo["GroupID"],
GroupTicket
security_info["GroupID"],
group_ticket
)
except:
if GroupInfo.preventJoinByTicket:
self.Yuuki_DynamicTools.changeGroupUrlStatus(
GroupInfo,
if group.preventJoinByTicket:
self.YuukiDynamicTools.switch_group_url_status(
group,
True,
Kicker
kicker
)
GroupTicket = self.Yuuki_DynamicTools.getGroupTicket(
SecurityInfo["GroupID"], Kicker, True)
self.Yuuki_DynamicTools.getClient(SecurityInfo["Another"]).acceptGroupInvitationByTicket(
group_ticket = self.YuukiDynamicTools.get_group_ticket(
security_info["GroupID"], kicker, True)
self.YuukiDynamicTools.get_client(security_info["Another"]).acceptGroupInvitationByTicket(
self.Yuuki.Seq,
SecurityInfo["GroupID"],
GroupTicket
security_info["GroupID"],
group_ticket
)
if GroupInfo.preventJoinByTicket:
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.changeGroupUrlStatus, (GroupInfo, False, SecurityInfo["Another"]))
self.Yuuki_DynamicTools.getGroupTicket(
SecurityInfo["GroupID"], SecurityInfo["Another"], True)
if group.preventJoinByTicket:
self.Yuuki.thread_append(
self.YuukiDynamicTools.switch_group_url_status, (group, False, security_info["Another"]))
self.YuukiDynamicTools.get_group_ticket(
security_info["GroupID"], security_info["Another"], True)
def _NOTIFIED_KICKOUT_FROM_GROUP_rescue_failure(self, GroupInfo, SecurityInfo, ncMessage, Kicker):
(err1, err2, err3, ErrorInfo) = Yuuki_StaticTools.errorReport()
def _notified_kickout_from_group_rescue_failure(self, group, security_info, operation, kicker):
(err1, err2, err3, error_info) = YuukiStaticTools.report_error()
for Root in self.Yuuki.Admin:
self.Yuuki_DynamicTools.sendText(
self.YuukiDynamicTools.send_text(
Root,
"Star Yuuki BOT - SecurityService Failure\n\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, ErrorInfo)
"Star Yuuki BOT - SecurityService Failure\n\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, error_info)
)
if SecurityInfo["Another"] == self.Yuuki.MyMID:
GroupList = self.Yuuki.data.getData(["Global", "GroupJoined"])
NewGroupList = GroupList.copy()
NewGroupList.remove(SecurityInfo["GroupID"])
self.Yuuki.data.updateData(["Global", "GroupJoined"], NewGroupList)
if security_info["Another"] == self.Yuuki.MyMID:
group_list = self.Yuuki.data.get_data(["Global", "GroupJoined"])
new_group_list = group_list.copy()
new_group_list.remove(security_info["GroupID"])
self.Yuuki.data.update_data(["Global", "GroupJoined"], new_group_list)
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Kicker,
SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type * 10 + 3
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
kicker,
security_info["Action"],
security_info["Another"],
operation.type * 10 + 3
))
def _NOTIFIED_KICKOUT_FROM_GROUP_normal(self, GroupInfo, SecurityInfo, ncMessage):
self.Yuuki.threadExec(self.Yuuki_DynamicTools.sendText, (
SecurityInfo["GroupID"], self.Yuuki.get_text("DO NOT KICK, thank you ^^")))
Kicker = self.Yuuki_DynamicTools.modifyGroupMemberList(1, GroupInfo, SecurityInfo["Action"])
def _notified_kickout_from_group_normal(self, group, security_info, operation):
self.Yuuki.thread_append(self.YuukiDynamicTools.send_text, (
security_info["GroupID"], self.Yuuki.get_text("DO NOT KICK, thank you ^^")))
kicker = self.YuukiDynamicTools.modify_group_members(1, group, security_info["Action"])
# Log
self.Yuuki.data.updateLog("KickEvent", (
self.Yuuki.data.getTime(),
GroupInfo.name,
SecurityInfo["GroupID"],
Kicker,
SecurityInfo["Action"],
SecurityInfo["Another"],
ncMessage.type
self.Yuuki.data.update_log("KickEvent", (
self.Yuuki.data.get_time(),
group.name,
security_info["GroupID"],
kicker,
security_info["Action"],
security_info["Another"],
operation.type
))
self.Yuuki.threadExec(self.Yuuki_DynamicTools.sendText, (SecurityInfo["GroupID"], self.Yuuki.get_text(
self.Yuuki.thread_append(self.YuukiDynamicTools.send_text, (security_info["GroupID"], self.Yuuki.get_text(
"The one who was been kicked:")))
self.Yuuki.threadExec(
self.Yuuki_DynamicTools.sendUser, (SecurityInfo["GroupID"], SecurityInfo["Another"]))
self.Yuuki.thread_append(
self.YuukiDynamicTools.send_user, (security_info["GroupID"], security_info["Another"]))
def action(self, ncMessage):
SecurityInfo = Yuuki_StaticTools.securityForWhere(ncMessage)
def action(self, operation):
security_info = YuukiStaticTools.security_for_where(operation)
GroupInfo = self.Yuuki_DynamicTools.getClient(self.Yuuki.MyMID).getGroup(SecurityInfo["GroupID"])
SecurityInfo["GroupPrivilege"] = [
group = self.YuukiDynamicTools.get_client(self.Yuuki.MyMID).getGroup(security_info["GroupID"])
security_info["GroupPrivilege"] = [
*self.Yuuki.Admin,
Yuuki_StaticTools.sybGetGroupCreator(GroupInfo).mid,
*self.Yuuki.data.getGroup(GroupInfo.id)["Ext_Admin"]
YuukiStaticTools.get_group_creator(group).mid,
*self.Yuuki.data.get_group(group.id)["Ext_Admin"]
]
if SecurityInfo["Action"] in SecurityInfo["GroupPrivilege"] or \
SecurityInfo["Another"] in SecurityInfo["GroupPrivilege"]:
if ncMessage.type != OpType.NOTIFIED_KICKOUT_FROM_GROUP:
if security_info["Action"] in security_info["GroupPrivilege"] or \
security_info["Another"] in security_info["GroupPrivilege"]:
if operation.type != OpType.NOTIFIED_KICKOUT_FROM_GROUP:
return
elif SecurityInfo["Action"] in SecurityInfo["GroupPrivilege"]:
elif security_info["Action"] in security_info["GroupPrivilege"]:
return
SEGroup = self.Yuuki.data.getSEGroup(SecurityInfo["GroupID"])
if SEGroup is None:
SecurityInfo["Security_Access"] = self.Yuuki.data.getData(["Global", "SecurityService"])
elif SEGroup[ncMessage.type]:
SecurityInfo["Security_Access"] = SEGroup[ncMessage.type]
se_group = self.Yuuki.data.get_se_group(security_info["GroupID"])
if se_group is None:
security_info["Security_Access"] = self.Yuuki.data.get_data(["Global", "SecurityService"])
elif se_group[operation.type]:
security_info["Security_Access"] = se_group[operation.type]
else:
SecurityInfo["Security_Access"] = False
security_info["Security_Access"] = False
if self.Yuuki.data.getData(["Global", "SecurityService"]):
if self.Yuuki.data.get_data(["Global", "SecurityService"]):
{
OpType.NOTIFIED_UPDATE_GROUP: self._NOTIFIED_UPDATE_GROUP,
OpType.NOTIFIED_INVITE_INTO_GROUP: self._NOTIFIED_INVITE_INTO_GROUP,
OpType.NOTIFIED_ACCEPT_GROUP_INVITATION: self._NOTIFIED_ACCEPT_GROUP_INVITATION,
OpType.NOTIFIED_KICKOUT_FROM_GROUP: self._NOTIFIED_KICKOUT_FROM_GROUP
}[ncMessage.type](GroupInfo, SecurityInfo, ncMessage)
OpType.NOTIFIED_UPDATE_GROUP: self._notified_update_group,
OpType.NOTIFIED_INVITE_INTO_GROUP: self._notified_invite_into_group,
OpType.NOTIFIED_ACCEPT_GROUP_INVITATION: self._notified_accept_group_invitation,
OpType.NOTIFIED_KICKOUT_FROM_GROUP: self._notified_kickout_from_group
}[operation.type](group, security_info, operation)

View file

@ -11,10 +11,10 @@ import time
from yuuki_core.ttypes import Operation
from .tools import Yuuki_StaticTools, Yuuki_DynamicTools
from .tools import YuukiStaticTools, YuukiDynamicTools
class Yuuki_Poll:
class YuukiPoll:
Power = True
NoWork = 0
@ -25,22 +25,22 @@ class Yuuki_Poll:
def __init__(self, Yuuki):
self.Yuuki = Yuuki
self.Yuuki_DynamicTools = Yuuki_DynamicTools(self.Yuuki)
self.YuukiDynamicTools = YuukiDynamicTools(self.Yuuki)
def _action(self):
ncMessage = Operation()
operation = Operation()
if time.localtime().tm_hour != self.Yuuki.data.getData(["Global", "LastResetLimitTime"]):
self.Yuuki_DynamicTools.limitReset()
self.Yuuki.data.updateData(["Global", "LastResetLimitTime"], time.localtime().tm_hour)
if time.localtime().tm_hour != self.Yuuki.data.get_data(["Global", "LastResetLimitTime"]):
self.YuukiDynamicTools.reset_limit()
self.Yuuki.data.update_data(["Global", "LastResetLimitTime"], time.localtime().tm_hour)
if self.NoWork >= self.NoWorkLimit:
self.NoWork = 0
for ncMessage in self.cacheOperations:
if ncMessage.reqSeq != -1 and ncMessage.revision > self.Yuuki.revision:
self.Yuuki.revision = ncMessage.revision
for operation in self.cacheOperations:
if operation.reqSeq != -1 and operation.revision > self.Yuuki.revision:
self.Yuuki.revision = operation.revision
break
if ncMessage.revision != self.Yuuki.revision:
if operation.revision != self.Yuuki.revision:
self.Yuuki.revision = self.Yuuki.client.getLastOpRevision()
try:
@ -50,45 +50,45 @@ class Yuuki_Poll:
if self.cacheOperations:
self.NoWork = 0
self.Yuuki.threadExec(self.Yuuki.taskDemux, (self.cacheOperations,))
self.Yuuki.thread_append(self.Yuuki.task_executor, (self.cacheOperations,))
if len(self.cacheOperations) > 1:
self.Yuuki.revision = max(self.cacheOperations[-1].revision, self.cacheOperations[-2].revision)
def _exception(self):
(err1, err2, err3, ErrorInfo) = Yuuki_StaticTools.errorReport()
(err1, err2, err3, error_info) = YuukiStaticTools.report_error()
ncMessage = Operation()
operation = Operation()
# noinspection PyBroadException
try:
for ncMessage in self.cacheOperations:
if ncMessage.reqSeq != -1 and ncMessage.revision > self.Yuuki.revision:
self.Yuuki.revision = ncMessage.revision
for operation in self.cacheOperations:
if operation.reqSeq != -1 and operation.revision > self.Yuuki.revision:
self.Yuuki.revision = operation.revision
break
if ncMessage.revision != self.Yuuki.revision:
if operation.revision != self.Yuuki.revision:
self.Yuuki.revision = self.Yuuki.client.getLastOpRevision()
for Root in self.Yuuki.Admin:
self.Yuuki_DynamicTools.sendText(
self.YuukiDynamicTools.send_text(
Root,
"Star Yuuki BOT - Something was wrong...\nError:\n%s\n%s\n%s\n\n%s" %
(err1, err2, err3, ErrorInfo)
(err1, err2, err3, error_info)
)
except:
print("Star Yuuki BOT - Damage!\nError:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, ErrorInfo))
print("Star Yuuki BOT - Damage!\nError:\n%s\n%s\n%s\n\n%s" % (err1, err2, err3, error_info))
self.Yuuki.exit()
def init(self):
self.Yuuki.data.updateData(["Global", "Power"], self.Power)
self.Yuuki.data.update_data(["Global", "Power"], self.Power)
if "LastResetLimitTime" not in self.Yuuki.data.getData(["Global"]):
self.Yuuki.data.updateData(["Global", "LastResetLimitTime"], None)
if "LastResetLimitTime" not in self.Yuuki.data.get_data(["Global"]):
self.Yuuki.data.update_data(["Global", "LastResetLimitTime"], None)
while self.Power:
# noinspection PyBroadException
try:
self._action()
try:
self.Power = self.Yuuki.data.syncData()
self.Power = self.Yuuki.data.sync_data()
except ConnectionRefusedError:
self.Power = False

View file

@ -10,7 +10,7 @@ import multiprocessing
import threading
class Yuuki_Thread:
class YuukiThread:
def __init__(self):
self.lock = threading.Lock()
@ -20,13 +20,13 @@ class Yuuki_Thread:
added_thread.start()
@staticmethod
def getThreadInfo():
def get_thread_info():
print(threading.active_count())
print(threading.enumerate())
print("{} add Threading\n".format(threading.current_thread()))
print(f"{threading.current_thread()} add Threading\n")
class Yuuki_Multiprocess:
class YuukiMultiprocess:
multiprocess_list = {}
def add(self, function, args=()):

View file

@ -17,62 +17,62 @@ import requests
from yuuki_core.ttypes import OpType, MIDType, ContentType, Group, Message
class Yuuki_StaticTools:
class YuukiStaticTools:
@staticmethod
def sybGetGroupCreator(groupInfo):
def get_group_creator(group):
"""
Get the LINE Group Creator
:param groupInfo: LINE Group
:param group: LINE Group
:return: LINE Contact
"""
if groupInfo.creator is None:
contact = groupInfo.members[0]
if group.creator is None:
contact = group.members[0]
else:
contact = groupInfo.creator
contact = group.creator
return contact
@staticmethod
def readCommandLine(msgs):
def read_commands(list_):
"""
Read string list as a command line
:param msgs: List of strings
:param list_: List of strings
:return: string
"""
reply_msg = " ".join(msgs).lstrip()
reply_msg = " ".join(list_).lstrip()
return reply_msg
@staticmethod
def errorReport():
def report_error():
"""
Report errors as tuple
reporting_message errors as tuple
:return: tuple
"""
err1, err2, err3 = sys.exc_info()
traceback.print_tb(err3)
tb_info = traceback.extract_tb(err3)
filename, line, func, text = tb_info[-1]
ErrorInfo = "occurred in\n{}\n\non line {}\nin statement {}".format(filename, line, text)
return err1, err2, err3, ErrorInfo
error_info = f"occurred in\n{filename}\n\non line {line}\nin statement {text}"
return err1, err2, err3, error_info
@staticmethod
def securityForWhere(ncMessage):
def security_for_where(operation):
"""
Return arguments for security tasks
:param ncMessage: Operation
:param operation: Operation
:return: tuple
"""
if ncMessage.type == OpType.NOTIFIED_UPDATE_GROUP or \
ncMessage.type == OpType.NOTIFIED_INVITE_INTO_GROUP or \
ncMessage.type == OpType.NOTIFIED_ACCEPT_GROUP_INVITATION or \
ncMessage.type == OpType.NOTIFIED_KICKOUT_FROM_GROUP:
if operation.type == OpType.NOTIFIED_UPDATE_GROUP or \
operation.type == OpType.NOTIFIED_INVITE_INTO_GROUP or \
operation.type == OpType.NOTIFIED_ACCEPT_GROUP_INVITATION or \
operation.type == OpType.NOTIFIED_KICKOUT_FROM_GROUP:
return {
"GroupID": ncMessage.param1,
"Action": ncMessage.param2,
"Another": ncMessage.param3,
"GroupID": operation.param1,
"Action": operation.param2,
"Another": operation.param3,
}
@staticmethod
def dictShuffle(dict_object, requirement=None):
def dict_shuffle(dict_object, requirement=None):
"""
Shuffle dicts
:param dict_object: dict
@ -91,70 +91,70 @@ class Yuuki_StaticTools:
return result
@staticmethod
def sendToWho(ncMessage):
def send_to_who(operation):
"""
Get who to send with Operation
:param ncMessage: Operation
:param operation: Operation
:return: string
"""
if ncMessage.message.toType == MIDType.USER:
return ncMessage.message.from_
elif ncMessage.message.toType == MIDType.ROOM:
return ncMessage.message.to
elif ncMessage.message.toType == MIDType.GROUP:
return ncMessage.message.to
if operation.message.toType == MIDType.USER:
return operation.message.from_
elif operation.message.toType == MIDType.ROOM:
return operation.message.to
elif operation.message.toType == MIDType.GROUP:
return operation.message.to
class Yuuki_DynamicTools:
class YuukiDynamicTools:
def __init__(self, Yuuki):
self.Yuuki = Yuuki
def getClient(self, userId):
def get_client(self, user_id):
"""
Get client by account userId
:param userId: string
Get client by account user_id
:param user_id: string
:return: TalkServiceClient
"""
if userId == self.Yuuki.MyMID:
if user_id == self.Yuuki.MyMID:
return self.Yuuki.client
return self.Yuuki.Connect.helper[userId].get("client")
return self.Yuuki.Connect.helper[user_id].get("client")
def checkInInvitationList(self, ncMessage, userId=None):
def check_invitation(self, operation, user_id=None):
"""
Check If userId in Invitation List
:param ncMessage: Operation
:param userId: string
Check If user_id in Invitation List
:param operation: Operation
:param user_id: string
:return: boolean
"""
if userId is None:
userId = self.Yuuki.MyMID
if ncMessage.param3 == userId:
if user_id is None:
user_id = self.Yuuki.MyMID
if operation.param3 == user_id:
return True
elif "\x1e" in ncMessage.param3:
if userId in ncMessage.param3.split("\x1e"):
elif "\x1e" in operation.param3:
if user_id in operation.param3.split("\x1e"):
return True
return False
def changeGroupUrlStatus(self, groupInfo, status, handlerId=None):
def switch_group_url_status(self, group, status, handlerId=None):
"""
Change LINE Group URL Status
:param groupInfo: Line Group
:param group: Line Group
:param status: boolean
:param handlerId: string
:return: None
"""
result = Group()
for key in groupInfo.__dict__:
for key in group.__dict__:
if key != "members" or key != "invitee":
result.__dict__[key] = groupInfo.__dict__[key]
result.__dict__[key] = group.__dict__[key]
result.preventJoinByTicket = not status
handler = self.Yuuki.MyMID if handlerId is None else handlerId
self.getClient(handler).updateGroup(self.Yuuki.Seq, result)
self.get_client(handler).updateGroup(self.Yuuki.Seq, result)
def configSecurityStatus(self, groupId, status):
def config_security_status(self, group_id, status):
"""
Configure LINE Group Security Status for Yuuki
:param groupId: string
:param group_id: string
:param status: boolean
:return: None
"""
@ -168,156 +168,153 @@ class Yuuki_DynamicTools:
if 3 in status:
group_status[OpType.NOTIFIED_KICKOUT_FROM_GROUP] = True
self.Yuuki.data.updateData(["Group", groupId, "SEGroup"], group_status)
self.Yuuki.data.update_data(["Group", group_id, "SEGroup"], group_status)
def cleanMyGroupInvitations(self):
def clean_my_group_invitations(self):
"""
Clean personal group invitations for LINE account
:return: None
"""
for client in [self.getClient(userId) for userId in self.Yuuki.MyMID + self.Yuuki.Connect.helper.keys()]:
for client in [self.get_client(user_id) for user_id in self.Yuuki.MyMID + self.Yuuki.Connect.helper.keys()]:
for cleanInvitations in client.getGroupIdsInvited():
client.acceptGroupInvitation(self.Yuuki.Seq, cleanInvitations)
client.leaveGroup(self.Yuuki.Seq, cleanInvitations)
client.leave_group(self.Yuuki.Seq, cleanInvitations)
def leaveGroup(self, groupInfo):
def leave_group(self, group):
"""
Leave a group by its group information object
:param groupInfo: Line Group
:param group: Line Group
:return: None
"""
self.sendText(groupInfo.id, self.Yuuki.get_text("Bye Bye"))
self.getClient(self.Yuuki.MyMID).leaveGroup(self.Yuuki.Seq, groupInfo.id)
for userId in self.Yuuki.Connect.helper:
if userId in [member.mid for member in groupInfo.members]:
self.getClient(userId).leaveGroup(self.Yuuki.Seq, groupInfo.id)
GroupList = self.Yuuki.data.getData(["Global", "GroupJoined"])
NewGroupList = GroupList.copy()
NewGroupList.remove(groupInfo.id)
self.Yuuki.data.updateData(["Global", "GroupJoined"], NewGroupList)
self.send_text(group.id, self.Yuuki.get_text("Bye Bye"))
self.get_client(self.Yuuki.MyMID).leave_group(self.Yuuki.Seq, group.id)
for user_id in self.Yuuki.Connect.helper:
if user_id in [member.mid for member in group.members]:
self.get_client(user_id).leave_group(self.Yuuki.Seq, group.id)
group_list = self.Yuuki.data.get_data(["Global", "GroupJoined"])
new_group_list = group_list.copy()
new_group_list.remove(group.id)
self.Yuuki.data.update_data(["Global", "GroupJoined"], new_group_list)
def getContact(self, userId):
def get_contact(self, user_id):
"""
Get LINE Contact information with userId
:param userId: string
Get LINE Contact information with user_id
:param user_id: string
:return: LINE Contact
"""
if len(userId) == len(self.Yuuki.MyMID) and userId[0] == "u":
if len(user_id) == len(self.Yuuki.MyMID) and user_id.startswith("u"):
# noinspection PyBroadException
try:
contactInfo = self.getClient(self.Yuuki.MyMID).getContact(userId)
return self.get_client(self.Yuuki.MyMID).getContact(user_id)
except:
contactInfo = False
else:
contactInfo = False
return contactInfo
return False
return False
def getGroupTicket(self, groupId, userId, renew=False):
def get_group_ticket(self, group_id, user_id, renew=False):
"""
Get LINE Group Ticket with groupId and userId
:param groupId: string
:param userId: string
Get LINE Group Ticket with group_id and user_id
:param group_id: string
:param user_id: string
:param renew: boolean
:return: string
"""
GroupTicket = ""
GroupData = self.Yuuki.data.getGroup(groupId)
if "GroupTicket" in GroupData:
if GroupData["GroupTicket"].get(userId) is not None:
GroupTicket = GroupData["GroupTicket"].get(userId)
group_ticket = ""
group = self.Yuuki.data.get_group(group_id)
if "GroupTicket" in group:
if group["GroupTicket"].get(user_id) is not None:
group_ticket = group["GroupTicket"].get(user_id)
else:
assert "Error JSON data type - GroupTicket"
if GroupTicket == "" or renew:
GroupTicket = self.getClient(userId).reissueGroupTicket(groupId)
self.Yuuki.data.updateData(
["Group", groupId, "GroupTicket", userId], GroupTicket)
return GroupTicket
if group_ticket == "" or renew:
group_ticket = self.get_client(user_id).reissuegroup_ticket(group_id)
self.Yuuki.data.update_data(["Group", group_id, "GroupTicket", user_id], group_ticket)
return group_ticket
def limitReset(self):
def reset_limit(self):
"""
Reset Yuuki modify LINE Group Member List limits
:return: None
"""
for userId in self.Yuuki.AllAccountIds:
self.Yuuki.data.updateData(
["LimitInfo", "KickLimit", userId], self.Yuuki.KickLimit)
self.Yuuki.data.updateData(
["LimitInfo", "CancelLimit", userId], self.Yuuki.CancelLimit)
for user_id in self.Yuuki.AllAccountIds:
self.Yuuki.data.update_data(
["LimitInfo", "KickLimit", user_id], self.Yuuki.KickLimit)
self.Yuuki.data.update_data(
["LimitInfo", "CancelLimit", user_id], self.Yuuki.CancelLimit)
def modifyGroupMemberList(self, action, groupInfo, userId, exceptUserId=None):
def modify_group_members(self, action, group, user_id, except_user_id=None):
"""
Modify LINE Group Member List
:param action: integer (1->kick 2->cancel)
:param groupInfo: LINE Group
:param userId: string
:param exceptUserId: List of userId
:param group: LINE Group
:param user_id: string
:param except_user_id: List of user_id
:return: string
"""
actions = {
1: {
"command": "KickLimit",
"message": "Kick Limit.",
"function": lambda handlerId: self.getClient(handlerId).kickoutFromGroup
"function": lambda handler_id: self.get_client(handler_id).kickoutFromGroup
},
2: {
"command": "CancelLimit",
"message": "Cancel Limit.",
"function": lambda handlerId: self.getClient(handlerId).cancelGroupInvitation
"function": lambda handler_id: self.get_client(handler_id).cancelGroupInvitation
}
}
assert action in actions, "Invalid action code"
if len(self.Yuuki.Connect.helper) >= 1:
members = [member.mid for member in groupInfo.members if member.mid in self.Yuuki.AllAccountIds]
accounts = Yuuki_StaticTools.dictShuffle(
self.Yuuki.data.getData(["LimitInfo", actions[action].get("command")]), members)
members = [member.mid for member in group.members if member.mid in self.Yuuki.AllAccountIds]
accounts = YuukiStaticTools.dict_shuffle(
self.Yuuki.data.get_data(["LimitInfo", actions[action].get("command")]), members)
if len(accounts) == 0:
return "None"
if exceptUserId:
accounts[exceptUserId] = -1
if except_user_id:
accounts[except_user_id] = -1
helper = max(accounts, key=accounts.get)
else:
if exceptUserId == self.Yuuki.MyMID:
if except_user_id == self.Yuuki.MyMID:
return "None"
helper = self.Yuuki.MyMID
Limit = self.Yuuki.data.getData(["LimitInfo", actions[action].get("command"), helper])
if Limit > 0:
actions[action].get("function")(helper)(self.Yuuki.Seq, groupInfo.id, [userId])
self.Yuuki.data.limitDecrease(actions[action].get("command"), helper)
limit = self.Yuuki.data.get_data(["LimitInfo", actions[action].get("command"), helper])
if limit > 0:
actions[action].get("function")(helper)(self.Yuuki.Seq, group.id, [user_id])
self.Yuuki.data.trigger_limit(actions[action].get("command"), helper)
else:
self.sendText(groupInfo.id, self.Yuuki.get_text(actions[action].get("message")), helper)
self.send_text(group.id, self.Yuuki.get_text(actions[action].get("message")), helper)
return helper
def sendText(self, send_to, msg, senderId=None):
def send_text(self, send_to, msg, sender_id=None):
"""
Send text to LINE Chat
:param send_to: The target to received
:param msg: The message hope to send
:param senderId: The client specified to send the message
:param sender_id: The client specified to send the message
:return: None
"""
message = Message(to=send_to, text=msg)
sender = self.Yuuki.MyMID if senderId is None else senderId
self.getClient(sender).sendMessage(self.Yuuki.Seq, message)
sender = self.Yuuki.MyMID if sender_id is None else sender_id
self.get_client(sender).sendMessage(self.Yuuki.Seq, message)
def sendUser(self, send_to, userId):
def send_user(self, send_to, user_id):
"""
Send LINE contact to LINE Chat
:param send_to: string
:param userId: string
:param user_id: string
:return: None
"""
message = Message(
contentType=ContentType.CONTACT,
text='',
contentMetadata={
'mid': userId,
'mid': user_id,
'displayName': 'LINE User',
},
to=send_to
)
self.getClient(self.Yuuki.MyMID).sendMessage(self.Yuuki.Seq, message)
self.get_client(self.Yuuki.MyMID).sendMessage(self.Yuuki.Seq, message)
def sendMedia(self, send_to, send_type, path):
def send_media(self, send_to, send_type, path):
"""
Send media file to LINE Chat
:param send_to: string
@ -339,7 +336,7 @@ class Yuuki_DynamicTools:
media_name = file_name
else:
media_name = 'media'
message_id = self.getClient(self.Yuuki.MyMID).sendMessage(
message_id = self.get_client(self.Yuuki.MyMID).sendMessage(
self.Yuuki.Seq, message).id
files = {
'file': open(path, 'rb'),
@ -357,4 +354,4 @@ class Yuuki_DynamicTools:
url = self.Yuuki.LINE_Media_server + '/talk/m/upload.nhn'
r = requests.post(url, headers=self.Yuuki.connectHeader, data=data, files=files)
if r.status_code != 201:
self.sendText(send_to, "Error!")
self.send_text(send_to, "Error!")

View file

@ -7,20 +7,16 @@ License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
from bs4 import BeautifulSoup
from ..data import YuukiData
class Yuuki_WebDataReader:
def __init__(self, Yuuki_Data):
self.handle = Yuuki_Data
class YuukiWebDataReader:
def __init__(self, yuuki_data: YuukiData):
self.handle = yuuki_data
def get_logs(self, name):
if name not in self.handle.LogType:
return {"status": 404}
with open(
self.handle.LogPath
+
self.handle.LogName.format(name)
) as file:
with open(f"{self.handle.LogPath}{self.handle.LogName.format(name)}") as file:
html_doc = file.read()
parser = BeautifulSoup(html_doc, 'html.parser')
events = parser.find_all('li')

View file

@ -17,14 +17,16 @@ from flask import Flask, render_template, request, redirect, jsonify
from flask_bootstrap import Bootstrap
from gevent.pywsgi import WSGIServer
from .reader import Yuuki_WebDataReader
from ..tools import Yuuki_DynamicTools
from ..yuuki import Yuuki
from ..data import YuukiData
from .reader import YuukiWebDataReader
from ..tools import YuukiDynamicTools
wa_app = Flask(__name__)
Yuuki_Handle = None
Yuuki_Handle_Data = None
Yuuki_APIHandle_Data = None
Yuuki_Handle = Yuuki
Yuuki_Handle_Data = YuukiData
Yuuki_APIHandle_Data = YuukiWebDataReader
passports = []
password = str(hash(random.random()))
@ -47,7 +49,7 @@ def authorized_response(function):
return wrapper
class Yuuki_WebAdmin:
class YuukiWebAdmin:
Bootstrap(wa_app)
http_server = None
@ -55,7 +57,7 @@ class Yuuki_WebAdmin:
global Yuuki_Handle, Yuuki_Handle_Data, Yuuki_APIHandle_Data
Yuuki_Handle = Yuuki
Yuuki_Handle_Data = Yuuki.data
Yuuki_APIHandle_Data = Yuuki_WebDataReader(Yuuki_Handle_Data)
Yuuki_APIHandle_Data = YuukiWebDataReader(Yuuki_Handle_Data)
self.port = port
@staticmethod
@ -77,7 +79,7 @@ class Yuuki_WebAdmin:
status = True
return render_template(
'/index.html',
name=Yuuki_Handle.YuukiConfigs["name"],
name=Yuuki_Handle.configs["name"],
authorized=status
)
@ -131,7 +133,7 @@ class Yuuki_WebAdmin:
if request.method == "GET":
return {
"id": Yuuki_Handle.profile.mid,
"version": Yuuki_Handle.YuukiConfigs["version"],
"version": Yuuki_Handle.configs["version"],
"name": Yuuki_Handle.profile.displayName,
"status": Yuuki_Handle.profile.statusMessage,
"picture": f"{Yuuki_Handle.LINE_Media_server}/{Yuuki_Handle.profile.pictureStatus}"
@ -140,7 +142,7 @@ class Yuuki_WebAdmin:
if request.method == "PUT" and "name" in request.values and "status" in request.values:
Yuuki_Handle.profile.displayName = request.values["name"]
Yuuki_Handle.profile.statusMessage = request.values["status"]
Yuuki_DynamicTools(Yuuki_Handle).getClient(Yuuki_Handle.MyMID).updateProfile(
YuukiDynamicTools(Yuuki_Handle).get_client(Yuuki_Handle.MyMID).updateProfile(
Yuuki_Handle.Seq, Yuuki_Handle.profile
)
return {"status": 200}
@ -151,16 +153,16 @@ class Yuuki_WebAdmin:
@authorized_response
def groups():
if request.method == "GET":
return Yuuki_Handle_Data.getData(["Global", "GroupJoined"])
return Yuuki_Handle_Data.get_data(["Global", "GroupJoined"])
if request.method == "POST" and "id" in request.values:
return Yuuki_DynamicTools(Yuuki_Handle).getClient(Yuuki_Handle.MyMID).acceptGroupInvitation(
return YuukiDynamicTools(Yuuki_Handle).get_client(Yuuki_Handle.MyMID).acceptGroupInvitation(
Yuuki_Handle.Seq, request.values["id"]
)
if request.method == "DELETE" and "id" in request.values:
group_information = Yuuki_DynamicTools(Yuuki_Handle).getClient(
group_information = YuukiDynamicTools(Yuuki_Handle).get_client(
Yuuki_Handle.MyMID
).getGroup(request.values["id"])
return Yuuki_DynamicTools(Yuuki_Handle).leaveGroup(group_information)
return YuukiDynamicTools(Yuuki_Handle).leave_group(group_information)
return {"status": 400}
@staticmethod
@ -168,7 +170,7 @@ class Yuuki_WebAdmin:
@authorized_response
def group_ticket():
if "id" in request.values and "ticket" in request.values:
return Yuuki_DynamicTools(Yuuki_Handle).getClient(Yuuki_Handle.MyMID).acceptGroupInvitationByTicket(
return YuukiDynamicTools(Yuuki_Handle).get_client(Yuuki_Handle.MyMID).acceptGroupInvitationByTicket(
Yuuki_Handle.Seq, request.values["id"], request.values["ticket"]
)
return {"status": 400}
@ -193,7 +195,7 @@ class Yuuki_WebAdmin:
return [
type_handle(obj)
for obj in Yuuki_DynamicTools(Yuuki_Handle).getClient(Yuuki_Handle.MyMID).getGroups(read_id_list)
for obj in YuukiDynamicTools(Yuuki_Handle).get_client(Yuuki_Handle.MyMID).getGroups(read_id_list)
]
return {"status": 400}
@ -211,8 +213,8 @@ class Yuuki_WebAdmin:
}
return [
info_handle(Yuuki_Handle.Connect.helper[userId].get("profile"))
for userId in Yuuki_Handle.Connect.helper
info_handle(Yuuki_Handle.Connect.helper[user_id].get("profile"))
for user_id in Yuuki_Handle.Connect.helper
]
@staticmethod
@ -233,14 +235,14 @@ class Yuuki_WebAdmin:
def broadcast():
if "message" in request.values and "audience" in request.values and request.values["message"]:
audience_ids = {
"groups": lambda: Yuuki_Handle_Data.getData(
"groups": lambda: Yuuki_Handle_Data.get_data(
["Global", "GroupJoined"]
)
}
if request.values["audience"] not in audience_ids:
return {"status": "404"}
return [
Yuuki_DynamicTools(Yuuki_Handle).sendText(target_id, request.values["message"])
YuukiDynamicTools(Yuuki_Handle).send_text(target_id, request.values["message"])
for target_id in audience_ids[request.values["audience"]]()
]
return {"status": 400}
@ -251,7 +253,7 @@ class Yuuki_WebAdmin:
def shutdown():
LINE_ACCOUNT_SECURITY_NOTIFY_ID = "u085311ecd9e3e3d74ae4c9f5437cbcb5"
# The ID belongs to an official account, which is controlled by SysOp of LINE.
Yuuki_DynamicTools(Yuuki_Handle).sendText(
YuukiDynamicTools(Yuuki_Handle).send_text(
LINE_ACCOUNT_SECURITY_NOTIFY_ID,
"[Yuuki] Remote Shutdown"
)

View file

@ -25,7 +25,7 @@ export default {
{{getGroupStatus(group)}}
</p>
<p v-if="!checkSystemMessage(group)">
<a href="#" class="text-danger" title="Leave" @click.prevent="leaveGroup(group.id)">
<a href="#" class="text-danger" title="Leave" @click.prevent="leave_group(group.id)">
<svg width="30px" height="30px" viewBox="0 0 16 16" class="bi bi-door-open" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" d="M1 15.5a.5.5 0 0 1 .5-.5h13a.5.5 0 0 1 0 1h-13a.5.5 0 0 1-.5-.5zM11.5 2H11V1h.5A1.5 1.5 0 0 1 13 2.5V15h-1V2.5a.5.5 0 0 0-.5-.5z"/>
<path fill-rule="evenodd" d="M10.828.122A.5.5 0 0 1 11 .5V15h-1V1.077l-6 .857V15H3V1.5a.5.5 0 0 1 .43-.495l7-1a.5.5 0 0 1 .398.117z"/>
@ -49,16 +49,16 @@ export default {
const inviteeCount = group.invitee ? group.invitee.length : 0
return `Members:${membersCount} Invites:${inviteeCount}`;
},
leaveGroup(groupId) {
leave_group(group_id) {
const checkpoint = confirm("The group will be removed from the BOT, are you sure?");
if (!checkpoint) return;
let body = new FormData();
body.append("id", groupId);
body.append("id", group_id);
fetch("/api/groups", {
method: "DELETE",
body
});
const deleteIndex = this.groupList.findIndex(group => group.id === groupId);
const deleteIndex = this.groupList.findIndex(group => group.id === group_id);
this.groupList.splice(deleteIndex, 1);
},
async fetchGroupsJoined() {
@ -69,8 +69,8 @@ export default {
.catch(reject)
.then(resolve));
},
async fetchGroupsInfo(groupIds) {
return await new Promise((resolve, reject) => fetch(`/api/groups/${groupIds.join(',')}`, {
async fetchGroupsInfo(group_ids) {
return await new Promise((resolve, reject) => fetch(`/api/groups/${group_ids.join(',')}`, {
credentials: "same-origin"
})
.then((body) => body.json())

View file

@ -15,40 +15,40 @@ import time
from git import Repo
from yuuki_core.ttypes import OpType
from .connection import Yuuki_Connect
from .data import Yuuki_Data
from .events import Yuuki_Command, Yuuki_JoinGroup, Yuuki_Security, Yuuki_Callback
from .connection import YuukiConnect
from .data import YuukiData
from .events import YuukiCommand, YuukiJoinGroup, YuukiSecurity, YuukiCallback
from .i18n import Yuuki_LangSetting
from .poll import Yuuki_Poll
from .thread_control import Yuuki_Multiprocess
from .webadmin.server import Yuuki_WebAdmin
from .poll import YuukiPoll
from .thread_control import YuukiMultiprocess
from .webadmin.server import YuukiWebAdmin
class Yuuki:
def __init__(self, Yuuki_Settings):
def __init__(self, yuuki_config):
self.Connect = Yuuki_Connect(Yuuki_Settings)
self.YuukiConfigs = Yuuki_Settings.systemConfig
self.Connect = YuukiConnect(yuuki_config)
self.configs = yuuki_config.systemConfig
# Static_Variable
self.Thread_Control = Yuuki_Multiprocess()
self.Thread_Control = YuukiMultiprocess()
self.Seq = self.YuukiConfigs["Seq"]
self.Admin = self.YuukiConfigs["Admin"]
self.Threading = self.YuukiConfigs["Advanced"]
self.KickLimit = self.YuukiConfigs["Hour_KickLimit"]
self.CancelLimit = self.YuukiConfigs["Hour_CancelLimit"]
self.i18n = Yuuki_LangSetting(self.YuukiConfigs["Default_Language"])
self.Seq = self.configs["Seq"]
self.Admin = self.configs["Admin"]
self.Threading = self.configs["Advanced"]
self.KickLimit = self.configs["Hour_KickLimit"]
self.CancelLimit = self.configs["Hour_CancelLimit"]
self.i18n = Yuuki_LangSetting(self.configs["Default_Language"])
self.LINE_Media_server = "https://obs.line-apps.com"
self._Version_Check()
self._version_check()
def _Version_Check(self):
def _version_check(self):
git_result = "Unknown"
origin_url = "https://github.com/star-inc/star_yuuki_bot.git"
if self.YuukiConfigs["version_check"]:
if self.configs["version_check"]:
git_remote = Repo('.').remote()
update_status = git_remote.fetch()[0]
if update_status.flags == 64:
@ -57,9 +57,9 @@ class Yuuki:
git_result = "This is the latest version."
origin_url = "\n".join(git_remote.urls)
return self._Announce_Dog(git_result, origin_url)
return self._announce_dog(git_result, origin_url)
def _Announce_Dog(self, git_result, origin_url):
def _announce_dog(self, git_result, origin_url):
print(
"\n{} {}\n"
"\t===\n\n"
@ -67,57 +67,57 @@ class Yuuki:
"Update Origin:\n"
"\t{}\n\n\t\t\t\t\t"
"{}\n\t{}\n".format(
self.YuukiConfigs["name"],
self.YuukiConfigs["version"],
self.configs["name"],
self.configs["version"],
git_result,
origin_url,
self.YuukiConfigs["copyright"],
self.configs["copyright"],
"\t==" * 5
)
)
self._LINE_Login()
self._login_line()
def _LINE_Login(self):
def _login_line(self):
(self.client, self.listen) = self.Connect.connect()
if self.YuukiConfigs.get("helper_LINE_ACCESS_KEYs"):
for access in self.YuukiConfigs["helper_LINE_ACCESS_KEYs"]:
if self.configs.get("helper_LINE_ACCESS_KEYs"):
for access in self.configs["helper_LINE_ACCESS_KEYs"]:
self.Connect.helperConnect(access)
# Dynamic Variable
self.get_text = self.i18n.gettext
self.JoinGroup = Yuuki_JoinGroup(self).action
self.Command = Yuuki_Command(self).action
self.Security = Yuuki_Security(self).action
self.Callback = Yuuki_Callback(self).action
self.JoinGroup = YuukiJoinGroup(self).action
self.Command = YuukiCommand(self).action
self.Security = YuukiSecurity(self).action
self.Callback = YuukiCallback(self).action
mds_port = self.YuukiConfigs["MDS_Port"]
self.data = Yuuki_Data(self.Threading, mds_port)
mds_port = self.configs["MDS_Port"]
self.data = YuukiData(self.Threading, mds_port)
self.data.updateData(["Global", "GroupJoined"], self.client.getGroupIdsJoined())
self.data.updateData(["Global", "SecurityService"], self.YuukiConfigs["SecurityService"])
self._Initialize()
self.data.update_data(["Global", "GroupJoined"], self.client.getGroupIdsJoined())
self.data.update_data(["Global", "SecurityService"], self.configs["SecurityService"])
self._initialize()
def _Initialize(self):
def _initialize(self):
self.profile = self.client.getProfile()
self.MyMID = self.profile.mid
self.revision = self.client.getLastOpRevision()
self.AllAccountIds = [self.MyMID]
for userId in self.Connect.helper:
self.AllAccountIds.append(userId)
for user_id in self.Connect.helper:
self.AllAccountIds.append(user_id)
if len(self.data.getData(["LimitInfo"])) != 2:
self.data.updateData(["LimitInfo"], self.data.LimitType)
self._Setup_WebAdmin()
if len(self.data.get_data(["LimitInfo"])) != 2:
self.data.update_data(["LimitInfo"], self.data.LimitType)
self._setup_web_admin()
def _Setup_WebAdmin(self):
if self.Threading and self.YuukiConfigs.get("WebAdmin"):
wa_port = self.YuukiConfigs["WebAdmin_Port"]
def _setup_web_admin(self):
if self.Threading and self.configs.get("WebAdmin"):
wa_port = self.configs["WebAdmin_Port"]
password = str(hash(random.random()))
self.web_admin = Yuuki_WebAdmin(self, wa_port)
self.web_admin = YuukiWebAdmin(self, wa_port)
self.web_admin.set_password(password)
self.Thread_Control.add(self.web_admin.wa_listen)
print(
@ -130,13 +130,13 @@ class Yuuki:
def exit(self, restart=False):
print("System Exit")
while self.data.syncData():
self.data.updateData(["Global", "Power"], False)
while self.data.sync_data():
self.data.update_data(["Global", "Power"], False)
if self.Threading:
self.data.mdsShake("EXT", None, None)
self.data.mds_shake("EXT", None, None)
time.sleep(1)
self.data.MdsThreadControl.stop("mds_listen")
if self.YuukiConfigs.get("WebAdmin"):
if self.configs.get("WebAdmin"):
self.data.MdsThreadControl.stop("wa_listen")
if restart:
self.__restart()
@ -157,27 +157,27 @@ class Yuuki:
else:
print("Star Yuuki BOT - Restart Error\n\nUnknown Platform")
def threadExec(self, function, args):
def thread_append(self, function, args):
if self.Threading:
self.Thread_Control.add(function, args)
else:
function(*args)
def taskDemux(self, catched_news):
for ncMessage in catched_news:
if ncMessage.type == OpType.NOTIFIED_INVITE_INTO_GROUP:
self.threadExec(self.JoinGroup, (ncMessage,))
elif ncMessage.type == OpType.NOTIFIED_KICKOUT_FROM_GROUP:
self.threadExec(self.Security, (ncMessage,))
elif ncMessage.type == OpType.NOTIFIED_ACCEPT_GROUP_INVITATION:
self.threadExec(self.Security, (ncMessage,))
elif ncMessage.type == OpType.NOTIFIED_UPDATE_GROUP:
self.threadExec(self.Security, (ncMessage,))
elif ncMessage.type == OpType.RECEIVE_MESSAGE:
self.threadExec(self.Command, (ncMessage,))
elif ncMessage.type == OpType.SEND_MESSAGE:
self.threadExec(self.Callback, (ncMessage,))
def task_executor(self, operations):
for operation in operations:
if operation.type == OpType.NOTIFIED_INVITE_INTO_GROUP:
self.thread_append(self.JoinGroup, (operation,))
elif operation.type == OpType.NOTIFIED_KICKOUT_FROM_GROUP:
self.thread_append(self.Security, (operation,))
elif operation.type == OpType.NOTIFIED_ACCEPT_GROUP_INVITATION:
self.thread_append(self.Security, (operation,))
elif operation.type == OpType.NOTIFIED_UPDATE_GROUP:
self.thread_append(self.Security, (operation,))
elif operation.type == OpType.RECEIVE_MESSAGE:
self.thread_append(self.Command, (operation,))
elif operation.type == OpType.SEND_MESSAGE:
self.thread_append(self.Callback, (operation,))
def main(self):
handle = Yuuki_Poll(self)
handle = YuukiPoll(self)
handle.init()

View file

@ -12,9 +12,9 @@
The software licensed under Mozilla Public License Version 2.0
"""
from libs import Yuuki, Yuuki_Config
from libs import Yuuki, YuukiConfig
config = Yuuki_Config()
config = YuukiConfig()
Console = Yuuki(config)
if __name__ == "__main__":