# Step 04: Preprocessing data

##############################################################
######## Project Name : SATSURE Credit Risk Product
######## File Usage : This script will clean the raw csv files extracted from raster images. Handles missing, extreme and NA values.
######## Date : November 20, 2018
######## Author : Actify Data Labs
##############################################################

# How to run the script
# Rscript preprocess_data_satsure.R <veg_index> <crop_type> <district> <no of cores>

# Passing arguments
args <- commandArgs(TRUE)
veg_index <- toupper(args[1])           # vegetation index: NDVI / NDWI
crop <- args[2]                         # crop type: maize / ragi
district <- tools::toTitleCase(args[3])
(no_of_cores <- as.integer(args[4]))    # outer () echoes the parsed value
###################

## Get maximum cores of the machine
(max_cores <- parallel::detectCores(all.tests = FALSE, logical = TRUE))

## Use max cores if the specified no of cores is more than max cores (or unparsable)
if (is.na(no_of_cores) || no_of_cores > max_cores){
  print(glue::glue("Specified no of cores ({no_of_cores}) is more than maximum cores of the machine ({max_cores}). So, using maximum cores available."))
  no_of_cores <- max_cores
}

# NOTE(review): future::multiprocess is deprecated (defunct in recent `future`
# releases); future::multisession is the drop-in replacement -- confirm the
# installed version before switching.
future::plan(future::multiprocess, workers = no_of_cores)

### R script for faster imputation of vegetation indices when there is cloud
#district <- "Vizianagaram"
path_to_files <- paste0("/home/ubuntu/results", "/", district)                       # raw csvs for this district
preprocessed_files_path <- paste0("/home/ubuntu/results/preprocessed", "/", district) # cleaned output location
#veg_index <- "ndvi"
#crop <- "maize"

### Create output directory if it doesn't exist
if (!dir.exists(preprocessed_files_path)){
  dir.create(preprocessed_files_path, recursive = TRUE)
}

############## loading libs ##############
library(readr) # for reading csv files
library(data.table) # for read write
library(dplyr)
library(stringr) # string methods
library(glue)
library(moments) # calculating skewness
library(matrixStats) # fast column means
############## loading libs ##############

############## helper functions ##############
### julian_to_date() converts a julian-style date (YYYYDDD) to YYYYMMDD
## @jdate -> date in julian format, e.g. 2017113 -> 113th day in year 2017.
##           8-character inputs are assumed to already be YYYYMMDD and are
##           returned unchanged.
julian_to_date <- function(jdate){

  if (nchar(jdate) == 8){
    # already in YYYYMMDD form
    return(jdate)

  } else {
    year <- as.numeric(substr(jdate, 1, 4))
    jday <- as.numeric(substr(jdate, 5, 7))
    origin <- as.Date(sprintf("%d-01-01", year))

    # day-of-year 1 must map onto Jan 1 itself, so offset by jday - 1
    # (the previous `as.Date(jday, origin)` form was off by one day)
    g_date <- origin + (jday - 1)
    format(g_date, "%Y%m%d")
  }
}

## parse_year_index_crop_mandal() extracts year, index, crop and mandal from a file name
# @path -> path to a csv file; the directory structure / file name is expected
#          to encode year (4 digits), index (NDVI/NDWI), crop (maize/ragi) and
#          mandal (file-name prefix before the first "_").
# Returns list(name = "<year>_<index>_<crop>_<mandal>",
#              index_list = list(year, index, crop, mandal)).
parse_year_index_crop_mandal <- function(path){
  # stop if argument is not a character/string
  stopifnot(is.character(path))

  # base-R equivalent of stringr::str_extract(): first match or NA
  extract_first <- function(string, pattern){
    m <- regmatches(string, regexpr(pattern, string))
    if (length(m) == 0) NA_character_ else m
  }

  year  <- extract_first(path, "[0-9]{4}")
  index <- extract_first(path, "NDVI|NDWI")
  crop  <- extract_first(path, "maize|ragi")
  # mandal = file name without directory, extension, and trailing "_..." suffix.
  # (The previous sub() pattern used a (?:...) group without perl = TRUE, which
  # is invalid under R's default regex engine.)
  mandal <- sub("_.*", "", sub("\\.[^.]*$", "", basename(path)))

  return(list(
    name = paste(year, index, crop, mandal, sep = "_"),
    index_list = list(year = year, index = index, crop = crop, mandal = mandal)
  ))
}

