Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion design/exchange/0.exchange_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -2661,7 +2661,7 @@ Positions 服務必須在啟動時預熱所有交易對的靜態配置與風控
| ---------- | ------------------------------ | -------------------- | --------------------------------------------------------------------------------------------------------------------- |
| 待處理器flip情況 | `account.trade-margin-settled` | `TradeMarginSettled` | **開倉入帳**:建立新倉位或增加持倉。更新 `margin` (從錢包轉入) 與 `entry_price`。 |
| | `matching.trade-executed` | `TradeExecuted` | **平倉結算**:僅處理 `makerIntent/takerIntent = REDUCE/CLOSE` 的成交;減少持倉、釋放凍結數量、計算 `realized_pnl` 並發布 `PositionMarginReleased`。 |
| | `market.mark-price` | `MarkPriceUpdated` | **行情更新**:使用最新標記價格重算 `unrealized_pnl` 與 `liquidation_price`。若觸發強平線則發布 Alert。 |
| | `market.mark-price` | `MarkPriceUpdated` | **行情更新**:使用最新標記價格重算 `unrealized_pnl` 與 `liquidation_price` 並發布 `PositionUpdated` (若有顯著變動)。(注意:強平觸發由 Risk 服務負責) |

---

Expand Down
4 changes: 4 additions & 0 deletions service/exchange/exchange-position/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,9 @@
<groupId>open.vincentf13</groupId>
<artifactId>exchange-matching-sdk-mq</artifactId>
</dependency>
<dependency>
<groupId>open.vincentf13</groupId>
<artifactId>exchange-market-sdk-mq</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,41 @@ public static PositionEvent createTradeEvent(Long positionId,
.occurredAt(occurredAt)
.build();
}

