Module ragability.ragability_eval

Module for the CLI to create evaluation reports from a ragability_check output file.

Functions

def get_args()
Expand source code
def get_args():
    """
    Get the command line arguments
    """
    parser = argparse.ArgumentParser(description='Evaluation of a ragability_check output file')
    parser.add_argument('--input', '-i', type=str, help='Input ragability_check output file', required=True)
    parser.add_argument('--save-json', '-o', type=str,
                        help='Output json or hjson', required=False)
    parser.add_argument('--config', '-c', type=str, help='Configuration file', required=False)
    parser.add_argument("--save-longdf", type=str, help="Save the long format dataframe to a csv or tsv file", required=False)
    parser.add_argument("--save-widedf", type=str, help="Save the wide format dataframe to a csv or tsv file", required=False)
    parser.add_argument('--verbose', '-v', action="store_true",
                        help='Be more verbose', required=False)
    parser.add_argument('--by_tags', nargs="+", type=str,
                        help='List of tags or comma-separated taglists to evaluate by', required=False)
    parser.add_argument('--by_qfields', nargs="+", type=str,
                        help='List of query fields to evaluate by', required=False)
    parser.add_argument("--debug", "-d", action="store_true", help="Debug mode", required=False)
    parser.add_argument("--debug-save-checkdfs", action="store_true", help="Save all the per-metric data frames", required=False)
    args_tmp = parser.parse_args()
    args = {}
    args.update(vars(args_tmp))
    return args

Get the command line arguments

def main()
Expand source code
def main():
    args = get_args()
    if args["debug"]:
        set_logging_level(DEBUG)
        ppargs = pp_config(args)
        logger.debug(f"Effective arguments: {ppargs}")
    run(args)
def make_grouping_func(df: pandas.core.frame.DataFrame,
tags: List[str] | None = None,
fields: Dict[str, str] | None = None)
Expand source code
def make_grouping_func(
    df: pd.DataFrame,
    tags: Optional[List[str]] = None,
    fields: Optional[Dict[str,str]] = None ):
    """
    Create a function which can be used as an argument to the pandas group_by method on the given dataframe.
    This creates a binary grouping where one group consists of all the rows that match the given tags and field values,
    and another group which does not.
    """
    # if both tags and fields are None or empty, raise and Exception
    if tags is None and fields is None:
        raise Exception("No grouping criteria")

    def the_groupby_func(index):
        row = df.loc[index]
        if tags:
            tag_values = [s.strip() for s in row["tags"].split(",")]
            for t in tags:
                if t not in tag_values:
                    logger.debug(f"Tag {t} not in {tag_values} in {row} for groupby {tags}")
                    return False
        if fields:
            for fname, fval in fields.items():
                if row[fname] != fval:
                    return False
        return True
    return the_groupby_func

Create a function which can be used as an argument to the pandas group_by method on the given dataframe. This creates a binary grouping where one group consists of all the rows that match the given tags and field values, and another group which does not.

