77use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \IndexingJob ;
88use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \LoggerTrait ;
99use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \UpdateAliasJob ;
10+ use Flowpack \ElasticSearch \Domain \Model \Mapping ;
11+ use Flowpack \JobQueue \Common \Exception ;
1012use Flowpack \JobQueue \Common \Job \JobManager ;
1113use Flowpack \JobQueue \Common \Queue \QueueManager ;
1214use Flowpack \ElasticSearch \Domain \Model \Mapping ;
1315use Flowpack \JobQueue \Common \Exception as JobQueueException ;
16+ use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1417use Neos \Flow \Annotations as Flow ;
1518use Neos \Flow \Cli \CommandController ;
1619use Neos \Flow \Persistence \PersistenceManagerInterface ;
17- use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1820use Neos \Utility \Files ;
1921
2022/**
@@ -28,7 +30,6 @@ class NodeIndexQueueCommandController extends CommandController
2830
2931 const BATCH_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer ' ;
3032 const LIVE_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live ' ;
31- const DEFAULT_BATCH_SIZE = 500 ;
3233
3334 /**
3435 * @var JobManager
@@ -73,23 +74,25 @@ class NodeIndexQueueCommandController extends CommandController
7374 protected $ nodeIndexer ;
7475
7576 /**
76- * @Flow\InjectConfiguration(package="Flowpack.ElasticSearch.ContentRepositoryQueueIndexer ")
77- * @var array
77+ * @Flow\InjectConfiguration(path="batchSize ")
78+ * @var int
7879 */
79- protected $ settings ;
80+ protected $ batchSize ;
8081
8182 /**
8283 * Index all nodes by creating a new index and when everything was completed, switch the index alias.
8384 *
8485 * @param string $workspace
86+ * @throws \Flowpack\JobQueue\Common\Exception
87+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
88+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
8589 */
8690 public function buildCommand ($ workspace = null )
8791 {
8892 $ indexPostfix = time ();
8993 $ indexName = $ this ->createNextIndex ($ indexPostfix );
9094 $ this ->updateMapping ();
9195
92-
9396 $ this ->outputLine ();
9497 $ this ->outputLine ('<b>Indexing on %s ...</b> ' , [$ indexName ]);
9598
@@ -123,6 +126,7 @@ public function buildCommand($workspace = null)
123126 * @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
124127 * @param bool $verbose Output debugging information
125128 * @return void
129+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
126130 */
127131 public function workCommand ($ queue = 'batch ' , $ exitAfter = null , $ limit = null , $ verbose = false )
128132 {
@@ -154,8 +158,8 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
154158 }
155159 try {
156160 $ message = $ this ->jobManager ->waitAndExecute ($ queueName , $ timeout );
157- } catch (JobQueueException $ exception ) {
158- $ numberOfJobExecutions ++;
161+ } catch (Exception $ exception ) {
162+ $ numberOfJobExecutions ++;
159163 $ this ->outputLine ('<error>%s</error> ' , [$ exception ->getMessage ()]);
160164 if ($ verbose && $ exception ->getPrevious () instanceof \Exception) {
161165 $ this ->outputLine (' Reason: %s ' , [$ exception ->getPrevious ()->getMessage ()]);
@@ -165,7 +169,7 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
165169 $ this ->quit (1 );
166170 }
167171 if ($ message !== null ) {
168- $ numberOfJobExecutions ++;
172+ $ numberOfJobExecutions ++;
169173 if ($ verbose ) {
170174 $ messagePayload = strlen ($ message ->getPayload ()) <= 50 ? $ message ->getPayload () : substr ($ message ->getPayload (), 0 , 50 ) . '... ' ;
171175 $ this ->outputLine ('<success>Successfully executed job "%s" (%s)</success> ' , [$ message ->getIdentifier (), $ messagePayload ]);
@@ -183,7 +187,6 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
183187 }
184188 $ this ->quit ();
185189 }
186-
187190 } while (true );
188191 }
189192
@@ -192,8 +195,12 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
192195 */
193196 public function flushCommand ()
194197 {
195- $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
196- $ this ->outputSystemReport ();
198+ try {
199+ $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
200+ $ this ->outputSystemReport ();
201+ } catch (Exception $ exception ) {
202+ $ this ->outputLine ('An error occurred: %s ' , [$ exception ->getMessage ()]);
203+ }
197204 $ this ->outputLine ();
198205 }
199206
@@ -207,7 +214,11 @@ protected function outputSystemReport()
207214 $ time = microtime (true ) - $ _SERVER ["REQUEST_TIME_FLOAT " ];
208215 $ this ->outputLine ('Execution time : %s seconds ' , [$ time ]);
209216 $ this ->outputLine ('Indexing Queue : %s ' , [self ::BATCH_QUEUE_NAME ]);
210- $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
217+ try {
218+ $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
219+ } catch (Exception $ exception ) {
220+ $ this ->outputLine ('Pending Jobs : Error, queue not found, %s ' , [$ exception ->getMessage ()]);
221+ }
211222 }
212223
213224 /**
@@ -219,16 +230,19 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
219230 $ this ->outputLine ('<info>++</info> Indexing %s workspace ' , [$ workspaceName ]);
220231 $ nodeCounter = 0 ;
221232 $ offset = 0 ;
222- $ batchSize = $ this ->settings ['batchSize ' ] ?? static ::DEFAULT_BATCH_SIZE ;
223233 while (true ) {
224- $ iterator = $ this ->nodeDataRepository ->findAllBySiteAndWorkspace ($ workspaceName , $ offset , $ batchSize );
234+ $ iterator = $ this ->nodeDataRepository ->findAllBySiteAndWorkspace ($ workspaceName , $ offset , $ this -> batchSize );
225235
226236 $ jobData = [];
227237
228238 foreach ($ this ->nodeDataRepository ->iterate ($ iterator ) as $ data ) {
229239 $ jobData [] = [
230- 'nodeIdentifier ' => $ data ['nodeIdentifier ' ],
231- 'dimensions ' => $ data ['dimensions ' ]
240+ 'persistenceObjectIdentifier ' => $ data ['persistenceObjectIdentifier ' ],
241+ 'identifier ' => $ data ['identifier ' ],
242+ 'dimensions ' => $ data ['dimensions ' ],
243+ 'workspace ' => $ workspaceName ,
244+ 'nodeType ' => $ data ['nodeType ' ],
245+ 'path ' => $ data ['path ' ],
232246 ];
233247 $ nodeCounter ++;
234248 }
@@ -240,7 +254,7 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
240254 $ indexingJob = new IndexingJob ($ indexPostfix , $ workspaceName , $ jobData );
241255 $ this ->jobManager ->queue (self ::BATCH_QUEUE_NAME , $ indexingJob );
242256 $ this ->output ('. ' );
243- $ offset += $ batchSize ;
257+ $ offset += $ this -> batchSize ;
244258 $ this ->persistenceManager ->clearState ();
245259 }
246260 $ this ->outputLine ();
@@ -251,17 +265,22 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
251265 /**
252266 * @param string $indexPostfix
253267 * @return string
268+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
254269 */
255270 protected function createNextIndex ($ indexPostfix )
256271 {
257272 $ this ->nodeIndexer ->setIndexNamePostfix ($ indexPostfix );
258273 $ this ->nodeIndexer ->getIndex ()->create ();
259274 $ this ->log (sprintf ('action=indexing step=index-created index=%s ' , $ this ->nodeIndexer ->getIndexName ()), LOG_INFO );
275+
260276 return $ this ->nodeIndexer ->getIndexName ();
261277 }
262278
263279 /**
264280 * Update Index Mapping
281+ *
282+ * @return void
283+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
265284 */
266285 protected function updateMapping ()
267286 {
0 commit comments