求助,多线程写入文件错乱问题!!~~

多线程复制文件,同一文件下,多线程并发写入同一文件的不同部分。
思路是,提前为每个线程分配好写入内容大小,每个线程各自执行fopen获取独立的文件流(FILE *,注意并不是文件描述符),然后按分配的写入大小,fseek到不同的位置,并发写入内容。
但是写入的内容总是错乱。

#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

// Size in bytes of each worker's read/write buffer; also the capacity of the
// path arrays stored in copy_block.
#define BUFF_SIZE 512
// Number of worker threads; multi_copy falls back to a single worker for files
// smaller than this many bytes.
#define PTHREAD_NUMBER 4

// Task descriptor handed to one worker thread: which files to open and which
// byte range of them to copy.
typedef struct copy_block
{
char fin[BUFF_SIZE];   // source file path
char fout[BUFF_SIZE];  // destination file path
long start;        // byte offset where this task's segment begins
long segment_size; // number of bytes in this task's segment
int id;        // logical worker index assigned by multi_copy
} __attribute__((packed)) page;

// Reports the size in bytes that stat() gives for `filename`.
// The stat buffer is zeroed beforehand and the result of stat() is not
// inspected, so the value returned for a path that cannot be stat'ed is
// whatever st_size holds after the failed call.
long file_size(char *filename)
{
    struct stat info;
    long bytes;

    memset(&info, 0, sizeof info);
    (void)stat(filename, &info);
    bytes = info.st_size;

    return bytes;
}

//单线程任务逻辑
void pthread_copy(void *arg)
{
//转换指针类型
page *p = (page *)arg;

//每个线程单独打开文件
FILE *fin = fopen(p->fin, "r");
FILE *fout = fopen(p->fout, "wb+");

//移动流到偏移位置
int res1 = fseek(fin, p->start, SEEK_SET);
int res2 = fseek(fout, p->start, SEEK_SET);

//开始复制
char buffer[BUFF_SIZE];        //读写区
long read_size = BUFF_SIZE; //预设读写大小
long left = p->segment_size; //剩余大小,初始化为任务总大小
long reade_len = 0;        //读取文件大小
long total_len = 0;

//当剩余大小大于0时保持复制
while (left > 0)
{
//如果文件剩余大小小于预设读写大小,则按剩余大小读取
if (read_size > left)
{
read_size = left;
}
//读取文件
reade_len = fread(buffer, 1, read_size, fin);
total_len += reade_len;
//写入文件
if (reade_len > 0)
{
fwrite(buffer, 1, reade_len, fout);
}

//剩余大小减去已读写大小
left = left - reade_len;
}

//复制完成关闭文件
fclose(fin);
fclose(fout);
pthread_exit(NULL);
}

//开启多线程任务
int multi_copy(char *src, char *dest)
{
//判断文件是否存在,以及是否具有读取权限
int file_exist = access(src, 4);
if (file_exist != 0)
fprintf(stderr, "源文件不存在");

//获取文件大小
long fsize = file_size(src);

//真正运行线程数量
int real_pthread_number = PTHREAD_NUMBER;
if (fsize < PTHREAD_NUMBER)
real_pthread_number = 1;

//给任务结构体分配内存
page *p;
p = malloc(sizeof(*p) * PTHREAD_NUMBER);

long offset = 0;        //文本偏移量
long segment = fsize / real_pthread_number;        //分段长度
long segment_remainder = fsize % real_pthread_number; //分段后剩余长度

//给每个线程分配任务
for (int i = 0; i < real_pthread_number; i++)
{
//分配复制任务的文件大小
if (i + 1 == real_pthread_number)
{
p[i].segment_size = segment + segment_remainder;
}
else
{
p[i].segment_size = segment;
}

//确定任务的起止位置
p[i].start = offset;
offset = offset + p[i].segment_size;

//文件路径存入任务
strncpy(p[i].fin, src, strlen(src));
strncpy(p[i].fout, dest, strlen(dest));

//分配虚拟线程id
p[i].id = i;
}

//创建线程
pthread_t work[real_pthread_number];
for (int i = 0; i < real_pthread_number; i++)
{
pthread_create(&work[i], NULL, (void *)&pthread_copy, (void *)&p[i]);
}

//阻塞主线程
for (int i = 0; i < real_pthread_number; i++)
{
pthread_join(work[i], NULL);
}

//释放任务结构体占用内存
if (p != NULL)
{
free(p);
p = NULL;
}

return 0;
}

// Entry point. Usage: <program> <source> <destination>
// Returns 1 on a usage error, otherwise the result of multi_copy().
int main(int argc, char *argv[])
{
    // argv[1] and argv[2] are only valid when both arguments were supplied
    if (argc < 3)
    {
        fprintf(stderr, "usage: %s <source> <destination>\n", argc > 0 ? argv[0] : "copy");
        return 1;
    }

    return multi_copy(argv[1], argv[2]);
}
阅读 9.6k
5 个回答
一、多线程版本
#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

//Size in bytes of each worker's read/write buffer; multi_copy also uses it as
//the file-size threshold below which a single worker is used
#define BUFF_SIZE 512
//Number of worker threads started for files of at least BUFF_SIZE bytes
#define PTHREAD_NUMBER 4

//Task descriptor handed to one worker thread
typedef struct copy_block{
    //this version stores open streams instead of path strings
    FILE *src;                //source stream; multi_copy stores the same FILE * in every task
    FILE *dest;                //destination stream; likewise the same FILE * in every task
    long start;                //byte offset where this task's segment begins
    long segment_size;        //number of bytes this task copies
    int id;                    //logical worker index assigned by multi_copy
}__attribute__((packed)) Page;

//Returns the st_size that stat() reports for filename
long file_size(const char *filename);
//Worker routine; task_info points to the Page describing the range to copy
void pthread_copy(void *task_info);
//Copies src to dest using worker threads; note the order: destination first, source second
int multi_copy(const char *dest,const char *src);

