Network Programming IV - OOPN

The ACE_Message_Block Class (1/2)

[Figure: messages buffered for transmission, messages in transit, & messages buffered awaiting processing]

Motivation

• Many networked applications require a means to manipulate messages efficiently, e.g.:
  • Storing messages in buffers as they are received from the network or from other processes
  • Adding/removing headers/trailers from messages as they pass through a user-level protocol stack
  • Fragmenting/reassembling messages to fit into network MTUs
  • Storing messages in buffers for transmission or retransmission
  • Reordering messages that were received out of sequence

The ACE_Message_Block Class (2/2)

Class Capabilities

• This class is a composite that enables efficient manipulation of messages via the following operations:
  • Each ACE_Message_Block contains a pointer to a reference-counted ACE_Data_Block, which in turn points to the actual data associated with a message
  • It allows multiple messages to be chained together into a composite message
  • It allows multiple messages to be joined together to form an ACE_Message_Queue
  • It treats synchronization & memory management properties as aspects

Two Kinds of Message Blocks

• Simple messages contain one ACE_Message_Block
  • An ACE_Message_Block points to an ACE_Data_Block
  • An ACE_Data_Block points to the actual data payload
  • ACE_Data_Blocks can be reference counted

• Composite messages contain multiple ACE_Message_Blocks
  • These blocks are linked together in accordance with the Composite pattern
  • Composite messages often consist of a control message that contains bookkeeping information (e.g., destination addresses), followed by one or more data messages that contain the actual contents of the message

Using the ACE_Message_Block (1/2)

• The following program reads all data from standard input into a singly linked list of dynamically allocated ACE_Message_Blocks
• These ACE_Message_Blocks are chained together by their continuation pointers

  int main (int argc, char *argv[])
  {
    // Allocate an ACE_Message_Block whose payload is of size BUFSIZ.
    ACE_Message_Block *head = new ACE_Message_Block (BUFSIZ);
    ACE_Message_Block *mblk = head;

    for (;;) {
      // Read data from standard input into the message block,
      // starting at its write pointer (wr_ptr ()).
      ssize_t nbytes = ACE::read_n (ACE_STDIN,
                                    mblk->wr_ptr (),
                                    mblk->size ());
      if (nbytes <= 0)
        break; // Break out at EOF or error.

      // Advance the write pointer by the number of bytes read.
      mblk->wr_ptr (nbytes);

Using the ACE_Message_Block (2/2)

      // Allocate a new ACE_Message_Block of size BUFSIZ & chain it
      // to the previous one at the end of the list.
      mblk->cont (new ACE_Message_Block (BUFSIZ));

      // Advance mblk to point to the newly allocated block.
      mblk = mblk->cont ();
    }

    // Print the contents of the list to the standard output. For every
    // message block, print mblk->length() bytes of contents, starting
    // at the read pointer (rd_ptr ()).
    for (mblk = head; mblk != 0; mblk = mblk->cont ())
      ACE::write_n (ACE_STDOUT, mblk->rd_ptr (), mblk->length ());

    // Could also use ACE::write_n (head) to write the entire chain...
    head->release (); // Release all the memory in the chain.
    return 0;
  }

ACE CDR Streams

Motivation

• Networked applications that send/receive messages often require support for:
  • Linearization, to handle the conversion of richly typed data to/from raw memory buffers
  • (De)marshaling, to interoperate with heterogeneous compiler alignments & hardware instruction sets with different byte orders
• The ACE_OutputCDR & ACE_InputCDR classes provide a highly optimized, portable, & convenient means to marshal & demarshal data using the standard CORBA Common Data Representation (CDR)
  • ACE_OutputCDR creates a CDR buffer from a data structure (marshaling)
  • ACE_InputCDR extracts data from a CDR buffer (demarshaling)

The ACE_OutputCDR & ACE_InputCDR Classes

Class Capabilities

