From ff92c10c863e8a8f86e1d8ee68f5cf0af88c5755 Mon Sep 17 00:00:00 2001 From: neeraj97 Date: Mon, 6 Apr 2020 13:39:11 +0530 Subject: [PATCH 1/5] Enhanced to support for multiple repositories --- README.md | 19 ++- .../GitHubSourceConnectorExample.properties | 11 +- pom.xml | 13 ++- .../kafka/GitHubAPIHttpClient.java | 102 +++++++++++------ .../kafka/GitHubSourceConnector.java | 35 +++++- .../kafka/GitHubSourceConnectorConfig.java | 29 +---- .../simplesteph/kafka/GitHubSourceTask.java | 108 +++++++++++------- .../simplesteph/kafka/HttpClientProvider.java | 66 +++++++++++ .../kafka/RepositoryVariables.java | 32 ++++++ .../Validators/ReposPatternValidator.java | 18 +++ .../simplesteph/kafka/utils/RepoJoinUtil.java | 15 +++ .../kafka/utils/SetBasicAuthUtil.java | 19 +++ .../GitHubSourceConnectorConfigTest.java | 15 ++- .../kafka/GitHubSourceConnectorTest.java | 9 +- .../kafka/GitHubSourceTaskTest.java | 41 ++++--- 15 files changed, 384 insertions(+), 148 deletions(-) create mode 100644 src/main/java/com/simplesteph/kafka/HttpClientProvider.java create mode 100644 src/main/java/com/simplesteph/kafka/RepositoryVariables.java create mode 100644 src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java create mode 100644 src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java create mode 100644 src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java diff --git a/README.md b/README.md index e4fa1fa..9cd55a3 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ https://links.datacumulus.com/kafka-connect-coupon # Kafka Connect Source GitHub -This connector allows you to get a stream of issues and pull requests from your GitHub repository, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository +This connector allows you to get a stream of issues and pull requests from GitHub repositories, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository Issues are pulled based on 
`updated_at` field, meaning any update to an issue or pull request will appear in the stream. @@ -24,14 +24,13 @@ This connector is not perfect and can be improved, please feel free to submit an name=GitHubSourceConnectorDemo tasks.max=1 connector.class=com.simplesteph.kafka.GitHubSourceConnector -topic=github-issues -github.owner=kubernetes -github.repo=kubernetes +github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka since.timestamp=2017-01-01T00:00:00Z # I heavily recommend you set those two fields: auth.username=your_username auth.password=your_password ``` +Note: Configuration for github.repos should be set and should follow the pattern owner1/repo1:topic1,owner2/repo2:topic2 .... # Running in development @@ -49,4 +48,14 @@ The simplest way to run `run.sh` is to have docker installed. It will pull a Doc Note: Java 8 is required for this connector. -TODO +#### Distributed Mode + +Build the project using `./build.sh`. + +Paste the folder `target/kafka-connnect-github-source-1.1-package /share/java/kafka-connect-github-source`(this folder has all jars of the project) +in connect-workers' `plugin.path`(can be found in the connect-workers' properties) +directory. The connect-worker should be able to detect `GitHubSourceConnector`. 
+ + + + diff --git a/config/GitHubSourceConnectorExample.properties b/config/GitHubSourceConnectorExample.properties index 6dec2c3..9bc5882 100644 --- a/config/GitHubSourceConnectorExample.properties +++ b/config/GitHubSourceConnectorExample.properties @@ -1,10 +1,9 @@ name=GitHubSourceConnectorDemo -tasks.max=1 +tasks.max=2 connector.class=com.simplesteph.kafka.GitHubSourceConnector -topic=github-issues -github.owner=kubernetes -github.repo=kubernetes since.timestamp=2017-01-01T00:00:00Z +#Pattern to be followed for mentioning repositories owner/repo:topic +github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka # I heavily recommend you set those two fields: -# auth.username=your_username -# auth.password=your_password \ No newline at end of file +#auth.username= +#auth.password= \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1cff8da..999b8db 100644 --- a/pom.xml +++ b/pom.xml @@ -33,11 +33,16 @@ slf4j-log4j12 1.7.25 - - com.mashape.unirest - unirest-java - 1.4.9 + org.json + json + 20190722 + + + + org.apache.httpcomponents + httpclient + 4.5.10 diff --git a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java index 3e17a09..296fd35 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java +++ b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java @@ -1,25 +1,32 @@ package com.simplesteph.kafka; -import com.mashape.unirest.http.Headers; -import com.mashape.unirest.http.HttpResponse; -import com.mashape.unirest.http.JsonNode; -import com.mashape.unirest.http.Unirest; -import com.mashape.unirest.http.exceptions.UnirestException; -import com.mashape.unirest.request.GetRequest; +import com.simplesteph.kafka.utils.SetBasicAuthUtil; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import 
org.apache.http.util.EntityUtils; import org.apache.kafka.connect.errors.ConnectException; import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.HashMap; // GitHubHttpAPIClient used to launch HTTP Get requests -public class GitHubAPIHttpClient { +public class GitHubAPIHttpClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(GitHubAPIHttpClient.class); + private HttpClientProvider httpClientProvider; + private HttpClient httpClient; // for efficient http requests private Integer XRateLimit = 9999; private Integer XRateRemaining = 9999; @@ -29,27 +36,44 @@ public class GitHubAPIHttpClient { public GitHubAPIHttpClient(GitHubSourceConnectorConfig config){ this.config = config; + this.httpClientProvider=new HttpClientProvider(); + this.httpClient=this.httpClientProvider.getHttpClient(); } - protected JSONArray getNextIssues(Integer page, Instant since) throws InterruptedException { + protected JSONArray getNextIssues(RepositoryVariables repoVar) throws InterruptedException { - HttpResponse jsonResponse; + HttpResponse httpResponse; try { - jsonResponse = getNextIssuesAPI(page, since); - - // deal with headers in any case - Headers headers = jsonResponse.getHeaders(); - XRateLimit = Integer.valueOf(headers.getFirst("X-RateLimit-Limit")); - XRateRemaining = Integer.valueOf(headers.getFirst("X-RateLimit-Remaining")); - XRateReset = Integer.valueOf(headers.getFirst("X-RateLimit-Reset")); - switch (jsonResponse.getStatus()){ + httpResponse = getNextIssuesAPI(repoVar); + //reading the headers of the response + HashMap headers=new HashMap(); + for(Header h:httpResponse.getAllHeaders()) + headers.put(h.getName(),h.getValue()); + + XRateLimit = 
Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Limit").getValue()); + XRateRemaining = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Remaining").getValue()); + XRateReset = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Reset").getValue()); + //reading the httpResponse content(body) + String jsonResponse= EntityUtils.toString(httpResponse.getEntity()); + JSONArray jsonArray=new JSONArray(); + JSONObject jsonBody=new JSONObject(); + try{ + //try to read httpResponse as JSONArray if possible + jsonArray=new JSONArray(jsonResponse); + } + catch (JSONException ex){ + //read as JSONObject + jsonBody=new JSONObject(jsonResponse); + } + + switch (httpResponse.getStatusLine().getStatusCode()){ case 200: - return jsonResponse.getBody().getArray(); + return jsonArray; case 401: throw new ConnectException("Bad GitHub credentials provided, please edit your config"); case 403: // we have issues too many requests. - log.info(jsonResponse.getBody().getObject().getString("message")); + log.info(jsonBody.getString("message")); log.info(String.format("Your rate limit is %s", XRateLimit)); log.info(String.format("Your remaining calls is %s", XRateRemaining)); log.info(String.format("The limit will reset at %s", @@ -57,41 +81,41 @@ protected JSONArray getNextIssues(Integer page, Instant since) throws Interrupte long sleepTime = XRateReset - Instant.now().getEpochSecond(); log.info(String.format("Sleeping for %s seconds", sleepTime )); Thread.sleep(1000 * sleepTime); - return getNextIssues(page, since); + return getNextIssues(repoVar); default: - log.error(constructUrl(page, since)); - log.error(String.valueOf(jsonResponse.getStatus())); - log.error(jsonResponse.getBody().toString()); - log.error(jsonResponse.getHeaders().toString()); + log.error(constructUrl(repoVar)); + log.error(String.valueOf(httpResponse.getStatusLine().getStatusCode())); + log.error(jsonResponse); + log.error(headers.toString()); log.error("Unknown error: Sleeping 5 seconds " + 
"before re-trying"); Thread.sleep(5000L); - return getNextIssues(page, since); + return getNextIssues(repoVar); } - } catch (UnirestException e) { + } catch (IOException e) { e.printStackTrace(); Thread.sleep(5000L); return new JSONArray(); } } - protected HttpResponse getNextIssuesAPI(Integer page, Instant since) throws UnirestException { - GetRequest unirest = Unirest.get(constructUrl(page, since)); + protected HttpResponse getNextIssuesAPI(RepositoryVariables repoVar) throws IOException { + HttpGet httpGet=new HttpGet(constructUrl(repoVar)); if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){ - unirest = unirest.basicAuth(config.getAuthUsername(), config.getAuthPassword()); + SetBasicAuthUtil.SetBasicAuth(httpGet,config.getAuthUsername(), config.getAuthPassword()); } - log.debug(String.format("GET %s", unirest.getUrl())); - return unirest.asJson(); + HttpResponse response= httpClient.execute(httpGet); + return response; } - protected String constructUrl(Integer page, Instant since){ + protected String constructUrl(RepositoryVariables repoVar){ return String.format( "https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated", - config.getOwnerConfig(), - config.getRepoConfig(), - page, + repoVar.getOwner(), + repoVar.getRepoName(), + repoVar.nextPageToVisit, config.getBatchSize(), - since.toString()); + repoVar.nextQuerySince.toString()); } public void sleep() throws InterruptedException { @@ -108,4 +132,10 @@ public void sleepIfNeed() throws InterruptedException { sleep(); } } + + @Override + public void close() throws IOException { + if(httpClientProvider!=null) + httpClientProvider.close(); + } } \ No newline at end of file diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java b/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java index 768bdde..d453092 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java +++ 
b/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java @@ -1,9 +1,11 @@ package com.simplesteph.kafka; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import com.simplesteph.kafka.utils.RepoJoinUtil; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; @@ -13,6 +15,7 @@ public class GitHubSourceConnector extends SourceConnector { private static Logger log = LoggerFactory.getLogger(GitHubSourceConnector.class); private GitHubSourceConnectorConfig config; + private Map settings; @Override public String version() { @@ -22,6 +25,7 @@ public String version() { @Override public void start(Map map) { config = new GitHubSourceConnectorConfig(map); + settings=map; } @Override @@ -32,8 +36,35 @@ public Class taskClass() { @Override public List> taskConfigs(int i) { // Define the individual task configurations that will be executed. - ArrayList> configs = new ArrayList<>(1); - configs.add(config.originalsStrings()); + String repos[]=config.getReposConfig(); + if(repos.length> configs = new ArrayList<>(i); + + //Distributing the repositories among the taskConfigs efficiently so that the possible max of repos per task is min + int remainingRepos=repos.length; + int currentRepo=0; + while(remainingRepos>0){ + int NumOfReposForTask=(remainingRepos/i) +((remainingRepos%i==0)?0:1); + String ReposForTask=""; + for(int j=0;j task_settings=new HashMap<>(settings); + //updating repos for the task + task_settings.put(GitHubSourceConnectorConfig.REPOS_CONFIG,ReposForTask); + configs.add(task_settings); + + currentRepo+=NumOfReposForTask; + remainingRepos-=NumOfReposForTask; + i--; + } + return configs; } diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java index 9056ef9..a5628f7 100644 --- 
a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java @@ -1,6 +1,7 @@ package com.simplesteph.kafka; import com.simplesteph.kafka.Validators.BatchSizeValidator; +import com.simplesteph.kafka.Validators.ReposPatternValidator; import com.simplesteph.kafka.Validators.TimestampValidator; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -14,21 +15,15 @@ public class GitHubSourceConnectorConfig extends AbstractConfig { - public static final String TOPIC_CONFIG = "topic"; - private static final String TOPIC_DOC = "Topic to write to"; - - public static final String OWNER_CONFIG = "github.owner"; - private static final String OWNER_DOC = "Owner of the repository you'd like to follow"; - - public static final String REPO_CONFIG = "github.repo"; - private static final String REPO_DOC = "Repository you'd like to follow"; - public static final String SINCE_CONFIG = "since.timestamp"; private static final String SINCE_DOC = "Only issues updated at or after this time are returned.\n" + "This is a timestamp in ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ.\n" + "Defaults to a year from first launch."; + public static final String REPOS_CONFIG="github.repos"; + public static final String REPOS_DOC="Repositories you'd like to follow . owner/repo:topic pattern to be followed for mentioning repositories."; + public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "Number of data points to retrieve at a time. 
Defaults to 100 (max value)"; @@ -49,9 +44,7 @@ public GitHubSourceConnectorConfig(Map parsedConfig) { public static ConfigDef conf() { return new ConfigDef() - .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC) - .define(OWNER_CONFIG, Type.STRING, Importance.HIGH, OWNER_DOC) - .define(REPO_CONFIG, Type.STRING, Importance.HIGH, REPO_DOC) + .define(REPOS_CONFIG,Type.STRING,"apache/kafka:github-issues",new ReposPatternValidator(),Importance.HIGH,REPOS_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 100, new BatchSizeValidator(), Importance.LOW, BATCH_SIZE_DOC) .define(SINCE_CONFIG, Type.STRING, ZonedDateTime.now().minusYears(1).toInstant().toString(), new TimestampValidator(), Importance.HIGH, SINCE_DOC) @@ -59,13 +52,7 @@ public static ConfigDef conf() { .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC); } - public String getOwnerConfig() { - return this.getString(OWNER_CONFIG); - } - - public String getRepoConfig() { - return this.getString(REPO_CONFIG); - } + public String[] getReposConfig(){return this.getString(REPOS_CONFIG).split(",");} public Integer getBatchSize() { return this.getInt(BATCH_SIZE_CONFIG); @@ -75,10 +62,6 @@ public Instant getSince() { return Instant.parse(this.getString(SINCE_CONFIG)); } - public String getTopic() { - return this.getString(TOPIC_CONFIG); - } - public String getAuthUsername() { return this.getString(AUTH_USERNAME_CONFIG); } diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java b/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java index c5f90ac..c46c44e 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.time.Instant; import java.util.*; import static com.simplesteph.kafka.GitHubSchemas.*; @@ -21,13 +22,12 @@ public class GitHubSourceTask extends SourceTask { private 
static final Logger log = LoggerFactory.getLogger(GitHubSourceTask.class); public GitHubSourceConnectorConfig config; - protected Instant nextQuerySince; - protected Integer lastIssueNumber; - protected Integer nextPageToVisit = 1; - protected Instant lastUpdatedAt; - + ArrayList repositoryList; GitHubAPIHttpClient gitHubHttpAPIClient; + int RoundRobinNumber; + int NumOfReposToFollow; + @Override public String version() { return VersionUtil.getVersion(); @@ -42,25 +42,35 @@ public void start(Map map) { } private void initializeLastVariables(){ - Map lastSourceOffset = null; - lastSourceOffset = context.offsetStorageReader().offset(sourcePartition()); - if( lastSourceOffset == null){ - // we haven't fetched anything yet, so we initialize to 7 days ago - nextQuerySince = config.getSince(); - lastIssueNumber = -1; - } else { - Object updatedAt = lastSourceOffset.get(UPDATED_AT_FIELD); - Object issueNumber = lastSourceOffset.get(NUMBER_FIELD); - Object nextPage = lastSourceOffset.get(NEXT_PAGE_FIELD); - if(updatedAt != null && (updatedAt instanceof String)){ - nextQuerySince = Instant.parse((String) updatedAt); - } - if(issueNumber != null && (issueNumber instanceof String)){ - lastIssueNumber = Integer.valueOf((String) issueNumber); - } - if (nextPage != null && (nextPage instanceof String)){ - nextPageToVisit = Integer.valueOf((String) nextPage); + //Initializing last variables of all the Repositories given to the task + String repos[]=config.getReposConfig(); + repositoryList=new ArrayList(repos.length); + RoundRobinNumber=0; + NumOfReposToFollow=repos.length; + for(String repo:repos) { + RepositoryVariables RepoVar=new RepositoryVariables(repo); + Map lastSourceOffset = null; + lastSourceOffset = context.offsetStorageReader().offset(sourcePartition(RepoVar)); + if (lastSourceOffset == null) { + // we haven't fetched anything yet, so we initialize to given configuration timestamp + RepoVar.nextQuerySince = config.getSince(); + RepoVar.nextPageToVisit=1; + 
RepoVar.lastIssueNumber = -1; + } else { + Object updatedAt = lastSourceOffset.get(UPDATED_AT_FIELD); + Object issueNumber = lastSourceOffset.get(NUMBER_FIELD); + Object nextPage = lastSourceOffset.get(NEXT_PAGE_FIELD); + if (updatedAt != null && (updatedAt instanceof String)) { + RepoVar.nextQuerySince = Instant.parse((String) updatedAt); + } + if (issueNumber != null && (issueNumber instanceof String)) { + RepoVar.lastIssueNumber = Integer.valueOf((String) issueNumber); + } + if (nextPage != null && (nextPage instanceof String)) { + RepoVar.nextPageToVisit = Integer.valueOf((String) nextPage); + } } + repositoryList.add(RepoVar); } } @@ -70,39 +80,42 @@ private void initializeLastVariables(){ public List poll() throws InterruptedException { gitHubHttpAPIClient.sleepIfNeed(); + RepositoryVariables repoVar=repositoryList.get(RoundRobinNumber); + // fetch data final ArrayList records = new ArrayList<>(); - JSONArray issues = gitHubHttpAPIClient.getNextIssues(nextPageToVisit, nextQuerySince); + JSONArray issues = gitHubHttpAPIClient.getNextIssues(repoVar); // we'll count how many results we get with i int i = 0; for (Object obj : issues) { Issue issue = Issue.fromJson((JSONObject) obj); - SourceRecord sourceRecord = generateSourceRecord(issue); + SourceRecord sourceRecord = generateSourceRecord(issue,repoVar); records.add(sourceRecord); i += 1; - lastUpdatedAt = issue.getUpdatedAt(); + repoVar.lastUpdatedAt = issue.getUpdatedAt(); } if (i > 0) log.info(String.format("Fetched %s record(s)", i)); - if (i == 100){ + if (i == config.getBatchSize()){ // we have reached a full batch, we need to get the next one - nextPageToVisit += 1; + repoVar.nextPageToVisit += 1; } else { - nextQuerySince = lastUpdatedAt.plusSeconds(1); - nextPageToVisit = 1; + repoVar.nextQuerySince = repoVar.lastUpdatedAt.plusSeconds(1); + repoVar.nextPageToVisit = 1; gitHubHttpAPIClient.sleep(); } + RoundRobinNumber=(RoundRobinNumber+1)%NumOfReposToFollow; return records; } - private SourceRecord 
generateSourceRecord(Issue issue) { + private SourceRecord generateSourceRecord(Issue issue,RepositoryVariables repoVar) { return new SourceRecord( - sourcePartition(), - sourceOffset(issue.getUpdatedAt()), - config.getTopic(), + sourcePartition(repoVar), + sourceOffset(repoVar), + repoVar.getTopic(), null, // partition will be inferred by the framework KEY_SCHEMA, - buildRecordKey(issue), + buildRecordKey(issue,repoVar), VALUE_SCHEMA, buildRecordValue(issue), issue.getUpdatedAt().toEpochMilli()); @@ -111,27 +124,34 @@ private SourceRecord generateSourceRecord(Issue issue) { @Override public void stop() { // Do whatever is required to stop your task. + try { + log.info("Closing gitHubHttpAPIClient"); + gitHubHttpAPIClient.close(); + } catch (IOException e) { + log.error("Exception in closing gitHubHttpAPIClient"); + e.printStackTrace(); + } } - private Map sourcePartition() { + private Map sourcePartition(RepositoryVariables RepoVar) { Map map = new HashMap<>(); - map.put(OWNER_FIELD, config.getOwnerConfig()); - map.put(REPOSITORY_FIELD, config.getRepoConfig()); + map.put(OWNER_FIELD, RepoVar.getOwner()); + map.put(REPOSITORY_FIELD, RepoVar.getRepoName()); return map; } - private Map sourceOffset(Instant updatedAt) { + private Map sourceOffset(RepositoryVariables repoVar) { Map map = new HashMap<>(); - map.put(UPDATED_AT_FIELD, DateUtils.MaxInstant(updatedAt, nextQuerySince).toString()); - map.put(NEXT_PAGE_FIELD, nextPageToVisit.toString()); + map.put(UPDATED_AT_FIELD,repoVar.nextQuerySince.toString()); + map.put(NEXT_PAGE_FIELD, repoVar.nextPageToVisit.toString()); return map; } - private Struct buildRecordKey(Issue issue){ + private Struct buildRecordKey(Issue issue,RepositoryVariables repoVar){ // Key Schema Struct key = new Struct(KEY_SCHEMA) - .put(OWNER_FIELD, config.getOwnerConfig()) - .put(REPOSITORY_FIELD, config.getRepoConfig()) + .put(OWNER_FIELD, repoVar.getOwner()) + .put(REPOSITORY_FIELD, repoVar.getRepoName()) .put(NUMBER_FIELD, issue.getNumber()); 
return key; diff --git a/src/main/java/com/simplesteph/kafka/HttpClientProvider.java b/src/main/java/com/simplesteph/kafka/HttpClientProvider.java new file mode 100644 index 0000000..d18b3a1 --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/HttpClientProvider.java @@ -0,0 +1,66 @@ +package com.simplesteph.kafka; + +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.ssl.SSLContexts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.Closeable; +import java.io.IOException; +import java.security.GeneralSecurityException; + +public class HttpClientProvider implements Closeable { + private static final Logger log = LoggerFactory.getLogger(HttpClientProvider.class); + private PoolingHttpClientConnectionManager poolingConnectionManager; + private CloseableHttpClient httpClient; + public HttpClientProvider(){ + CreateAllSSLHttpClient(); + } + public void CreateAllSSLHttpClient() { + try { + TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; + SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, acceptingTrustStrategy).build(); + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, + NoopHostnameVerifier.INSTANCE); + + Registry socketFactoryRegistry = + RegistryBuilder.create() + .register("https", sslsf) + .register("http", new PlainConnectionSocketFactory()) + 
.build(); + + this.poolingConnectionManager = + new PoolingHttpClientConnectionManager(socketFactoryRegistry); + this.httpClient = HttpClients.custom().setSSLSocketFactory(sslsf) + .setConnectionManager(poolingConnectionManager).build(); + + log.info("Successfully created AcceptAllSSLHttpClient"); + } + catch (GeneralSecurityException ex){ + log.error("Unable to create AcceptAllSSLHttpClient"); + } + } + public CloseableHttpClient getHttpClient(){ + return httpClient; + } + @Override + public void close() throws IOException { + if(httpClient!=null) + httpClient.close(); + if(poolingConnectionManager!=null) { + poolingConnectionManager.close(); + poolingConnectionManager.shutdown(); + } + } +} diff --git a/src/main/java/com/simplesteph/kafka/RepositoryVariables.java b/src/main/java/com/simplesteph/kafka/RepositoryVariables.java new file mode 100644 index 0000000..5582a5e --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/RepositoryVariables.java @@ -0,0 +1,32 @@ +package com.simplesteph.kafka; +import java.time.Instant; + +//This is a class which has all necessary details about a repository +public class RepositoryVariables { + private String repository; + private String owner; + private String repo_name; + private String topic; + protected Instant nextQuerySince; + protected Integer lastIssueNumber; + protected Integer nextPageToVisit = 1; + protected Instant lastUpdatedAt; + public RepositoryVariables(String repository){ + this.repository=repository; + String components[]=repository.split("[/:]"); //splitting repository string which is of pattern "owner/repo:topic" + this.owner=components[0]; + this.repo_name=components[1]; + this.topic=components[2]; + } + + public String getOwner(){ + return owner; + } + public String getRepoName(){ + return repo_name; + } + + public String getTopic(){ + return topic; + } +} diff --git a/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java 
b/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java new file mode 100644 index 0000000..c9af53d --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java @@ -0,0 +1,18 @@ +package com.simplesteph.kafka.Validators; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class ReposPatternValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + + String []repos=((String) value).split(","); + + for(String repo:repos){ + //[A-Za-z0-9_.-] are the characters that are allowed for naming in github and [a-zA-Z0-9\._-] in kafka topics + if(!repo.matches("[A-Za-z0-9_.-]{1,}/[A-Za-z0-9_.-]{1,}:[a-zA-Z0-9\\._-]{1,}")) + throw new ConfigException(name,value,"'owner1/repo1:topic1,owner2/repo2:topic2' pattern to be followed for mentioning repositories"); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java b/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java new file mode 100644 index 0000000..b067b51 --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java @@ -0,0 +1,15 @@ +package com.simplesteph.kafka.utils; + +public class RepoJoinUtil { + public static String Join(String s1,String s2){ + if(s1==null&&s2==null) + return ""; + if(s1==null||s1.trim().isEmpty()) + return s2.trim(); + else if(s2==null||s2.trim().isEmpty()) + return s1.trim(); + else + return s1.trim()+","+s2.trim(); + } + +} diff --git a/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java b/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java new file mode 100644 index 0000000..30d9660 --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java @@ -0,0 +1,19 @@ +package com.simplesteph.kafka.utils; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpHeaders; +import 
org.apache.http.HttpRequest; + +import java.nio.charset.StandardCharsets; + +public class SetBasicAuthUtil { + public static HttpRequest SetBasicAuth(HttpRequest request,String username,String password ){ + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64( + auth.getBytes(StandardCharsets.ISO_8859_1)); + String authHeader = "Basic " + new String(encodedAuth); + request.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + return request; + } + +} diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java index b635b13..1ec76d5 100644 --- a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java @@ -18,11 +18,9 @@ public class GitHubSourceConnectorConfigTest { @Before public void setUpInitialConfig() { config = new HashMap<>(); - config.put(OWNER_CONFIG, "foo"); - config.put(REPO_CONFIG, "bar"); + config.put(REPOS_CONFIG,"kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues"); config.put(SINCE_CONFIG, "2017-04-26T01:23:45Z"); config.put(BATCH_SIZE_CONFIG, "100"); - config.put(TOPIC_CONFIG, "github-issues"); } @Test @@ -44,6 +42,17 @@ public void canReadConfigCorrectly() { } + @Test + public void validateRepos() { + config.put(REPOS_CONFIG, "not-a-valid-pattern"); + ConfigValue configValue = configDef.validateAll(config).get(REPOS_CONFIG); + assertTrue(configValue.errorMessages().size() > 0); + + config.put(REPOS_CONFIG, "valid/pattern:followed"); + configValue = configDef.validateAll(config).get(REPOS_CONFIG); + assertEquals(configValue.errorMessages().size() , 0); + } + @Test public void validateSince() { config.put(SINCE_CONFIG, "not-a-date"); diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java index 077af8e..8f00e0f 100644 
--- a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java @@ -12,11 +12,9 @@ public class GitHubSourceConnectorTest { private Map initialConfig() { Map baseProps = new HashMap<>(); - baseProps.put(OWNER_CONFIG, "foo"); - baseProps.put(REPO_CONFIG, "bar"); + baseProps.put(REPOS_CONFIG,"kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues,foo-bar/bar_foo:github.issues"); baseProps.put(SINCE_CONFIG, "2017-04-26T01:23:45Z"); baseProps.put(BATCH_SIZE_CONFIG, "100"); - baseProps.put(TOPIC_CONFIG, "github-issues"); return (baseProps); } @@ -25,6 +23,9 @@ public void taskConfigsShouldReturnOneTaskConfig() { GitHubSourceConnector gitHubSourceConnector = new GitHubSourceConnector(); gitHubSourceConnector.start(initialConfig()); assertEquals(gitHubSourceConnector.taskConfigs(1).size(),1); - assertEquals(gitHubSourceConnector.taskConfigs(10).size(),1); + assertEquals(gitHubSourceConnector.taskConfigs(2).size(),2); + assertEquals(gitHubSourceConnector.taskConfigs(3).size(),3); + assertEquals(gitHubSourceConnector.taskConfigs(4).size(),3); + assertEquals(gitHubSourceConnector.taskConfigs(10).size(),3); } } diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java index fca0cb0..1db65ac 100644 --- a/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java @@ -1,12 +1,13 @@ package com.simplesteph.kafka; -import com.mashape.unirest.http.HttpResponse; -import com.mashape.unirest.http.JsonNode; -import com.mashape.unirest.http.exceptions.UnirestException; import com.simplesteph.kafka.model.Issue; +import org.apache.http.HttpResponse; +import org.apache.http.util.EntityUtils; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; +import java.io.IOException; import java.time.Instant; import 
java.util.HashMap; import java.util.Map; @@ -22,33 +23,31 @@ public class GitHubSourceTaskTest { private Map initialConfig() { Map baseProps = new HashMap<>(); - baseProps.put(OWNER_CONFIG, "apache"); - baseProps.put(REPO_CONFIG, "kafka"); - baseProps.put(SINCE_CONFIG, "2017-04-26T01:23:44Z"); + baseProps.put(REPOS_CONFIG,"apache/kafka:github_issues,kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues"); + baseProps.put(SINCE_CONFIG, "2017-01-01T00:00:00Z"); baseProps.put(BATCH_SIZE_CONFIG, batchSize.toString()); - baseProps.put(TOPIC_CONFIG, "github-issues"); return baseProps; } @Test - public void test() throws UnirestException { + public void test() throws IOException { gitHubSourceTask.config = new GitHubSourceConnectorConfig(initialConfig()); - gitHubSourceTask.nextPageToVisit = 1; - gitHubSourceTask.nextQuerySince = Instant.parse("2017-01-01T00:00:00Z"); + RepositoryVariables repoVar=new RepositoryVariables(gitHubSourceTask.config.getReposConfig()[0]); + repoVar.nextQuerySince=Instant.parse("2017-01-01T00:00:00Z"); gitHubSourceTask.gitHubHttpAPIClient = new GitHubAPIHttpClient(gitHubSourceTask.config); - String url = gitHubSourceTask.gitHubHttpAPIClient.constructUrl(gitHubSourceTask.nextPageToVisit, gitHubSourceTask.nextQuerySince); + String url = gitHubSourceTask.gitHubHttpAPIClient.constructUrl(repoVar); System.out.println(url); - HttpResponse httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI(gitHubSourceTask.nextPageToVisit, gitHubSourceTask.nextQuerySince); - if (httpResponse.getStatus() != 403) { - assertEquals(200, httpResponse.getStatus()); - Set headers = httpResponse.getHeaders().keySet(); - assertTrue(headers.contains("ETag")); - assertTrue(headers.contains("X-RateLimit-Limit")); - assertTrue(headers.contains("X-RateLimit-Remaining")); - assertTrue(headers.contains("X-RateLimit-Reset")); - assertEquals(batchSize.intValue(), httpResponse.getBody().getArray().length()); - JSONObject jsonObject = (JSONObject) 
httpResponse.getBody().getArray().get(0); + HttpResponse httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI(repoVar); + if (httpResponse.getStatusLine().getStatusCode() != 403) { + assertEquals(200, httpResponse.getStatusLine().getStatusCode()); + assertTrue(httpResponse.getFirstHeader("ETag")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Limit")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Remaining")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Reset")!=null); + JSONArray jsonArray=new JSONArray(EntityUtils.toString(httpResponse.getEntity())); + assertTrue(batchSize.intValue() >= jsonArray.length()); + JSONObject jsonObject = (JSONObject) jsonArray.get(0); Issue issue = Issue.fromJson(jsonObject); assertNotNull(issue); assertNotNull(issue.getNumber()); From 9aead7fcd626a3b1557c2c8f27416c82b52a31a3 Mon Sep 17 00:00:00 2001 From: neeraj97 Date: Mon, 6 Apr 2020 16:35:32 +0530 Subject: [PATCH 2/5] Update README.md --- README.md | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 9cd55a3..f7bd470 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,3 @@ -# Learning - -This project is a companion repository to the [Apache Kafka Connect course on Udemy](https://links.datacumulus.com/kafka-connect-coupon). - -https://links.datacumulus.com/kafka-connect-coupon - # Kafka Connect Source GitHub This connector allows you to get a stream of issues and pull requests from GitHub repositories, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository @@ -14,10 +8,6 @@ The connector writes to topic that is great candidate to demonstrate *log compac It's finally aimed to be an educative example to demonstrate how to write a Source Connector a little less trivial than the `FileStreamSourceConnector` provided in Kafka. 
-# Contributing - -This connector is not perfect and can be improved, please feel free to submit any PR you deem useful. - # Configuration ``` @@ -56,6 +46,6 @@ Paste the folder `target/kafka-connnect-github-source-1.1-package /share/java/ka in connect-workers' `plugin.path`(can be found in the connect-workers' properties) directory. The connect-worker should be able to detect `GitHubSourceConnector`. +# Contributing - - +This connector can still be improved a lot; please feel free to submit any PR you deem useful. \ No newline at end of file From 692bfc34e92c8a138fb56c8e13f451505bd7dcb6 Mon Sep 17 00:00:00 2001 From: neeraj97 Date: Mon, 6 Apr 2020 16:46:04 +0530 Subject: [PATCH 3/5] Update README.md --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f7bd470..a2ecf9d 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,6 @@ Issues are pulled based on `updated_at` field, meaning any update to an issue or The connector writes to topic that is great candidate to demonstrate *log compaction*. It's also a fun way to automate your GitHub workflow. -It's finally aimed to be an educative example to demonstrate how to write a Source Connector a little less trivial than the `FileStreamSourceConnector` provided in Kafka. - # Configuration ``` @@ -22,6 +20,11 @@ auth.password=your_password ``` Note: Configuration for github.repos should be set and should follow the pattern owner1/repo1:topic1,owner2/repo2:topic2 .... +You can control the number of tasks to run by using *tasks.max*. This allows work to be divided from task i.e., each task will be assigned few repositories ans will +fetch issues for those repositories. + +Set *since.timestamp* to fetch the issues of repositories which have been updated after the required timestamp. + # Running in development Note: Java 8 is required for this connector.
From d034f5fec4ef279d3e6bae476972559582b0b7fa Mon Sep 17 00:00:00 2001 From: neeraj97 Date: Mon, 6 Apr 2020 16:48:33 +0530 Subject: [PATCH 4/5] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a2ecf9d..a2ab6ed 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,8 @@ auth.password=your_password ``` Note: Configuration for github.repos should be set and should follow the pattern owner1/repo1:topic1,owner2/repo2:topic2 .... -You can control the number of tasks to run by using *tasks.max*. This allows work to be divided from task i.e., each task will be assigned few repositories ans will -fetch issues for those repositories. +You can control the number of tasks to run by using *tasks.max*. This allows work to be divided among tasks i.e., each task will be assigned few repositories and +will fetch issues for those repositories. Set *since.timestamp* to fetch the issues of repositories which have been updated after the required timestamp. 
From 94eb06dc2c547e54bdc5eec423d5d13e56f81c67 Mon Sep 17 00:00:00 2001 From: neeraj97 Date: Mon, 6 Apr 2020 18:54:17 +0530 Subject: [PATCH 5/5] Added support for personal access token authentication --- README.md | 22 ++++++++++++------- .../GitHubSourceConnectorExample.properties | 5 +++-- .../kafka/GitHubAPIHttpClient.java | 7 +++++- .../kafka/GitHubSourceConnectorConfig.java | 8 ++++++- .../kafka/utils/SetBearerAuthUtil.java | 12 ++++++++++ 5 files changed, 42 insertions(+), 12 deletions(-) create mode 100644 src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java diff --git a/README.md b/README.md index a2ab6ed..f86abf4 100644 --- a/README.md +++ b/README.md @@ -10,20 +10,26 @@ The connector writes to topic that is great candidate to demonstrate *log compac ``` name=GitHubSourceConnectorDemo -tasks.max=1 +tasks.max=2 connector.class=com.simplesteph.kafka.GitHubSourceConnector -github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka since.timestamp=2017-01-01T00:00:00Z -# I heavily recommend you set those two fields: -auth.username=your_username -auth.password=your_password +# Repositories must be listed using the pattern owner/repo:topic +github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka +# I heavily recommend you set the auth.accesstoken field: +#auth.username= +#auth.password= +auth.accesstoken=your_accesstoken ``` -Note: Configuration for github.repos should be set and should follow the pattern owner1/repo1:topic1,owner2/repo2:topic2 .... +Note: Configuration for **github.repos** should be set and should follow the pattern owner1/repo1:topic1,owner2/repo2:topic2 .... -You can control the number of tasks to run by using *tasks.max*. This allows work to be divided among tasks i.e., each task will be assigned few repositories and +You can control the number of tasks to run by using **tasks.max**.
This allows work to be divided among tasks, i.e., each task will be assigned a few repositories and will fetch issues for those repositories. -Set *since.timestamp* to fetch the issues of repositories which have been updated after the required timestamp. +Set **since.timestamp** to fetch only the issues that have been updated after the given timestamp. + +Use either **auth.username** and **auth.password** or only **auth.accesstoken**. Using **auth.accesstoken** is preferable +because authentication with *username* and *password* has been deprecated and will soon no longer be supported by the GitHub API. +To generate a *personal access token*, follow the steps in [https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) # Running in development diff --git a/config/GitHubSourceConnectorExample.properties b/config/GitHubSourceConnectorExample.properties index 9bc5882..faf9831 100644 --- a/config/GitHubSourceConnectorExample.properties +++ b/config/GitHubSourceConnectorExample.properties @@ -4,6 +4,7 @@ connector.class=com.simplesteph.kafka.GitHubSourceConnector since.timestamp=2017-01-01T00:00:00Z #Pattern to be followed for mentioning repositories owner/repo:topic github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka -# I heavily recommend you set those two fields: +# I heavily recommend you set the auth.accesstoken field: #auth.username= -#auth.password= \ No newline at end of file +#auth.password= +auth.accesstoken=your_accesstoken \ No newline at end of file diff --git a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java index 296fd35..cd4113a 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java +++ b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java @@
-1,6 +1,7 @@ package com.simplesteph.kafka; import com.simplesteph.kafka.utils.SetBasicAuthUtil; +import com.simplesteph.kafka.utils.SetBearerAuthUtil; import org.apache.http.Header; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -101,7 +102,11 @@ protected JSONArray getNextIssues(RepositoryVariables repoVar) throws Interrupte protected HttpResponse getNextIssuesAPI(RepositoryVariables repoVar) throws IOException { HttpGet httpGet=new HttpGet(constructUrl(repoVar)); - if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){ + if(!config.getAuthAccesstoken().isEmpty()){ + SetBearerAuthUtil.SetBearerAuthUtil(httpGet,config.getAuthAccesstoken()); + } + + else if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){ SetBasicAuthUtil.SetBasicAuth(httpGet,config.getAuthUsername(), config.getAuthPassword()); } HttpResponse response= httpClient.execute(httpGet); diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java index a5628f7..eb2c7a0 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java @@ -34,6 +34,9 @@ public class GitHubSourceConnectorConfig extends AbstractConfig { private static final String AUTH_PASSWORD_DOC = "Optional Password to authenticate calls"; + public static final String AUTH_ACCESSTOKEN_CONFIG = "auth.accesstoken"; + private static final String AUTH_ACCESSTOKEN_DOC = "Optional accesstoken to authenticate calls"; + public GitHubSourceConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); } @@ -49,7 +52,8 @@ public static ConfigDef conf() { .define(SINCE_CONFIG, Type.STRING, ZonedDateTime.now().minusYears(1).toInstant().toString(), new TimestampValidator(), Importance.HIGH, SINCE_DOC) .define(AUTH_USERNAME_CONFIG, Type.STRING, "", 
Importance.HIGH, AUTH_USERNAME_DOC) - .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC); + .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC) + .define(AUTH_ACCESSTOKEN_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_ACCESSTOKEN_DOC); } public String[] getReposConfig(){return this.getString(REPOS_CONFIG).split(",");} @@ -69,4 +73,6 @@ public String getAuthUsername() { public String getAuthPassword(){ return this.getPassword(AUTH_PASSWORD_CONFIG).value(); } + + public String getAuthAccesstoken(){return this.getPassword(AUTH_ACCESSTOKEN_CONFIG).value();} } diff --git a/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java b/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java new file mode 100644 index 0000000..5154a3a --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java @@ -0,0 +1,12 @@ +package com.simplesteph.kafka.utils; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpRequest; + +public class SetBearerAuthUtil { + public static HttpRequest SetBearerAuthUtil(HttpRequest request, String bearToken){ + String authHeader = "Bearer" + " " + bearToken; + request.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + return request; + } +}