#!/usr/bin/python3 # coding=UTF-8 from .core.TalkService import Client from thrift.transport import THttpClient from thrift.protocol import TCompactProtocol """ NC HightSpeed Lib """ try: from thrift.protocol import fastbinary except: fastbinary = None ########################################## class Yuuki_Connection: """ Connection Dict Type """ connectInfo = { "Host": "", "Command_Path": "", "LongPoll_path": "" } connectHeader = { "X-Line-Application": "", "X-Line-Access": "", "User-Agent": "" } class Yuuki_Connect: def __init__(self, Yuuki_Connection): self.host = Yuuki_Connection.connectInfo["Host"] self.com_path = Yuuki_Connection.connectInfo["Command_Path"] self.poll_path = Yuuki_Connection.connectInfo["LongPoll_path"] self.con_header = Yuuki_Connection.connectHeader self.helper = [] self.helper_ids = [] def connect(self): transport = THttpClient.THttpClient(self.host + self.com_path) transport_in = THttpClient.THttpClient(self.host + self.poll_path) transport.setCustomHeaders(self.con_header) transport_in.setCustomHeaders(self.con_header) protocol = TCompactProtocol.TCompactProtocol(transport) protocol_in = TCompactProtocol.TCompactProtocol(transport_in) client = Client(protocol) listen = Client(protocol_in) transport.open() transport_in.open() return client, listen def helperConnect(self, LINE_ACCESS_KEY): helper_ConnectHeader = self.con_header.copy() helper_ConnectHeader["X-Line-Access"] = LINE_ACCESS_KEY transport = THttpClient.THttpClient(self.host + self.com_path) transport.setCustomHeaders(helper_ConnectHeader) protocol = TCompactProtocol.TCompactProtocol(transport) client = Client(protocol) transport.open() try: profile = client.getProfile() self.helper.append(client) self.helper_ids.append(profile.mid) return True except: print("Error:\n%s\nNot Acceptable\n" % (LINE_ACCESS_KEY,))