Здравствуйте!
У меня есть сервер с установленной Ubuntu 24.04.2 LTS, на котором функционирует Airflow в Docker. К этому серверу подключен внешний HDD объемом 2 ТБ, отформатированный в NTFS.
Мне необходимо создать DAG в Airflow, который будет удалять определенные файлы с этого HDD при запуске.
Я столкнулся с проблемой: Airflow видит директории и файлы на диске и может их прочитать, однако при использовании функции `shutil.rmtree(path_dir_file)` возникает ошибка, которая сообщает, что директория не найдена.
Я уже попробовал множество способов решить эту проблему в течение трех дней:
- Выдал права на чтение и удаление для указанной директории. В настоящее время HDD перемонтирован с общими правами.
- Убедился, что в Docker путь также смонтирован с правами для чтения и удаления.
- Попытался удалить файлы с диска, отформатированного в ext4, с тем же результатом.
- Исследовал варианты использования Bash-операторов в DAG.
К сожалению, я не могу вспомнить все свои попытки, и возможно, что я упустил какое-то простое решение из-за недостатка опыта и знаний.
Вот часть конфигурации для Airflow:
```yaml
airflow-webserver:
container_name: airflow_web
build: .
user: "1000:0"
environment:
UMASK: "000"
AIRFLOW__CORE__UMASK: "000"
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__EXECUTOR: LocalExecutor
volumes:
- /mnt/sdb1/file_load:/opt/airflow/files:rw,z,delegated,uid=1000,gid=0,cached
ports:
- "8081:8080"
command: |
bash -c "
airflow db migrate
airflow users create --username admin --password admin --firstname admin --lastname admin --role Admin --email
```
Помогите, пожалуйста, разобраться с данной проблемой.
Чтобы устранить проблемы с удалением файлов и директорий на жестком диске, подключенном к вашему серверу с Airflow, есть несколько шагов, которые вы можете предпринять. Вот некоторые рекомендации:
### 1. Проверка прав доступа
Убедитесь, что у пользователя, от имени которого выполняется Airflow в Docker, есть необходимые права доступа для удаления файлов и директорий на внешнем диске. Вы можете проверить это с помощью следующей команды в терминале вашего сервера:
```bash
ls -ld /mnt/sdb1/file_load
```
Если права доступа не установлены должным образом, измените их, используя `chmod` для установки необходимых прав:
```bash
sudo chmod -R 777 /mnt/sdb1/file_load
```
Также проверьте пользователя и группу:
```bash
chown -R 1000:0 /mnt/sdb1/file_load
```
### 2. Использование Bash-операторов
К примеру, вы можете использовать `BashOperator` в вашем DAG, чтобы выполнить команду на удаление файла или директории. Вот пример, как это можно сделать:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('delete_files_dag', default_args=default_args, schedule_interval='@daily')
delete_files = BashOperator(
task_id='delete_files',
bash_command='rm -rf /opt/airflow/files/some_directory_or_file',
dag=dag,
)
delete_files
```
### 3. Проверка Docker-контейнера
Убедитесь, что ваш Docker-контейнер имеет доступ к пути `/opt/airflow/files`, и он правильно смонтирован. Вы можете проверить это, войдя в контейнер и выполнив команду:
```bash
docker exec -it airflow_web bash
ls /opt/airflow/files
```
### 4. Проверка ошибок
Если ошибка "директория не найдена" продолжается, вы можете использовать `print` или `logging` в вашем коде для диагностики. Например, убедитесь, что Python знает, где искать ваш файл.
```python
import os
import shutil
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def delete_files():
path = '/opt/airflow/files/some_directory_or_file' # Замените на ваш путь
if os.path.exists(path):
print(f'Удаление: {path}')
shutil.rmtree(path)
else:
print(f'Путь не найден: {path}')
dag = DAG(
'delete_files_dag',
default_args={'start_date': datetime(2023, 10, 1)},
schedule_interval='@daily',
)
delete_task = PythonOperator(
task_id='delete_files_task',
python_callable=delete_files,
dag=dag,
)
```
### 5. Запуск и мониторинг
Следите за логами вашего DAG, чтобы увидеть, сможете ли вы отследить, когда и почему происходит ошибка. Это поможет вам понять причины неудачи, если они появятся.
### Заключение
Если вы попробовали все эти шаги и все еще сталкиваетесь с проблемами, возможно, вам стоит заключить в коде больше логирования, чтобы лучше понять, что именно идет не так. Иногда это может быть связано с особенностями работы с внешними файловыми системами, поскольку NTFS и Linux имеют разные модели обработки прав доступа.
Вопрос решен.
Смог какое как наладить доступы для удаления файлов.
Правда удалять директории приходиться не с помощь shutil.rmtree(path_dir_file) напрямую, а сначала чистить директорию, и после этого удалять, т.к. начала возникать ошибка о том что директорию удалить не удается, но при этом она удалялась. Связано это я так понял было с тем что скрипт удаляет данные, но информация о удалении слишком долго обновляется на hdd, в итоге python думает что удалить ничего не вышло, но фактически директория была удалена.
Решил это вот таким образом.
for root, dirs, files in os.walk(path_dir_file, topdown=False): for file in files: try: os.unlink(file) os.sync() # синхронизируем except OSError as e: if not os.path.exists(file): # проверяем удален ли файл continue raise shutil.rmtree(path_dir_file) # только теперь удаляем директорию
Для HDD в /etc/fstab пришлось указать такие параметры
UUID=UUID-HDD /mnt/sdb1 ntfs rw,sync,noatime,relatime,uid=UUID-AIRFLOW,gid=UUID-AIRFLOW,force,dmask=002,fmask=113,nofail 0 0
до этого была эта же настройка, но с ntfs-3g и с ней почему то не работало вообще ни как.