Amarisoft

IQ Decode - Python

 

The purpose of this tutorial is to show how to decode signal from the IQ data. Of course, the purpose is not to provide examples for decoding everything. Decoding every signal and every channel involves the implementation of entire PHY stack which is definitely out of the scope of a tutorial. What I am trying to do in this tutorial is to provide a simple entry point from which you can expand and expand.

In this tutorial, I assume that you already have some I/Q data to work with. If you are not familiar with how to capture I/Q, check out following tutorial.

 

Table of Contents

 

 

 

 

Disclaimer and Special NOTEs !!!

NOTE :  The purpose of this tutorial is just to provide an example of Python script for processing a I/Q data. We don't provide any technical support for Python itself.

NOTE :  The script explained is not a part of Amarisoft product. It is just to show a proof of concept example showing how you can utilize Amarisoft SDR card and IQ capture functionaly and write your own post processing software. We DO NOT provide any technical support for this script itself.

NOTE :  This script would work with only for specific IQ Data with following requirement. (It means this script is NOT a general purpose decoder. It is writen for a very specific case just to show that a user can utilize Amarisoft I/Q capture data for the input to verify their own decoder algorithm)

NOTE :  If you want to try with the script, USE the script that is hyperlinked at the beginning of each test case, DO NOT COPY the code in the script description since it may be a little outdated.  You can get the test I/Q data as well at the beginning of each test.

 

Amarisoft as Study and Development tool for PHY

In this tutorial I wanted to propose a new use case of Amarisoft solution which is a study and development tool for physical layer. Probably in most case, the very beginning of PHY layer development would start from purely software based simulation solutions. At this stage, we usually simulate PHY processing (e.g, generation and decode of PHY signal) purely with software.

However, it eventually reach a point where you need to verify our solution with real signal or real hardware. For example, if you are UE PHY reciever developer, you would need real physical gNB transmitter signal for the test. But generating the proper gNB transmitter signal is not easy (especially when you don't have experties on eNB/gNB transmitter chain) and very tedious job to prepare such a signal just for testing. On top of it, the test signal should come from a well verified solution that has proved to be working. If the test signal is not well proved, the entire test result with your own development become unreliable. So in most case we do this (assuming that we want to develope UE side reciever PHY at baseband)

Based on my experience, I struggled with most of these steps. Some of the difficulties that I had were :

Why Amarisoft ?

 

 

Test Setup

You can use any kind of test setup that allow you to capture IQ in spectrum Analyzer mode

 

 

Python

Following is the version of the python and package that are used for this tutorial. (I tested this on Windows 11 Home Edition).  For python, first try with whatever version you are using as long as it is ver 3.x and upgrade it to latest version if it does not work.

 

 

Python Packages

For the script in this tutorial, you need to install following packages.

pip install numpy

pip install matplotlib

pip install scipy

 

 

Python Script - Command Line

In this tutorial,  I will show you an example of Python script to plot IQ data from a file. This is just an example script intended as POC(Proof of Concept) example. You can revise in any way as you like if this does not suit your purpose.

 

Test 1 : SISO

By running the script, you would get the test result shown in this section.  For the original Python file, click here.  For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).

 

Step 1 : Detect LTE PSS and Display (20 Mhz BW)

For this test, I ran a LTE cell with 20Mhz (enb.default.cfg) and capture the TX signal using another sdr card that is not being used for the LTE cell. In my case, I used the sdr card on UEsim, but you can use an SDR card on the callbox which is not being used for the cell. Following is the sdr_spectrum command to capture the IQ data for the this example.

./sdr_spectrum -c 0 -rx_freq 2680e6 -rate 30.72e6 -save-and-exit -duration 0.01 -save_path "/tmp"

If you want to capture the signal with better quality, you may try with the following command

./sdr_spectrum -c 0 -rx_freq 2680e6 -rate 30.72e6 -rx_gain -1 -save-and-exit -duration 0.01 -save_delay 5 -save_path "/tmp"

 

In this test, the main routine is to detect PSS from the given I/Q data. What is being done in DetectPSS() function can be illustrated as follows.

 

IQ Decode LTE PSS Ex01 FrequencyDomainDetection

 

Running the code, you will first get the text output as follows.

Detected PSS Sequence (N_ID_2): 0

Maximum correlation: 1.6852206708698385

Maximum correlation for PSS0: 1.6852206708698385

Maximum correlation for PSS1: 1.2416934348611477

Maximum correlation for PSS2: 1.1363440216733818

Position of maximum correlation: 269166

Estimated Frequency Offset: 127120.00185358223 Hz

Dimension of resourceGrid =  (14, 1199)

Detected Subframe :  0

Detected N_ID_2 (PSS) :  0

Detected N_ID_1 (SSS) :  84

Detected PCI:  252

 

After the detection of PSS, the result is displayed by the function ShowPSS() as follows.  This is the output of ShowPSS() function.

IQ Decode LTE PSS Ex01 01

(A) : This is frequency spectrum (plot in frequency) of the entire IQ data given to the program.

(B) : This shows I/Q data in 2D parameteric plot detected from the IQ data. The blue plot shows the I/Q data with magnitue normalized to 1. The plot in orange plot shows the detected PSS sequence with magnitude of all data mapped to 1 reserving angle as it is.

(C) : This shows the expected I/Q data (i.e, The sequence generated by 3GPP spec) corresponding to the detected PSS sequence.

(D) : This is time doman plot of the entire IQ data given to the program.  The orange part is the where the detected PSS is located. You can magnify a specific portion of the plot using toolbox at the top of the window.

(0)~(13) : Shows the frequency spectrum for each OFDM symbols within the subframe. (6) is the OFDM symbol where PSS is located.

 

After the PSS detection, we can plot the power of each I/Q data in LTE resource Grid as shown below. This is the output of ShowGrid()

IQ Decode LTE PSS Ex01 02

 

Step 2 : Detect LTE SSS and Display (20 Mhz BW)

The overall procedure implemented in this script to detect SSS is illustrated below.

IQ Decode LTE SSS Ex02 DetectionAlgorithm

 

After the PSS detection, it will detect SSS and visualize the detected SSS as below. This is the output of ShowSSS()

IQ Decode LTE SSS Ex01

 

Step 3 : Detect CRS, Estimate Channel Coefficient and Display (20 Mhz BW)

Once you detected PCI correctly, you can figure out the location of CRS resource elements within the resource grid. If you retrieve the I/Q value in the form of  complex number from the CRS resource elements, you will have the list of complex numbers for all the CRS within the subframe.

First run  crs_rx, crs_ex, h, crs_k = GetCRSandChannelCoef(resourceGrid, PCI, nRB = 100, method = "generate", subframe=0, debug=False) and run following functions for visualization. Followings are description of arguments and returns.

 

Followings are constellations and I/Q sequence plot for CRS (Cell Reference Signal) from the detected subframe. (In the constellation, the solid blue indicates the CRS at the lowest frequency and the solid red indcates the CRS at the highest frequency).  This is the output of ShowCRS().

IQ Decode LTE CRS Ex01

Column (A) :  The constellation of CRS retrieved from resourceGrid (these are the received CRS). Rows indicates symbol 0, 4, 7, 11

Column (B), (C) :  I and Q sequence of the CRS retrieved from resourceGrid. Rows indicates symbol 0, 4, 7, 11

Column(D) : The constellation of CRS generated by 3GPP algorithm. Rows indicates symbol 0, 4, 7, 11

Column(E) : The constellation of channel coefficient estimated from (A) and (D). Rows indicates symbol 0, 4, 7, 11

 

Step 4 : Creation of Channel Coefficient Grid

At this step, I will create a resource grid filled with channel coefficient at each and every resource elements. This is done in a sequence of multiple steps listed below.

First run following functions :

Then interpolate all non-CRS resource elements based on the h (channel coefficient) values of CRS by running following function. In this function, both frequency domain interpolation and time domain interpolation happens. Frequency domain interpolation is done first and then time-domain interpolation is done.

Followings are the descriptions on arguments and returns :

Then, run following function for visualization.  

Output of plotGridImage(hResourceGrid_interpolated). This shows the magnitude of complex numbers (channel coefficient) in every resource elements of the resource grid.

IQ Decode LTE H Grid Ex01

 

Output of  ShowResourceGridIQ(hResourceGrid_interpolated, figTitle="Interpolated [h] Grid I/Q", figTitleColor="green"). This shows the resource elements in every OFDM symbols of the hResourceGrid_interpolated in constellation.

