diff --git a/.gitignore b/.gitignore
index 9bc8287..e25705b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,6 @@
[Dd]ebug/
[Rr]elease/
-x64/
build/
[Bb]in/
[Oo]bj/
diff --git a/.paket/Paket.Restore.targets b/.paket/Paket.Restore.targets
index e7c1bc0..44afec5 100644
--- a/.paket/Paket.Restore.targets
+++ b/.paket/Paket.Restore.targets
@@ -48,16 +48,16 @@
- /usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }'
- /usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }'
+ /usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }'
+ /usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }'
-
+
-
+
@@ -127,6 +127,7 @@
%(PaketReferencesFileLinesInfo.PackageVersion)
All
+ runtime
@@ -183,8 +184,8 @@
-
-
+
+
]
[]
[]
-[]
-[]
+[]
+[]
do ()
module internal AssemblyVersionInformation =
let [] AssemblyTitle = "kafunk"
let [] AssemblyProduct = "kafunk"
let [] AssemblyDescription = "F# client for Kafka"
- let [] AssemblyVersion = "0.1.16"
- let [] AssemblyFileVersion = "0.1.16"
+ let [] AssemblyVersion = "0.1.17"
+ let [] AssemblyFileVersion = "0.1.17"
diff --git a/src/kafunk/AssemblyInfoVisibility.fs b/src/kafunk/AssemblyInfoVisibility.fs
new file mode 100644
index 0000000..99f3278
--- /dev/null
+++ b/src/kafunk/AssemblyInfoVisibility.fs
@@ -0,0 +1,5 @@
+namespace System
+open System.Runtime.CompilerServices
+
+[]
+do ()
\ No newline at end of file
diff --git a/src/kafunk/Compression.fs b/src/kafunk/Compression.fs
index aa2c0fc..daf25a0 100644
--- a/src/kafunk/Compression.fs
+++ b/src/kafunk/Compression.fs
@@ -134,47 +134,15 @@ module Snappy =
#endif
-// TODO: implement lz4
-//[]
-//module LZ4 =
-
-// open LZ4
-
-// //let compress ver ms =
-// // Stream.compress
-// // CompressionCodec.LZ4
-// // (fun memStream -> upcast new LZ4Stream(memStream, LZ4StreamMode.Compress, LZ4StreamFlags.IsolateInnerStream))
-// // ver
-// // ms
-
-// //let decompress ver m =
-// // Stream.decompress
-// // (fun memStream -> upcast new LZ4Stream(memStream, LZ4StreamMode.Decompress, LZ4StreamFlags.IsolateInnerStream))
-// // ver
-// // m
-
-// let compress (value:Binary.Segment) =
-// let maxLen = LZ4Codec.MaximumOutputLength value.Count
-// let outBuf = Binary.zeros maxLen
-// let written = LZ4Codec.Encode(value.Array, value.Offset, value.Count, outBuf.Array, outBuf.Offset, outBuf.Count)
-// if written <= 0 then failwith "compression failed" else
-// ArraySegment(outBuf.Array, outBuf.Offset, written)
-
-// let decompress (value:Binary.Segment) =
-// let guessedOutputLength = value.Count * 10
-// //let buf = Binary.zeros outputLength
-// //let decoded = LZ4Codec.Decode(m.value.Array, m.value.Offset, m.value.Count, buf.Array, buf.Offset, buf.Count, false)
-// //let buf = ArraySegment(buf.Array, buf.Count, decoded)
-// let buf = LZ4Codec.Decode(value.Array, value.Offset, value.Count, guessedOutputLength)
-// Binary.ofArray buf
-
- //let compress (messageVer:ApiVersion) (ms:MessageSet) =
- // let buf = MessageSet.Size (messageVer,ms) |> Binary.zeros
- // MessageSet.Write (messageVer,ms,BinaryZipper(buf))
- // let compressed = LZ4.LZ4Codec.Wrap (buf.Array, buf.Offset, buf.Count)
- // createMessage (Binary.ofArray compressed) CompressionCodec.LZ4
-
- //let decompress (messageVer:ApiVersion) (m:Message) =
- // let decompressed = LZ4.LZ4Codec.Unwrap(m.value.Array, m.value.Offset)
- // let buf = Binary.ofArray decompressed
- // MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))
\ No newline at end of file
+[]
+module LZ4 =
+ open Kafunk.Native
+
+ let compress (value: ArraySegment) : ArraySegment =
+ // TODO: consider preallocated buffer for compression
+ let compressedBound = Lz4Framing.compressFrameBound value.Count
+ let compressedBuffer = Array.zeroCreate compressedBound
+ Lz4Framing.compressFrame value compressedBuffer
+
+ let decompress (value: ArraySegment) : ArraySegment =
+ Lz4Framing.decompress value |> ArraySegment
\ No newline at end of file
diff --git a/src/kafunk/Native/Loader.fs b/src/kafunk/Native/Loader.fs
new file mode 100644
index 0000000..0d84bce
--- /dev/null
+++ b/src/kafunk/Native/Loader.fs
@@ -0,0 +1,44 @@
+/// Windlows pre-load dll from x86/x64 folder depending on Environment.Is64BitProcess
+module internal Kafunk.Native.Loader
+
+open System
+open System.Runtime.InteropServices
+open System.IO
+
+[]
+extern IntPtr private LoadLibrary(string _path)
+
+//
+// Unix
+//
+let RTLD_NOW = 2
+
+[]
+extern IntPtr private dlopen(string _fileName, int _flags)
+
+/// Load assembly relative to executing assembly's CodeBase.
+/// This function will not work for multi-assembly configuration, but is ok for kafunk for now.
+/// More elaborative loading strategies can be found here:
+/// https://github.com/mellinoe/nativelibraryloader
+let private resolveLibPath name =
+ System.Reflection.Assembly.GetExecutingAssembly().CodeBase
+ |> fun path -> (new Uri(path)).LocalPath
+ |> Path.GetDirectoryName
+ |> fun path -> Path.Combine(path, name)
+
+let private loadWin name =
+ let path = resolveLibPath name
+ let ptr = LoadLibrary path
+
+ if ptr = IntPtr.Zero then
+ failwithf "Failed to load native dll '%s'" name
+
+let load name = lazy(
+ match (Environment.Is64BitProcess, Environment.OSVersion.Platform) with
+ | (true, PlatformID.Win32NT) -> loadWin (sprintf "x64\\%s.dll" name)
+ | (false, PlatformID.Win32NT) -> loadWin (sprintf "x86\\%s.dll" name)
+ | _ -> ()
+)
+
+
+
\ No newline at end of file
diff --git a/src/kafunk/Native/lz4.fs b/src/kafunk/Native/lz4.fs
new file mode 100644
index 0000000..4926422
--- /dev/null
+++ b/src/kafunk/Native/lz4.fs
@@ -0,0 +1,172 @@
+module internal Kafunk.Native.Lz4Framing
+/// For C API details, see:
+/// https://github.com/lz4/lz4/blob/dev/lib/lz4frame.h
+module private native =
+ open System
+ open System.Runtime.InteropServices
+
+ let LZ4F_VERSION = nativeint 100
+
+ type LZ4F_errorCode_t = uint64
+
+ // Shortcut these enums to int because we do not use them at this time
+ type LZ4F_blockSizeID_t = int
+ type LZ4F_blockMode_t = int
+ type LZ4F_contentChecksum_t = int
+ type LZ4F_frameType_t = int
+ type LZ4F_blockChecksum_t = int
+
+ []
+ type LZ4F_frameInfo_t =
+ struct
+ val blockSizeID: LZ4F_blockSizeID_t
+ val blockMode: LZ4F_blockMode_t
+ val contentChecksumFlag: LZ4F_contentChecksum_t
+ val frameType: LZ4F_frameType_t
+ val mutable contentSize: uint64
+ val dictID: uint32
+ val blockChecksumFlag: LZ4F_blockChecksum_t
+ end
+
+ []
+ type LZ4F_preferences_t =
+ struct
+ val mutable frameInfo: LZ4F_frameInfo_t
+ val compressionLevel: int32
+ val autoFlush: uint32
+ val reserved1: uint32
+ val reserved2: uint32
+ val reserved3: uint32
+ val reserved4: uint32
+ end
+
+ []
+ type LZ4F_decompressOptions_t =
+ struct
+ val mutable stableDst: uint32
+ val reserved1: uint32
+ val reserved2: uint32
+ val reserved3: uint32
+ end
+
+
+ []
+ extern nativeint LZ4F_compressFrameBound(nativeint _srcSize, IntPtr _preferencesPtr);
+
+ []
+ extern nativeint LZ4F_compressFrame(nativeint _dstBuffer, nativeint _dstCapacity,
+ nativeint _srcBuffer, nativeint _srcSize,
+ LZ4F_preferences_t& _preferences);
+
+ []
+ extern uint32 LZ4F_isError(nativeint _code);
+
+ []
+ extern nativeint LZ4F_getErrorName(nativeint _code);
+
+ []
+ extern nativeint LZ4F_createDecompressionContext(nativeint& _dctxPtr, nativeint _version);
+
+ []
+ extern nativeint LZ4F_freeDecompressionContext(nativeint _dctx);
+
+ []
+ extern nativeint LZ4F_decompress(nativeint _dctx, nativeint _dstBuffer, nativeint& _dstSize,
+ nativeint _srcBuffer, nativeint& _srcSizePtr, nativeint _optionsPtr);
+
+ []
+ extern nativeint LZ4F_getFrameInfo(nativeint _dctx, LZ4F_frameInfo_t& _frameInfoPtr, nativeint _srcBuffer, nativeint& _srcSizePtr);
+
+open System
+open FSharp.NativeInterop
+open native
+#nowarn "9"
+
+let private ensureNativeIsLoaded = Loader.load "liblz4"
+
+//
+// liblz4 error reporting
+//
+let private isError code =
+ LZ4F_isError(code) <> 0u
+
+let private getErrorName code =
+ let stringAddr = LZ4F_getErrorName(code)
+ System.Runtime.InteropServices.Marshal.PtrToStringAnsi(stringAddr)
+
+
+let failIfError funcName code =
+ if isError code then
+ let error = getErrorName code
+ failwithf "LZ4 native call '%s' failed: '%s'" funcName error
+ else
+ code
+
+//
+// Public API
+//
+
+let compressFrameBound (srcSize: int) : int =
+ ensureNativeIsLoaded.Value
+ LZ4F_compressFrameBound((nativeint srcSize), IntPtr.Zero)
+ |> int
+
+let compressFrame (src: ArraySegment) (dst: byte[]) =
+ ensureNativeIsLoaded.Value
+
+ let mutable compressParams = LZ4F_preferences_t()
+ compressParams.frameInfo.contentSize <- (uint64 src.Count)
+
+ use srcPtr = fixed src.Array
+ use dstPtr = fixed dst
+
+ let res =
+ LZ4F_compressFrame(
+ dstPtr |> NativePtr.toNativeInt,
+ (nativeint dst.Length),
+ NativePtr.add srcPtr src.Offset |> NativePtr.toNativeInt, (nativeint src.Count),
+ &compressParams)
+ |> failIfError "LZ4F_compressFrame"
+ |> int
+
+ new ArraySegment(dst, 0, res)
+
+let decompress (src: ArraySegment): byte[] =
+ ensureNativeIsLoaded.Value
+
+ let mutable ctx = IntPtr.Zero
+ do LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION) |> failIfError "LZ4F_createDecompressionContext" |> ignore
+ try
+ // read frame info to get uncompressed size
+ let mutable frameInfo = LZ4F_frameInfo_t()
+ let mutable srcSize = nativeint src.Count
+ use srcPtr = fixed src.Array
+ let srcAddr = NativePtr.add srcPtr src.Offset |> NativePtr.toNativeInt
+
+ do LZ4F_getFrameInfo(ctx, &frameInfo, srcAddr, &srcSize) |> failIfError "LZ4F_getFrameInfo" |> ignore
+
+ let decompressedSize = frameInfo.contentSize
+ if decompressedSize = 0UL then
+ Array.empty
+ else
+ // LZ4F_getFrameInfo have updated srcSize to consumed bytes
+ let dataAddr = srcAddr + srcSize
+ srcSize <- (nativeint src.Count) - srcSize
+
+ let decompressed = Array.zeroCreate (int decompressedSize)
+ use decompressedPtr = fixed decompressed
+ let decompressedAddr = decompressedPtr |> NativePtr.toNativeInt
+ let mutable dstSize = nativeint decompressed.Length
+
+ let before = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize
+ let res = LZ4F_decompress(ctx, decompressedAddr, &dstSize, dataAddr, &srcSize, IntPtr.Zero) |> failIfError "LZ4F_decompress"
+ let after = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize
+ if res <> nativeint 0 then
+ failwithf "Expected LZ4F_decompress to return 0 but got %d. Buffer too small?\n %s\n %s" res before after
+ else
+ decompressed
+ finally
+ // protect context from leaking
+ do LZ4F_freeDecompressionContext(ctx) |> failIfError "LZ4F_freeDecompressionContext" |> ignore
+
+
\ No newline at end of file
diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs
index beb5990..80c9351 100644
--- a/src/kafunk/Protocol.fs
+++ b/src/kafunk/Protocol.fs
@@ -82,7 +82,7 @@ module Protocol =
match compression with
| None -> value
| GZIP -> Compression.GZip.compress value
- //| LZ4 -> Compression.LZ4.compress value
+ | LZ4 -> Compression.LZ4.compress value
#if !NETSTANDARD2_0
| Snappy -> Compression.Snappy.compress value
#endif
@@ -92,7 +92,7 @@ module Protocol =
match compression with
| None -> value
| GZIP -> Compression.GZip.decompress value
- //| LZ4 -> Compression.LZ4.decompress value
+ | LZ4 -> Compression.LZ4.decompress value
#if !NETSTANDARD2_0
| Snappy -> Compression.Snappy.decompress value
#endif
diff --git a/src/kafunk/kafunk.fsproj b/src/kafunk/kafunk.fsproj
index 43b8fbd..cf789d5 100644
--- a/src/kafunk/kafunk.fsproj
+++ b/src/kafunk/kafunk.fsproj
@@ -25,9 +25,24 @@
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+
diff --git a/src/kafunk/liblz4.dylib b/src/kafunk/liblz4.dylib
new file mode 100644
index 0000000..5253ed9
Binary files /dev/null and b/src/kafunk/liblz4.dylib differ
diff --git a/src/kafunk/liblz4.so b/src/kafunk/liblz4.so
new file mode 100644
index 0000000..f101709
Binary files /dev/null and b/src/kafunk/liblz4.so differ
diff --git a/src/kafunk/paket.references b/src/kafunk/paket.references
index 482285a..03ae48e 100644
--- a/src/kafunk/paket.references
+++ b/src/kafunk/paket.references
@@ -2,7 +2,7 @@ FSharp.Core
FSharp.Control.AsyncSeq
SourceLink.Create.CommandLine
group Legacy
- FSharp.Core
- FSharp.Control.AsyncSeq
- Snappy.NET
- SourceLink.Create.CommandLine
\ No newline at end of file
+FSharp.Core
+FSharp.Control.AsyncSeq
+Snappy.NET
+SourceLink.Create.CommandLine
\ No newline at end of file
diff --git a/src/kafunk/x64/liblz4.dll b/src/kafunk/x64/liblz4.dll
new file mode 100644
index 0000000..92e7f06
Binary files /dev/null and b/src/kafunk/x64/liblz4.dll differ
diff --git a/src/kafunk/x86/liblz4.dll b/src/kafunk/x86/liblz4.dll
new file mode 100644
index 0000000..09e88d6
Binary files /dev/null and b/src/kafunk/x86/liblz4.dll differ
diff --git a/tests/kafunk.Tests/LZ4.fs b/tests/kafunk.Tests/LZ4.fs
new file mode 100644
index 0000000..1f34429
--- /dev/null
+++ b/tests/kafunk.Tests/LZ4.fs
@@ -0,0 +1,56 @@
+namespace Kafunk.Tests
+
+open NUnit.Framework
+open Kafunk.Native
+open System
+open System.Text
+
+module LZ4 =
+ open System.Numerics
+
+ []
+ let ``compress produce result smaller than original`` () =
+ let text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit
+ . Sed malesuada consectetur augue, vitae euismod dui imperdiet in. Nunc arcu felis,
+ luctus eleifend ipsum ac, iaculis commodo enim. Proin aliquet odio nisi, et efficitur
+ enim vehicula in. Nam vitae vestibulum risus. Donec egestas dapibus urna. Proin sit amet
+ tincidunt dui, eget condimentum erat. Morbi blandit enim non massa laoreet ultricies.
+ Vivamus consequat pharetra felis, sit amet feugiat metus porta iaculis."
+
+ let data = text |> Encoding.UTF8.GetBytes |> ArraySegment
+
+
+ let compressedBound = Lz4Framing.compressFrameBound data.Count
+ let compressedBuffer = Array.zeroCreate compressedBound
+ let compressed = Lz4Framing.compressFrame data compressedBuffer
+
+ let decompressed =
+ compressed
+ |> Lz4Framing.decompress
+ |> Encoding.UTF8.GetString
+
+ // Compression is correct
+ Assert.AreEqual(text, decompressed)
+
+ // Compression produce result smaller than the original
+ Assert.Less(compressed.Count, data.Count)
+
+ []
+ let ``Array of any size compress correctly`` () =
+ [0; 1; 63; 64; 65; 1000000]
+ |> Seq.iter( fun size ->
+ let data = Array.zeroCreate size
+ (new Random()).NextBytes data
+
+ let compressedBound = Lz4Framing.compressFrameBound data.Length
+ let compressedBuffer = Array.zeroCreate compressedBound
+ let compressed = Lz4Framing.compressFrame (new ArraySegment(data)) compressedBuffer
+
+ let decompressed =
+ compressed
+ |> Lz4Framing.decompress
+
+ Assert.AreEqual(data.Length, decompressed.Length)
+ Assert.AreEqual(data, decompressed)
+ )
+
diff --git a/tests/kafunk.Tests/kafunk.Tests.fsproj b/tests/kafunk.Tests/kafunk.Tests.fsproj
index 74069f2..2c91d54 100644
--- a/tests/kafunk.Tests/kafunk.Tests.fsproj
+++ b/tests/kafunk.Tests/kafunk.Tests.fsproj
@@ -1,10 +1,13 @@
-
+
+
netcoreapp2.0;net45
false
+ true
+