ACE_OutputCDR & ACE_InputCDR support the following features:
• They provide operations to (de)marshal the following types:
  • Primitive types, e.g., booleans; 16-, 32-, & 64-bit integers; 8-bit octets; single & double precision floating point numbers; characters; & strings
  • Arrays of primitive types
• The insertion (<<) & extraction (>>) operators can marshal & demarshal primitive types, using the same syntax as the C++ iostream components
• ACE_Message_Block chains are used internally to minimize memory copies
• They take advantage of CORBA CDR alignment & byte-ordering rules to avoid memory copying & byte-swapping operations, respectively
• They provide optimized byte-swapping code that uses inline assembly language instructions for common hardware platforms (such as Intel x86) & standard hton*() & ntoh*() macros/functions on other platforms
• They support zero-copy marshaling & demarshaling of octet buffers
• Users can define custom character set translators for platforms that do not use ASCII or Unicode as their native character sets

Sidebar: Log Record Message Structure

ACE_Log_Record is a type that ACE uses internally to keep track of the fields in a log record.

• This example uses an 8-byte, CDR-encoded header followed by the payload
• The header includes the byte order, payload length, & other fields

  class ACE_Log_Record {
  private:
    ACE_UINT type_;
    ACE_UINT pid_;
    ACE_Time_Value timestamp_;
    char msg_data_[ACE_MAXLOGMSGLEN];
  public:
    ACE_UINT type () const;
    ACE_UINT pid () const;
    const ACE_Time_Value timestamp () const;
    const char *msg_data () const;
  };

Using ACE_OutputCDR

• We show the ACE CDR insertion (operator<<) & extraction (operator>>) operators for ACE_Log_Record that are used by the client application & logging server

  int operator<< (ACE_OutputCDR &cdr,
                  const ACE_Log_Record &log_record)
  {
    size_t msglen = log_record.msg_data_len ();
    // Insert each field into the output CDR stream.
    cdr << ACE_CDR::Long (log_record.type ());
    cdr << ACE_CDR::Long (log_record.pid ());
    cdr << ACE_CDR::Long (log_record.time_stamp ().sec ());
    cdr << ACE_CDR::Long (log_record.time_stamp ().usec ());
    cdr << ACE_CDR::ULong (msglen);
    cdr.write_char_array (log_record.msg_data (), msglen);
    // After marshaling all the fields of the log record into the
    // CDR stream, return the success/failure status.
    return cdr.good_bit ();
  }

Using ACE_InputCDR

  int operator>> (ACE_InputCDR &cdr, ACE_Log_Record &log_record)
  {
    // Temporaries used during demarshaling (not always necessary).
    ACE_CDR::Long type;
    ACE_CDR::Long pid;
    ACE_CDR::Long sec, usec;
    ACE_CDR::ULong buffer_len;

    // Extract each field from the input CDR stream.
    if ((cdr >> type) && (cdr >> pid)
        && (cdr >> sec) && (cdr >> usec)
        && (cdr >> buffer_len)) {
      ACE_TCHAR log_msg[ACE_Log_Record::MAXLOGMSGLEN + 1];
      log_record.type (type);
      log_record.pid (pid);
      log_record.time_stamp (ACE_Time_Value (sec, usec));
      cdr.read_char_array (log_msg, buffer_len);
      log_msg[buffer_len] = '\0';
      log_record.msg_data (log_msg);
    }
    // After demarshaling all the fields of the log record from the
    // CDR stream, return the success/failure status.
    return cdr.good_bit ();
  }

Implementing the Client Application (1/6)

• The following client application illustrates how to use the ACE C++ Socket wrapper facades & CDR streams to establish connections, marshal log records, & send the data to our logging server
• This example behaves as follows:
  1. Reads lines from standard input
  2. Sends each line to the logging server in a separate log record &
  3. Stops when it reads EOF from standard input

Header file: "Logging_Client.h"

  class Logging_Client {
  public:
    // Send log_record to the server.
    int send (const ACE_Log_Record &log_record);

    // Accessor method.
    ACE_SOCK_Stream &peer () { return logging_peer_; }

    // Close the connection to the server.
    ~Logging_Client () { logging_peer_.close (); }
  private:
    ACE_SOCK_Stream logging_peer_; // Connected to server.
  };

