11use graph:: {
2+ anyhow:: Error ,
23 blockchain:: { block_ingestor:: CLEANUP_BLOCKS , BlockchainKind } ,
34 prelude:: {
45 anyhow:: { anyhow, bail, Context , Result } ,
5- info, serde_json, Logger , NodeId ,
6+ info,
7+ serde:: {
8+ de:: { self , value, SeqAccess , Visitor } ,
9+ Deserialize , Deserializer , Serialize ,
10+ } ,
11+ serde_json, Logger , NodeId , StoreError ,
612 } ,
713} ;
814use graph_chain_ethereum:: NodeCapabilities ;
915use graph_store_postgres:: { DeploymentPlacer , Shard as ShardName , PRIMARY_SHARD } ;
1016
1117use http:: { HeaderMap , Uri } ;
1218use regex:: Regex ;
13- use serde:: { Deserialize , Serialize } ;
1419use std:: fs:: read_to_string;
1520use std:: {
1621 collections:: { BTreeMap , BTreeSet } ,
@@ -109,12 +114,10 @@ impl Config {
109114
110115 // Check that deployment rules only reference existing stores and chains
111116 for ( i, rule) in self . deployment . rules . iter ( ) . enumerate ( ) {
112- if !self . stores . contains_key ( & rule. shard ) {
113- return Err ( anyhow ! (
114- "unknown shard {} in deployment rule {}" ,
115- rule. shard,
116- i
117- ) ) ;
117+ for shard in & rule. shards {
118+ if !self . stores . contains_key ( shard) {
119+ return Err ( anyhow ! ( "unknown shard {} in deployment rule {}" , shard, i) ) ;
120+ }
118121 }
119122 if let Some ( networks) = & rule. pred . network {
120123 for network in networks. to_vec ( ) {
@@ -809,7 +812,7 @@ impl DeploymentPlacer for Deployment {
809812 // rather than crashing the node and burying the crash in the logs
810813 let placement = match self . rules . iter ( ) . find ( |rule| rule. matches ( name, network) ) {
811814 Some ( rule) => {
812- let shard = ShardName :: new ( rule. shard . clone ( ) ) . map_err ( |e| e. to_string ( ) ) ?;
815+ let shards = rule. shard_names ( ) . map_err ( |e| e. to_string ( ) ) ?;
813816 let indexers: Vec < _ > = rule
814817 . indexers
815818 . iter ( )
@@ -818,7 +821,7 @@ impl DeploymentPlacer for Deployment {
818821 . map_err ( |( ) | format ! ( "{} is not a valid node name" , idx) )
819822 } )
820823 . collect :: < Result < Vec < _ > , _ > > ( ) ?;
821- Some ( ( vec ! [ shard ] , indexers) )
824+ Some ( ( shards , indexers) )
822825 }
823826 None => None ,
824827 } ;
@@ -830,8 +833,13 @@ impl DeploymentPlacer for Deployment {
830833struct Rule {
831834 #[ serde( rename = "match" , default ) ]
832835 pred : Predicate ,
833- #[ serde( default = "primary_store" ) ]
834- shard : String ,
836+ // For backwards compatibility, we also accept 'shard' for the shards
837+ #[ serde(
838+ alias = "shard" ,
839+ default = "primary_store" ,
840+ deserialize_with = "string_or_vec"
841+ ) ]
842+ shards : Vec < String > ,
835843 indexers : Vec < String > ,
836844}
837845
@@ -844,15 +852,22 @@ impl Rule {
844852 self . pred . matches ( name, network)
845853 }
846854
855+ fn shard_names ( & self ) -> Result < Vec < ShardName > , StoreError > {
856+ self . shards
857+ . iter ( )
858+ . cloned ( )
859+ . map ( ShardName :: new)
860+ . collect :: < Result < _ , _ > > ( )
861+ }
862+
847863 fn validate ( & self ) -> Result < ( ) > {
848864 if self . indexers . is_empty ( ) {
849865 return Err ( anyhow ! ( "useless rule without indexers" ) ) ;
850866 }
851867 for indexer in & self . indexers {
852868 NodeId :: new ( indexer) . map_err ( |( ) | anyhow ! ( "invalid node id {}" , & indexer) ) ?;
853869 }
854- ShardName :: new ( self . shard . clone ( ) )
855- . map_err ( |e| anyhow ! ( "illegal name for store shard `{}`: {}" , & self . shard, e) ) ?;
870+ self . shard_names ( ) . map_err ( Error :: from) ?;
856871 Ok ( ( ) )
857872 }
858873}
@@ -943,14 +958,46 @@ fn no_name() -> Regex {
943958 Regex :: new ( NO_NAME ) . unwrap ( )
944959}
945960
946- fn primary_store ( ) -> String {
947- PRIMARY_SHARD . to_string ( )
961+ fn primary_store ( ) -> Vec < String > {
962+ vec ! [ PRIMARY_SHARD . to_string( ) ]
948963}
949964
950965fn one ( ) -> usize {
951966 1
952967}
953968
969+ // From https://github.com/serde-rs/serde/issues/889#issuecomment-295988865
970+ fn string_or_vec < ' de , D > ( deserializer : D ) -> Result < Vec < String > , D :: Error >
971+ where
972+ D : Deserializer < ' de > ,
973+ {
974+ struct StringOrVec ;
975+
976+ impl < ' de > Visitor < ' de > for StringOrVec {
977+ type Value = Vec < String > ;
978+
979+ fn expecting ( & self , formatter : & mut fmt:: Formatter ) -> fmt:: Result {
980+ formatter. write_str ( "string or list of strings" )
981+ }
982+
983+ fn visit_str < E > ( self , s : & str ) -> Result < Self :: Value , E >
984+ where
985+ E : de:: Error ,
986+ {
987+ Ok ( vec ! [ s. to_owned( ) ] )
988+ }
989+
990+ fn visit_seq < S > ( self , seq : S ) -> Result < Self :: Value , S :: Error >
991+ where
992+ S : SeqAccess < ' de > ,
993+ {
994+ Deserialize :: deserialize ( value:: SeqAccessDeserializer :: new ( seq) )
995+ }
996+ }
997+
998+ deserializer. deserialize_any ( StringOrVec )
999+ }
1000+
9541001#[ cfg( test) ]
9551002mod tests {
9561003
0 commit comments