Finology 大数据金融

通过大数据以量化金融

安装完操作系统后,首要的第一件事就是换成国内的源,这样下载软件速度才会有保障。

备份文件

1
sudo cp /etc/apt/sources.list /etc/apt/sources.list.bak

编译文件

编译sources.list,把http://cn.archive.ubuntu.com/ubuntu/http://security.ubuntu.com/ubuntu替换为国内的阿里的地址http://mirrors.aliyun.com/ubuntu

1
vi /etc/apt/sources.list

使用vi的替换命令

1
:s%/cn.archive.ubuntu.com/mirrors.aliyun.com/g
1
:s%/security.ubuntu.com/mirrors.aliyun.com/g

更新软件列表

1
sudo apt update

更新软件包

1
sudo apt upgrade

通道(Channel),用于源节点和目标节点的连接。在Java NIO中负责数据的传输。Channel本身不存储数据,所以传输时还需要缓冲区配合。

更多缓冲区的内容,请参考 NIO的缓冲区

利用通道完成文件复制(非直接缓冲区)

可以拷贝超过2G的文件。关于大文件的拷贝,后面再写文章。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try (FileInputStream fis = new FileInputStream("/infile");
FileOutputStream fos = new FileOutputStream("/outfile");

// 获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel()
) {
// 分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10240);

// 将通道中的数据存入缓冲区
while (inChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}

使用直接缓冲区完成文件复制

文件大小不能超过2G。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try (FileChannel inChannel = FileChannel.open(Paths.get("/infile"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/outfile"),
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE)) {

// 内存映射文件
MappedByteBuffer inMappedByteBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedByteBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

// 直接对缓冲区进行数据的读写操作
byte[] bytes = new byte[inMappedByteBuffer.limit()];
inMappedByteBuffer.get(bytes);
outMappedByteBuffer.put(bytes);
} catch (IOException e) {
e.printStackTrace();
}

通道之间的数据传输(直接缓冲区)

文件大小不能超过2G。

1
2
3
4
5
6
7
8
9
10
11
12
// transferFrom()
// transferTo()
try (FileChannel inChannel = FileChannel.open(Paths.get("/Users/simon/Documents/largefile.tar"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/Users/simon/Documents/largefile.tar.copy"),
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE)) {

inChannel.transferTo(0, inChannel.size(), outChannel);

} catch (IOException e) {
e.printStackTrace();
}

分散和聚集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ByteBuffer[] bufs = new ByteBuffer[0];
try (RandomAccessFile raf1 = new RandomAccessFile("/infile", "rw");

// 1. 获取通道
FileChannel raf1Channel = raf1.getChannel()) {


// 2. 分配指定大小的缓冲区数组
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
bufs = new ByteBuffer[]{buf1, buf2};

// 3. 分散读取
raf1Channel.read(bufs);

for (ByteBuffer buf : bufs) {
buf.flip();
}
} catch (IOException e) {
e.printStackTrace();
}

System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("---");
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));

// 4. 聚集写入
try (RandomAccessFile raf2 = new RandomAccessFile("/outfile", "rw");
FileChannel raf2Channel = raf2.getChannel()) {

raf2Channel.write(bufs);
} catch (IOException e) {
e.printStackTrace();
}

最近需要对文件做一些处理,之前接触得比较多的还是基于流的IO,但现在用得比较多的还是NIO(New IO),可以理解为Non-blocking IO。

缓冲区(Buffer)

缓冲区主要是负责数据的存取的。

除了boolean这个基础类型外,其他数据类型都对应一个缓冲Buffer。

1
2
3
4
5
6
7
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer

两个核心方法

1
2
put() 存入数据到缓冲区
get() 从缓冲区获取数据

四个核心属性

  • capacity

容量,一旦声明,不可改变

  • limit

界限,表示缓冲区可以操作数据的大小。limit后数据不能进行读写操作

  • position

位置,表示缓冲区正在操作数据的位置

  • mark

标记,记录当前position的位置,后续操作之后,可以通过reset()恢复到mark的位置。

实例

1. 分配一个指定大小的缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
String str = "abcde";

ByteBuffer buffer = ByteBuffer.allocate(1024);

System.out.println("--- allocate() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 0
* 1024
* 1024
*/

2. 执行put()把数据存入缓冲区

1
2
3
4
5
6
7
8
9
10
11
buffer.put(str.getBytes());

System.out.println("--- put() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 5
* 1024
* 1024
*/

3. 切换到读取数据模式

1
2
3
4
5
6
7
8
9
10
11
buffer.flip();

System.out.println("--- flip() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 0
* 5
* 1024
*/

4. 执行get()读取缓冲区数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);

System.out.println("--- get() ---");
System.out.println(new String(bytes));

System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* abcde
* 5
* 5
* 1024
*/

5. 重新读取缓冲区

1
2
3
4
5
6
7
8
9
10
11
buffer.rewind();

System.out.println("--- rewind() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 0
* 5
* 1024
*/

6. mark/reset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
buffer.get(bytes, 0, 2);
System.out.println(new String(bytes, 0, 2));
buffer.mark();
buffer.get(bytes, 2, 2);

buffer.reset();

System.out.println("--- reset() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 2
* 5
* 1024
*/

if (buffer.hasRemaining()) {
System.out.println(buffer.remaining());
/**
* 3
*/
}

7. 清空缓冲区

1
2
3
4
5
6
7
8
9
10
11
buffer.clear();

System.out.println("--- clear() ---");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
/**
* 0
* 1024
* 1024
*/
0%