Blazegraph (by SYSTAP) / BLZG-197 BlazeGraph release 2.1 (Scale-out GA) / BLZG-253

Refactor the async write API to buffer per target DS, not per target shard

    Details

      Description

      The RDF bulk data load uses the asynchronous write API. The current implementation buffers the data for each target shard, so as the number of shards increases, so does the memory demand on the client. The async write API should be modified to buffer per target data service (DS) rather than per target shard.

      As a workaround, the nominal size of a shard can be increased from its default configuration value of ~200M. This reduces the number of shards in the system and therefore the memory demand on the client. However, it increases the effort required to perform merges and splits for large index partitions (aka shards).

      In the configuration file, change:

      static private partitionSizeMultiplier = 1;

      to

      static private partitionSizeMultiplier = 2;

      to double the effective maximum shard size. Note, however, that this can exacerbate the journal over-extension issue [1] (which has been mitigated by improving the index build performance).

      This issue is related to [2] and [3].

      [1] https://sourceforge.net/apps/trac/bigdata/ticket/20
      [2] https://sourceforge.net/apps/trac/bigdata/ticket/40
      [3] https://sourceforge.net/apps/trac/bigdata/ticket/35

        Activity

        bryanthompson added a comment -

        We've been working on this interlocking set of problems off and on for the last few months. When the client puts out enough workload it causes the journal on the data services to overextend, so it takes longer and longer to clear the buffered writes. Faster builds helped here. Larger partitions hurt, since the builds are heavier.

        Once I finish the disk concurrency hot spots I can swap back to this issue. Better scheduling of builds (and management of build resources) will help, but the data services really need to be able to throttle the clients to address the problem correctly. There is an issue for this, which deals with factoring the payload out of the RMI message so we can put flow control on the data movement. The RMI message already returns a Future for the request, but the client has loaded the DS by that point, so we are not throttling the clients in a resource-efficient manner. Once we finish that set of interlocking issues this problem will finally disappear, and that should remove a major barrier to robust operation at very large scale.
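
        The intended flow control could look roughly like the sketch below. Every name here is hypothetical (this is not the bigdata API); the point is only that the client exchanges a cheap control message with the data service before moving any payload, so the data service can throttle clients before it is loaded with data rather than after.

        import java.util.concurrent.Semaphore;

        /**
         * Sketch only: decouple the control message from the bulk data movement so
         * the data service can throttle clients before they ship the payload.
         * None of these names exist in bigdata.
         */
        public class CreditBasedTransfer {

            /** Held by the data service: grants a bounded number of transfer slots. */
            public static class TransferGate {

                private final Semaphore slots = new Semaphore(4); // assumed limit

                /** Cheap control request; blocks (throttles the client) until a slot frees. */
                public void requestSlot() throws InterruptedException {
                    slots.acquire();
                }

                /** Called once the data service has drained the payload into local buffers. */
                public void releaseSlot() {
                    slots.release();
                }
            }

            /** Client side: obtain a credit first, then move the (large) payload. */
            public static void send(TransferGate gate, byte[] payload) throws InterruptedException {
                gate.requestSlot();    // message flow: small, may block under load
                try {
                    transfer(payload); // data flow: happens only after the grant
                } finally {
                    gate.releaseSlot();
                }
            }

            private static void transfer(byte[] payload) {
                // placeholder for the actual data movement to the data service
            }
        }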

        This problem is often made worse by clearing up some bottleneck in the system since that allows clients to put more load on the data services.

        bryanthompson added a comment -

        The main implementation of the high-throughput API is

        com.bigdata.rdf.rio.AsynchronousStatementBufferFactory

        This is based on

        com.bigdata.service.ndx.IAsynchronousWriteBufferFactory

        which is implemented by:

        com.bigdata.service.ndx.ClientIndexView.

        The underlying implementation, and the part which would need to be reworked first, is in the following package:

        com.bigdata.service.ndx.pipeline

        This package is relatively general and its unit tests do not presume key-range sharding, so it is possible to establish a different partitioning scheme for the data. For the purposes of this issue, the desired partitioning key is the UUID of the target data service: all writes which are mapped onto the same data service would then go into the same buffer.
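
        As a rough illustration of that per-data-service buffering (hypothetical types only, not the com.bigdata.service.ndx.pipeline API), the client-side buffers could be keyed by the UUID of the target data service, so tuples bound for different shards on the same data service land in one buffer:

        import java.util.List;
        import java.util.UUID;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ConcurrentMap;
        import java.util.concurrent.LinkedBlockingQueue;

        /** Sketch only: one bounded buffer per target data service, not per shard. */
        public class PerDataServiceBuffers<T> {

            private final ConcurrentMap<UUID, BlockingQueue<T>> buffers =
                    new ConcurrentHashMap<>();

            private final int capacityPerDataService;

            public PerDataServiceBuffers(int capacityPerDataService) {
                this.capacityPerDataService = capacityPerDataService;
            }

            /**
             * Buffer a tuple for the data service which currently hosts its target
             * shard. Resolving the shard to a data service UUID is left to the
             * shard locator.
             */
            public void add(UUID dataServiceUUID, T tuple) throws InterruptedException {
                buffers.computeIfAbsent(dataServiceUUID,
                        uuid -> new LinkedBlockingQueue<>(capacityPerDataService))
                       .put(tuple); // blocks when the per-DS buffer is full
            }

            /** Drain everything buffered for one data service into a single transfer. */
            public int drainTo(UUID dataServiceUUID, List<T> sink) {
                BlockingQueue<T> q = buffers.get(dataServiceUUID);
                return q == null ? 0 : q.drainTo(sink);
            }
        }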

        The next step would be to refactor the AsynchronousStatementBufferFactory to use the per-data-service buffering.

        Since the data service will now be receiving writes bound for multiple local shards, we also need to change how we transmit the data, accept it, and process it on the data service. Today, the data is transmitted inside of an RMI object which targets a specific shard. If the shard has been moved, split, or joined, then the client handles the error, which may include repartitioning the data into different buffers (when handling a shard move), and then reissues the request to the appropriate data service.

        This needs to be changed to decouple the data flow from the message flow [1], not only to improve performance, but also to push the per-shard processing down to the data service, since it will now receive tuples bound for different local shards in the same data transfer.

        The client side of the code should be modified to be a single-threaded NIO transfer of the buffered data to the target data service. The target data service should use a fixed #of local NIO buffers to retain data from the clients, which bounds its memory demands. A strategy will need to be developed to (a) move the data from these buffers onto the target shards in an efficient and timely manner; and (b) handle redirects when target shards have been moved, split, or joined, without requiring the client to reissue all writes against a data service when only one shard was affected, and without losing writes if the leader for the data service quorum (in HA mode) fails over.
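
        As a sketch of the client-side transfer, assuming a hypothetical endpoint and a simple length-prefixed framing (this is not bigdata's actual transport), a single blocking SocketChannel write loop would suffice:

        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SocketChannel;

        /** Sketch only: ship one buffered chunk to a data service over NIO. */
        public class NIOChunkSender {

            public static void sendChunk(InetSocketAddress dataService, ByteBuffer chunk)
                    throws IOException {
                try (SocketChannel ch = SocketChannel.open(dataService)) {
                    // Length-prefixed framing so the data service knows how much to
                    // read into one of its fixed pool of local NIO buffers.
                    ByteBuffer header = ByteBuffer.allocate(4);
                    header.putInt(chunk.remaining());
                    header.flip();
                    while (header.hasRemaining()) {
                        ch.write(header);
                    }
                    while (chunk.hasRemaining()) {
                        ch.write(chunk); // a single thread drives the whole transfer
                    }
                }
            }
        }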

        [1] https://sourceforge.net/apps/trac/bigdata/ticket/40

        bryanthompson added a comment -

        Other workaround strategies include:


        - Reduce the buffer size on the clients such that they buffer fewer tuples per target shard. This will decrease the throughput somewhat since performance is best with larger ordered writes on a shard.


        - Use a fixed buffer capacity and allocate that capacity among the target shards (see the sketch after this list). This would cause throughput to decrease more gradually in proportion to the data scale.


        - If the target on-disk size of a shard is increased, then it may be necessary to examine more efficient index segment build operations based on continuous IO transfers of just the leaves of the index segments. This should reduce the cost of an index segment build significantly, which would allow the system to keep current with larger shards. Also, note that the dynamic shard refactoring changes should help here by running index segment builds and merges with only limited concurrency, but the effect of that change has not yet been explored on a large cluster.


        - The non-blocking IO pattern described above would also help by reducing the thread count on the client, but that is not the primary limit at this time.
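
        To illustrate the second workaround above (hypothetical class, not the bigdata client API), a fixed client-wide buffer budget could be divided evenly among the target shards, so the per-shard chunk size shrinks as the shard count grows:

        /** Sketch only: divide a fixed client-wide buffer budget among the shards. */
        public class FixedCapacityAllocator {

            private final int totalCapacity; // total tuples the client may buffer

            public FixedCapacityAllocator(int totalCapacity) {
                this.totalCapacity = totalCapacity;
            }

            /** Per-shard chunk size shrinks as the shard count grows (floor of 1). */
            public int capacityPerShard(int shardCount) {
                return Math.max(1, totalCapacity / Math.max(1, shardCount));
            }
        }

        For example, with a budget of 1,000,000 tuples and 500 target shards, each shard's chunks would be capped at 2,000 tuples; throughput falls off gradually because the ordered writes per shard become smaller.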

        bryanthompson added a comment -

        Closed. Not relevant to the new architecture.


          People

          • Assignee:
            Unassigned
          • Reporter:
            bryanthompson
          • Votes:
            0
          • Watchers:
            0

            Dates

            • Created:
              Updated:
              Resolved: