@@ -31,6 +31,7 @@ class ReviewNode(BaseWorkflowNode):
3131 definition_id (WorkflowDefinitionId): Node type identifier (read-only)
3232 instructions (Optional[str]): Task instructions for reviewers
3333 group_assignment (Optional[Union[str, List[str], Any]]): User groups for assignment
34+ max_contributions_per_user (Optional[int]): Maximum contributions per user (null means infinite)
3435 node_config (List[Dict[str, Any]]): API configuration for assignments
3536
3637 Inputs:
@@ -55,6 +56,7 @@ class ReviewNode(BaseWorkflowNode):
5556 >>> review = ReviewNode(
5657 ... label="Quality Review",
5758 ... group_assignment=["reviewer-group-id"],
59+ ... max_contributions_per_user=5,
5860 ... instructions="Check annotation accuracy and completeness"
5961 ... )
6062 >>> # Connect inputs and outputs
@@ -90,6 +92,11 @@ class ReviewNode(BaseWorkflowNode):
9092 description = "User group assignment for this review node. Can be a UserGroup object, a string ID, or a list of IDs." ,
9193 alias = "groupAssignment" ,
9294 )
95+ max_contributions_per_user : Optional [int ] = Field (
96+ default = None ,
97+ description = "Maximum contributions per user (null means infinite)" ,
98+ alias = "maxContributionsPerUser" ,
99+ )
93100 node_config : List [Dict [str , Any ]] = Field (
94101 default_factory = lambda : [],
95102 description = "Contains assignment rules etc." ,
@@ -98,7 +105,8 @@ class ReviewNode(BaseWorkflowNode):
98105
99106 @model_validator (mode = "after" )
100107 def sync_group_assignment_with_config (self ) -> "ReviewNode" :
101- """Sync group_assignment with node_config for API compatibility."""
108+ """Sync group_assignment and max_contributions_per_user with node_config for API compatibility."""
109+ # Handle group assignment (existing logic)
102110 if self .group_assignment is not None :
103111 group_ids = []
104112
@@ -120,16 +128,100 @@ def sync_group_assignment_with_config(self) -> "ReviewNode":
120128 # Create config entries for group assignments
121129 if group_ids :
122130 # Update node_config with assignment rule in correct API format
123- self .node_config = [
124- {
125- "field" : "groupAssignment" ,
126- "value" : group_ids ,
127- "metadata" : None ,
128- }
129- ]
131+ group_config_entry = {
132+ "field" : "groupAssignment" ,
133+ "value" : group_ids ,
134+ "metadata" : None ,
135+ }
136+
137+ # Check if group assignment entry already exists and update it, otherwise add it
138+ updated = False
139+ for i , entry in enumerate (self .node_config ):
140+ if entry .get ("field" ) == "groupAssignment" :
141+ self .node_config [i ] = group_config_entry
142+ updated = True
143+ break
144+
145+ if not updated :
146+ self .node_config .append (group_config_entry )
147+
148+ # Handle max_contributions_per_user (new logic)
149+ if self .max_contributions_per_user is not None :
150+ # Add max contributions config entry
151+ max_contrib_config_entry = {
152+ "field" : "maxContributionsPerUser" ,
153+ "value" : self .max_contributions_per_user ,
154+ "metadata" : None ,
155+ }
156+
157+ # Check if entry already exists and update it, otherwise add it
158+ updated = False
159+ for i , entry in enumerate (self .node_config ):
160+ if entry .get ("field" ) == "maxContributionsPerUser" :
161+ self .node_config [i ] = max_contrib_config_entry
162+ updated = True
163+ break
164+
165+ if not updated :
166+ self .node_config .append (max_contrib_config_entry )
130167
131168 return self
132169
170+ def __setattr__ (self , name : str , value : Any ) -> None :
171+ """Custom setter to sync field changes with node_config."""
172+ super ().__setattr__ (name , value )
173+
174+ # Sync changes to node_config when max_contributions_per_user is updated
175+ if name == "max_contributions_per_user" and hasattr (
176+ self , "node_config"
177+ ):
178+ self ._sync_config ()
179+
180+ def _sync_config (self ) -> None :
181+ """Sync max_contributions_per_user with node_config."""
182+ if (
183+ hasattr (self , "max_contributions_per_user" )
184+ and self .max_contributions_per_user is not None
185+ ):
186+ # Add max contributions config entry
187+ config_entry = {
188+ "field" : "maxContributionsPerUser" ,
189+ "value" : self .max_contributions_per_user ,
190+ "metadata" : None ,
191+ }
192+
193+ # Check if entry already exists and update it, otherwise add it
194+ updated = False
195+ for i , entry in enumerate (self .node_config ):
196+ if entry .get ("field" ) == "maxContributionsPerUser" :
197+ self .node_config [i ] = config_entry
198+ updated = True
199+ break
200+
201+ if not updated :
202+ self .node_config .append (config_entry )
203+ else :
204+ # Remove the entry if value is None
205+ self .node_config = [
206+ entry
207+ for entry in self .node_config
208+ if entry .get ("field" ) != "maxContributionsPerUser"
209+ ]
210+
211+ # Sync changes back to workflow config
212+ self ._sync_to_workflow ()
213+
214+ def _update_node_data (self , node_data : Dict [str , Any ]) -> None :
215+ """Update individual node data in workflow config.
216+
217+ Override base class to always update config field.
218+ """
219+ # Call parent implementation first
220+ super ()._update_node_data (node_data )
221+
222+ # Always update config field, even if empty
223+ node_data ["config" ] = getattr (self , "node_config" , [])
224+
133225 @field_validator ("inputs" )
134226 @classmethod
135227 def validate_inputs (cls , v ) -> List [str ]:
0 commit comments