// DataFrame.js

"use strict";

var DataFrameWriter = require("./DataFrameWriter");
var GroupedData = require("./GroupedData");
var Column = require("./Column");
var Row = require("./Row");
var java = require("./java");
var helpers = require("./helpers");

/**
 * A distributed collection of data organized into named columns.
 *
 * DataFrames can be created using various functions in {@link SQLContext}.
 * They can be manipulated using the various domain-specific-language (DSL)
 * functions defined in: {@link DataFrame} (this class), {@link Column}, and
 * {@link Functions}.
 *
 * @example <caption>To select a column from the data frame, use the <code>col</code> method.</caption>
 *   var ageCol = people.col("age");
 *
 * @example <caption>Note that the {@link Column} type can also be manipulated through its various functions.</caption>
 *   // The following creates a new column that increases everybody's age by 10.
 *   people.col("age").plus(10);
 *
 * @example <caption>A more complete example.</caption>
 *   var people = sqlContext.read().json("...");
 *   var department = sqlContext.read().json("...");
 *
 *   people.filter("age > 30")
 *     .join(department, people.col("deptId").eq(department.col("id")))
 *     .groupBy(department.col("name"), people.col("gender"))
 *     .agg(F.avg(people.col("salary")), F.max(people.col("age")));
 *
 * @since 1.3.0
 */
class DataFrame {

    /**
     * **Note:** Do not use directly (see above).
     *
     * @param jvm_obj Handle to the underlying JVM DataFrame; every method of
     *                this class forwards to it.
     */
    constructor(jvm_obj) {
        this.jvm_obj = jvm_obj;
    }

    /**
     * Returns a new {@link DataFrame} with new specified column names.
     * @param colNames Array of new column names.
     * @since 1.3.0
     */
    toDF(...colNames /*: String* */) /*: DataFrame*/ {
        return new DataFrame(this.jvm_obj.toDF(...colNames));
    }

    /**
     * Returns all column names as an array.
     *
     * @param cb Node-style callback function (error-first).
     * @since 1.3.0
     */
    columns(cb /*: cb: (err: any, res: Array[String]) =>  any */) /*: void */ {
        this.jvm_obj.columnsAsync(cb);
    }

    /**
     * The synchronous version of {@link DataFrame#columns}
     * @since 1.3.0
     */
    columnsSync() /*: Array[String]*/ {
        return this.jvm_obj.columns();
    }

    /**
     * Prints the schema to the console in a nice tree format.
     *
     * This method runs a computation but is still synchronous, because it is
     * used in an interactive setting (shell).
     *
     * @since 1.3.0
     */
    printSchema() /*: Unit */ {
        this.jvm_obj.printSchema();
    }

    /**
     * Prints the plans (logical and physical) to the console for debugging purposes.
     * @param {Boolean} [extended=false] Forwarded unchanged to the JVM `explain` call
     *              (in Spark, `true` adds the logical plans to the physical plan).
     * @since 1.3.0
     */
    explain(extended=false /*: Boolean */) /*: Unit */ {
        this.jvm_obj.explain(extended);
    }

    /**
     * Returns true if the `collect` and `take` methods can be run locally
     * (without any Spark executors).
     * @since 1.3.0
     */
    isLocal() /*: Boolean*/ {
        return this.jvm_obj.isLocal();
    }

    /**
     * Displays the {@link DataFrame} in a tabular form. Strings more than 20 characters will be
     * truncated, and all cells will be aligned right.
     *
     * This method runs a computation but is still synchronous, because it is
     * used in an interactive setting (shell).
     *
     * @param {Number} [numRows=20] Number of rows to show.
     *
     * @param {Boolean} [truncate=true] If true, strings more than 20 characters will
     *              be truncated and all cells will be aligned right.
     *
     * @since 1.3.0
     */
    show(numRows=20 /*: Int */, truncate=true /*: Boolean */) /*: Unit */ {
        this.jvm_obj.show(numRows, truncate);
    }


