| 3 |
好的,我们开始。以下是为您生成的详细项目开发任务描述。 |
completed |
# TG USDT 地址监控机器人 - 完整实现
## 项目结构
```
tg_usdt_monitor/
├── .env
├── requirements.txt
├── main.py
├── bot/
│ ├── __init__.py
│ ├── handlers.py
│ ├── monitor.py
│ └── keyboards.py
├── db/
│ ├── __init__.py
│ └── database.py
├── services/
│ ├── __init__.py
│ ├── blockchain.py
│ └── validator.py
└── utils/
├── __init__.py
└── helpers.py
```
---
## 配置文件
### `.env`
```env
# Telegram Bot Token (从 @BotFather 获取)
BOT_TOKEN=your_telegram_bot_token_here
# TronGrid API Key (https://www.trongrid.io/)
TRONGRID_API_KEY=your_trongrid_api_key_here
# BSCScan API Key (https://bscscan.com/apis)
BSCSCAN_API_KEY=your_bscscan_api_key_here
# 监控轮询间隔(秒)
MONITOR_INTERVAL=60
# 每次查询获取的最大交易数
MAX_TRANSACTIONS=50
```
### `requirements.txt`
```txt
python-telegram-bot==20.7
httpx==0.26.0
python-dotenv==1.0.0
aiosqlite==0.19.0
```
---
## 数据库层
### `db/database.py`
```python
import aiosqlite
import logging
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
DB_PATH = "monitor.db"
async def init_db():
"""初始化数据库,创建所需表"""
async with aiosqlite.connect(DB_PATH) as db:
# 监控列表表:存储 chat_id 与 address 的对应关系
await db.execute("""
CREATE TABLE IF NOT EXISTS monitor_list (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
address TEXT NOT NULL,
chain TEXT NOT NULL, -- 'TRC20' 或 'BEP20'
label TEXT, -- 用户自定义标签
added_at TEXT NOT NULL,
UNIQUE(chat_id, address)
)
""")
# 地址状态表:记录每个地址最后一次检查的交易哈希
await db.execute("""
CREATE TABLE IF NOT EXISTS address_state (
address TEXT PRIMARY KEY,
chain TEXT NOT NULL,
last_tx_hash TEXT, -- 最新的交易哈希
last_checked_at TEXT,
total_received REAL DEFAULT 0
)
""")
# 交易记录表:避免重复推送
await db.execute("""
CREATE TABLE IF NOT EXISTS sent_transactions (
tx_hash TEXT PRIMARY KEY,
address TEXT NOT NULL,
pushed_at TEXT NOT NULL
)
""")
await db.commit()
logger.info("数据库初始化完成")
async def add_monitor(chat_id: int, address: str, chain: str, label: Optional[str] = None) -> bool:
"""
添加监控地址
Returns: True 表示新增成功, False 表示已存在
"""
async with aiosqlite.connect(DB_PATH) as db:
try:
await db.execute(
"""
INSERT INTO monitor_list (chat_id, address, chain, label, added_at)
VALUES (?, ?, ?, ?, ?)
""",
(chat_id, address, chain, label, datetime.utcnow().isoformat())
)
await db.commit()
# 若地址状态表中不存在该地址,则初始化
await db.execute(
"""
INSERT OR IGNORE INTO address_state (address, chain, last_checked_at)
VALUES (?, ?, ?)
""",
(address, chain, datetime.utcnow().isoformat())
)
await db.commit()
return True
except aiosqlite.IntegrityError:
return False # 已存在该监控关系
async def remove_monitor(chat_id: int, address: str) -> bool:
"""
移除监控地址
Returns: True 表示删除成功, False 表示记录不存在
"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"DELETE FROM monitor_list WHERE chat_id = ? AND address = ?",
(chat_id, address)
)
await db.commit()
return cursor.rowcount > 0
async def list_monitors(chat_id: int) -> list[dict]:
"""获取某个 chat_id 的所有监控地址"""
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"""
SELECT ml.address, ml.chain, ml.label, ml.added_at,
ads.last_tx_hash, ads.last_checked_at, ads.total_received
FROM monitor_list ml
LEFT JOIN address_state ads ON ml.address = ads.address
WHERE ml.chat_id = ?
ORDER BY ml.added_at DESC
""",
(chat_id,)
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_all_monitored_addresses() -> list[dict]:
"""获取所有被监控的唯一地址(供监控任务使用)"""
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"""
SELECT DISTINCT address, chain
FROM monitor_list
"""
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_subscribers(address: str) -> list[int]:
"""获取订阅了某个地址的所有 chat_id"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"SELECT chat_id FROM monitor_list WHERE address = ?",
(address,)
)
rows = await cursor.fetchall()
return [row[0] for row in rows]
async def get_address_state(address: str) -> Optional[dict]:
"""获取地址状态"""
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM address_state WHERE address = ?",
(address,)
)
row = await cursor.fetchone()
return dict(row) if row else None
async def update_address_state(address: str, last_tx_hash: str, total_received: float):
"""更新地址状态"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""
UPDATE address_state
SET last_tx_hash = ?, last_checked_at = ?, total_received = ?
WHERE address = ?
""",
(last_tx_hash, datetime.utcnow().isoformat(), total_received, address)
)
await db.commit()
async def is_tx_sent(tx_hash: str) -> bool:
"""检查某交易是否已推送过"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"SELECT 1 FROM sent_transactions WHERE tx_hash = ?",
(tx_hash,)
)
return await cursor.fetchone() is not None
async def mark_tx_sent(tx_hash: str, address: str):
"""标记交易已推送"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""
INSERT OR IGNORE INTO sent_transactions (tx_hash, address, pushed_at)
VALUES (?, ?, ?)
""",
(tx_hash, address, datetime.utcnow().isoformat())
)
await db.commit()
async def count_monitors(chat_id: int) -> int:
"""统计某 chat_id 的监控数量"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"SELECT COUNT(*) FROM monitor_list WHERE chat_id = ?",
(chat_id,)
)
row = await cursor.fetchone()
return row[0]
```
---
## 地址验证服务
### `services/validator.py`
```python
import re
from typing import Tuple
# TRC20 地址:T 开头,34位 Base58
TRC20_PATTERN = re.compile(r'^T[A-Za-z1-9]{33}$')
# BEP20 地址:0x 开头,40位十六进制
BEP20_PATTERN = re.compile(r'^0x[0-9a-fA-F]{40}$')
# 单个用户最大监控数量
MAX_MONITORS_PER_USER = 10
def validate_address(address: str) -> Tuple[bool, str]:
"""
验证地址格式
Returns: (is_valid, chain_type)
chain_type: 'TRC20' | 'BEP20' | ''
"""
address = address.strip()
if TRC20_PATTERN.match(address):
return True, "TRC20"
if BEP20_PATTERN.match(address):
return True, "BEP20"
return False, ""
def format_address(address: str) -> str:
"""格式化地址显示(中间省略)"""
if len(address) <= 12:
return address
return f"{address[:6]}...{address[-6:]}"
def format_usdt_amount(amount: float) -> str:
"""格式化 USDT 金额"""
if amount >= 1_000_000:
return f"{amount / 1_000_000:.2f}M USDT"
elif amount >= 1_000:
return f"{amount / 1_000:.2f}K USDT"
else:
return f"{amount:.2f} USDT"
```
---
## 区块链查询服务
### `services/blockchain.py`
```python
import httpx
import logging
import os
from typing import Optional
logger = logging.getLogger(__name__)
TRONGRID_API_KEY = os.getenv("TRONGRID_API_KEY", "")
BSCSCAN_API_KEY = os.getenv("BSCSCAN_API_KEY", "")
# USDT 合约地址
USDT_TRC20_CONTRACT = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
USDT_BEP20_CONTRACT = "0x55d398326f99059ff775485246999027b3197955"
# USDT 精度
TRC20_DECIMALS = 1_000_000 # 6位小数
BEP20_DECIMALS = 10**18 # 18位小数
async def fetch_trc20_transactions(address: str, limit: int = 50) -> list[dict]:
"""
从 TronGrid 获取 TRC20 USDT 转入交易
"""
url = f"https://api.trongrid.io/v1/accounts/{address}/transactions/trc20"
params = {
"limit": limit,
"contract_address": USDT_TRC20_CONTRACT,
"only_to": "true", # 只获取转入
"order_by": "block_timestamp,desc"
}
headers = {
"TRON-PRO-API-KEY": TRONGRID_API_KEY,
"Accept": "application/json"
}
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(url, params=params, headers=headers)
resp.raise_for_status()
data = resp.json()
transactions = []
for tx in data.get("data", []):
# 过滤确认的 USDT 转入交易
if (
tx.get("to") == address
and tx.get("token_info", {}).get("address") == USDT_TRC20_CONTRACT
):
amount_raw = int(tx.get("value", 0))
amount_usdt = amount_raw / TRC20_DECIMALS
transactions.append({
"tx_hash": tx.get("transaction_id", ""),
"from_address": tx.get("from", ""),
"to_address": tx.get("to", ""),
"amount": amount_usdt,
"timestamp": tx.get("block_timestamp", 0) // 1000,
"chain": "TRC20",
"explorer_url": f"https://tronscan.org/#/transaction/{tx.get('transaction_id', '')}"
})
return transactions
except httpx.HTTPStatusError as e:
logger.error(f"TronGrid HTTP 错误 {e.response.status_code}: {address}")
return []
except httpx.RequestError as e:
logger.error(f"TronGrid 请求错误: {e}")
return []
except Exception as e:
logger.error(f"TronGrid 未知错误: {e}")
return []
async def fetch_bep20_transactions(address: str, limit: int = 50) -> list[dict]:
"""
从 BSCScan 获取 BEP20 USDT 转入交易
"""
url = "https://api.bscscan.com/api"
params = {
"module": "account",
"action": "tokentx",
"contractaddress": USDT_BEP20_CONTRACT,
"address": address,
"page": 1,
"offset": limit,
"sort": "desc",
"apikey": BSCSCAN_API_KEY
}
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
if data.get("status") != "1":
if data.get("message") == "No transactions found":
return []
logger.warning(f"BSCScan API 返回: {data.get('message')} for {address}")
return []
transactions = []
for tx in data.get("result", []):
# 只取转入交易
if tx.get("to", "").lower() != address.lower():
continue
amount_raw = int(tx.get("value", 0))
amount_usdt = amount_raw / BEP20_DECIMALS
tx_hash = tx.get("hash", "")
transactions.append({
"tx_hash": tx_hash,
"from_address": tx.get("from", ""),
"to_address": tx.get("to", ""),
"amount": amount_usdt,
"
|
2026-05-10 13:49:22
|