Skip to content

Commit e6f482a

Browse files
working on #229, working on #228
1 parent f601e61 commit e6f482a

File tree

16 files changed

+240
-4
lines changed

16 files changed

+240
-4
lines changed

src/main/java/org/woehlke/twitterwall/ScheduledTasks.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ public void fetchUsersFromDefinedUserList(){
6060
}
6161
}
6262

63+
64+
@Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_REMOVE_OLD_DATA_FROM_STORAGE)
65+
public void removeOldDataFromStorage(){
66+
String msg = "remove Old Data From Storage: ";
67+
if(schedulerProperties.getRemoveOldDataFromStorageAllow() && !schedulerProperties.getSkipFortesting()) {
68+
Task task = asyncStartTask.removeOldDataFromStorage();
69+
log.info(msg+ "SCHEDULED: task "+task.getUniqueId());
70+
}
71+
}
72+
6373
@Autowired
6474
public ScheduledTasks(SchedulerProperties schedulerProperties, AsyncStartTask mqAsyncStartTask) {
6575
this.schedulerProperties = schedulerProperties;
@@ -82,6 +92,8 @@ public ScheduledTasks(SchedulerProperties schedulerProperties, AsyncStartTask mq
8292

8393
private final static long FIXED_RATE_FOR_SCHEDULAR_FETCH_USER_LIST = ZWOELF_STUNDEN;
8494

95+
private final static long FIXED_RATE_FOR_SCHEDULAR_REMOVE_OLD_DATA_FROM_STORAGE = EINE_STUNDE;
96+
8597
private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
8698

8799
private final SchedulerProperties schedulerProperties;

src/main/java/org/woehlke/twitterwall/conf/properties/SchedulerProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public class SchedulerProperties {
3636
@NotNull
3737
private String fetchUserListName;
3838

39+
@NotNull
40+
private Boolean removeOldDataFromStorageAllow;
41+
3942
public Boolean getAllowFetchTweetsFromTwitterSearch() {
4043
return allowFetchTweetsFromTwitterSearch;
4144
}
@@ -99,4 +102,12 @@ public String getFetchUserListName() {
99102
public void setFetchUserListName(String fetchUserListName) {
100103
this.fetchUserListName = fetchUserListName;
101104
}
105+
106+
public Boolean getRemoveOldDataFromStorageAllow() {
107+
return removeOldDataFromStorageAllow;
108+
}
109+
110+
public void setRemoveOldDataFromStorageAllow(Boolean removeOldDataFromStorageAllow) {
111+
this.removeOldDataFromStorageAllow = removeOldDataFromStorageAllow;
112+
}
102113
}

src/main/java/org/woehlke/twitterwall/oodm/entities/parts/TaskType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ public enum TaskType {
1414
CONTROLLER_CREATE_IMPRINT_USER,
1515
CONTROLLER_CREATE_TESTDATA_TWEETS,
1616
CONTROLLER_CREATE_TESTDATA_USERS,
17-
CONTROLLER_ADD_USER_FOR_SCREEN_NAME
17+
CONTROLLER_ADD_USER_FOR_SCREEN_NAME,
18+
REMOVE_OLD_DATA_FROM_STORAGE
1819
}

src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ public interface AsyncStartTask {
1717
Task createTestDataForTweets();
1818

1919
Task createTestDataForUser();
20+
21+
Task removeOldDataFromStorage();
2022
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endpoint;
2+
3+
import org.springframework.messaging.Message;
4+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
5+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetMessage;
6+
7+
import java.util.List;
8+
9+
public interface FindTweetsToRemoveSplitter {
10+
11+
List<Message<TweetMessage>> splitMessage(Message<TaskMessage> message);
12+
}

src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/AsyncStartTaskImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public Task createTestDataForUser() {
6262
return send(taskType);
6363
}
6464

65+
@Override
66+
public Task removeOldDataFromStorage() {
67+
TaskType taskType = TaskType.REMOVE_OLD_DATA_FROM_STORAGE;
68+
return send(taskType);
69+
}
70+
6571
private Task send(TaskType taskType){
6672
SendType sendType = SendType.FIRE_AND_FORGET;
6773
String msg = "START Task "+taskType+" via MQ by "+sendType;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.woehlke.twitterwall.scheduled.mq.endpoint.impl;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.data.domain.Page;
5+
import org.springframework.data.domain.PageRequest;
6+
import org.springframework.data.domain.Pageable;
7+
import org.springframework.integration.support.MessageBuilder;
8+
import org.springframework.messaging.Message;
9+
import org.springframework.stereotype.Component;
10+
import org.woehlke.twitterwall.oodm.entities.Task;
11+
import org.woehlke.twitterwall.oodm.entities.Tweet;
12+
import org.woehlke.twitterwall.oodm.entities.parts.CountedEntities;
13+
import org.woehlke.twitterwall.oodm.service.CountedEntitiesService;
14+
import org.woehlke.twitterwall.oodm.service.TaskService;
15+
import org.woehlke.twitterwall.oodm.service.TweetService;
16+
import org.woehlke.twitterwall.scheduled.mq.endpoint.FindTweetsToRemoveSplitter;
17+
import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage;
18+
import org.woehlke.twitterwall.scheduled.mq.msg.TweetMessage;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
@Component("mqFindTweetsToRemoveSplitter")
24+
public class FindTweetsToRemoveSplitterImpl implements FindTweetsToRemoveSplitter {
25+
26+
private final TweetService tweetService;
27+
28+
private final TaskService taskService;
29+
30+
private final CountedEntitiesService countedEntitiesService;
31+
32+
@Autowired
33+
public FindTweetsToRemoveSplitterImpl(TweetService tweetService, TaskService taskService, CountedEntitiesService countedEntitiesService) {
34+
this.tweetService = tweetService;
35+
this.taskService = taskService;
36+
this.countedEntitiesService = countedEntitiesService;
37+
}
38+
39+
@Override
40+
public List<Message<TweetMessage>> splitMessage(Message<TaskMessage> message) {
41+
CountedEntities countedEntities = countedEntitiesService.countAll();
42+
List<Message<TweetMessage>> tweets = new ArrayList<>();
43+
TaskMessage msgIn = message.getPayload();
44+
long id = msgIn.getTaskId();
45+
Task task = taskService.findById(id);
46+
task = taskService.start(task,countedEntities);
47+
int pageTweet = 1;
48+
int pageSize = 20;
49+
Pageable pageRequestTweet = new PageRequest(pageTweet, pageSize);
50+
//TODO: #229 https://github.com/phasenraum2010/twitterwall2/issues/229
51+
Page<Tweet> tweetList = tweetService.getAll(pageRequestTweet);
52+
int loopId = 0;
53+
int loopAll = tweetList.getContent().size();
54+
for (Tweet tweet: tweetList) {
55+
loopId++;
56+
TweetMessage tweetMsg = new TweetMessage(msgIn,tweet);
57+
Message<TweetMessage> mqMessageOut =
58+
MessageBuilder.withPayload(tweetMsg)
59+
.copyHeaders(message.getHeaders())
60+
.setHeader("tw_lfd_nr",loopId)
61+
.setHeader("tw_all",loopAll)
62+
.build();
63+
tweets.add(mqMessageOut);
64+
}
65+
return tweets;
66+
}
67+
}

src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/StartTaskImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public Task createTestDataForUser() {
6262
return sendAndReceiveUser(taskType);
6363
}
6464

65+
@Override
66+
public Task removeOldDataFromStorage() {
67+
TaskType taskType = TaskType.REMOVE_OLD_DATA_FROM_STORAGE;
68+
return sendAndReceiveTweet(taskType);
69+
}
70+
6571
@Override
6672
public User createImprintUser() {
6773
TaskType taskType = TaskType.CONTROLLER_CREATE_IMPRINT_USER;

src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ twitterwall:
124124
fetchUserListAllow: ${TWITTERWALL_SCHEDULER_USER_LIST_ALLOW}
125125
fetchUserListName: ${TWITTERWALL_SCHEDULER_USER_LIST_NAME}
126126
herokuDbRowsLimit: ${TWITTERWALL_SCHEDULER_HEROKU_DB_LIMIT}
127+
removeOldDataFromStorageAllow: ${TWITTERWALL_SCHEDULER_ALLOW_REMOVE_OLD_DATA_FROM_STORAGE}
127128
skipFortesting: false
128129
testdata:
129130
oodm:

src/main/resources/integration.xml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@
7979
</int:interceptors>
8080
</int:channel>
8181

82+
<int:channel id="removeOldDataFromStorageChannel" datatype="org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage">
83+
<int:interceptors>
84+
<int:wire-tap channel="logger"/>
85+
</int:interceptors>
86+
</int:channel>
8287

8388
<int:router id="startTaskRouter" input-channel="startTaskChannel" expression="payload.taskType">
8489
<int:mapping value="UPDATE_TWEETS" channel="updateTweetsChannel"/>
@@ -89,6 +94,7 @@
8994
<int:mapping value="CONTROLLER_CREATE_TESTDATA_USERS" channel="createTestDataUsersChannel"/>
9095
<int:mapping value="CONTROLLER_CREATE_TESTDATA_TWEETS" channel="createTestDataTweetsChannel"/>
9196
<int:mapping value="CONTROLLER_CREATE_IMPRINT_USER" channel="createImprintUserChannel"/>
97+
<int:mapping value="REMOVE_OLD_DATA_FROM_STORAGE" channel="removeOldDataFromStorageChannel"/>
9298
</int:router>
9399

94100
<int:chain id="updateTweetsChain" input-channel="updateTweetsChannel">
@@ -272,6 +278,23 @@
272278
method="persistUser" />
273279
</int:chain>
274280

281+
<int:chain id="removeOldDataFromStorageChain" input-channel="removeOldDataFromStorageChannel">
282+
<int:splitter
283+
id="removeOldDataFromStorageSplitter"
284+
ref="mqFindTweetsToRemoveSplitter"
285+
method="splitMessage" />
286+
<int:aggregator
287+
id="removeOldDataFromStorageAggregator"
288+
message-store="store"
289+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
290+
<int:service-activator
291+
id="removeOldDataFromStorageFinisher"
292+
ref="mqTweetFinisher"
293+
method="finish" />
294+
</int:chain>
295+
296+
297+
275298
<int:channel id="startTaskFireAndForgetChannel" datatype="org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage">
276299
<int:dispatcher task-executor="myThreadPoolTaskExecutor"/>
277300
</int:channel>
@@ -288,6 +311,7 @@
288311
<int:mapping value="FETCH_USERS_FROM_LIST" channel="fetchUsersFromListFireAndForgetChannel"/>
289312
<int:mapping value="CONTROLLER_CREATE_TESTDATA_USERS" channel="createTestDataUsersFireAndForgetChannel"/>
290313
<int:mapping value="CONTROLLER_CREATE_TESTDATA_TWEETS" channel="createTestDataTweetsFireAndForgetChannel"/>
314+
<int:mapping value="REMOVE_OLD_DATA_FROM_STORAGE" channel="removeOldDataFromStorageFireAndForgetChannel"/>
291315
</int:router>
292316

293317
<int:channel id="updateTweetsFireAndForgetChannel" datatype="org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage">
@@ -332,6 +356,12 @@
332356
</int:interceptors>
333357
</int:channel>
334358

359+
<int:channel id="removeOldDataFromStorageFireAndForgetChannel" datatype="org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage">
360+
<int:interceptors>
361+
<int:wire-tap channel="logger"/>
362+
</int:interceptors>
363+
</int:channel>
364+
335365

336366
<int:chain id="updateTweetsFireAndForgetChain" input-channel="updateTweetsFireAndForgetChannel">
337367
<int:splitter
@@ -500,4 +530,19 @@
500530
method="finishAsnyc" />
501531
</int:chain>
502532

533+
<int:chain id="removeOldDataFromStorageFireAndForgetChain" input-channel="removeOldDataFromStorageFireAndForgetChannel">
534+
<int:splitter
535+
id="removeOldDataFromStorageFireAndForgetSplitter"
536+
ref="mqFindTweetsToRemoveSplitter"
537+
method="splitMessage" />
538+
<int:aggregator
539+
id="removeOldDataFromStorageFireAndForgetAggregator"
540+
message-store="store"
541+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
542+
<int:service-activator
543+
id="removeOldDataFromStorageFireAndForgetFinisher"
544+
ref="mqTweetFinisher"
545+
method="finishAsnyc" />
546+
</int:chain>
547+
503548
</beans>

0 commit comments

Comments
 (0)