@@ -23,6 +23,8 @@ import java.util.concurrent.Executors
2323import java.util.concurrent.Future
2424import java.util.concurrent.TimeUnit
2525import java.util.concurrent.atomic.AtomicLong
26+ import java.util.stream.Collector
27+ import java.util.stream.Collectors
2628
2729
2830val PUPPET_COUNT = AtomicLong ()
@@ -68,7 +70,7 @@ abstract class Puppet : EventEmitter {
6870
6971 on(" heartbeat" , object : PuppetHeartbeatListener {
7072 override fun handler (payload : EventHeartbeatPayload ) {
71- log.info (" heartbeat -> ${payload.data} " )
73+ log.debug (" heartbeat -> ${payload.data} " )
7274 val watchdogFood = WatchdogFood (1000 * timeOut)
7375 watchdogFood.data = payload.data
7476 watchDog.feed(watchdogFood);
@@ -89,8 +91,10 @@ abstract class Puppet : EventEmitter {
8991
9092 on(" reset" , object : PuppetResetListener {
9193 override fun handler (payload : EventResetPayload ) {
92- log.info(" get a reset message" )
93- if (rateLimiter.tryAcquire()) {
94+
95+ log.debug(" get a reset message" )
96+ if (rateLimiter.tryAcquire()){
97+
9498 reset(payload.data)
9599 }
96100 }
@@ -120,15 +124,15 @@ abstract class Puppet : EventEmitter {
120124 executorService.scheduleAtFixedRate({
121125 if (state == StateEnum .ON ) {
122126 val incrementAndGet = HEARTBEAT_COUNTER .incrementAndGet()
123- log.info (" HEARTBEAT_COUNTER #{}" , incrementAndGet)
127+ log.debug (" HEARTBEAT_COUNTER #{}" , incrementAndGet)
124128 ding(" `recover CPR #${incrementAndGet} " )
125129 }
126130 }, HOSTIE_KEEPALIVE_TIMEOUT , HOSTIE_KEEPALIVE_TIMEOUT , TimeUnit .MILLISECONDS )
127131
128132// heartbeatTimerId = vertx.setPeriodic(HOSTIE_KEEPALIVE_TIMEOUT) { id ->
129133// if(state == StateEnum.ON) {
130134// val incrementAndGet = HEARTBEAT_COUNTER.incrementAndGet()
131- // log.info ("HEARTBEAT_COUNTER #{}", incrementAndGet)
135+ // log.debug ("HEARTBEAT_COUNTER #{}", incrementAndGet)
132136// ding("`recover CPR #${incrementAndGet}")
133137// }
134138// }
@@ -144,7 +148,9 @@ abstract class Puppet : EventEmitter {
144148 super .on(event, object : Listener {
145149 override fun handler (vararg any : Any ) {
146150
147- log.info(" class Type is {}" , any[0 ].javaClass.name)
151+
152+ log.debug(" class Type is {}" ,any[0 ].javaClass.name)
153+
148154
149155 listener.handler(any[0 ] as EventDongPayload )
150156 }
@@ -213,7 +219,9 @@ abstract class Puppet : EventEmitter {
213219 super .on(event, object : Listener {
214220 override fun handler (vararg any : Any ) {
215221
216- log.info(" class Type is {}" , any[0 ].javaClass.name)
222+
223+ log.debug(" class Type is {}" ,any[0 ].javaClass.name)
224+
217225
218226 listener.handler(any[0 ] as EventScanPayload )
219227 }
@@ -249,7 +257,9 @@ abstract class Puppet : EventEmitter {
249257 super .on(event, object : Listener {
250258 override fun handler (vararg any : Any ) {
251259
252- log.info(" class Type is {}" , any[0 ].javaClass.name)
260+
261+ log.debug(" class Type is {}" ,any[0 ].javaClass.name)
262+
253263
254264 listener.handler(any[0 ] as EventHeartbeatPayload )
255265 }
@@ -284,7 +294,7 @@ abstract class Puppet : EventEmitter {
284294 val future = CompletableFuture <Void >()
285295
286296 if (state == StateEnum .OFF ) {
287- log.info (" Puppet reset state is off" )
297+ log.debug (" Puppet reset state is off" )
288298 future.complete(null )
289299 return future
290300 }
@@ -297,7 +307,7 @@ abstract class Puppet : EventEmitter {
297307
298308
299309 protected fun login (userId : String ): Future <Void > {
300- log.info (" Puppet login in ({})" , userId)
310+ log.debug (" Puppet login in ({})" , userId)
301311 return CompletableFuture .runAsync {
302312 if (StringUtils .isNotBlank(userId)) {
303313 throw RuntimeException (" must logout first before login again!" )
@@ -365,7 +375,7 @@ abstract class Puppet : EventEmitter {
365375 protected abstract fun contactRawPayloadParser (rawPayload : ContactPayload ): Future <ContactPayload >
366376
367377 open fun contactRoomList (contactId : String ): Future <List <String ?>>? {
368- log.info (" contractId is {}" , contactId)
378+ log.debug (" contractId is {}" , contactId)
369379 val roomList = roomList().get()
370380 val roomPayloadFuture: List <CompletableFuture <RoomPayload >> = roomList
371381 .map { roomId: String ->
@@ -390,9 +400,9 @@ abstract class Puppet : EventEmitter {
390400 return CompletableFuture .completedFuture(null )
391401 }
392402
393- fun contactSearch (query : ContactQueryFilter ? , searchIdList : List <String >? ): Future <List <String >> {
403+ fun contactSearch (query : ContactQueryFilter ? , searchIdList : List <String >? = null ): Future <List <String >> {
394404
395- log.info (" query {},{} " , query, searchIdList)
405+ log.debug (" query {},{} " , query, searchIdList)
396406
397407 return CompletableFuture .supplyAsync {
398408
@@ -406,22 +416,86 @@ abstract class Puppet : EventEmitter {
406416 return @supplyAsync list
407417 }
408418
409- return @supplyAsync list!! .filter {
410- val payload = contactPayload(it).get()
411- return @filter StringUtils .equals(query.name, payload.name)
419+ val stream = list?.stream()?.map{contactPayload(it).get()}
420+ if (StringUtils .isNotBlank(query.name)){
421+ stream?.filter {
422+ StringUtils .equals(query.name, it.name)
423+ }
412424 }
425+
426+ if (StringUtils .isNotBlank(query.alias)){
427+ stream?.filter {
428+ StringUtils .equals(query.alias, it.alias)
429+ }
430+ }
431+
432+ if (StringUtils .isNotBlank(query.id)){
433+ stream?.filter {
434+ StringUtils .equals(query.alias, it.alias)
435+ }
436+ }
437+
438+ if (StringUtils .isNotBlank(query.weixin)){
439+ stream?.filter {
440+ StringUtils .equals(query.alias, it.alias)
441+ }
442+ }
443+
444+ if (query.nameReg != null ){
445+ stream?.filter{
446+ query.nameReg!! .matches(it.name ? : " " )
447+ }
448+ }
449+
450+ if (query.aliasReg != null ){
451+ stream?.filter{
452+ query.aliasReg!! .matches(it.alias ? : " " )
453+ }
454+ }
455+
456+ val collect = stream?.map {
457+ it.id
458+ }?.collect(Collectors .toList())
459+
460+ return @supplyAsync collect
461+
462+ }
463+ }
464+
465+ fun ContactPayloadFilterFactory (query : ContactQueryFilter ):ContactPayloadFilterFunction {
466+
467+ val clz = query::class .java
468+ val fields = clz.fields
469+ val list = fields.map {
470+ it.name to it.get(query)
413471 }
472+
473+ val filterKv = list.get(0 )
474+
475+ val filterFunction = { payload: ContactPayload ->
476+ Boolean
477+ val clazz = payload::class .java
478+ val field = clazz.getField(filterKv.first)
479+ val toString = field.get(payload).toString()
480+ StringUtils .equals(toString, filterKv.second.toString())
481+ }
482+
483+ return filterFunction
414484 }
415485
486+
487+
488+
416489 protected fun contactPayloadCache (contactId : String ): ContactPayload ? {
417490
418491 val contactPayload = cacheContactPayload.getIfPresent(contactId)
419492
420- log.info(" contactPayload is {} by id {}" , contactPayload, contactId)
493+ log.debug(" contactPayload is {} by id {}" , contactPayload,contactId)
494+
421495 return contactPayload
422496 }
423497
424- public fun contactPayload (contactId : String ): Future <ContactPayload > {
498+ fun contactPayload (contactId : String ): Future <ContactPayload > {
425499
426500 val future = CompletableFuture <ContactPayload >()
427501
@@ -455,7 +529,7 @@ abstract class Puppet : EventEmitter {
455529 abstract fun friendshipRawPayload (friendshipId : String ): Future <FriendshipPayload >
456530 abstract fun friendshipRawPayloadParser (rawPayload : FriendshipPayload ): Future <FriendshipPayload >
457531 fun friendshipSearch (condition : FriendshipSearchCondition ): Future <String ?> {
458- log.info (" friendshipSearch{}" , condition)
532+ log.debug (" friendshipSearch{}" , condition)
459533 Preconditions .checkNotNull(condition)
460534
461535 return if (StringUtils .isNotEmpty(condition.phone)) {
@@ -466,7 +540,7 @@ abstract class Puppet : EventEmitter {
466540 }
467541
468542 protected fun friendshipPayloadCache (friendshipId : String ): FriendshipPayload ? {
469- log.info (" friendshipId is {}" , friendshipId)
543+ log.debug (" friendshipId is {}" , friendshipId)
470544 return cacheFriendshipPayload.getIfPresent(friendshipId)
471545 }
472546
@@ -554,8 +628,66 @@ abstract class Puppet : EventEmitter {
554628 return Lists .newArrayList(keys)
555629 }
556630
557- fun messageSearch (query : MessageQueryFilter ): Future <List <String >? > {
558- TODO (" TODO" )
631+ fun messageSearch (query : MessageQueryFilter ): Future <List <String >> {
632+
633+ return CompletableFuture .supplyAsync {
634+
635+ log.debug(" messageSearch {}" , query)
636+
637+ val allMessageIdList = messageList()
638+
639+ val messagePayloadList = allMessageIdList.map {
640+ messagePayload(it).get()
641+ }
642+
643+ val stream = messagePayloadList.stream()
644+
645+ if (StringUtils .isNotEmpty(query.fromId)) {
646+ stream.filter {
647+ StringUtils .equals(it.fromId, query.fromId)
648+ }
649+ }
650+
651+ if (StringUtils .isNotEmpty(query.id)) {
652+ stream.filter {
653+ StringUtils .equals(it.id, query.id)
654+ }
655+ }
656+
657+ if (StringUtils .isNotEmpty(query.roomId)) {
658+ stream.filter {
659+ StringUtils .equals(it.roomId, query.roomId)
660+ }
661+ }
662+
663+ if (StringUtils .isNotEmpty(query.toId)) {
664+ stream.filter {
665+ StringUtils .equals(it.toId, query.toId)
666+ }
667+ }
668+
669+ if (StringUtils .isNotEmpty(query.text)) {
670+ stream.filter {
671+ StringUtils .equals(it.text, query.text)
672+ }
673+ }
674+
675+ if (query.textReg != null ) {
676+ stream.filter {
677+ query.textReg!! .matches(it.text ? : " " )
678+ }
679+ }
680+
681+ if (query.type != null ) {
682+ stream.filter {
683+ query.type == it.type
684+ }
685+ }
686+
687+ return @supplyAsync stream.map { it.id }.collect(Collectors .toList())
688+ }
689+
690+
559691 }
560692
561693 protected fun messageQueryFilterFactory (query : MessageQueryFilter ) {
@@ -719,12 +851,12 @@ abstract class Puppet : EventEmitter {
719851
720852 if (StringUtils .isNotBlank(query.topic)) {
721853 roomPayloads = roomPayloads.filter { t ->
722- log.info (" t.topic is {} and topic is {}" , t.topic, query.topic)
854+ log.debug (" t.topic is {} and topic is {}" , t.topic, query.topic)
723855 val equals = StringUtils .equals(t.topic, query.topic)
724- log.info (" equals is {}" , equals)
856+ log.debug (" equals is {}" , equals)
725857 equals
726858 }
727- log.info (" roomPayloads is {}" , roomPayloads)
859+ log.debug (" roomPayloads is {}" , roomPayloads)
728860 }
729861
730862 if (CollectionUtils .isNotEmpty(roomPayloads)) {
@@ -777,3 +909,33 @@ abstract class Puppet : EventEmitter {
777909 }
778910
779911}
912+ //
913+ // fun main() {
914+ //
915+ // val contactQueryFilter = ContactQueryFilter()
916+ //
917+ // contactQueryFilter.name = "111"
918+ //
919+ // ContactPayloadFilterFactory(contactQueryFilter)
920+ //
921+ // }
922+ // fun ContactPayloadFilterFactory(query:ContactQueryFilter):ContactPayloadFilterFunction{
923+ //
924+ // val clz = query::class.java
925+ // val fields = clz.fields
926+ // val list = fields.map {
927+ // it.name to it.get(query)
928+ // }
929+ //
930+ // val filterKv = list.get(0)
931+ //
932+ // val filterFunction = { payload: ContactPayload ->
933+ // Boolean
934+ // val clazz = payload::class.java
935+ // val field = clazz.getField(filterKv.first)
936+ // val toString = field.get(payload).toString()
937+ // StringUtils.equals(toString, filterKv.second.toString())
938+ // }
939+ //
940+ // return filterFunction
941+ // }
0 commit comments