1313import io .javaoperatorsdk .operator .processing .event .source .PrimaryToSecondaryMapper ;
1414import io .javaoperatorsdk .operator .processing .event .source .informer .InformerEventSource ;
1515
16+ /**
17+ * This reconciler used in integration tests to show the cases when PrimaryToSecondaryMapper is
18+ * needed, and to show the use cases when some mechanisms would not work without that. It's not
19+ * intended to be a reusable code as it is, rather serves for deeper understanding of the problem.
20+ */
1621@ ControllerConfiguration ()
1722public class JobReconciler
18- implements Reconciler <Job >, EventSourceInitializer <Job > {
23+ implements Reconciler <Job >, EventSourceInitializer <Job >, ErrorStatusHandler < Job > {
1924
2025 private static final String JOB_CLUSTER_INDEX = "job-cluster-index" ;
2126
2227 private final AtomicInteger numberOfExecutions = new AtomicInteger (0 );
2328
29+ private final boolean addPrimaryToSecondaryMapper ;
30+ private boolean getResourceDirectlyFromCache = false ;
31+ private volatile boolean errorOccurred ;
32+
33+ public JobReconciler () {
34+ this (true );
35+ }
36+
37+ public JobReconciler (boolean addPrimaryToSecondaryMapper ) {
38+ this .addPrimaryToSecondaryMapper = addPrimaryToSecondaryMapper ;
39+ }
40+
2441 @ Override
2542 public UpdateControl <Job > reconcile (
2643 Job resource , Context <Job > context ) {
2744
28- context .getSecondaryResource (Cluster .class )
29- .orElseThrow (() -> new IllegalStateException ("Secondary resource should be present" ));
45+ if (!getResourceDirectlyFromCache ) {
46+ // this is only possible when there is primary to secondary mapper
47+ context .getSecondaryResource (Cluster .class )
48+ .orElseThrow (() -> new IllegalStateException ("Secondary resource should be present" ));
49+ } else {
50+ // reading the resource from cache as alternative, works without primary to secondary mapper
51+ var informerEventSource = (InformerEventSource <Cluster , Job >) context .eventSourceRetriever ()
52+ .getResourceEventSourceFor (Cluster .class );
53+ informerEventSource
54+ .get (new ResourceID (resource .getSpec ().getClusterName (),
55+ resource .getMetadata ().getNamespace ()))
56+ .orElseThrow (
57+ () -> new IllegalStateException ("Secondary resource cannot be read from cache" ));
58+ }
3059 numberOfExecutions .addAndGet (1 );
3160 return UpdateControl .noUpdate ();
3261 }
@@ -36,20 +65,22 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<Job> cont
3665 context .getPrimaryCache ().addIndexer (JOB_CLUSTER_INDEX , (job -> List
3766 .of (indexKey (job .getSpec ().getClusterName (), job .getMetadata ().getNamespace ()))));
3867
39- InformerConfiguration <Cluster > informerConfiguration =
68+ InformerConfiguration . InformerConfigurationBuilder <Cluster > informerConfiguration =
4069 InformerConfiguration .from (Cluster .class , context )
4170 .withSecondaryToPrimaryMapper (cluster -> context .getPrimaryCache ()
4271 .byIndex (JOB_CLUSTER_INDEX , indexKey (cluster .getMetadata ().getName (),
4372 cluster .getMetadata ().getNamespace ()))
4473 .stream ().map (ResourceID ::fromResource ).collect (Collectors .toSet ()))
45- .withPrimaryToSecondaryMapper (
46- (PrimaryToSecondaryMapper <Job >) primary -> Set .of (new ResourceID (
47- primary .getSpec ().getClusterName (), primary .getMetadata ().getNamespace ())))
48- .withNamespacesInheritedFromController (context )
49- .build ();
74+ .withNamespacesInheritedFromController (context );
75+
76+ if (addPrimaryToSecondaryMapper ) {
77+ informerConfiguration = informerConfiguration .withPrimaryToSecondaryMapper (
78+ (PrimaryToSecondaryMapper <Job >) primary -> Set .of (new ResourceID (
79+ primary .getSpec ().getClusterName (), primary .getMetadata ().getNamespace ())));
80+ }
5081
5182 return EventSourceInitializer
52- .nameEventSources (new InformerEventSource <>(informerConfiguration , context ));
83+ .nameEventSources (new InformerEventSource <>(informerConfiguration . build () , context ));
5384 }
5485
5586 private String indexKey (String clusterName , String namespace ) {
@@ -59,4 +90,20 @@ private String indexKey(String clusterName, String namespace) {
5990 public int getNumberOfExecutions () {
6091 return numberOfExecutions .get ();
6192 }
93+
94+ @ Override
95+ public ErrorStatusUpdateControl <Job > updateErrorStatus (Job resource , Context <Job > context ,
96+ Exception e ) {
97+ errorOccurred = true ;
98+ return ErrorStatusUpdateControl .noStatusUpdate ();
99+ }
100+
101+ public boolean isErrorOccurred () {
102+ return errorOccurred ;
103+ }
104+
105+ public JobReconciler setGetResourceDirectlyFromCache (boolean getResourceDirectlyFromCache ) {
106+ this .getResourceDirectlyFromCache = getResourceDirectlyFromCache ;
107+ return this ;
108+ }
62109}
0 commit comments