Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import net.openhft.chronicle.bytes.BytesMarshallable;
import net.openhft.chronicle.bytes.BytesOut;
import net.openhft.chronicle.bytes.PointerBytesStore;
import open.vincentf13.service.spot.infra.alloc.ThreadContext;
import open.vincentf13.service.spot.infra.alloc.NativeUnsafeBuffer;
import open.vincentf13.service.spot.sbe.MessageHeaderDecoder;
import open.vincentf13.service.spot.sbe.MessageHeaderEncoder;
import org.agrona.DirectBuffer;
Expand All @@ -27,6 +27,7 @@ public abstract class AbstractSbeModel implements BytesMarshallable {
protected final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
protected final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();
protected final UnsafeBuffer internalBuffer = new UnsafeBuffer(0, 0);
protected final NativeUnsafeBuffer encodeBuffer = new NativeUnsafeBuffer(256);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Release per-model NativeUnsafeBuffer on context cleanup

Allocating encodeBuffer in every AbstractSbeModel instance introduces new off-heap NativeUnsafeBuffer ownership, but there is no corresponding release() path (the existing ThreadContext.cleanup() only releases scratchBuffer). In services that restart worker threads (or recreate contexts), these command/report buffers remain unreleased until GC, so direct memory can accumulate across cycles and eventually hit MaxDirectMemorySize under sustained restarts.

Useful? React with 👍 / 👎.


public void setGatewaySeq(long val) { this.seq = val; }
public long getGatewaySeq() { return this.seq; }
Expand All @@ -40,8 +41,8 @@ protected DirectBuffer wrapStore(PointerBytesStore store) {
return internalBuffer;
}

public void fillFromScratch(int length) {
fillFrom(ThreadContext.get().getScratchBuffer().buffer(), 0, length);
public void fillFromEncodeBuffer(int length) {
fillFrom(encodeBuffer.buffer(), 0, length);
}

public void fillFrom(open.vincentf13.service.spot.infra.alloc.aeron.AbstractAeronAlloc<?> aeron) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.infra.alloc.ThreadContext;
import open.vincentf13.service.spot.sbe.AuthDecoder;
import open.vincentf13.service.spot.sbe.AuthEncoder;
import org.agrona.DirectBuffer;
Expand All @@ -24,9 +23,9 @@ public AuthDecoder decode() {
}

public void encode(long timestamp, long userId) {
MutableDirectBuffer buffer = ThreadContext.get().getScratchBuffer().wrapForWrite();
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, AuthEncoder.TEMPLATE_ID, AuthEncoder.BLOCK_LENGTH, AuthEncoder.SCHEMA_ID, AuthEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId);
fillFromScratch(HEADER_SIZE + encoder.encodedLength());
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.infra.alloc.ThreadContext;
import open.vincentf13.service.spot.sbe.DepositDecoder;
import open.vincentf13.service.spot.sbe.DepositEncoder;
import org.agrona.DirectBuffer;
Expand All @@ -24,9 +23,9 @@ public DepositDecoder decode() {
}

public void encode(long timestamp, long userId, int assetId, long amount) {
MutableDirectBuffer buffer = ThreadContext.get().getScratchBuffer().wrapForWrite();
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, DepositEncoder.TEMPLATE_ID, DepositEncoder.BLOCK_LENGTH, DepositEncoder.SCHEMA_ID, DepositEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).assetId(assetId).amount(amount);
fillFromScratch(HEADER_SIZE + encoder.encodedLength());
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.sbe.ExecutionReportDecoder;
import open.vincentf13.service.spot.sbe.ExecutionReportEncoder;
import open.vincentf13.service.spot.sbe.OrderStatus;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

/**
* 訂單接受回報
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderAcceptedReport extends AbstractSbeModel {
private final ExecutionReportEncoder encoder = new ExecutionReportEncoder();
private final ExecutionReportDecoder decoder = new ExecutionReportDecoder();

public ExecutionReportDecoder decode() {
Expand All @@ -23,4 +26,11 @@ public ExecutionReportDecoder decode() {
public void encode(long timestamp, long userId, long orderId, long clientOrderId) {
encodeReport(timestamp, userId, orderId, OrderStatus.NEW, 0, 0, 0, 0, clientOrderId);
}

private void encodeReport(long timestamp, long userId, long orderId, OrderStatus status, long lastPrice, long lastQty, long cumQty, long avgPrice, long clientOrderId) {
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, ExecutionReportEncoder.TEMPLATE_ID, ExecutionReportEncoder.BLOCK_LENGTH, ExecutionReportEncoder.SCHEMA_ID, ExecutionReportEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).orderId(orderId).status(status).lastPrice(lastPrice).lastQty(lastQty).cumQty(cumQty).avgPrice(avgPrice).clientOrderId(clientOrderId);
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.infra.alloc.ThreadContext;
import open.vincentf13.service.spot.sbe.OrderCancelDecoder;
import open.vincentf13.service.spot.sbe.OrderCancelEncoder;
import org.agrona.DirectBuffer;
Expand All @@ -24,9 +23,9 @@ public OrderCancelDecoder decode() {
}

public void encode(long timestamp, long userId, long orderId) {
MutableDirectBuffer buffer = ThreadContext.get().getScratchBuffer().wrapForWrite();
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, OrderCancelEncoder.TEMPLATE_ID, OrderCancelEncoder.BLOCK_LENGTH, OrderCancelEncoder.SCHEMA_ID, OrderCancelEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).orderId(orderId);
fillFromScratch(HEADER_SIZE + encoder.encodedLength());
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.sbe.ExecutionReportDecoder;
import open.vincentf13.service.spot.sbe.ExecutionReportEncoder;
import open.vincentf13.service.spot.sbe.OrderStatus;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

/**
* 訂單撤單回報
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderCanceledReport extends AbstractSbeModel {
private final ExecutionReportEncoder encoder = new ExecutionReportEncoder();
private final ExecutionReportDecoder decoder = new ExecutionReportDecoder();

public ExecutionReportDecoder decode() {
Expand All @@ -23,4 +26,11 @@ public ExecutionReportDecoder decode() {
public void encode(long timestamp, long userId, long orderId, long filledQuantity, long clientOrderId) {
encodeReport(timestamp, userId, orderId, OrderStatus.CANCELED, 0, 0, filledQuantity, 0, clientOrderId);
}

private void encodeReport(long timestamp, long userId, long orderId, OrderStatus status, long lastPrice, long lastQty, long cumQty, long avgPrice, long clientOrderId) {
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, ExecutionReportEncoder.TEMPLATE_ID, ExecutionReportEncoder.BLOCK_LENGTH, ExecutionReportEncoder.SCHEMA_ID, ExecutionReportEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).orderId(orderId).status(status).lastPrice(lastPrice).lastQty(lastQty).cumQty(cumQty).avgPrice(avgPrice).clientOrderId(clientOrderId);
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.infra.alloc.ThreadContext;
import open.vincentf13.service.spot.sbe.OrderCreateDecoder;
import open.vincentf13.service.spot.sbe.OrderCreateEncoder;
import open.vincentf13.service.spot.sbe.Side;
Expand All @@ -25,9 +24,9 @@ public OrderCreateDecoder decode() {
}

public void encode(long timestamp, long userId, int symbolId, long price, long qty, Side side, long clientOrderId) {
MutableDirectBuffer buffer = ThreadContext.get().getScratchBuffer().wrapForWrite();
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, OrderCreateEncoder.TEMPLATE_ID, OrderCreateEncoder.BLOCK_LENGTH, OrderCreateEncoder.SCHEMA_ID, OrderCreateEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).symbolId(symbolId).price(price).qty(qty).side(side).clientOrderId(clientOrderId);
fillFromScratch(HEADER_SIZE + encoder.encodedLength());
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.sbe.ExecutionReportDecoder;
import open.vincentf13.service.spot.sbe.ExecutionReportEncoder;
import open.vincentf13.service.spot.sbe.OrderStatus;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

/**
* 訂單成交回報
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderMatchReport extends AbstractSbeModel {
private final ExecutionReportEncoder encoder = new ExecutionReportEncoder();
private final ExecutionReportDecoder decoder = new ExecutionReportDecoder();

public ExecutionReportDecoder decode() {
Expand All @@ -23,4 +26,11 @@ public ExecutionReportDecoder decode() {
public void encode(long timestamp, long userId, long orderId, OrderStatus status, long lastPrice, long lastQty, long cumQty, long avgPrice, long clientOrderId) {
encodeReport(timestamp, userId, orderId, status, lastPrice, lastQty, cumQty, avgPrice, clientOrderId);
}

private void encodeReport(long timestamp, long userId, long orderId, OrderStatus status, long lastPrice, long lastQty, long cumQty, long avgPrice, long clientOrderId) {
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, ExecutionReportEncoder.TEMPLATE_ID, ExecutionReportEncoder.BLOCK_LENGTH, ExecutionReportEncoder.SCHEMA_ID, ExecutionReportEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).orderId(orderId).status(status).lastPrice(lastPrice).lastQty(lastQty).cumQty(cumQty).avgPrice(avgPrice).clientOrderId(clientOrderId);
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import open.vincentf13.service.spot.sbe.ExecutionReportDecoder;
import open.vincentf13.service.spot.sbe.ExecutionReportEncoder;
import open.vincentf13.service.spot.sbe.OrderStatus;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

/**
* 訂單拒絕回報
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderRejectedReport extends AbstractSbeModel {
private final ExecutionReportEncoder encoder = new ExecutionReportEncoder();
private final ExecutionReportDecoder decoder = new ExecutionReportDecoder();

public ExecutionReportDecoder decode() {
Expand All @@ -23,4 +26,11 @@ public ExecutionReportDecoder decode() {
public void encode(long timestamp, long userId, long clientOrderId) {
encodeReport(timestamp, userId, 0, OrderStatus.REJECTED, 0, 0, 0, 0, clientOrderId);
}

private void encodeReport(long timestamp, long userId, long orderId, OrderStatus status, long lastPrice, long lastQty, long cumQty, long avgPrice, long clientOrderId) {
MutableDirectBuffer buffer = encodeBuffer.wrapForWrite();
wrapHeader(buffer, ExecutionReportEncoder.TEMPLATE_ID, ExecutionReportEncoder.BLOCK_LENGTH, ExecutionReportEncoder.SCHEMA_ID, ExecutionReportEncoder.SCHEMA_VERSION);
encoder.wrap(buffer, HEADER_SIZE).timestamp(timestamp).userId(userId).orderId(orderId).status(status).lastPrice(lastPrice).lastQty(lastQty).cumQty(cumQty).avgPrice(avgPrice).clientOrderId(clientOrderId);
fillFromEncodeBuffer(HEADER_SIZE + encoder.encodedLength());
}
}