最近在研究多线程的断点续传下载。断点续传的原理其实很简单,就是在下载时将下载进度保存到一个临时文件中,如果下载过程遇到什么意外中断了,下次下载同一个文件时就可以从临时文件中读取到上次下载发生中断时的进度,然后从这个进度开始继续下载。

要使用断点续传下载首先要判断服务器是否支持范围请求,假如在响应中存在Accept-Ranges首部并且它的值不为 none,那么表示该服务器支持范围请求。通常情况下Web服务器会默认开启对范围请求的支持,我们只需要在请求头中加入Range首部来指示服务器应该返回文件的哪一部分,例如使用Range: bytes=0-1023返回某个资源的前1024个字节,在代码中体现为:httpcon.setRequestProperty("Range", "bytes=" + startPos + "-" + endPos);,这时候服务器会返回状态码为206 Partial Content的响应表示成功。

项目中还使用了多线程进行分块下载,要注意的是并非线程数越多下载就越快(受限于带宽),一般开十个线程就差不多了,多线程之所以能提高下载速度的原因也很复杂,具体可以参考为什么多线程下载能加速?以及为什么多 TCP 连接分块下载比单连接下载快?。简单来说就是当链路存在争用的情况下,由于传输网络的带宽有限,每个TCP连接可以得到均等的带宽。在多用户环境下,一个用户拥有越多TCP连接,获得的带宽越大。除此之外,由于TCP的拥塞控制机制被设计的十分友好,只要丢了点包就会极大的减慢速率,而此时可能并没有发生拥塞,导致单个连接没法最大化的利用带宽。

下图为该项目的执行流程:

以下为项目源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @Project: breakpoint
* @Description:
* @Author: Cenjie
* @Date: Created in 2019/2/11
*/
public class Downloader {
private String urlStr;
private int threadNum;
private String filename;
private String filename_tmp;
private CountDownLatch latch;
private long fileLength;
private long lenPerThread; //每个线程的下载大小
private long[] start; //保留每个线程下载的起始位置。
private long[] end; //保留每个线程下载的结束位置。
private URL url;

public Downloader(String urlStr, int threadNum) {
this.urlStr = urlStr;
this.threadNum = threadNum;
start = new long[this.threadNum];
end = new long[this.threadNum];
latch = new CountDownLatch(this.threadNum);
}

/**
* 文件下载
*/
public void download() {
File file = null;
File file_tmp = null;

//从文件链接中获取文件名
filename = urlStr.substring(urlStr.lastIndexOf('/') + 1, urlStr
.contains("?") ? urlStr.lastIndexOf('?') : urlStr.length());
//设置临时文件的文件名
filename_tmp = filename + "_tmp";

try {
//创建url
url = new URL(urlStr);

//打开下载链接,并且得到一个HttpURLConnection的一个对象httpcon
HttpURLConnection httpcon = (HttpURLConnection) url.openConnection();
httpcon.setRequestMethod("GET");

//获取请求资源的总长度,为Long型
fileLength = httpcon.getContentLengthLong();

//下载文件和临时文件
file = new File(filename);
file_tmp = new File(filename_tmp);

//每个线程需下载的资源大小;由于文件大小不确定,为避免数据丢失
lenPerThread = fileLength % threadNum == 0 ? fileLength / threadNum : fileLength / threadNum + 1;
//打印下载信息
System.out.println("文件名: " + filename + "," + "文件大小:"
+ fileLength + "字节,每个线程下载大小:" + lenPerThread + "字节");

if (file.exists() && file.length() == fileLength) {
System.out.println("文件已存在");
return;
} else {
setBreakPoint(file_tmp);
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
exec.execute(new DownLoadThread(start[i], end[i],
this, i));
}
latch.await(); //当所有线程下载完毕后,才会从此阻塞中返回
System.out.println("文件下载完成");
exec.shutdown();
}
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

//下载完成后,判断文件是否完整,并删除临时文件
if (file.length() == fileLength) {
if (file_tmp.exists()) {
file_tmp.delete();
System.out.println("删除临时文件完成,下载结束");
}
} else{
System.out.println("该文件不完整");
}
}

/**
* 读取临时文件中记录的断点,加载每个线程的任务区间,若临时文件不存在,则重新分配每个线程的任务区间
* @param file_tmp
*/
private void setBreakPoint(File file_tmp) {
RandomAccessFile random_file_tmp = null;
System.out.println("开始分配任务区间:");
try {
//如果存在临时文件,则从临时文件记录的位置继续下载
if (file_tmp.exists()) {
System.out.println("找到临时文件,将从断点处恢复下载...");
random_file_tmp = new RandomAccessFile(file_tmp, "rw");
for (int i = 0; i < threadNum; i++) {
random_file_tmp.seek(i * 8);
start[i] = random_file_tmp.readLong();

random_file_tmp.seek(1000 + i * 8);
end[i] = random_file_tmp.readLong();

System.out.println("线程" + i + " 起始位置:"
+ start[i] + ",结束位置:" + end[i]);
}
} else {
System.out.println("未找到临时文件,开始一个新的下载...");
random_file_tmp = new RandomAccessFile(file_tmp, "rw");

for (int i = 0; i < threadNum; i++) {
//设置线程i的下载起始位置
start[i] = lenPerThread * i;
if (i == threadNum - 1) {
//当线程i为最后一个线程时,设置线程i的下载结束位置为文件长度
end[i] = fileLength - 1;
} else {
end[i] = lenPerThread * (i + 1) - 1;
}

random_file_tmp.seek(i * 8);
random_file_tmp.writeLong(start[i]);

random_file_tmp.seek(1000 + i * 8);
random_file_tmp.writeLong(end[i]);

System.out.println("线程" + i + " 起始位置:"
+ start[i] + ",结束位置:" + end[i]);
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (random_file_tmp != null) {
random_file_tmp.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

class DownLoadThread implements Runnable {
private int id; //线程id
private long startPos; //线程下载起始位置
private long endPos; //线程下载结束位置
private Downloader task;
private RandomAccessFile rand_file;
private RandomAccessFile rand_file_tmp;

public DownLoadThread(long startPos, long endPos,
Downloader task, int id) {
this.startPos = startPos;
this.endPos = endPos;
this.task = task;
this.id = id;
try {
this.rand_file = new RandomAccessFile(this.task.filename, "rw");
this.rand_file_tmp = new RandomAccessFile(this.task.filename_tmp, "rw");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}

public void run() {

HttpURLConnection httpcon;
InputStream is = null;
int length;

System.out.println("线程" + id + " 开始下载...");

while (true) {
try {
httpcon = (HttpURLConnection) task.url.openConnection();
httpcon.setRequestMethod("GET");

//防止网络阻塞,设置指定的超时时间;单位都是ms。超过指定时间,就会抛出异常
httpcon.setReadTimeout(20000);//读取数据的超时设置
httpcon.setConnectTimeout(20000);//连接的超时设置

if (startPos < endPos) {

//向服务器请求指定区间段的数据,这是实现断点续传的根本。
httpcon.setRequestProperty("Range", "bytes=" + startPos + "-" + endPos);

System.out.println("线程" + id + " 长度:" + (endPos - startPos + 1));

rand_file.seek(startPos);

is = httpcon.getInputStream();//获取服务器返回的资源流
long count = 0L;
byte[] buf = new byte[1024];

while ((length = is.read(buf)) != -1) {
count += length;
rand_file.write(buf, 0, length);

//不断更新每个线程下载资源的起始位置,并写入临时文件
startPos += length;
rand_file_tmp.seek(id * 8);
rand_file_tmp.writeLong(startPos);
}
System.out.println("线程" + id
+ " 总下载大小: " + count);

//关闭流
is.close();
httpcon.disconnect();
rand_file.close();
rand_file_tmp.close();
}
latch.countDown();
System.out.println("线程" + id + " 下载完成");
break;
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

public static void main(String[] args) {
int threadNum = 10;
String url = "http://blog.default.nanwulife.com/pexels-photo-640947.jpeg";

Downloader load = new Downloader(url, threadNum);
load.download();
}
}