/*
 * Copyright Starburst Data, Inc. All rights reserved.
 *
 * THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STARBURST DATA.
 * The copyright notice above does not evidence any
 * actual or intended publication of such source code.
 *
 * Redistribution of this material is strictly prohibited.
 */
import axios, { Canceler } from "axios";
import { QueryState } from "./queriesApi";
import { baseUrl, trinoBaseUrl } from "./api";
import { Observable, of } from "rxjs";
import { delay, filter, repeat, switchMap, take, tap } from "rxjs/operators";
import { fromPromise } from "rxjs/internal-compatibility";

export interface QueryFailureReason {
  type: string;
  message: string;
  stack: string[];
  cause: QueryFailureReason;
  suppressed: QueryFailureReason[];
  errorCode: {
    code: number;
    name: string;
    type: string;
  };
}

export interface QueryTable {
  catalogName: string;
  schemaName: string;
  tableName: string;
  physicalBytes: number;
  physicalRows: number;
  output: boolean;
}

export interface TaskDetail {
  id: string;
  nodeHost: string;
  state: TaskState;
  queuedSplits: number;
  runningSplits: number;
  blockedSplits: number;
  completedSplits: number;
  rowsRead: number;
  rowsPerSecond: number;
  bytesRead: number;
  bytesPerSecond: number;
  elapsedTime: number;
  cpuTime: number;
  bytesBuffered: number;
}

export interface StageDetail {
  id: string;
  state: StageState;
  scheduledTime: number;
  blockedTime: number;
  cpuTime: number;
  fullyBlocked: boolean;
  inputBuffer: number;
  tasks: TaskDetail[];
}

export type StageState =
  | "PLANNED"
  | "SCHEDULING"
  | "SCHEDULING_SPLITS"
  | "SCHEDULED"
  | "RUNNING"
  | "FLUSHING"
  | "FINISHED"
  | "CANCELED"
  | "ABORTED"
  | "FAILED";

export type TaskState =
  | "PLANNED"
  | "RUNNING"
  | "FLUSHING"
  | "FINISHED"
  | "CANCELED"
  | "ABORTED"
  | "FAILED";

interface AccessedMetaData {
  catalogName: string;
  schema: string;
  table: string;
  columns: string[];
  connectorMetrics: Record<string, { total: number }>;
}
export interface SingleQueryDetails {
  queryId: string;
  state: QueryState;
  queryText: string;
  user: string;
  principal: string;
  groups: string[] | undefined;
  catalog: string;
  schema: string;
  source: string;
  clientAddress: string;
  submissionTime: string;
  completionTime: string;
  elapsedTime: number;
  queuedTime: number;
  analysisTime: number;
  planningTime: number;
  executionTime: number;
  waitingForResourcesTime: number;
  queryPlan: string | undefined;
  failureReason: QueryFailureReason | undefined;
  cpuTime: number;
  scheduledTime: number;
  bytesRead: number;
  bytesWritten: number;
  rowsRead: number;
  rowsWritten: number;
  peakMemory: number;
  cumulativeMemory: number;
  tables?: QueryTable[];
  stages?: StageDetail[];
  authorizedToKillQuery: boolean;
  accessedMetadata: Array<AccessedMetaData>;
}

export const getQueryDetails = (
  queryId: string
): { result: Promise<SingleQueryDetails>; cancel: Canceler } => {
  const source = axios.CancelToken.source();
  const result = axios
    .get(`${baseUrl}/history/queries/${queryId}`, {
      cancelToken: source.token,
    })
    .then((response) => response.data)
    .then(
      (details): SingleQueryDetails => ({
        ...details,
        failureReason: details.failureInfo
          ? JSON.parse(details.failureInfo)
          : undefined,
      })
    );
  return { result, cancel: source.cancel };
};

export interface QueryPlanNodeInfo {
  id: string;
  name: string;
  identifier: string;
  details: string[];
  descriptor: {
    [key: string]: string;
  };
  children: QueryPlanNodeInfo[];
}

export interface StageStats {
  completedDrivers: string;
  bufferedDataSize: string;
  fullyBlocked: boolean;
  outputDataSize: string;
  outputPositions: number;
  queuedDrivers: string;
  rawInputDataSize: string;
  rawInputPositions: number;
  runningDrivers: string;
  totalBlockedTime: string;
  totalCpuTime: string;
  userMemoryReservation: string;
}

export interface QueryPlanStageInfo {
  stageId: string;
  stageStats: StageStats;
  state: StageState;
  plan: QueryPlanNodeInfo;
}

export interface SingleQueryPlanDetails {
  finalQueryInfo: boolean;
  stages: QueryPlanStageInfo[];
}

export const getQueryPlanDetails = (
  queryId: string
): Promise<SingleQueryPlanDetails> => {
  return axios
    .get(`${baseUrl}/history/queries/${queryId}/plan`)
    .then((response) => response.data);
};

export const getQueryPlanDetails$ = (
  queryId: string,
  onNext: (query: SingleQueryPlanDetails) => void,
  fetchDelay = 1000
): Observable<SingleQueryPlanDetails> => {
  return of({})
    .pipe(switchMap(() => fromPromise(getQueryPlanDetails(queryId))))
    .pipe(
      tap(onNext),
      filter(({ finalQueryInfo }) => finalQueryInfo),
      delay(fetchDelay),
      repeat(),
      take(1)
    );
};

export const killQuery = (queryId: string): Promise<void> => {
  return axios.put(`${trinoBaseUrl}/${queryId}/killed`, null);
};
