import {
  AiFeaturesAccessData,
  GenerativeAIStreamingCloseReason,
  type GenerativeAiRequestBase,
} from '../types/generative-ai';
import { EventSourceParserStream, ParsedEvent as streamParsedEvent } from 'eventsource-parser/stream';
import { StiggFeatures, SwimmDocument, asyncIter } from '../shared';
import OpenAI from 'openai';
import { JSONContent } from '@tiptap/core';

export type ParsedEvent = streamParsedEvent;

export type RequestParameters<RequestType extends GenerativeAiRequestBase> = Omit<
  RequestType,
  keyof GenerativeAiRequestBase
>;

abstract class BaseCustomError extends Error {
  protected constructor(message?: string, errorName?: string) {
    super(message);
    this.name = errorName || this.constructor.name;
  }
}
export class QuotaExceededError extends BaseCustomError {
  constructor(message?: string) {
    super(message, GenerativeAIStreamingCloseReason.ENTITLEMENT_LIMIT_REACHED);
  }
}
export class CancelledError extends BaseCustomError {
  constructor(message?: string) {
    super(message, GenerativeAIStreamingCloseReason.STOPPED_FROM_CLIENT);
  }
}

export class RequestFailedError extends BaseCustomError {
  constructor(message: string, public readonly status: number, public readonly error?: unknown) {
    super(message);
    this.name = 'RequestFailedError';
  }
}

export async function* readableStreamToAsyncGenerator<T>(
  stream: ReadableStream<T>,
  shouldCancel: (() => boolean) | null
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (shouldCancel?.()) {
        throw new CancelledError('User cancelled');
      }
      if (done) {
        return;
      }
      yield value;
    }
  } finally {
    reader.releaseLock();
    if (shouldCancel?.()) {
      await stream.cancel();
    }
  }
}

export function getReadableStreamFromResponse(response: Response): ReadableStream<ParsedEvent> {
  return response?.body
    ? (response.body
        .pipeThrough(new TextDecoderStream() as ReadableWritablePair<string, Uint8Array>)
        .pipeThrough(
          new EventSourceParserStream() as ReadableWritablePair<ParsedEvent, string>
        ) as ReadableStream<ParsedEvent>)
    : new ReadableStream();
}

export async function* streamOpenAIResponse({
  response,
  swimmDocument,
  processOpenAIResponseSwmd,
  shouldCancel,
}: {
  response: Response;
  swimmDocument: SwimmDocument;
  processOpenAIResponseSwmd: (response: string, swimmDocContent: JSONContent) => Promise<JSONContent>;
  shouldCancel?: () => boolean;
}): AsyncGenerator<{
  type: string;
  swimmDocument?: SwimmDocument;
  reason?: string;
  code?: GenerativeAIStreamingCloseReason;
}> {
  try {
    const eventStream = getReadableStreamFromResponse(response) as ReadableStream<ParsedEvent>;

    const stream = asyncIter.map(
      readableStreamToAsyncGenerator<ParsedEvent>(eventStream, shouldCancel || null) as AsyncGenerator<ParsedEvent>,
      (event: ParsedEvent) => {
        if (event.id === 'error') {
          throw new Error(`Error streaming text from Azure OpenAI: ${event.data}`);
        }
        return JSON.parse(event.data) as OpenAI.Chat.ChatCompletionChunk;
      }
    );
    let openAIResponse = '';
    const originalFileStructure: SwimmDocument = JSON.parse(JSON.stringify(swimmDocument)) as SwimmDocument;
    const originalContent = swimmDocument.content;
    for await (const message of stream) {
      if (message?.choices?.[0]?.finish_reason === 'stop') {
        yield { type: 'stop', reason: 'Done streaming', code: GenerativeAIStreamingCloseReason.FINISHED_STREAMING };
      }
      if (message?.choices?.[0]?.delta?.content) {
        openAIResponse += message?.choices?.[0]?.delta?.content;
        const newContent = await processOpenAIResponseSwmd(`${openAIResponse}`, originalContent);
        if (newContent) {
          originalFileStructure.content = newContent;
          const swimmDocumentNew = { ...originalFileStructure, content: { content: newContent.content, type: 'doc' } };
          yield { swimmDocument: swimmDocumentNew, type: 'chunk' };
        }
      }
    }
  } catch (error) {
    if (error instanceof QuotaExceededError) {
      yield { type: 'stop', code: GenerativeAIStreamingCloseReason.ENTITLEMENT_LIMIT_REACHED, reason: error.message };
      return;
    } else if (error instanceof CancelledError) {
      yield { type: 'stop', code: GenerativeAIStreamingCloseReason.STOPPED_FROM_CLIENT, reason: error.message };
      return;
    }
    yield {
      type: 'error',
      code: GenerativeAIStreamingCloseReason.ERROR_FROM_CLIENT,
      reason: error.message,
    };
  }
}

export function isAccessAiFeatureAllowed(
  aiFeatureAccessData: AiFeaturesAccessData | undefined,
  plan: StiggFeatures.TEXT_COMPLETION_CAP | StiggFeatures.ASK_SWIMM_CAP
): boolean {
  const MIN_ACCESS_CHECK_INTERVAL = 5 * 60 * 1000; // Every 5 minutes
  const aiFeatureAccess = aiFeatureAccessData?.[plan];

  // If we don't have the access-data stored, it means we didn't check the cap yet. Try to use the feature and update the data afterwards.
  if (!aiFeatureAccess?.lastAccessCheckTimestamp || aiFeatureAccess?.hasAccess === undefined) {
    return true;
  }
  if (Date.now() - aiFeatureAccess.lastAccessCheckTimestamp > MIN_ACCESS_CHECK_INTERVAL) {
    return true;
  }
  return aiFeatureAccess.hasAccess;
}
