Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions lui-core/src/main/scala/com/twitter/lui/ColumnIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import java.util.Arrays

import org.apache.parquet.Log
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition

/**
 * A node of the column IO tree.
 *
 * Every node exposes the schema type it was built from, its enclosing group,
 * its repetition/definition levels and the path leading to it from the root.
 */
abstract class ColumnIO {

  /** Schema type this node was built from. */
  def columnType: Type

  /** Enclosing group. `getParent` treats a `null` value as "no enclosing group". */
  def parent: GroupColumnIO

  /** Index assigned to this node by its builder. */
  def index: Int

  /** Field names on the path to this node, one entry per level. */
  def fieldPath: Array[String]

  /** Field name found at the given level of `fieldPath`. */
  def getFieldPath(level: Int): String = fieldPath(level)

  /** Node indices on the path to this node, one entry per level. */
  def indexFieldPath: Array[Int]

  /** Node index found at the given level of `indexFieldPath`. */
  def getIndexFieldPath(level: Int): Int = indexFieldPath(level)

  /** Name of the underlying schema type. */
  def name: String = columnType.getName

  def repetitionLevel: Int

  def definitionLevel: Int

  def getColumnNames: Seq[Seq[String]]

  def last: PrimitiveColumnIO
  def first: PrimitiveColumnIO

  def leaves: Iterator[PrimitiveColumnIO]

  /**
   * Finds the node, starting at this one and walking up through `parent`,
   * that is REPEATED and sits at repetition level `r`.
   *
   * @param r the repetition level to look for
   * @return this node or one of its ancestors
   * @throws org.apache.parquet.io.InvalidRecordException when no such node is reachable
   */
  def getParent(r: Int): ColumnIO =
    if (repetitionLevel == r && columnType.isRepetition(Repetition.REPEATED)) {
      this
    } else if (parent != null && parent.definitionLevel >= r) {
      // NOTE(review): this guard compares the parent's *definition* level with a
      // repetition level; presumably intentional -- confirm before changing.
      parent.getParent(r)
    } else {
      throw new org.apache.parquet.io.InvalidRecordException(s"no parent($r) for $renderedFieldPath")
    }

  override def toString: String =
    s"${getClass.getSimpleName} $name r: $repetitionLevel d: $definitionLevel $renderedFieldPath"

  // Interpolating an Array directly prints its JVM identity string rather than
  // its elements, so the path is rendered explicitly for messages.
  private def renderedFieldPath: String = fieldPath.mkString("[", ", ", "]")

}
39 changes: 39 additions & 0 deletions lui-core/src/main/scala/com/twitter/lui/ColumnIOBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import java.util.Arrays

import org.apache.parquet.Log
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition

/**
 * Mutable-side counterpart of [[ColumnIO]]: a builder node that is turned
 * into an immutable [[ColumnIO]] once its levels and paths are known.
 */
abstract class ColumnIOBuilder {

  /** Index assigned to the node being built. */
  def index: Int

  /** Schema type of the node being built. */
  def columnType: Type

  /**
   * Produces the [[ColumnIO]] for this builder.
   *
   * @param parentBox      box that will hold the enclosing group once it has been built
   * @param r              repetition level for this node
   * @param d              definition level for this node
   * @param fieldPath      field names leading to this node
   * @param indexFieldPath node indices leading to this node
   * @param repetition     builders of the repeated nodes on the way to this node
   * @return the built node
   */
  def setLevels(
    parentBox: LazyBox[GroupColumnIO],
    r: Int,
    d: Int,
    fieldPath: Array[String],
    indexFieldPath: Array[Int],
    repetition: List[ColumnIOBuilder]): ColumnIO

}
67 changes: 67 additions & 0 deletions lui-core/src/main/scala/com/twitter/lui/GroupColumnIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import org.apache.parquet.Log
import org.apache.parquet.schema.GroupType

/**
* Group level of the IO structure
*
*
*/

object GroupColumnIO {
  // Local copy of the `Log.DEBUG` switch.
  private val DEBUG: Boolean = Log.DEBUG
  // Logger obtained for this companion object's class.
  private val logger: Log = Log.getLog(getClass)
}

/**
 * Group node of the column IO tree: holds its child nodes both in order
 * (`children`) and keyed by name (`childrenByName`).
 *
 * The enclosing group is supplied through `parentBox` and only read the
 * first time `parent` is accessed.
 */
