2525
2626import org .apache .activemq .transport .stomp .StompFrame ;
2727import org .apache .activemq .transport .stomp .StompWireFormat ;
28- import org .eclipse .jetty .ee9 .websocket .api .Session ;
29- import org .eclipse .jetty .ee9 .websocket .api .WebSocketAdapter ;
30- import org .eclipse .jetty .ee9 .websocket .api .WebSocketListener ;
28+ import org .eclipse .jetty .websocket .api .Session ;
3129import org .slf4j .Logger ;
3230import org .slf4j .LoggerFactory ;
3331
3432/**
3533 * STOMP over WS based Connection class
3634 */
37- public class StompWSConnection extends WebSocketAdapter implements WebSocketListener {
35+ public class StompWSConnection extends org . eclipse . jetty . websocket . api . Session . Listener . AbstractAutoDemanding implements Session . Listener . AutoDemanding {
3836
3937 private static final Logger LOG = LoggerFactory .getLogger (StompWSConnection .class );
4038
41- private Session connection ;
4239 private final CountDownLatch connectLatch = new CountDownLatch (1 );
4340
4441 private final BlockingQueue <String > prefetch = new LinkedBlockingDeque <String >();
@@ -47,36 +44,37 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
4744 private int closeCode = -1 ;
4845 private String closeMessage ;
4946
50- @ Override
5147 public boolean isConnected () {
52- return connection != null ? connection .isOpen () : false ;
48+ Session session = getSession ();
49+ return session != null && session .isOpen ();
5350 }
5451
5552 public void close () {
56- if (connection != null ) {
57- connection .close ();
53+ Session session = getSession ();
54+ if (session != null ) {
55+ session .close ();
5856 }
5957 }
6058
6159 protected Session getConnection () {
62- return connection ;
60+ return getSession () ;
6361 }
6462
6563 //---- Send methods ------------------------------------------------------//
6664
6765 public synchronized void sendRawFrame (String rawFrame ) throws Exception {
6866 checkConnected ();
69- connection . getRemote ().sendString (rawFrame );
67+ getSession ().sendText (rawFrame , null );
7068 }
7169
7270 public synchronized void sendFrame (StompFrame frame ) throws Exception {
7371 checkConnected ();
74- connection . getRemote ().sendString (wireFormat .marshalToString (frame ));
72+ getSession ().sendText (wireFormat .marshalToString (frame ), null );
7573 }
7674
7775 public synchronized void keepAlive () throws Exception {
7876 checkConnected ();
79- connection . getRemote ().sendString ("\n " );
77+ getSession ().sendText ("\n " , null );
8078 }
8179
8280 //----- Receive methods --------------------------------------------------//
@@ -136,22 +134,21 @@ public void onWebSocketText(String data) {
136134 public void onWebSocketClose (int statusCode , String reason ) {
137135 LOG .trace ("STOMP WS Connection closed, code:{} message:{}" , statusCode , reason );
138136
139- this .connection = null ;
140137 this .closeCode = statusCode ;
141138 this .closeMessage = reason ;
142139 }
143140
144141 @ Override
145- public void onWebSocketConnect ( org . eclipse . jetty . ee9 . websocket . api . Session session ) {
146- this . connection = session ;
147- this . connection .setIdleTimeout (Duration .ZERO );
142+ public void onWebSocketOpen ( Session session ) {
143+ super . onWebSocketOpen ( session ) ;
144+ session .setIdleTimeout (Duration .ZERO );
148145 this .connectLatch .countDown ();
149146 }
150147
151148 //----- Internal implementation ------------------------------------------//
152149
153150 private void checkConnected () throws IOException {
154- if (!isConnected ()) {
151+ if (!isOpen ()) {
155152 throw new IOException ("STOMP WS Connection is closed." );
156153 }
157154 }
0 commit comments