//Entry point. Usage: <program> <source> <destination>
//Exits with EXIT_FAILURE on a usage error or when multi_copy reports a
//failure, EXIT_SUCCESS otherwise
int main(int argc,char *argv[])
{
    if(argc < 3)
    {
        fprintf(stderr,"Error params!\n");
        //a usage error must not look like success to the calling shell
        return EXIT_FAILURE;
    }
    const char *src = argv[1];
    const char *dest = argv[2];

    //multi_copy takes the destination first, then the source
    if(multi_copy(dest,src) != 0)
    {
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

//get file size
//Size in bytes of `filename` as reported by stat()
//The stat buffer is cleared first and the stat() result is not checked, so
//the value for a path that cannot be stat'ed is whatever st_size holds
//after the failed call
long file_size(const char *filename)
{
    struct stat st;
    long size;

    memset(&st,0,sizeof(struct stat));
    stat(filename,&st);
    size = st.st_size;

    return size;
}

//thread task
//Serialises each seek+read / seek+write pair on the streams that all workers
//share. Without it one worker can move the stream position between another
//worker's fseek() and its fread()/fwrite(), so data lands at the wrong offset
static pthread_mutex_t shared_stream_lock = PTHREAD_MUTEX_INITIALIZER;

//Worker routine: copies the byte range [start, start + segment_size) from
//p->src to the same range of p->dest, BUFF_SIZE bytes at a time
//
//task_info points to a Page. Its streams may be shared with other workers, so
//this routine keeps its own offset and re-seeks under shared_stream_lock for
//every chunk instead of relying on the stream position
//
//Does not return: ends in pthread_exit(NULL). Failures are reported on stderr
void pthread_copy(void *task_info)
{
    Page *p = (Page *)task_info;

    char buffer[BUFF_SIZE];
    long pos = p->start;            //this worker's private file offset
    long left = p->segment_size;    //bytes of the segment still to copy

    printf("Thread %d begin\t",p->id);
    printf("start %ld\t",p->start);
    printf("segment_size %ld\n",p->segment_size);

    while(left > 0)
    {
        //never read past the end of the segment
        size_t want = left < BUFF_SIZE ? (size_t)left : (size_t)BUFF_SIZE;
        size_t got = 0;
        int ok = 0;

        pthread_mutex_lock(&shared_stream_lock);
        if(fseek(p->src,pos,SEEK_SET) == 0)
        {
            got = fread(buffer,sizeof(char),want,p->src);
        }
        if(got > 0 && fseek(p->dest,pos,SEEK_SET) == 0)
        {
            ok = (fwrite(buffer,sizeof(char),got,p->dest) == got);
        }
        pthread_mutex_unlock(&shared_stream_lock);

        if(!ok)
        {
            //seek/read/write failure or unexpected EOF: stop instead of spinning
            fprintf(stderr,"Thread %d: copy stopped at offset %ld\n",p->id,pos);
            break;
        }
        pos += (long)got;
        left -= (long)got;
    }
    pthread_exit(NULL);
}
//create multi thread
//Thread entry point with the exact signature pthread_create() requires;
//forwards to pthread_copy()
static void *copy_worker_entry(void *task_info)
{
    pthread_copy(task_info);
    return NULL;
}

//Copies src to dest by splitting the file into byte ranges and giving each
//range to a worker thread. Both files are opened once here and the same two
//streams are handed to every worker
//
//dest - path of the file to write (created or truncated)
//src  - path of the file to read; must exist and be readable
//
//Returns 0 on success, 1 when the source is not readable, a file cannot be
//opened, memory runs out, a worker cannot be started, or flushing the
//destination fails
int multi_copy(const char *dest,const char *src)
{
    //04: the source must exist and be readable
    if(access(src,04) != 0)
    {
        fprintf(stderr,"file is not exists!\n");
        return 1;
    }

    long filesize = file_size(src);

    //files smaller than one buffer are copied by a single worker
    int real_pthread_number = PTHREAD_NUMBER;
    if(filesize < BUFF_SIZE)
    {
        real_pthread_number = 1;
    }

    FILE *fin = fopen(src,"rb");
    if(fin == NULL)
    {
        fprintf(stderr,"Open file %s fail\n",src);
        return 1;
    }
    FILE *fout = fopen(dest,"wb+");
    if(fout == NULL)
    {
        fprintf(stderr,"Open file %s fail\n",dest);
        fclose(fin);
        return 1;
    }

    Page *p = malloc(sizeof(*p) * (size_t)real_pthread_number);
    if(p == NULL)
    {
        fprintf(stderr,"Out of memory\n");
        fclose(fin);
        fclose(fout);
        return 1;
    }

    long offset = 0;                                        //running start offset
    long segment = filesize / real_pthread_number;            //bytes per worker
    long segment_remain = filesize % real_pthread_number;    //extra bytes for the last worker

    int i;
    for(i = 0;i < real_pthread_number;i++)
    {
        //the last worker also takes the bytes left over by the division
        p[i].segment_size = segment;
        if((i + 1) == real_pthread_number)
        {
            p[i].segment_size += segment_remain;
        }
        p[i].start = offset;
        offset += p[i].segment_size;
        p[i].src = fin;
        p[i].dest = fout;
        p[i].id = i;
    }

    pthread_t work[PTHREAD_NUMBER];
    int started = 0;
    int rc = 0;
    for(i = 0;i < real_pthread_number;i++)
    {
        if(pthread_create(&work[i],NULL,copy_worker_entry,&p[i]) != 0)
        {
            fprintf(stderr,"Create thread %d fail\n",i);
            rc = 1;
            break;
        }
        started++;
    }
    //wait only for the workers that actually started
    for(i = 0;i < started;i++)
    {
        pthread_join(work[i],NULL);
    }

    free(p);
    fclose(fin);
    //fclose flushes buffered output, so a failure here means lost data
    if(fclose(fout) != 0)
    {
        fprintf(stderr,"Close file %s fail\n",dest);
        rc = 1;
    }

    return rc;
}

在multi_copy中以rb打开源文件,wb+打开目标文件
修改结构体存储文件指针
在线程pthread_copy中操作文件指针位置,去除打开文件操作(在线程中以wb+打开文件会将文件截断为0)
在线程调度函数multi_copy中打开文件并在末尾关闭文件
将文件指针赋给每个线程的task_info结构体成员

二、线程锁版本

#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

//Size in bytes of each worker's read/write buffer and of the path arrays in
//Page; multi_copy also uses it as the file-size threshold below which a single
//worker is used
#define BUFF_SIZE 512
//Number of worker threads started for files of at least BUFF_SIZE bytes
#define PTHREAD_NUMBER 4

//Task descriptor handed to one worker thread
typedef struct copy_block{
    char src[BUFF_SIZE];    //source file path
    char dest[BUFF_SIZE];    //destination file path
    long start;                //byte offset where this task's segment begins
    long segment_size;        //number of bytes this task copies
    int id;                    //logical worker index assigned by multi_copy
}__attribute__((packed)) Page;

//Mutex initialised by multi_copy and locked by pthread_copy at the start of a
//worker's copy
static pthread_mutex_t mut;

//Returns the st_size that stat() reports for filename
long file_size(const char *filename);
//Worker routine; task_info points to the Page describing the range to copy
void pthread_copy(void *task_info);
//Copies src to dest using worker threads; note the order: destination first, source second
int multi_copy(const char *dest,const char *src);

//Entry point. Usage: <program> <source> <destination>
//Exits with EXIT_FAILURE on a usage error or when multi_copy reports a
//failure, EXIT_SUCCESS otherwise
int main(int argc,char *argv[])
{
    if(argc < 3)
    {
        fprintf(stderr,"Error params!\n");
        //a usage error must not be reported to the shell as success
        return EXIT_FAILURE;
    }
    const char *src = argv[1];
    const char *dest = argv[2];

    //multi_copy takes the destination first, then the source
    int status = multi_copy(dest,src);

    return status == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}

//get file size
//Looks up `filename` with stat() and returns its st_size
//The buffer is zero-filled before the call and the stat() result is ignored,
//so the value for a path that cannot be stat'ed is whatever st_size holds
//after the failed call
long file_size(const char *filename)
{
    struct stat sb;

    (void)memset(&sb,0,sizeof sb);
    (void)stat(filename,&sb);

    return sb.st_size;
}

//thread task
//Worker routine: copies the byte range [start, start + segment_size) of the
//file named p->src into the same range of the file named p->dest
//
//task_info points to a Page. The whole copy runs with `mut` held, so workers
//execute one after another. The destination is opened with "rb+", which
//requires the file to exist and never truncates it
//
//Does not return: every path releases the mutex, closes whatever was opened
//and ends in pthread_exit(NULL). Failures are reported on stderr
void pthread_copy(void *task_info)
{
    Page *p = (Page *)task_info;
    FILE *fin = NULL;
    FILE *fout = NULL;
    char buffer[BUFF_SIZE];
    long left = p->segment_size;    //bytes of the segment still to copy

    pthread_mutex_lock(&mut);

    fin = fopen(p->src,"rb");
    if(fin == NULL)
    {
        fprintf(stderr,"Open file %s fail\n",p->src);
        goto out;
    }
    fout = fopen(p->dest,"rb+");
    if(fout == NULL)
    {
        fprintf(stderr,"Open file %s fail\n",p->dest);
        goto out;
    }

    //position both streams at the start of this worker's segment
    if(fseek(fin,p->start,SEEK_SET) != 0 || fseek(fout,p->start,SEEK_SET) != 0)
    {
        fprintf(stderr,"Thread %d: seek to %ld fail\n",p->id,p->start);
        goto out;
    }

    printf("Thread %d begin\t",p->id);
    printf("start %ld\t",p->start);
    printf("segment_size %ld\t",p->segment_size);
    printf("src %s\t",p->src);
    printf("dest %s\n",p->dest);

    while(left > 0)
    {
        //never read past the end of the segment
        size_t want = left < BUFF_SIZE ? (size_t)left : (size_t)BUFF_SIZE;
        size_t got = fread(buffer,sizeof(char),want,fin);

        if(got == 0)
        {
            //EOF or read error: stop instead of spinning with the lock held
            fprintf(stderr,"Thread %d: short read, %ld bytes not copied\n",p->id,left);
            break;
        }
        if(fwrite(buffer,sizeof(char),got,fout) != got)
        {
            fprintf(stderr,"Thread %d: write fail\n",p->id);
            break;
        }
        left -= (long)got;
    }

out:
    if(fin != NULL)
    {
        fclose(fin);
    }
    //fclose flushes buffered output, so its result matters for the destination
    if(fout != NULL && fclose(fout) != 0)
    {
        fprintf(stderr,"Close file %s fail\n",p->dest);
    }
    //release the lock on every path so the remaining workers can run
    pthread_mutex_unlock(&mut);
    pthread_exit(NULL);
}
//create multi thread
//Thread entry point with the exact signature pthread_create() requires;
//forwards to pthread_copy()
static void *locked_copy_entry(void *task_info)
{
    pthread_copy(task_info);
    return NULL;
}

//Copies src to dest by splitting the file into byte ranges and giving each
//range to a worker thread; the workers open the files themselves by path
//
//dest - path of the file to write; created when missing, emptied first when
//       it is longer than the source so no old bytes remain after the copy
//src  - path of the file to read; must exist and be readable
//
//Returns 0 when all workers were started and joined, 1 when the source is not
//readable, a path does not fit in the task structure, the destination cannot
//be created, memory runs out, or a worker cannot be started
int multi_copy(const char *dest,const char *src)
{
    //04: the source must exist and be readable
    if(access(src,04) != 0)
    {
        fprintf(stderr,"file is not exists!\n");
        return 1;
    }

    //each task stores both paths in BUFF_SIZE arrays
    if(strlen(src) >= BUFF_SIZE || strlen(dest) >= BUFF_SIZE)
    {
        fprintf(stderr,"Path too long\n");
        return 1;
    }

    long filesize = file_size(src);

    int dest_missing = (access(dest,04) != 0);
    if(dest_missing)
    {
        fprintf(stderr,"File %s not exists,create it\n",dest);
    }
    //workers open the destination with "rb+", which neither creates nor
    //shrinks it, so create it here or drop content that would outlive the copy
    if(dest_missing || file_size(dest) > filesize)
    {
        FILE *cfp = fopen(dest,"w");
        if(cfp == NULL)
        {
            fprintf(stderr,"Open file %s fail\n",dest);
            return 1;
        }
        fclose(cfp);
    }

    //files smaller than one buffer are copied by a single worker
    int real_pthread_number = PTHREAD_NUMBER;
    if(filesize < BUFF_SIZE)
    {
        real_pthread_number = 1;
    }

    //calloc zero-fills, so the path arrays start out as empty strings
    Page *p = calloc((size_t)real_pthread_number,sizeof(*p));
    if(p == NULL)
    {
        fprintf(stderr,"Out of memory\n");
        return 1;
    }

    long offset = 0;                                        //running start offset
    long segment = filesize / real_pthread_number;            //bytes per worker
    long segment_remain = filesize % real_pthread_number;    //extra bytes for the last worker

    int i;
    for(i = 0;i < real_pthread_number;i++)
    {
        //the last worker also takes the bytes left over by the division
        p[i].segment_size = segment;
        if((i + 1) == real_pthread_number)
        {
            p[i].segment_size += segment_remain;
        }
        p[i].start = offset;
        offset += p[i].segment_size;
        //snprintf always NUL-terminates, unlike strncpy(dst,s,strlen(s))
        snprintf(p[i].src,sizeof p[i].src,"%s",src);
        snprintf(p[i].dest,sizeof p[i].dest,"%s",dest);
        p[i].id = i;
    }

    if(pthread_mutex_init(&mut,NULL) != 0)
    {
        fprintf(stderr,"Init mutex fail\n");
        free(p);
        return 1;
    }

    pthread_t work[PTHREAD_NUMBER];
    int started = 0;
    int rc = 0;
    for(i = 0;i < real_pthread_number;i++)
    {
        if(pthread_create(&work[i],NULL,locked_copy_entry,&p[i]) != 0)
        {
            fprintf(stderr,"Create thread %d fail\n",i);
            rc = 1;
            break;
        }
        started++;
    }
    //wait only for the workers that actually started
    for(i = 0;i < started;i++)
    {
        pthread_join(work[i],NULL);
    }

    //all workers are done; destroying lets a later call initialise the mutex again
    pthread_mutex_destroy(&mut);
    free(p);

    return rc;
}

首先,同一个文件的写操作是独占的,想要同时写,其实是实现不了的。如果不同线程去操作同一个文件指针,那么悲剧就此发生,乱是必然的。原因就是线程1移动到0位置写,但线程2说,我要移动到300开始写,最终移动到哪里去写,就要看运气了。

比较好的办法是使用文件映射. 这样不同线程就能同时写它的不同部分了.
或者分开写, 所有线程完成工作后,再合并到一起.

按照我的想法,最好上锁然后再操作

加上线程锁,就不会出现这个情况了

新手上路,请多包涵

借楼主问题,多线程加锁的写和单线程的写应该没啥区别吧,依然是同一时刻只有一个线程在执行写操作,而且还会有线程切换带来的损耗

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题