Implementing the Client Application (2/6)

The Logging_Client::send() method behaves as follows:
1. Computes the size of the payload (lines 2-8)
2. Marshals the header & data into an output CDR (lines 10-16) &
3. Sends it to the logging server (lines 18-24)

   1 int Logging_Client::send (const ACE_Log_Record &log_record) {
   2   const size_t max_payload_size =
   3       4                                // type()
   4     + 8                                // timestamp
   5     + 4                                // process id
   6     + 4                                // data length
   7     + ACE_Log_Record::ACE_MAXLOGMSGLEN // data
   8     + ACE_CDR::MAX_ALIGNMENT;          // padding
   9
  10   // First marshal the payload to contain the linearized
  11   // ACE_Log_Record.
  12   ACE_OutputCDR payload (max_payload_size);
  13   payload << log_record;
  14   ACE_CDR::ULong length = payload.total_length ();

Implementing the Client Application (3/6)

4. Then marshal the header info that includes the byte order & payload length
5. Construct an iovec of size 2 with the header & payload info
6. Send the entire message to the logging server

  14   ACE_OutputCDR header (ACE_CDR::MAX_ALIGNMENT + 8);
  15   header << ACE_OutputCDR::from_boolean (ACE_CDR_BYTE_ORDER);
  16   header << ACE_CDR::ULong (length);
  17
  18   iovec iov[2];
  19   iov[0].iov_base = header.begin ()->rd_ptr ();
  20   iov[0].iov_len  = 8;
  21   iov[1].iov_base = payload.begin ()->rd_ptr ();
  22   iov[1].iov_len  = length;
  23
  24   return logging_peer_.sendv_n (iov, 2);
  25 }

Since TCP/IP is a bytestream protocol (i.e., without any message boundaries) the logging service uses CDR as a message framing protocol to delimit log records

Implementing the Client Application (4/6)

The Logging_Client main program:

   1 int main (int argc, char *argv[])
   2 {
   3   u_short logger_port =
   4     argc > 1 ? atoi (argv[1]) : 0;
   5   const char *logger_host =
   6     argc > 2 ? argv[2] : ACE_DEFAULT_SERVER_HOST;
   7   int result;
   8
   9   ACE_INET_Addr server_addr;
  10
  11   if (logger_port != 0)
  12     result = server_addr.set (logger_port, logger_host);
  13   else
  14     result = server_addr.set ("ace_logger", logger_host);
  15   if (result == -1)
  16     ACE_ERROR_RETURN ((LM_ERROR,
  17                        "lookup %s, %p\n",
  18                        logger_port == 0 ? "ace_logger" : argv[1],
  19                        logger_host), 1);
  20

Sidebar: ACE Debugging & Error Macros

• ACE consolidates the printing of debug & error messages via a printf()-like format, e.g., ACE_DEBUG & ACE_ERROR (& their *_RETURN counterparts), which encapsulate the ACE_Log_Msg::log() method
• Arguments are enclosed in a double set of parentheses to make them appear as one argument to the C++ preprocessor
• The first argument is the severity code; the second is a format string supporting a superset of printf() conversion specifiers:

  Format  Action
  %l      Displays the line number where the error occurred
  %N      Displays the file name where the error occurred
  %n      Displays the name of the program
  %P      Displays the current process ID
  %p      Takes a const char * argument and displays it and the
          error string corresponding to errno (similar to perror())
  %T      Displays the current time
  %t      Displays the calling thread's ID

Implementing the Client Application (5/6)

Use the ACE_SOCK_Connector wrapper facade to connect to the logging server:

  21   ACE_SOCK_Connector connector;
  22   Logging_Client logging_client;
  23
  24   if (connector.connect (logging_client.peer (),
  25                          server_addr) < 0)
  26     ACE_ERROR_RETURN ((LM_ERROR,
  27                        "%p\n",
  28                        "connect()"), 1);
  29
  30   // Limit the number of characters read on each record.
  31   cin.width (ACE_Log_Record::MAXLOGMSGLEN);
  32

The contents of the messages sent to the logging server are obtained from standard input.

Implementing the Client Application (6/6)

  33   for (;;) {
  34     std::string user_input;
  35     getline (cin, user_input, '\n');
  36
  37     if (!cin || cin.eof ()) break;
  38
  39     ACE_Time_Value now (ACE_OS::gettimeofday ());  // Create log_record.
  40     ACE_Log_Record log_record (LM_INFO, now,
  41                                ACE_OS::getpid ());
  42     log_record.msg_data (user_input.c_str ());
  43
  44     if (logging_client.send (log_record) == -1)    // Send log_record to
  45       ACE_ERROR_RETURN ((LM_ERROR,                 // the logging server.
  46                          "%p\n", "logging_client.send()"), 1);
  47   }
  48
  49   return 0; // Logging_Client destructor
  50             // closes TCP connection.
  51 }

