[返回]
AS/400核心线程及用户线程的分析
招商银行总行电脑部 胡向阳
由于传统的进程不能很好地利用SMP对称多处理器体系结构来充分发挥系统的并发性,进程开销较大,导致系统效率低下,因此各操作系统厂商纷纷推出支持线程的操作系统,如WinNT,Unix,OS/2等。那么AS/400的线程又是如何实现的呢?在V3R1以前,为了实现几个任务的并发执行,我们一般用SBMJOB来提交作业运行,在V3R1后AS/400提供了对多线程的支持,但它是通过CPA(Common Programming APIs)工具集来实现,并不是在内核提供多线程,应用程序中的一个线程对应AS/400低层是一个job,并不是真正的线程实现。只有在V4R2以后,AS/400才真正的在内核提供对多线程的支持。
在AS/400上,用户可见的是用户线程,核心线程由AS/400来实现,对用户并不可见。
建立、管理一个线程所开销的系统资源远远小于一个进程,而建立一个进程又远小于建立一个job,多线程的编程技术允许在一个程序中并发或并行执行多项任务,而不用多次调用开销很大的SBMJOB来提交多个作业。
进程这个名词对很多AS/400程序员、管理员来说可能觉得陌生,在AS/400上可操作、可管理的是job,job是建立在MI上面,而进程(process)则建立在MI中,一个job 对应一个进程。为了与其他操作系统的提法相一致,本文一般采用进程分析。
用户线程是完全在用户级提供的抽象概念,而内核无须知道它的存在,这通过使用像pthread等线程库来成功实现,这些库提供所有建立、同步、管理线程的函数,而不用内核的支持帮助,速度也非常快,它提供了对并发应用的一个更加自然、更加强大的编程环境,但是纯粹的一个用户级线程库并不能真正允许代码的并行执行,它必须有内核线程的支持才行。
核心线程就是由内核来决定创建、撤销的,并用来执行核心功能的线程,它与用户线程无直接联系。核心线程允许单独分配到CPU上,允许在多处理器上并行执行,但它不适合构造用户级的应用程序。比如,一个服务器应用程序可能需创建数以千计的线程,其中的每一个对应于一个客户,核心线程的运行要消耗宝贵的资源如物理内存(因为要求线程结构驻留内存),因此对于一个这样的程序是没有好处的,这种情况用用户线程能很容易解决。因此把线程分为核心线程和用户线程是一种好的策略。
那么什么是轻量级进程呢?一个轻量级进程(Light Weight Process)是一个内核支持的用户线程,它是一个在核心线程基础上的高层抽象,因此内核在支持轻量级进程之前必须支持核心线程。每个进程可能有一个或多个轻量级进程,每个都由一个单独的核心线程来支持,同时又有多个用户线程复用到几个轻量级进程上,如图1所示。尽管每个轻量级进程都和一个核心线程相联系,但是一些核心线程是专门处理系统任务而不支持轻量级进程。轻量级进程就像是把用户线程和核心线程有效连接起来的一个中间件,使得大量的用户线程任务可以在少量的高效核心线程上充分运行。这些轻量级进程被独立地调度,可以并行在多处理器上运行。每个轻量级进程都绑定到它自己的核心线程上,而且在它的生命期内这种绑定都是有效的。
AS/400的线程模型与业界通用线程模型图1基本上相一致,虽然有些区别,但基本的三部分:用户线程、轻量级进程、核心线程都可对应上,只是名称有些不同,轻量级进程称为线程(Thread),核心线程为Task。
AS/400操作系统用job来管理被用户提交的工作单元,一个job是由一个进程(process)结构和其他结构构造而成,用来管理工作单元所需的系统资源。在一个AS/400 job中有两个主要的控制结构WCB(Work Control Block) 和PPCO(Portable Process Communication Object), WCB是放在PCS(Process Control Space)中,主要是存放一些运行环境及可用资源如打开的数据库文件,socket描述符,Acquire 过的ProgramDevice等,PPCO则是为了支持Unix类型的API才增加的,它放一些如IFS的文件描述符,异步信号资源。在WCB、PPCO中的所有资源都可以被在job中的所有的用户线程共享。
进程(process)在AS/400中由三个主要的对象组成:PCS(Process Control Space)、TCS(Thread Control Space) 和Task对象。PCS是一个复杂的、临时的MI对象,由于进程表示一个程序的执行,包括执行的程序,程序中的数据以及运行该程序的一些状态信息,所以PCS中包含有Program static storage、threads heapstorage、files、devices、job message queue等。在PCS中的资源可以被在进程中的所有线程共享。
TCS是一个临时的SLIC对象,在MI上面并不可见,该对象存放一些Control Stack、MI Automatic Storage、Control Block等轻量级进程的状态及资源。一个TCS被一个轻量级进程使用,当一个轻量级进程结束时,它对应的TCS可以在进程级循环利用,当进程结束时,轻量级进程可以被系统循环利用。每增加一个轻量级进程,系统就在进程结构中增加一个TCS和一个Task对象。
Task作为AS/400的核心线程,系统所有的调度、分配工作都由它来完成,它控制Task对象抢占CPU来运行或从CPU上卸载Task对象停止运行。Task对象中存放硬件寄存器状态、优先级以及其他的控制信息,也称为TDE(Task Dispatching Element)。
在AS/400上有四种Task:
轻量级进程Task(Light Weight Process Task):这种类型Task主要被轻量级进程利用。
非驻留Task(NoNresident Task):这种类型Task主要是用来处理SLIC(System Licenced Internal Code)的功能,例如通信、数据库页面失效(page fault)。
驻留Task(resident Task):这种类型Task主要是用来处理不能发生页面失效(page fault)的一些SLIC功能,例如存储管理等核心功能。
初始Task(initial Task):这种特殊类型的Task仅仅用在机器IPL(initial program load)时,这种Task建立了系统中的所有其他的Task,包括轻量级进程Task、非驻留Task、驻留Task。
在AS/400中,任一时刻有资格运行的所有Task对应的TDE被存放在一个称为TDQ(Task Dispatching Queue)的队列中,TDQ是内存中的一个有序链表,它按照TDE的优先级从高到低进行排序,优先级最高的TDE将获得CPU的控制权。
由于线程能共享进程内的很多资源,所以线程安全地共享资源又需要有同步机制来支持,在轻量级进程、核心线程中系统有很多机制来实现线程安全,例如在Task的实现中有SRQ(Send/Receive Queue)队列来支持同步和消息传递,这些对用户来说都是透明的,不用用户去考虑,但是在用户线程这一级,就须用户来实现线程安全了。
作为AS/400线程技术的一大优势,在一个进程中没有限制最多可以有多少个线程,这在其他很多平台上是不能实现的。这主要得益于AS/400的单级存储(Single-level storage),一个进程没有自己单独的进程地址空间,它利用整个系统的地址空间,所以在AS/400上与一个进程可以允许有多少个线程无关,而只与内存和外存的存储空间有关。
虽然业界在内核线程实现上没有统一的标准,但在用户线程上已经有统一的Pthread标准了。AS/400对用户提供两种线程编程接口,一种是POSIX标准的Pthread接口,一种是Java线程类。由于Pthread线程库是POSIX标准组织制定的,目前很多操作系统都支持此标准,下面主要以Pthread库来举例。
下例是一个线程例子,它简单地用两个全局变量来记录本进程中有多少个线程,正确的结果应该都等于3。但是这个例子是有问题的,因为三个线程更改全局变量没有同步机制,例如,线程B正在更改SharedData1为2时,线程C也正在更改该值为2,造成数据不一致,可能造成SharedData1=2,SharedData2=3。这就是线程编程时最常遇到的问题——线程安全(thread safe)。
#define _MULTI_THREADED
#include <PTHREAD.H>
#include <STDIO.H>
#include <UNISTD.H>
#include <ERRNO.H>
volatile int sharedData1 = 0;
volatile int sharedData2 = 0;
void *Thread(void *);
void main(int argc, char *argv[])
{
pthread_t threadA, threadB,threadC;
int rc;
rc = pthread_create( &threadA, NULL, Thread, NULL);
if (rc != 0) {
printf(“ThreadA not created with rc= %d,
errno= %d\n" , rc, errno);
}
rc = pthread_create( &threadB, NULL, Thread, NULL);
if (rc != 0) {
printf(“ThreadB not created with rc= %d,
errno= %d\n" , rc, errno);
}
rc = pthread_create( &threadC, NULL, Thread, NULL);
if (rc != 0) {
printf(“ThreadB not created with rc= %d,
errno= %d\n" , rc, errno);
}
/* Wait for threads to complete */
pthread_join(threadA, NULL);
pthread_join(threadB, NULL);
pthread_join(threadC, NULL);
return 0;
}
void *Thread(void *parm)
{
/* Update shared memory */
sharedData1 ++;
sharedData2 ++;
return NULL;
}
由于在一个进程中的所有线程共享进程中的很多资源,所以就必须用同步机制来消除由于共享而造成的数据混乱、不一致等问题。AS/400提供很多同步的方法,例如我们平时很常用的ALCOBJ命令。
除了用同步机制来保障线程安全外,在编程过程中还须注意不能使用一些平时在非多线程编程环境下经常应用的系统功能或函数,例如: asctime()、ctime()、gmtime()、localtime()、rand()、strtok()、system()、gethostbuname()等,但有些函数在线程库中以_r结尾来提供对线程安全的支持,如asctime_r()、gethostname_r()。
在银行业务系统中,随着业务的发展,除传统柜员的存取款外,逐渐出现了一些其他的新业务种类,如ATM取款、CDM存款、网上银行、电话银行、存折炒股等等,这就要求银行的业务主机要有更大的吞吐量、更高的性能以及对大量突发交易有更好的适应性。对于传统的编程方法,使用每次创建一个新job来处理每一笔交易就显得很笨重,造成系统的开销非常大,性能下降,而采用线程来处理每一笔交易就显得轻松多了。应用线程处理交易可以有两种方式,一种是为每笔交易建立一个线程,完成后取消该线程,例如对于网上银行、电话银行可以采用这种方式来实现,另一种是每个柜员对应一个线程来处理交易,该线程只处理该柜员的交易,完成一笔交易后不取消该线程,例如柜员的存取款、ATM取款、CDM存款可以采用此方式。以上两种方式都可以有效降低系统资源开销,提高系统响应时间。
下面的例子是针对第二种方式来完成的,它采用TCP/IP完成,这里只讲服务器程序。对于每个柜员可以单独建立一个线程来对它服务,在程序中还提供了互斥琐来实现对共享资源的同步。
该例子还有些地方可以改进,例如在一个线程结束后,最好在进程中用pthread_jon()回收系统的资源。
#include <PTHREAD.H>
#include <STDIO.H>
#include <STDLIB.H>
#include <STRING.H>
#include <UNISTD.H>
#include <ERRNO.H>
#include <SYS socket.h>
#include <SYS types.h>
#include <SIGNAL.H>
#include <ARPA inet.h>
#include <NETINET in.h>
#define _MULTI_THREADED
/* Global constants */
#define SERVER_PORT 8888
#define BUFFER_LENGTH 1000
#define Thread_Max 1000
/* Global shared resource */
static volatile int Thread_Count;
static volatile int Client_Sockid[Thread_Max];
static pthread_mutex_t mutex;
int Search_Array();
void *Handle_Transaction(void * parm);
int main(int argc, char *argv[])
{
int sd;/* socket descriptor */
int rc;
int on = 1;
int new_sd;
/* accepted connection socket descriptor*/
int cli_len;
int num;
struct sockaddr_in srv_addr;/* srv address */
struct sockaddr_in cli_addr; /* clients address */
pthread_t threadid;
rc=pthread_mutex_init(&mutex,NULL);
if ( rc != 0 ) {
printf(“server: Create mutex failed with a errno
= %d\n", errno);
exit(1);
} /* endif */
sd = socket(AF_INET, SOCK_STREAM, 0);
if ( sd < 0 ) {
printf(“server: Socket create failed with a errno
= %d\n", errno);
exit(1);
} /* endif */
memset((void *)&srv_addr, 0, sizeof(srv_addr));
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons((short)SERVER_PORT);
srv_addr.sin_addr.s_addr = INADDR_ANY;
/* To prevent next time server not start,
we'll use setsockopt,soreuseaddr */
rc =setsockopt(sd,SOL_SOCKET,
SO_REUSEADDR, (char *)&on, sizeof(on));
if ( rc ) {
perror(“daemon: failed to set SO_REUSEADDR\n");
}
rc = bind(sd, (struct sockaddr *)&srv_addr, sizeof
(srv_addr));
if ( rc ) {
printf(“daemon: bind call failed, errno =
%d\n", errno);
exit(1);
}
/*allow connection requests
to be queued before dropping them */
rc = listen(sd, 5);
if ( rc ) {
perror(“daemon: Return code incorrect from listen");
exit(1);
} /* endif */
/* wait for a connection request and accept it,
creating a new connected endpoint */
new_sd = -1;
Thread_Count=0;
/* We'll be a multi threaded server. After each
connection, we need to pass the parms to
a new thread,then processing transaction */
do {
cli_len = sizeof(cli_addr);
new_sd = accept(sd, (struct sockaddr *)
&cli_addr, &cli_len);
if (new_sd < 0) {
perror(“daemon: accept failed\n");
}
/* lock the shared resources */
rc=pthread_mutex_lock(&mutex);
if ( rc!=0 ) {
printf(“daemon: Mutex lock failed, errno =
%d\n", errno);
}
Thread_Count ++;
num=Search_Array() ;
Client_Sockid[num]=new_sd;
/* unlock the shared resources */
rc=pthread_mutex_unlock(&mutex);
if ( rc!=0 ) {
printf(“daemon: Mutex unlock failed, errno =
%d\n", errno);
}
rc = pthread_create(&threadid, NULL,
Handle_Transaction, (void *)&num);
if ( rc ) {
printf(“daemon: Failed creating thread, errno =
%d\n", errno);
}
else {
printf(“daemon: Created thread [tid =
%d]\n", threadid);
}
} while (Thread_Count < Thread_Max);
/* Note: The mutex must be destroy,
otherwise will effect the system */
pthread_mutex_destroy(&mutex);
return 0;
} /* end main */
/*Search not valid sockid in Client_Sockid array */
int Search_Array()
{
int i;
for(i=0;i< Thread_Max;i++){
if(Client_Sockid[i]==0)
break;
}
return i;
}
/* Handle the transaction processing */
void *Handle_Transaction(void * parm)
{
char Rcv_Buf[BUFFER_LENGTH];
char Snd_Buf[BUFFER_LENGTH];
int num= *(int *)parm;
int Sockid;
int rc;
!U2
!U1
Sockid=Client_Sockid[num];
for(;;){
if ((rc = read(Sockid, Rcv_Buf,
BUFFER_LENGTH)) < 0) {
perror(“server: Bad return code from read");
break;
}
if (rc == 0) {
printf(“Connection reset by peer\n");
break;
}
/******************/
/* Here is call the transaction process function */
/******************/
/* write the response data to client */
if ((rc = write(Sockid,Snd_Buf,
BUFFER_LENGTH)) < 0) {
perror(“server: Bad return code from write\n");
break;
}
} /* end for */
rc = close(Sockid);
if ( rc ) {
perror(“server: Return code incorrect for close\n");
}
/* lock the shared resources */
rc=pthread_mutex_lock(&mutex);
if ( rc!=0 ) {
printf(“daemon: Mutex lock failed, errno =
%d\n", errno);
}
Thread_Count --;
Client_Sockid[num]=0;
/* unlock the shared resources */
rc=pthread_mutex_unlock(&mutex);
if ( rc!=0 ) {
printf(“daemon: Mutex unlock failed, errno =
%d\n", errno);
}
pthread_exit(NULL);
return NULL;
}
随着系统的多线程应用增多,系统管理也必须随着调整一些系统参数来适应多线程应用,如果调整不好,可能影响系统资源的利用,导致系统性能下降。
一种简单的方法是由系统自动调整性能,这可以通过改变系统QPFRADJ参数来实现,它出厂缺省为2 = Adjustment at IPL and automatic adjustment。另外还有一个系统参数影响自动调整的特性,QMAXACTLVL,缺省为*NOMAX,它的主要功能是限定在所有的存储池(storage pool)中允许有多少个激活线程(非job)可以运行,此参数建议设为*NOMAX。
如果不设为自动调整,则存储池的特性ACTLVL必须注意设定,它是限制一个存储池中允许有多少个激活的线程可以运行,在V4R2以前,它是指job个数,但在V4R2以后,它指的是线程而非job,由于一个应用中可能有上千个线程,所以此特性在线程环境下必须比job环境下增大才能不影响性能。可以用CHGSHRPOOL POOL (*BASE) ACTLVL (n) 来更改该值。
在存储池中还有另外一个特性须注意,它是PAGING,它有两个选项,一个是*FIXED,表示系统限制在存储池中的线程可以使用的总的内存数量,而另一个为*CALC,表示由系统自动调整,可以用CHGSHRPOOL POOL (*BASE) PAGING (*FIXED) 来更改该值。