IQ Decode LTE H Grid Ex02

 

Step 5 : Equalization

At this step, I will equalize the received signal using the channel coefficient resource grid (hResourceGrid_interpolated).

Before equalization, let's plot the constellation of the received signal as it is using ShowResourceGridIQ(resourceGrid, clip = 20, figTitle="Recieved Resource Grid I/Q : Before Equalization", figTitleColor="green")

IQ Decode LTE Equalization Ex01

 

Then run resourceGrid_corrected = ResourceGridDiv(resourceGrid, hResourceGrid_interpolated) and run following function for visualization. This performs the following operation for each and every resource elements of the resource grids.

Now, plot the result of the operation using ShowResourceGridIQ(resourceGrid_corrected, clip = 10, figTitle="Corrected Resource Grid I/Q : After Equalization", figTitleColor="green")

IQ Decode LTE Equalization Ex02

 

Test 2 : MIMO 2x2

This test is to show on how to decode 2x2 MIMO IQ data. In terms of fundamental algorithm, the log is same as SISO case. But the complexity of implementation, especially in terms of channel coefficient and equalization. In case of SISO, the channel coefficient is 1x1 (e.g, complex scalar), but in 2x2 MIMO they becomes 2x2 MIMO.

By running the script, you would get the test result shown in this section.  For the original Python file, click here.  For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).

 

Step 1 : Detect LTE PSS/SSS and Display (20 Mhz BW) for Antenna 0

The first step is to detect the PSS and the SSS for Antenna 0. In my implementation, I did this only for Antenna 0 and will use this information to process(i.e, construction of resource grid) Antenna 1 as well.

The script for this step is as follows.  Amarisoft Remote API command to capture the 2x2 MIMO captures IQ data and saves into two separate files. So I read each of the files and save it into separate variable iq_samples_p0 and iq_samples_p1.

IQ Decode LTE Test2 PSS 01

 

Step 2 : Construct the Resource Grid for Antenna 0 and Antenna 1

With the information obtained from previous step, construct the resource grid for Antenna 0 and Antenna 1 as shown below.  These two resource grid is drawn by following two functions.

Actually this function is an important one and it does many other things in addition to display the resource grid. High level functionality of AnalyzeGrid() function is as follows. (For more details, look into the source code).

IQ Decode LTE Test2 ResourceGrid 01

IQ Decode LTE Test2 ResourceGrid 02

 

Step 3 : Construct channel matrix

 I have to construct the channel matrix for this 2x2 MIMO case from the two resource grid that I obtained in previous step. In case of SISO, obtaining channel coefficient is relatively simple as shown here and here because the channel coefficient for each resource element is a scalar (complex valued scalar). However, for 2x2 MIMO we need to get an 2x2 matrix for channel coefficient for each resource element. The way the matrix is constructed is illustrated as below.

IQ Decode LTE Test2 Hmatrix 01

image source : sharetechnote

Once you extracted channel coefficient matrix(H matrix) for every CRS points, we can plot it for visualization. As you see, we get 4 resource grids : h11 grid, h21 grid, h12 grid, h22 grid respectively. The script to do this process is as follows :

The first step is to process the IQ data for Antenn0 and extract CRS for port 0 and port 1, and save them to a separate variables to be easily used for other process

 

Now plot the all 4 channel coefficient before interplation.

IQ Decode LTE Test2 Hmatrix 02

Then perform the interpolation for each resource grid. The algorithm for the interpolation is same as shown for SISO case. I just applied the same algorithm for each resource grid and get the result as shown below.

Now plot the all 4 channel coefficient after interpolation.

IQ Decode LTE Test2 Hmatrix 03

For a little bit different perspective, I plotted the 4 resource grid data into constellation as shown below. The horizontal 14 plots represents the 14 OFDM symbols of the grid and the vertial 4 plots indicates the h11, h21, h12, h22 respectively.

IQ Decode LTE Test2 Hmatrix 04

 

Step 4 : Equalization

Since we have all the necessary H matrix, we can equalize the received data with the H matrix.

Let's take a look at how the received data look like before equalization. The received data for Antenna 0 (labeled as rx_p0) and Antenna 1 (labeled as rx_p1) are look as shown below.   

The code to plot these constellation is as follows.

IQ Decode LTE Test2 Eq 01

IQ Decode LTE Test2 Eq 02

 

Now for the equalization process and processing Precoding / Large CDD, we need a little bit of preparation as below.  What we need for this is the interpolated H matrix and W, U, D matrix. So first let's do the preps for this.

Following six lines are basically changing the name of the variables. This is not mandatary, but I am renaming h11_interpolate to h11 and so on just to use the shorter / simple name.

Followings are the result of the equalization for Antenna 0 and Antenna 1. Now you see something looking like a constellation. One thing you may notice would be that this would not look like the constellation that you are expecting. I used the IQ data for 4 QAM Data, but the constellation does not look like QAM except OFDM symbol 0. The reason behind these strange looking constellation is precoding and large CDD which will be processed in following steps.

 

The code for equalization is as follows. Logic is simple. Multiply each RE data with the inverse of H matrix. That is what the two for loops are doing.

IQ Decode LTE Test2 Eq 03

IQ Decode LTE Test2 Eq 04

 

Step 5 : Undo Precoding

In case of LTE MIMO, the transmitted data gets precoded as summarized below. The original data gets applied with large CDD(U and D matrix) first and then gets applied with precoding matrix (W matrix). So at the reciever side (i.e, in decoding process) the process should be done in reverse order. That is, precoding should be undone first and then large CDD should be undone.

IQ Decode LTE Test2 Precoding 01

image source : sharetechnote

 

The result of precoding undoing is as follows. Undoing precoding is done simply by multiplying the inverse matrix of precoding matrix W.  You would not see much difference even after undoing the precoding just from the constellation. Actually there are a lot of position changes for each symbols (each dots), but the changed position also falls into one of the 9 clusters as before undoing the precoding, so you would not see big difference overall. If you plot all the symbols(dots) in different colors, you may notice some color distributions before and after this process.

In terms of programming logic, this is exactly same as we did in equalization process. Multiply each RE data with the the inverse of precoding matrix(W). That is what the two for loops are doing. It is what the two for loops are doing.

IQ Decode LTE Test2 Precoding 02

IQ Decode LTE Test2 Precoding 03

 

Step 6 : Undo Large CDD

Now we are at the last step which is undoing large CDD. As illustrated below, large CDD is made up of two steps which is multiplication of two matrix U and D. Since U is multiplied and then D is multiplied during large CDD, we have to do this in reverse order in decoding process. That is, undo D matrix first and then undo U matrix.

IQ Decode LTE Test2 LargeCDD 01

image source : sharetechnote

 

First take a look at the result of undoing D matrix as shown below. Undoing D matrix mean multiplying the inverse matrix of D to data set from previous step.

In terms of programming logic, this is exactly same as we did in previous step. Multiply each RE data with the the inverse of large CDD matrix(D). That is what the two for loops are doing. It is what the two for loops are doing.

IQ Decode LTE Test2 LargeCDD 02

IQ Decode LTE Test2 LargeCDD 03

 

As the last step, let's undo U matrix and take a look at the result as shown below. Undoing U matrix mean multiplying the inverse matrix of U to data set from previous step. Now we get the constellation that matches what we are expecting.

As we did in previous steps, just multiply each RE data with the the inverse of large CDD matrix(U). That is what the two for loops are doing. It is what the two for loops are doing.

IQ Decode LTE Test2 LargeCDD 04

IQ Decode LTE Test2 LargeCDD 05

 

Script - Barebone

Following is the source script without any comments. I didn't put any comments on purpose for simplicity.   For the original Python file, click here.  For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).

NOTE : The code shown here is early and simplified version of the implementation. As more functions are added, the script may get longer and more complicated. For simplicity and readability, I will keep the early implementation as it is and just update the source code Python file.

import numpy as np

import matplotlib.pyplot as plt

import struct

import matplotlib.gridspec as gridspec

from scipy.signal import butter, lfilter

 

def generate_pss(n_id_2):

 

    u_shift = [25, 29, 34] # Root Zadoff-Chu sequence numbers

 

    d_u = []

 

    for n in range(62):  

        u = u_shift[n_id_2]

 

        if n <= 30:

            d = np.exp(-1j * np.pi * u * n * (n + 1) / 63)  

        else:

            d = np.exp(-1j * np.pi * u * (n + 1) * (n + 2) / 63)  

        

        d_u.append(d)

 

    return np.array(d_u)

 

 

