diff --git a/.gitignore b/.gitignore index 9bc8287..e25705b 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,6 @@ [Dd]ebug/ [Rr]elease/ -x64/ build/ [Bb]in/ [Oo]bj/ diff --git a/.paket/Paket.Restore.targets b/.paket/Paket.Restore.targets index e7c1bc0..44afec5 100644 --- a/.paket/Paket.Restore.targets +++ b/.paket/Paket.Restore.targets @@ -48,16 +48,16 @@ - /usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }' - /usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }' + /usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }' + /usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }' - + - + @@ -127,6 +127,7 @@ %(PaketReferencesFileLinesInfo.PackageVersion) All + runtime @@ -183,8 +184,8 @@ - - + + ] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = let [] AssemblyTitle = "kafunk" let [] AssemblyProduct = "kafunk" let [] AssemblyDescription = "F# client for Kafka" - let [] AssemblyVersion = "0.1.16" - let [] AssemblyFileVersion = "0.1.16" + let [] AssemblyVersion = "0.1.17" + let [] AssemblyFileVersion = "0.1.17" diff --git a/src/kafunk/AssemblyInfoVisibility.fs b/src/kafunk/AssemblyInfoVisibility.fs new file mode 100644 index 0000000..99f3278 --- /dev/null +++ b/src/kafunk/AssemblyInfoVisibility.fs @@ -0,0 +1,5 @@ +namespace System +open System.Runtime.CompilerServices + +[] +do () \ No newline at end of file diff --git a/src/kafunk/Compression.fs b/src/kafunk/Compression.fs index aa2c0fc..daf25a0 100644 --- a/src/kafunk/Compression.fs +++ b/src/kafunk/Compression.fs @@ -134,47 +134,15 @@ module Snappy = #endif -// TODO: implement lz4 -//[] -//module LZ4 = - -// open LZ4 - -// //let compress ver ms = -// // Stream.compress -// // CompressionCodec.LZ4 -// // (fun memStream -> upcast new LZ4Stream(memStream, LZ4StreamMode.Compress, LZ4StreamFlags.IsolateInnerStream)) -// // ver -// // ms - -// //let decompress ver m = -// // Stream.decompress -// // (fun memStream -> upcast new 
LZ4Stream(memStream, LZ4StreamMode.Decompress, LZ4StreamFlags.IsolateInnerStream)) -// // ver -// // m - -// let compress (value:Binary.Segment) = -// let maxLen = LZ4Codec.MaximumOutputLength value.Count -// let outBuf = Binary.zeros maxLen -// let written = LZ4Codec.Encode(value.Array, value.Offset, value.Count, outBuf.Array, outBuf.Offset, outBuf.Count) -// if written <= 0 then failwith "compression failed" else -// ArraySegment(outBuf.Array, outBuf.Offset, written) - -// let decompress (value:Binary.Segment) = -// let guessedOutputLength = value.Count * 10 -// //let buf = Binary.zeros outputLength -// //let decoded = LZ4Codec.Decode(m.value.Array, m.value.Offset, m.value.Count, buf.Array, buf.Offset, buf.Count, false) -// //let buf = ArraySegment(buf.Array, buf.Count, decoded) -// let buf = LZ4Codec.Decode(value.Array, value.Offset, value.Count, guessedOutputLength) -// Binary.ofArray buf - - //let compress (messageVer:ApiVersion) (ms:MessageSet) = - // let buf = MessageSet.Size (messageVer,ms) |> Binary.zeros - // MessageSet.Write (messageVer,ms,BinaryZipper(buf)) - // let compressed = LZ4.LZ4Codec.Wrap (buf.Array, buf.Offset, buf.Count) - // createMessage (Binary.ofArray compressed) CompressionCodec.LZ4 - - //let decompress (messageVer:ApiVersion) (m:Message) = - // let decompressed = LZ4.LZ4Codec.Unwrap(m.value.Array, m.value.Offset) - // let buf = Binary.ofArray decompressed - // MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf)) \ No newline at end of file +[] +module LZ4 = + open Kafunk.Native + + let compress (value: ArraySegment) : ArraySegment = + // TODO: consider preallocated buffer for compression + let compressedBound = Lz4Framing.compressFrameBound value.Count + let compressedBuffer = Array.zeroCreate compressedBound + Lz4Framing.compressFrame value compressedBuffer + + let decompress (value: ArraySegment) : ArraySegment = + Lz4Framing.decompress value |> ArraySegment \ No newline at end of file diff --git 
a/src/kafunk/Native/Loader.fs b/src/kafunk/Native/Loader.fs new file mode 100644 index 0000000..0d84bce --- /dev/null +++ b/src/kafunk/Native/Loader.fs @@ -0,0 +1,44 @@ +/// On Windows, pre-loads the native dll from the x86/x64 folder depending on Environment.Is64BitProcess +module internal Kafunk.Native.Loader + +open System +open System.Runtime.InteropServices +open System.IO + +[] +extern IntPtr private LoadLibrary(string _path) + +// +// Unix +// +let RTLD_NOW = 2 + +[] +extern IntPtr private dlopen(string _fileName, int _flags) + +/// Load assembly relative to executing assembly's CodeBase. +/// This function will not work for multi-assembly configuration, but is ok for kafunk for now. +/// More elaborate loading strategies can be found here: +/// https://github.com/mellinoe/nativelibraryloader +let private resolveLibPath name = + System.Reflection.Assembly.GetExecutingAssembly().CodeBase + |> fun path -> (new Uri(path)).LocalPath + |> Path.GetDirectoryName + |> fun path -> Path.Combine(path, name) + +let private loadWin name = + let path = resolveLibPath name + let ptr = LoadLibrary path + + if ptr = IntPtr.Zero then + failwithf "Failed to load native dll '%s'" name + +let load name = lazy( + match (Environment.Is64BitProcess, Environment.OSVersion.Platform) with + | (true, PlatformID.Win32NT) -> loadWin (sprintf "x64\\%s.dll" name) + | (false, PlatformID.Win32NT) -> loadWin (sprintf "x86\\%s.dll" name) + | _ -> () +) + + + \ No newline at end of file diff --git a/src/kafunk/Native/lz4.fs b/src/kafunk/Native/lz4.fs new file mode 100644 index 0000000..4926422 --- /dev/null +++ b/src/kafunk/Native/lz4.fs @@ -0,0 +1,172 @@ +module internal Kafunk.Native.Lz4Framing +/// For C API details, see: +/// https://github.com/lz4/lz4/blob/dev/lib/lz4frame.h +module private native = + open System + open System.Runtime.InteropServices + + let LZ4F_VERSION = nativeint 100 + + type LZ4F_errorCode_t = uint64 + + // Shortcut these enums to int because we do not use them at this time + type 
LZ4F_blockSizeID_t = int + type LZ4F_blockMode_t = int + type LZ4F_contentChecksum_t = int + type LZ4F_frameType_t = int + type LZ4F_blockChecksum_t = int + + [] + type LZ4F_frameInfo_t = + struct + val blockSizeID: LZ4F_blockSizeID_t + val blockMode: LZ4F_blockMode_t + val contentChecksumFlag: LZ4F_contentChecksum_t + val frameType: LZ4F_frameType_t + val mutable contentSize: uint64 + val dictID: uint32 + val blockChecksumFlag: LZ4F_blockChecksum_t + end + + [] + type LZ4F_preferences_t = + struct + val mutable frameInfo: LZ4F_frameInfo_t + val compressionLevel: int32 + val autoFlush: uint32 + val reserved1: uint32 + val reserved2: uint32 + val reserved3: uint32 + val reserved4: uint32 + end + + [] + type LZ4F_decompressOptions_t = + struct + val mutable stableDst: uint32 + val reserved1: uint32 + val reserved2: uint32 + val reserved3: uint32 + end + + + [] + extern nativeint LZ4F_compressFrameBound(nativeint _srcSize, IntPtr _preferencesPtr); + + [] + extern nativeint LZ4F_compressFrame(nativeint _dstBuffer, nativeint _dstCapacity, + nativeint _srcBuffer, nativeint _srcSize, + LZ4F_preferences_t& _preferences); + + [] + extern uint32 LZ4F_isError(nativeint _code); + + [] + extern nativeint LZ4F_getErrorName(nativeint _code); + + [] + extern nativeint LZ4F_createDecompressionContext(nativeint& _dctxPtr, nativeint _version); + + [] + extern nativeint LZ4F_freeDecompressionContext(nativeint _dctx); + + [] + extern nativeint LZ4F_decompress(nativeint _dctx, nativeint _dstBuffer, nativeint& _dstSize, + nativeint _srcBuffer, nativeint& _srcSizePtr, nativeint _optionsPtr); + + [] + extern nativeint LZ4F_getFrameInfo(nativeint _dctx, LZ4F_frameInfo_t& _frameInfoPtr, nativeint _srcBuffer, nativeint& _srcSizePtr); + +open System +open FSharp.NativeInterop +open native +#nowarn "9" + +let private ensureNativeIsLoaded = Loader.load "liblz4" + +// +// liblz4 error reporting +// +let private isError code = + LZ4F_isError(code) <> 0u + +let private getErrorName code = + let 
stringAddr = LZ4F_getErrorName(code) + System.Runtime.InteropServices.Marshal.PtrToStringAnsi(stringAddr) + + +let failIfError funcName code = + if isError code then + let error = getErrorName code + failwithf "LZ4 native call '%s' failed: '%s'" funcName error + else + code + +// +// Public API +// + +let compressFrameBound (srcSize: int) : int = + ensureNativeIsLoaded.Value + LZ4F_compressFrameBound((nativeint srcSize), IntPtr.Zero) + |> int + +let compressFrame (src: ArraySegment) (dst: byte[]) = + ensureNativeIsLoaded.Value + + let mutable compressParams = LZ4F_preferences_t() + compressParams.frameInfo.contentSize <- (uint64 src.Count) + + use srcPtr = fixed src.Array + use dstPtr = fixed dst + + let res = + LZ4F_compressFrame( + dstPtr |> NativePtr.toNativeInt, + (nativeint dst.Length), + NativePtr.add srcPtr src.Offset |> NativePtr.toNativeInt, (nativeint src.Count), + &compressParams) + |> failIfError "LZ4F_compressFrame" + |> int + + new ArraySegment(dst, 0, res) + +let decompress (src: ArraySegment): byte[] = + ensureNativeIsLoaded.Value + + let mutable ctx = IntPtr.Zero + do LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION) |> failIfError "LZ4F_createDecompressionContext" |> ignore + try + // read frame info to get uncompressed size + let mutable frameInfo = LZ4F_frameInfo_t() + let mutable srcSize = nativeint src.Count + use srcPtr = fixed src.Array + let srcAddr = NativePtr.add srcPtr src.Offset |> NativePtr.toNativeInt + + do LZ4F_getFrameInfo(ctx, &frameInfo, srcAddr, &srcSize) |> failIfError "LZ4F_getFrameInfo" |> ignore + + let decompressedSize = frameInfo.contentSize + if decompressedSize = 0UL then + Array.empty + else + // LZ4F_getFrameInfo have updated srcSize to consumed bytes + let dataAddr = srcAddr + srcSize + srcSize <- (nativeint src.Count) - srcSize + + let decompressed = Array.zeroCreate (int decompressedSize) + use decompressedPtr = fixed decompressed + let decompressedAddr = decompressedPtr |> NativePtr.toNativeInt + let mutable 
dstSize = nativeint decompressed.Length + + let before = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize + let res = LZ4F_decompress(ctx, decompressedAddr, &dstSize, dataAddr, &srcSize, IntPtr.Zero) |> failIfError "LZ4F_decompress" + let after = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize + if res <> nativeint 0 then + failwithf "Expected LZ4F_decompress to return 0 but got %d. Buffer too small?\n %s\n %s" res before after + else + decompressed + finally + // protect context from leaking + do LZ4F_freeDecompressionContext(ctx) |> failIfError "LZ4F_freeDecompressionContext" |> ignore + + \ No newline at end of file diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs index beb5990..80c9351 100644 --- a/src/kafunk/Protocol.fs +++ b/src/kafunk/Protocol.fs @@ -82,7 +82,7 @@ module Protocol = match compression with | None -> value | GZIP -> Compression.GZip.compress value - //| LZ4 -> Compression.LZ4.compress value + | LZ4 -> Compression.LZ4.compress value #if !NETSTANDARD2_0 | Snappy -> Compression.Snappy.compress value #endif @@ -92,7 +92,7 @@ module Protocol = match compression with | None -> value | GZIP -> Compression.GZip.decompress value - //| LZ4 -> Compression.LZ4.decompress value + | LZ4 -> Compression.LZ4.decompress value #if !NETSTANDARD2_0 | Snappy -> Compression.Snappy.decompress value #endif diff --git a/src/kafunk/kafunk.fsproj b/src/kafunk/kafunk.fsproj index 43b8fbd..cf789d5 100644 --- a/src/kafunk/kafunk.fsproj +++ b/src/kafunk/kafunk.fsproj @@ -25,9 +25,24 @@ + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + diff --git a/src/kafunk/liblz4.dylib b/src/kafunk/liblz4.dylib new file mode 100644 index 0000000..5253ed9 Binary files /dev/null and b/src/kafunk/liblz4.dylib differ diff --git a/src/kafunk/liblz4.so b/src/kafunk/liblz4.so new file mode 100644 index 0000000..f101709 Binary files /dev/null and b/src/kafunk/liblz4.so differ diff --git a/src/kafunk/paket.references 
b/src/kafunk/paket.references index 482285a..03ae48e 100644 --- a/src/kafunk/paket.references +++ b/src/kafunk/paket.references @@ -2,7 +2,7 @@ FSharp.Core FSharp.Control.AsyncSeq SourceLink.Create.CommandLine group Legacy - FSharp.Core - FSharp.Control.AsyncSeq - Snappy.NET - SourceLink.Create.CommandLine \ No newline at end of file +FSharp.Core +FSharp.Control.AsyncSeq +Snappy.NET +SourceLink.Create.CommandLine \ No newline at end of file diff --git a/src/kafunk/x64/liblz4.dll b/src/kafunk/x64/liblz4.dll new file mode 100644 index 0000000..92e7f06 Binary files /dev/null and b/src/kafunk/x64/liblz4.dll differ diff --git a/src/kafunk/x86/liblz4.dll b/src/kafunk/x86/liblz4.dll new file mode 100644 index 0000000..09e88d6 Binary files /dev/null and b/src/kafunk/x86/liblz4.dll differ diff --git a/tests/kafunk.Tests/LZ4.fs b/tests/kafunk.Tests/LZ4.fs new file mode 100644 index 0000000..1f34429 --- /dev/null +++ b/tests/kafunk.Tests/LZ4.fs @@ -0,0 +1,56 @@ +namespace Kafunk.Tests + +open NUnit.Framework +open Kafunk.Native +open System +open System.Text + +module LZ4 = + open System.Numerics + + [] + let ``compress produce result smaller than original`` () = + let text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit + . Sed malesuada consectetur augue, vitae euismod dui imperdiet in. Nunc arcu felis, + luctus eleifend ipsum ac, iaculis commodo enim. Proin aliquet odio nisi, et efficitur + enim vehicula in. Nam vitae vestibulum risus. Donec egestas dapibus urna. Proin sit amet + tincidunt dui, eget condimentum erat. Morbi blandit enim non massa laoreet ultricies. + Vivamus consequat pharetra felis, sit amet feugiat metus porta iaculis." 
+ + let data = text |> Encoding.UTF8.GetBytes |> ArraySegment + + + let compressedBound = Lz4Framing.compressFrameBound data.Count + let compressedBuffer = Array.zeroCreate compressedBound + let compressed = Lz4Framing.compressFrame data compressedBuffer + + let decompressed = + compressed + |> Lz4Framing.decompress + |> Encoding.UTF8.GetString + + // Compression is correct + Assert.AreEqual(text, decompressed) + + // Compression produce result smaller than the original + Assert.Less(compressed.Count, data.Count) + + [] + let ``Array of any size compress correctly`` () = + [0; 1; 63; 64; 65; 1000000] + |> Seq.iter( fun size -> + let data = Array.zeroCreate size + (new Random()).NextBytes data + + let compressedBound = Lz4Framing.compressFrameBound data.Length + let compressedBuffer = Array.zeroCreate compressedBound + let compressed = Lz4Framing.compressFrame (new ArraySegment(data)) compressedBuffer + + let decompressed = + compressed + |> Lz4Framing.decompress + + Assert.AreEqual(data.Length, decompressed.Length) + Assert.AreEqual(data, decompressed) + ) + diff --git a/tests/kafunk.Tests/kafunk.Tests.fsproj b/tests/kafunk.Tests/kafunk.Tests.fsproj index 74069f2..2c91d54 100644 --- a/tests/kafunk.Tests/kafunk.Tests.fsproj +++ b/tests/kafunk.Tests/kafunk.Tests.fsproj @@ -1,10 +1,13 @@ - + + netcoreapp2.0;net45 false + true +