Compare commits

...

18 commits

Author SHA1 Message Date
9c6a250a33
README.md 2025-06-30 20:11:37 +03:00
b73df23108
README.md 2025-06-30 13:26:57 +03:00
616d7295da
requirements 2025-06-30 13:24:31 +03:00
faab8abc8e
v. 3.0 2025-06-30 13:14:57 +03:00
8f43acaa50
Delete libraries.txt 2025-06-30 13:12:14 +03:00
Egor Deev
de5a86a133
v. 2.2 2023-10-19 13:24:47 +03:00
Egor Deev
ba5aa85800
v. 1.1 2023-10-19 13:23:42 +03:00
Egor Deev
59af13e95b
Delete data/data directory 2023-10-19 13:22:26 +03:00
Egor Deev
178df95ab2
v. 2.2 2023-10-19 12:53:58 +03:00
Egor Deev
78c38a3b6a
v. 2.1 2022-11-22 14:25:20 +03:00
Egor Deev
ffca88a1bd
Delete data/putin directory 2022-11-22 14:24:20 +03:00
Egor Deev
3b16f533b4
Delete sql.py 2022-11-22 14:23:44 +03:00
Egor Deev
0ada23f3ac
Delete script.py 2022-11-22 14:23:34 +03:00
Egor Deev
153b198171
Delete month.txt 2022-11-22 14:23:23 +03:00
Egor Deev
a70bcf0485
Delete groups.db 2022-11-22 14:23:05 +03:00
Egor Deev
2d417cc4df
Delete bot.py 2022-11-22 14:22:54 +03:00
Egor Deev
0863511429
Delete base.py 2022-11-22 14:22:43 +03:00
Egor Deev
a425d3ba0e
v. 2.1 2022-11-22 14:22:09 +03:00
31 changed files with 1351 additions and 1155 deletions

156
README.md
View file

