diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index af944fce750..99c4b9c2ecb 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -55,16 +55,16 @@ public class CompressionSettings { /** * The sampling ratio used when choosing ColGroups. Note that, default behavior is to use exact estimator if the * number of elements is below 1000. - * + * * DEPRECATED */ public final double samplingRatio; /** * The sampling ratio power to use when choosing sample size. This is used in accordance to the function: - * + * * sampleSize += nRows^samplePower; - * + * * The value is bounded to be in the range of 0 to 1, 1 giving a sample size of everything, and 0 adding 1. */ public final double samplePower; @@ -114,8 +114,9 @@ public class CompressionSettings { /** * Transpose input matrix, to optimize access when extracting bitmaps. This setting is changed inside the script * based on the transposeInput setting. - * - * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase 3. + * + * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase + * 3. */ public boolean transposed = false; @@ -135,6 +136,19 @@ public class CompressionSettings { public final boolean preferDeltaEncoding; + // Handling Targetloss for piecewise linear Kompression + + private double piecewiseTargetLoss = Double.NaN; + + public void setPiecewiseTargetLoss(double piecewiseTargetLoss) { + this.piecewiseTargetLoss = piecewiseTargetLoss; + + } + + public double getPiecewiseTargetLoss() { + return piecewiseTargetLoss; + } + protected CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary, String transposeInput, int seed, boolean lossy, EnumSet validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, @@ -161,7 +175,7 @@ protected CompressionSettings(double samplingRatio, double samplePower, boolean this.sdcSortType = sdcSortType; this.scaleFactors = scaleFactors; this.preferDeltaEncoding = preferDeltaEncoding; - + if(!printedStatus && LOG.isDebugEnabled()) { printedStatus = true; LOG.debug(this.toString()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index 003703f86a4..07382ed932b 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -55,7 +55,7 @@ /** * Abstract Class that is the lowest class type for the Compression framework. - * + * * AColGroup store information about a number of columns. * */ @@ -65,7 +65,8 @@ public abstract class AColGroup implements Serializable { /** Public super types of compression ColGroups supported */ public static enum CompressionType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional, PiecewiseLinear, + PiecewiseLinearSukzessive; public boolean isDense() { return this == DDC || this == CONST || this == DDCFOR || this == DDCFOR; @@ -82,12 +83,12 @@ public boolean isSDC() { /** * Concrete ColGroupType - * + * * Protected such that outside the ColGroup package it should be unknown which specific subtype is used. */ protected static enum ColGroupType { UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC, - LinearFunctional; + LinearFunctional, PiecewiseLinear; } /** The ColGroup indexes contained in the ColGroup */ @@ -95,7 +96,7 @@ protected static enum ColGroupType { /** * Main constructor. - * + * * @param colIndices offsets of the columns in the matrix block that make up the group */ protected AColGroup(IColIndex colIndices) { @@ -104,7 +105,7 @@ protected AColGroup(IColIndex colIndices) { /** * Obtain the offsets of the columns in the matrix block that make up the group - * + * * @return offsets of the columns in the matrix block that make up the group */ public final IColIndex getColIndices() { @@ -113,7 +114,7 @@ public final IColIndex getColIndices() { /** * Obtain the number of columns in this column group. - * + * * @return number of columns in this column group */ public final int getNumCols() { @@ -124,9 +125,9 @@ public final int getNumCols() { * Shift all column indexes contained by an offset. * * This is used for rbind to combine compressed matrices. - * + * * Since column indexes are reused between operations, we allocate a new list here to be safe - * + * * @param offset The offset to move all columns * @return A new column group object with the shifted columns */ @@ -138,7 +139,7 @@ public final AColGroup shiftColIndices(int offset) { * Copy the content of the column group with pointers to the previous content but with new column given Note this * method does not verify if the colIndexes specified are valid and correct dimensions for the underlying column * groups. - * + * * @param colIndexes the new indexes to use in the copy * @return a new object with pointers to underlying data. */ @@ -146,7 +147,7 @@ public final AColGroup shiftColIndices(int offset) { /** * Get the upper bound estimate of in memory allocation for the column group. - * + * * @return an upper bound on the number of bytes used to store this ColGroup in memory. */ public long estimateInMemorySize() { @@ -157,9 +158,9 @@ public long estimateInMemorySize() { /** * Decompress a range of rows into a sparse block - * + * * Note that this is using append, so the sparse column indexes need to be sorted afterwards. - * + * * @param sb Sparse Target block * @param rl Row to start at * @param ru Row to end at @@ -170,7 +171,7 @@ public final void decompressToSparseBlock(SparseBlock sb, int rl, int ru) { /** * Decompress a range of rows into a dense block - * + * * @param db Dense target block * @param rl Row to start at * @param ru Row to end at @@ -181,7 +182,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Decompress a range of rows into a dense transposed block. - * + * * @param db Dense target block * @param rl Row in this column group to start at. * @param ru Row in this column group to end at. @@ -191,7 +192,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Decompress the column group to the sparse transposed block. Note that the column groups would only need to * decompress into specific sub rows of the Sparse block - * + * * @param sb Sparse target block * @param nColOut The number of columns in the sb. */ @@ -199,7 +200,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Serializes column group to data output. - * + * * @param out data output * @throws IOException if IOException occurs */ @@ -212,7 +213,7 @@ protected void write(DataOutput out) throws IOException { /** * Returns the exact serialized size of column group. This can be used for example for buffer preallocation. - * + * * @return exact serialized size for column group */ public long getExactSizeOnDisk() { @@ -225,11 +226,11 @@ public long getExactSizeOnDisk() { /** * Slice out the columns within the range of cl and cu to remove the dictionary values related to these columns. If * the ColGroup slicing from does not contain any columns within the range null is returned. - * + * * @param cl The lower bound of the columns to select * @param cu The upper bound of the columns to select (not inclusive). * @return A cloned Column Group, with a copied pointer to the old column groups index structure, but reduced - * dictionary and _columnIndexes correctly aligned with the expected sliced compressed matrix. + * dictionary and _columnIndexes correctly aligned with the expected sliced compressed matrix. */ public final AColGroup sliceColumns(int cl, int cu) { if(cl <= _colIndexes.get(0) && cu > _colIndexes.get(_colIndexes.size() - 1)) { @@ -247,10 +248,10 @@ else if(cu - cl == 1) /** * Slice out a single column from the column group. - * + * * @param col The column to slice, the column could potentially not be inside the column group * @return A new column group that is a single column, if the column requested is not in this column group null is - * returned. + * returned. */ public final AColGroup sliceColumn(int col) { int idx = _colIndexes.findIndex(col); @@ -262,11 +263,11 @@ public final AColGroup sliceColumn(int col) { /** * Slice out multiple columns within the interval between the given indexes. - * + * * @param cl The lower column index to slice from * @param cu The upper column index to slice to, (not included) * @return A column group of this containing the columns specified, returns null if the columns specified is not - * contained in the column group + * contained in the column group */ protected final AColGroup sliceMultiColumns(int cl, int cu) { SliceResult sr = _colIndexes.slice(cl, cu); @@ -278,7 +279,7 @@ protected final AColGroup sliceMultiColumns(int cl, int cu) { /** * Compute the column sum of the given list of groups - * + * * @param groups The Groups to sum * @param res The result to put the values into * @param nRows The number of rows in the groups @@ -292,9 +293,9 @@ public static double[] colSum(Collection groups, double[] res, int nR /** * Get the value at a global row/column position. - * + * * In general this performs since a binary search of colIndexes is performed for each lookup. - * + * * @param r row * @param c column * @return value at the row/column position @@ -309,7 +310,7 @@ public double get(int r, int c) { /** * Get the value at a colGroup specific row/column index position. - * + * * @param r row * @param colIdx column index in the _colIndexes. * @return value at the row/column index position @@ -318,16 +319,16 @@ public double get(int r, int c) { /** * Obtain number of distinct tuples in contained sets of values associated with this column group. - * + * * If the column group is uncompressed the number or rows is returned. - * + * * @return the number of distinct sets of values associated with the bitmaps in this column group */ public abstract int getNumValues(); /** * Obtain the compression type. - * + * * @return How the elements of the column group are compressed. */ public abstract CompressionType getCompType(); @@ -335,14 +336,14 @@ public double get(int r, int c) { /** * Internally get the specific type of ColGroup, this could be extracted from the object but that does not allow for * nice switches in the code. - * + * * @return ColGroupType of the object. */ protected abstract ColGroupType getColGroupType(); /** * Decompress into the DenseBlock. (no NNZ handling) - * + * * @param db Target DenseBlock * @param rl Row to start decompression from * @param ru Row to end decompression at (not inclusive) @@ -353,10 +354,10 @@ public double get(int r, int c) { /** * Decompress into the SparseBlock. (no NNZ handling) - * + * * Note this method is allowing to calls to append since it is assumed that the sparse column indexes are sorted * afterwards - * + * * @param sb Target SparseBlock * @param rl Row to start decompression from * @param ru Row to end decompression at (not inclusive) @@ -367,9 +368,9 @@ public double get(int r, int c) { /** * Right matrix multiplication with this column group. - * + * * This method can return null, meaning that the output overlapping group would have been empty. - * + * * @param right The MatrixBlock on the right of this matrix multiplication * @return The new Column Group or null that is the result of the matrix multiplication. */ @@ -379,9 +380,9 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { /** * Right matrix multiplication with this column group. - * + * * This method can return null, meaning that the output overlapping group would have been empty. - * + * * @param right The MatrixBlock on the right of this matrix multiplication * @param allCols A pre-materialized list of all col indexes, that can be shared across all column groups if use * full, can be set to null. @@ -392,7 +393,7 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { /** * Right side Matrix multiplication, iterating though this column group and adding to the ret - * + * * @param right Right side matrix to multiply with. * @param ret The return matrix to add results to * @param rl The row of this column group to multiply from @@ -401,18 +402,20 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { * @param cru The right hand side column upper * @param nRows The number of rows in this column group */ - public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, int cru){ - throw new NotImplementedException("not supporting right Decompressing Multiply on class: " + this.getClass().getSimpleName()); + public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, + int cru) { + throw new NotImplementedException( + "not supporting right Decompressing Multiply on class: " + this.getClass().getSimpleName()); } /** * Do a transposed self matrix multiplication on the left side t(x) %*% x. but only with this column group. - * + * * This gives better performance since there is no need to iterate through all the rows of the matrix, but the * execution can be limited to its number of distinct values. - * + * * Note it only calculate the upper triangle - * + * * @param ret The return matrix block [numColumns x numColumns] * @param nRows The number of rows in the column group */ @@ -420,7 +423,7 @@ public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, i /** * Left multiply with this column group. - * + * * @param matrix The matrix to multiply with on the left * @param result The result to output the values into, always dense for the purpose of the column groups * parallelizing @@ -434,7 +437,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Left side matrix multiplication with a column group that is transposed. - * + * * @param lhs The left hand side Column group to multiply with, the left hand side should be considered * transposed. Also it should be guaranteed that this column group is not empty. * @param result The result matrix to insert the result of the multiplication into @@ -444,16 +447,16 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Matrix multiply with this other column group, but: - * + * * 1. Only output upper triangle values. - * + * * 2. Multiply both ways with "this" being on the left and on the right. - * + * * It should be guaranteed that the input is not the same as the caller of the method. - * + * * The second step is achievable by treating the initial multiplied matrix, and adding its values to the correct * locations in the output. - * + * * @param other The other Column group to multiply with * @param result The result matrix to put the results into */ @@ -462,7 +465,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Perform the specified scalar operation directly on the compressed column group, without decompressing individual * cells if possible. - * + * * @param op operation to perform * @return version of this column group with the operation applied */ @@ -470,7 +473,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Perform a binary row operation. - * + * * @param op The operation to execute * @param v The vector of values to apply the values contained should be at least the length of the highest * value in the column index @@ -481,7 +484,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Short hand add operator call on column group to add a row vector to the column group - * + * * @param v The vector to add * @return A new column group where the vector is added. */ @@ -491,7 +494,7 @@ public AColGroup addVector(double[] v) { /** * Perform a binary row operation. - * + * * @param op The operation to execute * @param v The vector of values to apply the values contained should be at least the length of the highest * value in the column index @@ -503,9 +506,9 @@ public AColGroup addVector(double[] v) { /** * Unary Aggregate operator, since aggregate operators require new object output, the output becomes an uncompressed * matrix. - * + * * The range of rl to ru only applies to row aggregates. (ReduceCol) - * + * * @param op The operator used * @param c The output matrix block * @param nRows The total number of rows in the Column Group @@ -516,9 +519,9 @@ public AColGroup addVector(double[] v) { /** * Slice out column at specific index of this column group. - * + * * It is guaranteed that the column to slice is contained in this columnGroup. - * + * * @param idx The column index to slice out. * @return A new column group containing the columns inside. (never null) */ @@ -526,9 +529,9 @@ public AColGroup addVector(double[] v) { /** * Slice range of columns inside this column group. - * + * * It is guaranteed that the columns to slice is contained in this columnGroup. - * + * * @param idStart The column index to start at * @param idEnd The column index to end at (not included) * @param outputCols The output columns to extract materialized for ease of implementation @@ -538,9 +541,10 @@ public AColGroup addVector(double[] v) { /** * Slice range of rows out of the column group and return a new column group only containing the row segment. - * - * Note that this slice should maintain pointers back to the original dictionaries and only modify index structures. - * + * + * Note that this slice should maintain pointers back to the original dictionaries and only modify index + * structures. + * * @param rl The row to start at * @param ru The row to end at (not included) * @return A new column group containing the specified row range. @@ -549,21 +553,21 @@ public AColGroup addVector(double[] v) { /** * Short hand method for getting minimum value contained in this column group. - * + * * @return The minimum value contained in this ColumnGroup */ public abstract double getMin(); /** * Short hand method for getting maximum value contained in this column group. - * + * * @return The maximum value contained in this ColumnGroup */ public abstract double getMax(); /** * Short hand method for getting the sum of this column group - * + * * @param nRows The number of rows in the column group * @return The sum of this column group */ @@ -571,7 +575,7 @@ public AColGroup addVector(double[] v) { /** * Detect if the column group contains a specific value. - * + * * @param pattern The value to look for. * @return boolean saying true if the value is contained. */ @@ -579,7 +583,7 @@ public AColGroup addVector(double[] v) { /** * Get the number of nonZeros contained in this column group. - * + * * @param nRows The number of rows in the column group, this is used for groups that does not contain information * about how many rows they have. * @return The nnz. @@ -588,7 +592,7 @@ public AColGroup addVector(double[] v) { /** * Make a copy of the column group values, and replace all values that match pattern with replacement value. - * + * * @param pattern The value to look for * @param replace The value to replace the other value with * @return A new Column Group, reusing the index structure but with new values. @@ -597,7 +601,7 @@ public AColGroup addVector(double[] v) { /** * Compute the column sum - * + * * @param c The array to add the column sum to. * @param nRows The number of rows in the column group. */ @@ -605,7 +609,7 @@ public AColGroup addVector(double[] v) { /** * Central Moment instruction executed on a column group. - * + * * @param op The Operator to use. * @param nRows The number of rows contained in the ColumnGroup. * @return A Central Moment object. @@ -614,7 +618,7 @@ public AColGroup addVector(double[] v) { /** * Expand the column group to multiple columns. (one hot encode the column group) - * + * * @param max The number of columns to expand to and cutoff values at. * @param ignore If zero and negative values should be ignored. * @param cast If the double values contained should be cast to whole numbers. @@ -625,7 +629,7 @@ public AColGroup addVector(double[] v) { /** * Get the computation cost associated with this column group. - * + * * @param e The computation cost estimator * @param nRows the number of rows in the column group * @return The cost of this column group @@ -634,7 +638,7 @@ public AColGroup addVector(double[] v) { /** * Perform unary operation on the column group and return a new column group - * + * * @param op The operation to perform * @return The new column group */ @@ -642,19 +646,19 @@ public AColGroup addVector(double[] v) { /** * Get if the group is only containing zero - * + * * @return true if empty */ public abstract boolean isEmpty(); /** - * Append the other column group to this column group. This method tries to combine them to return a new column group - * containing both. In some cases it is possible in reasonable time, in others it is not. - * + * Append the other column group to this column group. This method tries to combine them to return a new column + * group containing both. In some cases it is possible in reasonable time, in others it is not. + * * The result is first this column group followed by the other column group in higher row values. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param g The other column group * @return A combined column group or null */ @@ -662,9 +666,9 @@ public AColGroup addVector(double[] v) { /** * Append all column groups in the list provided together in one go allocating the output once. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param groups The groups to combine. * @param blen The normal number of rows in the groups * @param rlen The total number of rows of all combined. @@ -676,11 +680,11 @@ public static AColGroup appendN(AColGroup[] groups, int blen, int rlen) { /** * Append all column groups in the list provided together with this. - * + * * A Important detail is the first entry in the group == this, and should not be appended twice. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param groups The groups to combine. * @param blen The normal number of rows in the groups * @param rlen The total number of rows of all combined. @@ -690,7 +694,7 @@ public static AColGroup appendN(AColGroup[] groups, int blen, int rlen) { /** * Get the compression scheme for this column group to enable compression of other data. - * + * * @return The compression scheme of this column group */ public abstract ICLAScheme getCompressionScheme(); @@ -704,14 +708,14 @@ public void clear() { /** * Recompress this column group into a new column group. - * + * * @return A new or the same column group depending on optimization goal. */ public abstract AColGroup recompress(); /** * Recompress this column group into a new column group of the given type. - * + * * @param ct The compressionType that the column group should morph into * @param nRow The number of rows in this columngroup. * @return A new column group @@ -741,7 +745,7 @@ else if(ct == CompressionType.UNCOMPRESSED) { /** * Get the compression info for this column group. - * + * * @param nRow The number of rows in this column group. * @return The compression info for this group. */ @@ -749,7 +753,7 @@ else if(ct == CompressionType.UNCOMPRESSED) { /** * Combine this column group with another - * + * * @param other The other column group to combine with. * @param nRow The number of rows in both column groups. * @return A combined representation as a column group. @@ -760,7 +764,7 @@ public AColGroup combine(AColGroup other, int nRow) { /** * Get encoding of this column group. - * + * * @return The encoding of the index structure. */ public IEncode getEncoding() { @@ -781,19 +785,19 @@ public AColGroup sortColumnIndexes() { /** * Perform row sum on the internal dictionaries, and return the same index structure. - * + * * This method returns null on empty column groups. - * + * * Note this method does not guarantee correct behavior if the given group is AMorphingGroup, instead it should be * morphed to a valid columngroup via extractCommon first. - * + * * @return The reduced colgroup. */ public abstract AColGroup reduceCols(); /** * Selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. * @param ret The MatrixBlock to decompress the selected rows into @@ -806,17 +810,17 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo else denseSelection(selection, points, ret, rl, ru); } - + /** * Get an approximate sparsity of this column group - * + * * @return the approximate sparsity of this columngroup */ public abstract double getSparsity(); /** * Sparse selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. * @param ret The Sparse MatrixBlock to decompress the selected rows into @@ -827,7 +831,7 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo /** * Dense selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. * @param ret The Dense MatrixBlock to decompress the selected rows into @@ -839,7 +843,7 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo /** * Method to determine if the columnGroup have the same index structure as another. Note that the column indexes and * dictionaries are allowed to be different. - * + * * @param that the other column group * @return if the index is the same. */ @@ -850,7 +854,7 @@ public boolean sameIndexStructure(AColGroup that) { /** * C bind the list of column groups with this column group. the list of elements provided in the index of each list * is guaranteed to have the same index structures - * + * * @param nRow The number of rows contained in all right and this column group. * @param nCol The number of columns to shift the right hand side column groups over when combining, this should * only effect the column indexes @@ -888,7 +892,7 @@ public AColGroup combineWithSameIndex(int nRow, int nCol, List right) /** * C bind the given column group to this. - * + * * @param nRow The number of rows contained in the right and this column group. * @param nCol The number of columns in this. * @param right The column group to c-bind. @@ -928,16 +932,16 @@ protected IColIndex combineColIndexes(final int nCol, List right) { /** * This method returns a list of column groups that are naive splits of this column group as if it is reshaped. - * + * * This means the column groups rows are split into x number of other column groups where x is the multiplier. - * + * * The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned. - * + * * If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. The * output becomes 2 column groups at column index 4 and one at 9. - * + * * If possible the split column groups should reuse pointers back to the original dictionaries! - * + * * @param multiplier The number of column groups to split into * @param nRow The number of rows in this column group in case the underlying column group does not know * @param nColOrg The number of overall columns in the host CompressedMatrixBlock. @@ -947,25 +951,25 @@ protected IColIndex combineColIndexes(final int nCol, List right) { /** * This method returns a list of column groups that are naive splits of this column group as if it is reshaped. - * + * * This means the column groups rows are split into x number of other column groups where x is the multiplier. - * + * * The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned. - * + * * If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. The * output becomes 2 column groups at column index 4 and one at 9. - * + * * If possible the split column groups should reuse pointers back to the original dictionaries! - * + * * This specific variation is pushing down the parallelization given via the executor service provided. If not * overwritten the default is to call the normal split reshape - * + * * @param multiplier The number of column groups to split into * @param nRow The number of rows in this column group in case the underlying column group does not know * @param nColOrg The number of overall columns in the host CompressedMatrixBlock * @param pool The executor service to submit parallel tasks to - * @throws Exception In case there is an error we throw the exception out instead of handling it * @return a list of split column groups + * @throws Exception In case there is an error we throw the exception out instead of handling it */ public AColGroup[] splitReshapePushDown(final int multiplier, final int nRow, final int nColOrg, final ExecutorService pool) throws Exception { diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java index 273df9ff26f..dfec14d2704 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java @@ -43,6 +43,7 @@ import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary; import org.apache.sysds.runtime.compress.colgroup.functional.LinearRegression; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; import org.apache.sysds.runtime.compress.colgroup.insertionsort.AInsertionSorter; @@ -106,7 +107,7 @@ private ColGroupFactory(MatrixBlock in, CompressedSizeInfo csi, CompressionSetti /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -120,7 +121,7 @@ public static List compressColGroups(MatrixBlock in, CompressedSizeIn /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -135,7 +136,7 @@ public static List compressColGroups(MatrixBlock in, CompressedSizeIn } /** - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -232,8 +233,9 @@ private void logEstVsActual(double time, AColGroup act, CompressedSizeInfoColGro time, retType, estC, actC, act.getNumValues(), cols, wanted, warning)); } else { - LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", - time, retType, estC, actC, act.getNumValues(), cols, wanted)); + LOG.debug( + String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time, + retType, estC, actC, act.getNumValues(), cols, wanted)); } } @@ -303,6 +305,12 @@ else if(ct == CompressionType.LinearFunctional) { return compressLinearFunctional(colIndexes, in, cs); } } + else if(ct == CompressionType.PiecewiseLinear) { + return compressPiecewiseLinearFunctional(colIndexes, in, cs); + } + else if(ct == CompressionType.PiecewiseLinearSukzessive) { + return compressPiecewiseLinearFunctionalSukzessive(colIndexes, in, cs); + } else if(ct == CompressionType.DDCFOR) { AColGroup g = directCompressDDC(colIndexes, cg); if(g instanceof ColGroupDDC) @@ -698,7 +706,7 @@ private AColGroup directCompressDeltaDDC(IColIndex colIndexes, CompressedSizeInf if(cs.scaleFactors != null) { throw new NotImplementedException("Delta encoding with quantization not yet implemented"); } - + if(colIndexes.size() > 1) { return directCompressDeltaDDCMultiCol(colIndexes, cg); } @@ -730,7 +738,7 @@ private AColGroup directCompressDeltaDDCSingleCol(IColIndex colIndexes, Compress if(map.size() == 0) return new ColGroupEmpty(colIndexes); - + final double[] dictValues = map.getDictionary(); IDictionary dict = new DeltaDictionary(dictValues, 1); @@ -739,7 +747,8 @@ private AColGroup directCompressDeltaDDCSingleCol(IColIndex colIndexes, Compress return ColGroupDeltaDDC.create(colIndexes, dict, resData, null); } - private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) throws Exception { + private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) + throws Exception { final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126)); final int fill = d.getUpperBoundValue(); d.fill(fill); @@ -818,8 +827,8 @@ private boolean readToMapDDC(IColIndex colIndexes, DblArrayCountHashMap map, AMa int fill) { ReaderColumnSelection reader = (cs.scaleFactors == null) ? ReaderColumnSelection.createReader(in, colIndexes, - cs.transposed, rl, - ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, cs.scaleFactors); + cs.transposed, rl, ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, + cs.scaleFactors); DblArray cellVals = reader.nextRow(); boolean extra = false; @@ -1066,6 +1075,53 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows); } + public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSegmentedLeastSquares(column, + cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + + public static AColGroup compressPiecewiseLinearFunctionalSukzessive(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSukzessivePiecewiseLinear(column, + cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + private AColGroup compressSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate, double tupleSparsity) { if(cols.size() > 1) return compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, tupleSparsity); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java new file mode 100644 index 00000000000..d6ad0c6c421 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java @@ -0,0 +1,587 @@ +package org.apache.sysds.runtime.compress.colgroup; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.functionobjects.*; +import org.apache.sysds.runtime.instructions.cp.CmCovObject; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.CMOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import org.apache.sysds.utils.MemoryEstimates; + +import java.util.Arrays; + +public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed { + + int[][] breakpointsPerCol; + double[][] slopesPerCol; + double[][] interceptsPerCol; + int numRows; + + protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) { + super(colIndices); + } + + public ColGroupPiecewiseLinearCompressed(IColIndex colIndices, int[][] breakpoints, double[][] slopes, + double[][] intercepts, int numRows) { + super(colIndices); + this.breakpointsPerCol = breakpoints; + this.slopesPerCol = slopes; + this.interceptsPerCol = intercepts; + this.numRows = numRows; + } + + public static AColGroup create(IColIndex colIndices, int[][] breakpointsPerCol, double[][] slopesPerCol, + double[][] interceptsPerCol, int numRows) { + int expectedCols = colIndices.size(); + if(breakpointsPerCol.length != expectedCols) + throw new IllegalArgumentException( + "bp.length=" + breakpointsPerCol.length + " != colIndices.size()=" + expectedCols); + if(breakpointsPerCol.length != colIndices.size()) + throw new IllegalArgumentException("Need at least one segment"); + + for(int c = 0; c < colIndices.size(); c++) { + if(breakpointsPerCol[c].length < 1 || breakpointsPerCol[c][0] != 0 || + breakpointsPerCol[c][breakpointsPerCol[c].length - 1] != numRows) + throw new IllegalArgumentException( + "Invalid breakpoints for col " + c + ": must start=0, end=numRows, >=1 pts"); + + if(slopesPerCol[c].length != interceptsPerCol[c].length || + slopesPerCol[c].length != breakpointsPerCol[c].length - 1) + throw new IllegalArgumentException("Inconsistent array lengths col " + c); + } + + int numCols = colIndices.size(); + int[][] bpCopy = new int[numCols][]; + double[][] slopeCopy = new double[numCols][]; + double[][] interceptCopy = new double[numCols][]; + + for(int c = 0; c < numCols; c++) { + bpCopy[c] = Arrays.copyOf(breakpointsPerCol[c], breakpointsPerCol[c].length); + slopeCopy[c] = Arrays.copyOf(slopesPerCol[c], slopesPerCol[c].length); + interceptCopy[c] = Arrays.copyOf(interceptsPerCol[c], interceptsPerCol[c].length); + } + + return new ColGroupPiecewiseLinearCompressed(colIndices, bpCopy, slopeCopy, interceptCopy, numRows); + + } + + @Override + public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) { + if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpointsPerCol == null || + slopesPerCol == null || interceptsPerCol == null) { + return; + } + for(int col = 0; col < _colIndexes.size(); col++) { + final int colIndex = _colIndexes.get(col); + int[] breakpoints = breakpointsPerCol[col]; + double[] slopes = slopesPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + // per segment in this column + for(int seg = 0; seg + 1 < breakpoints.length; seg++) { // ← +1 statt length + int segStart = breakpoints[seg]; + int segEnd = breakpoints[seg + 1]; + if(segStart >= segEnd) + continue; + + double currentSlopeInSegment = slopes[seg]; + double currentInterceptInSegment = intercepts[seg]; + + int rowStart = Math.max(segStart, rl); + int rowEnd = Math.min(segEnd, ru); + if(rowStart >= rowEnd) + continue; + + //Fill DenseBlock für this column and Segment + for(int row = rowStart; row < rowEnd; row++) { + double yhat = currentSlopeInSegment * row + currentInterceptInSegment; + int dbRow = offR + row; + int dbCol = offC + colIndex; + + if(dbRow >= 0 && dbRow < db.numRows() && dbCol >= 0 && dbCol < db.numCols()) { + db.set(dbRow, dbCol, yhat); + } + } + + } + + } + } + + public int[][] getBreakpointsPerCol() { + return breakpointsPerCol; + } + + public double[][] getSlopesPerCol() { + return slopesPerCol; + } + + public double[][] getInterceptsPerCol() { + return interceptsPerCol; + } + + @Override + public double getIdx(int r, int colIdx) { + //Check if the rowIDx is valid (safety check) + if(r < 0 || r >= numRows || colIdx < 0 || colIdx >= _colIndexes.size()) { + return 0.0; + } + int[] bps = breakpointsPerCol[colIdx]; + double[] slps = slopesPerCol[colIdx]; + double[] ints = interceptsPerCol[colIdx]; + // Using Binary Search for efficient Search for the right Segment ( finding rowIdx r) + // have to use int higherBound = breakpointsPerCol.length - 2 because it's the last valid segment + int lowerBound = 0; + int higherBound = bps.length - 2; + while(lowerBound <= higherBound) { + int mid = (lowerBound + higherBound) / 2; + if(r < bps[mid + 1]) { + higherBound = mid - 1; + } + else + lowerBound = mid + 1; + } + int segment = Math.min(lowerBound, bps.length - 2); + return slps[segment] * (double) r + ints[segment]; + } + + @Override + public int getNumValues() { + return breakpointsPerCol.length + slopesPerCol.length + interceptsPerCol.length; + } + + @Override + public long getExactSizeOnDisk() { + long ret = super.getExactSizeOnDisk(); + int numCols = _colIndexes.size(); + ret += 8L * numCols * 3; + ret += 24L * 3; + + for(int c = 0; c < numCols; c++) { + ret += (long) MemoryEstimates.intArrayCost(breakpointsPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(slopesPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(interceptsPerCol[c].length); + } + + ret += 4L; + return ret; + + } + + @Override + public void computeSum(double[] c, int nRows) { + for(int col = 0; col < _colIndexes.size(); col++) { + double colSum = 0.0; + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + for(int seg = 0; seg < breakpoints.length - 1; seg++) { + int start = breakpoints[seg], end = breakpoints[seg + 1]; + int len = end - start; + double b = intercepts[seg], m = slopes[seg]; + double sumR = (double) len * (len - 1) / 2.0; + colSum += (double) len * b + m * sumR; + } + c[col] += colSum; + } + } + + @Override + public void computeColSums(double[] c, int nRows) { + computeSum(c, nRows); + } + + @Override + public CompressionType getCompType() { + return CompressionType.PiecewiseLinear; + } + + @Override + protected ColGroupType getColGroupType() { + return ColGroupType.PiecewiseLinear; + } + + @Override + public AColGroup scalarOperation(ScalarOperator op) { + final int numCols = _colIndexes.size(); + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + if(op.fn instanceof Plus || op.fn instanceof Minus) { + for(int col = 0; col < numCols; col++) { + int numSegments = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegments]; + newSlopes[col] = slopesPerCol[col].clone(); // Unverändert + for(int seg = 0; seg < numSegments; seg++) + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + } // shift intercept + } + else if(op.fn instanceof Multiply || op.fn instanceof Divide) { + for(int col = 0; col < numCols; col++) { + int numSegments = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegments]; + newSlopes[col] = new double[numSegments]; + for(int seg = 0; seg < numSegments; seg++) { + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + newSlopes[col][seg] = op.executeScalar(slopesPerCol[col][seg]); + } + }//shift slope and intercept + } + else { + throw new NotImplementedException("Unsupported scalar op"); + } + // new ColGroup because of changed slopes, intercepts + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + @Override + public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + if(op.fn instanceof Plus || op.fn instanceof Minus) { + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSeg = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSeg]; + newSlopes[col] = slopesPerCol[col].clone(); + for(int seg = 0; seg < numSeg; seg++) { + newIntercepts[col][seg] = op.fn.execute(rowValue, interceptsPerCol[col][seg]); + } + } + } + else if(op.fn instanceof Multiply || op.fn instanceof Divide) { + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSeg = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSeg]; + newSlopes[col] = new double[numSeg]; + for(int seg = 0; seg < numSeg; seg++) { + newIntercepts[col][seg] = op.fn.execute(rowValue, interceptsPerCol[col][seg]); + newSlopes[col][seg] = op.fn.execute(rowValue, slopesPerCol[col][seg]); + } + } + } + else { + throw new NotImplementedException("Unsupported binary op"); + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + @Override + public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + if(op.fn instanceof Plus || op.fn instanceof Minus) { + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSeg = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSeg]; + newSlopes[col] = slopesPerCol[col].clone(); + for(int seg = 0; seg < numSeg; seg++) { + newIntercepts[col][seg] = op.fn.execute(interceptsPerCol[col][seg], rowValue); + } + } + } + else if(op.fn instanceof Multiply || op.fn instanceof Divide) { + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSeg = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSeg]; + newSlopes[col] = new double[numSeg]; + for(int seg = 0; seg < numSeg; seg++) { + newIntercepts[col][seg] = op.fn.execute(interceptsPerCol[col][seg], rowValue); + newSlopes[col][seg] = op.fn.execute(slopesPerCol[col][seg], rowValue); + } + } + } + else { + throw new NotImplementedException("Unsupported binary op"); + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + @Override + public boolean containsValue(double pattern) { + for(int col = 0; col < _colIndexes.size(); col++) { + if(colContainsValue(col, pattern)) + return true; + } + return false; + } + + private boolean colContainsValue(int col, double pattern) { + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + int numSeg = breakpoints.length - 1; + + for(int seg = 0; seg < numSeg; seg++) { + int start = breakpoints[seg]; + int end = breakpoints[seg + 1]; + int len = end - start; + if(len <= 0) + continue; + + double yIntercept = intercepts[seg]; + double slope = slopes[seg]; + + if(slope == 0.0) { + if(Double.compare(yIntercept, pattern) == 0) + return true; + continue; + } + + if(Double.compare(yIntercept, pattern) == 0) + return true; + + double endVal = yIntercept + slope * (len - 1); + if(Double.compare(endVal, pattern) == 0) + return true; + + double rowIndex = (pattern - yIntercept) / slope; + if(rowIndex > 0 && rowIndex < (len - 1) && Double.compare(yIntercept + slope * rowIndex, pattern) == 0) + return true; + } + return false; + } + + @Override + public AColGroup unaryOperation(UnaryOperator op) { + throw new NotImplementedException(); + } + + @Override + public AColGroup replace(double pattern, double replace) { + throw new NotImplementedException(); + } + + @Override + protected double computeMxx(double c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeColMxx(double[] c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeSumSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColSumsSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected double[] preAggSumRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggSumSqRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggProductRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggBuiltinRows(Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + public boolean sameIndexStructure(AColGroupCompressed that) { + throw new NotImplementedException(); + } + + @Override + protected void tsmm(double[] result, int numColumns, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup copyAndSet(IColIndex colIndexes) { + throw new NotImplementedException(); + } + + @Override + public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols, int k) { + throw new NotImplementedException(); + } + + @Override + public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) { + throw new NotImplementedException(); + + } + + @Override + public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public void tsmmAColGroup(AColGroup other, MatrixBlock result) { + throw new NotImplementedException(); + + } + + @Override + protected AColGroup sliceSingleColumn(int idx) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex outputCols) { + throw new NotImplementedException(); + } + + @Override + public AColGroup sliceRows(int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public long getNumberNonZeros(int nRows) { + throw new NotImplementedException(); + } + + @Override + public CmCovObject centralMoment(CMOperator op, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) { + throw new NotImplementedException(); + } + + @Override + public double getCost(ComputationCostEstimator e, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup append(AColGroup g) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) { + throw new NotImplementedException(); + } + + @Override + public ICLAScheme getCompressionScheme() { + throw new NotImplementedException(); + } + + @Override + public AColGroup recompress() { + throw new NotImplementedException(); + } + + @Override + public CompressedSizeInfoColGroup getCompressionInfo(int nRow) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) { + throw new NotImplementedException(); + } + + @Override + public AColGroup reduceCols() { + throw new NotImplementedException(); + } + + @Override + public double getSparsity() { + throw new NotImplementedException(); + } + + @Override + protected void sparseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + protected void denseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) { + throw new NotImplementedException(); + } + +} + diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java new file mode 100644 index 00000000000..5b67cba2173 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java @@ -0,0 +1,244 @@ +package org.apache.sysds.runtime.compress.colgroup.functional; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class PiecewiseLinearUtils { + + private PiecewiseLinearUtils() { + + } + + public static final class SegmentedRegression { + private final int[] breakpoints; + private final double[] slopes; + private final double[] intercepts; + + public SegmentedRegression(int[] breakpoints, double[] slopes, double[] intercepts) { + this.breakpoints = breakpoints; + this.slopes = slopes; + this.intercepts = intercepts; + } + + public int[] getBreakpoints() { + return breakpoints; + } + + public double[] getSlopes() { + return slopes; + } + + public double[] getIntercepts() { + return intercepts; + } + } + + public static double[] getColumn(MatrixBlock in, int colIndex) { + final int numRows = in.getNumRows(); + final double[] column = new double[numRows]; + + for(int row = 0; row < numRows; row++) { + column[row] = in.get(row, colIndex); + } + return column; + } + + public static SegmentedRegression compressSegmentedLeastSquares(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with dynamic Programming + final List breakpointsList = computeBreakpoints(cs, column); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int SegStart = breakpoints[seg]; + final int SegEnd = breakpoints[seg + 1]; + + final double[] line = regressSegment(column, SegStart, SegEnd); + slopes[seg] = line[0]; //slope regession line + intercepts[seg] = line[1]; //intercept regression line + } + + return new SegmentedRegression(breakpoints, slopes, intercepts); + } + + public static SegmentedRegression compressSukzessivePiecewiseLinear(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with a sukzessive breakpoints algorithm + + final List breakpointsList = computeBreakpointSukzessive(column, cs); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int segstart = breakpoints[seg]; + final int segEnd = breakpoints[seg + 1]; + final double[] line = regressSegment(column, segstart, segEnd); + slopes[seg] = line[0]; + intercepts[seg] = line[1]; + } + return new SegmentedRegression(breakpoints, slopes, intercepts); + } + + public static List computeBreakpoints(CompressionSettings cs, double[] column) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + + // max targetloss + final double sseMax = numElements * targetMSE; + double lambda = 1000.0; // Regulationparam + List bestBreaks = Arrays.asList(0, numElements); + + for(int iter = 0; iter < 20; iter++) { // fixed Iterations + List breaks = computeBreakpointsLambda(column, lambda); + double totalSSE = computeTotalSSE(column, breaks); + + if(totalSSE <= sseMax) { + bestBreaks = breaks; + } + lambda *= 0.8; + } + + return bestBreaks; + } + + public static List computeBreakpointsLambda(double[] column, double lambda) { + final int numRows = column.length; + final double[] costs = new double[numRows + 1]; //min Cost + final int[] prevStart = new int[numRows + 1]; //previous Start + costs[0] = 0.0; + // Find Cost + for(int rowEnd = 1; rowEnd <= numRows; rowEnd++) { + costs[rowEnd] = Double.POSITIVE_INFINITY; + //Test all possible Segment to find the lowest costs + for(int rowStart = 0; rowStart < rowEnd; rowStart++) { + //costs per Segment = current costs + segmentloss + penaltiy + final double costCurrentSegment = computeSegmentCost(column, rowStart, rowEnd); + final double totalCost = costs[rowStart] + costCurrentSegment + lambda; + // Check if it is the better solution + if(totalCost < costs[rowEnd]) { + costs[rowEnd] = totalCost; + prevStart[rowEnd] = rowStart; + } + } + } + //Check the optimal segmentlimits + final List segmentLimits = new ArrayList<>(); + int breakpointIndex = numRows; + while(breakpointIndex > 0) { + segmentLimits.add(breakpointIndex); + breakpointIndex = prevStart[breakpointIndex]; + } + segmentLimits.add(0); + Collections.sort(segmentLimits); + return segmentLimits; + } + + public static double computeSegmentCost(double[] column, int start, int end) { + final int segSize = end - start; + if(segSize <= 1) + return 0.0; + + final double[] ab = regressSegment(column, start, end); //Regressionline + final double slope = ab[0]; + final double intercept = ab[1]; + + double sumSquaredError = 0.0; + for(int i = start; i < end; i++) { + final double rowIdx = i; + final double actualValue = column[i]; + final double predictedValue = slope * rowIdx + intercept; + final double difference = actualValue - predictedValue; + sumSquaredError += difference * difference; + } + return sumSquaredError; + } + + public static double computeTotalSSE(double[] column, List breaks) { + double total = 0.0; + for(int s = 0; s < breaks.size() - 1; s++) { + final int start = breaks.get(s); + final int end = breaks.get(s + 1); + total += computeSegmentCost(column, start, end); + } + return total; + } + + public static double[] regressSegment(double[] column, int start, int end) { + final int numElements = end - start; + if(numElements <= 0) + return new double[] {0.0, 0.0}; + + double sumOfRowIndices = 0, sumOfColumnValues = 0, sumOfRowIndicesSquared = 0, productRowIndexTimesColumnValue = 0; + for(int i = start; i < end; i++) { + final double x = i; + final double y = column[i]; + sumOfRowIndices += x; + sumOfColumnValues += y; + sumOfRowIndicesSquared += x * x; + productRowIndexTimesColumnValue += x * y; + } + + final double numPointsInSegmentDouble = numElements; + final double denominatorForSlope = + numPointsInSegmentDouble * sumOfRowIndicesSquared - sumOfRowIndices * sumOfRowIndices; + final double slope; + final double intercept; + if(denominatorForSlope == 0) { + slope = 0.0; + intercept = sumOfColumnValues / numPointsInSegmentDouble; + } + else { + slope = (numPointsInSegmentDouble * productRowIndexTimesColumnValue - sumOfRowIndices * sumOfColumnValues) / + denominatorForSlope; + intercept = (sumOfColumnValues - slope * sumOfRowIndices) / numPointsInSegmentDouble; + } + return new double[] {slope, intercept}; + } + + public static List computeBreakpointSukzessive(double[] column, CompressionSettings cs) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + if(Double.isNaN(targetMSE) || targetMSE <= 0) { + return Arrays.asList(0, numElements); // Fallback one Segment if targetloss is not valid + } + + List breakpoints = new ArrayList<>(); + breakpoints.add(0); // first segment start is always 0 + int currentStart = 0; + + while(currentStart < numElements) { + int bestEnd = numElements; + //Check all possible Ends for this one segment + for(int end = currentStart + 1; end <= numElements; end++) { + double sse = computeSegmentCost(column, currentStart, end); + // Check if the loss for this segment is smaller/egual to the targetloss + double sseMax = (end - currentStart) * targetMSE; + if(sse > sseMax) { + bestEnd = end - 1; + break; + } + } + breakpoints.add(bestEnd); + currentStart = bestEnd; + } + + if(breakpoints.get(breakpoints.size() - 1) != numElements) { + breakpoints.add(numElements); + } + return breakpoints; + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java new file mode 100644 index 00000000000..1672da79704 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java @@ -0,0 +1,959 @@ +package org.apache.sysds.test.component.compress.colgroup; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.compress.estim.EstimationFactors; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.AutomatedTestBase; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils.*; +import static org.junit.Assert.*; + +public class ColGroupPiecewiseLinearCompressedTest extends AutomatedTestBase { + @Override + public void setUp() { + + } + + private static final long SEED = 42L; + + @Test + public void testCompressPiecewiseLinearFunctionalRandom() { + // Generate random data + final int nrows = 50, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + // extract columns + double[][] columns = new double[ncols][nrows]; + for(int c = 0; c < ncols; c++) + for(int r = 0; r < nrows; r++) + columns[c][r] = data[r][c]; + + // create ColIndexes + int[] colArray = {0, 1, 2}; + IColIndex colIndexes = ColIndexFactory.create(colArray); + + // set targetloss + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(25.0); + + // compress + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + // check structure + int[][] bp = plGroup.getBreakpointsPerCol(); + assertEquals(3, bp.length); // 3 Spalten + assertEquals(3, colIndexes.size()); + + for(int c = 0; c < ncols; c++) { + assertEquals(0, bp[c][0]); // start with 0 + assertEquals(nrows, bp[c][bp[c].length - 1]); + assertTrue(bp[c].length >= 2); // Mind. 1 Segment + } + + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + assertEquals(3, slopes.length); + for(int c = 0; c < ncols; c++) { + assertEquals(bp[c].length - 1, slopes[c].length); + assertEquals(bp[c].length - 1, intercepts[c].length); + } + + // check col indexes shouldnt change + assertEquals(3, plGroup.getColIndices().size()); + + // decompress + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, ncols - 1); + assertFalse(Double.isNaN(recon.get(0, 0))); + } + + private void testCompressStructure(double[][] data) { + final int nrows = data.length, ncols = data[0].length; + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + int[] colArray = new int[ncols]; + for(int i = 0; i < ncols; i++) + colArray[i] = i; + IColIndex colIndexes = ColIndexFactory.create(colArray); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(100.0); + + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + int[][] bp = plGroup.getBreakpointsPerCol(); + assertEquals(ncols, bp.length); + for(int c = 0; c < ncols; c++) { + assertEquals(0, bp[c][0]); + assertEquals(nrows, bp[c][bp[c].length - 1]); + } + double[][] slopes = plGroup.getSlopesPerCol(); + assertEquals(ncols, slopes.length); + for(int c = 0; c < ncols; c++) { + assertEquals(bp[c].length - 1, slopes[c].length); + } + assertEquals(ncols, plGroup.getColIndices().size()); + + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, ncols - 1); + } + + @Test + public void testCompressTrendNoise() { + final int nrows = 100, ncols = 2; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + + for(int r = 0; r < nrows; r++) { + double trend = 0.05 * r; + for(int c = 0; c < ncols; c++) { + data[r][c] = trend + rng.nextGaussian() * 1.5 + c * 2.0; + } + } + + testCompressStructure(data); + } + + @Test + public void testCompressJumps() { + final int nrows = 80, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED); + for(int c = 0; c < ncols; c++) { + for(int r = 25; r < 55; r++) + data[r][c] += 8.0; + for(int r = 55; r < nrows; r++) + data[r][c] += 15.0; + } + testCompressStructure(data); + } + + @Test + public void testCompressHighFreq() { + final int nrows = 100, ncols = 50; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + for(int r = 0; r < nrows; r++) { + double sine = Math.sin(r * 0.4) * 4.0; + for(int c = 0; c < ncols; c++) { + data[r][c] = sine + rng.nextGaussian() * 0.8 + Math.sin(r * 0.2 + c) * 2.0; + } + } + testCompressStructure(data); + } + + @Test + public void testCompressSingleLowVariance() { + final int nrows = 50, ncols = 1; + double[][] data = getRandomMatrix(nrows, ncols, -1, 1, 1.0, SEED); + testCompressStructure(data); + } + + @Test + public void testCompressSingleColumnStructure() { + double[][] data = getRandomMatrix(50, 1, -1, 1, 1.0, SEED); + testCompressStructure(data); + } + + @Test(expected = NullPointerException.class) // ← Dein realer Crash! + public void testCreateNullBreakpoints() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + int[][] nullBp = {null}; + ColGroupPiecewiseLinearCompressed.create(cols, nullBp, new double[][] {{1.0}}, new double[][] {{0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateTooFewBreakpoints() { + int[][] singleBp = {new int[] {0}}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), singleBp, + new double[][] {new double[] {1.0}}, new double[][] {new double[] {0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentSlopes() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0, 3.0}}, new double[][] {new double[] {0.0, 1.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentIntercepts() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0}}, new double[][] {new double[] {0.0}}, 10); + } + + private int findSegment(int[] bps, int r) { + for(int s = 0; s < bps.length - 1; s++) { + if(r < bps[s + 1]) + return s; + } + return bps.length - 2; + } + + @Test + public void testCreateValidMultiSegmentRandom() { + Random rng = new Random(SEED); + final int nrows = 20; + + int[][] bp = {{0, rng.nextInt(5) + 3, rng.nextInt(10) + 8, nrows}, {0, rng.nextInt(8) + 2, nrows}}; + double[][] slopes = {{rng.nextDouble() * 3 - 1.5, rng.nextDouble() * 3 - 1.5, rng.nextDouble() * 3 - 1.5}, + {rng.nextDouble() * 3 - 1.5, rng.nextDouble() * 3 - 1.5}}; + double[][] intercepts = {{rng.nextDouble() * 2 - 1, rng.nextDouble() * 2 - 1, rng.nextDouble() * 2 - 1}, + {rng.nextDouble() * 2 - 1, rng.nextDouble() * 2 - 1}}; + + IColIndex cols = ColIndexFactory.create(new int[] {0, 1}); + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp, slopes, intercepts, nrows); + + assertTrue(cg instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + assertNotSame(bp, pl.getBreakpointsPerCol()); + assertEquals(2, pl.getBreakpointsPerCol().length); + + for(int c = 0; c < 2; c++) { + for(int r = 0; r < nrows; r++) { + int seg = findSegment(bp[c], r); + double expected = slopes[c][seg] * r + intercepts[c][seg]; + assertEquals(expected, cg.getIdx(r, c), 1e-8); + } + } + } + + @Test + public void testCreateMultiColumnRandom() { + Random rng = new Random(SEED); + final int nrows = 80, numGlobalCols = 5; + int[] globalCols = {2, 7, 12, 25, 42}; + + int numSegs = rng.nextInt(3) + 1; + int[][] bp = new int[numGlobalCols][numSegs + 1]; + double[][] slopes = new double[numGlobalCols][numSegs]; + double[][] intercepts = new double[numGlobalCols][numSegs]; + + double slope = rng.nextDouble() * 4 - 2; + double intercept = rng.nextDouble() * 4 - 2; + for(int c = 0; c < numGlobalCols; c++) { + bp[c][0] = 0; + bp[c][numSegs] = nrows; + for(int s = 1; s < numSegs; s++) + bp[c][s] = rng.nextInt(nrows - 10) + 5; + Arrays.fill(slopes[c], slope); + Arrays.fill(intercepts[c], intercept); + } + + IColIndex cols = ColIndexFactory.create(globalCols); + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp, slopes, intercepts, nrows); + + assertTrue(cg.getNumValues() > 0); + assertEquals(numGlobalCols, cols.size()); + + for(int r = 0; r < nrows; r++) { + double expected = slope * r + intercept; + for(int localC = 0; localC < numGlobalCols; localC++) { + assertEquals(expected, cg.getIdx(r, localC), 1e-8); + } + } + } + + @Test + public void testCreateSingleColumnRandom() { + Random rng = new Random(SEED); + final int nrows = rng.nextInt(30) + 20; + int numSegs = rng.nextInt(3) + 1; + + int[] bp = new int[numSegs + 1]; + bp[0] = 0; + bp[numSegs] = nrows; + for(int s = 1; s < numSegs; s++) + bp[s] = rng.nextInt(nrows / 2) + 5; + + double[] slopes = new double[numSegs]; + double[] intercepts = new double[numSegs]; + for(int s = 0; s < numSegs; s++) { + slopes[s] = rng.nextDouble() * 4 - 2; + intercepts[s] = rng.nextDouble() * 4 - 2; + } + + IColIndex cols = ColIndexFactory.create(new int[] {rng.nextInt(50)}); + int[][] bp2d = {bp}; + double[][] slopes2d = {slopes}; + double[][] ints2d = {intercepts}; + + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp2d, slopes2d, ints2d, nrows); + + for(int r = 0; r < nrows; r++) { + int seg = findSegment(bp, r); + double expected = slopes[seg] * r + intercepts[seg]; + assertEquals(expected, cg.getIdx(r, 0), 1e-8); + } + } + + @Test + public void testDecompressToDenseBlock() { + int[][] bp = {{0, 5, 10}}; + double[][] slopes = {{1.0, 2.0}}; + double[][] intercepts = {{0.0, 1.0}}; + int numRows = 10; + + IColIndex cols = ColIndexFactory.create(new int[] {0}); + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp, slopes, intercepts, numRows); + + MatrixBlock target = new MatrixBlock(numRows, 1, false); + target.allocateDenseBlock(); + + DenseBlock db = target.getDenseBlock(); + assertNotNull("DenseBlock null?", db); + + cg.decompressToDenseBlock(db, 0, numRows, 0, 0); + + for(int r = 0; r < numRows; r++) { + double expected = (r < 5) ? (1.0 * r + 0.0) : (2.0 * r + 1.0); + assertEquals("Row " + r + " mismatch", expected, db.get(r, 0), 1e-9); + } + + assertEquals(0.0, db.get(0, 0), 1e-9); + assertEquals(4.0, db.get(4, 0), 1e-9); + assertEquals(11.0, db.get(5, 0), 1e-9); + assertEquals(19.0, db.get(9, 0), 1e-9); + } + + private ColGroupPiecewiseLinearCompressed createTestGroup(int numRows) { + int[][] bp = {{0, 5, numRows}}; + double[][] slopes = {{1.0, 3.0}}; + double[][] intercepts = {{0.0, 2.0}}; + return (ColGroupPiecewiseLinearCompressed) ColGroupPiecewiseLinearCompressed.create( + ColIndexFactory.create(new int[] {0}), bp, slopes, intercepts, numRows); + } + + private double computeMSE(MatrixBlock orig, MatrixBlock recon) { + double sumSqErr = 0.0; + final int rows = orig.getNumRows(), cols = orig.getNumColumns(); + DenseBlock origDb = orig.getDenseBlock(); + DenseBlock reconDb = recon.getDenseBlock(); + + for(int r = 0; r < rows; r++) + for(int c = 0; c < cols; c++) { + double diff = origDb.get(r, c) - reconDb.get(r, c); + sumSqErr += diff * diff; + } + return sumSqErr / (rows * cols); + } + + @Test + public void testDecompressRandomMultiCol() { + final int nrows = 50, ncols = 3; + double[][] origData = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + + int[] colArray = {0, 1, 2}; + IColIndex cols = ColIndexFactory.create(colArray); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(10.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(cols, orig, cs); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + pl.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, ncols - 1); + + double mse = computeMSE(orig, recon); + assertTrue("MSE=" + mse + " > bound 20.0", mse <= 20.0); + } + + @Test + public void testDecompressRandomSingleCol() { + final int nrows = 40, ncols = 1; + double[][] origData = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED); + + IColIndex cols = ColIndexFactory.create(new int[] {0}); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(5.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(cols, orig, cs); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + MatrixBlock recon = new MatrixBlock(nrows, 1, false); + recon.allocateDenseBlock(); + pl.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + + double mse = computeMSE(orig, recon); + assertTrue("Single-Col MSE=" + mse + " > 8.0", mse <= 8.0); + } + + @Test + public void testDecompressRandomTrend() { + final int nrows = 60, ncols = 2; + Random rng = new Random(SEED); + double[][] origData = new double[nrows][ncols]; + + for(int r = 0; r < nrows; r++) { + double trend = 0.03 * r; + for(int c = 0; c < ncols; c++) { + origData[r][c] = trend + rng.nextGaussian() * 1.2 + c * 1.5; + } + } + + int[] colArray = {0, 1}; + IColIndex cols = ColIndexFactory.create(colArray); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(8.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(cols, orig, cs); + assertTrue(cg instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + pl.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, ncols - 1); + + double mse = computeMSE(orig, recon); + assertTrue("Trend MSE=" + String.format("%.4f", mse) + " > bound 12.0", mse <= 12.0); + + int[][] bp = pl.getBreakpointsPerCol(); + assertEquals(2, bp.length); + for(int c = 0; c < 2; c++) { + assertEquals(0, bp[c][0]); + assertEquals(nrows, bp[c][bp[c].length - 1]); + assertTrue(bp[c].length >= 2); + } + } + + @Test + public void testDecompressRandomJumps() { + final int nrows = 50, ncols = 2; + double[][] origData = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED); + + for(int c = 0; c < ncols; c++) { + for(int r = 20; r < 30; r++) + origData[r][c] += 2.0; + for(int r = 35; r < nrows; r++) + origData[r][c] += 7.0; + } + + int[] colArray = {0, 1}; + IColIndex cols = ColIndexFactory.create(colArray); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(12.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(cols, orig, cs); + assertTrue(cg instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + pl.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, ncols - 1); + + double mse = computeMSE(orig, recon); + assertTrue("Jumps MSE=" + String.format("%.4f", mse) + " > bound 18.0", mse <= 18.0); + + int[][] bp = pl.getBreakpointsPerCol(); + assertEquals(2, bp.length); + for(int c = 0; c < 2; c++) { + assertEquals(0, bp[c][0]); + assertEquals(nrows, bp[c][bp[c].length - 1]); + assertTrue(bp[c].length >= 3); + } + } + + private CompressedSizeInfo createTestCompressedSizeInfo() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + EstimationFactors facts = new EstimationFactors(2, 10); + + CompressedSizeInfoColGroup info = new CompressedSizeInfoColGroup(cols, facts, + AColGroup.CompressionType.PiecewiseLinear); + + List infos = Arrays.asList(info); + CompressedSizeInfo csi = new CompressedSizeInfo(infos); + + return csi; + } + + @Test + public void testCompressPiecewiseLinearViaRealAPI() { + + MatrixBlock in = new MatrixBlock(10, 1, false); + in.allocateDenseBlock(); + for(int r = 0; r < 10; r++) { + in.set(r, 0, r * 0.5); + } + + CompressionSettings cs = new CompressionSettingsBuilder().addValidCompression( + AColGroup.CompressionType.PiecewiseLinear).create(); + + CompressedSizeInfo csi = createTestCompressedSizeInfo(); + + List colGroups = ColGroupFactory.compressColGroups(in, csi, cs); + + boolean hasPiecewise = colGroups.stream().anyMatch(cg -> cg instanceof ColGroupPiecewiseLinearCompressed); + assertTrue(hasPiecewise); + } + + private double computeColumnMSE(MatrixBlock orig, MatrixBlock target, int col) { + final int numRows = orig.getNumRows(); + double totalSSE = 0.0; + final int origStride = orig.getNumColumns(); + final int tgtStride = target.getNumColumns(); + + for(int r = 0; r < numRows; r++) { + double origVal = orig.getDenseBlock().pos(r * origStride + col); + double tgtVal = target.getDenseBlock().pos(r * tgtStride + col); + totalSSE += (origVal - tgtVal) * (origVal - tgtVal); + } + return totalSSE / numRows; + } + + @Test + public void testSukzessiveLinearColumnSingleSegment() { + double[] linearCol = {1.0, 2.0, 3.0, 4.0, 5.0}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-6); + + List breaks = PiecewiseLinearUtils.computeBreakpointSukzessive(linearCol, cs); + assertEquals("[0, 5]", breaks.toString()); + } + + @Test + public void testSukzessiveNoisyColumnMultipleSegments() { + double[] noisyCol = {1.1, 1.9, 2.2, 10.1, 10.8, 11.3}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.0); + + List breaks = PiecewiseLinearUtils.computeBreakpointSukzessive(noisyCol, cs); + assertTrue(breaks.size() >= 3); + } + + @Test + public void testSukzessiveTargetLossIncreasesSegments() { + double[] colWithJumps = {1, 2, 3, 10, 11, 12, 20, 21, 22}; + CompressionSettings csStrict = new CompressionSettingsBuilder().create(); + csStrict.setPiecewiseTargetLoss(0.01); + + CompressionSettings csLoose = new CompressionSettingsBuilder().create(); + csLoose.setPiecewiseTargetLoss(10.0); + + List strictBreaks = PiecewiseLinearUtils.computeBreakpointSukzessive(colWithJumps, csStrict); + List looseBreaks = PiecewiseLinearUtils.computeBreakpointSukzessive(colWithJumps, csLoose); + + assertTrue(strictBreaks.size() > looseBreaks.size()); + } + + @Test + public void testMultiColumnTargetLossRespected() { + final int rows = 50, cols = 2; + double[][] data = getRandomMatrix(rows, cols, 0, 10, 1.0, 42L); + MatrixBlock orig = DataConverter.convertToMatrixBlock(data); + orig.allocateDenseBlock(); + + IColIndex colIdx = ColIndexFactory.create(0, cols - 1); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.0); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctionalSukzessive(colIdx, orig, cs); + + MatrixBlock target = new MatrixBlock(rows, cols, false); + target.allocateDenseBlock(); + + cg.decompressToDenseBlock(target.getDenseBlock(), 0, rows - 1, 0, cols - 1); + + for(int c = 0; c < cols; c++) { + double mse = computeColumnMSE(orig, target, c); + System.out.println("Col " + c + " MSE = " + mse); + assertTrue("Col " + c + " MSE=" + mse + " > target=1.0", mse <= 1.0 + 1e-10); + } + } + + @Test + public void testMultiColumnRandomDecompressLoss() { + final int rows = 60, cols = 3; + double[][] origData = getRandomMatrix(rows, cols, -5, 5, 1.0, SEED); + + IColIndex colIdx = ColIndexFactory.create(0, cols - 1); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(8.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctionalSukzessive(colIdx, orig, cs); + + MatrixBlock target = new MatrixBlock(rows, cols, false); + target.allocateDenseBlock(); + cg.decompressToDenseBlock(target.getDenseBlock(), 0, rows, 0, cols - 1); + + for(int c = 0; c < cols; c++) { + double mse = computeColumnMSE(orig, target, c); + assertTrue("Col " + c + " MSE=" + mse + " > bound 15.0", mse <= 15.0); + } + } + + @Test + public void testDecompressRandomTrendJumps() { + final int rows = 80, cols = 2; + Random rng = new Random(42L); + double[][] origData = new double[rows][cols]; + + for(int r = 0; r < rows; r++) { + double trend = 0.04 * r; + for(int c = 0; c < cols; c++) { + origData[r][c] = trend + rng.nextGaussian() * 1.5; + if(r >= 25 && r < 45) + origData[r][c] += 6.0; + if(r >= 60) + origData[r][c] += 10.0; + } + } + + IColIndex colIdx = ColIndexFactory.create(0, cols - 1); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(10.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctionalSukzessive(colIdx, orig, cs); + MatrixBlock target = new MatrixBlock(rows, cols, false); + target.allocateDenseBlock(); + cg.decompressToDenseBlock(target.getDenseBlock(), 0, rows, 0, cols - 1); + + for(int c = 0; c < cols; c++) { + double mse = computeColumnMSE(orig, target, c); + assertTrue("Trend+Jumps Col " + c + ": MSE=" + mse + " > 20.0", mse <= 20.0); + } + } + + @Test + public void testDecompressRandomSingleColSukzessive() { + final int rows = 40; + Random rng = new Random(SEED); + double[] origCol = new double[rows]; + + for(int r = 0; r < rows; r++) { + origCol[r] = 0.02 * r + rng.nextGaussian() * 0.8; + } + + double[][] origData = new double[rows][1]; + for(int r = 0; r < rows; r++) + origData[r][0] = origCol[r]; + + IColIndex colIdx = ColIndexFactory.create(new int[] {0}); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.0); + + MatrixBlock orig = DataConverter.convertToMatrixBlock(origData); + orig.allocateDenseBlock(); + + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctionalSukzessive(colIdx, orig, cs); + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + MatrixBlock target = new MatrixBlock(rows, 1, false); + target.allocateDenseBlock(); + pl.decompressToDenseBlock(target.getDenseBlock(), 0, rows, 0, 0); + + double mse = computeColumnMSE(orig, target, 0); + assertTrue("Single-Col MSE=" + mse + " > 3.0", mse <= 3.0); + + int[][] bp = pl.getBreakpointsPerCol(); + assertEquals(1, bp.length); + assertEquals(0, bp[0][0]); + assertEquals(rows, bp[0][bp[0].length - 1]); + } + + private boolean hasBreakInRange(int[] bps, int min, int max) { + for(int i = 1; i < bps.length - 1; i++) { + if(bps[i] >= min && bps[i] <= max) + return true; + } + return false; + } + + @Test + public void testBreakpointsRandomJump() { + final int len = 30; + double[] col = getRandomColumn(len, SEED); + + for(int r = 10; r < 20; r++) + col[r] += 8.0; + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(2.0); + + List bps = computeBreakpointSukzessive(col, cs); + int[] bpsArray = bps.stream().mapToInt(Integer::intValue).toArray(); + + assertTrue(" (Segs=" + bps.size() + ")", bps.size() >= 3); + assertTrue("No Break in Jump", hasBreakInRange(bpsArray, 8, 22)); + } + + @Test + public void testGlobalMSE_random() { + final int len = 40; + double[] col = getRandomColumn(len, SEED + 1); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.5); + + List bps = computeBreakpointSukzessive(col, cs); + double totalSSE = 0.0; + for(int i = 0; i < bps.size() - 1; i++) { + totalSSE += computeSegmentCost(col, bps.get(i), bps.get(i + 1)); + } + double mse = totalSSE / col.length; + + assertTrue("Global MSE=" + mse + " > target=" + cs.getPiecewiseTargetLoss(), + mse <= cs.getPiecewiseTargetLoss() + 1e-10); + } + + private double[] getRandomColumn(int len, long seed) { + Random rng = new Random(seed); + double[] col = new double[len]; + for(int i = 0; i < len; i++) + col[i] = rng.nextGaussian() * 2 + i * 0.01; + return col; + } + + @Test + public void testGetExactSizeOnDiskRandom() { + Random rng = new Random(SEED); + final int nrows = 80 + rng.nextInt(40); + + int numSegs = 1 + rng.nextInt(3); + int[] bp = new int[numSegs + 1]; + bp[0] = 0; + bp[numSegs] = nrows; + for(int s = 1; s < numSegs; s++) + bp[s] = rng.nextInt(nrows * 2 / 3) + nrows / 10; + + double[] slopes = new double[numSegs]; + double[] intercepts = new double[numSegs]; + for(int s = 0; s < numSegs; s++) { + slopes[s] = rng.nextDouble() * 4 - 2; + intercepts[s] = rng.nextDouble() * 4 - 2; + } + + IColIndex cols = ColIndexFactory.create(new int[] {rng.nextInt(20)}); + int[][] bp2d = {bp}; + double[][] slopes2d = {slopes}; + double[][] ints2d = {intercepts}; + + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp2d, slopes2d, ints2d, nrows); + + long diskSize = cg.getExactSizeOnDisk(); + System.out.println("Single Random: nrows=" + nrows + ", segs=" + numSegs + ", size=" + diskSize); + + assertTrue(diskSize > 0); + assertTrue(cg.getNumValues() > 0); + } + + @Test + public void testMultiColSizeRandom() { + Random rng = new Random(SEED + 1); + final int nrows = 100; + final int numGlobalCols = 3 + rng.nextInt(3); + int[] globalCols = new int[numGlobalCols]; + for(int i = 0; i < numGlobalCols; i++) + globalCols[i] = rng.nextInt(50) + i * 5; + + int[][] bp = new int[numGlobalCols][]; + double[][] slopes = new double[numGlobalCols][]; + double[][] intercepts = new double[numGlobalCols][]; + + for(int c = 0; c < numGlobalCols; c++) { + int numSegs = 1 + rng.nextInt(4); + bp[c] = new int[numSegs + 1]; + bp[c][0] = 0; + bp[c][numSegs] = nrows; + for(int s = 1; s < numSegs; s++) + bp[c][s] = rng.nextInt(nrows * 3 / 4) + nrows / 8; + + slopes[c] = new double[numSegs]; + intercepts[c] = new double[numSegs]; + for(int s = 0; s < numSegs; s++) { + slopes[c][s] = rng.nextDouble() * 3 - 1.5; + intercepts[c][s] = rng.nextDouble() * 3 - 1.5; + } + } + + IColIndex cols = ColIndexFactory.create(globalCols); + AColGroup cg = ColGroupPiecewiseLinearCompressed.create(cols, bp, slopes, intercepts, nrows); + + if(cg instanceof ColGroupPiecewiseLinearCompressed) { + ColGroupPiecewiseLinearCompressed pl = (ColGroupPiecewiseLinearCompressed) cg; + + long diskSize = cg.getExactSizeOnDisk(); + System.out.println("Multi Random: cols=" + numGlobalCols + ", size=" + diskSize); + + assertEquals(numGlobalCols, cols.size()); + assertEquals(numGlobalCols, pl.getBreakpointsPerCol().length); + for(int c = 0; c < numGlobalCols; c++) { + assertEquals(nrows, pl.getBreakpointsPerCol()[c][pl.getBreakpointsPerCol()[c].length - 1]); + } + assertTrue(diskSize > 0); + } + + } + + private ColGroupPiecewiseLinearCompressed createTestColGroup() { + int[][] bps = {{0, 2, 6}, // Col 0: Seg1(len=2), Seg2(len=4) + {0, 3, 6} // Col 1: Seg1(len=3), Seg2(len=3) + }; + double[][] ints = {{1.0, 3.0}, // Col 0 intercepts + {2.0, 4.0} // Col 1 intercepts + }; + double[][] slps = {{0.5, 1.0}, // Col 0 slopes + {0.0, 2.0} // Col 1 slopes + }; + return new ColGroupPiecewiseLinearCompressed(ColIndexFactory.create(0, 2), bps, slps, ints, 6); + } + + @Test + public void testComputeSum() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + double[] c = new double[2]; + cg.computeSum(c, 6); + assertEquals(20.5, c[0], 1e-8); + assertEquals(24.0, c[1], 1e-8); + } + + @Test + public void testComputeColSums() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + double[] c = new double[2]; + + cg.computeColSums(c, 6); + assertEquals(20.5, c[0], 1e-8); + assertEquals(24.0, c[1], 1e-8); + } + + @Test + public void testSingleColumn() { + int[][] bps1 = {{0, 3}}; + double[][] ints1 = {{1.0}}; + double[][] slps1 = {{2.0}}; + ColGroupPiecewiseLinearCompressed cg1 = new ColGroupPiecewiseLinearCompressed(ColIndexFactory.create(0, 1), + bps1, slps1, ints1, 3); + + RightScalarOperator plus5 = new RightScalarOperator(Plus.getPlusFnObject(), 5.0); + AColGroup result = cg1.scalarOperation(plus5); + + ColGroupPiecewiseLinearCompressed plResult = (ColGroupPiecewiseLinearCompressed) result; + assertEquals(6.0, plResult.getInterceptsPerCol()[0][0], 1e-8); + double[] origSum = new double[1]; + cg1.computeSum(origSum, 3); + double[] newSum = new double[1]; + ((ColGroupPiecewiseLinearCompressed) result).computeSum(newSum, 3); + assertEquals(origSum[0] + 5.0 * 3, newSum[0], 1e-8); + } + + @Test + public void testScalarPlus() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + RightScalarOperator plus2 = new RightScalarOperator(Plus.getPlusFnObject(), 2.0); + ColGroupPiecewiseLinearCompressed result = (ColGroupPiecewiseLinearCompressed) cg.scalarOperation(plus2); + ColGroupPiecewiseLinearCompressed plResult = (ColGroupPiecewiseLinearCompressed) result; + + assertArrayEquals(new double[] {0.5, 1.0}, plResult.getSlopesPerCol()[0], 1e-8); + assertArrayEquals(new double[] {0.0, 2.0}, plResult.getSlopesPerCol()[1], 1e-8); + + assertArrayEquals(new double[] {3.0, 5.0}, plResult.getInterceptsPerCol()[0], 1e-8); + assertArrayEquals(new double[] {4.0, 6.0}, plResult.getInterceptsPerCol()[1], 1e-8); + + double[] origSums = new double[2]; + cg.computeSum(origSums, 6); + double[] newSums = new double[2]; + result.computeSum(newSums, 6); + assertEquals(origSums[0] + 12.0, newSums[0], 1e-8); + assertEquals(origSums[1] + 12.0, newSums[1], 1e-8); + } + + @Test + public void testBinaryRowOpLeftMultiply() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + double[] v = {3.0, 4.0}; + BinaryOperator mult = new BinaryOperator(Multiply.getMultiplyFnObject()); + + AColGroup result = cg.binaryRowOpLeft(mult, v, false); + + double[] sums = new double[2]; + result.computeColSums(sums, 6); + + assertEquals(61.5, sums[0], 1e-8); + assertEquals(96.0, sums[1], 1e-8); + } + + @Test + public void testBinaryRowOpRightPlus() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + double[] v = {1.0, 2.0}; + BinaryOperator plus = new BinaryOperator(Plus.getPlusFnObject()); + + AColGroup result = cg.binaryRowOpRight(plus, v, false); + + double[] sums = new double[2]; + result.computeColSums(sums, 6); + assertEquals(26.5, sums[0], 1e-8); + assertEquals(36.0, sums[1], 1e-8); + } + + @Test + public void testContainsValue() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + assertTrue(cg.containsValue(1.0)); + assertTrue(cg.containsValue(2.0)); + assertTrue(cg.containsValue(1.5)); + assertFalse(cg.containsValue(999.0)); + assertFalse(cg.containsValue(0.0)); + } + + @Test + public void testEdgeCases() { + ColGroupPiecewiseLinearCompressed cg = createTestColGroup(); + double[] c = new double[2]; + cg.computeSum(c, 6); + assertNotNull(cg.binaryRowOpLeft(new BinaryOperator(Plus.getPlusFnObject()), new double[] {0, 0}, true)); + } + +} + +