def frequency_offset_estimation(received_pss, expected_pss, sample_rate = 30.72e6):

 

    phase_difference = np.angle(np.dot(received_pss, np.conj(expected_pss)))

    time_duration_pss = 62 / sample_rate

    frequency_offset = (phase_difference / (2 * np.pi)) / time_duration_pss

 

    return frequency_offset

 

 

def correct_frequency_offset(signal, frequency_offset, sample_rate, debug = False):

 

    if debug :

        print("[Debug Info] Frequency Offset ==> ",frequency_offset)

        print("[Debug Info] sample_rate ==> ",sample_rate)

 

    signal = np.array(signal, dtype=np.complex128)

    time = len(signal) / sample_rate

    correction = np.exp(-1j * 2 * np.pi * frequency_offset * time)

    corrected_signal = signal * correction

 

    # just for debugging

    if debug :

        original_signal_magnitude = np.abs(signal)

        corrected_signal_magnitude = np.abs(corrected_signal)

        # Check if the original signal magnitude and corrected signal magnitude are the same

        print("comparison of mag : ", np.allclose(original_signal_magnitude, corrected_signal_magnitude))

        print("correction : ", correction)

 

    return corrected_signal

 

def generate_sss(NID1,NID2):

 

    q_prime = np.floor(NID1/30)

    q = np.floor(((NID1+q_prime*(q_prime+1)/2))/30)

    m_prime = NID1 + q *(q+1)/2

    m0 = m_prime % 31

    m1 = (m0 + np.floor(m_prime/31) + 1) % 31

 

    # generate d_even() sequence

    x_s = np.zeros(31)

    x_s[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_s[i+5] = (x_s[i+2] + x_s[i]) % 2

 

    x_c = np.zeros(31)

    x_c[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_c[i+5] = (x_c[i+3] + x_c[i]) % 2

 

    s_tilda = 1 - 2*x_s

    c_tilda = 1 - 2*x_c

    s0_m0_even = np.zeros(31)

    s1_m1_even = np.zeros(31)

    for n in range(31):

        s0_m0_even[n] = s_tilda[int((n+m0)%31)]

        s1_m1_even[n] = s_tilda[int((n+m1)%31)]

 

    c0_even = np.zeros(31)

    for n in range(31):

        c0_even[n] = c_tilda[int((n+NID2)%31)]

 

    d_even_sub0 = s0_m0_even * c0_even

    d_even_sub5 = s1_m1_even * c0_even

 

    # generate d_odd() sequence

    x_z = np.zeros(31)

    x_z[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_z[i+5] = (x_z[i+4] + x_z[i+2] + x_z[i+1] + x_z[i]) % 2

 

    z_tilda = 1 - 2*x_z

    s1_m1_odd = np.zeros(31)

    s0_m0_odd = np.zeros(31)

    for n in range(31):

        s1_m1_odd[n] = s_tilda[int((n+m1)%31)]

        s0_m0_odd[n] = s_tilda[int((n+m0)%31)]

 

    c1_odd = np.zeros(31)

    for n in range(31):

        c1_odd[n] = c_tilda[int((n+NID2+3)%31)]

 

    z1_m0_odd = np.zeros(31)

    z1_m1_odd = np.zeros(31)

    for n in range(31):

        z1_m0_odd[n] = z_tilda[int((n+m0%8)%31)]

        z1_m1_odd[n] = z_tilda[int((n+m1%8)%31)]

 

    d_odd_sub0 = s1_m1_odd * c1_odd * z1_m0_odd

    d_odd_sub5 = s0_m0_odd * c1_odd * z1_m1_odd

 

    # calculate d_sub0

    d_sub0 = np.zeros(62)

    d_sub0[::2] = d_even_sub0

    d_sub0[1::2] = d_odd_sub0

    sss0 = d_sub0

 

    d_sub5 = np.zeros(62)

    d_sub5[::2] = d_even_sub5

    d_sub5[1::2] = d_odd_sub5

    sss5 = d_sub5

 

    return sss0, sss5

 

def cross_correlate(a, b):

    return np.sum(a * np.conj(b))

 

 

def DetectPSS(iq_samples, fft_bin_size=2048):

 

    pss_seqs = [generate_pss(i) for i in range(3)]

 

    max_correlation = 0

    max_window_samples = None

    max_correlation_index = None

    max_correlation_position = None

 

    max_correlation_pss0 = max_correlation_pss1 = max_correlation_pss2 = 0

 

    for j in range(len(iq_samples) - fft_bin_size):

        iq_window = iq_samples[j : j + fft_bin_size]

        frequency_samples = np.fft.fft(iq_window, fft_bin_size)

        frequency_samples = np.fft.fftshift(frequency_samples)

 

        for k in range(3):

            window_start = (fft_bin_size - 62) // 2

            window_end = window_start + 62

            window_samples = frequency_samples[window_start:window_end]

 

            correlation = np.sum(window_samples * np.conj(pss_seqs[k]))

 

            if k == 0 and abs(correlation) > max_correlation_pss0:

                max_correlation_pss0 = abs(correlation)

            elif k == 1 and abs(correlation) > max_correlation_pss1:

                max_correlation_pss1 = abs(correlation)

            elif k == 2 and abs(correlation) > max_correlation_pss2:

                max_correlation_pss2 = abs(correlation)

 

            if abs(correlation) > max_correlation:

                max_correlation = abs(correlation)

                max_correlation_index = k

                max_correlation_position = j

                max_window_samples = window_samples

                pss_start = j

                pss_time_domain = np.abs(iq_window)

 

    iq_window = iq_samples[max_correlation_position : max_correlation_position + fft_bin_size]

    frequency_samples = np.fft.fft(iq_window, fft_bin_size)

    frequency_samples = np.fft.fftshift(frequency_samples)

    received_pss = frequency_samples[993:1055]

    expected_pss = pss_seqs[max_correlation_index]

    frequency_offset = frequency_offset_estimation(received_pss, expected_pss)

    

    cp_lengths = [160, 144, 144, 144, 144, 144, 144, 160, 144, 144, 144, 144, 144, 144]

    pss_slot_index = 6

 

    first_symbol_start = (pss_start - cp_lengths[pss_slot_index]) - 5 * (cp_lengths[pss_slot_index]+fft_bin_size) - (cp_lengths[0]+fft_bin_size)

 

    if pss_start >= first_symbol_start + 6 * fft_bin_size and pss_start + 8 * fft_bin_size <= len(iq_samples):

        ofdm_symbols = []

        start_index = first_symbol_start

        for i in range(14):

            start_index = start_index + cp_lengths[i]

            end_index = start_index + fft_bin_size

            ofdm_symbols.append(iq_samples[start_index : end_index])

            start_index = end_index

        ofdm_symbols = np.array(ofdm_symbols)

    else:

        print("Not enough samples to extract 14 OFDM symbols.")

 

 

    return {

        'max_correlation': max_correlation,

        'max_correlation_pss0': max_correlation_pss0,

        'max_correlation_pss1': max_correlation_pss1,

        'max_correlation_pss2': max_correlation_pss2,

        'max_correlation_index': max_correlation_index,

        'max_correlation_position': max_correlation_position,

        'frequency_offset': frequency_offset,

        'max_window_samples': max_window_samples,

        'ofdm_symbols': ofdm_symbols,

        'pss_start' : pss_start,

        'pss_time_domain' : pss_time_domain ,      

    }

 

 

def getREs(ofdm_symbols, symNo, scList):

 

    try:

        return [ofdm_symbols[symNo][sc] for sc in scList]

    except IndexError:

        print("Invalid symbol number or subcarrier index")

        return []

 

 

def ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain, fft_bin_size = 2048):

 

    import matplotlib.pyplot as plt

    import numpy as np

 

    fig = plt.figure(figsize=(15, 9))

 

    ax1 = plt.subplot2grid((7, 3), (0, 0), rowspan=2, colspan=1)

    ax3 = plt.subplot2grid((7, 3), (2, 0), rowspan=2, colspan=1)

    ax4 = plt.subplot2grid((7, 3), (4, 0), rowspan=2, colspan=1)

    ax5 = plt.subplot2grid((7, 3), (6, 0), rowspan=1, colspan=1)

 

    fft_iq_samples = np.fft.fft(iq_samples)

    fft_iq_samples = np.fft.fftshift(fft_iq_samples)

    fft_iq_samples_db = 20 * np.log10(np.abs(fft_iq_samples))

    ax1.plot(fft_iq_samples_db)

    ax1.set_title('Frequency Spectrum of IQ samples (dB scale)', fontsize=8)

    ax1.grid(True)

    ax1.set_xlabel('Frequency', fontsize=8)

    ax1.set_ylabel('Amplitude (dB)', fontsize=8)

    ax1.tick_params(axis='both', labelsize=8)

 

    fft_detected_pss = np.fft.fft(detected_pss_freq_domain)

    fft_detected_pss_normalized = fft_detected_pss / np.abs(fft_detected_pss).max()

 

    ax3.scatter(fft_detected_pss_normalized.real, fft_detected_pss_normalized.imag, label='Detected PSS')

    normalized_data = fft_detected_pss / np.abs(fft_detected_pss)

    ax3.scatter(normalized_data.real, normalized_data.imag, label='Normalized PSS', alpha=0.5)

    ax3.legend()

    ax3.grid(True)

    ax3.set_title('IQ Constellation of Detected PSS and Normalized PSS', fontsize=8)

    ax3.set_xlabel('In-Phase (I)', fontsize=8)

    ax3.set_ylabel('Quadrature (Q)', fontsize=8)

    ax3.legend(fontsize=8)

    ax3.tick_params(axis='both', labelsize=8)

 

    generated_pss = generate_pss(max_correlation_index)

    ax4.scatter(generated_pss.real, generated_pss.imag)

    ax4.grid(True)

    ax4.set_title('IQ Constellation of Generated PSS for NID2='+str(max_correlation_index), fontsize=8)

    ax4.set_xlabel('In-Phase (I)', fontsize=8)

    ax4.set_ylabel('Quadrature (Q)', fontsize=8)

    ax4.tick_params(axis='both', labelsize=8)

 

    mag_iq_samples = np.abs(iq_samples)

    ax5.plot(mag_iq_samples)

    ax5.plot(range(pss_start, pss_start + len(pss_time_domain)), pss_time_domain, label='Detected PSS', linestyle='--')

    ax5.grid(True)

    ax5.set_title('IQ in time doman and Detected PSS', fontsize=8)

    ax5.set_xlabel('Time', fontsize=8)

    ax5.set_ylabel('Amplitude', fontsize=8)

    ax5.legend(fontsize=8)

    ax5.tick_params(axis='both', labelsize=8)

 

    axs = [plt.subplot2grid((7, 3), (i % 7, i // 7 + 1)) for i in range(14)]

 

    for i in range(14):

        ax = axs[i]

        symbol = ofdm_symbols[i]

        freq_domain_symbol = np.fft.fft(symbol, fft_bin_size)

        freq_domain_symbol = np.fft.fftshift(freq_domain_symbol)

        freq_domain_symbol = freq_domain_symbol / np.max(np.abs(freq_domain_symbol))

        freq_domain_symbol_db = 20 * np.log10(np.abs(freq_domain_symbol))

        ax.plot(freq_domain_symbol_db)

        ax.set_ylim([-50, 0])

 

    plt.tight_layout()

    plt.show()

 

 

 

def ShowGrid(ofdm_symbols, nRB = 100, zmin=-25, zmax=0):

 

    frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1)

    frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)

 

    magnitude = np.abs(frequency_domain_symbols)

    magnitude = magnitude / np.max(magnitude, axis=1, keepdims=True)

 

    magnitude_db = 20 * np.log10(magnitude + np.finfo(float).eps)

 

    total_subcarriers = frequency_domain_symbols.shape[1]

 

    start_index = (total_subcarriers - nRB * 12) // 2

    end_index = start_index + nRB * 12

 

    magnitude_db = magnitude_db[:, start_index:end_index]

 

    fig, ax = plt.subplots(figsize=(14, 5))

    

    cax = ax.imshow(magnitude_db, aspect='auto', cmap='RdBu_r', interpolation='none', vmin=zmin, vmax=zmax)

 

    fig.colorbar(cax)

    

    ax.set_xlabel("Subcarriers")

    ax.set_ylabel("OFDM Symbols")

    ax.set_title("Resource Grid")

 

    symbol_labels = ['Symbol {}'.format(i) for i in range(ofdm_symbols.shape[0])]

    ax.set_yticks(np.arange(ofdm_symbols.shape[0]))

    ax.set_yticklabels(symbol_labels)

 

    ax.set_xticks(np.arange(0, nRB * 12 + 1, 12*5))

    ax.set_xticklabels(np.arange(0, nRB * 12 + 1, 12*5))

 

    plt.tight_layout()

    plt.show()

 

 

def getResourceGrid(ofdm_symbols, nRB = 100):

    frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1)

    frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)

 

    total_subcarriers = frequency_domain_symbols.shape[1]

 

    start_index = (total_subcarriers - nRB * 12) // 2

    end_index = start_index + nRB * 12

 

    resource_grid = frequency_domain_symbols[:, start_index:end_index]

 

    return resource_grid

 

 

def DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6, debug = False):

    if debug :

        print("[Debug Info] Frequency Offset ==> ",frequency_offset)

        print("[Debug Info] N_ID_2 ==> ",N_ID_2)

 

    noOfRE = nRB * 12

    sss_sc_indices = list(range(noOfRE-noOfRE//2-31, noOfRE-noOfRE//2+31))

 

    sss_no_correction = getREs(resourceGrid, 5, sss_sc_indices)

    sss = correct_frequency_offset(sss_no_correction, frequency_offset, samplingRate)

 

    max_corr = -np.inf

    best_N_ID_1 = None

    for N_ID_1 in range(168):

        expected_sss0, expected_sss5 = generate_sss(N_ID_1, N_ID_2)

        corr0 = np.abs(cross_correlate(sss, expected_sss0))

        corr5 = np.abs(cross_correlate(sss, expected_sss5))

        total_corr = max(corr0, corr5)

        if debug :

            print("corr0 = ", corr0, ",corr5 = ", corr5, ", N_ID_1 = ", N_ID_1)

 

        if total_corr > max_corr:

            max_corr = total_corr

            best_N_ID_1 = N_ID_1

            if debug :

                print("corr0 = ", corr0, ", corr5 = ", corr5, ", max_corr = ", max_corr, ", N_ID_1 = ", N_ID_1)

 

    if debug :

        print('Detected SSS (N_ID_1): ', best_N_ID_1)

 

    detected_subframe = 0 if cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[0]) > cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[1]) else 5

 

    return {

        'sss_no_correction': sss_no_correction,

        'sss_corrected': sss,

        'N_ID_1': best_N_ID_1,

        'detected_subframe': detected_subframe,

    }

 

 