public static PositionEvent createMarkPriceEvent(Long positionId,
Long userId,
Long instrumentId,
BigDecimal newQuantity,
BigDecimal newReservedQuantity,
BigDecimal newEntryPrice,
Integer newLeverage,
BigDecimal newMargin,
BigDecimal newUnrealizedPnl,
BigDecimal marginRatio,
BigDecimal newLiquidationPrice,
Long tradeId,
Instant occurredAt) {
return PositionEvent.builder()
.positionId(positionId)
.userId(userId)
.instrumentId(instrumentId)
.eventType(PositionEventType.MARK_PRICE_UPDATED)
.deltaQuantity(BigDecimal.ZERO)
.deltaMargin(BigDecimal.ZERO)
.realizedPnl(BigDecimal.ZERO)
.tradeFee(BigDecimal.ZERO)
.fundingFee(BigDecimal.ZERO)
.newQuantity(newQuantity)
.newReservedQuantity(newReservedQuantity)
.newEntryPrice(newEntryPrice)
.newLeverage(newLeverage)
.newMargin(newMargin)
.newUnrealizedPnl(newUnrealizedPnl)
.newLiquidationPrice(newLiquidationPrice)
.referenceId(tradeId)
.referenceType(PositionReferenceType.MARK_PRICE_UPDATE)
.metadata("{\"marginRatio\": " + marginRatio + "}")
.occurredAt(occurredAt)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import open.vincentf13.exchange.position.infra.cache.RiskLimitCache;
import open.vincentf13.exchange.position.infra.persistence.po.PositionPO;
import open.vincentf13.exchange.position.infra.persistence.repository.PositionEventRepository;
import open.vincentf13.exchange.market.mq.event.MarkPriceUpdatedEvent;
import open.vincentf13.exchange.position.infra.persistence.repository.PositionRepository;
import open.vincentf13.exchange.position.sdk.rest.api.enums.PositionEventType;
import open.vincentf13.exchange.position.sdk.rest.api.enums.PositionReferenceType;
import open.vincentf13.sdk.core.exception.OpenException;
import open.vincentf13.sdk.core.object.mapper.OpenObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.math.RoundingMode;
Expand All @@ -45,6 +47,110 @@ public class PositionDomainService {
private static final BigDecimal MAINTENANCE_MARGIN_RATE_DEFAULT = BigDecimal.valueOf(0.005);
private static final BigDecimal CONTRACT_MULTIPLIER = BigDecimal.ONE;

@Transactional
public void handleMarkPriceUpdate(MarkPriceUpdatedEvent event) {
Long instrumentId = event.instrumentId();
BigDecimal markPrice = event.markPrice();
Instant eventTime = event.tradeExecutedAt();

markPriceCache.update(instrumentId, markPrice, eventTime);

List<Position> activePositions = positionRepository.findBy(
Wrappers.lambdaQuery(PositionPO.class)
.eq(PositionPO::getInstrumentId, instrumentId)
.eq(PositionPO::getStatus, PositionStatus.ACTIVE));

BigDecimal contractMultiplier = instrumentCache.get(instrumentId)
.map(instrument -> instrument.contractSize() != null ? instrument.contractSize() : CONTRACT_MULTIPLIER)
.orElse(CONTRACT_MULTIPLIER);

BigDecimal maintenanceMarginRate = riskLimitCache.get(instrumentId)
.map(riskLimit -> riskLimit.maintenanceMarginRate() != null ? riskLimit.maintenanceMarginRate() : MAINTENANCE_MARGIN_RATE_DEFAULT)
.orElse(MAINTENANCE_MARGIN_RATE_DEFAULT);

for (Position position : activePositions) {
updatePositionWithMarkPrice(position, markPrice, contractMultiplier, maintenanceMarginRate, event);
}
}

private void updatePositionWithMarkPrice(Position position, BigDecimal markPrice, BigDecimal contractMultiplier, BigDecimal maintenanceMarginRate, MarkPriceUpdatedEvent event) {
Position updatedPosition = OpenObjectMapper.convert(position, Position.class);
updatedPosition.setMarkPrice(markPrice);

BigDecimal quantity = safe(position.getQuantity());

// Skip if quantity is zero (should not happen for active positions but safe guard)
if (quantity.compareTo(BigDecimal.ZERO) == 0) {
return;
}

if (updatedPosition.getSide() == PositionSide.LONG) {
updatedPosition.setUnrealizedPnl(markPrice.subtract(updatedPosition.getEntryPrice())
.multiply(quantity)
.multiply(contractMultiplier));
} else {
updatedPosition.setUnrealizedPnl(updatedPosition.getEntryPrice().subtract(markPrice)
.multiply(quantity)
.multiply(contractMultiplier));
}

BigDecimal notional = markPrice.multiply(quantity).abs();
if (notional.compareTo(BigDecimal.ZERO) == 0) {
updatedPosition.setMarginRatio(BigDecimal.ZERO);
} else {
updatedPosition.setMarginRatio(updatedPosition.getMargin().add(updatedPosition.getUnrealizedPnl())
.divide(notional, ValidationConstant.Names.MARGIN_RATIO_SCALE, RoundingMode.HALF_UP));
}

if (updatedPosition.getSide() == PositionSide.LONG) {
updatedPosition.setLiquidationPrice(
updatedPosition.getEntryPrice()
.subtract(updatedPosition.getMargin().divide(quantity, ValidationConstant.Names.COMMON_SCALE, RoundingMode.HALF_UP))
.divide(BigDecimal.ONE.subtract(maintenanceMarginRate), ValidationConstant.Names.COMMON_SCALE, RoundingMode.HALF_UP)
);
} else {
updatedPosition.setLiquidationPrice(
updatedPosition.getEntryPrice()
.add(updatedPosition.getMargin().divide(quantity, ValidationConstant.Names.COMMON_SCALE, RoundingMode.HALF_UP))
.divide(BigDecimal.ONE.add(maintenanceMarginRate), ValidationConstant.Names.COMMON_SCALE, RoundingMode.HALF_UP)
);
}

// Optimistic Locking Update
int expectedVersion = position.safeVersion();
updatedPosition.setVersion(expectedVersion + 1);
boolean updated = positionRepository.updateSelectiveBy(
updatedPosition,
Wrappers.<PositionPO>lambdaUpdate()
.eq(PositionPO::getPositionId, updatedPosition.getPositionId())
.eq(PositionPO::getVersion, expectedVersion));

if (!updated) {
// If concurrent update happens (e.g. user trades at the same time), we skip this mark price update
// as the trade event will update the position with latest data anyway.
// Or we could throw exception to retry, but for high freq mark price, skipping is acceptable.
// However, to be safe and consistent with other methods, we can log it.
return;
}

PositionEvent positionEvent = PositionEvent.createMarkPriceEvent(
updatedPosition.getPositionId(),
position.getUserId(),
position.getInstrumentId(),
updatedPosition.getQuantity(),
updatedPosition.getClosingReservedQuantity(),
updatedPosition.getEntryPrice(),
updatedPosition.getLeverage(),
updatedPosition.getMargin(),
updatedPosition.getUnrealizedPnl(),
updatedPosition.getMarginRatio(),
updatedPosition.getLiquidationPrice(),
event.tradeId(),
event.calculatedAt()
);
positionEventRepository.insert(positionEvent);
}

public PositionSide toPositionSide(OrderSide orderSide) {
if (orderSide == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public enum PositionEvent implements OpenEvent {
STARTUP_LOADING_RISK_LIMITS("StartupLoadingRiskLimits", "Loading risk limits from Risk service"),
STARTUP_RISK_LIMITS_LOADED("StartupRiskLimitsLoaded", "Risk limits loaded successfully"),
STARTUP_RISK_LIMIT_FETCH_FAILED("StartupRiskLimitFetchFailed", "Failed to fetch risk limit for instrument"),
STARTUP_RISK_LIMIT_LOAD_PARTIAL("StartupRiskLimitLoadPartial", "Partial risk limit loading");
STARTUP_RISK_LIMIT_LOAD_PARTIAL("StartupRiskLimitLoadPartial", "Partial risk limit loading"),
POSITION_MARK_PRICE_PAYLOAD_INVALID("PositionMarkPricePayloadInvalid", "Invalid MarkPriceUpdated payload");

private final String event;
private final String message;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package open.vincentf13.exchange.position.infra.messaging.consumer;

import lombok.RequiredArgsConstructor;
import open.vincentf13.exchange.market.mq.event.MarkPriceUpdatedEvent;
import open.vincentf13.sdk.core.OpenValidator;
import open.vincentf13.exchange.market.mq.topic.MarketTopics;
import open.vincentf13.exchange.position.domain.service.PositionDomainService;
import open.vincentf13.exchange.position.infra.PositionEvent;
import open.vincentf13.sdk.core.log.OpenLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class MarkPriceUpdatedEventListener {

private final PositionDomainService positionDomainService;

@KafkaListener(topics = MarketTopics.Names.MARK_PRICE_UPDATED,
groupId = "${exchange.position.mark-price-updated.consumer-group:exchange-position-mark-price-updated}")
public void onMarkPriceUpdated(@Payload MarkPriceUpdatedEvent event,
Acknowledgment acknowledgment) {
try {
OpenValidator.validateOrThrow(event);
positionDomainService.handleMarkPriceUpdate(event);
acknowledgment.acknowledge();
} catch (Exception e) {
OpenLog.error(PositionEvent.POSITION_MARK_PRICE_PAYLOAD_INVALID, e, "event", event);
throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
}
}
}