
API Reference

Client

The main client interface for interacting with LLMs.

OpenPO

Main client class for interacting with various LLM providers.

This class serves as the primary interface for making completion requests to different language model providers. OpenPO takes optional api_key arguments for initialization.
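
A minimal usage sketch is shown below. It assumes the client is importable from the package root and that API keys are supplied either directly or through environment variables (HF_API_KEY, OPENROUTER_API_KEY, OPENAI_API_KEY, ANTHROPIC_API_KEY).

import os

from openpo import OpenPO  # assumed export path

# Keys fall back to environment variables when not passed explicitly.
client = OpenPO(
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
)

# The client exposes three interfaces, documented below.
completion = client.completion  # chat completions
evaluator = client.evaluate     # LLM-as-a-judge evaluation
batch = client.batch            # provider batch APIs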

Source code in openpo/client.py
class OpenPO:
    """
    Main client class for interacting with various LLM providers.

    This class serves as the primary interface for making completion requests to different
    language model providers. OpenPO takes optional api_key arguments for initialization.

    """

    def __init__(
        self,
        hf_api_key: Optional[str] = None,
        openrouter_api_key: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        anthropic_api_key: Optional[str] = None,
    ):
        self.hf_api_key = hf_api_key or os.getenv("HF_API_KEY")
        self.openrouter_api_key = openrouter_api_key or os.getenv("OPENROUTER_API_KEY")
        self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
        self.anthropic_api_key = anthropic_api_key or os.getenv("ANTHROPIC_API_KEY")

        self._completion = Completion(self)
        self._eval = Evaluation(self)
        self._batch = Batch(self)

    def _get_model_provider(self, model: str) -> str:
        try:
            return model.split("/")[0]
        except IndexError:
            raise ValueError("Invalid model format. Expected format: provider/model-id")

    def _get_model_id(self, model: str) -> str:
        try:
            return model.split("/", 1)[1]
        except IndexError:
            raise ValueError("Invalid model format. Expected format: provider/model-id")

    def _get_provider_instance(self, provider: str):
        if provider == "huggingface":
            if not self.hf_api_key:
                raise AuthenticationError("HuggingFace")

            return HuggingFace(api_key=self.hf_api_key)

        if provider == "openrouter":
            if not self.openrouter_api_key:
                raise AuthenticationError("OpenRouter")

            return OpenRouter(api_key=self.openrouter_api_key)

        if provider == "openai":
            if not self.openai_api_key:
                raise AuthenticationError("OpenAI")

            return OpenAI(api_key=self.openai_api_key)

        if provider == "anthropic":
            if not self.anthropic_api_key:
                raise AuthenticationError("Anthropic")

            return Anthropic(api_key=self.anthropic_api_key)

        raise ProviderError(provider, "Unsupported model provider")

    @property
    def completion(self):
        """Access the chat completion functionality for LLM response.µ
        This property provides access to completion interfacce.

        Returns:
            Completion: An instance of the Completion class that provides method
                        for generatng response from LLM.
        """
        return self._completion

    @property
    def evaluate(self):
        """Access the evaluation functionality for LLM responses.
        This property provides access to the evaluation interface.

        Returns:
            Evaluation: An instance of the Evaluation class that provides methods
                       for evaluating and comparing LLM outputs.
        """
        return self._eval

    @property
    def batch(self):
        """Access the batch processing functionality for LLM operations.
        This property provides access to the batch processing interface.

        Returns:
            Batch: An instance of the Batch class that provides methods for
                  processing multiple LLM requests efficiently.
        """
        return self._batch

completion property

completion

Access the chat completion functionality for LLM responses. This property provides access to the completion interface.

Returns:

Name Type Description
Completion

An instance of the Completion class that provides methods for generating responses from LLMs.

evaluate property

evaluate

Access the evaluation functionality for LLM responses. This property provides access to the evaluation interface.

Returns:

Name Type Description
Evaluation

An instance of the Evaluation class that provides methods for evaluating and comparing LLM outputs.

batch property

batch

Access the batch processing functionality for LLM operations. This property provides access to the batch processing interface.

Returns:

Name Type Description
Batch

An instance of the Batch class that provides methods for processing multiple LLM requests efficiently.

Inference

VLLM

VLLM provider class for high-performance inference using the vLLM engine.

This class provides an interface to the vLLM engine for running various LLMs locally.

Attributes:

Name Type Description
model

An instance of vLLM's LLM class that handles the model operations.

Parameters:

Name Type Description Default
model str

The name or path of the model to load (e.g., 'meta-llama/Llama-2-7b-chat-hf').

required
**kwargs

Additional keyword arguments passed to vLLM's LLM initialization. These can include parameters like tensor_parallel_size, gpu_memory_utilization, etc.

{}

Raises:

Type Description
ImportError

If the vLLM package is not installed. The error message includes installation instructions.
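
The sketch below shows a typical initialization; the model name and engine keyword arguments are illustrative, and the import path assumes VLLM is exported from the package root.

from openpo import VLLM  # assumed export path; requires pip install openpo[eval]

# Any vLLM LLM() keyword arguments can be forwarded.
llm = VLLM(
    model="meta-llama/Llama-2-7b-chat-hf",
    tensor_parallel_size=1,
    gpu_memory_utilization=0.9,
)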

Source code in openpo/resources/provider/vllm.py
class VLLM:
    """VLLM provider class for high-performance inference using the vLLM engine.

    This class provides an interface to the vLLM engine for running various LLMs locally.

    Attributes:
        model: An instance of vLLM's LLM class that handles the model operations.

    Args:
        model (str): The name or path of the model to load (e.g., 'meta-llama/Llama-2-7b-chat-hf').
        **kwargs: Additional keyword arguments passed to vLLM's LLM initialization.
            These can include parameters like tensor_parallel_size, gpu_memory_utilization, etc.

    Raises:
        ImportError: If the vLLM package is not installed. The error message includes
            installation instructions.
    """

    def __init__(self, model: str, **kwargs) -> None:
        try:
            from vllm import LLM, SamplingParams
        except ImportError:
            raise ImportError(
                "vLLM requires additional dependencies. Install with: pip install openpo[eval]"
            )

        self.model = LLM(model=model, **kwargs)
        self.SamplingParams = SamplingParams

    def generate(
        self,
        messages: List,
        use_tqdm: bool = True,
        lora_request=None,
        chat_template=None,
        chat_template_content_format="auto",
        add_generation_prompt: bool = True,
        continue_final_message: bool = False,
        tools: Optional[List[Dict[str, Any]]] = None,
        mm_processor_kwargs: Optional[Dict[str, Any]] = None,
        sampling_params: Optional[Dict] = {},
    ) -> List:
        """Generate responses using the vLLM model.

        This method processes input messages and generates responses using the loaded model.
        It supports various generation parameters and features like chat templates, LoRA
        adapters, and tool-based interactions.

        Args:
            messages (List): List of input messages to process.
            use_tqdm (bool, optional): Whether to show a progress bar. Defaults to True.
            lora_request: Optional LoRA adapter configuration for on-the-fly model adaptation.
            chat_template: Optional template for formatting chat messages.
            chat_template_content_format (str, optional): Format for the chat template content.
                Defaults to "auto".
            add_generation_prompt (bool, optional): Whether to add generation prompt.
                Defaults to True.
            continue_final_message (bool, optional): Whether to continue from the final message.
                Defaults to False.
            tools (Optional[List[Dict[str, Any]]], optional): List of tools available for the model.
            mm_processor_kwargs (Optional[Dict[str, Any]], optional): Additional keyword arguments
                for multimodal processing.
            sampling_params (Optional[dict]): Model specific parameters passed to vLLM's SamplingParams.

        Returns:
            The generated vLLM output from the model.

        Raises:
            ProviderError: If generation fails, with details about the error.
        """
        try:
            params = self.SamplingParams(**sampling_params)
            res = self.model.chat(
                messages=messages,
                use_tqdm=use_tqdm,
                lora_request=lora_request,
                chat_template=chat_template,
                chat_template_content_format=chat_template_content_format,
                add_generation_prompt=add_generation_prompt,
                continue_final_message=continue_final_message,
                tools=tools,
                mm_processor_kwargs=mm_processor_kwargs,
                sampling_params=params,
            )

            return res
        except Exception as e:
            raise ProviderError(
                provider="vllm", message=f"model inference failed: {str(e)}"
            )

generate

generate(messages: List, use_tqdm: bool = True, lora_request=None, chat_template=None, chat_template_content_format='auto', add_generation_prompt: bool = True, continue_final_message: bool = False, tools: Optional[List[Dict[str, Any]]] = None, mm_processor_kwargs: Optional[Dict[str, Any]] = None, sampling_params: Optional[Dict] = {}) -> List

Generate responses using the vLLM model.

This method processes input messages and generates responses using the loaded model. It supports various generation parameters and features like chat templates, LoRA adapters, and tool-based interactions.

Parameters:

Name Type Description Default
messages List

List of input messages to process.

required
use_tqdm bool

Whether to show a progress bar. Defaults to True.

True
lora_request

Optional LoRA adapter configuration for on-the-fly model adaptation.

None
chat_template

Optional template for formatting chat messages.

None
chat_template_content_format str

Format for the chat template content. Defaults to "auto".

'auto'
add_generation_prompt bool

Whether to add generation prompt. Defaults to True.

True
continue_final_message bool

Whether to continue from the final message. Defaults to False.

False
tools Optional[List[Dict[str, Any]]]

List of tools available for the model.

None
mm_processor_kwargs Optional[Dict[str, Any]]

Additional keyword arguments for multimodal processing.

None
sampling_params Optional[dict]

Model specific parameters passed to vLLM's SamplingParams.

{}

Returns:

Type Description
List

The generated vLLM output from the model.

Raises:

Type Description
ProviderError

If generation fails, with details about the error.

Source code in openpo/resources/provider/vllm.py
def generate(
    self,
    messages: List,
    use_tqdm: bool = True,
    lora_request=None,
    chat_template=None,
    chat_template_content_format="auto",
    add_generation_prompt: bool = True,
    continue_final_message: bool = False,
    tools: Optional[List[Dict[str, Any]]] = None,
    mm_processor_kwargs: Optional[Dict[str, Any]] = None,
    sampling_params: Optional[Dict] = {},
) -> List:
    """Generate responses using the vLLM model.

    This method processes input messages and generates responses using the loaded model.
    It supports various generation parameters and features like chat templates, LoRA
    adapters, and tool-based interactions.

    Args:
        messages (List): List of input messages to process.
        use_tqdm (bool, optional): Whether to show a progress bar. Defaults to True.
        lora_request: Optional LoRA adapter configuration for on-the-fly model adaptation.
        chat_template: Optional template for formatting chat messages.
        chat_template_content_format (str, optional): Format for the chat template content.
            Defaults to "auto".
        add_generation_prompt (bool, optional): Whether to add generation prompt.
            Defaults to True.
        continue_final_message (bool, optional): Whether to continue from the final message.
            Defaults to False.
        tools (Optional[List[Dict[str, Any]]], optional): List of tools available for the model.
        mm_processor_kwargs (Optional[Dict[str, Any]], optional): Additional keyword arguments
            for multimodal processing.
        sampling_params (Optional[dict]): Model specific parameters passed to vLLM's SamplingParams.

    Returns:
        The generated vLLM output from the model.

    Raises:
        ProviderError: If generation fails, with details about the error.
    """
    try:
        params = self.SamplingParams(**sampling_params)
        res = self.model.chat(
            messages=messages,
            use_tqdm=use_tqdm,
            lora_request=lora_request,
            chat_template=chat_template,
            chat_template_content_format=chat_template_content_format,
            add_generation_prompt=add_generation_prompt,
            continue_final_message=continue_final_message,
            tools=tools,
            mm_processor_kwargs=mm_processor_kwargs,
            sampling_params=params,
        )

        return res
    except Exception as e:
        raise ProviderError(
            provider="vllm", message=f"model inference failed: {str(e)}"
        )
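
A minimal call sketch, assuming the llm instance from the constructor example above and vLLM's usual RequestOutput return shape; the sampling values are illustrative.

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize the benefits of preference tuning."},
]

# sampling_params is unpacked into vLLM's SamplingParams.
outputs = llm.generate(
    messages=messages,
    sampling_params={"temperature": 0.7, "max_tokens": 256},
)

print(outputs[0].outputs[0].text)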

Completion

Source code in openpo/resources/completion/completion.py
class Completion:
    def __init__(self, client):
        self.client = client

    def generate(
        self,
        model: Union[str, List[str]],
        messages: List[Dict[str, Any]],
        params: Optional[Dict[str, Any]] = None,
    ) -> List[ChatCompletionOutput]:
        """Generate completions using the specified LLM provider.

        Args:
            model (str, List[str]): model identifier or list of model identifiers to use for generation. Follows <provider>/<model-identifier> format.
            messages (List[Dict[str, Any]]): List of message dictionaries containing
                the conversation history and prompts.
            params (Optional[Dict[str, Any]]): Additional model parameters for the request (e.g., temperature, max_tokens).

        Returns:
            The response from the LLM provider containing the generated completions.

        Raises:
            AuthenticationError: If required API keys are missing or invalid.
            ProviderError: For provider-specific errors during completion generation.
            ValueError: If the model format is invalid.
        """
        if isinstance(model, str):
            try:
                provider = self.client._get_model_provider(model=model)
                model_id = self.client._get_model_id(model=model)
                llm = self.client._get_provider_instance(provider=provider)

                res = llm.generate(model=model_id, messages=messages, params=params)
                return res
            except Exception as e:
                raise ProviderError(
                    provider=provider,
                    message=f"Failed to execute chat completions: {str(e)}",
                )
        responses = []
        for m in model:
            try:
                provider = self.client._get_model_provider(model=m)
                model_id = self.client._get_model_id(model=m)
                llm = self.client._get_provider_instance(provider=provider)

                res = llm.generate(model=model_id, messages=messages, params=params)
                responses.append(res)
            except (AuthenticationError, ValueError) as e:
                # Re-raise authentication and validation errors as is
                raise e
            except Exception as e:
                raise ProviderError(
                    provider=provider,
                    message=f"Failed to execute chat completions: {str(e)}",
                )
        return responses

generate

generate(model: Union[str, List[str]], messages: List[Dict[str, Any]], params: Optional[Dict[str, Any]] = None) -> List[ChatCompletionOutput]

Generate completions using the specified LLM provider.

Parameters:

Name Type Description Default
model (str, List[str])

model identifier or list of model identifiers to use for generation. Follows the provider/model-identifier format.

required
messages List[Dict[str, Any]]

List of message dictionaries containing the conversation history and prompts.

required
params Optional[Dict[str, Any]]

Additional model parameters for the request (e.g., temperature, max_tokens).

None

Returns:

Type Description
List[ChatCompletionOutput]

The response from the LLM provider containing the generated completions.

Raises:

Type Description
AuthenticationError

If required API keys are missing or invalid.

ProviderError

For provider-specific errors during completion generation.

ValueError

If the model format is invalid.

Source code in openpo/resources/completion/completion.py
def generate(
    self,
    model: Union[str, List[str]],
    messages: List[Dict[str, Any]],
    params: Optional[Dict[str, Any]] = None,
) -> List[ChatCompletionOutput]:
    """Generate completions using the specified LLM provider.

    Args:
        model (str, List[str]): model identifier or list of model identifiers to use for generation. Follows <provider>/<model-identifier> format.
        messages (List[Dict[str, Any]]): List of message dictionaries containing
            the conversation history and prompts.
        params (Optional[Dict[str, Any]]): Additional model parameters for the request (e.g., temperature, max_tokens).

    Returns:
        The response from the LLM provider containing the generated completions.

    Raises:
        AuthenticationError: If required API keys are missing or invalid.
        ProviderError: For provider-specific errors during completion generation.
        ValueError: If the model format is invalid.
    """
    if isinstance(model, str):
        try:
            provider = self.client._get_model_provider(model=model)
            model_id = self.client._get_model_id(model=model)
            llm = self.client._get_provider_instance(provider=provider)

            res = llm.generate(model=model_id, messages=messages, params=params)
            return res
        except Exception as e:
            raise ProviderError(
                provider=provider,
                message=f"Failed to execute chat completions: {str(e)}",
            )
    responses = []
    for m in model:
        try:
            provider = self.client._get_model_provider(model=m)
            model_id = self.client._get_model_id(model=m)
            llm = self.client._get_provider_instance(provider=provider)

            res = llm.generate(model=model_id, messages=messages, params=params)
            responses.append(res)
        except (AuthenticationError, ValueError) as e:
            # Re-raise authentication and validation errors as is
            raise e
        except Exception as e:
            raise ProviderError(
                provider=provider,
                message=f"Failed to execute chat completions: {str(e)}",
            )
    return responses
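
A usage sketch; the model identifiers and parameter values below are illustrative, and client is an OpenPO instance.

responses = client.completion.generate(
    model=["openai/gpt-4o-mini", "anthropic/claude-3-5-sonnet-20241022"],
    messages=[
        {"role": "user", "content": "Explain preference optimization in one paragraph."},
    ],
    params={"temperature": 0.7, "max_tokens": 300},
)
# With a list of models, one response object is returned per model.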

Evaluation

Source code in openpo/resources/eval/eval.py
class Evaluation:
    def __init__(self, client):
        self.client = client

    def _validate_provider(self, provider: str) -> None:
        if provider not in ["openai", "anthropic"]:
            raise ProviderError(provider, "Provider not supported for evaluation")

    def _parse_response(self, response) -> List[Dict]:
        try:
            if "chatcmpl" in response.id:
                return json.loads(response.choices[0].message.content)["evaluation"]
            return response.content[0].input["evaluation"]
        except Exception as e:
            raise Exception(f"Error parsing model responses: {e}")

    def eval(
        self,
        model: Union[str, List[str]],
        questions: List[str],
        responses: List[List[str]],
        prompt: Optional[str] = None,
    ) -> List[Dict]:
        """Evaluate responses using either single or multiple LLMs as judges.

        Args:
            model (str, List[str]): model identifier or list of them to use as a judge. Follows provider/model-identifier format.
            questions (List[str]): Questions for each response pair.
            responses (List[List[str]]): Pairwise responses to evaluate.
            prompt (str): Optional custom prompt for judge model to follow.

        Returns:
            List[Dict]: The evaluation data for responses. Response returns preferred, rejected, confidence_score and reason.

        Raises:
            AuthenticationError: If required API keys are missing or invalid.
            ProviderError: For provider-specific errors during evaluation.
            ValueError: If the model format is invalid or required models are missing.
        """
        if isinstance(model, str):
            try:
                provider = self.client._get_model_provider(model)
                model_id = self.client._get_model_id(model)

                self._validate_provider(provider)

                llm = self.client._get_provider_instance(provider)
                res = llm.generate(
                    model=model_id,
                    questions=questions,
                    responses=responses,
                    prompt=prompt if prompt else None,
                )
                return res
            except Exception as e:
                raise ProviderError(
                    provider=provider, message=f"Error during evaluation: {str(e)}"
                )

        eval_res = []
        for m in model:
            try:
                provider = self.client._get_model_provider(m)
                model_id = self.client._get_model_id(m)

                self._validate_provider(provider)

                llm = self.client._get_provider_instance(provider=provider)
                res = llm.generate(
                    model=model_id,
                    questions=questions,
                    responses=responses,
                    prompt=prompt if prompt else None,
                )
                eval_res.append(res)
            except Exception as e:
                raise ProviderError(
                    provider=provider, message=f"Error during evaluation: {str(e)}"
                )
        return eval_res

    def get_consensus(self, eval_A: List, eval_B: List) -> List:
        """Reach consensus between two evaluation results

        Args:
            eval_A (List): List of batch results to compare
            eval_B (List): List of batch results to compare

        Returns:
            List: List of evaluation results where both providers agreed on the rank

        Raises:
            Exception: If there's an error processing the batch results
        """
        try:
            parsed_a = self._parse_response(
                response=eval_A,
            )
            parsed_b = self._parse_response(
                response=eval_B,
            )

            res = []
            check = {}

            for e in parsed_a:
                q_index = e["q_index"]
                check[q_index] = e["rank"]

            for e in parsed_b:
                q_index = e["q_index"]
                if q_index in check and check[q_index] == e["rank"]:
                    res.append(e)
            return res
        except Exception as e:
            raise Exception(f"Error processing responses for consensus: {str(e)}")

eval

eval(model: Union[str, List[str]], questions: List[str], responses: List[List[str]], prompt: Optional[str] = None) -> List[Dict]

Evaluate responses using either single or multiple LLMs as judges.

Parameters:

Name Type Description Default
model (str, List[str])

model identifier or list of them to use as a judge. Follows provider/model-identifier format.

required
questions List[str]

Questions for each response pair.

required
responses List[List[str]]

Pairwise responses to evaluate.

required
prompt str

Optional custom prompt for judge model to follow.

None

Returns:

Type Description
List[Dict]

List[Dict]: The evaluation data for responses. Response returns preferred, rejected, confidence_score and reason.

Raises:

Type Description
AuthenticationError

If required API keys are missing or invalid.

ProviderError

For provider-specific errors during evaluation.

ValueError

If the model format is invalid or required models are missing.

Source code in openpo/resources/eval/eval.py
def eval(
    self,
    model: Union[str, List[str]],
    questions: List[str],
    responses: List[List[str]],
    prompt: Optional[str] = None,
) -> List[Dict]:
    """Evaluate responses using either single or multiple LLMs as judges.

    Args:
        model (str, List[str]): model identifier or list of them to use as a judge. Follows provider/model-identifier format.
        questions (List[str]): Questions for each response pair.
        responses (List[List[str]]): Pairwise responses to evaluate.
        prompt (str): Optional custom prompt for judge model to follow.

    Returns:
        List[Dict]: The evaluation data for responses. Response returns preferred, rejected, confidence_score and reason.

    Raises:
        AuthenticationError: If required API keys are missing or invalid.
        ProviderError: For provider-specific errors during evaluation.
        ValueError: If the model format is invalid or required models are missing.
    """
    if isinstance(model, str):
        try:
            provider = self.client._get_model_provider(model)
            model_id = self.client._get_model_id(model)

            self._validate_provider(provider)

            llm = self.client._get_provider_instance(provider)
            res = llm.generate(
                model=model_id,
                questions=questions,
                responses=responses,
                prompt=prompt if prompt else None,
            )
            return res
        except Exception as e:
            raise ProviderError(
                provider=provider, message=f"Error during evaluation: {str(e)}"
            )

    eval_res = []
    for m in model:
        try:
            provider = self.client._get_model_provider(m)
            model_id = self.client._get_model_id(m)

            self._validate_provider(provider)

            llm = self.client._get_provider_instance(provider=provider)
            res = llm.generate(
                model=model_id,
                questions=questions,
                responses=responses,
                prompt=prompt if prompt else None,
            )
            eval_res.append(res)
        except Exception as e:
            raise ProviderError(
                provider=provider, message=f"Error during evaluation: {str(e)}"
            )
    return eval_res
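
A usage sketch with a single judge model; the judge identifier is illustrative, and only the openai and anthropic providers are accepted here.

questions = ["What causes seasons on Earth?"]
responses = [
    [
        "Seasons are caused by the tilt of Earth's axis.",
        "Seasons happen because Earth is closer to the Sun in summer.",
    ],
]

result = client.evaluate.eval(
    model="openai/gpt-4o",  # illustrative judge model
    questions=questions,
    responses=responses,
)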

get_consensus

get_consensus(eval_A: List, eval_B: List) -> List

Reach consensus between two evaluation results

Parameters:

Name Type Description Default
eval_A List

List of batch results to compare

required
eval_B List

List of batch results to compare

required

Returns:

Name Type Description
List List

List of evaluation results where both providers agreed on the rank

Raises:

Type Description
Exception

If there's an error processing the batch results

Source code in openpo/resources/eval/eval.py
def get_consensus(self, eval_A: List, eval_B: List) -> List:
    """Reach consensus between two evaluation results

    Args:
        eval_A (List): List of batch results to compare
        eval_B (List): List of batch results to compare

    Returns:
        List: List of evaluation results where both providers agreed on the rank

    Raises:
        Exception: If there's an error processing the batch results
    """
    try:
        parsed_a = self._parse_response(
            response=eval_A,
        )
        parsed_b = self._parse_response(
            response=eval_B,
        )

        res = []
        check = {}

        for e in parsed_a:
            q_index = e["q_index"]
            check[q_index] = e["rank"]

        for e in parsed_b:
            q_index = e["q_index"]
            if q_index in check and check[q_index] == e["rank"]:
                res.append(e)
        return res
    except Exception as e:
        raise Exception(f"Error processing responses for consensus: {str(e)}")

Batch

Source code in openpo/resources/batch/batch.py
class Batch:
    def __init__(self, client):
        self.client = client
        self._openai_client = None
        self._anthropic_client = None

    @property
    def openai_client(self):
        if self._openai_client is None:
            from openai import OpenAI as OpenAIClient

            if not self.client.openai_api_key:
                raise AuthenticationError("OpenAI")
            self._openai_client = OpenAIClient(api_key=self.client.openai_api_key)
        return self._openai_client

    @property
    def anthropic_client(self):
        if self._anthropic_client is None:
            from anthropic import Anthropic as AnthropicClient

            if not self.client.anthropic_api_key:
                raise AuthenticationError("Anthropic")
            self._anthropic_client = AnthropicClient(
                api_key=self.client.anthropic_api_key
            )
        return self._anthropic_client

    def _validate_provider(self, provider: str) -> None:
        if provider not in ["openai", "anthropic"]:
            raise ProviderError(provider, "Provider not supported for evaluation")

    def eval(
        self,
        model: Union[str, List[str]],
        questions: List[str],
        responses: List[List[str]],
        prompt: Optional[str] = None,
    ):
        """Use input model as a judge to evaluate responses.

        Args:
            model (str, List[str]): model identifier or list of them to use as a judge. Follows provider/model-identifier format.
            questions (List[str]): Questions for each response pair.
            responses (List[List[str]]): Pairwise responses to evaluate.
            prompt (str): Optional custom prompt for judge model to follow.

        Returns (Dict): The evaluation data for responses with preferred, rejected, confidence_score and reason.

        Raises:
            AuthenticationError: If required API keys are missing or invalid.
            ProviderError: For provider-specific errors during evaluation.
            ValueError: If the model format is invalid or provider is not supported.
        """
        if isinstance(model, str):
            try:
                provider = self.client._get_model_provider(model)
                model_id = self.client._get_model_id(model)

                self._validate_provider(provider)

                llm = self.client._get_provider_instance(provider=provider)
                res = llm.generate_batch(
                    model=model_id,
                    questions=questions,
                    responses=responses,
                    prompt=prompt if prompt else None,
                )
                return res
            except Exception as e:
                raise ProviderError(
                    provider=provider,
                    message=f"Error during batch processing: {str(e)}",
                )

        result = []
        for m in model:
            try:
                provider = self.client._get_model_provider(m)
                model_id = self.client._get_model_id(m)

                self._validate_provider(provider)

                llm = self.client._get_provider_instance(provider=provider)
                res = llm.generate_batch(
                    model=model_id,
                    questions=questions,
                    responses=responses,
                    prompt=prompt if prompt else None,
                )
                result.append(res)
            except Exception as e:
                raise ProviderError(
                    provider=provider,
                    message=f"Error during batch processing: {str(e)}",
                )

        return result

    def check_status(self, batch_id: str):
        if batch_id.split("_")[0] == "batch":
            status = self.openai_client.batches.retrieve(batch_id)
        else:
            status = self.anthropic_client.beta.messages.batches.retrieve(batch_id)

        return status

    def load_batch(self, filename: str, provider: str):
        data = []
        if provider == "openai":
            res = self.openai_client.files.content(filename)

            for line in res.text.splitlines():
                if line.strip():  # Skip empty lines
                    data.append(json.loads(line))

            return data

        if provider == "anthropic":
            res = self.anthropic_client.beta.messages.batches.results(filename)
            for r in res:
                data.append(r)

            return data

    def get_consensus(
        self,
        batch_A: List,
        batch_B: List,
    ) -> List[Dict]:
        """Reach consensus between two batch results.

        Args:
            batch_A (List): List of batch results to compare
            batch_B (List): List of batch results to compare

        Returns:
            List[Dict]: List of evaluation results where both providers agree on the rank

        Raises:
            Exception: If there's an error processing the batch results
        """
        try:
            # uses dictionary to keep record of index and rank
            # only requires single pass on batch data to reach consensus.
            res = []
            check = {}
            for r in batch_A:
                # check if batch is from openai
                if isinstance(r, dict):
                    custom_id = r["custom_id"]
                    content = json.loads(
                        r["response"]["body"]["choices"][0]["message"]["content"]
                    )
                else:
                    custom_id = r.custom_id
                    content = r.result.message.content[0].input

                check[custom_id] = content["evaluation"][0]["rank"]

            for r in batch_B:
                if isinstance(r, dict):
                    custom_id = r["custom_id"]
                    content = json.loads(
                        r["response"]["body"]["choices"][0]["message"]["content"]
                    )
                else:
                    custom_id = r.custom_id
                    content = r.result.message.content[0].input

                if (
                    custom_id in check
                    and check[custom_id] == content["evaluation"][0]["rank"]
                ):
                    record = {"q_index": custom_id} | content["evaluation"][0]
                    res.append(record)

            return res
        except Exception as e:
            raise Exception(f"Error processing batch results: {str(e)}")

eval

eval(model: Union[str, List[str]], questions: List[str], responses: List[List[str]], prompt: Optional[str] = None)

Use input model as a judge to evaluate responses.

Parameters:

Name Type Description Default
model (str, List[str])

model identifier or list of them to use as a judge. Follows provider/model-identifier format.

required
questions List[str]

Questions for each response pair.

required
responses List[List[str]]

Pairwise responses to evaluate.

required
prompt str

Optional custom prompt for judge model to follow.

None

Returns (Dict): The evaluation data for responses with preferred, rejected, confidence_score and reason.

Raises:

Type Description
AuthenticationError

If required API keys are missing or invalid.

ProviderError

For provider-specific errors during evaluation.

ValueError

If the model format is invalid or provider is not supported.

Source code in openpo/resources/batch/batch.py
def eval(
    self,
    model: Union[str, List[str]],
    questions: List[str],
    responses: List[List[str]],
    prompt: Optional[str] = None,
):
    """Use input model as a judge to evaluate responses.

    Args:
        model (str, List[str]): model identifier or list of them to use as a judge. Follows provider/model-identifier format.
        questions (List[str]): Questions for each response pair.
        responses (List[List[str]]): Pairwise responses to evaluate.
        prompt (str): Optional custom prompt for judge model to follow.

    Returns (Dict): The evaluation data for responses with preferred, rejected, confidence_score and reason.

    Raises:
        AuthenticationError: If required API keys are missing or invalid.
        ProviderError: For provider-specific errors during evaluation.
        ValueError: If the model format is invalid or provider is not supported.
    """
    if isinstance(model, str):
        try:
            provider = self.client._get_model_provider(model)
            model_id = self.client._get_model_id(model)

            self._validate_provider(provider)

            llm = self.client._get_provider_instance(provider=provider)
            res = llm.generate_batch(
                model=model_id,
                questions=questions,
                responses=responses,
                prompt=prompt if prompt else None,
            )
            return res
        except Exception as e:
            raise ProviderError(
                provider=provider,
                message=f"Error during batch processing: {str(e)}",
            )

    result = []
    for m in model:
        try:
            provider = self.client._get_model_provider(m)
            model_id = self.client._get_model_id(m)

            self._validate_provider(provider)

            llm = self.client._get_provider_instance(provider=provider)
            res = llm.generate_batch(
                model=model_id,
                questions=questions,
                responses=responses,
                prompt=prompt if prompt else None,
            )
            result.append(res)
        except Exception as e:
            raise ProviderError(
                provider=provider,
                message=f"Error during batch processing: {str(e)}",
            )

    return result
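
A submission sketch, reusing the questions and responses from the evaluation example; the judge identifiers are illustrative, and the shape of the returned job objects (an id attribute) is an assumption about the providers' batch responses.

jobs = client.batch.eval(
    model=["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"],
    questions=questions,
    responses=responses,
)

# Poll a job; the id prefix determines which provider is queried.
status = client.batch.check_status(batch_id=jobs[0].id)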

get_consensus

get_consensus(batch_A: List, batch_B: List) -> List[Dict]

Reach consensus between two batch results.

Parameters:

Name Type Description Default
batch_A List

List of batch results to compare

required
batch_B List

List of batch results to compare

required

Returns:

Type Description
List[Dict]

List[Dict]: List of evaluation results where both providers agree on the rank

Raises:

Type Description
Exception

If there's an error processing the batch results

Source code in openpo/resources/batch/batch.py
def get_consensus(
    self,
    batch_A: List,
    batch_B: List,
) -> List[Dict]:
    """Reach consensus between two batch results.

    Args:
        batch_A (List): List of batch results to compare
        batch_B (List): List of batch results to compare

    Returns:
        List[Dict]: List of evaluation results where both providers agree on the rank

    Raises:
        Exception: If there's an error processing the batch results
    """
    try:
        # uses dictionary to keep record of index and rank
        # only requires single pass on batch data to reach consensus.
        res = []
        check = {}
        for r in batch_A:
            # check if batch is from openai
            if isinstance(r, dict):
                custom_id = r["custom_id"]
                content = json.loads(
                    r["response"]["body"]["choices"][0]["message"]["content"]
                )
            else:
                custom_id = r.custom_id
                content = r.result.message.content[0].input

            check[custom_id] = content["evaluation"][0]["rank"]

        for r in batch_B:
            if isinstance(r, dict):
                custom_id = r["custom_id"]
                content = json.loads(
                    r["response"]["body"]["choices"][0]["message"]["content"]
                )
            else:
                custom_id = r.custom_id
                content = r.result.message.content[0].input

            if (
                custom_id in check
                and check[custom_id] == content["evaluation"][0]["rank"]
            ):
                record = {"q_index": custom_id} | content["evaluation"][0]
                res.append(record)

        return res
    except Exception as e:
        raise Exception(f"Error processing batch results: {str(e)}")

Evaluation Model

PairRM

A class that implements the Pairwise Rewards Model (PairRM) for evaluating and ranking LLM responses.

This class uses the llm-blender package to load and utilize the PairRM model, which can rank multiple responses for a given prompt based on their quality.

Source code in openpo/resources/pairrm/pairrm.py
class PairRM:
    """
    A class that implements the Pairwise Rewards Model (PairRM) for evaluating and ranking LLM responses.

    This class uses the llm-blender package to load and utilize the PairRM model, which can rank
    multiple responses for a given prompt based on their quality.

    """

    def __init__(self):
        try:
            import llm_blender
        except ImportError:
            raise ImportError(
                "PairRM requires additional dependencies. Install with: pip install openpo[eval]"
            )

        self.blender = llm_blender.Blender()
        self.blender.loadranker("llm-blender/PairRM")

    def _format_preference(
        self,
        ranks: List,
        prompts: List,
        responses: List,
    ) -> List[dict]:
        dataset = []

        for i in range(len(prompts)):
            try:
                dataset.append(
                    {
                        "prompt": prompts[i],
                        "preferred": responses[i][np.where(ranks[i] == 1)[0][0]],
                        "rejected": responses[i][
                            np.where(ranks[i] == max(ranks[i]))[0][0]
                        ],
                        "ranks": ranks[i],
                    }
                )
            except (ValueError, IndexError):
                print(f"Skipping index {i} due to ranking issues.")
                continue

        return dataset

    def eval(
        self,
        prompts: List,
        responses: List,
    ) -> List[dict]:
        """
        Evaluates and ranks multiple responses for given prompts.

        Args:
            prompts (List): List of input prompts to evaluate.
            responses (List): List of response sets to be ranked.
                Each response set should contain multiple responses for the corresponding prompt.

        Returns:
            List[dict]: A formatted list of preference data containing the ranking results.
                See _format_preference method for the structure of the returned data.
        """
        ranks = self.blender.rank(prompts, responses)
        return self._format_preference(ranks, prompts, responses)

eval

eval(prompts: List, responses: List) -> List[dict]

Evaluates and ranks multiple responses for given prompts.

Parameters:

Name Type Description Default
prompts List

List of input prompts to evaluate.

required
responses List

List of response sets to be ranked. Each response set should contain multiple responses for the corresponding prompt.

required

Returns:

Type Description
List[dict]

List[dict]: A formatted list of preference data containing the ranking results. See _format_preference method for the structure of the returned data.

Source code in openpo/resources/pairrm/pairrm.py
def eval(
    self,
    prompts: List,
    responses: List,
) -> List[dict]:
    """
    Evaluates and ranks multiple responses for given prompts.

    Args:
        prompts (List): List of input prompts to evaluate.
        responses (List): List of response sets to be ranked.
            Each response set should contain multiple responses for the corresponding prompt.

    Returns:
        List[dict]: A formatted list of preference data containing the ranking results.
            See _format_preference method for the structure of the returned data.
    """
    ranks = self.blender.rank(prompts, responses)
    return self._format_preference(ranks, prompts, responses)
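
A usage sketch; the import path is an assumption, and the prompts and candidate responses are illustrative.

from openpo import PairRM  # assumed export path; requires pip install openpo[eval]

pairrm = PairRM()

prompts = ["What is the capital of France?"]
candidates = [
    ["Paris is the capital of France.", "France's capital is Lyon."],
]

preferences = pairrm.eval(prompts, candidates)
# Each entry contains the prompt, the preferred and rejected responses, and the ranks.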

Prometheus2

A class that implements the Prometheus2 evaluation model for assessing LLM responses.

This class provides methods for both relative and absolute evaluation of LLM responses using different rubrics such as factual validity, helpfulness, honesty, and reasoning.

Source code in openpo/resources/prometheus2/prometheus2.py
class Prometheus2:
    """
    A class that implements the Prometheus2 evaluation model for assessing LLM responses.

    This class provides methods for both relative and absolute evaluation of LLM responses
    using different rubrics such as factual validity, helpfulness, honesty, and reasoning.

    """

    def __init__(self, model):
        try:
            from prometheus_eval import PrometheusEval
            from prometheus_eval.prompts import (
                ABSOLUTE_PROMPT_WO_REF,
                FACTUAL_VALIDITY_RUBRIC,
                HARMLESSNESS_RUBRIC,
                HELPFULNESS_RUBRIC,
                HONESTY_RUBRIC,
                REASONING_RUBRIC,
                RELATIVE_PROMPT_WO_REF,
            )

            self.PrometheusEval = PrometheusEval
            self.ABSOLUTE_PROMPT_WO_REF = ABSOLUTE_PROMPT_WO_REF
            self.RELATIVE_PROMPT_WO_REF = RELATIVE_PROMPT_WO_REF
        except ImportError:
            raise ImportError(
                "Prometheus2 requires additional dependencies. Install with: pip install openpo[eval]"
            )
        try:
            self.model = VLLM(model=model)
        except Exception as e:
            raise ProviderError(
                "Prometheus2",
                message=f"failed to initialize Prometheus model: {str(e)}",
            )
        self.rubric_mapping = {
            "factual-validity": FACTUAL_VALIDITY_RUBRIC,
            "helpfulness": HELPFULNESS_RUBRIC,
            "honesty": HONESTY_RUBRIC,
            "harmlessness": HARMLESSNESS_RUBRIC,
            "reasoning": REASONING_RUBRIC,
        }

    def _format_absolute(
        self,
        scores: List[int],
        instructions: List[str],
        responses: List[str],
        feedbacks: List[str],
    ) -> List[dict]:

        dataset = []
        for i in range(len(instructions)):
            dataset.append(
                {
                    "prompt": instructions[i],
                    "response": responses[i],
                    "score": scores[i],
                    "feedback": feedbacks[i],
                }
            )

        return dataset

    def _format_relative(
        self,
        instructions: List[str],
        responses_A: List[str],
        responses_B: List[str],
        feedbacks: List[str],
        scores: List[str],
    ) -> List[dict]:

        dataset = []

        for i in range(len(instructions)):
            dataset.append(
                {
                    "prompt": instructions[i],
                    "score": scores[i],
                    "preferred": responses_A[i] if scores[i] == "A" else responses_B[i],
                    "rejected": responses_A[i] if scores[i] == "B" else responses_B[i],
                    "feedback": feedbacks[i],
                }
            )

        return dataset

    def eval_relative(
        self,
        instructions: List[str],
        responses_A: List[str],
        responses_B: List[str],
        rubric: str,
    ) -> List[dict]:
        """
        Performs relative evaluation comparing pairs of responses.

        Args:
            instructions (List[str]): List of instruction prompts.
            responses_A (List[str]): First set of responses to compare.
            responses_B (List[str]): Second set of responses to compare.
            rubric (str): The evaluation rubric to use. Supported rubrics:

                - factual-validity
                - helpfulness
                - honesty
                - harmlessness
                - reasoning

        Returns:
            List[dict]: A formatted list of evaluation results containing preferences and feedback.

        Raises:
            Exception: If there's an error during the evaluation process.
        """
        try:
            judge = self.PrometheusEval(
                model=self.model,
                relative_grade_template=self.RELATIVE_PROMPT_WO_REF,
            )
            feedbacks, scores = judge.relative_grade(
                instructions=instructions,
                responses_A=responses_A,
                responses_B=responses_B,
                rubric=self.rubric_mapping[rubric],
            )

            return self._format_relative(
                instructions=instructions,
                responses_A=responses_A,
                responses_B=responses_B,
                feedbacks=feedbacks,
                scores=scores,
            )

        except Exception as e:
            raise Exception(f"Error while evaluating with Prometheus2: {e}")

    def eval_absolute(
        self,
        instructions: List[str],
        responses: List[str],
        rubric: str,
    ) -> List[dict]:
        """
        Performs absolute evaluation of individual responses.

        Args:
            instructions (List[str]): List of instruction prompts.
            responses (List[str]): List of responses to evaluate.
            rubric (str): The evaluation rubric to use. Supported rubrics:

                - factual-validity
                - helpfulness
                - honesty
                - harmlessness
                - reasoning

        Returns:
            List[dict]: A formatted list of evaluation results containing scores and feedback.

        Raises:
            Exception: If there's an error during the evaluation process.
        """
        try:
            judge = self.PrometheusEval(
                model=self.model,
                absolute_grade_template=self.ABSOLUTE_PROMPT_WO_REF,
            )
            feedbacks, scores = judge.absolute_grade(
                instructions=instructions,
                responses=responses,
                rubric=self.rubric_mapping[rubric],
            )

            return self._format_absolute(
                scores=scores,
                instructions=instructions,
                responses=responses,
                feedbacks=feedbacks,
            )
        except Exception as e:
            raise Exception(f"Error while evaluating with Prometheus2: {e}")

eval_relative

eval_relative(instructions: List[str], responses_A: List[str], responses_B: List[str], rubric: str) -> List[dict]

Performs relative evaluation comparing pairs of responses.

Parameters:

Name Type Description Default
instructions List[str]

List of instruction prompts.

required
responses_A List[str]

First set of responses to compare.

required
responses_B List[str]

Second set of responses to compare.

required
rubric str

The evaluation rubric to use. Supported rubrics:

  • factual-validity
  • helpfulness
  • honesty
  • harmlessness
  • reasoning
required

Returns:

Type Description
List[dict]

List[dict]: A formatted list of evaluation results containing preferences and feedback.

Raises:

Type Description
Exception

If there's an error during the evaluation process.

Source code in openpo/resources/prometheus2/prometheus2.py
def eval_relative(
    self,
    instructions: List[str],
    responses_A: List[str],
    responses_B: List[str],
    rubric: str,
) -> List[dict]:
    """
    Performs relative evaluation comparing pairs of responses.

    Args:
        instructions (List[str]): List of instruction prompts.
        responses_A (List[str]): First set of responses to compare.
        responses_B (List[str]): Second set of responses to compare.
        rubric (str): The evaluation rubric to use. Supported rubrics:

            - factual-validity
            - helpfulness
            - honesty
            - harmlessness
            - reasoning

    Returns:
        List[dict]: A formatted list of evaluation results containing preferences and feedback.

    Raises:
        Exception: If there's an error during the evaluation process.
    """
    try:
        judge = self.PrometheusEval(
            model=self.model,
            relative_grade_template=self.RELATIVE_PROMPT_WO_REF,
        )
        feedbacks, scores = judge.relative_grade(
            instructions=instructions,
            responses_A=responses_A,
            responses_B=responses_B,
            rubric=self.rubric_mapping[rubric],
        )

        return self._format_relative(
            instructions=instructions,
            responses_A=responses_A,
            responses_B=responses_B,
            feedbacks=feedbacks,
            scores=scores,
        )

    except Exception as e:
        raise Exception(f"Error while evaluating with Prometheus2: {e}")

eval_absolute

eval_absolute(instructions: List[str], responses: List[str], rubric: str) -> List[dict]

Performs absolute evaluation of individual responses.

Parameters:

Name Type Description Default
instructions List[str]

List of instruction prompts.

required
responses List[str]

List of responses to evaluate.

required
rubric str

The evaluation rubric to use. Supported rubrics:

  • factual-validity
  • helpfulness
  • honesty
  • harmlessness
  • reasoning
required

Returns:

Type Description
List[dict]

List[dict]: A formatted list of evaluation results containing scores and feedback.

Raises:

Type Description
Exception

If there's an error during the evaluation process.

Source code in openpo/resources/prometheus2/prometheus2.py
def eval_absolute(
    self,
    instructions: List[str],
    responses: List[str],
    rubric: str,
) -> List[dict]:
    """
    Performs absolute evaluation of individual responses.

    Args:
        instructions (List[str]): List of instruction prompts.
        responses (List[str]): List of responses to evaluate.
        rubric (str): The evaluation rubric to use. Supported rubrics:

            - factual-validity
            - helpfulness
            - honesty
            - harmlessness
            - reasoning

    Returns:
        List[dict]: A formatted list of evaluation results containing scores and feedback.

    Raises:
        Exception: If there's an error during the evaluation process.
    """
    try:
        judge = self.PrometheusEval(
            model=self.model,
            absolute_grade_template=self.ABSOLUTE_PROMPT_WO_REF,
        )
        feedbacks, scores = judge.absolute_grade(
            instructions=instructions,
            responses=responses,
            rubric=self.rubric_mapping[rubric],
        )

        return self._format_absolute(
            scores=scores,
            instructions=instructions,
            responses=responses,
            feedbacks=feedbacks,
        )
    except Exception as e:
        raise Exception(f"Error while evaluating with Prometheus2: {e}")

Storage

HuggingFaceStorage

Storage class for HuggingFace Datasets.

This class provides methods to store and retrieve data from HuggingFace's dataset repositories. It handles the creation of repositories and manages data upload and download operations.

Parameters:

Name Type Description Default
api_key str

HuggingFace API token with write access. The HF_API_KEY environment variable can be set instead of passing in the key.

None

Raises:

Type Description
AuthenticationError

If authentication fails

ProviderError

If HuggingFace error is raised
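
A usage sketch; the import path and repository name are assumptions, and the API key falls back to the HF_API_KEY environment variable.

import pandas as pd

from openpo import HuggingFaceStorage  # assumed export path

storage = HuggingFaceStorage()  # or HuggingFaceStorage(api_key="hf_...")

df = pd.DataFrame(
    [{"prompt": "q1", "preferred": "a", "rejected": "b"}]  # illustrative preference rows
)
storage.push_to_repo(repo_id="my-org/my-preference-dataset", data=df)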

Source code in openpo/storage/huggingface.py
class HuggingFaceStorage:
    """Storage class for HuggingFace Datasets.

    This class provides methods to store and retrieve data from HuggingFace's dataset
    repositories. It handles the creation of repositories and manages data upload
    and download operations.

    Parameters:
        api_key (str): HuggingFace API token with write access. The HF_API_KEY environment variable can be set instead of passing in the key.

    Raises:
        AuthenticationError: If authentication fails
        ProviderError: If HuggingFace error is raised
    """

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv("HF_API_KEY")
        if not self.api_key:
            raise AuthenticationError(
                provider="Huggingface",
                message=f"HuggingFace API key must be provided.",
            )

    def _convert_to_dict(self, data: List[Dict[str, Any]]) -> Dict:
        if not data:
            return {}

        keys = data[0].keys()

        return {key: [item[key] for item in data] for key in keys}

    def push_to_repo(
        self,
        repo_id: str,
        data: Union[List[Dict[str, Any]], pd.DataFrame],
        config_name: str = "default",
        set_default: Optional[bool] = None,
        split: Optional[str] = None,
        data_dir: Optional[str] = None,
        commit_message: Optional[str] = None,
        commit_description: Optional[str] = None,
        private: Optional[bool] = False,
        token: Optional[str] = None,
        revision: Optional[str] = None,
        create_pr: Optional[bool] = False,
        max_shard_size: Optional[Union[int, str]] = None,
        num_shards: Optional[int] = None,
        embed_external_files: bool = True,
    ):
        """
        Push data to HuggingFace dataset repository.
        This is the implementation of HuggingFace Dataset's push_to_hub method.
        For parameters not listed, check HuggingFace documentation for more detail.

        Args:
            data: The data to upload.

                - List[Dict]
                - pandas DataFrame

            repo_id (str): Name of the dataset repository.

        Raises:
            Exception: If pushing to dataset repository fails.
        """

        if not isinstance(data, (list, pd.DataFrame)):
            raise TypeError("data must be a list of dictionaries or pandas DataFrame")

        if isinstance(data, pd.DataFrame):
            ds = Dataset.from_pandas(data)

        if isinstance(data, list):
            ds = self._convert_to_dict(data)
            ds = Dataset.from_dict(ds)

        try:
            ds.push_to_hub(
                repo_id=repo_id,
                config_name=config_name,
                set_default=set_default,
                split=split,
                data_dir=data_dir,
                commit_message=commit_message,
                commit_description=commit_description,
                private=private,
                token=token,
                revision=revision,
                create_pr=create_pr,
                max_shard_size=max_shard_size,
                num_shards=num_shards,
                embed_external_files=embed_external_files,
            )
        except Exception as e:
            raise ProviderError(
                provider="huggingface storage",
                message=f"Error pushing data to the repository: {str(e)}",
            )

    def load_from_repo(
        self,
        path: str,
        name: Optional[str] = None,
        data_dir: Optional[str] = None,
        data_files: Optional[
            Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
        ] = None,
        split: Optional[str] = None,
        cache_dir: Optional[str] = None,
        features=None,
        download_config=None,
        download_mode=None,
        verification_mode=None,
        keep_in_memory: Optional[bool] = None,
        save_infos: bool = False,
        revision: Optional[str] = None,
        token: Optional[Union[bool, str]] = None,
        streaming: bool = False,
        num_proc: Optional[int] = None,
        storage_options: Optional[Dict] = None,
        trust_remote_code: bool = None,
        **config_kwargs,
    ) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]:
        """
        Load data from HuggingFace dataset repository.
        This is direct implementation of HuggingFace Dataset load_dataset method.
        For arguments not listed here, check HuggingFace documentation for more detail.

        Args:
            path (str): Path or name of the dataset.

        Raises:
            Exception: If loading data from repository fails.
        """

        try:
            return load_dataset(
                path=path,
                name=name,
                data_dir=data_dir,
                data_files=data_files,
                split=split,
                cache_dir=cache_dir,
                features=features,
                download_config=download_config,
                download_mode=download_mode,
                verification_mode=verification_mode,
                keep_in_memory=keep_in_memory,
                save_infos=save_infos,
                revision=revision,
                token=token,
                streaming=streaming,
                num_proc=num_proc,
                storage_options=storage_options,
                trust_remote_code=trust_remote_code,
                **config_kwargs,
            )
        except Exception as e:
            raise ProviderError(
                provider="huggingface storage",
                message=f"Error loading data from the HF repository: {str(e)}",
            )

push_to_repo

push_to_repo(repo_id: str, data: Union[List[Dict[str, Any]], pd.DataFrame], config_name: str = 'default', set_default: Optional[bool] = None, split: Optional[str] = None, data_dir: Optional[str] = None, commit_message: Optional[str] = None, commit_description: Optional[str] = None, private: Optional[bool] = False, token: Optional[str] = None, revision: Optional[str] = None, create_pr: Optional[bool] = False, max_shard_size: Optional[Union[int, str]] = None, num_shards: Optional[int] = None, embed_external_files: bool = True)

Push data to a HuggingFace dataset repository. This wraps the push_to_hub method of HuggingFace Datasets; for parameters not listed here, see the HuggingFace documentation for details.

Parameters:

    data (Union[List[Dict[str, Any]], DataFrame], required): The data to upload.

      • List[Dict]
      • pandas DataFrame

    repo_id (str, required): Name of the dataset repository.

Raises:

    Exception: If pushing to dataset repository fails.

Source code in openpo/storage/huggingface.py
def push_to_repo(
    self,
    repo_id: str,
    data: Union[List[Dict[str, Any]], pd.DataFrame],
    config_name: str = "default",
    set_default: Optional[bool] = None,
    split: Optional[str] = None,
    data_dir: Optional[str] = None,
    commit_message: Optional[str] = None,
    commit_description: Optional[str] = None,
    private: Optional[bool] = False,
    token: Optional[str] = None,
    revision: Optional[str] = None,
    create_pr: Optional[bool] = False,
    max_shard_size: Optional[Union[int, str]] = None,
    num_shards: Optional[int] = None,
    embed_external_files: bool = True,
):
    """
    Push data to HuggingFace dataset repository.
    This is the implementation of HuggingFace Dataset's push_to_hub method.
    For parameters not listed, check HuggingFace documentation for more detail.

    Args:
        data: The data to upload.

            - List[Dict]
            - pandas DataFrame

        repo_id (str): Name of the dataset repository.

    Raises:
        Exception: If pushing to dataset repository fails.
    """

    if not isinstance(data, (list, pd.DataFrame)):
        raise TypeError("data must be a list of dictionaries or pandas DataFrame")

    if isinstance(data, pd.DataFrame):
        ds = Dataset.from_pandas(data)

    if isinstance(data, list):
        ds = self._convert_to_dict(data)
        ds = Dataset.from_dict(ds)

    try:
        ds.push_to_hub(
            repo_id=repo_id,
            config_name=config_name,
            set_default=set_default,
            split=split,
            data_dir=data_dir,
            commit_message=commit_message,
            commit_description=commit_description,
            private=private,
            token=token,
            revision=revision,
            create_pr=create_pr,
            max_shard_size=max_shard_size,
            num_shards=num_shards,
            embed_external_files=embed_external_files,
        )
    except Exception as e:
        raise ProviderError(
            provider="huggingface storage",
            message=f"Error pushing data to the repository: {str(e)}",
        )
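
A usage sketch for push_to_repo, reusing the storage instance from the initialization sketch above; the repository name and records are illustrative.

# Hypothetical usage sketch; repository name and records are illustrative.
preference_records = [
    {
        "prompt": "What is preference optimization?",
        "chosen": "A method for aligning models with human preference data.",
        "rejected": "A database indexing strategy.",
    },
]

storage.push_to_repo(
    repo_id="your-username/preference-dataset",
    data=preference_records,
    commit_message="Add preference pairs",
    private=True,
)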

load_from_repo

load_from_repo(path: str, name: Optional[str] = None, data_dir: Optional[str] = None, data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None, split: Optional[str] = None, cache_dir: Optional[str] = None, features=None, download_config=None, download_mode=None, verification_mode=None, keep_in_memory: Optional[bool] = None, save_infos: bool = False, revision: Optional[str] = None, token: Optional[Union[bool, str]] = None, streaming: bool = False, num_proc: Optional[int] = None, storage_options: Optional[Dict] = None, trust_remote_code: bool = None, **config_kwargs) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]

Load data from a HuggingFace dataset repository. This is a direct wrapper around the load_dataset method of HuggingFace Datasets; for arguments not listed here, see the HuggingFace documentation for details.

Parameters:

    path (str, required): Path or name of the dataset.

Raises:

    Exception: If loading data from repository fails.

Source code in openpo/storage/huggingface.py
def load_from_repo(
    self,
    path: str,
    name: Optional[str] = None,
    data_dir: Optional[str] = None,
    data_files: Optional[
        Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
    ] = None,
    split: Optional[str] = None,
    cache_dir: Optional[str] = None,
    features=None,
    download_config=None,
    download_mode=None,
    verification_mode=None,
    keep_in_memory: Optional[bool] = None,
    save_infos: bool = False,
    revision: Optional[str] = None,
    token: Optional[Union[bool, str]] = None,
    streaming: bool = False,
    num_proc: Optional[int] = None,
    storage_options: Optional[Dict] = None,
    trust_remote_code: bool = None,
    **config_kwargs,
) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]:
    """
    Load data from HuggingFace dataset repository.
    This is direct implementation of HuggingFace Dataset load_dataset method.
    For arguments not listed here, check HuggingFace documentation for more detail.

    Args:
        path (str): Path or name of the dataset.

    Raises:
        Exception: If loading data from repository fails.
    """

    try:
        return load_dataset(
            path=path,
            name=name,
            data_dir=data_dir,
            data_files=data_files,
            split=split,
            cache_dir=cache_dir,
            features=features,
            download_config=download_config,
            download_mode=download_mode,
            verification_mode=verification_mode,
            keep_in_memory=keep_in_memory,
            save_infos=save_infos,
            revision=revision,
            token=token,
            streaming=streaming,
            num_proc=num_proc,
            storage_options=storage_options,
            trust_remote_code=trust_remote_code,
            **config_kwargs,
        )
    except Exception as e:
        raise ProviderError(
            provider="huggingface storage",
            message=f"Error loading data from the HF repository: {str(e)}",
        )
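
And the matching read path with load_from_repo; the dataset path is illustrative.

# Hypothetical usage sketch; the dataset path is illustrative.
dataset = storage.load_from_repo(
    path="your-username/preference-dataset",
    split="train",  # optional: returns a Dataset rather than a DatasetDict
)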

S3Storage

Storage adapter for Amazon S3.

This class provides methods to store and retrieve data from Amazon S3 buckets. It handles JSON and Parquet serialization/deserialization and manages S3 operations through the boto3 client.

Parameters:

    **kwargs (default {}): Keyword arguments that can be passed for AWS access:

      • region_name
      • aws_access_key_id
      • aws_secret_access_key
      • profile_name

      Alternatively, credentials can be configured with aws configure.

Raises:

    ProviderError: If S3 Client error is raised
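
A minimal construction sketch, assuming S3Storage is importable from the source path shown below; the credential values are placeholders.

# Hypothetical construction sketch; credential values are placeholders.
from openpo.storage.s3 import S3Storage

s3_storage = S3Storage(
    region_name="us-east-1",
    aws_access_key_id="YOUR_ACCESS_KEY_ID",
    aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
)

# Or rely on credentials already configured with `aws configure`:
s3_storage = S3Storage()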

Source code in openpo/storage/s3.py
class S3Storage:
    """Storage adapter for Amazon S3.

    This class provides methods to store and retrieve data from Amazon S3 buckets.
    It handles JSON serialization/deserialization and manages S3 operations through
    boto3 client.

    Parameters:
        **kwargs: Keyword arguments can be passed to access AWS:

            - region_name
            - aws_access_key_id
            - aws_secret_access_key
            - profile_name

            Alternatively, credentials can be configured with aws configure

    Raises:
        ProviderError: If S3 Client error is raised

    """

    def __init__(self, **kwargs):
        try:
            self.s3 = boto3.client("s3", **kwargs)
        except ClientError as e:
            raise ProviderError(
                provider="s3",
                message=f"Failed to initialize boto3 client: {str(e)}",
            )

    def _read_file(self, bucket: str, key: str) -> List[Dict[str, Any]]:
        try:
            res = self.s3.get_object(Bucket=bucket, Key=key)
            content = res["Body"].read()

            file_ext = key.split(".")[-1].lower()
            if file_ext == "json":
                data = json.loads(content)
                if isinstance(data, list):
                    return data
                return list(data)
            elif file_ext == "parquet":
                try:
                    parquet_buffer = io.BytesIO(content)
                    df = pd.read_parquet(parquet_buffer)
                    return json.loads(df.to_json(orient="records"))
                except Exception as e:
                    raise ValueError(f"Failed to parse content as parquet: {str(e)}")
            else:
                raise ValueError(
                    f"Unsupported content type: {content_type}. Supported extensions are: json, parquet "
                )
        except ClientError as err:
            raise err

    def _serialize_data(
        self,
        data,
        serialization_type,
    ) -> tuple[bytes, str]:
        if isinstance(data, bytes):
            return data, "application/octet-stream"

        if serialization_type == "parquet":
            buffer = io.BytesIO()

            if isinstance(data, list):
                if not all(isinstance(item, dict) for item in data):
                    raise TypeError(
                        "All items in list must be dictionaries when using 'parquet' serialization"
                    )
                df = pd.DataFrame(data)
            elif isinstance(data, pd.DataFrame):
                df = data
            else:
                raise TypeError(
                    "Data must be DataFrame or list of dicts when using 'parquet' serialization"
                )

            df.to_parquet(buffer)
            return buffer.getvalue(), "application/octet-stream"

        if serialization_type == "json":
            if isinstance(data, pd.DataFrame):
                data = json.loads(data.to_json(orient="records"))
            elif not isinstance(data, list):
                raise TypeError(
                    "Data must be a list or DataFrame when using 'json' serialization"
                )

            return json.dumps(data, default=str).encode(), "application/json"

        raise ValueError(f"Unsupported serialization type: {serialization_type}")

    def push_to_s3(
        self,
        data: Union[List[Dict[str, Any]], pd.DataFrame, bytes],
        bucket: str,
        key: Optional[str] = None,
        ext_type: Literal["parquet", "json"] = "parquet",
    ):
        """Upload data to an S3 bucket.

        Args:
            data: The data to upload.

                - List[Dict]: List of dictionaries
                - pd.DataFrame: Pandas DataFrame

            bucket (str): Name of the S3 bucket
            key (str, optional): Object key (path) in the bucket

            ext_type (str): Type of serialization to use:

                - parquet: Serialize as parquet
                - json: Serialize as JSON


        Raises:
            ClientError: If S3 operation fails
            TypeError: If data type is not compatible with chosen serialization type
            ValueError: If serialization type is not supported or data cannot be deserialized
        """
        try:
            serialized_data, content_type = self._serialize_data(data, ext_type)

            self.s3.put_object(
                Bucket=bucket,
                Key=f"{key}.{ext_type}",
                Body=serialized_data,
                ContentType=content_type,
            )

        except ClientError as err:
            raise ProviderError(
                provider="s3",
                message=f"Failed to push data to s3: {str(err)}",
            )

    def load_from_s3(self, bucket: str, key: str) -> List[Dict[str, Any]]:
        """Read data from an S3 bucket.

        Args:
            bucket (str): Name of the S3 bucket.
            key (str): Object name (path) in the bucket.

        Returns:
            List[Dict]: The loaded data as a list of dictionaries.

        Raises:
            ClientError: If S3 operation fails.
            ValueError: If content type is not supported or content cannot be parsed.
        """
        content = self._read_file(bucket, key)
        return content

push_to_s3

push_to_s3(data: Union[List[Dict[str, Any]], pd.DataFrame, bytes], bucket: str, key: Optional[str] = None, ext_type: Literal['parquet', 'json'] = 'parquet')

Upload data to an S3 bucket.

Parameters:

    data (Union[List[Dict[str, Any]], DataFrame, bytes], required): The data to upload.

      • List[Dict]: List of dictionaries
      • pd.DataFrame: Pandas DataFrame

    bucket (str, required): Name of the S3 bucket.
    key (str, default None): Object key (path) in the bucket.
    ext_type (str, default 'parquet'): Type of serialization to use:

      • parquet: Serialize as parquet
      • json: Serialize as JSON

Raises:

    ClientError: If S3 operation fails
    TypeError: If data type is not compatible with chosen serialization type
    ValueError: If serialization type is not supported or data cannot be deserialized

Source code in openpo/storage/s3.py
def push_to_s3(
    self,
    data: Union[List[Dict[str, Any]], pd.DataFrame, bytes],
    bucket: str,
    key: Optional[str] = None,
    ext_type: Literal["parquet", "json"] = "parquet",
):
    """Upload data to an S3 bucket.

    Args:
        data: The data to upload.

            - List[Dict]: List of dictionaries
            - pd.DataFrame: Pandas DataFrame

        bucket (str): Name of the S3 bucket
        key (str, optional): Object key (path) in the bucket

        ext_type (str): Type of serialization to use:

            - parquet: Serialize as parquet
            - json: Serialize as JSON


    Raises:
        ClientError: If S3 operation fails
        TypeError: If data type is not compatible with chosen serialization type
        ValueError: If serialization type is not supported or data cannot be deserialized
    """
    try:
        serialized_data, content_type = self._serialize_data(data, ext_type)

        self.s3.put_object(
            Bucket=bucket,
            Key=f"{key}.{ext_type}",
            Body=serialized_data,
            ContentType=content_type,
        )

    except ClientError as err:
        raise ProviderError(
            provider="s3",
            message=f"Failed to push data to s3: {str(err)}",
        )
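
A usage sketch for push_to_s3, reusing the s3_storage instance from the construction sketch above; the bucket and key names are illustrative.

# Hypothetical usage sketch; bucket and key names are illustrative.
records = [
    {"prompt": "What is RLHF?", "chosen": "Reinforcement learning from human feedback.", "rejected": "A file format."},
]

s3_storage.push_to_s3(
    data=records,
    bucket="my-openpo-datasets",
    key="preference/2024-01-01",  # ".parquet" is appended based on ext_type
    ext_type="parquet",
)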

load_from_s3

load_from_s3(bucket: str, key: str) -> List[Dict[str, Any]]

Read data from an S3 bucket.

Parameters:

    bucket (str, required): Name of the S3 bucket.
    key (str, required): Object name (path) in the bucket.

Returns:

    List[Dict[str, Any]]: The loaded data as a list of dictionaries.

Raises:

    ClientError: If S3 operation fails.
    ValueError: If content type is not supported or content cannot be parsed.

Source code in openpo/storage/s3.py
def load_from_s3(self, bucket: str, key: str) -> List[Dict[str, Any]]:
    """Read data from an S3 bucket.

    Args:
        bucket (str): Name of the S3 bucket.
        key (str): Object name (path) in the bucket.

    Returns:
        List[Dict]: The loaded data as a list of dictionaries.

    Raises:
        ClientError: If S3 operation fails.
        ValueError: If content type is not supported or content cannot be parsed.
    """
    content = self._read_file(bucket, key)
    return content
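
And the matching read path with load_from_s3; note that the key passed here includes the file extension, since the extension determines whether the object is parsed as JSON or Parquet. The bucket and key names are illustrative.

# Hypothetical usage sketch; bucket and key names are illustrative.
data = s3_storage.load_from_s3(
    bucket="my-openpo-datasets",
    key="preference/2024-01-01.parquet",  # extension selects the parser (json or parquet)
)
print(data[0])  # list of dictionaries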