Skip to content

Commit

Permalink
Merge pull request #440 from rakesh-jain/elastic
Browse files Browse the repository at this point in the history
Elasticsearch
  • Loading branch information
kmgowda authored Oct 11, 2024
2 parents 379789a + fb932f9 commit 326fdea
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 0 deletions.
1 change: 1 addition & 0 deletions build-drivers.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
api project(':drivers:linkedbq')
api project(':drivers:atomicq')
api project(':drivers:syncq')
api project(':drivers:elasticsearch')
/* api project(':drivers:sbktemplate') */
/* above line is a signature */
}
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@
<allow pkg="net.rubyeye" />
<allow pkg="software.amazon"/>
<allow pkg="org.openjdk.jmh"/>
<allow pkg="co.elastic.clients" />
<allow pkg="org.elasticsearch.client" />
</import-control>
202 changes: 202 additions & 0 deletions drivers/elasticsearch/README.md

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions drivers/elasticsearch/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
api project(":sbk-api")

// Add your storage driver specific dependencies here
api 'co.elastic.clients:elasticsearch-java:8.15.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.javaprop.JavaPropsFactory;
import io.sbk.api.DataReader;
import io.sbk.api.DataWriter;
import io.sbk.data.impl.SbkString;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Storage;
import io.sbk.data.DataType;
import io.sbk.params.InputOptions;
import io.sbk.system.Printer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Class for Elasticsearch storage driver.
*
* Incase if your data type in other than byte[] (Byte Array)
* then change the datatype and getDataType.
*/
public class Elasticsearch implements Storage<String> {
private final static String CONFIGFILE = "Elasticsearch.properties";
private ElasticsearchConfig config;
private ElasticsearchClient elasticsearchClient;

public static long generateStartKey(int id) {
return (long) id * (long) Integer.MAX_VALUE;
}

@Override
public void addArgs(final InputOptions params) throws IllegalArgumentException {
final ObjectMapper mapper = new ObjectMapper(new JavaPropsFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
config = mapper.readValue(
Objects.requireNonNull(Elasticsearch.class.getClassLoader().getResourceAsStream(CONFIGFILE)),
ElasticsearchConfig.class);
} catch (Exception ex) {
ex.printStackTrace();
throw new IllegalArgumentException(ex);
}

// change and uncomment the below code as per your driver specific parameters
// params.addOption("param", true, "Elasticsearch parameter, default param: " + config.param);
params.addOption("user", true, "ElasticSearch user : " + config.user);
params.addOption("password", true, "ElasticSearch Password " + config.password);
params.addOption("url", true, "ElasticSearch URL:" + config.url);
params.addOption("index", true, "ElasticSearch Index: " + config.index);
}

@Override
public void parseArgs(final ParameterOptions params) throws IllegalArgumentException {
// change and uncommnet the below code as per your driver specific parameters
// config.param = params.getOptionValue("param", config.param);
config.user = params.getOptionValue("user", config.user);
config.password = params.getOptionValue("password", config.password);
config.url = params.getOptionValue("url", config.url);
config.index = params.getOptionValue("index", config.index);
}

@Override
public void openStorage(final ParameterOptions params) throws IOException {
try {
elasticsearchClient = connect();
Printer.log.info("ElasticSearch Client Connected.....");
String index1 = config.index.trim();
if (!indexExists(index1)) {
createIndex(elasticsearchClient, index1);
}
} catch (ElasticsearchException e ) {
Printer.log.error(e.getMessage());
throw new RuntimeException(e);
}
}

private ElasticsearchClient connect() {
Printer.log.info("Attempting to connect to Elasticsearch...");
try {
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(config.user, config.password));

RestClientBuilder builder = RestClient.builder(HttpHost.create(config.url))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
} catch (ElasticsearchException e) {
Printer.log.error("Error connecting to Elasticsearch: " + e.getMessage());
throw new RuntimeException(e);
}
}

