Skip to content

Commit 79ed636

Browse files
committed
differentiate control frame and data frame
1 parent bf0d5ac commit 79ed636

File tree

5 files changed

+43
-33
lines changed

5 files changed

+43
-33
lines changed

session.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ const (
1717
openCloseTimeout = 30 * time.Second // stream open/close timeout
1818
)
1919

20+
// define frame class
21+
type CLASSID int
22+
23+
const (
24+
CLSCTRL CLASSID = iota
25+
CLSDATA
26+
)
27+
2028
var (
2129
ErrInvalidProtocol = errors.New("invalid protocol")
2230
ErrConsumed = errors.New("peer consumed more than sent")
@@ -26,7 +34,7 @@ var (
2634
)
2735

2836
type writeRequest struct {
29-
prio uint32
37+
class CLASSID
3038
frame Frame
3139
seq uint32
3240
result chan writeResult
@@ -396,7 +404,7 @@ func (s *Session) keepalive() {
396404
for {
397405
select {
398406
case <-tickerPing.C:
399-
s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, 0)
407+
s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, CLSCTRL)
400408
s.notifyBucket() // force a signal to the recvLoop
401409
case <-tickerTimeout.C:
402410
if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) {
@@ -515,13 +523,13 @@ func (s *Session) sendLoop() {
515523
// writeFrame writes the frame to the underlying connection
516524
// and returns the number of bytes written if successful
517525
func (s *Session) writeFrame(f Frame) (n int, err error) {
518-
return s.writeFrameInternal(f, time.After(openCloseTimeout), 0)
526+
return s.writeFrameInternal(f, time.After(openCloseTimeout), CLSCTRL)
519527
}
520528

521529
// internal writeFrame version to support deadline used in keepalive
522-
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) {
530+
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, class CLASSID) (int, error) {
523531
req := writeRequest{
524-
prio: prio,
532+
class: class,
525533
frame: f,
526534
seq: atomic.AddUint32(&s.requestID, 1),
527535
result: make(chan writeResult, 1),

session_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ func TestWriteFrameInternal(t *testing.T) {
867867
session.Close()
868868
for i := 0; i < 100; i++ {
869869
f := newFrame(1, byte(rand.Uint32()), rand.Uint32())
870-
session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), 0)
870+
session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA)
871871
}
872872

873873
// random cmds
@@ -879,14 +879,14 @@ func TestWriteFrameInternal(t *testing.T) {
879879
session, _ = Client(cli, nil)
880880
for i := 0; i < 100; i++ {
881881
f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32())
882-
session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), 0)
882+
session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA)
883883
}
884884
//deadline occur
885885
{
886886
c := make(chan time.Time)
887887
close(c)
888888
f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32())
889-
_, err := session.writeFrameInternal(f, c, 0)
889+
_, err := session.writeFrameInternal(f, c, CLSDATA)
890890
if !strings.Contains(err.Error(), "timeout") {
891891
t.Fatal("write frame with deadline failed", err)
892892
}
@@ -911,7 +911,7 @@ func TestWriteFrameInternal(t *testing.T) {
911911
time.Sleep(time.Second)
912912
close(c)
913913
}()
914-
_, err = session.writeFrameInternal(f, c, 0)
914+
_, err = session.writeFrameInternal(f, c, CLSDATA)
915915
if !strings.Contains(err.Error(), "closed pipe") {
916916
t.Fatal("write frame with to closed conn failed", err)
917917
}

shaper.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@ func _itimediff(later, earlier uint32) int32 {
66

77
type shaperHeap []writeRequest
88

9-
func (h shaperHeap) Len() int { return len(h) }
10-
func (h shaperHeap) Less(i, j int) bool { return _itimediff(h[j].seq, h[i].seq) > 0 }
9+
func (h shaperHeap) Len() int { return len(h) }
10+
func (h shaperHeap) Less(i, j int) bool {
11+
if h[i].class != h[j].class {
12+
return h[i].class < h[j].class
13+
}
14+
return _itimediff(h[j].seq, h[i].seq) > 0
15+
}
16+
1117
func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
1218
func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) }
1319

shaper_test.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
)
77

