Class ProcessingContainer_Impl

  • All Implemented Interfaces:
    Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource

    public class ProcessingContainer_Impl
    extends ProcessingContainer
    implements RunnableContainer
    Manages a pool of CasProcessor instances and provides access to them for processing threads. A processing thread checks out a CasProcessor instance, invokes its process() method, and returns the instance to the pool when done. The container aggregates counts and totals on behalf of all CasProcessor instances. It also manages error and restart thresholds for the CasProcessors as a group: errors are aggregated across all instances, NOT tracked individually. The container takes the appropriate action when a threshold is exceeded. Which action is taken depends on the declarative specification in the CPE descriptor.
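
    The check-out/return cycle described above can be pictured with a small stand-in pool. This is a minimal sketch and does not reflect the actual implementation: InstancePoolSketch, checkOut, checkIn and runWorkers are hypothetical names, and a plain String stands in for a CasProcessor instance.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the container; not the UIMA implementation.
class InstancePoolSketch {
    private final BlockingQueue<String> pool;
    // Totals are kept once for the whole group, not per instance.
    private final AtomicLong processed = new AtomicLong();

    InstancePoolSketch(int size) {
        pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            pool.add("processor-" + i);
        }
    }

    // Blocks until an instance is free.
    String checkOut() throws InterruptedException {
        return pool.take();
    }

    // Records the work on the group total, then returns the instance to the pool.
    void checkIn(String instance, int entities) {
        processed.addAndGet(entities);
        pool.add(instance);
    }

    long getProcessed() {
        return processed.get();
    }

    int available() {
        return pool.size();
    }

    // Starts `threads` workers; each checks out an instance `perThread` times.
    static InstancePoolSketch runWorkers(int poolSize, int threads, int perThread) {
        InstancePoolSketch container = new InstancePoolSketch(poolSize);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    try {
                        String instance = container.checkOut();
                        // A real thread would call process() on the instance here.
                        container.checkIn(instance, 1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return container;
    }

    public static void main(String[] args) {
        InstancePoolSketch c = runWorkers(2, 4, 100);
        System.out.println(c.getProcessed() + " entities, " + c.available() + " instances free");
    }
}
```

    With more threads than instances, the extra threads simply wait in checkOut until another thread returns an instance, which is the behaviour documented for getCasProcessor().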
    • Method Detail

      • setCasProcessorDeployer

        public void setCasProcessorDeployer​(CasProcessorDeployer aDeployer)
        Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.
        Specified by:
        setCasProcessorDeployer in class ProcessingContainer
        Parameters:
        aDeployer - - object responsible for deploying/launching CasProcessor
      • logAbortedCases

        public void logAbortedCases​(Object[] abortedCasList)
        Logs Cas'es that could not be processed.
        Specified by:
        logAbortedCases in class ProcessingContainer
        Parameters:
        abortedCasList - - an array of Cas'es that could not be processed by this CasProcessor
      • getBytesIn

        public long getBytesIn()
        Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
        Specified by:
        getBytesIn in class ProcessingContainer
        Returns:
        - bytes ingested
      • addBytesIn

        public void addBytesIn​(long aBytesIn)
        Aggregate total bytes ingested by the CasProcessor.
        Specified by:
        addBytesIn in class ProcessingContainer
        Parameters:
        aBytesIn - - number of ingested bytes
      • getBytesOut

        public long getBytesOut()
        Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
        Specified by:
        getBytesOut in class ProcessingContainer
        Returns:
        - bytes processed
      • addBytesOut

        public void addBytesOut​(long aBytesOut)
        Aggregate total bytes processed by this CasProcessor
        Specified by:
        addBytesOut in class ProcessingContainer
      • incrementRestartCount

        public void incrementRestartCount​(int aCount)
        Increment number of times the casProcessor was restarted due to failures
        Specified by:
        incrementRestartCount in class ProcessingContainer
        Parameters:
        aCount - - restart count
      • getRestartCount

        public int getRestartCount()
        Returns total number of all CasProcessor restarts.
        Specified by:
        getRestartCount in class ProcessingContainer
        Returns:
        number of restarts
      • incrementRetryCount

        public void incrementRetryCount​(int aCount)
        Increments number of times CasProcessor failed analyzing Cas'es due to timeout or some other problem
        Specified by:
        incrementRetryCount in class ProcessingContainer
        Parameters:
        aCount - - failure count
      • getRetryCount

        public int getRetryCount()
        Returns the up-to-date number of retries recorded by the container.
        Specified by:
        getRetryCount in class ProcessingContainer
        Returns:
        - retry count
      • incrementAbortCount

        public void incrementAbortCount​(int aCount)
        Increment number of aborted Cas'es due to inability to process the Cas
        Specified by:
        incrementAbortCount in class ProcessingContainer
        Parameters:
        aCount - - number of aborts while processing Cas'es
      • getAbortCount

        public int getAbortCount()
        Returns the up-to-date number of aborts recorded by the container.
        Specified by:
        getAbortCount in class ProcessingContainer
        Returns:
        - number of failed attempts to analyze CAS'es
      • incrementFilteredCount

        public void incrementFilteredCount​(int aCount)
        Increments number of CAS'es filtered by the CasProcessor. Filtered CAS'es don't contain the features that the Cas Processor requires to perform analysis. Dependent features are defined in the filter expression in the CPE descriptor.
        Specified by:
        incrementFilteredCount in class ProcessingContainer
        Parameters:
        aCount - - number of filtered Cas'es
      • getFilteredCount

        public int getFilteredCount()
        Returns number of filtered Cas'es
        Specified by:
        getFilteredCount in class ProcessingContainer
        Returns:
        number of filtered Cas'es
      • getRemaining

        public long getRemaining()
        Returns number of entities still to be processed by the CasProcessor. It is the total number of entities to be processed by the CPE minus the number of entities processed so far.
        Specified by:
        getRemaining in class ProcessingContainer
        Returns:
        Number of entities yet to be processed
      • setRemaining

        public void setRemaining​(long aRemainingCount)
        Copies number of entities the CasProcessor has yet to process.
        Specified by:
        setRemaining in class ProcessingContainer
        Parameters:
        aRemainingCount - - number of entities to process
      • setLastProcessedEntityId

        public void setLastProcessedEntityId​(String aEntityId)
        Copies id of the last entity processed by the CasProcessor
        Specified by:
        setLastProcessedEntityId in class ProcessingContainer
        Parameters:
        aEntityId - - id of the entity
      • incrementProcessed

        public void incrementProcessed​(int aIncrement)
      • setProcessed

        public void setProcessed​(long aProcessedCount)
        Used when recovering from a checkpoint. Sets the total number of entities processed before the CPE stopped.
        Specified by:
        setProcessed in class ProcessingContainer
        Parameters:
        aProcessedCount - - number of entities processed before CPE stopped
      • getProcessed

        public long getProcessed()
        Returns number of entities processed so far.
        Specified by:
        getProcessed in class ProcessingContainer
        Returns:
        - processed - number of entities processed
      • incrementTotalTime

        public void incrementTotalTime​(long aTime)
        Increments total time spent in the process() method of the CasProcessor
        Specified by:
        incrementTotalTime in class ProcessingContainer
        Parameters:
        aTime - - total time in process()
      • getTotalTime

        public long getTotalTime()
        Returns total time spent in process()
        Specified by:
        getTotalTime in class ProcessingContainer
        Returns:
        - number of millis spent in process()
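
        The incrementTotalTime/getTotalTime pair follows an accumulate-and-read pattern. This is a minimal sketch of that pattern only: TotalTimeSketch and its timed helper are hypothetical, and the use of AtomicLong is an assumption about how a shared total could be kept safe across threads.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in; only the accumulate-and-read pattern is illustrated.
class TotalTimeSketch {
    private final AtomicLong totalMillis = new AtomicLong();

    // Mirrors incrementTotalTime(long): adds one measured interval to the total.
    void incrementTotalTime(long millis) {
        totalMillis.addAndGet(millis);
    }

    // Mirrors getTotalTime(): reads the accumulated total in milliseconds.
    long getTotalTime() {
        return totalMillis.get();
    }

    // Times one unit of work and adds the elapsed milliseconds to the total.
    void timed(Runnable work) {
        long start = System.nanoTime();
        try {
            work.run();
        } finally {
            incrementTotalTime((System.nanoTime() - start) / 1_000_000L);
        }
    }

    public static void main(String[] args) {
        TotalTimeSketch stats = new TotalTimeSketch();
        stats.incrementTotalTime(5);
        stats.incrementTotalTime(7);
        stats.timed(() -> { /* a process() call would go here */ });
        System.out.println("total ms: " + stats.getTotalTime());
    }
}
```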
      • abortCPMOnError

        public boolean abortCPMOnError()
        Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the <errorRateThreshold> element in the cpe descriptor.
        Specified by:
        abortCPMOnError in class ProcessingContainer
        Returns:
        - true if the CPE should stop processing, false otherwise
      • incrementCasProcessorErrors

        public void incrementCasProcessorErrors​(Throwable aThrowable)
                                         throws Exception
        This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.
        Specified by:
        incrementCasProcessorErrors in class ProcessingContainer
        Parameters:
        aThrowable - - exception to examine
        Throws:
        AbortCPMException - - force the CPE to stop processing
        AbortCasProcessorException - - disables all instances of CasProcessor in this container
        ServiceConnectionException - - forces the restart/relaunch of the failed CasProcessor
        SkipCasException - - disregard error, skip bad Cas'es and continue with the next Cas bundle
        Exception
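
        The decision described for incrementCasProcessorErrors can be sketched as a group-wide error counter checked against a threshold. This is a simplified illustration, not the real logic: ErrorThresholdSketch, its Outcome enum, and the plain count-based rule are hypothetical, and the action strings "terminate" and "continue" are assumed alongside the 'disable' value that this page mentions. The real method signals its decision by throwing the exceptions listed above; the sketch returns an enum so that it stays self-contained.

```java
// Hypothetical sketch; names and the simple counting rule are assumptions.
class ErrorThresholdSketch {
    enum Outcome { SKIP_CAS, DISABLE_PROCESSOR, ABORT_CPM }

    private final int maxErrors;
    private final String action; // assumed values: "terminate", "disable", "continue"
    private int errorCount;      // shared by every instance in the container

    ErrorThresholdSketch(int maxErrors, String action) {
        this.maxErrors = maxErrors;
        this.action = action;
    }

    // Counts the error for the whole group and decides what the caller should do.
    synchronized Outcome recordError(Throwable cause) {
        errorCount++;
        if (errorCount <= maxErrors) {
            // Threshold not exceeded: skip the bad CAS'es and carry on.
            return Outcome.SKIP_CAS;
        }
        switch (action) {
            case "terminate":
                return Outcome.ABORT_CPM;
            case "disable":
                return Outcome.DISABLE_PROCESSOR;
            default:
                return Outcome.SKIP_CAS;
        }
    }

    public static void main(String[] args) {
        ErrorThresholdSketch errors = new ErrorThresholdSketch(2, "disable");
        for (int i = 0; i < 3; i++) {
            System.out.println(errors.recordError(new RuntimeException("failure " + i)));
        }
    }
}
```

        Because the counter belongs to the container rather than to one instance, errors from any pooled instance move the whole group toward the threshold.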
      • processCas

        public boolean processCas​(Object[] aCasList)
        Returns true if the Cas bundle should be processed by the CasProcessor. This routine checks for existence of dependent features defined in the filter expression for the CasProcessor in the CPE descriptor. Currently this is done on a per-bundle basis, meaning that all Cas'es must contain the required features. If even one Cas does not have them, the entire bundle is skipped.
        Specified by:
        processCas in class ProcessingContainer
        Parameters:
        aCasList - - bundle containing instances of CAS
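
        The all-or-nothing bundle rule described for processCas can be illustrated as follows. This is a minimal sketch under stated assumptions: BundleFilterSketch and shouldProcess are hypothetical names, and each Cas is modelled simply as the set of feature names it contains rather than as a real CAS object.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: a CAS is represented by the set of feature names it holds.
class BundleFilterSketch {

    // Returns true only if every CAS in the bundle has all required features.
    static boolean shouldProcess(List<Set<String>> bundle, Set<String> required) {
        for (Set<String> cas : bundle) {
            if (!cas.containsAll(required)) {
                return false; // one incomplete CAS causes the whole bundle to be skipped
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> required = new HashSet<>(Arrays.asList("Token", "Sentence"));
        Set<String> complete = new HashSet<>(Arrays.asList("Token", "Sentence", "Entity"));
        Set<String> incomplete = new HashSet<>(Arrays.asList("Token"));

        System.out.println(shouldProcess(Arrays.asList(complete, complete), required));
        System.out.println(shouldProcess(Arrays.asList(complete, incomplete), required));
    }
}
```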
      • getCasProcessor

        public CasProcessor getCasProcessor()
        Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.
        Specified by:
        getCasProcessor in interface CasProcessorController
        Returns:
        CasProcessor
      • getStatus

        public int getStatus()
        Returns the current status of the CasProcessor
        Specified by:
        getStatus in interface CasProcessorController
        Returns:
        int status
      • setStatus

        public void setStatus​(int aStatus)
        Changes the status of the CasProcessor as a group
        Specified by:
        setStatus in interface CasProcessorController
        Parameters:
        aStatus - - new status
      • isAbortable

        public boolean isAbortable()
        Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the <errorRateThreshold> element has a value of 'disable'.
        Specified by:
        isAbortable in interface CasProcessorController
        Returns:
        true if CasProcessor can be disabled
      • destroy

        public void destroy()
        Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.
        Specified by:
        destroy in interface Resource
        Overrides:
        destroy in class Resource_ImplBase
        See Also:
        Resource.destroy()
      • run

        public void run()
        Specified by:
        run in interface Runnable
      • getConfigParameterValue

        public Object getConfigParameterValue​(String aParamName)
        Description copied from interface: ConfigurableResource
        Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.

        This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

        Specified by:
        getConfigParameterValue in interface ConfigurableResource
        Parameters:
        aParamName - the name of a parameter that is not in any group
        Returns:
        the value of the parameter with name aParamName, null if either the parameter does not exist or it has not been assigned a value.
      • getConfigParameterValue

        public Object getConfigParameterValue​(String aGroupName,
                                              String aParamName)
        Description copied from interface: ConfigurableResource
        Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.

        This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

        Specified by:
        getConfigParameterValue in interface ConfigurableResource
        Parameters:
        aGroupName - the name of a configuration group. If the group name is null, this method will return the same value as getParameterValue(String).
        aParamName - the name of a parameter in the group
        Returns:
        the value of the parameter in group aGroupName with name aParamName, null if either the parameter does not exist or it has not been assigned a value.
      • setConfigParameterValue

        public void setConfigParameterValue​(String aParamName,
                                            Object aValue)
        Description copied from interface: ConfigurableResource
        Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
        Specified by:
        setConfigParameterValue in interface ConfigurableResource
        Parameters:
        aParamName - the name of a parameter that is not in any group
        aValue - the value to assign to the parameter
      • getName

        public String getName()
        Returns the name of this container. It is the name of the Cas Processor.
        Specified by:
        getName in class ProcessingContainer
      • getAllStats

        public HashMap getAllStats()
        Returns all stats aggregated during the CPM run
        Specified by:
        getAllStats in class ProcessingContainer
        Returns:
        a map of all stats aggregated during the CPM run
      • pause

        public void pause()
        Pauses the container until resumed. The CPM will pause the Container while it is trying to re-connect to a shared remote service. While the Container is paused, getCasProcessor() will not be allowed to return a new CasProcessor. All other methods remain accessible and function normally.
        Specified by:
        pause in class ProcessingContainer
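
        The pause behaviour can be pictured as a gate in front of instance hand-out. This is a minimal sketch and not the actual implementation: PauseGateSketch, acquire, resume and demo are hypothetical names, and the wait/notifyAll monitor is just one way such a gate could be built.

```java
// Hypothetical gate; illustrates blocking hand-out while paused.
class PauseGateSketch {
    private boolean paused;
    private int handedOut;

    synchronized void pause() {
        paused = true;
    }

    synchronized void resume() {
        paused = false;
        notifyAll(); // wake every thread waiting in acquire()
    }

    // Stand-in for getCasProcessor(): waits for as long as the gate is paused.
    synchronized int acquire() throws InterruptedException {
        while (paused) {
            wait();
        }
        return ++handedOut;
    }

    // Not gated: statistics stay readable while paused.
    synchronized int getHandedOut() {
        return handedOut;
    }

    // Returns the hand-out count observed while paused and after resuming.
    static int[] demo() {
        PauseGateSketch gate = new PauseGateSketch();
        gate.pause();
        Thread worker = new Thread(() -> {
            try {
                gate.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        int whilePaused = gate.getHandedOut(); // the worker cannot pass the gate yet
        gate.resume();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { whilePaused, gate.getHandedOut() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("while paused: " + counts[0] + ", after resume: " + counts[1]);
    }
}
```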
      • getFetchTime

        public long getFetchTime()