case class GroupColumnIO(
  columnType: GroupType,
  parentBox: LazyBox[GroupColumnIO],
  index: Int,
  repetitionLevel: Int,
  definitionLevel: Int,
  fieldPath: Array[String],
  indexFieldPath: Array[Int],
  childrenByName: Map[String, ColumnIO],
  children: Vector[ColumnIO]) extends ColumnIO {

  import GroupColumnIO._
  import org.apache.parquet.io.InvalidRecordException

  // Resolved on first access, so the box only has to be populated by then.
  lazy val parent: GroupColumnIO = parentBox.get

  /** Column names of every child, concatenated in child order. */
  override def getColumnNames: Seq[Seq[String]] =
    children.flatMap(child => child.getColumnNames)

  /** First primitive reachable through the first child. */
  override def first: PrimitiveColumnIO = {
    val firstChild = children.head
    firstChild.first
  }

  /** Last primitive reachable through the last child. */
  override def last: PrimitiveColumnIO = {
    val lastChild = children.last
    lastChild.last
  }

  /** Child with the given name, if there is one. */
  def getChild(name: String): Option[ColumnIO] = childrenByName.get(name)

  /**
   * Child at the given position.
   *
   * @throws org.apache.parquet.io.InvalidRecordException when the position is out of bounds
   */
  def getChild(fieldIndex: Int): ColumnIO =
    try children(fieldIndex)
    catch {
      case outOfBounds: IndexOutOfBoundsException =>
        throw new InvalidRecordException(s"could not get child $fieldIndex from $children", outOfBounds)
    }

  /** All primitive leaves below this group, in child order. */
  override def leaves: Iterator[PrimitiveColumnIO] =
    children.iterator.flatMap(child => child.leaves)

}
103 changes: 103 additions & 0 deletions lui-core/src/main/scala/com/twitter/lui/GroupColumnIOBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import org.apache.parquet.schema.Type.Repetition.REPEATED
import org.apache.parquet.schema.Type.Repetition.REQUIRED

import java.util.Arrays
import scala.collection.mutable.{ Map => MMap }

import org.apache.parquet.Log
import org.apache.parquet.schema.GroupType

object LazyBox {
  /** Creates a new, still empty box. */
  def apply[T]: LazyBox[T] = new LazyBox[T]
}

/**
 * A write-once holder: it starts empty, accepts exactly one `setValue`, and
 * fails on reads made before that. Reads and writes synchronize on the box.
 */
class LazyBox[T]() {
  private[this] var contents: Option[T] = None

  /** Returns the stored value; raises a runtime error if nothing was stored yet. */
  def get: T = this.synchronized {
    contents match {
      case Some(stored) => stored
      case None => sys.error("Accessed Box contents before populated")
    }
  }

  /** Stores `v`; raises a runtime error if a value was already stored. */
  def setValue(v: T): Unit = this.synchronized {
    contents match {
      case Some(_) => sys.error("Attempted to set box contents twice!")
      case None => contents = Some(v)
    }
  }
}

/**
 * Builder for a [[GroupColumnIO]]: collects child builders through `add` and
 * turns the whole subtree into immutable nodes in `setLevels`.
 */