The Logging_Server Classes

The figure below illustrates our Logging_Server abstract base class, the Logging_Handler class we'll describe shortly, & the concrete logging server classes that we'll develop in subsequent sections of the tutorial.

[Figure: Logging_Server class hierarchy with Logging_Handler & the concrete logging servers]

Implementing the Logging_Server (1/5)

• This example uses the ACE_Message_Block & ACE CDR classes in a common base class that simplifies logging server implementations in the examples

Header file "Logging_Server.h":

  // Forward declaration.
  class ACE_SOCK_Stream;

  class Logging_Server {
  public:
    // Template Method that runs logging server's event loop.
    virtual int run (int argc, char *argv[]);
  protected:
    // The following four methods are "hooks" that can be
    // overridden by subclasses.
    virtual int open (u_short logger_port = 0);
    virtual int wait_for_multiple_events () { return 0; }
    virtual int handle_connections () = 0;
    virtual int handle_data (ACE_SOCK_Stream * = 0) = 0;

Sidebar: Template Method Pattern

• Intent
  • Define the skeleton of an algorithm in an operation, deferring some steps to subclasses
• Context
  • You have a fixed algorithm structure with variations possible for individual steps
• Problem
  • You want to plug in & out steps of the algorithm without changing the algorithm itself
• Solution
  • Define a fixed base class function that calls virtual "hook" methods that derived classes can override

[Class diagram: an abstract class whose template_method() calls hook_method_1() & hook_method_2(); concrete classes 1 & 2 override the hook methods]

Implementing the Logging_Server (2/5)

Header file "Logging_Server.h" (cont'd):

    // This helper method can be used by the hook methods.
    int make_log_file (ACE_FILE_IO &, ACE_SOCK_Stream * = 0);

    // Close the socket endpoint and shutdown ACE.
    virtual ~Logging_Server () { acceptor_.close (); }

    // Accessor.
    ACE_SOCK_Acceptor &acceptor () { return acceptor_; }

  private:
    // Socket acceptor endpoint.
    ACE_SOCK_Acceptor acceptor_;
  };

Implementing the Logging_Server (3/5)

Implementation file "Logging_Server.cpp":

  #include "ace/FILE_Addr.h"
  #include "ace/FILE_Connector.h"
  #include "ace/FILE_IO.h"
  #include "ace/INET_Addr.h"
  #include "ace/SOCK_Stream.h"
  #include "Logging_Server.h"

• run() is the template method providing the skeleton of the algorithm to use
• The hook methods will be overridden by subclasses unless the default is ok to use

  int Logging_Server::run (int argc, char *argv[])
  {
    if (open (argc > 1 ? atoi (argv[1]) : 0) == -1)
      return -1;
    // Three hook methods that can be overridden in subclasses.
    for (;;) {
      if (wait_for_multiple_events () == -1) return -1;
      if (handle_connections () == -1) return -1;
      if (handle_data () == -1) return -1;
    }
    return 0;
  }

Implementing the Logging_Server (4/5)

Initialize the acceptor so it can accept connections from any server network interface:

  int Logging_Server::open (u_short logger_port)
  {
    // Raises the number of available socket handles to
    // the maximum supported by the OS platform.
    ACE::set_handle_limit ();

    ACE_INET_Addr server_addr;
    int result;

    if (logger_port != 0)
      result = server_addr.set (logger_port, INADDR_ANY);
    else
      result = server_addr.set ("ace_logger", INADDR_ANY);
    if (result == -1) return -1;

    // Start listening and enable reuse of listen address
    // for quick restarts.
    return acceptor_.open (server_addr, 1);
  }

Implementing the Logging_Server (5/5)

  int Logging_Server::make_log_file (ACE_FILE_IO &logging_file,
                                     ACE_SOCK_Stream *logging_peer)
  {
    std::string filename ("logging_server.log");

    if (logging_peer != 0) {
      // Use the client's host name as the file name.
      char peer_name[MAXHOSTNAMELEN + 1];
      ACE_INET_Addr logging_peer_addr;
      logging_peer->get_remote_addr (logging_peer_addr);
      logging_peer_addr.get_host_name (peer_name, sizeof peer_name);
      filename = peer_name;
      filename += ".log";
    }

    // Create the log file using the ACE_FILE_Connector factory.
    ACE_FILE_Connector connector;
    return connector.connect (logging_file,
                              ACE_FILE_Addr (filename.c_str ()),
                              0,                 // No time-out.
                              ACE_Addr::sap_any, // Ignored.
                              0,                 // Don't try to reuse the addr.
                              O_RDWR|O_CREAT|O_APPEND,
                              ACE_DEFAULT_FILE_PERMS);
  }

Sidebar: The ACE File Wrapper Facades

• ACE file wrapper facades encapsulate platform mechanisms for unbuffered file operations
• The design of these wrapper facades is very similar to the ACE IPC wrapper facades
• The ACE File classes decouple:
  • Initialization factories, e.g., ACE_FILE_Connector, which opens and/or creates files
  • Data transfer classes, e.g., ACE_FILE_IO, which applications use to read/write data from/to files opened using ACE_FILE_Connector
• This generality in ACE's design of wrapper facades helps strategize higher-level ACE framework components
  • e.g., ACE_Acceptor, ACE_Connector, & ACE_Svc_Handler

Implementing the Logging_Handler (1/6)

This class is used by the logging server to encapsulate the I/O & processing of log records.

Header file "Logging_Handler.h":

  #include "ace/FILE_IO.h"
  #include "ace/SOCK_Stream.h"

  class ACE_Message_Block; // Forward declaration.

  class Logging_Handler {
  protected:
    // Reference to a log file.
    ACE_FILE_IO &log_file_;

    // Connected to the client.
    ACE_SOCK_Stream logging_peer_;

Implementing the Logging_Handler (2/6)

Header file "Logging_Handler.h" (cont'd):

  public:
    // Receive one log record from a connected client. <mblk>
    // contains the hostname, <mblk->cont()> contains the log
    // record header (the byte order and the length) and the data.
    int recv_log_record (ACE_Message_Block *&mblk);

    // Write one record to the log file. The <mblk> contains the
    // hostname and the <mblk->cont> contains the log record.
    int write_log_record (ACE_Message_Block *mblk);

    // Log one record by calling <recv_log_record> and
    // <write_log_record>.
    int log_record ();
  };

When a log record is received, it is stored as an ACE_Message_Block chain.

Implementing the Logging_Handler (3/6)

1. Receive the incoming data & use the input CDR class to parse the header,
2. then the payload, based on the framing protocol, &
3. finally save it in an ACE_Message_Block chain

  int Logging_Handler::recv_log_record (ACE_Message_Block *&mblk)
  {
    // First save the peer hostname.
    ACE_INET_Addr peer_addr;
    logging_peer_.get_remote_addr (peer_addr);
    mblk = new ACE_Message_Block (MAXHOSTNAMELEN + 1);
    peer_addr.get_host_name (mblk->wr_ptr (), MAXHOSTNAMELEN);
    mblk->wr_ptr (strlen (mblk->wr_ptr ()) + 1); // Go past name.

    ACE_Message_Block *payload =
      new ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE);
    // Force proper alignment of the Message Block for a CDR stream.
    ACE_CDR::mb_align (payload);

    // Receive the header info (byte order & length).
    if (logging_peer_.recv_n (payload->wr_ptr (), 8) == 8) {
      payload->wr_ptr (8); // Reflect addition of 8 bytes.

      ACE_InputCDR cdr (payload);

Implementing the Logging_Handler (4/6)

      // Demarshal the header info. Use a helper method to
      // disambiguate booleans from chars.
      ACE_CDR::Boolean byte_order;
      cdr >> ACE_InputCDR::to_boolean (byte_order);
      cdr.reset_byte_order (byte_order);

      ACE_CDR::ULong length;
      cdr >> length;
      // Resize the message block so it's the right size for the
      // payload & aligned properly.
      payload->size (8 + ACE_CDR::MAX_ALIGNMENT + length);

      if (logging_peer_.recv_n (payload->wr_ptr (), length) > 0) {
        payload->wr_ptr (length); // Reflect additional bytes.
        mblk->cont (payload);     // Chain the header and payload.
        return length;            // Return length of the log record.
      }
    }
    // On error, free up the allocated message blocks.
    payload->release ();
    mblk->release ();
    payload = mblk = 0;
    return -1;
  }

Implementing the Logging_Handler (5/6)

1. Send the message block chain to the log file, which is stored in binary format
2. If the debug flag is set, print the contents of the message

  int Logging_Handler::write_log_record (ACE_Message_Block *mblk)
  {
    if (log_file_.send_n (mblk) == -1) return -1;

    if (ACE::debug ()) {
      ACE_InputCDR cdr (mblk->cont ());
      ACE_CDR::Boolean byte_order;
      ACE_CDR::ULong length;
      cdr >> ACE_InputCDR::to_boolean (byte_order);
      cdr.reset_byte_order (byte_order);
      cdr >> length;
      ACE_Log_Record log_record;
      cdr >> log_record; // Extract the log_record.
      log_record.print (mblk->rd_ptr (), 1, cerr);
    }

    return mblk->total_length ();
  }

Implementing the Logging_Handler (6/6)

1. Receives the message,
2. demarshals it into an ACE_Message_Block, &
3. writes it to the log file

  int Logging_Handler::log_record ()
  {
    ACE_Message_Block *mblk = 0;
    if (recv_log_record (mblk) == -1)
      return -1;
    else {
      int result = write_log_record (mblk);
      mblk->release (); // Free up the entire contents.
      return result == -1 ? -1 : 0;
    }
  }

Later on we'll see the virtue of splitting the recv_log_record() & write_log_record() logic into two methods…

Iterative Logging Server

• This is the simplest possible logging server implementation
• The iterative server implementation simply accepts & processes one client connection at a time, using an ACE_SOCK_Acceptor & an ACE_SOCK_Stream
• Clearly, this approach does not scale up for non-trivial applications of the logging service!
• Subsequent implementations will enhance this version, yet still use the logging server framework

Implementing the Iterative_Logging_Server (1/3)

Header file: "Iterative_Logging_Server.h"

  #include "ace/FILE_IO.h"
  #include "ace/INET_Addr.h"
  #include "ace/Log_Msg.h"
  #include "Logging_Server.h"
  #include "Logging_Handler.h"

  class Iterative_Logging_Server : public Logging_Server {
  public:
    Iterative_Logging_Server (): logging_handler_ (log_file_) {}

    Logging_Handler &logging_handler () { return logging_handler_; }

  protected:
    ACE_FILE_IO log_file_;
    Logging_Handler logging_handler_;

    // Other methods shown below...
  };

Implementing the Iterative_Logging_Server (2/3)

Override & "decorate" the Logging_Server::open() method:

  virtual int open (u_short logger_port) {
    if (make_log_file (log_file_) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
                         "make_log_file()"), -1);
    return Logging_Server::open (logger_port);
  }

