IQ Decode - Python
The purpose of this tutorial is to show how to decode signal from the IQ data. Of course, the purpose is not to provide examples for decoding everything. Decoding every signal and every channel involves the implementation of entire PHY stack which is definitely out of the scope of a tutorial. What I am trying to do in this tutorial is to provide a simple entry point from which you can expand and expand.
In this tutorial, I assume that you already have some I/Q data to work with. If you are not familiar with how to capture I/Q, check out following tutorial.
Table of Contents
- IQ Decode - Python
- Disclaimer and Special NOTEs !!!
- Amarisoft as Study and Development tool for PHY
- Test Setup
- Python
- Python Packages
- Python Script - Command Line
- Test 1 : SISO
- Step 1 : Detect LTE PSS and Display (20 Mhz BW)
- Step 2 : Detect LTE SSS and Display (20 Mhz BW)
- Step 3 : Detect CRS, Estimate Channel Coefficient and Display (20 Mhz BW)
- Step 4 : Creation of Channel Coefficient Grid
- Step 5 : Equalization
- Test 2 : MIMO 2x2
- Step 1 : Detect LTE PSS/SSS and Display (20 Mhz BW) for Antenna 0
- Step 2 : Construct the Resource Grid for Antenna 0 and Antenna 1
- Step 3 : Construct channel matrix
- Step 4 : Equalization
- Step 5 : Undo Precoding
- Step 6 : Undo Large CDD
- Script - Barebone
- Script - Comments
Disclaimer and Special NOTEs !!!
- The IQ data format should be in Amarisoft I/Q data format (i.e, sequence of 4 Bytes for I and 4 Bytes for Q without any additional file header or meta data)
- The IQ data should be LTE Downlink Signal that carries at least one PSS/SSS (i.e, it should include subframe 0 or subframe 5)
- The sampling rate of the IQ data should be 30.72 Mhz or 23.04Mhz.
- The channel bandwidth should be 20Mhz and the number of RB is 100
Amarisoft as Study and Development tool for PHY
In this tutorial I wanted to propose a new use case of Amarisoft solution which is a study and development tool for physical layer. Probably in most case, the very beginning of PHY layer development would start from purely software based simulation solutions. At this stage, we usually simulate PHY processing (e.g, generation and decode of PHY signal) purely with software.
However, it eventually reach a point where you need to verify our solution with real signal or real hardware. For example, if you are UE PHY reciever developer, you would need real physical gNB transmitter signal for the test. But generating the proper gNB transmitter signal is not easy (especially when you don't have experties on eNB/gNB transmitter chain) and very tedious job to prepare such a signal just for testing. On top of it, the test signal should come from a well verified solution that has proved to be working. If the test signal is not well proved, the entire test result with your own development become unreliable. So in most case we do this (assuming that we want to develope UE side reciever PHY at baseband)
- get a vector signal generator
- get a vector signal analyzer for I/Q capture
- connect the VSG(vector signal generator) and the VSA(vector signal analyzer) and capture IQ data transmitted from VSG
- inject the IQ data into the reciever algorithm that you are developing
Based on my experience, I struggled with most of these steps. Some of the difficulties that I had were :
- VSG and VSA is expensive to get. In order to reduce so additional data preprocessing job and hard work of detailed setting/configuration issues, the generic VSA/VSG function.may not be good enough, you may need to add specific LTE/NR analysis / generation feature which push up the cost even further
- Configuration on VSG/VSA is too simple or too complex. Configurations of some VSG/VSA product in the market is too simple. Usually 'too simple' means 'less flexible'. There is not so much of your choice for the output signal patterns that you get. With this, you would not be able to set such a diverse configuration parameters that you see in real gNB/eNB. On the contrary, configurations of some VSG/VSA is too complex. In most case (actually all the VSG/VSA that I have used), those VSA/VSG implements purely low PHY only, no high PHY and MAC. You have to configure all the detailed properties with purely PHY terms only. This way of configuration is often very tricky.
Why Amarisoft ?
- Cost is much more affordable since the product is purely based on SDR (Software Defined Radio). Except RF front end and DAC/ADC, everything is software
- Embedded IQ capturing tools. The product has embedded vector spectrum analyzer by default. The best environment for your study / development would be to have two Amarisoft product (i.e, Callbox and UEsim), but even with the single box you can transmit a signal and capture IQ simulteneously. In addition, you can capture IQ while the real communication is on going without connecting external equipment (external VSA), splitter/directional coupler/circulator and a lot of cables. Amarisoft callbox/UEsim provide a Remote API to capture both TX and RX IQ.
- Easy configuration with flexibility. Basically Amarisoft product is to provide solution with full stack (i.e, full stack 4G/5G network, full stack 4G/5G UE simulator), so the configuration of the product is designed in such a way to support almost every possible signal patterns that are used in live network and at the same time with easiness that can be understood by most of users.
Test Setup
You can use any kind of test setup that allow you to capture IQ in spectrum Analyzer mode
Python
Following is the version of the python and package that are used for this tutorial. (I tested this on Windows 11 Home Edition). For python, first try with whatever version you are using as long as it is ver 3.x and upgrade it to latest version if it does not work.
- Python version : 3.10.9
Python Packages
For the script in this tutorial, you need to install following packages.
pip install numpy pip install matplotlib pip install scipy |
Python Script - Command Line
In this tutorial, I will show you an example of Python script to plot IQ data from a file. This is just an example script intended as POC(Proof of Concept) example. You can revise in any way as you like if this does not suit your purpose.
Test 1 : SISO
By running the script, you would get the test result shown in this section. For the original Python file, click here. For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).
Step 1 : Detect LTE PSS and Display (20 Mhz BW)
For this test, I ran a LTE cell with 20Mhz (enb.default.cfg) and capture the TX signal using another sdr card that is not being used for the LTE cell. In my case, I used the sdr card on UEsim, but you can use an SDR card on the callbox which is not being used for the cell. Following is the sdr_spectrum command to capture the IQ data for the this example.
./sdr_spectrum -c 0 -rx_freq 2680e6 -rate 30.72e6 -save-and-exit -duration 0.01 -save_path "/tmp"
If you want to capture the signal with better quality, you may try with the following command
./sdr_spectrum -c 0 -rx_freq 2680e6 -rate 30.72e6 -rx_gain -1 -save-and-exit -duration 0.01 -save_delay 5 -save_path "/tmp"
In this test, the main routine is to detect PSS from the given I/Q data. What is being done in DetectPSS() function can be illustrated as follows.
Running the code, you will first get the text output as follows.
Detected PSS Sequence (N_ID_2): 0 Maximum correlation: 1.6852206708698385 Maximum correlation for PSS0: 1.6852206708698385 Maximum correlation for PSS1: 1.2416934348611477 Maximum correlation for PSS2: 1.1363440216733818 Position of maximum correlation: 269166 Estimated Frequency Offset: 127120.00185358223 Hz Dimension of resourceGrid = (14, 1199) Detected Subframe : 0 Detected N_ID_2 (PSS) : 0 Detected N_ID_1 (SSS) : 84 Detected PCI: 252 |
After the detection of PSS, the result is displayed by the function ShowPSS() as follows. This is the output of ShowPSS() function.
(A) : This is frequency spectrum (plot in frequency) of the entire IQ data given to the program.
(B) : This shows I/Q data in 2D parameteric plot detected from the IQ data. The blue plot shows the I/Q data with magnitue normalized to 1. The plot in orange plot shows the detected PSS sequence with magnitude of all data mapped to 1 reserving angle as it is.
(C) : This shows the expected I/Q data (i.e, The sequence generated by 3GPP spec) corresponding to the detected PSS sequence.
(D) : This is time doman plot of the entire IQ data given to the program. The orange part is the where the detected PSS is located. You can magnify a specific portion of the plot using toolbox at the top of the window.
(0)~(13) : Shows the frequency spectrum for each OFDM symbols within the subframe. (6) is the OFDM symbol where PSS is located.
After the PSS detection, we can plot the power of each I/Q data in LTE resource Grid as shown below. This is the output of ShowGrid()
Step 2 : Detect LTE SSS and Display (20 Mhz BW)
The overall procedure implemented in this script to detect SSS is illustrated below.
After the PSS detection, it will detect SSS and visualize the detected SSS as below. This is the output of ShowSSS()
Step 3 : Detect CRS, Estimate Channel Coefficient and Display (20 Mhz BW)
Once you detected PCI correctly, you can figure out the location of CRS resource elements within the resource grid. If you retrieve the I/Q value in the form of complex number from the CRS resource elements, you will have the list of complex numbers for all the CRS within the subframe.
First run crs_rx, crs_ex, h, crs_k = GetCRSandChannelCoef(resourceGrid, PCI, nRB = 100, method = "generate", subframe=0, debug=False) and run following functions for visualization. Followings are description of arguments and returns.
- resourceGrid : the 2D array that stores all the IQ values for the resourceGrid of one subframe. this is from "resourceGrid = getResourceGrid()" at step 1
- PCI : Physical ID of the cell. This is from "PCI = getPCI(N_ID_1, N_ID_2)" after step 2.
- nRB : The number of RBs within the resourceGrid.
- method : This indicates how to generate the expected crs. "generate" generate the CRS based on 3GPP specification, "testvector" uses a predefined(hardcoded) reference CRS which is produced for PCI 252, nRB100, subframe number 0. I used "testvector" only for debugging purpose.
- subframe : Subframe number within a radio frame
- debug : indicates whether to print out debug print or not
- crs_k : returns a 2D array that stores the resouce elements index of CRS at OFDM symbol 0, 4, 7, 11
- crs_rx : returns a 2D array that stores the resouce elements values retrieved from resourceGrid
- crs_ex : returns a 2D array that stores the resouce elements values generated based on 3GPP specification
- h : returns a 2D array that stores channel coefficient estimated from crs_rx and crs_ex. For SISO with the assumption of low noise channel, it is estimated by h = crs_rx / crs_ex
Followings are constellations and I/Q sequence plot for CRS (Cell Reference Signal) from the detected subframe. (In the constellation, the solid blue indicates the CRS at the lowest frequency and the solid red indcates the CRS at the highest frequency). This is the output of ShowCRS().
Column (A) : The constellation of CRS retrieved from resourceGrid (these are the received CRS). Rows indicates symbol 0, 4, 7, 11
Column (B), (C) : I and Q sequence of the CRS retrieved from resourceGrid. Rows indicates symbol 0, 4, 7, 11
Column(D) : The constellation of CRS generated by 3GPP algorithm. Rows indicates symbol 0, 4, 7, 11
Column(E) : The constellation of channel coefficient estimated from (A) and (D). Rows indicates symbol 0, 4, 7, 11
Step 4 : Creation of Channel Coefficient Grid
At this step, I will create a resource grid filled with channel coefficient at each and every resource elements. This is done in a sequence of multiple steps listed below.
First run following functions :
hResourceGrid = cloneArray(resourceGrid, 0.0+1j*0.0) # create a resource grid (hResourceGrid) with the same dimension as "resourceGrid". All the resource elements are initialized with 0.0+1j*0.0
hResourceGrid = setResourceGrid(hResourceGrid, symb=0, k_list=crs_k[0], v_list=h[0]) # fill the channel cofficients stored in h[0] array (channel coefficient for CRS at OFDM symbol 0) at the OFDM symbol 0 of hResourceGrid
hResourceGrid = setResourceGrid(hResourceGrid, symb=4, k_list=crs_k[1], v_list=h[1]) # fill the channel cofficients stored in h[1] array (channel coefficient for CRS at OFDM symbol 4) at the OFDM symbol 4 of hResourceGrid
hResourceGrid = setResourceGrid(hResourceGrid, symb=7, k_list=crs_k[2], v_list=h[2]) # fill the channel cofficients stored in h[2] array (channel coefficient for CRS at OFDM symbol 7) at the OFDM symbol 7 of hResourceGrid
hResourceGrid = setResourceGrid(hResourceGrid, symb=11, k_list=crs_k[3], v_list=h[3]) # fill the channel cofficients stored in h[3] array (channel coefficient for CRS at OFDM symbol 11) at the OFDM symbol 11 of hResourceGrid
Then interpolate all non-CRS resource elements based on the h (channel coefficient) values of CRS by running following function. In this function, both frequency domain interpolation and time domain interpolation happens. Frequency domain interpolation is done first and then time-domain interpolation is done.
hResourceGrid_interpolated = interpolateGrid(hResourceGrid, mode = 0, td_method = "interpolate", direction="ccw") and run following functions for visualization
Followings are the descriptions on arguments and returns :
- hResourceGrid : The resource grid on which the interpolation is applied
- mode : specify the mode of the interpolation. 0 : interpolate in both time and frequency, mode 1 for frequency domain only, 2 for time only, 3 for no interpolation
- td_method : specify the method of time domain interpolation. "interpolate" mean to perform interpolation, "copy" mean to copy the coefficient from previoys OFDM symbol.
- direction : specify the direction of angle interpolation between the two complex numbers. "ccw" indicates "counter clock wise". "cw" indicates "clockwise"
- hResourceGrid_interpolated : store the resource grid with all the interpolated coefficient
Then, run following function for visualization.
Output of plotGridImage(hResourceGrid_interpolated). This shows the magnitude of complex numbers (channel coefficient) in every resource elements of the resource grid.
Output of ShowResourceGridIQ(hResourceGrid_interpolated, figTitle="Interpolated [h] Grid I/Q", figTitleColor="green"). This shows the resource elements in every OFDM symbols of the hResourceGrid_interpolated in constellation.
Step 5 : Equalization
At this step, I will equalize the received signal using the channel coefficient resource grid (hResourceGrid_interpolated).
Before equalization, let's plot the constellation of the received signal as it is using ShowResourceGridIQ(resourceGrid, clip = 20, figTitle="Recieved Resource Grid I/Q : Before Equalization", figTitleColor="green")
Then run resourceGrid_corrected = ResourceGridDiv(resourceGrid, hResourceGrid_interpolated) and run following function for visualization. This performs the following operation for each and every resource elements of the resource grids.
resourceGrid_corrected = resourceGrid / hResourceGrid_interpolated
Now, plot the result of the operation using ShowResourceGridIQ(resourceGrid_corrected, clip = 10, figTitle="Corrected Resource Grid I/Q : After Equalization", figTitleColor="green")
Test 2 : MIMO 2x2
This test is to show on how to decode 2x2 MIMO IQ data. In terms of fundamental algorithm, the log is same as SISO case. But the complexity of implementation, especially in terms of channel coefficient and equalization. In case of SISO, the channel coefficient is 1x1 (e.g, complex scalar), but in 2x2 MIMO they becomes 2x2 MIMO.
By running the script, you would get the test result shown in this section. For the original Python file, click here. For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).
Step 1 : Detect LTE PSS/SSS and Display (20 Mhz BW) for Antenna 0
The first step is to detect the PSS and the SSS for Antenna 0. In my implementation, I did this only for Antenna 0 and will use this information to process(i.e, construction of resource grid) Antenna 1 as well.
The script for this step is as follows. Amarisoft Remote API command to capture the 2x2 MIMO captures IQ data and saves into two separate files. So I read each of the files and save it into separate variable iq_samples_p0 and iq_samples_p1.
# read IQ data from a file
iq_samples_p0 = ReadIQ('IQ/lte_20Mhz_rate23.04Mhz_dur_10ms_pci252_traffic_agc_2x2_ota_0.bin')
iq_samples_p1 = ReadIQ('IQ/lte_20Mhz_rate23.04Mhz_dur_10ms_pci252_traffic_agc_2x2_ota_1.bin')
I captured the LTE DL IQ data at UEsim. In UEsim the sample rate for LTE 20Mhz bandwidth is fixed to 23.04Mhz and not allowed to change it to other frequency. Theoretically it is possible to use 23.04 Mhz sampled IQ data for 20Mhz channel bandwidth, but I resampled it to 30.72 which is better fit to 20Mhz channel bandwidth.
# resample the I/Q data sampled at 23.04 to 30.72
iq_samples_p0 = resample_iq(iq_samples_p0, orig_rate=23.04e6, target_rate=30.72e6)
iq_samples_p1 = resample_iq(iq_samples_p1, orig_rate=23.04e6, target_rate=30.72e6)
Now with the resample data, detect PSS and display it only with antenna 0 IQ file.
sample_rate = 30.72e6
fft_bin_size = 2048
NRB = 100
debug_print = False
resultsPSS = DetectPSS(iq_samples_p0,fft_bin_size,sample_rate,debug=debug_print)
ShowPSS(iq_samples_p0, detected_pss_freq_domain, max_correlation_index, ofdm_symbols_p0, pss_start, pss_time_domain, debug = debug_print)
Step 2 : Construct the Resource Grid for Antenna 0 and Antenna 1
With the information obtained from previous step, construct the resource grid for Antenna 0 and Antenna 1 as shown below. These two resource grid is drawn by following two functions.
AnalyzeGrid(resultsPSS, AntPort, ofdm_symbols_p0, NRB, sample_rate, debug_print=debug_print, display = False )
AnalyzeGrid(resultsPSS, AntPort, ofdm_symbols_p1, NRB, sample_rate, debug_print=debug_print, display = False )
Actually this function is an important one and it does many other things in addition to display the resource grid. High level functionality of AnalyzeGrid() function is as follows. (For more details, look into the source code).
1. Extracts various parameters from the `PssInfo` dictionary and other arguments.
2. Calls `getResourceGrid` to get the resource grid from the OFDM symbols.
3. Detects the Secondary Synchronization Signal (SSS) using the `DetectSSS` function.
4. Prints the detected subframe, N_ID_2 (PSS), N_ID_1 (SSS), and the Physical Cell ID (PCI).
5. Displays the resource grid and the SSS if the `display` argument is `True`.
6. Calls `GetCRSandChannelCoef` to calculate the channel coefficient corresponding to each Cell-specific Reference Signals (CRS) and returns the received CRS, channel coefficient, and the list of subcarrier number for CRS.
7. If `display` is `True`, it shows the constellation of received CRS, expected CRS, and the channel coefficient.
8. Creates an empty resource grid and puts the channel coefficient values at the position of each CRS Resource Element (RE).
9. Interpolates the resource grid with the channel coefficients.
10. If `display` is `True`, it plots the interpolated resource grid, I/Q constellation of all 14 OFDM symbols within the specified resource grid, and line graph for each OFDM symbols within the specified resource grid.
11. Corrects/compensates the resource grid with the interpolated channel coefficients.
12. Checks for NaN or Inf values in the corrected resource grid and clips them if found.
13. If `display` is `True`, it plots the I/Q for 14 OFDM symbols of original and corrected resource grid.
14. Returns a dictionary containing the original resource grid, the channel coefficient resource grid, the interpolated channel coefficient resource grid, received CRS, expected CRS, list of subcarrier number for CRS, and the channel coefficient.
Step 3 : Construct channel matrix
I have to construct the channel matrix for this 2x2 MIMO case from the two resource grid that I obtained in previous step. In case of SISO, obtaining channel coefficient is relatively simple as shown here and here because the channel coefficient for each resource element is a scalar (complex valued scalar). However, for 2x2 MIMO we need to get an 2x2 matrix for channel coefficient for each resource element. The way the matrix is constructed is illustrated as below.
- h11 = port 0 CRS(Cell Reference Signal) from Antenna 0 resource grid
- h21 = port 0 CRS(Cell Reference Signal) from Antenna 1 resource grid
- h12 = port 1 CRS(Cell Reference Signal) from Antenna 0 resource grid
- h22 = port 1 CRS(Cell Reference Signal) from Antenna 1 resource grid
image source : sharetechnote
Once you extracted channel coefficient matrix(H matrix) for every CRS points, we can plot it for visualization. As you see, we get 4 resource grids : h11 grid, h21 grid, h12 grid, h22 grid respectively. The script to do this process is as follows :
The first step is to process the IQ data for Antenn0 and extract CRS for port 0 and port 1, and save them to a separate variables to be easily used for other process
# construct h11 resource grid ************************************************************************************
AntPort = 0
ofdm_symbols_p0 = resultsPSS["ofdm_symbols"]
NRB = 100
sample_rate = 30.72e6
gridInfo_p0 = AnalyzeGrid(resultsPSS, AntPort, ofdm_symbols_p0, NRB, sample_rate, debug_print=debug_print, display = False ) # this function does all the analysis and constructing channel coefficient resource grid
resourceGrid_p0 = gridInfo_p0["resourceGrid"] # this stores the entire resource grid to the variable resourceGrid_p0
h11 = gridInfo_p0["hResourceGrid"] # this extract a resource grid with CRS before interpolation and save the grid to the variable h11
h11_interpolated = gridInfo_p0["hResourceGrid_interpolated"] # this extract a resource grid with CRS after interpolation and save the grid to the variable h11_interpolated
crs_rx_p11 = gridInfo_p0["crs_rx"] # this extract the received CRS value to the variable crs_rx_p11. this is 2D array storing the CRS value for the 4 symbols within one subframe
crs_ex_p11 = gridInfo_p0["crs_ex"] # this extract the expected CRS value (CRS value calculated by 3GPP spec) to the variable crs_ex_p11. this is 2D array storing the CRS value for the 4 symbols within one subframe
crs_k_p0 = gridInfo_p0["crs_k"] # this extract the frequency domain RE positions for CRS. this is 2D array storing the CRS value for the 4 symbols within one subframe
h_p11 = gridInfo_p0["h"] # this extract calculated h values. In terms of math, this is just crs_rx_p11/crs_ex_p11. this is 2D array storing the CRS value for the 4 symbols within one subframe
# construct h22 resource grid ************************************************************************************
AntPort = 1
ofdm_symbols_p1 = GetOFDMsymbol(ofdm_symbol_position, iq_samples_p1)
NRB = 100
sample_rate = 30.72e6
gridInfo_p1 = AnalyzeGrid(resultsPSS, AntPort, ofdm_symbols_p1, NRB, sample_rate, debug_print=debug_print, display = False ) # this function does all the analysis and constructing channel coefficient resource grid
resourceGrid_p1 = gridInfo_p1["resourceGrid"] # this stores the entire resource grid to the variable resourceGrid_p1
h22 = gridInfo_p1["hResourceGrid"] # this extract a resource grid with CRS before interpolation and save the grid to the variable h22
h22_interpolated = gridInfo_p1["hResourceGrid_interpolated"] # this extract a resource grid with CRS after interpolation and save the grid to the variable h22_interpolated
crs_rx_p22 = gridInfo_p1["crs_rx"] # this extract the received CRS value to the variable crs_rx_p22. this is 2D array storing the CRS value for the 4 symbols within one subframe
crs_ex_p22 = gridInfo_p1["crs_ex"] # this extract the expected CRS value (CRS value calculated by 3GPP spec) to the variable crs_ex_p22. this is 2D array storing the CRS value for the 4 symbols within one subframe
crs_k_p1 = gridInfo_p1["crs_k"] # this extract the frequency domain RE positions for CRS. this is 2D array storing the CRS value for the 4 symbols within one subframe
h_p22 = gridInfo_p1["h"] # this extract calculated h values. In terms of math, this is just crs_rx_p11/crs_ex_p11. this is 2D array storing the CRS value for the 4 symbols within one subframe
Now we have to construct the h resource grid for h21 and h12. As shown above, h11 and h12 were calculated and extracted directed from the resource grid for each antenna(resourceGrid_p0 and resourceGrid_p1 respectively), but h21 and h12 can just be extracted from previous two resource grid(resourceGrid_p0 and resourceGrid_p1) without much calculation.
# construct h21 resource grid ************************************************************************************
crs_rx_p21 = []
crs_rx= getREs(resourceGrid_p1, 0, crs_k_p0[0]) # extract the RE value from resourceGrid_p1 at OFDM symbol 0 and frequency domain REs specified by the array crs_k_p0[0]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p11[0]) # calculate the channel coefficient for h21 for all the CRSs at OFDM symbol 0
crs_rx_p21.append(crs_chest) # add the scr_chest to a 2D array crs_rx_p21
crs_rx= getREs(resourceGrid_p1, 4, crs_k_p0[1]) # extract the RE value from resourceGrid_p1 at OFDM symbol 4 and frequency domain REs specified by the array crs_k_p0[1]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p11[1]) # calculate the channel coefficient for h21 for all the CRSs at OFDM symbol 4
crs_rx_p21.append(crs_chest) # add the scr_chest to a 2D array crs_rx_p21
crs_rx= getREs(resourceGrid_p1, 7, crs_k_p0[2]) # extract the RE value from resourceGrid_p1 at OFDM symbol 7 and frequency domain REs specified by the array crs_k_p0[2]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p11[2]) # calculate the channel coefficient for h21 for all the CRSs at OFDM symbol 7
crs_rx_p21.append(crs_chest) # add the scr_chest to a 2D array crs_rx_p21
crs_rx= getREs(resourceGrid_p1, 11, crs_k_p0[3]) # extract the RE value from resourceGrid_p1 at OFDM symbol 11 and frequency domain REs specified by the array crs_k_p0[3]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p11[3]) # calculate the channel coefficient for h21 for all the CRSs at OFDM symbol 11
crs_rx_p21.append(crs_chest) # add the scr_chest to a 2D array crs_rx_p21
h21 = cloneArray(resourceGrid_p1, 0.0+1j*0.0) # create an resource grid, the dimension of which is copied from resourceGrid
# following 4 lines are to populate the blank resoure grid with crs_rx_p21
h21 = setResourceGrid(h21, symb=0, k_list=crs_k_p0[0], v_list=crs_rx_p21[0])
h21 = setResourceGrid(h21, symb=4, k_list=crs_k_p0[1], v_list=crs_rx_p21[1])
h21 = setResourceGrid(h21, symb=7, k_list=crs_k_p0[2], v_list=crs_rx_p21[2])
h21 = setResourceGrid(h21, symb=11, k_list=crs_k_p0[3], v_list=crs_rx_p21[3])
#plotGridImage(h21, "h resource grid : h21")
# interpolate the h21 resource grid
h21_interpolated = interpolateGrid(h21, mode = 0, td_method = "interpolate", direction="ccw")
# construct h12 resource grid ************************************************************************************
crs_rx_p12 = []
crs_rx= getREs(resourceGrid_p0, 0, crs_k_p1[0]) # extract the RE value from resourceGrid_p0 at OFDM symbol 0 and frequency domain REs specified by the array crs_k_p1[0]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p22[0]) # calculate the channel coefficient for h12 for all the CRSs at OFDM symbol 0
crs_rx_p12.append(crs_chest) # add the srs_chest to a 2D array crs_rx_p12
crs_rx= getREs(resourceGrid_p0, 4, crs_k_p1[1]) # extract the RE value from resourceGrid_p0 at OFDM symbol 4 and frequency domain REs specified by the array crs_k_p1[1]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p22[1]) # calculate the channel coefficient for h12 for all the CRSs at OFDM symbol 0
crs_rx_p12.append(crs_chest) # add the srs_chest to a 2D array crs_rx_p12
crs_rx= getREs(resourceGrid_p0, 7, crs_k_p1[2]) # extract the RE value from resourceGrid_p0 at OFDM symbol 7 and frequency domain REs specified by the array crs_k_p1[2]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p22[2]) # calculate the channel coefficient for h12 for all the CRSs at OFDM symbol 0
crs_rx_p12.append(crs_chest) # add the srs_chest to a 2D array crs_rx_p12
crs_rx= getREs(resourceGrid_p0, 11, crs_k_p1[3]) # extract the RE value from resourceGrid_p0 at OFDM symbol 11 and frequency domain REs specified by the array crs_k_p1[3]
crs_chest = np.array(crs_rx) / np.array(crs_ex_p22[3]) # calculate the channel coefficient for h12 for all the CRSs at OFDM symbol 0
crs_rx_p12.append(crs_chest) # add the srs_chest to a 2D array crs_rx_p12
h12 = cloneArray(resourceGrid_p0, 0.0+1j*0.0) # create an resource grid, the dimension of which is copied from resourceGrid
# following 4 lines are to populate the blank resoure grid with crs_rx_p12
h12 = setResourceGrid(h12, symb=0, k_list=crs_k_p1[0], v_list=crs_rx_p12[0])
h12 = setResourceGrid(h12, symb=4, k_list=crs_k_p1[1], v_list=crs_rx_p12[1])
h12 = setResourceGrid(h12, symb=7, k_list=crs_k_p1[2], v_list=crs_rx_p12[2])
h12 = setResourceGrid(h12, symb=11, k_list=crs_k_p1[3], v_list=crs_rx_p12[3])
#plotGridImage(h12, "h resource grid : h12")
# interpolate the h12 resource grid
h12_interpolated = interpolateGrid(h12, mode = 0, td_method = "interpolate", direction="ccw")
Now plot the all 4 channel coefficient before interplation.
gridList = [h11,h21,h12,h22]
titleList = ["h11","h21","h12","h22"]
plotGridTile(gridList, titleList)
Then perform the interpolation for each resource grid. The algorithm for the interpolation is same as shown for SISO case. I just applied the same algorithm for each resource grid and get the result as shown below.
Now plot the all 4 channel coefficient after interpolation.
gridList = [h11_interpolated,h21_interpolated,h12_interpolated,h22_interpolated]
titleList = ["h11_interpolated","h21_interpolated","h12_interpolated","h22_interpolated"]
plotGridTile(gridList, titleList)
For a little bit different perspective, I plotted the 4 resource grid data into constellation as shown below. The horizontal 14 plots represents the 14 OFDM symbols of the grid and the vertial 4 plots indicates the h11, h21, h12, h22 respectively.
ShowResourceGridIQ_Tile(gridList, TitleList=titleList, clip=5)
Step 4 : Equalization
Since we have all the necessary H matrix, we can equalize the received data with the H matrix.
Let's take a look at how the received data look like before equalization. The received data for Antenna 0 (labeled as rx_p0) and Antenna 1 (labeled as rx_p1) are look as shown below.
The code to plot these constellation is as follows.
ShowResourceGridIQ(resourceGrid_p0, figTitle="rx_p0 (RX signal before Equalization)", clip = 25, figTitleColor="green")
ShowResourceGridIQ(resourceGrid_p1, figTitle="rx_p1 (RX signal before Equalization)", clip = 25, figTitleColor="green")
Now for the equalization process and processing Precoding / Large CDD, we need a little bit of preparation as below. What we need for this is the interpolated H matrix and W, U, D matrix. So first let's do the preps for this.
Following six lines are basically changing the name of the variables. This is not mandatary, but I am renaming h11_interpolate to h11 and so on just to use the shorter / simple name.
h11 = h11_interpolated
h21 = h21_interpolated
h12 = h12_interpolated
h22 = h22_interpolated
rx_p0 = resourceGrid_p0
rx_p1 = resourceGrid_p1
Followings are just to make a copy of thd grid structure and assign them to several intermediate variables that will be used in later step. In real implentation, you would not need these kind of resource grids but I am preparing all of these grids to show the plots for all intermediate steps.
rx_p0_eq = resourceGrid_p0.copy()
rx_p1_eq = resourceGrid_p1.copy()
rx_p0_W = resourceGrid_p0.copy()
rx_p1_W = resourceGrid_p1.copy()
rx_p0_D = resourceGrid_p0.copy()
rx_p1_D = resourceGrid_p1.copy()
rx_p0_U = resourceGrid_p0.copy()
rx_p1_U = resourceGrid_p1.copy()
Now let's prepare the matrix for Precoding and Large CDD handling. For 2x2 MIMO, TM3 in LTE, Precoding matrix(W) and large CDD matrix (D and U) are defined as follows by 3GPP. Note that W and U matrix are the matrix with the constant value, but D matrix has the value changing with the position of the data in the given sequence.
W = np.array([
[ 1/np.sqrt(2), 0 ],
[ 0, 1/np.sqrt(2)]
])
def D(i) :
d = np.array([
[ 1, 0 ],
[ 0, np.exp(-1j*(2*np.pi*i/2))]
])
return d
U = np.array([
[ 1/np.sqrt(2), 1/np.sqrt(2) ],
[ 1/np.sqrt(2), np.exp(-1j*(2*np.pi/2))/np.sqrt(2) ]
])
Followings are the result of the equalization for Antenna 0 and Antenna 1. Now you see something looking like a constellation. One thing you may notice would be that this would not look like the constellation that you are expecting. I used the IQ data for 4 QAM Data, but the constellation does not look like QAM except OFDM symbol 0. The reason behind these strange looking constellation is precoding and large CDD which will be processed in following steps.
The code for equalization is as follows. Logic is simple. Multiply each RE data with the inverse of H matrix. That is what the two for loops are doing.
for symb in range(len(rx_p0)) :
for k in range(len(rx_p0[1])) :
RX = np.array(
[ [ rx_p0[symb,k] ],
[ rx_p1[symb,k] ]
])
H = np.array([
[ h11[symb,k], h12[symb,k] ],
[ h21[symb,k], h22[symb,k] ]
])
try:
Hinv = np.linalg.inv(H)
except np.linalg.LinAlgError:
# If H is singular, set Hinv to identity matrix
print(f"singular matrix error at ({symb},{k})")
Hinv = np.identity(2)
eq_re = np.dot(Hinv, RX)
rx_p0_eq[symb,k] = eq_re[0]
rx_p1_eq[symb,k] = eq_re[1]
# plot the result into constellation.
ShowResourceGridIQ(rx_p0_eq, figTitle="Equalized rx_p0", clip = 10, figTitleColor="green")
ShowResourceGridIQ(rx_p1_eq, figTitle="Equalized rx_p1", clip = 10, figTitleColor="green")
Step 5 : Undo Precoding
In case of LTE MIMO, the transmitted data gets precoded as summarized below. The original data gets applied with large CDD(U and D matrix) first and then gets applied with precoding matrix (W matrix). So at the reciever side (i.e, in decoding process) the process should be done in reverse order. That is, precoding should be undone first and then large CDD should be undone.
image source : sharetechnote
The result of precoding undoing is as follows. Undoing precoding is done simply by multiplying the inverse matrix of precoding matrix W. You would not see much difference even after undoing the precoding just from the constellation. Actually there are a lot of position changes for each symbols (each dots), but the changed position also falls into one of the 9 clusters as before undoing the precoding, so you would not see big difference overall. If you plot all the symbols(dots) in different colors, you may notice some color distributions before and after this process.
In terms of programming logic, this is exactly same as we did in equalization process. Multiply each RE data with the the inverse of precoding matrix(W). That is what the two for loops are doing. It is what the two for loops are doing.
for symb in range(len(rx_p0)) :
for k in range(len(rx_p0[1])) :
if symb != 0 :
RX = np.array(
[ [ rx_p0_eq[symb,k] ],
[ rx_p1_eq[symb,k] ]
])
try:
Winv = np.linalg.inv(W)
except np.linalg.LinAlgError:
# If H is singular, set Hinv to identity matrix
print(f"singular matrix error at ({symb},{k})")
Winv = np.identity(2)
eq_re = np.dot(Winv, RX)
rx_p0_W[symb,k] = eq_re[0]
rx_p1_W[symb,k] = eq_re[1]
else:
rx_p0_W[symb,k] = rx_p0_eq[symb,k]
rx_p1_W[symb,k] = rx_p1_eq[symb,k]
ShowResourceGridIQ(rx_p0_W, figTitle="inv(W).rx_p0_eq", clip = 10, figTitleColor="green")
ShowResourceGridIQ(rx_p1_W, figTitle="inv(W).rx_p1_eq", clip = 10, figTitleColor="green")
Step 6 : Undo Large CDD
Now we are at the last step which is undoing large CDD. As illustrated below, large CDD is made up of two steps which is multiplication of two matrix U and D. Since U is multiplied and then D is multiplied during large CDD, we have to do this in reverse order in decoding process. That is, undo D matrix first and then undo U matrix.
image source : sharetechnote
First take a look at the result of undoing D matrix as shown below. Undoing D matrix mean multiplying the inverse matrix of D to data set from previous step.
In terms of programming logic, this is exactly same as we did in previous step. Multiply each RE data with the the inverse of large CDD matrix(D). That is what the two for loops are doing. It is what the two for loops are doing.
for symb in range(len(rx_p0)) :
for k in range(len(rx_p0[1])) :
if symb != 0 :
RX = np.array(
[ [ rx_p0_W[symb,k] ],
[ rx_p1_W[symb,k] ]
])
try:
Dinv = np.linalg.inv(D(k))
except np.linalg.LinAlgError:
# If H is singular, set Hinv to identity matrix
print(f"singular matrix error at ({symb},{k})")
Dinv = np.identity(2)
eq_re = np.dot(Dinv, RX)
rx_p0_D[symb,k] = eq_re[0]
rx_p1_D[symb,k] = eq_re[1]
else:
rx_p0_D[symb,k] = rx_p0_eq[symb,k]
rx_p1_D[symb,k] = rx_p1_eq[symb,k]
ShowResourceGridIQ(rx_p0_D, figTitle="inv(D).inv(W).rx_p0_eq", clip = 10, figTitleColor="green")
ShowResourceGridIQ(rx_p1_D, figTitle="inv(D).inv(W).rx_p1_eq", clip = 10, figTitleColor="green")
As the last step, let's undo U matrix and take a look at the result as shown below. Undoing U matrix mean multiplying the inverse matrix of U to data set from previous step. Now we get the constellation that matches what we are expecting.
As we did in previous steps, just multiply each RE data with the the inverse of large CDD matrix(U). That is what the two for loops are doing. It is what the two for loops are doing.
for symb in range(len(rx_p0)) :
for k in range(len(rx_p0[1])) :
if symb != 0 :
RX = np.array(
[ [ rx_p0_D[symb,k] ],
[ rx_p1_D[symb,k] ]
])
try:
Uinv = np.linalg.inv(U)
except np.linalg.LinAlgError:
# If H is singular, set Hinv to identity matrix
print(f"singular matrix error at ({symb},{k})")
Uinv = np.identity(2)
eq_re = np.dot(Uinv, RX)
rx_p0_U[symb,k] = eq_re[0]
rx_p1_U[symb,k] = eq_re[1]
else:
rx_p0_U[symb,k] = rx_p0_eq[symb,k]
rx_p1_U[symb,k] = rx_p1_eq[symb,k]
ShowResourceGridIQ(rx_p0_U, figTitle="inv(U).inv(D).inv(W).rx_p0_eq", clip = 10, figTitleColor="green")
ShowResourceGridIQ(rx_p1_U, figTitle="inv(U).inv(D).inv(W).rx_p1_eq", clip = 10, figTitleColor="green")
Script - Barebone
Following is the source script without any comments. I didn't put any comments on purpose for simplicity. For the original Python file, click here. For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).
import numpy as np import matplotlib.pyplot as plt import struct import matplotlib.gridspec as gridspec from scipy.signal import butter, lfilter
def generate_pss(n_id_2):
u_shift = [25, 29, 34] # Root Zadoff-Chu sequence numbers
d_u = []
for n in range(62): u = u_shift[n_id_2]
if n <= 30: d = np.exp(-1j * np.pi * u * n * (n + 1) / 63) else: d = np.exp(-1j * np.pi * u * (n + 1) * (n + 2) / 63)
d_u.append(d)
return np.array(d_u)
def frequency_offset_estimation(received_pss, expected_pss, sample_rate = 30.72e6):
phase_difference = np.angle(np.dot(received_pss, np.conj(expected_pss))) time_duration_pss = 62 / sample_rate frequency_offset = (phase_difference / (2 * np.pi)) / time_duration_pss
return frequency_offset
def correct_frequency_offset(signal, frequency_offset, sample_rate, debug = False):
if debug : print("[Debug Info] Frequency Offset ==> ",frequency_offset) print("[Debug Info] sample_rate ==> ",sample_rate)
signal = np.array(signal, dtype=np.complex128) time = len(signal) / sample_rate correction = np.exp(-1j * 2 * np.pi * frequency_offset * time) corrected_signal = signal * correction
# just for debugging if debug : original_signal_magnitude = np.abs(signal) corrected_signal_magnitude = np.abs(corrected_signal) # Check if the original signal magnitude and corrected signal magnitude are the same print("comparison of mag : ", np.allclose(original_signal_magnitude, corrected_signal_magnitude)) print("correction : ", correction)
return corrected_signal
def generate_sss(NID1,NID2):
q_prime = np.floor(NID1/30) q = np.floor(((NID1+q_prime*(q_prime+1)/2))/30) m_prime = NID1 + q *(q+1)/2 m0 = m_prime % 31 m1 = (m0 + np.floor(m_prime/31) + 1) % 31
# generate d_even() sequence x_s = np.zeros(31) x_s[:5] = [0, 0, 0, 0, 1] for i in range(26): x_s[i+5] = (x_s[i+2] + x_s[i]) % 2
x_c = np.zeros(31) x_c[:5] = [0, 0, 0, 0, 1] for i in range(26): x_c[i+5] = (x_c[i+3] + x_c[i]) % 2
s_tilda = 1 - 2*x_s c_tilda = 1 - 2*x_c s0_m0_even = np.zeros(31) s1_m1_even = np.zeros(31) for n in range(31): s0_m0_even[n] = s_tilda[int((n+m0)%31)] s1_m1_even[n] = s_tilda[int((n+m1)%31)]
c0_even = np.zeros(31) for n in range(31): c0_even[n] = c_tilda[int((n+NID2)%31)]
d_even_sub0 = s0_m0_even * c0_even d_even_sub5 = s1_m1_even * c0_even
# generate d_odd() sequence x_z = np.zeros(31) x_z[:5] = [0, 0, 0, 0, 1] for i in range(26): x_z[i+5] = (x_z[i+4] + x_z[i+2] + x_z[i+1] + x_z[i]) % 2
z_tilda = 1 - 2*x_z s1_m1_odd = np.zeros(31) s0_m0_odd = np.zeros(31) for n in range(31): s1_m1_odd[n] = s_tilda[int((n+m1)%31)] s0_m0_odd[n] = s_tilda[int((n+m0)%31)]
c1_odd = np.zeros(31) for n in range(31): c1_odd[n] = c_tilda[int((n+NID2+3)%31)]
z1_m0_odd = np.zeros(31) z1_m1_odd = np.zeros(31) for n in range(31): z1_m0_odd[n] = z_tilda[int((n+m0%8)%31)] z1_m1_odd[n] = z_tilda[int((n+m1%8)%31)]
d_odd_sub0 = s1_m1_odd * c1_odd * z1_m0_odd d_odd_sub5 = s0_m0_odd * c1_odd * z1_m1_odd
# calculate d_sub0 d_sub0 = np.zeros(62) d_sub0[::2] = d_even_sub0 d_sub0[1::2] = d_odd_sub0 sss0 = d_sub0
d_sub5 = np.zeros(62) d_sub5[::2] = d_even_sub5 d_sub5[1::2] = d_odd_sub5 sss5 = d_sub5
return sss0, sss5
def cross_correlate(a, b): return np.sum(a * np.conj(b))
def DetectPSS(iq_samples, fft_bin_size=2048):
pss_seqs = [generate_pss(i) for i in range(3)]
max_correlation = 0 max_window_samples = None max_correlation_index = None max_correlation_position = None
max_correlation_pss0 = max_correlation_pss1 = max_correlation_pss2 = 0
for j in range(len(iq_samples) - fft_bin_size): iq_window = iq_samples[j : j + fft_bin_size] frequency_samples = np.fft.fft(iq_window, fft_bin_size) frequency_samples = np.fft.fftshift(frequency_samples)
for k in range(3): window_start = (fft_bin_size - 62) // 2 window_end = window_start + 62 window_samples = frequency_samples[window_start:window_end]
correlation = np.sum(window_samples * np.conj(pss_seqs[k]))
if k == 0 and abs(correlation) > max_correlation_pss0: max_correlation_pss0 = abs(correlation) elif k == 1 and abs(correlation) > max_correlation_pss1: max_correlation_pss1 = abs(correlation) elif k == 2 and abs(correlation) > max_correlation_pss2: max_correlation_pss2 = abs(correlation)
if abs(correlation) > max_correlation: max_correlation = abs(correlation) max_correlation_index = k max_correlation_position = j max_window_samples = window_samples pss_start = j pss_time_domain = np.abs(iq_window)
iq_window = iq_samples[max_correlation_position : max_correlation_position + fft_bin_size] frequency_samples = np.fft.fft(iq_window, fft_bin_size) frequency_samples = np.fft.fftshift(frequency_samples) received_pss = frequency_samples[993:1055] expected_pss = pss_seqs[max_correlation_index] frequency_offset = frequency_offset_estimation(received_pss, expected_pss)
cp_lengths = [160, 144, 144, 144, 144, 144, 144, 160, 144, 144, 144, 144, 144, 144] pss_slot_index = 6
first_symbol_start = (pss_start - cp_lengths[pss_slot_index]) - 5 * (cp_lengths[pss_slot_index]+fft_bin_size) - (cp_lengths[0]+fft_bin_size)
if pss_start >= first_symbol_start + 6 * fft_bin_size and pss_start + 8 * fft_bin_size <= len(iq_samples): ofdm_symbols = [] start_index = first_symbol_start for i in range(14): start_index = start_index + cp_lengths[i] end_index = start_index + fft_bin_size ofdm_symbols.append(iq_samples[start_index : end_index]) start_index = end_index ofdm_symbols = np.array(ofdm_symbols) else: print("Not enough samples to extract 14 OFDM symbols.")
return { 'max_correlation': max_correlation, 'max_correlation_pss0': max_correlation_pss0, 'max_correlation_pss1': max_correlation_pss1, 'max_correlation_pss2': max_correlation_pss2, 'max_correlation_index': max_correlation_index, 'max_correlation_position': max_correlation_position, 'frequency_offset': frequency_offset, 'max_window_samples': max_window_samples, 'ofdm_symbols': ofdm_symbols, 'pss_start' : pss_start, 'pss_time_domain' : pss_time_domain , }
def getREs(ofdm_symbols, symNo, scList):
try: return [ofdm_symbols[symNo][sc] for sc in scList] except IndexError: print("Invalid symbol number or subcarrier index") return []
def ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain, fft_bin_size = 2048):
import matplotlib.pyplot as plt import numpy as np
fig = plt.figure(figsize=(15, 9))
ax1 = plt.subplot2grid((7, 3), (0, 0), rowspan=2, colspan=1) ax3 = plt.subplot2grid((7, 3), (2, 0), rowspan=2, colspan=1) ax4 = plt.subplot2grid((7, 3), (4, 0), rowspan=2, colspan=1) ax5 = plt.subplot2grid((7, 3), (6, 0), rowspan=1, colspan=1)
fft_iq_samples = np.fft.fft(iq_samples) fft_iq_samples = np.fft.fftshift(fft_iq_samples) fft_iq_samples_db = 20 * np.log10(np.abs(fft_iq_samples)) ax1.plot(fft_iq_samples_db) ax1.set_title('Frequency Spectrum of IQ samples (dB scale)', fontsize=8) ax1.grid(True) ax1.set_xlabel('Frequency', fontsize=8) ax1.set_ylabel('Amplitude (dB)', fontsize=8) ax1.tick_params(axis='both', labelsize=8)
fft_detected_pss = np.fft.fft(detected_pss_freq_domain) fft_detected_pss_normalized = fft_detected_pss / np.abs(fft_detected_pss).max()
ax3.scatter(fft_detected_pss_normalized.real, fft_detected_pss_normalized.imag, label='Detected PSS') normalized_data = fft_detected_pss / np.abs(fft_detected_pss) ax3.scatter(normalized_data.real, normalized_data.imag, label='Normalized PSS', alpha=0.5) ax3.legend() ax3.grid(True) ax3.set_title('IQ Constellation of Detected PSS and Normalized PSS', fontsize=8) ax3.set_xlabel('In-Phase (I)', fontsize=8) ax3.set_ylabel('Quadrature (Q)', fontsize=8) ax3.legend(fontsize=8) ax3.tick_params(axis='both', labelsize=8)
generated_pss = generate_pss(max_correlation_index) ax4.scatter(generated_pss.real, generated_pss.imag) ax4.grid(True) ax4.set_title('IQ Constellation of Generated PSS for NID2='+str(max_correlation_index), fontsize=8) ax4.set_xlabel('In-Phase (I)', fontsize=8) ax4.set_ylabel('Quadrature (Q)', fontsize=8) ax4.tick_params(axis='both', labelsize=8)
mag_iq_samples = np.abs(iq_samples) ax5.plot(mag_iq_samples) ax5.plot(range(pss_start, pss_start + len(pss_time_domain)), pss_time_domain, label='Detected PSS', linestyle='--') ax5.grid(True) ax5.set_title('IQ in time doman and Detected PSS', fontsize=8) ax5.set_xlabel('Time', fontsize=8) ax5.set_ylabel('Amplitude', fontsize=8) ax5.legend(fontsize=8) ax5.tick_params(axis='both', labelsize=8)
axs = [plt.subplot2grid((7, 3), (i % 7, i // 7 + 1)) for i in range(14)]
for i in range(14): ax = axs[i] symbol = ofdm_symbols[i] freq_domain_symbol = np.fft.fft(symbol, fft_bin_size) freq_domain_symbol = np.fft.fftshift(freq_domain_symbol) freq_domain_symbol = freq_domain_symbol / np.max(np.abs(freq_domain_symbol)) freq_domain_symbol_db = 20 * np.log10(np.abs(freq_domain_symbol)) ax.plot(freq_domain_symbol_db) ax.set_ylim([-50, 0])
plt.tight_layout() plt.show()
def ShowGrid(ofdm_symbols, nRB = 100, zmin=-25, zmax=0):
frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1) frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)
magnitude = np.abs(frequency_domain_symbols) magnitude = magnitude / np.max(magnitude, axis=1, keepdims=True)
magnitude_db = 20 * np.log10(magnitude + np.finfo(float).eps)
total_subcarriers = frequency_domain_symbols.shape[1]
start_index = (total_subcarriers - nRB * 12) // 2 end_index = start_index + nRB * 12
magnitude_db = magnitude_db[:, start_index:end_index]
fig, ax = plt.subplots(figsize=(14, 5))
cax = ax.imshow(magnitude_db, aspect='auto', cmap='RdBu_r', interpolation='none', vmin=zmin, vmax=zmax)
fig.colorbar(cax)
ax.set_xlabel("Subcarriers") ax.set_ylabel("OFDM Symbols") ax.set_title("Resource Grid")
symbol_labels = ['Symbol {}'.format(i) for i in range(ofdm_symbols.shape[0])] ax.set_yticks(np.arange(ofdm_symbols.shape[0])) ax.set_yticklabels(symbol_labels)
ax.set_xticks(np.arange(0, nRB * 12 + 1, 12*5)) ax.set_xticklabels(np.arange(0, nRB * 12 + 1, 12*5))
plt.tight_layout() plt.show()
def getResourceGrid(ofdm_symbols, nRB = 100): frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1) frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)
total_subcarriers = frequency_domain_symbols.shape[1]
start_index = (total_subcarriers - nRB * 12) // 2 end_index = start_index + nRB * 12
resource_grid = frequency_domain_symbols[:, start_index:end_index]
return resource_grid
def DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6, debug = False): if debug : print("[Debug Info] Frequency Offset ==> ",frequency_offset) print("[Debug Info] N_ID_2 ==> ",N_ID_2)
noOfRE = nRB * 12 sss_sc_indices = list(range(noOfRE-noOfRE//2-31, noOfRE-noOfRE//2+31))
sss_no_correction = getREs(resourceGrid, 5, sss_sc_indices) sss = correct_frequency_offset(sss_no_correction, frequency_offset, samplingRate)
max_corr = -np.inf best_N_ID_1 = None for N_ID_1 in range(168): expected_sss0, expected_sss5 = generate_sss(N_ID_1, N_ID_2) corr0 = np.abs(cross_correlate(sss, expected_sss0)) corr5 = np.abs(cross_correlate(sss, expected_sss5)) total_corr = max(corr0, corr5) if debug : print("corr0 = ", corr0, ",corr5 = ", corr5, ", N_ID_1 = ", N_ID_1)
if total_corr > max_corr: max_corr = total_corr best_N_ID_1 = N_ID_1 if debug : print("corr0 = ", corr0, ", corr5 = ", corr5, ", max_corr = ", max_corr, ", N_ID_1 = ", N_ID_1)
if debug : print('Detected SSS (N_ID_1): ', best_N_ID_1)
detected_subframe = 0 if cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[0]) > cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[1]) else 5
return { 'sss_no_correction': sss_no_correction, 'sss_corrected': sss, 'N_ID_1': best_N_ID_1, 'detected_subframe': detected_subframe, }
def getPCI(N_ID_1, N_ID_2) : return 3*N_ID_1 + N_ID_2
def ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe):
sss_generated = np.array(generate_sss(N_ID_1, N_ID_2)) if detected_subframe == 0 : sss_generated = sss_generated[0] else : sss_generated = sss_generated[1]
sss_no_correction = np.array(sss_no_correction, dtype=np.complex128) sss_corrected = np.array(sss_corrected, dtype=np.complex128)
sss_no_correction = sss_no_correction / np.mean(np.abs(sss_no_correction)) sss_corrected = sss_corrected / np.mean(np.abs(sss_corrected))
data_list = [sss_no_correction, sss_corrected, sss_generated ] titles = ['sss_no_correction', 'sss_corrected', 'sss from N_ID_1'] fig, axs = plt.subplots(3, 3, figsize=(9, 9))
for i, data in enumerate(data_list): data = np.array(data, dtype=np.complex128)
axs[0, i].scatter(data.real, data.imag, color='b') axs[0, i].set_title(f'{titles[i]}', fontsize=8) axs[0, i].set_xlabel('Real', fontsize=8) axs[0, i].set_ylabel('Imaginary', fontsize=8) axs[0, i].tick_params(axis='both', which='major', labelsize=8) axs[0, i].set_xlim(-2,2) axs[0, i].set_ylim(-2,2)
axs[1, i].plot(data.real) axs[1, i].set_title(f'{titles[i]}', fontsize=8) axs[1, i].set_xlabel('Data Index', fontsize=8) axs[1, i].set_ylabel('Real', fontsize=8) axs[1, i].tick_params(axis='both', which='major', labelsize=8) axs[1, i].set_ylim(-2,2)
axs[2, i].plot(data.imag) axs[2, i].set_title(f'{titles[i]}', fontsize=8) axs[2, i].set_xlabel('Data Index', fontsize=8) axs[2, i].set_ylabel('Imaginary', fontsize=8) axs[2, i].tick_params(axis='both', which='major', labelsize=8) axs[2, i].set_ylim(-2,2)
plt.tight_layout() plt.show()
def main(): # Load IQ data from binary file with open('IQ/lte_20Mhz_rate30.72Mhz_dur_10ms_pci252.bin', 'rb') as f: # expected PSS Seq = 0 iq_samples = np.fromfile(f, dtype=np.complex64)
resultsPSS = DetectPSS(iq_samples)
print(f'Detected PSS Sequence (N_ID_2): {resultsPSS["max_correlation_index"]}') print(f'Maximum correlation: {resultsPSS["max_correlation"]}') print(f'Maximum correlation for PSS0: {resultsPSS["max_correlation_pss0"]}') print(f'Maximum correlation for PSS1: {resultsPSS["max_correlation_pss1"]}') print(f'Maximum correlation for PSS2: {resultsPSS["max_correlation_pss2"]}') print(f'Position of maximum correlation: {resultsPSS["max_correlation_position"]}') print(f'Estimated Frequency Offset: {resultsPSS["frequency_offset"]} Hz')
max_correlation_index = resultsPSS["max_correlation_index"] pss_start = resultsPSS["pss_start"] pss_time_domain = resultsPSS["pss_time_domain"] ofdm_symbols = resultsPSS["ofdm_symbols"] detected_pss_freq_domain = resultsPSS["max_window_samples"] frequency_offset = resultsPSS["frequency_offset"]
resourceGrid = getResourceGrid(ofdm_symbols, nRB = 100) print("Dimension of resourceGrid = ",resourceGrid.shape)
N_ID_2 = max_correlation_index resultsSSS = DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6)
N_ID_1 = resultsSSS["N_ID_1"] sss_no_correction = resultsSSS["sss_no_correction"] sss_corrected = resultsSSS["sss_corrected"] detected_subframe = resultsSSS["detected_subframe"]
print('Detected Subframe : ', detected_subframe)
print('Detected N_ID_2 (PSS) : ', N_ID_2) print('Detected N_ID_1 (SSS) : ', N_ID_1)
PCI = getPCI(N_ID_1, N_ID_2) print('Detected PCI: ', PCI)
ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain) ShowGrid(ofdm_symbols, nRB = 100, zmin = -25, zmax = 0) ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe)
if __name__ == "__main__": main()
|
Script - Comments
For the readers who is not familiar with python script and packages that are used in the code. I put the comments for each lines. For the original Python file, click here. For the sample IQ data that you may try with this code, you can download it from here (Unzip the file and change the file name and path in the source code).
import numpy as np import matplotlib.pyplot as plt import struct import matplotlib.gridspec as gridspec from scipy.signal import butter, lfilter
def generate_pss(n_id_2):
u_shift = [25, 29, 34]
d_u = []
for n in range(62): u = u_shift[n_id_2]
if n <= 30: d = np.exp(-1j * np.pi * u * n * (n + 1) / 63) else: d = np.exp(-1j * np.pi * u * (n + 1) * (n + 2) / 63)
d_u.append(d)
return np.array(d_u)
def frequency_offset_estimation(received_pss, expected_pss): phase_difference = np.angle(np.dot(received_pss, np.conj(expected_pss))) frequency_offset = phase_difference / (2 * np.pi * 62 / 30.72e6) # Convert phase difference to frequency offset return frequency_offset
def correct_frequency_offset(signal, frequency_offset, sample_rate, debug = False):
if debug : print("[Debug Info] Frequency Offset ==> ",frequency_offset) print("[Debug Info] sample_rate ==> ",sample_rate)
signal = np.array(signal, dtype=np.complex128) time = len(signal) / sample_rate correction = np.exp(-1j * 2 * np.pi * frequency_offset * time) corrected_signal = signal * correction
if debug : original_signal_magnitude = np.abs(signal) corrected_signal_magnitude = np.abs(corrected_signal) print("comparison of mag : ", np.allclose(original_signal_magnitude, corrected_signal_magnitude)) print("correction : ", correction)
return corrected_signal
def generate_sss(NID1,NID2):
q_prime = np.floor(NID1/30) q = np.floor(((NID1+q_prime*(q_prime+1)/2))/30) m_prime = NID1 + q *(q+1)/2 m0 = m_prime % 31 m1 = (m0 + np.floor(m_prime/31) + 1) % 31
x_s = np.zeros(31) x_s[:5] = [0, 0, 0, 0, 1] for i in range(26): x_s[i+5] = (x_s[i+2] + x_s[i]) % 2
x_c = np.zeros(31) x_c[:5] = [0, 0, 0, 0, 1] for i in range(26): x_c[i+5] = (x_c[i+3] + x_c[i]) % 2
s_tilda = 1 - 2*x_s c_tilda = 1 - 2*x_c s0_m0_even = np.zeros(31) s1_m1_even = np.zeros(31) for n in range(31): s0_m0_even[n] = s_tilda[int((n+m0)%31)] s1_m1_even[n] = s_tilda[int((n+m1)%31)]
c0_even = np.zeros(31) for n in range(31): c0_even[n] = c_tilda[int((n+NID2)%31)]
d_even_sub0 = s0_m0_even * c0_even d_even_sub5 = s1_m1_even * c0_even
x_z = np.zeros(31) x_z[:5] = [0, 0, 0, 0, 1] for i in range(26): x_z[i+5] = (x_z[i+4] + x_z[i+2] + x_z[i+1] + x_z[i]) % 2
z_tilda = 1 - 2*x_z s1_m1_odd = np.zeros(31) s0_m0_odd = np.zeros(31) for n in range(31): s1_m1_odd[n] = s_tilda[int((n+m1)%31)] s0_m0_odd[n] = s_tilda[int((n+m0)%31)]
c1_odd = np.zeros(31) for n in range(31): c1_odd[n] = c_tilda[int((n+NID2+3)%31)]
z1_m0_odd = np.zeros(31) z1_m1_odd = np.zeros(31) for n in range(31): z1_m0_odd[n] = z_tilda[int((n+m0%8)%31)] z1_m1_odd[n] = z_tilda[int((n+m1%8)%31)]
d_odd_sub0 = s1_m1_odd * c1_odd * z1_m0_odd d_odd_sub5 = s0_m0_odd * c1_odd * z1_m1_odd
d_sub0 = np.zeros(62) d_sub0[::2] = d_even_sub0 d_sub0[1::2] = d_odd_sub0 sss0 = d_sub0
d_sub5 = np.zeros(62) d_sub5[::2] = d_even_sub5 d_sub5[1::2] = d_odd_sub5 sss5 = d_sub5
return sss0, sss5
def cross_correlate(a, b): return np.sum(a * np.conj(b))
def DetectPSS(iq_samples, fft_bin_size=2048):
pss_seqs = [generate_pss(i) for i in range(3)]
max_correlation = 0 max_window_samples = None max_correlation_index = None max_correlation_position = None
max_correlation_pss0 = max_correlation_pss1 = max_correlation_pss2 = 0
for j in range(len(iq_samples) - fft_bin_size): iq_window = iq_samples[j : j + fft_bin_size] frequency_samples = np.fft.fft(iq_window, fft_bin_size) frequency_samples = np.fft.fftshift(frequency_samples)
for k in range(3): window_start = (fft_bin_size - 62) // 2 window_end = window_start + 62 window_samples = frequency_samples[window_start:window_end]
correlation = np.sum(window_samples * np.conj(pss_seqs[k]))
if k == 0 and abs(correlation) > max_correlation_pss0: max_correlation_pss0 = abs(correlation) elif k == 1 and abs(correlation) > max_correlation_pss1: max_correlation_pss1 = abs(correlation) elif k == 2 and abs(correlation) > max_correlation_pss2: max_correlation_pss2 = abs(correlation)
if abs(correlation) > max_correlation: max_correlation = abs(correlation) max_correlation_index = k max_correlation_position = j max_window_samples = window_samples pss_start = j pss_time_domain = np.abs(iq_window)
iq_window = iq_samples[max_correlation_position : max_correlation_position + fft_bin_size] frequency_samples = np.fft.fft(iq_window, fft_bin_size) frequency_samples = np.fft.fftshift(frequency_samples) received_pss = frequency_samples[993:1055] expected_pss = pss_seqs[max_correlation_index] frequency_offset = frequency_offset_estimation(received_pss, expected_pss)
cp_lengths = [160, 144, 144, 144, 144, 144, 144, 160, 144, 144, 144, 144, 144, 144] pss_slot_index = 6
first_symbol_start = (pss_start - cp_lengths[pss_slot_index]) - 5 * (cp_lengths[pss_slot_index]+fft_bin_size) - (cp_lengths[0]+fft_bin_size)
if pss_start >= first_symbol_start + 6 * fft_bin_size and pss_start + 8 * fft_bin_size <= len(iq_samples): ofdm_symbols = [] start_index = first_symbol_start for i in range(14): start_index = start_index + cp_lengths[i] end_index = start_index + fft_bin_size ofdm_symbols.append(iq_samples[start_index : end_index]) start_index = end_index ofdm_symbols = np.array(ofdm_symbols) else: print("Not enough samples to extract 14 OFDM symbols.")
return { 'max_correlation': max_correlation, 'max_correlation_pss0': max_correlation_pss0, 'max_correlation_pss1': max_correlation_pss1, 'max_correlation_pss2': max_correlation_pss2, 'max_correlation_index': max_correlation_index, 'max_correlation_position': max_correlation_position, 'frequency_offset': frequency_offset, 'max_window_samples': max_window_samples, 'ofdm_symbols': ofdm_symbols, 'pss_start' : pss_start, 'pss_time_domain' : pss_time_domain , }
def getREs(resourceGrid, symNo, scList):
try: return [ofdm_symbols[symNo][sc] for sc in scList] except IndexError: print("Invalid symbol number or subcarrier index") return []
def ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain, fft_bin_size = 2048):
fig = plt.figure(figsize=(15, 9))
ax1 = plt.subplot2grid((7, 3), (0, 0), rowspan=2, colspan=1) ax3 = plt.subplot2grid((7, 3), (2, 0), rowspan=2, colspan=1) ax4 = plt.subplot2grid((7, 3), (4, 0), rowspan=2, colspan=1) ax5 = plt.subplot2grid((7, 3), (6, 0), rowspan=1, colspan=1)
fft_iq_samples = np.fft.fft(iq_samples) fft_iq_samples = np.fft.fftshift(fft_iq_samples) fft_iq_samples_db = 20 * np.log10(np.abs(fft_iq_samples)) ax1.plot(fft_iq_samples_db) ax1.set_title('Frequency Spectrum of IQ samples (dB scale)', fontsize=8) ax1.grid(True) ax1.set_xlabel('Frequency', fontsize=8) ax1.set_ylabel('Amplitude (dB)', fontsize=8) ax1.tick_params(axis='both', labelsize=8)
fft_detected_pss = np.fft.fft(detected_pss_freq_domain) fft_detected_pss_normalized = fft_detected_pss / np.abs(fft_detected_pss).max()
ax3.scatter(fft_detected_pss_normalized.real, fft_detected_pss_normalized.imag, label='Detected PSS') normalized_data = fft_detected_pss / np.abs(fft_detected_pss) ax3.scatter(normalized_data.real, normalized_data.imag, label='Normalized PSS', alpha=0.5) ax3.legend() ax3.grid(True) ax3.set_title('IQ Constellation of Detected PSS and Normalized PSS', fontsize=8) ax3.set_xlabel('In-Phase (I)', fontsize=8) ax3.set_ylabel('Quadrature (Q)', fontsize=8) ax3.legend(fontsize=8) ax3.tick_params(axis='both', labelsize=8)
generated_pss = generate_pss(max_correlation_index) ax4.scatter(generated_pss.real, generated_pss.imag) ax4.grid(True) ax4.set_title('IQ Constellation of Generated PSS for NID2='+str(max_correlation_index), fontsize=8) ax4.set_xlabel('In-Phase (I)', fontsize=8) ax4.set_ylabel('Quadrature (Q)', fontsize=8) ax4.tick_params(axis='both', labelsize=8)
mag_iq_samples = np.abs(iq_samples) ax5.plot(mag_iq_samples) ax5.plot(range(pss_start, pss_start + len(pss_time_domain)), pss_time_domain, label='Detected PSS', linestyle='--') ax5.grid(True) ax5.set_title('IQ in time doman and Detected PSS', fontsize=8) ax5.set_xlabel('Time', fontsize=8) ax5.set_ylabel('Amplitude', fontsize=8) ax5.legend(fontsize=8) ax5.tick_params(axis='both', labelsize=8)
axs = [plt.subplot2grid((7, 3), (i % 7, i // 7 + 1)) for i in range(14)]
for i in range(14): ax = axs[i] symbol = ofdm_symbols[i] freq_domain_symbol = np.fft.fft(symbol, fft_bin_size) freq_domain_symbol = np.fft.fftshift(freq_domain_symbol) freq_domain_symbol = freq_domain_symbol / np.max(np.abs(freq_domain_symbol)) freq_domain_symbol_db = 20 * np.log10(np.abs(freq_domain_symbol)) ax.plot(freq_domain_symbol_db) ax.set_ylim([-50, 0])
plt.tight_layout() plt.show()
def ShowGrid(ofdm_symbols, nRB = 100, zmin=-25, zmax=0):
frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1) frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)
magnitude = np.abs(frequency_domain_symbols) magnitude = magnitude / np.max(magnitude, axis=1, keepdims=True)
magnitude_db = 20 * np.log10(magnitude + np.finfo(float).eps)
total_subcarriers = frequency_domain_symbols.shape[1]
start_index = (total_subcarriers - nRB * 12) // 2 end_index = start_index + nRB * 12
magnitude_db = magnitude_db[:, start_index:end_index]
fig, ax = plt.subplots(figsize=(14, 5))
cax = ax.imshow(magnitude_db, aspect='auto', cmap='RdBu_r', interpolation='none', vmin=zmin, vmax=zmax)
fig.colorbar(cax)
ax.set_xlabel("Subcarriers") ax.set_ylabel("OFDM Symbols") ax.set_title("Resource Grid")
symbol_labels = ['Symbol {}'.format(i) for i in range(ofdm_symbols.shape[0])] ax.set_yticks(np.arange(ofdm_symbols.shape[0])) ax.set_yticklabels(symbol_labels)
ax.set_xticks(np.arange(0, nRB * 12 + 1, 12*5)) ax.set_xticklabels(np.arange(0, nRB * 12 + 1, 12*5))
plt.tight_layout() plt.show()
def getResourceGrid(ofdm_symbols, nRB = 100): frequency_domain_symbols = np.fft.fft(ofdm_symbols, axis=1) frequency_domain_symbols = np.fft.fftshift(frequency_domain_symbols, axes=1)
total_subcarriers = frequency_domain_symbols.shape[1]
start_index = (total_subcarriers - nRB * 12) // 2 end_index = start_index + nRB * 12
resource_grid = frequency_domain_symbols[:, start_index:end_index]
return resource_grid
def DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6, debug = False): if debug : print("[Debug Info] Frequency Offset ==> ",frequency_offset) print("[Debug Info] N_ID_2 ==> ",N_ID_2)
noOfRE = nRB * 12 sss_sc_indices = list(range(noOfRE-noOfRE//2-31, noOfRE-noOfRE//2+31))
sss_no_correction = getREs(resourceGrid, 5, sss_sc_indices)
sss = correct_frequency_offset(sss_no_correction, frequency_offset, samplingRate)
max_corr = -np.inf best_N_ID_1 = None for N_ID_1 in range(168): # Generate expected SSS expected_sss0, expected_sss5 = generate_sss(N_ID_1, N_ID_2) # Calculate correlation corr0 = np.abs(cross_correlate(sss, expected_sss0)) corr5 = np.abs(cross_correlate(sss, expected_sss5)) total_corr = max(corr0, corr5) if debug : print("corr0 = ", corr0, ",corr5 = ", corr5, ", N_ID_1 = ", N_ID_1)
if total_corr > max_corr: max_corr = total_corr best_N_ID_1 = N_ID_1 if debug : print("corr0 = ", corr0, ", corr5 = ", corr5, ", max_corr = ", max_corr, ", N_ID_1 = ", N_ID_1)
if debug : print('Detected SSS (N_ID_1): ', best_N_ID_1)
detected_subframe = 0 if cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[0]) > cross_correlate(sss, generate_sss(best_N_ID_1, N_ID_2)[1]) else 5
return { 'sss_no_correction': sss_no_correction, 'sss_corrected': sss, 'N_ID_1': best_N_ID_1, 'detected_subframe': detected_subframe, }
def getPCI(N_ID_1, N_ID_2) : return 3*N_ID_1 + N_ID_2
def ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe):
sss_generated = np.array(generate_sss(N_ID_1, N_ID_2)) if detected_subframe == 0 : sss_generated = sss_generated[0] else : sss_generated = sss_generated[1]
sss_no_correction = np.array(sss_no_correction, dtype=np.complex128) sss_corrected = np.array(sss_corrected, dtype=np.complex128)
sss_no_correction = sss_no_correction / np.mean(np.abs(sss_no_correction)) sss_corrected = sss_corrected / np.mean(np.abs(sss_corrected))
data_list = [sss_no_correction, sss_corrected, sss_generated ] #print("[Debug Info] ==> " , data_list) titles = ['sss_no_correction', 'sss_corrected', 'sss from N_ID_1'] fig, axs = plt.subplots(3, 3, figsize=(9, 9))
for i, data in enumerate(data_list): data = np.array(data, dtype=np.complex128)
axs[0, i].scatter(data.real, data.imag, color='b') axs[0, i].set_title(f'{titles[i]}', fontsize=8) axs[0, i].set_xlabel('Real', fontsize=8) axs[0, i].set_ylabel('Imaginary', fontsize=8) axs[0, i].tick_params(axis='both', which='major', labelsize=8) axs[0, i].set_xlim(-2,2) axs[0, i].set_ylim(-2,2)
axs[1, i].plot(data.real) axs[1, i].set_title(f'{titles[i]}', fontsize=8) axs[1, i].set_xlabel('Data Index', fontsize=8) axs[1, i].set_ylabel('Real', fontsize=8) axs[1, i].tick_params(axis='both', which='major', labelsize=8) axs[1, i].set_ylim(-2,2)
axs[2, i].plot(data.imag) axs[2, i].set_title(f'{titles[i]}', fontsize=8) axs[2, i].set_xlabel('Data Index', fontsize=8) axs[2, i].set_ylabel('Imaginary', fontsize=8) axs[2, i].tick_params(axis='both', which='major', labelsize=8) axs[2, i].set_ylim(-2,2)
plt.tight_layout() plt.show()
def main(): with open('IQ/lte_20Mhz_rate30.72Mhz_dur_10ms_pci252.bin', 'rb') as f: # expected PSS Seq = 0s iq_samples = np.fromfile(f, dtype=np.complex64)
results = DetectPSS(iq_samples)
print(f'Detected PSS Sequence (N_ID_2): {results["max_correlation_index"]}') print(f'Maximum correlation: {results["max_correlation"]}') print(f'Maximum correlation for PSS0: {results["max_correlation_pss0"]}') print(f'Maximum correlation for PSS1: {results["max_correlation_pss1"]}') print(f'Maximum correlation for PSS2: {results["max_correlation_pss2"]}') print(f'Position of maximum correlation: {results["max_correlation_position"]}') print(f'Estimated Frequency Offset: {results["frequency_offset"]} Hz')
max_correlation_index = results["max_correlation_index"] pss_start = results["pss_start"] pss_time_domain = results["pss_time_domain"] ofdm_symbols = results["ofdm_symbols"] detected_pss_freq_domain = results["max_window_samples"]
resourceGrid = getResourceGrid(ofdm_symbols, nRB = 100) print("Dimension of resourceGrid = ",resourceGrid.shape)
# Detect SSS N_ID_2 = max_correlation_index resultsSSS = DetectSSS(resourceGrid, N_ID_2, frequency_offset, nRB = 100, samplingRate = 30.72e6)
N_ID_1 = resultsSSS["N_ID_1"] sss_no_correction = resultsSSS["sss_no_correction"] sss_corrected = resultsSSS["sss_corrected"] detected_subframe = resultsSSS["detected_subframe"]
print('Detected Subframe : ', detected_subframe)
print('Detected N_ID_2 (PSS) : ', N_ID_2) print('Detected N_ID_1 (SSS) : ', N_ID_1)
PCI = getPCI(N_ID_1, N_ID_2) print('Detected PCI: ', PCI)
ShowPSS(iq_samples, detected_pss_freq_domain, max_correlation_index, ofdm_symbols, pss_start, pss_time_domain) ShowGrid(ofdm_symbols, nRB = 100, zmin = -25, zmax = 0) ShowSSS(sss_no_correction, sss_corrected, N_ID_1, N_ID_2,detected_subframe)
if __name__ == "__main__": main() |