Skip to content

Commit f042fb6

Browse files
committed
Add system tests for python buffers
Signed-off-by: Travis F. Collins <[email protected]>
1 parent 91196b4 commit f042fb6

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest
2+
scipy
3+
numpy
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import pytest
2+
3+
import iio
4+
import numpy as np
5+
from scipy import signal
6+
7+
import os
8+
9+
10+
def gen_data(fs, fc):
11+
N = 2**12
12+
ts = 1 / float(fs)
13+
t = np.arange(0, N * ts, ts)
14+
i = np.cos(2 * np.pi * t * fc) * 2**14
15+
q = np.sin(2 * np.pi * t * fc) * 2**14
16+
iq = i + 1j * q
17+
return iq
18+
19+
20+
def estimate_freq(x, fs):
21+
f, Pxx_den = signal.periodogram(x, fs)
22+
idx = np.argmax(Pxx_den)
23+
return f[idx]
24+
25+
26+
## Streaming not supported yet
27+
# def stream_based_tx(buf_tx, iq):
28+
# # Stream version (Does not support cyclic?)
29+
# tx_stream = iio.Stream(buf_tx, 1024)
30+
# block_tx = next(tx_stream)
31+
# block_tx.write(iq)
32+
# block_tx.enqueue(None, True)
33+
# buf_tx.enabled = True
34+
35+
36+
def block_based_tx_chans(block_tx, buf_tx, mask_tx, ib, qb):
37+
# Block version channel based
38+
mask_tx.channels[0].write(block_tx, ib)
39+
mask_tx.channels[1].write(block_tx, qb)
40+
block_tx.enqueue(None, True)
41+
buf_tx.enabled = True
42+
43+
44+
def block_based_tx_single(block_tx, buf_tx, ib, qb):
45+
iqb = np.stack((ib, qb), axis=-1)
46+
iqb = iqb.flatten()
47+
iqb = bytearray(iqb)
48+
49+
# Block version single write
50+
block_tx.write(iqb)
51+
block_tx.enqueue(None, True)
52+
buf_tx.enabled = True
53+
54+
55+
@pytest.mark.parametrize("tx_buffer_modes", ["dds", "chans", "single"])
56+
@pytest.mark.parametrize("rx_buffer_modes", ["chans", "single"])
57+
def test_ad9361_buffers(tx_buffer_modes, rx_buffer_modes):
58+
fs = 4e6
59+
lo = 1e9
60+
fc = 5e5
61+
62+
uri = os.getenv("URI", "ip:analog.local")
63+
64+
ctx = iio.Context("ip:analog.local")
65+
dev = ctx.find_device("ad9361-phy")
66+
dev_rx = ctx.find_device("cf-ad9361-lpc")
67+
dev_tx = ctx.find_device("cf-ad9361-dds-core-lpc")
68+
69+
chan = dev.find_channel("voltage0")
70+
chan.attrs["sampling_frequency"].value = str(int(fs))
71+
chan.attrs["rf_bandwidth"].value = str(int(fs))
72+
73+
achan = dev.find_channel("altvoltage0", True)
74+
achan.attrs["frequency"].value = str(int(lo))
75+
achan = dev.find_channel("altvoltage1", True)
76+
achan.attrs["frequency"].value = str(int(lo))
77+
78+
dev.debug_attrs["loopback"].value = "1"
79+
80+
if tx_buffer_modes == "dds":
81+
# DDS
82+
for N in [1]:
83+
for IQ in ["I", "Q"]:
84+
chan = f"TX1_{IQ}_F{N}"
85+
dds = dev_tx.find_channel(chan, True)
86+
if not dds:
87+
raise Exception(f"Could not find channel {chan}")
88+
dds.attrs["frequency"].value = str(int(fc))
89+
dds.attrs["scale"].value = "0.2"
90+
if IQ == "I":
91+
dds.attrs["phase"].value = "90000"
92+
else:
93+
dds.attrs["phase"].value = "0.0"
94+
95+
## Buffer stuff
96+
97+
# RX Side
98+
chan1 = dev_rx.find_channel("voltage0")
99+
chan2 = dev_rx.find_channel("voltage1")
100+
mask = iio.ChannelsMask(dev_rx)
101+
mask.channels = [chan1, chan2]
102+
103+
buf = iio.Buffer(dev_rx, mask)
104+
stream = iio.Stream(buf, 1024)
105+
106+
if tx_buffer_modes != "dds":
107+
# TX Side
108+
chan1_tx = dev_tx.find_channel("voltage0", True)
109+
chan2_tx = dev_tx.find_channel("voltage1", True)
110+
mask_tx = iio.ChannelsMask(dev_tx)
111+
mask_tx.channels = [chan1_tx, chan2_tx]
112+
113+
# Create a sinewave waveform
114+
fc = 15e5
115+
iq = gen_data(fs, fc)
116+
# Convert iq to interleaved int16 byte array
117+
ib = np.array(iq.real, dtype=np.int16)
118+
qb = np.array(iq.imag, dtype=np.int16)
119+
120+
# Send data to iio
121+
buf_tx = iio.Buffer(dev_tx, mask_tx)
122+
block_tx = iio.Block(buf_tx, len(ib))
123+
124+
if tx_buffer_modes == "chans":
125+
block_based_tx_chans(block_tx, buf_tx, mask_tx, ib, qb)
126+
127+
if tx_buffer_modes == "single":
128+
block_based_tx_single(block_tx, buf_tx, ib, qb)
129+
130+
# Clear buffer queue
131+
for r in range(10):
132+
block = next(stream)
133+
134+
for r in range(20):
135+
block = next(stream)
136+
137+
# Single buffer read
138+
if rx_buffer_modes == "single":
139+
x = np.frombuffer(block.read(), dtype=np.int16)
140+
x = x[0::2] + 1j * x[1::2]
141+
else:
142+
# Buffer read by channel
143+
re = mask.channels[0].read(block)
144+
re = np.frombuffer(re, dtype=np.int16)
145+
im = mask.channels[1].read(block)
146+
im = np.frombuffer(im, dtype=np.int16)
147+
x = re + 1j * im
148+
149+
freq = estimate_freq(x, fs)
150+
print(f"Estimated freq: {freq/1e6} MHz | Expected freq: {fc/1e6} MHz")
151+
152+
assert np.abs(freq - fc) < 1e3

0 commit comments

Comments
 (0)