mirror of
https://github.com/EDeev/dorm_alarm.git
synced 2026-06-15 11:01:09 +03:00
v. 1.0
This commit is contained in:
parent
0ab108c2b2
commit
37c468be08
4 changed files with 481 additions and 0 deletions
85
bot.py
Normal file
85
bot.py
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from init import bot, dp
|
||||||
|
from handlers import router, initialize_existing_users
|
||||||
|
|
||||||
|
# Конфигурация системы логирования для production среды
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(sys.stdout),
|
||||||
|
logging.FileHandler('bot.log', encoding='utf-8')
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
async def graceful_shutdown():
|
||||||
|
"""Корректное завершение работы с очисткой ресурсов"""
|
||||||
|
logger.info("Initiating graceful shutdown...")
|
||||||
|
|
||||||
|
# Остановка всех активных задач уведомлений
|
||||||
|
from handlers import notification_tasks
|
||||||
|
for user_id, task in notification_tasks.items():
|
||||||
|
task.cancel()
|
||||||
|
logger.info(f"Cancelled notification task for user {user_id}")
|
||||||
|
|
||||||
|
# Очистка webhook'ов и закрытие соединений
|
||||||
|
await bot.delete_webhook(drop_pending_updates=True)
|
||||||
|
await bot.session.close()
|
||||||
|
|
||||||
|
logger.info("Graceful shutdown completed")
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
"""Главная точка входа приложения с полной инициализацией системы"""
|
||||||
|
try:
|
||||||
|
logger.info("Starting notification monitoring bot...")
|
||||||
|
|
||||||
|
# Регистрация обработчиков сигналов для graceful shutdown
|
||||||
|
def signal_handler():
|
||||||
|
logger.info("Received shutdown signal")
|
||||||
|
# Создание task для корректного завершения в event loop
|
||||||
|
asyncio.create_task(graceful_shutdown())
|
||||||
|
|
||||||
|
# Подключение обработчиков команд и callback'ов
|
||||||
|
dp.include_router(router)
|
||||||
|
|
||||||
|
# Очистка устаревших webhook'ов
|
||||||
|
await bot.delete_webhook(drop_pending_updates=True)
|
||||||
|
|
||||||
|
# Инициализация уведомлений для существующих пользователей
|
||||||
|
await initialize_existing_users()
|
||||||
|
|
||||||
|
logger.info("Bot initialization completed successfully")
|
||||||
|
logger.info("Starting polling mode...")
|
||||||
|
|
||||||
|
# Запуск long polling с оптимизированными параметрами
|
||||||
|
await dp.start_polling(
|
||||||
|
bot,
|
||||||
|
allowed_updates=dp.resolve_used_update_types(),
|
||||||
|
timeout=20, # Оптимизированный timeout для стабильности
|
||||||
|
relax=0.1 # Минимальная пауза между запросами
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"Critical error during bot startup: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
# Установка политики event loop для Windows compatibility
|
||||||
|
if sys.platform.startswith('win'):
|
||||||
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
||||||
|
|
||||||
|
# Запуск основного процесса
|
||||||
|
asyncio.run(main())
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Bot stopped by user (KeyboardInterrupt)")
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"Unhandled exception: {e}")
|
||||||
|
sys.exit(1)
|
||||||
256
handlers.py
Normal file
256
handlers.py
Normal file
|
|
@ -0,0 +1,256 @@
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from aiogram import F, Router
|
||||||
|
from aiogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
|
||||||
|
from aiogram.filters import Command
|
||||||
|
|
||||||
|
from sql import DatabaseManager
|
||||||
|
from init import bot
|
||||||
|
|
||||||
|
router = Router()
|
||||||
|
db = DatabaseManager()
|
||||||
|
|
||||||
|
# Глобальный реестр активных задач уведомлений для управления жизненным циклом
|
||||||
|
notification_tasks: Dict[int, asyncio.Task] = {}
|
||||||
|
|
||||||
|
|
||||||
|
@router.message(Command("start"))
|
||||||
|
async def start_handler(msg: Message) -> None:
|
||||||
|
"""Регистрация пользователя и инициализация уведомлений"""
|
||||||
|
user_id = msg.from_user.id
|
||||||
|
username = msg.from_user.username or msg.from_user.first_name
|
||||||
|
|
||||||
|
if db.add_user(user_id, username):
|
||||||
|
keyboard = create_main_keyboard()
|
||||||
|
await msg.answer(
|
||||||
|
f"<b>🔔 Система мониторинга активирована!</b>\n\n"
|
||||||
|
f"👤 Пользователь: <code>{username}</code>\n"
|
||||||
|
f"🆔 ID: <code>{user_id}</code>\n\n"
|
||||||
|
f"⏰ Интервал по умолчанию: <b>60 минут</b>\n"
|
||||||
|
f"📊 Статус уведомлений: <b>Включены</b>\n\n"
|
||||||
|
f"<i>Используйте кнопки ниже для настройки параметров</i>",
|
||||||
|
reply_markup=keyboard
|
||||||
|
)
|
||||||
|
|
||||||
|
# Запуск персональной задачи уведомлений
|
||||||
|
start_user_notifications(user_id)
|
||||||
|
else:
|
||||||
|
await msg.answer("❌ Ошибка инициализации системы")
|
||||||
|
|
||||||
|
@router.message(Command("status"))
|
||||||
|
async def status_handler(msg: Message) -> None:
|
||||||
|
"""Отображение текущего статуса пользователя"""
|
||||||
|
user_info = db.get_user_info(msg.from_user.id)
|
||||||
|
if not user_info:
|
||||||
|
await msg.answer("❌ Пользователь не найден. Используйте /start для регистрации")
|
||||||
|
return
|
||||||
|
|
||||||
|
user_id, username, interval, is_active = user_info
|
||||||
|
status_emoji = "🟢" if is_active else "🔴"
|
||||||
|
task_status = "Включены" if user_id in notification_tasks else "Остановлены"
|
||||||
|
|
||||||
|
keyboard = create_main_keyboard()
|
||||||
|
await msg.answer(
|
||||||
|
f"<b>📊 Статус системы мониторинга</b>\n\n"
|
||||||
|
f"Пользователь: <code>{username}</code>\n"
|
||||||
|
f"ID: <code>{user_id}</code>\n\n"
|
||||||
|
f"⏰ Интервал: <b>{interval} мин</b>\n"
|
||||||
|
f"🔄 Статус уведомлений: {status_emoji} <b>{task_status}</b>\n\n"
|
||||||
|
f"👥 Всего пользователей: <b>{db.get_user_count()}</b>",
|
||||||
|
reply_markup=keyboard
|
||||||
|
)
|
||||||
|
|
||||||
|
@router.message(Command("help"))
|
||||||
|
async def help_handler(msg: Message) -> None:
|
||||||
|
"""Справочная информация по командам"""
|
||||||
|
help_text = (
|
||||||
|
"<b>Бот мониторинга работоспособности</b>\n\n"
|
||||||
|
"<b>Доступные команды:</b>\n"
|
||||||
|
"/start - Регистрация и активация\n"
|
||||||
|
"/status - Текущий статус сервера\n"
|
||||||
|
"/help - Справочная информация\n\n"
|
||||||
|
"<b>Функциональность:</b>\n"
|
||||||
|
"• Периодические уведомления о работе\n"
|
||||||
|
"• Настройка интервала (5-1440 мин)\n"
|
||||||
|
"• Включение/отключение уведомлений"
|
||||||
|
)
|
||||||
|
await msg.answer(help_text)
|
||||||
|
|
||||||
|
# === CALLBACK HANDLERS ===
|
||||||
|
|
||||||
|
@router.callback_query(F.data == "toggle_notifications")
|
||||||
|
async def toggle_notifications_handler(call: CallbackQuery) -> None:
|
||||||
|
"""Переключение активности уведомлений"""
|
||||||
|
user_id = call.from_user.id
|
||||||
|
new_status = db.toggle_notifications(user_id)
|
||||||
|
|
||||||
|
if new_status is None:
|
||||||
|
await call.answer("❌ Ошибка обновления настроек")
|
||||||
|
return
|
||||||
|
|
||||||
|
if new_status:
|
||||||
|
start_user_notifications(user_id)
|
||||||
|
await call.answer("✅ Уведомления включены")
|
||||||
|
else:
|
||||||
|
stop_user_notifications(user_id)
|
||||||
|
await call.answer("⏸️ Уведомления отключены")
|
||||||
|
|
||||||
|
# Обновление интерфейса с новыми данными
|
||||||
|
await update_status_message(call)
|
||||||
|
|
||||||
|
@router.callback_query(F.data.startswith("set_interval_"))
|
||||||
|
async def set_interval_handler(call: CallbackQuery) -> None:
|
||||||
|
"""Установка нового интервала уведомлений"""
|
||||||
|
interval = int(call.data.split("_")[2])
|
||||||
|
user_id = call.from_user.id
|
||||||
|
|
||||||
|
if db.update_interval(user_id, interval):
|
||||||
|
# Перезапуск задачи с новым интервалом
|
||||||
|
restart_user_notifications(user_id)
|
||||||
|
await call.answer(f"⏰ Интервал установлен: {interval} мин")
|
||||||
|
await update_status_message(call)
|
||||||
|
else:
|
||||||
|
await call.answer("❌ Ошибка обновления интервала")
|
||||||
|
|
||||||
|
@router.callback_query(F.data == "interval_menu")
|
||||||
|
async def interval_menu_handler(call: CallbackQuery) -> None:
|
||||||
|
"""Отображение меню выбора интервала"""
|
||||||
|
keyboard = create_interval_keyboard()
|
||||||
|
await call.message.edit_reply_markup(reply_markup=keyboard)
|
||||||
|
|
||||||
|
@router.callback_query(F.data == "main_menu")
|
||||||
|
async def main_menu_handler(call: CallbackQuery) -> None:
|
||||||
|
"""Возврат к главному меню"""
|
||||||
|
keyboard = create_main_keyboard()
|
||||||
|
await call.message.edit_reply_markup(reply_markup=keyboard)
|
||||||
|
|
||||||
|
# === NOTIFICATION SYSTEM ===
|
||||||
|
|
||||||
|
async def send_notification(user_id: int) -> bool:
|
||||||
|
"""Отправка уведомления конкретному пользователю"""
|
||||||
|
try:
|
||||||
|
current_time = datetime.now().strftime("%H:%M:%S")
|
||||||
|
message = (
|
||||||
|
f"<b>🟢 Сервер работает</b>\n\n"
|
||||||
|
f"Время проверки: <code>{current_time}</code>\n"
|
||||||
|
f"Статус уведомлений: <b>Включены</b>"
|
||||||
|
)
|
||||||
|
|
||||||
|
await bot.send_message(user_id, message)
|
||||||
|
db.log_notification(user_id, "sent")
|
||||||
|
logging.info(f"Notification sent to user {user_id}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to send notification to {user_id}: {e}")
|
||||||
|
db.log_notification(user_id, "failed")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def notification_task(user_id: int, interval_minutes: int):
|
||||||
|
"""Асинхронная задача периодических уведомлений"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(interval_minutes * 60)
|
||||||
|
# Проверка активности пользователя перед отправкой
|
||||||
|
user_info = db.get_user_info(user_id)
|
||||||
|
if user_info and user_info[3]: # is_active
|
||||||
|
await send_notification(user_id)
|
||||||
|
else:
|
||||||
|
logging.info(f"User {user_id} notifications disabled, stopping task")
|
||||||
|
break
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logging.info(f"Notification task for user {user_id} cancelled")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error in notification task for user {user_id}: {e}")
|
||||||
|
await asyncio.sleep(60) # Retry delay при ошибках
|
||||||
|
|
||||||
|
def start_user_notifications(user_id: int):
|
||||||
|
"""Запуск задачи уведомлений для пользователя"""
|
||||||
|
stop_user_notifications(user_id) # Остановка существующей задачи
|
||||||
|
|
||||||
|
user_info = db.get_user_info(user_id)
|
||||||
|
if user_info and user_info[3]: # is_active
|
||||||
|
interval = user_info[2] # interval_minutes
|
||||||
|
task = asyncio.create_task(notification_task(user_id, interval))
|
||||||
|
notification_tasks[user_id] = task
|
||||||
|
logging.info(f"Started notifications for user {user_id} with {interval}min interval")
|
||||||
|
|
||||||
|
def stop_user_notifications(user_id: int):
|
||||||
|
"""Остановка задачи уведомлений для пользователя"""
|
||||||
|
if user_id in notification_tasks:
|
||||||
|
notification_tasks[user_id].cancel()
|
||||||
|
del notification_tasks[user_id]
|
||||||
|
logging.info(f"Stopped notifications for user {user_id}")
|
||||||
|
|
||||||
|
def restart_user_notifications(user_id: int):
|
||||||
|
"""Перезапуск задачи уведомлений с обновленными параметрами"""
|
||||||
|
stop_user_notifications(user_id)
|
||||||
|
start_user_notifications(user_id)
|
||||||
|
|
||||||
|
# === UTILITY FUNCTIONS ===
|
||||||
|
|
||||||
|
def create_main_keyboard() -> InlineKeyboardMarkup:
|
||||||
|
"""Создание основной клавиатуры управления"""
|
||||||
|
buttons = [
|
||||||
|
[InlineKeyboardButton(text="Изменить интервал", callback_data="interval_menu")],
|
||||||
|
[InlineKeyboardButton(text="Вкл/выкл уведомления", callback_data="toggle_notifications")]
|
||||||
|
]
|
||||||
|
return InlineKeyboardMarkup(inline_keyboard=buttons)
|
||||||
|
|
||||||
|
def create_interval_keyboard() -> InlineKeyboardMarkup:
|
||||||
|
"""Создание клавиатуры выбора интервала"""
|
||||||
|
intervals = [5, 15, 30, 60, 120, 360, 720, 1440] # От 5 минут до суток
|
||||||
|
buttons = []
|
||||||
|
|
||||||
|
# Группировка кнопок по 2 в ряду для компактности
|
||||||
|
for i in range(0, len(intervals), 2):
|
||||||
|
row = []
|
||||||
|
for j in range(i, min(i + 2, len(intervals))):
|
||||||
|
interval = intervals[j]
|
||||||
|
text = f"{interval}м" if interval < 60 else f"{interval//60}ч"
|
||||||
|
row.append(InlineKeyboardButton(
|
||||||
|
text=text,
|
||||||
|
callback_data=f"set_interval_{interval}"
|
||||||
|
))
|
||||||
|
buttons.append(row)
|
||||||
|
|
||||||
|
buttons.append([InlineKeyboardButton(text="↩️ Назад", callback_data="main_menu")])
|
||||||
|
return InlineKeyboardMarkup(inline_keyboard=buttons)
|
||||||
|
|
||||||
|
async def update_status_message(call: CallbackQuery):
|
||||||
|
"""Обновление сообщения со статусом"""
|
||||||
|
user_info = db.get_user_info(call.from_user.id)
|
||||||
|
if not user_info:
|
||||||
|
return
|
||||||
|
|
||||||
|
user_id, username, interval, is_active = user_info
|
||||||
|
status_emoji = "🟢" if is_active else "🔴"
|
||||||
|
task_status = "Включены" if user_id in notification_tasks else "Остановлены"
|
||||||
|
|
||||||
|
text = (
|
||||||
|
f"<b>📊 Статус системы мониторинга</b>\n\n"
|
||||||
|
f"Пользователь: <code>{username}</code>\n"
|
||||||
|
f"ID: <code>{user_id}</code>\n\n"
|
||||||
|
f"⏰ Интервал: <b>{interval} мин</b>\n"
|
||||||
|
f"🔄 Статус уведомлений: {status_emoji} <b>{task_status}</b>\n\n"
|
||||||
|
f"👥 Всего пользователей: <b>{db.get_user_count()}</b>",
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await call.message.edit_text(text, reply_markup=keyboard)
|
||||||
|
except:
|
||||||
|
pass # Игнорирование ошибок при неизменном контенте
|
||||||
|
|
||||||
|
# Старт функций
|
||||||
|
|
||||||
|
async def initialize_existing_users():
|
||||||
|
"""Инициализация уведомлений для существующих пользователей при запуске"""
|
||||||
|
active_users = db.get_active_users()
|
||||||
|
for user_id, interval in active_users:
|
||||||
|
start_user_notifications(user_id)
|
||||||
|
|
||||||
|
logging.info(f"Initialized notifications for {len(active_users)} active users")
|
||||||
9
init.py
Normal file
9
init.py
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
from aiogram import Bot, Dispatcher
|
||||||
|
from aiogram.enums.parse_mode import ParseMode
|
||||||
|
from aiogram.fsm.storage.memory import MemoryStorage
|
||||||
|
from aiogram.client.bot import DefaultBotProperties
|
||||||
|
|
||||||
|
BOT_TOKEN = "****************" # @dorm_notibot
|
||||||
|
|
||||||
|
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
||||||
|
dp = Dispatcher(storage=MemoryStorage())
|
||||||
131
sql.py
Normal file
131
sql.py
Normal file
|
|
@ -0,0 +1,131 @@
|
||||||
|
import sqlite3
|
||||||
|
import asyncio
|
||||||
|
from typing import List, Tuple, Optional
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
class DatabaseManager:
|
||||||
|
def __init__(self, db_path: str = "notifications.db"):
|
||||||
|
self.db_path = db_path
|
||||||
|
self.init_database()
|
||||||
|
|
||||||
|
def init_database(self):
|
||||||
|
"""Создание таблиц при первом запуске"""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
# Таблица пользователей с настройками уведомлений
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
user_id INTEGER PRIMARY KEY,
|
||||||
|
username TEXT,
|
||||||
|
interval_minutes INTEGER DEFAULT 60,
|
||||||
|
is_active BOOLEAN DEFAULT 1,
|
||||||
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS notifications_log (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
user_id INTEGER,
|
||||||
|
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
status TEXT DEFAULT 'sent',
|
||||||
|
FOREIGN KEY (user_id) REFERENCES users (user_id)
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
def add_user(self, user_id: int, username: str) -> bool:
|
||||||
|
"""Добавление нового пользователя в систему"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT OR REPLACE INTO users (user_id, username) VALUES (?, ?)",
|
||||||
|
(user_id, username)
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
return True
|
||||||
|
except sqlite3.Error:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def update_interval(self, user_id: int, interval_minutes: int) -> bool:
|
||||||
|
"""Обновление интервала уведомлений для пользователя"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"UPDATE users SET interval_minutes = ? WHERE user_id = ?",
|
||||||
|
(interval_minutes, user_id)
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
return cursor.rowcount > 0
|
||||||
|
except sqlite3.Error:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def toggle_notifications(self, user_id: int) -> Optional[bool]:
|
||||||
|
"""Переключение статуса активности уведомлений"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"UPDATE users SET is_active = NOT is_active WHERE user_id = ?",
|
||||||
|
(user_id,)
|
||||||
|
)
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT is_active FROM users WHERE user_id = ?",
|
||||||
|
(user_id,)
|
||||||
|
)
|
||||||
|
result = cursor.fetchone()
|
||||||
|
conn.commit()
|
||||||
|
return bool(result[0]) if result else None
|
||||||
|
except sqlite3.Error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_active_users(self) -> List[Tuple[int, int]]:
|
||||||
|
"""Получение списка активных пользователей с их интервалами"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT user_id, interval_minutes FROM users WHERE is_active = 1"
|
||||||
|
)
|
||||||
|
return cursor.fetchall()
|
||||||
|
except sqlite3.Error:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_user_info(self, user_id: int) -> Optional[Tuple]:
|
||||||
|
"""Получение информации о пользователе"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"SELECT user_id, username, interval_minutes, is_active FROM users WHERE user_id = ?",
|
||||||
|
(user_id,)
|
||||||
|
)
|
||||||
|
return cursor.fetchone()
|
||||||
|
except sqlite3.Error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def log_notification(self, user_id: int, status: str = "sent"):
|
||||||
|
"""Логирование отправленного уведомления"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO notifications_log (user_id, status) VALUES (?, ?)",
|
||||||
|
(user_id, status)
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
except sqlite3.Error:
|
||||||
|
pass # Ошибки логирования не должны прерывать основной процесс
|
||||||
|
|
||||||
|
def get_user_count(self) -> int:
|
||||||
|
"""Получение общего количества пользователей"""
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("SELECT COUNT(*) FROM users")
|
||||||
|
result = cursor.fetchone()
|
||||||
|
return result[0] if result else 0
|
||||||
|
except sqlite3.Error:
|
||||||
|
return 0
|
||||||
Loading…
Add table
Reference in a new issue