Step 05: Clustering for All Mandals

##############################################################
######## Project Name : SATSURE Credit Risk Product
######## File Usage : This script creates clusters for all mandals and all years, saves the centroid objects, and writes a clustering summary file for each mandal.
######## Date : November 20, 2018
######## Author : Actify Data Labs
##############################################################

# How to run the script
# Rscript clustering_for_all_mandals.R <crop_type> <district> <no_of_cores>
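# Example (argument values are illustrative; the core count is a placeholder):
# Rscript clustering_for_all_mandals.R maize Vizianagaram 8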

# rm(list = ls())
args <- commandArgs(TRUE)
crop_type <- tolower(args[1])
district <- tools::toTitleCase(args[2])
(no_of_cores <- as.integer(args[3]))
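
# A minimal sanity check on the command-line arguments (a small defensive sketch):
# stop early when an argument is missing or the core count does not parse as an integer.
if (length(args) < 3 || is.na(no_of_cores)) {
  stop("Usage: Rscript clustering_for_all_mandals.R <crop_type> <district> <no_of_cores>")
}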
###################

# Importing libraries
library(dplyr)
library(dtwclust)
library(data.table)
library(glue)
library(magrittr)
library(janitor)

# Parameters ----------------------------------------------------------------
# crop_type <- "maize"
# district <- "Vizianagaram"

data_dir <- glue("/home/ubuntu/results/preprocessed/{district}")
years <- as.character(2013:2017)
names(years) <- paste0("X", years)
shp_dir <- glue::glue("/home/ubuntu/{district}/SHP/mandal_shapefiles")

## Get maximum cores of the machine
(max_cores <- parallel::detectCores(all.tests = FALSE, logical = TRUE))

## Use max cores if specified no of cores is more than max cores
if (no_of_cores > max_cores){
  print(glue("Specified no of cores ({no_of_cores}) is more than the maximum cores of the machine ({max_cores}). So, using maximum cores available."))
  no_of_cores <- max_cores
}

future::plan(future::multiprocess, workers = no_of_cores)

############# Getting all the mandal names ###################################################
file_list <- list.files(path = shp_dir, pattern = "shp$|geojson$")

# mandals <- list.files(path = shp_dir, pattern = "shp$|geojson$") %>%
#   gsub(pattern = ".shp", replacement = "", x = ., fixed = T)

mandals <- lapply(file_list, function(file_name){
  name <- strsplit(file_name, "\\.") %>% unlist %>% first
  return(name)
}) %>% unlist

names(mandals) <- mandals

########## Create centroids folder ###########################
centroids_path <- glue("{data_dir}/centroid_objects/{crop_type}")

if (!dir.exists(centroids_path)){
  dir.create(centroids_path, recursive = TRUE)
}

# mandal <- mandals[2]
# year <- years[1]
# helper functions -----------------------------------------------------------

## raw_and_scaled() returns a list holding the raw time series, the z-score-scaled time series,
## the row statistics, and the pixel coordinates (x, y) as separate elements
# @data_table -> raw NDVI/NDWI timeseries csv appended with row statistics
raw_and_scaled <- function(data_table){
  # input data frame is a df as a result of reading any file from the ~/results/preprocessed directory
  # raw_ts <- as.matrix(select(data_csv, starts_with("X2")))
  coordf <- select(data_table, c(x, y))
  raw_ts <- as.matrix(select(data_table, starts_with("X2")))
  scaled_ts <- zscore(t(raw_ts)) %>% t
  row_stats <- as.matrix(select(data_table, starts_with("row_")))
  # scaled_ts <- scale(raw_ts, center = TRUE, scale = TRUE)
  return(list(raw_ts = raw_ts, scaled_ts = scaled_ts, row_stats = row_stats, coordf = coordf))
}
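
## Illustrative usage sketch (the csv path below is a placeholder, not a real file):
# one_preprocessed_csv <- glue("{data_dir}/some_preprocessed_file.csv")   # hypothetical name
# ts_parts <- data.table::fread(one_preprocessed_csv) %>% raw_and_scaled()
# dim(ts_parts$raw_ts)    # one row per pixel, one column per "X2..." time column
# head(ts_parts$coordf)   # pixel coordinates (x, y), row-aligned with raw_ts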

### Input  -> named list with raw_ts and scaled_ts data in the format required by the tsclust() function
### Output -> cluster_objects

## create_dtw_clusters() returns cluster objects (for the raw time series only) across a range of cluster counts
# @df_list -> list of raw timeseries, scaled timeseries and coordinate dataframe
# @start -> minimum number of clusters
# @end -> maximum number of clusters
create_dtw_clusters <- function(df_list, start, end){
  if (is.null(df_list)){
    return(NULL)
  }
  no_of_clusters <- seq(from = start, to = end, by = 1)
  names(no_of_clusters) <- paste0("k", no_of_clusters)
  cluster_objects <- lapply(no_of_clusters, function(k){
    raw_ts_clustered <- tsclust(df_list$raw_ts, type = "partitional", k = k,
                                distance = "dtw_basic", centroid = "mean",
                                seed = 301L, trace = FALSE,
                                args = tsclust_args(dist = list(window.size = 3L)),
                                control = partitional_control(nrep = 1L, iter.max = 200L))
    # scaled_ts_clustered <- tsclust(df_list$scaled_ts, type = "partitional", k = k,
    #                                distance = "dtw_basic", centroid = "mean",
    #                                seed = 301L, trace = FALSE,
    #                                args = tsclust_args(dist = list(window.size = 3L)),
    #                                control = partitional_control(nrep = 1L, iter.max = 200L))
    # return(list(raw = raw_ts_clustered, scaled = scaled_ts_clustered))
    return(list(raw = raw_ts_clustered))
  })
  return(cluster_objects)
}
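
## Illustrative usage sketch (continues from the raw_and_scaled() example above):
# clus <- create_dtw_clusters(list(raw_ts = ts_parts$raw_ts), start = 2, end = 6)
# names(clus)             # "k2" "k3" "k4" "k5" "k6"
# clus$k4$raw@cluster     # cluster id (1..4) assigned to every pixel
# clus$k4$raw@centroids   # centroid series, the object saved to RDS further below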

# mandal <- mandals['Bhamini']
# year <- years["X2017"]
# creating cluster objects ----------------------------------------------------
clustering_mandalwise <- future.apply::future_lapply(mandals, function(mandal){

  print(glue::glue("Starting clustering for mandal {mandal}....."))

  mandal_clusters_yearwise <- pbapply::pblapply(years, function(year){
    file_list <- list.files(data_dir, pattern = glue::glue("{crop_type}_{mandal}"), full.names = TRUE) %>% grep(pattern = year, x = ., value = TRUE)

    if (length(file_list) == 0){
      data_for_clus <- NULL
      final_result <- NULL
    } else if (length(file_list) == 1){
      names(file_list) <- sapply(strsplit(x = sub(pattern = ".*\\/", "", file_list), split = "_"), function(x) x[2])

      df_list <- lapply(file_list, function(x) data.table::fread(x) %>% raw_and_scaled())

      index_name <- names(df_list)

      if (nrow(df_list[[index_name]]$coordf) <= 200){
        final_result <- glue::glue("No clustering done for {mandal} for the year {year} as the number of pixels was low")
      } else {
        data_for_clus_raw <- df_list[[index_name]]$raw_ts
        # data_for_clus_scaled <- Map(cbind, tslist(df_list$NDVI$scaled_ts), tslist(df_list$NDWI$scaled_ts))
        # data_for_clus <- list(raw_ts = data_for_clus_raw, scaled_ts = data_for_clus_scaled)
        data_for_clus <- list(raw_ts = data_for_clus_raw)

        cluster_objects <- create_dtw_clusters(data_for_clus, start = 2, end = 6)
        names(cluster_objects) <- glue("k{2:6}")

        centroids <- lapply(cluster_objects, function(k){k$raw@centroids})

        saveRDS(centroids, file = glue("{centroids_path}/{mandal}_centroids_{crop_type}_{year}.rds"))

        clus_ids <- sapply(cluster_objects, function(x) x$raw@cluster, USE.NAMES = TRUE)

        stats <- df_list[[index_name]]$row_stats
        colnames(stats) <- paste0(tolower(index_name), "_stats_", colnames(stats))
        final_result <- cbind(coords = df_list[[index_name]]$coordf, clus_ids, stats) %>%
          clean_names() %>%
          mutate(year = year)

      }
    } else {
      names(file_list) <- sapply(strsplit(x = sub(pattern = ".*\\/", "", file_list), split = "_"), function(x) x[2])

      df_list <- lapply(file_list, function(x) data.table::fread(x) %>% raw_and_scaled())

      if (nrow(df_list$NDVI$coordf) <= 200){
        final_result <- glue::glue("No clustering done for {mandal} for the year {year} as the number of pixels was low")
      } else {
        data_for_clus_raw <- Map(cbind, tslist(df_list$NDVI$raw_ts), tslist(df_list$NDWI$raw_ts))
        # data_for_clus_scaled <- Map(cbind, tslist(df_list$NDVI$scaled_ts), tslist(df_list$NDWI$scaled_ts))
        # data_for_clus <- list(raw_ts = data_for_clus_raw, scaled_ts = data_for_clus_scaled)
        data_for_clus <- list(raw_ts = data_for_clus_raw)

        cluster_objects <- create_dtw_clusters(data_for_clus, start = 2, end = 6)
        names(cluster_objects) <- glue("k{2:6}")

        centroids <- lapply(cluster_objects, function(k){k$raw@centroids})

        saveRDS(centroids, file = glue("{centroids_path}/{mandal}_centroids_{crop_type}_{year}.rds"))

        clus_ids <- sapply(cluster_objects, function(x) x$raw@cluster, USE.NAMES = TRUE)
        final_result <- cbind(coords = df_list$NDVI$coordf, clus_ids, ndvi_stats = df_list$NDVI$row_stats, ndwi_stats = df_list$NDWI$row_stats) %>%
          clean_names() %>%
          mutate(year = year)
      } # end of inner if-else
    } # end of outer if-else
    return(final_result)
  }) # lapply for year ends here

  ## keep only the years that actually produced a clustering data frame
  list.condition <- sapply(mandal_clusters_yearwise, is.data.frame)
  if (all(list.condition == FALSE)){
    print(glue::glue("No clustering done for mandal {mandal} for any year"))
    return(glue::glue("No clustering for mandal {mandal}"))
  } else {

    mandal_clusters_yearwise <- mandal_clusters_yearwise[list.condition]
    df <- data.table::rbindlist(mandal_clusters_yearwise, fill = TRUE)
    destination_path <- glue::glue("{data_dir}/mandalwise_clusters_dataframes/{crop_type}")

    if (!dir.exists(destination_path)){
      dir.create(destination_path, recursive = TRUE)
    }

    filename <- glue::glue("{destination_path}/{mandal}_{crop_type}_clusters_df.csv")
    data.table::fwrite(x = df, file = filename, append = FALSE, sep = ",", row.names = FALSE, col.names = TRUE)
    print(glue::glue("Written cluster dataframe successfully for mandal {mandal}"))
    return("Success")
  }

}) # lapply for mandal ends here
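
## Illustrative follow-up (not executed here): the per-mandal summary CSVs written above
## can be re-combined into a single district-level table, for example:
# all_clusters <- list.files(glue("{data_dir}/mandalwise_clusters_dataframes/{crop_type}"),
#                            pattern = "_clusters_df.csv$", full.names = TRUE) %>%
#   lapply(data.table::fread) %>%
#   data.table::rbindlist(fill = TRUE)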