Skip to content

Commit c79e833

Browse files
[#91] Add extractId to Connector and AvroSchemaManager API
1 parent e657553 commit c79e833

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ trait Connector extends Serializable {
138138
getSchema: Long => Option[Schema]
139139
): (Schema, Array[Byte]) = {
140140
if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
141-
val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
141+
val id = extractId(avroSingleObjectEncoded, endianness)
142142
getSchema(id) match {
143143
case Some(schema) =>
144144
schema -> AvroSingleObjectEncodingUtils.dropHeader(avroSingleObjectEncoded)
@@ -163,7 +163,7 @@ trait Connector extends Serializable {
163163
getSchema: Long => Option[Schema]
164164
): Schema = {
165165
if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
166-
val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
166+
val id = extractId(avroSingleObjectEncoded, endianness)
167167
getSchema(id) match {
168168
case Some(schema) => schema
169169
case _ => throw new DarwinException(s"No schema found for ID $id")
@@ -187,7 +187,7 @@ trait Connector extends Serializable {
187187
endianness: ByteOrder,
188188
getSchema: Long => Option[Schema]
189189
): Either[Array[Byte], Schema] = {
190-
AvroSingleObjectEncodingUtils.extractId(inputStream, endianness).rightMap { id =>
190+
extractId(inputStream, endianness).rightMap { id =>
191191
getSchema(id).getOrElse(throw new DarwinException(s"No schema found for ID $id"))
192192
}
193193
}
@@ -204,14 +204,54 @@ trait Connector extends Serializable {
204204
getSchema: Long => Option[Schema]
205205
): Either[Exception, Schema] = {
206206
try {
207-
val id = AvroSingleObjectEncodingUtils.extractId(array, endianness)
207+
val id = extractId(array, endianness)
208208
getSchema(id)
209209
.toRight(new RuntimeException(s"Cannot find schema with id $id"))
210210
} catch {
211211
case ie: IllegalArgumentException => Left(ie)
212212
}
213213
}
214214

215+
/**
216+
* Extracts the schema ID from the avro single-object encoded byte array
217+
*
218+
* @param array avro single-object encoded byte array
219+
* @param endianness the endianness that will be used to read fingerprint bytes,
220+
* it won't affect how avro payload is read, that is up to the darwin user
221+
* @return the schema ID extracted from the input data
222+
*/
223+
def extractId(array: Array[Byte], endianness: ByteOrder): Long = {
224+
AvroSingleObjectEncodingUtils.extractId(array, endianness)
225+
}
226+
227+
/**
228+
* Extracts the schema ID from the avro single-object encoded data at the head of this input stream.
229+
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
230+
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
231+
* be consumed and returned in the Left part of the Either.
232+
*
233+
* @param inputStream avro single-object encoded input stream
234+
* @param endianness the endianness that will be used to read fingerprint bytes,
235+
* it won't affect how avro payload is read, that is up to the darwin user
236+
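* @return Right with the schema ID extracted from the input data, or Left with the bytes described above if the single-object encoded header is not found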
237+
*/
238+
def extractId(inputStream: InputStream, endianness: ByteOrder): Either[Array[Byte], Long] = {
239+
AvroSingleObjectEncodingUtils.extractId(inputStream, endianness)
240+
}
241+
242+
/**
243+
* Extracts the schema ID from the avro single-object encoded ByteBuffer, the ByteBuffer position will be after the
244+
* header when this method returns
245+
*
246+
* @param avroSingleObjectEncoded avro single-object encoded ByteBuffer
247+
* @param endianness the endianness that will be used to read fingerprint bytes,
248+
* it won't affect how avro payload is read, that is up to the darwin user
249+
* @return the schema ID extracted from the input data
250+
*/
251+
def extractId(avroSingleObjectEncoded: ByteBuffer, endianness: ByteOrder): Long = {
252+
AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
253+
}
254+
215255
/**
216256
* Extracts a SchemaPayloadPair that contains the Schema and the Avro-encoded payload
217257
*

confluent/src/main/scala/it/agilelab/darwin/connector/confluent/ConfluentConnector.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
164164
getSchema: Long => Option[Schema]
165165
): (Schema, Array[Byte]) = {
166166
if (ConfluentSingleObjectEncoding.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
167-
val id = ConfluentSingleObjectEncoding.extractId(avroSingleObjectEncoded, endianness)
167+
val id = extractId(avroSingleObjectEncoded, endianness)
168168
getSchema(id) match {
169169
case Some(schema) =>
170170
schema -> ConfluentSingleObjectEncoding.dropHeader(avroSingleObjectEncoded)
@@ -189,7 +189,7 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
189189
getSchema: Long => Option[Schema]
190190
): Schema = {
191191
if (ConfluentSingleObjectEncoding.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
192-
val id = ConfluentSingleObjectEncoding.extractId(avroSingleObjectEncoded, endianness)
192+
val id = extractId(avroSingleObjectEncoded, endianness)
193193
getSchema(id) match {
194194
case Some(schema) => schema
195195
case _ => throw new DarwinException(s"No schema found for ID $id")
@@ -213,7 +213,7 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
213213
endianness: ByteOrder,
214214
getSchema: Long => Option[Schema]
215215
): Either[Array[Byte], Schema] = {
216-
ConfluentSingleObjectEncoding.extractId(inputStream, endianness).rightMap { id =>
216+
extractId(inputStream, endianness).rightMap { id =>
217217
getSchema(id).getOrElse(throw new DarwinException(s"No schema found for ID $id"))
218218
}
219219
}
@@ -230,7 +230,7 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
230230
getSchema: Long => Option[Schema]
231231
): Either[Exception, Schema] = {
232232
try {
233-
val id = ConfluentSingleObjectEncoding.extractId(array, endianness)
233+
val id = extractId(array, endianness)
234234
getSchema(id)
235235
.toRight(new RuntimeException(s"Cannot find schema with id $id"))
236236
} catch {
@@ -252,4 +252,16 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
252252
val (schema, payload) = retrieveSchemaAndAvroPayload(avroSingleObjectEncoded, endianness, getSchema)
253253
SchemaPayloadPair.create(schema, payload)
254254
}
255+
256+
override def extractId(array: Array[Byte], endianness: ByteOrder): Long = {
257+
ConfluentSingleObjectEncoding.extractId(array, endianness)
258+
}
259+
260+
override def extractId(inputStream: InputStream, endianness: ByteOrder): Either[Array[Byte], Long] = {
261+
ConfluentSingleObjectEncoding.extractId(inputStream, endianness)
262+
}
263+
264+
override def extractId(avroSingleObjectEncoded: ByteBuffer, endianness: ByteOrder): Long = {
265+
ConfluentSingleObjectEncoding.extractId(avroSingleObjectEncoded, endianness)
266+
}
255267
}

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,39 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
178178
* Reloads all the schemas from the previously configured storage.
179179
*/
180180
def reload(): AvroSchemaManager
181+
182+
/**
183+
* Extracts the schema ID from the avro single-object encoded byte array
184+
*
185+
* @param array avro single-object encoded byte array
186+
* @return the schema ID extracted from the input data
187+
*/
188+
def extractId(array: Array[Byte]): Long = {
189+
connector.extractId(array, endianness)
190+
}
191+
192+
/**
193+
* Extracts the schema ID from the avro single-object encoded data at the head of this input stream.
194+
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
195+
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
196+
* be consumed and returned in the Left part of the Either.
197+
*
198+
* @param inputStream avro single-object encoded input stream
199+
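* @return Right with the schema ID extracted from the input data, or Left with the bytes described above if the single-object encoded header is not found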
200+
*/
201+
def extractId(inputStream: InputStream): Either[Array[Byte], Long] = {
202+
connector.extractId(inputStream, endianness)
203+
}
204+
205+
/**
206+
* Extracts the schema ID from the avro single-object encoded ByteBuffer, the ByteBuffer position will be after the
207+
* header when this method returns
208+
*
209+
* @param avroSingleObjectEncoded avro single-object encoded ByteBuffer
210+
* @return the schema ID extracted from the input data
211+
*/
212+
def extractId(avroSingleObjectEncoded: ByteBuffer): Long = {
213+
connector.extractId(avroSingleObjectEncoded, endianness)
214+
}
215+
181216
}

0 commit comments

Comments
 (0)