mirror of
https://github.com/RfidResearchGroup/proxmark3.git
synced 2026-01-02 21:54:07 +08:00
651 lines
23 KiB
Python
651 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Contributed by klks84 (https://github.com/klks)
|
|
# Run with ./pm3 -c "script run read_t-union.py"
|
|
#
|
|
# Script to read China T-Union cards, based on the work of SocialSisterYi
|
|
# See LICENSE.txt for the text of the license.
|
|
|
|
import sys
|
|
from typing import Optional, List, Tuple, Any
|
|
import pm3
|
|
|
|
#References
|
|
# https://github.com/SocialSisterYi/T-Union_Master/blob/main/src/protocol/t_union_poller_i.c
|
|
# https://wiki.nfc.im/books/%E6%99%BA%E8%83%BD%E5%8D%A1%E6%89%8B%E5%86%8C/page/%E4%BA%A4%E9%80%9A%E8%81%94%E5%90%88%E5%8D%A1%EF%BC%88t-union%EF%BC%89
|
|
|
|
# optional color support .. `pip install ansicolors`
|
|
try:
|
|
from colors import color # type: ignore
|
|
except ModuleNotFoundError:
|
|
def color(s, fg=None):
|
|
_ = fg
|
|
return str(s)
|
|
|
|
# Constants
|
|
DEBUG = False
|
|
MAX_CARD_POLL_TRIES = 5
|
|
MAX_TRANSACTION_RECORDS = 10
|
|
MAX_TRAVEL_RECORDS = 30
|
|
FILE_ID_TRANSACTION = 0x18
|
|
FILE_ID_TRAVEL = 0x1E
|
|
AID_PBOC_DEBIT_CREDIT = "A00000000386980701"
|
|
AID_TUNION_TRANSIT = "A000000632010105"
|
|
DDF_PPSE = b"2PAY.SYS.DDF01"
|
|
|
|
class BridgePM3:
|
|
"""Bridge class for communicating with Proxmark3 device."""
|
|
|
|
def __init__(self, hw_debug: bool, pm3: Any = None):
|
|
self._debug = hw_debug
|
|
if pm3 is None:
|
|
raise ValueError("Need a pm3 instance")
|
|
self.pm3 = pm3
|
|
self.recv_buff: Optional[str] = None
|
|
|
|
def recv(self) -> bytes:
|
|
"""Receive data from PM3."""
|
|
if self.recv_buff is None:
|
|
raise ValueError("No data in receive buffer")
|
|
|
|
ret_buff = bytes.fromhex(self.recv_buff)
|
|
|
|
if self._debug:
|
|
if ret_buff[-2:] == b"\x90\x00":
|
|
print(f"[{color('+', fg='green')}] PM3 <= {self.recv_buff}")
|
|
else:
|
|
print(f"[{color('-', fg='red')}] PM3 <= {self.recv_buff}")
|
|
|
|
return ret_buff
|
|
|
|
def hw_reset(self) -> None:
|
|
"""Reset the Proxmark3 hardware."""
|
|
self.pm3.console("hw reset")
|
|
|
|
def send(self, data: bytes, select: bool = False) -> None:
|
|
"""Send APDU command to card via PM3."""
|
|
|
|
exec_cmd = "hf 14a apdu -k"
|
|
|
|
if select:
|
|
exec_cmd += "s" # activate field and select card
|
|
|
|
exec_cmd += "d " # full APDU package
|
|
|
|
# Convert bytearray to string
|
|
exec_cmd += bytearray(data).hex()
|
|
|
|
if self._debug:
|
|
print(f"[{color('+', fg='green')}] PM3 => exec_cmd = {color(exec_cmd, fg='yellow')}")
|
|
|
|
self.pm3.console(exec_cmd)
|
|
self.recv_buff = self.extract_ret(self.pm3.grabbed_output.split('\n'))
|
|
|
|
def extract_ret(self, ret: List[str]) -> Optional[str]:
|
|
"""Extract response data from PM3 output."""
|
|
for line in ret:
|
|
if "<<< " in line:
|
|
parts = line.split(" ")
|
|
if len(parts) > 2:
|
|
return parts[2]
|
|
return None
|
|
|
|
def sendToNfc(self, data: bytes) -> None:
|
|
"""Send data to NFC card, auto-detecting SELECT commands."""
|
|
enable_select = False
|
|
if len(data) > 5 and data[0] == 0x00 and data[1] == 0xA4 and (data[2] == 0x00 or data[2] == 0x04) and data[3] == 0x00:
|
|
enable_select = True
|
|
self.send(data, select=enable_select)
|
|
|
|
def nfcFindCard(self) -> str:
|
|
"""Check if a card is present."""
|
|
self.pm3.console("hf 14a info")
|
|
|
|
for line in self.pm3.grabbed_output.split("\n"):
|
|
if "UID:" in line:
|
|
return line
|
|
return "noCard"
|
|
|
|
def nfcGetRecData(self) -> bytes:
|
|
"""Get received data from PM3."""
|
|
recvdata = self.recv()
|
|
if recvdata is None:
|
|
raise ValueError("Did not receive any data from PM3")
|
|
return recvdata
|
|
|
|
def waitForCard(self, max_tries: int = MAX_CARD_POLL_TRIES) -> bool:
|
|
"""Poll for a card up to max_tries. Return True if found, else False."""
|
|
tries = 0
|
|
while self.nfcFindCard() == 'noCard':
|
|
print('No card detected, retrying...')
|
|
tries += 1
|
|
if tries >= max_tries:
|
|
break
|
|
|
|
if tries >= max_tries:
|
|
print(f"No card found after {max_tries} attempts")
|
|
return False
|
|
return True
|
|
|
|
def assert_success(ret: bytes) -> None:
|
|
"""Assert that a response has SW=0x9000 and abort with message otherwise."""
|
|
if len(ret) < 2:
|
|
raise ValueError("Response too short to contain status word")
|
|
assert ret[-2:] == b"\x90\x00", f"Aborting execution, SW1_SW2 = {bytes_to_hexstr(ret[-2:])}"
|
|
|
|
def parse_return_code(ret_code: Optional[bytes], console_print: bool = True) -> Optional[str]:
|
|
"""Parse ISO 7816-4 status words and return description."""
|
|
if ret_code is None:
|
|
if console_print:
|
|
print("Return code empty")
|
|
return None
|
|
|
|
if len(ret_code) < 2:
|
|
if console_print:
|
|
print(f"Insufficient length of ret_code: {len(ret_code)}")
|
|
return None
|
|
|
|
ret_string = "Unknown return code"
|
|
|
|
match ret_code[0]:
|
|
case 0x62:
|
|
if ret_code[1] >= 2 and ret_code[1] <= 0x80:
|
|
ret_string = "Triggering by the card"
|
|
match ret_code[1]:
|
|
case 0x81:
|
|
ret_string = "Part of returned data may be corrupted"
|
|
case 0x82:
|
|
ret_string = "End of file or record reached before reading Ne bytes"
|
|
case 0x83:
|
|
ret_string = "Selected file deactivated"
|
|
case 0x84:
|
|
ret_string = "File control information not formatted"
|
|
case 0x85:
|
|
ret_string = "Selected file in termination state"
|
|
case 0x86:
|
|
ret_string = "No input data available from a sensor on the card"
|
|
|
|
case 0x63:
|
|
if ret_code[1] == 0x81:
|
|
ret_string = "File filled up by the last write"
|
|
elif (ret_code[1] & 0xF0) == 0xC0:
|
|
ret_string = "Counter from 0 to 15 encoded by 'X'(SW2&0xF)"
|
|
|
|
case 0x64:
|
|
if ret_code[1] >= 2 and ret_code[1] <= 0x80:
|
|
ret_string = "Triggering by the card"
|
|
elif ret_code[1] == 1:
|
|
ret_string = "Immediate response required by the card"
|
|
|
|
case 0x65:
|
|
if ret_code[1] == 0x81:
|
|
ret_string = "Memory failure"
|
|
|
|
case 0x67:
|
|
if ret_code[1] == 0x00:
|
|
ret_string = "Invalid length"
|
|
|
|
case 0x68:
|
|
match ret_code[1]:
|
|
case 0x81:
|
|
ret_string = "Logical channel not supported"
|
|
case 0x82:
|
|
ret_string = "Secure messaging not supported"
|
|
case 0x83:
|
|
ret_string = "Last command of the chain expected"
|
|
case 0x84:
|
|
ret_string = "Command chaining not supported"
|
|
|
|
case 0x69:
|
|
match ret_code[1]:
|
|
case 0x81:
|
|
ret_string = "Command incompatible with file structure"
|
|
case 0x82:
|
|
ret_string = "Security status not satisfied"
|
|
case 0x83:
|
|
ret_string = "Authentication method blocked"
|
|
case 0x84:
|
|
ret_string = "Reference data not usable"
|
|
case 0x85:
|
|
ret_string = "Conditions of use not satisfied"
|
|
case 0x86:
|
|
ret_string = "Command not allowed (no current EF)"
|
|
case 0x87:
|
|
ret_string = "Expected secure messaging data objects missing"
|
|
case 0x88:
|
|
ret_string = "Incorrect secure messaging data objects"
|
|
|
|
case 0x6A:
|
|
match ret_code[1]:
|
|
case 0x80:
|
|
ret_string = "Incorrect parameters in the command data field"
|
|
case 0x81:
|
|
ret_string = "Function not supported"
|
|
case 0x82:
|
|
ret_string = "File or application not found"
|
|
case 0x83:
|
|
ret_string = "Record not found"
|
|
case 0x84:
|
|
ret_string = "Not enough memory space in the file"
|
|
case 0x85:
|
|
ret_string = "Nc inconsistent with TLV structure"
|
|
case 0x86:
|
|
ret_string = "Incorrect parameters P1-P2"
|
|
case 0x87:
|
|
ret_string = "Nc inconsistent with parameters P1-P2"
|
|
case 0x88:
|
|
ret_string = "Referenced data or reference data not found"
|
|
case 0x89:
|
|
ret_string = "File already exists"
|
|
case 0x8A:
|
|
ret_string = "DF name already exists"
|
|
|
|
case 0x6D:
|
|
match ret_code[1]:
|
|
case 0x00:
|
|
ret_string = "Invalid INS parameter"
|
|
|
|
case 0x6E:
|
|
match ret_code[1]:
|
|
case 0x00:
|
|
ret_string = "Invalid CLA parameter"
|
|
|
|
case 0x93:
|
|
match ret_code[1]:
|
|
case 0x02:
|
|
ret_string = "Invalid MAC"
|
|
|
|
case 0x94:
|
|
match ret_code[1]:
|
|
case 0x01:
|
|
ret_string = "The amount is insufficient"
|
|
case 0x03:
|
|
ret_string = "Key indexes are not supported"
|
|
|
|
case 0x90:
|
|
if ret_code[1] == 0:
|
|
ret_string = "Operation Successful"
|
|
|
|
if console_print:
|
|
print(f"[{color('=', fg='yellow')}] SW1_SW2 <= {bytes_to_hexstr(ret_code)} => {ret_string}")
|
|
|
|
return ret_string
|
|
|
|
|
|
def parse_tlv(data: bytes, tag_length: int) -> List[Tuple[bytes, bytes]]:
|
|
"""
|
|
Parse TLV (Tag-Length-Value) structure.
|
|
|
|
Args:
|
|
data: bytes or bytearray containing TLV data
|
|
tag_length: int, number of bytes for the tag field (must be > 0)
|
|
|
|
Returns:
|
|
list of tuples: [(tag, value), (tag, value), ...]
|
|
"""
|
|
if tag_length <= 0:
|
|
raise ValueError(f"Invalid tag_length: {tag_length}, must be > 0")
|
|
|
|
result = []
|
|
offset = 0
|
|
|
|
while offset < len(data):
|
|
# Check if we have enough bytes for tag and length
|
|
if offset + tag_length + 1 > len(data):
|
|
break
|
|
|
|
# Extract tag
|
|
tag = data[offset:offset + tag_length]
|
|
offset += tag_length
|
|
|
|
# Extract length (assuming 1 byte for length)
|
|
length = data[offset]
|
|
offset += 1
|
|
|
|
# Check if we have enough bytes for value
|
|
if offset + length > len(data):
|
|
break
|
|
|
|
# Extract value
|
|
value = data[offset:offset + length]
|
|
offset += length
|
|
|
|
result.append((tag, value))
|
|
|
|
return result
|
|
|
|
# https://github.com/SocialSisterYi/T-Union_Master/blob/857ffec87d67413e759c5e055e6a410a93536b2e/src/protocol/t_union_poller_i.c#L88
|
|
def parse_tunion_meta(level: int, tlv_data: bytes) -> None:
|
|
"""Parse T-Union card metadata from TLV data."""
|
|
if len(tlv_data) < 0x1C:
|
|
raise ValueError(f"TLV data too short: {len(tlv_data)} bytes, expected at least 28")
|
|
|
|
card_type = tlv_data[0]
|
|
city_id = int.from_bytes(tlv_data[1:3], byteorder='big')
|
|
card_number = tlv_data[10:20].hex().upper()
|
|
|
|
issued_year = tlv_data[20:22].hex().upper()
|
|
issued_month = tlv_data[22:23].hex().upper()
|
|
issued_day = tlv_data[23:24].hex().upper()
|
|
|
|
exp_year = tlv_data[24:26].hex().upper()
|
|
exp_month = tlv_data[26:27].hex().upper()
|
|
exp_day = tlv_data[27:28].hex().upper()
|
|
|
|
print(" " * level + f"Card Type = {color(card_type, fg='green')}")
|
|
print(" " * level + f"City ID = {color(city_id, fg='green')}")
|
|
print(" " * level + f"Card Number = {color(card_number, fg='green')}")
|
|
issued_date = f"{issued_year}-{issued_month}-{issued_day}"
|
|
exp_date = f"{exp_year}-{exp_month}-{exp_day}"
|
|
print(" " * level + f"Issued Date = {color(issued_date, fg='green')}")
|
|
print(" " * level + f"Expiration Date = {color(exp_date, fg='green')}")
|
|
|
|
# https://github.com/SocialSisterYi/T-Union_Master/blob/857ffec87d67413e759c5e055e6a410a93536b2e/src/protocol/t_union_poller_i.c#L114
|
|
def decode_transaction(data: bytes) -> None:
|
|
"""Decode and display transaction record."""
|
|
if len(data) < 23:
|
|
raise ValueError(f"Transaction data too short: {len(data)} bytes, expected at least 23")
|
|
|
|
sequence = int.from_bytes(data[0:2], byteorder='big')
|
|
money = int.from_bytes(data[5:9], byteorder='big')
|
|
trans_type = data[9]
|
|
type_str = "Unknown"
|
|
match trans_type:
|
|
case 0x01 | 0x02:
|
|
type_str = "Load"
|
|
case 0x05 | 0x06:
|
|
type_str = "Purchase"
|
|
case 0x09:
|
|
type_str = "CompoundPurchase"
|
|
terminal_id = data[10:16].hex().upper()
|
|
year = data[16:18].hex().upper()
|
|
month = data[18:19].hex().upper()
|
|
day = data[19:20].hex().upper()
|
|
hour = data[20:21].hex().upper()
|
|
minute = data[21:22].hex().upper()
|
|
second = data[22:23].hex().upper()
|
|
print(f"\nSequence: {color(sequence, fg='green')}")
|
|
money_str = f"{money/100:.2f} CNY"
|
|
print(f"Amount: {color(money_str, fg='green')}")
|
|
print(f"Type: {color(trans_type, fg='green')} ({color(type_str, fg='green')})")
|
|
print(f"Terminal ID: {color(terminal_id, fg='green')}")
|
|
transaction_date = f"{year}-{month}-{day} {hour}:{minute}:{second}"
|
|
print(f"Transaction Date: {color(transaction_date, fg='green')}")
|
|
print("")
|
|
|
|
# https://github.com/SocialSisterYi/T-Union_Master/blob/857ffec87d67413e759c5e055e6a410a93536b2e/src/protocol/t_union_poller_i.c#L136
|
|
def decode_travel(data: bytes) -> None:
|
|
"""Decode and display travel record."""
|
|
if len(data) < 42:
|
|
raise ValueError(f"Travel data too short: {len(data)} bytes, expected at least 42")
|
|
|
|
travel_type = data[0]
|
|
terminal_id = data[1:9].hex().upper()
|
|
sub_type = data[9]
|
|
station_id = data[10:17].hex().upper()
|
|
money = int.from_bytes(data[17:21], byteorder='big')
|
|
balance = int.from_bytes(data[21:25], byteorder='big')
|
|
year = data[25:27].hex().upper()
|
|
month = data[27:28].hex().upper()
|
|
day = data[28:29].hex().upper()
|
|
hour = data[29:30].hex().upper()
|
|
minute = data[30:31].hex().upper()
|
|
second = data[31:32].hex().upper()
|
|
city_id = data[32:34].hex().upper()
|
|
institution_id = data[34:42].hex().upper()
|
|
|
|
print(f"\nType: {color(travel_type, fg='green')}")
|
|
print(f"Terminal ID: {color(terminal_id, fg='green')}")
|
|
print(f"Sub Type: {color(sub_type, fg='green')}")
|
|
print(f"Station ID: {color(station_id, fg='green')}")
|
|
travel_cost_str = f"{money/100:.2f} CNY"
|
|
card_balance_str = f"{balance/100:.2f} CNY"
|
|
print(f"Travel Cost: {color(travel_cost_str, fg='green')}")
|
|
print(f"Card Balance: {color(card_balance_str, fg='green')}")
|
|
travel_date = f"{year}-{month}-{day} {hour}:{minute}:{second}"
|
|
print(f"Transaction Date: {color(travel_date, fg='green')}")
|
|
print(f"City ID: {color(city_id, fg='green')}")
|
|
print(f"Institution ID: {color(institution_id, fg='green')}")
|
|
print("")
|
|
|
|
# TLV data decoded with https://emvlab.org/tlvutils/
|
|
def decode_tlv(level: int, tlv_data: bytes, tag_length: int) -> None:
|
|
"""Recursively decode and print TLV structure."""
|
|
parsed_tlv = parse_tlv(tlv_data, tag_length)
|
|
|
|
for p_tag, p_value in parsed_tlv:
|
|
match p_tag:
|
|
case b"\x6f":
|
|
print("6F File Control Information (FCI) Template")
|
|
decode_tlv(level+1, p_value, 1)
|
|
case b"\x84":
|
|
try:
|
|
df_name = p_value.decode('utf-8')
|
|
except (UnicodeDecodeError, AttributeError):
|
|
df_name = p_value.hex().upper()
|
|
print(" " * level + f"84 Dedicated File (DF) Name = {color(df_name, fg='green')}")
|
|
case b"\xA5":
|
|
print(" " * level + "A5 File Control Information (FCI) Proprietary Template")
|
|
decode_tlv(level+1, p_value, 2)
|
|
case b"\xbf\x0c":
|
|
print(" " * level + "BF0C File Control Information (FCI) Issuer Discretionary Data")
|
|
decode_tlv(level+1, p_value, 1)
|
|
case b"\x61":
|
|
print(" " * level + "61 Application Template")
|
|
decode_tlv(level+1, p_value, 1)
|
|
case b"\x4f":
|
|
aid = p_value.hex().upper()
|
|
print(" " * level + f"4F Application Identifier (AID) = {color(aid, fg='green')}")
|
|
case b"\x50":
|
|
app_label = p_value.decode('utf-8', errors='ignore')
|
|
print(" " * level + f"50 Application Label = {color(app_label, fg='green')}")
|
|
case b"\x87":
|
|
app_priority = p_value.hex().upper()
|
|
print(" " * level + f"87 Application Priority Indicator = {app_priority}")
|
|
case b"\x9f\x08":
|
|
app_ver = p_value.hex().upper()
|
|
print(" " * level + f"9F08 Application Version Number = {app_ver}")
|
|
case b"\x9f\x0c":
|
|
print(" " * level + "9F0C File Control Information (FCI) Issuer Discretionary Data")
|
|
parse_tunion_meta(level+1, p_value)
|
|
case _:
|
|
print(f"Unable to parse tag {level=}, {p_tag=}, {p_value=}")
|
|
|
|
def process_tlv(tlv_data: bytes) -> None:
|
|
"""Process TLV data after checking status word."""
|
|
if DEBUG:
|
|
print(f"[{color('+', fg='green')}] Calling: {sys._getframe(0).f_code.co_name}")
|
|
|
|
if len(tlv_data) < 2:
|
|
raise ValueError("TLV data too short")
|
|
|
|
SW1_SW2 = tlv_data[-2:]
|
|
answer = tlv_data[:-2]
|
|
|
|
if SW1_SW2 == b"\x90\x00":
|
|
decode_tlv(0, answer, 1)
|
|
else:
|
|
raise ValueError(f"Failed to parse TLV, SW={bytes_to_hexstr(SW1_SW2)}")
|
|
|
|
def bytes_to_hexstr(inp: bytes) -> str:
|
|
"""Convert bytes-like to space-separated hex, e.g., b"\x01\x02" -> "01 02"."""
|
|
return ' '.join(format(ch, "02X") for ch in inp)
|
|
|
|
def strToint16(hex_str: str) -> List[int]:
|
|
"""Parse hex string into list of 1-byte ints grouped by two chars per byte.
|
|
|
|
Example: "3F00" -> [0x3F, 0x00]
|
|
"""
|
|
if len(hex_str) % 2 != 0:
|
|
raise ValueError(f"Hex string must have even length, got {len(hex_str)}")
|
|
|
|
return [int(hex_str[i:i+2], 16) for i in range(0, len(hex_str), 2)]
|
|
|
|
def GetRecData(pm3_conn: BridgePM3) -> bytes:
|
|
"""Get received data from PM3 connection."""
|
|
nfcdata = pm3_conn.nfcGetRecData()
|
|
|
|
if DEBUG:
|
|
print(f"[{color('=', fg='yellow')}] RECV <= " + bytes_to_hexstr(nfcdata))
|
|
|
|
parse_return_code(nfcdata[-2:], DEBUG)
|
|
return nfcdata
|
|
|
|
def sendCommand(pm3_conn: BridgePM3, cla: int, ins: int, p1: int, p2: int,
|
|
Data: Optional[bytes] = None, le: Optional[int] = None) -> bytes:
|
|
"""Send APDU command and receive response."""
|
|
context = [cla, ins, p1, p2]
|
|
if Data is not None:
|
|
lc = len(Data)
|
|
context = context + [lc] + list(Data)
|
|
else:
|
|
lc = None
|
|
|
|
if le is not None:
|
|
context = context + [le]
|
|
|
|
if lc is None and le is None:
|
|
context = context + [0x00]
|
|
|
|
if DEBUG:
|
|
print(f"[{color('=', fg='yellow')}] SEND => {bytes_to_hexstr(bytes(context))}")
|
|
|
|
pm3_conn.sendToNfc(bytes(context))
|
|
recdata = GetRecData(pm3_conn)
|
|
return recdata
|
|
|
|
def cmd_select(pm3_conn: BridgePM3, fileID: Optional[str] = None, name: Optional[bytes] = None) -> bytes:
|
|
"""Send SELECT command to card."""
|
|
if DEBUG:
|
|
print(f"[{color('+', fg='green')}] Calling: {sys._getframe(0).f_code.co_name}")
|
|
|
|
if fileID is None and name is None:
|
|
raise ValueError("fileID or name cannot be empty")
|
|
|
|
cla = 0x00
|
|
ins = 0xA4
|
|
p2 = 0x00
|
|
|
|
if name:
|
|
p1 = 0x04
|
|
ret = sendCommand(pm3_conn, cla=cla, ins=ins, p1=p1, p2=p2, Data=name)
|
|
else:
|
|
p1 = 0x00
|
|
fileIDlist = bytes(strToint16(fileID))
|
|
ret = sendCommand(pm3_conn, cla=cla, ins=ins, p1=p1, p2=p2, Data=fileIDlist, le=0x00)
|
|
|
|
if DEBUG:
|
|
print(f"[{color('=', fg='yellow')}] SELECT => {bytes_to_hexstr(ret)}\n")
|
|
|
|
if ret[-2:] == b"\x90\x00":
|
|
process_tlv(ret)
|
|
|
|
return ret
|
|
|
|
def cmd_get_balance(pm3_conn: BridgePM3) -> bytes:
|
|
"""Get card balance."""
|
|
if DEBUG:
|
|
print(f"[{color('+', fg='green')}] Calling: {sys._getframe(0).f_code.co_name}")
|
|
|
|
cla = 0x80
|
|
ins = 0x5C
|
|
p1 = 0x00
|
|
p2 = 0x02
|
|
|
|
ret = sendCommand(pm3_conn, cla=cla, ins=ins, p1=p1, p2=p2, le=4)
|
|
if DEBUG:
|
|
print(f"[{color('=', fg='yellow')}] GET_BALANCE => {bytes_to_hexstr(ret)}\n")
|
|
return ret
|
|
|
|
def cmd_read_record(pm3_conn: BridgePM3, record_number: int, file_id: int) -> bytes:
|
|
"""Read a record from a file on the card."""
|
|
if DEBUG:
|
|
print(f"[{color('+', fg='green')}] Calling: {sys._getframe(0).f_code.co_name}")
|
|
|
|
cla = 0x00
|
|
ins = 0xB2
|
|
p1 = record_number
|
|
p2 = ((file_id & 0x1F) << 3) | 4
|
|
|
|
ret = sendCommand(pm3_conn, cla, ins, p1, p2, Data=None, le=0)
|
|
assert ret[-2:] == b"\x90\x00", f"Card did not return success"
|
|
|
|
if DEBUG:
|
|
print(f"[{color('=', fg='yellow')}] READ_RECORD => {bytes_to_hexstr(ret)}\n")
|
|
return ret
|
|
|
|
def process_tunion_transit_card(pm3_conn: BridgePM3) -> None:
|
|
"""Process T-Union transit card - read balance, transactions, and travel records."""
|
|
print("\nReading Balance...")
|
|
ret = cmd_get_balance(pm3_conn)
|
|
assert_success(ret)
|
|
balance = int.from_bytes(ret[0:4], byteorder='big')
|
|
balance_str = f"{balance/100:.2f} CNY"
|
|
print(f"Balance = {color(balance_str, fg='green')}")
|
|
|
|
print("\nReading Transactions...")
|
|
for i in range(MAX_TRANSACTION_RECORDS):
|
|
print(f"Reading Transaction Record {i+1}...", end="")
|
|
try:
|
|
ret = cmd_read_record(pm3_conn, record_number=i+1, file_id=FILE_ID_TRANSACTION)
|
|
assert_success(ret)
|
|
if all(b == 0x00 for b in ret[0:-2]):
|
|
print(" Empty")
|
|
else:
|
|
decode_transaction(ret[0:-2])
|
|
except (AssertionError, ValueError) as e:
|
|
print(f" Error: {e}")
|
|
break
|
|
|
|
print("\nReading Travel Records...")
|
|
for i in range(MAX_TRAVEL_RECORDS):
|
|
print(f"Reading Travel Record {i+1}...", end="")
|
|
try:
|
|
ret = cmd_read_record(pm3_conn, record_number=i+1, file_id=FILE_ID_TRAVEL)
|
|
assert_success(ret)
|
|
if all(b == 0x00 for b in ret[0:-2]):
|
|
print(" Empty")
|
|
else:
|
|
decode_travel(ret[0:-2])
|
|
except (AssertionError, ValueError) as e:
|
|
print(f" Error: {e}")
|
|
break
|
|
|
|
def main() -> None:
|
|
"""Main entry point for T-Union card reader."""
|
|
try:
|
|
p = pm3.pm3()
|
|
pm3_conn = BridgePM3(hw_debug=DEBUG, pm3=p)
|
|
|
|
if not pm3_conn.waitForCard():
|
|
raise ValueError("Unable to find card...")
|
|
|
|
print("\nSelecting DDF...")
|
|
ret = cmd_select(pm3_conn, name=DDF_PPSE)
|
|
assert_success(ret)
|
|
|
|
for aidl in [AID_PBOC_DEBIT_CREDIT, AID_TUNION_TRANSIT]:
|
|
print(f"\nSelecting AID: {aidl}")
|
|
ret = cmd_select(pm3_conn, name=bytes.fromhex(aidl))
|
|
assert_success(ret)
|
|
|
|
match aidl:
|
|
case "A00000000386980701": # PBOC Debit/Credit
|
|
print("PBOC Debit/Credit card detected - implementation pending")
|
|
# TODO: Implement PBOC card processing
|
|
case "A000000632010105": # T-Union Transit
|
|
process_tunion_transit_card(pm3_conn)
|
|
case _:
|
|
print(f"Unknown AID: {aidl}, please report!")
|
|
|
|
except Exception as e:
|
|
print(f"\nError: {e}")
|
|
raise
|
|
finally:
|
|
# Reset device to known good state
|
|
if 'pm3_conn' in locals():
|
|
pm3_conn.hw_reset()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|