from rest_framework.views import APIView
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
from authentication.permissions import require_permission,CustomIsAuthenticated
from django.views.decorators.csrf import csrf_exempt
from Datamplify import settings
from rest_framework.response import Response
import requests
from rest_framework import status
from Monitor.utils import airflow_token

def get_headers(force_refresh=False):
        token = airflow_token(force_refresh =force_refresh)
        return {
            "Authorization": f"Bearer {token}"
        }

class AirflowDagRunStatusView(APIView):
    """
    Proxy endpoint:
    GET /api/airflow/dags/<dag_id>/runs/?limit=10
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id):
        limit = request.query_params.get("limit", 10)

        airflow_url = f"{settings.airflow_host}/ui/grid/{dag_id}"
        params = {
            "limit": limit,
            "order_by": "-run_after"
        }

        try:
            response = requests.get(
                airflow_url,
                params=params,
                headers=get_headers(),
                timeout=10
            )
            if response.status_code == 401:
                response = requests.get(
                airflow_url,
                params=params,
                headers=get_headers(force_refresh=True),
                timeout=10
            )
            response.raise_for_status()
            return Response(response.json(), status=response.status_code)

        except requests.exceptions.RequestException as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )



class AirflowRecentDagRunsView(APIView):
    """
    Proxy endpoint:
    GET /api/airflow/dags/<dag_id>/recent-runs/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request):
        dag_id = request.query_params.get('dag_id')

        airflow_url = f"{settings.airflow_host}/ui/dags/recent_dag_runs"
        params = {
            "dag_ids": dag_id
        }

        try:
            response = requests.get(
                airflow_url,
                params=params,
                headers=get_headers(),
                timeout=10
            )
            if response.status_code == 401:
                response = requests.get(
                airflow_url,
                params=params,
                headers=get_headers(force_refresh=True),
                timeout=10
            )
            response.raise_for_status()
            return Response(response.json(), status=response.status_code)

        except requests.exceptions.RequestException as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )




def get_dag_runs(dag_id, limit, current_page, state=None, run_type=None, order_by=None):
    offset = (current_page - 1) * limit

    url = f"{settings.airflow_host}/api/v2/dags/{dag_id}/dagRuns"

    params = {
        "limit": limit,
        "offset": offset,
    }

    if order_by:
        params["order_by"] = order_by

    if state:
        params["state"] = state

    if run_type:
        params["run_type"] = run_type

    response = requests.get(
        url,
        params=params,
        headers=get_headers(),
        timeout=15
    )
    if response.status_code == 401:
        response = requests.get(
        url,
        params=params,
        headers=get_headers(force_refresh=True),
            timeout=15)

    response.raise_for_status()
    return response.json()




class AirflowDagRunsView(APIView):
    """
    GET /api/airflow/dags/<dag_id>/dag-runs/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id):
        try:
            limit = int(request.query_params.get("limit", 10))
            current_page = int(request.query_params.get("page", 1))
            state = request.query_params.get("state")
            run_type = request.query_params.get("run_type")
            order_by = request.query_params.get("order_by", "-start_date")

            data = get_dag_runs(
                dag_id=dag_id,
                limit=limit,
                current_page=current_page,
                state=state,
                run_type=run_type,
                order_by=order_by
            )

            return Response(data)

        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )



def get_task_instances(dag_id, task_id, limit=14):
    url = (
        f"{settings.airflow_host}"
        f"/api/v2/dags/{dag_id}/dagRuns/~/taskInstances"
    )

    params = {
        "task_id": task_id,
        "order_by": "-run_after",
        "limit": limit,
    }

    response = requests.get(
        url,
        params=params,
        headers=get_headers(),
        timeout=15
    )
    if response.status_code == 401:
        response = requests.get(
            url,
            params=params,
            headers=get_headers(force_refresh=True),
            timeout=15
        )

    response.raise_for_status()
    return response.json()




class AirflowTaskInstancesView(APIView):
    """
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/task-instances/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id):
        try:
            task_id = request.query_params.get("task_id")
            limit = int(request.query_params.get("limit", 14))

            if not task_id:
                return Response(
                    {"error": "task_id is required"},
                    status=status.HTTP_400_BAD_REQUEST
                )

            data = get_task_instances(
                dag_id=dag_id,
                task_id=task_id,
                limit=limit
            )

            return Response(data)

        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )



def get_task_instance_logs(dag_id, run_id, task_id, try_number=1, map_index=-1):
    url = (
        f"{settings.airflow_host}"
        f"/api/v2/dags/{dag_id}/dagRuns/{run_id}"
        f"/taskInstances/{task_id}/logs/{try_number}"
    )

    params = {
        "map_index": map_index
    }

    response = requests.get(
        url,
        params=params,
        headers=get_headers(),
        timeout=20
    )
    if response.status_code == 401:
        response = requests.get(
        url,
        params=params,
        headers=get_headers(force_refresh=True),
        timeout=20
    )

    response.raise_for_status()
    return response.json()




class AirflowTaskLogsView(APIView):
    """
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/tasks/<task_id>/logs/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id, run_id, task_id):
        try:
            try_number = int(request.query_params.get("try_number", 1))
            map_index = int(request.query_params.get("map_index", -1))

            data = get_task_instance_logs(
                dag_id,
                run_id,
                task_id,
                try_number,
                map_index
            )

            return Response(data)

        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )


def get_task_instances_of_run(dag_id, run_id):
    url = (
        f"{settings.airflow_host}"
        f"/api/v2/dags/{dag_id}/dagRuns/{run_id}/taskInstances"
    )

    response = requests.get(
        url,
        headers=get_headers(),
        timeout=15
    )
    if response.status_code == 401:
        response = requests.get(
        url,
        headers=get_headers(force_refresh=True),
        timeout=15
    )

    response.raise_for_status()
    return response.json()



class AirflowTaskInstancesOfRunView(APIView):
    """
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/tasks/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id, run_id):
        try:
            data = get_task_instances_of_run(dag_id, run_id)
            return Response(data)

        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )
        

def get_dag_details(dag_id):
    url = (
        f"{settings.airflow_host}"
        f"/api/v2/dags/{dag_id}/details"
    )

    response = requests.get(
        url,
        headers=get_headers(),
        timeout=20
    )
    if response.status_code == 401:
        response = requests.get(
        url,
        headers=get_headers(force_refresh=True),
        timeout=20
    )

    response.raise_for_status()
    return response.json()


class AirflowDAGDetails(APIView):
    """
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/tasks/
    """
    authentication_classes = [OAuth2Authentication]
    permission_classes = [CustomIsAuthenticated]

    def get(self, request, dag_id):
        try:
            data =get_dag_details(dag_id)
            return Response(data)

        except Exception as e:
            return Response(
                {"error": str(e)},
                status=status.HTTP_502_BAD_GATEWAY
            )