def getPCI(N_ID_1, N_ID_2) :

    return 3*N_ID_1 + N_ID_2

 

 

def ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe):

 

    sss_generated = np.array(generate_sss(N_ID_1, N_ID_2))

    if detected_subframe == 0 :

        sss_generated = sss_generated[0]

    else :

        sss_generated = sss_generated[1]    

 

    sss_no_correction = np.array(sss_no_correction, dtype=np.complex128)

    sss_corrected = np.array(sss_corrected, dtype=np.complex128)

    

    sss_no_correction = sss_no_correction / np.mean(np.abs(sss_no_correction))

    sss_corrected = sss_corrected / np.mean(np.abs(sss_corrected))

 

    data_list = [sss_no_correction, sss_corrected, sss_generated ]

    titles = ['sss_no_correction', 'sss_corrected', 'sss from N_ID_1']

    fig, axs = plt.subplots(3, 3, figsize=(9, 9))

 

    for i, data in enumerate(data_list):

        data = np.array(data, dtype=np.complex128)

 

        axs[0, i].scatter(data.real, data.imag, color='b')  

        axs[0, i].set_title(f'{titles[i]}', fontsize=8)

        axs[0, i].set_xlabel('Real', fontsize=8)

        axs[0, i].set_ylabel('Imaginary', fontsize=8)

        axs[0, i].tick_params(axis='both', which='major', labelsize=8)

        axs[0, i].set_xlim(-2,2)

        axs[0, i].set_ylim(-2,2)

 

        axs[1, i].plot(data.real)

        axs[1, i].set_title(f'{titles[i]}', fontsize=8)

        axs[1, i].set_xlabel('Data Index', fontsize=8)

        axs[1, i].set_ylabel('Real', fontsize=8)

        axs[1, i].tick_params(axis='both', which='major', labelsize=8)

        axs[1, i].set_ylim(-2,2)

 

        axs[2, i].plot(data.imag)

        axs[2, i].set_title(f'{titles[i]}', fontsize=8)

        axs[2, i].set_xlabel('Data Index', fontsize=8)

        axs[2, i].set_ylabel('Imaginary', fontsize=8)

        axs[2, i].tick_params(axis='both', which='major', labelsize=8)

        axs[2, i].set_ylim(-2,2)

 

    plt.tight_layout()

    plt.show()

 

 

 