@ -1,7 +1,153 @@
<h2>Telegram 🤖 ABOBOT / Телеграм 🤖 АБОБОТ</h2>
# 🤖 ABOBOT - Telegram Group Management Bot
Этот бот создан для чатов в Телеграм , он может упомянуть всех участников в чате, выдать статистику
по группе / участнику группы, также имеет ситуативные функции, как: перевод транслита, разворачинание / переворачивание
слов, по команде создание опроса и отметка лайк / дизлайк на сообщение учатника группы!
![Python](https://img.shields.io/badge/Python-3.8+-blue.svg)
![Aiogram](https://img.shields.io/badge/Aiogram-3.x-green.svg)
![SQLite](https://img.shields.io/badge/SQLite-3-orange.svg)
![License](https://img.shields.io/badge/License-MIT-yellow.svg)
<b>Также вы можете ознакомиться с этим ботом прямо сейчас перейдя по этой <a href="https://t.me/abobusik_bot">ссылке</a></b>
Многофункциональный Telegram-бот для управления групповыми чатами с расширенной аналитикой и интерактивными возможностями.
## 🎯 Основные возможности
### 📊 Система аналитики
- **Комплексная статистика** - детальный учёт активности участников
- **Временная сегментация** - анализ данных за месяц и весь период
- **Многомерные метрики** - сообщения, ответы, команды, медиафайлы, голосовые
### 👥 Управление участниками
- **Интеллектуальные упоминания** - автоматическое обнаружение имён в сообщениях
- **Динамическое именование** - морфологическая обработка и нормализация
- **Групповые вызовы** - команда `/all` для оповещения всех участников
### 🔧 Утилиты и инструменты
- **Транслитерация** - автоматический перевод с латиницы на кириллицу
- **Транскрипция** - конвертация голосовых сообщений в текст (gTTS)
- **Текстовые трансформации** - реверс слов и предложений
- **Генерация голоса** - озвучивание текстовых сообщений
### 🎮 Интерактивные функции
- **Игровые механики** - симуляция драк и взаимодействий
- **Рандомизация** - генерация случайных чисел в диапазоне
- **Советник** - интеграция с API для получения рандомных советов
## 🏗️ Архитектура системы
### Технологический стек
```
Backend Framework: aiogram (Async Telegram Bot API)
Database Engine: SQLite (многотабличная структура)
NLP Processing: pymorphy3 (морфологический анализ)
Speech Recognition: speech_recognition + gTTS
Async Framework: asyncio (асинхронная обработка)
```
### Структура базы данных
```
📁 Databases
├── base.db # Основная статистика групп
├── users.db # Маппинг пользователей
├── groups.db # Индивидуальная статистика участников
└── month.db # Временная сегментация данных
```
### Модульная организация
```
📁 Project Structure
├── bot.py # Точка входа и инициализация
├── handlers.py # Обработчики событий и команд
├── script.py # Бизнес-логика и алгоритмы
├── sql.py # Абстракция базы данных
├── init.py # Конфигурация и зависимости
└── base.py # Константы и настройки
```
## 🚀 Быстрый старт
### Установка зависимостей
```bash
pip install -r requirements.txt
```
### Конфигурация
1. Получите токен бота у [@BotFather](https://t.me/BotFather)
2. Обновите `TOKEN` в `code/base.py`
3. Настройте права администратора для автоудаления системных сообщений
### Запуск
```bash
cd code
python bot.py
```
## 📋 Команды
| Команда | Функционал |
|---------|------------|
| `/start`, `/help` | Интерактивное меню с описанием возможностей |
| `/all` | Упоминание всех участников группы |
| `/stat_group` | Детальная аналитика группы |
| `/stat_user` | Персональная статистика пользователя |
| `/recognize` | Транскрипция голосового сообщения |
| `/edit <имя>` | Установка кастомного имени для упоминаний |
| `/back_edit` | Возврат к динамическому именованию |
| `/start_bot` | Активация текстовых событий |
| `/stop_bot` | Деактивация текстовых событий |
## 🎲 Текстовые события
### Интерактивные команды
- **"Чмокнуть [имя]"** - позитивное взаимодействие
- **"Отмудохать [имя]"** - игровая агрессия
- **"Подраться с [имя]"** - симуляция боя с рандомным исходом
- **"Число от X до Y"** - генерация случайного числа
- **"Переведи (символ) - текст"** - трансформация в "кирпичный" язык
- **"Переверни - текст"** - реверс слов с сохранением пунктуации
- **"Озвучь - текст"** - генерация голосового сообщения
## 🔍 Технические особенности
### Алгоритмы обработки
- **Морфологический анализ** - приведение имён к начальной форме
- **Детекция транслита** - эвристический алгоритм определения раскладки
- **Парсинг голоса** - интеграция Google Speech Recognition
- **Обработка пунктуации** - сохранение форматирования при трансформациях
### Оптимизации производительности
- **Асинхронная архитектура** - неблокирующая обработка запросов
- **Кэширование соединений** - переиспользование подключений к БД
- **Ленивая загрузка** - подгрузка данных по требованию
## 📈 Метрики и аналитика
Система ведёт учёт следующих параметров:
- Общее количество сообщений
- Количество ответов на сообщения
- Использование команд бота
- Отправка медиафайлов и стикеров
- Голосовые сообщения и видеокружки
- Размещение ссылок
## 🛠️ Требования к системе
- Python 3.8+
- Операционная система: Linux/Windows/macOS
- RAM: минимум 512MB
- Свободное место: 100MB для логов и медиафайлов
## 📄 Лицензия
Проект распространяется под лицензией MIT.
## 👨‍💻 Автор
**Деев Егор Викторович** - Backend Developer
- GitHub: [@EDeev](https://github.com/EDeev)
- Email: egor@deev.space
- Telegram: [@Egor_Deev](https://t.me/Egor_Deev)
---
<div align="center">
<sub>⭐ Если проект оказался полезным, поставьте звезду на GitHub!</sub>
<p><sub>Создано с ❤️ от вашего дорогого - deev.space ©</sub></p>
</div>

88
base.py
View file

@ -1,88 +0,0 @@
# ДАННЫЕ БОТА
TOKEN = "1791246735:AAF3f6tb4T2Rk8atewn-2-2vQ6Q-DVy41Fc"
MY_GROUP = "-1001486717553"
# РИКРОЛЛ
RICKROLL_1 = 'https://vk.cc/8U7VuC' # https://www.youtube.com/watch?v=dQw4w9WgXcQ
RICKROLL_2 = 'https://vk.cc/3A9NI7' # https://youtu.be/dQw4w9WgXcQ
# ОБРАЩЕНИЯ К ПОЛЬЗОВАТЕЛЯМ
KILL_LIST = ["побить", "отмудохать", "избить", "уебать", "отметелить"]
TMOK_LIST = ["поцеловать", "чмокнуть", "соснуть", "лайкнуть"]
QUAT_LIST = [['дай', 'дайте'], ['ахуенный', 'ахуеный', 'охуеный', 'охуенный', 'блять', 'сука'], ['совет', 'совета']]
# ФРАЗЫ ИВЕНТ ДРАКИ
VAR_WIN = ["опустив его ниже плинтуса!", "поставив его на место!"]
VAR_LOSE = ["Кунг - Фу!", "магии!", "резинового члена!", "банды цапель!"]
# ПЕРЕВОДЧИК СЛОВ
ERR = ['q', 'w', 'e', 'r', 't', 'y', 'u', 'i', 'o', 'p', '[', ']', 'a', 's', 'd', 'f', 'g', 'h', 'j', 'k', 'l', ';',
"'", 'z', 'x', 'c', 'v', 'b', 'n', 'm', ',', '.', '/', '`']
ERR_ = ['Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O', 'P', '{', '}', 'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L', ':',
'"', 'Z', 'X', 'C', 'V', 'B', 'N', 'M', '<', '>', '?', '~']
TRU = ['й', 'ц', 'у', 'к', 'е', 'н', 'г', 'ш', 'щ', 'з', 'х', 'ъ', 'ф', 'ы', 'в', 'а', 'п', 'р', 'о', 'л', 'д', 'ж',
'э', 'я', 'ч', 'с', 'м', 'и', 'т', 'ь', 'б', 'ю', '.']
TRU_ = ['Й', 'Ц', 'У', 'К', 'Е', 'Н', 'Г', 'Ш', 'Щ', 'З', 'Х', 'Ъ', 'Ф', 'Ы', 'В', 'А', 'П', 'Р', 'О', 'Л', 'Д', 'Ж',
'Э', 'Я', 'Ч', 'С', 'М', 'И', 'Т', 'Ь', 'Б', 'Ю', ',']
ALB = ['а', 'у', 'о', 'ы', 'и', 'э', 'я', 'ю', 'ё', 'е', 'А', 'У', 'О', 'Ы', 'И', 'Э', 'Я', 'Ю', 'Ё', 'Е']
# НА ВСЯКИЙ СЛУЧАЙ
'''
# from filters import IsAdminFilter
# activate filters
dp.filters_factory.bind(IsAdminFilter)
# await call.message.answer(str(randint(1, 10)))
# await message.answer_sticker(r'CAACAgIAAxkBAAECmDFg8_fMoE0ZFAYxOh5KRyB2Y7VGZwACfQ0AAtMv4ElQpS-ysl5_6SAE') # ФУГА ДУМАЕТ
# state = dp.current_state(chat=config.GROUP_ID, user=message.from_user.id)
# await state.set_state(User.accepted)
# ОСУЖДАЮ
for word in base.BAN_LIST:
if word in txt:
await message.delete()
await message.answer("ОСУЖДАЮ")
print(name_group, ">>>", message.text)
for word in base.BAN_NAME:
if word in txt and not "егор" in txt:
await message.delete()
await message.answer("Для обращений существует имя!")
print(name_group, ">>", message.text)
# бан на минуту
if len(original) == 2:
if "забанься" in txt:
await message.bot.restrict_chat_member(chat_id=id_group, user_id=[base.USERS[i][1] for i in base.USERS if i == no_pct.lower().split()[0]][0],
permissions=types.ChatPermissions(),
until_date=int(time()) + (31))
await message.reply(f'{message.from_user.first_name}, по вашему желанию отлетел дурачок на минуту)')
# админ бан
@dp.message_handler(is_admin=True, commands=['ban'], commands_prefix='!/')
async def ban(message: types.Message):
if not message.reply_to_message:
await message.reply("Эта команда должна быть ответом на сообщение!")
return
await message.bot.delete_message(config.GROUP_ID, message.message_id)
await message.bot.kick_chat_member(chat_id=config.GROUP_ID, user_id=message.reply_to_message.from_user.id)
await message.bot.send_photo(chat_id=config.GROUP_ID, photo="1.jpg", caption="Пользователь унижен!\nПоздравляю ;)")
# чекер данных фото
@dp.message_handler(content_types=['photo'])
async def scan_message(msg: types.Message):
document_id = msg.photo[0].file_id
file_info = await bot.get_file(document_id)
print(f'file_id: {file_info.file_id}')
print(f'file_path: {file_info.file_path}')
print(f'file_size: {file_info.file_size}')
print(f'file_unique_id: {file_info.file_unique_id}')
'''

615
bot.py
View file

@ -1,615 +0,0 @@
import logging, random, pymorphy2, requests
import asyncio, emoji, base, re, os
from gtts import gTTS
from sql import SQLighter
from datetime import datetime
from script import cheker, translator, notice, lang_form, times, autozak, update_stat, revers
from aiogram import Bot, Dispatcher, executor, types
# log level
logging.basicConfig(level=logging.INFO)
# bot init
bot = Bot(token=base.TOKEN)
dp = Dispatcher(bot)
morph = pymorphy2.MorphAnalyzer()
# инициализируем соединение с БД
db = SQLighter('groups.db')
# events
@dp.message_handler(content_types=["new_chat_members"])
async def notification(message: types.Message):
if not db.group_exists(message.chat.id):
db.add_group(message.chat.id)
db.add_month_group(message.chat.id)
db.created_group(message.chat.id)
await bot.send_message(message.chat.id, f'ID <b>[ {message.chat.id} ]</b>\n\nДля того, чтобы уведомления по '
f'имени и команда /all нормально функционировали, необходимо чтобы '
f'каждый участник группы написал хотя бы одно сообщение в чат! Также '
f'для того, чтобы системные сообщения удалялись автоматически, '
f'необходимо боту выдать права администратора!', types.ParseMode.HTML)
else:
try:
await message.delete()
except Exception:
print(f'{message.chat.title} >>> НЕ УДАЛОСЬ УДАЛИТЬ СООБЩЕНИЕ')
@dp.message_handler(commands=['start'])
async def helps(message: types.Message):
if message.chat.id > 0:
id_group, user = message.chat.id, message.from_user.id
name_db = re.sub(r'[^\w\s]', '', message.from_user.first_name.lower())
try:
if not db.user_exists(user, id_group):
db.add_user(user, name_db, id_group)
else:
if user in db.get_users(id_group):
db.update_name(user, name_db, id_group)
except Exception as e:
await message.answer("Вы запустили бота в личном чате! Для работы в группе или в общем чате нечего делать "
"не надо, лишь добавить в чат и выдать права на удаление сообщений в чате!\n\n- Для "
"введения в обширный функционал бота можете воспользоваться командой /help!")
db.add_group(id_group)
db.add_month_group(id_group)
db.created_group(id_group)
db.add_user(user, name_db, id_group)
else:
return
@dp.message_handler(commands=['help'])
async def helps(message: types.Message):
update_stat(3, message, True)
buttons = [types.InlineKeyboardButton(text="КОМАНДЫ", callback_data="com"),
types.InlineKeyboardButton(text="ИВЕНТЫ", callback_data="even"),
types.InlineKeyboardButton(text="ОШИБКИ", callback_data="err"),
types.InlineKeyboardButton(text="АВТОР", callback_data="auth"),
types.InlineKeyboardButton(text="ФУНКЦИИ", callback_data="fun")]
keyboard = types.InlineKeyboardMarkup(row_width=2)
keyboard.add(*buttons)
await bot.send_voice(chat_id=message.chat.id, voice=open('data/tts.ogg', 'rb'),
caption='*-* Этот АБОБОТ поможет вам приятно провести время в чате с различными командами, '
'ивентами и удобными функциями, которые облегчают использование чата)\n\n'
'*-* Также прошу если вам понравился бот, оставить отзыв о его использовании на '
'команду /report)', parse_mode=types.ParseMode.MARKDOWN, reply_markup=keyboard)
# ИЛАЙН КЛАВИАТУРА HELP
@dp.callback_query_handler(text="err")
async def error(call: types.CallbackQuery):
await call.message.answer(text='*| ОШИБКИ |*\n\n*-* Так как бот разрабатывается одним человеком, все баги и '
'недочёты програмы выявить сразу очень трудно, поэтому я буду признателен вам если '
'о найденых ошибка вы сообщите по команде /report\n\n*>* Отправить в формате '
'*[ /report {сообщение} ]*', parse_mode=types.ParseMode.MARKDOWN)
@dp.callback_query_handler(text="auth")
async def function(call: types.CallbackQuery):
await call.message.answer(text='*| АВТОР |*\n\n*>>* Официальный телеграмм канал автора *@govnacoder*\n\n*-* Я '
'практикуюсь в програмировании разрабатывая ботов и простенькие приложения, если '
'интерестно ознокомиться с моими работами, то переходити ко мне в телеграм канал!',
parse_mode=types.ParseMode.MARKDOWN)
@dp.callback_query_handler(text="fun")
async def function(call: types.CallbackQuery):
await call.message.answer(text='*| ФУНКЦИИ |*\n\n*1.* Возможность перевода случайно написанного текста на '
'транслите\n*2.* Ведение обширной статистики сообщений\n*3.* Упоминание участника '
'при написании его имени в чате\n*4.* Автоматическое удаление системных сообщений',
parse_mode=types.ParseMode.MARKDOWN)
@dp.callback_query_handler(text="com")
async def commands(call: types.CallbackQuery):
await call.message.answer(text='<b>| КОМАНДЫ |</b>\n\n/all - упомянуть всех в чате\n/vote - напишите в начале '
'сообщения и создасться простой опрос (Да / Нет)\n/report - жалобы и '
'предложения\n/help - полный список функций\n/statistic_group - полная статистика '
'группы\n/statistic_user - полная статистика отправителя\n/srednya_statistic - '
'средняя статистика группы\n/edit и /back_edit - первая команда даёт возможность '
'сменить имя для упоминаний на любое слово, а вторая для возврата динамического '
'имени\n/start_bot и /stop_bot - возможность отключения текстовых ивентов\n/like '
'и /dislike - даёт возможность оценить то или иное сообщение пользователя',
parse_mode=types.ParseMode.HTML)
@dp.callback_query_handler(text="even")
async def events(call: types.CallbackQuery):
await call.message.answer(text='*| ИВЕНТЫ |*\n\n*"Чмокнуть"* - сделать кому-нибудь приятно\n*"Отмудохать"* - '
'выместить злость на кого-нибудь\n*"Число от ... до ..."* - случайное значение из '
'диапозона\n*"Подраться с ..."* - повод кого-нибудь побить\n*"Переведи (..) - ..."* '
'- перевод слова на керпичный язык\n*"Дай блять совет!"* - даёт рандомный охуенный '
'совет\n*"Переверни - ..."* - переворачивает слова в предложении\n*"Озвучь - ..."* '
'- озвучивает написанный текст',
parse_mode=types.ParseMode.MARKDOWN)
# ОСТАЛЬНЫЕ КОМАНДЫ
@dp.message_handler(commands=['all'], commands_prefix='@/')
async def always(message: types.Message):
update_stat(3, message, True)
try:
await message.reply(notice(message.from_user.first_name, True, message.chat.id, message.from_user.id),
types.ParseMode.HTML)
except Exception:
await message.reply('В группе состоит менее 3х человек, из-за чего команда не работает!')
@dp.message_handler(commands=['report'])
async def answer(message: types.Message):
update_stat(3, message, True)
if len(message.text.split()) > 1:
await bot.send_message(chat_id='-1001510980656',
text=f'{message.chat.title} >> '
f'[{message.from_user.first_name}](tg://user?id={message.from_user.id}) '
f'> {message.text}', parse_mode=types.ParseMode.MARKDOWN)
await message.answer("Ваше сообщение передано администратору)")
else:
await message.answer("Заполнять обращение к администратору надо по форме */report { жалоба или предложение }*",
types.ParseMode.MARKDOWN)
@dp.message_handler(commands=['statistic_group'])
async def stat(message: types.Message):
update_stat(3, message, True)
group = db.stat_element_group(message.chat.id)
month = db.stat_element_month_group(message.chat.id)
try:
await message.answer(text=f'*| СТАТИСТИКА ГРУППЫ |*\n\n*>>* В целом сообщений *[ {message.message_id} ]*\n*>>* '
f'Сообщений в базе *[ {group[0]} ]*\n*>>* Среднее количество сообщений в месяц - *[ '
f'{month[0] // group[8]} ]*\n\n*👍* Лайков *[ {group[10]} ]* / *👎* Дизлайков *[ '
f'{group[9]} ]*\n\n*>* Команд *[ {group[2]} ]*\n*>* Ссылок *[ '
f'{group[3]} ]*\n*>* Стикеров *[ {group[6]} ]*\n*>* Тик-токов *[ {group[4]} ]*\n*>* Медиа'
f' файлов *[ {group[5]} ]*\n*>* Ответов на сообщения *[ {group[1]} ]*\n*>* Голос / Видео '
f'сообщений *[ {group[7]} ]*', parse_mode=types.ParseMode.MARKDOWN)
except Exception:
await message.answer(text=f'*| СТАТИСТИКА ГРУППЫ |*\n\n*>>* В целом сообщений *[ {message.message_id} ]*\n*>>* '
f'Сообщений в базе *[ {group[0]} ]*\n\n*👍* Лайков *[ {group[10]} ]* / *👎* Дизлайков *[ '
f'{group[9]} ]*\n\n*>* Команд *[ {group[2]} ]*\n*>* Ссылок *[ '
f'{group[3]} ]*\n*>* Стикеров *[ {group[6]} ]*\n*>* Тик-токов *[ {group[4]} ]*\n*>* Медиа'
f' файлов *[ {group[5]} ]*\n*>* Ответов на сообщения *[ {group[1]} ]*\n*>* Голос / Видео '
f'сообщений *[ {group[7]} ]*', parse_mode=types.ParseMode.MARKDOWN)
@dp.message_handler(commands=['statistic_user'])
async def stat(message: types.Message):
update_stat(3, message, True)
user = db.stat_element_user(message.from_user.id, message.chat.id)
try:
name = morph.parse(db.get_name_user(message.from_user.id, message.chat.id))[0]
name = name.inflect({"gent"}).word.upper()
except Exception:
name = message.from_user.first_name.lower().upper()
try:
err = 1 // user[8]
await message.answer(text=f'*| СТАТИСТИКА {name} |*\n\n*>>* Всего сообщений *[ '
f'{user[0]} ]*\n*>>* Среднее количество сообщений в месяц - *[ {user[0] // user[8]} '
f']*\n\n*👍* Лайков *[ {user[10]} ]* / *👎* Дизлайков *[ {user[9]} ]*\n\n*>* Команд *[ '
f'{user[2]} ]*\n*>* Ссылок *[ {user[3]} ]*\n*>* Стикеров '
f'*[ {user[6]} ]*\n*>* Тик-токов *[ {user[4]} ]*\n*>* Медиа файлов *[ {user[5]} ]*\n*>* '
f'Ответов на сообщения *[ {user[1]} ]*\n*>* Голос / Видео сообщений *[ {user[7]} ]*',
parse_mode=types.ParseMode.MARKDOWN)
except Exception:
await message.answer(text=f'*| СТАТИСТИКА {name} |*\n\n*>>* Всего сообщений *[ '
f'{user[0]} ]*\n\n*👍* Лайков *[ {user[10]} ]* / *👎* Дизлайков *[ {user[9]} ]*\n\n*>* '
f'Команд *[ {user[2]} ]*\n*>* Ссылок *[ {user[3]} ]*\n*>* Стикеров '
f'*[ {user[6]} ]*\n*>* Тик-токов *[ {user[4]} ]*\n*>* Медиа файлов *[ {user[5]} ]*\n*>* '
f'Ответов на сообщения *[ {user[1]} ]*\n*>* Голос / Видео сообщений *[ {user[7]} ]*',
parse_mode=types.ParseMode.MARKDOWN)
@dp.message_handler(commands=['srednya_statistic'])
async def stat(message: types.Message):
update_stat(3, message, True)
timer = db.stat_element_group(message.chat.id)[8]
month = db.stat_element_month_group(message.chat.id)
try:
if timer != 0:
await message.answer(text=f'*| СРЕДНЯЯ СТАТИСТИКА ГРУППЫ |*\n\n*>>* Среднее количество сообщений в месяц - '
f'*[ {month[0] // timer} ]*\n\n*👍* Лайков *[ {month[9] // timer} ]* / *👎* Дизлайков *[ {month[8] // timer} '
f']*\n\n*>* Команд *[ {month[2] // timer} ]*\n*>* Ссылок *[ {month[3] // timer} ]*\n*>* Стикеров '
f'*[ {month[6] // timer} ]*\n*>* Тик-токов *[ {month[4] // timer} ]*\n*>* Медиа файлов *[ {month[5] // timer} ]*\n*>* '
f'Ответов на сообщения *[ {month[1] // timer} ]*\n*>* Голос / Видео сообщений *[ {month[7] // timer} ]*',
parse_mode=types.ParseMode.MARKDOWN)
else:
await message.reply('Статистика не может быть показана, так как с момента добавления бота прошло менее месяца!')
except Exception:
await message.reply('Вывод стаистики произвёл ошибку, вероятнее всего в вашем чате были использованы не все виды сообщений!')
# ЕЖЕМЕСЕЧНОЕ УВЕДОМЛЕНИЕ
async def time(wait_for):
while True:
await asyncio.sleep(wait_for)
file = open('month.txt', 'r')
data = file.readline()
file.close()
month = datetime.now().date().month
day = datetime.now().date().day
if int(data) != int(month):
for group in db.all_id_group_lst():
last = db.month_stat_group(group)
await bot.send_message(chat_id=group,
text=f'*| СТАТИСТИКА ГРУППЫ ЗА ПРОШЕДШИЙ МЕСЯЦ |*\n\n*>>* В целом сообщений *[ '
f'{last[0]} ]*\n\n*👍* Лайков *[ {last[9]} ]* / *👎* Дизлайков *[ '
f'{last[8]} ]*\n\n*>* Команд *[ {last[2]} ]*\n*>* Ссылок *[ {last[3]} '
f']*\n*>* Стикеров *[ {last[6]} ]*\n*>* Тик-токов *[ {last[4]} ]*\n*>* '
f'Медиа файлов *[ {last[5]} ]*\n*>* Ответов на сообщения *[ {last[1]} '
f']*\n*>* Голос / Видео сообщений *[ {last[7]} ]*',
parse_mode=types.ParseMode.MARKDOWN)
db.statistics_group(group, 0)
for user in db.id_lst(group):
db.user_statistic(user, group, 0)
file = open('month.txt', 'w')
file.write(str(month))
file.close()
elif (month == 6) and (day == 28):
for group in db.all_id_group_lst():
await bot.send_message(chat_id=group,
text=f'Уряяя! У меня сегодня день рождения!',
parse_mode=types.ParseMode.MARKDOWN)
@dp.message_handler(commands=['like'])
async def like(message: types.Message):
update_stat(3, message, True)
if message.reply_to_message:
update_stat(2, message)
love = (message.reply_to_message.from_user.id == message.from_user.id)
try:
comm = (message.reply_to_message.text[0] == '/')
except Exception:
comm = False
if love is False and comm is False:
db.statistics_group(message.chat.id, 10)
db.user_statistic(message.reply_to_message.from_user.id, message.chat.id, 10)
try:
await message.delete()
except Exception:
pass
await bot.send_message(chat_id=message.chat.id, text=emoji.emojize(':thumbs_up:'),
reply_to_message_id=message.reply_to_message.message_id)
else:
if love:
await message.answer('Ай-яй-яй! За себя голосовать не хорошо!')
elif comm:
await message.answer('Оценивать команду не рационально!')
else:
await message.answer('Команда должна быть ответом на сообщение пользователя!')
@dp.message_handler(commands=['dislike'])
async def dislike(message: types.Message):
update_stat(3, message, True)
if message.reply_to_message:
update_stat(2, message)
love = (message.reply_to_message.from_user.id == message.from_user.id)
try:
comm = (message.reply_to_message.text[0] == '/')
except Exception:
comm = False
if love is False and comm is False:
db.statistics_group(message.chat.id, 9)
print(">>", message.from_user.first_name, datetime.now())
db.user_statistic(message.reply_to_message.from_user.id, message.chat.id, 9)
try:
await message.delete()
except Exception:
pass
await bot.send_message(chat_id=message.chat.id, text=emoji.emojize(':thumbs_down:'),
reply_to_message_id=message.reply_to_message.message_id)
else:
if love:
await message.answer('Ай-яй-яй! За себя голосовать не хорошо!')
elif comm:
await message.answer('Оценивать команду не рационально!')
else:
await message.answer('Команда должна быть ответом на сообщение пользователя!')
@dp.message_handler(commands=['edit'])
async def update(message: types.Message):
update_stat(3, message, True)
text_edit = re.sub(r'[^\w\s]', '', message.text.lower()).split()
if len(text_edit) == 2:
norm = morph.parse(text_edit[1])[0].normal_form
db.edit_name(message.from_user.id, norm, message.chat.id, 1)
name_edit = db.name_lst(message.chat.id)[db.id_lst(message.chat.id).index(str(message.from_user.id))]
await message.reply(f'{name_edit.title()}, ваше имя было успешно изменено)')
else:
await message.reply(f'Вы не правильно ввели имя! Имя должно быть из одного слова и идти сразу после команды!')
@dp.message_handler(commands=['back_edit'])
async def update_return(message: types.Message):
update_stat(3, message, True)
text_name = re.sub(r'[^\w\s]', '', message.from_user.first_name.lower())
db.edit_name(message.from_user.id, text_name, message.chat.id, 0)
await message.reply(f'{text_name.title()}, вы успешно вернулись к динамическому изменению имени)')
@dp.message_handler(commands=['vote'])
async def voting(message: types.Message):
update_stat(3, message, True)
await bot.send_poll(chat_id=message.chat.id, question="Итак, что вы выберите?",
options=['Да', 'Нет'], is_anonymous=False, open_period=600,
reply_to_message_id=message.message_id)
@dp.message_handler(commands=['start_bot'])
async def opening(message: types.Message):
update_stat(3, message, True)
if not db.group_exists(message.chat.id):
db.add_group(message.chat.id)
await message.answer(
"Ваша группа изменила свой статус, из-за чего данные о пользователях были стёрты, "
"вам надо снова отправить хотя бы по одному сообщению от каждого пользователя! Текстовые ивенты включены!")
else:
for group in db.get_group():
if str(group[1]) == str(message.chat.id):
await message.answer("У вас уже включены текстовые ивенты!")
break
else:
db.update_status_group(message.chat.id, True)
await message.answer("Текстовые ивенты включены!")
@dp.message_handler(commands=['stop_bot'])
async def closing(message: types.Message):
update_stat(3, message, True)
if not db.group_exists(message.chat.id):
db.add_group(message.chat.id, False)
await message.answer(
"Ваша группа изменила свой статус, из-за чего данные о пользователях были стёрты, "
"вам надо снова отправить хотя бы по одному сообщению от каждого пользователя! Текстовые ивенты отключены!")
else:
for group in db.get_group():
if str(group[1]) == str(message.chat.id):
db.update_status_group(message.chat.id, False)
await message.answer("Текстовые ивенты отключены!")
break
else:
await message.answer("У вас уже отключены текстовые ивенты!")
# ФИКСАЦИЯ СТАТИСТИКИ СООБЩЕНИЙ
@dp.message_handler(
content_types=["migrate_to_chat_id", "migrate_from_chat_id", "new_chat_title", "new_chat_photo", "pinned_message",
"voice_chat_scheduled", "voice_chat_started", "voice_chat_ended", "voice_chat_participants_invited",
"left_chat_member"])
async def chat_events(message: types.Message):
try:
await message.delete()
except Exception:
print(f'{message.chat.title} >>> НЕ УДАЛОСЬ УДАЛИТЬ СООБЩЕНИЕ')
# СТАТИСТИКА
@dp.message_handler(content_types=['location', 'contact', 'video', 'photo', 'audio', 'document'])
async def media(message: types.Message):
update_stat(6, message, True)
@dp.message_handler(content_types=['voice', 'video_note'])
async def mms(message: types.Message):
update_stat(8, message, True)
@dp.message_handler(content_types=['sticker'])
async def stick(message: types.Message):
update_stat(7, message, True)
# ТЕКСТОВЫЕ ИВЕНТЫ
@dp.message_handler(content_types=['text'])
async def send_events(message: types.Message):
id_group, name_group, user = message.chat.id, message.chat.title, message.from_user.id
name_db = re.sub(r'[^\w\s]', '', message.from_user.first_name.lower())
txt = message.text.lower() # СООБЩЕНИЕ В НИЖНЕМ РЕГИСТРЕ
bk = [i for i in txt] # СПИСОК СИМВОЛОВ СООБЩЕНИЯ
# ПРОВЕРКА ПОЛЬЗОВАТЕЛЯ
try:
if not db.user_exists(user, id_group):
db.add_user(user, name_db, id_group)
else:
if user in db.get_users(id_group):
db.update_name(user, name_db, id_group)
except Exception as e:
await message.answer(f'<b>Ваша группа [ {name_group} ] изменила свой статус на супергруппу! '
f'Из-за этого, список участников был стёрт, и вам необходимо заново отослать '
f'хотя бы одно сообщение, для коректного упоминания!</b>', types.ParseMode.HTML)
db.add_group(id_group)
db.add_month_group(id_group)
db.created_group(id_group)
db.add_user(user, name_db, id_group)
# ОБНОВЛЕНИЕ КОЛ-ВО СООБЩЕНИЙ
update_stat(1, message)
# ОБНОВЛЕНИЕ КОЛ-ВО КОМАНД
if len(message.text) > 1 and message.text[0] == '/':
update_stat(3, message)
# ОБНОВЛЕНИЕ КОЛ-ВО ОТВЕТОВ НА СООБЩЕНИЯ
try:
if int(message.reply_to_message.message_id) > 1:
update_stat(2, message)
except Exception:
pass
# ИВЕНТЫ
if str(message.chat.id) in db.id_group_lst():
# ПЕРЕМЕННЫЕ
original = message.text.split() # СПИСОК СЛОВ В СООБЩЕНИИ
no_pct = re.sub(r'[^\w\s]', '', txt) # СООБЩЕНИЕ БЕЗ ПУНКТУАЦИИ
norm = [morph.parse(i)[0].normal_form for i in no_pct.split()] # СПИСОК СЛОВ В ПЕРВОЙ ФОРМЕ
names = [_ for _ in norm if _ in db.name_lst(message.chat.id)] # СПИСОК ИМЕН В СООБЩЕНИИ
# ПОЛЕЗНЫЕ ФУНКЦИИ
if len(original) == 5 and "число от" in txt:
await message.answer(text=f"Число <b>[ {random.randint(int(txt.split()[2]), int(txt.split()[4]))} ]</b>",
parse_mode=types.ParseMode.HTML)
return
if len(original) > 1 and no_pct.split()[0] == 'переведи':
if "переведи - " in txt:
await message.answer(lang_form([original[_] for _ in range(len(original)) if _ > 1]))
elif f"переведи ({no_pct.split()[1]}) - " in txt:
if len(no_pct.split()[1]) == 1:
await message.answer(lang_form([original[_] for _ in range(len(original)) if _ > 2],
no_pct.split()[1]))
return
if len(original) > 1 and no_pct.split()[0] == 'переверни':
if "переверни - " in txt:
await message.answer(revers(message.text[12:], True))
elif "переверни полностью - " in txt:
await message.answer(revers(message.text[22:], False))
return
if len(original) > 1 and no_pct.split()[0] == 'озвучь' and "озвучь - " in txt:
text_to_voice = message.text[9:]
tts = gTTS(text_to_voice, lang='ru')
tts.save('voice.ogg')
await bot.send_voice(chat_id=message.chat.id, voice=open('voice.ogg', 'rb'),
caption=f"<b>{text_to_voice}</b>", parse_mode=types.ParseMode.HTML)
os.remove("voice.ogg")
# РАНДОМ ИВЕНТЫ
if len(original) >= 2:
if "подраться с" in txt:
text = " ".join([original[i] for i in range(len(original)) if i > 1])
if random.randint(0, 1) == 0:
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/fight/({random.randint(1, 8)}).jpg", 'rb'),
caption=f"{message.from_user.first_name} ты был унижен {text.title()}"
f" с помощью {base.VAR_LOSE[random.randint(0, 3)]}")
else:
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/fight/({random.randint(1, 8)}).jpg", 'rb'),
caption=f"{message.from_user.first_name} ты победил в драке "
f"с {text.title()}, {base.VAR_WIN[random.randint(0, 1)]}")
# ИВЕНТ ВЗАИМОДЕЙСТВИЯ
if len(original) >= 2:
if "путин" not in txt:
for word in base.TMOK_LIST:
if word in txt:
lst = [original[i] for i in range(len(original)) if i != 0]
text = " ".join(lst)
slv = morph.parse(original[0].lower())[0]
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/tmok/({random.randint(1, 4)}).jpg", 'rb'),
caption=f"{message.from_user.first_name} "
f"{slv.inflect({'past', 'sing', 'indc'}).word} {text}")
for word in base.KILL_LIST:
if word in txt:
lst = [original[i] for i in range(len(original)) if i != 0]
text = " ".join(lst)
slv = morph.parse(original[0].lower())[0]
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/kill/({random.randint(1, 6)}).jpg", 'rb'),
caption=f"{message.from_user.first_name} "
f"{slv.inflect({'past', 'sing', 'indc'}).word} {text}")
for word_1 in base.QUAT_LIST[0]:
flag = False
for word_2 in base.QUAT_LIST[1]:
for word_3 in base.QUAT_LIST[2]:
if no_pct == " ".join([word_1, word_2, word_3]):
word = (requests.get('http://fucking-great-advice.ru/api/random').text).split('"')[5]
await message.reply(word)
flag = True
if flag:
break
if flag:
break
if no_pct == 'совет дня':
word = (requests.get('http://fucking-great-advice.ru/api/latest').text).split('"')[5]
await message.reply(word)
# PUTIN
if len(original) >= 2:
if "поклониться путину" in txt:
await message.bot.send_photo(chat_id=id_group,
photo=open("data/putin/putin.jpg", 'rb'),
caption=f"{message.from_user.first_name}! "
f"Я рад, что ты выбрал правильную сторону)")
return
if "чмокнуть путина" in txt:
if message.from_user.first_name != "Вадим":
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/putin/chmok/({random.randint(1, 5)}).jpg", 'rb'),
caption=f"{message.from_user.first_name}, "
f"был удостоен чести чмокнуть великого!")
else:
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/putin/chmok/vadim.jpg", 'rb'),
caption=f"{message.from_user.first_name}, а ты ему сразу приглянулся)")
return
if "путин вор" in txt:
await message.bot.send_photo(chat_id=id_group,
photo=open(f"data/putin/autozak/({random.randint(1, 4)}).jpg", 'rb'),
caption=f"{message.from_user.first_name}! Ваш автозак уже в пути! "
f"Примерное время ожидания составляет "
f"{autozak(message.from_user.first_name)} Рекомендуеться "
f"подготовить паспорт и наркотики для избежания лишней работы!")
return
if "лёша выйдет погулять" in txt:
await message.bot.send_photo(chat_id=id_group,
photo=open("data/putin/lesha.jpg", 'rb'),
caption=f"{message.from_user.first_name}, нет! Он сидит дома из-за "
f"плохого поведения, приходи через {times()}")
return
# ЛИСТ КОМАНДЫ
if names:
if "крид" not in norm:
await message.reply(notice(names, False, id_group, message.from_user.id), types.ParseMode.HTML)
return
else:
if int(id_group) == int(base.MY_GROUP):
if random.randint(1, 10) == 1:
await message.reply("Заебал уже, со своим ебучим Игорем Кридом")
return
else:
await message.reply(notice(names, False, id_group, message.from_user.id), types.ParseMode.HTML)
return
# ПЕРЕВОДЧИК СЛОВ
if cheker(bk, original, id_group, user) == len(txt):
await message.reply(f"[{message.from_user.first_name}](tg://user?id={user}) *>* {translator(original)}",
types.ParseMode.MARKDOWN)
return
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(time(21600)) # КАЖДЫЕ 6 ЧАСОВ ПРОВЕРКА
executor.start_polling(dp, skip_updates=True)

26
code/base.py Normal file
View file

@ -0,0 +1,26 @@
# ДАННЫЕ БОТА
TOKEN = "XXXXXXXXXX" # @chat_abobot
DEBAG = "XXXXXXXXXX" # Технический чат
TEX_GROUP = "XXXXXXXXXX"
# ОБРАЩЕНИЯ К ПОЛЬЗОВАТЕЛЯМ
KILL_LIST = ["побить", "отмудохать", "избить", "уебать", "отметелить"]
TMOK_LIST = ["поцеловать", "чмокнуть", "соснуть", "лайкнуть"]
QUAT_LIST = [['дай', 'дайте'], ['ахуенный', 'ахуеный', 'охуеный', 'охуенный', 'блять', 'сука'], ['совет', 'совета']]
# ФРАЗЫ ИВЕНТ ДРАКИ
VAR_WIN = ["опустив его ниже плинтуса!", "поставив его на место!"]
VAR_LOSE = ["Кунг - Фу!", "магии!", "резинового члена!", "банды цапель!"]
# ПЕРЕВОДЧИК СЛОВ
ERR = ['q', 'w', 'e', 'r', 't', 'y', 'u', 'i', 'o', 'p', '[', ']', 'a', 's', 'd', 'f', 'g', 'h', 'j', 'k', 'l', ';',
"'", 'z', 'x', 'c', 'v', 'b', 'n', 'm', ',', '.', '/', '`']
ERR_ = ['Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O', 'P', '{', '}', 'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L', ':',
'"', 'Z', 'X', 'C', 'V', 'B', 'N', 'M', '<', '>', '?', '~']
TRU = ['й', 'ц', 'у', 'к', 'е', 'н', 'г', 'ш', 'щ', 'з', 'х', 'ъ', 'ф', 'ы', 'в', 'а', 'п', 'р', 'о', 'л', 'д', 'ж',
'э', 'я', 'ч', 'с', 'м', 'и', 'т', 'ь', 'б', 'ю', '.']
TRU_ = ['Й', 'Ц', 'У', 'К', 'Е', 'Н', 'Г', 'Ш', 'Щ', 'З', 'Х', 'Ъ', 'Ф', 'Ы', 'В', 'А', 'П', 'Р', 'О', 'Л', 'Д', 'Ж',
'Э', 'Я', 'Ч', 'С', 'М', 'И', 'Т', 'Ь', 'Б', 'Ю', ',']
ALB = ['а', 'у', 'о', 'ы', 'и', 'э', 'я', 'ю', 'ё', 'е', 'А', 'У', 'О', 'Ы', 'И', 'Э', 'Я', 'Ю', 'Ё', 'Е']

20
code/bot.py Normal file
View file

@ -0,0 +1,20 @@
import asyncio
import logging
from init import bot, dp
from handlers import router
async def main() -> None:
dp.include_router(router)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types(), skip_updates=True)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
try: asyncio.run(main())
except KeyboardInterrupt:
print("Бот остановлен")
pass

