33
44namespace ScriptFUSION \Porter ;
55
6- use Amp \Iterator ;
76use Amp \Promise ;
87use Psr \Container \ContainerInterface ;
98use ScriptFUSION \Porter \Collection \AsyncPorterRecords ;
@@ -60,20 +59,17 @@ public function __construct(ContainerInterface $providers)
6059 *
6160 * @return PorterRecords|CountablePorterRecords Collection of records. If the total size of the collection is known,
6261 * the collection may implement Countable, otherwise PorterRecords is returned.
62+ *
63+ * @throws IncompatibleResourceException Resource emits a single record and must be imported with
64+ * importOne() instead.
6365 */
6466 public function import (ImportSpecification $ specification ): PorterRecords
6567 {
66- $ specification = clone $ specification ;
67-
68- $ records = $ this ->fetch ($ specification );
69-
70- if (!$ records instanceof ProviderRecords) {
71- $ records = $ this ->createProviderRecords ($ records , $ specification ->getResource ());
68+ if ($ specification ->getResource () instanceof SingleRecordResource) {
69+ throw IncompatibleResourceException::createMustNotImplementInterface ();
7270 }
7371
74- $ records = $ this ->transformRecords ($ records , $ specification ->getTransformers (), $ specification ->getContext ());
75-
76- return $ this ->createPorterRecords ($ records , $ specification );
72+ return $ this ->fetch ($ specification );
7773 }
7874
7975 /**
@@ -83,15 +79,16 @@ public function import(ImportSpecification $specification): PorterRecords
8379 *
8480 * @return array|null Record.
8581 *
82+ * @throws IncompatibleResourceException Resource does not implement required interface.
8683 * @throws ImportException More than one record was imported.
8784 */
8885 public function importOne (ImportSpecification $ specification ): ?array
8986 {
9087 if (!$ specification ->getResource () instanceof SingleRecordResource) {
91- throw new IncompatibleResourceException ;
88+ throw IncompatibleResourceException:: createMustImplementInterface () ;
9289 }
9390
94- $ results = $ this ->import ($ specification );
91+ $ results = $ this ->fetch ($ specification );
9592
9693 if (!$ results ->valid ()) {
9794 return null ;
@@ -106,8 +103,9 @@ public function importOne(ImportSpecification $specification): ?array
106103 return $ one ;
107104 }
108105
109- private function fetch (ImportSpecification $ specification ): \ Iterator
106+ private function fetch (ImportSpecification $ specification ): PorterRecords
110107 {
108+ $ specification = clone $ specification ;
111109 $ resource = $ specification ->getResource ();
112110 $ provider = $ this ->getProvider ($ specification ->getProviderName () ?? $ resource ->getProviderClassName ());
113111
@@ -122,9 +120,15 @@ private function fetch(ImportSpecification $specification): \Iterator
122120 ));
123121 }
124122
125- $ connector = $ provider ->getConnector ();
123+ $ records = $ resource ->fetch (ImportConnectorFactory::create ($ provider ->getConnector (), $ specification ));
124+
125+ if (!$ records instanceof ProviderRecords) {
126+ $ records = $ this ->createProviderRecords ($ records , $ specification ->getResource ());
127+ }
128+
129+ $ records = $ this ->transformRecords ($ records , $ specification ->getTransformers (), $ specification ->getContext ());
126130
127- return $ resource -> fetch (ImportConnectorFactory:: create ( $ connector , $ specification) );
131+ return $ this -> createPorterRecords ( $ records , $ specification );
128132 }
129133
130134 /**
@@ -135,24 +139,17 @@ private function fetch(ImportSpecification $specification): \Iterator
135139 *
136140 * @return AsyncPorterRecords|CountableAsyncPorterRecords Collection of records. If the total size of the
137141 * collection is known, the collection may implement Countable, otherwise AsyncPorterRecords is returned.
142+ *
143+ * @throws IncompatibleResourceException Resource emits a single record and must be imported with
144+ * importOneAsync() instead.
138145 */
139146 public function importAsync (AsyncImportSpecification $ specification ): AsyncRecordCollection
140147 {
141- $ specification = clone $ specification ;
142-
143- $ records = $ this ->fetchAsync ($ specification );
144-
145- if (!$ records instanceof AsyncProviderRecords) {
146- $ records = new AsyncProviderRecords ($ records , $ specification ->getAsyncResource ());
148+ if ($ specification ->getAsyncResource () instanceof SingleRecordResource) {
149+ throw IncompatibleResourceException::createMustNotImplementInterfaceAsync ();
147150 }
148151
149- $ records = $ this ->transformRecordsAsync (
150- $ records ,
151- $ specification ->getTransformers (),
152- $ specification ->getContext ()
153- );
154-
155- return $ this ->createAsyncPorterRecords ($ records , $ specification );
152+ return $ this ->fetchAsync ($ specification );
156153 }
157154
158155 /**
@@ -162,16 +159,17 @@ public function importAsync(AsyncImportSpecification $specification): AsyncRecor
162159 *
163160 * @return Promise<array|null> Record.
164161 *
162+ * @throws IncompatibleResourceException Resource does not implement required interface.
165163 * @throws ImportException More than one record was imported.
166164 */
167165 public function importOneAsync (AsyncImportSpecification $ specification ): Promise
168166 {
169167 return call (function () use ($ specification ) {
170168 if (!$ specification ->getAsyncResource () instanceof SingleRecordResource) {
171- throw new IncompatibleResourceException ;
169+ throw IncompatibleResourceException:: createMustImplementInterface () ;
172170 }
173171
174- $ results = $ this ->importAsync ($ specification );
172+ $ results = $ this ->fetchAsync ($ specification );
175173
176174 yield $ results ->advance ();
177175
@@ -185,8 +183,9 @@ public function importOneAsync(AsyncImportSpecification $specification): Promise
185183 });
186184 }
187185
188- private function fetchAsync (AsyncImportSpecification $ specification ): Iterator
186+ private function fetchAsync (AsyncImportSpecification $ specification ): AsyncRecordCollection
189187 {
188+ $ specification = clone $ specification ;
190189 $ resource = $ specification ->getAsyncResource ();
191190 $ provider = $ this ->getProvider ($ specification ->getProviderName () ?? $ resource ->getProviderClassName ());
192191
@@ -201,9 +200,21 @@ private function fetchAsync(AsyncImportSpecification $specification): Iterator
201200 ));
202201 }
203202
204- $ connector = $ provider ->getAsyncConnector ();
203+ $ records = $ resource ->fetchAsync (
204+ ImportConnectorFactory::create ($ provider ->getAsyncConnector (), $ specification )
205+ );
205206
206- return $ resource ->fetchAsync (ImportConnectorFactory::create ($ connector , $ specification ));
207+ if (!$ records instanceof AsyncProviderRecords) {
208+ $ records = new AsyncProviderRecords ($ records , $ specification ->getAsyncResource ());
209+ }
210+
211+ $ records = $ this ->transformRecordsAsync (
212+ $ records ,
213+ $ specification ->getTransformers (),
214+ $ specification ->getContext ()
215+ );
216+
217+ return $ this ->createAsyncPorterRecords ($ records , $ specification );
207218 }
208219
209220 /**
0 commit comments