    /**
     * Join with another {@link DataFrame}.
     *
     * If no `col` is provided, does a Cartesian join. (Note that cartesian
     * joins are very expensive without an extra filter that can be pushed
     * down). In that case `joinType` is ignored.
     *
     * If a column name (string) is provided, does an equi-join.
     *
     * If a column expression is provided, uses that as a join expression.
     *
     * @example <caption>perform a full outer join between <code>df1</code> and <code>df2</code></caption>
     * df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
     *
     * @param right Right side of the join.
     * @param [col=null] Column name or join expression.
     * @param [joinType="inner"] One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
     * @since 1.3.0
     */
    join(right /*: DataFrame */, col=null /*: Column|String */, joinType="inner" /*: String*/) /*: DataFrame */ {
        // The JVM side needs the underlying Java object, not the JS wrapper.
        const jvm_right = unwrapDataFrame(right);
        if (col === null) {
            return new DataFrame(this.jvm_obj.join(jvm_right));
        }
        const jvm_col = helpers.jobj_from_maybe_string(col);
        return new DataFrame(this.jvm_obj.join(jvm_right, jvm_col, joinType));
    }

    /**
     * Returns a new {@link DataFrame} sorted by the specified column, all in ascending order.
     * @param col Column name or {@link Column} expression to sort by.
     * @param cols Array of additional column names or expressions to sort by.
     * @since 1.3.0
     */
    sort(col /*: (Column | String) */, ...cols /*: (Column* | String*) */) /*: DataFrame */ {
        const jvm_first = helpers.jobj_from_maybe_string(col);
        const jvm_rest = cols.map(helpers.jobj_from_maybe_string);
        return new DataFrame(this.jvm_obj.sort(jvm_first, ...jvm_rest));
    }

    /**
     * Selects column based on the column name and return it as a {@link Column}.
     * Note that the column name can also reference to a nested column like `a.b`.
     * @param colName Name of the column to select.
     * @since 1.3.0
     */
    col(colName /*: String */) /*: Column */ {
        return new Column(this.jvm_obj.col(colName));
    }

    /**
     * Selects a set of column based expressions.
     *
     * @param cols Array of column names or expressions.
            If one of the column names is '*', that column is expanded to include all columns
            in the current DataFrame.
     * @since 1.3.0
     */
    select(...cols /*: (Column* | String*) */) /*: DataFrame */ {
        const jvm_cols = cols.map(helpers.jobj_from_maybe_string);
        return new DataFrame(this.jvm_obj.select(...jvm_cols));
    }

