IQ Plot - Python
The purpose of this tutorial is to show how to plot IQ data that are captured from sdr_spectrum or Remote API. There are multiple ways to capture IQ data that can be processed by the example in this tutorial. Refer to following tutorials regarding the way to capture IQ data.
Table of Contents
- IQ Plot - Python
Test Setup
You can use any kind of test setup that allow you to get UE attached to callbox.
Python
Following is the version of the python and package that are used for this tutorial. (I tested this on Windows 11 Home Edition). For python, first try with whatever version you are using as long as it is ver 3.x and upgrade it to latest version if it does not work.
- Python version : 3.10.9
Python Packages
For the script in this tutorial, you need to install following packages.
pip install numpy pip install matplotlib pip install scipy pip install tqdm pip install mplcursors pip install PyQt5 |
Python Script - Command Line
In this tutorial, I will show you an example of Python script to plot IQ data from a file. This is just an example script intended as POC(Proof of Concept) example. You can revise in any way as you like if this does not suit your purpose.
Script - Barebone
Following is the source script without any comments. I didn't put any comments on purpose for simplicity.
import sys import numpy as np import matplotlib.pyplot as plt from matplotlib.widgets import Slider from scipy.signal import butter, filtfilt, convolve, ellip, cheby1 import struct from tqdm import tqdm import matplotlib matplotlib.use('Qt5Agg') import matplotlib.pyplot as plt import mplcursors import argparse
def read_iq_data(filename, start_pos, length=None): with open(filename, 'rb') as f: raw_data = f.read()
num_samples = len(raw_data) // 8 if length is not None: end_pos = min(start_pos + length, num_samples) else: end_pos = num_samples
data = np.frombuffer(raw_data, dtype=np.float32).view(np.complex64)[start_pos:end_pos] #data = np.frombuffer(raw_data, dtype=np.float32).view(np.complex64)
return data
updating_xlims = False updating_ylims = False
def plot_data(data, downsample_rate=1, vbw=1, filter_type='butter', filter_order=5, filter_cutoff=None): def butter_lowpass(cutoff, fs, order=5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = butter(order, normal_cutoff, btype='low', analog=False) return b, a
def ellip_lowpass(cutoff, fs, order=5, rp=0.5, rs=40): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = ellip(order, rp, rs, normal_cutoff, btype='low', analog=False) return b, a
def cheby_lowpass(cutoff, fs, order=5, rp=0.5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = cheby1(order, rp, normal_cutoff, btype='low', analog=False) return b, a
def lowpass_filter(data, cutoff, fs, order=5, filter_type='butter'): if filter_type == 'butter': b, a = butter_lowpass(cutoff, fs, order=order) elif filter_type == 'elliptic': b, a = ellip_lowpass(cutoff, fs, order=order) elif filter_type == 'cheby': b, a = cheby_lowpass(cutoff, fs, order=order) else: raise ValueError("Invalid filter type. Choose 'butter', 'elliptic', or 'cheby'.") y = filtfilt(b, a, data) return y
def moving_average(data, window_size): window = np.ones(window_size) / window_size return convolve(data, window, mode='same')
if downsample_rate > 1: cutoff_frequency = filter_cutoff #0.45 * (0.5 / downsample_rate) filtered_data = lowpass_filter(data, cutoff_frequency, 1, filter_order, filter_type) downsampled_data = filtered_data[::downsample_rate] else: downsampled_data = data
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1)
ax1.plot(downsampled_data.real) ax1.set_title('Real Part', fontsize=10) ax1.set_xticklabels([]) ax1.set_yticklabels([])
ax2.plot(downsampled_data.imag) ax2.set_title('Imaginary Part', fontsize=10) ax2.set_xticklabels([]) ax2.set_yticklabels([])
# Compute the magnitude of the downsampled data magnitude = np.absolute(downsampled_data)
ax3.plot(magnitude) ax3.set_title('Magnitude', fontsize=10) ax3.set_xticklabels([]) ax3.set_yticklabels([])
spectrum = np.fft.fft(downsampled_data) centered_spectrum = np.fft.fftshift(spectrum)
power_mw = np.abs(centered_spectrum)**2 centered_spectrum_dbm = 10 * np.log10(power_mw)
if vbw > 1: vbw_filtered_spectrum = moving_average(centered_spectrum_dbm, vbw) else: vbw_filtered_spectrum = centered_spectrum_dbm
vbw_filtered_spectrum_normalized = vbw_filtered_spectrum - np.max(vbw_filtered_spectrum)
ax4.plot(vbw_filtered_spectrum_normalized) ax4.set_title('Spectrum (FFT) with VBW', fontsize=10) ax4.set_xticklabels([]) ax4.set_yticklabels([])
plt.tight_layout() plt.subplots_adjust(top=0.95) plt.subplots_adjust(hspace = 0.3)
def on_xlims_change(event_axes): global updating_xlims if not updating_xlims: updating_xlims = True if event_axes == ax1: ax2.set_xlim(ax1.get_xlim()) ax3.set_xlim(ax1.get_xlim()) elif event_axes == ax2: ax1.set_xlim(ax2.get_xlim()) ax3.set_xlim(ax2.get_xlim()) elif event_axes == ax3: ax1.set_xlim(ax3.get_xlim()) ax2.set_xlim(ax3.get_xlim()) updating_xlims = False update_spectrum_plot()
def on_ylims_change(event_axes): global updating_ylims if not updating_ylims: updating_ylims = True if event_axes == ax1: ax2.set_ylim(ax1.get_ylim()) ax3.set_ylim(ax1.get_ylim()) elif event_axes == ax2: ax1.set_ylim(ax2.get_ylim()) ax3.set_ylim(ax2.get_ylim()) elif event_axes == ax3: ax1.set_ylim(ax3.get_ylim()) ax2.set_ylim(ax3.get_ylim()) updating_ylims = False update_spectrum_plot()
def update_spectrum_plot(): x1_min, x1_max = map(int, ax1.get_xlim()) y1_min, y1_max = map(int, ax1.get_ylim()) x2_min, x2_max = map(int, ax2.get_xlim()) y2_min, y2_max = map(int, ax2.get_ylim()) x3_min, x3_max = map(int, ax3.get_xlim()) # add this line for ax3 x limits y3_min, y3_max = map(int, ax3.get_ylim()) # add this line for ax3 y limits
x1_min = max(x1_min, 0) x2_min = max(x2_min, 0) x3_min = max(x3_min, 0) # make sure x3_min is non-negative
if x1_max - x1_min < 1: x1_max = x1_min + 1 if x2_max - x2_min < 1: x2_max = x2_min + 1 if x3_max - x3_min < 1: # ensure x3 range is at least 1 x3_max = x3_min + 1
visible_data_real = downsampled_data.real[x1_min:x1_max] visible_data_imag = downsampled_data.imag[x2_min:x2_max] visible_data_magnitude = np.abs(downsampled_data)[x3_min:x3_max] # visible magnitude data from ax3
visible_data = visible_data_real + 1j * visible_data_imag
spectrum_data = np.fft.fft(visible_data) centered_spectrum_data = np.fft.fftshift(spectrum_data)
power_mw = np.abs(centered_spectrum_data)**2 centered_spectrum_dbm = 10 * np.log10(power_mw)
if vbw > 1: vbw_filtered_spectrum = moving_average(centered_spectrum_dbm, vbw) else: vbw_filtered_spectrum = centered_spectrum_dbm
vbw_filtered_spectrum_normalized = vbw_filtered_spectrum - np.max(vbw_filtered_spectrum)
ax4.clear() ax4.plot(vbw_filtered_spectrum_normalized) ax4.set_title("Spectrum Data", fontsize=10) ax4.set_xlabel("Frequency", fontsize=10) ax4.set_ylabel("Amplitude", fontsize=10) ax4.set_xticklabels([]) ax4.set_yticklabels([])
fig.canvas.draw_idle()
ax1.callbacks.connect('xlim_changed', on_xlims_change) #ax1.callbacks.connect('ylim_changed', on_ylims_change) ax2.callbacks.connect('xlim_changed', on_xlims_change) #ax2.callbacks.connect('ylim_changed', on_ylims_change) ax3.callbacks.connect('xlim_changed', on_xlims_change) #ax3.callbacks.connect('ylim_changed', on_ylims_change)
plt.show()
def parse_arguments(): parser = argparse.ArgumentParser(description='Plot IQ data from a file.') parser.add_argument('datafile', type=str, help='Path to the IQ data file') parser.add_argument('--data-pos-unit', type=str, choices=['sample', 'percent'], default='sample', help='Unit for start-pos and length: sample or percent') parser.add_argument('--start-pos', type=float, default=0, help='Start position of the I/Q data pair to read') parser.add_argument('--length', type=float, default=None, help='Length of the I/Q data pair to read') parser.add_argument('--downsample-rate', type=int, default=1, help='Factor to downsample the data by') parser.add_argument('--vbw', type=int, default=1000, help='Video Bandwidth (VBW)') parser.add_argument('--filter-type', type=str, default='cheby', choices=['butter', 'elliptic', 'cheby'], help='Type of low-pass filter to use') parser.add_argument('--filter-order', type=int, default=5, help='Order of the low-pass filter') parser.add_argument('--cutoff-frequency', type=float, default=None, help='Cutoff frequency of the low-pass filter')
# Add a double dash to allow optional arguments to be in any order parser.add_argument('--', dest='double_dash', action='store_true', help=argparse.SUPPRESS)
args = parser.parse_args()
# Handle double dash by re-parsing the command line and re-assigning the arguments if args.double_dash: parser = argparse.ArgumentParser(description='Plot IQ data from a file.') parser.add_argument('datafile', type=str, help='Path to the IQ data file') parser.add_argument('--data-pos-unit', type=str, choices=['sample', 'percent'], default='sample', help='Unit for start-pos and length: sample or percent') parser.add_argument('--start-pos', type=float, default=0, help='Start position of the I/Q data pair to read') parser.add_argument('--length', type=float, default=None, help='Length of the I/Q data pair to read') parser.add_argument('--downsample-rate', type=int, default=1, help='Factor to downsample the data by') parser.add_argument('--vbw', type=int, default=1000, help='Video Bandwidth (VBW)') parser.add_argument('--filter-type', type=str, default='cheby', choices=['butter', 'elliptic', 'cheby'], help='Type of low-pass filter to use') parser.add_argument('--filter-order', type=int, default=5, help='Order of the low-pass filter') parser.add_argument('--cutoff-frequency', type=float, default=None, help='Cutoff frequency of the low-pass filter') args = parser.parse_args()
return args
def main(): args = parse_arguments()
if args.data_pos_unit == 'percent': with open(args.datafile, 'rb') as f: total_samples = len(f.read()) // 8 start_pos = int(args.start_pos / 100 * total_samples) if args.length is not None: length = int(args.length / 100 * total_samples) else: length = None else: start_pos = int(args.start_pos) length = int(args.length) if args.length is not None else None
data = read_iq_data(args.datafile, start_pos, length)
if args.cutoff_frequency is not None: filter_cutoff = args.cutoff_frequency else: filter_cutoff = 0.45 * (0.5 / args.downsample_rate)
plot_data(data, downsample_rate=args.downsample_rate, vbw=args.vbw, filter_type=args.filter_type, filter_order=args.filter_order, filter_cutoff=filter_cutoff)
if __name__ == '__main__': main()
|
Script - Comments
For the readers who is not familiar with python script and packages that are used in the code. I put the comments for each lines.
import sys import numpy as np import matplotlib.pyplot as plt from matplotlib.widgets import Slider from scipy.signal import butter, filtfilt, convolve, ellip, cheby1 import struct from tqdm import tqdm import matplotlib matplotlib.use('Qt5Agg') import matplotlib.pyplot as plt import mplcursors import argparse
def read_iq_data(filename, start_pos, length=None): with open(filename, 'rb') as f: raw_data = f.read()
num_samples = len(raw_data) // 8 if length is not None: end_pos = min(start_pos + length, num_samples) else: end_pos = num_samples
data = np.frombuffer(raw_data, dtype=np.float32).view(np.complex64)[start_pos:end_pos] #data = np.frombuffer(raw_data, dtype=np.float32).view(np.complex64)
return data
updating_xlims = False updating_ylims = False
def plot_data(data, downsample_rate=1, vbw=1, filter_type='butter', filter_order=5, filter_cutoff=None): def butter_lowpass(cutoff, fs, order=5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = butter(order, normal_cutoff, btype='low', analog=False) return b, a
def ellip_lowpass(cutoff, fs, order=5, rp=0.5, rs=40): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = ellip(order, rp, rs, normal_cutoff, btype='low', analog=False) return b, a
def cheby_lowpass(cutoff, fs, order=5, rp=0.5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = cheby1(order, rp, normal_cutoff, btype='low', analog=False) return b, a
def lowpass_filter(data, cutoff, fs, order=5, filter_type='butter'): if filter_type == 'butter': b, a = butter_lowpass(cutoff, fs, order=order) elif filter_type == 'elliptic': b, a = ellip_lowpass(cutoff, fs, order=order) elif filter_type == 'cheby': b, a = cheby_lowpass(cutoff, fs, order=order) else: raise ValueError("Invalid filter type. Choose 'butter', 'elliptic', or 'cheby'.") y = filtfilt(b, a, data) return y
def moving_average(data, window_size): window = np.ones(window_size) / window_size return convolve(data, window, mode='same')
if downsample_rate > 1: cutoff_frequency = filter_cutoff #0.45 * (0.5 / downsample_rate) filtered_data = lowpass_filter(data, cutoff_frequency, 1, filter_order, filter_type) downsampled_data = filtered_data[::downsample_rate] else: downsampled_data = data
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1)
ax1.plot(downsampled_data.real) ax1.set_title('Real Part', fontsize=10) ax1.set_xticklabels([]) ax1.set_yticklabels([])
ax2.plot(downsampled_data.imag) ax2.set_title('Imaginary Part', fontsize=10) ax2.set_xticklabels([]) ax2.set_yticklabels([])
# Compute the magnitude of the downsampled data magnitude = np.absolute(downsampled_data)
ax3.plot(magnitude) ax3.set_title('Magnitude', fontsize=10) ax3.set_xticklabels([]) ax3.set_yticklabels([])
spectrum = np.fft.fft(downsampled_data) centered_spectrum = np.fft.fftshift(spectrum)
power_mw = np.abs(centered_spectrum)**2 centered_spectrum_dbm = 10 * np.log10(power_mw)
if vbw > 1: vbw_filtered_spectrum = moving_average(centered_spectrum_dbm, vbw) else: vbw_filtered_spectrum = centered_spectrum_dbm
vbw_filtered_spectrum_normalized = vbw_filtered_spectrum - np.max(vbw_filtered_spectrum)
ax4.plot(vbw_filtered_spectrum_normalized) ax4.set_title('Spectrum (FFT) with VBW', fontsize=10) ax4.set_xticklabels([]) ax4.set_yticklabels([])
plt.tight_layout() plt.subplots_adjust(top=0.95) plt.subplots_adjust(hspace = 0.3)
def on_xlims_change(event_axes): global updating_xlims if not updating_xlims: updating_xlims = True if event_axes == ax1: ax2.set_xlim(ax1.get_xlim()) ax3.set_xlim(ax1.get_xlim()) elif event_axes == ax2: ax1.set_xlim(ax2.get_xlim()) ax3.set_xlim(ax2.get_xlim()) elif event_axes == ax3: ax1.set_xlim(ax3.get_xlim()) ax2.set_xlim(ax3.get_xlim()) updating_xlims = False update_spectrum_plot()
def on_ylims_change(event_axes): global updating_ylims if not updating_ylims: updating_ylims = True if event_axes == ax1: ax2.set_ylim(ax1.get_ylim()) ax3.set_ylim(ax1.get_ylim()) elif event_axes == ax2: ax1.set_ylim(ax2.get_ylim()) ax3.set_ylim(ax2.get_ylim()) elif event_axes == ax3: ax1.set_ylim(ax3.get_ylim()) ax2.set_ylim(ax3.get_ylim()) updating_ylims = False update_spectrum_plot()
def update_spectrum_plot(): x1_min, x1_max = map(int, ax1.get_xlim()) y1_min, y1_max = map(int, ax1.get_ylim()) x2_min, x2_max = map(int, ax2.get_xlim()) y2_min, y2_max = map(int, ax2.get_ylim()) x3_min, x3_max = map(int, ax3.get_xlim()) # add this line for ax3 x limits y3_min, y3_max = map(int, ax3.get_ylim()) # add this line for ax3 y limits
x1_min = max(x1_min, 0) x2_min = max(x2_min, 0) x3_min = max(x3_min, 0) # make sure x3_min is non-negative
if x1_max - x1_min < 1: x1_max = x1_min + 1 if x2_max - x2_min < 1: x2_max = x2_min + 1 if x3_max - x3_min < 1: # ensure x3 range is at least 1 x3_max = x3_min + 1
visible_data_real = downsampled_data.real[x1_min:x1_max] visible_data_imag = downsampled_data.imag[x2_min:x2_max] visible_data_magnitude = np.abs(downsampled_data)[x3_min:x3_max] # visible magnitude data from ax3
visible_data = visible_data_real + 1j * visible_data_imag
spectrum_data = np.fft.fft(visible_data) centered_spectrum_data = np.fft.fftshift(spectrum_data)
power_mw = np.abs(centered_spectrum_data)**2 centered_spectrum_dbm = 10 * np.log10(power_mw)
if vbw > 1: vbw_filtered_spectrum = moving_average(centered_spectrum_dbm, vbw) else: vbw_filtered_spectrum = centered_spectrum_dbm
vbw_filtered_spectrum_normalized = vbw_filtered_spectrum - np.max(vbw_filtered_spectrum)
ax4.clear() ax4.plot(vbw_filtered_spectrum_normalized) ax4.set_title("Spectrum Data", fontsize=10) ax4.set_xlabel("Frequency", fontsize=10) ax4.set_ylabel("Amplitude", fontsize=10) ax4.set_xticklabels([]) ax4.set_yticklabels([])
fig.canvas.draw_idle()
ax1.callbacks.connect('xlim_changed', on_xlims_change) #ax1.callbacks.connect('ylim_changed', on_ylims_change) ax2.callbacks.connect('xlim_changed', on_xlims_change) #ax2.callbacks.connect('ylim_changed', on_ylims_change) ax3.callbacks.connect('xlim_changed', on_xlims_change) #ax3.callbacks.connect('ylim_changed', on_ylims_change)
plt.show()
def parse_arguments(): parser = argparse.ArgumentParser(description='Plot IQ data from a file.') parser.add_argument('datafile', type=str, help='Path to the IQ data file') parser.add_argument('--data-pos-unit', type=str, choices=['sample', 'percent'], default='sample', help='Unit for start-pos and length: sample or percent') parser.add_argument('--start-pos', type=float, default=0, help='Start position of the I/Q data pair to read') parser.add_argument('--length', type=float, default=None, help='Length of the I/Q data pair to read') parser.add_argument('--downsample-rate', type=int, default=1, help='Factor to downsample the data by') parser.add_argument('--vbw', type=int, default=1000, help='Video Bandwidth (VBW)') parser.add_argument('--filter-type', type=str, default='cheby', choices=['butter', 'elliptic', 'cheby'], help='Type of low-pass filter to use') parser.add_argument('--filter-order', type=int, default=5, help='Order of the low-pass filter') parser.add_argument('--cutoff-frequency', type=float, default=None, help='Cutoff frequency of the low-pass filter')
# Add a double dash to allow optional arguments to be in any order parser.add_argument('--', dest='double_dash', action='store_true', help=argparse.SUPPRESS)
args = parser.parse_args()
# Handle double dash by re-parsing the command line and re-assigning the arguments if args.double_dash: parser = argparse.ArgumentParser(description='Plot IQ data from a file.') parser.add_argument('datafile', type=str, help='Path to the IQ data file') parser.add_argument('--data-pos-unit', type=str, choices=['sample', 'percent'], default='sample', help='Unit for start-pos and length: sample or percent') parser.add_argument('--start-pos', type=float, default=0, help='Start position of the I/Q data pair to read') parser.add_argument('--length', type=float, default=None, help='Length of the I/Q data pair to read') parser.add_argument('--downsample-rate', type=int, default=1, help='Factor to downsample the data by') parser.add_argument('--vbw', type=int, default=1000, help='Video Bandwidth (VBW)') parser.add_argument('--filter-type', type=str, default='cheby', choices=['butter', 'elliptic', 'cheby'], help='Type of low-pass filter to use') parser.add_argument('--filter-order', type=int, default=5, help='Order of the low-pass filter') parser.add_argument('--cutoff-frequency', type=float, default=None, help='Cutoff frequency of the low-pass filter') args = parser.parse_args()
return args
def main(): args = parse_arguments()
if args.data_pos_unit == 'percent': with open(args.datafile, 'rb') as f: total_samples = len(f.read()) // 8 start_pos = int(args.start_pos / 100 * total_samples) if args.length is not None: length = int(args.length / 100 * total_samples) else: length = None else: start_pos = int(args.start_pos) length = int(args.length) if args.length is not None else None
data = read_iq_data(args.datafile, start_pos, length)
if args.cutoff_frequency is not None: filter_cutoff = args.cutoff_frequency else: filter_cutoff = 0.45 * (0.5 / args.downsample_rate)
plot_data(data, downsample_rate=args.downsample_rate, vbw=args.vbw, filter_type=args.filter_type, filter_order=args.filter_order, filter_cutoff=filter_cutoff)
if __name__ == '__main__': main()
|
Script - Test
This is how I tested the script. I tested the scrip on a windows 11 PC in command prompt
Example 1 : Plotting the whole data as it is
In this example, I will show how to run the script to plot the whole data without any post processing and how to change the range of the plot with matplotlib GUI dialogbox.
$ python plot_iq.py bin\spectrum_bin\sdr_spectrum_nr_sa_dur_100.bin |
This will plot the whole data as shown below.
You can change the range of the plot by 'magnifiction' functionality of matplotlib dialogbox as shown below. First select 'Magnification' button and then select the range on the plot by mouse drag.
Example 2 : Plotting the data with video filter
In this example, I will show how to run the script to plot the whole data without any post processing and how to change the range of the plot with matplotlib GUI dialogbox.
$ python plot_iq.py bin\spectrum_bin\sdr_spectrum_nr_sa_dur_100.bin --vbw 10000 |
This will plot the whole data as shown below.
Example 3 : Plotting the data with partial data
In this example, I will show how to plot only part of the data (not the whole file). This is good for speeding up plot timing from a huge data file. Following example, plot only the 5% of the data starting from the 25 % in terms of position.
$ python plot_iq.py bin\spectrum_bin\sdr_spectrum_nr_sa_dur_100.bin --data-pos-unit percent --start-pos 25 --length 5 |
This will plot the whole data as shown below.
Example 4 : Plotting the data with undersampling and filtering
In this example, I will show how to apply undersampling (mainly for reduce the plot timing) and apply a low pass filter to compensate for the side effect of the undersampling. The final result is not as good as other method shown in previous examples.
$ python plot_iq.py bin\spectrum_bin\sdr_spectrum_nr_sa_dur_100.bin --downsample-rate 2 --filter-type butter --filter-order 5 --cutoff-frequency 0.1 |
This will plot the whole data as shown below.