Как можно осуществить удаление файлов и директорий на жестком диске при работе с DAG в Airflow?

Здравствуйте! 

У меня есть сервер с установленной Ubuntu 24.04.2 LTS, на котором функционирует Airflow в Docker. К этому серверу подключен внешний HDD объемом 2 ТБ, отформатированный в NTFS. 

Мне необходимо создать DAG в Airflow, который будет удалять определенные файлы с этого HDD при запуске.

Я столкнулся с проблемой: Airflow видит директории и файлы на диске и может их прочитать, однако при использовании функции `shutil.rmtree(path_dir_file)` возникает ошибка, которая сообщает, что директория не найдена.

Я уже попробовал множество способов решить эту проблему в течение трех дней:
- Выдал права на чтение и удаление для указанной директории. В настоящее время HDD перемонтирован с общими правами.
- Убедился, что в Docker путь также смонтирован с правами для чтения и удаления.
- Попытался удалить файлы с диска, отформатированного в ext4, с тем же результатом.
- Исследовал варианты использования Bash-операторов в DAG.

К сожалению, я не могу вспомнить все свои попытки, и возможно, что я упустил какое-то простое решение из-за недостатка опыта и знаний.

Вот часть конфигурации для Airflow:

```yaml
airflow-webserver:
    container_name: airflow_web
    build: .
    user: "1000:0"
    environment:
      UMASK: "000" 
      AIRFLOW__CORE__UMASK: "000"
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow  
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
    volumes:
      - /mnt/sdb1/file_load:/opt/airflow/files:rw,z,delegated,uid=1000,gid=0,cached
    ports:
      - "8081:8080"
    command: |
      bash -c "
      airflow db migrate
      airflow users create --username admin --password admin --firstname admin --lastname admin --role Admin --email 
``` 

Помогите, пожалуйста, разобраться с данной проблемой.
  • 3 апреля 2025 г. 17:05
Ответы на вопрос 2
Чтобы устранить проблемы с удалением файлов и директорий на жестком диске, подключенном к вашему серверу с Airflow, есть несколько шагов, которые вы можете предпринять. Вот некоторые рекомендации:

### 1. Проверка прав доступа

Убедитесь, что у пользователя, от имени которого выполняется Airflow в Docker, есть необходимые права доступа для удаления файлов и директорий на внешнем диске. Вы можете проверить это с помощью следующей команды в терминале вашего сервера:

```bash
ls -ld /mnt/sdb1/file_load
```

Если права доступа не установлены должным образом, измените их, используя `chmod` для установки необходимых прав:

```bash
sudo chmod -R 777 /mnt/sdb1/file_load
```

Также проверьте пользователя и группу:

```bash
chown -R 1000:0 /mnt/sdb1/file_load
```

### 2. Использование Bash-операторов

К примеру, вы можете использовать `BashOperator` в вашем DAG, чтобы выполнить команду на удаление файла или директории. Вот пример, как это можно сделать:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 10, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('delete_files_dag', default_args=default_args, schedule_interval='@daily')

delete_files = BashOperator(
    task_id='delete_files',
    bash_command='rm -rf /opt/airflow/files/some_directory_or_file',
    dag=dag,
)

delete_files
```

### 3. Проверка Docker-контейнера

Убедитесь, что ваш Docker-контейнер имеет доступ к пути `/opt/airflow/files`, и он правильно смонтирован. Вы можете проверить это, войдя в контейнер и выполнив команду:

```bash
docker exec -it airflow_web bash
ls /opt/airflow/files
```

### 4. Проверка ошибок 

Если ошибка "директория не найдена" продолжается, вы можете использовать `print` или `logging` в вашем коде для диагностики. Например, убедитесь, что Python знает, где искать ваш файл.

```python
import os
import shutil
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def delete_files():
    path = '/opt/airflow/files/some_directory_or_file'  # Замените на ваш путь
    if os.path.exists(path):
        print(f'Удаление: {path}')
        shutil.rmtree(path)
    else:
        print(f'Путь не найден: {path}')

dag = DAG(
    'delete_files_dag',
    default_args={'start_date': datetime(2023, 10, 1)},
    schedule_interval='@daily',
)

delete_task = PythonOperator(
    task_id='delete_files_task',
    python_callable=delete_files,
    dag=dag,
)
```

### 5. Запуск и мониторинг

Следите за логами вашего DAG, чтобы увидеть, сможете ли вы отследить, когда и почему происходит ошибка. Это поможет вам понять причины неудачи, если они появятся.

### Заключение 

Если вы попробовали все эти шаги и все еще сталкиваетесь с проблемами, возможно, вам стоит заключить в коде больше логирования, чтобы лучше понять, что именно идет не так. Иногда это может быть связано с особенностями работы с внешними файловыми системами, поскольку NTFS и Linux имеют разные модели обработки прав доступа.
Вопрос решен. 
Смог какое как наладить доступы для удаления файлов.
Правда удалять директории приходиться не с помощь shutil.rmtree(path_dir_file) напрямую, а сначала чистить директорию, и после этого удалять, т.к. начала возникать ошибка о том что директорию удалить не удается, но при этом она удалялась. Связано это я так понял было с тем что скрипт удаляет данные, но информация о удалении слишком долго обновляется на hdd, в итоге python думает что удалить ничего не вышло, но фактически директория была удалена.
Решил это вот таким образом.
for root, dirs, files in os.walk(path_dir_file, topdown=False):
            for file in files:
                try:
                    os.unlink(file)
                    os.sync()  # синхронизируем
                except OSError as e:
                    if not os.path.exists(file):  # проверяем удален ли файл
                        continue
                    raise
            shutil.rmtree(path_dir_file) # только теперь удаляем директорию

Для HDD в /etc/fstab пришлось указать такие параметры
UUID=UUID-HDD /mnt/sdb1 ntfs rw,sync,noatime,relatime,uid=UUID-AIRFLOW,gid=UUID-AIRFLOW,force,dmask=002,fmask=113,nofail 0 0


до этого была эта же настройка, но с ntfs-3g и с ней почему то не работало вообще ни как.
Похожие вопросы