### step 1: map all values outside [100,200] in a column to zero
### step 2: if all values of a column are outside of [100,200] => drop the column

## map_to_zeros() replaces extreme values and NA values with 0 and removes an
## entire column if all of its values become zero.
# @mat -> ndvi/ndwi timeseries data in matrix (or numeric data frame) format
# Returns a numeric matrix; drop = FALSE keeps matrix shape even when only a
# single column survives, so downstream matrix ops don't break.
map_to_zeros <- function(mat){
  # input should be matrix-like with more than one column and at least one row
  stopifnot(ncol(mat) > 1)
  stopifnot(nrow(mat) > 0)

  ### step 0: coerce to a numeric matrix and replace NA with zero
  m <- as.matrix(mat)
  storage.mode(m) <- "double"
  m[is.na(m)] <- 0

  ### step 1: zero out values outside the valid band [100, 200]
  m[m < 100 | m > 200] <- 0

  ### step 2: drop columns whose values are now all zero
  all_zero <- colSums(m != 0) == 0
  m[, !all_zero, drop = FALSE]
}

### impute by sampling
### step 1: take mean of a sample (size 50, with replacement) from the non-zero
###         values, N (=10) times.
### step 2: take the mean of those means and impute. Repeat for every zero in the column.

## impute_by_sampling() replaces zeros with the mean of sample means of the
## non-zero values within the same column. Imputation is random (no seed is
## set here), so repeated runs give slightly different imputed values.
# @mat -> ndvi/ndwi time series data after removing extreme values and NAs
impute_by_sampling <- function(mat){
  # input should be a matrix (not a vector) with at least one row
  stopifnot(ncol(mat) > 1)
  stopifnot(nrow(mat) > 0)

  apply(X = mat, MARGIN = 2, FUN = function(x){ # for all columns in the matrix
    zero_idx <- which(x == 0)     # positions to be imputed
    if (length(zero_idx) == 0){   # if all are non-zero ... don't impute
      return(x)
    }
    pool <- x[x != 0]             # values to sample from
    # NOTE: sample(pool, ...) would sample from 1:pool when pool has length 1
    # (the classic scalar-sample footgun), so index explicitly via sample.int()
    x[zero_idx] <- replicate(
      n = length(zero_idx),
      expr = mean(replicate(n = 10,
                            expr = mean(pool[sample.int(length(pool), size = 50, replace = TRUE)])))
    )
    x
  })
}

### mark correct sowing period for rabi season, to avoid the late rice phenomenon

## sowing_correction() returns the column number where the late rice phenomenon
## ends, based on per-column skewness values.
# @mat -> ndvi/ndwi data after imputation with means
# Returns NULL when every column is negatively skewed (nothing to correct),
# otherwise the index one before the first non-negatively skewed column
# (floored at 1 so the x/y-adjacent timestamp is retained).
sowing_correction <- function(mat){
  # input should be a matrix (not a vector) with at least one row
  stopifnot(ncol(mat) > 1)
  stopifnot(nrow(mat) > 0)

  # population skewness, same formula as moments::skewness():
  # m3 / m2^(3/2) with central moments computed using 1/n
  col_skewness <- function(x){
    d <- x - mean(x)
    mean(d^3) / (mean(d^2)^(3 / 2))
  }
  skew <- apply(X = mat, MARGIN = 2, FUN = col_skewness)

  if (all(skew < 0)){
    return(NULL)
  }

  #### If the first non-negative column is beyond position 1, step back one
  #### column so the previous timestamp is included when slicing by fst_idx
  first_nonneg <- which(skew >= 0)[1]
  fst_idx <- if (first_nonneg > 1) first_nonneg - 1 else 1

  # It is suggested that sowing in all probability starts latest by 15th Jan;
  # that case is not handled here yet (see the original after_Jan sketch).
  return(fst_idx)
}
############## helper functions ##############

############## imputation of data ##############
### reading raw files
pattern_of_interest <- paste0(crop, "_", toupper(veg_index))

# all csvs matching <crop>_<INDEX>, excluding derived skew/feature/pixel files
list_of_raw_files <- setdiff(
  list.files(
    path = path_to_files,
    pattern = pattern_of_interest,
    recursive = TRUE,
    full.names = TRUE
  ),
  list.files(
    path = path_to_files,
    pattern = "skew|feature|pixel",
    recursive = TRUE,
    full.names = TRUE
  )
)

