Отключение звука рекламы на телевизоре через ИИ

Наверно и такие были. Но скорее всего они писали весь эфир, не исключая рекламу

Есть приставки с функцией time-shift. Идея технологии в том, что вы смотрите видеопоток не в том времени, в котором он вещается, а в записи, отстающей не несколько минут. Приставка ведет запись эфира, а вам транслируется сделанная запись. При начале рекламы просто проматываете вперед до ее окончания. Понятно, что “на холодную” сразу после переключения на канал оно работать не будет. Но, если никуда не спешишь, смотришь только один канал, то выбрал его, поставил на паузу, сходил чайку себе налил, бутербродик намазал, потом снял с паузы и смотришь. Уже на блок-другой рекламы буфер заполнен.

Но все равно это руками…

Попрограммировать - совместить технологию time-shift и распознавание рекламы. Прелесть в том, что софт опознает рекламу с некоторой задержкой, но потом откатится по времени назад точно до момента начала рекламы. И вернет звук точно по концу рекламного блока. Ну просто потому что не в реальном времени, а по записи выделять инородные фрагменты (а особенно фрагменты с известным паттерном - то есть знакомые рекламные блоки) проще.

Идея-то правильная, и , уверен. абсолютно реализуемая на нынешнем уровне развития… но, подохреваю, для персонального использования будет жрать немеряно ресурсов…
Ну второе - не вижу тут специалистов, способных это воплотить

@ЕвгенийП в тебе сомневаются, простую хрень выделить из потока битовую последовательность не сможешь реализовать…жеж

Петрович, жить он без тебя не может)
у меня вот в один телек тока лан кабель идет, а другой ваще на WiFi, мне из какого потока ту самую “битовую последовательность” выделять?

тот есть, опять жеж катушку мотать нада?)))

Блин. Или фамилия проклята или!? :face_with_raised_eyebrow:

image

за чёрную Луну Павел Глоба топил, а он подтвердил, хотя это виртуальная планета, теперь кто-то кому-то видимо денег должен

и таки да, сколько можно из пустого в пустопорожнее

ffmpeg (он бесплатный) умеет всё

Вот тут некоторые издеваются, а катушки мотать дело чрезвычайно полезное!

Я тут, блин генератор делал на 50 МГц (одному Богу известно нахрена!), а он, сволочь на дешманской катушке никак не хотел стабильно работать. Так, чё - кусок медной проволоки из мотора приказавшего долго жить пылесоса, намотать несколько витков на гвоздь и voilà! Всё работает, а если растягивать и сжимать как гармошку, так ещё и частота “регулируется”! Вот те и катушка! Вот такая красота получилась:

воском залей, для стабильности

Зачем? Мне не нужен этот генератор, я делал его в качестве учебного упражнения. Сделал, заработало, весь процесс расчёта и настройки задокументирован. Больше он мне не нужен.

Кстати, если заливать тем же воском или парафином (или там “соплями девственницы”), частота ведь немного уйдёт после заливки, так ведь?