Override the handle_connections() hook method to handle one connection at a time:

  virtual int handle_connections () {
    ACE_INET_Addr logging_peer_addr;

    if (acceptor ().accept (logging_handler_.peer (),
                            &logging_peer_addr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept()"), -1);

    ACE_DEBUG ((LM_DEBUG, "Accepted connection from %s\n",
                logging_peer_addr.get_host_name ()));
    return 0;
  }

Implementing the Iterative_Logging_Server (3/3)

Override the handle_data() hook method & delegate I/O to the Logging_Handler:

  virtual int handle_data (ACE_SOCK_Stream *) {
    while (logging_handler_.log_record () != -1)
      continue;

    logging_handler_.close (); // Close the socket handle.
    return 0;
  }

Main program of the iterative logging server:

  #include "ace/Log_Msg.h"
  #include "Iterative_Logging_Server.h"

  int main (int argc, char *argv[])
  {
    Iterative_Logging_Server server;

    if (server.run (argc, argv) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "server.run()"), 1);
    return 0;
  }

Concurrency Design Dimensions

• Concurrency is essential to develop scalable & robust networked applications, particularly servers
• The next group of slides presents a domain analysis of concurrency design dimensions that address the policies & mechanisms governing the proper use of processes, threads, & synchronizers
• We cover the following design dimensions in this chapter:
  • Iterative versus concurrent versus reactive servers
  • Processes versus threads
  • Process/thread spawning strategies
  • User versus kernel versus hybrid threading models
  • Time-shared versus real-time scheduling classes
  • Task- versus message-based architectures


Iterative vs. Concurrent Servers

• Iterative/reactive servers handle each client request in its entirety before servicing subsequent requests
  • Best suited for short-duration or infrequent services

• Concurrent servers handle multiple requests from clients simultaneously
  • Best suited for I/O-bound services or long-duration services
  • Also good for busy servers

Multiprocessing vs. Multithreading

• A process provides the context for executing program instructions
  • Each process manages certain resources (such as virtual memory, I/O handles, and signal handlers) & is protected from other OS processes via an MMU
  • IPC between processes can be complicated & inefficient

• A thread is a sequence of instructions executed in the context of a process
  • Each thread manages certain resources (such as its runtime stack, registers, signal masks, priorities, & thread-specific data)
  • Threads are not protected from other threads
  • IPC between threads can be more efficient than IPC between processes

Thread Pool Eager Spawning Strategies

• This strategy prespawns one or more OS processes or threads at server creation time
• These "warm-started" execution resources form a pool that improves response time by incurring service startup overhead before requests are serviced
• Two general types of eager spawning strategies exist, based on the Half-Sync/Half-Async & Leader/Followers patterns, which are covered next

The Half-Sync/Half-Async Pattern

• The Half-Sync/Half-Async pattern decouples async & sync service processing in concurrent systems to simplify programming without unduly reducing performance

[Figure: a sync service layer (Sync Services 1-3) exchanges messages with an async service layer through a queueing layer; the async service reads/writes to the queue & is driven by an external event source]

This pattern yields two primary benefits:
1. Threads can be mapped to separate CPUs to scale up server performance via multi-processing
2. Each thread blocks independently, which prevents a flow-controlled connection from degrading the QoS that other clients receive

Half-Sync/Half-Async Pattern Dynamics

[Sequence diagram: the external event source notifies the async service, which read()s the message, performs its work(), & enqueue()s the message on the queue; a sync service then dequeues the message, read()s it, & performs its own work()]

• This pattern defines two service processing layers, one async & one sync, along with a queueing layer that allows services to exchange messages between the two layers
• The pattern allows sync services (such as processing log records from different clients) to run concurrently, relative both to each other & to async/reactive services (such as event demultiplexing)

Drawbacks with Half-Sync/Half-Async

Problem
• Although the Half-Sync/Half-Async threading model is more scalable than the purely reactive model, it is not necessarily the most efficient design
• e.g., passing a request between the async thread & a worker thread incurs:
  • Dynamic memory (de)allocation,
  • Synchronization operations,
  • A context switch, &
  • CPU cache updates
• This overhead can make server latency unnecessarily high

[Figure: an event source feeds an acceptor & handlers, which enqueue requests on a request queue that worker threads 1-3 dequeue from]

Solution
• Apply the Leader/Followers architectural pattern to minimize server threading overhead

The Leader/Followers Pattern

• The Leader/Followers architectural pattern is more efficient than Half-Sync/Half-Async
• Multiple threads take turns sharing a set of event sources to detect, demux, dispatch, & process service requests that occur on the event sources
• This pattern eliminates the need for, & the overhead of, a separate Reactor thread & synchronized request queue used in the Half-Sync/Half-Async pattern

[Class diagram: a Thread Pool (synchronizer; join(), promote_new_leader()) uses a Handle Set (handle_events(), deactivate_handle(), reactivate_handle(), select()), which demultiplexes Handles; Concrete Event Handlers A & B implement handle_event() & get_handle()]

                          Concurrent Handles        Iterative Handles
  Concurrent Handle Sets  UDP Sockets +             TCP Sockets +
                          WaitForMultipleObjects()  WaitForMultipleObjects()
  Iterative Handle Sets   UDP Sockets +             TCP Sockets +
                          select()/poll()           select()/poll()

Leader/Followers Pattern Dynamics

1. Leader thread demuxing
2. Follower thread promotion
3. Event handler demuxing & event processing
4. Rejoining the thread pool

[Sequence diagram: threads 1 & 2 join() the thread pool; thread 1 becomes the leader & calls handle_events() on the handle set, while thread 2 sleeps until it becomes the leader; when an event arrives, thread 1 deactivates the handle, calls promote_new_leader(), & processes the current event via handle_event() while thread 2 waits for a new event; thread 1 then reactivates the handle & rejoins the pool, sleeping until it becomes the leader again]

Thread-per-Request On-demand Spawning Strategy

• On-demand spawning creates a new process or thread in response to the arrival of client connection and/or data requests
• Typically used to implement the thread-per-request & thread-per-connection models
• The primary benefit of on-demand spawning strategies is their reduced consumption of resources
• The drawbacks, however, are that these strategies can degrade performance in heavily loaded servers & determinism in real-time systems, due to the costs of spawning processes/threads & starting services

The N:1 & 1:1 Threading Models

• OS scheduling ensures applications use host CPU resources suitably
• Modern OS platforms provide various models for scheduling threads
• A key difference between the models is the contention scope in which threads compete for system resources, particularly CPU time
• The two different contention scopes are:
  • Process contention scope (aka "user threading"), where threads in the same process compete with each other (but not directly with threads in other processes)
  • System contention scope (aka "kernel threading"), where threads compete directly with other system-scope threads, regardless of what process they're in

The N:M Threading Model

• Some operating systems (such as Solaris) offer a combination of the N:1 & 1:1 models, referred to as the "N:M" hybrid threading model
• When an application spawns a thread, it can indicate in which contention scope the thread should operate
• The OS threading library creates a user-space thread, but only creates a kernel thread if needed, or if the application explicitly requests the system contention scope
• When the OS kernel blocks an LWP, all user threads scheduled onto it by the threads library also block
  • However, threads scheduled onto other LWPs in the process can continue to make progress

Task- vs. Message-based Concurrency Architectures

• A concurrency architecture is a binding between:
  • CPUs, which provide the execution context for application code
  • Data & control messages, which are sent & received from one or more applications & network devices
  • Service processing tasks, which perform services upon messages as they arrive & depart
• Task-based concurrency architectures structure multiple CPUs according to units of service functionality in an application
• Message-based concurrency architectures structure the CPUs according to the messages received from applications & network devices
