ScalaES: Apache Spark and ElasticSearch Connector

Nikhil Suthar
Apr 21, 2020

ScalaES Connector is a Scala-based ElasticSearch client. It uses cURL to execute queries against ElasticSearch. Many Scala-based ElasticSearch clients are available, but most of them do not provide a way to authenticate against an SSL-configured ElasticSearch cluster.

We have developed a connector that works with both SSL-configured and non-SSL ElasticSearch clusters. This utility provides the ES operations that can be executed using Spark with Scala. You can find more details here.

All methods are written in Scala and can easily be used with Apache Spark. Apache Spark comes with a default ElasticSearch connector, but that connector does not provide delete or update functionality. In those cases, this connector will be very useful.

These are the built-in methods that come with the ScalaES connector:

  1. getElasticInfo: Get ElasticSearch cluster health information.
  2. getSourceData: Get all data from an ES index in JSON format.
  3. searchWithQry: Search or filter data based on an ElasticSearch query.
  4. searchByID: Get an index's document by passing its _id.
  5. dataParse: Utility to parse the final JSON output into a Map.
  6. indexExists: Check whether an index exists.
  7. createIndex: Create an index with a mapping.
  8. deleteIN: Delete documents based on one field value.
  9. deleteById: Delete a document based on its _id.
  10. deleteByQry: Delete index documents based on a query.
  11. updateValue: Update column values for one particular _id using the passed query.
  12. insertIntoES: Insert a single document into an index.

Quick Start

Step 1: Download Jar

Click here to download JAR.

Step 2: Manage the external Jar dependency in code

Move the downloaded jar into your project library using the steps below:

1. Create a folder named “libs” at your project root if it does not exist.

2. Move the downloaded jar to the libs folder.

3. Add an unmanaged dependency in your build.sbt file by using the setting below.

unmanagedBase := baseDirectory.value/"libs"

Step 3: Import in code

import com.elasticHttp.ESUtil.ESClient

Step 4: Define the ESClient object

If the cluster is without any authentication

val client = new ESClient("ES Node IP/hostname", "Port")

With authentication

val client = new ESClient("IP/hostname", "port","user","password")

Step 5: Call the required method with its respective parameters

If the cluster is SSL configured (https://…)

val info = client.getElasticInfo("indices", true)

If the cluster is not SSL Configured (http://…..)

val info = client.getElasticInfo("indices")

Let us take the methods one by one and explain their use cases.

Note: For ElasticSearch version 7.x.x, pass “_doc” as the index type for all indices.

Cluster Information

There is only one method, getElasticInfo, for getting cluster information such as health, number of nodes, number of shards, the master node and more. Its first parameter selects which piece of information is returned.

getElasticInfo

  • The getElasticInfo method is used to get cluster-level information such as cluster health, live nodes, the master node and more. It takes two parameters, of which the first, “aboutInfo”, is mandatory.
  • Syntax: getElasticInfo(aboutInfo, SSL)
  • The aboutInfo parameter accepts a fixed set of values such as “master”, “nodes” and “indices”. You can get the complete list of possible values here.
  • Sample Code:
  • Output:
-------------Cluster Health----------------
[
{
"epoch" : "1585907396",
"timestamp" : "09:49:56",
"cluster" : "My-ElasticSearch-Cluster",
"status" : "green",
"node.total" : "1",
"node.data" : "1",
"shards" : "1",
"pri" : "1",
"relo" : "0",
"init" : "0",
"unassign" : "1",
"pending_tasks" : "0",
"max_task_wait_time" : "-",
"active_shards_percent" : "50.0%"
}
]
-------------All index Information----------------
[
{
"health" : "green",
"status" : "open",
"index" : "company",
"uuid" : "2EKcyWtORnqMSZoDlrM9Dg",
"pri" : "1",
"rep" : "1",
"docs.count" : "14",
"docs.deleted" : "0",
"store.size" : "7.5kb",
"pri.store.size" : "7.5kb"
}
]
-------------Cluster's Master node----------------
[
{
"id" : "eJ2L2ujZR-OfFZZyMJVgxQ",
"host" : "192.128.56.132",
"ip" : "192.128.56.132",
"node" : "master"
}
]
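The fields in the output above look like those returned by ElasticSearch's _cat APIs with format=json. As a minimal sketch of how an aboutInfo value and the SSL flag could map onto such a URL, the helper below builds the address by hand. The names CatUrlSketch and catUrl are hypothetical and are not part of ScalaES; how the connector actually builds its requests is not shown in this article.

```scala
// Hypothetical helper, NOT part of ScalaES: it only illustrates how an
// aboutInfo value and the SSL flag could translate into a _cat URL.
object CatUrlSketch {
  def catUrl(host: String, port: String, aboutInfo: String, ssl: Boolean = false): String = {
    // Assumption: SSL = true simply means the https scheme is used.
    val scheme = if (ssl) "https" else "http"
    // format=json asks the _cat API for JSON instead of its default text table.
    s"$scheme://$host:$port/_cat/$aboutInfo?format=json&pretty"
  }
}

// Example: URLs for the index list and for the cluster health.
println(CatUrlSketch.catUrl("localhost", "9200", "indices"))
println(CatUrlSketch.catUrl("localhost", "9200", "health", ssl = true))
```

Under this assumption, the "indices" call from Step 5 would correspond to a request against /_cat/indices on the configured host and port.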

GET

If we want to get all data without any filter, we can use getSourceData, which works like select * in SQL.

getSourceData

  • The getSourceData method returns the source data of an index. It takes three parameters, of which indexName and the index type are compulsory and SSL is optional. For ElasticSearch 7, “_doc” can be used as the index type for all indices.
  • Syntax: getSourceData(indexName, type_of_Index, SSL)
  • Sample Code:

Note: In each code sample, we use the dataParse method, which parses the output JSON data into a List of document values.

  • Output:
List(
Map(ID -> 1, NAME -> Gannon Chang, COMPANY -> ABC, SALARY -> 4409),
Map(ID -> 2, NAME -> Hashim Morris, COMPANY -> XYZ, SALARY -> 4914),
Map(ID -> 3, NAME -> Samson Le, COMPANY -> ABC, SALARY -> 413890),
Map(ID -> 4, NAME -> Jacob Coffey, COMPANY -> BCD, SALARY -> 50481),
Map(......))

SEARCH

The ScalaES connector provides two methods, searchWithQry and searchByID, to get selected or conditional data.

searchWithQry

  • The searchWithQry method returns the source data of an index, like the getSourceData method. Unlike getSourceData, it takes one more parameter, Query, which lets us select data conditionally.
  • Syntax: searchWithQry(index_Name, type_of_Index, Query, SSL)

searchByID

  • The searchByID method returns the single document of an index whose _id is equal to the passed Id_Value.
  • Syntax: searchByID(index_Name, type_of_Index, Id_Value, SSL)
  • Sample Code:
  • Output:
Output of searchWithQry
List(Map(ID -> 2, NAME -> Hashim Morris, COMPANY -> XYZ, SALARY -> 49140))
Output of searchWithQry with Single column
List(Hashim Morris)
Output of searchByID
List(Map(ID -> 6, NAME -> Dillon Holder, COMPANY -> ABC, SALARY -> 734086))
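The Query parameter is a JSON string in the ElasticSearch query DSL. A minimal sketch of building the query_string form that appears at the end of this article is shown below. The object QuerySketch and its queryString function are hypothetical helpers for illustration, not part of the connector, and they do no JSON escaping.

```scala
// Hypothetical helper, NOT part of ScalaES.
object QuerySketch {
  // Builds a query_string query restricted to a single field.
  // No JSON escaping is done, so value and field must not contain
  // double quotes or backslashes.
  def queryString(value: String, field: String): String =
    s"""
       |{
       |  "query": {
       |    "query_string": {
       |      "query": "$value",
       |      "fields": ["$field"]
       |    }
       |  }
       |}
       |""".stripMargin.trim
}

// Example: a query for documents whose SALARY field matches 49140.
val salaryQuery = QuerySketch.queryString("49140", "SALARY")
println(salaryQuery)
```

Following the syntax above, a string like this would presumably be passed as the Query argument, for example searchWithQry("company", "_doc", salaryQuery).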

Indexing

The connector comes with two indexing methods: indexExists, which returns true if the index exists and false if not, and createIndex, which is used to create an index.

indexExists

  • This method checks the given index and returns true if the index exists; otherwise it returns false.
  • Syntax: indexExists(indexName, SSL)

createIndex

  • This method creates an index as per input mapping. It takes mapping as a String parameter.
  • Syntax: createIndex(indexName, mapping, SSL)
  • Sample Code:
  • Output:
employee Index exists: false
{"acknowledged":true,"shards_acknowledged":true,"index":"employee"}
employee Index exists: true

Insertion

As of now, this utility supports inserting a single document into an index through the insertIntoES method. We are working on bulk insert and will release it soon in a new version.

insertIntoES

  • This method inserts a single document only.
  • Syntax: insertIntoES(indexName, type_of_Index, data_Mapping, SSL)
  • Sample Code:
  • Output:
Inserted Data into employee Index::::::::::::
List(Map(ID -> 113, NAME -> Nikhil, COMPANY -> ABC, SALARY -> 452157))

Update

This connector comes with the updateValue method, which updates values in a document of an index. Like the insertIntoES method, it operates on a single document of an index.

updateValue

  • The updateValue method updates the columns passed in Query for a particular id (_id).
  • Syntax: updateValue(indexName, type of index, id, Query, SSL)
  • Sample Code:
  • Output:
Before Update ID::::::::::::
List(26)
After Update ID::::::::::::
List(13)
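ElasticSearch's Update API accepts a partial document wrapped in a "doc" object. Assuming the Query passed to updateValue takes that form (the article does not show it), a body that changes ID to 13 as in the output above could be built as in this sketch. UpdateSketch, render and partialDoc are hypothetical names, not connector methods.

```scala
// Hypothetical helper, NOT part of ScalaES.
object UpdateSketch {
  // Numbers and booleans are written bare; everything else is quoted.
  // No JSON escaping is done on keys or string values.
  private def render(v: Any): String = v match {
    case n: Int     => n.toString
    case n: Long    => n.toString
    case n: Double  => n.toString
    case b: Boolean => b.toString
    case other      => "\"" + other.toString + "\""
  }

  // Wraps field -> value pairs in the {"doc": {...}} partial-document form.
  def partialDoc(fields: Seq[(String, Any)]): String =
    fields
      .map { case (k, v) => "\"" + k + "\": " + render(v) }
      .mkString("{\"doc\": {", ", ", "}}")
}

// Example: change the ID column to 13.
println(UpdateSketch.partialDoc(Seq("ID" -> 13)))
```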

Delete

The connector comes with three delete methods.

deleteById

  • The deleteById method deletes the document whose _id is passed in the id parameter.
  • Syntax: deleteById(indexName, type of index, id, SSL = false)

deleteByQry

  • deleteByQry method deletes all documents based on Query passed.
  • Syntax: deleteByQry(indexName, type of index, Query, SSL)
  • Output:
List(
Map(ID -> 2, NAME -> Hashim Morris, COMPANY -> XYZ, SALARY -> 49140),
Map(ID -> 4, NAME -> Brandon Doyle, COMPANY -> XYZ, SALARY -> 384118)
)
Deletion of selected document from index company::::::
Delete document of company XYZ with id qVuTO3EBoK_g_xnDOHo4
{"_index":"company","_type":"_doc","_id":"qVuTO3EBoK_g_xnDOHo4","_version":2,"result":"deleted","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":23,"_primary_term":4}
Delete Complete Data of Company XYZ
{"took":15,"timed_out":false,"total":2,"deleted":1,"batches":1,"version_conflicts":1,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"company","type":"_doc","id":"qVuTO3EBoK_g_xnDOHo4","cause":{"type":"version_conflict_engine_exception","reason":"[qVuTO3EBoK_g_xnDOHo4]: version conflict, required seqNo [1], primary term [1]. but no document was found","index_uuid":"2EKcyWtORnqMSZoDlrM9Dg","shard":"0","index":"company"},"status":409}]}

deleteIN

  • The deleteIN method deletes documents based on one column’s value. It takes a column name and its value as parameters and deletes all documents that match the condition.
  • If we have a condition with a single column and a single value, then we can use this method instead of deleteByQry.
  • Syntax: deleteIN(indexName, type_of_Index, ColumnName, value, SSL)
  • Let’s use the same example as above with the deleteIN method.
  • Output:
All documents with Company TTT, before Delete
List(Map(ID -> 13, NAME -> Theodore Bolton, COMPANY -> TTT, SALARY -> 881521),
Map(ID -> 14, NAME -> Kamal Roberts, COMPANY -> TTT, SALARY -> 817422)
)
Deletion of selected document from index company::::::
Delete document of company TTT
{"took":54,"timed_out":false,"total":2,"deleted":2,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}
All documents with Company TTT, AFTER Delete
List()
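To make the relationship with deleteByQry concrete, the sketch below builds a single-column, single-value query of the kind deleteIN saves us from writing by hand. It uses a match query, which is an assumption; the article does not say which query type deleteIN generates internally. DeleteInSketch and singleFieldQuery are hypothetical names.

```scala
// Hypothetical helper, NOT part of ScalaES.
object DeleteInSketch {
  // Builds a one-field match query. No JSON escaping is done, so column
  // and value must not contain double quotes or backslashes.
  def singleFieldQuery(column: String, value: String): String =
    s"""{"query": {"match": {"$column": "$value"}}}"""
}

// Example: the condition COMPANY = TTT written as a query body.
println(DeleteInSketch.singleFieldQuery("COMPANY", "TTT"))
```

Passing such a string to deleteByQry would be the longer route to what deleteIN("company", "_doc", "COMPANY", "TTT") expresses directly.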

Utilities

The connector comes with a utility that parses JSON data into a List and also removes irrelevant output tags.

dataParse

  • In most of the above code examples, we have used dataParse.
  • Syntax: dataParse(OutPutData, Column)
  • Output:
All columns::::
List(Map(ID -> 7, NAME -> Salvador Vazquez, COMPANY -> NGO, SALARY -> 895082), Map(ID -> 10, NAME -> Simon Whitaker, COMPANY -> NGO, SALARY -> 561896))
Selected Column::::
List(Salvador Vazquez, Simon Whitaker)
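The two outputs above differ only in whether a column is selected. The sketch below reproduces that relationship on already-parsed documents; it is not how dataParse is implemented (its JSON handling is not shown in this article), and DataParseSketch and selectColumn are hypothetical names.

```scala
// Hypothetical illustration, NOT the dataParse implementation.
object DataParseSketch {
  type Doc = Map[String, Any]

  // Keeps only the value of the given column from each document;
  // documents that lack the column are skipped.
  def selectColumn(docs: List[Doc], column: String): List[Any] =
    docs.flatMap(_.get(column))
}

// The two documents from the "All columns" output above.
val ngoDocs: List[DataParseSketch.Doc] = List(
  Map("ID" -> 7, "NAME" -> "Salvador Vazquez", "COMPANY" -> "NGO", "SALARY" -> 895082),
  Map("ID" -> 10, "NAME" -> "Simon Whitaker", "COMPANY" -> "NGO", "SALARY" -> 561896)
)

println(DataParseSketch.selectColumn(ngoDocs, "NAME"))
```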

Key Points:

  • This tool does not require any separate client to be initialized for connecting to ElasticSearch.
  • It is intended to work with both new and old versions of ElasticSearch, as well as of Scala.
  • For ElasticSearch version 7.x.x, pass “_doc” as the index type for all indices, as in the examples above.
  • SSL is the last parameter in all methods and accepts either true or false. If ElasticSearch is configured with SSL, pass true; otherwise pass false or skip it.
  • A few methods that take a query as a parameter might work differently on Linux and Windows. On Windows, the query needs slight changes before it is passed.

For Linux OS:

val query =          
s"""
|{
| "query": {
| "query_string": {
| "query": "49140",
| "fields": ["SALARY"]
| }
| }
|}
""".stripMargin

On Windows, the same query needs to be passed with escaped double quotes.

For Windows OS:

val query =          
s"""
|{
| \""query\"": {
| \""query_string\"": {
| \""query\"": \""49140\"",
| \""fields\"": [\""SALARY\""]
| }
| }
|}
""".stripMargin

P.S. Please post a comment or contact me on LinkedIn if you have any suggestions to improve it.