88
func TestShaper(t *testing.T) {
9-
w1 := writeRequest{prio: 10}
10-
w2 := writeRequest{prio: 10}
11-
w3 := writeRequest{prio: 20}
12-
w4 := writeRequest{prio: 100}
13-
w5 := writeRequest{prio: (1 << 32) - 1}
9+
w1 := writeRequest{seq: 1}
10+
w2 := writeRequest{seq: 2}
11+
w3 := writeRequest{seq: 3}
12+
w4 := writeRequest{seq: 4}
13+
w5 := writeRequest{seq: 5}
1414

1515
var reqs shaperHeap
1616
heap.Push(&reqs, w5)
@@ -19,25 +19,20 @@ func TestShaper(t *testing.T) {
1919
heap.Push(&reqs, w2)
2020
heap.Push(&reqs, w1)
2121

22-
var lastPrio = reqs[0].prio
2322
for len(reqs) > 0 {
2423
w := heap.Pop(&reqs).(writeRequest)
25-
if int32(w.prio-lastPrio) < 0 {
26-
t.Fatal("incorrect shaper priority")
27-
}
28-
29-
t.Log("prio:", w.prio)
30-
lastPrio = w.prio
24+
t.Log("sid:", w.frame.sid, "seq:", w.seq)
3125
}
3226
}
3327

3428
func TestShaper2(t *testing.T) {
35-
w1 := writeRequest{prio: 10, seq: 1} // stream 0
36-
w2 := writeRequest{prio: 10, seq: 2}
37-
w3 := writeRequest{prio: 20, seq: 3}
38-
w4 := writeRequest{prio: 100, seq: 4}
39-
w5 := writeRequest{prio: (1 << 32) - 1, seq: 5}
40-
w6 := writeRequest{prio: 10, seq: 1, frame: Frame{sid: 10}} // stream 1
29+
w1 := writeRequest{class: CLSDATA, seq: 1} // stream 0
30+
w2 := writeRequest{class: CLSDATA, seq: 2}
31+
w3 := writeRequest{class: CLSDATA, seq: 3}
32+
w4 := writeRequest{class: CLSDATA, seq: 4}
33+
w5 := writeRequest{class: CLSDATA, seq: 5}
34+
w6 := writeRequest{class: CLSCTRL, seq: 6, frame: Frame{sid: 10}} // ctrl 1
35+
w7 := writeRequest{class: CLSCTRL, seq: 7, frame: Frame{sid: 11}} // ctrl 2
4136

4237
var reqs shaperHeap
4338
heap.Push(&reqs, w6)
@@ -46,9 +41,10 @@ func TestShaper2(t *testing.T) {
4641
heap.Push(&reqs, w3)
4742
heap.Push(&reqs, w2)
4843
heap.Push(&reqs, w1)
44+
heap.Push(&reqs, w7)
4945

5046
for len(reqs) > 0 {
5147
w := heap.Pop(&reqs).(writeRequest)
52-
t.Log("prio:", w.prio, "sid:", w.frame.sid, "seq:", w.seq)
48+
t.Log("sid:", w.frame.sid, "seq:", w.seq)
5349
}
5450
}

stream.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (s *Stream) sendWindowUpdate(consumed uint32) error {
255255
binary.LittleEndian.PutUint32(hdr[:], consumed)
256256
binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer))
257257
frame.data = hdr[:]
258-
_, err := s.sess.writeFrameInternal(frame, deadline, 0)
258+
_, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA)
259259
return err
260260
}
261261

@@ -325,7 +325,7 @@ func (s *Stream) Write(b []byte) (n int, err error) {
325325
}
326326
frame.data = bts[:sz]
327327
bts = bts[sz:]
328-
n, err := s.sess.writeFrameInternal(frame, deadline, 0)
328+
n, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA)
329329
s.numWritten++
330330
sent += n
331331
if err != nil {
@@ -393,7 +393,7 @@ func (s *Stream) writeV2(b []byte) (n int, err error) {
393393
}
394394
frame.data = bts[:sz]
395395
bts = bts[sz:]
396-
n, err := s.sess.writeFrameInternal(frame, deadline, 0)
396+
n, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA)
397397
atomic.AddUint32(&s.numWritten, uint32(sz))
398398
sent += n
399399
if err != nil {

0 commit comments

Comments
 (0)