private boolean indexExists(String indexName) {
try {
BooleanResponse response = elasticsearchClient.
indices().exists(e -> e.index(indexName));
return response.value();
} catch (IOException e) {
e.printStackTrace();
return false;
}
}

private void createIndex(ElasticsearchClient client, String indexName) {
try {
CreateIndexRequest createIndexRequest = CreateIndexRequest.of(c -> c.index(indexName));
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);

if (createIndexResponse.acknowledged()) {
Printer.log.info(indexName + "Created Successfully");
} else {
Printer.log.info("Index creation was not acknowledged.");
}
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void closeStorage(final ParameterOptions params) throws IOException {
try {
elasticsearchClient._transport().close();
} catch (ElasticsearchException e) {
e.printStackTrace();
Printer.log.error("Failed to close the connection");
}

}

@Override
public DataWriter<String> createWriter(final int id, final ParameterOptions params) {
return new ElasticsearchWriter(id, params, config, elasticsearchClient);
}

@Override
public DataReader<String> createReader(final int id, final ParameterOptions params) {
return new ElasticsearchReader(id, params, config, elasticsearchClient);
}

@Override
public DataType<String> getDataType() {
return new SbkString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.sbk.driver.Elasticsearch;

/**
* Class for Elasticsearch storage configuration.
*/
public class ElasticsearchConfig {
// Add Elasticsearch Storage driver configuration parameters
public String user;
public String password;
public String url;
public String index;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Reader;
import io.sbk.system.Printer;

import java.io.IOException;
import java.util.Map;

/**
* Class for Elasticsearch Reader.
*/
public class ElasticsearchReader implements Reader<String> {
private final ElasticsearchConfig config;
private final ElasticsearchClient client;
private long id;

public ElasticsearchReader(int readerId, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) {
this.id = Elasticsearch.generateStartKey(readerId);
this.config = config;
this.client = client;
}

@Override
public String read() throws IOException {
try {
GetRequest request = GetRequest.of(g -> g
.index(config.index.trim())
.id(String.valueOf(id++))
);
GetResponse<Map> response = client.get(request, Map.class);
return response.fields().toString();
} catch (ElasticsearchException e) {
Printer.log.error("Elastic Search: recordRead failed !");
throw new IOException(e);
}
}

@Override
public void close() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Writer;
import io.sbk.system.Printer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;


/**
* Class for Elasticsearch Writer.
*/
public class ElasticsearchWriter implements Writer<String> {
private final ElasticsearchConfig config;
private final ElasticsearchClient client;
private long id;

public ElasticsearchWriter(int writerID, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) {
this.id = Elasticsearch.generateStartKey(writerID);
this.config = config;
this.client = client;
}

@Override
public CompletableFuture writeAsync(String data) throws IOException {
try {
writeData(data);
} catch (ElasticsearchException ex ) {
Printer.log.error("Elastic Search: recordWrite failed !");
throw new IOException(ex);
}
return null;
}

@Override
public void sync() throws IOException {
}

@Override
public void close() throws IOException {
}


private void writeData(String data) {
Map<String, String> document = new HashMap<>();
document.put("data", data);
try {
IndexRequest<Map<String, String>> request = IndexRequest.of(i -> i
.index(config.index.trim())
.id(String.valueOf(id++))
.document(document)
);
client.index(request);
} catch (ElasticsearchException | IOException ex ) {
Printer.log.error("Elastic Search: recordWrite failed !");
throw new RuntimeException(ex);
}
}
}
13 changes: 13 additions & 0 deletions drivers/elasticsearch/src/main/resources/Elasticsearch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#Copyright (c) KMG. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Elasticsearch storage driver default Properties/parameters
user= rakesh
password= jain@88
url= http://localhost:9200
index=sbkj
1 change: 1 addition & 0 deletions settings-drivers.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ include 'drivers:conqueue'
include 'drivers:linkedbq'
include 'drivers:atomicq'
include 'drivers:syncq'
include 'drivers:elasticsearch'
/* include 'drivers:sbktemplate' */
/* above line is a signature */

0 comments on commit 326fdea

Please sign in to comment.