def main():

    # Load IQ data from binary file

    with open('IQ/lte_20Mhz_rate30.72Mhz_dur_10ms_pci252.bin', 'rb') as f:  # expected PSS Seq = 0       

        iq_samples = np.fromfile(f, dtype=np.complex64)

 

    resultsPSS = DetectPSS(iq_samples)

 

    print(f'Detected PSS Sequence (N_ID_2): {resultsPSS["max_correlation_index"]}')

    print(f'Maximum correlation: {resultsPSS["max_correlation"]}')

    print(f'Maximum correlation for PSS0: {resultsPSS["max_correlation_pss0"]}')

    print(f'Maximum correlation for PSS1: {resultsPSS["max_correlation_pss1"]}')

    print(f'Maximum correlation for PSS2: {resultsPSS["max_correlation_pss2"]}')

    print(f'Position of maximum correlation: {resultsPSS["max_correlation_position"]}')

    print(f'Estimated Frequency Offset: {resultsPSS["frequency_offset"]} Hz')

 

    max_correlation_index = resultsPSS["max_correlation_index"]

    pss_start = resultsPSS["pss_start"]

    pss_time_domain = resultsPSS["pss_time_domain"]

    ofdm_symbols = resultsPSS["ofdm_symbols"]

    detected_pss_freq_domain = resultsPSS["max_window_samples"]

    frequency_offset = resultsPSS["frequency_offset"]

 

    resourceGrid = getResourceGrid(ofdm_symbols, nRB = 100)

    print("Dimension of resourceGrid = ",resourceGrid.shape)

 

    N_ID_2 = max_correlation_index

    resultsSSS = DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6)

 

    N_ID_1 = resultsSSS["N_ID_1"]

    sss_no_correction = resultsSSS["sss_no_correction"]

    sss_corrected = resultsSSS["sss_corrected"]

    detected_subframe = resultsSSS["detected_subframe"]

 

    print('Detected Subframe : ', detected_subframe)

 

    print('Detected N_ID_2 (PSS) : ', N_ID_2)

    print('Detected N_ID_1 (SSS) : ', N_ID_1)

 

    PCI = getPCI(N_ID_1, N_ID_2)

    print('Detected PCI: ', PCI)

 

    ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain)

    ShowGrid(ofdm_symbols, nRB = 100, zmin = -25, zmax = 0)

    ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe)  

 

 

if __name__ == "__main__":

    main()   

 

 

 

Script - Comments

For the readers who is not familiar with python script and packages that are used in the code. I put the comments for each lines. For the original Python file, click here.  For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).

NOTE : The code shown here is early and simplified version of the implementation. As more functions are added, the script may get longer and more complicated. For simplicity and readability, I will keep the early implementation as it is and just update the source code Python file.

import numpy as np

import matplotlib.pyplot as plt

import struct

import matplotlib.gridspec as gridspec

from scipy.signal import butter, lfilter

 

 

# The function generate_pss(n_id_2) is designed to generate the Primary Synchronization Signal (PSS) for LTE systems. This generate 3 predefined PSS sequence specified by 3GPP.

# The three PSS signal will be used in the process of correlation check with the received signal

def generate_pss(n_id_2):

 

    u_shift = [25, 29, 34]

 

    d_u = []

 

    for n in range(62):  

        u = u_shift[n_id_2]

 

        if n <= 30:

            d = np.exp(-1j * np.pi * u * n * (n + 1) / 63)  

        else:

            d = np.exp(-1j * np.pi * u * (n + 1) * (n + 2) / 63)  

        

        d_u.append(d)

 

    return np.array(d_u)

 

 

# The function `frequency_offset_estimation` estimates the frequency offset in a received PSS.

# The frequency offset is the difference between the received signal's frequency and the transmitter's frequency.

 

# Parameters:

# `received_pss`: This is the Primary Synchronization Signal that was received.

# `expected_pss`: This is the expected or original Primary Synchronization Signal transmitted from the base station. This is used as a reference to estimate the frequency offset.

 

# The function assumes that both received_pss and expected_pss are numpy arrays of complex numbers representing the signal in the frequency domain.

 

# The function calculates the frequency offset by comparing the phase difference between the received_pss  and the expected_pss. This is done by taking the phase of the complex cross-correlation

# between the two signals.Then, this phase difference is used to estimate the frequency offset.

def frequency_offset_estimation(received_pss, expected_pss):

    phase_difference = np.angle(np.dot(received_pss, np.conj(expected_pss)))

    frequency_offset = phase_difference / (2 * np.pi * 62 / 30.72e6)  # Convert phase difference to frequency offset

    return frequency_offset

 

 

# The function `correct_frequency_offset` compensate the frequency offset for a given signal and returns the corrected signal.

 

# Parameters:

#  signal: The original signal to be corrected.

#   frequency_offset : The estimated frequency offset, in Hz, to be corrected.

#   sample_rate: The sample rate of the signal, in samples per second (Hz).

#   debug: If True, additional debug information is printed. Default is False.

 

def correct_frequency_offset(signal, frequency_offset, sample_rate, debug = False):

 

    if debug :

        print("[Debug Info] Frequency Offset ==> ",frequency_offset)

        print("[Debug Info] sample_rate ==> ",sample_rate)

 

    signal = np.array(signal, dtype=np.complex128)

    time = len(signal) / sample_rate

    correction = np.exp(-1j * 2 * np.pi * frequency_offset * time)

    corrected_signal = signal * correction

 

    # just for debugging

    if debug :

        original_signal_magnitude = np.abs(signal)

        corrected_signal_magnitude = np.abs(corrected_signal)

        # Check if the original signal magnitude and corrected signal magnitude are the same

        print("comparison of mag : ", np.allclose(original_signal_magnitude, corrected_signal_magnitude))

        print("correction : ", correction)

 

    return corrected_signal

 

 

# The function `generate_sss` generate the SSS signal for a given NID1 and NID2. This function returns two sequence. One for subframe 0 SSS and the other one for subframe 5 

# Parameters:

#   NID1 :  SSS Sequence Number.

#   NID2 : PSS Sequence Number.

 