# lookup file mapping (year, index, crop, mandal) -> first usable column index;
# written during the NDVI pass, read back during the NDWI pass
destination_file <- glue::glue("/home/ubuntu/results/preprocessed/{district}_{crop}_index_lookup.csv")

# the NDVI pass rebuilds the lookup from scratch
if (tolower(veg_index) == "ndvi" && file.exists(destination_file)){
  fs::file_delete(destination_file)
}

# the NDWI pass reuses the first-index values produced by the NDVI pass
if (tolower(veg_index) == "ndwi"){
  index_file <- data.table::fread(destination_file, header = FALSE,
                                  col.names = c("year", "index", "crop", "mandal", "first_index"))
}

rows <- list()
### reading files and calculating pixelwise mean
#write_preprocessed_files <- pbapply::pblapply(X = list_of_raw_files, function(x){
write_preprocessed_files <- future.apply::future_lapply(X = list_of_raw_files, function(x){
  tryCatch({
    # read data and correct column names
    raw_data <- read_csv(file = x)
    col_names <- colnames(raw_data)

    ### Converting dates to normal from julian... first two columns are lat-long of pixel
    col_names[3:length(col_names)] <- gsub(pattern = "[a-zA-Z]", replacement = "", col_names[3:length(col_names)]) %>%
      gsub(pattern = "^[_]+", replacement = "", x = .) %>%
      word(string = ., start = 1, sep = "\\_") %>%
      sapply(X = ., julian_to_date) %>%
      paste0("X", .)
    colnames(raw_data) <- col_names

    # x and y correspond to lat-long and are the first two cols in the data frame
    coord_df <- as.matrix(raw_data[, c("x", "y")])

    # zero out extreme/NA values (dropping all-invalid columns), then impute
    # the remaining zeros with sample means
    impute_with_zeroes <- map_to_zeros(raw_data[, 3:ncol(raw_data)])
    impute_with_means <- impute_by_sampling(impute_with_zeroes)

    if (tolower(veg_index) == "ndvi"){
      # drop columns to demarcate the correct starting point of maize/ragi
      # sowing, to avoid the late rice phenomenon
      fst_idx <- sowing_correction(impute_with_means)
      if (is.null(fst_idx)){
        return(NULL)
      }
      correction_for_kharif_spill <- impute_with_means[, fst_idx:ncol(impute_with_means)]
      # persist first index so the NDWI pass can reuse it
      row <- rlist::list.append(parse_year_index_crop_mandal(x)$index_list, first_index = fst_idx)
      data.table::fwrite(x = row, file = destination_file, append = TRUE, sep = ",",
                         col.names = FALSE, row.names = FALSE)

    } else {
      lst <- parse_year_index_crop_mandal(x)$index_list

      # look up the first index recorded by the NDVI pass for this year/crop/mandal
      fst_idx <- index_file %>%
        filter(year == lst$year, crop == lst$crop, mandal == lst$mandal) %>%
        select(first_index) %>%
        pull
      print(fst_idx)
      # pull() returns a zero-length vector (not NULL) when no row matches,
      # so the length must be checked as well
      if (is.null(fst_idx) || length(fst_idx) == 0){
        return(NULL)
      }
      fst_idx <- fst_idx[1] # guard against duplicate lookup rows
      correction_for_kharif_spill <- impute_with_means[, fst_idx:ncol(impute_with_means)]
    }

    # per-pixel summary statistics across the retained time series
    row_min <- matrixStats::rowMins(correction_for_kharif_spill)
    row_max <- matrixStats::rowMaxs(correction_for_kharif_spill)
    row_median <- matrixStats::rowMedians(correction_for_kharif_spill)
    row_means <- matrixStats::rowMeans2(correction_for_kharif_spill)

    pixels_for_mandal <- cbind(coord_df, correction_for_kharif_spill, row_min, row_max, row_median, row_means)

    file_name <- paste0(preprocessed_files_path, "/", parse_year_index_crop_mandal(x)$name, ".csv")

    print(file_name)

    fwrite(x = as.data.table(pixels_for_mandal), file = file_name, row.names = FALSE)
    return("success")
  }, error = function(e) NULL) # failed files yield NULL in the result list
})

# for debugging: record the per-file "success"/NULL statuses
# NOTE(review): "prepocessed" in the output path is a typo, but downstream
# tooling may already depend on this exact file name -- confirm before fixing.
saveRDS(object = write_preprocessed_files, file = glue("/home/ubuntu/file_prepocessed_successfully_{crop}_{veg_index}_{district}.rds"))

#