Lecture Notes in Computer Science
Commenced Publication in 1973
Founding and Former Series Editors: Gerhard Goos, Juris Hartmanis, and Jan van Leeuwen

Editorial Board
David Hutchison, Lancaster University, UK
Takeo Kanade, Carnegie Mellon University, Pittsburgh, PA, USA
Josef Kittler, University of Surrey, Guildford, UK
Jon M. Kleinberg, Cornell University, Ithaca, NY, USA
Alfred Kobsa, University of California, Irvine, CA, USA
Friedemann Mattern, ETH Zurich, Switzerland
John C. Mitchell, Stanford University, CA, USA
Moni Naor, Weizmann Institute of Science, Rehovot, Israel
Oscar Nierstrasz, University of Bern, Switzerland
C. Pandu Rangan, Indian Institute of Technology, Madras, India
Bernhard Steffen, TU Dortmund University, Germany
Madhu Sudan, Microsoft Research, Cambridge, MA, USA
Demetri Terzopoulos, University of California, Los Angeles, CA, USA
Doug Tygar, University of California, Berkeley, CA, USA
Gerhard Weikum, Max-Planck Institute of Computer Science, Saarbruecken, Germany
5898
Guang R. Gao, Lori L. Pollock, John Cavazos, Xiaoming Li (Eds.)

Languages and Compilers for Parallel Computing
22nd International Workshop, LCPC 2009
Newark, DE, USA, October 8-10, 2009
Revised Selected Papers
Volume Editors
Guang R. Gao, University of Delaware, Department of Electrical and Computer Engineering, Newark, DE 19716, USA
Lori L. Pollock, University of Delaware, Department of Computer and Information Sciences, Newark, DE 19716, USA
John Cavazos, University of Delaware, Department of Computer and Information Sciences, Newark, DE 19716, USA
Xiaoming Li, University of Delaware, Department of Electrical and Computer Engineering, Newark, DE 19716, USA
Library of Congress Control Number: 2010927404
CR Subject Classification (1998): D.1.3, C.2.4, D.4.2, H.3.4, D.2
LNCS Sublibrary: SL 1 – Theoretical Computer Science and General Issues
ISSN 0302-9743
ISBN-10 3-642-13373-8 Springer Berlin Heidelberg New York
ISBN-13 978-3-642-13373-2 Springer Berlin Heidelberg New York
This work is subject to copyright. All rights are reserved, whether the whole or part of the material is concerned, specifically the rights of translation, reprinting, re-use of illustrations, recitation, broadcasting, reproduction on microfilms or in any other way, and storage in data banks. Duplication of this publication or parts thereof is permitted only under the provisions of the German Copyright Law of September 9, 1965, in its current version, and permission for use must always be obtained from Springer. Violations are liable to prosecution under the German Copyright Law. springer.com © Springer-Verlag Berlin Heidelberg 2010 Printed in Germany Typesetting: Camera-ready by author, data conversion by Scientific Publishing Services, Chennai, India Printed on acid-free paper 06/3180
Preface
It is our pleasure to present the papers accepted for the 22nd International Workshop on Languages and Compilers for Parallel Computing, held October 8–10, 2009 in Newark, Delaware, USA. Since 1986, LCPC has become a valuable venue for researchers to report on work in the general areas of parallel computing, high-performance computer architecture, and compilers. LCPC 2009 continued this tradition and, in particular, extended the area of interest to new parallel computing accelerators such as the IBM Cell Processor and the Graphics Processing Unit (GPU).

This year we received 52 submissions from 15 countries. Each submission received at least three reviews, and most had four. The Program Committee (PC) also sought additional external reviews for contentious papers. The PC held an all-day phone conference on August 24 to discuss the papers; PC members with a conflict of interest were asked to leave the call temporarily while the corresponding papers were discussed. From the 52 submissions, the PC selected 25 full papers and 5 short papers for inclusion in the workshop proceedings, a 58% acceptance rate.

We were fortunate to have three keynote speeches, a panel discussion, and a tutorial at this year's workshop. First, Thomas Sterling, Professor of Computer Science at Louisiana State University, gave a keynote talk titled “HPC in Phase Change: Towards a New Parallel Execution Model.” Sterling argued that a new multi-dimensional research thrust was required to meet the design goals for power, complexity, clock rate, and reliability in new parallel computer systems. He introduced ParalleX, an exploratory execution model developed by his group, to guide the co-design of new architectures, programming methods, and system software.
The second keynote talk, “The Polytope Model, Past, Present, Future,” presented by Paul Feautrier of the École Normale Supérieure de Lyon, France, reviewed the history of the polytope model from the perspectives of its motivations, its applications in program transformations, and its limitations. Feautrier also shared his vision of the future of the polytope model and highlighted several important unsolved problems. Third, Bill Carlson from the IDA Center for Computing Sciences gave a keynote talk on the parallel programming language UPC (Unified Parallel C). As one of the originators of UPC, Carlson explained the intentions behind its design, its applications, and its role in the new generation of high-performance computer architectures. This talk gave LCPC attendees an insightful perspective on the history, current status, and future of UPC.

A special panel was held on Thursday evening to stimulate discussion among LCPC attendees on the meaning of compiler optimization in the new world of many-core-based computer systems. The panel was organized and moderated by Xiaoming Li from the University of Delaware, and five leading researchers from academia and industry shared their perspectives on the major challenges of compiler optimization given the rapid evolution of computer architecture and system software. The panelists were Albert Cohen (INRIA, France), Hironori Kasahara (Waseda University, Japan), Rishi Khan (ETI), David Padua (University of Illinois at Urbana-Champaign), and Nicolas Vasilache (Reservoir Labs).

We were also fortunate to be able to invite a distinguished group of researchers to give a tutorial on “SSA-Based Register Allocation” on the last day of the workshop. The tutorial introduced SSA-based register allocation, its properties and complexity, the analysis techniques that enable it, and its applications in compilers. The presenters were Philip Brisk (EPFL), Jens Palsberg (UCLA), Fabrice Rastello (ENS Lyon), Sebastian Hack (Saarland University, Germany), and Florent Bouchez (Indian Institute of Science, India).

We would like to thank the many people whose valuable time and effort made LCPC 2009 a success. We first thank all authors who contributed papers to the workshop. The success of LCPC would be unimaginable without the passionate commitment of David Padua and the Steering Committee, as well as the great effort of the Program Committee members and external reviewers. We also express our gratitude to ET International, HP, IBM, NVIDIA, and Reservoir Labs, whose financial support made the workshop a pleasant experience. Finally, the quality of the workshop's organization owed much to a group of outstanding volunteers led by Xu Wang.

October 2009
Lori Pollock
Guang R. Gao
Organization
LCPC 2009 was organized by the Department of Computer and Information Sciences and the Department of Electrical and Computer Engineering at the University of Delaware.
Steering Committee
Rudolf Eigenmann, Purdue University
Alex Nicolau, University of California at Irvine
David Padua, University of Illinois at Urbana-Champaign
Lawrence Rauchwerger, Texas A&M University
Program Committee
Jose Nelson Amaral, University of Alberta, Canada
Saman Amarasinghe, MIT, USA
Eduard Ayguadé, UPC, Spain
Hans J. Boehm, HP, USA
Calin Cascaval, IBM, USA
John Cavazos, University of Delaware, USA
Dan Connors, University of Colorado, USA
Keith Cooper, Rice University, USA
Maria Garzaran, University of Illinois, USA
Mary Hall, University of Utah, USA
William Jalby, University of Versailles, France
Hironori Kasahara, Waseda University, Japan
Jenq Kuen Lee, National TsingHua University, Taiwan
Xiaoming Li, University of Delaware, USA
John Mellor-Crummey, Rice University, USA
Michael O’Boyle, University of Edinburgh, UK
Paul Petersen, Intel, USA
Keshav Pingali, University of Texas, USA
Vivek Sarkar, Rice University, USA
Vugranam Sreedhar, IBM, USA

Sponsoring Institutions
ET International, Inc.
Hewlett-Packard Corp.
IBM Corp.
NVIDIA Corp.
Reservoir Labs, Inc.
Table of Contents
A Communication Framework for Fault-Tolerant Parallel Execution (p. 1)
Nagarajan Kanna, Jaspal Subhlok, Edgar Gabriel, Eshwar Rohit, and David Anderson

The STAPL pList (p. 16)
Gabriel Tanase, Xiabing Xu, Antal Buss, Harshvardhan, Ioannis Papadopoulos, Olga Pearce, Timmie Smith, Nathan Thomas, Mauro Bianco, Nancy M. Amato, and Lawrence Rauchwerger

Hardware Support for OpenMP Collective Operations (p. 31)
Soohong P. Kim, Samuel P. Midkiff, and Henry G. Dietz

Loop Transformation Recipes for Code Generation and Auto-Tuning (p. 50)
Mary Hall, Jacqueline Chame, Chun Chen, Jaewook Shin, Gabe Rudy, and Malik Murtaza Khan

MIMD Interpretation on a GPU (p. 65)
Henry G. Dietz and B. Dalton Young

TL-DAE: Thread-Level Decoupled Access/Execution for OpenMP on the Cyclops-64 Many-Core Processor (p. 80)
Ge Gan and Joseph Manzano

Mapping Streaming Languages to General Purpose Processors through Vectorization (p. 95)
Raymond Manley and David Gregg

A Balanced Approach to Application Performance Tuning (p. 111)
Souad Koliai, Stéphane Zuckerman, Emmanuel Oseret, Mickaël Ivascot, Tipp Moseley, Dinh Quang, and William Jalby

Automatically Tuning Parallel and Parallelized Programs (p. 126)
Chirag Dave and Rudolf Eigenmann

DFT Performance Prediction in FFTW (p. 140)
Liang Gu and Xiaoming Li

Safe and Familiar Multi-core Programming by Means of a Hybrid Functional and Imperative Language (p. 157)
Ronald Veldema and Michael Philippsen

Hierarchical Place Trees: A Portable Abstraction for Task Parallelism and Data Movement (p. 172)
Yonghong Yan, Jisheng Zhao, Yi Guo, and Vivek Sarkar
OSCAR API for Real-Time Low-Power Multicores and Its Performance on Multicores and SMP Servers (p. 188)
Keiji Kimura, Masayoshi Mase, Hiroki Mikami, Takamichi Miyamoto, Jun Shirako, and Hironori Kasahara

Programming with Intervals (p. 203)
Nicholas D. Matsakis and Thomas R. Gross

Adaptive and Speculative Memory Consistency Support for Multi-core Architectures with On-Chip Local Memories (p. 218)
Nikola Vujic, Lluc Alvarez, Marc Gonzalez Tallada, Xavier Martorell, and Eduard Ayguadé

Synchronization-Free Automatic Parallelization: Beyond Affine Iteration-Space Slicing (p. 233)
Anna Beletska, Wlodzimierz Bielecki, Albert Cohen, and Marek Palkowski

Automatic Data Distribution for Improving Data Locality on the Cell BE Architecture (p. 247)
Miao Wang, François Bodin, and Sébastien Matz

Automatic Restructuring of Linked Data Structures (p. 263)
Harmen L.A. van der Spek, C.W. Mattias Holm, and Harry A.G. Wijshoff

Using the Meeting Graph Framework to Minimise Kernel Loop Unrolling for Scheduled Loops (p. 278)
Mounira Bachir, David Gregg, and Sid-Ahmed-Ali Touati

Efficient Tiled Loop Generation: D-Tiling (p. 293)
DaeGon Kim and Sanjay Rajopadhye

Effective Source-to-Source Outlining to Support Whole Program Empirical Optimization (p. 308)
Chunhua Liao, Daniel J. Quinlan, Richard Vuduc, and Thomas Panas

Speculative Optimizations for Parallel Programs on Multicores (p. 323)
Vijay Nagarajan and Rajiv Gupta

Fastpath Speculative Parallelization (p. 338)
Michael F. Spear, Kirk Kelsey, Tongxin Bai, Luke Dalessandro, Michael L. Scott, Chen Ding, and Peng Wu

PSnAP: Accurate Synthetic Address Streams through Memory Profiles (p. 353)
Catherine Mills Olschanowsky, Mustafa M. Tikir, Laura Carrington, and Allan Snavely
Enforcing Textual Alignment of Collectives Using Dynamic Checks (p. 368)
Amir Kamil and Katherine Yelick

A Code Generation Approach for Auto-Vectorization in the Spade Compiler (p. 383)
Huayong Wang, Henrique Andrade, Buğra Gedik, and Kun-Lung Wu

Portable Just-in-Time Specialization of Dynamically Typed Scripting Languages (p. 391)
Kevin Williams, Jason McCandless, and David Gregg

Reducing Training Time in a One-Shot Machine Learning-Based Compiler (p. 399)
John Thomson, Michael O’Boyle, Grigori Fursin, and Björn Franke

Optimizing Local Memory Allocation and Assignment through a Decoupled Approach (p. 408)
Boubacar Diouf, Ozcan Ozturk, and Albert Cohen

Unrolling Loops Containing Task Parallelism (p. 416)
Roger Ferrer, Alejandro Duran, Xavier Martorell, and Eduard Ayguadé

Author Index (p. 425)
A Communication Framework for Fault-Tolerant Parallel Execution

Nagarajan Kanna (1), Jaspal Subhlok (1), Edgar Gabriel (1), Eshwar Rohit (1), and David Anderson (2)

(1) Department of Computer Science, University of Houston
(2) UC Berkeley Space Sciences Laboratory
Abstract. PC grids represent massive computation capacity at low cost, but are challenging to employ for parallel computing because of their variable and unpredictable performance and availability. A communicating parallel program must employ checkpoint-restart and/or process redundancy to make continuous forward progress in such an unreliable environment. A communication model based on one-sided Put/Get calls, pioneered by the Linda system, is a good match, as processes can execute their communication operations independently and asynchronously. However, Linda and its many variants are not designed for communicating processes that are replicated or independently restarted from checkpoints. The key problem is that a single logical operation that affects the global program state may be executed by different instances of the same process at different times, leading to semantic inconsistency. This paper presents the design, execution model, implementation, and validation of a communication layer for robust execution on volatile nodes. The research leads to a practical way to employ idle PCs for latency-tolerant parallel computing applications.
1 Introduction
In recent years, ordinary desktops and PCs have been employed successfully for large-scale scientific computing, most commonly using Condor [1] or BOINC [2] as middleware. The Condor scheduler enables ordinary desktops to be employed for compute-intensive applications; it is deployed at over 850 known sites with at least 125,000 hosts around the world. The BOINC middleware runs scientific applications on volunteered public PCs when they are idle. It has been remarkably successful, managing over half a million nodes and over 30 scientific research projects since its release in 2004. However, the target applications for BOINC and Condor are generally limited to master-slave or bag-of-tasks parallelism.
This material is based upon work supported by the National Science Foundation’s Computer Systems Research program under Award Nos. CNS-0834750 and CNS-0834807. Any opinions, findings, and conclusions or recommendations expressed in this material are those of the authors and do not necessarily reflect the views of the National Science Foundation.
G.R. Gao et al. (Eds.): LCPC 2009, LNCS 5898, pp. 1–15, 2010. © Springer-Verlag Berlin Heidelberg 2010
Idle desktops represent a potentially immense but volatile resource: they are heterogeneous, and their availability to guest scientific applications can change suddenly and frequently based on the desktop owner’s actions. Executing communicating parallel applications on volatile nodes is extremely challenging because process failures and slowdowns are frequent, and the failure or slowdown of a single process affects the entire application. Hence, a fault tolerance mechanism is effectively a requirement. If a checkpoint-restart approach is used, checkpoints must be taken independently and asynchronously because of the potential overhead of global synchronization. The communication framework must then be able to respond to communication requests issued during recovery; these are duplicate requests corresponding to communication operations that were already executed in a past state of the program. Similar functionality is required when redundancy is employed for fault tolerance, an approach that becomes more attractive in high-failure scenarios [3]. With redundancy, multiple physical processes in different states co-exist for a single logical process, and the communication framework must be able to respond to redundant communication requests from process replicas.

Implementing MPI-style message passing, the dominant paradigm for parallel programming today, is problematic in such scenarios because of the synchronous nature of message transfers. Put/Get-style asynchronous communication, pioneered by Linda [4], is potentially a good fit for communication on volatile nodes, as it provides an abstract global shared space that processes can use to exchange information without temporal or spatial coupling. However, existing systems that provide an abstract global shared space support neither redundant processes nor asynchronous recovery from checkpoints.
This paper introduces the Volpex dataspace API for anonymous Put/Get-style communication among processes. The API and its execution model can support common message passing and shared memory programming styles. The key additional requirement is correct and efficient support for multiple physical Put/Get requests corresponding to a single logical Put/Get request: all requests corresponding to a unique logical request are satisfied in the same manner, with identical data objects. This allows checkpointed or redundant execution to be supported, with final results guaranteed to be identical to (one of the possible) results of normal execution. Management of checkpointing and redundancy is orthogonal to this process; the communication infrastructure does not need to be informed whether, or to what extent, redundancy or checkpoint-restarts are being employed.

This dataspace communication API is a component of the Volpex framework (Parallel Execution on Volatile nodes), which aims to achieve seamless forward application progress in the presence of routine failures by employing redundancy and checkpointing. The primary goal is to transform ordinary PCs into virtual clusters that run a variety of parallel codes. However, the methods developed in this paper are also potentially applicable to other scenarios, such as using unused processor cores to run replicas that improve reliability. The paper presents the design, execution model, implementation, and preliminary results for the Volpex dataspace API. The communication framework is being integrated with the BOINC framework for volunteer computing, and target applications include Replica Exchange Molecular Dynamics and MapReduce.
2 Related Work
Idle desktops are widely used for parallel and distributed computing. The Berkeley Open Infrastructure for Network Computing (BOINC) [2] is a middleware system widely used for volunteer computing, in which people donate the use of their computers to help scientific research. Condor [1] is a workload management system that can effectively harness otherwise wasted CPU power on idle desktop workstations. Other systems that build desktop computing grids include Entropia [5], iShare [6], and OurGrid [7]. Fault tolerance mechanisms applied in PC grids, such as redundancy in BOINC and checkpointing in Condor [8], are important for long-running sequential and bag-of-tasks codes, but are generally not sufficient for communicating parallel programs.

Linda [4] has been an active research topic for over two decades. It represents a model of coordination and communication among parallel processes based on a logically global associative memory, called a tuplespace, in which processes store and retrieve tuples. A number of Linda variants are available, such as TSpaces [9], JavaSpaces [10], and SALSA [11], a Linda adaptation for molecular dynamics applications. There has been considerable work on fault tolerance in Linda, but it has largely focused on making the Linda tuplespace itself resilient to failure. A replication-based fault-tolerant implementation of the Linda tuplespace is discussed in [12]. FTLinda [13] provides a stable tuple space that persists across failures, and atomic tuple space transactions that allow the development of some types of fault-tolerant applications. PLinda [14] provides transactional mechanisms to achieve atomic operations, as well as process-private logging that processes can use for checkpoint-restart. We have adopted some of these ideas, in particular atomic operations. However, none of these frameworks (or others) provide transparent processing of arbitrary replicated communication requests.
This paper reports on the development of this functionality, which can be employed to support replicated processes or to service communication requests of processes restarted from local checkpoints.

Several implementations of the MPI specification have focused on deploying fault-tolerance mechanisms. These projects either extend the MPI specification to define the state of MPI objects in case of process failures, e.g., the FT-MPI [15] library; use replication techniques, e.g., the MPI/FT library [16]; or integrate some form of checkpoint-restart mechanism to provide transparent fault tolerance to MPI applications. MPICH-V [17], a representative of the last category, is based on uncoordinated checkpointing and pessimistic message logging; the library stores all communication of the system on reliable media through a channel memory. Volpex MPI [18], developed as part of our work, uses a similar approach but employs sender-based logging and supports replicated processes.
3 Dataspace Programming Model
The programming model we have developed consists of independent processes communicating through an abstract dataspace. An important consideration was that the execution model allow seamless execution with multiple, possibly varying, instances of each process. The design of the dataspace was driven by simplicity and ease of implementation with redundancy. We first present the current dataspace API syntax and semantics. Subsequently, we justify the design decisions made and make a case for some future changes.

3.1 Dataspace API
The core API for the Volpex dataspace communication library consists of calls to add, read, and remove data objects to and from an abstract global dataspace, with each object identified by a unique tag that serves as an index into the dataspace. The concept of a dataspace is similar to that of a tuplespace in Linda. The main communication calls are as follows:

Volpex_put(tag, data): writes the data object data into the abstract dataspace under tag. Any existing data object with the same tag is overwritten.
Volpex_read(tag): returns the data object in the dataspace that matches tag.
Volpex_get(tag): returns the data object in the dataspace that matches tag, and then removes that data object from the dataspace.

Volpex_read and Volpex_get are identical except that Volpex_get also removes the matched data object from the dataspace. Both are blocking calls: if there is no matching data object in the dataspace, the call blocks until a matching data object is added. A Volpex_put call blocks only until the operation is completed. Additional calls are available in the API to retrieve the process Id and the number of processes, and to initialize and terminate communication with the dataspace server. The full API is outlined in Table 1.

3.2 API Design Considerations
The set of calls in the dataspace API is minimal but sufficient to simulate basic message passing and shared memory style communication. A data object can be read multiple times until it is removed, and data objects can be overwritten, which allows shared memory style programming. Read and get operations block when no object with a matching tag exists, and the get operation removes a data object. Together, these can be used to provide various flavors of synchronization, e.g., barriers, blocking receives, and shared queues for dynamic distribution of work.
Table 1. Volpex dataspace communication API

Prototypes
int  volpex_put(const char* tag, int tagSize, const void* data, int dataSize)
int  volpex_get(const char* tag, int tagSize, void* data, int dataSize)
int  volpex_read(const char* tag, int tagSize, void* data, int dataSize)
int  volpex_getProcId(void)
int  volpex_getNumProc(void)
int  volpex_init(int argc, char* argv[])
void volpex_finalize(void)

Parameters
tag                  Identifies each data object in the dataspace
tagSize              Number of bytes of tag
data                 Pointer to data object being read/written
dataSize             Number of bytes of data

Calls
volpex_put()         Writes data object with the tag value
volpex_read()        Retrieves data object matching tag
volpex_get()         Retrieves & deletes data object matching tag
volpex_getProcId()   Returns process Id of the current process
volpex_getNumProc()  Returns total number of application processes
volpex_init()        Initializes and connects with dataspace server
volpex_finalize()    Releases all resources and terminates
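To make the semantics of Section 3.1 and Table 1 concrete, here is a minimal single-process stand-in that follows the Table 1 parameter shapes (byte tags and payloads with explicit sizes). It is a hypothetical sketch, not the Volpex library: ds_put, ds_read, ds_get, send_int, and recv_int are invented names, storage is a fixed-size array, and because there is only one process, a read or get with no matching tag returns -1 instead of blocking. The last two functions show one way message passing can be layered on put/get, as Section 3.2 describes: each message gets its own tag built from (source, destination, sequence number).

```c
#include <stdio.h>
#include <string.h>

#define DS_SLOTS    64
#define DS_TAG_MAX  64
#define DS_DATA_MAX 256
#define MAX_PROCS   16   /* assumed bound on process ids for the message layer */

/* One dataspace entry: an opaque byte tag and an opaque byte payload. */
typedef struct {
    int used;
    char tag[DS_TAG_MAX];
    int tagSize;
    unsigned char data[DS_DATA_MAX];
    int dataSize;
} ds_entry;

static ds_entry ds[DS_SLOTS];

/* Index of the entry whose tag matches byte for byte, or -1. */
static int ds_find(const char *tag, int tagSize) {
    for (int i = 0; i < DS_SLOTS; i++)
        if (ds[i].used && ds[i].tagSize == tagSize &&
            memcmp(ds[i].tag, tag, (size_t)tagSize) == 0)
            return i;
    return -1;
}

/* Store data under tag, overwriting any object with the same tag.
   Returns 0 on success, -1 if a size is out of range or the table is full. */
int ds_put(const char *tag, int tagSize, const void *data, int dataSize) {
    if (tagSize < 0 || tagSize > DS_TAG_MAX || dataSize < 0 || dataSize > DS_DATA_MAX)
        return -1;
    int i = ds_find(tag, tagSize);
    if (i < 0) {                          /* new tag: take the first free slot */
        i = 0;
        while (i < DS_SLOTS && ds[i].used) i++;
        if (i == DS_SLOTS) return -1;
    }
    ds[i].used = 1;
    memcpy(ds[i].tag, tag, (size_t)tagSize);
    ds[i].tagSize = tagSize;
    memcpy(ds[i].data, data, (size_t)dataSize);
    ds[i].dataSize = dataSize;
    return 0;
}

/* Copy the matching object into data (at most dataSize bytes) and return the
   number of bytes copied; -1 means "no match", where the real call blocks. */
int ds_read(const char *tag, int tagSize, void *data, int dataSize) {
    int i = ds_find(tag, tagSize);
    if (i < 0 || dataSize < 0) return -1;
    int n = ds[i].dataSize < dataSize ? ds[i].dataSize : dataSize;
    memcpy(data, ds[i].data, (size_t)n);
    return n;
}

/* Same as ds_read, but also removes the matched object. */
int ds_get(const char *tag, int tagSize, void *data, int dataSize) {
    int i = ds_find(tag, tagSize);
    if (i < 0) return -1;
    int n = ds_read(tag, tagSize, data, dataSize);
    if (n >= 0) ds[i].used = 0;
    return n;
}

/* Message passing on put/get: the k-th message from src to dst is stored
   under "msg/src/dst/k". Assumes 0 <= src, dst < MAX_PROCS. */
static int send_seq[MAX_PROCS][MAX_PROCS], recv_seq[MAX_PROCS][MAX_PROCS];

int send_int(int src, int dst, int value) {
    char tag[DS_TAG_MAX];
    int len = snprintf(tag, sizeof tag, "msg/%d/%d/%d", src, dst, send_seq[src][dst]);
    if (ds_put(tag, len, &value, (int)sizeof value) < 0) return -1;
    send_seq[src][dst]++;
    return 0;
}

/* Returns 0 and sets *value, or -1 if the next message has not been sent. */
int recv_int(int src, int dst, int *value) {
    char tag[DS_TAG_MAX];
    int len = snprintf(tag, sizeof tag, "msg/%d/%d/%d", src, dst, recv_seq[src][dst]);
    if (ds_get(tag, len, value, (int)sizeof *value) < 0) return -1;
    recv_seq[src][dst]++;
    return 0;
}
```

Keying each message by a per-pair sequence number is what makes multiple-assignment puts safe for message passing: with a single shared tag per pair, a second send could overwrite the first before it is received, while get removes each message once it has been delivered.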
The dataspace API differs from Linda in several ways.
1. Single tag: The parameters for dataspace API calls are a data object and a tag. Data matching is based on a single designated tag; associative matching across multiple tuple fields is not supported. This decision was made for simplicity and efficiency of implementation and, in our experience, does not significantly affect programmability. Our implementation does provide a set of helper functions to generate a unique tag from a set of tuple fields.
2. Blocking read calls: The read and get calls in the dataspace API are blocking. It is well understood that blocking calls are essential for coordination across processes. Non-blocking calls, where a get or read returns without action if no matching object exists, make programming easier in some contexts; an example is a master-worker scenario in which a worker checks multiple queues for tasks to execute. Non-blocking read/get calls can also be supported with redundancy and are being considered as an extension of this work.
3. Single assignment puts: Some languages allow a data object to be written only once and never overwritten. This has some desirable properties from software design and implementation perspectives. However, re-assigning to the same tag is essential for easily simulating unstructured shared memory programs. Hence, a multiple assignment model was selected.
4. Process creation: There is no support for process creation analogous to the Linda eval call, as process creation and management are done externally.
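Because matching uses a single tag, a multi-field key has to be folded into one tag. The helper below is a hypothetical illustration of that idea (make_tag is an invented name; the interface of the Volpex helper functions is not given here): it joins a name and integer fields with ':' and returns the length to pass as tagSize, or -1 if the buffer is too small. Assuming the name contains no ':', distinct field lists always yield distinct tags.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build the tag "name:f0:f1:..." so that a multi-field,
   Linda-style key becomes one exact-match tag. Returns the tag length (usable
   as tagSize), or -1 if buf cannot hold the whole tag. */
int make_tag(char *buf, size_t size, const char *name, const int *fields, int nfields) {
    int len = snprintf(buf, size, "%s", name);
    if (len < 0 || (size_t)len >= size) return -1;
    for (int i = 0; i < nfields; i++) {
        /* Append ":<field>" after the text written so far. */
        int n = snprintf(buf + len, size - (size_t)len, ":%d", fields[i]);
        if (n < 0 || (size_t)n >= size - (size_t)len) return -1;
        len += n;
    }
    return len;
}
```

The separator is what keeps keys apart: fields (3, 7) give "row:3:7", while the single field 37 gives "row:37".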
4 Execution Model
The basic semantics of the communication operations are straightforward, as described in the discussion of the API above. However, managing redundant communication requests is a significant challenge. The key problem is that a logical call (with side effects) may be executed repeatedly, or at a time when the state of the dataspace is not consistent with normal execution. For example, what action should be taken if a late-running process replica issues a get or read for which the logically matching data object is no longer available in the dataspace, either because it was removed by another get or overwritten by another put?

The guiding principle for the execution model is that execution results with redundant communication calls must be consistent with normal execution. For brevity, we refer to execution with replicated/redundant communication calls, whether due to process redundancy or process checkpoint-restarts, simply as redundant execution. If the parallel application is deterministic, then normal and redundant executions should give the same results. If the parallel application is non-deterministic, then redundant execution will return one possible result of a normal execution without replication. The major components of the execution model are the following:
1. Atomicity rule: The basic put/read/get operations are atomic and executed in some global serial order.
2. Single put rule: When multiple replicas of a process issue a Volpex_put, the first writer performs the operation; subsequent corresponding Volpex_put operations are ignored.
3. Identical get rule: The first replica issuing a Volpex_get or Volpex_read receives the value stored in the dataspace at that time. Subsequent replicas of the corresponding Volpex_get or Volpex_read receive the same value, independent of when they are executed.
The execution model can be illustrated as follows.
The process instances that execute the first instance of a logical communication call form a leading front of execution, representing normal execution without redundancy. The execution model ensures that (a) trailing replica communication calls have no side effects (single put rule), so they cannot cause incorrect execution of leading replicas by corrupting the dataspace, and (b) trailing replica communication calls are guaranteed to receive the same data objects for read and get calls as the corresponding first communication calls (identical get rule). All process instances execute identically, since the effect of communication calls on the processes is identical irrespective of when they execute and of the application state at that time. Execution proceeds seamlessly in case of process failures, as long as at least one instance of each process exists or is re-created from a checkpoint. The fundamental result, which we have developed informally, is as follows:
A Communication Framework for Fault-Tolerant Parallel Execution
Lemma 1. Consider a program with multiple sequential processes communicating exclusively with the Volpex dataspace API. Assume that the communication implementation follows the atomicity, single put, and identical get rules. Then any result produced by redundant execution is identical to one of the possible results of normal execution.
As discussed earlier, redundancy may be caused by explicitly replicated processes or by independent checkpoint-restarts of processes. An implicit assumption is that the program does not cause external side effects, e.g., as a result of file or network I/O. It is also assumed that there is no non-repeatable program behavior, e.g., due to bugs or use of a random number generator. No redundancy or checkpoint-restart scheme can work without these conditions. However, non-deterministic programs are allowed; results with redundancy are one of the possible results of non-deterministic execution. A formal proof is omitted for brevity but is straightforward.
5
Implementation
We first present the basic dataspace server design and then discuss major design and implementation issues and choices.
5.1
Dataspace Server Design
The implementation of the Volpex dataspace API must conform to the execution semantics discussed in Section 3. The atomicity rule is satisfied by a single-threaded server that processes one client request at a time. Additional machinery is needed to satisfy the single put and identical get rules of the execution model. Each logical communication call (put, get, or read) is uniquely identified by the pair (process id, request number), where request number is the current count in the sequence of requests from a process. When a communication call is issued by a process, the process id and request number are appended to the message sent to the dataspace server to service the request. For replicated calls corresponding to the same logical call, the (process id, request number) pairs are identical. This allows the server to distinguish a new call from subsequent replicated calls. The server maintains the current request number for each process, which is the highest request number served for that process so far. The server also maintains two logically different pools of storage, as shown in Figure 1:
– Dataspace table: This storage consists of the logically “current” data objects indexed with tags.
– Read log buffer: This storage consists of data objects recently delivered from the dataspace server to processes in response to get and read calls. Each object is uniquely identified by (process id, request number).
When a communication API call is executed in a process, a message is sent to the dataspace server consisting of the type and parameters of the call and the (process id, request number) information. A request handler at the server services the call as follows:
N. Kanna et al.
– Put: If the request number of the call is greater than the current request number for the process (a new put), the data object indexed with the tag is added to the dataspace table. If the request number of the call is less than or equal to the current request number (a replica put, for which the data object must already exist on the server), no action is taken.
– Get or Read: If the request number of the call is greater than the current request number for the process (a new get or read), then i) the data object matching the tag is returned from the dataspace table, and ii) a copy of the data object is placed in the read log buffer, indexed with (process id, request number). Additionally, if the call is a get, the data object is deleted from the dataspace table (but retained in the read log buffer). If the request number of the call is less than or equal to the current request number (a replica get or read, for which the data object must exist in the read log), the data object matching (process id, request number) is returned from the read log buffer.
The design of the dataspace server is illustrated in Figure 1.
Fig. 1. Volpex dataspace server design
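The request handling of Section 5.1 can be sketched in a few lines of C++. This is a minimal single-process illustration, not the Volpex implementation: the names RequestCounter, DataspaceServer, put, get and read are hypothetical, values are plain strings, networking is omitted, and a new get or read on a missing tag simply returns an empty result instead of waiting for a matching put. It shows how identical (process id, request number) pairs let the server ignore replica puts and answer replica gets and reads from the read log, using pessimistic logging.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical sketch, not the Volpex API.
using Key = std::pair<int, long>;  // (process id, request number)

// Client side: every replica of a process numbers its calls the same way,
// so a replica call carries the same Key as the original call.
class RequestCounter {
 public:
  explicit RequestCounter(int pid) : pid_(pid) { assert(pid >= 0); }
  Key next() { return {pid_, ++count_}; }

 private:
  int pid_;
  long count_ = 0;
};

// Server side: single-threaded, so each call is atomic (atomicity rule).
class DataspaceServer {
 public:
  // Single put rule: only a new put changes the dataspace table.
  void put(Key id, const std::string& tag, const std::string& value) {
    if (!is_new(id)) return;  // replica put: the object is already stored
    table_[tag] = value;
    current_[id.first] = id.second;
  }

  std::optional<std::string> read(Key id, const std::string& tag) {
    return serve(id, tag, /*remove=*/false);
  }
  std::optional<std::string> get(Key id, const std::string& tag) {
    return serve(id, tag, /*remove=*/true);
  }

 private:
  // A call is new if its request number exceeds the highest one served.
  bool is_new(Key id) const {
    auto it = current_.find(id.first);
    return it == current_.end() || id.second > it->second;
  }

  // Identical get rule: replica calls are answered from the read log.
  std::optional<std::string> serve(Key id, const std::string& tag, bool remove) {
    if (!is_new(id)) {
      auto logged = read_log_.find(id);
      if (logged == read_log_.end()) return std::nullopt;
      return logged->second;
    }
    auto it = table_.find(tag);
    if (it == table_.end()) return std::nullopt;  // sketch only: no object yet
    std::string value = it->second;
    read_log_[id] = value;           // pessimistic logging: always copy
    if (remove) table_.erase(it);    // a get removes from the table only
    current_[id.first] = id.second;
    return value;
  }

  std::unordered_map<std::string, std::string> table_;  // dataspace table
  std::map<Key, std::string> read_log_;                  // read log buffer
  std::unordered_map<int, long> current_;  // highest request served per process
};
```

In a short run, a leading replica of process 0 gets "v1" for a tag, another process then overwrites that tag with "v2", and the trailing replica of process 0 still receives "v1" from the read log, as the identical get rule requires.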
5.2
Optimistic Logging
The design presented in Section 5.1 is based on pessimistic logging. Each time a data object is delivered as a result of a get or a read call, the data object is copied to the log, and later replicas of that call receive the object from the log. An optimistic approach to logging minimizes copying by taking advantage of the fact that a copy to the log is only necessary if the location with the corresponding tag is overwritten. Hence the following procedure is followed. When an object is delivered from the dataspace in response to a read, the object is only flagged as having been read. The same action is taken on a get, except that the object is also flagged as logically removed (but not actually removed). Only when the object is overwritten by a put is it copied to the log, before being overwritten. Replica read and get calls are directed to the main dataspace or the log space, as appropriate.
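A minimal sketch of the optimistic variant, assuming hypothetical names (OptimisticDataspace, put, get, read, log_size), string values, and that a new call on a missing or removed tag simply returns an empty result. Served calls only register themselves as readers of the current object; the object reaches the log only when a put overwrites its tag. As a design choice of this sketch, logged objects are held through std::shared_ptr, so all calls that read the same object share a single logged copy, which mirrors the memory saving mentioned for objects read by many processes.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical sketch of optimistic logging; not the Volpex API.
using Key = std::pair<int, long>;  // (process id, request number)

class OptimisticDataspace {
 public:
  void put(Key id, const std::string& tag, const std::string& value) {
    if (!is_new(id)) return;  // single put rule
    auto it = table_.find(tag);
    if (it != table_.end()) {
      // Only now must the old object be preserved for the calls that saw it.
      for (const Key& reader : it->second.readers) log_[reader] = it->second.value;
    }
    table_.insert_or_assign(
        tag, Entry{std::make_shared<const std::string>(value), false, {}});
    current_[id.first] = id.second;
  }

  std::optional<std::string> read(Key id, const std::string& tag) { return serve(id, tag, false); }
  std::optional<std::string> get(Key id, const std::string& tag) { return serve(id, tag, true); }

  std::size_t log_size() const { return log_.size(); }

 private:
  struct Entry {
    std::shared_ptr<const std::string> value;
    bool removed;              // set by a get; the object itself stays
    std::vector<Key> readers;  // served calls that received this object
  };

  bool is_new(Key id) const {
    auto it = current_.find(id.first);
    return it == current_.end() || id.second > it->second;
  }

  std::optional<std::string> serve(Key id, const std::string& tag, bool remove) {
    if (!is_new(id)) {
      // Replica: use the log if the object was overwritten, else the table.
      if (auto logged = log_.find(id); logged != log_.end()) return *logged->second;
      auto served = served_tag_.find(id);
      if (served == served_tag_.end()) return std::nullopt;
      return *table_.at(served->second).value;
    }
    auto it = table_.find(tag);
    if (it == table_.end() || it->second.removed) return std::nullopt;
    it->second.readers.push_back(id);       // flag as read: no copy yet
    if (remove) it->second.removed = true;  // logically removed only
    served_tag_[id] = tag;
    current_[id.first] = id.second;
    return *it->second.value;
  }

  std::unordered_map<std::string, Entry> table_;           // main dataspace
  std::map<Key, std::shared_ptr<const std::string>> log_;  // log space
  std::map<Key, std::string> served_tag_;  // tag used by each served call
  std::unordered_map<int, long> current_;  // highest request served per process
};
```

Note that a replica of a get whose tag has not been overwritten is answered from the main table even though the object is flagged as removed; entries flagged as removed are never reclaimed in this sketch.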
Optimistic logging can lead to significant savings in memory and copying overhead. In particular, if a tag is never overwritten (not an uncommon scenario in our experience), no logging is necessary at all. If a data object is read by multiple processes, the memory saving can be proportional to the number of processes. However, the logic for directing a read or get request correctly is somewhat more complex with optimistic logging. Our current implementation is based on pessimistic logging, and we are developing an implementation based on optimistic logging.
5.3
Log Buffer Management
An important consideration in the design of a dataspace server is how long an object should be retained in the read log buffer. In theory, a replica or checkpoint-restarted process can be arbitrarily out of date with the current state of execution, and hence clearing any old object from the log buffer can cause a communication operation to fail. In practice, a very old copy is unlikely to matter for application progress and robustness. The current dataspace server has a circular read log buffer whose size is specified as a parameter during initialization. When the buffer is full, the oldest entry is deleted. One scheme that is being implemented moves older entries to disk storage prior to deletion. Since the dataspace server implicitly tracks the status of all process replicas, there is room for more sophisticated implementations. For instance, a read log buffer entry could be retained until a fixed number of replicas have accessed the object. When checkpointing is used, log entries can be deleted once a process checkpoint generation ensures that they will not be needed even in the case of a process failure.
5.4
Distributed and Multithreaded Implementations
The current dataspace server is a single-threaded server that multiplexes between requests from the clients. The design allows a distributed implementation by partitioning the abstract global address space so that each process or thread has exclusive access to a part of the tag address space. A design for a multithreaded implementation, where threads can service arbitrary requests but ensure consistency, has been developed based on similar Linda implementations. As long as concurrent threads are working on independent tags, the only requirement is atomic access to shared data structures in some cases, such as the lists in the log buffers.
5.5
Implementation Framework
Our communication library is implemented in C/C++ using TCP sockets. The data provided by the processes is stored in memory. The tag and data objects are stored in a hash table indexed with tags. The read log buffer is implemented as a combination of a hash table and lists. All data transfers are realized as one-way communication initiated by the client processes. The clients establish a connection with the dataspace server using TCP sockets before performing any
operations. This connection is retained until all the operations on the dataspace are completed. If the connection is interrupted, processes try to re-establish the connection with the server at exponentially increasing time intervals.
5.6
Integration with BOINC
The BOINC middleware is widely used for distributed scientific computing with a bag-of-tasks programming model. BOINC runs well on volatile nodes because it offers a combination of application-level checkpointing and redundancy to handle failures and computation errors. However, the BOINC platform does not support communicating parallel programs. This project has leveraged BOINC for management of task distribution and redundancy on volatile nodes, while applying the Volpex dataspace API for inter-task communication. When an application is compiled, it is linked with the BOINC and Volpex libraries. The BOINC redundancy mechanism is employed to create the desired degree of process replication. However, we currently use BOINC to provide only some services; complete integration with BOINC is ongoing.
6
Usage and Results
The Volpex dataspace communication library has been implemented and deployed. Experimentation and validation were done on compute clusters as well as on ordinary desktops that constitute a “Campus BOINC” installation at the University of Houston. Results are presented for clients on a compute cluster for repeatability of experiments. The dataspace framework has been employed to develop a variety of benchmarks and codes, listed as follows:
1. Latency and bandwidth microbenchmarks.
2. Sieve of Eratosthenes (SoE), a well-known algorithm for finding prime numbers. The dataspace API was used to broadcast each new prime number to all processes in the parallel implementation.
3. Parallel Sorting by Regular Sampling (PSRS), a well-known sorting algorithm. The dataspace API was used for all-to-all communication in the algorithm.
4. Replica Exchange for Molecular Dynamics (REMD), a real-world application used in protein folding research [19]. Each node runs a piece of molecular simulation at a different temperature using the AMBER program [20]. At certain time steps, temperatures are exchanged between neighboring nodes according to the Metropolis criterion, i.e., when the criterion's parameter is less than or equal to zero. In our implementation of this code, the dataspace API is used to i) store the process-temperature mapping, ii) synchronize the processes at the end of each step, iii) identify and retrieve energy values from neighboring processes, and iv) swap temperatures between processes when needed.
5. MapReduce, a framework for distributed computing from Google. The dataspace is used as the intermediary for data exchange between the processors executing the Map and Reduce phases.
In all cases, fault tolerance was achieved by replicating the computation processes on independent nodes. A full discussion of code development and performance analysis is beyond the scope of this paper, but we discuss sample results from the latency/bandwidth benchmarks and the SoE code. The codes were executed on the “Atlantis” cluster, which has dual-core 1.3 GHz Itanium 2 nodes with 4 GB of memory running RHEL 5.1. The dataspace server ran on a dual-core 2.4 GHz AMD Athlon with 2 GB of memory running Fedora Core 5. The server and client nodes were on different subnets that are part of a 100 Mbps LAN on the UH campus.
6.1
Benchmarking of API Calls
In the first set of experiments, we recorded the time taken by the client to execute the different API calls with varying message sizes and varying degrees of replication. The effective bandwidth delivered by the server in response to put operations is presented in Figure 2(a). Note that the bandwidth presented is the aggregate bandwidth delivered by the server in response to all clients in case of replication. We observe that the general bandwidth trend is typical of this 100 Mbps LAN environment. The effective bandwidth increases with the size of the data object but flattens out around 12 MBytes/sec (or 96 Mbps), which is just below the network capacity of 100 Mbps. Hence, the system overhead is not significant. The figure also shows the effective bandwidth with 2 and 4 replicated processes. A slight reduction in delivered bandwidth is visible for mid-range message sizes. It is instructive to recall how replicated put operations work. The first put actually transfers the data object over the network, and replica put calls are returned without any data transfer. Thus, the total network traffic does not increase significantly with replication, and it is not surprising that the effective bandwidth delivered by the server is not significantly affected. The slight reduction is attributed to the overhead of processing the put calls from other replicas.
Fig. 2. Aggregate bandwidth for PUT and GET operations with and without replicas: (a) PUTs, (b) GETs
The results for the delivered bandwidth for get operations are presented in Figure 2(b). We omit the results for read operations as they are virtually identical to those for get operations. Without replication, the performance of get operations is very similar to that of put operations, and the same discussion applies. However, the behavior with replicas is very different for get operations, because replicated get operations are handled very differently from put operations: each replicated get call leads to the entire data object being transferred from the server to a client replica. Hence, for a degree of replication k, the network traffic for get calls increases by a factor of k, while it remains unchanged for put operations. Figure 2(b) shows that the aggregate bandwidth delivered by the server increases significantly with replication, except for very large messages, where the bandwidth is probably limited by the network capacity. The server registers a higher bandwidth because 2 and 4 replicas increase the aggregate rate at which data is demanded by the clients by a factor of 2 and 4, respectively.
6.2
Sieve of Eratosthenes
We study the SoE program to gain more understanding of the performance aspects of employing the dataspace for computing. In SoE, prime numbers are identified by eliminating the multiples of discovered prime numbers. In the parallel implementation, a newly identified prime number has to be broadcast to all processors for elimination of all its multiples. For a broadcast with the dataspace API, one process executes a put while all other processes issue a read. A communication-optimized version of this program was also developed, where a broadcast is done only after a block of prime numbers is discovered. This optimization reduces the number of messages sent by a factor equal to the block size. The SoE program was executed to discover primes up to 8 billion, without blocking (blocksize = 1) and with a blocksize of 10. The results are shown in Figure 3. We observe from Figure 3(a) that the blocked version of SoE scales well up to 128 processors, while the version without blocking scales only up to 32 processors. Frequent broadcast of a single number makes the algorithm without blocking highly latency sensitive, and the dataspace implementation on a LAN does exhibit higher latency than, say, a dedicated cluster. Figure 3(b) shows performance with replication for fault tolerance. Even though a redundancy of 2 is generally sufficient, the execution time was measured with the number of replicas varied from 1 to 5 to gain more insight. Without blocking, we see a steady linear increase in execution time of about 20 seconds with each added level of redundancy, which is about half of the execution time without replication. The performance of the blocked version is not affected by replication. The reason for the difference is that the unblocked SoE is a communication-intensive application in which the dataspace server becomes the bottleneck: the execution time increases as the additional read requests from the replicas overwhelm the dataspace server. While this experiment is designed to stress the dataspace server, it does indicate the need for distributed and multithreaded implementations to increase throughput.
Fig. 3. Performance of Sieve of Eratosthenes (SoE): (a) Scalability, (b) Replication
6.3
Failure Behavior
We also evaluated the impact of node failures on application performance with replicas. In all cases, the application execution time was not negatively impacted by the failure of some replicas. In fact, failures sometimes led to a slight improvement in performance, as they reduce communication traffic. Of course, if all instances of some process fail, the application will eventually fail. Results are omitted for brevity but are available in [21].
7
Conclusions
This paper introduces the Volpex dataspace API that allows efficient Put/Get operations on an abstract global shared memory. An innovative communication model and implementation ensure consistent execution results in the presence of multiple asynchronous invocations of a single communication call due to the employment of checkpoint-restart or redundancy for fault tolerance. The target of this research is the Volpex execution environment that aims to support efficient execution of communicating parallel programs on volatile idle desktops. The example codes developed demonstrate that the framework can be employed for diverse applications. The overhead of the dataspace framework is low and it delivers the performance and scalability expected on LAN connected nodes, while enabling significant protection against failures.
While dedicated compute clusters will always be preferable for many latency-sensitive applications, other loosely coupled parallel applications can achieve reasonable performance on ordinary desktops. However, the volatility of desktops is a central problem, and the current usage of volatile desktops is limited to embarrassingly parallel (bag-of-tasks) or master-slave applications. We believe this work expands the realm of computing on idle desktops to a much larger class of parallel applications. If a substantial fraction of HPC applications could be executed on shared desktops, the impact would be significant, as clusters could be dedicated to the latency-sensitive applications that they are designed for. Finally, while the dataspace framework is motivated by computing on idle desktops, it can also be applied to clusters and other computing environments to increase robustness.
References
1. Thain, D., Tannenbaum, T., Livny, M.: Distributed computing in practice: the Condor experience. Concurrency - Practice and Experience 17(2-4), 323–356 (2005)
2. Anderson, D.P.: BOINC: A system for public-resource computing and storage. In: GRID 2004: Proceedings of the 5th IEEE/ACM International Workshop on Grid Computing, Washington, DC, USA, pp. 4–10. IEEE Computer Society, Los Alamitos (2004)
3. Zheng, R., Subhlok, J.: A quantitative comparison of checkpoint with restart and replication in volatile environments. Technical Report UH-CS-08-06, University of Houston (June 2008)
4. Carriero, N., Gelernter, D.: The S/Net’s Linda kernel. ACM Trans. Comput. Syst. 4(2), 110–129 (1986)
5. Kondo, D., Taufer, M., Brooks, C., Casanova, H., Chien, A.: Characterizing and evaluating desktop grids: an empirical study. In: Proceedings of the 18th International Parallel and Distributed Processing Symposium, p. 26 (April 2004)
6. Ren, X., Eigenmann, R.: iShare - Open internet sharing built on peer-to-peer and web. In: European Grid Conference, Amsterdam, Netherlands (February 2005)
7. Cirne, W., Brasileiro, F., Andrade, N., Costa, L., Andrade, A., Novaes, R., Mowbray, M.: Labs of the world, unite!!! Journal of Grid Computing 4(3), 225–246 (2006)
8. Litzkow, M., Tannenbaum, T., Basney, J., Livny, M.: Checkpoint and migration of UNIX processes in the Condor distributed processing system. Technical Report UW-CS-TR-1346, University of Wisconsin - Madison Computer Sciences Department (April 1997)
9. http://www.almaden.ibm.com/cs/tspaces/
10. Noble, M.S., Zlateva, S.: Scientific computation with javaspaces. In: Hertzberger, B., Hoekstra, A.G., Williams, R. (eds.) HPCN-Europe 2001. LNCS, vol. 2110, pp. 657–666. Springer, Heidelberg (2001)
11. Zhang, L., Parashar, M., Gallicchio, E., Levy, R.M.: Salsa: Scalable asynchronous replica exchange for parallel molecular dynamics applications. In: ICPP 2006: Proceedings of the 2006 International Conference on Parallel Processing, Washington, DC, USA, pp. 127–134. IEEE Computer Society, Los Alamitos (2006)
12. Xu, A., Liskov, B.: A design for a fault-tolerant, distributed implementation of Linda. In: Proc. Nineteenth International Symposium on Fault-Tolerant Computing (FTCS-19), Chicago, IL (June 1989)
13. Bakken, D.E., Schlichting, R.D.: Supporting fault-tolerant parallel programming in Linda. IEEE Transactions on Parallel and Distributed Systems 6(3), 287–302 (1995)
14. Jeong, K., Shasha, D.: PLinda 2.0: A transactional/checkpointing approach to fault tolerant Linda. In: Proceedings of the 13th Symposium on Reliable Distributed Systems, Dana Point, CA, USA, pp. 96–105 (1994)
15. Fagg, G.E., Gabriel, E., Chen, Z., Angskun, T., Bosilca, G., Pjesivac-Grbovic, J., Dongarra, J.J.: Process fault-tolerance: Semantics, design and applications for high performance computing. International Journal of High Performance Computing Applications 19, 465–477 (2005)
16. Batchu, R., Neelamegam, J.P., Cui, Z., Beddhu, M., Skjellum, A., Dandass, Y.: MPI/FT: Architecture and taxonomies for fault-tolerant, message-passing middleware for performance-portable parallel computing. In: Proceedings of the 1st IEEE International Symposium of Cluster Computing and the Grid, pp. 26–33 (2001)
17. Bouteiller, A., Cappello, F., Herault, T., Krawezik, G., Lemarinier, P., Magniette, F.: MPICH-V2: A fault tolerant MPI for volatile nodes based on pessimistic sender based message logging. In: SC 2003: Proceedings of the 2003 ACM/IEEE conference on Supercomputing, Washington, DC, USA, p. 25. IEEE Computer Society, Los Alamitos (2003)
18. LeBlanc, T., Anand, R., Gabriel, E., Subhlok, J.: VolpexMPI: an MPI Library for Execution of Parallel Applications on Volatile Nodes. In: Ropo, M., Westerholm, J., Dongarra, J. (eds.) Euro PVM/MPI 2009. LNCS, vol. 5759, pp. 124–133. Springer, Heidelberg (2009)
19. Sugita, Y., Okamoto, Y.: Replica-exchange molecular dynamics method for protein folding. Chemical Physics Letters 314, 141–151 (1999)
20. Case, D., Pearlman, D., Caldwell, J.W., Cheatham, T., Ross, W., Simmerling, C., Darden, T., Merz, K., Stanton, R., Cheng, A.: Amber 6 Manual (1999)
21. Kanna, N.: Inter-task communication on volatile nodes. Master’s thesis, University of Houston (December 2009)
The STAPL pList
Gabriel Tanase, Xiabing Xu, Antal Buss, Harshvardhan, Ioannis Papadopoulos, Olga Pearce, Timmie Smith, Nathan Thomas, Mauro Bianco, Nancy M. Amato, and Lawrence Rauchwerger
Parasol Lab, Dept. of Computer Science and Engineering, Texas A&M University
{gabrielt,xiabing,abuss,ananvay,ipapadop,olga,timmie}@cse.tamu.edu, {nthomas,bmm,amato,rwerger}@cse.tamu.edu
Abstract. We present the design and implementation of the stapl pList, a parallel container that has the properties of a sequential list but allows for scalable concurrent access when used in a parallel program. The Standard Template Adaptive Parallel Library (stapl) is a parallel programming library that extends C++ with support for parallelism. stapl provides a collection of distributed data structures (pContainers) and parallel algorithms (pAlgorithms) and a generic methodology for extending them to provide customized functionality. stapl pContainers are thread-safe, concurrent objects, providing appropriate interfaces (e.g., views) that can be used by generic pAlgorithms. The pList provides stl-equivalent methods, such as insert, erase, and splice, additional methods such as split, and efficient asynchronous (non-blocking) variants of some methods for improved parallel performance. We evaluate the performance of the stapl pList on an IBM Power5 cluster and on a CRAY XT4 massively parallel processing system. Although lists are generally not considered good data structures for parallel processing, we show that pList methods and pAlgorithms (p_generate and p_partial_sum) operating on pLists provide good scalability on more than 10³ processors and that pList compares favorably with other dynamic data structures such as the pVector.
1
Introduction and Motivation
Parallel programming is becoming mainstream due to the increased availability of multiprocessor and multicore architectures and the need to solve larger and more complex problems. To help programmers address the difficulties of parallel programming, we are developing the Standard Template Adaptive Parallel Library (stapl) [1,20,23]. stapl is a parallel C++ library with functionality similar to stl, the C++ Standard Template Library. stl is a collection of basic algorithms, containers and iterators that can be used as high-level building blocks for sequential applications. stapl provides a collection of parallel algorithms (pAlgorithms), parallel and distributed containers (pContainers), and views to abstract the data access in pContainers. These are the building blocks for writing parallel programs using stapl. An important goal of stapl is to provide a high-productivity development environment for applications that can execute efficiently on a wide spectrum of parallel and distributed systems.
pContainers are collections of elements that are distributed across a parallel machine and support concurrent access. stapl provides a unified approach for developing pContainers. It uses object-oriented technology to build distributed thread-safe containers that can easily be extended and customized. This approach allows us to provide a large variety of static and dynamic pContainers such as pArray [20], pMatrix [4], associative containers [21], pVector and pGraph.
This research was supported in part by NSF Grants EIA-0103742, ACR-0081510, ACR-0113971, CCR-0113974, ACI-0326350, CRI-0551685, CCF-0833199, CCF-0830753, by the DOE NNSA under the Predictive Science Academic Alliances Program by grant DE-FC52-08NA28616, by Chevron, IBM, Intel, HP, and by King Abdullah University of Science and Technology (KAUST) Award KUS-C1-016-04. This research used resources of the National Energy Research Scientific Computing Center, which is supported by the Office of Science of the U.S. Department of Energy under Contract No. DE-AC02-05CH11231.
G.R. Gao et al. (Eds.): LCPC 2009, LNCS 5898, pp. 16–30, 2010. © Springer-Verlag Berlin Heidelberg 2010
Contribution. In this paper, we present the design and implementation of the stapl pList, a parallel container that has the properties of a sequential list but allows for scalable concurrent access when used in a parallel program. In particular, the pList is a distributed doubly linked list data structure that is the stapl parallel equivalent of the stl list container. The pList provides thread-safe, concurrent methods for efficient insertion and removal of elements from a collection, as well as splicing and splitting. The pList interface includes methods that are counterparts of the stl list such as insert, erase, and splice, additional methods such as split, and asynchronous (non-blocking) variants of some methods for improved performance in a parallel and concurrent environment.
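For reference, the sequential stl counterparts of these operations behave as shown below. The sketch uses std::list only; split and stl_list_demo are hypothetical helpers, and split is written with std::list::splice to illustrate the intended semantics (move a suffix of the list into a new list without copying elements). Neither is the pList interface.

```cpp
#include <iterator>
#include <list>

// Hypothetical helper, not the pList interface: moves [pos, src.end())
// into a new list and returns it. Nodes are relinked, not copied.
template <typename T>
std::list<T> split(std::list<T>& src, typename std::list<T>::iterator pos) {
  std::list<T> tail;
  tail.splice(tail.end(), src, pos, src.end());
  return tail;
}

// Walks through the stl list operations that the pList mirrors.
inline std::list<int> stl_list_demo() {
  std::list<int> a{1, 2, 3, 4, 5};
  a.insert(std::next(a.begin(), 2), 42);                 // 1 2 42 3 4 5
  a.erase(std::next(a.begin()));                         // 1 42 3 4 5
  std::list<int> b = split(a, std::next(a.begin(), 3));  // a: 1 42 3, b: 4 5
  a.splice(a.begin(), b);                                // a: 4 5 1 42 3, b empty
  return a;
}
```

Splicing relinks nodes instead of copying elements; when the moved range comes from a different list, std::list::splice still takes time linear in the length of the range, because both list sizes must be updated.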
Lists have not been considered beneficial in parallel algorithms because they do not allow random access to their elements. Instead, they access elements through a serializing traversal of the list. In this paper, we show how our pList offers the advantages of a classical list while providing almost random access to its components, thus enabling scalable parallelism. The pList allows certain classes of algorithms to use their most appropriate container, i.e., lists, instead of having to replace it with a more parallel but less efficient one. We evaluate the performance of the pList on an IBM Power5 cluster and an Opteron-based CRAY XT supercomputer. We analyze the running time and scalability of different pList methods as well as the performance of different algorithms using pList as data storage. We also compare the pList to the pArray and pVector to understand the relative trade-offs of the various data structures. Our results show that the pList outperforms the pVector when there is a significant number of insertions or deletions.
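One common way for a partitioned list to offer the almost random access described above is to give each location its own segment, so that p workers can start traversing at p places instead of one. The sketch below illustrates this with a three-phase parallel prefix sum over a hypothetical SegmentedList (a std::vector of std::list segments, with one std::thread per segment). It is not stapl's p_partial_sum, only an illustration of why a segmented list can be processed in parallel.

```cpp
#include <cstddef>
#include <list>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical: a list stored as one std::list segment per location.
template <typename T>
using SegmentedList = std::vector<std::list<T>>;

// In-place inclusive prefix sum in global order (segment 0 first).
template <typename T>
void segmented_partial_sum(SegmentedList<T>& segs) {
  const std::size_t p = segs.size();
  std::vector<T> totals(p, T{});
  std::vector<std::thread> workers;

  // Phase 1: every segment computes its local inclusive scan in parallel.
  for (std::size_t i = 0; i < p; ++i)
    workers.emplace_back([&segs, &totals, i] {
      std::partial_sum(segs[i].begin(), segs[i].end(), segs[i].begin());
      if (!segs[i].empty()) totals[i] = segs[i].back();
    });
  for (auto& w : workers) w.join();
  workers.clear();

  // Phase 2: an exclusive scan of the segment totals gives each offset.
  std::vector<T> offsets(p, T{});
  for (std::size_t i = 1; i < p; ++i) offsets[i] = offsets[i - 1] + totals[i - 1];

  // Phase 3: every segment adds its offset to its elements in parallel.
  for (std::size_t i = 0; i < p; ++i)
    workers.emplace_back([&segs, &offsets, i] {
      for (T& x : segs[i]) x += offsets[i];
    });
  for (auto& w : workers) w.join();
}
```

Only phase 2 is sequential, and it touches one value per segment, so the work done by each thread stays proportional to the length of its own segment.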
2
Related Work
There has been significant research in the area of distributed and concurrent data structure development. Most of the related work is focused either on how to implement concurrent objects using different locking primitives or how to
implement concurrent data structures without locking, namely lock-free data structures [11]. Valois [24] was one of the first to present a non-blocking singly-linked list data structure using Compare&Swap (CAS) synchronization primitives rather than locks. The basic idea is to use auxiliary nodes between each ordinary node to solve the concurrency issues. Subsequent work [10,16,8,17] proposes different concurrent list implementations for shared memory architectures, emphasizing the benefits of non-blocking implementations in comparison with lock-based solutions. In contrast, pList and the rest of the stapl pContainers are designed to be used in both shared and distributed memory environments. In stapl we focus on developing a generic infrastructure that will efficiently provide a shared memory abstraction for pContainers by automating, in a very configurable way, aspects relating to data distribution and thread safety. We use a compositional approach where existing data structures (sequential or concurrent) can be used as building blocks for implementing parallel containers.
There are several parallel languages and libraries that have similar goals to stapl [2,3,6,9,15,18]. While a large amount of effort has been put into making array-based data structures suitable for parallel programming, more dynamic data structures that allow insertion and deletion of elements have not received as much attention. The PSTL (Parallel Standard Template Library) project [14] explored the same underlying philosophy as stapl of extending the C++ stl for parallel programming. They planned to provide a distributed list, but the project is no longer active. Intel Threading Building Blocks (TBB) [12] provides thread-safe containers such as vectors, queues and hashmaps for shared memory architectures, but it does not provide a parallel list implementation. Our work is further distinguished from TBB in that we target both shared and distributed memory systems.
New parallel languages like Chapel [5] (developed by CRAY), X10 [7] (developed by IBM), and many others aim to ease parallel programming and to improve productivity for parallel application development. However, most of these languages only provide high-level constructs such as multi-dimensional arrays and facilities to specify the distribution of the arrays. A major difference between stapl and all these new programming languages is that stapl is a parallel programming library written in standard C++, which makes it compatible with existing applications.
3
stapl Overview
stapl consists of a set of components that includes pContainers, pAlgorithms, views, pRanges, and a runtime system. pContainers, the distributed counterpart of stl containers, are thread-safe, concurrent objects, i.e., shared objects that provide parallel methods that can be invoked concurrently. While all pContainers provide sequentially equivalent interfaces that are compatible with the corresponding stl methods, individual pContainers may introduce additional methods to exploit the parallel performance offered by the runtime system. pContainers have a data distribution manager that provides the programmer with a shared object view, which presents a uniform access interface regardless of the physical location of the data.
The STAPL pList
19
Fig. 1. stapl components (diagram labels: Adaptive Framework; User Application Code; pAlgorithms; Views; pContainers; pRange; Run-time System; ARMI Communication Library; Scheduler; Executor; Performance Monitor; Pthreads, OpenMP, MPI, Native, ...)
Elements in pContainers are not replicated. Thread safety is ensured by mechanisms that guarantee that all operations leave the pContainer in a consistent state. An important aspect of stapl components is their composability; e.g., we can construct pContainers of pContainers, which supports nested parallelism.
pContainer data can be accessed using views, which can be seen as generalizations of stl iterators that represent sets of data elements and are not related to the data's physical location. views provide iterators to access individual pContainer elements. Generic parallel algorithms (pAlgorithms) are written in terms of views, similar to how stl algorithms are written in terms of iterators. The pRange is the stapl concept used to represent a parallel computation. Intuitively, a pRange is a task graph, where each task consists of a work function and a view representing the data to which the work function will be applied. The pRange provides support for specifying data dependencies between tasks that will be enforced during execution.
The runtime system (RTS) and its communication library ARMI (Adaptive Remote Method Invocation [19,22]) provide the interface to the underlying operating system, native communication library and hardware architecture. ARMI uses the remote method invocation (RMI) communication abstraction to hide the lower-level implementations (e.g., MPI, OpenMP, etc.). A remote method invocation in stapl can be blocking (sync_rmi) or non-blocking (async_rmi). When a sync_rmi is invoked, the calling thread blocks until the method executes remotely and returns its results. An async_rmi does not specify a return type, and the calling thread only initiates the method. The method completes at some later time, and its completion is handled internally by the RTS. ARMI provides a fence mechanism (rmi_fence) to ensure the completion of all previous RMI calls.
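The difference between sync_rmi, async_rmi and rmi_fence can be modeled in a few lines. The sketch below is a hypothetical, single-process toy and not the ARMI interface. Each location owns one object and a queue of pending requests. async_rmi only enqueues, sync_rmi returns a result to the caller, and rmi_fence drains every queue. Delivering earlier requests before a sync_rmi is an assumption of this model, not a statement about ARMI's ordering guarantees. The names ToyRuntime and Counter are illustrative.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a remote object living on one location.
struct Counter {
  long value = 0;
  void add(long x) { value += x; }
  long get() const { return value; }
};

// Toy, single-process model of sync_rmi / async_rmi / rmi_fence.
// Hypothetical names: this is NOT the STAPL/ARMI API. Real locations run
// concurrently; here pending requests are simply queued per location.
class ToyRuntime {
 public:
  explicit ToyRuntime(std::size_t num_locations)
      : objects_(num_locations), pending_(num_locations) {}

  // Non-blocking: only records the request; nothing is returned.
  void async_rmi(std::size_t loc, std::function<void(Counter&)> method) {
    pending_[loc].push_back(std::move(method));
  }

  // Blocking: the caller receives the result. Model assumption: earlier
  // requests to the same location are delivered before this one runs.
  long sync_rmi(std::size_t loc, const std::function<long(Counter&)>& method) {
    deliver(loc);
    return method(objects_[loc]);
  }

  // Returns only after every outstanding async_rmi has completed.
  void rmi_fence() {
    for (std::size_t loc = 0; loc < objects_.size(); ++loc) deliver(loc);
  }

  // Number of requests that have been issued but not yet executed.
  std::size_t outstanding() const {
    std::size_t n = 0;
    for (const auto& q : pending_) n += q.size();
    return n;
  }

 private:
  void deliver(std::size_t loc) {
    while (!pending_[loc].empty()) {
      std::function<void(Counter&)> m = std::move(pending_[loc].front());
      pending_[loc].pop_front();
      m(objects_[loc]);
    }
  }

  std::vector<Counter> objects_;
  std::vector<std::deque<std::function<void(Counter&)>>> pending_;
};
```

After two async_rmi calls, both requests are still outstanding. They are guaranteed to have executed only after rmi_fence returns.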
The asynchronous calls can be aggregated by the RTS in an internal buffer to minimize communication overhead. The buffer size and the aggregation factor impact the performance, and in many cases should be adjusted for the different computational phases of an application. The RTS provides locations as an abstraction of processing elements in a system. A location is a component of a parallel machine that has a contiguous memory address space and has associated execution capabilities (e.g., threads). A location can be identified with a process address space.
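The effect of the aggregation factor on message counts can be illustrated with a hypothetical buffer for a single destination. The sketch assumes the simplest policy: buffered requests are shipped as one physical message whenever the aggregation factor is reached, and a fence flushes any remainder. The RTS's actual buffering policy is not described here and is likely more elaborate. AggregatingChannel is an illustrative name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model of RMI aggregation for one destination (hypothetical names).
// Requests are buffered and shipped together once `aggregation_factor`
// of them have accumulated; flush() ships a partially filled buffer.
class AggregatingChannel {
 public:
  explicit AggregatingChannel(std::size_t aggregation_factor)
      : factor_(aggregation_factor) {}

  void send(int request) {
    buffer_.push_back(request);
    if (buffer_.size() >= factor_) flush();
  }

  // Called explicitly, e.g. by a fence, to ship whatever is buffered.
  void flush() {
    if (buffer_.empty()) return;
    ++messages_sent_;             // one physical message per flush
    delivered_ += buffer_.size(); // all buffered requests travel together
    buffer_.clear();
  }

  std::size_t messages_sent() const { return messages_sent_; }
  std::size_t delivered() const { return delivered_; }

 private:
  std::size_t factor_;
  std::vector<int> buffer_;
  std::size_t messages_sent_ = 0;
  std::size_t delivered_ = 0;
};
```

With an aggregation factor of 4, ten requests travel in three messages instead of ten. Larger factors save more messages but hold requests in the buffer longer. This trade-off is presumably one reason the best setting can differ between computational phases.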
20
G. Tanase et al.
4 stapl pList
The linked list is a fundamental data structure that plays an important role in many areas of computer science and engineering, such as operating systems, algorithm design, and programming languages. A large number of languages and libraries provide different variants of lists, with the C++ stl being a representative example. The stl list is a generic dynamic data structure that organizes the elements as a sequence and allows fast insertions and deletions of elements at any point in the sequence. The stapl pList is a parallel equivalent of the stl list with an interface for efficient insertion and deletion of elements in parallel. Analogous to stl lists, elements in a pList are accessed through iterators. All stl-equivalent methods require a return value, which in general makes them blocking. For this reason, we provide a set of asynchronous methods, e.g., insert_async and erase_async. These non-blocking methods allow for better communication/computation overlap and enable the stapl RTS to aggregate messages to reduce the communication overhead [19]. Since there is no data replication, operations like push_back and push_front, if invoked concurrently, may be serialized at the locations managing the head and the tail of the list. For this reason we added two new methods to the pList interface, push_anywhere and push_anywhere_async. These methods allow the pList to insert the element at an unspecified position in order to minimize communication and improve concurrency. Part of the interface of pList is shown in Table 1.
4.1 pList Design and Implementation
stapl provides a uniform approach for developing parallel containers by specifying a set of base concepts and a common methodology for the development of thread-safe, concurrent data structures that are extendable and composable. From the user perspective, a parallel container is a data structure that handles a collection of elements distributed on a parallel machine. A task executing within a location sees the pContainer as a shared data structure where elements can be referenced independently of their physical location. Internally, the pContainer distributes its data across a number of locations in pieces of storage called components. To provide a shared object view, the pContainer associates a Global Unique Identifier (GID) with every element and uses a Data Distribution Manager module to keep track of where the elements corresponding to individual GIDs are located. The Distribution Manager in turn uses a partition to decide the component in which each element in the pContainer is stored, and a partition-mapper to determine the location where a component is allocated. In the following, we describe the functionality of these modules in more detail in the context of the pList, but they are general and apply to other pContainers.
Component. The pContainer component is the basic storage unit for data. For the stapl pList, we use the stl list as the component. Similarly, for other pContainers, the components are extensions of stl containers or other available sequential data structures.
Table 1. Representative methods of the pList container
pList Interface | Description
Collective Operations (must be called by all locations)
p_list(size_t N, const T& value = T()) | Creates a pList with N elements, each of which is a copy of value.
p_list(size_t N, partition_type& ps) | Creates a pList with N elements based on the given partition strategy.
void splice(iter pos, pList& pl) | Splices the elements of pList pl into the current list before position pos.
Non-collective Operations
size_t size() const | Returns the size of the pList.
bool empty() const | Returns true if the pList's size is 0.
T& [front|back]() | Accesses the first/last element of the sequence.
void push_[front|back](const T& val) | Inserts a new element at the beginning/end of the sequence.
void pop_[front|back]() | Removes the element at the beginning/end of the sequence.
iterator insert(iterator pos, const T& val) | Inserts val before position pos and returns the iterator to the newly inserted element.
void insert_async(iterator pos, const T& val) | Inserts val before pos with no return value.
iterator erase(iterator pos) | Erases the element at position pos and returns the iterator pointing to the new location of the element that followed the erased element.
void erase_async(iterator pos) | Erases the element at position pos with no return value.
iterator push_anywhere(const value_type& val) | Pushes val onto the last local component and returns the iterator pointing to the newly inserted element.
void push_anywhere_async(const T& val) | Pushes val onto the last local component with no return value.
Most pContainer methods are ultimately executed at the component level using the corresponding method of the component. For example, pList insert ends up invoking the stl list insert method. The pList component can also be provided by the user, provided that insertions and deletions never invalidate iterators and that the component provides the domain interface (see below). Additional requirements relate to the expected performance of the methods (e.g., insertions and deletions should be constant-time operations). Within each location of a parallel machine, a pList may store several components, and the pList employs a location-manager module to allocate and handle them. The pList has the global view of all of the components and knows the order among them, so that it can provide a unique traversal of all its data. For this reason each component is identified by a globally unique component identifier (CID). For static or less dynamic – in terms of number of components
– pContainers such as pArray or associative containers, the component identifier can be a simple integer. The pList, however, needs a component identifier that allows for fast dynamic operations. During the splice operation, components from one pList instance need to be integrated efficiently into another pList instance while maintaining the uniqueness of their CIDs. For these reasons the CID for the pList components is currently defined as follows:
typedef std::pair<...> CID;
Global Identifiers (GID). In the stapl pContainer framework, each element is uniquely identified by its GID. This is an important requirement that allows stapl to provide the user with a shared object view. Two factors lead us to use the following definition for the pList GID: performance and uniqueness considerations similar to those for the component identifier, and the list's guarantee that iterators are not invalidated when elements are added or deleted.
typedef std::pair<...> GID;
Since the CID is unique, the GID is unique as well. With the above definition for GID, the pList can uniquely identify each of its elements and access them independently of their physical location.
Domain. In the stapl pContainer framework the domain is the universe of GIDs that identifies the elements. A domain also specifies an order that defines how elements are traversed by the iterators of pList. This order is specified by two methods: get_first_gid(), which returns the first GID of the domain, and get_next_gid(GID), which returns the GID that follows the given one in the domain. The domain interface for the pList is provided by the pList components.
Data Distribution. The data distribution manager for a stapl pContainer uses a partition and a partition-mapper to describe how the data will be grouped and mapped on the machine. Given a GID, the partition provides the CID of the component containing the corresponding value.
The partition-mapper extracts the location information from the CID, and, on that location, a location-manager extracts the actual component to obtain the address of the requested element.
View. In the stapl framework, views are the means of accessing data elements stored in the pContainer within generic algorithms. stapl pAlgorithms are written in terms of views, similar to how stl generic algorithms are written in terms of iterators. The pList currently supports sequence views that provide an iterator type and begin() and end() methods. A view can be partitioned into sub-views. By default the partition of a pList view matches the subdivision of the list into components, thus allowing random access to portions of the pList. This allows parallel algorithms to achieve good scalability (see Section 5).
pList Container. The pList class, like any other pContainer in stapl, has a distribution and a location-manager as its two main data members. The pList methods are implemented using the interfaces of these two classes. A typical implementation of a pList method that operates at the element level is
1 void plist::insert_async(iterator it, value_type val)
2   Location loc;
3   dist_manager.lookup(gid(it))
4   CID = part_strategy.map(gid(it))
5   loc = part_mapper.map(CID)
6   if loc is local
7     location_manager.component(CID).insert(gid(it), val)
8   else
9     async_rmi(loc, insert_async(it, val));
Fig. 2. pList method implementation
included in Figure 2 to illustrate how the pContainer modules interact. The runtime cost of the method has three constituents: the time to decide the location and the component where the element will be added (Figure 2, lines 3-5), the communication time to get/send the required information (Figure 2, line 9), and the time to perform the operation on a component (Figure 2, line 7). The complexity of constructing a pList of N elements is O(M + log P), where M is the maximum number of elements in a location and P is the number of locations. The log P term is due to a fence at the end of the constructor that guarantees the pList is in a consistent state. The element-wise methods have O(1) complexity. Multiple concurrent invocations of such methods may be partially serialized due to concurrent thread-safe accesses to common data. The size and empty methods imply reductions, so their complexity is O(log P). clear is a broadcast plus the deletion of all elements in each location, so its complexity is O(M + log P). This analysis relies on the pList component to guarantee that allocation and destruction are linear-time operations and that size, insert, erase and push_back/push_front are constant-time operations. The pList also provides methods to rearrange data in bulk: splice and split, which merge lists together and split lists, respectively. splice is a pList method with the signature
void pList::splice(iter pos, pList& pl [, iter it1, iter it2]);
where iter stands for an iterator type. pos is an iterator of the calling pList, pl is another pList, and the optional iterators it1 and it2 point to elements of pl. splice removes from pl the portion enclosed by it1 and it2 and inserts it at pos. By default it1 denotes the beginning of pl and it2 its end. The complexity of splice depends on the number of components between it1 and it2. If it1 or it2 does not fall on a component boundary, new components are generated in constant time using the sequential list splice.
Since the global begin and global end of the pList are replicated across locations, the operation requires a broadcast if either of them is modified. split is also a member method of pList; it splits one pList into two. It is a parallel method implemented on top of splice, with the following signature:
void pList::split(iterator pos, pList& other_plist);
When pList.split(pos, other_plist) is invoked, the part of pList starting at pos and ending at pList.end() is appended at the end of other_plist. The complexity of split is analogous to that of splice.
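The component-level behavior of split can be sketched sequentially. In the toy model below, a list is an ordered std::list of components, each a std::list<int>. The names ToyPList, Pos, toy_split and flatten are hypothetical, and a real pList spreads its components over locations and names them by CIDs. Only the component that contains pos is cut in two. Every later component is relinked into the target list as a whole, without visiting its elements. The sketch assumes pos refers to an element, not to the end of the list.

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <utility>
#include <vector>

// Toy sequential model of a pList as an ordered list of components.
using Component = std::list<int>;
using ToyPList = std::list<Component>;

// A position: the component, plus an element inside that component.
struct Pos {
  ToyPList::iterator comp;
  Component::iterator elem;
};

// Moves [pos, end) of src to the end of dst, mirroring pList::split.
void toy_split(ToyPList& src, Pos pos, ToyPList& dst) {
  ToyPList::iterator first_moved = pos.comp;
  if (pos.elem != pos.comp->begin()) {
    // pos is inside a component: cut it into two components. Nodes are
    // relinked, not copied; in C++11 and later this range splice between
    // two lists is linear in the range length because size() stays O(1).
    Component tail;
    tail.splice(tail.begin(), *pos.comp, pos.elem, pos.comp->end());
    first_moved = src.insert(std::next(pos.comp), std::move(tail));
  }
  // Whole components are relinked into dst; their elements are not visited.
  dst.splice(dst.end(), src, first_moved, src.end());
}

// Concatenates all components, for inspection.
std::vector<int> flatten(const ToyPList& pl) {
  std::vector<int> out;
  for (const Component& c : pl) out.insert(out.end(), c.begin(), c.end());
  return out;
}
```

For example, splitting {1,2,3}{4,5}{6} at the element 2 leaves {1} in the source. The target gains three components, {2,3}, {4,5} and {6}.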
5 Performance Evaluation
In this section, we evaluate the scalability of the parallel methods described in Section 4. We compare pList and pVector performance, evaluate some generic pAlgorithms (p_generate and p_partial_sum) on pList, pArray and pVector, and evaluate an Euler tour implementation using pList. We conducted our experimental studies on two architectures. The first is an 832-processor IBM cluster with p575 SMP nodes available at Texas A&M University (called P5-cluster). The second is a 38,288-processor Cray XT4 with quad-core Opteron processors available at NERSC (called cray). In all experiments a location contains a single processor, so the two terms can be used interchangeably.
5.1 pList Method Evaluation
In this section we discuss the performance of the pList methods and the factors influencing their running time. To evaluate the scalability of individual methods we designed the kernel shown in Figure 3. The figure shows push_anywhere, but the same kernel is used to evaluate all methods. For a given number of elements N, all P available processors (locations) concurrently insert N/P elements. We report the time taken to insert all N elements globally. The measured time includes the cost of a fence call. As stated in Section 3, a fence is more than a simple barrier, since it also ensures the completion of all previous RMI calls.
1 evaluate_performance(N,P)
2 tm = stapl::start_timer(); //start timer
3 //insert N/P elements concurrently
4 for(i=0; i