def generate_sss(NID1,NID2):

 

    q_prime = np.floor(NID1/30)

    q = np.floor(((NID1+q_prime*(q_prime+1)/2))/30)

    m_prime = NID1 + q *(q+1)/2

    m0 = m_prime % 31

    m1 = (m0 + np.floor(m_prime/31) + 1) % 31

 

    # generate d_even() sequence

    x_s = np.zeros(31)

    x_s[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_s[i+5] = (x_s[i+2] + x_s[i]) % 2

 

    x_c = np.zeros(31)

    x_c[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_c[i+5] = (x_c[i+3] + x_c[i]) % 2

 

    s_tilda = 1 - 2*x_s

    c_tilda = 1 - 2*x_c

    s0_m0_even = np.zeros(31)

    s1_m1_even = np.zeros(31)

    for n in range(31):

        s0_m0_even[n] = s_tilda[int((n+m0)%31)]

        s1_m1_even[n] = s_tilda[int((n+m1)%31)]

 

    c0_even = np.zeros(31)

    for n in range(31):

        c0_even[n] = c_tilda[int((n+NID2)%31)]

 

    d_even_sub0 = s0_m0_even * c0_even

    d_even_sub5 = s1_m1_even * c0_even

 

    # generate d_odd() sequence

    x_z = np.zeros(31)

    x_z[:5] = [0, 0, 0, 0, 1]

    for i in range(26):

        x_z[i+5] = (x_z[i+4] + x_z[i+2] + x_z[i+1] + x_z[i]) % 2

 

    z_tilda = 1 - 2*x_z

    s1_m1_odd = np.zeros(31)

    s0_m0_odd = np.zeros(31)

    for n in range(31):

        s1_m1_odd[n] = s_tilda[int((n+m1)%31)]

        s0_m0_odd[n] = s_tilda[int((n+m0)%31)]

 

    c1_odd = np.zeros(31)

    for n in range(31):

        c1_odd[n] = c_tilda[int((n+NID2+3)%31)]

 

    z1_m0_odd = np.zeros(31)

    z1_m1_odd = np.zeros(31)

    for n in range(31):

        z1_m0_odd[n] = z_tilda[int((n+m0%8)%31)]

        z1_m1_odd[n] = z_tilda[int((n+m1%8)%31)]

 

    d_odd_sub0 = s1_m1_odd * c1_odd * z1_m0_odd

    d_odd_sub5 = s0_m0_odd * c1_odd * z1_m1_odd

 

    # calculate d_sub0

    d_sub0 = np.zeros(62)

    d_sub0[::2] = d_even_sub0

    d_sub0[1::2] = d_odd_sub0

    sss0 = d_sub0

 

   # calculate d_sub5

    d_sub5 = np.zeros(62)

    d_sub5[::2] = d_even_sub5

    d_sub5[1::2] = d_odd_sub5

    sss5 = d_sub5

 

    return sss0, sss5

 

 

# The function `cross_correlate` compute the cross correlation between the two sequence 

# Parameters:

#   a, b : sequence of complex numbers.

 

def cross_correlate(a, b):

    return np.sum(a * np.conj(b))

 

 

# The function `DetectPSS` is responsible for detecting the presence of the Primary Synchronization Signal (PSS) in a given set of IQ samples.

# Parameters:

#     `iq_samples`: These are the In-phase and Quadrature-phase samples of the received signal.

#      `fft_bin_size`: This is the size of the FFT bin used to transform the signal into the frequency domain. By default, the fft_bin_size is 2048, which is a common size in many LTE systems.

#                           However, it can be changed according to the system's specifications.

 

# The function works by first converting the time-domain signal (iq_samples) into the frequency domain using FFT.

# Then, it attempts to detect the PSS by cross-correlating the received frequency-domain signal with the expected PSS in the frequency domain.

# If the PSS is present, this cross-correlation should produce a strong peak in its output.

def DetectPSS(iq_samples, fft_bin_size=2048):

 

    pss_seqs = [generate_pss(i) for i in range(3)]

 

    max_correlation = 0

    max_window_samples = None

    max_correlation_index = None

    max_correlation_position = None

 

    max_correlation_pss0 = max_correlation_pss1 = max_correlation_pss2 = 0

 

    for j in range(len(iq_samples) - fft_bin_size):

        iq_window = iq_samples[j : j + fft_bin_size]

        frequency_samples = np.fft.fft(iq_window, fft_bin_size)

        frequency_samples = np.fft.fftshift(frequency_samples)

 

        for k in range(3):

            # Calculate the start and end of the window

            window_start = (fft_bin_size - 62) // 2

            window_end = window_start + 62

            window_samples = frequency_samples[window_start:window_end]

 

            correlation = np.sum(window_samples * np.conj(pss_seqs[k]))

 

            if k == 0 and abs(correlation) > max_correlation_pss0:

                max_correlation_pss0 = abs(correlation)

            elif k == 1 and abs(correlation) > max_correlation_pss1:

                max_correlation_pss1 = abs(correlation)

            elif k == 2 and abs(correlation) > max_correlation_pss2:

                max_correlation_pss2 = abs(correlation)

 

            if abs(correlation) > max_correlation:

                max_correlation = abs(correlation)

                max_correlation_index = k

                max_correlation_position = j

                max_window_samples = window_samples

                pss_start = j

                pss_time_domain = np.abs(iq_window)

 

    # Calculate Frequency Offset

    iq_window = iq_samples[max_correlation_position : max_correlation_position + fft_bin_size]

    frequency_samples = np.fft.fft(iq_window, fft_bin_size)

    frequency_samples = np.fft.fftshift(frequency_samples)

    received_pss = frequency_samples[993:1055]

    expected_pss = pss_seqs[max_correlation_index]

    frequency_offset = frequency_offset_estimation(received_pss, expected_pss)

    

    # Specify the cyclic prefix lengths for each OFDM symbol in a slot. This is CP length for 20Mhz BW. CP removal for other BW will be implemented later release

    cp_lengths = [160, 144, 144, 144, 144, 144, 144, 160, 144, 144, 144, 144, 144, 144]

    pss_slot_index = 6

 

    # Calculate the start position of the first OFDM symbol by adding up the CP lengths

    first_symbol_start = (pss_start - cp_lengths[pss_slot_index]) - 5 * (cp_lengths[pss_slot_index]+fft_bin_size) - (cp_lengths[0]+fft_bin_size)

 

    # Check if there are enough samples before and after PSS

    if pss_start >= first_symbol_start + 6 * fft_bin_size and pss_start + 8 * fft_bin_size <= len(iq_samples):

        # Extract 14 OFDM symbols where PSS is located, taking into account the CP

        ofdm_symbols = []

        start_index = first_symbol_start

        for i in range(14):

            start_index = start_index + cp_lengths[i]

            end_index = start_index + fft_bin_size

            ofdm_symbols.append(iq_samples[start_index : end_index])

            start_index = end_index

        ofdm_symbols = np.array(ofdm_symbols)

    else:

        print("Not enough samples to extract 14 OFDM symbols.")

 

 

# This function returns various values in the form of a dictionary.

    return {

        'max_correlation': max_correlation,   # max_correlation value detected from the process

        'max_correlation_pss0': max_correlation_pss0,   # max_correlation value for pss0 detected from the process (this is just for the reference)

        'max_correlation_pss1': max_correlation_pss1,  # max_correlation value for pss1 detected from the process  (this is just for the reference)

        'max_correlation_pss2': max_correlation_pss2,  # max_correlation value for pss2 detected from the process  (this is just for the reference)

        'max_correlation_index': max_correlation_index,# the pss index (i.e, pss sequence number that gives max_correlation). this is the detected PSS sequence

       'max_correlation_position': max_correlation_position,# this is position of the sliding window that gives the max correlation. It is the start of the sliding window on the received I/Q data

        'frequency_offset': frequency_offset,

        'max_window_samples': max_window_samples,# this is the frequency domain I/Q of the detected PSS

        'ofdm_symbols': ofdm_symbols,# 2D array of one radio frame OFDM symbols (14 symbols). Frequency domain I/Q data after CP removal

        'pss_start' : pss_start,# this is same as max_correlation_position

        'pss_time_domain' : pss_time_domain ,  # this is the sliding window I/Q data that carries the detected PSS    

    }

 

# The function `getREs` extracts the value from the given resource grid pointed by OFDM symbol number and the list of subcarriers.

# Parameters:

#   `resourceGrid`: OFDM resource Grid.

#   `symNo`: OFDM symbol number within the resource grid. This starts from 0.

#   'scList' : the list of subcarrier index (the index starting from 0) .

def getREs(resourceGrid, symNo, scList):

 

    try:

        # Retrieve the REs corresponding to the given symbol number and subcarrier indices

        return [ofdm_symbols[symNo][sc] for sc in scList]

    except IndexError:

        print("Invalid symbol number or subcarrier index")

        return []

 

 

# The function is used to visualize the detection of the Primary Synchronization Signal (PSS) in a given set of IQ samples. The function displays several plots to visualize the PSS detection process

 

# Parameters:

#     `iq_samples`: These are the In-phase and Quadrature-phase samples of the received signal.

#      `detected_pss_freq_domain`: This is the detected PSS in the frequency domain, obtained from the `DetectPSS` function.

#      `max_correlation_index`: This is the index in the IQ samples where the correlation between the expected and received PSS is the maximum.

#                                            This indicates the location of the PSS in the received signal.

#      `ofdm_symbols`: These are the OFDM symbols extracted from the received signal.

#      `pss_start`: This is the starting index of the PSS in the IQ samples, obtained from the `DetectPSS` function.

#      `pss_time_domain`: this is the time-domain IQ sequence in which the PSS is located.

#      `fft_bin_size`: This is the size of the FFT bin used to transform the signal into the frequency domain. this is same as the length of the sliding window searching for PSS.

#      By default, the fft_bin_size is 2048, which is a common size in many LTE systems. However, it can be changed according to the system's specifications.

def ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain, fft_bin_size = 2048):

 

    # Create a single figure

    fig = plt.figure(figsize=(15, 9))

 

    # Create the left side subplots

    ax1 = plt.subplot2grid((7, 3), (0, 0), rowspan=2, colspan=1)

    ax3 = plt.subplot2grid((7, 3), (2, 0), rowspan=2, colspan=1)

    ax4 = plt.subplot2grid((7, 3), (4, 0), rowspan=2, colspan=1)

    ax5 = plt.subplot2grid((7, 3), (6, 0), rowspan=1, colspan=1)

 

    # Frequency Spectrum of IQ samples

    fft_iq_samples = np.fft.fft(iq_samples)

    fft_iq_samples = np.fft.fftshift(fft_iq_samples)

    fft_iq_samples_db = 20 * np.log10(np.abs(fft_iq_samples))

    ax1.plot(fft_iq_samples_db)

    ax1.set_title('Frequency Spectrum of IQ samples (dB scale)', fontsize=8)

    ax1.grid(True)

    ax1.set_xlabel('Frequency', fontsize=8)

    ax1.set_ylabel('Amplitude (dB)', fontsize=8)

    ax1.tick_params(axis='both', labelsize=8)

 

    # Frequency Spectrum of Detected PSS

    fft_detected_pss = np.fft.fft(detected_pss_freq_domain)

    # Normalize by dividing all elements by the maximum magnitude

    fft_detected_pss_normalized = fft_detected_pss / np.abs(fft_detected_pss).max()

 

    # Plot the Detected PSS

    ax3.scatter(fft_detected_pss_normalized.real, fft_detected_pss_normalized.imag, label='Detected PSS')

    # Normalized PSS data

    normalized_data = fft_detected_pss / np.abs(fft_detected_pss)

    # Plot the Normalized data

    ax3.scatter(normalized_data.real, normalized_data.imag, label='Normalized PSS', alpha=0.5)

    ax3.legend()

    ax3.grid(True)

    ax3.set_title('IQ Constellation of Detected PSS and Normalized PSS', fontsize=8)

    ax3.set_xlabel('In-Phase (I)', fontsize=8)

    ax3.set_ylabel('Quadrature (Q)', fontsize=8)

    ax3.legend(fontsize=8)

    ax3.tick_params(axis='both', labelsize=8)

 

    # Generate the PSS for the detected PCI

    generated_pss = generate_pss(max_correlation_index)

    # IQ plot of the generated PSS

    ax4.scatter(generated_pss.real, generated_pss.imag)

    ax4.grid(True)

    ax4.set_title('IQ Constellation of Generated PSS for NID2='+str(max_correlation_index), fontsize=8)

    ax4.set_xlabel('In-Phase (I)', fontsize=8)

    ax4.set_ylabel('Quadrature (Q)', fontsize=8)

    ax4.tick_params(axis='both', labelsize=8)

 

    # Frequency Spectrum of IQ samples

    mag_iq_samples = np.abs(iq_samples)

    ax5.plot(mag_iq_samples)

    ax5.plot(range(pss_start, pss_start + len(pss_time_domain)), pss_time_domain, label='Detected PSS', linestyle='--')

    ax5.grid(True)

    ax5.set_title('IQ in time doman and Detected PSS', fontsize=8)

    ax5.set_xlabel('Time', fontsize=8)

    ax5.set_ylabel('Amplitude', fontsize=8)

    ax5.legend(fontsize=8)

    ax5.tick_params(axis='both', labelsize=8)

 

    # Create the right side subplots

    axs = [plt.subplot2grid((7, 3), (i % 7, i // 7 + 1)) for i in range(14)]

 

    # Code to generate the plots for axs

    for i in range(14):

        ax = axs[i]

        symbol = ofdm_symbols[i]

        freq_domain_symbol = np.fft.fft(symbol, fft_bin_size)

        freq_domain_symbol = np.fft.fftshift(freq_domain_symbol)

        freq_domain_symbol = freq_domain_symbol / np.max(np.abs(freq_domain_symbol))

        freq_domain_symbol_db = 20 * np.log10(np.abs(freq_domain_symbol))

        ax.plot(freq_domain_symbol_db)

        ax.set_ylim([-50, 0])

 

    plt.tight_layout()

    plt.show()

 

 

# The function is used to visualize the OFDM grid, representing the Resource Blocks (RBs) of an LTE signal in Resource Grid. This grid is particularly useful for visualizing  the allocation of resources

#  in the frequency-time space in a LTE system.

# The function generates a heatmap style plot of the OFDM grid. The x-axis represents the time in terms of  the OFDM symbol number, and the y-axis represents the frequency in terms of

#  the sub-carrier number. The color of each cell in the grid corresponds to the amplitude of the corresponding sub-carrier at the corresponding time, with warmer colors indicating higher amplitudes.

 

# Parameters:

#     `ofdm_symbols`: These are OFDM symbols extracted from the received signal. These are 2D array comprised of OFDM symbols after removing CP (Cyclic Prefix).

#      `nRB`: This parameter represents the number of Resource Blocks in the LTE signal.  The default value is 100,  which corresponds to a 20 MHz bandwidth in LTE, but it can be adjusted

#                according to the system's specifications.

 

#      `zmin`, `zmax`: These parameters define the color range for the plot. Any amplitude that falls within  this range is represented with a color from the colormap.

#                            `zmin` is the minimum dB value and `zmax`  is the maximum dB value for the color range. By default, `zmin` is -25 dB and `zmax` is 0 dB.

 

def ShowGrid(ofdm_symbols, nRB = 100, zmin=-25, zmax=0):

 

    # Calculate the magnitude of the OFDM symbols

    frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1)

    frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)

 

    # Compute the magnitude of the frequency domain symbols

    magnitude = np.abs(frequency_domain_symbols)

    magnitude = magnitude / np.max(magnitude, axis=1, keepdims=True)

 

    # Convert to decibels and normalize

    magnitude_db = 20 * np.log10(magnitude + np.finfo(float).eps)

 

    # Number of subcarriers

    total_subcarriers = frequency_domain_symbols.shape[1]

 

    # Calculate the start and end index for the desired number of RBs

    start_index = (total_subcarriers - nRB * 12) // 2

    end_index = start_index + nRB * 12

 

    # Slice the magnitude_db array to include only the desired RBs

    magnitude_db = magnitude_db[:, start_index:end_index]

 

    # Create a figure and an axes object

    fig, ax = plt.subplots(figsize=(14, 5))

    

    # Use the imshow function to create the 2D graph

    cax = ax.imshow(magnitude_db, aspect='auto', cmap='RdBu_r', interpolation='none', vmin=zmin, vmax=zmax)

 

    # Create a color bar to show the mapping between values and colors

    fig.colorbar(cax)

    

    # Add labels and title

    ax.set_xlabel("Subcarriers")

    ax.set_ylabel("OFDM Symbols")

    ax.set_title("Resource Grid")

 

    # Set y-axis labels for every symbol

    symbol_labels = ['Symbol {}'.format(i) for i in range(ofdm_symbols.shape[0])]

    ax.set_yticks(np.arange(ofdm_symbols.shape[0]))

    ax.set_yticklabels(symbol_labels)

 

    # Adjust x-axis ticks and labels to start from 0 and go up to nRB*12 with step size of 12

    ax.set_xticks(np.arange(0, nRB * 12 + 1, 12*5))

    ax.set_xticklabels(np.arange(0, nRB * 12 + 1, 12*5))

 

    # Display the graph

    plt.tight_layout()

    plt.show()

 

 

# The function constructs 2D resource grid (ofdm symbol index vs subcarrier) from the 2D time domain array (ofdm symbol index vs ofdm symbol in time domain)

 

# Parameters:

#     `ofdm_symbols`: These are OFDM symbols extracted from the received signal. These are 2D array comprised of OFDM symbols after removing CP (Cyclic Prefix).

#      `nRB`: This parameter represents the number of Resource Blocks in the LTE signal.  The default value is 100,  which corresponds to a 20 MHz bandwidth in LTE, but it can be adjusted

 

def getResourceGrid(ofdm_symbols, nRB = 100):

    # Calculate the magnitude of the OFDM symbols

    frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1)

    frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)

 

    # Number of subcarriers

    total_subcarriers = frequency_domain_symbols.shape[1]

 

    # Calculate the start and end index for the desired number of RBs

    start_index = (total_subcarriers - nRB * 12) // 2

    end_index = start_index + nRB * 12

 

    # Slice the frequency_domain_symbols array to include only the desired RBs

    resource_grid = frequency_domain_symbols[:, start_index:end_index]

 

    return resource_grid

 

 

 

# The function detects SSS embedded in a given resourceGrid

 

# Parameters:

#    resourceGrid : The resource grid containing the received signal.

#    N_ID_2: The second cell identity index.

#    frequency_offset: The estimated frequency offset to be corrected.

#    nRB : The number of Resource Blocks. Default is 100.

#    samplingRate: The sampling rate of the signal, in samples per second (Hz). Default is 30.72e6.

#    debug (bool, optional): If True, additional debug information is printed. Default is False.

 

# This function returns various values in the form of dictionary as follows :

#    'sss_no_correction': The extracted SSS without frequency offset correction.

#    'sss_corrected': The extracted SSS with frequency offset correction.

#    'N_ID_1': The detected first cell identity index. (The detected SSS Sequence Number)

#    'detected_subframe': The detected subframe number (0 or 5).

 

def DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6, debug = False):

    if debug :

        print("[Debug Info] Frequency Offset ==> ",frequency_offset)

        print("[Debug Info] N_ID_2 ==> ",N_ID_2)

 

    # Generate the list of subcarrier indices for SSS

    noOfRE = nRB * 12

    sss_sc_indices = list(range(noOfRE-noOfRE//2-31, noOfRE-noOfRE//2+31))

 

    # Extract SSS for the first and second slots

    sss_no_correction = getREs(resourceGrid, 5, sss_sc_indices)

 

    # Apply frequency offset correction to the extracted SSS

    sss = correct_frequency_offset(sss_no_correction, frequency_offset, samplingRate)

 

    # Calculate correlations for all possible N_ID_1

    max_corr = -np.inf

    best_N_ID_1 = None

    for N_ID_1 in range(168):

        # Generate expected SSS

        expected_sss0, expected_sss5 = generate_sss(N_ID_1, N_ID_2)

        # Calculate correlation

        corr0 = np.abs(cross_correlate(sss, expected_sss0))

        corr5 = np.abs(cross_correlate(sss, expected_sss5))

        total_corr = max(corr0, corr5)

        if debug :

            print("corr0 = ", corr0, ",corr5 = ", corr5, ", N_ID_1 = ", N_ID_1)

 

        # If this correlation is the highest we've seen, save it and the current N_ID_1

        if total_corr > max_corr:

            max_corr = total_corr

            best_N_ID_1 = N_ID_1

            if debug :

                print("corr0 = ", corr0, ", corr5 = ", corr5, ", max_corr = ", max_corr, ", N_ID_1 = ", N_ID_1)

 

    if debug :

        print('Detected SSS (N_ID_1): ', best_N_ID_1)

 

    # Detect the subframe (0 or 5) based on the correlation results

    detected_subframe = 0 if cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[0]) > cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[1]) else 5

 

    return {

        'sss_no_correction': sss_no_correction,

        'sss_corrected': sss,

        'N_ID_1': best_N_ID_1,

        'detected_subframe': detected_subframe,

    }

 

 

