Skip to content

Commit 96ea262

Browse files
author
Pirazzini Lorenzo
committed
[#19] WIP Different loading strategies first review
1 parent 800e578 commit 96ea262

File tree

18 files changed

+207
-58
lines changed

18 files changed

+207
-58
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ abstract class Connector(config: Config) extends Serializable {
2626
* @param schemas a sequence of pairs (ID, schema) Schema entities to insert in the storage.
2727
*/
2828
def insert(schemas: Seq[(Long, Schema)]): Unit
29+
30+
/**
 * Looks up a single schema by its ID.
 *
 * @param id the Long ID of the schema to look up
 * @return an Option holding the Schema for the given ID; presumably None when the storage holds no
 *         schema for that ID (TODO confirm against the concrete connectors)
 */
def findSchema(id: Long): Option[Schema]
2931
}

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaCache.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,13 @@ import org.apache.avro.Schema
88
*/
99
abstract class AvroSchemaCache(schemas: Seq[(Long, Schema)]) {
1010

11-
/**
12-
* Retrieves the ID of a registered schema.
13-
*
14-
* @param schema the Schema for which an ID is required
15-
* @return the ID associated to the input schema
16-
*/
17-
def getId(schema: Schema): Long
18-
1911
/**
2012
* Retrieves a registered schema for the input ID.
2113
*
2214
* @param id the Long ID of the schema
2315
* @return an Option containing the Schema associated to the input ID, or None if the ID is not in the cache
2416
*/
25-
def getSchema(id: Long): Schema
17+
def getSchema(id: Long): Option[Schema]
2618

2719
/**
2820
* Tests if the input schema is contained inside the cache.

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaCacheFingerprint.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package it.agilelab.darwin.manager
22

3-
import java.security.InvalidKeyException
4-
53
import it.agilelab.darwin.common.Logging
64
import org.apache.avro.{Schema, SchemaNormalization}
75

@@ -15,10 +13,7 @@ case class AvroSchemaCacheFingerprint(schemas: Seq[(Long, Schema)]) extends Avro
1513
private val _table: Map[Long, Schema] = schemas.toMap
1614
log.debug("cache initialized")
1715

18-
override def getId(schema: Schema): Long = SchemaNormalization.parsingFingerprint64(schema)
19-
20-
override def getSchema(id: Long): Schema = _table.getOrElse(id,
21-
throw new InvalidKeyException(s"No schema registered in cache for id $id"))
16+
// Plain lookup in the id -> schema table: yields None when the id is not registered.
override def getSchema(id: Long): Option[Schema] = _table.get(id)
2217

2318
override def contains(schema: Schema): (Boolean, Long) = {
2419
val id = SchemaNormalization.parsingFingerprint64(schema)

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import com.typesafe.config.Config
44
import it.agilelab.darwin.common.{Connector, ConnectorFactory, Logging}
55
import it.agilelab.darwin.manager.exception.ConnectorNotFoundException
66
import jdk.nashorn.internal.runtime.ParserException
7-
import org.apache.avro.Schema
7+
import org.apache.avro.{Schema, SchemaNormalization}
88
import it.agilelab.darwin.manager.util.ByteArrayUtils._
9+
import scala.collection.JavaConverters._
910

1011
trait AvroSchemaManager extends Logging {
1112
private val V1_HEADER = Array[Byte](0xC3.toByte, 0x01.toByte)
@@ -23,15 +24,15 @@ trait AvroSchemaManager extends Logging {
2324
* @param schema a Schema with unknown ID
2425
* @return the ID associated with the input schema
2526
*/
26-
def getId(schema: Schema): Long
27+
// Default implementation: the ID is the schema's parsing fingerprint (parsingFingerprint64),
// so it is computed from the schema alone.
def getId(schema: Schema): Long = SchemaNormalization.parsingFingerprint64(schema)
2728

2829
/**
2930
* Extracts the Schema from its ID.
3031
*
3132
* @param id a Long representing an ID
3233
* @return an Option containing the Schema associated to the input ID, or None if no such schema is found
3334
*/
34-
def getSchema(id: Long): Schema
35+
def getSchema(id: Long): Option[Schema]
3536

3637
/**
3738
* Checks if all the input Schema elements are already in the cache. Then, it performs an insert on the
@@ -49,7 +50,9 @@ trait AvroSchemaManager extends Logging {
4950
* @param schemas all the Schema that should be registered
5051
* @return a sequence of pairs of the input schemas associated with their IDs
5152
*/
52-
def registerAll(schemas: java.lang.Iterable[Schema]): java.lang.Iterable[IdSchemaPair]
53+
def registerAll(schemas: java.lang.Iterable[Schema]): java.lang.Iterable[IdSchemaPair] = {
  // Bridge for Java callers: run the Seq-based overload, then wrap every (ID, schema) tuple
  // into an IdSchemaPair before converting back to a Java collection.
  val registered = registerAll(schemas.asScala.toSeq)
  val wrapped = registered.map(pair => IdSchemaPair.create(pair._1, pair._2))
  wrapped.asJava
}
5356

5457
/** Creates a Single-Object encoded byte array.
5558
By specification, the encoded array is obtained by concatenating the V1_HEADER, the schema id and the avro-encoded
@@ -83,7 +86,7 @@ trait AvroSchemaManager extends Logging {
8386
*/
8487
def retrieveSchemaAndAvroPayload(avroSingleObjectEncoded: Array[Byte]): (Schema, Array[Byte]) = {
8588
if (isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
86-
getSchema(avroSingleObjectEncoded.slice(V1_HEADER.length, HEADER_LENGTH).byteArrayToLong) ->
89+
getSchema(avroSingleObjectEncoded.slice(V1_HEADER.length, HEADER_LENGTH).byteArrayToLong).get ->
8790
avroSingleObjectEncoded.drop(HEADER_LENGTH)
8891
}
8992
else {

core/src/main/scala/it/agilelab/darwin/manager/CachedAvroSchemaManager.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package it.agilelab.darwin.manager
22

33
import java.util.concurrent.atomic.AtomicReference
4-
import scala.collection.JavaConverters._
54
import org.apache.avro.Schema
65

76
trait CachedAvroSchemaManager extends AvroSchemaManager {
8-
private val _cache: AtomicReference[Option[AvroSchemaCache]] = new AtomicReference[Option[AvroSchemaCache]](None)
7+
protected val _cache: AtomicReference[Option[AvroSchemaCache]] = new AtomicReference[Option[AvroSchemaCache]](None)
98

109
def cache: AvroSchemaCache = _cache.get
1110
.getOrElse(throw new IllegalAccessException("Cache not loaded: accesses are allowed only if the cache has been " +
@@ -41,8 +40,4 @@ trait CachedAvroSchemaManager extends AvroSchemaManager {
4140
log.debug(s"${allSchemas.size} schemas registered")
4241
allSchemas
4342
}
44-
45-
override def registerAll(schemas: java.lang.Iterable[Schema]): java.lang.Iterable[IdSchemaPair] = {
46-
registerAll(schemas.asScala.toSeq).map { case (id, schema) => IdSchemaPair.create(id, schema) }.asJava
47-
}
4843
}

core/src/main/scala/it/agilelab/darwin/manager/CachedEagerAvroSchemaManager.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,5 @@ import org.apache.avro.Schema
1010
*/
1111
case class CachedEagerAvroSchemaManager(override val config: Config) extends CachedAvroSchemaManager {
1212

13-
override def getId(schema: Schema): Long = cache.getId(schema)
14-
15-
override def getSchema(id: Long): Schema = cache.getSchema(id)
13+
// Delegates to the loaded cache; the `cache` accessor throws if the cache has not been loaded.
override def getSchema(id: Long): Option[Schema] = cache.getSchema(id)
1614
}

core/src/main/scala/it/agilelab/darwin/manager/CachedLazyAvroSchemaManager.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@ import com.typesafe.config.Config
33
import org.apache.avro.Schema
44

55
case class CachedLazyAvroSchemaManager(override val config: Config) extends CachedAvroSchemaManager {
6-
//TODO
7-
override def getId(schema: Schema): Long = ???
86

9-
override def getSchema(id: Long): Schema = ???
7+
/**
 * Looks the schema up in the local cache and, on a miss, asks the connector for it. A schema
 * obtained from the connector is added to the cache before being returned.
 *
 * @param id a Long representing an ID
 * @return the cached or freshly fetched Schema, or None if the connector does not return one either
 */
override def getSchema(id: Long): Option[Schema] = {
  cache.getSchema(id).orElse {
    val fetched: Option[Schema] = connector.findSchema(id)
    fetched.foreach { schema =>
      val entry = Seq(getId(schema) -> schema)
      // Apply the insert atomically to whatever cache is current at update time. A plain
      // read-then-set on the AtomicReference could overwrite an entry that another thread added
      // between the read and the write. If the reference holds None it is left unchanged.
      _cache.updateAndGet(new java.util.function.UnaryOperator[Option[AvroSchemaCache]] {
        override def apply(current: Option[AvroSchemaCache]): Option[AvroSchemaCache] =
          current.map(_.insert(entry))
      })
    }
    fetched
  }
}
1014
}
Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
package it.agilelab.darwin.manager
22

3-
import java.lang
4-
53
import com.typesafe.config.Config
64
import org.apache.avro.Schema
75

86
case class LazyAvroSchemaManager(override val config: Config) extends AvroSchemaManager {
9-
//TODO
10-
11-
override def getId(schema: Schema): Long = ???
12-
13-
override def getSchema(id: Long): Schema = ???
147

15-
override def registerAll(schemas: Seq[Schema]): Seq[(Long, Schema)] = ???
8+
// No local cache in this manager: every lookup is forwarded to the connector.
override def getSchema(id: Long): Option[Schema] = connector.findSchema(id)
169

17-
override def registerAll(schemas: lang.Iterable[Schema]): lang.Iterable[IdSchemaPair] = ???
10+
override def registerAll(schemas: Seq[Schema]): Seq[(Long, Schema)] = {
  // Pair every schema with its ID, push the whole batch to the connector, and hand the pairs back.
  val idSchemaPairs = schemas.map(schema => (getId(schema), schema))
  connector.insert(idSchemaPairs)
  idSchemaPairs
}
1815

19-
override def reload(): AvroSchemaManager = ???
16+
// This manager defines no cache of its own, so reload simply returns this instance.
override def reload(): AvroSchemaManager = this
2017
}

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
125125
mutator.flush()
126126
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
127127
}
128+
129+
// TODO: not implemented yet. `???` throws scala.NotImplementedError on every call.
override def findSchema(id: Long): Option[Schema] = ???
128130
}
129131

130132

ignite/src/main/scala/it/agilelab/darwin/connector/ignite/IgniteConnector.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ class IgniteConnector(config: Config) extends Connector(config) {
88
override def fullLoad(): Seq[(Long, Schema)] = ???
99

1010
override def insert(schemas: Seq[(Long, Schema)]): Unit = ???
11+
12+
// TODO: not implemented yet. `???` throws scala.NotImplementedError on every call.
override def findSchema(id: Long): Option[Schema] = ???
1113
}

0 commit comments

Comments
 (0)