def run(config: dict)
Expand source code
def run(config: dict):
    # read the input file and collect for each check the necessary fields
    indata = read_input_file(config["input"])
    checksdata = defaultdict(lambda: defaultdict(list))
    # counter to count the queries with errors
    n_errors = 0
    n_errors_per_llm = Counter()
    nc_errors = 0
    nc_errors_per_llm = Counter()
    n_rows = 0
    for idx, q in enumerate(indata):
        error = q.get("error")
        llm = q.get("llm")
        if not llm:
            raise ValueError(f"Error: Missing 'llm' field in entry with index {idx}: {q}")
        if error:
            n_errors += 1
            n_errors_per_llm[llm] += 1
            continue
        for check in q["checks"]:
            func = check["func"]
            funcdef = CHECKS.get(func)
            kind = funcdef["kind"]
            if not funcdef:
                logger.error(f"Check function {func} not found in check for qid {q['qid']}")
                nc_errors += 1
                nc_errors_per_llm[llm] += 1
                continue
            metrics = check["metrics"]
            for metric in metrics:
                row = dict(
                    target=funcdef["target"],
                    result=check["result"],
                    qid=q["qid"],
                    tags=q["tags"],
                    llm=llm,
                )
                # add any non=standard fields from the query to the row
                for k, v in q.items():
                    if k not in Q_STANDARD_FIELDS:
                        row[k] = v
                # add any non-standard fields from the check to the row
                for k, v in check.items():
                    if k not in C_STANDARD_FIELDS:
                        row[f"check_{k}"] = v
                checksdata[kind][metric].append(row)
                n_rows += 1
    logger.debug(f"Errors in queries: {n_errors}")
    logger.debug(f"Errors in checks: {nc_errors}")
    logger.debug(f"Errors in queries per llm: {n_errors_per_llm}")
    logger.debug(f"Errors in checks per llm: {nc_errors_per_llm}")
    logger.debug(f"Generated check data rows: {n_rows}")
    # convert the checksdata to a dictionary of dataframes, one for each metric
    checkdfs = {}
    for kind, kinddata in checksdata.items():
        for metric, metricdata in kinddata.items():
            dftmp = pd.DataFrame(metricdata)
            checkdfs[f"{kind}:{metric}"] = dftmp
            logger.debug(f"Generated check data dataframe for {kind}:{metric} with {len(dftmp)} rows and {len(dftmp.columns)} columns")
            # if --debug option is given, write the dataframe to a csv file
            if config.get("debug-save-checkdfs"):
                dftmp.to_csv(f"debug_checkdata_{kind}_{metric}.csv", index=False)
    logger.debug(f"Generated check data dataframes: {len(checkdfs)} for keys {list(checkdfs.keys())}")

    # we have to generate an evaluation report dataframe of the following format:
    # * column "group" contains a description of how the subgroup is defined
    # * columns "llm" contains the llm name
    # * one column per metric and statistic, for binary metrics this is of the form "metricname:accuracy"
    # * one column per metric which contains the number of rows, this is of the form "metricname:n"
    #
    # example:
    # group, llm, metric1:accuracy, metric1:n, metric2:accuracy, metric2:n
    #
    # To prepare the data for this dataframe, collect the rows of the dataframe in list, where each row
    # is a dictionary with all the necessary fields

    dfrows = []
    # first of all, create the entries without any grouping, just by LLMs for all the metrics
    for key, df in checkdfs.items():
        kind, metric = key.split(":")
        for llm, llmgroup in df.groupby("llm"):
            dfrows.append(dict(
                group="all",
                llm=llm,
                metric=f"{metric}:accuracy",
                value=sk.metrics.accuracy_score(llmgroup["target"].values, llmgroup["result"].values)
            ))
            dfrows.append(dict(
                group="all",
                llm=llm,
                metric=f"{metric}:n",
                value=len(llmgroup)
            ))
    logger.debug(f"Generated {len(dfrows)} rows for all LLMs")


    # now if we have grouping criteria, do the following: for each of the by_tags or by_qfields criteria,
    # create a grouping function to split the df into two groups, one that matches the criteria and one that does not.
    # Create the corresponding dataframes with the rows matching the criteria and the other with the rows not
    # matching the criteria. Then group each of these dataframes by LLM and calculate the accuracy and number of rows
    # for each metric.
    if config.get("by_tags") or config.get("by_qfields"):
        for groupbyname in ["by_tags", "by_qfields"]:
            groupbyvalues = config.get(groupbyname)
            logger.debug(f"Grouping by {groupbyname} with values {groupbyvalues}")
            if not groupbyvalues:
                continue
            n_rows4group = 0
            for groupbyvalue in groupbyvalues:
                logger.debug(f"Generating rows for grouping by {groupbyname} with value {groupbyvalue}")
                for key, df in checkdfs.items():
                    if groupbyname == "by_tags":
                        grouping_func = make_grouping_func(df, tags=[groupbyvalue])
                    else:
                        # find all possible values of the field in the df
                        fields = {groupbyvalue: v for v in df[groupbyvalue].unique()}
                        grouping_func = make_grouping_func(df, fields=fields)
                    kind, metric = key.split(":")
                    grouped = df.groupby(grouping_func)
                    for group, groupdf in grouped:
                        logger.debug(f"Grouping {key} by {groupbyname} with value {groupbyvalue} and group {group}")
                        if group:
                            groupname = f"{groupbyvalue}:yes"
                        else:
                            groupname = f"{groupbyvalue}:no"
                        for llm, llmgroup in groupdf.groupby("llm"):
                            dfrows.append(dict(
                                group=groupname,
                                llm=llm,
                                metric=f"{metric}:accuracy",
                                value=sk.metrics.accuracy_score(llmgroup["target"].values, llmgroup["result"].values)
                            ))
                            dfrows.append(dict(
                                group=groupname,
                                llm=llm,
                                metric=f"{metric}:n",
                                value=len(llmgroup)
                            ))
                            n_rows4group += 2
            logger.debug(f"Generated {n_rows4group} rows for grouping by {groupbyname}")
    logger.debug(f"Generated {len(dfrows)} rows in total")
    # create the long format dataframe from the list of rows
    dfout_long = pd.DataFrame(dfrows)
    if config.get("save-longdf"):
        if config["save-longdf"].endswith(".csv"):
            dfout_long.to_csv(config["save-longdf"], index=False)
        elif config["save-longdf"].endswith(".tsv"):
            dfout_long.to_csv(config["save-longdf"], index=False, sep="\t")
        else:
            raise Exception(f"Error: Output file must end in .csv or .tsv, not {config['save-longdf']}")
    # now pivot the long format dataframe to the wide format
    dfout = dfout_long.pivot_table(index=["group", "llm"], columns="metric", values="value")
    dfout.reset_index(inplace=True)
    if config.get("save-widedf"):
        if config["save-widedf"].endswith(".csv"):
            dfout.to_csv(config["save-widedf"], index=False)
        elif config["save-widedf"].endswith(".tsv"):
            dfout.to_csv(config["save-widedf"], index=False, sep="\t")
        else:
            raise Exception(f"Error: Output file must end in .csv or .tsv, not {config['save-widedf']}")
    # if the output file is specified, save the dataframe as csv or tsv depending on the extension or
    # save a dictionary representation of the dataframe as json or hjson
    if config.get("save-json"):
        if config["output"].endswith(".json"):
            dfout.to_json(config["output"], orient="records")
        elif config["output"].endswith(".hjson"):
            with open(config["output"], "wt") as outfp:
                hjson.dump(dfout.to_dict(orient="records"), outfp)
        else:
            raise Exception(f"Error: Output file must end in .csv, .tsv, .json or .hjson, not {config['output']}")
    # if verbose is set, or no output file is specified, write the results to stdout using textual formattign of
    # the dataframe
    if config.get("verbose") or not config.get("output"):
        print(dfout_long.to_string())