Уйдёт меньше чем на 0.1%. Можно не увидить. Парафин имеет диэлектрическую проницаемость `εᵣ ≈ 2.0–2.2 , поэтому межвитковая ёмкость немного возрастёт. Индуктивность не изменится.

Спасибо.

ffmpeg + этот скрипт проблему одолеют

#!/usr/bin/env python3
"""
SCTE-35 Audio Mute Script
Обнаруживает рекламные метки в MPEG-TS потоке и приглушает звук
"""

import sys
import struct
import time
import threading
import subprocess
import json
from datetime import datetime
from typing import Optional, Dict, Any

# Для управления громкостью (выберите вашу ОС)
import platform

class AudioController:
    """Класс для управления громкостью системы"""
    
    def __init__(self):
        self.system = platform.system()
        self.original_volume = self.get_volume()
        self.is_muted = False
        
    def get_volume(self) -> int:
        """Получить текущую громкость (0-100)"""
        try:
            if self.system == "Linux":
                # Для PulseAudio
                result = subprocess.run(
                    ["pactl", "get-sink-volume", "@DEFAULT_SINK@"],
                    capture_output=True, text=True
                )
                # Извлекаем процент
                for part in result.stdout.split():
                    if '%' in part:
                        return int(part.replace('%', ''))
                    
            elif self.system == "Windows":
                # Через PowerShell
                cmd = '''
                (Get-AudioDevice -Playback).Volume
                '''
                result = subprocess.run(
                    ["powershell", "-Command", cmd],
                    capture_output=True, text=True
                )
                return int(result.stdout.strip())
                
            elif self.system == "Darwin":  # macOS
                result = subprocess.run(
                    ["osascript", "-e", "output volume of (get volume settings)"],
                    capture_output=True, text=True
                )
                return int(result.stdout.strip())
        except:
            pass
        return 50  # Значение по умолчанию
    
    def set_volume(self, volume: int):
        """Установить громкость"""
        try:
            if self.system == "Linux":
                subprocess.run(
                    ["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{volume}%"],
                    capture_output=True
                )
            elif self.system == "Windows":
                subprocess.run(
                    ["powershell", "-Command", 
                     f"Set-AudioDevice -PlaybackVolume {volume}"],
                    capture_output=True
                )
            elif self.system == "Darwin":  # macOS
                subprocess.run(
                    ["osascript", "-e", f"set volume output volume {volume}"],
                    capture_output=True
                )
        except Exception as e:
            print(f"Warning: Could not set volume: {e}")
    
    def mute(self):
        """Приглушить звук"""
        if not self.is_muted:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 🔇 MUTING AUDIO (Ad detected)")
            self.set_volume(0)
            self.is_muted = True
    
    def unmute(self):
        """Вернуть звук"""
        if self.is_muted:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 🔊 UNMUTING AUDIO (Ad ended)")
            self.set_volume(self.original_volume)
            self.is_muted = False


class SCTE35Parser:
    """Парсер SCTE-35 меток из MPEG-TS потока"""
    
    def __init__(self, audio_controller: AudioController):
        self.audio = audio_controller
        self.current_ad_end_pts: Optional[int] = None
        self.last_pts: Optional[int] = None
        
    def parse_splice_command(self, data: bytes, pts: int) -> Optional[Dict[str, Any]]:
        """
        Парсинг SCTE-35 splice команды
        Упрощенная версия - фокусируемся на важных полях
        """
        if len(data) < 10:
            return None
            
        try:
            # Проверяем Table ID (0xFC для SCTE-35)
            table_id = data[0]
            if table_id != 0xFC:
                return None
            
            # Длина секции
            section_length = ((data[1] & 0x0F) << 8) | data[2]
            
            # Протокол версия (должен быть 0)
            protocol_version = data[3]
            
            # Пропускаем до splice_command_type
            if len(data) < 8:
                return None
                
            splice_command_type = data[7]
            
            result = {
                'type': splice_command_type,
                'pts': pts,
                'raw': data.hex()[:32]  # для отладки
            }
            
            # Обработка splice_insert (type = 5)
            if splice_command_type == 5:
                splice_event_id = (data[8] << 24) | (data[9] << 16) | (data[10] << 8) | data[11]
                splice_event_cancel = (data[12] & 0x80) != 0
                
                result['splice_event_id'] = splice_event_id
                result['splice_event_cancel'] = splice_event_cancel
                
                if not splice_event_cancel and len(data) > 13:
                    out_of_network = (data[13] & 0x80) != 0
                    result['out_of_network'] = out_of_network
                    
                    # Длительность рекламы
                    if out_of_network and len(data) > 18:
                        duration_ticks = ((data[14] & 0x01) << 32) | (data[15] << 24) | \
                                        (data[16] << 16) | (data[17] << 8) | data[18]
                        # Преобразуем в секунды (90kHz clock)
                        duration_sec = duration_ticks / 90000.0
                        result['duration_seconds'] = duration_sec
                        
            # Обработка time_signal (type = 6)
            elif splice_command_type == 6:
                if len(data) > 12:
                    # Проверяем наличие segmentation_descriptor
                    descriptor_tag = data[12]
                    if descriptor_tag == 0x02:  # segmentation_descriptor
                        if len(data) > 14:
                            segmentation_event_id = (data[13] << 8) | data[14]
                            result['segmentation_event_id'] = segmentation_event_id
            
            return result
            
        except Exception as e:
            print(f"Error parsing SCTE-35: {e}")
            return None
    
    def process_packet(self, pid: int, data: bytes, pts: int):
        """Обработка пакета с SCTE-35 данными"""
        if pid != 0x86:  # PID для SCTE-35 обычно 0x86 (134)
            return
        
        splice_info = self.parse_splice_command(data, pts)
        if not splice_info:
            return
        
        # Логируем найденную метку
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 📺 SCTE-35 found: "
              f"type={splice_info['type']}, pts={pts}")
        
        # Обработка начала рекламы
        if splice_info['type'] == 5:  # splice_insert
            if not splice_info.get('splice_event_cancel', False):
                if splice_info.get('out_of_network', False):  # начало рекламы
                    self.audio.mute()
                    
                    # Если есть длительность, планируем автоматическое восстановление
                    duration = splice_info.get('duration_seconds')
                    if duration:
                        print(f"  Ad duration: {duration:.2f} seconds")
                        # Отменяем предыдущий таймер если был
                        if hasattr(self, 'unmute_timer') and self.unmute_timer:
                            self.unmute_timer.cancel()
                        # Запускаем таймер на восстановление
                        self.unmute_timer = threading.Timer(duration, self.audio.unmute)
                        self.unmute_timer.start()
            else:
                # Отмена события - конец рекламы
                self.audio.unmute()
                if hasattr(self, 'unmute_timer') and self.unmute_timer:
                    self.unmute_timer.cancel()
                    
        elif splice_info['type'] == 6:  # time_signal
            # Обычно начало рекламы определяется по segmentation_descriptor
            if splice_info.get('segmentation_event_id'):
                print(f"  Time signal with segmentation ID: {splice_info['segmentation_event_id']}")
                # Здесь можно добавить логику для time_signal если нужно


class MPEGTSReader:
    """Читатель MPEG-TS потока"""
    
    def __init__(self, parser: SCTE35Parser):
        self.parser = parser
        self.ts_packet_size = 188
        self.sync_byte = 0x47
        
    def extract_pts(self, adaptation_field: bytes) -> Optional[int]:
        """Извлечение PTS из adaptation field"""
        if len(adaptation_field) < 5:
            return None
            
        # Флаг наличия PTS
        flags = adaptation_field[0]
        if flags & 0x40:  # PTS flag
            # PTS закодирован в 5 байтах
            pts_bytes = adaptation_field[1:6]
            pts = ((pts_bytes[0] & 0x0E) << 29) | \
                  (pts_bytes[1] << 22) | \
                  ((pts_bytes[2] & 0xFE) << 14) | \
                  (pts_bytes[3] << 7) | \
                  ((pts_bytes[4] & 0xFE) >> 1)
            return pts
        return None
    
    def read_packet(self, packet: bytes) -> tuple:
        """
        Разбор TS пакета
        Возвращает (pid, adaptation_field_control, payload, pts)
        """
        if len(packet) < self.ts_packet_size or packet[0] != self.sync_byte:
            return (None, None, None, None)
        
        # PID
        pid = ((packet[1] & 0x1F) << 8) | packet[2]
        
        # Adaptation field control
        adaptation_field_control = (packet[3] & 0x30) >> 4
        
        payload = None
        pts = None
        offset = 4
        
        # Adaptation field
        if adaptation_field_control & 0x20:  # имеет adaptation field
            adaptation_field_length = packet[offset]
            offset += 1
            
            if adaptation_field_length > 0:
                adaptation_field = packet[offset:offset + adaptation_field_length]
                pts = self.extract_pts(adaptation_field)
            
            offset += adaptation_field_length
        
        # Payload
        if adaptation_field_control & 0x10:  # имеет payload
            payload = packet[offset:]
        
        return (pid, payload, pts)
    
    def read_from_file(self, filename: str):
        """Чтение потока из файла"""
        print(f"Reading from file: {filename}")
        with open(filename, 'rb') as f:
            while True:
                packet = f.read(self.ts_packet_size)
                if not packet:
                    break
                    
                pid, payload, pts = self.read_packet(packet)
                if payload and pts:
                    self.parser.process_packet(pid, payload, pts)
                
                # Небольшая задержка для эмуляции реального времени
                time.sleep(0.001)
    
    def read_from_udp(self, host: str, port: int):
        """Чтение потока из UDP"""
        import socket
        print(f"Listening on UDP {host}:{port}")
        
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((host, port))
        sock.settimeout(1.0)
        
        buffer = b''
        while True:
            try:
                data, addr = sock.recvfrom(65535)
                buffer += data
                
                # Обрабатываем полные пакеты
                while len(buffer) >= self.ts_packet_size:
                    packet = buffer[:self.ts_packet_size]
                    buffer = buffer[self.ts_packet_size:]
                    
                    pid, payload, pts = self.read_packet(packet)
                    if payload and pts:
                        self.parser.process_packet(pid, payload, pts)
                        
            except socket.timeout:
                continue
            except KeyboardInterrupt:
                break


def main():
    """Главная функция"""
    print("=" * 60)
    print("SCTE-35 Audio Mute Script")
    print("Detects ad markers and mutes audio automatically")
    print("=" * 60)
    
    # Инициализация
    audio = AudioController()
    parser = SCTE35Parser(audio)
    reader = MPEGTSReader(parser)
    
    # Выбор источника
    if len(sys.argv) < 2:
        print("\nUsage:")
        print("  python scte35_audio_mute.py file.ts")
        print("  python scte35_audio_mute.py udp://239.0.0.1:1234")
        print("  python scte35_audio_mute.py http://example.com/stream.ts")
        sys.exit(1)
    
    source = sys.argv[1]
    
    try:
        if source.startswith('udp://'):
            # UDP multicast: udp://239.0.0.1:1234
            parts = source.replace('udp://', '').split(':')
            host = parts[0]
            port = int(parts[1])
            reader.read_from_udp(host, port)
        elif source.startswith('http://') or source.startswith('https://'):
            # HTTP stream - используем ffmpeg как прослойку
            print(f"Reading HTTP stream via ffmpeg: {source}")
            cmd = ['ffmpeg', '-i', source, '-c', 'copy', '-f', 'mpegts', '-']
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
            
            while True:
                packet = process.stdout.read(reader.ts_packet_size)
                if not packet:
                    break
                pid, payload, pts = reader.read_packet(packet)
                if payload and pts:
                    parser.process_packet(pid, payload, pts)
        else:
            # Файл
            reader.read_from_file(source)
            
    except KeyboardInterrupt:
        print("\n\n🛑 Stopping...")
        audio.unmute()  # Восстанавливаем громкость при выходе
    except Exception as e:
        print(f"Error: {e}")
        audio.unmute()


if __name__ == "__main__":
    main()

Интересно… хотя ты, как всегда, не даешь ссылок на автора.

Ок, оставим мелочи.
Что такое ffmpeg я знаю, а вот откуда я возьму MPEG-TS поток с SCTE-35 метками для ТВ сигнала?

Под ключевым кадром идентификации рекламы в трансляции (в контексте маркировки) понимается кадр видеоролика, на котором четко виден токен (erid — уникальный цифровой идентификатор). В рамках требований Роскомнадзора и ЕРИР, этот маркер должен быть доступен для распознавания системами мониторинга.

[image]adpass.ru +1

Ключевые моменты идентификации:

  • Маркировка (erid): Реклама должна содержать буквенно-цифровой токен (например, erid=XXXXX), который размещается в кликовой ссылке, текстовом блоке или поверх видео.
  • Отображение в видео: Токен (erid) должен быть виден в течение всего времени показа рекламного материала.
  • Автоматическое распознавание: Рекламные объявления идентифицируются специальными алгоритмами, которые ищут маркер в видеопотоке.

[image]beeline +1

Технические аспекты (Keyframes в видео):

Хотя в контексте маркировки важнее визуальное наличие кода (erid), технически при анализе видео системами ИИ ключевым кадром (I-frame) является кадр, содержащий полную информацию об изображении, используемый для анализа содержания ролика.

[image]KinoSklad.RU

Важно: С 2023–2025 гг. в России обязательна маркировка рекламы в интернете, включающая токен (erid) от ОРД.

Это не совсем mpeg-ts, а hls. Но чанки mpeg-ts.
Первый канал.
Можешь принять его и тем же ffmpeg превратить его в чистый mpeg-ts.

сильно сомневаюсь