Skip to content

Commit 8eab7c7

Browse files
working on #270
1 parent d730028 commit 8eab7c7

File tree

8 files changed

+250
-5
lines changed

8 files changed

+250
-5
lines changed

src/main/java/org/woehlke/twitterwall/CronJobs.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,25 @@ public void updateUserProfiles() {
153153
}
154154
}
155155

156+
@Scheduled(initialDelay= TEN_SECONDS * 16, fixedRate = ONE_DAY)
157+
public void startUpdateUrls(){
158+
String msg = "start UpdateUrls ";
159+
if(!schedulerProperties.getSkipFortesting()) {
160+
Task task = mqTaskStartFireAndForget.startUpdateUrls();
161+
log.info(msg+ "SCHEDULED: task "+task.getUniqueId());
162+
}
163+
}
164+
165+
@Scheduled(initialDelay= TEN_SECONDS * 16, fixedRate = ONE_DAY)
166+
public void startGarbageCollection(){
167+
String msg = "start UpdateUrls ";
168+
if(!schedulerProperties.getSkipFortesting()) {
169+
//TODO:
170+
//Task task = mqTaskStartFireAndForget.startGarbageCollection();
171+
//log.info(msg+ "SCHEDULED: task "+task.getUniqueId());
172+
}
173+
}
174+
156175
@Autowired
157176
public CronJobs(
158177
SchedulerProperties schedulerProperties,
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.woehlke.twitterwall.backend.mq.tasks;
2+
3+
import org.woehlke.twitterwall.oodm.model.Task;
4+
5+
import java.io.Serializable;
6+
import java.util.List;
7+
8+
public class TaskResultList implements Serializable{
9+
10+
private final long taskId;
11+
private final List<Task> userList;
12+
13+
public TaskResultList(long taskId, List<Task> userList) {
14+
this.taskId = taskId;
15+
this.userList = userList;
16+
}
17+
18+
public long getTaskId() {
19+
return taskId;
20+
}
21+
22+
public List<Task> getUserList() {
23+
return userList;
24+
}
25+
26+
@Override
27+
public boolean equals(Object o) {
28+
if (this == o) return true;
29+
if (!(o instanceof TaskResultList)) return false;
30+
31+
TaskResultList that = (TaskResultList) o;
32+
33+
if (taskId != that.taskId) return false;
34+
return userList != null ? userList.equals(that.userList) : that.userList == null;
35+
}
36+
37+
@Override
38+
public int hashCode() {
39+
int result = (int) (taskId ^ (taskId >>> 32));
40+
result = 31 * result + (userList != null ? userList.hashCode() : 0);
41+
return result;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "TaskResultList{" +
47+
"taskId=" + taskId +
48+
", userList=" + userList +
49+
'}';
50+
}
51+
}

src/main/java/org/woehlke/twitterwall/backend/mq/tasks/TaskStartFireAndForget.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,8 @@ public interface TaskStartFireAndForget {
4242
Task fetchUserlistOwners();
4343

4444
Task startGarbageCollection();
45+
46+
Task startUpdateUrls();
47+
48+
4549
}

src/main/java/org/woehlke/twitterwall/backend/mq/tasks/impl/TaskStartFireAndForgetImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,14 @@ public Task fetchUserlistOwners() {
131131

132132
@Override
133133
public Task startGarbageCollection() {
134-
return null;
134+
TaskType taskType = TaskType.GARBAGE_COLLECTION;
135+
return send(taskType);
136+
}
137+
138+
@Override
139+
public Task startUpdateUrls() {
140+
TaskType taskType = TaskType.UPDATE_URLS;
141+
return send(taskType);
135142
}
136143

137144
private Task send(TaskType taskType){

src/main/java/org/woehlke/twitterwall/backend/mq/tasks/impl/TaskStartImpl.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.springframework.messaging.MessageChannel;
1111
import org.springframework.stereotype.Component;
1212
import org.woehlke.twitterwall.backend.mq.tasks.TaskMessage;
13+
import org.woehlke.twitterwall.backend.mq.tasks.TaskResultList;
14+
import org.woehlke.twitterwall.backend.mq.urls.msg.UrlResultList;
1315
import org.woehlke.twitterwall.backend.mq.users.msg.UserMessage;
1416
import org.woehlke.twitterwall.oodm.model.Task;
1517
import org.woehlke.twitterwall.oodm.model.User;
@@ -135,7 +137,64 @@ public Task fetchUserlistOwners() {
135137

136138
@Override
137139
public Task startGarbageCollection() {
138-
return null;
140+
TaskType taskType = TaskType.GARBAGE_COLLECTION;
141+
return sendAndReceiveTask(taskType);
142+
}
143+
144+
@Override
145+
public Task startUpdateUrls() {
146+
TaskType taskType = TaskType.UPDATE_URLS;
147+
return sendAndReceiveUrl(taskType);
148+
}
149+
150+
private Task sendAndReceiveTask(TaskType taskType){
151+
TaskSendType taskSendType = TaskSendType.SEND_AND_WAIT_FOR_RESULT;
152+
String logMsg = "Start task "+taskType+"via MQ by "+ taskSendType;
153+
log.info(logMsg);
154+
CountedEntities countedEntities = countedEntitiesService.countAll();
155+
Task task = taskService.create("Start via MQ by Scheduler ", taskType, taskSendType,countedEntities);
156+
Message<TaskMessage> mqMessage = taskMessageBuilder.buildTaskMessage(task);
157+
MessagingTemplate mqTemplate = new MessagingTemplate();
158+
Message<?> returnedMessage = mqTemplate.sendAndReceive(channelTaskStart, mqMessage);
159+
Object o = returnedMessage.getPayload();
160+
countedEntities = countedEntitiesService.countAll();
161+
if( o instanceof TaskResultList){
162+
TaskResultList msg = (TaskResultList) o;
163+
long taskId = msg.getTaskId();
164+
task = taskService.findById(taskId);
165+
logMsg = "Sucessfully finished task "+taskType+"via MQ by "+ taskSendType;
166+
taskService.done(logMsg,task,countedEntities);
167+
} else {
168+
logMsg = "Finished with Error: task "+taskType+"via MQ by "+ taskSendType +": Wrong type of returnedMessage";
169+
taskService.finalError(task,logMsg,countedEntities);
170+
log.error(logMsg);
171+
}
172+
return task;
173+
}
174+
175+
private Task sendAndReceiveUrl(TaskType taskType){
176+
TaskSendType taskSendType = TaskSendType.SEND_AND_WAIT_FOR_RESULT;
177+
String logMsg = "Start task "+taskType+"via MQ by "+ taskSendType;
178+
log.info(logMsg);
179+
CountedEntities countedEntities = countedEntitiesService.countAll();
180+
Task task = taskService.create("Start via MQ by Scheduler ", taskType, taskSendType,countedEntities);
181+
Message<TaskMessage> mqMessage = taskMessageBuilder.buildTaskMessage(task);
182+
MessagingTemplate mqTemplate = new MessagingTemplate();
183+
Message<?> returnedMessage = mqTemplate.sendAndReceive(channelTaskStart, mqMessage);
184+
Object o = returnedMessage.getPayload();
185+
countedEntities = countedEntitiesService.countAll();
186+
if( o instanceof UrlResultList){
187+
UrlResultList msg = (UrlResultList) o;
188+
long taskId = msg.getTaskId();
189+
task = taskService.findById(taskId);
190+
logMsg = "Sucessfully finished task "+taskType+"via MQ by "+ taskSendType;
191+
taskService.done(logMsg,task,countedEntities);
192+
} else {
193+
logMsg = "Finished with Error: task "+taskType+"via MQ by "+ taskSendType +": Wrong type of returnedMessage";
194+
taskService.finalError(task,logMsg,countedEntities);
195+
log.error(logMsg);
196+
}
197+
return task;
139198
}
140199

141200
private Task sendAndReceiveUserList(TaskType taskType){

src/main/java/org/woehlke/twitterwall/oodm/model/tasks/TaskType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public enum TaskType {
2020
UPDATE_TWEETS,
2121
UPDATE_USERS,
2222
UPDATE_MENTIONS_FOR_USERS,
23+
UPDATE_URLS,
2324
CREATE_IMPRINT_USER,
2425
CREATE_TESTDATA_TWEETS,
2526
CREATE_TESTDATA_USERS,

src/main/resources/integration.xml

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,11 +275,24 @@
275275
</int:interceptors>
276276
</int:channel>
277277

278+
<int:channel id="channel.UpdateUrls" datatype="org.woehlke.twitterwall.backend.mq.tasks.TaskMessage">
279+
<int:interceptors>
280+
<int:wire-tap channel="logger"/>
281+
</int:interceptors>
282+
</int:channel>
283+
284+
<int:channel id="channel.async.UpdateUrls" datatype="org.woehlke.twitterwall.backend.mq.tasks.TaskMessage">
285+
<int:interceptors>
286+
<int:wire-tap channel="logger"/>
287+
</int:interceptors>
288+
</int:channel>
289+
278290

279291
<int:router id="router.TaskStart" input-channel="channel.TaskStart" expression="payload.taskType">
280292
<int:mapping value="UPDATE_TWEETS" channel="channel.UpdateTweets"/>
281293
<int:mapping value="UPDATE_USERS" channel="channel.UpdateUser"/>
282294
<int:mapping value="UPDATE_MENTIONS_FOR_USERS" channel="channel.UpdateMentionsForUsers"/>
295+
<int:mapping value="UPDATE_URLS" channel="channel.UpdateUrls"/>
283296
<int:mapping value="FETCH_TWEETS_FROM_SEARCH" channel="channel.FetchTweetsFromSearch"/>
284297
<int:mapping value="FETCH_USERS_FROM_LIST" channel="channel.FetchUsersFromList"/>
285298
<int:mapping value="FETCH_FOLLOWER" channel="channel.FetchFollower"/>
@@ -288,6 +301,7 @@
288301
<int:mapping value="CREATE_TESTDATA_TWEETS" channel="channel.CreateTestDataTweets"/>
289302
<int:mapping value="CREATE_IMPRINT_USER" channel="channel.CreateImprintUser"/>
290303
<int:mapping value="REMOVE_OLD_DATA_FROM_STORAGE" channel="channel.RemoveOldDataFromStorage"/>
304+
<int:mapping value="GARBAGE_COLLECTION" channel="channel.GarbageCollection"/>
291305
<int:mapping value="FETCH_HOME_TIMELINE" channel="channel.FetchHomeTimeline"/>
292306
<int:mapping value="FETCH_USER_TIMELINE" channel="channel.FetchUserTimeline"/>
293307
<int:mapping value="FETCH_MENTIONS" channel="channel.FetchMentions"/>
@@ -301,6 +315,7 @@
301315
<int:mapping value="UPDATE_TWEETS" channel="channel.async.UpdateTweets"/>
302316
<int:mapping value="UPDATE_USERS" channel="channel.async.UpdateUser"/>
303317
<int:mapping value="UPDATE_MENTIONS_FOR_USERS" channel="channel.async.UpdateMentionsForUsers"/>
318+
<int:mapping value="UPDATE_URLS" channel="channel.async.UpdateUrls"/>
304319
<int:mapping value="FETCH_TWEETS_FROM_SEARCH" channel="channel.async.FetchTweetsFromSearch"/>
305320
<int:mapping value="FETCH_USERS_FROM_LIST" channel="channel.async.FetchUsersFromList"/>
306321
<int:mapping value="FETCH_FOLLOWER" channel="channel.async.FetchFollower"/>
@@ -309,6 +324,7 @@
309324
<int:mapping value="CREATE_TESTDATA_TWEETS" channel="channel.async.CreateTestDataTweets"/>
310325
<int:mapping value="CREATE_IMPRINT_USER" channel="channel.async.CreateImprintUser"/>
311326
<int:mapping value="REMOVE_OLD_DATA_FROM_STORAGE" channel="channel.async.RemoveOldDataFromStorage"/>
327+
<int:mapping value="GARBAGE_COLLECTION" channel="channel.async.GarbageCollection"/>
312328
<int:mapping value="FETCH_HOME_TIMELINE" channel="channel.async.FetchHomeTimeline"/>
313329
<int:mapping value="FETCH_USER_TIMELINE" channel="channel.async.FetchUserTimeline"/>
314330
<int:mapping value="FETCH_MENTIONS" channel="channel.async.FetchMentions"/>
@@ -677,6 +693,62 @@
677693

678694

679695

696+
<int:chain id="chain.async.UpdateUrls" input-channel="channel.async.UpdateUrls">
697+
<int:splitter
698+
id="splitter.async.UpdateUrls"
699+
ref="mqUpdateUrls"
700+
method="splitUrlMessage" />
701+
<int:service-activator
702+
id="fetch.async.UpdateUrls"
703+
ref="mqUrFetcher"
704+
method="fetchUrl" />
705+
<int:service-activator
706+
id="transform.async.UpdateUrls"
707+
ref="mqUrlTransformator"
708+
method="transformUrl" />
709+
<int:service-activator
710+
id="persist.async.UpdateUrls"
711+
ref="mqUrlPersistor"
712+
method="persistUrl" />
713+
<int:aggregator
714+
id="aggregator.async.UpdateUrls"
715+
message-store="myMessageStore"
716+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
717+
<int:service-activator
718+
id="finish.async.UpdateUrls"
719+
ref="mqUrlFinisher"
720+
method="finishAsnyc" />
721+
</int:chain>
722+
723+
<int:chain id="chain.UpdateUrls" input-channel="channel.UpdateUrls">
724+
<int:splitter
725+
id="splitter.UpdateUrls"
726+
ref="mqUpdateUrls"
727+
method="splitUrlMessage" />
728+
<int:service-activator
729+
id="fetch.UpdateUrls"
730+
ref="mqUrFetcher"
731+
method="fetchUrl" />
732+
<int:service-activator
733+
id="transform.UpdateUrls"
734+
ref="mqUrlTransformator"
735+
method="transformUrl" />
736+
<int:service-activator
737+
id="persist.UpdateUrls"
738+
ref="mqUrlPersistor"
739+
method="persistUrl" />
740+
<int:aggregator
741+
id="aggregator.UpdateUrls"
742+
message-store="myMessageStore"
743+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
744+
<int:service-activator
745+
id="finish.UpdateUrls"
746+
ref="mqUrlFinisher"
747+
method="finish" />
748+
</int:chain>
749+
750+
751+
680752
<int:chain id="chain.async.UpdateTweets" input-channel="channel.async.UpdateTweets">
681753
<int:splitter
682754
id="splitter.async.UpdateTweets"
@@ -1206,9 +1278,41 @@
12061278
method="finish" />
12071279
</int:chain>
12081280

1209-
<!-- Fire and Forget Chains -->
12101281

12111282

1283+
<int:chain id="chain.async.GarbageCollection" input-channel="channel.async.GarbageCollection">
1284+
<int:splitter
1285+
id="splitter.async.GarbageCollection"
1286+
ref="mqFindTweetsToRemoveSplitter"
1287+
method="splitTweetMessage" />
1288+
<int:aggregator
1289+
id="aggregator.async.GarbageCollection"
1290+
message-store="myMessageStore"
1291+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
1292+
<int:service-activator
1293+
id="finish.async.GarbageCollection"
1294+
ref="mqTweetFinisher"
1295+
method="finishAsnyc" />
1296+
</int:chain>
1297+
1298+
<int:chain id="chain.GarbageCollection" input-channel="channel.GarbageCollection">
1299+
<int:splitter
1300+
id="splitter.GarbageCollection"
1301+
ref="mqFindTweetsToRemoveSplitter"
1302+
method="splitTweetMessage" />
1303+
<int:aggregator
1304+
id="aggregator.GarbageCollection"
1305+
message-store="myMessageStore"
1306+
release-strategy="releaserSimpleSequenceSizeReleaseStrategy" />
1307+
<int:service-activator
1308+
id="finish.GarbageCollection"
1309+
ref="mqTweetFinisher"
1310+
method="finish" />
1311+
</int:chain>
1312+
1313+
1314+
<!-- Fire and Forget Chains -->
1315+
12121316
<!--
12131317
<int:chain id="fetchUserlistOwnersFireAndForgetChain" input-channel="fetchUserlistOwnersFireAndForgetChannel">
12141318
<int:splitter

src/main/resources/templates/mention/all.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
</a>
5555
</span>
5656
<span th:if="${myPageItem.idTwitter &gt; 0}">
57-
<a href="" th:href="@{|/user/${myPageItem.screenName}|}">
57+
<a href="" th:href="@{|/user/screenName/${myPageItem.screenName}|}">
5858
<span>@</span>
5959
<span th:text="${myPageItem.screenName}">screenName</span>
6060
</a>
@@ -65,7 +65,7 @@
6565
<div class="col-md-3">has User:</div>
6666
<div class="col-md-9">
6767
<span th:if="${myPageItem.hasUser()}">
68-
<a href="" th:href="@{|/user/${myPageItem.idTwitter}|}">
68+
<a href="" th:href="@{|/user/${myPageItem.idOfUser}|}">
6969
<span>@</span>
7070
<span th:text="${myPageItem.screenName}">myPageItem.screenName</span>
7171
</a>

0 commit comments

Comments
 (0)