Lab04
线程的基本操作函数
- int pthread_create(pthread_t *tid,const pthread_attr_t *attr, void *(*func)(void *),void *arg);
参数:
- 指向线程标识符的指针
- 设置线程属性
- 线程运行函数的起始地址
- 运行函数的参数
线程创建成功返回0,否则创建失败,常返回出错代码EAGAIN和EINVAL
- int pthread_join(pthread_ttid,void**status);
参数:
- 被等待的线程标识符
- 一个用户定义的指针,存储被等待线程的返回值
线程阻塞的函数,调用它的函数一直等到被等待的线程结束为止,当函数返回时,被等待线程的资源被回收;返回值为0,阻塞成功
eg:并发的两个线程演示程序,通过调用pthread_join让main线程等待两个子线程
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
void print_message_function(void *ptr);
int main()
{
int tmp1,tmp2; // 接收阻塞结果
void *retval; // 存储阻塞线程返回值
pthread_t thread1, thread2; // 声明线程ID
int ret_thrd1,ret_thrd2; // 接收创建线程返回值
char* msg1="thread1"; // 传输数据
char* msg2="thread2";
ret_thrd1=pthread_create(&thread1,NULL,(void*)&print_message_function,(void*)msg1);
ret_thrd2=pthread_create(&thread2,NULL,(void*)&print_message_function,(void*)msg2);
if (ret_thrd1!=0) // 不为0,创建线程错误
{
printf("error in creating thread 1\n ");
}
else
{
printf("success in creating thread 1 \n");
}
if (ret_thrd2!=0)
{
printf("error in creating thread 2\n ");
}
else
{
printf("success in creating thread 2 \n");
}
tmp1 = pthread_join(thread1, &retval);
printf("thread1 return value(retval) is %d\n", (int)retval);
printf("thread1 return value(tmp) is %d\n", tmp1);
if (tmp1 != 0) { // 不为0,阻塞线程错误
printf("cannot join with thread1\n");
}
printf("thread1 end\n");
tmp2 = pthread_join(thread1, &retval);
printf("thread2 return value(retval) is %d\n", (int)retval);
printf("thread2 return value(tmp) is %d\n", tmp2);
if (tmp2 != 0) {
printf("cannot join with thread2\n");
}
printf("thread2 end\n");
}
void print_message_function( void *ptr )
{
int i = 0;
for (i; i<5; i++) {
sleep(3);
printf("%s:%d\n", (char *)ptr, i);
}
}
打印效果:
- pthread_t pthread_self(void);
取自己的线程ID函数(每个线程都有一个ID在给定的进程内标识自己)
- void pthread_exit(void *status);
终止线程函数,一个线程结束有两种方式,一种是函数自己结束,调用的线程也结束;一种是通过函数pthread_exit实现
status是函数的返回代码,一个线程不能被多个线程等待,否则第一个接收到信号的线程成功返回,其余调用pthread_join 的线程则返回错误代码ESRCH
互斥锁
pthread_mutex_t为声明互斥锁变量的数据类型
- pthread_mutex_init(pthread_mutex *mutex,NULL);
互斥锁的初始化,第二个参数为NULL时为默认的快速互斥锁
- pthread_mutex_lock()用于开始用互斥锁上锁,此后的代码直到pthread_mutex_unlock()为止均被上锁,即同一时间只能被一个线程调用执行。当一个线程执行到pthread_mutex_lock时,若此时另一个线程使用该锁,则将阻塞此线程,即程序将等待到另一个线程释放此互斥锁
- pthread_mutex_destroy(phtread_mutex_t * lock);销毁一个互斥锁,释放它所占的资源
eg:没有加入互斥引发的错误程序演示
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
// 定义了一个全局变量sharei,不互斥,让三个变量对全局变量进行加1操作(条件竞争)
int sharedi = 0;
void increse_num(void);
int main(){
int ret;
pthread_t thrd1, thrd2, thrd3;
ret = pthread_create(&thrd1, NULL, (void *)increse_num, NULL);
ret = pthread_create(&thrd2, NULL, (void *)increse_num, NULL);
ret = pthread_create(&thrd3, NULL, (void *)increse_num, NULL);
pthread_join(thrd1, NULL);
pthread_join(thrd2, NULL);
pthread_join(thrd3, NULL);
printf("sharedi = %d\n", sharedi);
return 0;
}
void increse_num(void) {
long i,tmp;
for(i=0; i<=100000; i++) {
tmp = sharedi;
tmp = tmp + 1;
sharedi = tmp;
}
}
错误打印:
正常希望的是互斥访问sharedi,最终结果应该为300003
添加互斥锁:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t lock; // 添加互斥锁
// 定义了一个全局变量sharei,不互斥,让三个变量对全局变量进行加1操作(条件竞争)
int sharedi = 0;
void increse_num(void *ptr);
int main(){
pthread_t thrd1, thrd2, thrd3;
char* msg1 = "thread1"; // 添加信息显示线程竞争情况
char* msg2 = "thread2";
char* msg3 = "thread3";
pthread_mutex_init(&lock,NULL);
pthread_create(&thrd1, NULL, (void *)increse_num, (void*)msg1);
pthread_create(&thrd2, NULL, (void *)increse_num, (void*)msg2);
pthread_create(&thrd3, NULL, (void *)increse_num, (void*)msg3);
pthread_join(thrd1, NULL);
pthread_join(thrd2, NULL);
pthread_join(thrd3, NULL);
printf("sharedi = %d\n", sharedi);
pthread_mutex_destroy(&lock);
return 0;
}
void increse_num(void *ptr) {
int ret = 0;
ret = pthread_mutex_lock(&lock);
if (0==ret)
{
fprintf(stderr,"%s lock the share var\n",(char *)ptr);
}
else
{
fprintf(stderr,"can not lock the share var\n");
}
long i,tmp;
for(i=0; i<=100000; i++) {
tmp = sharedi;
tmp = tmp + 1;
sharedi = tmp;
}
ret = pthread_mutex_unlock(&lock);
if (0==ret)
{
fprintf(stderr,"%s unlock the share var\n",(char *)ptr);
}
else
{
fprintf(stderr,"can not unlock the share var\n");
}
}
正确打印:
读写锁
互斥锁会将视图访问我们定义的保护区的所有进程都阻塞掉,但读写锁会将访问的读操作和写操作区分开
遵循规则
- 只要没有进程持有读写锁来写,都可以读
- 仅当没有进程持有读写锁来读或写,才能分配读写锁用于写
读写锁的声明:pthread_rwlock_t rwlock;
读写锁初始化:pthread_rwlock_init (pthread_rwlock_t *rwlock, pthread_rwlockattr_t *attr);
为读进程获得锁:pthread_rwlock_rdlock(pthread_rwlock_t *lock);
为写进程获得锁:pthread_rwlock_wrlock(pthread_rwlock_t *lock);
读写进程解锁:pthread_rwlock_unlock(pthread_rwlock_t *lock);
销毁读写锁:pthread_rwlock_destroy(pthread_rwlock_t *lock);
eg:读写锁演示程序
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#define THREADNUM 100
pthread_rwlock_t rwlock;
// 读操作
void* reader(void* arg)
{
pthread_rwlock_rdlock(&rwlock);
fprintf(stderr,"reader %d worker\n",(int)arg);
if (pthread_rwlock_unlock(&rwlock))
{
fprintf(stderr,"reader %d unlock error\n",(int)arg);
}
return NULL;
}
// 写操作
void* writer(void* arg)
{
pthread_rwlock_wrlock(&rwlock);
fprintf(stderr,"writer %d worked\n",(int)arg);
if (pthread_rwlock_unlock(&rwlock))
{
fprintf(stderr,"writer %d unlock error\n",(int)arg);
}
return NULL;
}
int main(void)
{
pthread_t reader_id,writer_id;
int count;
int rand;
int readercount=1;
int writercount=1;
int halfmax=RAND_MAX/2; // 生成随机数最大值的一半
if (pthread_rwlock_init(&rwlock,NULL))
{
fprintf(stderr,"initialize rwlock error\n");
return;
}
pthread_rwlock_wrlock(&rwlock); // 为写进程获得锁
for(count=0;count<THREADNUM;count++) // 执行100此读写操作
{
rand=random();
if (rand<halfmax) // 生成随机数小于最大的一半就进行读操作
{
pthread_create(&reader_id,NULL,(void*)reader,(void*)readercount);
fprintf(stderr,"create reader %d\n",readercount++);
}
else // 生成随机数大于等于最大的一半就执行写操作
{
pthread_create(&writer_id,NULL,(void*)writer,(void*)writercount);
fprintf(stderr,"create writer %d\n",writercount++);
}
}
pthread_rwlock_unlock(&rwlock); // 读写进程解锁
return 0;
}
条件变量
条件变量是用来等待而不是来上锁的:它是通过一种能够挂起当前正在执行的进程或放弃当前进程直到共享数据上的一些条件得以满足
互斥锁一个明显缺点是只有两种状态:锁定和非锁定;条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,常与互斥锁一起使用
工作机制:如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁;如果另一个线程改变了条件,就发信号给关联的条件变量,唤醒一个或多个等待的线程,重新获得互斥锁
条件变量的声明:pthread_cond_t condition;
条件变量的初始化:pthread_cond_init(pthread_cond_t cond,pthred_condattr_t attr);
等待信号:pthread_cond_wait(pthread_con_t cond,pthread_mutex_t mutex);
发送信号:pthread_cond_signal(pthread_cond_t *cond);
eg:条件变量演示程序
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void* thread1(void*);
void* thread2(void*);
int i=1;
int main(void)
{
pthread_t t1;
pthread_t t2;
pthread_create(&t1,NULL,(void*)thread1,NULL);
pthread_create(&t2,NULL,(void*)thread2,NULL);
pthread_join(t1,NULL);
pthread_join(t2,NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
exit(0);
}
void* thread1(void* arg)
{
for(i=1;i<=9;i++)
{
pthread_mutex_lock(&mutex);
if (i%3==0)
pthread_cond_signal(&cond);
else
printf("thread1:%d\n",i);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void* thread2(void* arg)
{
while(i<=9)
{
pthread_mutex_lock(&mutex);
if (i%3!=0)
pthread_cond_wait(&cond,&mutex);
printf("thread2:%d\n",i);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
打印效果:
信号量
另一种加锁操作,它记录了一个空闲资源数值,信号量的值表示了当前空闲资源的多少。信号量本质上是一个非负的整数计数器,用来控制对公共资源的访问。
信号量大于0时才能使用公共资源,变为0时,进程主动放弃处理器进入等待队列
sem_init(sem_t *sem, int pshared, unsigned int val):初始化信号量sem的值为val,pshared是共享属性控制,标识是否在进程间共享
sem_post(sem_t *sem):信号量+1
sem_wait(sem_t *sem):信号量-1
sem_destory(sem_t *sem):释放信号量
eg:信号量实现生产者消费者演示程序
#include<stdio.h>
#include<stdlib.h>
#include<time.h>
#include<pthread.h>
#include<semaphore.h>
#include<unistd.h>
// 仓库的大小,具体取值up2u
#define BUFFER_SIZE 10
// 定义仓库
struct prodcons
{
int buffer[BUFFER_SIZE];//10个位置的仓库
sem_t lock;//信号量:处理互斥关系,初始值为1,表示没有人在仓库
int readpos, writepos;//仓库进出位置索引
sem_t notempty;//信号量:生产者告诉消费者,仓库不为空,有商品,初始值为0
sem_t notfull;//信号量:消费者告诉生产者,仓库有空位,初始值为BUFFER_SIZE
};
// 初始化代码
void init(struct prodcons *b)
{
printf("Starting init the struct Prodcons!\n");
// 定义互斥量、条件变量(同步判断)和放置物品的指针
sem_init(&b->lock,0,1);
sem_init(&b->notempty,0,0);
sem_init(&b->notfull,0,BUFFER_SIZE);
b->readpos = 0;
b->writepos = 0;
}
void put(struct prodcons *b, int data)
{
// 先处理同步,后处理互斥,因为要先判断是否满了(同步)才能进入仓库(互斥)
sem_wait(&b->notfull);
sem_wait(&b->lock);
b->buffer[b->writepos] = data;
b->writepos++;
if((b->writepos) == BUFFER_SIZE) b->writepos = 0;
sem_post(&b->lock);
sem_post(&b->notempty);
}
int get(struct prodcons *b)
{
int data;
sem_wait(&b->notempty);
sem_wait(&b->lock);
data = b->buffer[b->readpos];
b->readpos++;
if(b->readpos >= BUFFER_SIZE) b->readpos = 0;
sem_post(&b->lock);
sem_post(&b->notfull);
return data;
}
#define OVER -1
struct prodcons buffer;
void *producer(void *data)
{
int n;
for(n=0; n<15; n++)
{
printf("Start put %d\t", n);
put(&buffer, n);
printf("Put --> %d finished!\n", n);
sleep(1);
}
put(&buffer, OVER);
printf("Producer finish the job!\n");
return NULL;
}
void *consumer(void *data)
{
int d = -1;
while(1)
{
printf("Start get %d\t", ++d);
d = get(&buffer);
printf("%d --> get finished!\n", d);
if(d == OVER) break;
sleep(2);
}
printf("Consumer finish the job!\n");
return NULL;
}
int main()
{
pthread_t tha, thb;
void * retval;
init(&buffer);
pthread_create(&tha, NULL, producer, 0);
pthread_create(&thb, NULL, consumer, 0);
pthread_join(tha, &retval);
pthread_join(thb, &retval);
return 0;
}
打印效果:
实验
互斥锁-条件变量实现
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <unistd.h> #include <semaphore.h> void* pthread_func1(void* arg); void* pthread_func2(void* arg); void* pthread_func3(void* arg); pthread_t t1,t2,t3; int buf1; int buf2; int ret; pthread_mutex_t mutex1=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex2=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond1=PTHREAD_COND_INITIALIZER; pthread_cond_t cond2=PTHREAD_COND_INITIALIZER; int read_buff1=0; int read_buff2=0; int main() { buf1=0; buf2=0; ret=0; ret=pthread_create(&t1,NULL,(void*)pthread_func1,NULL); if (ret!=0) { fprintf(stderr,"create thread1 error\n"); return -1; } ret=pthread_create(&t2,NULL,(void*)pthread_func2,NULL); if (ret!=0) { fprintf(stderr,"create thread2 error\n"); return -1; } ret=pthread_create(&t3,NULL,(void*)pthread_func3,NULL); if (ret!=0) { fprintf(stderr,"create thread3 error\n"); return -1; } pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_join(t3,NULL); pthread_mutex_destroy(&mutex1); pthread_mutex_destroy(&mutex2); pthread_cond_destroy(&cond1); pthread_cond_destroy(&cond2); return 0; } void* pthread_func1(void* arg) { FILE* fp=fopen("1.dat","r"); while(!feof(fp)) { pthread_mutex_lock(&mutex1); fscanf(fp,"%d",&buf1); read_buff1 = 1; pthread_cond_signal(&cond1); pthread_cond_wait(&cond1,&mutex1); pthread_mutex_unlock(&mutex1); } // return; } void* pthread_func2(void* arg) { FILE* fp=fopen("2.dat","r"); while(!feof(fp)) { pthread_mutex_lock(&mutex2); fscanf(fp,"%d",&buf2); read_buff2 = 1; pthread_cond_signal(&cond2); pthread_cond_wait(&cond2,&mutex2); pthread_mutex_unlock(&mutex2); } // return; } void* pthread_func3(void* arg) { int i = 0; while(i < 9) { i++; pthread_mutex_lock(&mutex1); pthread_mutex_lock(&mutex2); while(!read_buff1) pthread_cond_wait(&cond1,&mutex1); while(!read_buff2) pthread_cond_wait(&cond2,&mutex2); fprintf(stderr,"%d + %d =%d\n",buf1,buf2,buf1+buf2); fprintf(stderr,"%d * %d =%d\n",buf1,buf2,buf1*buf2); read_buff1 = read_buff2 =0; pthread_cond_signal(&cond1); pthread_cond_signal(&cond2); pthread_mutex_unlock(&mutex1); pthread_mutex_unlock(&mutex2); } // return; }
信号量实现
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <unistd.h> #include <semaphore.h> void* pthread_func1(void* arg); void* pthread_func2(void* arg); void* pthread_func3(void* arg); pthread_t t1,t2,t3; int buf1; int buf2; int ret; sem_t sem1; sem_t sem2; sem_t sem3; sem_t sem4; int main() { sem_init(&sem1,0,1); sem_init(&sem2,0,1); sem_init(&sem3,0,0); sem_init(&sem4,0,0); buf1=0; buf2=0; ret=0; ret=pthread_create(&t1,NULL,(void*)pthread_func1,NULL); if (ret!=0) { fprintf(stderr,"create thread1 error\n"); return -1; } ret=pthread_create(&t2,NULL,(void*)pthread_func2,NULL); if (ret!=0) { fprintf(stderr,"create thread2 error\n"); return -1; } ret=pthread_create(&t3,NULL,(void*)pthread_func3,NULL); if (ret!=0) { fprintf(stderr,"create thread3 error\n"); return -1; } pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_join(t3,NULL); return 0; } void* pthread_func1(void* arg) { FILE* fp=fopen("1.dat","r"); while(!feof(fp)) { sem_wait(&sem1); fscanf(fp,"%d",&buf1); sem_post(&sem3); } // return; } void* pthread_func2(void* arg) { FILE* fp=fopen("2.dat","r"); while(!feof(fp)) { sem_wait(&sem2); fscanf(fp,"%d",&buf2); sem_post(&sem4); } // return; } void* pthread_func3(void* arg) { int i = 0; while(i < 9) { i++; sem_wait(&sem3); sem_wait(&sem4); fprintf(stderr,"%d + %d =%d\n",buf1,buf2,buf1+buf2); fprintf(stderr,"%d * %d =%d\n",buf1,buf2,buf1*buf2); sem_post(&sem1); sem_post(&sem2); } // return; }