case class GroupColumnIOBuilder(
  columnType: GroupType, index: Int) extends ColumnIOBuilder {
  private[this] var children = Vector[ColumnIOBuilder]()

  /** Appends `child` after the children added so far. */
  def add(child: ColumnIOBuilder): Unit = {
    children = children :+ child
  }

  /**
   * Builds this group and, recursively, every child added so far.
   *
   * @param parentBox       box that will hold this group's own parent
   * @param repetitionLevel repetition level of this group
   * @param definitionLevel definition level of this group
   * @param fieldPath       field names leading to this group
   * @param indexFieldPath  node indices leading to this group
   * @param repetition      builders of the repeated nodes on the way to this group
   * @return the built group, with its children attached
   */
  override def setLevels(parentBox: LazyBox[GroupColumnIO],
    repetitionLevel: Int,
    definitionLevel: Int,
    fieldPath: Array[String],
    indexFieldPath: Array[Int],
    repetition: List[ColumnIOBuilder]): GroupColumnIO = {

    // Children receive this box as their parent reference; it is filled in at
    // the end, once the group itself exists.
    val ourBox = LazyBox[GroupColumnIO]

    val newChildren: Vector[ColumnIO] = children.map { child =>
      val childType = child.columnType
      val childIsRepeated = childType.isRepetition(REPEATED)

      // Each child gets its own copy of both paths, extended by one entry.
      val newFieldPath: Array[String] = Arrays.copyOf(fieldPath, fieldPath.length + 1)
      val newIndexFieldPath: Array[Int] = Arrays.copyOf(indexFieldPath, indexFieldPath.length + 1)
      newFieldPath(fieldPath.length) = childType.getName
      newIndexFieldPath(indexFieldPath.length) = child.index

      val newRepetition = if (childIsRepeated) repetition :+ child else repetition

      child.setLevels(
        ourBox,
        // the type repetition level increases whenever there's a possible repetition
        if (childIsRepeated) repetitionLevel + 1 else repetitionLevel,
        // the type definition level increases whenever a field can be missing (not required)
        if (!childType.isRepetition(REQUIRED)) definitionLevel + 1 else definitionLevel,
        newFieldPath,
        newIndexFieldPath,
        newRepetition)
    }

    val childrenByName: Map[String, ColumnIO] = newChildren.map { child =>
      child.columnType.getName -> child
    }.toMap

    val gcIO = GroupColumnIO(
      columnType,
      parentBox,
      index,
      repetitionLevel,
      definitionLevel,
      fieldPath,
      indexFieldPath,
      childrenByName,
      newChildren)

    ourBox.setValue(gcIO)
    gcIO
  }
}
45 changes: 45 additions & 0 deletions lui-core/src/main/scala/com/twitter/lui/MessageColumnIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import java.util.Arrays

import org.apache.parquet.Log
import org.apache.parquet.column.ColumnWriteStore
import org.apache.parquet.column.ColumnWriter
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.column.values.dictionary.IntList
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
import scala.collection.JavaConverters._
import com.twitter.lui.hadoop.{ ReadSupport, ReadContext }
import org.apache.hadoop.conf.Configuration

/**
* Message level of the IO structure
*/
object MessageColumnIO {
  // Local copy of the `Log.DEBUG` switch.
  private val DEBUG: Boolean = Log.DEBUG
  // Logger obtained for this companion object's class.
  private val logger: Log = Log.getLog(getClass)
}

/**
 * Message-level entry point of the column IO tree.
 *
 * @param messageType schema of the message
 * @param validating  validation flag carried alongside the tree
 * @param createdBy   "created by" string carried alongside the tree
 * @param root        root group of the column IO tree
 */
case class MessageColumnIO(
  messageType: MessageType,
  validating: Boolean,
  createdBy: String,
  root: GroupColumnIO)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.twitter.lui

import java.util.Arrays

import org.apache.parquet.Log
import org.apache.parquet.schema.MessageType
import scala.collection.JavaConverters._

/**
 * Builder for a [[MessageColumnIO]]: owns the root group builder and turns it
 * into the final tree in `setLevels`.
 *
 * @param columnType schema of the message
 * @param validating passed through unchanged to the built [[MessageColumnIO]]
 * @param createdBy  passed through unchanged to the built [[MessageColumnIO]]
 */
case class MessageColumnIOBuilder(columnType: MessageType, validating: Boolean, createdBy: String) {
  val rootColumnIOBuilder = GroupColumnIOBuilder(columnType, 0)

  /**
   * Builds the tree starting from the root group, with both levels at 0 and
   * empty paths.
   *
   * @return the message-level wrapper around the built root group
   */
  def setLevels: MessageColumnIO = {
    val rootParentBox = LazyBox[GroupColumnIO]
    val repetitionLevel = 0
    val definitionLevel = 0
    val fieldPath = Array[String]()
    val indexFieldPath = Array[Int]()
    val repetition = List(rootColumnIOBuilder)

    // Argument order follows the builder's declaration: repetition level first,
    // then definition level.
    val root = rootColumnIOBuilder.setLevels(
      rootParentBox,
      repetitionLevel,
      definitionLevel,
      fieldPath,
      indexFieldPath,
      repetition)

    // The root has no enclosing group; its parent box is populated with null.
    rootParentBox.setValue(null)
    MessageColumnIO(columnType, validating, createdBy, root)
  }
}
Loading