Details

      Description

      Unselective operations on the cluster can use too many threads.

      There are a number of optimizations to be made in this regard, including vectoring operations by node rather than by shard.

      However, the most basic problem is the interaction between the acceptService on the FederatedQueryEngine and the per-(operator,shard) blocking queues for materialized chunk messages in a ChunkedRunningQuery.

      When the acceptService uses an unbounded thread pool, the pool can grow extremely rapidly in response to an unselective query, such as "select (count(*) as ?count) {?s ?p ?o}". (The thread pool can quickly grow to over 30k threads, at which point the OS may be unable to start new threads.) The reason the thread pool grows so large is undoubtedly that the individual MaterializeMessageTasks block as the per-(operator,shard) queues fill up. In fact, for this query there may be precisely one such queue acting as the bottleneck for all pipelined join operators: the queue feeding the pipelined aggregation operator. Thus, any backlog in counting the solution sets turns into growth in the thread pool as the data services produce new solution set chunks faster than they can be counted. This should be verified empirically by observing how many parallel executions of the pipelined aggregation operator are running (it might not even permit parallel evaluation, though it is possible to evaluate this specific aggregation function in parallel) and by listing out those queues which are blocked.
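
      For illustration only (none of the following is bigdata code; the class name, pool type, and queue size are made up), this minimal Java sketch shows the mechanism: when every submitted task blocks on a full bounded queue, an unbounded (cached) pool creates a fresh thread for each chunk the producers hand it, and none of those threads ever come back.

      import java.util.concurrent.*;

      public class UnboundedAcceptSketch {
          public static void main(String[] args) throws Exception {
              // Stand-in for one per-(operator,shard) blocking queue that has filled up.
              final BlockingQueue<Object> operatorQueue = new ArrayBlockingQueue<>(10);
              for (int i = 0; i < 10; i++) operatorQueue.put(new Object()); // already full

              // Stand-in for the acceptService: unbounded, like Executors.newCachedThreadPool().
              final ExecutorService acceptService = Executors.newCachedThreadPool();

              // Producers (data services) submit "materialize" tasks faster than the
              // single consumer (e.g., a pipelined COUNT operator) drains the queue.
              for (int i = 0; i < 1000; i++) {
                  acceptService.submit(() -> {
                      operatorQueue.put(new Object()); // blocks indefinitely; pins one pool thread
                      return null;
                  });
              }

              Thread.sleep(1000);
              // Each blocked task holds its own thread, so the pool size tracks the backlog.
              System.out.println("pool size: " + ((ThreadPoolExecutor) acceptService).getPoolSize());
              acceptService.shutdownNow();
          }
      }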

      Using a bounded thread pool does not appear to have any negative effect on throughput for selective queries (such as the BSBM explore use case). However, it can lead to stagnation for unselective queries. Stagnation arises because the MaterializeMessageTasks running on a bounded acceptService will block when the per-(operator,shard) blocking queues become full. Once every thread in the bounded pool is blocked, the query engine can no longer accept any more IChunkMessages (chunks of solutions). It is not immediately clear whether this results in a deadlock or simply starves throughput.

      The reason for the blocking per-(operator,shard) queue is to limit the amount of data on the heap. However, due to the pattern in which bufferReady() fires off an asynchronous task on the acceptTaskService, the producer does not wind up blocking on the cluster. On a single machine, the producer DOES wind up blocking since the IChunkHandler calls through to QueryEngine#acceptChunk() and that will block if the target per-(operator,shard) blocking queue is full.
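
      A minimal sketch of that asymmetry (the method and class names below are hypothetical stand-ins for the IChunkHandler / bufferReady() paths, not the actual bigdata signatures):

      import java.util.concurrent.*;

      class ChunkDeliverySketch {

          // Standalone path: delivery ends in a blocking put, so the producer
          // itself waits when the per-(operator,shard) queue is full.
          static void acceptChunkStandalone(BlockingQueue<Object> operatorQueue, Object chunk)
                  throws InterruptedException {
              operatorQueue.put(chunk); // back-pressure lands on the producer
          }

          // Cluster path: bufferReady() only schedules an asynchronous materialize
          // task; the producer returns at once and a pool thread absorbs the blocking.
          static void bufferReadyCluster(ExecutorService acceptService,
                  final BlockingQueue<Object> operatorQueue, final Object chunk) {
              acceptService.submit(() -> {
                  operatorQueue.put(chunk); // pool thread blocks, producer does not
                  return null;
              });
          }
      }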

      One possible solution is to have bufferReady() run the MaterializeMessageTask in the caller's thread. That will force the producer to block. Another possibility is to use a policy for the acceptService which runs the task in the caller's thread if the thread pool is fully tasked.
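
      Both options can be expressed with stock java.util.concurrent primitives. The sketch below is illustrative only (the real acceptService configuration lives in the bigdata code); the second option corresponds to ThreadPoolExecutor.CallerRunsPolicy:

      import java.util.concurrent.*;

      class AcceptServicePolicySketch {

          // Option 1: skip the executor and run the materialize task in the caller's
          // (producer's) thread, so the producer blocks naturally when the target
          // per-(operator,shard) queue is full.
          static void bufferReadyCallerRuns(Runnable materializeMessageTask) {
              materializeMessageTask.run();
          }

          // Option 2: a bounded pool whose saturation policy falls back to the
          // caller's thread once all workers are busy.
          static ExecutorService newBoundedAcceptService(int nThreads) {
              return new ThreadPoolExecutor(
                      nThreads, nThreads,               // fixed-size pool
                      0L, TimeUnit.MILLISECONDS,
                      new SynchronousQueue<Runnable>(), // no queuing: hand off or reject
                      new ThreadPoolExecutor.CallerRunsPolicy()); // rejected => run in caller
          }
      }

      (Note that the first comment below argues against option 2, since a saturated accept service would make all producers block even when their own target queues have room.)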

      Another direction is to materialize the data on the MemoryManager rather than the JVM heap. That would make it possible to materialize very large intermediate solution sets without incurring any GC overhead.

      See https://sourceforge.net/apps/trac/bigdata/ticket/471 (Parallelize aggregation operations on cluster)

        Activity

        bryanthompson added a comment -

        I've thought about this a bit more. The proposal to run tasks in the caller's thread if the thread pool is full is not going to work. While this would cause the producer to block, which it is not doing right now, it would cause all producers to block if any query engine has a full accept queue, which is not right.

        The dynamics right now are that the acceptTaskService becomes full when ANY (operator,shard) blocking queue becomes full. Therefore, it would be MUCH better to ALWAYS run the task in the caller's thread (at least, if the task is a "thick" chunk message). Since the data are already materialized, the only concern is whether the producer should block when the target (operator,shard) queue is full. In addition, we should probably parallelize the distribution of the chunk messages by the producer in order to reduce the latency when mapping the results onto the nodes which are the consumer(s).
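
        A hypothetical sketch of that direction (ChunkMessage and sendToNode() below are stand-ins, not actual bigdata APIs): the producer fans one thick chunk message per target node out in parallel and waits until every node has taken its chunk, which preserves back-pressure while cutting the delivery latency.

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.Callable;
        import java.util.concurrent.ExecutorService;

        class ParallelChunkDistributionSketch {

            interface ChunkMessage { /* an already materialized ("thick") chunk */ }

            static void distribute(final List<ChunkMessage> onePerTargetNode,
                                   final ExecutorService producerService)
                    throws InterruptedException {
                final List<Callable<Void>> deliveries = new ArrayList<Callable<Void>>();
                for (final ChunkMessage msg : onePerTargetNode) {
                    deliveries.add(new Callable<Void>() {
                        public Void call() throws Exception {
                            sendToNode(msg); // hypothetical delivery to the consumer node
                            return null;
                        }
                    });
                }
                // The producer blocks here until all nodes have accepted their chunks.
                producerService.invokeAll(deliveries);
            }

            static void sendToNode(ChunkMessage msg) { /* placeholder */ }
        }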

        The case of NIO transfers of the intermediate solutions is different. In that case we need to figure out the timing for transferring the intermediate solutions to the target query engine for evaluation. Currently, anything with more than 100 solutions is delivered using NIO. Maybe this should be changed so that NIO is used only for operators which need to have all solutions materialized before they can execute. Under that scenario, the solutions are not required until the upstream operator is done and we kick off the "runOnce" evaluation for the analytic operator. My inclination is to defer the transfer until the upstream operator is done, but it might be easier to transfer the data asynchronously, building it up on an HTree which is then used to feed the "runOnce" operator evaluation.

        bryanthompson added a comment -

        I have tested out a change which runs the MaterializeMessageTask in the caller's thread. This provides slightly better BSBM explore use case throughput and completely resolves the high thread count problem for the COUNT query.

        Committed Revision r5966.


          People

          • Assignee:
            bryanthompson
          • Reporter:
            bryanthompson