540
code/handlers.py Normal file
View file

@ -0,0 +1,540 @@
from aiogram import F, Router
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, FSInputFile
from aiogram.filters import Command, CommandStart
from aiogram.enums import ContentType
import logging, random, requests, os, re
import speech_recognition as sr
import soundfile as sf
from gtts import gTTS
from init import *
from script import *
import base
router = Router()
# НОВЫЕ УЧАСТНИКИ ГРУППЫ
@router.message(F.content_type == ContentType.NEW_CHAT_MEMBERS)
async def notification(message: Message):
if not du.group_exists(message.chat.id):
du.add_group(message.chat.id)
group_id = du.get_group_id(message.chat.id)
db.add_group(group_id)
dg.created_group(group_id)
await bot.send_message(
message.chat.id,
f'*Привет группа {message.chat.title}!*\n\n'
f'Все функции вы можете узнать по команде /help! Для того, чтобы '
f'уведомления по имени и команда /all нормально функционировали, '
f'необходимо чтобы каждый участник группы написал хотя бы одно '
f'сообщение в чат! Также для того, чтобы системные сообщения удалялись '
f'автоматически, необходимо боту выдать права администратора!'
)
else:
try:
await message.delete()
except Exception:
return
# ОБНОВЛЕНИЕ АЙДИ ГРУППЫ
@router.message(F.content_type.in_([ContentType.MIGRATE_TO_CHAT_ID, ContentType.MIGRATE_FROM_CHAT_ID]))
async def chat_reload(message: Message):
if du.group_exists(message.migrate_from_chat_id):
du.update_group_id(du.get_group_id(message.migrate_from_chat_id), message.migrate_to_chat_id)
# КОМАНДЫ HELP И START
@router.message(CommandStart())
@router.message(Command('help'))
async def helps(message: Message):
if message.chat.id > 0:
user_id = message.from_user.id
if not du.user_exists(user_id):
du.add_user(user_id)
else:
group_id, user_id = message.chat.id, message.from_user.id
if not du.user_exists(user_id):
du.add_user(user_id)
if not du.group_exists(group_id):
du.add_group(group_id)
upd_stat(user_id, group_id, 3, message.from_user.first_name, True)
buttons = [
[InlineKeyboardButton(text="КОМАНДЫ", callback_data="com"),
InlineKeyboardButton(text="ИВЕНТЫ", callback_data="even")],
[InlineKeyboardButton(text="АВТОР", callback_data="auth"),
InlineKeyboardButton(text="ФУНКЦИИ", callback_data="fun")]
]
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
try:
await bot.send_voice(
chat_id=message.chat.id,
voice=FSInputFile('../data/tts.ogg'),
caption='*-* Этот АБОБОТ поможет вам приятно провести время в чате с различными командами, '
'ивентами и удобными функциями, которые облегчают использование чата)\n\n'
'*-* Также прошу если вам понравился бот, оставить отзыв о его использовании на '
'команду /report)',
reply_markup=keyboard
)
except Exception:
await bot.send_message(
chat_id=message.chat.id,
text='*-* Этот АБОБОТ поможет вам приятно провести время в чате с различными командами, '
'ивентами и удобными функциями, которые облегчают использование чата)\n\n'
'*-* Также прошу если вам понравился бот, оставить отзыв о его использовании на '
'команду /report)',
reply_markup=keyboard
)
# INLINE КЛАВИАТУРА ОБРАБОТЧИКИ
@router.callback_query(F.data == "auth")
async def author(call: CallbackQuery):
await call.message.answer(
text='*| АВТОР |*\n\n*>>* Этот бот, как бы это не печально звучало, но одна из лучших'
' моих работ и если кого-нибудь у меня получится действительно достойный продукт,'
' вы сможете о нём узнать в моём телеграм канале *@programium*'
)
@router.callback_query(F.data == "fun")
async def function(call: CallbackQuery):
await call.message.answer(
text='*| ФУНКЦИИ |*\n\n*1.* Возможность перевода случайно написанного текста на '
'транслите\n*2.* Ведение обширной статистики сообщений\n*3.* Упоминание участника '
'при написании его имени в чате\n*4.* Автоматическое удаление системных сообщений'
)
@router.callback_query(F.data == "com")
async def commands(call: CallbackQuery):
await call.message.answer(
text='*| КОМАНДЫ |*\n\n*/all* - упомянуть всех в чате\n*/help* - полный список функций\n'
'*/recognize* - транскрипция отмеченного голосового сообщения в текст\n'
'*/stat_group* - полная статистика группы\n*/stat_user* - полная статистика '
'отправителя\n*/edit и /back_edit* - первая команда даёт возможность '
'сменить имя для упоминаний на любое слово, а вторая для возврата динамического '
'имени\n*/start_bot и /stop_bot* - возможность отключения текстовых ивентов'
)
@router.callback_query(F.data == "even")
async def events(call: CallbackQuery):
await call.message.answer(
text='*| ИВЕНТЫ |*\n\n*"Чмокнуть"* - сделать кому-нибудь приятно\n*"Отмудохать"* - '
'выместить злость на кого-нибудь\n*"Число от ... до ..."* - случайное значение из '
'диапозона\n*"Подраться с ..."* - повод кого-нибудь побить\n*"Переведи (..) - ..."* '
'- перевод слова на керпичный язык\n*"Дай блять совет!"* - даёт рандомный охуенный '
'совет\n*"Переверни - ..."* - переворачивает слова в предложении\n*"Озвучь - ..."* '
'- озвучивает написанный текст'
)
# КОМАНДА ALL
@router.message(Command('all'))
async def every(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
try:
await message.reply(
notice(du.get_user_id(message.from_user.id), True,
du.get_group_id(message.chat.id), message.from_user.id)
)
except Exception:
await message.reply('В группе состоит менее 3х человек, из-за чего команда не работает!')
else:
await message.reply('Эта команда предназначена для вызова в чате!')
# КОМАНДА RECOGNIZE
@router.message(Command("recognize"))
async def recognise(message: Message):
user_id = du.get_user_id(message.from_user.id)
if message.reply_to_message and message.reply_to_message.voice:
num = random.randint(1000, 9999)
audio = f"../data/voices/{user_id}_{num}.oga"
file_id = message.reply_to_message.voice.file_id
file = await bot.get_file(file_id)
file_path = file.file_path
await bot.download_file(file_path, audio)
data, samplerate = sf.read(audio)
os.remove(audio)
audio = f"../data/voices/{user_id}_{num}.wav"
sf.write(audio, data, samplerate)
af = sr.AudioFile(audio)
r = sr.Recognizer()
with af as source:
r.pause_threshold = 100
source = r.listen(source)
try:
mes = await bot.send_message(
chat_id=message.chat.id,
text="Распознавание.....",
reply_to_message_id=message.reply_to_message.message_id
)
try:
query = r.recognize_google(source, language='ru-RU')
os.remove(audio)
await mes.edit_text(f'*{message.reply_to_message.from_user.first_name} сказал(a)* "{query}"')
except Exception:
try:
os.remove(audio)
except Exception:
pass
await mes.edit_text("Распознать сообщение не удалось!")
except Exception as e:
print(repr(e))
pass
# СТАТИСТИКА ГРУППЫ
@router.message(Command('stat_group'))
async def stat_group(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
group_id = du.get_group_id(message.chat.id)
group = db.stat_group(group_id)
month = db.month_stat_group(group_id)
await message.answer(
text=f'*| СТАТИСТИКА ГРУППЫ |*\n\n'
f'*>>* В целом сообщений *[ {message.message_id} ]*\n\n'
f'*- За всё время* / *За месяц -*\n'
f'*>>* Сообщений в базе *[ {group[0]} / {month[0]} ]*\n\n'
f'*>* Ответов *- [ {group[1]} / {month[1]} ]*\n'
f'*>* Команд *- [ {group[2]} / {month[2]} ]*\n'
f'*>* Ссылок *- [ {group[3]} / {month[3]} ]*\n'
f'*>* Стикеров *- [ {group[5]} / {month[5]} ]*\n'
f'*>* Медиа файлов *- [ {group[4]} / {month[4]} ]*\n'
f'*>* Голос/Кружочки *- [ {group[6]} / {month[6]} ]*'
)
# СТАТИСТИКА ПОЛЬЗОВАТЕЛЯ
@router.message(Command('stat_user'))
async def stat_user(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
user_id, group_id = du.get_user_id(message.from_user.id), du.get_group_id(message.chat.id)
group = dg.stat_user(user_id, group_id)
month = dm.stat_user(user_id, group_id)
namer = "".join(re.sub(r'[^\w\s]', '', message.from_user.first_name).split())
name = morph.parse(namer)[0].inflect({"gent"})
if name is None:
name = namer
else:
name = name.word
await message.answer(
text=f'*| СТАТИСТИКА {name.upper()} |*\n\n'
f'*- За всё время* / *За месяц -*\n'
f'*>>* Сообщений в базе *[ {group[0]} / {month[0]} ]*\n\n'
f'*>* Ответов *- [ {group[1]} / {month[1]} ]*\n'
f'*>* Команд *- [ {group[2]} / {month[2]} ]*\n'
f'*>* Ссылок *- [ {group[3]} / {month[3]} ]*\n'
f'*>* Стикеров *- [ {group[5]} / {month[5]} ]*\n'
f'*>* Медиа файлов *- [ {group[4]} / {month[4]} ]*\n'
f'*>* Голос/Кружочки *- [ {group[6]} / {month[6]} ]*'
)
# РЕДАКТИРОВАНИЕ ИМЕНИ
@router.message(Command('edit'))
async def update(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
user_id, group_id = du.get_user_id(message.from_user.id), du.get_group_id(message.chat.id)
text_edit = re.sub(r'[^\w\s]', '', message.text.lower()).split()
if len(text_edit) == 2:
name = morph.parse(text_edit[1])[0]
if name is None:
name = text_edit[1]
else:
name = name.normal_form
if name not in list(map(lambda x: x[0], dg.all_names(group_id))):
if not db.edit_user_exists(user_id):
db.add_edit_user(user_id)
dg.update_name(user_id, group_id, name)
await message.reply(f'{name.title()}, ваше имя было успешно изменено)')
else:
await message.reply(f'{message.from_user.first_name.lower().title()}, такое имя уже присутствует в чате!')
else:
await message.reply('Вы не правильно ввели имя! Имя должно быть '
'из одного слова и идти сразу после команды!')
# ВОЗВРАТ К ДИНАМИЧЕСКОМУ ИМЕНИ
@router.message(Command('back_edit'))
async def update_return(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
user_id, group_id = du.get_user_id(message.from_user.id), du.get_group_id(message.chat.id)
name = re.sub(r'[^\w\s]', '', message.from_user.first_name.lower())
if db.edit_user_exists(user_id):
db.del_edit_user(user_id)
dg.update_name(user_id, group_id, name)
await message.reply(f'{name.title()}, вы успешно вернулись к динамическому изменению имени)')
else:
await message.reply(f'{name.title()}, вы не устанавливали постоянное имя!')
# ВКЛЮЧЕНИЕ ТЕКСТОВЫХ ИВЕНТОВ
@router.message(Command('start_bot'))
async def opening(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
group_id = du.get_group_id(message.chat.id)
if db.check_status(group_id):
await message.answer("У вас уже включены текстовые ивенты!")
else:
db.update_status(group_id)
await message.answer("Текстовые ивенты включены!")
# ОТКЛЮЧЕНИЕ ТЕКСТОВЫХ ИВЕНТОВ
@router.message(Command('stop_bot'))
async def closing(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 3, message.from_user.first_name, True)
group_id = du.get_group_id(message.chat.id)
if db.check_status(group_id):
db.update_status(group_id)
await message.answer("Текстовые ивенты отключены!")
else:
await message.answer("У вас уже отключены текстовые ивенты!")
# УДАЛЕНИЕ ПОЛЬЗОВАТЕЛЕЙ АВТОМАТИЧЕСКИ
@router.message(F.content_type == ContentType.LEFT_CHAT_MEMBER)
async def delete(message: Message):
if du.user_exists(message.left_chat_member.id):
user_id, group_id = du.get_user_id(message.left_chat_member.id), du.get_group_id(message.chat.id)
dg.del_user(group_id, user_id)
if db.group_exists_month(group_id):
if dm.user_exists(user_id, group_id):
dm.del_user(group_id, user_id)
try:
await message.delete()
except Exception:
return
# УДАЛЕНИЕ ТЕХНИЧЕСКИХ СООБЩЕНИЙ
@router.message(F.content_type.in_([
ContentType.NEW_CHAT_TITLE,
ContentType.NEW_CHAT_PHOTO,
ContentType.PINNED_MESSAGE,
ContentType.VIDEO_CHAT_ENDED,
ContentType.VIDEO_CHAT_PARTICIPANTS_INVITED
]))
async def chat_events(message: Message):
try:
await message.delete()
except Exception:
return
# МЕДИА ФАЙЛЫ
@router.message(F.content_type.in_([
ContentType.LOCATION,
ContentType.CONTACT,
ContentType.VIDEO,
ContentType.PHOTO,
ContentType.AUDIO,
ContentType.DOCUMENT
]))
async def media(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 5, message.from_user.first_name, True)
# ГОЛОСОВЫЕ СООБЩЕНИЯ И КРУЖОЧКИ
@router.message(F.content_type.in_([ContentType.VOICE, ContentType.VIDEO_NOTE]))
async def voice(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 7, message.from_user.first_name, True)
if message.voice and message.chat.id < 0:
if message.voice.duration <= 60:
user_id = du.get_user_id(message.from_user.id)
num = random.randint(1000, 9999)
audio = f"../data/voices/{user_id}_{num}.oga"
file_id = message.voice.file_id
file = await bot.get_file(file_id)
file_path = file.file_path
await bot.download_file(file_path, audio)
data, samplerate = sf.read(audio)
os.remove(audio)
audio = f"../data/voices/{user_id}_{num}.wav"
sf.write(audio, data, samplerate)
af = sr.AudioFile(audio)
r = sr.Recognizer()
with af as source:
r.pause_threshold = 100
source = r.listen(source)
try:
query = r.recognize_google(source, language='ru-RU')
os.remove(audio)
group_id = message.chat.id
unsigned = re.sub(r'[^\w\s]', '', query.lower()).split()
first_form = [morph.parse(i)[0].normal_form for i in unsigned]
names = [_ for _ in first_form if _ in list(map(lambda x: x[0], dg.all_names(du.get_group_id(group_id))))]
if names:
await message.reply(notice(names, False, du.get_group_id(group_id), message.from_user.id))
return
except Exception:
try:
os.remove(audio)
except Exception:
pass
# СТИКЕРЫ
@router.message(F.content_type == ContentType.STICKER)
async def stick(message: Message):
if message.chat.id < 0:
upd_stat(message.from_user.id, message.chat.id, 6, message.from_user.first_name, True)
# ТЕКСТОВЫЕ ИВЕНТЫ
@router.message(F.content_type == ContentType.TEXT)
async def send_events(message: Message):
group_id, user_id, name = message.chat.id, message.from_user.id, message.from_user.first_name
# ОБНОВЛЕНИЕ СТАТИСТИКИ
if message.chat.id < 0:
upd_stat(user_id, group_id, 1, name) # СООБЩЕНИЙ
if len(message.text) > 1 and message.text[0] == '/':
upd_stat(user_id, group_id, 3, name) # КОМАНД
if message.reply_to_message:
upd_stat(user_id, group_id, 2, name) # ОТВЕТОВ НА СООБЩЕНИЯ
# ПЕРЕМЕННЫЕ
low_mes = message.text.lower()
words = message.text.split()
unsigned = re.sub(r'[^\w\s]', '', low_mes).split()
first_form = [morph.parse(i)[0].normal_form for i in unsigned]
if message.chat.id < 0:
names = [_ for _ in first_form if _ in list(map(lambda x: x[0], dg.all_names(du.get_group_id(group_id))))]
if len(words) >= 2:
# ПОЛЕЗНЫЕ ФУНКЦИИ
if len(words) == 5 and "число от" in low_mes:
try:
num1, num2 = int(low_mes.split()[2]), int(low_mes.split()[4])
await message.answer(
text=f"Число *[ {random.randint(num1, num2)} ]*"
)
except ValueError:
pass
return
if unsigned[0] == 'переведи':
if "переведи - " in low_mes:
await message.answer(lang_form([words[_] for _ in range(len(words)) if _ > 1]))
elif f"переведи ({unsigned[1]}) - " in low_mes:
if len(unsigned[1]) == 1:
await message.answer(lang_form([words[_] for _ in range(len(words)) if _ > 2], unsigned[1]))
return
if unsigned[0] == 'переверни':
if "переверни - " in low_mes:
await message.answer(revers(message.text[12:], True))
elif "переверни полностью - " in low_mes:
await message.answer(revers(message.text[22:], False))
return
if unsigned[0] == 'озвучь' and "озвучь - " in low_mes:
text_to_voice = message.text[9:]
tts = gTTS(text_to_voice, lang='ru')
tts.save('../data/voices/voice.ogg')
await bot.send_voice(
chat_id=group_id,
voice=FSInputFile('../data/voices/voice.ogg'),
caption=f"*{text_to_voice}*"
)
os.remove('../data/voices/voice.ogg')
return
# РАНДОМ ИВЕНТЫ
if "подраться" == words[0].lower() and "с" == words[1].lower():
text = " ".join([words[i] for i in range(len(words)) if i > 1])
if random.randint(0, 1) == 0:
await message.answer_photo(
photo=FSInputFile(f"../data/fight/({random.randint(1, 8)}).jpg"),
caption=f"{message.from_user.first_name}, ты был унижен {text.title()}"
f", с помощью {base.VAR_LOSE[random.randint(0, 3)]}"
)
else:
await message.answer_photo(
photo=FSInputFile(f"../data/fight/({random.randint(1, 8)}).jpg"),
caption=f"{message.from_user.first_name}, ты победил в драке "
f"с {text.title()}, {base.VAR_WIN[random.randint(0, 1)]}"
)
# ИВЕНТ ВЗАИМОДЕЙСТВИЯ
for word in base.TMOK_LIST:
if word in low_mes:
lst = [words[i] for i in range(len(words)) if i != 0]
text = " ".join(lst)
slv = morph.parse(words[0].lower())[0]
await message.answer_photo(
photo=FSInputFile(f"../data/tmok/({random.randint(1, 4)}).jpg"),
caption=f"{message.from_user.first_name} "
f"{slv.inflect({'past', 'sing', 'indc'}).word} {text}"
)
for word in base.KILL_LIST:
if word in low_mes:
lst = [words[i] for i in range(len(words)) if i != 0]
text = " ".join(lst)
slv = morph.parse(words[0].lower())[0]
await message.answer_photo(
photo=FSInputFile(f"../data/kill/({random.randint(1, 6)}).jpg"),
caption=f"{message.from_user.first_name} "
f"{slv.inflect({'past', 'sing', 'indc'}).word} {text}"
)
if (unsigned[0] in base.QUAT_LIST[0] and
unsigned[1] in base.QUAT_LIST[1] and
unsigned[2] in base.QUAT_LIST[2]):
try:
word = requests.get('http://fucking-great-advice.ru/api/random').json()
await message.reply(word["text"])
except:
pass
return
if message.chat.id < 0:
# УПОМИНАНИЯ ПО ИМЕНАМ
if names:
try:
await message.reply(notice(names, False, du.get_group_id(group_id), user_id))
except Exception:
pass
return
# ПЕРЕВОДЧИК СЛОВ
if (checker([i for i in low_mes], words, group_id, user_id, name.lower()) == len(low_mes) and
not any([engl_dict.check(i) for i in unsigned if len(i) > 1])):
await message.reply(
f"[{message.from_user.first_name}](tg://user_id?id={user_id}) *>* {translator(words)}"
)
return

21
code/init.py Normal file
View file

@ -0,0 +1,21 @@
from aiogram import Bot, Dispatcher
from aiogram.enums.parse_mode import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.client.bot import DefaultBotProperties
import base, enchant, pymorphy3
bot = Bot(token=base.TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.MARKDOWN))
dp = Dispatcher(storage=MemoryStorage())
morph = pymorphy3.MorphAnalyzer()
engl_dict = enchant.Dict("en_US")
import sql
db = sql.Base('../db/base.db')
du = sql.User('../db/users.db')
dg = sql.Group('../db/groups.db')
dm = sql.Month('../db/month.db')

170
code/script.py Normal file
View file

@ -0,0 +1,170 @@
from base import ERR, ERR_, TRU, TRU_, ALB
from string import digits, punctuation
import re, sql
# инициализируем соединение с БД
db = sql.Base('../db/base.db')
du = sql.User('../db/users.db')
dg = sql.Group('../db/groups.db')
dm = sql.Month('../db/month.db')
def upd_stat(user_id, group_id, var_id, name, mes=False):
name = re.sub(r'[^\w\s]', '', name.lower())
if not du.user_exists(user_id):
du.add_user(user_id)
user_id = du.get_user_id(user_id)
if not du.group_exists(group_id):
du.add_group(group_id)
group_id = du.get_group_id(group_id)
if not db.group_exists(group_id):
db.add_group(group_id)
dg.created_group(group_id)
if not dg.user_exists(user_id, group_id):
dg.add_user(group_id, user_id, name)
db.update_stat(group_id, var_id)
dg.update_stat(user_id, group_id, var_id)
if not db.group_exists_month(group_id):
db.add_group_month(group_id)
dm.created_group(group_id)
dm.add_user(group_id, user_id)
else:
if not dm.user_exists(user_id, group_id):
dm.add_user(group_id, user_id)
db.update_month_stat(group_id, var_id)
dm.update_stat(user_id, group_id, var_id)
if mes:
db.update_month_stat(group_id, 1)
dg.update_stat(user_id, group_id, 1)
dm.update_stat(user_id, group_id, 1)
db.update_stat(group_id, 1)
if not db.edit_user_exists(user_id):
dg.update_name(user_id, group_id, name)
def lang_form(text, smbl='г'):
for i in range(len(text)):
word = [str(j) + str(smbl) + str(j).lower() if j in ALB else str(j) for j in text[i]]
text[i] = ''.join(word)
return ' '.join(text)
def notice(name, all_users, group_id, author):
names = [i[0] for i in dg.all_names(group_id)]
ids = [du.get_first_user_id(i[0]) for i in dg.all_ids(group_id)]
if all_users:
name = dg.user_name(name, group_id)
usr = [f"[{names[i].title()}](tg://user?id={str(ids[i])})" for i in range(len(names)) if names[i] != name.lower()]
usr.append(f'{usr[-2]} и {usr[-1]}')
del usr[-2], usr[-2]
return f'{", ".join(usr)} вас вызывает {name.title()}'
else:
no_copy = []
for i in name:
if i not in no_copy:
no_copy.append(i)
if len(no_copy) > 1: # ПЕРЕПИСАТЬ КОД ГАВНА КУСОК
usr = [f"[{_.title()}](tg://user?id={str(ids[names.index(_)])})" for _ in no_copy if int(author) != ids[names.index(_)]]
usr.append(f'{usr[-2]} и {usr[-1]}')
del usr[-2], usr[-2]
return f"{', '.join(usr)} вас упомянули)"
else:
usr = f"[{no_copy[0].title()}](tg://user?id={str(ids[names.index(no_copy[0])])})"
return f"{usr}, тебя упомянули)"
def checker(signs, words, group_id, user_id, name):
if len([z for z in signs if (z in digits) or (z in punctuation)]) == len(signs) or signs[0] == '/':
return int('-1')
if len(signs) > 4:
for x in range(3, len(signs)):
http = "".join([signs[x - 3], signs[x - 2], signs[x - 1], signs[x]])
if 'http' in http:
upd_stat(user_id, group_id, 4, name)
return int('-1')
count, raw = 0, 0
for word in words:
for symbol in word:
raw += 1
if (symbol in ERR) or (symbol in ERR_) or (symbol in digits):
count += 1
return len(signs) - raw + count
def translator(words):
itg = []
for word in words:
raw_word = []
for symbol in word:
if symbol in ERR:
count = 0
for _ in ERR:
if symbol == _:
raw_word.append(TRU[count])
break
count += 1
elif symbol in ERR_:
count = 0
for _ in ERR_:
if symbol == _:
raw_word.append(TRU_[count])
break
count += 1
itg.append("".join(raw_word))
return " ".join(itg)
def revers(message, var):
no_pct = re.sub(r'[^\w\s]', '', message)
if var:
sml, pnc, pct, prf = [i for i in message] + [str(0)], [], [], False
for i in sml:
if i in punctuation or i == ' ':
pnc.append(i)
prf, flag = True if sml.index(i) == 0 else False, False
else:
flag = True
if flag and len(pnc) > 0:
pct.append(''.join(pnc))
pnc = []
wrd, rev, lst, txt = no_pct.split(), [], [], []
for i in wrd:
word, up, itg = [i[-1 - l].lower() for l in range(len(i))], [], []
for _ in i:
up.append(True if _ in TRU_ else False)
for j in range(len(up)):
itg.append(word[j].upper() if up[j] else word[j])
rev.append(''.join(itg))
if prf:
for i in range(len(pct)):
txt = (txt + [pct[i]]) if (i + 1) == len(pct) and len(pct) > len(rev) else (txt + [pct[i], rev[i]])
return ''.join(txt)
else:
for i in range(len(rev)):
if i > 0:
txt.append(pct[i - 1])
txt.append(rev[i])
if (i + 1) == len(rev) and len(pct) == len(rev):
txt.append(pct[i])
return ''.join(txt)
else:
sml = [i for i in message]
return ''.join([sml[-1 - i] for i in range(len(sml))])

389
code/sql.py Normal file
View file

@ -0,0 +1,389 @@
import sqlite3
class Base:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
# КОМАНДЫ
def add_group(self, group_id):
"""Добавляем нового пользователя"""
with self.connection:
self.cursor.execute(f"INSERT INTO `work` (`group_id`) VALUES(?)", (group_id,))
self.cursor.execute(f"INSERT INTO `stat` (`group_id`, `mes`, `rep`, `com`, `url`, `med`, `sti`, "
f"`voi`) VALUES(?,?,?,?,?,?,?,?)", (group_id, 0, 0, 0, 0, 0, 0, 0))
return
def group_exists(self, group_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute(f'SELECT * FROM `work` WHERE `group_id` = ?', (group_id,)).fetchall()
return bool(len(result))
# ТАБЛИЦА STAT
def update_stat(self, group_id, var_id):
"""Обновляем статистику"""
try:
with self.connection:
self.cursor.execute(f"SELECT * FROM `stat` WHERE `group_id` = ?", (group_id,))
if var_id == 1:
return self.cursor.execute(f"UPDATE `stat` SET `mes` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[1] + 1), group_id))
elif var_id == 2:
return self.cursor.execute(f"UPDATE `stat` SET `rep` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[2] + 1), group_id))
elif var_id == 3:
return self.cursor.execute(f"UPDATE `stat` SET `com` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[3] + 1), group_id))
elif var_id == 4:
return self.cursor.execute(f"UPDATE `stat` SET `url` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[4] + 1), group_id))
elif var_id == 5:
return self.cursor.execute(f"UPDATE `stat` SET `med` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[5] + 1), group_id))
elif var_id == 6:
return self.cursor.execute(f"UPDATE `stat` SET `sti` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[6] + 1), group_id))
elif var_id == 7:
return self.cursor.execute(f"UPDATE `stat` SET `voi` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[7] + 1), group_id))
except Exception as e:
print(repr(e))
def stat_group(self, group_id):
"""Получение данных пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `stat` WHERE `group_id` = ?", (group_id,))
data = self.cursor.fetchone()
return data[1:]
# ТАБЛИЦА MONTH
def group_exists_month(self, group_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute('SELECT * FROM `month` WHERE `group_id` = ?', (group_id,)).fetchall()
return bool(len(result))
def add_group_month(self, group_id):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute("INSERT INTO `month` (`group_id`, `mes`, `rep`, `com`, `url`, `med`, `sti`, "
"`voi`) VALUES(?,?,?,?,?,?,?,?)", (group_id, 0, 0, 0, 0, 0, 0, 0))
def update_month_stat(self, group_id, var_id):
"""Обновляем статистику"""
try:
with self.connection:
self.cursor.execute(f"SELECT * FROM `month` WHERE `group_id` = ?", (group_id,))
if var_id == 1:
return self.cursor.execute(f"UPDATE `month` SET `mes` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[1] + 1), group_id))
elif var_id == 2:
return self.cursor.execute(f"UPDATE `month` SET `rep` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[2] + 1), group_id))
elif var_id == 3:
return self.cursor.execute(f"UPDATE `month` SET `com` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[3] + 1), group_id))
elif var_id == 4:
return self.cursor.execute(f"UPDATE `month` SET `url` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[4] + 1), group_id))
elif var_id == 5:
return self.cursor.execute(f"UPDATE `month` SET `med` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[5] + 1), group_id))
elif var_id == 6:
return self.cursor.execute(f"UPDATE `month` SET `sti` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[6] + 1), group_id))
elif var_id == 7:
return self.cursor.execute(f"UPDATE `month` SET `voi` = ? WHERE `group_id` = ?",
((self.cursor.fetchone()[7] + 1), group_id))
except Exception as e:
print(repr(e))
def month_stat_group(self, group_id):
"""Получение данных пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `month` WHERE `group_id` = ?", (group_id,))
data = self.cursor.fetchone()
return data[1:]
# ТАБЛИЦА EDIT
def add_edit_user(self, user_id):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute("INSERT INTO `edit` (`user_id`) VALUES(?)", (user_id,))
def edit_user_exists(self, user_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute('SELECT * FROM `edit` WHERE `user_id` = ?', (user_id,)).fetchall()
return bool(len(result))
def del_edit_user(self, user_id):
"""Удаление пользователя"""
with self.connection:
return self.cursor.execute(f'DELETE FROM `edit` WHERE `user_id` = ?', (user_id,))
# ТАБЛИЦА WORK
def check_status(self, group_id):
"""Получаем статус"""
with self.connection:
return self.cursor.execute(f'SELECT `state` FROM `work` WHERE `group_id` = ?', (group_id,)).fetchone()[0]
def update_status(self, group_id):
"""Обновляем статус"""
with self.connection:
state = self.cursor.execute(f'SELECT `state` FROM `work` WHERE `group_id` = ?', (group_id,)).fetchone()[0]
return self.cursor.execute(f"UPDATE `work` SET `state` = ? WHERE `group_id` = ?", (not state, group_id))
# ЗАКРЫТИЕ ВЫЗОВА
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()
class User:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
# СВЯЗКА ПОЛЬЗОВАТЕЛЯ
def user_exists(self, user_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute(f'SELECT * FROM `users` WHERE `user_id` = ?', (user_id,)).fetchall()
return bool(len(result))
def add_user(self, user_id):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute(f"INSERT INTO `users` (`user_id`) VALUES(?)", (user_id,))
def get_user_id(self, user_id):
"""Получаем короткое айди юзера"""
with self.connection:
return self.cursor.execute(f'SELECT `id` FROM `users` WHERE `user_id` = ?', (user_id,)).fetchone()[0]
def get_first_user_id(self, user_id):
"""Получаем длинное айди юзера"""
with self.connection:
return self.cursor.execute(f'SELECT `user_id` FROM `users` WHERE `id` = ?', (user_id,)).fetchone()[0]
# СВЯЗКА ГРУППЫ
def group_exists(self, group_id):
"""Проверяем, есть ли уже группа в базе"""
with self.connection:
result = self.cursor.execute('SELECT * FROM `groups` WHERE `group_id` = ?', (group_id,)).fetchall()
return bool(len(result))
def add_group(self, group_id):
"""Добавляем новую группу в таблицу"""
with self.connection:
return self.cursor.execute("INSERT INTO `groups` (`group_id`) VALUES(?)", (group_id,))
def get_group_id(self, group_id):
"""Получаем короткое айди юзера"""
with self.connection:
return self.cursor.execute(f'SELECT `id` FROM `groups` WHERE `group_id` = ?', (group_id,)).fetchone()[0]
def get_first_group_id(self, group_id):
"""Получаем длинное айди юзера"""
with self.connection:
return self.cursor.execute(f'SELECT `group_id` FROM `groups` WHERE `id` = ?', (group_id,)).fetchone()[0]
def update_group_id(self, from_id, to_id):
"""Заменяем на новый айди"""
with self.connection:
return self.cursor.execute("UPDATE `groups` SET `group_id` = ? WHERE `id` = ?", (to_id, from_id))
# ЗАКРЫТИЕ ВЫЗОВА
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()
class Group:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
# КОМАНДЫ
def created_group(self, group_id):
"""Создаём новую таблицу"""
with self.connection:
return self.cursor.execute(f"""CREATE TABLE IF NOT EXISTS [{group_id}] (
user_id INTEGER NOT NULL,
first_name STRING,
mes INTEGER,
rep INTEGER,
com INTEGER,
url INTEGER,
med INTEGER,
sti INTEGER,
voi INTEGER);""")
def add_user(self, group_id, user_id, name):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute(f"INSERT INTO `{group_id}` (`user_id`, `first_name`, `mes`, `rep`, `com`, `url`, "
f"`med`, `sti`, `voi`) VALUES(?,?,?,?,?,?,?,?,?)", (user_id, name, 0, 0, 0, 0, 0,
0, 0))
def all_names(self, group_id):
"""Список имён"""
with self.connection:
return self.cursor.execute(f'SELECT `first_name` FROM `{group_id}`').fetchall()
def all_ids(self, group_id):
"""Список айди"""
with self.connection:
return self.cursor.execute(f'SELECT `user_id` FROM `{group_id}`').fetchall()
def update_stat(self, user_id, group_id, var_id):
"""Обновляем статистику"""
try:
with self.connection:
self.cursor.execute(f"SELECT * FROM `{group_id}` WHERE `user_id` = ?", (user_id,))
if var_id == 1:
return self.cursor.execute(f"UPDATE `{group_id}` SET `mes` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[2] + 1), user_id))
elif var_id == 2:
return self.cursor.execute(f"UPDATE `{group_id}` SET `rep` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[3] + 1), user_id))
elif var_id == 3:
return self.cursor.execute(f"UPDATE `{group_id}` SET `com` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[4] + 1), user_id))
elif var_id == 4:
return self.cursor.execute(f"UPDATE `{group_id}` SET `url` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[5] + 1), user_id))
elif var_id == 5:
return self.cursor.execute(f"UPDATE `{group_id}` SET `med` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[6] + 1), user_id))
elif var_id == 6:
return self.cursor.execute(f"UPDATE `{group_id}` SET `sti` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[7] + 1), user_id))
elif var_id == 7:
return self.cursor.execute(f"UPDATE `{group_id}` SET `voi` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[8] + 1), user_id))
except Exception as e:
print(repr(e))
def user_exists(self, user_id, group_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute(f'SELECT * FROM `{group_id}` WHERE `user_id` = ?', (user_id,)).fetchall()
return bool(len(result))
def update_name(self, user_id, group_id, name):
"""Обновляем имя пользователя"""
with self.connection:
return self.cursor.execute(f"UPDATE `{group_id}` SET `first_name` = ? WHERE `user_id` = ?", (name, user_id))
def user_name(self, user_id, group_id):
"""Получаем имя пользователя по айди"""
with self.connection:
return self.cursor.execute(f"SELECT `first_name` FROM `{group_id}` WHERE `user_id` = ?", (user_id,)).fetchone()[0]
def stat_user(self, user_id, group_id):
"""Получение данных пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `{group_id}` WHERE `user_id` = ?", (user_id,))
data = self.cursor.fetchone()
return data[2:]
def del_user(self, group_id, user_id):
"""Удаление пользователя"""
with self.connection:
return self.cursor.execute(f'DELETE FROM `{group_id}` WHERE `user_id` = ?', (user_id,))
# ЗАКРЫТИЕ ВЫЗОВА
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()
class Month:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
# КОМАНДЫ
def update_stat(self, user_id, group_id, var_id):
"""Обновляем статистику"""
try:
with self.connection:
self.cursor.execute(f"SELECT * FROM `{group_id}` WHERE `user_id` = ?", (user_id,))
if var_id == 1:
return self.cursor.execute(f"UPDATE `{group_id}` SET `mes` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[1] + 1), user_id))
elif var_id == 2:
return self.cursor.execute(f"UPDATE `{group_id}` SET `rep` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[2] + 1), user_id))
elif var_id == 3:
return self.cursor.execute(f"UPDATE `{group_id}` SET `com` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[3] + 1), user_id))
elif var_id == 4:
return self.cursor.execute(f"UPDATE `{group_id}` SET `url` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[4] + 1), user_id))
elif var_id == 5:
return self.cursor.execute(f"UPDATE `{group_id}` SET `med` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[5] + 1), user_id))
elif var_id == 6:
return self.cursor.execute(f"UPDATE `{group_id}` SET `sti` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[6] + 1), user_id))
elif var_id == 7:
return self.cursor.execute(f"UPDATE `{group_id}` SET `voi` = ? WHERE `user_id` = ?",
((self.cursor.fetchone()[7] + 1), user_id))
except Exception as e:
print(repr(e))
def created_group(self, group_id):
"""Создаём новую таблицу"""
with self.connection:
return self.cursor.execute(f"""CREATE TABLE IF NOT EXISTS [{group_id}] (
user_id INTEGER NOT NULL,
mes INTEGER,
rep INTEGER,
com INTEGER,
url INTEGER,
med INTEGER,
sti INTEGER,
voi INTEGER);""")
def add_user(self, group_id, user_id):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute(f"INSERT INTO `{group_id}` (`user_id`, `mes`, `rep`, `com`, `url`, `med`, "
f"`sti`, `voi`) VALUES(?,?,?,?,?,?,?,?)", (user_id, 0, 0, 0, 0, 0, 0, 0))
def all_ids(self, group_id):
"""Список айди"""
with self.connection:
return self.cursor.execute(f'SELECT `user_id` FROM `{group_id}`').fetchall()
def user_exists(self, user_id, group_id):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute(f'SELECT * FROM `{group_id}` WHERE `user_id` = ?', (user_id,)).fetchall()
return bool(len(result))
def stat_user(self, user_id, group_id):
"""Получение данных пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `{group_id}` WHERE `user_id` = ?", (user_id,))
data = self.cursor.fetchone()
return data[1:]
def del_user(self, group_id, user_id):
"""Удаление пользователя"""
with self.connection:
return self.cursor.execute(f'DELETE FROM `{group_id}` WHERE `user_id` = ?', (user_id,))
# ЗАКРЫТИЕ ВЫЗОВА
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 295 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 69 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 103 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

BIN
data/voices/1_7047.oga Normal file

Binary file not shown.

Binary file not shown.

0
db/groups.db Normal file
View file

0
db/month.db Normal file
View file

BIN
db/users.db Normal file

Binary file not shown.

View file

@ -1,3 +0,0 @@
libraries:
pip install random2, pymorphy2, requests, emoji, re, aiogram

View file

@ -1 +0,0 @@
0

34
requirements.txt Normal file
View file

@ -0,0 +1,34 @@
# Core Framework Dependencies
aiogram==3.10.0
asyncio-mqtt==0.16.1
# Database Management
sqlite3
# Natural Language Processing
pymorphy3==1.2.1
pymorphy3-dicts-ru==2.4.417150.4580142
enchant==1.6.6
# Speech Recognition & Synthesis
SpeechRecognition==3.10.4
gTTS==2.5.1
soundfile==0.12.1
# HTTP Client for API Integration
requests==2.31.0
# Audio Processing Dependencies
pyaudio==0.2.14
pydub==0.25.1
# System Dependencies (if needed)
# Note: Some packages may require system-level installation
# Ubuntu/Debian: sudo apt-get install python3-enchant libenchant-2-2
# macOS: brew install enchant portaudio
# Windows: Download pyaudio wheel from unofficial binaries
# Development Dependencies (Optional)
# pytest==8.2.2
# black==24.4.2
# flake8==7.1.0

180
script.py
View file

@ -1,180 +0,0 @@
from base import ERR, ERR_, TRU, TRU_, ALB
from string import digits, punctuation
from datetime import date, datetime
from sql import SQLighter
import re
db = SQLighter('groups.db')
def lang_form(text, smbl='г'):
for i in range(len(text)):
word = []
for j in text[i]:
if j in ALB:
word.append(str(str(j) + str(smbl) + str(j).lower()))
else:
word.append(str(j))
text[i] = ''.join(word)
return ' '.join(text)
def times():
today = datetime.now().date()
sel = date(2021, 2, 2)
ext = date(2023, 10, 3)
itg = str((ext - today).days)
if int(itg[-1]) == 1:
return f'{itg} день)'
elif 1 < int(itg[-1]) < 5:
return f'{itg} дня)'
else:
return f'{itg} дней)'
def autozak(name):
num = str(len(name))
if int(num[-1]) == 1:
return f'{num} минута.'
elif 1 < int(num[-1]) < 5:
return f'{num} минуты.'
else:
return f'{num} минут.'
def update_stat(var, message, main=False):
db.statistics_group(message.chat.id, var)
db.user_statistic(message.from_user.id, message.chat.id, var)
if main:
db.statistics_group(message.chat.id, 1)
db.user_statistic(message.from_user.id, message.chat.id, 1)
return
def notice(name, all_users, id_group, id_var):
names = db.name_lst(id_group)
ids, no_copy = db.id_lst(id_group), []
if all_users:
usr = ["<a href=\""+'tg://user?id='+ids[i]+"\">"+names[i].title()+"</a>" for i in range(len(names))
if names[i] != name.lower()]
usr.append(f'{usr[-2]} и {usr[-1]}')
del usr[-2], usr[-2]
return f'{", ".join(usr)} вас вызывает {name}'
else:
for i in name:
if i not in no_copy:
no_copy.append(i)
if len(no_copy) > 1:
usr = ["<a href=\""+'tg://user?id='+ids[names.index(_)]+"\">"+_.title()+"</a>" for _ in no_copy
if id_var != ids[names.index(_)]]
usr.append(f'{usr[-2]} и {usr[-1]}')
del usr[-2], usr[-2]
return f"{', '.join(usr)} вас упомянули)"
else:
usr = "<a href=\""+'tg://user?id='+ids[names.index(no_copy[0])].title()+"\">"+no_copy[0].title()+"</a>"
return f"{usr}, тебя упомянули)"
def cheker(bk, original, id_group, user):
flag = True
if flag is True:
if (len([z for z in bk if (z in digits) or (z in punctuation)]) == len(bk)) or (bk[0] == '/'):
flag = False
if (flag is True) and (len(bk) > 4):
for x in range(3, len(bk)):
http = "".join([bk[x - 3], bk[x - 2], bk[x - 1], bk[x]])
if 'http' in http:
db.statistics_group(id_group, 4)
db.user_statistic(user, id_group, 4)
flag = False
break
if flag is False:
if 'tiktok' in "".join(bk).split('.'):
db.statistics_group(id_group, 5)
db.user_statistic(user, id_group, 5)
if flag:
count, raw = 0, 0
for word in original:
for smvl in word:
raw += 1
if (smvl in ERR) or (smvl in ERR_) or (smvl in digits):
count += 1
return len(bk) - raw + count
else:
return int('-1')
def translator(original):
itg = []
for word in original:
raw_word = []
for smvl in word:
if smvl in ERR:
count = 0
for _ in ERR:
if smvl == _:
raw_word.append(TRU[count])
break
count += 1
elif smvl in ERR_:
count = 0
for _ in ERR_:
if smvl == _:
raw_word.append(TRU_[count])
break
count += 1
elif smvl in NUM:
raw_word.append(smvl)
itg.append("".join(raw_word))
return " ".join(itg)
def revers(message, var):
no_pct = re.sub(r'[^\w\s]', '', message)
if var:
sml, pnc, pct, prf = [i for i in message] + [str(0)], [], [], False
for i in sml:
if i in punctuation or i == ' ':
pnc.append(i)
prf, flag = True if sml.index(i) == 0 else False, False
else:
flag = True
if flag and len(pnc) > 0:
pct.append(''.join(pnc))
pnc = []
wrd, rev, lst, txt = no_pct.split(), [], [], []
for i in wrd:
word, up, itg = [i[-1 - l].lower() for l in range(len(i))], [], []
for _ in i:
up.append(True if _ in TRU_ else False)
for j in range(len(up)):
itg.append(word[j].upper() if up[j] else word[j])
rev.append(''.join(itg))
if prf:
for i in range(len(pct)):
txt = (txt + [pct[i]]) if (i + 1) == len(pct) and len(pct) > len(rev) else (txt + [pct[i], rev[i]])
return ''.join(txt)
else:
for i in range(len(rev)):
if i > 0:
txt.append(pct[i - 1])
txt.append(rev[i])
if (i + 1) == len(rev) and len(pct) == len(rev):
txt.append(pct[i])
return ''.join(txt)
else:
sml = [i for i in message]
return ''.join([sml[-1 - i] for i in range(len(sml))])

263
sql.py
View file

@ -1,263 +0,0 @@
import sqlite3
class SQLighter:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
self.cursor.execute("PRAGMA temp_store = 2")
# СВЯЗКА ГРУППЫ
def group_exists(self, id_group):
"""Проверяем, есть ли уже группа в базе"""
with self.connection:
result = self.cursor.execute('SELECT * FROM `group` WHERE `id_group` = ?', (id_group,)).fetchall()
return bool(len(result))
def add_group(self, id_group, status=True):
"""Добавляем новую группу в таблицу"""
with self.connection:
return self.cursor.execute("INSERT INTO `group` (`id_group`, `status`, `time`, `message`, `reply`, "
"`command`, `url`, `tik_tok`, `media`, `sticker`, `voice`, `bool`, `cool`) "
"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
(id_group, status, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
def add_month_group(self, id_group):
"""Добавляем новую группу в таблицу месяца"""
with self.connection:
return self.cursor.execute("INSERT INTO `month` (`id_group`, `message`, `reply`, `command`, `url`, "
"`tik_tok`, `media`, `sticker`, `voice`, `bool`, `cool`) "
"VALUES(?,?,?,?,?,?,?,?,?,?,?)", (id_group, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
def created_group(self, id_group):
"""Создаём новую таблицу"""
with self.connection:
return self.cursor.execute(f"""CREATE TABLE IF NOT EXISTS [{id_group}] (
id INTEGER PRIMARY KEY AUTOINCREMENT,
id_user VARCHAR (255) NOT NULL,
first_name TEXT (40) NOT NULL,
edit BOOLEAN NOT NULL
DEFAULT (FALSE),
time INTEGER NOT NULL
DEFAULT (1),
message INTEGER NOT NULL,
reply INTEGER NOT NULL,
command INTEGER NOT NULL,
url INTEGER NOT NULL,
tik_tok INTEGER NOT NULL,
media INTEGER NOT NULL,
sticker INTEGER NOT NULL,
voice INTEGER NOT NULL,
bool INTEGER NOT NULL,
cool INTEGER NOT NULL
);""")
def update_status_group(self, id_group, status):
"""Обновляем статус группы"""
with self.connection:
return self.cursor.execute("UPDATE `group` SET `status` = ? WHERE `id_group` = ?", (status, id_group))
def get_group(self, status=True):
"""Получаем все активные группы"""
with self.connection:
return self.cursor.execute("SELECT * FROM `group` WHERE `status` = ?", (status,)).fetchall()
def id_group_lst(self, status=True):
"""Список айди активных групп"""
with self.connection:
return [i[1] for i in self.cursor.execute("SELECT * FROM `group` WHERE `status` = ?", (status,)).fetchall()]
def all_id_group_lst(self):
"""Список всех айди групп"""
with self.connection:
return [i[0] for i in self.cursor.execute(f'SELECT id_group FROM `group`').fetchall()]
def statistics_group(self, id_group, var):
"""Обновляем статистику"""
with self.connection:
self.cursor.execute("SELECT * FROM `group` WHERE `id_group` = ?", (id_group,))
if var == 0:
return self.cursor.execute("UPDATE `group` SET `time` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[3] + 1), id_group))
elif var == 1:
return self.cursor.execute("UPDATE `group` SET `message` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[4] + 1), id_group))
elif var == 2:
return self.cursor.execute("UPDATE `group` SET `reply` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[5] + 1), id_group))
elif var == 3:
return self.cursor.execute("UPDATE `group` SET `command` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[6] + 1), id_group))
elif var == 4:
return self.cursor.execute("UPDATE `group` SET `url` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[7] + 1), id_group))
elif var == 5:
return self.cursor.execute("UPDATE `group` SET `tik_tok` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[8] + 1), id_group))
elif var == 6:
return self.cursor.execute("UPDATE `group` SET `media` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[9] + 1), id_group))
elif var == 7:
return self.cursor.execute("UPDATE `group` SET `sticker` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[10] + 1), id_group))
elif var == 8:
return self.cursor.execute("UPDATE `group` SET `voice` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[11] + 1), id_group))
elif var == 9:
return self.cursor.execute("UPDATE `group` SET `bool` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[12] + 1), id_group))
elif var == 10:
return self.cursor.execute("UPDATE `group` SET `cool` = ? WHERE `id_group` = ?",
((self.cursor.fetchone()[13] + 1), id_group))
def stat_element_group(self, id_group):
"""Получение данных группы"""
with self.connection:
self.cursor.execute("SELECT * FROM `group` WHERE `id_group` = ?", (id_group,))
data = self.cursor.fetchone()
time, message, reply, command, url = int(data[3]), int(data[4]), int(data[5]), int(data[6]), int(data[7])
tik_tok, media, sticker, voice = int(data[8]), int(data[9]), int(data[10]), int(data[11])
bol, cool = int(data[12]), int(data[13])
return [message, reply, command, url, tik_tok, media, sticker, voice, time, bol, cool]
def stat_element_month_group(self, id_group):
"""Получение месячных данных группы"""
with self.connection:
self.cursor.execute("SELECT * FROM `month` WHERE `id_group` = ?", (id_group,))
data = self.cursor.fetchone()
message, reply, command, url = int(data[2]), int(data[3]), int(data[4]), int(data[5])
tik_tok, media, sticker, voice = int(data[6]), int(data[7]), int(data[8]), int(data[9])
bol, cool = int(data[10]), int(data[11])
return [message, reply, command, url, tik_tok, media, sticker, voice, bol, cool]
def month_stat_group(self, id_group):
"""Перезапись месечной статистики"""
with self.connection:
self.cursor.execute("SELECT * FROM `group` WHERE `id_group` = ?", (id_group,))
group = self.cursor.fetchone()
self.cursor.execute("SELECT * FROM `month` WHERE `id_group` = ?", (id_group,))
month = self.cursor.fetchone()
message, reply, command = (group[4] - month[2]), (group[5] - month[3]), (group[6] - month[4])
url, tik_tok, media = (group[7] - month[5]), (group[8] - month[6]), (group[9] - month[7])
sticker, voice, bol = (group[10] - month[8]), (group[11] - month[9]), (group[12] - month[10])
cool = (group[13] - month[11])
self.cursor.execute("UPDATE `month` SET `message` = ? WHERE `id_group` = ?", ((group[4]), id_group))
self.cursor.execute("UPDATE `month` SET `reply` = ? WHERE `id_group` = ?", ((group[5]), id_group))
self.cursor.execute("UPDATE `month` SET `command` = ? WHERE `id_group` = ?", ((group[6]), id_group))
self.cursor.execute("UPDATE `month` SET `url` = ? WHERE `id_group` = ?", ((group[7]), id_group))
self.cursor.execute("UPDATE `month` SET `tik_tok` = ? WHERE `id_group` = ?", ((group[8]), id_group))
self.cursor.execute("UPDATE `month` SET `media` = ? WHERE `id_group` = ?", ((group[9]), id_group))
self.cursor.execute("UPDATE `month` SET `sticker` = ? WHERE `id_group` = ?", ((group[10]), id_group))
self.cursor.execute("UPDATE `month` SET `voice` = ? WHERE `id_group` = ?", ((group[11]), id_group))
self.cursor.execute("UPDATE `month` SET `bool` = ? WHERE `id_group` = ?", ((group[12]), id_group))
self.cursor.execute("UPDATE `month` SET `cool` = ? WHERE `id_group` = ?", ((group[13]), id_group))
return [message, reply, command, url, tik_tok, media, sticker, voice, bol, cool]
# СВЯЗКА ПОЛЬЗОВАТЕЛЯ
def user_exists(self, id_user, db):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute(f'SELECT * FROM `{db}` WHERE `id_user` = ?', (id_user,)).fetchall()
return bool(len(result))
def add_user(self, id_user, name, db):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute(f"INSERT INTO `{db}` (`id_user`, `first_name`, `edit`, `time`, `message`, "
f"`reply`, `command`, `url`, `tik_tok`, `media`, `sticker`, `voice`, `bool`, "
f"`cool`) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(id_user, name, False, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
def update_name(self, id_user, name, db):
"""Обновляем имя пользователя"""
with self.connection:
return self.cursor.execute(f"UPDATE `{db}` SET `first_name` = ? WHERE `id_user` = ?", (name, id_user))
def edit_name(self, id_user, name, db, var):
"""Заменяем имеющиеся имя на желаемое"""
with self.connection:
if var == 1:
self.cursor.execute(f"UPDATE `{db}` SET `edit` = ? WHERE `id_user` = ?", (True, id_user))
return self.cursor.execute(f"UPDATE `{db}` SET `first_name` = ? WHERE `id_user` = ?", (name, id_user))
elif var == 0:
self.cursor.execute(f"UPDATE `{db}` SET `edit` = ? WHERE `id_user` = ?", (False, id_user))
return self.cursor.execute(f"UPDATE `{db}` SET `first_name` = ? WHERE `id_user` = ?", (name, id_user))
def get_users(self, db, status=True):
"""Получаем всех активных пользователей"""
with self.connection:
return [i[1] for i in self.cursor.execute(f"SELECT * FROM `{db}` WHERE `edit` = ?", (status,)).fetchall()]
def id_lst(self, db):
"""Список айди"""
with self.connection:
return [i[0] for i in self.cursor.execute(f'SELECT id_user FROM `{db}`').fetchall()]
def name_lst(self, db):
"""Список имён"""
with self.connection:
return [i[0] for i in self.cursor.execute(f'SELECT first_name FROM `{db}`').fetchall()]
def user_statistic(self, id_user, id_group, var):
"""Обновляем статистику"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `{id_group}` WHERE `id_user` = ?", (id_user,))
if var == 0:
return self.cursor.execute(f"UPDATE `{id_group}` SET `time` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[4] + 1), id_user))
elif var == 1:
return self.cursor.execute(f"UPDATE `{id_group}` SET `message` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[5] + 1), id_user))
elif var == 2:
return self.cursor.execute(f"UPDATE `{id_group}` SET `reply` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[6] + 1), id_user))
elif var == 3:
return self.cursor.execute(f"UPDATE `{id_group}` SET `command` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[7] + 1), id_user))
elif var == 4:
return self.cursor.execute(f"UPDATE `{id_group}` SET `url` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[8] + 1), id_user))
elif var == 5:
return self.cursor.execute(f"UPDATE `{id_group}` SET `tik_tok` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[9] + 1), id_user))
elif var == 6:
return self.cursor.execute(f"UPDATE `{id_group}` SET `media` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[10] + 1), id_user))
elif var == 7:
return self.cursor.execute(f"UPDATE `{id_group}` SET `sticker` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[11] + 1), id_user))
elif var == 8:
return self.cursor.execute(f"UPDATE `{id_group}` SET `voice` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[12] + 1), id_user))
elif var == 9:
return self.cursor.execute(f"UPDATE `{id_group}` SET `bool` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[13] + 1), id_user))
elif var == 10:
return self.cursor.execute(f"UPDATE `{id_group}` SET `cool` = ? WHERE `id_user` = ?",
((self.cursor.fetchone()[14] + 1), id_user))
def stat_element_user(self, id_user, id_group):
"""Получение данных пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `{id_group}` WHERE `id_user` = ?", (id_user,))
data = self.cursor.fetchone()
time, message, reply, command, url = int(data[4]), int(data[5]), int(data[6]), int(data[7]), int(data[8])
tik_tok, media, sticker, voice = int(data[9]), int(data[10]), int(data[11]), int(data[12])
bol, cool = int(data[13]), int(data[14])
return [message, reply, command, url, tik_tok, media, sticker, voice, time, bol, cool]
def get_name_user(self, id_user, id_group):
"""Получение имени пользователя"""
with self.connection:
self.cursor.execute(f"SELECT * FROM `{id_group}` WHERE `id_user` = ?", (id_user,))
return self.cursor.fetchone()[2]
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()