# The function calculate PCI from N_ID_1 (SSS sequence number) and N_ID_2 (PSS Sequence Number)

def getPCI(N_ID_1, N_ID_2) :

    return 3*N_ID_1 + N_ID_2

 

 

# This function is to visualize the Secondary Synchronization Signal (SSS) before and after frequency offset correction, and the generated SSS for a given N_ID_1 and N_ID_2.

 

# Parameters:

#         sss_no_correction : The SSS extracted from the resource grid before frequency offset correction.

#         sss_corrected : The SSS after frequency offset correction.

#         N_ID_1:SSS Sequence Number.

#         N_ID_2: PSS Sequence Number .

#         detected_subframe : The detected subframe number (0 or 5).

 

def ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe):

 

    sss_generated = np.array(generate_sss(N_ID_1, N_ID_2))

    if detected_subframe == 0 :

        sss_generated = sss_generated[0]

    else :

        sss_generated = sss_generated[1]    

 

    sss_no_correction = np.array(sss_no_correction, dtype=np.complex128)

    sss_corrected = np.array(sss_corrected, dtype=np.complex128)

    

    # Normalize sss_no_correction and sss_corrected

    sss_no_correction = sss_no_correction / np.mean(np.abs(sss_no_correction))

    sss_corrected = sss_corrected / np.mean(np.abs(sss_corrected))

 

    data_list = [sss_no_correction, sss_corrected, sss_generated ]

    #print("[Debug Info] ==> " , data_list)

    titles = ['sss_no_correction', 'sss_corrected', 'sss from N_ID_1']

    fig, axs = plt.subplots(3, 3, figsize=(9, 9))

 

    for i, data in enumerate(data_list):

        # Ensure data is a numpy array with complex numbers

        data = np.array(data, dtype=np.complex128)

 

        axs[0, i].scatter(data.real, data.imag, color='b')  

        axs[0, i].set_title(f'{titles[i]}', fontsize=8)

        axs[0, i].set_xlabel('Real', fontsize=8)

        axs[0, i].set_ylabel('Imaginary', fontsize=8)

        axs[0, i].tick_params(axis='both', which='major', labelsize=8)

        axs[0, i].set_xlim(-2,2)

        axs[0, i].set_ylim(-2,2)

 

        axs[1, i].plot(data.real)

        axs[1, i].set_title(f'{titles[i]}', fontsize=8)

        axs[1, i].set_xlabel('Data Index', fontsize=8)

        axs[1, i].set_ylabel('Real', fontsize=8)

        axs[1, i].tick_params(axis='both', which='major', labelsize=8)

        axs[1, i].set_ylim(-2,2)

 

        axs[2, i].plot(data.imag)

        axs[2, i].set_title(f'{titles[i]}', fontsize=8)

        axs[2, i].set_xlabel('Data Index', fontsize=8)

        axs[2, i].set_ylabel('Imaginary', fontsize=8)

        axs[2, i].tick_params(axis='both', which='major', labelsize=8)

        axs[2, i].set_ylim(-2,2)

 

    plt.tight_layout()

    plt.show()

 

 

 

# Test Code 

 

def main():

    # Load IQ data from binary file 

    with open('IQ/lte_20Mhz_rate30.72Mhz_dur_10ms_pci252.bin', 'rb') as f:  # expected PSS Seq = 0s        

        iq_samples = np.fromfile(f, dtype=np.complex64)

 

    # Detect PSS 

    results = DetectPSS(iq_samples)

 

   # Print various outputs returned by DetectPSS function 

    print(f'Detected PSS Sequence (N_ID_2): {results["max_correlation_index"]}')

    print(f'Maximum correlation: {results["max_correlation"]}')

    print(f'Maximum correlation for PSS0: {results["max_correlation_pss0"]}')

    print(f'Maximum correlation for PSS1: {results["max_correlation_pss1"]}')

    print(f'Maximum correlation for PSS2: {results["max_correlation_pss2"]}')

    print(f'Position of maximum correlation: {results["max_correlation_position"]}')

    print(f'Estimated Frequency Offset: {results["frequency_offset"]} Hz')

 

   # Extract the values from DetectPSS return that are necessary for Visualization 

    max_correlation_index = results["max_correlation_index"]

    pss_start = results["pss_start"]

    pss_time_domain = results["pss_time_domain"]

    ofdm_symbols = results["ofdm_symbols"]

    detected_pss_freq_domain = results["max_window_samples"]

 

    resourceGrid = getResourceGrid(ofdm_symbols, nRB = 100)

    print("Dimension of resourceGrid = ",resourceGrid.shape)

 

    # Detect SSS

    N_ID_2 = max_correlation_index

    resultsSSS = DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6)

 

    N_ID_1 = resultsSSS["N_ID_1"]

    sss_no_correction = resultsSSS["sss_no_correction"]

    sss_corrected = resultsSSS["sss_corrected"]

    detected_subframe = resultsSSS["detected_subframe"]

 

    print('Detected Subframe : ', detected_subframe)

 

    print('Detected N_ID_2 (PSS) : ', N_ID_2)

    print('Detected N_ID_1 (SSS) : ', N_ID_1)

 

    PCI = getPCI(N_ID_1, N_ID_2)

    print('Detected PCI: ', PCI)

 

    ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain)

    ShowGrid(ofdm_symbols, nRB = 100, zmin = -25, zmax = 0)

    ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe)  

 

 

if __name__ == "__main__":

    main()