    /**
     * Selects a set of SQL expressions. This is a variant of `select` that accepts
     * SQL expressions.
     *
     * @param exprs Array of SQL expressions.
     * @since 1.3.0
     */
    selectExpr(...exprs /*: String* */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.selectExpr(...exprs));
    }

    /**
     * Filters rows using the given column expression or SQL expression.
     * @example <caption>The following are equivalent:</caption>
     *   peopleDf.filter(peopleDf.col("age").gt(15));
     *   peopleDf.filter("age > 15");
     * @param condition A {@link Column} of booleans or a string containing a SQL expression.
     * @since 1.3.0
     */
    filter(condition /*: Column|String */) /*: DataFrame */ {
        const jvm_condition = helpers.jobj_from_maybe_string(condition);
        return new DataFrame(this.jvm_obj.filter(jvm_condition));
    }

    /**
     * Filters rows using the given condition. This is an alias for `filter`.
     * @param condition A {@link Column} of booleans or a string containing a SQL expression.
     * @since 1.3.0
     */
    where(condition /*: Column|String */) /*: DataFrame */ {
        return this.filter(condition);
    }

    /**
     * Groups the {@link DataFrame} using the specified columns, so we can run aggregations on them.
     * See {@link GroupedData} for all the available aggregation functions.
     *
     * @example <caption>Compute the average for all numeric columns grouped by department.</caption>
     *   df.groupBy("department").avg();
     *
     * @param cols Array of column names or expressions to group by.
     * @since 1.3.0
     */
    groupBy(...cols /*: (Column* | String*) */) /*: GroupedData */ {
        const jvm_cols = cols.map(helpers.jobj_from_maybe_string);
        return new GroupedData(this.jvm_obj.groupBy(...jvm_cols));
    }

    /**
     * Create a multi-dimensional rollup for the current {@link DataFrame} using the specified columns,
     * so we can run aggregations on them.
     * See {@link GroupedData} for all the available aggregation functions.
     *
     * @example
     * // Compute the average for all numeric columns rolluped by department and group
     *   df.rollup(df.col("department"), df.col("group")).avg();
     *
     * @param cols Array of column names or expressions.
     * @since 1.4.0
     */
    rollup(...cols /*: (Column* | String*) */) /*: GroupedData */ {
        const jvm_cols = cols.map(helpers.jobj_from_maybe_string);
        return new GroupedData(this.jvm_obj.rollup(...jvm_cols));
    }

    /**
     * Create a multi-dimensional cube for the current {@link DataFrame} using the specified columns,
     * so we can run aggregation on them.
     * See {@link GroupedData} for all the available aggregation functions.
     *
     * @example
     *   // Compute the average for all numeric columns cubed by department and group.
     *   df.cube("department", "group").avg();
     *
     * @param cols Array of column names or expressions.
     * @since 1.4.0
     */
    cube(...cols /*: (Column*|String*) */) /*: GroupedData */ {
        const jvm_cols = cols.map(helpers.jobj_from_maybe_string);
        return new GroupedData(this.jvm_obj.cube(...jvm_cols));
    }

    /**
     * Aggregates on the entire {@link DataFrame} without groups.
     * @example
     *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
     *   df.agg(F.max(df.col("age")), F.avg(df.col("salary")));
     *   df.groupBy().agg(F.max(df.col("age")), F.avg(df.col("salary")));
     *
     * @param cols Array of {@link Column} expressions. Unlike `select`, plain
     *             column-name strings are not accepted here: each argument is
     *             unwrapped through its `jvm_obj` property.
     * @since 1.3.0
     */
    agg(...cols /*: Column* */) /*: DataFrame */ {
        const jvm_cols = cols.map(col => col.jvm_obj);
        return new DataFrame(this.jvm_obj.agg(...jvm_cols));
    }

    /**
     * Returns a new {@link DataFrame} by taking the first `n` rows. The difference between this function
     * and `head` is that `head` returns an array while `limit` returns a new {@link DataFrame}.
     * @param n Number of rows.
     * @since 1.3.0
     */
    limit(n /*: Int */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.limit(n));
    }

    /**
     * Returns a new {@link DataFrame} containing union of rows in this frame and another frame.
     * This is equivalent to `UNION ALL` in SQL.
     * @param other {@link DataFrame}
     * @since 1.3.0
     */
    unionAll(other /*: DataFrame */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.unionAll(unwrapDataFrame(other)));
    }

    /**
     * Returns a new {@link DataFrame} containing rows only in both this frame and another frame.
     * This is equivalent to `INTERSECT` in SQL.
     * @param other {@link DataFrame}
     * @since 1.3.0
     */
    intersect(other /*: DataFrame */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.intersect(unwrapDataFrame(other)));
    }

    /**
     * Returns a new {@link DataFrame} containing rows in this frame but not in another frame.
     * This is equivalent to `EXCEPT` in SQL.
     * @param other {@link DataFrame}
     * @since 1.3.0
     */
    except(other /*: DataFrame */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.except(unwrapDataFrame(other)));
    }

    /**
     * Returns a new {@link DataFrame} by sampling a fraction of rows, using a random seed.
     *
     * @param withReplacement Sample with replacement or not.
     * @param fraction Fraction of rows to generate.
     * @since 1.3.0
     */
    sample(withReplacement /*: Boolean */, fraction /*: Double */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.sample(withReplacement, fraction));
    }

    /**
     * Randomly splits this {@link DataFrame} with the provided weights.
     *
     * **Not implemented yet:** calling this method always throws.
     *
     * @param weights Weights for splits, will be normalized if they don't sum to 1.
     * @throws {Error} Always, with the message "Not implemented".
     * @since 1.4.0
     */
    randomSplit(...weights /*: Double* */) /*: Array[DataFrame]*/ {

        throw new Error("Not implemented");

        // for some reason, this crashes the jvm. yet constructing the array
        // in the shell and passing it directly works. haven't dug into the
        // node-java code yet.
        //
        // The code below is the intended implementation, kept for reference
        // until the crash is understood; it is unreachable on purpose.

        /*eslint-disable no-unreachable*/
        var weightsArray = java.newArray("double", weights);

        return this.jvm_obj.randomSplit(weightsArray);
        /*eslint-enable no-unreachable*/

    }


    /**
     * Returns a new {@link DataFrame} by adding a column or replacing the existing column that has
     * the same name.
     * @param colName Name of the new column.
     * @param col {@link Column} Expression for the new column.
     * @since 1.3.0
     */
    withColumn(colName /*: String */, col /*: Column */) /*: DataFrame */ {
        const jvm_col = helpers.jobj_from_maybe_string(col);
        return new DataFrame(this.jvm_obj.withColumn(colName, jvm_col));
    }


    /**
     * Returns a new {@link DataFrame} with a column dropped.
     * This is a no-op if schema doesn't contain column name.
     * @param col Column name or {@link Column} to drop.
     * @since 1.4.0
     */
    drop(col /*: String|Column */) /*: DataFrame */ {
        const jvm_col = helpers.jobj_from_maybe_string(col);
        return new DataFrame(this.jvm_obj.drop(jvm_col));
    }


    /**
     * Returns a new {@link DataFrame} that contains only the unique rows from this {@link DataFrame}.
     * This is an alias for `distinct`.
     * If column names are passed in, rows are only compared in those columns.
     *
     * @param colNames Array of column names.
     * @since 1.4.0
     */
    dropDuplicates(...colNames /*: String* */) /*: DataFrame */ {
        // Must call the JVM `dropDuplicates`, not `drop` (which removes a column).
        // NOTE(review): column names are spread like every other variadic call
        // in this class; this assumes the JVM side exposes a varargs overload
        // (older Spark versions only take a Seq/Array) - confirm against the
        // targeted Spark version.
        return new DataFrame(this.jvm_obj.dropDuplicates(...colNames));
    }

    /**
     * Returns a new {@link DataFrame} that contains only the unique rows from this {@link DataFrame}.
     * This is an alias for `dropDuplicates`.
     * @since 1.3.0
     */
    distinct() /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.distinct());
    }

    /**
     * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
     * If no columns are given, this computes statistics for all numerical columns.
     *
     * This is meant for exploratory data analysis, as we make no guarantee about the
     * backward compatibility of the schema of the resulting {@link DataFrame}. If you want to
     * programmatically compute summary statistics, use the `agg` method instead.
     *
     * @example
     *   df.describe("age", "height").show();
     *
     *   // output:
     *   // summary age   height
     *   // count   10.0  10.0
     *   // mean    53.3  178.05
     *   // stddev  11.6  15.7
     *   // min     18.0  163.0
     *   // max     92.0  192.0
     *
     * @param colNames Array of column names.
     *
     * @since 1.3.1
     */
    describe(...colNames /*: String* */) /*: DataFrame */ {
        return new DataFrame(this.jvm_obj.describe(...colNames));
    }

    /**
     * Returns the first `n` rows.
     *
     * Running head requires moving data into the application's driver process, and doing so with
     * a very large `n` can crash the driver process with OutOfMemoryError.
     *
     * @param cb Node-style callback function (error-first). On success it
     *           receives one entry per row, each being `new Row(row).values()`.
     * @param {Number} [n=1] Number of rows to return.
     *
     * @since 1.3.0
     */
    head(cb /*: cb: (err: any, res: Array[Object]) =>  any */, n=1 /*: Int */) /*: void */ {
        const jvm_cb = (err, jvm_rows) => {
            if (err) return cb(err);
            cb(err, jvm_rows.map(row => new Row(row).values()));
        };

        return this.jvm_obj.headAsync(n, jvm_cb);
    }

    /**
     * The synchronous version of {@link DataFrame#head}
     *
     * @param {Number} [n=1] Number of rows to return.
     *
     * @since 1.3.0
     */
    headSync(n=1 /*: Int */) /*: Array[Object]*/ {
        return this.jvm_obj.head(n)
            .map(row => new Row(row).values());
    }

    /**
     * Returns an array that contains all of {@link Row}s in this {@link DataFrame}.
     *
     * Running collect requires moving all the data into the application's driver process, and
     * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
     *
     * @param cb Node-style callback function (error-first). On success it
     *           receives one entry per row, each being `new Row(row).values()`.
     * @since 1.3.0
     */
    collect(cb /*: cb: (err: any, res: Array[Object]) =>  any */) /*: void*/ {
        const jvm_cb = (err, jvm_rows) => {
            if (err) return cb(err);
            cb(err, jvm_rows.map(row => new Row(row).values()));
        };
        this.jvm_obj.collectAsync(jvm_cb);
    }

    /**
     * The synchronous version of {@link DataFrame#collect}
     *
     * @since 1.3.0
     */
    collectSync() /*: Array[Object]*/ {
        return this.jvm_obj.collect()
            .map(row => new Row(row).values());
    }

    /**
     * Returns the number of rows in the {@link DataFrame}.
     *
     * @param cb Node-style callback function (error-first). The result is
     *           passed through `valueOf()` before being handed to `cb`.
     * @since 1.3.0
     */
    count(cb /*: cb: (err: any, res: Number) =>  any */) /*: void*/ {
        const jvm_cb = (err, res) => {
            if (err) return cb(err);
            cb(err, res.valueOf());
        };

        this.jvm_obj.countAsync(jvm_cb);
    }

    /**
     * The synchronous version of {@link DataFrame#count}
     * @since 1.3.0
     */
    countSync() /*: Number */ {
        return this.jvm_obj.count().valueOf();
    }

    /**
     * Returns a partitioned {@link DataFrame}.
     *
     * If partition expressions are provided, partition by the given
     * partitioning expressions into `numPartitions`. The resulting DataFrame
     * is hash partitioned. (This is the same operation as "DISTRIBUTE BY" in
     * SQL (Hive QL).)
     *
     * @param {Number} numPartitions Number of partitions.
     * @param partitionExprs Partitioning expressions ({@link Column} objects).
     * @since 1.3.0
     */
    repartition(numPartitions /*: Int */, ...partitionExprs /*: Column* */) /*: DataFrame */ {
        // Unwrap each Column and spread them, as `agg` does; with no
        // expressions this degenerates to `repartition(numPartitions)`.
        const jvm_exprs = partitionExprs.map(expr => expr.jvm_obj);
        return new DataFrame(this.jvm_obj.repartition(numPartitions, ...jvm_exprs));
    }



    /**
     * Registers this {@link DataFrame} as a temporary table using the given name.  The lifetime of this
     * temporary table is tied to the {@link SQLContext} that was used to create this DataFrame.
     *
     * @param tableName Table name.
     * @since 1.3.0
     */
    registerTempTable(tableName /*: String */) /*: Unit */ {
        this.jvm_obj.registerTempTable(tableName);
    }

    /**
     * Interface for saving the content of the {@link DataFrame} out into external storage.
     * The writer is constructed from this wrapper (not from the raw JVM object).
     * @since 1.4.0
     */
    write() /*: DataFrameWriter*/ {
        return new DataFrameWriter(this);
    }
}

/**
 * Returns the JVM object behind a {@link DataFrame} wrapper so it can be
 * passed to a JVM method. Anything that is not a {@link DataFrame} instance
 * is returned unchanged, so raw JVM handles keep working.
 * @private
 */
function unwrapDataFrame(df) {
    return df instanceof DataFrame ? df.jvm_obj : df;
}

module.exports = DataFrame;